MB-19223: Switch to hrtime from timeval in Global Thread Pool [Gerrit change ref 12/62912/4]
author: Sundar Sridharan <sundar.sridharan@gmail.com>
Thu, 23 Jul 2015 22:39:59 +0000 (15:39 -0700)
committer: Chiyoung Seo <chiyoung@couchbase.com>
Sat, 23 Apr 2016 00:51:25 +0000 (00:51 +0000)
Switching from struct timeval to hrtime_t yields small improvements in memory and CPU usage.
It also fixes several ThreadSanitizer data races reported by the unit tests - for example:

WARNING: ThreadSanitizer: data race (pid=21672)
  Write of size 8 at 0x7d140000e7b8 by main thread (mutexes: write M14972, write M14985):
    #0 memcpy <null> (engine_testapp+0x000000453040)
    #1 TaskQueue::_wake(SingleThreadedRCPtr<GlobalTask>&) /home/daver/repos/couchbase/server/ep-engine/src/taskqueue.cc:255 (ep.so+0x0000002577f6)
    #2 TaskQueue::wake(SingleThreadedRCPtr<GlobalTask>&) /home/daver/repos/couchbase/server/ep-engine/src/taskqueue.cc:282 (ep.so+0x000000257c73)
    #3 ExecutorPool::_wake(unsigned long) /home/daver/repos/couchbase/server/ep-engine/src/executorpool.cc:320 (ep.so+0x0000001acc76)
    #4 ExecutorPool::wake(unsigned long) /home/daver/repos/couchbase/server/ep-engine/src/executorpool.cc:328 (ep.so+0x0000001ace13)
    #5 Flusher::wait() /home/daver/repos/couchbase/server/ep-engine/src/flusher.cc:41 (ep.so+0x0000001cc4ff)
    #6 EventuallyPersistentStore::stopFlusher() /home/daver/repos/couchbase/server/ep-engine/src/ep.cc:402 (ep.so+0x0000000d54d5)
    #7 ~EventuallyPersistentStore /home/daver/repos/couchbase/server/ep-engine/src/ep.cc:364 (ep.so+0x0000000d49cb)
    #8 ~EventuallyPersistentEngine /home/daver/repos/couchbase/server/ep-engine/src/ep_engine.cc:5778 (ep.so+0x000000161043)
    #9 EvpDestroy(engine_interface*, bool) /home/daver/repos/couchbase/server/ep-engine/src/ep_engine.cc:143 (ep.so+0x000000135efa)
    #10 mock_destroy /home/daver/repos/couchbase/server/memcached/programs/engine_testapp/engine_testapp.c:61 (engine_testapp+0x0000004bb9d6)
    #11 destroy_engine /home/daver/repos/couchbase/server/memcached/programs/engine_testapp/engine_testapp.c:998 (engine_testapp+0x0000004bb646)
    #12 execute_test /home/daver/repos/couchbase/server/memcached/programs/engine_testapp/engine_testapp.c:1048 (engine_testapp+0x0000004baa11)
    #13 main /home/daver/repos/couchbase/server/memcached/programs/engine_testapp/engine_testapp.c:1296 (engine_testapp+0x0000004b8861)

  Previous read of size 8 at 0x7d140000e7b8 by thread T14:
    #0 ExecutorThread::run() /home/daver/repos/couchbase/server/ep-engine/src/executorthread.cc:106 (ep.so+0x0000001e3488)
    #1 launch_executor_thread(void*) /home/daver/repos/couchbase/server/ep-engine/src/executorthread.cc:34 (ep.so+0x0000001e2a5a)
    #2 platform_thread_wrap /home/daver/repos/couchbase/server/platform/src/cb_pthreads.c:19 (libplatform.so.0.1.0+0x0000000035dc)

