MB-20079: Use std::chrono::steady_clock (ProcessClock) 99/69899/2
authorDaniel Owen <owend@couchbase.com>
Tue, 15 Nov 2016 10:33:24 +0000 (10:33 +0000)
committerDave Rigby <daver@couchbase.com>
Wed, 16 Nov 2016 16:19:23 +0000 (16:19 +0000)
Change task scheduling to use ProcessClock which is not
affected by changes to wall clock time.

Change-Id: I2fc9688abb782fe2c9e80efb6da840be3643d4a5
Reviewed-on: http://review.couchbase.org/69899
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
16 files changed:
src/ep.cc
src/ep.h
src/ep_engine.cc
src/ep_engine.h
src/executorpool.cc
src/executorthread.cc
src/executorthread.h
src/fakes/fake_executorpool.h
src/stats.h
src/syncobject.h
src/taskable.h
src/tasklogentry.h
src/taskqueue.cc
src/taskqueue.h
src/tasks.cc
src/tasks.h

index dc097cb..208d526 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -290,8 +290,9 @@ EventuallyPersistentStore::EventuallyPersistentStore(
 
     storageProperties = new StorageProperties(true, true, true, true);
 
-    stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
-    stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
+    const auto size = GlobalTask::allTaskIds.size();
+    stats.schedulingHisto = new Histogram<ProcessClock::duration::rep>[size];
+    stats.taskRuntimeHisto = new Histogram<ProcessClock::duration::rep>[size];
 
     for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
         stats.schedulingHisto[i].reset();
index 493bddc..2ab9d46 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -763,12 +763,16 @@ public:
         ++vb->numExpiredItems;
     }
 
