return 1;
}
-static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
+/**
+ * Notify the expiry callback that a document has expired
+ *
+ * @param info document information for the expired item
+ * @param metadata metadata of the document
+ * @param item buffer containing data and size
+ * @param ctx context for compaction
+ * @param currtime current time
+ */
+static int notify_expired_item(DocInfo& info,
+ MetaData& metadata,
+ sized_buf item,
+ compaction_ctx& ctx,
+ time_t currtime) {
+ std::array<uint8_t, 1> ext_meta = {{metadata.getDataType()}};
+ cb::char_buffer data;
+ cb::compression::Buffer inflated;
+
+ if (mcbp::datatype::is_xattr(metadata.getDataType())) {
+ if (item.buf == nullptr) {
+ // We need to pass on the entire document to the callback
+ return COUCHSTORE_COMPACT_NEED_BODY;
+ }
+
+ if (info.content_meta | COUCH_DOC_IS_COMPRESSED) {
+ using namespace cb::compression;
+
+ if (!inflate(Algorithm::Snappy,
+ item.buf, item.size, inflated)) {
+ LOG(EXTENSION_LOG_WARNING,
+ "time_purge_hook: failed to inflate document with seqno %" PRIu64 ""
+ "revno: %" PRIu64, info.db_seq, info.rev_seq);
+ return COUCHSTORE_ERROR_CORRUPT;
+ }
+ data = {inflated.data.get(),inflated.len};
+ }
+ }
+
+ // Collections: TODO: Restore to stored namespace
+ Item it(makeDocKey(info.id, ctx.config->shouldPersistDocNamespace()),
+ metadata.getFlags(),
+ metadata.getExptime(),
+ data.buf,
+ data.len,
+ &ext_meta[0],
+ EXT_META_LEN,
+ metadata.getCas(),
+ info.db_seq,
+ ctx.db_file_id,
+ info.rev_seq);
+
+ it.setRevSeqno(info.rev_seq);
+ ctx.expiryCallback->callback(it, currtime);
+
+ return COUCHSTORE_SUCCESS;
+}
+
+static int time_purge_hook(Db* d, DocInfo* info, sized_buf item, void* ctx_p) {
compaction_ctx* ctx = static_cast<compaction_ctx*>(ctx_p);
const uint16_t vbid = ctx->db_file_id;
} else {
time_t currtime = ep_real_time();
if (exptime && exptime < currtime) {
- // Collections: TODO: Permanently restore to stored namespace
- DocKey key = makeDocKey(
- info->id,
- ctx->config->shouldPersistDocNamespace());
- ctx->expiryCallback->callback(ctx->db_file_id, key,
- info->rev_seq, currtime);
+ int ret;
+ try {
+ ret = notify_expired_item(*info, *metadata, item,
+ *ctx, currtime);
+ } catch (const std::bad_alloc&) {
+ LOG(EXTENSION_LOG_WARNING,
+ "time_purge_hook: memory allocation failed");
+ return COUCHSTORE_ERROR_ALLOC_FAIL;
+ }
+
+ if (ret != COUCHSTORE_SUCCESS) {
+ return ret;
+ }
}
}
}
TRACK_REFERENCE |
DELETE_TEMP |
HIDE_LOCKED_CAS |
- ALLOW_META_ONLY);
+ ALLOW_META_ONLY |
+ GET_DELETED_VALUE);
if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
}
bool isExpired = (currentBucket->getState() == vbucket_state_active) &&
v->isExpired(startTime) && !v->isDeleted();
if (isExpired || v->isTempNonExistentItem() || v->isTempDeletedItem()) {
- expired.push_back(std::make_pair(currentBucket->getId(),
- StoredDocKey(v->getKey())));
+ std::unique_ptr<Item> it = v->toItem(false, currentBucket->getId());
+ expired.push_back(*it.get());
return;
}
}
}
- std::list<std::pair<uint16_t, StoredDocKey> > expired;
+ std::list<Item> expired;
KVBucketIface& store;
EPStats &stats;
return true;
}
-class ExpiredItemsCallback : public Callback<uint16_t&, const DocKey&, uint64_t&,
- time_t&> {
+class ExpiredItemsCallback : public Callback<Item&, time_t&> {
public:
ExpiredItemsCallback(KVBucket& store)
: epstore(store) { }
- void callback(uint16_t& vbid, const DocKey& key, uint64_t& revSeqno,
- time_t& startTime) {
+ void callback(Item& it, time_t& startTime) {
if (epstore.compactionCanExpireItems()) {
- epstore.deleteExpiredItem(
- vbid, key, startTime, revSeqno, ExpireBy::Compactor);
+ epstore.deleteExpiredItem(it, startTime, ExpireBy::Compactor);
}
}
return vb->evictKey(key, msg);
}
-void KVBucket::deleteExpiredItem(uint16_t vbid,
- const DocKey& key,
+void KVBucket::deleteExpiredItem(Item& it,
time_t startTime,
- uint64_t revSeqno,
ExpireBy source) {
- VBucketPtr vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(it.getVBucketId());
+
if (vb) {
+ auto info = it.toItemInfo(vb->failovers->getLatestUUID());
+ if (engine.getServerApi()->document->pre_expiry(info)) {
+ // The payload is modified and contains data we should use
+ value_t value(Blob::New(static_cast<char*>(info.value[0].iov_base),
+ info.value[0].iov_len,
+ &info.datatype, 1));
+ it.setValue(value);
+ } else {
+ // We should drop the entire body
+ it.setValue({});
+ }
+
// Obtain reader access to the VB state change lock so that
// the VB can't switch state whilst we're processing
ReaderLockHolder rlh(vb->getStateLock());
if (vb->getState() == vbucket_state_active) {
- vb->deleteExpiredItem(key, startTime, revSeqno, source);
+ vb->deleteExpiredItem(it, startTime, source);
}
}
}
void KVBucket::deleteExpiredItems(
- std::list<std::pair<uint16_t, StoredDocKey>>& keys, ExpireBy source) {
+ std::list<Item>& itms, ExpireBy source) {
std::list<std::pair<uint16_t, std::string> >::iterator it;
time_t startTime = ep_real_time();
- for (const auto& it : keys) {
- deleteExpiredItem(it.first, it.second, startTime, 0, source);
+ for (auto& it : itms) {
+ deleteExpiredItem(it, startTime, source);
}
}
VBucket::id_type vbucket,
const char** msg);
- void deleteExpiredItem(uint16_t, const DocKey&, time_t, uint64_t, ExpireBy);
- void deleteExpiredItems(std::list<std::pair<uint16_t, StoredDocKey>>&,
- ExpireBy);
+ void deleteExpiredItem(Item& it, time_t startTime, ExpireBy source);
+ void deleteExpiredItems(std::list<Item>&, ExpireBy);
/**
* Get the memoized storage properties from the DB.kv
virtual KVStore* getROUnderlying(uint16_t vbId) = 0;
- virtual void deleteExpiredItem(
- uint16_t, const DocKey&, time_t, uint64_t, ExpireBy) = 0;
- virtual void deleteExpiredItems(
- std::list<std::pair<uint16_t, StoredDocKey>>&, ExpireBy) = 0;
+ virtual void deleteExpiredItem(Item& it,
+ time_t startTime,
+ ExpireBy source) = 0;
+
+ virtual void deleteExpiredItems(std::list<Item>&, ExpireBy) = 0;
/**
* Get the memoized storage properties from the DB.kv
typedef uint16_t DBFileId;
typedef std::shared_ptr<Callback<uint16_t&, const DocKey&, bool&> > BloomFilterCBPtr;
-typedef std::shared_ptr<Callback<uint16_t&, const DocKey&, uint64_t&, time_t&> > ExpiredItemsCBPtr;
+typedef std::shared_ptr<Callback<Item&, time_t&> > ExpiredItemsCBPtr;
class KVStoreConfig;
typedef struct {
return ENGINE_SUCCESS;
}
-void VBucket::deleteExpiredItem(const DocKey& key,
+void VBucket::deleteExpiredItem(const Item& it,
time_t startTime,
- uint64_t revSeqno,
ExpireBy source) {
+
+ // The item is correctly trimmed (by the caller). Fetch the one in the
+ // hashtable and replace it if the CAS match (same item; no race).
+ // If not found in the hashtable we should add it as a deleted item
+ const DocKey& key = it.getKey();
auto hbl = ht.getLockedBucket(key);
StoredValue* v = ht.unlocked_find(
key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
if (v) {
+ if (v->getCas() != it.getCas()) {
+ return;
+ }
+
if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
- // This is a temporary item whose background fetch for metadata
- // has completed.
bool deleted = deleteStoredValue(hbl, *v);
if (!deleted) {
throw std::logic_error(
std::to_string(hbl.getBucketNum()));
}
} else if (v->isExpired(startTime) && !v->isDeleted()) {
- handlePreExpiry(*v);
VBNotifyCtx notifyCtx;
std::tie(std::ignore, std::ignore, notifyCtx) =
processExpiredItem(hbl, *v);
WantsDeleted::Yes,
TrackReference::No);
v->setDeleted();
- v->setRevSeqno(revSeqno);
+ v->setRevSeqno(it.getRevSeqno());
+ v->setValue(it, ht);
VBNotifyCtx notifyCtx;
std::tie(std::ignore, std::ignore, notifyCtx) =
processExpiredItem(hbl, *v);
/**
* Delete an expired item
*
- * @param key key to be deleted
+ * @param it item to be deleted
* @param startTime the time to be compared with this item's expiry time
- * @param revSeqno revision id sequence number
* @param source Expiry source
*/
- void deleteExpiredItem(const DocKey& key,
+ void deleteExpiredItem(const Item& it,
time_t startTime,
- uint64_t revSeqno,
ExpireBy source);
/**
TestCase("test MB-16421", test_mb16421,
test_setup, teardown, "item_eviction_policy=full_eviction",
prepare_full_eviction, cleanup),
-
TestCase("test eviction with xattr", test_eviction_with_xattr,
test_setup, teardown, "item_eviction_policy=full_eviction",
prepare, cleanup),
deleted, datatype);
auto prev_revseqno = metadata.revSeqno;
EXPECT_EQ(1, prev_revseqno) << "Unexpected revision sequence number";
+ itm.setRevSeqno(1);
+ kvbucket.deleteExpiredItem(itm, ep_real_time() + 1, ExpireBy::Pager);
- kvbucket.deleteExpiredItem(vbid, makeStoredDocKey("key"),
- ep_real_time() + 1, 1, ExpireBy::Pager);
get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
HONOR_STATES |
TRACK_REFERENCE |
#include "tests/module_tests/test_helpers.h"
#include "vbucketdeletiontask.h"
+#include <string_utilities.h>
+#include <xattr/blob.h>
+#include <xattr/utils.h>
+
#include <thread>
// Verify that when handling a bucket delete with open DCP
store->getVBucket(vbid)->getHighSeqno());
}
+TEST_P(EPStoreEvictionTest, xattrExpiryOnFullyEvictedItem) {
+ if (GetParam() == "value_only") {
+ return;
+ }
+
+ cb::xattr::Blob builder;
+
+ //Add a few values
+ builder.set(to_const_byte_buffer("_meta"),
+ to_const_byte_buffer("{\"rev\":10}"));
+ builder.set(to_const_byte_buffer("foo"),
+ to_const_byte_buffer("{\"blob\":true}"));
+
+ auto blob = builder.finalize();
+ auto blob_data = to_string(blob);
+ auto itm = store_item(vbid,
+ makeStoredDocKey("key"),
+ blob_data,
+ 0,
+ {cb::engine_errc::success},
+ (PROTOCOL_BINARY_DATATYPE_JSON |
+ PROTOCOL_BINARY_DATATYPE_XATTR));
+
+ GetValue gv = store->getAndUpdateTtl(makeStoredDocKey("key"), vbid, cookie,
+ time(NULL) + 120);
+ EXPECT_EQ(ENGINE_SUCCESS, gv.getStatus());
+ std::unique_ptr<Item> get_itm(gv.getValue());
+
+ flush_vbucket_to_disk(vbid);
+ evict_key(vbid, makeStoredDocKey("key"));
+ store->deleteExpiredItem(itm, time(NULL) + 121, ExpireBy::Compactor);
+
+ get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
+ HONOR_STATES |
+ TRACK_REFERENCE |
+ DELETE_TEMP |
+ HIDE_LOCKED_CAS |
+ TRACK_STATISTICS |
+ GET_DELETED_VALUE);
+
+ gv = store->get(makeStoredDocKey("key"), vbid, cookie, options);
+ EXPECT_EQ(ENGINE_SUCCESS, gv.getStatus());
+
+ get_itm.reset(gv.getValue());
+ auto get_data = const_cast<char*>(get_itm->getData());
+ EXPECT_EQ(PROTOCOL_BINARY_DATATYPE_XATTR, get_itm->getDataType())
+ << "Unexpected Datatype";
+
+ cb::byte_buffer value_buf{reinterpret_cast<uint8_t*>(get_data),
+ get_itm->getNBytes()};
+ cb::xattr::Blob new_blob(value_buf);
+
+ const std::string& rev_str{"{\"rev\":10}"};
+ const std::string& meta_str = to_string(new_blob.get(to_const_byte_buffer("_meta")));
+
+ EXPECT_EQ(rev_str, meta_str) << "Unexpected system xattrs";
+}
+
// Test cases which run in both Full and Value eviction
INSTANTIATE_TEST_CASE_P(FullAndValueEviction,
EPStoreEvictionTest,