Change-Id: I78fdddb832251fc062058c04f75f8d22c4c2f68d
Reviewed-on: http://review.couchbase.org/62912
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Will Gardner <will.gardner@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
src/access_scanner.cc
src/access_scanner.h
src/executorpool.cc
src/executorthread.cc
src/executorthread.h
src/syncobject.h
src/taskqueue.cc
src/taskqueue.h
src/tasks.cc
src/tasks.h

index c62b924..8daf420 100644 (file)
@@ -157,10 +157,18 @@ bool AccessScanner::run() {
         }
     }
     snooze(sleepTime);
-    stats.alogTime.store(waketime.tv_sec);
+    updateAlogTime(sleepTime);
+
     return true;
 }
 
+void AccessScanner::updateAlogTime(double sleepSecs) {
+    struct timeval _waketime;
+    gettimeofday(&_waketime, NULL);
+    _waketime.tv_sec += sleepSecs;
+    stats.alogTime.store(_waketime.tv_sec);
+}
+
 std::string AccessScanner::getDescription() {
     return std::string("Generating access log");
 }
index fb07b89..4662b87 100644 (file)
@@ -47,6 +47,8 @@ public:
     AtomicValue<size_t> completedCount;
 
 private:
+    void updateAlogTime(double sleepSecs);
+
     EventuallyPersistentStore &store;
     EPStats &stats;
     double sleepTime;
index 42d9a91..571227c 100644 (file)
@@ -528,10 +528,7 @@ bool ExecutorPool::_stopTaskGroup(EventuallyPersistentEngine *e,
             }
         }
         if (unfinishedTask) {
-            struct timeval waktime;
-            gettimeofday(&waktime, NULL);
-            advance_tv(waktime, MIN_SLEEP_TIME);
-            tMutex.wait(waktime); // Wait till task gets cancelled
+            tMutex.wait(MIN_SLEEP_TIME); // Wait till task gets cancelled
         }
     } while (unfinishedTask);
 
@@ -692,13 +689,9 @@ static void addWorkerStats(const char *prefix, ExecutorThread *t,
                 (gethrtime() - t->getTaskStart()) / 1000, add_stat, cookie);
     }
     snprintf(statname, sizeof(statname), "%s:waketime", prefix);
-    uint64_t abstime = t->getWaketime().tv_sec*1000000 +
-                       t->getWaketime().tv_usec;
-    add_casted_stat(statname, abstime, add_stat, cookie);
+    add_casted_stat(statname, t->getWaketime(), add_stat, cookie);
     snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
