void EPVBucket::notifyHighPriorityRequests(EventuallyPersistentEngine& engine,
uint64_t idNum,
HighPriorityVBNotify notifyType) {
- auto toNotify = getHighPriorityNotifies(engine, idNum, notifyType);
+ auto toNotify = getHighPriorityNotifications(engine, idNum, notifyType);
if (shard) {
shard->highPriorityCount.fetch_sub(toNotify.size());
};
EphemeralBucket::EphemeralBucket(EventuallyPersistentEngine& theEngine)
- : KVBucket(theEngine) {
+ : KVBucket(theEngine),
+ notifyHpReqTask(make_STRCPtr<NotifyHighPriorityReqTask>(theEngine)) {
/* We always have VALUE_ONLY eviction policy because a key not
present in HashTable implies key not present at all.
Note: This should not be confused with the eviction algorithm
eviction_policy = VALUE_ONLY;
}
+EphemeralBucket::~EphemeralBucket() {
+ ExecutorPool::get()->cancel(notifyHpReqTask->getId());
+}
+
bool EphemeralBucket::initialize() {
KVBucket::initialize();
auto& config = engine.getConfiguration();
config.addValueChangedListener("ephemeral_metadata_purge_interval",
new EphemeralValueChangedListener(*this));
+ // High priority vbucket request notification task
+ ExecutorPool::get()->schedule(notifyHpReqTask);
+
return true;
}
/* In ephemeral buckets we must notify high priority requests as well.
We do not wait for persistence to notify high priority requests */
VBucketPtr vb = getVBucket(vbid);
- vb->notifyHighPriorityRequests(
+
+ auto toNotify = vb->getHighPriorityNotifications(
engine, notifyCtx.bySeqno, HighPriorityVBNotify::Seqno);
+
+ if (toNotify.size() && notifyHpReqTask) {
+ notifyHpReqTask->wakeup(std::move(toNotify));
+ }
}
// Protected methods //////////////////////////////////////////////////////////
ephPending.seqlistStaleMetadataBytes);
#undef DO_STAT
}
+
+EphemeralBucket::NotifyHighPriorityReqTask::NotifyHighPriorityReqTask(
+ EventuallyPersistentEngine& e)
+ : GlobalTask(&e,
+ TaskId::NotifyHighPriorityReqTask,
+ std::numeric_limits<int>::max(),
+ false) {
+}
+
+bool EphemeralBucket::NotifyHighPriorityReqTask::run() {
+ std::map<const void*, ENGINE_ERROR_CODE> notifyQ;
+ {
+ /* It is necessary that the toNotifyLock is not held while
+ actually notifying. */
+ std::lock_guard<std::mutex> lg(toNotifyLock);
+ notifyQ = std::move(toNotify);
+ }
+
+ for (auto& notify : notifyQ) {
+ LOG(EXTENSION_LOG_NOTICE,
+ "%s for cookie :%p and status %d",
+ to_string(getDescription()).c_str(),
+ notify.first,
+ notify.second);
+ engine->notifyIOComplete(notify.first, notify.second);
+ }
+
+ /* Lets assume that the task will be explicitly woken */
+ snooze(std::numeric_limits<int>::max());
+
+ /* But, also check if another thread already tried to wake up the task */
+ bool scheduleSoon = false;
+ {
+ std::lock_guard<std::mutex> lg(toNotifyLock);
+ if (toNotify.size()) {
+ scheduleSoon = true;
+ }
+ }
+
+ if (scheduleSoon) {
+ /* Good to call snooze without holding toNotifyLock */
+ snooze(0);
+ }
+
+ /* Run the task again after snoozing */
+ return true;
+}
+
+cb::const_char_buffer
+EphemeralBucket::NotifyHighPriorityReqTask::getDescription() {
+ return "Ephemeral: Notify HighPriority Request";
+}
+
+void EphemeralBucket::NotifyHighPriorityReqTask::wakeup(
+ std::map<const void*, ENGINE_ERROR_CODE> notifies) {
+ {
+ /* Add the connections to be notified */
+ std::lock_guard<std::mutex> lg(toNotifyLock);
+ toNotify.insert(make_move_iterator(begin(notifies)),
+ make_move_iterator(end(notifies)));
+ }
+
+ /* wake up the task */
+ ExecutorPool::get()->wake(getId());
+}
public:
EphemeralBucket(EventuallyPersistentEngine& theEngine);
+ ~EphemeralBucket();
+
bool initialize() override;
/// Eviction not supported for Ephemeral buckets - without some backing
/// Task responsible for purging in-memory tombstones.
ExTask tombstonePurgerTask;
+
+private:
+ /**
+ * Task responsible for notifying high priority requests (usually during
+ * rebalance)
+ */
+ class NotifyHighPriorityReqTask : public GlobalTask {
+ public:
+ NotifyHighPriorityReqTask(EventuallyPersistentEngine& e);
+
+ bool run() override;
+
+ cb::const_char_buffer getDescription() override;
+
+ /**
+ * Adds the connections to be notified by the task and then wakes up
+ * the task.
+ *
+ * @param notifies Map of connections to be notified
+ */
+ void wakeup(std::map<const void*, ENGINE_ERROR_CODE> notifies);
+
+ private:
+ /* All the notifications to be called by the task */
+ std::map<const void*, ENGINE_ERROR_CODE> toNotify;
+
+ /* Serialize access to write/read of toNotify */
+ std::mutex toNotifyLock;
+ };
+
+ // Private member variables ///////////////////////////////////////////////
+ SingleThreadedRCPtr<NotifyHighPriorityReqTask> notifyHpReqTask;
};
EventuallyPersistentEngine& engine,
uint64_t idNum,
HighPriorityVBNotify notifyType) {
- auto toNotify = getHighPriorityNotifies(engine, idNum, notifyType);
-
- for (auto& notify : toNotify) {
- engine.notifyIOComplete(notify.first, notify.second);
- }
+ throw std::logic_error(
+ "EphemeralVBucket::notifyHighPriorityRequests() is not valid. "
+ "Called on vb " +
+ std::to_string(getId()));
}
void EphemeralVBucket::notifyAllPendingConnsFailed(
// Non-IO tasks
TASK(PendingOpsNotification, NONIO_TASK_IDX, 0)
+TASK(NotifyHighPriorityReqTask, NONIO_TASK_IDX, 0)
TASK(Processor, NONIO_TASK_IDX, 0)
TASK(FlushAllTask, NONIO_TASK_IDX, 3)
TASK(ConnNotifierCallback, NONIO_TASK_IDX, 5)
cookie);
}
-std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifies(
+std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
EventuallyPersistentEngine& engine,
uint64_t idNum,
HighPriorityVBNotify notifyType) {
virtual void notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) = 0;
+ /**
+ * Get high priority notifications for a seqno or checkpoint persisted
+ *
+ * @param engine Ref to ep-engine
+ * @param id seqno or checkpoint id for which notifies are to be found
+ * @param notifyType indicating notify for seqno or chk persistence
+ *
+ * @return map of notifications with conn cookie as the key and notify
+ * status as the value
+ */
+ std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifications(
+ EventuallyPersistentEngine& engine,
+ uint64_t idNum,
+ HighPriorityVBNotify notifyType);
+
size_t getHighPriorityChkSize() {
return numHpVBReqs.load();
}
HighPriorityVBNotify reqType);
/**
- * Get high priority notifications for a seqno or checkpoint persisted
- *
- * @param engine Ref to ep-engine
- * @param id seqno or checkpoint id for which notifies are to be found
- * @param notifyType indicating notify for seqno or chk persistence
- *
- * @return map of notifies with conn cookie as the key and notify status as
- * the value
- */
- std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifies(
- EventuallyPersistentEngine& engine,
- uint64_t idNum,
- HighPriorityVBNotify notifyType);
-
- /**
* Get all high priority notifications as temporary failures because they
* could not be completed.
*