MB-19224: Address possible data race with global task's waketime 13/62913/4
authorabhinavdangeti <abhinav@couchbase.com>
Mon, 5 Oct 2015 19:18:47 +0000 (12:18 -0700)
committerChiyoung Seo <chiyoung@couchbase.com>
Sat, 23 Apr 2016 00:53:28 +0000 (00:53 +0000)
WARINING: ThreadSanitizer: data race (pid=7728)

Write of size 8 at 0x7d140000e8a8 by main thread (mutexes: write M11519, write M11535):
0 TaskQueue::_wake(SingleThreadedRCPtr<GlobalTask>&) /home/abhinav/couchbase/ep-engine/src/taskqueue.cc:272 (ep.so+0x000000142b59)
1 TaskQueue::wake(SingleThreadedRCPtr<GlobalTask>&) /home/abhinav/couchbase/ep-engine/src/taskqueue.cc:299 (ep.so+0x00000014382e)
2 ExecutorPool::_stopTaskGroup(unsigned long, task_type_t, bool) /home/abhinav/couchbase/ep-engine/src/executorpool.cc:568 (ep.so+0x0000000f2f46)
3 ExecutorPool::stopTaskGroup(unsigned long, task_type_t, bool) /home/abhinav/couchbase/ep-engine/src/executorpool.cc:585 (ep.so+0x0000000f31ee)
4 ~EventuallyPersistentStore /home/abhinav/couchbase/ep-engine/src/ep.cc:468 (ep.so+0x0000000830f6)
5 ~EventuallyPersistentEngine /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:6326 (ep.so+0x0000000d42ba)
6 EvpDestroy(engine_interface*, bool) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:141 (ep.so+0x0000000b3fbc)
7 mock_destroy(engine_interface*, bool) /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:98 (engine_testapp+0x0000000ba027)
8 destroy_bucket(engine_interface*, engine_interface_v1*, bool) /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:995 (engine_testapp+0x0000000b952e)
9 __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287 (libc.so.6+0x000000021ec4)

Previous write of size 8 at 0x7d140000e8a8 by thread T10:
0 GlobalTask::snooze(double) /home/abhinav/couchbase/ep-engine/src/tasks.cc:56 (ep.so+0x00000013b6fa)
1 ConnManager::run() /home/abhinav/couchbase/ep-engine/src/connmap.cc:151 (ep.so+0x00000005032e)
2 ExecutorThread::run() /home/abhinav/couchbase/ep-engine/src/executorthread.cc:112 (ep.so+0x0000000f86da)
3 launch_executor_thread(void*) /home/abhinav/couchbase/ep-engine/src/executorthread.cc:33 (ep.so+0x0000000f82e5)
4 platform_thread_wrap /home/abhinav/couchbase/platform/src/cb_pthreads.c:23 (libplatform.so.0.1.0+0x000000003d31)

Change-Id: Ib11f9b3cd6919e292f84cc08260eabd8e1381aa6
Reviewed-on: http://review.couchbase.org/62913
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Will Gardner <will.gardner@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
src/executorthread.cc
src/taskqueue.cc
src/tasks.cc
src/tasks.h