-    abstime = t->getCurTime().tv_sec*1000000 +
-              t->getCurTime().tv_usec;
-    add_casted_stat(statname, abstime, add_stat, cookie);
+    add_casted_stat(statname, t->getCurTime(), add_stat, cookie);
 }
 
 void ExecutorPool::doWorkerStat(EventuallyPersistentEngine *engine,
index 2d20c6b..e4d6b3b 100644 (file)
@@ -87,23 +87,19 @@ void ExecutorThread::run() {
 
             // Measure scheduling overhead as difference between the time
             // that the task wanted to wake up and the current time
-            gettimeofday(&now, NULL);
-            struct timeval woketime = currentTask->waketime;
-            uint64_t diffsec = now.tv_sec > woketime.tv_sec ?
-                               now.tv_sec - woketime.tv_sec : 0;
-            uint64_t diffusec = now.tv_usec > woketime.tv_usec ?
-                                now.tv_usec - woketime.tv_usec : 0;
-
+            now = gethrtime();
+            hrtime_t woketime = currentTask->waketime;
             engine->getEpStore()->logQTime(currentTask->getTypeId(),
-                                   diffsec*1000000 + diffusec);
+                                           now > woketime ? now - woketime
+                                                          : 0);
 
-            taskStart = gethrtime();
+            taskStart = now;
             rel_time_t startReltime = ep_current_time();
             try {
                 LOG(EXTENSION_LOG_DEBUG,
-                    "%s: Run task \"%s\" id %d waketime %d",
+                    "%s: Run task \"%s\" id %d",
                 getName().c_str(), currentTask->getDescription().c_str(),
-                currentTask->getId(), currentTask->waketime.tv_sec);
+                currentTask->getId());
 
                 // Now Run the Task ....
                 currentTask->setState(TASK_RUNNING, TASK_SNOOZED);
@@ -125,26 +121,26 @@ void ExecutorThread::run() {
                     manager->doneWork(curTaskType);
                     manager->cancel(currentTask->taskId, true);
                 } else {
-                    struct timeval timetowake;
+                    hrtime_t new_waketime;
                     // if a task has not set snooze, update its waketime to now
                     // before rescheduling for more accurate timing histograms
-                    if (less_eq_tv(currentTask->waketime, now)) {
+                    if (currentTask->waketime <= now) {
                         currentTask->waketime = now;
                     }
                     // release capacity back to TaskQueue ..
                     manager->doneWork(curTaskType);
-                    timetowake = q->reschedule(currentTask, curTaskType);
+                    new_waketime = q->reschedule(currentTask, curTaskType);
                     // record min waketime ...
-                    if (less_tv(timetowake, waketime)) {
-                        waketime = timetowake;
+                    if (new_waketime < waketime) {
+                        waketime = new_waketime;
                     }
-                    LOG(EXTENSION_LOG_DEBUG,
-                            "%s: Reschedule a task \"%s\" id %d[%d %d |%d]",
+                    LOG(EXTENSION_LOG_DEBUG, "%s: Reschedule a task"
+                            " \"%s\" id %d[%llu %llu |%llu]",
                             name.c_str(),
                             currentTask->getDescription().c_str(),
-                            currentTask->getId(), timetowake.tv_sec,
-                            currentTask->waketime.tv_sec,
-                            waketime.tv_sec);
+                            currentTask->getId(), new_waketime,
+                            currentTask->waketime,
+                            waketime);
                 }
             } catch (std::exception& e) {
                 LOG(EXTENSION_LOG_WARNING,
index cd45007..b2f2725 100644 (file)
@@ -64,7 +64,7 @@ public:
           state(EXECUTOR_CREATING), taskStart(0),
           currentTask(NULL), curTaskType(NO_TASK_TYPE),
           tasklog(TASK_LOG_SIZE), slowjobs(TASK_LOG_SIZE) {
-              set_max_tv(waketime);
+              waketime = hrtime_t(-1);
     }
 
     ~ExecutorThread() {
@@ -113,9 +113,9 @@ public:
         return slowjobs.contents();
     }
 
-    struct timeval getWaketime(void) { return waketime; }
+    const hrtime_t getWaketime(void) { return waketime; }
 
-    struct timeval getCurTime(void) { return now; }
+    const hrtime_t getCurTime(void) { return now; }
 
 private:
 
@@ -125,8 +125,8 @@ private:
     const std::string name;
     AtomicValue<executor_state_t> state;
 
-    struct  timeval    now;  // record of current time
-    struct timeval waketime; // set to the earliest
+    hrtime_t     now;  // record of current time
+    hrtime_t waketime; // set to the earliest
 
     hrtime_t taskStart;
     ExTask currentTask;
index 7813125..6824e7c 100644 (file)
@@ -40,43 +40,13 @@ public:
         setHolder(true);
     }
 
-    void wait(const struct timeval &tv) {
-        // Todo:
-        //   This logic is a bit weird, because normally we want to
-        //   sleep for a certain amount of time, but since we built
-        //   the stuff on pthreads and the API looked like it did we
-        //   used the absolute timers making us sleep to a certain
-        //   point in the future.. now we need to roll back that work
-        //   and do it again in the library API...
-        //   I believe we should rather try to modify our own code
-        //   to only do relative waits, and then have the native
-        //   calls do the either absolute or relative checks.
-        //
-        //   There is no point of having an explicit return
-        //   value if it was a timeout or something else, because
-        //   you would have to evaluate the reason you waited anyway
-        //   (because one could have spurious wakeups etc)
-        struct timeval now;
-        gettimeofday(&now, NULL);
-
-        if (tv.tv_sec < now.tv_sec) {
-            return ;
-        }
-
-        uint64_t a = (now.tv_sec * 1000) + (now.tv_usec / 1000);
-        uint64_t b = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
-
-        if (b < a) {
-            // Already expired
-            return ;
-        }
-
-        cb_cond_timedwait(&cond, &mutex, (int)(b - a));
+    void wait(const double secs) {
+        cb_cond_timedwait(&cond, &mutex, (unsigned int)(secs * 1000.0));
         setHolder(true);
     }
 
-    void wait(const double secs) {
-        cb_cond_timedwait(&cond, &mutex, (unsigned int)(secs * 1000.0));
+    void wait(const hrtime_t nanoSecs) {
+        cb_cond_timedwait(&cond, &mutex, (unsigned int)(nanoSecs/1000000));
         setHolder(true);
     }
 
index 03bf90a..c9cba7b 100644 (file)
@@ -60,8 +60,8 @@ void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
 }
 
 bool TaskQueue::_doSleep(ExecutorThread &t) {
-    gettimeofday(&t.now, NULL);
-    if (less_tv(t.now, t.waketime) && manager->trySleep(queueType)) {
+    t.now = gethrtime();
+    if (t.now < t.waketime && manager->trySleep(queueType)) {
         // Atomically switch from running to sleeping; iff we were previously
         // running.
         executor_state_t expected_state = EXECUTOR_RUNNING;
@@ -71,12 +71,12 @@ bool TaskQueue::_doSleep(ExecutorThread &t) {
         }
         sleepers++;
         // zzz....
-        struct timeval waketime = t.now;
-        advance_tv(waketime, MIN_SLEEP_TIME); // avoid sleeping more than this
-        if (less_tv(waketime, t.waketime)) { // to prevent losing posts
-            mutex.wait(waketime);
+        hrtime_t snooze_nsecs = t.waketime - t.now;
+
+        if (snooze_nsecs > MIN_SLEEP_TIME * 1000000000) {
+            mutex.wait(MIN_SLEEP_TIME);
         } else {
-            mutex.wait(t.waketime);
+            mutex.wait(snooze_nsecs);
         }
         // ... woke!
         sleepers--;
@@ -89,9 +89,9 @@ bool TaskQueue::_doSleep(ExecutorThread &t) {
                                              EXECUTOR_RUNNING)) {
             return false;
         }
-        gettimeofday(&t.now, NULL);
+        t.now = gethrtime();
     }
-    set_max_tv(t.waketime);
+    t.waketime = hrtime_t(-1);
     return true;
 }
 
@@ -106,7 +106,7 @@ bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
     size_t numToWake = _moveReadyTasks(t.now);
 
     if (!futureQueue.empty() && t.startIndex == queueType &&
-        less_tv(futureQueue.top()->waketime, t.waketime)) {
+        futureQueue.top()->waketime < t.waketime) {
         t.waketime = futureQueue.top()->waketime; // record earliest waketime
     }
 
@@ -148,7 +148,7 @@ bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
     return rv;
 }
 
-size_t TaskQueue::_moveReadyTasks(struct timeval tv) {
+size_t TaskQueue::_moveReadyTasks(hrtime_t tv) {
     if (!readyQueue.empty()) {
         return 0;
     }
@@ -156,7 +156,7 @@ size_t TaskQueue::_moveReadyTasks(struct timeval tv) {
     size_t numReady = 0;
     while (!futureQueue.empty()) {
         ExTask tid = futureQueue.top();
-        if (less_eq_tv(tid->waketime, tv)) {
+        if (tid->waketime <= tv) {
             futureQueue.pop();
             readyQueue.push(tid);
             numReady++;
@@ -180,25 +180,25 @@ void TaskQueue::_checkPendingQueue(void) {
     }
 }
 
-struct timeval TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
-    struct timeval waktime;
+hrtime_t TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
+    hrtime_t wakeTime;
     manager->doneWork(curTaskType);
 
     LockHolder lh(mutex);
 
     futureQueue.push(task);
     if (curTaskType == queueType) {
-        waktime = futureQueue.top()->waketime;
+        wakeTime = futureQueue.top()->waketime;
     } else {
-        set_max_tv(waktime);
+        wakeTime = hrtime_t(-1);
     }
 
-    return waktime;
+    return wakeTime;
 }
 
-struct timeval TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
+hrtime_t TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
-    struct timeval rv = _reschedule(task, curTaskType);
+    hrtime_t rv = _reschedule(task, curTaskType);
     ObjectRegistry::onSwitchThread(epe);
     return rv;
 }
@@ -227,9 +227,8 @@ void TaskQueue::schedule(ExTask &task) {
 }
 
 void TaskQueue::_wake(ExTask &task) {
-    struct  timeval    now;
     size_t numReady = 0;
-    gettimeofday(&now, NULL);
+    const hrtime_t now = gethrtime();
 
     LockHolder lh(mutex);
     LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %d", name.c_str(),
@@ -261,7 +260,7 @@ void TaskQueue::_wake(ExTask &task) {
 
     while (!notReady.empty()) {
         ExTask tid = notReady.front();
-        if (less_eq_tv(tid->waketime, now) || tid->isdead()) {
+        if (tid->waketime <= now || tid->isdead()) {
             readyQueue.push(tid);
             numReady++;
         } else {
index 1a7d577..d50a9c0 100644 (file)
@@ -35,7 +35,7 @@ public:
 
     void schedule(ExTask &task);
 
-    struct timeval reschedule(ExTask &task, task_type_t &curTaskType);
+    hrtime_t reschedule(ExTask &task, task_type_t &curTaskType);
 
     void checkPendingQueue(void);
 
@@ -53,13 +53,13 @@ public:
 
 private:
     void _schedule(ExTask &task);
-    struct timeval _reschedule(ExTask &task, task_type_t &curTaskType);
+    hrtime_t _reschedule(ExTask &task, task_type_t &curTaskType);
     void _checkPendingQueue(void);
     bool _fetchNextTask(ExecutorThread &thread, bool toSleep);
     void _wake(ExTask &task);
     bool _doSleep(ExecutorThread &thread);
     void _doWake_UNLOCKED(size_t &numToWake);
-    size_t _moveReadyTasks(struct timeval tv);
+    size_t _moveReadyTasks(hrtime_t tv);
     ExTask _popReadyTask(void);
 
     SyncObject mutex;
index 929c193..1da1f9c 100644 (file)
@@ -28,15 +28,14 @@ static const double WORKLOAD_MONITOR_FREQ(5.0);
 void GlobalTask::snooze(const double secs) {
     if (secs == INT_MAX) {
         setState(TASK_SNOOZED, TASK_RUNNING);
-        set_max_tv(waketime);
+        waketime = hrtime_t(-1);
         return;
     }
 
-    gettimeofday(&waketime, NULL);
-
+    waketime = gethrtime();
     if (secs) {
         setState(TASK_SNOOZED, TASK_RUNNING);
-        advance_tv(waketime, secs);
+        waketime += hrtime_t(secs * 1000000000);
     }
 }
 
index 29eef4a..42aa807 100644 (file)
@@ -150,7 +150,7 @@ protected:
     bool blockShutdown;
     AtomicValue<task_state_t> state;
     const size_t taskId;
-    struct timeval waketime;
+    hrtime_t waketime; // used for priority_queue, guarded by TaskQ mutex
     EventuallyPersistentEngine *engine;
 
     static AtomicValue<size_t> task_id_counter;
@@ -452,7 +452,7 @@ public:
 class CompareByDueDate {
 public:
     bool operator()(ExTask &t1, ExTask &t2) {
-        return less_tv(t2->waketime, t1->waketime);
+        return t2->waketime < t1->waketime;
     }
 };