MB-21568: Reconcile hashtable with disk following rollback 25/69725/16
authorDave Rigby <daver@couchbase.com>
Mon, 7 Nov 2016 06:36:42 +0000 (22:36 -0800)
committerDave Rigby <daver@couchbase.com>
Wed, 16 Nov 2016 18:02:44 +0000 (18:02 +0000)
After rolling back the disk store to the requested seqno a scan of the
vbucket's checkpoint must occur. Any item in the checkpoint with
a seqno > than the rollback must be dropped or rolled back.

+ A missing close is addressed by using a wrapper class that will
  RAII close each file opened in the rollback code.

Change-Id: Iabe43f59ed40931c1c97b65147b7a414d4ff7cc5
Reviewed-on: http://review.couchbase.org/69725
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/couch-kvstore/couch-kvstore.cc
src/couch-kvstore/couch-kvstore.h
src/ep.cc
src/ep.h
tests/module_tests/evp_store_rollback_test.cc
tests/module_tests/evp_store_test.h

index 087378d..c5080b2 100644 (file)
@@ -2267,24 +2267,22 @@ size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
 
 RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
                                       std::shared_ptr<RollbackCB> cb) {
-
-    Db *db = NULL;
+    DbHolder db(this);
     DbInfo info;
     uint64_t fileRev = dbFileRevMap[vbid];
     std::stringstream dbFileName;
     dbFileName << dbname << "/" << vbid << ".couch." << fileRev;
     couchstore_error_t errCode;
 
-    errCode = openDB(vbid, fileRev, &db,
+    errCode = openDB(vbid, fileRev, db.getDbAddress(),
                      (uint64_t) COUCHSTORE_OPEN_FLAG_RDONLY);
 
     if (errCode == COUCHSTORE_SUCCESS) {
-        errCode = couchstore_db_info(db, &info);
+        errCode = couchstore_db_info(db.getDb(), &info);
         if (errCode != COUCHSTORE_SUCCESS) {
             LOG(EXTENSION_LOG_WARNING,
                 "Failed to read DB info, name=%s",
                 dbFileName.str().c_str());
-            closeDatabaseHandle(db);
             return RollbackResult(false, 0, 0, 0);
         }
     } else {
@@ -2298,28 +2296,26 @@ RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
 
     //Count from latest seq no to 0
     uint64_t totSeqCount = 0;
-    errCode = couchstore_changes_count(db, 0, latestSeqno, &totSeqCount);
+    errCode = couchstore_changes_count(db.getDb(), 0, latestSeqno, &totSeqCount);
     if (errCode != COUCHSTORE_SUCCESS) {
         LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
             "rollback vBucket = %d, rev = %" PRIu64 ", error=%s [%s]",
             vbid, fileRev,  couchstore_strerror(errCode),
             cb_strerror().c_str());
-        closeDatabaseHandle(db);
         return RollbackResult(false, 0, 0, 0);
     }
 
-    Db *newdb = NULL;
-    errCode = openDB(vbid, fileRev, &newdb, 0);
+    DbHolder newdb(this);
+    errCode = openDB(vbid, fileRev, newdb.getDbAddress(), 0);
     if (errCode != COUCHSTORE_SUCCESS) {
         LOG(EXTENSION_LOG_WARNING,
                 "Failed to open database, name=%s",
                 dbFileName.str().c_str());
-        closeDatabaseHandle(db);
         return RollbackResult(false, 0, 0, 0);
     }
 
     while (info.last_sequence > rollbackSeqno) {
-        errCode = couchstore_rewind_db_header(newdb);
+        errCode = couchstore_rewind_db_header(newdb.getDb());
         if (errCode != COUCHSTORE_SUCCESS) {
             LOG(EXTENSION_LOG_WARNING,
                     "Failed to rewind Db pointer "
@@ -2330,64 +2326,52 @@ RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
                     couchstore_strerror(errCode), cb_strerror().c_str());
             //Reset the vbucket and send the entire snapshot,
             //as a previous header wasn't found.
-            closeDatabaseHandle(db);
             return RollbackResult(false, 0, 0, 0);
         }
-        errCode = couchstore_db_info(newdb, &info);
+        errCode = couchstore_db_info(newdb.getDb(), &info);
         if (errCode != COUCHSTORE_SUCCESS) {
             LOG(EXTENSION_LOG_WARNING,
                 "Failed to read DB info, name=%s",
                 dbFileName.str().c_str());
-            closeDatabaseHandle(db);
-            closeDatabaseHandle(newdb);
             return RollbackResult(false, 0, 0, 0);
         }
     }
 
     //Count from latest seq no to rollback seq no
     uint64_t rollbackSeqCount = 0;
-    errCode = couchstore_changes_count(db, info.last_sequence, latestSeqno,
+    errCode = couchstore_changes_count(db.getDb(), info.last_sequence, latestSeqno,
                                        &rollbackSeqCount);
     if (errCode != COUCHSTORE_SUCCESS) {
         LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
             "rollback vBucket = %d, rev = %" PRIu64 ", error=%s [%s]",
             vbid, fileRev, couchstore_strerror(errCode), cb_strerror().c_str());
-        closeDatabaseHandle(db);
-        closeDatabaseHandle(newdb);
         return RollbackResult(false, 0, 0, 0);
     }
 
     if ((totSeqCount / 2) <= rollbackSeqCount) {
         //doresetVbucket flag set or rollback is greater than 50%,
         //reset the vbucket and send the entire snapshot
-        closeDatabaseHandle(db);
-        closeDatabaseHandle(newdb);
         return RollbackResult(false, 0, 0, 0);
     }
 
-    cb->setDbHeader(newdb);
-
+    cb->setDbHeader(newdb.getDb());
     std::shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
-    ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence + 1,
+    ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence+1,
                                        DocumentFilter::ALL_ITEMS,
                                        ValueFilter::KEYS_ONLY);
     scan_error_t error = scan(ctx);
     destroyScanContext(ctx);
 
     if (error != scan_success) {
-        closeDatabaseHandle(db);
-        closeDatabaseHandle(newdb);
         return RollbackResult(false, 0, 0, 0);
     }
 
-    readVBState(newdb, vbid);
+    readVBState(newdb.getDb(), vbid);
     cachedDeleteCount[vbid] = info.deleted_count;
     cachedDocCount[vbid] = info.doc_count;
 
-    closeDatabaseHandle(db);
-    //Append the rewinded header to the database file, before closing handle
-    errCode = couchstore_commit(newdb);
-    closeDatabaseHandle(newdb);
+    //Append the rewinded header to the database file
+    errCode = couchstore_commit(newdb.getDb());
 
     if (errCode != COUCHSTORE_SUCCESS) {
         return RollbackResult(false, 0, 0, 0);
index b6b8d70..d6e7ffe 100644 (file)
@@ -486,6 +486,32 @@ public:
     scan_error_t scan(ScanContext* sctx);
 
     void destroyScanContext(ScanContext* ctx);
+private:
+    class DbHolder {
+    public:
+        DbHolder(CouchKVStore* kvs) : kvstore(kvs), db(nullptr) {}
+
+        DbHolder(const DbHolder&) = delete;
+        DbHolder(const DbHolder&&) = delete;
+        DbHolder operator=(const DbHolder&) = delete;
+
+
+        Db** getDbAddress() {
+            return &db;
+        }
+
+        Db* getDb() {
+            return db;
+        }
+
+        ~DbHolder() {
+            if (db) {
+                kvstore->closeDatabaseHandle(db);
+            }
+        }
+        CouchKVStore* kvstore;
+        Db* db;
+    };
 
 protected:
     bool setVBucketState(uint16_t vbucketId, const vbucket_state &vbstate,
index 208d526..7429aeb 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -3922,7 +3922,6 @@ public:
             if (it->isDeleted()) {
                 LockHolder lh = vb->ht.getLockedBucket(it->getKey(),
                         &bucket_num);
-
                 bool ret = vb->ht.unlocked_del(it->getKey(), bucket_num);
                 if(!ret) {
                     setStatus(ENGINE_KEY_ENOENT);
@@ -3959,6 +3958,35 @@ private:
     EventuallyPersistentEngine& engine;
 };
 
+/*
+ * Purge all unpersisted items from the current checkpoint(s) and fixup
+ * the hashtable for any that are > the rollbackSeqno.
+ */
+void EventuallyPersistentStore::rollbackCheckpoint(RCPtr<VBucket> &vb,
+                                                   uint64_t rollbackSeqno) {
+    std::vector<queued_item> items;
+    vb->checkpointManager.getAllItemsForCursor(CheckpointManager::pCursorName,
+                                               items);
+    for (const auto& item : items) {
+        if (item->getBySeqno() > rollbackSeqno &&
+            !item->isCheckPointMetaItem()) {
+            RememberingCallback<GetValue> gcb;
+            getROUnderlying(vb->getId())->get(item->getKey(),
+                                                    vb->getId(),
+                                                    gcb);
+            gcb.waitForValue();
+
+            if (gcb.val.getStatus() == ENGINE_SUCCESS) {
+                vb->ht.set(*gcb.val.getValue(), 0, true);
+            } else {
+                vb->ht.del(item->getKey());
+            }
+
+            delete gcb.val.getValue();
+        }
+    }
+}
+
 ENGINE_ERROR_CODE
 EventuallyPersistentStore::rollback(uint16_t vbid,
                                     uint64_t rollbackSeqno) {
@@ -3968,28 +3996,34 @@ EventuallyPersistentStore::rollback(uint16_t vbid,
     }
 
     RCPtr<VBucket> vb = vbMap.getBucket(vbid);
-    uint64_t prevHighSeqno = static_cast<uint64_t>
-                                    (vb->checkpointManager.getHighSeqno());
-    if (rollbackSeqno != 0) {
-        std::shared_ptr<Rollback> cb(new Rollback(engine));
-        KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying();
-        RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
-
-        if (result.success) {
-            vb->failovers->pruneEntries(result.highSeqno);
-            vb->checkpointManager.clear(vb, result.highSeqno);
-            vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
-            vb->incrRollbackItemCount(prevHighSeqno - result.highSeqno);
-            return ENGINE_SUCCESS;
+    ReaderLockHolder rlh(vb->getStateLock());
+    if (vb->getState() == vbucket_state_replica) {
+        uint64_t prevHighSeqno = static_cast<uint64_t>
+                                        (vb->checkpointManager.getHighSeqno());
+        if (rollbackSeqno != 0) {
+            std::shared_ptr<Rollback> cb(new Rollback(engine));
+            KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying();
+            RollbackResult result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
+
+            if (result.success) {
+                rollbackCheckpoint(vb, rollbackSeqno);
+                vb->failovers->pruneEntries(result.highSeqno);
+                vb->checkpointManager.clear(vb, result.highSeqno);
+                vb->setPersistedSnapshot(result.snapStartSeqno, result.snapEndSeqno);
+                vb->incrRollbackItemCount(prevHighSeqno - result.highSeqno);
+                return ENGINE_SUCCESS;
+            }
         }
-    }
 
-    if (resetVBucket(vbid)) {
-        RCPtr<VBucket> newVb = vbMap.getBucket(vbid);
-        newVb->incrRollbackItemCount(prevHighSeqno);
-        return ENGINE_SUCCESS;
+        if (resetVBucket(vbid)) {
+            RCPtr<VBucket> newVb = vbMap.getBucket(vbid);
+            newVb->incrRollbackItemCount(prevHighSeqno);
+            return ENGINE_SUCCESS;
+        }
+        return ENGINE_NOT_MY_VBUCKET;
+    } else {
+        return ENGINE_EINVAL;
     }
-    return ENGINE_NOT_MY_VBUCKET;
 }
 
 void EventuallyPersistentStore::runDefragmenterTask() {
index 2ab9d46..b78053d 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -825,6 +825,21 @@ public:
         return eviction_policy;
     }
 
+    /*
+     * Request a rollback of the vbucket to the specified seqno.
+     * If the rollbackSeqno is not a checkpoint boundary, then the rollback
+     * will be to the nearest checkpoint.
+     * There are also cases where the rollback will be forced to 0.
+     * various failures or if the rollback is > 50% of the data.
+     *
+     * A check of the vbucket's high-seqno indicates if a rollback request
+     * was not honoured exactly.
+     *
+     * @param vbid The vbucket to rollback
+     * @rollbackSeqno The seqno to rollback to.
+     * @return ENGINE_EINVAL if VB is not replica, ENGINE_NOT_MY_VBUCKET if vbid
+     *         is not managed by this instance or ENGINE_SUCCESS.
+     */
     ENGINE_ERROR_CODE rollback(uint16_t vbid, uint64_t rollbackSeqno);
 
     void wakeUpItemPager() {
@@ -996,6 +1011,13 @@ protected:
 
     uint16_t decrCommitInterval(uint16_t shardId);
 
+    /*
+     * Helper method for the rollback function.
+     * Drain the VB's checkpoints looking for items which have a seqno
+     * above the rollbackSeqno and must be rolled back themselves.
+     */
+    void rollbackCheckpoint(RCPtr<VBucket> &vb, uint64_t rollbackSeqno);
+
     friend class Warmup;
     friend class Flusher;
     friend class BGFetchCallback;
index 13f022d..3aa7424 100644 (file)
@@ -21,7 +21,8 @@
 
 #include "evp_store_test.h"
 
-class RollbackTest : public EventuallyPersistentStoreTest
+class RollbackTest : public EventuallyPersistentStoreTest,
+                     public ::testing::WithParamInterface<std::string>
 {
     void SetUp() override {
         EventuallyPersistentStoreTest::SetUp();
@@ -46,28 +47,181 @@ class RollbackTest : public EventuallyPersistentStoreTest
     }
 
 protected:
+    /**
+     * Test rollback after deleting an item.
+     * @param flush_before_rollback: Should the vbuckt be flushed to disk just
+     *        before the rollback (i.e. guaranteeing the in-memory state is in sync
+     *        with disk).
+     */
+    void rollback_after_deletion_test(bool flush_before_rollback) {
+        // Setup: Store an item then flush the vBucket (creating a checkpoint);
+        // then delete the item and create a second checkpoint.
+        auto item_v1 = store_item(vbid, "a", "1");
+        ASSERT_EQ(initial_seqno + 1, item_v1.getBySeqno());
+        ASSERT_EQ(1, store->flushVBucket(vbid));
+        uint64_t cas = item_v1.getCas();
+        mutation_descr_t mut_info;
+        ASSERT_EQ(ENGINE_SUCCESS,
+                  store->deleteItem("a", &cas, vbid, /*cookie*/nullptr,
+                                    /*force*/false, /*itemMeta*/nullptr,
+                                    &mut_info));
+        if (flush_before_rollback) {
+            ASSERT_EQ(1, store->flushVBucket(vbid));
+        }
+        // Sanity-check - item should no longer exist.
+        EXPECT_EQ(ENGINE_KEY_ENOENT,
+                  store->get("a", vbid, nullptr, {}).getStatus());
+
+        // Test - rollback to seqno of item_v1 and verify that the previous value
+        // of the item has been restored.
+        store->setVBucketState(vbid, vbucket_state_replica, false);
+        ASSERT_EQ(ENGINE_SUCCESS, store->rollback(vbid, item_v1.getBySeqno()));
+        auto result = store->public_getInternal("a", vbid, /*cookie*/nullptr,
+                                                vbucket_state_replica, {});
+        ASSERT_EQ(ENGINE_SUCCESS, result.getStatus());
+        EXPECT_EQ(item_v1, *result.getValue())
+            << "Fetched item after rollback should match item_v1";
+        delete result.getValue();
+
+        if (!flush_before_rollback) {
+            EXPECT_EQ(0, store->flushVBucket(vbid));
+        }
+    }
+
+    // Test rollback after modifying an item.
+    void rollback_after_mutation_test(bool flush_before_rollback) {
+        // Setup: Store an item then flush the vBucket (creating a checkpoint);
+        // then update the item with a new value and create a second checkpoint.
+        auto item_v1 = store_item(vbid, "a", "old");
+        ASSERT_EQ(initial_seqno + 1, item_v1.getBySeqno());
+        ASSERT_EQ(1, store->flushVBucket(vbid));
+
+        auto item2 = store_item(vbid, "a", "new");
+        ASSERT_EQ(initial_seqno + 2, item2.getBySeqno());
+
+        store_item(vbid, "key", "meh");
+
+        if (flush_before_rollback) {
+            EXPECT_EQ(2, store->flushVBucket(vbid));
+        }
+
+        // Test - rollback to seqno of item_v1 and verify that the previous value
+        // of the item has been restored.
+        store->setVBucketState(vbid, vbucket_state_replica, false);
+        ASSERT_EQ(ENGINE_SUCCESS, store->rollback(vbid, item_v1.getBySeqno()));
+        ASSERT_EQ(item_v1.getBySeqno(), store->getVBucket(vbid)->getHighSeqno());
+
+        // a should have the value of 'old'
+        {
+            auto result = store->get("a", vbid, nullptr, {});
+            ASSERT_EQ(ENGINE_SUCCESS, result.getStatus());
+            EXPECT_EQ(item_v1, *result.getValue())
+                << "Fetched item after rollback should match item_v1";
+            delete result.getValue();
+        }
+
+        // key should be gone
+        {
+            auto result = store->get("key", vbid, nullptr, {});
+            EXPECT_EQ(ENGINE_KEY_ENOENT, result.getStatus())
+                << "A key set after the rollback point was found";
+        }
+
+        if (!flush_before_rollback) {
+            // The rollback should of wiped out any keys waiting for persistence
+            EXPECT_EQ(0, store->flushVBucket(vbid));
+        }
+    }
+
+    // This test passes, but note that if we warmed up, there is data loss.
+    void rollback_to_middle_test(bool flush_before_rollback) {
+        // create some more checkpoints just to see a few iterations
+        // of parts of the rollback function.
+
+        // need to store a certain number of keys because rollback
+        // 'bails' if the rollback is too much.
+        for (int i = 0; i < 6; i++) {
+            std::string key = "key_" + std::to_string(i);
+            store_item(vbid, key.c_str(), "dontcare");
+        }
+        // the roll back function will rewind disk to key7.
+        auto rollback_item = store_item(vbid, "key7", "dontcare");
+        ASSERT_EQ(7, store->flushVBucket(vbid));
+
+        // every key past this point will be lost from disk in a mid-point.
+        auto item_v1 = store_item(vbid, "rollback-cp-1", "keep-me");
+        auto item_v2 = store_item(vbid, "rollback-cp-2", "rollback to me");
+        store_item(vbid, "rollback-cp-3", "i'm gone");
+        auto rollback = item_v2.getBySeqno(); // ask to rollback to here.
+        ASSERT_EQ(3, store->flushVBucket(vbid));
+
+        for (int i = 0; i < 3; i++) {
+            std::string key = "anotherkey_" + std::to_string(i);
+            store_item(vbid, key.c_str(), "dontcare");
+        }
+
+        if (flush_before_rollback) {
+            ASSERT_EQ(3, store->flushVBucket(vbid));
+        }
+
+
+        // Rollback should succeed, but rollback to 0
+        store->setVBucketState(vbid, vbucket_state_replica, false);
+        EXPECT_EQ(ENGINE_SUCCESS, store->rollback(vbid, rollback));
+
+        // These keys should be gone after the rollback
+        for (int i = 0; i < 3; i++) {
+            std::string key = "rollback-cp-" + std::to_string(i);
+            auto result = store->get(key.c_str(), vbid, nullptr, {});
+            EXPECT_EQ(ENGINE_KEY_ENOENT, result.getStatus())
+                << "A key set after the rollback point was found";
+        }
+
+        // These keys should be gone after the rollback
+        for (int i = 0; i < 3; i++) {
+            std::string key = "anotherkey_" + std::to_string(i);
+            auto result = store->get(key.c_str(), vbid, nullptr, {});
+            EXPECT_EQ(ENGINE_KEY_ENOENT, result.getStatus())
+                << "A key set after the rollback point was found";
+        }
+
+        // Rolled back to the previous checkpoint
+        EXPECT_EQ(rollback_item.getBySeqno(),
+                  store->getVBucket(vbid)->getHighSeqno());
+    }
+
+protected:
     int64_t initial_seqno;
 };
 
-// Test rollback after modifying an item - regression test for MB-21587.
-TEST_F(RollbackTest, MB21587_RollbackAfterMutation) {
-
-    // Setup: Store an item then flush the vBucket (creating a checkpoint);
-    // then update the item with a new value and create a second checkpoint.
-    auto item_v1 = store_item(vbid, "a", "old");
-    ASSERT_EQ(initial_seqno + 1, item_v1.getBySeqno());
-    ASSERT_EQ(1, store->flushVBucket(vbid));
-
-    auto item2 = store_item(vbid, "a", "new");
-    ASSERT_EQ(initial_seqno + 2, item2.getBySeqno());
-    ASSERT_EQ(1, store->flushVBucket(vbid));
-
-    // Test - rollback to seqno of item_v1 and verify that the previous value
-    // of the item has been restored.
-    ASSERT_EQ(ENGINE_SUCCESS, store->rollback(vbid, initial_seqno + 1));
-    auto result = store->get("a", vbid, nullptr, {});
-    EXPECT_EQ(ENGINE_SUCCESS, result.getStatus());
-    EXPECT_EQ(item_v1, *result.getValue())
-        << "Fetched item after rollback should match item_v1";
-    delete result.getValue();
+TEST_P(RollbackTest, RollbackAfterMutation) {
+    rollback_after_mutation_test(/*flush_before_rollbaack*/true);
 }
+
+TEST_P(RollbackTest, RollbackAfterMutationNoFlush) {
+    rollback_after_mutation_test(/*flush_before_rollback*/false);
+}
+
+TEST_P(RollbackTest, RollbackAfterDeletion) {
+    rollback_after_deletion_test(/*flush_before_rollback*/true);
+}
+
+TEST_P(RollbackTest, RollbackAfterDeletionNoFlush) {
+    rollback_after_deletion_test(/*flush_before_rollback*/false);
+}
+
+TEST_P(RollbackTest, RollbackToMiddleOfACheckpoint) {
+    rollback_to_middle_test(true);
+}
+
+TEST_P(RollbackTest, RollbackToMiddleOfACheckpointNoFlush) {
+    rollback_to_middle_test(false);
+}
+
+// Test cases which run in both Full and Value eviction
+INSTANTIATE_TEST_CASE_P(FullAndValueEviction,
+                        RollbackTest,
+                        ::testing::Values("value_only", "full_eviction"),
+                        [] (const ::testing::TestParamInfo<std::string>& info) {
+                            return info.param;
+                        });
index ac803d2..2945e90 100644 (file)
@@ -82,6 +82,12 @@ public:
     void public_stopWarmup() {
         stopWarmup();
     }
+
+    GetValue public_getInternal(const std::string& key, uint16_t vbucket,
+                                const void* cookie, vbucket_state_t allowedState,
+                                get_options_t options) {
+        return getInternal(key, vbucket, cookie, allowedState, options);
+    }
 };
 
 /* Actual test fixture class */