#include "warmup.h"
#include "connmap.h"
#include "replicationthrottle.h"
+#include "tasks.h"
class StatsValueChangeListener : public ValueChangedListener {
public:
bool residentRatioLessThanThreshold;
};
-class VBucketMemoryDeletionTask : public GlobalTask {
-public:
- VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
- RCPtr<VBucket> &vb, double delay) :
- GlobalTask(&eng,
- TaskId::VBucketMemoryDeletionTask, delay, true),
- e(eng), vbucket(vb), vbid(vb->getId()) { }
-
- std::string getDescription() {
- std::stringstream ss;
- ss << "Removing (dead) vbucket " << vbid << " from memory";
- return ss.str();
- }
-
- bool run(void) {
- vbucket->notifyAllPendingConnsFailed(e);
- vbucket->ht.clear();
- vbucket.reset();
- return false;
- }
-
-private:
- EventuallyPersistentEngine &e;
- RCPtr<VBucket> vbucket;
- uint16_t vbid;
-};
-
class PendingOpsNotification : public GlobalTask {
public:
PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
#include "ep_engine.h"
#include "flusher.h"
#include "tasks.h"
+#include "vbucket.h"
#include "warmup.h"
-#include "ep_engine.h"
#include <climits>
return false;
}
+VBucketMemoryDeletionTask::VBucketMemoryDeletionTask(
+ EventuallyPersistentEngine& eng, RCPtr<VBucket>& vb, double delay)
+ : GlobalTask(&eng, TaskId::VBucketMemoryDeletionTask, delay, true),
+ e(eng),
+ vbucket(vb) {
+ desc = "Removing (dead) vb:" + std::to_string(vbucket->getId()) +
+ " from memory";
+}
+
+bool VBucketMemoryDeletionTask::run() {
+ vbucket->notifyAllPendingConnsFailed(e);
+ vbucket->ht.clear();
+ vbucket.reset();
+ return false;
+}
+
WorkLoadMonitor::WorkLoadMonitor(EventuallyPersistentEngine *e,
bool completeBeforeShutdown) :
GlobalTask(e, TaskId::WorkLoadMonitor, WORKLOAD_MONITOR_FREQ,
class Flusher;
class Warmup;
class Taskable;
+class VBucket;
class GlobalTask : public RCValue {
friend class CompareByDueDate;
hrtime_t init;
};
+/*
+ * This is a NONIO task called as part of VB deletion. The task is responsible
+ * for clearing all the VBucket's pending operations and for clearing the
+ * VBucket's hash table.
+ */
+class VBucketMemoryDeletionTask : public GlobalTask {
+public:
+ VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
+ RCPtr<VBucket>& vb,
+ double delay);
+
+ std::string getDescription() {
+ return desc;
+ }
+
+ bool run();
+
+private:
+ EventuallyPersistentEngine& e;
+ RCPtr<VBucket> vbucket;
+ std::string desc;
+};
+
/**
* A task that monitors if a bucket is read-heavy, write-heavy, or mixed.
*/
stats.decrDiskQueueSize(dirtyQueueSize.load());
- size_t num_pending_fetches = 0;
- vb_bgfetch_queue_t::iterator itr = pendingBGFetches.begin();
- for (; itr != pendingBGFetches.end(); ++itr) {
- vb_bgfetch_item_ctx_t &bg_itm_ctx = itr->second;
- std::list<VBucketBGFetchItem *> &bgitems = bg_itm_ctx.bgfetched_list;
- std::list<VBucketBGFetchItem *>::iterator vit = bgitems.begin();
- for (; vit != bgitems.end(); ++vit) {
- delete (*vit);
- ++num_pending_fetches;
- }
- }
- stats.numRemainingBgJobs.fetch_sub(num_pending_fetches);
- pendingBGFetches.clear();
delete failovers;
// Clear out the bloomfilter(s)
}
lh.unlock();
+ // Add all the pendingBGFetches to the toNotify map
+ {
+ LockHolder lh(pendingBGFetchesLock);
+ size_t num_of_deleted_pending_fetches = 0;
+ for (auto& bgf : pendingBGFetches) {
+ vb_bgfetch_item_ctx_t& bg_itm_ctx = bgf.second;
+ for (auto& bgitem : bg_itm_ctx.bgfetched_list) {
+ toNotify[bgitem->cookie] = ENGINE_NOT_MY_VBUCKET;
+ e.storeEngineSpecific(bgitem->cookie, nullptr);
+ delete bgitem;
+ ++num_of_deleted_pending_fetches;
+ }
+ }
+ stats.numRemainingBgJobs.fetch_sub(num_of_deleted_pending_fetches);
+ pendingBGFetches.clear();
+ }
+
std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
for (; itr != toNotify.end(); ++itr) {
e.notifyIOComplete(itr->first, itr->second);
// disconnected).
EXPECT_TRUE(store->resetVBucket(vbid));
- runNextTask(lpNonioQ, "Removing (dead) vbucket 0 from memory");
+ runNextTask(lpNonioQ, "Removing (dead) vb:0 from memory");
runNextTask(lpWriterQ, "Deleting VBucket:0");
// [[2]] Ok, let's see if we can get TAP takeover stats. This will
#include "flusher.h"
#include "../mock/mock_dcp_producer.h"
#include "replicationthrottle.h"
+#include "tasks.h"
#include "programs/engine_testapp/mock_server.h"
/*wantsDeleted*/false));
}
+// Test to ensure all pendingBGfetches are deleted when the
+// VBucketMemoryDeletionTask is run
+TEST_P(EPStoreEvictionTest, MB_21976) {
+ // Store an item, then eject it.
+ std::string key("key");
+ auto item = store_item(vbid, key, "value");
+ flush_vbucket_to_disk(vbid);
+ const char* msg;
+ size_t msg_size{sizeof(msg)};
+ EXPECT_EQ(ENGINE_SUCCESS, store->evictKey(key, 0, &msg, &msg_size));
+
+ // Perform a get, which should EWOULDBLOCK
+ get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
+ HONOR_STATES |
+ TRACK_REFERENCE |
+ DELETE_TEMP |
+ HIDE_LOCKED_CAS |
+ TRACK_STATISTICS);
+ GetValue gv = store->get(key, vbid, cookie, options);
+ EXPECT_EQ(ENGINE_EWOULDBLOCK,gv.getStatus());
+ // Mark the status of the cookie so that we can see if notify is called
+ struct mock_connstruct* c = (struct mock_connstruct *)cookie;
+ c->status = ENGINE_E2BIG;
+
+ // Manually run the VBucketMemoryDeletionTask task
+ RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketMemoryDeletionTask deletionTask(*engine, vb, /*delay*/0.0);
+ deletionTask.run();
+
+ // Check the status of the cookie to see if the cookie status has changed
+ // to ENGINE_NOT_MY_VBUCKET, which means the notify was sent
+ EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, c->status);
+}
// Test cases which run in both Full and Value eviction
INSTANTIATE_TEST_CASE_P(FullAndValueEviction,