src/vb_count_visitor.cc
src/vbucket.cc
src/vbucketmap.cc
- src/vbucketmemorydeletiontask.cc
+ src/vbucketdeletiontask.cc
src/warmup.cc
${OBJECTREGISTRY_SOURCE}
${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
bool read_only)
: KVStore(config, read_only),
dbname(config.getDBName()),
+ dbFileRevMap(configuration.getMaxVBuckets()),
intransaction(false),
scanCounter(0),
logger(config.getLogger()),
// pre-allocate lookup maps (vectors) given we have a relatively
// small, fixed number of vBuckets.
- dbFileRevMap.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(1));
cachedDocCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(0));
cachedDeleteCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(-1));
cachedFileSize.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(0));
CouchKVStore::CouchKVStore(const CouchKVStore ©From)
: KVStore(copyFrom),
dbname(copyFrom.dbname),
- dbFileRevMap(copyFrom.dbFileRevMap),
+ dbFileRevMap(copyFrom.dbFileRevMap.size()),
numDbFiles(copyFrom.numDbFiles),
intransaction(false),
logger(copyFrom.logger),
base_ops(copyFrom.base_ops)
{
+ // std::atomic needs to be manually copied (not copy constructor)
+ for (size_t ii = 0; ii < dbFileRevMap.size(); ii++) {
+ dbFileRevMap[ii].store(copyFrom.dbFileRevMap[ii].load());
+ }
createDataDir(dbname);
statCollectingFileOps = getCouchstoreStatsOps(st.fsStats, base_ops);
statCollectingFileOpsCompaction = getCouchstoreStatsOps(
cachedFileSize[vbucketId] = 0;
cachedSpaceUsed[vbucketId] = 0;
- //Unlink the couchstore file upon reset
+ // Unlink the current revision and then increment it to ensure any
+ // pending delete doesn't delete us. Note that the expectation is that
+ // some higher level per VB lock is required to prevent data-races here.
+ // KVBucket::vb_mutexes is used in this case.
unlinkCouchFile(vbucketId, dbFileRevMap[vbucketId]);
+ incrementRevision(vbucketId);
+
setVBucketState(vbucketId, *state, VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT,
true);
- updateDbFileMap(vbucketId, 1);
} else {
throw std::invalid_argument("CouchKVStore::reset: No entry in cached "
"states for vbucket " + std::to_string(vbucketId));
pendingReqsQ.push_back(req);
}
-bool CouchKVStore::delVBucket(uint16_t vbucket) {
+void CouchKVStore::delVBucket(uint16_t vbucket, uint64_t fileRev) {
if (isReadOnly()) {
throw std::logic_error("CouchKVStore::delVBucket: Not valid on a "
"read-only object.");
}
- unlinkCouchFile(vbucket, dbFileRevMap[vbucket]);
-
- if (cachedVBStates[vbucket]) {
- delete cachedVBStates[vbucket];
- }
-
- cachedDocCount[vbucket] = 0;
- cachedDeleteCount[vbucket] = 0;
- cachedFileSize[vbucket] = 0;
- cachedSpaceUsed[vbucket] = 0;
-
- std::string failovers("[{\"id\":0, \"seq\":0}]");
- cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
- 0, 0, 0, failovers);
- updateDbFileMap(vbucket, 1);
-
- return true;
+ unlinkCouchFile(vbucket, fileRev);
}
std::vector<vbucket_state *> CouchKVStore::listPersistedVbuckets() {
return readCollectionsManifest(*db.getDb());
}
+void CouchKVStore::incrementRevision(uint16_t vbid) {
+ dbFileRevMap[vbid]++;
+}
+
+uint64_t CouchKVStore::prepareToDelete(uint16_t vbid) {
+ // Clear the stats so it looks empty (real deletion of the disk data occurs
+ // later)
+ cachedDocCount[vbid] = 0;
+ cachedDeleteCount[vbid] = 0;
+ cachedFileSize[vbid] = 0;
+ cachedSpaceUsed[vbid] = 0;
+ return dbFileRevMap[vbid];
+}
+
/* end of couch-kvstore.cc */
void initialize();
/**
- * Reset database to a clean state.
+ * Reset vbucket to a clean state.
*/
void reset(uint16_t vbucketId) override;
*/
void del(const Item &itm, Callback<int> &cb) override;
- bool delVBucket(uint16_t vbucket) override;
+ /**
+ * Delete a given vbucket database instance from underlying storage
+ *
+ * @param vbucket vbucket id
+ * @param fileRev the revision of the file to delete
+ */
+ void delVBucket(uint16_t vbucket, uint64_t fileRev) override;
/**
* Retrieve the list of persisted vbucket states
std::string getCollectionsManifest(uint16_t vbid) override;
+ /**
+ * Increment the revision number of the vbucket.
+ * @param vbid ID of the vbucket to change.
+ */
+ void incrementRevision(uint16_t vbid) override;
+
+ /**
+ * Prepare for delete of the vbucket file, this just removes the in-memory
+ * stats for the vbucket and returns the current file revision (which is
+ * the revision that must later be unlinked).
+ *
+ * @param vbid ID of the vbucket being deleted
+ * @return the revision ID to delete (via ::delVBucket)
+ */
+ uint64_t prepareToDelete(uint16_t vbid) override;
+
protected:
/*
* Returns the DbInfo for the given vbucket database.
*/
DbInfo getDbInfo(uint16_t vbid);
-protected:
bool setVBucketState(uint16_t vbucketId, const vbucket_state &vbstate,
VBStatePersist options, bool reset=false);
const std::string dbname;
- // Map of the fileRev for each vBucket. Using RelaxedAtomic so
- // stats gathering (doDcpVbTakeoverStats) can get a snapshot
- // without having to lock.
- std::vector<Couchbase::RelaxedAtomic<uint64_t>> dbFileRevMap;
+ /**
+ * Per-vbucket file revision atomic to ensure writer threads see increments
+ */
+ std::vector<std::atomic<uint64_t>> dbFileRevMap;
uint16_t numDbFiles;
std::vector<CouchRequest *> pendingReqsQ;
uint64_t maxCas,
const std::string& collectionsManifest) {
auto flusherCb = std::make_shared<NotifyFlusherCB>(shard);
- return std::make_shared<EPVBucket>(id,
- state,
- stats,
- engine.getCheckpointConfig(),
- shard,
- lastSeqno,
- lastSnapStart,
- lastSnapEnd,
- std::move(table),
- flusherCb,
- std::move(newSeqnoCb),
- engine.getConfiguration(),
- eviction_policy,
- initState,
- purgeSeqno,
- maxCas,
- collectionsManifest);
+ // Not using make_shared or allocate_shared
+ // 1. make_shared doesn't accept a Deleter
+ // 2. allocate_shared has inconsistencies between platforms in calling
+ // alloc.destroy (libc++ doesn't call it)
+ return VBucketPtr(new EPVBucket(id,
+ state,
+ stats,
+ engine.getCheckpointConfig(),
+ shard,
+ lastSeqno,
+ lastSnapStart,
+ lastSnapEnd,
+ std::move(table),
+ flusherCb,
+ std::move(newSeqnoCb),
+ engine.getConfiguration(),
+ eviction_policy,
+ initState,
+ purgeSeqno,
+ maxCas,
+ collectionsManifest),
+ VBucket::DeferredDeleter(engine));
}
ENGINE_ERROR_CODE EPBucket::statsVKey(const DocKey& key,
#include "failover-table.h"
#include "kvshard.h"
#include "stored_value_factories.h"
+#include "vbucketdeletiontask.h"
EPVBucket::EPVBucket(id_type i,
vbucket_state_t newState,
return GetValue(
NULL, ENGINE_EWOULDBLOCK, v.getBySeqno(), true, v.getNRUValue());
}
+
+void EPVBucket::setupDeferredDeletion(const void* cookie) {
+ setDeferredDeletionCookie(cookie);
+ deferredDeletionFileRevision.store(
+ getShard()->getRWUnderlying()->prepareToDelete(getId()));
+ setDeferredDeletion(true);
+}
+
+void EPVBucket::scheduleDeferredDeletion(EventuallyPersistentEngine& engine) {
+ ExTask task = new VBucketMemoryAndDiskDeletionTask(engine, *shard, this);
+ ExecutorPool::get()->schedule(task);
+}
\ No newline at end of file
void queueBackfillItem(queued_item& qi,
const GenerateBySeqno generateBySeqno) override;
+ /**
+ * Setup deferred deletion, this is where deletion of the vbucket is
+ * deferred and completed by an AUXIO task as it will hit disk for the data
+ * file unlink.
+ *
+ * @param cookie A cookie to notify when the deletion task completes.
+ */
+ void setupDeferredDeletion(const void* cookie) override;
+
+ /**
+ * @return the file revision to be unlinked by the deferred deletion task
+ */
+ uint64_t getDeferredDeletionFileRevision() const {
+ return deferredDeletionFileRevision;
+ }
+
+ /**
+ * Schedule a VBucketMemoryAndDiskDeletionTask to delete this object.
+ * @param engine owning engine (required for task construction)
+ */
+ void scheduleDeferredDeletion(EventuallyPersistentEngine& engine) override;
+
protected:
/**
* queue a background fetch of the specified item.
/* Pointer to the shard to which this VBucket belongs to */
KVShard* shard;
+ /**
+ * When deferred deletion is enabled for this object we store the database
+ * file revision we will unlink from disk.
+ */
+ std::atomic<uint64_t> deferredDeletionFileRevision;
+
friend class EPVBucketTest;
-};
+};
\ No newline at end of file
uint64_t purgeSeqno,
uint64_t maxCas,
const std::string& collectionsManifest) {
- return std::make_shared<EphemeralVBucket>(id,
- state,
- stats,
- engine.getCheckpointConfig(),
- shard,
- lastSeqno,
- lastSnapStart,
- lastSnapEnd,
- std::move(table),
- std::move(newSeqnoCb),
- engine.getConfiguration(),
- eviction_policy,
- initState,
- purgeSeqno,
- maxCas,
- collectionsManifest);
+ // Not using make_shared or allocate_shared
+ // 1. make_shared doesn't accept a Deleter
+ // 2. allocate_shared has inconsistencies between platforms in calling
+ // alloc.destroy (libc++ doesn't call it)
+ return VBucketPtr(new EphemeralVBucket(id,
+ state,
+ stats,
+ engine.getCheckpointConfig(),
+ shard,
+ lastSeqno,
+ lastSnapStart,
+ lastSnapEnd,
+ std::move(table),
+ std::move(newSeqnoCb),
+ engine.getConfiguration(),
+ eviction_policy,
+ initState,
+ purgeSeqno,
+ maxCas,
+ collectionsManifest),
+ VBucket::DeferredDeleter(engine));
}
void EphemeralBucket::completeStatsVKey(const void* cookie,
#include "failover-table.h"
#include "linked_list.h"
#include "stored_value_factories.h"
+#include "vbucketdeletiontask.h"
EphemeralVBucket::EphemeralVBucket(id_type i,
vbucket_state_t newState,
/* We reach here only if the v is deleted and does not have any value */
return GetValue();
}
+
+void EphemeralVBucket::setupDeferredDeletion(const void* cookie) {
+ setDeferredDeletionCookie(cookie);
+ setDeferredDeletion(true);
+}
+
+void EphemeralVBucket::scheduleDeferredDeletion(
+ EventuallyPersistentEngine& engine) {
+ ExTask task = new VBucketMemoryDeletionTask(engine, this);
+ ExecutorPool::get()->schedule(task);
+}
*/
size_t purgeTombstones(rel_time_t purgeAge);
+ void setupDeferredDeletion(const void* cookie) override;
+
+ /**
+ * Schedule a VBucketMemoryDeletionTask to delete this object.
+ * @param engine owning engine (required for task construction)
+ */
+ void scheduleDeferredDeletion(EventuallyPersistentEngine& engine) override;
+
protected:
/* Data structure for in-memory sequential storage */
std::unique_ptr<SequenceList> seqList;
static_assert(TaskPriority::MultiBGFetcherTask < TaskPriority::BGFetchCallback,
"MultiBGFetcherTask not less than BGFetchCallback");
-static_assert(TaskPriority::BGFetchCallback == TaskPriority::VBDeleteTask,
- "BGFetchCallback not equal VBDeleteTask");
+static_assert(TaskPriority::BGFetchCallback ==
+ TaskPriority::VBucketMemoryAndDiskDeletionTask,
+ "BGFetchCallback not equal VBucketMemoryAndDiskDeletionTask");
static_assert(TaskPriority::VKeyStatBGFetchTask < TaskPriority::FlusherTask,
"VKeyStatBGFetchTask not less than FlusherTask");
}
HashTable::~HashTable() {
- clear(true);
+ // Use unlocked clear for the destructor, avoids lock inversions on VBucket
+ // delete
+ clear_UNLOCKED(true);
// Wait for any outstanding visitors to finish.
while (visitors > 0) {
#ifdef _MSC_VER
}
}
MultiLockHolder mlh(mutexes, n_locks);
+ clear_UNLOCKED(deactivate);
+}
+
+void HashTable::clear_UNLOCKED(bool deactivate) {
if (deactivate) {
setActiveState(false);
}
return nullptr;
}
+ void clear_UNLOCKED(bool deactivate);
+
DISALLOW_COPY_AND_ASSIGN(HashTable);
};
#include "replicationthrottle.h"
#include "statwriter.h"
#include "tapconnmap.h"
-#include "vbucketmemorydeletiontask.h"
+#include "vbucketdeletiontask.h"
#include "vb_count_visitor.h"
class StatsValueChangeListener : public ValueChangedListener {
bool transfer,
bool notify_dcp) {
// Lock to prevent a race condition between a failed update and add.
- LockHolder lh(vbsetMutex);
+ std::unique_lock<std::mutex> lh(vbsetMutex);
return setVBucketState_UNLOCKED(vbid, to, transfer, notify_dcp, lh);
}
-ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(uint16_t vbid,
- vbucket_state_t to,
- bool transfer,
- bool notify_dcp,
- LockHolder& vbset) {
+ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(
+ uint16_t vbid,
+ vbucket_state_t to,
+ bool transfer,
+ bool notify_dcp,
+ std::unique_lock<std::mutex>& vbset) {
VBucketPtr vb = vbMap.getBucket(vbid);
if (vb && to == vb->getState()) {
return ENGINE_SUCCESS;
// The first checkpoint for active vbucket should start with id 2.
uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
+
+ // Before adding the VB to the map increment the revision
+ getRWUnderlying(vbid)->incrementRevision(vbid);
+
if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
return ENGINE_ERANGE;
}
vb->checkpointManager.queueSetVBState(*vb);
}
-bool KVBucket::completeVBucketDeletion(uint16_t vbid, const void* cookie) {
- hrtime_t start_time(gethrtime());
- bool bucketDeleting;
- {
- LockHolder lh(vbsetMutex);
- VBucketPtr vb = vbMap.getBucket(vbid);
- bucketDeleting = !vb ||
- vb->getState() == vbucket_state_dead ||
- vb->isBucketDeletion();
-
- if (bucketDeleting) {
- LockHolder vlh(vb_mutexes[vbid]);
- if (!getRWUnderlying(vbid)->delVBucket(vbid)) {
- return false;
- }
- if (vb) {
- vb->setBucketDeletion(false);
- vb->setBucketCreation(false);
- vb->setPersistenceSeqno(0);
- }
- ++stats.vbucketDeletions;
- }
- }
-
- hrtime_t spent(gethrtime() - start_time);
- hrtime_t wall_time = spent / 1000;
- BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
- stats.diskVBDelHisto.add(wall_time);
- atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
- stats.vbucketDelTotWalltime.fetch_add(wall_time);
- if (cookie) {
- engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
- }
-
- return true;
-}
-
-void KVBucket::scheduleVBDeletion(VBucketPtr &vb, const void* cookie,
- double delay) {
- ExTask delTask = make_STRCPtr<VBucketMemoryDeletionTask>(engine, vb, delay);
- ExecutorPool::get()->schedule(delTask);
-
- if (vb->setBucketDeletion(true)) {
- ExTask task = make_STRCPtr<VBDeleteTask>(&engine, vb->getId(), cookie);
- ExecutorPool::get()->schedule(task);
- }
-}
-
ENGINE_ERROR_CODE KVBucket::deleteVBucket(uint16_t vbid, const void* c) {
// Lock to prevent a race condition between a failed update and add
// (and delete).
- VBucketPtr vb;
+ VBucketPtr vb = vbMap.getBucket(vbid);
+ if (!vb) {
+ return ENGINE_NOT_MY_VBUCKET;
+ }
+
{
- LockHolder lh(vbsetMutex);
- vb = vbMap.getBucket(vbid);
- if (!vb) {
- return ENGINE_NOT_MY_VBUCKET;
- }
+ std::unique_lock<std::mutex> vbSetLh(vbsetMutex);
+ // Obtain the vb_mutex for the VB to ensure we interlock with other
+ // threads that are manipulating the VB (particularly ones which may
+ // try and change the disk revision e.g. deleteAll and compaction).
+ std::unique_lock<std::mutex> vbMutLh(vb_mutexes[vbid]);
vb->setState(vbucket_state_dead);
engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
- vbMap.removeBucket(vbid);
+
+ // Drop the VB to begin the delete, the last holder of the VB will
+ // unknowingly trigger the destructor which schedules a deletion task.
+ vbMap.dropVBucketAndSetupDeferredDeletion(vbid, c);
}
- scheduleVBDeletion(vb, c);
+
if (c) {
return ENGINE_EWOULDBLOCK;
}
}
bool KVBucket::resetVBucket(uint16_t vbid) {
- LockHolder lh(vbsetMutex);
- return resetVBucket_UNLOCKED(vbid, lh);
+ std::unique_lock<std::mutex> lh(vbsetMutex);
+ // Obtain the vb_mutex for the VB to ensure we interlock with other
+ // threads that are manipulating the VB (particularly ones which may
+ // try and change the disk revision).
+ std::unique_lock<std::mutex> vbMutLh(vb_mutexes[vbid]);
+ return resetVBucket_UNLOCKED(vbid, lh, vbMutLh);
}
-bool KVBucket::resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset) {
+bool KVBucket::resetVBucket_UNLOCKED(uint16_t vbid,
+ std::unique_lock<std::mutex>& vbset,
+ std::unique_lock<std::mutex>& vbMutex) {
bool rv(false);
VBucketPtr vb = vbMap.getBucket(vbid);
if (vb) {
vbucket_state_t vbstate = vb->getState();
- vbMap.removeBucket(vbid);
+ vbMap.dropVBucketAndSetupDeferredDeletion(vbid, nullptr /*no cookie*/);
checkpointCursorInfoList cursors =
vb->checkpointManager.getAllCursors();
// Delete and recreate the vbucket database file
- scheduleVBDeletion(vb, NULL, 0);
setVBucketState_UNLOCKED(vbid, vbstate,
false/*transfer*/, true/*notifyDcp*/, vbset);
private:
void redirty() {
- if (vbucket->isBucketDeletion()) {
+ if (vbucket->isDeletionDeferred()) {
// updating the member stats for the vbucket is not really necessary
// as the vbucket is about to be deleted
vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
VBucketPtr vb = getVBucket(i);
// Reset the vBucket if it's non-null and not already in the middle of
// being created / destroyed.
- if (vb && !(vb->isBucketCreation() || vb->isBucketDeletion())) {
+ if (vb && !(vb->isBucketCreation() || vb->isDeletionDeferred())) {
LockHolder lh(vb_mutexes[vb->getId()]);
getRWUnderlying(vb->getId())->reset(i);
}
}
ENGINE_ERROR_CODE KVBucket::rollback(uint16_t vbid, uint64_t rollbackSeqno) {
- LockHolder vbset(vbsetMutex);
+ std::unique_lock<std::mutex> vbset(vbsetMutex);
- std::unique_lock<std::mutex> lh(vb_mutexes[vbid], std::try_to_lock);
+ std::unique_lock<std::mutex> vbMutexLh(vb_mutexes[vbid], std::try_to_lock);
- if (!lh.owns_lock()) {
+ if (!vbMutexLh.owns_lock()) {
return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
}
}
}
- if (resetVBucket_UNLOCKED(vbid, vbset)) {
+ if (resetVBucket_UNLOCKED(vbid, vbset, vbMutexLh)) {
VBucketPtr newVb = vbMap.getBucket(vbid);
newVb->incrRollbackItemCount(prevHighSeqno);
return ENGINE_SUCCESS;
bool transfer, bool notify_dcp = true);
/**
- * Physically deletes a VBucket from disk. This function should only
- * be called on a VBucket that has already been logically deleted.
- *
- * @param vbid vbucket id
- * @param cookie The connection that requested the deletion
- */
- bool completeVBucketDeletion(uint16_t vbid, const void* cookie);
-
- /**
* Deletes a vbucket
*
* @param vbid The vbucket to delete.
*/
void compactInternal(compaction_ctx *ctx);
- void scheduleVBDeletion(VBucketPtr &vb,
- const void* cookie,
- double delay = 0);
-
void flushOneDeleteAll(void);
PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
VBucketPtr &vb);
vbucket_state_t allowedState,
get_options_t options = TRACK_REFERENCE);
- bool resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset);
+ bool resetVBucket_UNLOCKED(uint16_t vbid,
+ std::unique_lock<std::mutex>& vbset,
+ std::unique_lock<std::mutex>& vbMutex);
- ENGINE_ERROR_CODE setVBucketState_UNLOCKED(uint16_t vbid, vbucket_state_t state,
- bool transfer, bool notify_dcp,
- LockHolder& vbset);
+ ENGINE_ERROR_CODE setVBucketState_UNLOCKED(
+ uint16_t vbid,
+ vbucket_state_t state,
+ bool transfer,
+ bool notify_dcp,
+ std::unique_lock<std::mutex>& vbset);
/* Notify flusher of a new seqno being added in the vbucket */
virtual void notifyFlusher(const uint16_t vbid);
bool notify_dcp = true) = 0;
/**
- * Physically deletes a VBucket from disk. This function should only
- * be called on a VBucket that has already been logically deleted.
- *
- * @param vbid vbucket id
- * @param cookie The connection that requested the deletion
- */
- virtual bool completeVBucketDeletion(uint16_t vbid, const void* cookie) = 0;
-
- /**
* Deletes a vbucket
*
* @param vbid The vbucket to delete.
*/
virtual void compactInternal(compaction_ctx *ctx) = 0;
- virtual void scheduleVBDeletion(VBucketPtr &vb,
- const void* cookie,
- double delay = 0) = 0;
-
virtual void flushOneDeleteAll(void) = 0;
virtual PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
VBucketPtr &vb) = 0;
vbuckets[vb->getId()].lock().set(vb);
}
-void KVShard::resetBucket(uint16_t id) {
- vbuckets[id].lock().reset();
+void KVShard::dropVBucketAndSetupDeferredDeletion(VBucket::id_type id,
+ const void* cookie) {
+ auto vb = vbuckets[id].lock();
+ auto vbPtr = vb.get();
+ vbPtr->setupDeferredDeletion(cookie);
+ vb.reset();
}
std::vector<VBucket::id_type> KVShard::getVBucketsSortedByState() {
#include "kvstore.h"
#include "utility.h"
+#include "vbucket.h"
#include <atomic>
VBucketPtr getBucket(VBucket::id_type id) const;
void setBucket(VBucketPtr vb);
- void resetBucket(VBucket::id_type id);
+
+ /**
+ * Drop the vbucket from the map and setup deferred deletion of the VBucket.
+ * Once the VBucketPtr has no more references the vbucket is deleted, but
+ * deletion occurs via a task that is scheduled by the VBucketPtr deleter,
+ * ensuring no front-end thread deletes the memory/disk associated with the
+ * VBucket.
+ *
+ * @param id The VB to drop
+ * @param cookie Optional connection cookie, this cookie will be notified
+ * when the deletion task is completed.
+ */
+ void dropVBucketAndSetupDeferredDeletion(VBucket::id_type id,
+ const void* cookie);
KVShard::id_type getId() const {
return kvConfig.getShardId();
}
/**
- * Reset the store to a clean state.
+ * Reset the vbucket to a clean state.
*/
- virtual void reset(uint16_t shardId) = 0;
+ virtual void reset(uint16_t vbid) = 0;
/**
* Begin a transaction (if not already in one).
* Delete a given vbucket database instance from underlying storage
*
* @param vbucket vbucket id
- * return true, if vbucket deletion was successful. Else, false.
+ * @param fileRev the revision of the file to delete
*/
- virtual bool delVBucket(uint16_t vbucket) = 0;
+ virtual void delVBucket(uint16_t vbucket, uint64_t fileRev) = 0;
/**
* Get a list of all persisted vbuckets (with their states).
*/
virtual std::string getCollectionsManifest(uint16_t vbid) = 0;
+ /**
+ * Increment the revision number of the vbucket.
+ * @param vbid ID of the vbucket to change.
+ */
+ virtual void incrementRevision(uint16_t vbid) = 0;
+
+ /**
+ * Prepare for delete of the vbucket file
+ *
+ * @param vbid ID of the vbucket being deleted
+ * @return the revision ID to delete (via ::delVBucket)
+ */
+ virtual uint64_t prepareToDelete(uint16_t vbid) = 0;
+
protected:
/* all stats */
Couchbase::RelaxedAtomic<uint16_t> cachedValidVBCount;
std::list<PersistenceCallback *> pcbs;
-protected:
-
void createDataDir(const std::string& dbname);
template <typename T>
void addStat(const std::string& prefix, const char* nm, T& val,
return flusher->step(this);
}
-bool VBDeleteTask::run() {
- TRACE_EVENT("ep-engine/task", "VBDeleteTask", vbucketId, cookie);
- return !engine->getKVBucket()->completeVBucketDeletion(vbucketId, cookie);
-}
-
bool CompactTask::run() {
TRACE_EVENT("ep-engine/task", "CompactTask", compactCtx.db_file_id);
return engine->getKVBucket()->doCompact(&compactCtx, cookie);
TASK(AccessScannerVisitor, AUXIO_TASK_IDX, 3)
TASK(ActiveStreamCheckpointProcessorTask, AUXIO_TASK_IDX, 5)
TASK(BackfillManagerTask, AUXIO_TASK_IDX, 8)
+TASK(VBucketMemoryAndDiskDeletionTask, AUXIO_TASK_IDX, 1)
// Read/Write IO tasks
-TASK(VBDeleteTask, WRITER_TASK_IDX, 1)
TASK(RollbackTask, WRITER_TASK_IDX, 1)
TASK(CompactVBucketTask, WRITER_TASK_IDX, 2)
TASK(FlusherTask, WRITER_TASK_IDX, 5)
};
/**
- * A task for deleting VBucket files from disk and cleaning up any outstanding
- * writes for that VBucket file.
- * sid (shard ID) passed on to GlobalTask indicates that task needs to be
- * serialized with other tasks that require serialization on its shard
- */
-class VBDeleteTask : public GlobalTask {
-public:
- VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c,
- bool completeBeforeShutdown = true)
- : GlobalTask(e, TaskId::VBDeleteTask, 0, completeBeforeShutdown),
- vbucketId(vbid), cookie(c),
- description("Deleting VBucket:" + std::to_string(vbucketId)) {}
-
- bool run();
-
- cb::const_char_buffer getDescription() {
- return description;
- }
-
-private:
- uint16_t vbucketId;
- const void* cookie;
- const std::string description;
-};
-
-/**
* A task for compacting a vbucket db file
*/
class CompactTask : public GlobalTask {
#include "failover-table.h"
#include "flusher.h"
#include "pre_link_document_context.h"
+#include "vbucketdeletiontask.h"
#define STATWRITER_NAMESPACE vbucket
#include "statwriter.h"
statPrefix("vb_" + std::to_string(i)),
persistenceCheckpointId(0),
bucketCreation(false),
- bucketDeletion(false),
+ deferredDeletion(false),
+ deferredDeletionCookie(nullptr),
newSeqnoCb(std::move(newSeqnoCb)),
manifest(collectionsManifest) {
if (config.getConflictResolutionType().compare("lww") == 0) {
return {};
}
}
+
+void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
+ // If the vbucket is marked as deleting then we must schedule task to
+ // perform the resource destruction (memory/disk).
+ if (vb->isDeletionDeferred()) {
+ vb->scheduleDeferredDeletion(engine);
+ return;
+ }
+ delete vb;
+}
class EventuallyPersistentEngine;
class FailoverTable;
class KVShard;
+class VBucketMemoryDeletionTask;
/**
* An individual vbucket.
return bucketCreation.compare_exchange_strong(inverse, rv);
}
- // States whether the VBucket is in the process of being deleted
- bool isBucketDeletion() const {
- return bucketDeletion.load();
+ /**
+ * @return true if the vbucket deletion is to be deferred to a background
+ * task.
+ */
+ bool isDeletionDeferred() const {
+ return deferredDeletion.load();
}
- bool setBucketDeletion(bool delBucket) {
- bool inverse = !delBucket;
- return bucketDeletion.compare_exchange_strong(inverse, delBucket);
+ /**
+ * @param value true if the vbucket's deletion should be deferred to a
+ * background task. This is for VBucket objects created by
+ * makeVBucket and owned by a VBucketPtr. If the VBucket was manually
+ * created this will have no effect on deletion.
+ */
+ void setDeferredDeletion(bool value) {
+ deferredDeletion.store(true);
}
+ /**
+ * @param A cookie to notify when the deferred deletion completes.
+ */
+ void setDeferredDeletionCookie(const void* cookie) {
+ deferredDeletionCookie = cookie;
+ }
+
+ /**
+ * @return the cookie which could of been set when setupDeferredDeletion was
+ * called.
+ */
+ const void* getDeferredDeletionCookie() const {
+ return deferredDeletionCookie;
+ }
+
+ /**
+ * Setup deferred deletion, this is where deletion of the vbucket is
+ * deferred and completed by an AUXIO/NONIO task. AUXIO for EPVBucket
+ * as it will hit disk for the data file unlink, NONIO is used for
+ * EphemeralVBucket as only memory resources need freeing.
+ * @param cookie A cookie to notify when the deletion task completes.
+ */
+ virtual void setupDeferredDeletion(const void* cookie) = 0;
+
// Returns the last persisted sequence number for the VBucket
virtual uint64_t getPersistenceSeqno() const = 0;
std::atomic<size_t> numExpiredItems;
+ /**
+ * A custom delete function for deleting VBucket objects. Any thread could
+ * be the last thread to release a VBucketPtr and deleting a VB will
+ * eventually hit the I/O sub-system when we unlink the file, to be sure no
+ * front-end thread does this work, we schedule the deletion to a background
+ * task. This task scheduling is triggered by the shared_ptr/VBucketPtr
+ * using this object as the deleter.
+ */
+ struct DeferredDeleter {
+ DeferredDeleter(EventuallyPersistentEngine& engine) : engine(engine) {
+ }
+
+ /**
+ * Called when the VBucketPtr has no more owners and runs delete on
+ * the object.
+ */
+ void operator()(VBucket* vb) const;
+
+ EventuallyPersistentEngine& engine;
+ };
+
protected:
/**
* This function checks cas, expiry and other partition (vbucket) related
/* size of list hpVBReqs (to avoid MB-9434) */
Couchbase::RelaxedAtomic<size_t> numHpVBReqs;
+ /**
+ * VBucket sub-classes must implement a function that will schedule
+ * an appropriate task that will delete the VBucket and its resources.
+ *
+ * @param engine owning engine (required for task construction)
+ */
+ virtual void scheduleDeferredDeletion(
+ EventuallyPersistentEngine& engine) = 0;
+
private:
void fireAllOps(EventuallyPersistentEngine& engine, ENGINE_ERROR_CODE code);
std::string statPrefix;
// The persistence checkpoint ID for this vbucket.
std::atomic<uint64_t> persistenceCheckpointId;
- // Flag to indicate the bucket is being created
+ // Flag to indicate the vbucket is being created
std::atomic<bool> bucketCreation;
- // Flag to indicate the bucket is being deleted
- std::atomic<bool> bucketDeletion;
+ // Flag to indicate the vbucket deletion is deferred
+ std::atomic<bool> deferredDeletion;
+ /// A cookie that can be set when the vbucket is deletion is deferred, the
+ /// cookie will be notified when the deferred deletion completes
+ const void* deferredDeletionCookie;
// Ptr to the item conflict resolution module
std::unique_ptr<ConflictResolution> conflictResolver;
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2017 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "vbucketdeletiontask.h"
+#include "ep_engine.h"
+#include "ep_vb.h"
+#include "executorpool.h"
+#include "kvshard.h"
+
+#include <phosphor/phosphor.h>
+#include <platform/processclock.h>
+
+VBucketMemoryDeletionTask::VBucketMemoryDeletionTask(
+ EventuallyPersistentEngine& eng, VBucket* vb, TaskId tid)
+ : GlobalTask(&eng, tid, 0.0, true), vbucket(vb) {
+ if (!vbucket) {
+ throw std::logic_error(
+ "VBucketMemoryDeletionTask::VBucketMemoryDeletionTask no "
+ "vbucket");
+ }
+ description = "Removing (dead) vb:" + std::to_string(vbucket->getId()) +
+ " from memory";
+}
+
+cb::const_char_buffer VBucketMemoryDeletionTask::getDescription() {
+ return description;
+}
+
+bool VBucketMemoryDeletionTask::run() {
+ TRACE_EVENT(
+ "ep-engine/task", "VBucketMemoryDeletionTask", vbucket->getId());
+
+ notifyAllPendingConnsFailed(true);
+
+ return false;
+}
+
+void VBucketMemoryDeletionTask::notifyAllPendingConnsFailed(
+ bool notifyIfCookieSet) {
+ vbucket->notifyAllPendingConnsFailed(*engine);
+
+ if (notifyIfCookieSet && vbucket->getDeferredDeletionCookie()) {
+ engine->notifyIOComplete(vbucket->getDeferredDeletionCookie(),
+ ENGINE_SUCCESS);
+ }
+}
+
+VBucketMemoryAndDiskDeletionTask::VBucketMemoryAndDiskDeletionTask(
+ EventuallyPersistentEngine& eng, KVShard& shard, EPVBucket* vb)
+ : VBucketMemoryDeletionTask(eng,
+ static_cast<VBucket*>(vb),
+ TaskId::VBucketMemoryAndDiskDeletionTask),
+ shard(shard),
+ vbDeleteRevision(vb->getDeferredDeletionFileRevision()) {
+ description += " and disk";
+}
+
+bool VBucketMemoryAndDiskDeletionTask::run() {
+ TRACE_EVENT("ep-engine/task",
+ "VBucketMemoryAndDiskDeletionTask",
+ vbucket->getId());
+ notifyAllPendingConnsFailed(false);
+
+ auto start = ProcessClock::now();
+ shard.getRWUnderlying()->delVBucket(vbucket->getId(), vbDeleteRevision);
+ auto elapsed = ProcessClock::now() - start;
+ auto wallTime =
+ std::chrono::duration_cast<std::chrono::microseconds>(elapsed);
+
+ engine->getEpStats().vbucketDeletions++;
+ BlockTimer::log(
+ elapsed.count(), "disk_vb_del", engine->getEpStats().timingLog);
+ engine->getEpStats().diskVBDelHisto.add(wallTime.count());
+ atomic_setIfBigger(engine->getEpStats().vbucketDelMaxWalltime,
+ hrtime_t(wallTime.count()));
+ engine->getEpStats().vbucketDelTotWalltime.fetch_add(wallTime.count());
+
+ if (vbucket->getDeferredDeletionCookie()) {
+ engine->notifyIOComplete(vbucket->getDeferredDeletionCookie(),
+ ENGINE_SUCCESS);
+ }
+
+ return false;
+}
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2017 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "globaltask.h"
+#include "vbucket.h"
+
+class EPVBucket;
+
+/*
+ * This is a NONIO task called as part of VB deletion. The task is responsible
+ * for clearing all the VBucket's pending operations and for deleting the
+ * VBucket (via a smart pointer).
+ *
+ * This task is designed to be invoked only when the VBucket has no owners.
+ */
+class VBucketMemoryDeletionTask : public GlobalTask {
+public:
+ /**
+ * @param engine required for GlobalTask construction
+ * @param vbucket the vbucket object to delete
+ */
+ VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
+ VBucket* vbucket,
+ TaskId tid = TaskId::VBucketMemoryDeletionTask);
+
+ cb::const_char_buffer getDescription();
+
+ bool run();
+
+protected:
+ /**
+ * Call vbucket->notifyAllPendingConnsFailed and optionally perform
+ * notifyIOComplete
+ *
+ * @pararm notifyIfCookieSet set to true if the function should perform
+ * notifyIOComplete on vbucket->getDeletingCookie()
+ */
+ void notifyAllPendingConnsFailed(bool notifyIfCookieSet);
+
+ /**
+ * The vbucket we are deleting is stored in a unique_ptr for RAII deletion
+ * once this task is finished and itself deleted, the VBucket will be
+ * deleted.
+ */
+ std::unique_ptr<VBucket> vbucket;
+ std::string description;
+};
+
+/*
+ * This is an AUXIO task called as part of EPVBucket deletion. The task is
+ * responsible for clearing all the VBucket's pending operations and for
+ * clearing the VBucket's hash table and removing the disk file.
+ *
+ * This task is designed to be invoked only when the EPVBucket has no owners.
+ */
+class VBucketMemoryAndDiskDeletionTask : public VBucketMemoryDeletionTask {
+public:
+ /**
+ * This task will as part of construction increase the vbucket's disk
+ * revision so that the delete can remove the file without new
+ * instances of the same vbucket writing to the file.
+ *
+ * @param engine requird for GlobalTask construction
+ * @param shard the KVShard to use for deletion
+ * @param vbucket the Eventually Persistent vbucket object to delete
+ */
+ VBucketMemoryAndDiskDeletionTask(EventuallyPersistentEngine& engine,
+ KVShard& shard,
+ EPVBucket* vbucket);
+
+ bool run();
+
+protected:
+ KVShard& shard;
+ uint64_t vbDeleteRevision;
+};
\ No newline at end of file
return ENGINE_ERANGE;
}
-void VBucketMap::removeBucket(id_type id) {
+void VBucketMap::dropVBucketAndSetupDeferredDeletion(id_type id,
+ const void* cookie) {
if (id < size) {
- // Theoretically, this could be off slightly. In
- // practice, this happens only on dead vbuckets.
- getShardByVbId(id)->resetBucket(id);
+ getShardByVbId(id)->dropVBucketAndSetupDeferredDeletion(id, cookie);
}
}
/**
* Add the VBucket to the map - extending the lifetime of the object until
- * it is removed from the map via removeBucket.
+ * it is removed from the map via dropAndDeleteVBucket.
* @param vb shared pointer to the VBucket we are storing.
*/
ENGINE_ERROR_CODE addBucket(VBucketPtr vb);
- void removeBucket(id_type id);
+ /**
+ * Drop the vbucket from the map and setup deferred deletion of the VBucket.
+ * Once the VBucketPtr has no more references the vbucket is deleted, but
+ * deletion occurs via a task that is scheduled by the VBucketPtr deleter,
+ * ensuring no front-end thread deletes the memory/disk associated with the
+ * VBucket.
+ *
+ * @param id The VB to drop
+ * @param cookie Optional connection cookie, this cookie will be notified
+ * when the deletion task is completed.
+ */
+ void dropVBucketAndSetupDeferredDeletion(id_type id, const void* cookie);
VBucketPtr getBucket(id_type id) const;
// Returns the size of the map, i.e. the total number of VBuckets it can
+++ /dev/null
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- * Copyright 2016 Couchbase, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "vbucketmemorydeletiontask.h"
-
-#include <phosphor/phosphor.h>
-
-#include <sstream>
-
-VBucketMemoryDeletionTask::VBucketMemoryDeletionTask(
- EventuallyPersistentEngine& eng, VBucketPtr& vb, double delay)
- : GlobalTask(&eng, TaskId::VBucketMemoryDeletionTask, delay, true),
- e(eng),
- vbucket(vb) {
- if (!vb) {
- throw std::invalid_argument(
- "VBucketMemoryDeletionTask: vb to delete cannot be null");
- }
- description = "Removing (dead) vb:" + std::to_string(vbucket->getId()) +
- " from memory";
-}
-
-cb::const_char_buffer VBucketMemoryDeletionTask::getDescription() {
- return description;
-}
-
-bool VBucketMemoryDeletionTask::run() {
- TRACE_EVENT("ep-engine/task", "VBucketMemoryDeletionTask",
- vbucket->getId());
- vbucket->notifyAllPendingConnsFailed(e);
- vbucket.reset();
- return false;
-}
+++ /dev/null
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- * Copyright 2016 Couchbase, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include "globaltask.h"
-#include "vbucket.h"
-/*
- * This is a NONIO task called as part of VB deletion. The task is responsible
- * for clearing all the VBucket's pending operations and for clearing the
- * VBucket's hash table.
- */
-class VBucketMemoryDeletionTask : public GlobalTask {
-public:
- VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
- VBucketPtr& vb,
- double delay);
-
- cb::const_char_buffer getDescription();
-
- bool run();
-
-private:
- EventuallyPersistentEngine& e;
- VBucketPtr vbucket;
- std::string description;
-};
static enum test_result test_bug7023(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
std::vector<std::string> keys;
// Make a vbucket mess.
- for (int j = 0; j < 10000; ++j) {
- std::stringstream ss;
- ss << "key" << j;
- std::string key(ss.str());
- keys.push_back(key);
+ const int nitems = 10000;
+ const int iterations = 5;
+ for (int j = 0; j < nitems; ++j) {
+ keys.push_back("key" + std::to_string(j));
}
std::vector<std::string>::iterator it;
- for (int j = 0; j < 5; ++j) {
+ for (int j = 0; j < iterations; ++j) {
check(set_vbucket_state(h, h1, 0, vbucket_state_dead),
"Failed set set vbucket 0 dead.");
- vbucketDelete(h, h1, 0, "async=0");
+ vbucketDelete(h, h1, 0);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS,
last_status.load(),
"Expected vbucket deletion to work.");
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
- return get_int_stat(h, h1, "ep_warmup_value_count", "warmup") == 10000 ? SUCCESS : FAIL;
+ checkeq(nitems,
+ get_int_stat(h, h1, "ep_warmup_value_count", "warmup"),
+ "Incorrect items following warmup");
}
return SUCCESS;
}
// [[1] Set our state to replica.
setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
- auto& lpWriterQ = *task_executor->getLpTaskQ()[WRITER_TASK_IDX];
- auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
-
// [[2]] Perform a vbucket reset. This will perform some work synchronously,
- // but also created 2 tasks and notifies the flusher:
- // 1. vbucket memory deletion (NONIO)
- // 2. vbucket disk deletion (WRITER)
- // 3. FlusherTask notified (WRITER)
+ // but also creates the task that will delete the VB.
+ // * vbucket memory and disk deletion (AUXIO)
// MB-19695: If we try to get the number of persisted deletes between
- // steps (2) and (3) running then an exception is thrown (and client
+ // steps [[2]] and [[3]] running then an exception is thrown (and client
// disconnected).
EXPECT_TRUE(store->resetVBucket(vbid));
+ auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
+ runNextTask(lpAuxioQ, "Removing (dead) vb:0 from memory and disk");
- runNextTask(lpNonioQ, "Removing (dead) vb:0 from memory");
- runNextTask(lpWriterQ, "Deleting VBucket:0");
-
- // [[2]] Ok, let's see if we can get TAP takeover stats. This will
+ // [[3]] Ok, let's see if we can get TAP takeover stats. This will
// fail with MB-19695.
// Dummy callback to pass into the stats function below.
auto dummy_cb = [](const char *key, const uint16_t klen,
#include "tapconnmap.h"
#include "tests/mock/mock_global_task.h"
#include "tests/module_tests/test_helpers.h"
-#include "vbucketmemorydeletiontask.h"
+#include "vbucketdeletiontask.h"
#include <thread>
HIDE_LOCKED_CAS |
TRACK_STATISTICS);
GetValue gv = store->get(makeStoredDocKey("key"), vbid, cookie, options);
- EXPECT_EQ(ENGINE_EWOULDBLOCK,gv.getStatus());
+ EXPECT_EQ(ENGINE_EWOULDBLOCK, gv.getStatus());
// Mark the status of the cookie so that we can see if notify is called
lock_mock_cookie(cookie);
c->status = ENGINE_E2BIG;
unlock_mock_cookie(cookie);
- // Manually run the VBucketMemoryDeletionTask task
- VBucketPtr vb = store->getVBucket(vbid);
- VBucketMemoryDeletionTask deletionTask(*engine, vb, /*delay*/0.0);
- deletionTask.run();
+ const void* deleteCookie = create_mock_cookie();
+
+ // lock the cookie, waitfor will release and enter the internal cond-var
+ lock_mock_cookie(deleteCookie);
+ store->deleteVBucket(vbid, deleteCookie);
+ waitfor_mock_cookie(deleteCookie);
+ unlock_mock_cookie(deleteCookie);
// Check the status of the cookie to see if the cookie status has changed
// to ENGINE_NOT_MY_VBUCKET, which means the notify was sent
EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, c->status);
+
+ destroy_mock_cookie(deleteCookie);
}
TEST_P(EPStoreEvictionTest, TouchCmdDuringBgFetch) {
#include "tasks.h"
#include "tests/mock/mock_global_task.h"
#include "tests/module_tests/test_helpers.h"
-#include "vbucketmemorydeletiontask.h"
+#include "vbucketdeletiontask.h"
#include <platform/dirutils.h>
#include <chrono>