RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
std::shared_ptr<RollbackCB> cb) {
-
- Db *db = NULL;
+ DbHolder db(this);
DbInfo info;
uint64_t fileRev = dbFileRevMap[vbid];
std::stringstream dbFileName;
dbFileName << dbname << "/" << vbid << ".couch." << fileRev;
couchstore_error_t errCode;
- errCode = openDB(vbid, fileRev, &db,
+ errCode = openDB(vbid, fileRev, db.getDbAddress(),
(uint64_t) COUCHSTORE_OPEN_FLAG_RDONLY);
if (errCode == COUCHSTORE_SUCCESS) {
- errCode = couchstore_db_info(db, &info);
+ errCode = couchstore_db_info(db.getDb(), &info);
if (errCode != COUCHSTORE_SUCCESS) {
LOG(EXTENSION_LOG_WARNING,
"Failed to read DB info, name=%s",
dbFileName.str().c_str());
- closeDatabaseHandle(db);
return RollbackResult(false, 0, 0, 0);
}
} else {
//Count from latest seq no to 0
uint64_t totSeqCount = 0;
- errCode = couchstore_changes_count(db, 0, latestSeqno, &totSeqCount);
+ errCode = couchstore_changes_count(db.getDb(), 0, latestSeqno, &totSeqCount);
if (errCode != COUCHSTORE_SUCCESS) {
LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
"rollback vBucket = %d, rev = %" PRIu64 ", error=%s [%s]",
vbid, fileRev, couchstore_strerror(errCode),
cb_strerror().c_str());
- closeDatabaseHandle(db);
return RollbackResult(false, 0, 0, 0);
}
- Db *newdb = NULL;
- errCode = openDB(vbid, fileRev, &newdb, 0);
+ DbHolder newdb(this);
+ errCode = openDB(vbid, fileRev, newdb.getDbAddress(), 0);
if (errCode != COUCHSTORE_SUCCESS) {
LOG(EXTENSION_LOG_WARNING,
"Failed to open database, name=%s",
dbFileName.str().c_str());
- closeDatabaseHandle(db);
return RollbackResult(false, 0, 0, 0);
}
while (info.last_sequence > rollbackSeqno) {
- errCode = couchstore_rewind_db_header(newdb);
+ errCode = couchstore_rewind_db_header(newdb.getDb());
if (errCode != COUCHSTORE_SUCCESS) {
LOG(EXTENSION_LOG_WARNING,
"Failed to rewind Db pointer "
couchstore_strerror(errCode), cb_strerror().c_str());
//Reset the vbucket and send the entire snapshot,
//as a previous header wasn't found.
- closeDatabaseHandle(db);
return RollbackResult(false, 0, 0, 0);
}
- errCode = couchstore_db_info(newdb, &info);
+ errCode = couchstore_db_info(newdb.getDb(), &info);
if (errCode != COUCHSTORE_SUCCESS) {
LOG(EXTENSION_LOG_WARNING,
"Failed to read DB info, name=%s",
dbFileName.str().c_str());
- closeDatabaseHandle(db);
- closeDatabaseHandle(newdb);
return RollbackResult(false, 0, 0, 0);
}
}
//Count from latest seq no to rollback seq no
uint64_t rollbackSeqCount = 0;
- errCode = couchstore_changes_count(db, info.last_sequence, latestSeqno,
+ errCode = couchstore_changes_count(db.getDb(), info.last_sequence, latestSeqno,
&rollbackSeqCount);
if (errCode != COUCHSTORE_SUCCESS) {
LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
"rollback vBucket = %d, rev = %" PRIu64 ", error=%s [%s]",
vbid, fileRev, couchstore_strerror(errCode), cb_strerror().c_str());
- closeDatabaseHandle(db);
- closeDatabaseHandle(newdb);
return RollbackResult(false, 0, 0, 0);
}
if ((totSeqCount / 2) <= rollbackSeqCount) {
//doresetVbucket flag set or rollback is greater than 50%,
//reset the vbucket and send the entire snapshot
- closeDatabaseHandle(db);
- closeDatabaseHandle(newdb);
return RollbackResult(false, 0, 0, 0);
}
- cb->setDbHeader(newdb);
-
+ cb->setDbHeader(newdb.getDb());
std::shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
- ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence + 1,
+ ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence+1,
DocumentFilter::ALL_ITEMS,
ValueFilter::KEYS_ONLY);
scan_error_t error = scan(ctx);
destroyScanContext(ctx);
if (error != scan_success) {
- closeDatabaseHandle(db);
- closeDatabaseHandle(newdb);
return RollbackResult(false, 0, 0, 0);
}
- readVBState(newdb, vbid);
+ readVBState(newdb.getDb(), vbid);
cachedDeleteCount[vbid] = info.deleted_count;
cachedDocCount[vbid] = info.doc_count;
- closeDatabaseHandle(db);
- //Append the rewinded header to the database file, before closing handle
- errCode = couchstore_commit(newdb);
- closeDatabaseHandle(newdb);
+ //Append the rewinded header to the database file
+ errCode = couchstore_commit(newdb.getDb());
if (errCode != COUCHSTORE_SUCCESS) {
return RollbackResult(false, 0, 0, 0);
scan_error_t scan(ScanContext* sctx);
void destroyScanContext(ScanContext* ctx);
+private:
+ /**
+  * Scoped owner of a couchstore Db handle.
+  *
+  * The handle starts out null and is normally filled in through
+  * getDbAddress() (e.g. by passing that address to openDB). If the handle
+  * is non-null when the holder is destroyed it is passed to
+  * CouchKVStore::closeDatabaseHandle(), so callers need no explicit close
+  * on each return path.
+  *
+  * Copying and moving are disabled so that two holders can never close the
+  * same handle.
+  */
+ class DbHolder {
+ public:
+     explicit DbHolder(CouchKVStore* kvs) : kvstore(kvs), db(nullptr) {}
+
+     DbHolder(const DbHolder&) = delete;
+     DbHolder(DbHolder&&) = delete;
+     DbHolder& operator=(const DbHolder&) = delete;
+     DbHolder& operator=(DbHolder&&) = delete;
+
+     // Address of the held handle, for APIs that hand back a Db* through
+     // an out-parameter.
+     Db** getDbAddress() {
+         return &db;
+     }
+
+     // The held handle (nullptr if nothing has been stored in it).
+     Db* getDb() {
+         return db;
+     }
+
+     ~DbHolder() {
+         if (db) {
+             kvstore->closeDatabaseHandle(db);
+         }
+     }
+     CouchKVStore* kvstore;
+     Db* db;
+ };
protected:
bool setVBucketState(uint16_t vbucketId, const vbucket_state &vbstate,
if (it->isDeleted()) {
LockHolder lh = vb->ht.getLockedBucket(it->getKey(),
&bucket_num);
-
bool ret = vb->ht.unlocked_del(it->getKey(), bucket_num);
if(!ret) {
setStatus(ENGINE_KEY_ENOENT);
EventuallyPersistentEngine& engine;
};
+/*
+ * Bring the hashtable back in line with the underlying store for checkpoint
+ * items that lie beyond the rollback point.
+ *
+ * Every item returned for CheckpointManager::pCursorName is examined. For
+ * each non-meta item whose seqno is > rollbackSeqno, the key is re-read from
+ * the read-only underlying store: if that read succeeds the fetched value is
+ * set in the hashtable, otherwise the key is deleted from the hashtable.
+ */
+void EventuallyPersistentStore::rollbackCheckpoint(RCPtr<VBucket> &vb,
+                                                   uint64_t rollbackSeqno) {
+    std::vector<queued_item> queued;
+    vb->checkpointManager.getAllItemsForCursor(CheckpointManager::pCursorName,
+                                               queued);
+    for (const auto& entry : queued) {
+        // Items at or below the rollback point, and checkpoint meta items,
+        // need no fix-up.
+        if (entry->getBySeqno() <= rollbackSeqno ||
+            entry->isCheckPointMetaItem()) {
+            continue;
+        }
+
+        RememberingCallback<GetValue> fetched;
+        getROUnderlying(vb->getId())->get(entry->getKey(),
+                                          vb->getId(),
+                                          fetched);
+        fetched.waitForValue();
+
+        const bool foundInStore =
+                (fetched.val.getStatus() == ENGINE_SUCCESS);
+        if (!foundInStore) {
+            vb->ht.del(entry->getKey());
+        } else {
+            vb->ht.set(*fetched.val.getValue(), 0, true);
+        }
+
+        // Release the fetched value once the hashtable has been updated.
+        delete fetched.val.getValue();
+    }
+}
+
ENGINE_ERROR_CODE
EventuallyPersistentStore::rollback(uint16_t vbid,
uint64_t rollbackSeqno) {
}
RCPtr<VBucket> vb = vbMap.getBucket(vbid);
- uint64_t prevHighSeqno = static_cast<uint64_t>
- (vb->checkpointManager.getHighSeqno());
- if (rollbackSeqno != 0) {
- std::shared_ptr<Rollback> cb(new Rollback(engine));
- KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying();
- RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
-
- if (result.success) {
- vb->failovers->pruneEntries(result.highSeqno);
- vb->checkpointManager.clear(vb, result.highSeqno);
- vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
- vb->incrRollbackItemCount(prevHighSeqno - result.highSeqno);
- return ENGINE_SUCCESS;
+ ReaderLockHolder rlh(vb->getStateLock());
+ if (vb->getState() == vbucket_state_replica) {
+ uint64_t prevHighSeqno = static_cast<uint64_t>
+ (vb->checkpointManager.getHighSeqno());
+ if (rollbackSeqno != 0) {
+ std::shared_ptr<Rollback> cb(new Rollback(engine));
+ KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying();
+ RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
+
+ if (result.success) {
+ rollbackCheckpoint(vb, rollbackSeqno);
+ vb->failovers->pruneEntries(result.highSeqno);
+ vb->checkpointManager.clear(vb, result.highSeqno);
+ vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
+ vb->incrRollbackItemCount(prevHighSeqno - result.highSeqno);
+ return ENGINE_SUCCESS;
+ }
}
- }
- if (resetVBucket(vbid)) {
- RCPtr<VBucket> newVb = vbMap.getBucket(vbid);
- newVb->incrRollbackItemCount(prevHighSeqno);
- return ENGINE_SUCCESS;
+ if (resetVBucket(vbid)) {
+ RCPtr<VBucket> newVb = vbMap.getBucket(vbid);
+ newVb->incrRollbackItemCount(prevHighSeqno);
+ return ENGINE_SUCCESS;
+ }
+ return ENGINE_NOT_MY_VBUCKET;
+ } else {
+ return ENGINE_EINVAL;
}
- return ENGINE_NOT_MY_VBUCKET;
}
void EventuallyPersistentStore::runDefragmenterTask() {
return eviction_policy;
}
+ /*
+ * Request a rollback of the vbucket to the specified seqno.
+ * If the rollbackSeqno is not a checkpoint boundary, then the rollback
+ * will be to the nearest checkpoint.
+ * There are also cases where the rollback will be forced to 0:
+ * various failures, or if the rollback is > 50% of the data.
+ *
+ * A check of the vbucket's high-seqno indicates if a rollback request
+ * was not honoured exactly.
+ *
+ * @param vbid The vbucket to rollback
+ * @param rollbackSeqno The seqno to rollback to.
+ * @return ENGINE_EINVAL if VB is not replica, ENGINE_NOT_MY_VBUCKET if vbid
+ * is not managed by this instance or ENGINE_SUCCESS.
+ */
ENGINE_ERROR_CODE rollback(uint16_t vbid, uint64_t rollbackSeqno);
void wakeUpItemPager() {
uint16_t decrCommitInterval(uint16_t shardId);
+ /*
+ * Helper method for the rollback function.
+ * Drain the VB's checkpoints looking for items which have a seqno
+ * above the rollbackSeqno and must be rolled back themselves.
+ */
+ void rollbackCheckpoint(RCPtr<VBucket> &vb, uint64_t rollbackSeqno);
+
friend class Warmup;
friend class Flusher;
friend class BGFetchCallback;
#include "evp_store_test.h"
-class RollbackTest : public EventuallyPersistentStoreTest
+class RollbackTest : public EventuallyPersistentStoreTest,
+ public ::testing::WithParamInterface<std::string>
{
void SetUp() override {
EventuallyPersistentStoreTest::SetUp();
}
protected:
+ /**
+ * Test rollback after deleting an item: store and flush an item, delete
+ * it, roll back to the seqno of the original store and check that the
+ * original item can be fetched again.
+ * @param flush_before_rollback Should the vBucket be flushed to disk just
+ * before the rollback (i.e. guaranteeing the in-memory state is in sync
+ * with disk). When false, the test additionally checks that a flush
+ * issued after the rollback persists nothing.
+ */
+ void rollback_after_deletion_test(bool flush_before_rollback) {
+ // Setup: Store an item then flush the vBucket; then delete the item,
+ // flushing the deletion only when flush_before_rollback is set.
+ auto item_v1 = store_item(vbid, "a", "1");
+ ASSERT_EQ(initial_seqno + 1, item_v1.getBySeqno());
+ ASSERT_EQ(1, store->flushVBucket(vbid));
+ uint64_t cas = item_v1.getCas();
+ mutation_descr_t mut_info;
+ ASSERT_EQ(ENGINE_SUCCESS,
+ store->deleteItem("a", &cas, vbid, /*cookie*/nullptr,
+ /*force*/false, /*itemMeta*/nullptr,
+ &mut_info));
+ if (flush_before_rollback) {
+ ASSERT_EQ(1, store->flushVBucket(vbid));
+ }
+ // Sanity-check - item should no longer exist.
+ EXPECT_EQ(ENGINE_KEY_ENOENT,
+ store->get("a", vbid, nullptr, {}).getStatus());
+
+ // Test - rollback to seqno of item_v1 and verify that the previous value
+ // of the item has been restored.
+ store->setVBucketState(vbid, vbucket_state_replica, false);
+ ASSERT_EQ(ENGINE_SUCCESS, store->rollback(vbid, item_v1.getBySeqno()));
+ auto result = store->public_getInternal("a", vbid, /*cookie*/nullptr,
+ vbucket_state_replica, {});
+ ASSERT_EQ(ENGINE_SUCCESS, result.getStatus());
+ EXPECT_EQ(item_v1, *result.getValue())
+ << "Fetched item after rollback should match item_v1";
+ delete result.getValue();
+
+ if (!flush_before_rollback) {
+ EXPECT_EQ(0, store->flushVBucket(vbid));
+ }
+ }
+
+ /**
+ * Test rollback after modifying an item: store and flush an item, update
+ * it and store a second key, then roll back to the seqno of the first
+ * store and check that the old value is back and the second key is gone.
+ * @param flush_before_rollback Whether the two later mutations are
+ * flushed to disk before the rollback. When false, the test additionally
+ * checks that a flush issued after the rollback persists nothing.
+ */
+ void rollback_after_mutation_test(bool flush_before_rollback) {
+ // Setup: Store an item then flush the vBucket (creating a checkpoint);
+ // then update the item with a new value and create a second checkpoint.
+ auto item_v1 = store_item(vbid, "a", "old");
+ ASSERT_EQ(initial_seqno + 1, item_v1.getBySeqno());
+ ASSERT_EQ(1, store->flushVBucket(vbid));
+
+ auto item2 = store_item(vbid, "a", "new");
+ ASSERT_EQ(initial_seqno + 2, item2.getBySeqno());
+
+ store_item(vbid, "key", "meh");
+
+ if (flush_before_rollback) {
+ EXPECT_EQ(2, store->flushVBucket(vbid));
+ }
+
+ // Test - rollback to seqno of item_v1 and verify that the previous value
+ // of the item has been restored.
+ store->setVBucketState(vbid, vbucket_state_replica, false);
+ ASSERT_EQ(ENGINE_SUCCESS, store->rollback(vbid, item_v1.getBySeqno()));
+ ASSERT_EQ(item_v1.getBySeqno(), store->getVBucket(vbid)->getHighSeqno());
+
+ // a should have the value of 'old'
+ {
+ auto result = store->get("a", vbid, nullptr, {});
+ ASSERT_EQ(ENGINE_SUCCESS, result.getStatus());
+ EXPECT_EQ(item_v1, *result.getValue())
+ << "Fetched item after rollback should match item_v1";
+ delete result.getValue();
+ }
+
+ // key should be gone
+ {
+ auto result = store->get("key", vbid, nullptr, {});
+ EXPECT_EQ(ENGINE_KEY_ENOENT, result.getStatus())
+ << "A key set after the rollback point was found";
+ }
+
+ if (!flush_before_rollback) {
+ // The rollback should have wiped out any keys waiting for persistence
+ EXPECT_EQ(0, store->flushVBucket(vbid));
+ }
+ }
+
+ /**
+  * Test a rollback request whose seqno falls between two flushes of the
+  * vBucket. Seven keys are stored and flushed, then three "rollback-cp-N"
+  * keys are stored and flushed, then three more keys are stored. The
+  * rollback is requested to the seqno of "rollback-cp-2"; the test expects
+  * every key stored after the first flush to be gone and the vBucket's
+  * high seqno to equal that of the last key in the first flush ("key7").
+  *
+  * This test passes, but note that if we warmed up, there is data loss.
+  *
+  * @param flush_before_rollback Whether the final three keys are flushed
+  *        to disk before the rollback is requested.
+  */
+ void rollback_to_middle_test(bool flush_before_rollback) {
+     // create some more checkpoints just to see a few iterations
+     // of parts of the rollback function.
+
+     // need to store a certain number of keys because rollback
+     // 'bails' if the rollback is too much.
+     for (int i = 0; i < 6; i++) {
+         std::string key = "key_" + std::to_string(i);
+         store_item(vbid, key.c_str(), "dontcare");
+     }
+     // the roll back function will rewind disk to key7.
+     auto rollback_item = store_item(vbid, "key7", "dontcare");
+     ASSERT_EQ(7, store->flushVBucket(vbid));
+
+     // every key past this point will be lost from disk in a mid-point.
+     store_item(vbid, "rollback-cp-1", "keep-me");
+     auto item_v2 = store_item(vbid, "rollback-cp-2", "rollback to me");
+     store_item(vbid, "rollback-cp-3", "i'm gone");
+     auto rollback = item_v2.getBySeqno(); // ask to rollback to here.
+     ASSERT_EQ(3, store->flushVBucket(vbid));
+
+     for (int i = 0; i < 3; i++) {
+         std::string key = "anotherkey_" + std::to_string(i);
+         store_item(vbid, key.c_str(), "dontcare");
+     }
+
+     if (flush_before_rollback) {
+         ASSERT_EQ(3, store->flushVBucket(vbid));
+     }
+
+     // Rollback should succeed.
+     // NOTE(review): this comment previously said "but rollback to 0",
+     // while the final assertion below expects the high seqno of key7 -
+     // confirm which outcome is intended.
+     store->setVBucketState(vbid, vbucket_state_replica, false);
+     EXPECT_EQ(ENGINE_SUCCESS, store->rollback(vbid, rollback));
+
+     // These keys should be gone after the rollback. The keys stored above
+     // are rollback-cp-1 .. rollback-cp-3, so iterate over exactly those.
+     for (int i = 1; i <= 3; i++) {
+         std::string key = "rollback-cp-" + std::to_string(i);
+         auto result = store->get(key.c_str(), vbid, nullptr, {});
+         EXPECT_EQ(ENGINE_KEY_ENOENT, result.getStatus())
+             << "A key set after the rollback point was found";
+     }
+
+     // These keys should be gone after the rollback
+     for (int i = 0; i < 3; i++) {
+         std::string key = "anotherkey_" + std::to_string(i);
+         auto result = store->get(key.c_str(), vbid, nullptr, {});
+         EXPECT_EQ(ENGINE_KEY_ENOENT, result.getStatus())
+             << "A key set after the rollback point was found";
+     }
+
+     // Rolled back to the previous checkpoint
+     EXPECT_EQ(rollback_item.getBySeqno(),
+               store->getVBucket(vbid)->getHighSeqno());
+ }
+
+protected:
int64_t initial_seqno;
};
-// Test rollback after modifying an item - regression test for MB-21587.
-TEST_F(RollbackTest, MB21587_RollbackAfterMutation) {
-
- // Setup: Store an item then flush the vBucket (creating a checkpoint);
- // then update the item with a new value and create a second checkpoint.
- auto item_v1 = store_item(vbid, "a", "old");
- ASSERT_EQ(initial_seqno + 1, item_v1.getBySeqno());
- ASSERT_EQ(1, store->flushVBucket(vbid));
-
- auto item2 = store_item(vbid, "a", "new");
- ASSERT_EQ(initial_seqno + 2, item2.getBySeqno());
- ASSERT_EQ(1, store->flushVBucket(vbid));
-
- // Test - rollback to seqno of item_v1 and verify that the previous value
- // of the item has been restored.
- ASSERT_EQ(ENGINE_SUCCESS, store->rollback(vbid, initial_seqno + 1));
- auto result = store->get("a", vbid, nullptr, {});
- EXPECT_EQ(ENGINE_SUCCESS, result.getStatus());
- EXPECT_EQ(item_v1, *result.getValue())
- << "Fetched item after rollback should match item_v1";
- delete result.getValue();
+TEST_P(RollbackTest, RollbackAfterMutation) {
+ rollback_after_mutation_test(/*flush_before_rollback*/true);
}
+
+TEST_P(RollbackTest, RollbackAfterMutationNoFlush) {
+ rollback_after_mutation_test(/*flush_before_rollback*/false);
+}
+
+TEST_P(RollbackTest, RollbackAfterDeletion) {
+ rollback_after_deletion_test(/*flush_before_rollback*/true);
+}
+
+TEST_P(RollbackTest, RollbackAfterDeletionNoFlush) {
+ rollback_after_deletion_test(/*flush_before_rollback*/false);
+}
+
+TEST_P(RollbackTest, RollbackToMiddleOfACheckpoint) {
+ rollback_to_middle_test(/*flush_before_rollback*/true);
+}
+
+TEST_P(RollbackTest, RollbackToMiddleOfACheckpointNoFlush) {
+ rollback_to_middle_test(/*flush_before_rollback*/false);
+}
+
+// Test cases which run in both Full and Value eviction
+INSTANTIATE_TEST_CASE_P(FullAndValueEviction,
+ RollbackTest,
+ ::testing::Values("value_only", "full_eviction"),
+ [] (const ::testing::TestParamInfo<std::string>& info) {
+ return info.param;
+ });
void public_stopWarmup() {
stopWarmup();
}
+ /* Public wrapper around getInternal(): forwards every argument
+  * unchanged and returns its result. */
+ GetValue public_getInternal(const std::string& key, uint16_t vbucket,
+ const void* cookie, vbucket_state_t allowedState,
+ get_options_t options) {
+ return getInternal(key, vbucket, cookie, allowedState, options);
+ }
};
/* Actual test fixture class */