MB-19248: Fix race in TaskQueue.{ready,future,pending}Queue access 69/62969/6
authorDave Rigby <daver@couchbase.com>
Wed, 12 Nov 2014 17:26:17 +0000 (17:26 +0000)
committerChiyoung Seo <chiyoung@couchbase.com>
Sat, 23 Apr 2016 01:03:50 +0000 (01:03 +0000)
Fix race as identified by ThreadSanitizer:

    WARNING: ThreadSanitizer: data race (pid=4243)
      Read of size 8 at 0x7d04000fde60 by main thread (mutexes: write M1367):
#0 std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> >::operator++() /usr/include/c++/4.8/bits/stl_list.h:235 (ep.so+0x00000013a129)
#1 std::iterator_traits<std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> > >::difference_type std::__distance<std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> > >(std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> >, std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> >, std::input_iterator_tag) /usr/include/c++/4.8/bits/stl_iterator_base_funcs.h:82 (ep.so+0x000000138b67)
#2 std::iterator_traits<std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> > >::difference_type std::distance<std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> > >(std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> >, std::_List_const_iterator<SingleThreadedRCPtr<GlobalTask> >) /usr/include/c++/4.8/bits/stl_iterator_base_funcs.h:118 (ep.so+0x000000136b63)
#3 std::list<SingleThreadedRCPtr<GlobalTask>, std::allocator<SingleThreadedRCPtr<GlobalTask> > >::size() const /usr/include/c++/4.8/bits/stl_list.h:874 (ep.so+0x000000135538)
#4 ExecutorPool::doTaskQStat(EventuallyPersistentEngine*, void const*, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) ep-engine/src/executorpool.cc:654 (ep.so+0x0000001331b6)
#5 EventuallyPersistentEngine::doWorkloadStats(void const*, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) ep-engine/src/ep_engine.cc:4198 (ep.so+0x000000112ed3)
#6 EventuallyPersistentEngine::getStats(void const*, char const*, int, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) ep-engine/src/ep_engine.cc:4454 (ep.so+0x000000114cb4)
#7 EvpGetStats ep-engine/src/ep_engine.cc:217 (ep.so+0x000000102b14)
#8 mock_get_stats memcached/programs/engine_testapp/engine_testapp.c:195 (exe+0x0000000026de)
#9 test_workload_stats ep-engine/tests/ep_testsuite.cc:7094 (ep_testsuite.so+0x00000004e931)
#10 execute_test memcached/programs/engine_testapp/engine_testapp.c:1055 (exe+0x0000000059fc)
#11 main memcached/programs/engine_testapp/engine_testapp.c:1313 (exe+0x000000006606)

      Previous write of size 8 at 0x7d04000fde60 by thread T5 (mutexes: write M45):
#0 RCValue::_rc_decref() const ep-engine/src/atomic.h:293 (ep.so+0x000000096797)
#1 __gnu_cxx::new_allocator<std::_List_node<SingleThreadedRCPtr<GlobalTask> > >::allocate(unsigned long, void const*) /usr/include/c++/4.8/ext/new_allocator.h:104 (ep.so+0x00000017b42b)
#2 std::_List_base<SingleThreadedRCPtr<GlobalTask>, std::allocator<SingleThreadedRCPtr<GlobalTask> > >::_M_get_node() /usr/include/c++/4.8/bits/stl_list.h:334 (ep.so+0x00000017b0dc)
#3 std::_List_node<SingleThreadedRCPtr<GlobalTask> >* std::list<SingleThreadedRCPtr<GlobalTask>, std::allocator<SingleThreadedRCPtr<GlobalTask> > >::_M_create_node<SingleThreadedRCPtr<GlobalTask> const&>(SingleThreadedRCPtr<GlobalTask> const&) /usr/include/c++/4.8/bits/stl_list.h:502 (ep.so+0x00000017a25a)
#4 void std::list<SingleThreadedRCPtr<GlobalTask>, std::allocator<SingleThreadedRCPtr<GlobalTask> > >::_M_insert<SingleThreadedRCPtr<GlobalTask> const&>(std::_List_iterator<SingleThreadedRCPtr<GlobalTask> >, SingleThreadedRCPtr<GlobalTask> const&) /usr/include/c++/4.8/bits/stl_list.h:1561 (ep.so+0x000000178e7b)
#5 std::list<SingleThreadedRCPtr<GlobalTask>, std::allocator<SingleThreadedRCPtr<GlobalTask> > >::push_back(SingleThreadedRCPtr<GlobalTask> const&) /usr/include/c++/4.8/bits/stl_list.h:1016 (ep.so+0x00000017817b)
#6 TaskQueue::_fetchNextTask(ExecutorThread&, bool) ep-engine/src/taskqueue.cc:126 (ep.so+0x000000176932)
#7 TaskQueue::fetchNextTask(ExecutorThread&, bool) ep-engine/src/taskqueue.cc:142 (ep.so+0x000000176acf)
#8 ExecutorPool::_nextTask(ExecutorThread&, unsigned char) ep-engine/src/executorpool.cc:214 (ep.so+0x000000130707)
#9 ExecutorPool::nextTask(ExecutorThread&, unsigned char) ep-engine/src/executorpool.cc:229 (ep.so+0x0000001307a5)
#10 ExecutorThread::run() ep-engine/src/executorthread.cc:78 (ep.so+0x000000149da0)
#11 launch_executor_thread ep-engine/src/executorthread.cc:34 (ep.so+0x00000014990a)
#12 platform_thread_wrap platform/src/cb_pthreads.c:19 (libplatform.so.0.1.0+0x000000002d8b)
#13 __tsan_write_range ??:0 (libtsan.so.0+0x00000001b1c9)

