MB-23211: Allow the expiry callback to carry full item 61/75861/37
authorSriram Ganesan <sriram@couchbase.com>
Tue, 28 Mar 2017 09:36:22 +0000 (11:36 +0200)
committerSriram Ganesan <sriram@couchbase.com>
Wed, 3 May 2017 14:33:11 +0000 (14:33 +0000)
The expiry callback needs to carry the whole item so that in the
case of full eviction, the system xattrs can be retained in the
body after deleting the rest of the body

Change-Id: Id3cb613217f4882a0f0400c01318bb2efc58b8aa
Reviewed-on: http://review.couchbase.org/75861
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
12 files changed:
src/couch-kvstore/couch-kvstore.cc
src/ep_engine.cc
src/item_pager.cc
src/kv_bucket.cc
src/kv_bucket.h
src/kv_bucket_iface.h
src/kvstore.h
src/vbucket.cc
src/vbucket.h
tests/ep_testsuite.cc
tests/module_tests/evp_store_single_threaded_test.cc
tests/module_tests/evp_store_test.cc

index 1720059..a2629e5 100644 (file)
@@ -741,7 +741,64 @@ static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
     return 1;
 }
 
-static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
+/**
+ * Notify the expiry callback that a document has expired
+ *
+ * @param info     document information for the expired item
+ * @param metadata metadata of the document
+ * @param item     buffer containing data and size
+ * @param ctx      context for compaction
+ * @param currtime current time
+ */
+static int notify_expired_item(DocInfo& info,
+                               MetaData& metadata,
+                               sized_buf item,
+                               compaction_ctx& ctx,
+                               time_t currtime) {
+    std::array<uint8_t, 1> ext_meta = {{metadata.getDataType()}};
+    cb::char_buffer data;
+    cb::compression::Buffer inflated;
+
+    if (mcbp::datatype::is_xattr(metadata.getDataType())) {
+        if (item.buf == nullptr) {
+            // We need to pass on the entire document to the callback
+            return COUCHSTORE_COMPACT_NEED_BODY;
+        }
+
+        if (info.content_meta | COUCH_DOC_IS_COMPRESSED) {
+            using namespace cb::compression;
+
+            if (!inflate(Algorithm::Snappy,
+                         item.buf, item.size, inflated)) {
+                LOG(EXTENSION_LOG_WARNING,
+                    "time_purge_hook: failed to inflate document with seqno %" PRIu64 ""
+                    "revno: %" PRIu64, info.db_seq, info.rev_seq);
+                return COUCHSTORE_ERROR_CORRUPT;
+            }
+            data = {inflated.data.get(),inflated.len};
+        }
+    }
+
+    // Collections: TODO: Restore to stored namespace
+    Item it(makeDocKey(info.id, ctx.config->shouldPersistDocNamespace()),
+            metadata.getFlags(),
+            metadata.getExptime(),
+            data.buf,
+            data.len,
+            &ext_meta[0],
+            EXT_META_LEN,
+            metadata.getCas(),
+            info.db_seq,
+            ctx.db_file_id,
+            info.rev_seq);
+
+    it.setRevSeqno(info.rev_seq);
+    ctx.expiryCallback->callback(it, currtime);
+
+    return COUCHSTORE_SUCCESS;
+}
+
+static int time_purge_hook(Db* d, DocInfo* info, sized_buf item, void* ctx_p) {
     compaction_ctx* ctx = static_cast<compaction_ctx*>(ctx_p);
     const uint16_t vbid = ctx->db_file_id;
 
@@ -791,12 +848,19 @@ static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
         } else {
             time_t currtime = ep_real_time();
             if (exptime && exptime < currtime) {
-                // Collections: TODO: Permanently restore to stored namespace
-                DocKey key = makeDocKey(
-                        info->id,
-                        ctx->config->shouldPersistDocNamespace());
-                ctx->expiryCallback->callback(ctx->db_file_id, key,
-                                              info->rev_seq, currtime);
+                int ret;
+                try {
+                    ret = notify_expired_item(*info, *metadata, item,
+                                             *ctx, currtime);
+                } catch (const std::bad_alloc&) {
+                    LOG(EXTENSION_LOG_WARNING,
+                        "time_purge_hook: memory allocation failed");
+                    return COUCHSTORE_ERROR_ALLOC_FAIL;
+                }
+
+                if (ret != COUCHSTORE_SUCCESS) {
+                    return ret;
+                }
             }
         }
     }