index e4d6b3b..365105a 100644 (file)
@@ -88,7 +88,7 @@ void ExecutorThread::run() {
             // Measure scheduling overhead as difference between the time
             // that the task wanted to wake up and the current time
             now = gethrtime();
-            hrtime_t woketime = currentTask->waketime;
+            hrtime_t woketime = currentTask->getWaketime();
             engine->getEpStore()->logQTime(currentTask->getTypeId(),
                                            now > woketime ? now - woketime
                                                           : 0);
@@ -124,9 +124,8 @@ void ExecutorThread::run() {
                     hrtime_t new_waketime;
                     // if a task has not set snooze, update its waketime to now
                     // before rescheduling for more accurate timing histograms
-                    if (currentTask->waketime <= now) {
-                        currentTask->waketime = now;
-                    }
+                    currentTask->updateWaketimeIfLessThan(now);
+
                     // release capacity back to TaskQueue ..
                     manager->doneWork(curTaskType);
                     new_waketime = q->reschedule(currentTask, curTaskType);
@@ -139,7 +138,7 @@ void ExecutorThread::run() {
                             name.c_str(),
                             currentTask->getDescription().c_str(),
                             currentTask->getId(), new_waketime,
-                            currentTask->waketime,
+                            currentTask->getWaketime(),
                             waketime);
                 }
             } catch (std::exception& e) {
index c9cba7b..21bffda 100644 (file)
@@ -106,8 +106,8 @@ bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
     size_t numToWake = _moveReadyTasks(t.now);
 
     if (!futureQueue.empty() && t.startIndex == queueType &&
-        futureQueue.top()->waketime < t.waketime) {
-        t.waketime = futureQueue.top()->waketime; // record earliest waketime
+        futureQueue.top()->getWaketime() < t.waketime) {
+        t.waketime = futureQueue.top()->getWaketime(); // record earliest waketime
     }
 
     if (!readyQueue.empty() && readyQueue.top()->isdead()) {
@@ -156,7 +156,7 @@ size_t TaskQueue::_moveReadyTasks(hrtime_t tv) {
     size_t numReady = 0;
     while (!futureQueue.empty()) {
         ExTask tid = futureQueue.top();
-        if (tid->waketime <= tv) {
+        if (tid->getWaketime() <= tv) {
             futureQueue.pop();
             readyQueue.push(tid);
             numReady++;
@@ -188,7 +188,7 @@ hrtime_t TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
 
     futureQueue.push(task);
     if (curTaskType == queueType) {
-        wakeTime = futureQueue.top()->waketime;
+        wakeTime = futureQueue.top()->getWaketime();
     } else {
         wakeTime = hrtime_t(-1);
     }
@@ -255,12 +255,12 @@ void TaskQueue::_wake(ExTask &task) {
     }
 
     // Note that this task that we are waking may nor may not be blocked in Q
-    task->waketime = now;
+    task->updateWaketime(now);
     task->setState(TASK_RUNNING, TASK_SNOOZED);
 
     while (!notReady.empty()) {
         ExTask tid = notReady.front();
-        if (tid->waketime <= now || tid->isdead()) {
+        if (tid->getWaketime() <= now || tid->isdead()) {
             readyQueue.push(tid);
             numReady++;
         } else {
index 1da1f9c..b585b5a 100644 (file)
@@ -28,14 +28,16 @@ static const double WORKLOAD_MONITOR_FREQ(5.0);
 void GlobalTask::snooze(const double secs) {
     if (secs == INT_MAX) {
         setState(TASK_SNOOZED, TASK_RUNNING);
-        waketime = hrtime_t(-1);
+        updateWaketime(hrtime_t(-1));
         return;
     }
 
-    waketime = gethrtime();
+    hrtime_t curTime = gethrtime();
     if (secs) {
         setState(TASK_SNOOZED, TASK_RUNNING);
-        waketime += hrtime_t(secs * 1000000000);
+        waketime.store(curTime + hrtime_t(secs * 1000000000));
+    } else {
+        waketime.store(curTime);
     }
 }
 
index 42aa807..d74322c 100644 (file)
@@ -145,16 +145,29 @@ public:
     }
 
 protected:
-
     const Priority &priority;
     bool blockShutdown;
     AtomicValue<task_state_t> state;
     const size_t taskId;
-    hrtime_t waketime; // used for priority_queue, guarded by TaskQ mutex
     EventuallyPersistentEngine *engine;
 
     static AtomicValue<size_t> task_id_counter;
     static size_t nextTaskId() { return task_id_counter.fetch_add(1); }
+
+    hrtime_t getWaketime() {
+        return waketime.load();
+    }
+
+    void updateWaketime(hrtime_t to) {
+        waketime.store(to);
+    }
+
+    void updateWaketimeIfLessThan(hrtime_t to) {
+        atomic_setIfBigger(waketime, to);
+    }
+
+private:
+    AtomicValue<hrtime_t> waketime;      // used for priority_queue
 };
 
 typedef SingleThreadedRCPtr<GlobalTask> ExTask;