Fix by adding new helper methods which will return the size of the
various queues (while holding the TaskQueue's mutex).

Change-Id: If5e8d357e45803d78c4ba6ed1475e6e1a90e1c89
Reviewed-on: http://review.couchbase.org/62969
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Will Gardner <will.gardner@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/executorpool.cc
src/taskqueue.cc
src/taskqueue.h

index 571227c..cfc0dcd 100644 (file)
@@ -615,17 +615,17 @@ void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
         for (size_t i = 0; i < numTaskSets; i++) {
             snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                      hpTaskQ[i]->getName().c_str());
-            add_casted_stat(statname, hpTaskQ[i]->futureQueue.size(), add_stat,
+            add_casted_stat(statname, hpTaskQ[i]->getFutureQueueSize(), add_stat,
                             cookie);
             snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                      hpTaskQ[i]->getName().c_str());
-            add_casted_stat(statname, hpTaskQ[i]->readyQueue.size(), add_stat,
+            add_casted_stat(statname, hpTaskQ[i]->getReadyQueueSize(), add_stat,
                             cookie);
-            if (!hpTaskQ[i]->pendingQueue.empty()) {
+            size_t pendingQsize = hpTaskQ[i]->getPendingQueueSize();
+            if (pendingQsize > 0) {
                 snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                         hpTaskQ[i]->getName().c_str());
-                add_casted_stat(statname, hpTaskQ[i]->pendingQueue.size(),
-                                add_stat, cookie);
+                add_casted_stat(statname, pendingQsize, add_stat, cookie);
             }
         }
     }
@@ -633,17 +633,17 @@ void ExecutorPool::doTaskQStat(EventuallyPersistentEngine *engine,
         for (size_t i = 0; i < numTaskSets; i++) {
             snprintf(statname, sizeof(statname), "ep_workload:%s:InQsize",
                      lpTaskQ[i]->getName().c_str());
-            add_casted_stat(statname, lpTaskQ[i]->futureQueue.size(), add_stat,
+            add_casted_stat(statname, lpTaskQ[i]->getFutureQueueSize(), add_stat,
                             cookie);
             snprintf(statname, sizeof(statname), "ep_workload:%s:OutQsize",
                      lpTaskQ[i]->getName().c_str());
-            add_casted_stat(statname, lpTaskQ[i]->readyQueue.size(), add_stat,
+            add_casted_stat(statname, lpTaskQ[i]->getReadyQueueSize(), add_stat,
                             cookie);
-            if (!lpTaskQ[i]->pendingQueue.empty()) {
+            size_t pendingQsize = lpTaskQ[i]->getPendingQueueSize();
+            if (pendingQsize > 0) {
                 snprintf(statname, sizeof(statname), "ep_workload:%s:PendingQ",
                         lpTaskQ[i]->getName().c_str());
-                add_casted_stat(statname, lpTaskQ[i]->pendingQueue.size(),
-                                add_stat, cookie);
+                add_casted_stat(statname, pendingQsize, add_stat, cookie);
             }
         }
     }
index 21bffda..94c93fe 100644 (file)
@@ -34,6 +34,21 @@ const std::string TaskQueue::getName() const {
     return (name+taskType2Str(queueType));
 }
 
+size_t TaskQueue::getReadyQueueSize() {
+    LockHolder lh(mutex);
+    return readyQueue.size();
+}
+
+size_t TaskQueue::getFutureQueueSize() {
+    LockHolder lh(mutex);
+    return futureQueue.size();
+}
+
+size_t TaskQueue::getPendingQueueSize() {
+    LockHolder lh(mutex);
+    return pendingQueue.size();
+}
+
 ExTask TaskQueue::_popReadyTask(void) {
     ExTask t = readyQueue.top();
     readyQueue.pop();
index d50a9c0..8e685b7 100644 (file)
@@ -51,6 +51,12 @@ public:
 
     const task_type_t getQueueType() const { return queueType; }
 
+    size_t getReadyQueueSize();
+
+    size_t getFutureQueueSize();
+
+    size_t getPendingQueueSize();
+
 private:
     void _schedule(ExTask &task);
     hrtime_t _reschedule(ExTask &task, task_type_t &curTaskType);