index 97c2c15..fc54487 100644 (file)
@@ -2179,7 +2179,8 @@ cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
                                                   TRACK_REFERENCE |
                                                   DELETE_TEMP |
                                                   HIDE_LOCKED_CAS |
-                                                  ALLOW_META_ONLY);
+                                                  ALLOW_META_ONLY |
+                                                  GET_DELETED_VALUE);
         if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
             options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
         }
index d4f2483..c670058 100644 (file)
@@ -78,8 +78,8 @@ public:
         bool isExpired = (currentBucket->getState() == vbucket_state_active) &&
             v->isExpired(startTime) && !v->isDeleted();
         if (isExpired || v->isTempNonExistentItem() || v->isTempDeletedItem()) {
-            expired.push_back(std::make_pair(currentBucket->getId(),
-                                             StoredDocKey(v->getKey())));
+            std::unique_ptr<Item> it = v->toItem(false, currentBucket->getId());
+            expired.push_back(*it.get());
             return;
         }
 
@@ -238,7 +238,7 @@ private:
         }
     }
 
-    std::list<std::pair<uint16_t, StoredDocKey> > expired;
+    std::list<Item> expired;
 
     KVBucketIface& store;
     EPStats &stats;
index de7f965..6ab13d1 100644 (file)
@@ -300,17 +300,14 @@ bool BloomFilterCallback::initTempFilter(uint16_t vbucketId) {
     return true;
 }
 