-    void logQTime(TaskId taskType, hrtime_t enqTime) {
-        stats.schedulingHisto[static_cast<int>(taskType)].add(enqTime);
+    void logQTime(TaskId taskType, const ProcessClock::duration enqTime) {
+        const auto ns_count = std::chrono::duration_cast
+                <std::chrono::microseconds>(enqTime).count();
+        stats.schedulingHisto[static_cast<int>(taskType)].add(ns_count);
     }
 
-    void logRunTime(TaskId taskType, hrtime_t runTime) {
-        stats.taskRuntimeHisto[static_cast<int>(taskType)].add(runTime);
+    void logRunTime(TaskId taskType, const ProcessClock::duration runTime) {
+        const auto ns_count = std::chrono::duration_cast
+                <std::chrono::microseconds>(runTime).count();
+        stats.taskRuntimeHisto[static_cast<int>(taskType)].add(ns_count);
     }
 
     bool multiBGFetchEnabled() {
index d694e12..3d9eeb6 100644 (file)
@@ -23,6 +23,7 @@
 #include <memcached/protocol_binary.h>
 #include <memcached/util.h>
 #include <platform/platform.h>
+#include <platform/processclock.h>
 #include <stdarg.h>
 
 #include <cstdio>
@@ -6496,10 +6497,12 @@ WorkLoadPolicy&  EpEngineTaskable::getWorkLoadPolicy(void) {
     return myEngine->getWorkLoadPolicy();
 }
 
-void EpEngineTaskable::logQTime(TaskId id, hrtime_t enqTime) {
+void EpEngineTaskable::logQTime(TaskId id,
+                                const ProcessClock::duration enqTime) {
     myEngine->getEpStore()->logQTime(id, enqTime);
 }
 
-void EpEngineTaskable::logRunTime(TaskId id, hrtime_t runTime) {
+void EpEngineTaskable::logRunTime(TaskId id,
+                                  const ProcessClock::duration runTime) {
     myEngine->getEpStore()->logRunTime(id, runTime);
 }
index 6e9f6b3..01a91c3 100644 (file)
@@ -26,6 +26,7 @@
 #include "vbucket.h"
 
 #include <memcached/engine.h>
+#include <platform/processclock.h>
 
 #include <string>
 
@@ -78,9 +79,9 @@ public:
 
     WorkLoadPolicy& getWorkLoadPolicy(void);
 
-    void logQTime(TaskId id, hrtime_t enqTime);
+    void logQTime(TaskId id, const ProcessClock::duration enqTime);
 
-    void logRunTime(TaskId id, hrtime_t runTime);
+    void logRunTime(TaskId id, const ProcessClock::duration runTime);
 
 private:
     EventuallyPersistentEngine* myEngine;
index 5b9a32c..4b53bc9 100644 (file)
@@ -18,7 +18,9 @@
 #include "config.h"
 
 #include <algorithm>
+#include <chrono>
 #include <platform/checked_snprintf.h>
+#include <platform/processclock.h>
 #include <platform/sysinfo.h>
 #include <queue>
 #include <sstream>
@@ -736,8 +738,9 @@ static void showJobLog(const char *logname, const char *prefix,
                             cookie);
             checked_snprintf(statname, sizeof(statname), "%s:%s:%d:runtime",
                              prefix, logname, static_cast<int>(i));
-            add_casted_stat(statname, log[i].getDuration(), add_stat,
-                            cookie);
+            const auto duration_ms = std::chrono::duration_cast
+                    <std::chrono::microseconds>(log[i].getDuration()).count();
+            add_casted_stat(statname, duration_ms, add_stat, cookie);
         } catch (std::exception& error) {
             LOG(EXTENSION_LOG_WARNING,
                 "showJobLog: Failed to build stats: %s", error.what());
@@ -763,14 +766,17 @@ static void addWorkerStats(const char *prefix, ExecutorThread *t,
 
         if (strcmp(t->getStateName().c_str(), "running") == 0) {
             checked_snprintf(statname, sizeof(statname), "%s:runtime", prefix);
-            add_casted_stat(statname,
-                            (gethrtime() - t->getTaskStart()) / 1000, add_stat,
-                            cookie);
+            const auto duration = ProcessClock::now() - t->getTaskStart();
+            add_casted_stat(statname, std::chrono::duration_cast<
+                            std::chrono::microseconds>(duration).count(),
+                            add_stat, cookie);
         }
         checked_snprintf(statname, sizeof(statname), "%s:waketime", prefix);
-        add_casted_stat(statname, t->getWaketime(), add_stat, cookie);
+        add_casted_stat(statname, to_ns_since_epoch(t->getWaketime()).count(),
+                        add_stat, cookie);
         checked_snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
-        add_casted_stat(statname, t->getCurTime(), add_stat, cookie);
+        add_casted_stat(statname, to_ns_since_epoch(t->getCurTime()).count(),
+                        add_stat, cookie);
     } catch (std::exception& error) {
         LOG(EXTENSION_LOG_WARNING,
             "addWorkerStats: Failed to build stats: %s", error.what());
index 709340d..c2423f7 100644 (file)
@@ -17,6 +17,7 @@
 
 #include "config.h"
 
+#include <chrono>
 #include <queue>
 
 #include "common.h"
@@ -78,7 +79,7 @@ void ExecutorThread::run() {
             break;
         }
 
-        now = gethrtime();
+        updateCurrentTime();
         if (TaskQueue *q = manager->nextTask(*this, tick)) {
             EventuallyPersistentEngine *engine = currentTask->getEngine();
 
@@ -97,12 +98,14 @@ void ExecutorThread::run() {
 
             // Measure scheduling overhead as difference between the time
             // that the task wanted to wake up and the current time
-            hrtime_t woketime = currentTask->getWaketime();
-            currentTask->getTaskable().logQTime(currentTask->getTypeId(),
-                                                now > woketime ?
-                                                (now - woketime) / 1000 : 0);
-
-            taskStart = now;
+            const ProcessClock::time_point woketime =
+                    currentTask->getWaketime();
+            currentTask->
+            getTaskable().logQTime(currentTask->getTypeId(),
+                                   getCurTime() > woketime ?
+                                           getCurTime() - woketime :
+                                           ProcessClock::duration::zero());
+            updateTaskStart();
             rel_time_t startReltime = ep_current_time();
 
             LOG(EXTENSION_LOG_DEBUG,
@@ -115,7 +118,8 @@ void ExecutorThread::run() {
             bool again = currentTask->run();
 
             // Task done, log it ...
-            hrtime_t runtime((gethrtime() - taskStart) / 1000);
+            const ProcessClock::duration runtime(ProcessClock::now() -
+                                                 getTaskStart());
             currentTask->getTaskable().logRunTime(currentTask->getTypeId(),
                                                   runtime);
             if (engine) {
@@ -125,8 +129,8 @@ void ExecutorThread::run() {
             addLogEntry(currentTask->getTaskable().getName() +
                         currentTask->getDescription(),
                        q->getQueueType(), runtime, startReltime,
-                       (runtime >
-                       (hrtime_t)currentTask->maxExpectedDuration()));
+                       (runtime > std::chrono::seconds(
+                               currentTask->maxExpectedDuration())));
 
             if (engine) {
                 ObjectRegistry::onSwitchThread(engine);
@@ -138,25 +142,27 @@ void ExecutorThread::run() {
                 manager->doneWork(curTaskType);
                 manager->cancel(currentTask->uid, true);
             } else {
-                hrtime_t new_waketime;
                 // if a task has not set snooze, update its waketime to now
                 // before rescheduling for more accurate timing histograms
-                currentTask->updateWaketimeIfLessThan(now);
+                currentTask->updateWaketimeIfLessThan(getCurTime());
 
                 // release capacity back to TaskQueue ..
                 manager->doneWork(curTaskType);
-                new_waketime = q->reschedule(currentTask, curTaskType);
+                const ProcessClock::time_point new_waketime =
+                        q->reschedule(currentTask, curTaskType);
                 // record min waketime ...
-                if (new_waketime < waketime) {
-                    waketime = new_waketime;
+                if (new_waketime < getWaketime()) {
+                    setWaketime(new_waketime);
                 }
                 LOG(EXTENSION_LOG_DEBUG, "%s: Reschedule a task"
                         " \"%s\" id %" PRIu64 "[%" PRIu64 " %" PRIu64 " |%" PRIu64 "]",
                         name.c_str(),
                         currentTask->getDescription().c_str(),
-                        uint64_t(currentTask->getId()), uint64_t(new_waketime),
-                        uint64_t(currentTask->getWaketime()),
-                        uint64_t(waketime.load()));
+                        uint64_t(currentTask->getId()),
+                        uint64_t(to_ns_since_epoch(new_waketime).count()),
+                        uint64_t(to_ns_since_epoch(currentTask->getWaketime()).
+                                 count()),
+                        uint64_t(to_ns_since_epoch(getWaketime()).count()));
             }
         }
     }
@@ -173,7 +179,7 @@ void ExecutorThread::setCurrentTask(ExTask newTask) {
 
 void ExecutorThread::addLogEntry(const std::string &desc,
                                  const task_type_t taskType,
-                                 const hrtime_t runtime,
+                                 const ProcessClock::duration runtime,
                                  rel_time_t t, bool isSlowJob) {
     LockHolder lh(logMutex);
     TaskLogEntry tle(desc, taskType, runtime, t);
index e850114..cf68188 100644 (file)
 
 #include "config.h"
 
+#include <chrono>
 #include <deque>
 #include <list>
 #include <map>
+#include <mutex>
 #include <queue>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include <platform/processclock.h>
 #include <relaxed_atomic.h>
 
 #include "atomic.h"
@@ -58,15 +61,39 @@ class ExecutorThread {
     friend class TaskQueue;
 public:
 
+    /* The AtomicProcessTime class provides an abstraction for ensuring that
+     * changes to a ProcessClock::time_point are atomic.  This is achieved by
+     * ensuring that all accesses are protected by a mutex.
+     */
+    class AtomicProcessTime {
+    public:
+        AtomicProcessTime() {}
+        AtomicProcessTime(const ProcessClock::time_point& tp) : timepoint(tp) {}
+
+        void setTimePoint(const ProcessClock::time_point& tp) {
+            std::lock_guard<std::mutex> lock(mutex);
+            timepoint = tp;
+        }
+
+        ProcessClock::time_point getTimePoint() const {
+            std::lock_guard<std::mutex> lock(mutex);
+            return timepoint;
+        }
+
+    private:
+        mutable std::mutex mutex;
+        ProcessClock::time_point timepoint;
+    };
+
     ExecutorThread(ExecutorPool *m, int startingQueue,
                    const std::string nm) : manager(m),
           startIndex(startingQueue), name(nm),
-          state(EXECUTOR_RUNNING), taskStart(0),
+          state(EXECUTOR_RUNNING),
+          now(ProcessClock::now()),
+          waketime(ProcessClock::time_point::max()),
+          taskStart(),
           currentTask(NULL), curTaskType(NO_TASK_TYPE),
-          tasklog(TASK_LOG_SIZE), slowjobs(TASK_LOG_SIZE) {
-              now = gethrtime();
-              waketime = hrtime_t(-1);
-    }
+          tasklog(TASK_LOG_SIZE), slowjobs(TASK_LOG_SIZE) {}
 
     ~ExecutorThread() {
         LOG(EXTENSION_LOG_INFO, "Executor killing %s", name.c_str());
@@ -107,13 +134,19 @@ public:
         }
     }
 
-    hrtime_t getTaskStart() const { return taskStart; }
+    ProcessClock::time_point getTaskStart() const {
+        return taskStart.getTimePoint();
+    }
+
+    void updateTaskStart(void) {
+        taskStart.setTimePoint(ProcessClock::now());
+    }
 
     const std::string getStateName();
 
     void addLogEntry(const std::string &desc, const task_type_t taskType,
-                     const hrtime_t runtime, rel_time_t startRelTime,
-                     bool isSlowJob);
+                     const ProcessClock::duration runtime,
+                     rel_time_t startRelTime, bool isSlowJob);
 
     const std::vector<TaskLogEntry> getLog() {
         LockHolder lh(logMutex);
@@ -125,9 +158,21 @@ public:
         return slowjobs.contents();
     }
 
-    const hrtime_t getWaketime(void) { return waketime; }
+    ProcessClock::time_point getWaketime() const {
+        return waketime.getTimePoint();
+    }
+
+    void setWaketime(const ProcessClock::time_point tp) {
+        waketime.setTimePoint(tp);
+    }
+
+    ProcessClock::time_point getCurTime() const {
+        return now.getTimePoint();
+    }
 
-    const hrtime_t getCurTime(void) { return now; }
+    void updateCurrentTime(void) {
+        now.setTimePoint(ProcessClock::now());
+    }
 
 protected:
 
@@ -137,10 +182,11 @@ protected:
     const std::string name;
     AtomicValue<executor_state_t> state;
 
-    AtomicValue<hrtime_t> now;  // record of current time
-    AtomicValue<hrtime_t> waketime; // set to the earliest
-
-    Couchbase::RelaxedAtomic<hrtime_t> taskStart;
+    // record of current time
+    AtomicProcessTime now;
+    // record of the earliest time the task can be woken-up
+    AtomicProcessTime waketime;
+    AtomicProcessTime taskStart;
 
     Mutex currentTaskMutex; // Protects currentTask
     ExTask currentTask;
index fc1796a..766d492 100644 (file)
@@ -172,10 +172,6 @@ public:
         }
     }
 
-    void updateCurrentTime() {
-        now = gethrtime();
-    }
-
     ExTask& getCurrentTask() {
         return currentTask;
     }
index 66834e5..c1f5837 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "atomic.h"
 #include <platform/histogram.h>
+#include <platform/processclock.h>
 #include "memory_tracker.h"
 #include "mutex.h"
 #include "utility.h"
@@ -559,10 +560,10 @@ public:
     Histogram<hrtime_t> getMultiHisto;
 
     // ! Histogram of various task wait times
-    Histogram<hrtime_t> *schedulingHisto;
+    Histogram<ProcessClock::duration::rep> *schedulingHisto;
 
     // ! Histogram of various task run times
-    Histogram<hrtime_t> *taskRuntimeHisto;
+    Histogram<ProcessClock::duration::rep> *taskRuntimeHisto;
 
     //! Reset all stats to reasonable values.
     void reset() {
index e62bc1d..d2dbaba 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "utility.h"
 
+#include <chrono>
 #include <condition_variable>
 
 /**
@@ -45,8 +46,8 @@ public:
     }
 
     void wait_for(std::unique_lock<std::mutex>& lock,
-                  const hrtime_t nanoSecs) {
-        cond.wait_for(lock, std::chrono::nanoseconds(nanoSecs));
+                  const std::chrono::nanoseconds nanoSecs) {
+        cond.wait_for(lock, nanoSecs);
     }
 
     void notify_all() {
index 69a17e9..3b60ca8 100644 (file)
@@ -21,6 +21,8 @@
 
 #pragma once
 
+#include <platform/processclock.h>
+
 #include "workload.h"
 #include "tasks.h"
 
@@ -64,12 +66,13 @@ public:
     /*
         Called with the time spent queued
     */
-    virtual void logQTime(TaskId id, hrtime_t enqTime) = 0;
+    virtual void logQTime(TaskId id, const ProcessClock::duration enqTime) = 0;
 
     /*
         Called with the time spent running
     */
-    virtual void logRunTime(TaskId id, hrtime_t runTime) = 0;
+    virtual void logRunTime(TaskId id,
+                            const ProcessClock::duration runTime) = 0;
 
 protected:
     virtual ~Taskable() {}
index 4cdde58..0c62345 100644 (file)
@@ -19,6 +19,9 @@
 
 #include "config.h"
 
+#include <platform/processclock.h>
+
+#include <chrono>
 #include <deque>
 #include <list>
 #include <map>
@@ -35,8 +38,11 @@ class TaskLogEntry {
 public:
 
     // This is useful for the ringbuffer to initialize
-    TaskLogEntry() : name("invalid"), duration(0) {}
-    TaskLogEntry(const std::string &n, task_type_t type, const hrtime_t d,
+    TaskLogEntry() : name("invalid"),
+                     duration(ProcessClock::duration::zero()) {}
+
+    TaskLogEntry(const std::string &n, task_type_t type,
+                 const ProcessClock::duration d,
                  rel_time_t t = 0)
         : name(n), taskType(type), ts(t), duration(d) {}
 
@@ -51,9 +57,9 @@ public:
      task_type_t getTaskType() const { return taskType; }
 
     /**
-     * Get the amount of time (in microseconds) this job ran.
+     * Get the duration this job took to run.
      */
-    hrtime_t getDuration() const { return duration; }
+    ProcessClock::duration getDuration() const { return duration; }
 
     /**
      * Get a timestamp indicating when this thing started.
@@ -64,7 +70,7 @@ private:
     std::string name;
     task_type_t taskType;
     rel_time_t ts;
-    hrtime_t duration;
+    ProcessClock::duration duration;
 };
 
 #endif  // SRC_TASKLOGENTRY_H_
index ecffa01..2e65a11 100644 (file)
@@ -20,6 +20,8 @@
 #include "executorpool.h"
 #include "executorthread.h"
 
+#include <cmath>
+
 TaskQueue::TaskQueue(ExecutorPool *m, task_type_t t, const char *nm) :
     name(nm), queueType(t), manager(m), sleepers(0)
 {
@@ -76,8 +78,8 @@ void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
 
 bool TaskQueue::_doSleep(ExecutorThread &t,
                          std::unique_lock<std::mutex>& lock) {
-    t.now = gethrtime();
-    if (t.now < t.waketime && manager->trySleep(queueType)) {
+    t.updateCurrentTime();
+    if (t.getCurTime() < t.getWaketime() && manager->trySleep(queueType)) {
         // Atomically switch from running to sleeping; iff we were previously
         // running.
         executor_state_t expected_state = EXECUTOR_RUNNING;
@@ -87,12 +89,12 @@ bool TaskQueue::_doSleep(ExecutorThread &t,
         }
         sleepers++;
         // zzz....
-        hrtime_t snooze_nsecs = t.waketime - t.now;
+        const auto snooze = t.getWaketime() - t.getCurTime();
 
-        if (snooze_nsecs > MIN_SLEEP_TIME * 1000000000) {
+        if (snooze > std::chrono::seconds((int)round(MIN_SLEEP_TIME))) {
             mutex.wait_for(lock, MIN_SLEEP_TIME);
         } else {
-            mutex.wait_for(lock, snooze_nsecs);
+            mutex.wait_for(lock, snooze);
         }
         // ... woke!
         sleepers--;
@@ -105,9 +107,9 @@ bool TaskQueue::_doSleep(ExecutorThread &t,
                                              EXECUTOR_RUNNING)) {
             return false;
         }
-        t.now = gethrtime();
+        t.updateCurrentTime();
     }
-    t.waketime = hrtime_t(-1);
+    t.setWaketime(ProcessClock::time_point(ProcessClock::time_point::max()));
     return true;
 }
 
@@ -119,11 +121,12 @@ bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
         return ret; // shutting down
     }
 
-    size_t numToWake = _moveReadyTasks(t.now);
+    size_t numToWake = _moveReadyTasks(t.getCurTime());
 
     if (!futureQueue.empty() && t.startIndex == queueType &&
-        futureQueue.top()->getWaketime() < t.waketime) {
-        t.waketime = futureQueue.top()->getWaketime(); // record earliest waketime
+        futureQueue.top()->getWaketime() < t.getWaketime()) {
+        // record earliest waketime
+        t.setWaketime(futureQueue.top()->getWaketime());
     }
 
     if (!readyQueue.empty() && readyQueue.top()->isdead()) {
@@ -163,7 +166,7 @@ bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
     return rv;
 }
 
-size_t TaskQueue::_moveReadyTasks(hrtime_t tv) {
+size_t TaskQueue::_moveReadyTasks(const ProcessClock::time_point tv) {
     if (!readyQueue.empty()) {
         return 0;
     }
@@ -195,8 +198,8 @@ void TaskQueue::_checkPendingQueue(void) {
     }
 }
 
-hrtime_t TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
-    hrtime_t wakeTime;
+ProcessClock::time_point TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
+    ProcessClock::time_point wakeTime;
     manager->doneWork(curTaskType);
 
     LockHolder lh(mutex);
@@ -205,15 +208,15 @@ hrtime_t TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
     if (curTaskType == queueType) {
         wakeTime = futureQueue.top()->getWaketime();
     } else {
-        wakeTime = hrtime_t(-1);
+        wakeTime = ProcessClock::time_point::max();
     }
 
     return wakeTime;
 }
 
-hrtime_t TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
+ProcessClock::time_point TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
-    hrtime_t rv = _reschedule(task, curTaskType);
+    const ProcessClock::time_point rv = _reschedule(task, curTaskType);
     ObjectRegistry::onSwitchThread(epe);
     return rv;
 }
@@ -243,7 +246,7 @@ void TaskQueue::schedule(ExTask &task) {
 
 void TaskQueue::_wake(ExTask &task) {
     size_t numReady = 0;
-    const hrtime_t now = gethrtime();
+    const ProcessClock::time_point now = ProcessClock::now();
 
     LockHolder lh(mutex);
     LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %" PRIu64,
index fd37929..476eb7b 100644 (file)
@@ -21,6 +21,8 @@
 
 #include <queue>
 
+#include <platform/processclock.h>
+
 #include "ringbuffer.h"
 #include "task_type.h"
 #include "tasks.h"
@@ -35,7 +37,7 @@ public:
 
     void schedule(ExTask &task);
 
-    hrtime_t reschedule(ExTask &task, task_type_t &curTaskType);
+    ProcessClock::time_point reschedule(ExTask &task, task_type_t &curTaskType);
 
     void checkPendingQueue(void);
 
@@ -59,13 +61,14 @@ public:
 
 private:
     void _schedule(ExTask &task);
-    hrtime_t _reschedule(ExTask &task, task_type_t &curTaskType);
+    ProcessClock::time_point _reschedule(ExTask &task,
+                                         task_type_t &curTaskType);
     void _checkPendingQueue(void);
     bool _fetchNextTask(ExecutorThread &thread, bool toSleep);
     void _wake(ExTask &task);
     bool _doSleep(ExecutorThread &thread, std::unique_lock<std::mutex>& lock);
     void _doWake_UNLOCKED(size_t &numToWake);
-    size_t _moveReadyTasks(hrtime_t tv);
+    size_t _moveReadyTasks(const ProcessClock::time_point tv);
     ExTask _popReadyTask(void);
 
     SyncObject mutex;
index c08d549..7d6d0c5 100644 (file)
@@ -58,16 +58,16 @@ GlobalTask::GlobalTask(EventuallyPersistentEngine *e,
 void GlobalTask::snooze(const double secs) {
     if (secs == INT_MAX) {
         setState(TASK_SNOOZED, TASK_RUNNING);
-        updateWaketime(hrtime_t(-1));
+        updateWaketime(ProcessClock::time_point::max());
         return;
     }
 
-    hrtime_t curTime = gethrtime();
+    const auto curTime = ProcessClock::now();
     if (secs) {
         setState(TASK_SNOOZED, TASK_RUNNING);
-        waketime.store(curTime + hrtime_t(secs * 1000000000));
+        updateWaketime(curTime + std::chrono::seconds((int)round(secs)));
     } else {
-        waketime.store(curTime);
+        updateWaketime(curTime);
     }
 }
 
index f26489e..1d82a2f 100644 (file)
 
 #include "config.h"
 
+#include <platform/processclock.h>
+
 #include <array>
+#include <chrono>
 #include <string>
 #include "atomic.h"
 #include "kvstore.h"
@@ -179,20 +182,32 @@ protected:
     static AtomicValue<size_t> task_id_counter;
     static size_t nextTaskId() { return task_id_counter.fetch_add(1); }
 
-    hrtime_t getWaketime() {
-        return waketime.load();
+    ProcessClock::time_point getWaketime() const {
+        const auto waketime_chrono = std::chrono::nanoseconds(waketime);
+        return ProcessClock::time_point(waketime_chrono);
     }
 
-    void updateWaketime(hrtime_t to) {
-        waketime.store(to);
+    void updateWaketime(const ProcessClock::time_point tp) {
+        waketime = to_ns_since_epoch(tp).count();
     }
 
-    void updateWaketimeIfLessThan(hrtime_t to) {
-        atomic_setIfBigger(waketime, to);
+    void updateWaketimeIfLessThan(const ProcessClock::time_point tp) {
+        const auto tp_ns = to_ns_since_epoch(tp).count();
+        if (tp_ns > waketime) {
+            waketime = tp_ns;
+        }
     }
 
 private:
-    AtomicValue<hrtime_t> waketime;      // used for priority_queue
+    /**
+     * We are using a uint64_t as opposed to ProcessTime::time_point because
+     * was want the access to be atomic without the use of a mutex.
+     * The reason for this is that the CompareByDueDate function has been shown
+     * to be pretty hot and we want to avoid the overhead of acquiring
+     * two mutexes (one for ExTask 1 and one for ExTask 2) for every invocation
+     * of the function.
+     */
+    std::atomic<int64_t> waketime; // used for priority_queue
 };
 
 typedef SingleThreadedRCPtr<GlobalTask> ExTask;