7c856ad9b2375bffe080a54f6c6326d868d3ed30
[ep-engine.git] / src / couch-kvstore / couch-kvstore.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #ifdef _MSC_VER
21 #define PATH_MAX MAX_PATH
22 #endif
23
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/stat.h>
28 #include <sys/types.h>
29
30 #include <algorithm>
31 #include <cctype>
32 #include <cstdlib>
33 #include <fstream>
34 #include <iostream>
35 #include <list>
36 #include <map>
37 #include <string>
38 #include <utility>
39 #include <vector>
40 #include <platform/dirutils.h>
41
42 #include "couch-kvstore/couch-kvstore.h"
43 #define STATWRITER_NAMESPACE couchstore_engine
44 #include "statwriter.h"
45 #undef STATWRITER_NAMESPACE
46
47 #include <JSON_checker.h>
48 #include <snappy-c.h>
49
50 using namespace CouchbaseDirectoryUtilities;
51
52 static const int MAX_OPEN_DB_RETRY = 10;
53
54 static const uint32_t DEFAULT_META_LEN = 16;
55
56 extern "C" {
57     static int recordDbDumpC(Db *db, DocInfo *docinfo, void *ctx)
58     {
59         return CouchKVStore::recordDbDump(db, docinfo, ctx);
60     }
61 }
62
63 extern "C" {
64     static int getMultiCbC(Db *db, DocInfo *docinfo, void *ctx)
65     {
66         return CouchKVStore::getMultiCb(db, docinfo, ctx);
67     }
68 }
69
70 static std::string getStrError(Db *db) {
71     const size_t max_msg_len = 256;
72     char msg[max_msg_len];
73     couchstore_last_os_error(db, msg, max_msg_len);
74     std::string errorStr(msg);
75     return errorStr;
76 }
77
78 static std::string getSystemStrerror(void) {
79     std::stringstream ss;
80 #ifdef WIN32
81     char* win_msg = NULL;
82     DWORD err = GetLastError();
83     FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
84                    FORMAT_MESSAGE_FROM_SYSTEM |
85                    FORMAT_MESSAGE_IGNORE_INSERTS,
86                    NULL, err,
87                    MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
88                    (LPTSTR) &win_msg,
89                    0, NULL);
90     ss << "errno = " << err << ": '" << win_msg << "'";
91     LocalFree(win_msg);
92 #else
93     ss << "errno = " << errno << ": '" << strerror(errno) << "'";
94 #endif
95
96     return ss.str();
97 }
98
99 static uint8_t determine_datatype(const unsigned char* value,
100                                   size_t length) {
101     if (checkUTF8JSON(value, length)) {
102         return PROTOCOL_BINARY_DATATYPE_JSON;
103     } else {
104         return PROTOCOL_BINARY_RAW_BYTES;
105     }
106 }
107
108 static bool endWithCompact(const std::string &filename) {
109     size_t pos = filename.find(".compact");
110     if (pos == std::string::npos ||
111                         (filename.size() - sizeof(".compact")) != pos) {
112         return false;
113     }
114     return true;
115 }
116
117 static void discoverDbFiles(const std::string &dir,
118                             std::vector<std::string> &v) {
119     std::vector<std::string> files = findFilesContaining(dir, ".couch");
120     std::vector<std::string>::iterator ii;
121     for (ii = files.begin(); ii != files.end(); ++ii) {
122         if (!endWithCompact(*ii)) {
123             v.push_back(*ii);
124         }
125     }
126 }
127
128 static int getMutationStatus(couchstore_error_t errCode) {
129     switch (errCode) {
130     case COUCHSTORE_SUCCESS:
131         return MUTATION_SUCCESS;
132     case COUCHSTORE_ERROR_NO_HEADER:
133     case COUCHSTORE_ERROR_NO_SUCH_FILE:
134     case COUCHSTORE_ERROR_DOC_NOT_FOUND:
135         // this return causes ep engine to drop the failed flush
136         // of an item since it does not know about the itme any longer
137         return DOC_NOT_FOUND;
138     default:
139         // this return causes ep engine to keep requeuing the failed
140         // flush of an item
141         return MUTATION_FAILED;
142     }
143 }
144
145 static bool allDigit(std::string &input) {
146     size_t numchar = input.length();
147     for(size_t i = 0; i < numchar; ++i) {
148         if (!isdigit(input[i])) {
149             return false;
150         }
151     }
152     return true;
153 }
154
155 static std::string couchkvstore_strerrno(Db *db, couchstore_error_t err) {
156     return (err == COUCHSTORE_ERROR_OPEN_FILE ||
157             err == COUCHSTORE_ERROR_READ ||
158             err == COUCHSTORE_ERROR_WRITE) ? getStrError(db) : "none";
159 }
160
161 struct GetMultiCbCtx {
162     GetMultiCbCtx(CouchKVStore &c, uint16_t v, vb_bgfetch_queue_t &f) :
163         cks(c), vbId(v), fetches(f) {}
164
165     CouchKVStore &cks;
166     uint16_t vbId;
167     vb_bgfetch_queue_t &fetches;
168 };
169
170 struct StatResponseCtx {
171 public:
172     StatResponseCtx(std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &sm,
173                     uint16_t vb) : statMap(sm), vbId(vb) {
174         /* EMPTY */
175     }
176
177     std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &statMap;
178     uint16_t vbId;
179 };
180
181 struct AllKeysCtx {
182     AllKeysCtx(shared_ptr<Callback<uint16_t&, char*&> > callback, uint32_t cnt)
183         : cb(callback), count(cnt) { }
184
185     shared_ptr<Callback<uint16_t&, char*&> > cb;
186     uint32_t count;
187 };
188
189 CouchRequest::CouchRequest(const Item &it, uint64_t rev,
190                            MutationRequestCallback &cb, bool del)
191     : IORequest(it.getVBucketId(), cb, del, it.getKey()), value(it.getValue()),
192       fileRevNum(rev)
193 {
194     uint64_t cas = htonll(it.getCas());
195     uint32_t flags = it.getFlags();
196     uint32_t vlen = it.getNBytes();
197     uint32_t exptime = it.getExptime();
198     uint8_t confresmode = static_cast<uint8_t>(it.getConflictResMode());
199
200     // Datatype used to determine whether document requires compression or not
201     uint8_t datatype;
202
203     // Save time of deletion in expiry time field of deleted item's metadata.
204     if (del) {
205         exptime = ep_real_time();
206     }
207     exptime = htonl(exptime);
208
209     dbDoc.id.buf = const_cast<char *>(key.c_str());
210     dbDoc.id.size = it.getNKey();
211     if (vlen) {
212         dbDoc.data.buf = const_cast<char *>(value->getData());
213         dbDoc.data.size = vlen;
214         datatype = it.getDataType();
215     } else {
216         dbDoc.data.buf = NULL;
217         dbDoc.data.size = 0;
218         datatype = 0x00;
219     }
220
221     memset(meta, 0, sizeof(meta));
222     memcpy(meta, &cas, 8);
223     memcpy(meta + 8, &exptime, 4);
224     memcpy(meta + 12, &flags, 4);
225     *(meta + DEFAULT_META_LEN) = FLEX_META_CODE;
226
227     //For a deleted item, there is no extended meta data available
228     //as part of the item object, hence by default populate the
229     //data type to PROTOCOL_BINARY_RAW_BYTES
230     if (del) {
231         uint8_t del_datatype = PROTOCOL_BINARY_RAW_BYTES;
232         memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
233                &del_datatype, sizeof(uint8_t));
234     } else {
235         memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET, it.getExtMeta(),
236                it.getExtMetaLen());
237     }
238     memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET + EXT_META_LEN,
239            &confresmode, CONFLICT_RES_META_LEN);
240
241     dbDocInfo.db_seq = it.getBySeqno();
242     dbDocInfo.rev_meta.buf = reinterpret_cast<char *>(meta);
243     dbDocInfo.rev_meta.size = COUCHSTORE_METADATA_SIZE;
244     dbDocInfo.rev_seq = it.getRevSeqno();
245     dbDocInfo.size = dbDoc.data.size;
246     if (del) {
247         dbDocInfo.deleted =  1;
248     } else {
249         dbDocInfo.deleted = 0;
250     }
251     dbDocInfo.id = dbDoc.id;
252     dbDocInfo.content_meta = (datatype == PROTOCOL_BINARY_DATATYPE_JSON) ?
253                                     COUCH_DOC_IS_JSON : COUCH_DOC_NON_JSON_MODE;
254
255     //Compress only those documents that aren't already compressed.
256     if (dbDoc.data.size > 0 && !deleteItem) {
257         if (datatype == PROTOCOL_BINARY_RAW_BYTES ||
258                 datatype == PROTOCOL_BINARY_DATATYPE_JSON) {
259             dbDocInfo.content_meta |= COUCH_DOC_IS_COMPRESSED;
260         }
261     }
262 }
263
264 CouchKVStore::CouchKVStore(KVStoreConfig &config, bool read_only) :
265     KVStore(config, read_only), dbname(config.getDBName()),
266     intransaction(false), backfillCounter(0)
267 {
268     createDataDir(dbname);
269     statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
270
271     // init db file map with default revision number, 1
272     numDbFiles = configuration.getMaxVBuckets();
273     cachedVBStates.reserve(numDbFiles);
274
275     // pre-allocate lookup maps (vectors) given we have a relatively
276     // small, fixed number of vBuckets.
277     dbFileRevMap.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(1));
278     cachedDocCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(-1));
279     cachedDeleteCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(-1));
280     cachedVBStates.assign(numDbFiles, nullptr);
281
282     initialize();
283 }
284
285 CouchKVStore::CouchKVStore(const CouchKVStore &copyFrom) :
286     KVStore(copyFrom), dbname(copyFrom.dbname),
287     dbFileRevMap(copyFrom.dbFileRevMap), numDbFiles(copyFrom.numDbFiles),
288     intransaction(false)
289 {
290     createDataDir(dbname);
291     statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
292 }
293
294 void CouchKVStore::initialize() {
295     std::vector<uint16_t> vbids;
296     std::vector<std::string> files;
297     discoverDbFiles(dbname, files);
298     populateFileNameMap(files, &vbids);
299
300     Db *db = NULL;
301     couchstore_error_t errorCode;
302
303     std::vector<uint16_t>::iterator itr = vbids.begin();
304     for (; itr != vbids.end(); ++itr) {
305         uint16_t id = *itr;
306         uint64_t rev = dbFileRevMap[id];
307
308         errorCode = openDB(id, rev, &db, COUCHSTORE_OPEN_FLAG_RDONLY);
309         if (errorCode == COUCHSTORE_SUCCESS) {
310             readVBState(db, id);
311             /* update stat */
312             ++st.numLoadedVb;
313             closeDatabaseHandle(db);
314         } else {
315             LOG(EXTENSION_LOG_WARNING, "Failed to open database file "
316                 "%s/%" PRIu16 ".couch.%" PRIu64, dbname.c_str(), id, rev);
317             remVBucketFromDbFileMap(id);
318             cachedVBStates[id] = NULL;
319         }
320
321         db = NULL;
322         if (!isReadOnly()) {
323             removeCompactFile(dbname, id, rev);
324         }
325     }
326 }
327
328 CouchKVStore::~CouchKVStore() {
329     close();
330
331     for (std::vector<vbucket_state *>::iterator it = cachedVBStates.begin();
332          it != cachedVBStates.end(); it++) {
333         vbucket_state *vbstate = *it;
334         if (vbstate) {
335             delete vbstate;
336             *it = NULL;
337         }
338     }
339 }
340 void CouchKVStore::reset(uint16_t vbucketId) {
341     if (isReadOnly()) {
342         throw std::logic_error("CouchKVStore::reset: Not valid on a read-only "
343                         "object.");
344     }
345
346     vbucket_state *state = cachedVBStates[vbucketId];
347     if (state) {
348         state->reset();
349
350         cachedDocCount[vbucketId] = 0;
351         cachedDeleteCount[vbucketId] = 0;
352
353         //Unlink the couchstore file upon reset
354         unlinkCouchFile(vbucketId, dbFileRevMap[vbucketId]);
355         setVBucketState(vbucketId, *state, NULL, true);
356         updateDbFileMap(vbucketId, 1);
357     } else {
358         throw std::invalid_argument("CouchKVStore::reset: No entry in cached "
359                         "states for vbucket " + std::to_string(vbucketId));
360     }
361 }
362
363 void CouchKVStore::set(const Item &itm, Callback<mutation_result> &cb) {
364     if (isReadOnly()) {
365         throw std::logic_error("CouchKVStore::set: Not valid on a read-only "
366                         "object.");
367     }
368     if (!intransaction) {
369         throw std::invalid_argument("CouchKVStore::set: intransaction must be "
370                         "true to perform a set operation.");
371     }
372
373     bool deleteItem = false;
374     MutationRequestCallback requestcb;
375     uint64_t fileRev = dbFileRevMap[itm.getVBucketId()];
376
377     // each req will be de-allocated after commit
378     requestcb.setCb = &cb;
379     CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, deleteItem);
380     pendingReqsQ.push_back(req);
381 }
382
383 void CouchKVStore::get(const std::string &key, uint16_t vb,
384                        Callback<GetValue> &cb, bool fetchDelete) {
385     Db *db = NULL;
386     GetValue rv;
387     uint64_t fileRev = dbFileRevMap[vb];
388
389     couchstore_error_t errCode = openDB(vb, fileRev, &db,
390                                         COUCHSTORE_OPEN_FLAG_RDONLY);
391     if (errCode != COUCHSTORE_SUCCESS) {
392         ++st.numGetFailure;
393         LOG(EXTENSION_LOG_WARNING,
394             "Failed to open database to retrieve data "
395             "from vBucketId = %d, key = %s\n",
396             vb, key.c_str());
397         rv.setStatus(couchErr2EngineErr(errCode));
398         cb.callback(rv);
399         return;
400     }
401
402     getWithHeader(db, key, vb, cb, fetchDelete);
403     closeDatabaseHandle(db);
404 }
405
406 void CouchKVStore::getWithHeader(void *dbHandle, const std::string &key,
407                                  uint16_t vb, Callback<GetValue> &cb,
408                                  bool fetchDelete) {
409
410     Db *db = (Db *)dbHandle;
411     hrtime_t start = gethrtime();
412     RememberingCallback<GetValue> *rc = dynamic_cast<RememberingCallback<GetValue> *>(&cb);
413     bool getMetaOnly = rc && rc->val.isPartial();
414     DocInfo *docInfo = NULL;
415     sized_buf id;
416     GetValue rv;
417
418     id.size = key.size();
419     id.buf = const_cast<char *>(key.c_str());
420
421     couchstore_error_t errCode = couchstore_docinfo_by_id(db, (uint8_t *)id.buf,
422                                                           id.size, &docInfo);
423     if (errCode != COUCHSTORE_SUCCESS) {
424         if (!getMetaOnly) {
425             // log error only if this is non-xdcr case
426             LOG(EXTENSION_LOG_WARNING,
427                 "Failed to retrieve doc info from "
428                 "database, vbucketId=%d, key=%s error=%s [%s]\n",
429                 vb, id.buf, couchstore_strerror(errCode),
430                 couchkvstore_strerrno(db, errCode).c_str());
431         }
432     } else {
433         if (docInfo == nullptr) {
434             throw std::logic_error("CouchKVStore::getWithHeader: "
435                     "couchstore_docinfo_by_id returned success but docInfo "
436                     "is NULL");
437         }
438         errCode = fetchDoc(db, docInfo, rv, vb, getMetaOnly, fetchDelete);
439         if (errCode != COUCHSTORE_SUCCESS) {
440             LOG(EXTENSION_LOG_WARNING,
441                 "Failed to retrieve key value from "
442                 "database, vbucketId=%d key=%s error=%s [%s] "
443                 "deleted=%s", vb, id.buf,
444                 couchstore_strerror(errCode),
445                 couchkvstore_strerrno(db, errCode).c_str(),
446                 docInfo->deleted ? "yes" : "no");
447         }
448
449         // record stats
450         st.readTimeHisto.add((gethrtime() - start) / 1000);
451         if (errCode == COUCHSTORE_SUCCESS) {
452             st.readSizeHisto.add(key.length() + rv.getValue()->getNBytes());
453         }
454     }
455
456     if(errCode != COUCHSTORE_SUCCESS) {
457         ++st.numGetFailure;
458     }
459
460     couchstore_free_docinfo(docInfo);
461     rv.setStatus(couchErr2EngineErr(errCode));
462     cb.callback(rv);
463 }
464
465 void CouchKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t &itms) {
466     int numItems = itms.size();
467     uint64_t fileRev = dbFileRevMap[vb];
468
469     Db *db = NULL;
470     couchstore_error_t errCode = openDB(vb, fileRev, &db,
471                                         COUCHSTORE_OPEN_FLAG_RDONLY);
472     if (errCode != COUCHSTORE_SUCCESS) {
473         LOG(EXTENSION_LOG_WARNING,
474             "Failed to open database for data fetch, "
475             "vBucketId = %d numDocs = %d\n",
476             vb, numItems);
477         st.numGetFailure.fetch_add(numItems);
478         vb_bgfetch_queue_t::iterator itr = itms.begin();
479         for (; itr != itms.end(); ++itr) {
480             vb_bgfetch_item_ctx_t &bg_itm_ctx = (*itr).second;
481             std::list<VBucketBGFetchItem *> &fetches = bg_itm_ctx.bgfetched_list;
482             std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
483             for (; fitr != fetches.end(); ++fitr) {
484                 (*fitr)->value.setStatus(ENGINE_NOT_MY_VBUCKET);
485             }
486         }
487         return;
488     }
489
490     size_t idx = 0;
491     sized_buf *ids = new sized_buf[itms.size()];
492     vb_bgfetch_queue_t::iterator itr = itms.begin();
493     for (; itr != itms.end(); ++itr) {
494         ids[idx].size = itr->first.size();
495         ids[idx].buf = const_cast<char *>(itr->first.c_str());
496         ++idx;
497     }
498
499     GetMultiCbCtx ctx(*this, vb, itms);
500
501     errCode = couchstore_docinfos_by_id(db, ids, itms.size(),
502                                         0, getMultiCbC, &ctx);
503     if (errCode != COUCHSTORE_SUCCESS) {
504         st.numGetFailure.fetch_add(numItems);
505         for (itr = itms.begin(); itr != itms.end(); ++itr) {
506             LOG(EXTENSION_LOG_WARNING, "Failed to read database by"
507                 " vBucketId = %d key = %s error = %s [%s]\n",
508                 vb, (*itr).first.c_str(),
509                 couchstore_strerror(errCode),
510                 couchkvstore_strerrno(db, errCode).c_str());
511             vb_bgfetch_item_ctx_t &bg_itm_ctx = (*itr).second;
512             std::list<VBucketBGFetchItem *> &fetches = bg_itm_ctx.bgfetched_list;
513             std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
514             for (; fitr != fetches.end(); ++fitr) {
515                 (*fitr)->value.setStatus(couchErr2EngineErr(errCode));
516             }
517         }
518     }
519     closeDatabaseHandle(db);
520     delete []ids;
521 }
522
523 void CouchKVStore::del(const Item &itm,
524                        Callback<int> &cb) {
525     if (isReadOnly()) {
526         throw std::logic_error("CouchKVStore::del: Not valid on a read-only "
527                         "object.");
528     }
529     if (!intransaction) {
530         throw std::invalid_argument("CouchKVStore::del: intransaction must be "
531                         "true to perform a delete operation.");
532     }
533
534     uint64_t fileRev = dbFileRevMap[itm.getVBucketId()];
535     MutationRequestCallback requestcb;
536     requestcb.delCb = &cb;
537     CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, true);
538     pendingReqsQ.push_back(req);
539 }
540
541 void CouchKVStore::delVBucket(uint16_t vbucket) {
542     if (isReadOnly()) {
543         throw std::logic_error("CouchKVStore::delVBucket: Not valid on a "
544                         "read-only object.");
545     }
546
547     unlinkCouchFile(vbucket, dbFileRevMap[vbucket]);
548
549     if (cachedVBStates[vbucket]) {
550         delete cachedVBStates[vbucket];
551     }
552
553     std::string failovers("[{\"id\":0, \"seq\":0}]");
554     cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
555                                                 0, 0, 0, INITIAL_DRIFT,
556                                                 failovers);
557     updateDbFileMap(vbucket, 1);
558 }
559
560 std::vector<vbucket_state *> CouchKVStore::listPersistedVbuckets() {
561     return cachedVBStates;
562 }
563
564 void CouchKVStore::getPersistedStats(std::map<std::string,
565                                      std::string> &stats) {
566     char *buffer = NULL;
567     std::string fname = dbname + "/stats.json";
568     if (access(fname.c_str(), R_OK) == -1) {
569         return ;
570     }
571
572     std::ifstream session_stats;
573     session_stats.exceptions (session_stats.failbit | session_stats.badbit);
574     try {
575         session_stats.open(fname.c_str(), std::ios::binary);
576         session_stats.seekg(0, std::ios::end);
577         int flen = session_stats.tellg();
578         if (flen < 0) {
579             LOG(EXTENSION_LOG_WARNING,
580                 "Error in session stats ifstream!!!");
581             session_stats.close();
582             return;
583         }
584         session_stats.seekg(0, std::ios::beg);
585         buffer = new char[flen + 1];
586         session_stats.read(buffer, flen);
587         session_stats.close();
588         buffer[flen] = '\0';
589
590         cJSON *json_obj = cJSON_Parse(buffer);
591         if (!json_obj) {
592             LOG(EXTENSION_LOG_WARNING,
593                 "Failed to parse the session stats json doc!!!");
594             delete[] buffer;
595             return;
596         }
597
598         int json_arr_size = cJSON_GetArraySize(json_obj);
599         for (int i = 0; i < json_arr_size; ++i) {
600             cJSON *obj = cJSON_GetArrayItem(json_obj, i);
601             if (obj) {
602                 stats[obj->string] = obj->valuestring ? obj->valuestring : "";
603             }
604         }
605         cJSON_Delete(json_obj);
606
607     } catch (const std::ifstream::failure &e) {
608         LOG(EXTENSION_LOG_WARNING,
609             "Failed to load the engine session stats "
610             "due to IO exception \"%s\"", e.what());
611     } catch (...) {
612         LOG(EXTENSION_LOG_WARNING,
613             "Failed to load the engine session stats "
614             "due to IO exception");
615     }
616
617     delete[] buffer;
618 }
619
620 static std::string getDBFileName(const std::string &dbname,
621                                  uint16_t vbid,
622                                  uint64_t rev) {
623     std::stringstream ss;
624     ss << dbname << "/" << vbid << ".couch." << rev;
625     return ss.str();
626 }
627
628 static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
629     if ((*info)->rev_meta.size == DEFAULT_META_LEN) {
630         // Metadata doesn't have flex_meta_code, datatype and
631         // conflict_resolution_mode, provision space for
632         // these paramenters.
633         const unsigned char* data;
634         bool ret;
635         if (((*info)->content_meta | COUCH_DOC_IS_COMPRESSED) ==
636                 (*info)->content_meta) {
637             size_t uncompr_len;
638             snappy_uncompressed_length(item->buf, item->size, &uncompr_len);
639             char *dbuf = (char *) malloc(uncompr_len);
640             snappy_uncompress(item->buf, item->size, dbuf, &uncompr_len);
641             data = (const unsigned char*)dbuf;
642             ret = checkUTF8JSON(data, uncompr_len);
643             free(dbuf);
644         } else {
645             data = (const unsigned char*)item->buf;
646             ret = checkUTF8JSON(data, item->size);
647         }
648         uint8_t flex_code = FLEX_META_CODE;
649         uint8_t datatype;
650         if (ret) {
651             datatype = PROTOCOL_BINARY_DATATYPE_JSON;
652         } else {
653             datatype = PROTOCOL_BINARY_RAW_BYTES;
654         }
655
656         DocInfo *docinfo = (DocInfo *) calloc (sizeof(DocInfo) +
657                                                (*info)->id.size +
658                                                (*info)->rev_meta.size +
659                                                FLEX_DATA_OFFSET + EXT_META_LEN +
660                                                sizeof(uint8_t),
661                                                sizeof(uint8_t));
662         if (!docinfo) {
663             LOG(EXTENSION_LOG_WARNING, "Failed to allocate docInfo, "
664                     "while editing docinfo in the compaction's docinfo_hook");
665             return 0;
666         }
667
668         char *extra = (char *)docinfo + sizeof(DocInfo);
669         memcpy(extra, (*info)->id.buf, (*info)->id.size);
670         docinfo->id.buf = extra;
671         docinfo->id.size = (*info)->id.size;
672
673         extra += (*info)->id.size;
674         memcpy(extra, (*info)->rev_meta.buf, (*info)->rev_meta.size);
675         memcpy(extra + (*info)->rev_meta.size,
676                &flex_code, FLEX_DATA_OFFSET);
677         memcpy(extra + (*info)->rev_meta.size + FLEX_DATA_OFFSET,
678                &datatype, sizeof(uint8_t));
679         uint8_t conflict_resolution_mode = revision_seqno;
680         memcpy(extra + (*info)->rev_meta.size + FLEX_DATA_OFFSET + EXT_META_LEN,
681                &conflict_resolution_mode, sizeof(uint8_t));
682         docinfo->rev_meta.buf = extra;
683         docinfo->rev_meta.size = (*info)->rev_meta.size +
684                                  FLEX_DATA_OFFSET + EXT_META_LEN +
685                                  sizeof(uint8_t);
686
687         docinfo->db_seq = (*info)->db_seq;
688         docinfo->rev_seq = (*info)->rev_seq;
689         docinfo->deleted = (*info)->deleted;
690         docinfo->content_meta = (*info)->content_meta;
691         docinfo->bp = (*info)->bp;
692         docinfo->size = (*info)->size;
693
694         couchstore_free_docinfo(*info);
695         *info = docinfo;
696         return 1;
697     } else if ((*info)->rev_meta.size == DEFAULT_META_LEN + 2) {
698         // Metadata doesn't have conflict_resolution_mode,
699         // provision space for this flag.
700         DocInfo *docinfo = (DocInfo *) calloc (sizeof(DocInfo) +
701                                                (*info)->id.size +
702                                                (*info)->rev_meta.size +
703                                                sizeof(uint8_t),
704                                                sizeof(uint8_t));
705         if (!docinfo) {
706             LOG(EXTENSION_LOG_WARNING, "Failed to allocate docInfo, "
707                     "while editing docinfo in the compaction's docinfo_hook");
708             return 0;
709         }
710
711         char *extra = (char *)docinfo + sizeof(DocInfo);
712         memcpy(extra, (*info)->id.buf, (*info)->id.size);
713         docinfo->id.buf = extra;
714         docinfo->id.size = (*info)->id.size;
715
716         extra += (*info)->id.size;
717         memcpy(extra, (*info)->rev_meta.buf, (*info)->rev_meta.size);
718         uint8_t conflict_resolution_mode = revision_seqno;
719         memcpy(extra + (*info)->rev_meta.size,
720                &conflict_resolution_mode, sizeof(uint8_t));
721         docinfo->rev_meta.buf = extra;
722         docinfo->rev_meta.size = (*info)->rev_meta.size +
723                                  sizeof(uint8_t);
724
725         docinfo->db_seq = (*info)->db_seq;
726         docinfo->rev_seq = (*info)->rev_seq;
727         docinfo->deleted = (*info)->deleted;
728         docinfo->content_meta = (*info)->content_meta;
729         docinfo->bp = (*info)->bp;
730         docinfo->size = (*info)->size;
731
732         couchstore_free_docinfo(*info);
733         *info = docinfo;
734         return 1;
735     }
736     return 0;
737 }
738
739 static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
740     compaction_ctx* ctx = (compaction_ctx*) ctx_p;
741     DbInfo infoDb;
742
743     couchstore_db_info(d, &infoDb);
744     //Compaction finished
745     if (info == NULL) {
746         return couchstore_set_purge_seq(d, ctx->max_purged_seq);
747     }
748
749     if (info->rev_meta.size >= DEFAULT_META_LEN) {
750         uint32_t exptime;
751         memcpy(&exptime, info->rev_meta.buf + 8, 4);
752         exptime = ntohl(exptime);
753         if (info->deleted) {
754             if (info->db_seq != infoDb.last_sequence) {
755                 if (ctx->drop_deletes) { // all deleted items must be dropped ...
756                     if (ctx->max_purged_seq < info->db_seq) {
757                         ctx->max_purged_seq = info->db_seq; // track max_purged_seq
758                     }
759                     return COUCHSTORE_COMPACT_DROP_ITEM;      // ...unconditionally
760                 }
761                 if (exptime < ctx->purge_before_ts &&
762                         (!ctx->purge_before_seq ||
763                          info->db_seq <= ctx->purge_before_seq)) {
764                     if (ctx->max_purged_seq < info->db_seq) {
765                         ctx->max_purged_seq = info->db_seq;
766                     }
767                     return COUCHSTORE_COMPACT_DROP_ITEM;
768                 }
769             }
770         } else if (exptime && exptime < ctx->curr_time) {
771             std::string key(info->id.buf, info->id.size);
772             ctx->expiryCallback->callback(key, info->rev_seq);
773         }
774     }
775
776     if (ctx->bloomFilterCallback) {
777         bool deleted = info->deleted;
778         std::string key((const char *)info->id.buf, info->id.size);
779         ctx->bloomFilterCallback->callback(key, deleted);
780     }
781
782     return COUCHSTORE_COMPACT_KEEP_ITEM;
783 }
784
785 bool CouchKVStore::compactDB(compaction_ctx *hook_ctx,
786                              Callback<kvstats_ctx> &kvcb) {
787     if (isReadOnly()) {
788         throw std::logic_error("CouchKVStore::compactDB: Cannot perform "
789                         "on a read-only instance.");
790     }
791
792     couchstore_compact_hook       hook = time_purge_hook;
793     couchstore_docinfo_hook      dhook = edit_docinfo_hook;
794     const couch_file_ops     *def_iops = couchstore_get_default_file_ops();
795     Db                      *compactdb = NULL;
796     Db                       *targetDb = NULL;
797     couchstore_error_t         errCode = COUCHSTORE_SUCCESS;
798     hrtime_t                     start = gethrtime();
799     std::string                 dbfile;
800     std::string           compact_file;
801     std::string               new_file;
802     kvstats_ctx                  kvctx;
803     DbInfo                        info;
804     uint16_t                      vbid = hook_ctx->db_file_id;
805     uint64_t                   fileRev = dbFileRevMap[vbid];
806     uint64_t                   new_rev = fileRev + 1;
807
808     // Open the source VBucket database file ...
809     errCode = openDB(vbid, fileRev, &compactdb,
810                      (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
811     if (errCode != COUCHSTORE_SUCCESS) {
812         LOG(EXTENSION_LOG_WARNING,
813                 "Failed to open database, vbucketId = %d "
814                 "fileRev = %" PRIu64, vbid, fileRev);
815         return false;
816     }
817
818     // Build the temporary vbucket.compact file name
819     dbfile       = getDBFileName(dbname, vbid, fileRev);
820     compact_file = dbfile + ".compact";
821
822     // Perform COMPACTION of vbucket.couch.rev into vbucket.couch.rev.compact
823     errCode = couchstore_compact_db_ex(compactdb, compact_file.c_str(), 0,
824                                        hook, dhook, hook_ctx, def_iops);
825     if (errCode != COUCHSTORE_SUCCESS) {
826         LOG(EXTENSION_LOG_WARNING,
827             "Failed to compact database with name=%s "
828             "error=%s errno=%s",
829             dbfile.c_str(),
830             couchstore_strerror(errCode),
831             couchkvstore_strerrno(compactdb, errCode).c_str());
832         closeDatabaseHandle(compactdb);
833         return false;
834     }
835
836     // Close the source Database File once compaction is done
837     closeDatabaseHandle(compactdb);
838
839     // Rename the .compact file to one with the next revision number
840     new_file = getDBFileName(dbname, vbid, new_rev);
841     if (rename(compact_file.c_str(), new_file.c_str()) != 0) {
842         LOG(EXTENSION_LOG_WARNING,
843             "Failed to rename '%s' to '%s': %s",
844             compact_file.c_str(), new_file.c_str(),
845             getSystemStrerror().c_str());
846
847         removeCompactFile(compact_file);
848         return false;
849     }
850
851     // Open the newly compacted VBucket database file ...
852     errCode = openDB(vbid, new_rev, &targetDb,
853                      (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
854     if (errCode != COUCHSTORE_SUCCESS) {
855         LOG(EXTENSION_LOG_WARNING,
856                 "Failed to open compacted database file %s "
857                 "fileRev = %" PRIu64, new_file.c_str(), new_rev);
858         if (remove(new_file.c_str()) != 0) {
859             LOG(EXTENSION_LOG_WARNING,
860                 "Failed to remove '%s': %s",
861                 new_file.c_str(), getSystemStrerror().c_str());
862         }
863         return false;
864     }
865
866     // Update the global VBucket file map so all operations use the new file
867     updateDbFileMap(vbid, new_rev);
868
869     LOG(EXTENSION_LOG_INFO,
870             "INFO: created new couch db file, name=%s rev=%" PRIu64,
871             new_file.c_str(), new_rev);
872
873     // Update stats to caller
874     kvctx.vbucket = vbid;
875     couchstore_db_info(targetDb, &info);
876     kvctx.fileSpaceUsed = info.space_used;
877     kvctx.fileSize = info.file_size;
878     kvcb.callback(kvctx);
879
880     // also update cached state with dbinfo
881     vbucket_state *state = cachedVBStates[vbid];
882     if (state) {
883         state->highSeqno = info.last_sequence;
884         state->purgeSeqno = info.purge_seq;
885         cachedDeleteCount[vbid] = info.deleted_count;
886         cachedDocCount[vbid] = info.doc_count;
887     }
888
889     closeDatabaseHandle(targetDb);
890
891     // Removing the stale couch file
892     unlinkCouchFile(vbid, fileRev);
893
894     st.compactHisto.add((gethrtime() - start) / 1000);
895
896     return true;
897 }
898
899 vbucket_state * CouchKVStore::getVBucketState(uint16_t vbucketId) {
900     return cachedVBStates[vbucketId];
901 }
902
903 bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
904                                    Callback<kvstats_ctx> *kvcb, bool reset) {
905     Db *db = NULL;
906     uint64_t fileRev, newFileRev;
907     std::stringstream id, rev;
908     std::string dbFileName;
909     std::map<uint16_t, uint64_t>::iterator mapItr;
910     kvstats_ctx kvctx;
911     kvctx.vbucket = vbucketId;
912
913     id << vbucketId;
914     fileRev = dbFileRevMap[vbucketId];
915     rev << fileRev;
916     dbFileName = dbname + "/" + id.str() + ".couch." + rev.str();
917
918     couchstore_error_t errorCode;
919     errorCode = openDB(vbucketId, fileRev, &db,
920             (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE, &newFileRev, reset);
921     if (errorCode != COUCHSTORE_SUCCESS) {
922         ++st.numVbSetFailure;
923         LOG(EXTENSION_LOG_WARNING,
924                 "Failed to open database, name=%s",
925                 dbFileName.c_str());
926         return false;
927     }
928
929     fileRev = newFileRev;
930     rev << fileRev;
931     dbFileName = dbname + "/" + id.str() + ".couch." + rev.str();
932
933     vbucket_state *state = cachedVBStates[vbucketId];
934     vbstate.highSeqno = state->highSeqno;
935     vbstate.lastSnapStart = state->lastSnapStart;
936     vbstate.lastSnapEnd = state->lastSnapEnd;
937     vbstate.maxDeletedSeqno = state->maxDeletedSeqno;
938     vbstate.maxCas = state->maxCas;
939     vbstate.driftCounter = state->driftCounter;
940
941     errorCode = saveVBState(db, vbstate);
942     if (errorCode != COUCHSTORE_SUCCESS) {
943         ++st.numVbSetFailure;
944         LOG(EXTENSION_LOG_WARNING,
945                 "Failed to save local doc, name=%s",
946                 dbFileName.c_str());
947         closeDatabaseHandle(db);
948         return false;
949     }
950
951     errorCode = couchstore_commit(db);
952     if (errorCode != COUCHSTORE_SUCCESS) {
953         ++st.numVbSetFailure;
954         LOG(EXTENSION_LOG_WARNING,
955                 "Commit failed, vbid=%u rev=%" PRIu64 " error=%s [%s]",
956                 vbucketId, fileRev, couchstore_strerror(errorCode),
957                 couchkvstore_strerrno(db, errorCode).c_str());
958         closeDatabaseHandle(db);
959         return false;
960     }
961     if (kvcb) {
962         DbInfo info;
963         couchstore_db_info(db, &info);
964         kvctx.fileSpaceUsed = info.space_used;
965         kvctx.fileSize = info.file_size;
966         kvcb->callback(kvctx);
967     }
968     closeDatabaseHandle(db);
969
970     return true;
971 }
972
973 bool CouchKVStore::snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
974                                    Callback<kvstats_ctx> *cb, bool persist) {
975     if (isReadOnly()) {
976         LOG(EXTENSION_LOG_WARNING,
977             "Snapshotting a vbucket cannot be performed on a read-only "
978             "KVStore instance");
979         return false;
980     }
981
982     hrtime_t start = gethrtime();
983
984     std::string stateStr = updateCachedVBState(vbucketId, vbstate);
985
986     if (!stateStr.empty() && persist) {
987         if (!setVBucketState(vbucketId, vbstate, cb)) {
988             LOG(EXTENSION_LOG_WARNING,
989                 "Failed to persist new state, %s, for vbucket %d\n",
990                 VBucket::toString(vbstate.state), vbucketId);
991            return false;
992         }
993     }
994
995     st.snapshotHisto.add((gethrtime() - start) / 1000);
996
997     return true;
998 }
999
1000 StorageProperties CouchKVStore::getStorageProperties() {
1001     StorageProperties rv(true, true, true, true);
1002     return rv;
1003 }
1004
1005 bool CouchKVStore::commit(Callback<kvstats_ctx> *cb) {
1006     if (isReadOnly()) {
1007         throw std::logic_error("CouchKVStore::commit: Not valid on a read-only "
1008                         "object.");
1009     }
1010
1011     if (intransaction) {
1012         if (commit2couchstore(cb)) {
1013             intransaction = false;
1014         }
1015     }
1016
1017     return !intransaction;
1018 }
1019
1020 void CouchKVStore::addStats(const std::string &prefix,
1021                             ADD_STAT add_stat,
1022                             const void *c) {
1023     const char *prefix_str = prefix.c_str();
1024
1025     /* stats for both read-only and read-write threads */
1026     addStat(prefix_str, "backend_type",   "couchstore",       add_stat, c);
1027     addStat(prefix_str, "open",           st.numOpen,         add_stat, c);
1028     addStat(prefix_str, "close",          st.numClose,        add_stat, c);
1029     addStat(prefix_str, "readTime",       st.readTimeHisto,   add_stat, c);
1030     addStat(prefix_str, "readSize",       st.readSizeHisto,   add_stat, c);
1031     addStat(prefix_str, "numLoadedVb",    st.numLoadedVb,     add_stat, c);
1032
1033     // failure stats
1034     addStat(prefix_str, "failure_open",   st.numOpenFailure, add_stat, c);
1035     addStat(prefix_str, "failure_get",    st.numGetFailure,  add_stat, c);
1036
1037     if (!isReadOnly()) {
1038         addStat(prefix_str, "failure_set",   st.numSetFailure,   add_stat, c);
1039         addStat(prefix_str, "failure_del",   st.numDelFailure,   add_stat, c);
1040         addStat(prefix_str, "failure_vbset", st.numVbSetFailure, add_stat, c);
1041         addStat(prefix_str, "lastCommDocs",  st.docsCommitted,   add_stat, c);
1042     }
1043
1044     addStat(prefix_str, "io_num_read", st.io_num_read, add_stat, c);
1045     addStat(prefix_str, "io_num_write", st.io_num_write, add_stat, c);
1046     addStat(prefix_str, "io_read_bytes", st.io_read_bytes, add_stat, c);
1047     addStat(prefix_str, "io_write_bytes", st.io_write_bytes, add_stat, c);
1048
1049 }
1050
1051 void CouchKVStore::addTimingStats(const std::string &prefix,
1052                                   ADD_STAT add_stat, const void *c) {
1053     if (isReadOnly()) {
1054         return;
1055     }
1056     const char *prefix_str = prefix.c_str();
1057     addStat(prefix_str, "commit",      st.commitHisto,      add_stat, c);
1058     addStat(prefix_str, "compact",     st.compactHisto,     add_stat, c);
1059     addStat(prefix_str, "snapshot",    st.snapshotHisto,    add_stat, c);
1060     addStat(prefix_str, "delete",      st.delTimeHisto,     add_stat, c);
1061     addStat(prefix_str, "save_documents", st.saveDocsHisto, add_stat, c);
1062     addStat(prefix_str, "writeTime",   st.writeTimeHisto,   add_stat, c);
1063     addStat(prefix_str, "writeSize",   st.writeSizeHisto,   add_stat, c);
1064     addStat(prefix_str, "bulkSize",    st.batchSize,        add_stat, c);
1065
1066     // Couchstore file ops stats
1067     addStat(prefix_str, "fsReadTime",  st.fsStats.readTimeHisto,  add_stat, c);
1068     addStat(prefix_str, "fsWriteTime", st.fsStats.writeTimeHisto, add_stat, c);
1069     addStat(prefix_str, "fsSyncTime",  st.fsStats.syncTimeHisto,  add_stat, c);
1070     addStat(prefix_str, "fsReadSize",  st.fsStats.readSizeHisto,  add_stat, c);
1071     addStat(prefix_str, "fsWriteSize", st.fsStats.writeSizeHisto, add_stat, c);
1072     addStat(prefix_str, "fsReadSeek",  st.fsStats.readSeekHisto,  add_stat, c);
1073 }
1074
1075 template <typename T>
1076 void CouchKVStore::addStat(const std::string &prefix, const char *stat, T &val,
1077                            ADD_STAT add_stat, const void *c) {
1078     std::stringstream fullstat;
1079     fullstat << prefix << ":" << stat;
1080     add_casted_stat(fullstat.str().c_str(), val, add_stat, c);
1081 }
1082
1083 void CouchKVStore::pendingTasks() {
1084     if (isReadOnly()) {
1085         throw std::logic_error("CouchKVStore::pendingTasks: Not valid on a "
1086                         "read-only object.");
1087     }
1088
1089     if (!pendingFileDeletions.empty()) {
1090         std::queue<std::string> queue;
1091         pendingFileDeletions.getAll(queue);
1092
1093         while (!queue.empty()) {
1094             std::string filename_str = queue.front();
1095             if (remove(filename_str.c_str()) == -1) {
1096                 LOG(EXTENSION_LOG_WARNING, "Failed to remove file '%s' "
1097                     "with error code: %d", filename_str.c_str(), errno);
1098                 if (errno != ENOENT) {
1099                     pendingFileDeletions.push(filename_str);
1100                 }
1101             }
1102             queue.pop();
1103         }
1104     }
1105 }
1106
1107 ScanContext* CouchKVStore::initScanContext(shared_ptr<Callback<GetValue> > cb,
1108                                            shared_ptr<Callback<CacheLookup> > cl,
1109                                            uint16_t vbid, uint64_t startSeqno,
1110                                            DocumentFilter options,
1111                                            ValueFilter valOptions) {
1112     Db *db = NULL;
1113     uint64_t rev = dbFileRevMap[vbid];
1114     couchstore_error_t errorCode = openDB(vbid, rev, &db,
1115                                           COUCHSTORE_OPEN_FLAG_RDONLY);
1116     if (errorCode != COUCHSTORE_SUCCESS) {
1117         LOG(EXTENSION_LOG_WARNING, "Failed to open database, "
1118             "name=%s/%" PRIu16 ".couch.%" PRIu64, dbname.c_str(), vbid, rev);
1119         remVBucketFromDbFileMap(vbid);
1120         return NULL;
1121     }
1122
1123     DbInfo info;
1124     errorCode = couchstore_db_info(db, &info);
1125     if (errorCode != COUCHSTORE_SUCCESS) {
1126         LOG(EXTENSION_LOG_WARNING, "Failed to read DB info for backfill");
1127         closeDatabaseHandle(db);
1128         abort();
1129     }
1130
1131     uint64_t count = 0;
1132     errorCode = couchstore_changes_count(db,
1133                                          startSeqno,
1134                                          std::numeric_limits<uint64_t>::max(),
1135                                          &count);
1136     if (errorCode != COUCHSTORE_SUCCESS) {
1137         std::string err("CouchKVStore::initScanContext:Failed to obtain changes "
1138                         "count with error: " +
1139                         std::string(couchstore_strerror(errorCode)));
1140         closeDatabaseHandle(db);
1141         throw std::runtime_error(err);
1142     }
1143
1144     size_t backfillId = backfillCounter++;
1145
1146     LockHolder lh(backfillLock);
1147     backfills[backfillId] = db;
1148
1149     return new ScanContext(cb, cl, vbid, backfillId, startSeqno,
1150                            info.last_sequence, options,
1151                            valOptions, count);
1152 }
1153
1154 scan_error_t CouchKVStore::scan(ScanContext* ctx) {
1155     if (!ctx) {
1156         return scan_failed;
1157     }
1158
1159     if (ctx->lastReadSeqno == ctx->maxSeqno) {
1160         return scan_success;
1161     }
1162
1163     LockHolder lh(backfillLock);
1164     std::map<size_t, Db*>::iterator itr = backfills.find(ctx->scanId);
1165     if (itr == backfills.end()) {
1166         return scan_failed;
1167     }
1168
1169     Db* db = itr->second;
1170     lh.unlock();
1171
1172     couchstore_docinfos_options options;
1173     switch (ctx->docFilter) {
1174         case DocumentFilter::NO_DELETES:
1175             options = COUCHSTORE_NO_DELETES;
1176             break;
1177         case DocumentFilter::ONLY_DELETES:
1178             options = COUCHSTORE_DELETES_ONLY;
1179             break;
1180         case DocumentFilter::ALL_ITEMS:
1181             options = COUCHSTORE_NO_OPTIONS;
1182             break;
1183         default:
1184             std::string err("CouchKVStore::scan:Illegal document filter!" +
1185                             std::to_string(static_cast<int>(ctx->docFilter)));
1186             throw std::runtime_error(err);
1187     }
1188
1189     uint64_t start = ctx->startSeqno;
1190     if (ctx->lastReadSeqno != 0) {
1191         start = ctx->lastReadSeqno + 1;
1192     }
1193
1194     couchstore_error_t errorCode;
1195     errorCode = couchstore_changes_since(db, start, options, recordDbDumpC,
1196                                          static_cast<void*>(ctx));
1197     if (errorCode != COUCHSTORE_SUCCESS) {
1198         if (errorCode == COUCHSTORE_ERROR_CANCEL) {
1199             return scan_again;
1200         } else {
1201             LOG(EXTENSION_LOG_WARNING,
1202                 "couchstore_changes_since failed, error=%s [%s]",
1203                 couchstore_strerror(errorCode),
1204                 couchkvstore_strerrno(db, errorCode).c_str());
1205             remVBucketFromDbFileMap(ctx->vbid);
1206             return scan_failed;
1207         }
1208     }
1209     return scan_success;
1210 }
1211
1212 void CouchKVStore::destroyScanContext(ScanContext* ctx) {
1213     if (!ctx) {
1214         return;
1215     }
1216
1217     LockHolder lh(backfillLock);
1218     std::map<size_t, Db*>::iterator itr = backfills.find(ctx->scanId);
1219     if (itr != backfills.end()) {
1220         closeDatabaseHandle(itr->second);
1221         backfills.erase(itr);
1222     }
1223     delete ctx;
1224 }
1225
1226 void CouchKVStore::close() {
1227     intransaction = false;
1228 }
1229
1230 uint64_t CouchKVStore::checkNewRevNum(std::string &dbFileName, bool newFile) {
1231     uint64_t newrev = 0;
1232     std::string nameKey;
1233
1234     if (!newFile) {
1235         // extract out the file revision number first
1236         size_t secondDot = dbFileName.rfind(".");
1237         nameKey = dbFileName.substr(0, secondDot);
1238     } else {
1239         nameKey = dbFileName;
1240     }
1241     nameKey.append(".");
1242     const std::vector<std::string> files = findFilesWithPrefix(nameKey);
1243     std::vector<std::string>::const_iterator itor;
1244     // found file(s) whoes name has the same key name pair with different
1245     // revision number
1246     for (itor = files.begin(); itor != files.end(); ++itor) {
1247         const std::string &filename = *itor;
1248         if (endWithCompact(filename)) {
1249             continue;
1250         }
1251
1252         size_t secondDot = filename.rfind(".");
1253         char *ptr = NULL;
1254         uint64_t revnum = strtoull(filename.substr(secondDot + 1).c_str(), &ptr, 10);
1255         if (newrev < revnum) {
1256             newrev = revnum;
1257             dbFileName = filename;
1258         }
1259     }
1260     return newrev;
1261 }
1262
1263 void CouchKVStore::updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev) {
1264     if (vbucketId >= numDbFiles) {
1265         LOG(EXTENSION_LOG_WARNING,
1266             "Cannot update db file map for an invalid vbucket, "
1267             "vbucket id = %d, rev = %" PRIu64, vbucketId, newFileRev);
1268         return;
1269     }
1270
1271     dbFileRevMap[vbucketId] = newFileRev;
1272 }
1273
1274 couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
1275                                         uint64_t fileRev,
1276                                         Db **db,
1277                                         uint64_t options,
1278                                         uint64_t *newFileRev,
1279                                         bool reset) {
1280     std::string dbFileName = getDBFileName(dbname, vbucketId, fileRev);
1281     couch_file_ops* ops = &statCollectingFileOps;
1282
1283     uint64_t newRevNum = fileRev;
1284     couchstore_error_t errorCode = COUCHSTORE_SUCCESS;
1285
1286     if (reset) {
1287         errorCode = couchstore_open_db_ex(dbFileName.c_str(), options,
1288                                           ops, db);
1289         if (errorCode == COUCHSTORE_SUCCESS) {
1290             newRevNum = 1;
1291             updateDbFileMap(vbucketId, fileRev);
1292             LOG(EXTENSION_LOG_INFO,
1293                 "reset: created new couchstore file, name=%s rev=%" PRIu64,
1294                 dbFileName.c_str(), fileRev);
1295         } else {
1296             LOG(EXTENSION_LOG_WARNING,
1297                 "reset: creating a new couchstore file,"
1298                 "name=%s rev=%" PRIu64 " failed with error=%s", dbFileName.c_str(),
1299                 fileRev, couchstore_strerror(errorCode));
1300         }
1301     } else {
1302         if (options == COUCHSTORE_OPEN_FLAG_CREATE) {
1303             // first try to open the requested file without the
1304             // create option in case it does already exist
1305             errorCode = couchstore_open_db_ex(dbFileName.c_str(), 0, ops, db);
1306             if (errorCode != COUCHSTORE_SUCCESS) {
1307                 // open_db failed but still check if the file exists
1308                 newRevNum = checkNewRevNum(dbFileName);
1309                 bool fileExists = (newRevNum) ? true : false;
1310                 if (fileExists) {
1311                     errorCode = openDB_retry(dbFileName, 0, ops, db,
1312                                              &newRevNum);
1313                 } else {
1314                     // requested file doesn't seem to exist, just create one
1315                     errorCode = couchstore_open_db_ex(dbFileName.c_str(),
1316                                                       options, ops, db);
1317                     if (errorCode == COUCHSTORE_SUCCESS) {
1318                         newRevNum = 1;
1319                         updateDbFileMap(vbucketId, fileRev);
1320                         LOG(EXTENSION_LOG_INFO,
1321                             "INFO: created new couch db file, name=%s rev=%" PRIu64,
1322                             dbFileName.c_str(), fileRev);
1323                     }
1324                 }
1325             }
1326         } else {
1327             errorCode = openDB_retry(dbFileName, options, ops, db,
1328                                      &newRevNum);
1329         }
1330     }
1331
1332     /* update command statistics */
1333     st.numOpen++;
1334     if (errorCode) {
1335         st.numOpenFailure++;
1336         LOG(EXTENSION_LOG_WARNING, "couchstore_open_db failed, name=%s"
1337             " option=%" PRIX64 " rev=%" PRIu64 " error=%s [%s]",
1338             dbFileName.c_str(), options,
1339             ((newRevNum > fileRev) ? newRevNum : fileRev),
1340             couchstore_strerror(errorCode),
1341             getSystemStrerror().c_str());
1342     } else {
1343         if (newRevNum > fileRev) {
1344             // new revision number found, update it
1345             updateDbFileMap(vbucketId, newRevNum);
1346         }
1347     }
1348
1349     if (newFileRev != NULL) {
1350         *newFileRev = (newRevNum > fileRev) ? newRevNum : fileRev;
1351     }
1352     return errorCode;
1353 }
1354
1355 couchstore_error_t CouchKVStore::openDB_retry(std::string &dbfile,
1356                                               uint64_t options,
1357                                               const couch_file_ops *ops,
1358                                               Db** db, uint64_t *newFileRev) {
1359     int retry = 0;
1360     couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1361
1362     while (retry < MAX_OPEN_DB_RETRY) {
1363         errCode = couchstore_open_db_ex(dbfile.c_str(), options, ops, db);
1364         if (errCode == COUCHSTORE_SUCCESS) {
1365             return errCode;
1366         }
1367         LOG(EXTENSION_LOG_INFO, "INFO: couchstore_open_db failed, name=%s "
1368             "options=%" PRIX64 " error=%s [%s], try it again!",
1369             dbfile.c_str(), options, couchstore_strerror(errCode),
1370             getSystemStrerror().c_str());
1371         *newFileRev = checkNewRevNum(dbfile);
1372         ++retry;
1373         if (retry == MAX_OPEN_DB_RETRY - 1 && options == 0 &&
1374             errCode == COUCHSTORE_ERROR_NO_SUCH_FILE) {
1375             options = COUCHSTORE_OPEN_FLAG_CREATE;
1376         }
1377     }
1378     return errCode;
1379 }
1380
1381 void CouchKVStore::populateFileNameMap(std::vector<std::string> &filenames,
1382                                        std::vector<uint16_t> *vbids) {
1383     std::vector<std::string>::iterator fileItr;
1384
1385     for (fileItr = filenames.begin(); fileItr != filenames.end(); ++fileItr) {
1386         const std::string &filename = *fileItr;
1387         size_t secondDot = filename.rfind(".");
1388         std::string nameKey = filename.substr(0, secondDot);
1389         size_t firstDot = nameKey.rfind(".");
1390 #ifdef _MSC_VER
1391         size_t firstSlash = nameKey.rfind("\\");
1392 #else
1393         size_t firstSlash = nameKey.rfind("/");
1394 #endif
1395
1396         std::string revNumStr = filename.substr(secondDot + 1);
1397         char *ptr = NULL;
1398         uint64_t revNum = strtoull(revNumStr.c_str(), &ptr, 10);
1399
1400         std::string vbIdStr = nameKey.substr(firstSlash + 1,
1401                                             (firstDot - firstSlash) - 1);
1402         if (allDigit(vbIdStr)) {
1403             int vbId = atoi(vbIdStr.c_str());
1404             if (vbids) {
1405                 vbids->push_back(static_cast<uint16_t>(vbId));
1406             }
1407             uint64_t old_rev_num = dbFileRevMap[vbId];
1408             if (old_rev_num == revNum) {
1409                 continue;
1410             } else if (old_rev_num < revNum) { // stale revision found
1411                 dbFileRevMap[vbId] = revNum;
1412             } else { // stale file found (revision id has rolled over)
1413                 old_rev_num = revNum;
1414             }
1415             std::stringstream old_file;
1416             old_file << dbname << "/" << vbId << ".couch." << old_rev_num;
1417             if (access(old_file.str().c_str(), F_OK) == 0) {
1418                 if (!isReadOnly()) {
1419                     if (remove(old_file.str().c_str()) == 0) {
1420                         LOG(EXTENSION_LOG_INFO, "Removed stale file '%s'",
1421                             old_file.str().c_str());
1422                     } else {
1423                         LOG(EXTENSION_LOG_WARNING,
1424                             "Warning: Failed to remove the stale file '%s': %s",
1425                             old_file.str().c_str(), getSystemStrerror().c_str());
1426                     }
1427                 } else {
1428                     LOG(EXTENSION_LOG_WARNING,
1429                         "A read-only instance of the underlying store was not "
1430                         "allowed to delete a stale file: %s!",
1431                         old_file.str().c_str());
1432                 }
1433             }
1434         } else {
1435             // skip non-vbucket database file, master.couch etc
1436             LOG(EXTENSION_LOG_DEBUG,
1437                 "Non-vbucket database file, %s, skip adding "
1438                 "to CouchKVStore dbFileMap\n", filename.c_str());
1439         }
1440     }
1441 }
1442
1443 couchstore_error_t CouchKVStore::fetchDoc(Db *db, DocInfo *docinfo,
1444                                           GetValue &docValue, uint16_t vbId,
1445                                           bool metaOnly, bool fetchDelete) {
1446     couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1447     sized_buf metadata = docinfo->rev_meta;
1448     uint32_t itemFlags;
1449     uint64_t cas;
1450     time_t exptime;
1451     uint8_t ext_meta[EXT_META_LEN];
1452     uint8_t ext_len;
1453     uint8_t conf_res_mode = 0;
1454
1455     if (metadata.size < DEFAULT_META_LEN) {
1456         throw std::invalid_argument("CouchKVStore::fetchDoc: "
1457                         "docValue->rev_meta.size (which is " +
1458                         std::to_string(metadata.size) +
1459                         ") is less than DEFAULT_META_LEN (which is " +
1460                         std::to_string(DEFAULT_META_LEN) + ")");
1461     }
1462
1463     if (metadata.size == DEFAULT_META_LEN) {
1464         memcpy(&cas, (metadata.buf), 8);
1465         memcpy(&exptime, (metadata.buf) + 8, 4);
1466         memcpy(&itemFlags, (metadata.buf) + 12, 4);
1467         ext_len = 0;
1468     } else {
1469         //metadata.size => 19, FLEX_META_CODE at offset 16
1470         memcpy(&cas, (metadata.buf), 8);
1471         memcpy(&exptime, (metadata.buf) + 8, 4);
1472         memcpy(&itemFlags, (metadata.buf) + 12, 4);
1473         memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1474                EXT_META_LEN);
1475         memcpy(&conf_res_mode, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET +
1476                EXT_META_LEN, CONFLICT_RES_META_LEN);
1477         ext_len = EXT_META_LEN;
1478     }
1479
1480     cas = ntohll(cas);
1481     exptime = ntohl(exptime);
1482
1483     if (metaOnly || (fetchDelete && docinfo->deleted)) {
1484         Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1485                             itemFlags, (time_t)exptime, NULL, docinfo->size,
1486                             ext_meta, ext_len, cas, docinfo->db_seq, vbId);
1487         if (docinfo->deleted) {
1488             it->setDeleted();
1489         }
1490
1491         it->setConflictResMode(
1492                 static_cast<enum conflict_resolution_mode>(conf_res_mode));
1493         it->setRevSeqno(docinfo->rev_seq);
1494         docValue = GetValue(it);
1495         // update ep-engine IO stats
1496         ++st.io_num_read;
1497         st.io_read_bytes.fetch_add(docinfo->id.size);
1498     } else {
1499         Doc *doc = NULL;
1500         errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1501                                                    DECOMPRESS_DOC_BODIES);
1502         if (errCode == COUCHSTORE_SUCCESS) {
1503             if (docinfo->deleted) {
1504                 // do not read a doc that is marked deleted, just return the
1505                 // error code as not found but still release the document body.
1506                 errCode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1507             } else {
1508                 if (doc == nullptr) {
1509                     throw std::logic_error("CouchKVStore::fetchDoc: doc is NULL");
1510                 }
1511                 if (doc->id.size > UINT16_MAX) {
1512                     throw std::logic_error("CouchKVStore::fetchDoc: "
1513                             "doc->id.size (which is" +
1514                             std::to_string(doc->id.size) + ") is greater than "
1515                             + std::to_string(UINT16_MAX));
1516                 }
1517
1518                 size_t valuelen = doc->data.size;
1519                 void *valuePtr = doc->data.buf;
1520
1521                 /**
1522                  * Set Datatype correctly if data is being
1523                  * read from couch files where datatype is
1524                  * not supported.
1525                  */
1526                 if (metadata.size == DEFAULT_META_LEN) {
1527                     ext_len = EXT_META_LEN;
1528                     ext_meta[0] = determine_datatype((const unsigned char*)valuePtr,
1529                                                      valuelen);
1530                 }
1531
1532                 Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1533                                     itemFlags, (time_t)exptime, valuePtr, valuelen,
1534                                     ext_meta, ext_len, cas, docinfo->db_seq, vbId,
1535                                     docinfo->rev_seq);
1536
1537                 it->setConflictResMode(
1538                            static_cast<enum conflict_resolution_mode>(conf_res_mode));
1539
1540                 docValue = GetValue(it);
1541
1542                 // update ep-engine IO stats
1543                 ++st.io_num_read;
1544                 st.io_read_bytes.fetch_add(docinfo->id.size + valuelen);
1545             }
1546             couchstore_free_document(doc);
1547         }
1548     }
1549     return errCode;
1550 }
1551
1552 int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
1553
1554     ScanContext* sctx = static_cast<ScanContext*>(ctx);
1555     shared_ptr<Callback<GetValue> > cb = sctx->callback;
1556     shared_ptr<Callback<CacheLookup> > cl = sctx->lookup;
1557
1558     Doc *doc = NULL;
1559     void *valuePtr = NULL;
1560     size_t valuelen = 0;
1561     uint64_t byseqno = docinfo->db_seq;
1562     sized_buf  metadata = docinfo->rev_meta;
1563     uint16_t vbucketId = sctx->vbid;
1564     sized_buf key = docinfo->id;
1565     uint32_t itemflags;
1566     uint64_t cas;
1567     uint32_t exptime;
1568     uint8_t ext_meta[EXT_META_LEN] = {0};
1569     uint8_t ext_len;
1570     uint8_t conf_res_mode = 0;
1571
1572     if (key.size > UINT16_MAX) {
1573         throw std::invalid_argument("CouchKVStore::recordDbDump: "
1574                         "docinfo->id.size (which is " + std::to_string(key.size) +
1575                         ") is greater than " + std::to_string(UINT16_MAX));
1576     }
1577     if (metadata.size < DEFAULT_META_LEN) {
1578         throw std::invalid_argument("CouchKVStore::recordDbDump: "
1579                         "docinfo->rev_meta.size (which is " + std::to_string(key.size) +
1580                         ") is less than " + std::to_string(DEFAULT_META_LEN));
1581     }
1582
1583     std::string docKey(docinfo->id.buf, docinfo->id.size);
1584     CacheLookup lookup(docKey, byseqno, vbucketId);
1585     cl->callback(lookup);
1586     if (cl->getStatus() == ENGINE_KEY_EEXISTS) {
1587         sctx->lastReadSeqno = byseqno;
1588         return COUCHSTORE_SUCCESS;
1589     } else if (cl->getStatus() == ENGINE_ENOMEM) {
1590         return COUCHSTORE_ERROR_CANCEL;
1591     }
1592
1593     if (metadata.size == DEFAULT_META_LEN) {
1594         memcpy(&cas, (metadata.buf), 8);
1595         memcpy(&exptime, (metadata.buf) + 8, 4);
1596         memcpy(&itemflags, (metadata.buf) + 12, 4);
1597         ext_len = 0;
1598     } else {
1599         //metadata.size > 16, FLEX_META_CODE at offset 16
1600         memcpy(&cas, (metadata.buf), 8);
1601         memcpy(&exptime, (metadata.buf) + 8, 4);
1602         memcpy(&itemflags, (metadata.buf) + 12, 4);
1603         memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1604                EXT_META_LEN);
1605         memcpy(&conf_res_mode, (metadata.buf) + DEFAULT_META_LEN +
1606                FLEX_DATA_OFFSET + EXT_META_LEN, CONFLICT_RES_META_LEN);
1607         ext_len = EXT_META_LEN;
1608     }
1609     exptime = ntohl(exptime);
1610     cas = ntohll(cas);
1611
1612     if (sctx->valFilter != ValueFilter::KEYS_ONLY && !docinfo->deleted) {
1613         couchstore_error_t errCode;
1614         bool expectCompressed = false;
1615         /**
1616          * If couch files do not support datatype or no special
1617          * request is made to retrieve compressed documents as is,
1618          * then DECOMPRESS the document.
1619          */
1620         couchstore_open_options openOptions = 0;
1621         if (metadata.size == DEFAULT_META_LEN ||
1622             sctx->valFilter == ValueFilter::VALUES_DECOMPRESSED) {
1623             openOptions = DECOMPRESS_DOC_BODIES;
1624         } else {
1625             // => sctx->valFilter == ValueFilter::VALUES_COMPRESSED
1626             expectCompressed = true;
1627         }
1628         errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc, openOptions);
1629
1630         if (errCode == COUCHSTORE_SUCCESS) {
1631             if (doc->data.size) {
1632                 valuelen = doc->data.size;
1633                 valuePtr = doc->data.buf;
1634
1635                 /**
1636                  * Set Datatype correctly if data is being
1637                  * read from couch files where datatype is
1638                  * not supported.
1639                  */
1640                 if (metadata.size == DEFAULT_META_LEN) {
1641                     ext_len = EXT_META_LEN;
1642                     ext_meta[0] = determine_datatype((const unsigned char*)valuePtr,
1643                                                      valuelen);
1644                 }
1645
1646                 if (expectCompressed) {
1647                     /**
1648                      * If a compressed document was retrieved as is,
1649                      * update the datatype of the document.
1650                      */
1651                     uint8_t datatype = ext_meta[0];
1652                     if (datatype == PROTOCOL_BINARY_DATATYPE_JSON) {
1653                         ext_meta[0] = PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON;
1654                     } else if (datatype == PROTOCOL_BINARY_RAW_BYTES) {
1655                         ext_meta[0] = PROTOCOL_BINARY_DATATYPE_COMPRESSED;
1656                     }
1657                 }
1658             }
1659         } else {
1660             LOG(EXTENSION_LOG_WARNING,
1661                 "Failed to retrieve key value from database "
1662                 "database, vBucket=%d key=%s error=%s [%s]\n",
1663                 vbucketId, key.buf, couchstore_strerror(errCode),
1664                 couchkvstore_strerrno(db, errCode).c_str());
1665             return COUCHSTORE_SUCCESS;
1666         }
1667     }
1668
1669     Item *it = new Item((void *)key.buf,
1670                         key.size,
1671                         itemflags,
1672                         (time_t)exptime,
1673                         valuePtr, valuelen,
1674                         ext_meta, ext_len,
1675                         cas,
1676                         docinfo->db_seq, // return seq number being persisted on disk
1677                         vbucketId,
1678                         docinfo->rev_seq);
1679     if (docinfo->deleted) {
1680         it->setDeleted();
1681     }
1682
1683     it->setConflictResMode(
1684                  static_cast<enum conflict_resolution_mode>(conf_res_mode));
1685
1686     bool onlyKeys = (sctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false;
1687     GetValue rv(it, ENGINE_SUCCESS, -1, onlyKeys);
1688     cb->callback(rv);
1689
1690     couchstore_free_document(doc);
1691
1692     if (cb->getStatus() == ENGINE_ENOMEM) {
1693         return COUCHSTORE_ERROR_CANCEL;
1694     }
1695
1696     sctx->lastReadSeqno = byseqno;
1697     return COUCHSTORE_SUCCESS;
1698 }
1699
1700 bool CouchKVStore::commit2couchstore(Callback<kvstats_ctx> *cb) {
1701     bool success = true;
1702
1703     size_t pendingCommitCnt = pendingReqsQ.size();
1704     if (pendingCommitCnt == 0) {
1705         return success;
1706     }
1707
1708     Doc **docs = new Doc *[pendingCommitCnt];
1709     DocInfo **docinfos = new DocInfo *[pendingCommitCnt];
1710
1711     if (pendingReqsQ[0] == nullptr) {
1712         throw std::logic_error("CouchKVStore::commit2couchstore: "
1713                         "pendingReqsQ[0] is NULL");
1714     }
1715     uint16_t vbucket2flush = pendingReqsQ[0]->getVBucketId();
1716     uint64_t fileRev = pendingReqsQ[0]->getRevNum();
1717     for (size_t i = 0; i < pendingCommitCnt; ++i) {
1718         CouchRequest *req = pendingReqsQ[i];
1719         if (req == nullptr) {
1720             throw std::logic_error("CouchKVStore::commit2couchstore: "
1721                                        "pendingReqsQ["
1722                                        + std::to_string(i) + "] is NULL");
1723         }
1724         docs[i] = (Doc *)req->getDbDoc();
1725         docinfos[i] = req->getDbDocInfo();
1726         if (vbucket2flush != req->getVBucketId()) {
1727             throw std::logic_error(
1728                     "CouchKVStore::commit2couchstore: "
1729                     "mismatch between vbucket2flush (which is "
1730                     + std::to_string(vbucket2flush) + ") and pendingReqsQ["
1731                     + std::to_string(i) + "] (which is "
1732                     + std::to_string(req->getVBucketId()) + ")");
1733         }
1734     }
1735
1736     kvstats_ctx kvctx;
1737     kvctx.vbucket = vbucket2flush;
1738     // flush all
1739     couchstore_error_t errCode = saveDocs(vbucket2flush, fileRev, docs,
1740                                           docinfos, pendingCommitCnt,
1741                                           kvctx);
1742     if (errCode) {
1743         LOG(EXTENSION_LOG_WARNING,
1744             "Commit failed, cannot save CouchDB docs "
1745             "for vbucket = %d rev = %" PRIu64, vbucket2flush, fileRev);
1746     }
1747     if (cb) {
1748         cb->callback(kvctx);
1749     }
1750     commitCallback(pendingReqsQ, kvctx, errCode);
1751
1752     // clean up
1753     for (size_t i = 0; i < pendingCommitCnt; ++i) {
1754         delete pendingReqsQ[i];
1755     }
1756     pendingReqsQ.clear();
1757     delete [] docs;
1758     delete [] docinfos;
1759     return success;
1760 }
1761
1762 static int readDocInfos(Db *db, DocInfo *docinfo, void *ctx) {
1763     if (ctx == nullptr) {
1764         throw std::invalid_argument("readDocInfos: ctx must be non-NULL");
1765     }
1766     kvstats_ctx *cbCtx = static_cast<kvstats_ctx *>(ctx);
1767     if(docinfo) {
1768         // An item exists in the VB DB file.
1769         if (!docinfo->deleted) {
1770             std::string key(docinfo->id.buf, docinfo->id.size);
1771             unordered_map<std::string, kstat_entry_t>::iterator itr =
1772                 cbCtx->keyStats.find(key);
1773             if (itr != cbCtx->keyStats.end()) {
1774                 itr->second.first = true;
1775             }
1776         }
1777     }
1778     return 0;
1779 }
1780
1781 couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev,
1782                                           Doc **docs, DocInfo **docinfos,
1783                                           size_t docCount, kvstats_ctx &kvctx) {
1784     couchstore_error_t errCode;
1785     uint64_t fileRev = rev;
1786     DbInfo info;
1787     if (rev == 0) {
1788         throw std::invalid_argument("CouchKVStore::saveDocs: rev must be non-zero");
1789     }
1790
1791     Db *db = NULL;
1792     uint64_t newFileRev;
1793     errCode = openDB(vbid, fileRev, &db, 0, &newFileRev);
1794     if (errCode != COUCHSTORE_SUCCESS) {
1795         LOG(EXTENSION_LOG_WARNING,
1796                 "Failed to open database, vbucketId = %d "
1797                 "fileRev = %" PRIu64 " numDocs = %" PRIu64, vbid, fileRev,
1798                 uint64_t(docCount));
1799         return errCode;
1800     } else {
1801         vbucket_state *state = cachedVBStates[vbid];
1802         if (state == nullptr) {
1803             throw std::logic_error(
1804                     "CouchKVStore::saveDocs: cachedVBStates[" +
1805                     std::to_string(vbid) + "] is NULL");
1806         }
1807
1808         uint64_t maxDBSeqno = 0;
1809         sized_buf *ids = new sized_buf[docCount];
1810         for (size_t idx = 0; idx < docCount; idx++) {
1811             ids[idx] = docinfos[idx]->id;
1812             maxDBSeqno = std::max(maxDBSeqno, docinfos[idx]->db_seq);
1813             std::string key(ids[idx].buf, ids[idx].size);
1814             kvctx.keyStats[key] = std::make_pair(false,
1815                     !docinfos[idx]->deleted);
1816         }
1817         couchstore_docinfos_by_id(db, ids, (unsigned) docCount, 0,
1818                 readDocInfos, &kvctx);
1819         delete[] ids;
1820
1821         hrtime_t cs_begin = gethrtime();
1822         uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
1823         errCode = couchstore_save_documents(db, docs, docinfos,
1824                 (unsigned) docCount, flags);
1825         st.saveDocsHisto.add((gethrtime() - cs_begin) / 1000);
1826         if (errCode != COUCHSTORE_SUCCESS) {
1827             LOG(EXTENSION_LOG_WARNING,
1828                     "Failed to save docs to database, "
1829                     "numDocs = %" PRIu64 " error=%s [%s]\n",
1830                     uint64_t(docCount), couchstore_strerror(errCode),
1831                     couchkvstore_strerrno(db, errCode).c_str());
1832             closeDatabaseHandle(db);
1833             return errCode;
1834         }
1835
1836         errCode = saveVBState(db, *state);
1837         if (errCode != COUCHSTORE_SUCCESS) {
1838             LOG(EXTENSION_LOG_WARNING, "Failed to save local docs to "
1839                 "database, error=%s [%s]", couchstore_strerror(errCode),
1840                 couchkvstore_strerrno(db, errCode).c_str());
1841                 closeDatabaseHandle(db);
1842                 return errCode;
1843         }
1844
1845         cs_begin = gethrtime();
1846         errCode = couchstore_commit(db);
1847         st.commitHisto.add((gethrtime() - cs_begin) / 1000);
1848         if (errCode) {
1849             LOG(EXTENSION_LOG_WARNING,
1850                     "couchstore_commit failed, error=%s [%s]",
1851                     couchstore_strerror(errCode),
1852                     couchkvstore_strerrno(db, errCode).c_str());
1853             closeDatabaseHandle(db);
1854             return errCode;
1855         }
1856
1857         st.batchSize.add(docCount);
1858
1859         // retrieve storage system stats for file fragmentation computation
1860         couchstore_db_info(db, &info);
1861         kvctx.fileSpaceUsed = info.space_used;
1862         kvctx.fileSize = info.file_size;
1863         cachedDeleteCount[vbid] = info.deleted_count;
1864         cachedDocCount[vbid] = info.doc_count;
1865
1866         if (maxDBSeqno != info.last_sequence) {
1867             LOG(EXTENSION_LOG_WARNING, "Seqno in db header (%" PRIu64 ")"
1868                 " is not matched with what was persisted (%" PRIu64 ")"
1869                 " for vbucket %d",
1870                 info.last_sequence, maxDBSeqno, vbid);
1871         }
1872         state->highSeqno = info.last_sequence;
1873
1874         closeDatabaseHandle(db);
1875     }
1876
1877     /* update stat */
1878     if(errCode == COUCHSTORE_SUCCESS) {
1879         st.docsCommitted = docCount;
1880     }
1881
1882     return errCode;
1883 }
1884
1885 void CouchKVStore::remVBucketFromDbFileMap(uint16_t vbucketId) {
1886     if (vbucketId >= numDbFiles) {
1887         LOG(EXTENSION_LOG_WARNING,
1888             "Cannot remove db file map entry for an invalid vbucket, "
1889             "vbucket id = %d\n", vbucketId);
1890         return;
1891     }
1892
1893     // just reset revision number of the requested vbucket
1894     dbFileRevMap[vbucketId] = 1;
1895 }
1896
1897 void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
1898                                   kvstats_ctx &kvctx,
1899                                   couchstore_error_t errCode) {
1900     size_t commitSize = committedReqs.size();
1901
1902     for (size_t index = 0; index < commitSize; index++) {
1903         size_t dataSize = committedReqs[index]->getNBytes();
1904         size_t keySize = committedReqs[index]->getKey().length();
1905         /* update ep stats */
1906         ++st.io_num_write;
1907         st.io_write_bytes.fetch_add(keySize + dataSize);
1908
1909         if (committedReqs[index]->isDelete()) {
1910             int rv = getMutationStatus(errCode);
1911             if (rv != -1) {
1912                 const std::string &key = committedReqs[index]->getKey();
1913                 if (kvctx.keyStats[key].first) {
1914                     rv = 1; // Deletion is for an existing item on DB file.
1915                 } else {
1916                     rv = 0; // Deletion is for a non-existing item on DB file.
1917                 }
1918             }
1919             if (errCode) {
1920                 ++st.numDelFailure;
1921             } else {
1922                 st.delTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1923             }
1924             committedReqs[index]->getDelCallback()->callback(rv);
1925         } else {
1926             int rv = getMutationStatus(errCode);
1927             const std::string &key = committedReqs[index]->getKey();
1928             bool insertion = !kvctx.keyStats[key].first;
1929             if (errCode) {
1930                 ++st.numSetFailure;
1931             } else {
1932                 st.writeTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1933                 st.writeSizeHisto.add(dataSize + keySize);
1934             }
1935             mutation_result p(rv, insertion);
1936             committedReqs[index]->getSetCallback()->callback(p);
1937         }
1938     }
1939 }
1940
1941 ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
1942     sized_buf id;
1943     LocalDoc *ldoc = NULL;
1944     couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1945     vbucket_state_t state = vbucket_state_dead;
1946     uint64_t checkpointId = 0;
1947     uint64_t maxDeletedSeqno = 0;
1948     int64_t highSeqno = 0;
1949     std::string failovers("[{\"id\":0,\"seq\":0}]");
1950     uint64_t purgeSeqno = 0;
1951     uint64_t lastSnapStart = 0;
1952     uint64_t lastSnapEnd = 0;
1953     uint64_t maxCas = 0;
1954     int64_t driftCounter = INITIAL_DRIFT;
1955
1956     DbInfo info;
1957     errCode = couchstore_db_info(db, &info);
1958     if (errCode == COUCHSTORE_SUCCESS) {
1959         highSeqno = info.last_sequence;
1960         purgeSeqno = info.purge_seq;
1961     } else {
1962         LOG(EXTENSION_LOG_WARNING,
1963             "CouchKVStore::readVBState:Failed to read database info "
1964             "for vbucket: %d with error: %s", vbId,
1965             couchstore_strerror(errCode));
1966         return couchErr2EngineErr(errCode);
1967     }
1968
1969     id.buf = (char *)"_local/vbstate";
1970     id.size = sizeof("_local/vbstate") - 1;
1971     errCode = couchstore_open_local_document(db, (void *)id.buf,
1972                                              id.size, &ldoc);
1973     if (errCode != COUCHSTORE_SUCCESS) {
1974         LOG(EXTENSION_LOG_DEBUG,
1975             "CouchKVStore::readVBState: Failed to "
1976             "retrieve stat info for vBucket: %d with error: %s",
1977             vbId, couchstore_strerror(errCode));
1978     } else {
1979         const std::string statjson(ldoc->json.buf, ldoc->json.size);
1980         cJSON *jsonObj = cJSON_Parse(statjson.c_str());
1981         if (!jsonObj) {
1982             couchstore_free_local_document(ldoc);
1983             LOG(EXTENSION_LOG_WARNING, "CouchKVStore::readVBState: Failed to "
1984                 "parse the vbstat json doc for vbucket %d: %s",
1985                 vbId , statjson.c_str());
1986             return couchErr2EngineErr(errCode);
1987         }
1988
1989         const std::string vb_state = getJSONObjString(
1990                                 cJSON_GetObjectItem(jsonObj, "state"));
1991         const std::string checkpoint_id = getJSONObjString(
1992                                 cJSON_GetObjectItem(jsonObj,"checkpoint_id"));
1993         const std::string max_deleted_seqno = getJSONObjString(
1994                                 cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
1995         const std::string snapStart = getJSONObjString(
1996                                 cJSON_GetObjectItem(jsonObj, "snap_start"));
1997         const std::string snapEnd = getJSONObjString(
1998                                 cJSON_GetObjectItem(jsonObj, "snap_end"));
1999         const std::string maxCasValue = getJSONObjString(
2000                                 cJSON_GetObjectItem(jsonObj, "max_cas"));
2001         const std::string driftCount = getJSONObjString(
2002                                 cJSON_GetObjectItem(jsonObj, "drift_counter"));
2003         cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
2004         if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
2005                 || max_deleted_seqno.compare("") == 0) {
2006             LOG(EXTENSION_LOG_WARNING, "CouchKVStore::readVBState: State JSON doc "
2007                 "for vbucket: %d is in the wrong format: %s, vb state: %s,"
2008                 "checkpoint id: %s and max deleted seqno: %s",
2009                 vbId, statjson.c_str(), vb_state.c_str(),
2010                 checkpoint_id.c_str(), max_deleted_seqno.c_str());
2011         } else {
2012             state = VBucket::fromString(vb_state.c_str());
2013             parseUint64(max_deleted_seqno.c_str(), &maxDeletedSeqno);
2014             parseUint64(checkpoint_id.c_str(), &checkpointId);
2015
2016             if (snapStart.compare("") == 0) {
2017                 lastSnapStart = highSeqno;
2018             } else {
2019                 parseUint64(snapStart.c_str(), &lastSnapStart);
2020             }
2021
2022             if (snapEnd.compare("") == 0) {
2023                 lastSnapEnd = highSeqno;
2024             } else {
2025                 parseUint64(snapEnd.c_str(), &lastSnapEnd);
2026             }
2027
2028             if (maxCasValue.compare("") != 0) {
2029                 parseUint64(maxCasValue.c_str(), &maxCas);
2030             }
2031
2032             if (driftCount.compare("") != 0) {
2033                 parseInt64(driftCount.c_str(), &driftCounter);
2034             }
2035
2036             if (failover_json) {
2037                 char* json = cJSON_PrintUnformatted(failover_json);
2038                 failovers.assign(json);
2039                 free(json);
2040             }
2041         }
2042         cJSON_Delete(jsonObj);
2043         couchstore_free_local_document(ldoc);
2044     }
2045
2046     delete cachedVBStates[vbId];
2047     cachedVBStates[vbId] = new vbucket_state(state, checkpointId,
2048                                              maxDeletedSeqno, highSeqno,
2049                                              purgeSeqno, lastSnapStart,
2050                                              lastSnapEnd, maxCas, driftCounter,
2051                                              failovers);
2052
2053     return couchErr2EngineErr(errCode);
2054 }
2055
2056 couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState) {
2057     std::stringstream jsonState;
2058
2059     jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
2060               << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
2061               << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno << "\""
2062               << ",\"failover_table\": " << vbState.failovers
2063               << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
2064               << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
2065               << ",\"max_cas\": \"" << vbState.maxCas << "\""
2066               << ",\"drift_counter\": \"" << vbState.driftCounter << "\""
2067               << "}";
2068
2069     LocalDoc lDoc;
2070     lDoc.id.buf = (char *)"_local/vbstate";
2071     lDoc.id.size = sizeof("_local/vbstate") - 1;
2072     std::string state = jsonState.str();
2073     lDoc.json.buf = (char *)state.c_str();
2074     lDoc.json.size = state.size();
2075     lDoc.deleted = 0;
2076
2077     couchstore_error_t errCode = couchstore_save_local_document(db, &lDoc);
2078     if (errCode != COUCHSTORE_SUCCESS) {
2079         LOG(EXTENSION_LOG_WARNING,
2080             "couchstore_save_local_document failed "
2081             "error=%s [%s]\n", couchstore_strerror(errCode),
2082             couchkvstore_strerrno(db, errCode).c_str());
2083     }
2084     return errCode;
2085 }
2086
2087 int CouchKVStore::getMultiCb(Db *db, DocInfo *docinfo, void *ctx) {
2088     if (docinfo == nullptr) {
2089         throw std::invalid_argument("CouchKVStore::getMultiCb: docinfo "
2090                 "must be non-NULL");
2091     }
2092     if (ctx == nullptr) {
2093         throw std::invalid_argument("CouchKVStore::getMultiCb: ctx must "
2094                 "be non-NULL");
2095     }
2096
2097     std::string keyStr(docinfo->id.buf, docinfo->id.size);
2098     GetMultiCbCtx *cbCtx = static_cast<GetMultiCbCtx *>(ctx);
2099     CouchKVStoreStats &st = cbCtx->cks.getCKVStoreStat();
2100
2101     vb_bgfetch_queue_t::iterator qitr = cbCtx->fetches.find(keyStr);
2102     if (qitr == cbCtx->fetches.end()) {
2103         // this could be a serious race condition in couchstore,
2104         // log a warning message and continue
2105         LOG(EXTENSION_LOG_WARNING,
2106             "Couchstore returned invalid docinfo, "
2107             "no pending bgfetch has been issued for key = %s\n",
2108             keyStr.c_str());
2109         return 0;
2110     }
2111
2112     vb_bgfetch_item_ctx_t& bg_itm_ctx = (*qitr).second;
2113     bool meta_only = bg_itm_ctx.isMetaOnly;
2114
2115     GetValue returnVal;
2116
2117     couchstore_error_t errCode = cbCtx->cks.fetchDoc(db, docinfo, returnVal,
2118                                                      cbCtx->vbId, meta_only);
2119     if (errCode != COUCHSTORE_SUCCESS && !meta_only) {
2120         LOG(EXTENSION_LOG_WARNING, "Failed to fetch data from database, "
2121             "vBucket=%d key=%s error=%s [%s]", cbCtx->vbId,
2122             keyStr.c_str(), couchstore_strerror(errCode),
2123             couchkvstore_strerrno(db, errCode).c_str());
2124         st.numGetFailure++;
2125     }
2126
2127     returnVal.setStatus(cbCtx->cks.couchErr2EngineErr(errCode));
2128
2129     std::list<VBucketBGFetchItem *> &fetches = bg_itm_ctx.bgfetched_list;
2130     std::list<VBucketBGFetchItem *>::iterator itr = fetches.begin();
2131
2132     for (itr = fetches.begin(); itr != fetches.end(); ++itr) {
2133         // populate return value for remaining fetch items with the
2134         // same seqid
2135         (*itr)->value = returnVal;
2136         st.readTimeHisto.add((gethrtime() - (*itr)->initTime) / 1000);
2137         if (errCode == COUCHSTORE_SUCCESS) {
2138             st.readSizeHisto.add(returnVal.getValue()->getNKey() +
2139                                  returnVal.getValue()->getNBytes());
2140         }
2141     }
2142     return 0;
2143 }
2144
2145
2146 void CouchKVStore::closeDatabaseHandle(Db *db) {
2147     couchstore_error_t ret = couchstore_close_db(db);
2148     if (ret != COUCHSTORE_SUCCESS) {
2149         LOG(EXTENSION_LOG_WARNING,
2150             "couchstore_close_db failed, error=%s [%s]",
2151             couchstore_strerror(ret), couchkvstore_strerrno(NULL, ret).c_str());
2152     }
2153     st.numClose++;
2154 }
2155
2156 ENGINE_ERROR_CODE CouchKVStore::couchErr2EngineErr(couchstore_error_t errCode) {
2157     switch (errCode) {
2158     case COUCHSTORE_SUCCESS:
2159         return ENGINE_SUCCESS;
2160     case COUCHSTORE_ERROR_ALLOC_FAIL:
2161         return ENGINE_ENOMEM;
2162     case COUCHSTORE_ERROR_DOC_NOT_FOUND:
2163         return ENGINE_KEY_ENOENT;
2164     case COUCHSTORE_ERROR_NO_SUCH_FILE:
2165     case COUCHSTORE_ERROR_NO_HEADER:
2166     default:
2167         // same as the general error return code of
2168         // EventuallyPersistentStore::getInternal
2169         return ENGINE_TMPFAIL;
2170     }
2171 }
2172
2173 size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
2174     size_t delCount = cachedDeleteCount[vbid];
2175     if (delCount != (size_t) -1) {
2176         return delCount;
2177     }
2178
2179     Db *db = NULL;
2180     uint64_t rev = dbFileRevMap[vbid];
2181     couchstore_error_t errCode = openDB(vbid, rev, &db,
2182                                         COUCHSTORE_OPEN_FLAG_RDONLY);
2183     if (errCode == COUCHSTORE_SUCCESS) {
2184         DbInfo info;
2185         errCode = couchstore_db_info(db, &info);
2186         if (errCode == COUCHSTORE_SUCCESS) {
2187             cachedDeleteCount[vbid] = info.deleted_count;
2188             closeDatabaseHandle(db);
2189             return info.deleted_count;
2190         } else {
2191             throw std::runtime_error("CouchKVStore::getNumPersistedDeletes:"
2192                 "Failed to read database info for vBucket = " +
2193                 std::to_string(vbid) + " rev = " + std::to_string(rev) +
2194                 " with error:" + couchstore_strerror(errCode));
2195         }
2196         closeDatabaseHandle(db);
2197     } else {
2198         throw std::invalid_argument("CouchKVStore::getNumPersistedDeletes:"
2199             "Failed to open database file for vBucket = " +
2200             std::to_string(vbid) + " rev = " + std::to_string(rev) +
2201             " with error:" + couchstore_strerror(errCode));
2202     }
2203     return 0;
2204 }
2205
2206 DBFileInfo CouchKVStore::getDbFileInfo(uint16_t vbid) {
2207     Db *db = NULL;
2208     uint64_t rev = dbFileRevMap[vbid];
2209
2210     DBFileInfo vbinfo;
2211
2212     couchstore_error_t errCode = openDB(vbid, rev, &db,
2213                                         COUCHSTORE_OPEN_FLAG_RDONLY);
2214     if (errCode == COUCHSTORE_SUCCESS) {
2215         DbInfo info;
2216         errCode = couchstore_db_info(db, &info);
2217         if (errCode == COUCHSTORE_SUCCESS) {
2218             cachedDocCount[vbid] = info.doc_count;
2219             vbinfo.itemCount = info.doc_count;
2220             vbinfo.fileSize = info.file_size;
2221             vbinfo.spaceUsed = info.space_used;
2222         } else {
2223             throw std::runtime_error("CouchKVStore::getDbFileInfo: Failed "
2224                 "to read database info for vBucket = " + std::to_string(vbid) +
2225                 " rev = " + std::to_string(rev) +
2226                 " with error:" + couchstore_strerror(errCode));
2227         }
2228         closeDatabaseHandle(db);
2229     } else {
2230         throw std::invalid_argument("CouchKVStore::getDbFileInfo: Failed "
2231             "to open database file for vBucket = " + std::to_string(vbid) +
2232             " rev = " + std::to_string(rev) +
2233             " with error:" + couchstore_strerror(errCode));
2234     }
2235     return vbinfo;
2236 }
2237
2238 size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
2239                                  uint64_t max_seq) {
2240     Db *db = NULL;
2241     uint64_t count = 0;
2242     uint64_t rev = dbFileRevMap[vbid];
2243     couchstore_error_t errCode = openDB(vbid, rev, &db,
2244                                         COUCHSTORE_OPEN_FLAG_RDONLY);
2245     if (errCode == COUCHSTORE_SUCCESS) {
2246         errCode = couchstore_changes_count(db, min_seq, max_seq, &count);
2247         if (errCode != COUCHSTORE_SUCCESS) {
2248             throw std::runtime_error("CouchKVStore::getNumItems: Failed to "
2249                 "get changes count for vBucket = " + std::to_string(vbid) +
2250                 " rev = " + std::to_string(rev) +
2251                 " with error:" + couchstore_strerror(errCode));
2252         }
2253         closeDatabaseHandle(db);
2254     } else {
2255         throw std::invalid_argument("CouchKVStore::getNumItems: Failed to "
2256             "open database file for vBucket = " + std::to_string(vbid) +
2257             " rev = " + std::to_string(rev) +
2258             " with error:" + couchstore_strerror(errCode));
2259     }
2260     return count;
2261 }
2262
2263 RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
2264                                       shared_ptr<RollbackCB> cb) {
2265
2266     Db *db = NULL;
2267     DbInfo info;
2268     uint64_t fileRev = dbFileRevMap[vbid];
2269     std::stringstream dbFileName;
2270     dbFileName << dbname << "/" << vbid << ".couch." << fileRev;
2271     couchstore_error_t errCode;
2272
2273     errCode = openDB(vbid, fileRev, &db,
2274                      (uint64_t) COUCHSTORE_OPEN_FLAG_RDONLY);
2275
2276     if (errCode == COUCHSTORE_SUCCESS) {
2277         errCode = couchstore_db_info(db, &info);
2278         if (errCode != COUCHSTORE_SUCCESS) {
2279             LOG(EXTENSION_LOG_WARNING,
2280                 "Failed to read DB info, name=%s",
2281                 dbFileName.str().c_str());
2282             closeDatabaseHandle(db);
2283             return RollbackResult(false, 0, 0, 0);
2284         }
2285     } else {
2286         LOG(EXTENSION_LOG_WARNING,
2287                 "Failed to open database, name=%s",
2288                 dbFileName.str().c_str());
2289         return RollbackResult(false, 0, 0, 0);
2290     }
2291
2292     uint64_t latestSeqno = info.last_sequence;
2293
2294     //Count from latest seq no to 0
2295     uint64_t totSeqCount = 0;
2296     errCode = couchstore_changes_count(db, 0, latestSeqno, &totSeqCount);
2297     if (errCode != COUCHSTORE_SUCCESS) {
2298         LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2299             "rollback vBucket = %d, rev = %" PRIu64, vbid, fileRev);
2300         closeDatabaseHandle(db);
2301         return RollbackResult(false, 0, 0, 0);
2302     }
2303
2304     Db *newdb = NULL;
2305     errCode = openDB(vbid, fileRev, &newdb, 0);
2306     if (errCode != COUCHSTORE_SUCCESS) {
2307         LOG(EXTENSION_LOG_WARNING,
2308                 "Failed to open database, name=%s",
2309                 dbFileName.str().c_str());
2310         closeDatabaseHandle(db);
2311         return RollbackResult(false, 0, 0, 0);
2312     }
2313
2314     while (info.last_sequence > rollbackSeqno) {
2315         errCode = couchstore_rewind_db_header(newdb);
2316         if (errCode != COUCHSTORE_SUCCESS) {
2317             LOG(EXTENSION_LOG_WARNING,
2318                     "Failed to rewind Db pointer "
2319                     "for couch file with vbid: %u, whose "
2320                     "lastSeqno: %" PRIu64 ", while trying to roll back "
2321                     "to seqNo: %" PRIu64,
2322                     vbid, latestSeqno, rollbackSeqno);
2323             //Reset the vbucket and send the entire snapshot,
2324             //as a previous header wasn't found.
2325             closeDatabaseHandle(db);
2326             return RollbackResult(false, 0, 0, 0);
2327         }
2328         errCode = couchstore_db_info(newdb, &info);
2329         if (errCode != COUCHSTORE_SUCCESS) {
2330             LOG(EXTENSION_LOG_WARNING,
2331                 "Failed to read DB info, name=%s",
2332                 dbFileName.str().c_str());
2333             closeDatabaseHandle(db);
2334             closeDatabaseHandle(newdb);
2335             return RollbackResult(false, 0, 0, 0);
2336         }
2337     }
2338
2339     //Count from latest seq no to rollback seq no
2340     uint64_t rollbackSeqCount = 0;
2341     errCode = couchstore_changes_count(db, info.last_sequence, latestSeqno,
2342                                        &rollbackSeqCount);
2343     if (errCode != COUCHSTORE_SUCCESS) {
2344         LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2345             "rollback vBucket = %d, rev = %" PRIu64, vbid, fileRev);
2346         closeDatabaseHandle(db);
2347         closeDatabaseHandle(newdb);
2348         return RollbackResult(false, 0, 0, 0);
2349     }
2350
2351     if ((totSeqCount / 2) <= rollbackSeqCount) {
2352         //doresetVbucket flag set or rollback is greater than 50%,
2353         //reset the vbucket and send the entire snapshot
2354         closeDatabaseHandle(db);
2355         closeDatabaseHandle(newdb);
2356         return RollbackResult(false, 0, 0, 0);
2357     }
2358
2359     cb->setDbHeader(newdb);
2360
2361     shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
2362     ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence + 1,
2363                                        DocumentFilter::ALL_ITEMS,
2364                                        ValueFilter::KEYS_ONLY);
2365     scan_error_t error = scan(ctx);
2366     destroyScanContext(ctx);
2367
2368     if (error != scan_success) {
2369         closeDatabaseHandle(db);
2370         closeDatabaseHandle(newdb);
2371         return RollbackResult(false, 0, 0, 0);
2372     }
2373
2374     readVBState(newdb, vbid);
2375     cachedDeleteCount[vbid] = info.deleted_count;
2376     cachedDocCount[vbid] = info.doc_count;
2377
2378     closeDatabaseHandle(db);
2379     //Append the rewinded header to the database file, before closing handle
2380     errCode = couchstore_commit(newdb);
2381     closeDatabaseHandle(newdb);
2382
2383     if (errCode != COUCHSTORE_SUCCESS) {
2384         return RollbackResult(false, 0, 0, 0);
2385     }
2386
2387     vbucket_state *vb_state = cachedVBStates[vbid];
2388     return RollbackResult(true, vb_state->highSeqno,
2389                           vb_state->lastSnapStart, vb_state->lastSnapEnd);
2390 }
2391
2392 int populateAllKeys(Db *db, DocInfo *docinfo, void *ctx) {
2393     AllKeysCtx *allKeysCtx = (AllKeysCtx *)ctx;
2394     uint16_t keylen = docinfo->id.size;
2395     char *key = docinfo->id.buf;
2396     (allKeysCtx->cb)->callback(keylen, key);
2397     if (--(allKeysCtx->count) <= 0) {
2398         //Only when count met is less than the actual number of entries
2399         return COUCHSTORE_ERROR_CANCEL;
2400     }
2401     return COUCHSTORE_SUCCESS;
2402 }
2403
2404 ENGINE_ERROR_CODE
2405 CouchKVStore::getAllKeys(uint16_t vbid, std::string &start_key, uint32_t count,
2406                          shared_ptr<Callback<uint16_t&, char*&> > cb) {
2407     Db *db = NULL;
2408     uint64_t rev = dbFileRevMap[vbid];
2409     couchstore_error_t errCode = openDB(vbid, rev, &db,
2410                                         COUCHSTORE_OPEN_FLAG_RDONLY);
2411     if(errCode == COUCHSTORE_SUCCESS) {
2412         sized_buf ref = {NULL, 0};
2413         ref.buf = (char*) start_key.c_str();
2414         ref.size = start_key.size();
2415         AllKeysCtx ctx(cb, count);
2416         errCode = couchstore_all_docs(db, &ref, COUCHSTORE_NO_OPTIONS,
2417                                       populateAllKeys,
2418                                       static_cast<void *>(&ctx));
2419         closeDatabaseHandle(db);
2420         if (errCode == COUCHSTORE_SUCCESS ||
2421                 errCode == COUCHSTORE_ERROR_CANCEL)  {
2422             return ENGINE_SUCCESS;
2423         } else {
2424             LOG(EXTENSION_LOG_WARNING, "couchstore_all_docs failed for "
2425                     "database file of vbucket = %d rev = %" PRIu64 ", errCode = %u",
2426                     vbid, rev, errCode);
2427         }
2428     } else {
2429         LOG(EXTENSION_LOG_WARNING, "Failed to open database file for "
2430                 "vbucket = %d rev = %" PRIu64 ", errCode = %u", vbid, rev, errCode);
2431
2432     }
2433     return ENGINE_FAILED;
2434 }
2435
2436 void CouchKVStore::unlinkCouchFile(uint16_t vbucket,
2437                                    uint64_t fRev) {
2438
2439     if (isReadOnly()) {
2440         throw std::logic_error("CouchKVStore::unlinkCouchFile: Not valid on a "
2441                 "read-only object.");
2442     }
2443     char fname[PATH_MAX];
2444     snprintf(fname, sizeof(fname), "%s/%d.couch.%" PRIu64,
2445              dbname.c_str(), vbucket, fRev);
2446
2447     if (remove(fname) == -1) {
2448         LOG(EXTENSION_LOG_WARNING, "Failed to remove database file for "
2449             "vbucket = %d rev = %" PRIu64 ", errCode = %u", vbucket, fRev,
2450             errno);
2451
2452         if (errno != ENOENT) {
2453             std::string file_str = fname;
2454             pendingFileDeletions.push(file_str);
2455         }
2456     }
2457 }
2458
2459 void CouchKVStore::removeCompactFile(const std::string &dbname,
2460                                      uint16_t vbid,
2461                                      uint64_t fileRev) {
2462
2463     std::string dbfile = getDBFileName(dbname, vbid, fileRev);
2464     std::string compact_file = dbfile + ".compact";
2465
2466     if (!isReadOnly()) {
2467         removeCompactFile(compact_file);
2468     } else {
2469         LOG(EXTENSION_LOG_WARNING,
2470             "A read-only instance of the underlying store was not allowed "
2471             "to delete a temporary file: %s", compact_file.c_str());
2472     }
2473 }
2474
2475 void CouchKVStore::removeCompactFile(const std::string &filename) {
2476     if (isReadOnly()) {
2477         throw std::logic_error("CouchKVStore::removeCompactFile: Not valid on "
2478                 "a read-only object.");
2479     }
2480
2481     if (access(filename.c_str(), F_OK) == 0) {
2482         if (remove(filename.c_str()) == 0) {
2483             LOG(EXTENSION_LOG_WARNING,
2484                 "Removed compact file '%s'", filename.c_str());
2485         }
2486         else {
2487             LOG(EXTENSION_LOG_WARNING,
2488                 "Failed to remove compact file '%s': %s",
2489                 filename.c_str(), getSystemStrerror().c_str());
2490
2491             if (errno != ENOENT) {
2492                 pendingFileDeletions.push(const_cast<std::string &>(filename));
2493             }
2494         }
2495     }
2496 }
2497
2498 /* end of couch-kvstore.cc */