-class ExpiredItemsCallback : public Callback<uint16_t&, const DocKey&, uint64_t&,
-                                             time_t&> {
+class ExpiredItemsCallback : public Callback<Item&, time_t&> {
     public:
         ExpiredItemsCallback(KVBucket& store)
             : epstore(store) { }
 
-        void callback(uint16_t& vbid, const DocKey& key, uint64_t& revSeqno,
-                      time_t& startTime) {
+        void callback(Item& it, time_t& startTime) {
             if (epstore.compactionCanExpireItems()) {
-                epstore.deleteExpiredItem(
-                        vbid, key, startTime, revSeqno, ExpireBy::Compactor);
+                epstore.deleteExpiredItem(it, startTime, ExpireBy::Compactor);
             }
         }
 
@@ -559,28 +556,39 @@ protocol_binary_response_status KVBucket::evictKey(const DocKey& key,
     return vb->evictKey(key, msg);
 }
 
-void KVBucket::deleteExpiredItem(uint16_t vbid,
-                                 const DocKey& key,
+void KVBucket::deleteExpiredItem(Item& it,
                                  time_t startTime,
-                                 uint64_t revSeqno,
                                  ExpireBy source) {
-    VBucketPtr vb = getVBucket(vbid);
+    VBucketPtr vb = getVBucket(it.getVBucketId());
+
     if (vb) {
+        auto info = it.toItemInfo(vb->failovers->getLatestUUID());
+        if (engine.getServerApi()->document->pre_expiry(info)) {
+            // The payload is modified and contains data we should use
+            value_t value(Blob::New(static_cast<char*>(info.value[0].iov_base),
+                                    info.value[0].iov_len,
+                                    &info.datatype, 1));
+            it.setValue(value);
+        } else {
+            // We should drop the entire body
+            it.setValue({});
+        }
+
         // Obtain reader access to the VB state change lock so that
         // the VB can't switch state whilst we're processing
         ReaderLockHolder rlh(vb->getStateLock());
         if (vb->getState() == vbucket_state_active) {
-            vb->deleteExpiredItem(key, startTime, revSeqno, source);
+            vb->deleteExpiredItem(it, startTime, source);
         }
     }
 }
 
 void KVBucket::deleteExpiredItems(
-        std::list<std::pair<uint16_t, StoredDocKey>>& keys, ExpireBy source) {
+        std::list<Item>& itms, ExpireBy source) {
     std::list<std::pair<uint16_t, std::string> >::iterator it;
     time_t startTime = ep_real_time();
-    for (const auto& it : keys) {
-        deleteExpiredItem(it.first, it.second, startTime, 0, source);
+    for (auto& it : itms) {
+        deleteExpiredItem(it, startTime, source);
     }
 }
 
index 76262b8..5283021 100644 (file)
@@ -505,9 +505,8 @@ public:
                                              VBucket::id_type vbucket,
                                              const char** msg);
 
-    void deleteExpiredItem(uint16_t, const DocKey&, time_t, uint64_t, ExpireBy);
-    void deleteExpiredItems(std::list<std::pair<uint16_t, StoredDocKey>>&,
-                            ExpireBy);
+    void deleteExpiredItem(Item& it, time_t startTime, ExpireBy source);
+    void deleteExpiredItems(std::list<Item>&, ExpireBy);
 
     /**
      * Get the memoized storage properties from the DB.kv
index 9a7c610..b3d0422 100644 (file)
@@ -624,10 +624,11 @@ public:
 
     virtual KVStore* getROUnderlying(uint16_t vbId) = 0;
 
-    virtual void deleteExpiredItem(
-            uint16_t, const DocKey&, time_t, uint64_t, ExpireBy) = 0;
-    virtual void deleteExpiredItems(
-            std::list<std::pair<uint16_t, StoredDocKey>>&, ExpireBy) = 0;
+    virtual void deleteExpiredItem(Item& it,
+                                   time_t startTime,
+                                   ExpireBy source) = 0;
+
+    virtual void deleteExpiredItems(std::list<Item>&, ExpireBy) = 0;
 
     /**
      * Get the memoized storage properties from the DB.kv
index 2620e7a..bbd3a10 100644 (file)
@@ -109,7 +109,7 @@ typedef struct {
 typedef uint16_t DBFileId;
 
 typedef std::shared_ptr<Callback<uint16_t&, const DocKey&, bool&> > BloomFilterCBPtr;
-typedef std::shared_ptr<Callback<uint16_t&, const DocKey&, uint64_t&, time_t&> > ExpiredItemsCBPtr;
+typedef std::shared_ptr<Callback<Item&, time_t&> > ExpiredItemsCBPtr;
 
 class KVStoreConfig;
 typedef struct {
index 6f6ff9a..9500426 100644 (file)
@@ -1298,17 +1298,23 @@ ENGINE_ERROR_CODE VBucket::deleteWithMeta(const DocKey& key,
     return ENGINE_SUCCESS;
 }
 
-void VBucket::deleteExpiredItem(const DocKey& key,
+void VBucket::deleteExpiredItem(const Item& it,
                                 time_t startTime,
-                                uint64_t revSeqno,
                                 ExpireBy source) {
+
+    // The item is correctly trimmed (by the caller). Fetch the one in the
+    // hashtable and replace it if the CAS match (same item; no race).
+    // If not found in the hashtable we should add it as a deleted item
+    const DocKey& key = it.getKey();
     auto hbl = ht.getLockedBucket(key);
     StoredValue* v = ht.unlocked_find(
             key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
     if (v) {
+        if (v->getCas() != it.getCas()) {
+            return;
+        }
+
         if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
-            // This is a temporary item whose background fetch for metadata
-            // has completed.
             bool deleted = deleteStoredValue(hbl, *v);
             if (!deleted) {
                 throw std::logic_error(
@@ -1318,7 +1324,6 @@ void VBucket::deleteExpiredItem(const DocKey& key,
                         std::to_string(hbl.getBucketNum()));
             }
         } else if (v->isExpired(startTime) && !v->isDeleted()) {
-            handlePreExpiry(*v);
             VBNotifyCtx notifyCtx;
             std::tie(std::ignore, std::ignore, notifyCtx) =
                     processExpiredItem(hbl, *v);
@@ -1342,7 +1347,8 @@ void VBucket::deleteExpiredItem(const DocKey& key,
                                      WantsDeleted::Yes,
                                      TrackReference::No);
                 v->setDeleted();
-                v->setRevSeqno(revSeqno);
+                v->setRevSeqno(it.getRevSeqno());
+                v->setValue(it, ht);
                 VBNotifyCtx notifyCtx;
                 std::tie(std::ignore, std::ignore, notifyCtx) =
                         processExpiredItem(hbl, *v);
index f64116e..84aa25a 100644 (file)
@@ -932,14 +932,12 @@ public:
     /**
      * Delete an expired item
      *
-     * @param key key to be deleted
+     * @param it item to be deleted
      * @param startTime the time to be compared with this item's expiry time
-     * @param revSeqno revision id sequence number
      * @param source Expiry source
      */
-    void deleteExpiredItem(const DocKey& key,
+    void deleteExpiredItem(const Item& it,
                            time_t startTime,
-                           uint64_t revSeqno,
                            ExpireBy source);
 
     /**
index d3a927d..9ccf959 100644 (file)
@@ -7867,7 +7867,6 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("test MB-16421", test_mb16421,
                  test_setup, teardown, "item_eviction_policy=full_eviction",
                  prepare_full_eviction, cleanup),
-
         TestCase("test eviction with xattr", test_eviction_with_xattr,
                  test_setup, teardown, "item_eviction_policy=full_eviction",
                  prepare, cleanup),
index 9925665..0f6dc3f 100644 (file)
@@ -1068,9 +1068,9 @@ TEST_F(SingleThreadedEPBucketTest, pre_expiry_xattrs) {
                          deleted, datatype);
     auto prev_revseqno = metadata.revSeqno;
     EXPECT_EQ(1, prev_revseqno) << "Unexpected revision sequence number";
+    itm.setRevSeqno(1);
+    kvbucket.deleteExpiredItem(itm, ep_real_time() + 1, ExpireBy::Pager);
 
-    kvbucket.deleteExpiredItem(vbid, makeStoredDocKey("key"),
-                               ep_real_time() + 1, 1, ExpireBy::Pager);
     get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
                                                        HONOR_STATES |
                                                        TRACK_REFERENCE |
index 4ac54c1..82d6761 100644 (file)
 #include "tests/module_tests/test_helpers.h"
 #include "vbucketdeletiontask.h"
 
+#include <string_utilities.h>
+#include <xattr/blob.h>
+#include <xattr/utils.h>
+
 #include <thread>
 
 // Verify that when handling a bucket delete with open DCP
@@ -398,6 +402,64 @@ TEST_P(EPStoreEvictionTest, TouchCmdDuringBgFetch) {
               store->getVBucket(vbid)->getHighSeqno());
 }
 
+TEST_P(EPStoreEvictionTest, xattrExpiryOnFullyEvictedItem) {
+    if (GetParam() == "value_only") {
+        return;
+    }
+
+    cb::xattr::Blob builder;
+
+    //Add a few values
+    builder.set(to_const_byte_buffer("_meta"),
+                to_const_byte_buffer("{\"rev\":10}"));
+    builder.set(to_const_byte_buffer("foo"),
+                to_const_byte_buffer("{\"blob\":true}"));
+
+    auto blob = builder.finalize();
+    auto blob_data = to_string(blob);
+    auto itm = store_item(vbid,
+                          makeStoredDocKey("key"),
+                          blob_data,
+                          0,
+                          {cb::engine_errc::success},
+                          (PROTOCOL_BINARY_DATATYPE_JSON |
+                           PROTOCOL_BINARY_DATATYPE_XATTR));
+
+    GetValue gv = store->getAndUpdateTtl(makeStoredDocKey("key"), vbid, cookie,
+                                         time(NULL) + 120);
+    EXPECT_EQ(ENGINE_SUCCESS, gv.getStatus());
+    std::unique_ptr<Item> get_itm(gv.getValue());
+
+    flush_vbucket_to_disk(vbid);
+    evict_key(vbid, makeStoredDocKey("key"));
+    store->deleteExpiredItem(itm, time(NULL) + 121, ExpireBy::Compactor);
+
+    get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
+                                                       HONOR_STATES |
+                                                       TRACK_REFERENCE |
+                                                       DELETE_TEMP |
+                                                       HIDE_LOCKED_CAS |
+                                                       TRACK_STATISTICS |
+                                                       GET_DELETED_VALUE);
+
+    gv = store->get(makeStoredDocKey("key"), vbid, cookie, options);
+    EXPECT_EQ(ENGINE_SUCCESS, gv.getStatus());
+
+    get_itm.reset(gv.getValue());
+    auto get_data = const_cast<char*>(get_itm->getData());
+    EXPECT_EQ(PROTOCOL_BINARY_DATATYPE_XATTR, get_itm->getDataType())
+              << "Unexpected Datatype";
+
+    cb::byte_buffer value_buf{reinterpret_cast<uint8_t*>(get_data),
+                              get_itm->getNBytes()};
+    cb::xattr::Blob new_blob(value_buf);
+
+    const std::string& rev_str{"{\"rev\":10}"};
+    const std::string& meta_str = to_string(new_blob.get(to_const_byte_buffer("_meta")));
+
+    EXPECT_EQ(rev_str, meta_str) << "Unexpected system xattrs";
+}
+
 // Test cases which run in both Full and Value eviction
 INSTANTIATE_TEST_CASE_P(FullAndValueEviction,
                         EPStoreEvictionTest,