storageProperties = new StorageProperties(true, true, true, true);
- stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
- stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
+ const auto size = GlobalTask::allTaskIds.size();
+ stats.schedulingHisto = new Histogram<ProcessClock::duration::rep>[size];
+ stats.taskRuntimeHisto = new Histogram<ProcessClock::duration::rep>[size];
for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
stats.schedulingHisto[i].reset();
++vb->numExpiredItems;
}
- void logQTime(TaskId taskType, hrtime_t enqTime) {
- stats.schedulingHisto[static_cast<int>(taskType)].add(enqTime);
+ void logQTime(TaskId taskType, const ProcessClock::duration enqTime) {
+ const auto ns_count = std::chrono::duration_cast
+ <std::chrono::microseconds>(enqTime).count();
+ stats.schedulingHisto[static_cast<int>(taskType)].add(ns_count);
}
- void logRunTime(TaskId taskType, hrtime_t runTime) {
- stats.taskRuntimeHisto[static_cast<int>(taskType)].add(runTime);
+ void logRunTime(TaskId taskType, const ProcessClock::duration runTime) {
+ const auto ns_count = std::chrono::duration_cast
+ <std::chrono::microseconds>(runTime).count();
+ stats.taskRuntimeHisto[static_cast<int>(taskType)].add(ns_count);
}
bool multiBGFetchEnabled() {
#include <memcached/protocol_binary.h>
#include <memcached/util.h>
#include <platform/platform.h>
+#include <platform/processclock.h>
#include <stdarg.h>
#include <cstdio>
return myEngine->getWorkLoadPolicy();
}
-void EpEngineTaskable::logQTime(TaskId id, hrtime_t enqTime) {
+void EpEngineTaskable::logQTime(TaskId id,
+ const ProcessClock::duration enqTime) {
myEngine->getEpStore()->logQTime(id, enqTime);
}
-void EpEngineTaskable::logRunTime(TaskId id, hrtime_t runTime) {
+void EpEngineTaskable::logRunTime(TaskId id,
+ const ProcessClock::duration runTime) {
myEngine->getEpStore()->logRunTime(id, runTime);
}
#include "vbucket.h"
#include <memcached/engine.h>
+#include <platform/processclock.h>
#include <string>
WorkLoadPolicy& getWorkLoadPolicy(void);
- void logQTime(TaskId id, hrtime_t enqTime);
+ void logQTime(TaskId id, const ProcessClock::duration enqTime);
- void logRunTime(TaskId id, hrtime_t runTime);
+ void logRunTime(TaskId id, const ProcessClock::duration runTime);
private:
EventuallyPersistentEngine* myEngine;
#include "config.h"
#include <algorithm>
+#include <chrono>
#include <platform/checked_snprintf.h>
+#include <platform/processclock.h>
#include <platform/sysinfo.h>
#include <queue>
#include <sstream>
cookie);
checked_snprintf(statname, sizeof(statname), "%s:%s:%d:runtime",
prefix, logname, static_cast<int>(i));
- add_casted_stat(statname, log[i].getDuration(), add_stat,
- cookie);
+ const auto duration_ms = std::chrono::duration_cast
+ <std::chrono::microseconds>(log[i].getDuration()).count();
+ add_casted_stat(statname, duration_ms, add_stat, cookie);
} catch (std::exception& error) {
LOG(EXTENSION_LOG_WARNING,
"showJobLog: Failed to build stats: %s", error.what());
if (strcmp(t->getStateName().c_str(), "running") == 0) {
checked_snprintf(statname, sizeof(statname), "%s:runtime", prefix);
- add_casted_stat(statname,
- (gethrtime() - t->getTaskStart()) / 1000, add_stat,
- cookie);
+ const auto duration = ProcessClock::now() - t->getTaskStart();
+ add_casted_stat(statname, std::chrono::duration_cast<
+ std::chrono::microseconds>(duration).count(),
+ add_stat, cookie);
}
checked_snprintf(statname, sizeof(statname), "%s:waketime", prefix);
- add_casted_stat(statname, t->getWaketime(), add_stat, cookie);
+ add_casted_stat(statname, to_ns_since_epoch(t->getWaketime()).count(),
+ add_stat, cookie);
checked_snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
- add_casted_stat(statname, t->getCurTime(), add_stat, cookie);
+ add_casted_stat(statname, to_ns_since_epoch(t->getCurTime()).count(),
+ add_stat, cookie);
} catch (std::exception& error) {
LOG(EXTENSION_LOG_WARNING,
"addWorkerStats: Failed to build stats: %s", error.what());
#include "config.h"
+#include <chrono>
#include <queue>
#include "common.h"
break;
}
- now = gethrtime();
+ updateCurrentTime();
if (TaskQueue *q = manager->nextTask(*this, tick)) {
EventuallyPersistentEngine *engine = currentTask->getEngine();
// Measure scheduling overhead as difference between the time
// that the task wanted to wake up and the current time
- hrtime_t woketime = currentTask->getWaketime();
- currentTask->getTaskable().logQTime(currentTask->getTypeId(),
- now > woketime ?
- (now - woketime) / 1000 : 0);
-
- taskStart = now;
+ const ProcessClock::time_point woketime =
+ currentTask->getWaketime();
+ currentTask->
+ getTaskable().logQTime(currentTask->getTypeId(),
+ getCurTime() > woketime ?
+ getCurTime() - woketime :
+ ProcessClock::duration::zero());
+ updateTaskStart();
rel_time_t startReltime = ep_current_time();
LOG(EXTENSION_LOG_DEBUG,
bool again = currentTask->run();
// Task done, log it ...
- hrtime_t runtime((gethrtime() - taskStart) / 1000);
+ const ProcessClock::duration runtime(ProcessClock::now() -
+ getTaskStart());
currentTask->getTaskable().logRunTime(currentTask->getTypeId(),
runtime);
if (engine) {
addLogEntry(currentTask->getTaskable().getName() +
currentTask->getDescription(),
q->getQueueType(), runtime, startReltime,
- (runtime >
- (hrtime_t)currentTask->maxExpectedDuration()));
+ (runtime > std::chrono::seconds(
+ currentTask->maxExpectedDuration())));
if (engine) {
ObjectRegistry::onSwitchThread(engine);
manager->doneWork(curTaskType);
manager->cancel(currentTask->uid, true);
} else {
- hrtime_t new_waketime;
// if a task has not set snooze, update its waketime to now
// before rescheduling for more accurate timing histograms
- currentTask->updateWaketimeIfLessThan(now);
+ currentTask->updateWaketimeIfLessThan(getCurTime());
// release capacity back to TaskQueue ..
manager->doneWork(curTaskType);
- new_waketime = q->reschedule(currentTask, curTaskType);
+ const ProcessClock::time_point new_waketime =
+ q->reschedule(currentTask, curTaskType);
// record min waketime ...
- if (new_waketime < waketime) {
- waketime = new_waketime;
+ if (new_waketime < getWaketime()) {
+ setWaketime(new_waketime);
}
LOG(EXTENSION_LOG_DEBUG, "%s: Reschedule a task"
" \"%s\" id %" PRIu64 "[%" PRIu64 " %" PRIu64 " |%" PRIu64 "]",
name.c_str(),
currentTask->getDescription().c_str(),
- uint64_t(currentTask->getId()), uint64_t(new_waketime),
- uint64_t(currentTask->getWaketime()),
- uint64_t(waketime.load()));
+ uint64_t(currentTask->getId()),
+ uint64_t(to_ns_since_epoch(new_waketime).count()),
+ uint64_t(to_ns_since_epoch(currentTask->getWaketime()).
+ count()),
+ uint64_t(to_ns_since_epoch(getWaketime()).count()));
}
}
}
void ExecutorThread::addLogEntry(const std::string &desc,
const task_type_t taskType,
- const hrtime_t runtime,
+ const ProcessClock::duration runtime,
rel_time_t t, bool isSlowJob) {
LockHolder lh(logMutex);
TaskLogEntry tle(desc, taskType, runtime, t);
#include "config.h"
+#include <chrono>
#include <deque>
#include <list>
#include <map>
+#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>
+#include <platform/processclock.h>
#include <relaxed_atomic.h>
#include "atomic.h"
friend class TaskQueue;
public:
+ /* The AtomicProcessTime class provides an abstraction for ensuring that
+ * changes to a ProcessClock::time_point are atomic. This is achieved by
+ * ensuring that all accesses are protected by a mutex.
+ */
+ class AtomicProcessTime {
+ public:
+ AtomicProcessTime() {}
+ AtomicProcessTime(const ProcessClock::time_point& tp) : timepoint(tp) {}
+
+ void setTimePoint(const ProcessClock::time_point& tp) {
+ std::lock_guard<std::mutex> lock(mutex);
+ timepoint = tp;
+ }
+
+ ProcessClock::time_point getTimePoint() const {
+ std::lock_guard<std::mutex> lock(mutex);
+ return timepoint;
+ }
+
+ private:
+ mutable std::mutex mutex;
+ ProcessClock::time_point timepoint;
+ };
+
ExecutorThread(ExecutorPool *m, int startingQueue,
const std::string nm) : manager(m),
startIndex(startingQueue), name(nm),
- state(EXECUTOR_RUNNING), taskStart(0),
+ state(EXECUTOR_RUNNING),
+ now(ProcessClock::now()),
+ waketime(ProcessClock::time_point::max()),
+ taskStart(),
currentTask(NULL), curTaskType(NO_TASK_TYPE),
- tasklog(TASK_LOG_SIZE), slowjobs(TASK_LOG_SIZE) {
- now = gethrtime();
- waketime = hrtime_t(-1);
- }
+ tasklog(TASK_LOG_SIZE), slowjobs(TASK_LOG_SIZE) {}
~ExecutorThread() {
LOG(EXTENSION_LOG_INFO, "Executor killing %s", name.c_str());
}
}
- hrtime_t getTaskStart() const { return taskStart; }
+ ProcessClock::time_point getTaskStart() const {
+ return taskStart.getTimePoint();
+ }
+
+ void updateTaskStart(void) {
+ taskStart.setTimePoint(ProcessClock::now());
+ }
const std::string getStateName();
void addLogEntry(const std::string &desc, const task_type_t taskType,
- const hrtime_t runtime, rel_time_t startRelTime,
- bool isSlowJob);
+ const ProcessClock::duration runtime,
+ rel_time_t startRelTime, bool isSlowJob);
const std::vector<TaskLogEntry> getLog() {
LockHolder lh(logMutex);
return slowjobs.contents();
}
- const hrtime_t getWaketime(void) { return waketime; }
+ ProcessClock::time_point getWaketime() const {
+ return waketime.getTimePoint();
+ }
+
+ void setWaketime(const ProcessClock::time_point tp) {
+ waketime.setTimePoint(tp);
+ }
+
+ ProcessClock::time_point getCurTime() const {
+ return now.getTimePoint();
+ }
- const hrtime_t getCurTime(void) { return now; }
+ void updateCurrentTime(void) {
+ now.setTimePoint(ProcessClock::now());
+ }
protected:
const std::string name;
AtomicValue<executor_state_t> state;
- AtomicValue<hrtime_t> now; // record of current time
- AtomicValue<hrtime_t> waketime; // set to the earliest
-
- Couchbase::RelaxedAtomic<hrtime_t> taskStart;
+ // record of current time
+ AtomicProcessTime now;
+ // record of the earliest time the task can be woken-up
+ AtomicProcessTime waketime;
+ AtomicProcessTime taskStart;
Mutex currentTaskMutex; // Protects currentTask
ExTask currentTask;
}
}
- void updateCurrentTime() {
- now = gethrtime();
- }
-
ExTask& getCurrentTask() {
return currentTask;
}
#include "atomic.h"
#include <platform/histogram.h>
+#include <platform/processclock.h>
#include "memory_tracker.h"
#include "mutex.h"
#include "utility.h"
Histogram<hrtime_t> getMultiHisto;
// ! Histogram of various task wait times
- Histogram<hrtime_t> *schedulingHisto;
+ Histogram<ProcessClock::duration::rep> *schedulingHisto;
// ! Histogram of various task run times
- Histogram<hrtime_t> *taskRuntimeHisto;
+ Histogram<ProcessClock::duration::rep> *taskRuntimeHisto;
//! Reset all stats to reasonable values.
void reset() {
#include "utility.h"
+#include <chrono>
#include <condition_variable>
/**
}
void wait_for(std::unique_lock<std::mutex>& lock,
- const hrtime_t nanoSecs) {
- cond.wait_for(lock, std::chrono::nanoseconds(nanoSecs));
+ const std::chrono::nanoseconds nanoSecs) {
+ cond.wait_for(lock, nanoSecs);
}
void notify_all() {
#pragma once
+#include <platform/processclock.h>
+
#include "workload.h"
#include "tasks.h"
/*
Called with the time spent queued
*/
- virtual void logQTime(TaskId id, hrtime_t enqTime) = 0;
+ virtual void logQTime(TaskId id, const ProcessClock::duration enqTime) = 0;
/*
Called with the time spent running
*/
- virtual void logRunTime(TaskId id, hrtime_t runTime) = 0;
+ virtual void logRunTime(TaskId id,
+ const ProcessClock::duration runTime) = 0;
protected:
virtual ~Taskable() {}
#include "config.h"
+#include <platform/processclock.h>
+
+#include <chrono>
#include <deque>
#include <list>
#include <map>
public:
// This is useful for the ringbuffer to initialize
- TaskLogEntry() : name("invalid"), duration(0) {}
- TaskLogEntry(const std::string &n, task_type_t type, const hrtime_t d,
+ TaskLogEntry() : name("invalid"),
+ duration(ProcessClock::duration::zero()) {}
+
+ TaskLogEntry(const std::string &n, task_type_t type,
+ const ProcessClock::duration d,
rel_time_t t = 0)
: name(n), taskType(type), ts(t), duration(d) {}
task_type_t getTaskType() const { return taskType; }
/**
- * Get the amount of time (in microseconds) this job ran.
+ * Get the duration this job took to run.
*/
- hrtime_t getDuration() const { return duration; }
+ ProcessClock::duration getDuration() const { return duration; }
/**
* Get a timestamp indicating when this thing started.
std::string name;
task_type_t taskType;
rel_time_t ts;
- hrtime_t duration;
+ ProcessClock::duration duration;
};
#endif // SRC_TASKLOGENTRY_H_
#include "executorpool.h"
#include "executorthread.h"
+#include <cmath>
+
TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
name(nm), queueType(t), manager(m), sleepers(0)
{
bool TaskQueue::_doSleep(ExecutorThread &t,
std::unique_lock<std::mutex>& lock) {
- t.now = gethrtime();
- if (t.now < t.waketime && manager->trySleep(queueType)) {
+ t.updateCurrentTime();
+ if (t.getCurTime() < t.getWaketime() && manager->trySleep(queueType)) {
// Atomically switch from running to sleeping; iff we were previously
// running.
executor_state_t expected_state = EXECUTOR_RUNNING;
}
sleepers++;
// zzz....
- hrtime_t snooze_nsecs = t.waketime - t.now;
+ const auto snooze = t.getWaketime() - t.getCurTime();
- if (snooze_nsecs > MIN_SLEEP_TIME * 1000000000) {
+ if (snooze > std::chrono::seconds((int)round(MIN_SLEEP_TIME))) {
mutex.wait_for(lock, MIN_SLEEP_TIME);
} else {
- mutex.wait_for(lock, snooze_nsecs);
+ mutex.wait_for(lock, snooze);
}
// ... woke!
sleepers--;
EXECUTOR_RUNNING)) {
return false;
}
- t.now = gethrtime();
+ t.updateCurrentTime();
}
- t.waketime = hrtime_t(-1);
+ t.setWaketime(ProcessClock::time_point(ProcessClock::time_point::max()));
return true;
}
return ret; // shutting down
}
- size_t numToWake = _moveReadyTasks(t.now);
+ size_t numToWake = _moveReadyTasks(t.getCurTime());
if (!futureQueue.empty() && t.startIndex == queueType &&
- futureQueue.top()->getWaketime() < t.waketime) {
- t.waketime = futureQueue.top()->getWaketime(); // record earliest waketime
+ futureQueue.top()->getWaketime() < t.getWaketime()) {
+ // record earliest waketime
+ t.setWaketime(futureQueue.top()->getWaketime());
}
if (!readyQueue.empty() && readyQueue.top()->isdead()) {
return rv;
}
-size_t TaskQueue::_moveReadyTasks(hrtime_t tv) {
+size_t TaskQueue::_moveReadyTasks(const ProcessClock::time_point tv) {
if (!readyQueue.empty()) {
return 0;
}
}
}
-hrtime_t TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
- hrtime_t wakeTime;
+ProcessClock::time_point TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
+ ProcessClock::time_point wakeTime;
manager->doneWork(curTaskType);
LockHolder lh(mutex);
if (curTaskType == queueType) {
wakeTime = futureQueue.top()->getWaketime();
} else {
- wakeTime = hrtime_t(-1);
+ wakeTime = ProcessClock::time_point::max();
}
return wakeTime;
}
-hrtime_t TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
+ProcessClock::time_point TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
- hrtime_t rv = _reschedule(task, curTaskType);
+ const ProcessClock::time_point rv = _reschedule(task, curTaskType);
ObjectRegistry::onSwitchThread(epe);
return rv;
}
void TaskQueue::_wake(ExTask &task) {
size_t numReady = 0;
- const hrtime_t now = gethrtime();
+ const ProcessClock::time_point now = ProcessClock::now();
LockHolder lh(mutex);
LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %" PRIu64,
#include <queue>
+#include <platform/processclock.h>
+
#include "ringbuffer.h"
#include "task_type.h"
#include "tasks.h"
void schedule(ExTask &task);
- hrtime_t reschedule(ExTask &task, task_type_t &curTaskType);
+ ProcessClock::time_point reschedule(ExTask &task, task_type_t &curTaskType);
void checkPendingQueue(void);
private:
void _schedule(ExTask &task);
- hrtime_t _reschedule(ExTask &task, task_type_t &curTaskType);
+ ProcessClock::time_point _reschedule(ExTask &task,
+ task_type_t &curTaskType);
void _checkPendingQueue(void);
bool _fetchNextTask(ExecutorThread &thread, bool toSleep);
void _wake(ExTask &task);
bool _doSleep(ExecutorThread &thread, std::unique_lock<std::mutex>& lock);
void _doWake_UNLOCKED(size_t &numToWake);
- size_t _moveReadyTasks(hrtime_t tv);
+ size_t _moveReadyTasks(const ProcessClock::time_point tv);
ExTask _popReadyTask(void);
SyncObject mutex;
void GlobalTask::snooze(const double secs) {
if (secs == INT_MAX) {
setState(TASK_SNOOZED, TASK_RUNNING);
- updateWaketime(hrtime_t(-1));
+ updateWaketime(ProcessClock::time_point::max());
return;
}
- hrtime_t curTime = gethrtime();
+ const auto curTime = ProcessClock::now();
if (secs) {
setState(TASK_SNOOZED, TASK_RUNNING);
- waketime.store(curTime + hrtime_t(secs * 1000000000));
+ updateWaketime(curTime + std::chrono::seconds((int)round(secs)));
} else {
- waketime.store(curTime);
+ updateWaketime(curTime);
}
}
#include "config.h"
+#include <platform/processclock.h>
+
#include <array>
+#include <chrono>
#include <string>
#include "atomic.h"
#include "kvstore.h"
static AtomicValue<size_t> task_id_counter;
static size_t nextTaskId() { return task_id_counter.fetch_add(1); }
- hrtime_t getWaketime() {
- return waketime.load();
+ ProcessClock::time_point getWaketime() const {
+ const auto waketime_chrono = std::chrono::nanoseconds(waketime);
+ return ProcessClock::time_point(waketime_chrono);
}
- void updateWaketime(hrtime_t to) {
- waketime.store(to);
+ void updateWaketime(const ProcessClock::time_point tp) {
+ waketime = to_ns_since_epoch(tp).count();
}
- void updateWaketimeIfLessThan(hrtime_t to) {
- atomic_setIfBigger(waketime, to);
+ void updateWaketimeIfLessThan(const ProcessClock::time_point tp) {
+ const auto tp_ns = to_ns_since_epoch(tp).count();
+ if (tp_ns > waketime) {
+ waketime = tp_ns;
+ }
}
private:
- AtomicValue<hrtime_t> waketime; // used for priority_queue
+ /**
+ * We are using a uint64_t as opposed to ProcessTime::time_point because
+ * was want the access to be atomic without the use of a mutex.
+ * The reason for this is that the CompareByDueDate function has been shown
+ * to be pretty hot and we want to avoid the overhead of acquiring
+ * two mutexes (one for ExTask 1 and one for ExTask 2) for every invocation
+ * of the function.
+ */
+ std::atomic<int64_t> waketime; // used for priority_queue
};
typedef SingleThreadedRCPtr<GlobalTask> ExTask;