MB-18453: Give all tasks their own stats and priority 53/65253/10
authorJim Walker <jim@couchbase.com>
Sun, 26 Jun 2016 19:52:10 +0000 (20:52 +0100)
committerDave Rigby <daver@couchbase.com>
Mon, 4 Jul 2016 09:03:47 +0000 (09:03 +0000)
MB-18453 identified that tasks have copied & pasted
constructors which leads to some tasks all having the
same Priority object.

The fallout of this is that many tasks now all contribute
to the same histogram for runtime and scheduling waittime.
When debugging issues which lead to MB-18453 it is near
impossible at times to know which real task was delayed
as the stats can be attributed to many tasks.

This commit introduces makes all tasks have their own ID
and thus their own histograms and also makes it easier
for new tasks to be created without forgetting to create
a new Priority instance.

tasks.defs.h is a new file that captures every sub-class
of GlobalTask and shows the priority of all tasks.

TASK macros are now used to generate various switch
statements and enums used in task accounting.

The new system is not strict, MyTask could still be
assigned the priority/id of OldTask, however this
flexibility can be useful in some circumstances.

Note this patch has changed ep_testsuite test_item_pager
to increase the max_size value in the test config. This
is because this patch increases the baseline heap usage
of a bucket as we've increased the number of Histogram
object allocated by EventuallyPersistentStore.

Prior to this patch 28 were allocated, with this patch
51 are allocated (1 per task). Each Histogram<hrtime_t
is approx 1568 bytes (on OSX clang build).

The new max_size is 2.5MiB

Change-Id: I209c67945b964023615af37a12f83ca97142ce53
Reviewed-on: http://review.couchbase.org/65253
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
34 files changed:
CMakeLists.txt
README.md [new file with mode: 0644]
src/access_scanner.h
src/backfill.cc
src/backfill.h
src/bgfetcher.cc
src/checkpoint_remover.cc
src/checkpoint_remover.h
src/connmap.cc
src/dcp-backfill-manager.cc
src/dcp-consumer.cc
src/dcp-consumer.h
src/dcp-stream.h
src/defragmenter.cc
src/ep.cc
src/ep.h
src/ep_engine.cc
src/executorthread.cc
src/flusher.cc
src/htresizer.cc
src/htresizer.h
src/item_pager.cc
src/item_pager.h
src/priority.cc [deleted file]
src/priority.h [deleted file]
src/tapconnection.cc
src/tapconnection.h
src/tasks.cc
src/tasks.def.h [new file with mode: 0644]
src/tasks.h
src/warmup.cc
src/warmup.h
tests/ep_testsuite.cc
tests/module_tests/priority_test.cc [deleted file]

index 3bdf830..0836d63 100644 (file)
@@ -131,7 +131,7 @@ ADD_LIBRARY(ep SHARED
             src/failover-table.cc src/flusher.cc src/htresizer.cc
             src/item.cc src/item_pager.cc src/kvshard.cc
             src/memory_tracker.cc src/murmurhash3.cc
-            src/mutex.cc src/priority.cc
+            src/mutex.cc
             src/executorthread.cc
             src/sizes.cc
             ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
@@ -194,7 +194,6 @@ ADD_EXECUTABLE(ep-engine_stream_test
   src/mutation_log.cc
   src/mutex.cc
   src/objectregistry.cc
-  src/priority.cc
   src/tapconnection.cc
   src/stored-value.cc
   src/tapthrottle.cc
@@ -258,9 +257,6 @@ ADD_EXECUTABLE(ep-engine_mutex_test
   tests/module_tests/mutex_test.cc src/testlogger.cc src/mutex.cc)
 TARGET_LINK_LIBRARIES(ep-engine_mutex_test platform)
 
-ADD_EXECUTABLE(ep-engine_priority_test  tests/module_tests/priority_test.cc src/priority.cc)
-TARGET_LINK_LIBRARIES(ep-engine_priority_test platform)
-
 ADD_EXECUTABLE(ep-engine_ringbuffer_test tests/module_tests/ringbuffer_test.cc)
 TARGET_LINK_LIBRARIES(ep-engine_ringbuffer_test platform)
 
@@ -306,7 +302,6 @@ ADD_EXECUTABLE(ep-engine_kvstore_test
   src/mutation_log.cc
   src/mutex.cc
   src/objectregistry.cc
-  src/priority.cc
   src/tapconnection.cc
   src/stored-value.cc
   src/tapthrottle.cc
@@ -334,7 +329,6 @@ ADD_TEST(ep-engine_histo_test ep-engine_histo_test)
 ADD_TEST(ep-engine_hrtime_test ep-engine_hrtime_test)
 ADD_TEST(ep-engine_misc_test ep-engine_misc_test)
 ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
-ADD_TEST(ep-engine_priority_test ep-engine_priority_test)
 ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
 ADD_TEST(ep-engine_stream_test ep-engine_stream_test)
 ADD_TEST(ep-engine_kvstore_test ep-engine_kvstore_test)
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..0b83470
--- /dev/null
+++ b/README.md
@@ -0,0 +1,270 @@
+# Eventually Persistent Engine
+## Threads
+Code in ep-engine is executing in a multithreaded environment, two classes of
+thread exist.
+
+1. memcached's threads, for servicing a client and calling in via the
+[engine API] (https://github.com/couchbase/memcached/blob/master/include/memcached/engine.h)
+2. ep-engine's threads, for running tasks such as the document expiry pager
+(see subclasses of `GlobalTasks`).
+
+## Synchronisation Primitives
+
+There are three mutual-exclusion primitives available in ep-engine.
+
+1. `Mutex` exclusive lock - [mutex.h](./src/mutex.h)
+2. `RWLock` shared, reader/writer lock - [rwlock.h](./src/rwlock.h)
+3. `SpinLock` 1-byte exclusive lock - [atomix.h](./src/atomic.h)
+
+A conditional-variable is also available called `SyncObject`
+[syncobject.h](./src/syncobject.h). `SyncObject` glues a `Mutex` and
+conditional-variable together in one object.
+
+These primitives are managed via RAII wrappers - [locks.h](./src/locks.h).
+
+1. `LockHolder` - for acquiring a `Mutex` or `SyncObject`.
+2. `MultiLockHolder` - for acquiring an array of `Mutex` or `SyncObject`.
+3. `WriterLockHolder` - for acquiring write access to a `RWLock`.
+4. `ReaderLockHolder` - for acquiring read access to a `RWLock`.
+5. `SpinLockHolder` - for acquiring a `SpinLock`.
+
+## Mutex
+The general style is to create a `LockHolder` when you need to acquire a
+`Mutex`, the constructor will acquire and when the `LockHolder` goes out of
+scope, the destructor will release the `Mutex`. For certain use-cases the
+caller can explicitly lock/unlock a `Mutex` via the `LockHolder` class.
+
+```c++
+Mutex mutex;
+void example1() {
+    LockHolder lockHolder(&mutex);
+    ...
+    return;
+}
+
+void example2() {
+    LockHolder lockHolder(&mutex);
+    ...
+    lockHolder.unlock();
+    ...
+    lockHolder.lock();
+    ...
+    return;
+}
+```
+
+A `MultiLockHolder` allows an array of locks to be conveniently acquired and
+released, and similarly to `LockHolder` the caller can choose to manually
+lock/unlock at any time (with all locks locked/unlocked via one call).
+
+```c++
+Mutex mutexes[10];
+Object objects[10];
+void foo() {
+    MultiLockHolder lockHolder(&mutexes, 10);
+    for (int ii = 0; ii < 10; ii++) {
+        objects[ii].doStuff();
+    }
+    return;
+}
+```
+
+## RWLock
+
+`RWLock` allows many readers to acquire it and exclusive access for a writer.
+`ReadLockHolder` acquires the lock for a reader and `WriteLockHolder` acquires
+the lock for a writer. Neither classes enable manual lock/unlock, all
+acquisitions and release are performed via the constructor and destructor.
+
+```c++
+RWLock rwLock;
+Object thing;
+
+void foo1() {
+    ReaderLockHolder rlh(&rwLock);
+    if (thing.getData()) {
+    ...
+    }
+}
+
+void foo2() {
+    WriterLockHolder wlh(&rwLock);
+    thing.setData(...);
+}
+```
+
+## SyncObject
+
+`SyncObject` inherits from `Mutex` and is thus managed via a `LockHolder` or
+`MultiLockHolder`. The `SyncObject` provides the conditional-variable
+synchronisation primitive enabling threads to block and be woken.
+
+The wait/wakeOne/wake method is provided by the `SyncObject`.
+
+Note that `wake` will wake up a single blocking thread, `wakeOne` will wake up
+every thread that is blocking on the `SyncObject`.
+
+```c++
+SyncObject syncObject;
+bool sleeping = false;
+void foo1() {
+    LockHolder lockHolder(&syncObject);
+    sleeping = true;
+    syncObject.wait(); // the mutex is released and the thread put to sleep
+    // when wait returns the mutex is reacquired
+    sleeping = false;
+}
+
+void foo2() {
+    LockHolder lockHolder(&syncObject);
+    if (sleeping) {
+        syncObject.notifyOne();
+    }
+}
+```
+
+## SpinLock
+
+A `SpinLock` uses a single byte for the lock and our own code to spin until the
+lock is acquired. The intention for this lock is for low contention locks.
+
+The RAII pattern is just like for a Mutex.
+
+
+```c++
+SpinLock spinLock;
+void example1() {
+    SpinLockHolder lockHolder(&spinLock);
+    ...
+    return;
+}
+```
+
+## _UNLOCKED convention
+
+ep-engine has a function naming convention that indicates the function should
+be called with a lock acquired.
+
+For example the following `doStuff_UNLOCKED` method indicates that it expect a
+lock to be held before the function is called. What lock should be acquired
+before calling is not defined by the convention.
+
+```c++
+void Object::doStuff_UNLOCKED() {
+}
+
+void Object::run() {
+    LockHolder lockHolder(&mutex);
+    doStuff_UNLOCKED();
+    return;
+}
+```
+## Thread Local Storage (ObjectRegistry).
+
+Threads in ep-engine are servicing buckets and when a thread is dispatched to
+serve a bucket, the pointer to the `EventuallyPersistentEngine` representing
+the bucket is placed into thread local storage, this avoids the need for the
+pointer to be passed along the chain of execution as a formal parameter.
+
+Both threads servicing frontend operations (memcached's threads) and ep-engine's
+own task threads will save the bucket's engine pointer before calling down into
+engine code.
+
+Calling `ObjectRegistry::onSwitchThread(enginePtr)` will save the `enginePtr`
+in thread-local-storage so that subsequent task code can retrieve the pointer
+with `ObjectRegistry::getCurrentEngine()`.
+
+## Tasks
+
+A task is created by creating a sub-class (the `run()` method is the entry point
+of the task) of the `GlobalTask` class and it is scheduled onto one of 4 task
+queue types. Each task should be declared in `src/tasks.defs.h` using the TASK
+macro. Using this macro ensures correct generation of a task-type ID, priority,
+task name and ultimately ensures each task gets its own scheduling statistics.
+
+The recipe is simple.
+
+### Add your task's class name with its priority into `src/tasks.defs.h`
+ * A lower value priority is 'higher'.
+```
+TASK(MyNewTask, 1) // MyNewTask has priority 1.
+```
+
+### Create your class and set its ID using `MY_TASK_ID`.
+
+```
+class MyNewTask : public GlobalTask {
+public:
+    MyNewTask(EventuallyPersistentEngine* e)
+        : GlobalTask(e/*engine/,
+                     MY_TASK_ID(MyNewTask),
+                     0.0/*snooze*/){}
+...
+```
+
+### Define pure-virtual methods in `MyNewTask`
+* run method
+
+The run method is invoked when the task is executed. The method should return
+true if it should be scheduled again. If false is returned, the instance of the
+task is never re-scheduled and will deleted once all references to the instance are
+gone.
+
+```
+bool run() {
+   // Task code here
+   return schedule again?;
+}
+```
+
+* Define the `getDescription` method to aid debugging and statistics.
+```
+std::string getDescription() {
+    return "A brief description of what MyNewTask does";
+}
+```
+
+### Schedule your task to the desired queue.
+```
+ExTask myNewTask = new MyNewTask(&engine);
+myNewTaskId = ExecutorPool::get()->schedule(myNewTask, NONIO_TASK_IDX);
+```
+
+The 4 task queue types are:
+* Readers -  `READER_TASK_IDX`
+ * Tasks that should primarily only read from 'disk'. They generally read from
+the vbucket database files, for example background fetch of a non-resident document.
+* Writers (they are allowed to read too) `WRITER_TASK_IDX`
+ * Tasks that should primarily only write to 'disk'. They generally write to
+the vbucket database files, for example when flushing the write queue.
+* Auxilliary IO `AUXIO_TASK_IDX`
+ * Tasks that read and write 'disk', but not necessarily the vbucket data files.
+* Non IO `NONIO_TASK_IDX`
+ * Tasks that do not perform 'disk' I/O.
+
+### Utilise `snooze`
+
+The snooze value of the task sets when the task should be executed. The initial snooze
+value is set when constructing `GlobalTask`. A value of 0.0 means attempt to execute
+the task as soon as scheduled and 5.0 would be 5 seconds from being scheduled
+(scheduled meaning when `ExecutorPool::get()->schedule(...)` is called).
+
+The `run()` function can also call `snooze(double snoozeAmount)` to set how long
+before the task is rescheduled.
+
+It is **best practice** for most tasks to actually do a sleep forever from their run function:
+
+```
+  snooze(INT_MAX);
+```
+
+Using `INT_MAX` means sleep forever and tasks should always sleep until they have
+real work todo. Tasks **should not periodically poll for work** with a snooze of
+n seconds.
+
+### Utilise `wake()`
+When a task has work todo, some other function should be waking the task using the wake method.
+
+```
+ExecutorPool::get()->wake(myNewTaskId)`
+```
index 4662b87..f8f9a62 100644 (file)
@@ -34,9 +34,9 @@ class AccessScanner : public GlobalTask {
     friend class AccessScannerValueChangeListener;
 public:
     AccessScanner(EventuallyPersistentStore &_store, EPStats &st,
-                  const Priority &p, double sleeptime = 0,
+                  double sleeptime = 0,
                   bool completeBeforeShutdown = false)
-        : GlobalTask(&_store.getEPEngine(), p, sleeptime,
+        : GlobalTask(&_store.getEPEngine(), TaskId::AccessScanner, sleeptime,
                      completeBeforeShutdown),
           completedCount(0), store(_store), stats(st), sleepTime(sleeptime),
           available(true) { }
index e1f3631..d15d75e 100644 (file)
@@ -156,7 +156,6 @@ bool BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
             "Schedule a full backfill from disk for vbucket %d.", vb->getId());
         ExTask task = new BackfillDiskLoad(name, engine, connMap,
                                           underlying, vb->getId(), 0, connToken,
-                                          Priority::TapBgFetcherPriority,
                                           0, false);
         ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
     }
@@ -210,6 +209,6 @@ bool BackFillVisitor::checkValidity() {
 
 bool BackfillTask::run(void) {
     engine->getEpStore()->visit(bfv, "Backfill task", NONIO_TASK_IDX,
-                                Priority::BackfillTaskPriority, 1);
+                                TaskId::BackfillVisitorTask, 1);
     return false;
 }
index dc84e36..454266a 100644 (file)
@@ -51,9 +51,9 @@ public:
 
     BackfillDiskLoad(const std::string &n, EventuallyPersistentEngine* e,
                      TapConnMap &cm, KVStore *s, uint16_t vbid,
-                     uint64_t start_seqno, hrtime_t token, const Priority &p,
+                     uint64_t start_seqno, hrtime_t token,
                      double sleeptime = 0, bool shutdown = false)
-        : GlobalTask(e, p, sleeptime, shutdown),
+        : GlobalTask(e, TaskId::BackfillDiskLoad, sleeptime, shutdown),
           name(n), engine(e), connMap(cm), store(s), vbucket(vbid),
           startSeqno(start_seqno), connToken(token) {
         ScheduleDiskBackfillTapOperation tapop;
@@ -123,7 +123,7 @@ public:
 
     BackfillTask(EventuallyPersistentEngine *e, TapConnMap &cm, Producer *tc,
                  const VBucketFilter &backfillVBFilter):
-                 GlobalTask(e, Priority::BackfillTaskPriority, 0, false),
+                 GlobalTask(e, TaskId::BackfillTask, 0, false),
       bfv(new BackFillVisitor(e, cm, tc, backfillVBFilter)), engine(e) {}
 
     virtual ~BackfillTask() {}
index 3ad0e38..6bbe3d5 100644 (file)
@@ -31,8 +31,7 @@ void BgFetcher::start() {
     bool inverse = false;
     pendingFetch.compare_exchange_strong(inverse, true);
     ExecutorPool* iom = ExecutorPool::get();
-    ExTask task = new BgFetcherTask(&(store->getEPEngine()), this,
-                                      Priority::BgFetcherPriority, false);
+    ExTask task = new MultiBGFetcherTask(&(store->getEPEngine()), this, false);
     this->setTaskId(task->getId());
     iom->schedule(task, READER_TASK_IDX);
     cb_assert(taskId > 0);
index 937569a..bc480fe 100644 (file)
@@ -91,7 +91,7 @@ bool ClosedUnrefCheckpointRemoverTask::run(void) {
         shared_ptr<CheckpointVisitor> pv(new CheckpointVisitor(store, stats,
                                                                available));
         store->visit(pv, "Checkpoint Remover", NONIO_TASK_IDX,
-                     Priority::CheckpointRemoverPriority);
+                     TaskId::ClosedUnrefCheckpointRemoverVisitorTask);
     }
     snooze(sleepTime);
     return true;
index 2a4b939..3e96be7 100644 (file)
@@ -43,7 +43,7 @@ public:
      */
     ClosedUnrefCheckpointRemoverTask(EventuallyPersistentEngine *e,
                                      EPStats &st, size_t interval) :
-        GlobalTask(e, Priority::CheckpointRemoverPriority, interval, false),
+        GlobalTask(e, TaskId::ClosedUnrefCheckpointRemoverTask, interval, false),
         engine(e), stats(st), sleepTime(interval), available(true) {}
 
     bool run(void);
index 6fd4c05..b41591e 100644 (file)
@@ -45,11 +45,8 @@ class ConnectionReaperCallback : public GlobalTask {
 public:
     ConnectionReaperCallback(EventuallyPersistentEngine &e, ConnMap& cm,
                              connection_t &conn)
-        : GlobalTask(&e, Priority::TapConnectionReaperPriority),
+        : GlobalTask(&e, TaskId::ConnectionReaperCallback),
           connMap(cm), connection(conn) {
-        std::stringstream ss;
-        ss << "Reaping tap or dcp connection: " << connection->getName();
-        descr = ss.str();
     }
 
     bool run(void) {
@@ -62,13 +59,12 @@ public:
     }
 
     std::string getDescription() {
-        return descr;
+        return "Reaping tap or dcp connection: " + connection->getName();
     }
 
 private:
     ConnMap &connMap;
     connection_t connection;
-    std::string descr;
 };
 
 /**
@@ -77,7 +73,7 @@ private:
 class ConnNotifierCallback : public GlobalTask {
 public:
     ConnNotifierCallback(EventuallyPersistentEngine *e, ConnNotifier *notifier)
-    : GlobalTask(e, Priority::TapConnNotificationPriority),
+    : GlobalTask(e, TaskId::ConnNotifierCallback),
       connNotifier(notifier) { }
 
     bool run(void) {
@@ -145,7 +141,7 @@ bool ConnNotifier::notifyConnections() {
 class ConnManager : public GlobalTask {
 public:
     ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
-        : GlobalTask(e, Priority::TapConnMgrPriority, MIN_SLEEP_TIME, false),
+        : GlobalTask(e, TaskId::ConnManager, MIN_SLEEP_TIME, false),
           engine(e), connmap(cmap) { }
 
     bool run(void) {
index 6877082..fe62667 100644 (file)
@@ -26,10 +26,12 @@ static const size_t sleepTime = 1;
 
 class BackfillManagerTask : public GlobalTask {
 public:
-    BackfillManagerTask(EventuallyPersistentEngine* e, BackfillManager* mgr,
-                        const Priority &p, double sleeptime = 0,
+    BackfillManagerTask(EventuallyPersistentEngine* e,
+                        BackfillManager* mgr,
+                        double sleeptime = 0,
                         bool shutdown = false)
-        : GlobalTask(e, p, sleeptime, shutdown), manager(mgr) {}
+        : GlobalTask(e, TaskId::BackfillManagerTask, sleeptime, shutdown),
+          manager(mgr) {}
 
     bool run();
 
@@ -129,8 +131,7 @@ void BackfillManager::schedule(stream_t stream, uint64_t start, uint64_t end) {
         return;
     }
 
-    managerTask.reset(new BackfillManagerTask(engine, this,
-                                              Priority::BackfillTaskPriority));
+    managerTask.reset(new BackfillManagerTask(engine, this));
     ExecutorPool::get()->schedule(managerTask, AUXIO_TASK_IDX);
 }
 
index b009f36..bc14b74 100644 (file)
 #include "dcp-response.h"
 #include "dcp-stream.h"
 
-class Processer : public GlobalTask {
+class Processor : public GlobalTask {
 public:
-    Processer(EventuallyPersistentEngine* e, connection_t c,
-                const Priority &p, double sleeptime = 1, bool shutdown = false)
-        : GlobalTask(e, p, sleeptime, shutdown), conn(c) {}
+    Processor(EventuallyPersistentEngine* e, connection_t c,
+              double sleeptime = 1, bool shutdown = false)
+        : GlobalTask(e, TaskId::Processor, sleeptime, shutdown), conn(c) {}
 
-    bool run();
-
-    std::string getDescription();
+    bool run() {
+        DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
+        if (consumer->doDisconnect()) {
+            return false;
+        }
 
-private:
-    connection_t conn;
-};
+        switch (consumer->processBufferedItems()) {
+            case all_processed:
+                snooze(1);
+                break;
+            case more_to_process:
+                snooze(0);
+                break;
+            case cannot_process:
+                snooze(5);
+                break;
+            default:
+                abort();
+        }
 
-bool Processer::run() {
-    DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
-    if (consumer->doDisconnect()) {
-        return false;
+        return true;
     }
 
-    switch (consumer->processBufferedItems()) {
-        case all_processed:
-            snooze(1);
-            break;
-        case more_to_process:
-            snooze(0);
-            break;
-        case cannot_process:
-            snooze(5);
-            break;
-        default:
-            abort();
+    std::string getDescription() {
+        std::stringstream ss;
+        ss << "Processing buffered items for " << conn->getName();
+        return ss.str();
     }
 
-    return true;
-}
-
-std::string Processer::getDescription() {
-    std::stringstream ss;
-    ss << "Processing buffered items for " << conn->getName();
-    return ss.str();
-}
+private:
+    connection_t conn;
+};
 
 DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
                          const std::string &name)
@@ -144,7 +140,7 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
     setPriority = true;
     enableExtMetaData = true;
 
-    ExTask task = new Processer(&engine, this, Priority::PendingOpsPriority, 1);
+    ExTask task = new Processor(&engine, this, 1);
     processTaskId = ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
 }
 
@@ -642,8 +638,7 @@ ENGINE_ERROR_CODE DcpConsumer::handleResponse(
                 "to rollback seq no. %llu", logHeader(), vbid, rollbackSeqno);
 
             ExTask task = new RollbackTask(&engine_, opaque, vbid,
-                                           rollbackSeqno, this,
-                                           Priority::TapBgFetcherPriority);
+                                           rollbackSeqno, this);
             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
             return ENGINE_SUCCESS;
         }
@@ -852,7 +847,7 @@ void DcpConsumer::streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body
                 RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
                 vb->failovers->replaceFailoverLog(body, bodylen);
                 EventuallyPersistentStore* st = engine_.getEpStore();
-                st->scheduleVBSnapshot(Priority::VBucketPersistHighPriority,
+                st->scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH,
                                 st->getVBuckets().getShard(vbucket)->getId());
             }
             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Add stream for opaque %ld"
index c67803d..004ad2b 100644 (file)
@@ -200,9 +200,8 @@ class RollbackTask : public GlobalTask {
 public:
     RollbackTask(EventuallyPersistentEngine* e,
                  uint32_t opaque_, uint16_t vbid_,
-                 uint64_t rollbackSeqno_, dcp_consumer_t conn,
-                 const Priority &p):
-        GlobalTask(e, p, 0, false), engine(e),
+                 uint64_t rollbackSeqno_, dcp_consumer_t conn):
+        GlobalTask(e, TaskId::RollbackTask, 0, false), engine(e),
         opaque(opaque_), vbid(vbid_), rollbackSeqno(rollbackSeqno_),
         cons(conn) { }
 
index 696dad2..ca735b6 100644 (file)
@@ -314,7 +314,8 @@ private:
 class ActiveStreamCheckpointProcessorTask : public GlobalTask {
 public:
     ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
-      : GlobalTask(&e, Priority::ActiveStreamCheckpointProcessor, INT_MAX, false),
+        : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
+                     INT_MAX, false),
       notified(false),
       iterationsBeforeYield(e.getConfiguration()
                             .getDcpProducerSnapshotMarkerYieldLimit()) { }
index eff4cac..160f907 100644 (file)
@@ -23,7 +23,7 @@
 
 DefragmenterTask::DefragmenterTask(EventuallyPersistentEngine* e,
                                    EPStats& stats_)
-  : GlobalTask(e, Priority::DefragmenterTaskPriority, false),
+  : GlobalTask(e, TaskId::DefragmenterTask, false),
     stats(stats_),
     epstore_position(engine->getEpStore()->startPosition()),
     visitor(NULL) {
@@ -119,8 +119,8 @@ bool DefragmenterTask::run(void) {
 }
 
 void DefragmenterTask::stop(void) {
-    if (taskId) {
-        ExecutorPool::get()->cancel(taskId);
+    if (uid) {
+        ExecutorPool::get()->cancel(uid);
     }
 }
 
index 76bde3b..ddb847c 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -154,7 +154,7 @@ public:
     VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
                               RCPtr<VBucket> &vb, double delay) :
                               GlobalTask(&eng,
-                              Priority::VBMemoryDeletionPriority, delay, true),
+                              TaskId::VBucketMemoryDeletionTask, delay, true),
                               e(eng), vbucket(vb), vbid(vb->getId()) { }
 
     std::string getDescription() {
@@ -179,7 +179,7 @@ private:
 class PendingOpsNotification : public GlobalTask {
 public:
     PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
-        GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
+        GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
         engine(e), vbucket(vb) { }
 
     std::string getDescription() {
@@ -224,10 +224,10 @@ EventuallyPersistentStore::EventuallyPersistentStore(
 
     storageProperties = new StorageProperties(true, true, true, true);
 
-    stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
-    stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
+    stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
+    stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
 
-    for (size_t i = 0; i < MAX_TYPE_ID; i++) {
+    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
         stats.schedulingHisto[i].reset();
         stats.taskRuntimeHisto[i].reset();
     }
@@ -1037,7 +1037,7 @@ class KVStatsCallback : public Callback<kvstats_ctx> {
         EventuallyPersistentStore *epstore;
 };
 
-void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
+void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
                                                  uint16_t shardId) {
 
     class VBucketStateVisitor : public VBucketVisitor {
@@ -1072,7 +1072,7 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
     };
 
     KVShard *shard = vbMap.shards[shardId];
-    if (priority == Priority::VBucketPersistLowPriority) {
+    if (prio == VBSnapshotTask::Priority::LOW) {
         shard->setLowPriorityVbSnapshotFlag(false);
     } else {
         shard->setHighPriorityVbSnapshotFlag(false);
@@ -1099,7 +1099,7 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
             break;
         }
 
-        if (priority == Priority::VBucketPersistHighPriority) {
+        if (prio == VBSnapshotTask::Priority::HIGH) {
             if (vbMap.setBucketCreation(iter->first, false)) {
                 LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
             }
@@ -1107,14 +1107,13 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
     }
 
     if (!success) {
-        scheduleVBSnapshot(priority, shard->getId());
+        scheduleVBSnapshot(prio, shard->getId());
     } else {
         stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
     }
 }
 
-bool EventuallyPersistentStore::persistVBState(const Priority &priority,
-                                               uint16_t vbid) {
+bool EventuallyPersistentStore::persistVBState(uint16_t vbid) {
     schedule_vbstate_persist[vbid] = false;
 
     RCPtr<VBucket> vb = getVBucket(vbid);
@@ -1212,7 +1211,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
             ExTask notifyTask = new PendingOpsNotification(engine, vb);
             ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
         }
-        scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
+        scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
     } else {
         FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
         KVShard* shard = vbMap.getShard(vbid);
@@ -1231,18 +1230,18 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
         vbMap.setPersistenceSeqno(vbid, 0);
         vbMap.setBucketCreation(vbid, true);
         lh.unlock();
-        scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
+        scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
     }
     return ENGINE_SUCCESS;
 }
 
-bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
+bool EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio) {
     KVShard *shard = NULL;
-    if (p == Priority::VBucketPersistHighPriority) {
+    if (prio == VBSnapshotTask::Priority::HIGH) {
         for (size_t i = 0; i < vbMap.numShards; ++i) {
             shard = vbMap.shards[i];
             if (shard->setHighPriorityVbSnapshotFlag(true)) {
-                ExTask task = new VBSnapshotTask(&engine, p, i, true);
+                ExTask task = new VBSnapshotTaskHigh(&engine, i, true);
                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
             }
         }
@@ -1250,7 +1249,7 @@ bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
         for (size_t i = 0; i < vbMap.numShards; ++i) {
             shard = vbMap.shards[i];
             if (shard->setLowPriorityVbSnapshotFlag(true)) {
-                ExTask task = new VBSnapshotTask(&engine, p, i, true);
+                ExTask task = new VBSnapshotTaskLow(&engine, i, true);
                 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
             }
         }
@@ -1261,31 +1260,34 @@ bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
     return true;
 }
 
-void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
+void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
                                                    uint16_t shardId,
                                                    bool force) {
     KVShard *shard = vbMap.shards[shardId];
-    if (p == Priority::VBucketPersistHighPriority) {
+    if (prio == VBSnapshotTask::Priority::HIGH) {
         if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
-            ExTask task = new VBSnapshotTask(&engine, p, shardId, true);
+            ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
         }
     } else {
         if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
-            ExTask task = new VBSnapshotTask(&engine, p, shardId, true);
+            ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
         }
     }
 }
 
-void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
+void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
                                                        uint16_t vbid,
                                                        bool force) {
     bool inverse = false;
     if (force ||
         schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
-        ExTask task = new VBStatePersistTask(&engine, priority, vbid, true);
-        ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
+        if (priority == VBStatePersistTask::Priority::HIGH) {
+            ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
+        } else {
+            ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
+        }
     }
 }
 
@@ -1325,8 +1327,7 @@ void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
     ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
 
     if (vbMap.setBucketDeletion(vb->getId(), true)) {
-        ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
-                                       Priority::VBucketDeletionPriority);
+        ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
         ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
     }
 }
@@ -1361,8 +1362,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
     }
 
     LockHolder lh(compactionLock);
-    ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
-                                         vbid, c, cookie);
+    ExTask task = new CompactVBucketTask(&engine, vbid, c, cookie);
     compactionTasks.push_back(std::make_pair(vbid, task));
     if (compactionTasks.size() > 1) {
         if ((stats.diskQueueSize > compactionWriteQueueCap &&
@@ -1873,10 +1873,8 @@ void EventuallyPersistentStore::bgFetch(const std::string &key,
         stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
                                             bgFetchQueue.load());
         ExecutorPool* iom = ExecutorPool::get();
-        ExTask task = new BGFetchTask(&engine, key, vbucket, cookie,
-                                      isMeta,
-                                      Priority::BgFetcherGetMetaPriority,
-                                      bgFetchDelay, false);
+        ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
+                                              isMeta, bgFetchDelay, false);
         iom->schedule(task, READER_TASK_IDX);
         ss << "Queued a background fetch, now at " << bgFetchQueue.load()
            << std::endl;
@@ -2313,7 +2311,6 @@ EventuallyPersistentStore::statsVKey(const std::string &key,
         ExecutorPool* iom = ExecutorPool::get();
         ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
                                            v->getBySeqno(), cookie,
-                                           Priority::VKeyStatBgFetcherPriority,
                                            bgFetchDelay, false);
         iom->schedule(task, READER_TASK_IDX);
         return ENGINE_EWOULDBLOCK;
@@ -2339,7 +2336,6 @@ EventuallyPersistentStore::statsVKey(const std::string &key,
                     ExecutorPool* iom = ExecutorPool::get();
                     ExTask task = new VKeyStatBGFetchTask(&engine, key,
                                                           vbucket, -1, cookie,
-                                           Priority::VKeyStatBgFetcherPriority,
                                                           bgFetchDelay, false);
                     iom->schedule(task, READER_TASK_IDX);
                 }
@@ -3354,7 +3350,7 @@ std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
 
 void EventuallyPersistentStore::warmupCompleted() {
     // Run the vbucket state snapshot job once after the warmup
-    scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
+    scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
 
     if (engine.getConfiguration().getAlogPath().length() > 0) {
 
@@ -3384,7 +3380,7 @@ void EventuallyPersistentStore::warmupCompleted() {
     // right after warmup. Subsequent snapshot tasks will be scheduled every
     // 60 sec by default.
     ExecutorPool *iom = ExecutorPool::get();
-    ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
+    ExTask task = new StatSnap(&engine, 0, false);
     statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
 }
 
@@ -3479,7 +3475,6 @@ void EventuallyPersistentStore::enableAccessScannerTask() {
         accessScanner.sleeptime = alogSleepTime * 60;
         if (accessScanner.sleeptime != 0) {
             ExTask task = new AccessScanner(*this, stats,
-                                            Priority::AccessScannerPriority,
                                             accessScanner.sleeptime);
             accessScanner.task = ExecutorPool::get()->schedule(task,
                                                                AUXIO_TASK_IDX);
@@ -3519,7 +3514,6 @@ void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
         accessScanner.sleeptime = val * 60;
         if (accessScanner.sleeptime != 0) {
             ExTask task = new AccessScanner(*this, stats,
-                                            Priority::AccessScannerPriority,
                                             accessScanner.sleeptime);
             accessScanner.task = ExecutorPool::get()->schedule(task,
                                                                AUXIO_TASK_IDX);
@@ -3540,7 +3534,6 @@ void EventuallyPersistentStore::resetAccessScannerStartTime() {
             ExecutorPool::get()->cancel(accessScanner.task);
             // re-schedule task according to the new task start hour
             ExTask task = new AccessScanner(*this, stats,
-                                            Priority::AccessScannerPriority,
                                             accessScanner.sleeptime);
             accessScanner.task = ExecutorPool::get()->schedule(task,
                                                                AUXIO_TASK_IDX);
@@ -3619,10 +3612,10 @@ EventuallyPersistentStore::endPosition() const
     return EventuallyPersistentStore::Position(vbMap.getSize());
 }
 
-VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
+VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s, TaskId id,
                          shared_ptr<VBucketVisitor> v,
-                         const char *l, const Priority &p, double sleep) :
-    GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
+                         const char *l, double sleep) :
+    GlobalTask(&s->getEPEngine(), id, 0, false), store(s),
     visitor(v), label(l), sleepTime(sleep), currentvb(0)
 {
     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
@@ -3663,12 +3656,10 @@ bool VBCBAdaptor::run(void) {
 VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
                                        shared_ptr<VBucketVisitor> v,
                                        uint16_t sh, const char *l,
-                                       double sleep, bool shutdown):
-    GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
-               0, shutdown),
-    store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
-    shardID(sh)
-{
+                                       double sleep, bool shutdown)
+    : GlobalTask(&(s->getEPEngine()), TaskId::VBucketVisitorTask, 0, shutdown),
+      store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
+      shardID(sh) {
     const VBucketFilter &vbFilter = visitor->getVBucketFilter();
     std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
     cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
@@ -3713,7 +3704,7 @@ void EventuallyPersistentStore::resetUnderlyingStats(void)
         shard->getROUnderlying()->resetStats();
     }
 
-    for (size_t i = 0; i < MAX_TYPE_ID; i++) {
+    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
         stats.schedulingHisto[i].reset();
         stats.taskRuntimeHisto[i].reset();
     }
index af1b9c1..3a2ecf5 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -119,8 +119,8 @@ class Warmup;
 class VBCBAdaptor : public GlobalTask {
 public:
 
-    VBCBAdaptor(EventuallyPersistentStore *s,
-                shared_ptr<VBucketVisitor> v, const char *l, const Priority &p,
+    VBCBAdaptor(EventuallyPersistentStore *s, TaskId id,
+                shared_ptr<VBucketVisitor> v, const char *l,
                 double sleep=0);
 
     std::string getDescription() {
@@ -516,7 +516,7 @@ public:
         return vbMap.getPersistenceSeqno(vb);
     }
 
-    void snapshotVBuckets(const Priority &priority, uint16_t shardId);
+    void snapshotVBuckets(VBSnapshotTask::Priority prio, uint16_t shardId);
 
     /* transfer should be set to true *only* if this vbucket is becoming master
      * as the result of the previous master cleanly handing off contorol. */
@@ -579,10 +579,10 @@ public:
      * Note that this is asynchronous.
      */
     size_t visit(shared_ptr<VBucketVisitor> visitor, const char *lbl,
-               task_type_t taskGroup, const Priority &prio,
+               task_type_t taskGroup, TaskId id,
                double sleepTime=0) {
-        return ExecutorPool::get()->schedule(new VBCBAdaptor(this, visitor,
-                                             lbl, prio, sleepTime), taskGroup);
+        return ExecutorPool::get()->schedule(new VBCBAdaptor(this, id, visitor,
+                                             lbl, sleepTime), taskGroup);
     }
 
     /**
@@ -671,24 +671,24 @@ public:
     /**
      * schedule a vb_state snapshot task for all the shards.
      */
-    bool scheduleVBSnapshot(const Priority &priority);
+    bool scheduleVBSnapshot(VBSnapshotTask::Priority prio);
 
     /**
      * schedule a vb_state snapshot task for a given shard.
      */
-    void scheduleVBSnapshot(const Priority &priority, uint16_t shardId,
+    void scheduleVBSnapshot(VBSnapshotTask::Priority prio, uint16_t shardId,
                             bool force = false);
 
     /**
      * Schedule a vbstate persistence task for a given vbucket.
      */
-    void scheduleVBStatePersist(const Priority &priority, uint16_t vbid,
+    void scheduleVBStatePersist(VBStatePersistTask::Priority prio, uint16_t vbid,
                                 bool force = false);
 
     /**
      * Persist a vbucket's state.
      */
-    bool persistVBState(const Priority &priority, uint16_t vbid);
+    bool persistVBState(uint16_t vbid);
 
     const VBucketMap &getVBuckets() {
         return vbMap;
@@ -749,12 +749,12 @@ public:
         ++vb->numExpiredItems;
     }
 
-    void logQTime(type_id_t taskType, hrtime_t enqTime) {
-        stats.schedulingHisto[taskType].add(enqTime);
+    void logQTime(TaskId taskType, hrtime_t enqTime) {
+        stats.schedulingHisto[static_cast<int>(taskType)].add(enqTime);
     }
 
-    void logRunTime(type_id_t taskType, hrtime_t runTime) {
-        stats.taskRuntimeHisto[taskType].add(runTime);
+    void logRunTime(TaskId taskType, hrtime_t runTime) {
+        stats.taskRuntimeHisto[static_cast<int>(taskType)].add(runTime);
     }
 
     bool multiBGFetchEnabled() {
index a822d1a..77abafe 100644 (file)
@@ -3737,7 +3737,7 @@ public:
 class StatCheckpointTask : public GlobalTask {
 public:
     StatCheckpointTask(EventuallyPersistentEngine *e, const void *c,
-            ADD_STAT a) : GlobalTask(e, Priority::CheckpointStatsPriority,
+            ADD_STAT a) : GlobalTask(e, TaskId::StatCheckpointTask,
                                      0, false),
                           ep(e), cookie(c), add_stat(a) { }
     bool run(void) {
@@ -4220,9 +4220,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doSchedulerStats(const void
                                                                 *cookie,
                                                                 ADD_STAT
                                                                 add_stat) {
-    for (size_t i = 0; i < MAX_TYPE_ID; ++i) {
-        add_casted_stat(Priority::getTypeName(static_cast<type_id_t>(i)),
-                        stats.schedulingHisto[i],
+    for (TaskId id : GlobalTask::allTaskIds) {
+        add_casted_stat(GlobalTask::getTaskName(id),
+                        stats.schedulingHisto[static_cast<int>(id)],
                         add_stat, cookie);
     }
 
@@ -4233,9 +4233,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doRunTimeStats(const void
                                                                 *cookie,
                                                                 ADD_STAT
                                                                 add_stat) {
-    for (size_t i = 0; i < MAX_TYPE_ID; ++i) {
-        add_casted_stat(Priority::getTypeName(static_cast<type_id_t>(i)),
-                        stats.taskRuntimeHisto[i],
+    for (TaskId id : GlobalTask::allTaskIds) {
+        add_casted_stat(GlobalTask::getTaskName(id),
+                        stats.taskRuntimeHisto[static_cast<int>(id)],
                         add_stat, cookie);
     }
 
@@ -5859,8 +5859,8 @@ class FetchAllKeysTask : public GlobalTask {
 public:
     FetchAllKeysTask(EventuallyPersistentEngine *e, const void *c,
                      ADD_RESPONSE resp, const std::string &start_key_,
-                     uint16_t vbucket, uint32_t count_, const Priority &p) :
-        GlobalTask(e, p, 0, false), engine(e), cookie(c),
+                     uint16_t vbucket, uint32_t count_) :
+        GlobalTask(e, TaskId::FetchAllKeysTask, 0, false), engine(e), cookie(c),
         response(resp), start_key(start_key_), vbid(vbucket),
         count(count_) { }
 
@@ -5945,8 +5945,7 @@ EventuallyPersistentEngine::getAllKeys(const void* cookie,
     std::string start_key(keyptr, keylen);
 
     ExTask task = new FetchAllKeysTask(this, cookie, response, start_key,
-                                       vbucket, count,
-                                       Priority::BgFetcherPriority);
+                                       vbucket, count);
     ExecutorPool::get()->schedule(task, READER_TASK_IDX);
     return ENGINE_EWOULDBLOCK;
 }
index 38b5b3a..8fe7fcb 100644 (file)
@@ -93,7 +93,7 @@ void ExecutorThread::run() {
             if (currentTask->isdead()) {
                 // release capacity back to TaskQueue
                 manager->doneWork(curTaskType);
-                manager->cancel(currentTask->taskId, true);
+                manager->cancel(currentTask->uid, true);
                 continue;
             }
 
@@ -131,7 +131,7 @@ void ExecutorThread::run() {
                 if (!again || currentTask->isdead()) {
                     // release capacity back to TaskQueue
                     manager->doneWork(curTaskType);
-                    manager->cancel(currentTask->taskId, true);
+                    manager->cancel(currentTask->uid, true);
                 } else {
                     hrtime_t new_waketime;
                     // if a task has not set snooze, update its waketime to now
index d4c6a61..4bf964f 100644 (file)
@@ -132,7 +132,7 @@ void Flusher::initialize(size_t tid) {
 void Flusher::schedule_UNLOCKED() {
     ExecutorPool* iom = ExecutorPool::get();
     ExTask task = new FlusherTask(ObjectRegistry::getCurrentEngine(),
-                                  this, Priority::FlusherPriority,
+                                  this,
                                   shard->getId());
     this->setTaskId(task->getId());
     iom->schedule(task, WRITER_TASK_IDX);
index 7a1a1d8..bd85051 100644 (file)
@@ -41,7 +41,7 @@ public:
 bool HashtableResizerTask::run(void) {
     shared_ptr<ResizingVisitor> pv(new ResizingVisitor);
     store->visit(pv, "Hashtable resizer", NONIO_TASK_IDX,
-            Priority::ItemPagerPriority);
+            TaskId::HashtableResizerVisitorTask);
 
     snooze(FREQUENCY);
     return true;
index 7fd9b88..c63d0b6 100644 (file)
@@ -34,7 +34,7 @@ class HashtableResizerTask : public GlobalTask {
 public:
 
     HashtableResizerTask(EventuallyPersistentStore *s, double sleepTime) :
-    GlobalTask(&s->getEPEngine(), Priority::HTResizePriority, sleepTime, false),
+    GlobalTask(&s->getEPEngine(), TaskId::HashtableResizerTask, sleepTime, false),
     store(s) {}
 
     bool run(void);
index 02a4885..3e8f860 100644 (file)
@@ -263,7 +263,7 @@ bool ItemPager::run(void) {
                                                        available,
                                                        false, bias, &phase));
         store->visit(pv, "Item pager", NONIO_TASK_IDX,
-                    Priority::ItemPagerPriority);
+                     TaskId::ItemPagerVisitor);
     }
 
     snooze(sleepTime);
@@ -281,7 +281,7 @@ bool ExpiredItemPager::run(void) {
                                                        true, 1, NULL));
         // track spawned tasks for shutdown..
         store->visit(pv, "Expired item remover", NONIO_TASK_IDX,
-                Priority::ItemPagerPriority, 10);
+                     TaskId::ExpiredItemPagerVisitor, 10);
     }
     snooze(sleepTime);
     return true;
index 213161e..611bc72 100644 (file)
@@ -63,7 +63,7 @@ public:
      * @param st the stats
      */
     ItemPager(EventuallyPersistentEngine *e, EPStats &st) :
-        GlobalTask(e, Priority::ItemPagerPriority, 10, false),
+        GlobalTask(e, TaskId::ItemPager, 10, false),
         engine(e), stats(st), available(true), phase(PAGING_UNREFERENCED),
         doEvict(false) {}
 
@@ -104,7 +104,7 @@ public:
      */
     ExpiredItemPager(EventuallyPersistentEngine *e, EPStats &st,
                      size_t stime) :
-        GlobalTask(e, Priority::ItemPagerPriority, static_cast<double>(stime),
+        GlobalTask(e, TaskId::ExpiredItemPager, static_cast<double>(stime),
         false), engine(e), stats(st), sleepTime(static_cast<double>(stime)),
         available(true) { }
 
diff --git a/src/priority.cc b/src/priority.cc
deleted file mode 100644 (file)
index c7bcae4..0000000
+++ /dev/null
@@ -1,120 +0,0 @@
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- *     Copyright 2010 Couchbase, Inc
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-#include "config.h"
-
-#include "priority.h"
-
-// Priorities for Read-only IO tasks
-const Priority Priority::BgFetcherPriority(BGFETCHER_ID, 0);
-const Priority Priority::BgFetcherGetMetaPriority(BGFETCHER_GET_META_ID, 1);
-const Priority Priority::WarmupPriority(WARMUP_ID, 0);
-const Priority Priority::VKeyStatBgFetcherPriority(VKEY_STAT_BGFETCHER_ID, 3);
-
-// Priorities for Auxiliary IO tasks
-const Priority Priority::TapBgFetcherPriority(TAP_BGFETCHER_ID, 1);
-const Priority Priority::AccessScannerPriority(ACCESS_SCANNER_ID, 3);
-const Priority Priority::BackfillTaskPriority(BACKFILL_TASK_ID, 8);
-
-// Priorities for Read-Write IO tasks
-const Priority Priority::VBucketDeletionPriority(VBUCKET_DELETION_ID, 1);
-const Priority Priority::CompactorPriority(COMPACTOR_ID, 2);
-const Priority Priority::VBucketPersistHighPriority(VBUCKET_PERSIST_HIGH_ID, 2);
-const Priority Priority::FlushAllPriority(FLUSHALL_ID, 3);
-const Priority Priority::FlusherPriority(FLUSHER_ID, 5);
-const Priority Priority::VBucketPersistLowPriority(VBUCKET_PERSIST_LOW_ID, 9);
-const Priority Priority::StatSnapPriority(STAT_SNAP_ID, 9);
-const Priority Priority::MutationLogCompactorPriority(MUTATION_LOG_COMPACTOR_ID, 9);
-
-// Priorities for NON-IO tasks
-const Priority Priority::PendingOpsPriority(PENDING_OPS_ID, 0);
-const Priority Priority::TapConnNotificationPriority(TAP_CONN_NOTIFICATION_ID, 5);
-const Priority Priority::CheckpointRemoverPriority(CHECKPOINT_REMOVER_ID, 6);
-const Priority Priority::TapConnectionReaperPriority(TAP_CONNECTION_REAPER_ID, 6);
-const Priority Priority::VBMemoryDeletionPriority(VB_MEMORY_DELETION_ID, 6);
-const Priority Priority::CheckpointStatsPriority(CHECKPOINT_STATS_ID, 7);
-const Priority Priority::ItemPagerPriority(ITEM_PAGER_ID, 7);
-const Priority Priority::DefragmenterTaskPriority(DEFRAGMENTER_ID, 7);
-const Priority Priority::TapConnMgrPriority(TAP_CONN_MGR_ID, 8);
-const Priority Priority::WorkLoadMonitorPriority(WORKLOAD_MONITOR_TASK_ID, 10);
-const Priority Priority::HTResizePriority(HT_RESIZER_ID, 211);
-const Priority Priority::TapResumePriority(TAP_RESUME_ID, 316);
-const Priority Priority::ActiveStreamCheckpointProcessor(ACTIVE_STREAM_CHKPT_PROCESSOR_ID, 5);
-
-const char *Priority::getTypeName(const type_id_t i) {
-        switch (i) {
-            case BGFETCHER_ID:
-                return "bg_fetcher_tasks";
-            case BGFETCHER_GET_META_ID:
-                return "bg_fetcher_meta_tasks";
-            case TAP_BGFETCHER_ID:
-                return "tap_bg_fetcher_tasks";
-            case VKEY_STAT_BGFETCHER_ID:
-                return "vkey_stat_bg_fetcher_tasks";
-            case WARMUP_ID:
-                return "warmup_tasks";
-            case VBUCKET_PERSIST_HIGH_ID:
-                return "vbucket_persist_high_tasks";
-            case VBUCKET_DELETION_ID:
-                return "vbucket_deletion_tasks";
-            case FLUSHER_ID:
-                return "flusher_tasks";
-            case FLUSHALL_ID:
-                return "flush_all_tasks";
-            case COMPACTOR_ID:
-                return "compactor_tasks";
-            case VBUCKET_PERSIST_LOW_ID:
-                return "vbucket_persist_low_tasks";
-            case STAT_SNAP_ID:
-                return "statsnap_tasks";
-            case MUTATION_LOG_COMPACTOR_ID:
-                return "mutation_log_compactor_tasks";
-            case ACCESS_SCANNER_ID:
-                return "access_scanner_tasks";
-            case TAP_CONN_NOTIFICATION_ID:
-                return "conn_notification_tasks";
-            case CHECKPOINT_REMOVER_ID:
-                return "checkpoint_remover_tasks";
-            case VB_MEMORY_DELETION_ID:
-                return "vb_memory_deletion_tasks";
-            case CHECKPOINT_STATS_ID:
-                return "checkpoint_stats_tasks";
-            case ITEM_PAGER_ID:
-                return "item_pager_tasks";
-            case BACKFILL_TASK_ID:
-                return "backfill_tasks_tasks";
-            case WORKLOAD_MONITOR_TASK_ID:
-                return "workload_monitor_tasks";
-            case TAP_RESUME_ID:
-                return "tap_resume_tasks";
-            case TAP_CONNECTION_REAPER_ID:
-                return "tapconnection_reaper_tasks";
-            case HT_RESIZER_ID:
-                return "hashtable_resize_tasks";
-            case PENDING_OPS_ID:
-                return "pending_ops_tasks";
-            case TAP_CONN_MGR_ID:
-                return "conn_manager_tasks";
-            case DEFRAGMENTER_ID:
-                return "defragmenter_tasks";
-            case ACTIVE_STREAM_CHKPT_PROCESSOR_ID:
-                return "activestream_chkpt_processor_tasks";
-            default: break;
-        }
-
-        return "error";
-    }
diff --git a/src/priority.h b/src/priority.h
deleted file mode 100644 (file)
index 6378d9f..0000000
+++ /dev/null
@@ -1,148 +0,0 @@
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- *     Copyright 2010 Couchbase, Inc
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-#ifndef SRC_PRIORITY_H_
-#define SRC_PRIORITY_H_ 1
-
-#include "config.h"
-
-#include <string>
-
-#include "common.h"
-
-typedef enum {
-    BGFETCHER_ID=0,
-    BGFETCHER_GET_META_ID,
-    TAP_BGFETCHER_ID,
-    VKEY_STAT_BGFETCHER_ID,
-    WARMUP_ID,
-    VBUCKET_PERSIST_HIGH_ID,
-    VBUCKET_DELETION_ID,
-    FLUSHER_ID,
-    FLUSHALL_ID,
-    COMPACTOR_ID,
-    VBUCKET_PERSIST_LOW_ID,
-    STAT_SNAP_ID,
-    MUTATION_LOG_COMPACTOR_ID,
-    ACCESS_SCANNER_ID,
-    TAP_CONN_NOTIFICATION_ID,
-    CHECKPOINT_REMOVER_ID,
-    VB_MEMORY_DELETION_ID,
-    CHECKPOINT_STATS_ID,
-    ITEM_PAGER_ID,
-    BACKFILL_TASK_ID,
-    WORKLOAD_MONITOR_TASK_ID,
-    TAP_RESUME_ID,
-    TAP_CONNECTION_REAPER_ID,
-    HT_RESIZER_ID,
-    PENDING_OPS_ID,
-    TAP_CONN_MGR_ID,
-    DEFRAGMENTER_ID,
-    ACTIVE_STREAM_CHKPT_PROCESSOR_ID,
-
-    MAX_TYPE_ID // Keep this as the last enum value
-} type_id_t;
-
-/**
- * Task priority definition.
- */
-class Priority {
-public:
-    // Priorities for Read-only tasks
-    static const Priority BgFetcherPriority;
-    static const Priority BgFetcherGetMetaPriority;
-    static const Priority TapBgFetcherPriority;
-    static const Priority VKeyStatBgFetcherPriority;
-    static const Priority WarmupPriority;
-
-    // Priorities for Read-Write tasks
-    static const Priority VBucketPersistHighPriority;
-    static const Priority VBucketDeletionPriority;
-    static const Priority FlusherPriority;
-    static const Priority FlushAllPriority;
-    static const Priority CompactorPriority;
-    static const Priority VBucketPersistLowPriority;
-    static const Priority StatSnapPriority;
-    static const Priority MutationLogCompactorPriority;
-    static const Priority AccessScannerPriority;
-
-    // Priorities for NON-IO tasks
-    static const Priority TapConnNotificationPriority;
-    static const Priority CheckpointRemoverPriority;
-    static const Priority VBMemoryDeletionPriority;
-    static const Priority CheckpointStatsPriority;
-    static const Priority ItemPagerPriority;
-    static const Priority BackfillTaskPriority;
-    static const Priority WorkLoadMonitorPriority;
-    static const Priority TapResumePriority;
-    static const Priority TapConnectionReaperPriority;
-    static const Priority HTResizePriority;
-    static const Priority PendingOpsPriority;
-    static const Priority TapConnMgrPriority;
-    static const Priority DefragmenterTaskPriority;
-    static const Priority ActiveStreamCheckpointProcessor;
-
-    bool operator==(const Priority &other) const {
-        return other.getPriorityValue() == this->priority;
-    }
-
-    bool operator<(const Priority &other) const {
-        return this->priority > other.getPriorityValue();
-    }
-
-    bool operator>(const Priority &other) const {
-        return this->priority < other.getPriorityValue();
-    }
-
-    /**
-     * Return the task name.
-     *
-     * @return a task name
-     */
-    static const char *getTypeName(const type_id_t i);
-
-    /**
-     * Return the type id representing a task
-     *
-     * @return type id
-     */
-    type_id_t getTypeId() const {
-        return t_id;
-    }
-
-    /**
-     * Return an integer value that represents a priority.
-     *
-     * @return a priority value
-     */
-    int getPriorityValue() const {
-        return priority;
-    }
-
-    // gcc didn't like the idea of having a class with no constructor
-    // available to anyone.. let's make it protected instead to shut
-    // gcc up :(
-protected:
-    Priority(type_id_t id, int p) : t_id(id), priority(p) { }
-    type_id_t t_id;
-    int priority;
-
-private:
-    DISALLOW_COPY_AND_ASSIGN(Priority);
-};
-
-#endif  // SRC_PRIORITY_H_
index 2f04265..f99e980 100644 (file)
@@ -664,7 +664,7 @@ class ResumeCallback : public GlobalTask {
 public:
     ResumeCallback(EventuallyPersistentEngine &e, Producer *c,
                    double sleepTime)
-        : GlobalTask(&e, Priority::TapResumePriority, sleepTime),
+        : GlobalTask(&e, TaskId::ResumeCallback, sleepTime),
           engine(e), conn(c) {
         std::stringstream ss;
         ss << "Resuming suspended tap connection: " << conn->getName();
@@ -1029,8 +1029,7 @@ const char *TapProducer::opaqueCmdToString(uint32_t opaque_code) {
 
 void TapProducer::queueBGFetch_UNLOCKED(const std::string &key, uint64_t id, uint16_t vb) {
     ExTask task = new BGFetchCallback(&engine(), getName(), key, vb,
-                                      getConnectionToken(),
-                                      Priority::TapBgFetcherPriority, 0);
+                                      getConnectionToken(), 0);
     ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
     ++bgJobIssued;
     std::map<uint16_t, CheckpointState>::iterator it = checkpointState_.find(vb);
index f10b331..ea3441a 100644 (file)
@@ -678,8 +678,8 @@ class BGFetchCallback : public GlobalTask {
 public:
     BGFetchCallback(EventuallyPersistentEngine *e, const std::string &n,
                     const std::string &k, uint16_t vbid, hrtime_t token,
-                    const Priority &p, double sleeptime = 0) :
-        GlobalTask(e, p, sleeptime, false), name(n), key(k), epe(e),
+                    double sleeptime = 0) :
+        GlobalTask(e, TaskId::BGFetchCallback, sleeptime, false), name(n), key(k), epe(e),
         init(gethrtime()), connToken(token), vbucket(vbid)
     {
         cb_assert(epe);
index fe46ddb..1a14552 100644 (file)
 #include "tasks.h"
 #include "warmup.h"
 
+#include <type_traits>
+
 static const double VBSTATE_SNAPSHOT_FREQ(300.0);
 static const double WORKLOAD_MONITOR_FREQ(5.0);
 
+GlobalTask::GlobalTask(EventuallyPersistentEngine *e,
+                       TaskId taskId,
+                       double sleeptime,
+                       bool completeBeforeShutdown)
+    : RCValue(),
+      blockShutdown(completeBeforeShutdown),
+      state(TASK_RUNNING),
+      uid(nextTaskId()),
+      typeId(taskId),
+      engine(e) {
+    priority = getTaskPriority(taskId);
+    snooze(sleeptime);
+}
+
 void GlobalTask::snooze(const double secs) {
     if (secs == INT_MAX) {
         setState(TASK_SNOOZED, TASK_RUNNING);
@@ -41,6 +57,68 @@ void GlobalTask::snooze(const double secs) {
     }
 }
 
+// These static_asserts previously were in priority_test.cc
+static_assert(TaskPriority::MultiBGFetcherTask < TaskPriority::BGFetchCallback,
+              "MultiBGFetcherTask not less than BGFetchCallback");
+
+static_assert(TaskPriority::BGFetchCallback == TaskPriority::VBDeleteTask,
+              "BGFetchCallback not equal VBDeleteTask");
+
+static_assert(TaskPriority::VBStatePersistTaskHigh <
+              TaskPriority::VKeyStatBGFetchTask,
+              "VBStatePersistTaskHigh not less than VKeyStatBGFetchTask");
+
+static_assert(TaskPriority::VKeyStatBGFetchTask < TaskPriority::FlusherTask,
+              "VKeyStatBGFetchTask not less than FlusherTask");
+
+static_assert(TaskPriority::FlusherTask < TaskPriority::ItemPager,
+              "FlusherTask not less than ItemPager");
+
+static_assert(TaskPriority::ItemPager < TaskPriority::BackfillManagerTask,
+              "ItemPager not less than BackfillManagerTask");
+
+/*
+ * Generate a switch statement from tasks.def.h that maps TaskId to a
+ * stringified value of the task's name.
+ */
+const char* GlobalTask::getTaskName(TaskId id) {
+    switch(id) {
+#define TASK(name, prio) case TaskId::name: {return #name;}
+#include "tasks.def.h"
+#undef TASK
+        case TaskId::TASK_COUNT: {
+            throw std::invalid_argument("GlobalTask::getTaskName(TaskId::TASK_COUNT) called.");
+        }
+    }
+    throw std::logic_error("GlobalTask::getTaskName() unknown id " +
+                          std::to_string(static_cast<int>(id)));
+    return nullptr;
+}
+
+/*
+ * Generate a switch statement from tasks.def.h that maps TaskId to priority
+ */
+TaskPriority GlobalTask::getTaskPriority(TaskId id) {
+   switch(id) {
+#define TASK(name, prio) case TaskId::name: {return TaskPriority::name;}
+#include "tasks.def.h"
+#undef TASK
+        case TaskId::TASK_COUNT: {
+            throw std::invalid_argument("GlobalTask::getTaskPriority(TaskId::TASK_COUNT) called.");
+        }
+    }
+    throw std::logic_error("GlobalTask::getTaskPriority() unknown id " +
+                           std::to_string(static_cast<int>(id)));
+    return TaskPriority::PRIORITY_COUNT;
+}
+
+std::array<TaskId, static_cast<int>(TaskId::TASK_COUNT)> GlobalTask::allTaskIds = {{
+#define TASK(name, prio) TaskId::name,
+#include "tasks.def.h"
+#undef TASK
+}};
+
+
 bool FlusherTask::run() {
     return flusher->step(this);
 }
@@ -51,21 +129,20 @@ bool VBSnapshotTask::run() {
 }
 
 DaemonVBSnapshotTask::DaemonVBSnapshotTask(EventuallyPersistentEngine *e,
-                                           bool completeBeforeShutdown) :
-    GlobalTask(e, Priority::VBucketPersistLowPriority, VBSTATE_SNAPSHOT_FREQ,
-               completeBeforeShutdown) {
-        desc = "Snapshotting vbucket states";
+                                           bool completeBeforeShutdown)
+    : GlobalTask(e, TaskId::DaemonVBSnapshotTask,
+                 VBSTATE_SNAPSHOT_FREQ, completeBeforeShutdown) {
+    desc = "Snapshotting vbucket states";
 }
 
 bool DaemonVBSnapshotTask::run() {
-    bool ret = engine->getEpStore()->scheduleVBSnapshot(
-               Priority::VBucketPersistLowPriority);
+    bool ret = engine->getEpStore()->scheduleVBSnapshot(VBSnapshotTask::Priority::LOW);
     snooze(VBSTATE_SNAPSHOT_FREQ);
     return ret;
 }
 
 bool VBStatePersistTask::run() {
-    return engine->getEpStore()->persistVBState(priority, vbid);
+    return engine->getEpStore()->persistVBState(vbid);
 }
 
 bool VBDeleteTask::run() {
@@ -85,11 +162,11 @@ bool StatSnap::run() {
     if (runOnce) {
         return false;
     }
-    ExecutorPool::get()->snooze(taskId, 60);
+    ExecutorPool::get()->snooze(uid, 60);
     return true;
 }
 
-bool BgFetcherTask::run() {
+bool MultiBGFetcherTask::run() {
     return bgfetcher->run(this);
 }
 
@@ -104,7 +181,7 @@ bool VKeyStatBGFetchTask::run() {
 }
 
 
-bool BGFetchTask::run() {
+bool SingleBGFetcherTask::run() {
     engine->getEpStore()->completeBGFetch(key, vbucket, cookie, init,
                                           metaFetch);
     return false;
@@ -112,7 +189,7 @@ bool BGFetchTask::run() {
 
 WorkLoadMonitor::WorkLoadMonitor(EventuallyPersistentEngine *e,
                                  bool completeBeforeShutdown) :
-    GlobalTask(e, Priority::WorkLoadMonitorPriority, WORKLOAD_MONITOR_FREQ,
+    GlobalTask(e, TaskId::WorkLoadMonitor, WORKLOAD_MONITOR_FREQ,
                completeBeforeShutdown) {
     prevNumMutations = getNumMutations();
     prevNumGets = getNumGets();
diff --git a/src/tasks.def.h b/src/tasks.def.h
new file mode 100644 (file)
index 0000000..fa4ef48
--- /dev/null
@@ -0,0 +1,86 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/*
+ * Every task within ep-engine is declared in this file
+ *
+ * The TASK(name, priority) macro will be pre-processed to generate
+ *   - a unique std::string name
+ *   - a unique type-id
+ *   - a unique priority object
+ *
+ * task.h and .cc include this file with a customised TASK macro.
+ */
+
+// Read IO tasks
+TASK(MultiBGFetcherTask, 0)
+TASK(FetchAllKeysTask, 0)
+TASK(Warmup, 0)
+TASK(WarmupInitialize, 0)
+TASK(WarmupCreateVBuckets, 0)
+TASK(WarmupEstimateDatabaseItemCount, 0)
+TASK(WarmupKeyDump, 0)
+TASK(WarmupCheckforAccessLog, 0)
+TASK(WarmupLoadAccessLog, 0)
+TASK(WarmupLoadingKVPairs, 0)
+TASK(WarmupLoadingData, 0)
+TASK(WarmupCompletion, 0)
+TASK(SingleBGFetcherTask, 1)
+TASK(VKeyStatBGFetchTask, 3)
+
+// Aux IO tasks
+TASK(BackfillDiskLoad, 1)
+TASK(BGFetchCallback, 1)
+TASK(AccessScanner, 3)
+TASK(VBucketVisitorTask, 3)
+TASK(ActiveStreamCheckpointProcessorTask, 5)
+TASK(BackfillManagerTask, 8)
+TASK(BackfillTask, 8)
+TASK(BackfillVisitorTask, 8)
+
+// Read/Write IO tasks
+TASK(VBDeleteTask, 1)
+TASK(RollbackTask, 1)
+TASK(CompactVBucketTask, 2)
+TASK(VBStatePersistTaskHigh, 2)
+TASK(VBSnapshotTaskHigh, 2)
+TASK(FlushAllTask, 3)
+TASK(FlusherTask, 5)
+TASK(VBStatePersistTaskLow, 9)
+TASK(VBSnapshotTaskLow, 9)
+TASK(DaemonVBSnapshotTask, 9)
+TASK(StatSnap, 9)
+
+// Non-IO tasks
+TASK(PendingOpsNotification, 0)
+TASK(Processor, 0)
+TASK(ConnNotifierCallback, 5)
+TASK(ConnectionReaperCallback, 6)
+TASK(ClosedUnrefCheckpointRemoverTask, 6)
+TASK(ClosedUnrefCheckpointRemoverVisitorTask, 6)
+TASK(VBucketMemoryDeletionTask, 6)
+TASK(StatCheckpointTask, 7)
+TASK(ItemPager, 7)
+TASK(ExpiredItemPager, 7)
+TASK(ItemPagerVisitor, 7)
+TASK(ExpiredItemPagerVisitor, 7)
+TASK(DefragmenterTask, 7)
+TASK(ConnManager, 8)
+TASK(WorkLoadMonitor, 10)
+TASK(ResumeCallback, 316)
+TASK(HashtableResizerTask, 211)
+TASK(HashtableResizerVisitorTask, 7)
index 692997f..c24abb5 100644 (file)
 
 #include "config.h"
 
+#include <array>
 #include <list>
 #include <string>
 #include <utility>
 
 #include "atomic.h"
-#include "priority.h"
 
 typedef enum {
     TASK_RUNNING,
@@ -33,6 +33,22 @@ typedef enum {
     TASK_DEAD
 } task_state_t;
 
+enum class TaskId : int {
+#define TASK(name, prio) name,
+#include "tasks.def.h"
+#undef TASK
+    TASK_COUNT
+};
+
+typedef int queue_priority_t;
+
+enum class TaskPriority : int {
+#define TASK(name, prio) name = prio,
+#include "tasks.def.h"
+#undef TASK
+    PRIORITY_COUNT
+};
+
 class BfilterCB;
 class BgFetcher;
 class CompareTasksByDueDate;
@@ -68,17 +84,13 @@ friend class ExecutorPool;
 friend class ExecutorThread;
 friend class TaskQueue;
 public:
-    GlobalTask(EventuallyPersistentEngine *e, const Priority &p,
-               double sleeptime = 0, bool completeBeforeShutdown = true) :
-          RCValue(), priority(p),
-          blockShutdown(completeBeforeShutdown),
-          state(TASK_RUNNING), taskId(nextTaskId()), engine(e) {
-        snooze(sleeptime);
-    }
+    GlobalTask(EventuallyPersistentEngine *e,
+               TaskId taskId,
+               double sleeptime = 0,
+               bool completeBeforeShutdown = true);
 
     /* destructor */
-    virtual ~GlobalTask(void) {
-    }
+    virtual ~GlobalTask(void) {}
 
     /**
      * The invoked function when the task is executed.
@@ -101,10 +113,9 @@ public:
     /**
      * test if a task is dead
      */
-     bool isdead(void) {
+    bool isdead(void) {
         return (state == TASK_DEAD);
-     }
-
+    }
 
     /**
      * Cancels this task by marking it dead.
@@ -123,14 +134,14 @@ public:
      *
      * @return A unique task id number.
      */
-    size_t getId() { return taskId; }
+    size_t getId() const { return uid; }
 
     /**
      * Returns the type id of this task.
      *
      * @return A type id of the task.
      */
-    type_id_t getTypeId() { return priority.getTypeId(); }
+    TaskId getTypeId() const { return typeId; }
 
     /**
      * Gets the engine that this task was scheduled from
@@ -147,11 +158,33 @@ public:
         state.compare_exchange_strong(expected, tstate);
     }
 
+    queue_priority_t getQueuePriority() const {
+        return static_cast<queue_priority_t>(priority);
+    }
+
+    /*
+     * Lookup the task name for TaskId id.
+     * The data used is generated from tasks.def.h
+     */
+    static const char* getTaskName(TaskId id);
+
+    /*
+     * Lookup the task priority for TaskId id.
+     * The data used is generated from tasks.def.h
+     */
+    static TaskPriority getTaskPriority(TaskId id);
+
+    /*
+     * A vector of all TaskId generated from tasks.def.h
+     */
+    static std::array<TaskId, static_cast<int>(TaskId::TASK_COUNT)> allTaskIds;
+
 protected:
-    const Priority &priority;
     bool blockShutdown;
     AtomicValue<task_state_t> state;
-    const size_t taskId;
+    const size_t uid;
+    TaskId typeId;
+    TaskPriority priority;
     EventuallyPersistentEngine *engine;
 
     static AtomicValue<size_t> task_id_counter;
@@ -180,9 +213,10 @@ typedef SingleThreadedRCPtr<GlobalTask> ExTask;
  */
 class FlusherTask : public GlobalTask {
 public:
-    FlusherTask(EventuallyPersistentEngine *e, Flusher* f, const Priority &p,
-                uint16_t shardid, bool completeBeforeShutdown = true) :
-                GlobalTask(e, p, 0, completeBeforeShutdown), flusher(f) {
+    FlusherTask(EventuallyPersistentEngine *e, Flusher* f, uint16_t shardid,
+                bool completeBeforeShutdown = true)
+        : GlobalTask(e, TaskId::FlusherTask, 0, completeBeforeShutdown),
+          flusher(f) {
         std::stringstream ss;
         ss<<"Running a flusher loop: shard "<<shardid;
         desc = ss.str();
@@ -207,23 +241,46 @@ private:
  */
 class VBSnapshotTask : public GlobalTask {
 public:
-    VBSnapshotTask(EventuallyPersistentEngine *e, const Priority &p,
-                uint16_t sID = 0, bool completeBeforeShutdown = true) :
-                GlobalTask(e, p, 0, completeBeforeShutdown), shardID(sID) {
-        std::stringstream ss;
-        ss<<"Snapshotting vbucket states for the shard: "<<shardID;
-        desc = ss.str();
-    }
+    enum class Priority {
+        HIGH,
+        LOW
+    };
 
     bool run();
 
     std::string getDescription() {
-        return desc;
+        return "Snapshotting vbucket states for the shard: " + std::to_string(shardID);
     }
 
+protected:
+    VBSnapshotTask(EventuallyPersistentEngine *e,
+                   TaskId id,
+                   uint16_t sID = 0,
+                   bool completeBeforeShutdown = true)
+        : GlobalTask(e, id, 0, completeBeforeShutdown),
+          shardID(sID) {}
+
 private:
     uint16_t shardID;
-    std::string desc;
+    Priority priority;
+};
+
+class VBSnapshotTaskHigh : public VBSnapshotTask {
+public:
+    VBSnapshotTaskHigh(EventuallyPersistentEngine *e,
+                       uint16_t sID = 0,
+                       bool completeBeforeShutdown = true)
+        : VBSnapshotTask(e, TaskId::VBSnapshotTaskHigh,
+                         sID, completeBeforeShutdown){}
+};
+
+class VBSnapshotTaskLow : public VBSnapshotTask {
+public:
+    VBSnapshotTaskLow(EventuallyPersistentEngine *e,
+                      uint16_t sID = 0,
+                      bool completeBeforeShutdown = true)
+        : VBSnapshotTask(e, TaskId::VBSnapshotTaskLow,
+                         sID, completeBeforeShutdown){}
 };
 
 /**
@@ -250,23 +307,46 @@ private:
  */
 class VBStatePersistTask : public GlobalTask {
 public:
-    VBStatePersistTask(EventuallyPersistentEngine *e, const Priority &p,
-                       uint16_t vbucket, bool completeBeforeShutdown = true) :
-        GlobalTask(e, p, 0, completeBeforeShutdown), vbid(vbucket) {
-        std::stringstream ss;
-        ss<<"Persisting a vbucket state for vbucket: "<< vbid;
-        desc = ss.str();
-    }
+    enum class Priority {
+        HIGH,
+        LOW
+    };
 
     bool run();
 
     std::string getDescription() {
-        return desc;
+        return "Persisting a vbucket state for vbucket: " + std::to_string(vbid);
+    }
+
+protected:
+    VBStatePersistTask(EventuallyPersistentEngine *e,
+                       TaskId taskId,
+                       uint16_t vbucket,
+                       bool completeBeforeShutdown = true)
+        : GlobalTask(e, taskId, 0, completeBeforeShutdown),
+          vbid(vbucket) {
     }
 
 private:
     uint16_t vbid;
-    std::string desc;
+};
+
+class VBStatePersistTaskHigh : public VBStatePersistTask {
+public:
+    VBStatePersistTaskHigh(EventuallyPersistentEngine *e,
+                           uint16_t vbucket,
+                           bool completeBeforeShutdown = true)
+        : VBStatePersistTask(e, TaskId::VBStatePersistTaskHigh,
+                             vbucket, completeBeforeShutdown) {}
+};
+
+class VBStatePersistTaskLow : public VBStatePersistTask {
+public:
+    VBStatePersistTaskLow(EventuallyPersistentEngine *e,
+                          uint16_t vbucket,
+                          bool completeBeforeShutdown = true)
+        : VBStatePersistTask(e, TaskId::VBStatePersistTaskLow,
+                             vbucket, completeBeforeShutdown) {}
 };
 
 /**
@@ -278,9 +358,9 @@ private:
 class VBDeleteTask : public GlobalTask {
 public:
     VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c,
-                 const Priority &p, bool completeBeforeShutdown = true) :
-        GlobalTask(e, p, 0, completeBeforeShutdown),
-        vbucketId(vbid), cookie(c) { }
+                 bool completeBeforeShutdown = true)
+        : GlobalTask(e, TaskId::VBDeleteTask, 0, completeBeforeShutdown),
+          vbucketId(vbid), cookie(c) {}
 
     bool run();
 
@@ -300,10 +380,10 @@ private:
  */
 class CompactVBucketTask : public GlobalTask {
 public:
-    CompactVBucketTask(EventuallyPersistentEngine *e, const Priority &p,
-                uint16_t vbucket, compaction_ctx c, const void *ck,
+    CompactVBucketTask(EventuallyPersistentEngine *e, uint16_t vbucket,
+                       compaction_ctx c, const void *ck,
                 bool completeBeforeShutdown = false) :
-                GlobalTask(e, p, 0, completeBeforeShutdown),
+                GlobalTask(e, TaskId::CompactVBucketTask, 0, completeBeforeShutdown),
                            vbid(vbucket), compactCtx(c), cookie(ck)
     {
         std::stringstream ss;
@@ -332,11 +412,10 @@ private:
  */
 class StatSnap : public GlobalTask {
 public:
-    StatSnap(EventuallyPersistentEngine *e, const Priority &p,
-             bool runOneTimeOnly = false, bool sleeptime = 0,
-             bool completeBeforeShutdown = false) :
-        GlobalTask(e, p, sleeptime, completeBeforeShutdown),
-        runOnce(runOneTimeOnly) { }
+    StatSnap(EventuallyPersistentEngine *e, bool runOneTimeOnly = false,
+             bool sleeptime = 0, bool completeBeforeShutdown = false)
+        : GlobalTask(e, TaskId::StatSnap, sleeptime, completeBeforeShutdown),
+          runOnce(runOneTimeOnly) {}
 
     bool run();
 
@@ -351,14 +430,14 @@ private:
 
 /**
  * A task for fetching items from disk.
+ * This task is used if EventuallyPersistentStore::multiBGFetchEnabled is true.
  */
-class BgFetcherTask : public GlobalTask {
+class MultiBGFetcherTask : public GlobalTask {
 public:
-    BgFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b,
-                  const Priority &p, bool sleeptime = 0,
-                  bool completeBeforeShutdown = false) :
-        GlobalTask(e, p, sleeptime, completeBeforeShutdown),
-        bgfetcher(b) { }
+    MultiBGFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b, bool sleeptime = 0,
+                        bool completeBeforeShutdown = false)
+        : GlobalTask(e, TaskId::MultiBGFetcherTask, sleeptime, completeBeforeShutdown),
+          bgfetcher(b) {}
 
     bool run();
 
@@ -375,8 +454,8 @@ private:
  */
 class FlushAllTask : public GlobalTask {
 public:
-    FlushAllTask(EventuallyPersistentEngine *e, double when) :
-        GlobalTask(e, Priority::FlushAllPriority, when, false) { }
+    FlushAllTask(EventuallyPersistentEngine *e, double when)
+        : GlobalTask(e, TaskId::FlushAllTask, when, false) {}
 
     bool run();
 
@@ -393,11 +472,13 @@ public:
 class VKeyStatBGFetchTask : public GlobalTask {
 public:
     VKeyStatBGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
-                        uint16_t vbid, uint64_t s, const void *c,
-                        const Priority &p, int sleeptime = 0,
-                        bool completeBeforeShutdown = false) :
-        GlobalTask(e, p, sleeptime, completeBeforeShutdown), key(k),
-                   vbucket(vbid), bySeqNum(s), cookie(c) { }
+                        uint16_t vbid, uint64_t s, const void *c, int sleeptime = 0,
+                        bool completeBeforeShutdown = false)
+        : GlobalTask(e, TaskId::VKeyStatBGFetchTask, sleeptime, completeBeforeShutdown),
+          key(k),
+          vbucket(vbid),
+          bySeqNum(s),
+          cookie(c) {}
 
     bool run();
 
@@ -417,16 +498,19 @@ private:
 
 /**
  * A task that performs disk fetches for non-resident get requests.
+ * This task is used if EventuallyPersistentStore::multiBGFetchEnabled is false.
  */
-class BGFetchTask : public GlobalTask {
+class SingleBGFetcherTask : public GlobalTask {
 public:
-    BGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
-            uint16_t vbid, const void *c, bool isMeta,
-            const Priority &p, int sleeptime = 0,
-            bool completeBeforeShutdown = false) :
-        GlobalTask(e, p, sleeptime, completeBeforeShutdown),
-        key(k), vbucket(vbid), cookie(c), metaFetch(isMeta),
-        init(gethrtime()) { }
+    SingleBGFetcherTask(EventuallyPersistentEngine *e, const std::string &k,
+                       uint16_t vbid, const void *c, bool isMeta,
+                       int sleeptime = 0, bool completeBeforeShutdown = false)
+        : GlobalTask(e, TaskId::SingleBGFetcherTask, sleeptime, completeBeforeShutdown),
+          key(k),
+          vbucket(vbid),
+          cookie(c),
+          metaFetch(isMeta),
+          init(gethrtime()) {}
 
     bool run();
 
@@ -470,18 +554,20 @@ private:
 
 /**
  * Order tasks by their priority and taskId (try to ensure FIFO)
+ * @return true if t2 should have priority over t1
  */
 class CompareByPriority {
 public:
     bool operator()(ExTask &t1, ExTask &t2) {
-        return (t1->priority == t2->priority) ?
-               (t1->taskId   > t2->taskId)    :
-               (t1->priority < t2->priority);
+        return (t1->getQueuePriority() == t2->getQueuePriority()) ?
+               (t1->uid > t2->uid) :
+               (t1->getQueuePriority() > t2->getQueuePriority());
     }
 };
 
 /**
  * Order tasks by their ready date.
+ * @return true if t2 should have priority over t1
  */
 class CompareByDueDate {
 public:
index 2d373d1..dc49ea1 100644 (file)
@@ -421,8 +421,7 @@ void Warmup::stop(void)
 
 void Warmup::scheduleInitialize()
 {
-    ExTask task = new WarmupInitialize(*store, this,
-            Priority::WarmupPriority);
+    ExTask task = new WarmupInitialize(*store, this);
     taskId = task->getId();
     ExecutorPool::get()->schedule(task, READER_TASK_IDX);
 }
@@ -452,8 +451,7 @@ void Warmup::scheduleCreateVBuckets()
 {
     threadtask_count = 0;
     for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
-        ExTask task = new WarmupCreateVBuckets(*store, i, this,
-                                               Priority::WarmupPriority);
+        ExTask task = new WarmupCreateVBuckets(*store, i, this);
         taskId = task->getId();
         ExecutorPool::get()->schedule(task, READER_TASK_IDX);
     }
@@ -520,8 +518,7 @@ void Warmup::scheduleEstimateDatabaseItemCount()
     estimateTime = 0;
     estimatedItemCount = 0;
     for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
-        ExTask task = new WarmupEstimateDatabaseItemCount(
-                                *store, i, this, Priority::WarmupPriority);
+        ExTask task = new WarmupEstimateDatabaseItemCount(*store, i, this);
         taskId = task->getId();
         ExecutorPool::get()->schedule(task, READER_TASK_IDX);
     }
@@ -564,8 +561,7 @@ void Warmup::scheduleKeyDump()
 {
     threadtask_count = 0;
     for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
-        ExTask task = new WarmupKeyDump(*store, this,
-                                        i, Priority::WarmupPriority);
+        ExTask task = new WarmupKeyDump(*store, this, i);
         taskId = task->getId();
         ExecutorPool::get()->schedule(task, READER_TASK_IDX);
     }
@@ -619,8 +615,7 @@ void Warmup::keyDumpforShard(uint16_t shardId)
 
 void Warmup::scheduleCheckForAccessLog()
 {
-    ExTask task = new WarmupCheckforAccessLog(*store, this,
-            Priority::WarmupPriority);
+    ExTask task = new WarmupCheckforAccessLog(*store, this);
     taskId = task->getId();
     ExecutorPool::get()->schedule(task, READER_TASK_IDX);
 }
@@ -661,8 +656,7 @@ void Warmup::scheduleLoadingAccessLog()
 {
     threadtask_count = 0;
     for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
-        ExTask task = new WarmupLoadAccessLog(*store, this, i,
-                Priority::WarmupPriority);
+        ExTask task = new WarmupLoadAccessLog(*store, this, i);
         taskId = task->getId();
         ExecutorPool::get()->schedule(task, READER_TASK_IDX);
     }
@@ -774,8 +768,7 @@ void Warmup::scheduleLoadingKVPairs()
 
     threadtask_count = 0;
     for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
-        ExTask task = new WarmupLoadingKVPairs(*store, this,
-                                               i, Priority::WarmupPriority);
+        ExTask task = new WarmupLoadingKVPairs(*store, this, i);
         taskId = task->getId();
         ExecutorPool::get()->schedule(task, READER_TASK_IDX);
     }
@@ -824,8 +817,7 @@ void Warmup::scheduleLoadingData()
 
     threadtask_count = 0;
     for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
-        ExTask task = new WarmupLoadingData(*store, this,
-                                            i, Priority::WarmupPriority);
+        ExTask task = new WarmupLoadingData(*store, this, i);
         taskId = task->getId();
         ExecutorPool::get()->schedule(task, READER_TASK_IDX);
     }
@@ -862,8 +854,7 @@ void Warmup::loadDataforShard(uint16_t shardId)
 }
 
 void Warmup::scheduleCompletion() {
-    ExTask task = new WarmupCompletion(*store, this,
-                                       Priority::WarmupPriority);
+    ExTask task = new WarmupCompletion(*store, this);
     taskId = task->getId();
     ExecutorPool::get()->schedule(task, READER_TASK_IDX);
 }
index 1982f98..62728b6 100644 (file)
@@ -198,8 +198,9 @@ private:
 class WarmupInitialize : public GlobalTask {
 public:
     WarmupInitialize(EventuallyPersistentStore &st,
-                     Warmup *w, const Priority &p) :
-        GlobalTask(&st.getEPEngine(), p, 0, false), _warmup(w) { }
+                     Warmup *w) :
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupInitialize, 0, false),
+        _warmup(w) {}
 
     std::string getDescription() {
         std::stringstream ss;
@@ -219,9 +220,11 @@ private:
 
 class WarmupCreateVBuckets : public GlobalTask {
 public:
-    WarmupCreateVBuckets(EventuallyPersistentStore &st, uint16_t sh,
-                         Warmup *w, const Priority &p):
-        GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) {}
+    WarmupCreateVBuckets(EventuallyPersistentStore &st,
+                         uint16_t sh, Warmup *w):
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupCreateVBuckets, 0, false),
+        _shardId(sh),
+        _warmup(w) {}
 
     std::string getDescription() {
         std::stringstream ss;
@@ -242,8 +245,10 @@ private:
 class WarmupEstimateDatabaseItemCount : public GlobalTask {
 public:
     WarmupEstimateDatabaseItemCount(EventuallyPersistentStore &st,
-                                    uint16_t sh, Warmup *w, const Priority &p):
-        GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) {}
+                                    uint16_t sh, Warmup *w):
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupEstimateDatabaseItemCount, 0, false),
+        _shardId(sh),
+        _warmup(w) {}
 
     std::string getDescription() {
         std::stringstream ss;
@@ -264,9 +269,10 @@ private:
 
 class WarmupKeyDump : public GlobalTask {
 public:
-    WarmupKeyDump(EventuallyPersistentStore &st, Warmup* w,
-                  uint16_t sh, const Priority &p) :
-        GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) {}
+    WarmupKeyDump(EventuallyPersistentStore &st, Warmup *w, uint16_t sh) :
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupKeyDump, 0, false),
+        _shardId(sh),
+        _warmup(w) {}
 
     std::string getDescription() {
         std::stringstream ss;
@@ -275,7 +281,6 @@ public:
     }
 
     bool run() {
-
         _warmup->keyDumpforShard(_shardId);
         return false;
     }
@@ -288,8 +293,9 @@ private:
 class WarmupCheckforAccessLog : public GlobalTask {
 public:
     WarmupCheckforAccessLog(EventuallyPersistentStore &st,
-                            Warmup *w, const Priority &p) :
-        GlobalTask(&st.getEPEngine(), p, 0, false), _warmup(w) { }
+                            Warmup *w) :
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupCheckforAccessLog, 0, false),
+        _warmup(w) {}
 
     std::string getDescription() {
         std::stringstream ss;
@@ -309,9 +315,9 @@ private:
 
 class WarmupLoadAccessLog : public GlobalTask {
 public:
-    WarmupLoadAccessLog(EventuallyPersistentStore &st,
-                        Warmup *w, uint16_t sh, const Priority &p) :
-        GlobalTask(&st.getEPEngine(), p, 0, false), _warmup(w), _shardId(sh) { }
+    WarmupLoadAccessLog(EventuallyPersistentStore &st, Warmup *w, uint16_t sh) :
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupLoadAccessLog, 0, false),
+        _warmup(w), _shardId(sh) {}
 
     std::string getDescription() {
         std::stringstream ss;
@@ -332,9 +338,10 @@ private:
 
 class WarmupLoadingKVPairs : public GlobalTask {
 public:
-    WarmupLoadingKVPairs(EventuallyPersistentStore &st, Warmup* w,
-                         uint16_t sh, const Priority &p) :
-        GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) { }
+    WarmupLoadingKVPairs(EventuallyPersistentStore &st, Warmup *w, uint16_t sh) :
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupLoadingKVPairs, 0, false),
+        _shardId(sh),
+        _warmup(w) {}
 
     std::string getDescription() {
         std::stringstream ss;
@@ -355,9 +362,10 @@ private:
 
 class WarmupLoadingData : public GlobalTask {
 public:
-    WarmupLoadingData(EventuallyPersistentStore &st, Warmup* w,
-                      uint16_t sh, const Priority &p) :
-        GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) {}
+    WarmupLoadingData(EventuallyPersistentStore &st, Warmup *w, uint16_t sh) :
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupLoadingData, 0, false),
+        _shardId(sh),
+        _warmup(w) {}
 
     std::string getDescription() {
         std::stringstream ss;
@@ -379,8 +387,9 @@ private:
 class WarmupCompletion : public GlobalTask {
 public:
     WarmupCompletion(EventuallyPersistentStore &st,
-                     Warmup *w, const Priority &p) :
-        GlobalTask(&st.getEPEngine(), p, 0, false), _warmup(w) { }
+                     Warmup *w) :
+        GlobalTask(&st.getEPEngine(), TaskId::WarmupCompletion, 0, false),
+        _warmup(w) {}
 
     std::string getDescription() {
         std::stringstream ss;
index 31055cf..314d7f2 100644 (file)
@@ -14842,7 +14842,7 @@ engine_test_t* get_tests(void) {
         TestCase("test observe seqno error", test_observe_seqno_error,
                  test_setup, teardown, NULL, prepare, cleanup),
         TestCase("test item pager", test_item_pager, test_setup,
-                 teardown, "max_size=2048000", prepare, cleanup),
+                 teardown, "max_size=2621440", prepare, cleanup),
         TestCase("warmup conf", test_warmup_conf, test_setup,
                  teardown, NULL, prepare, cleanup),
         TestCase("bloomfilter conf", test_bloomfilter_conf, test_setup,
diff --git a/tests/module_tests/priority_test.cc b/tests/module_tests/priority_test.cc
deleted file mode 100644 (file)
index 0170ca2..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- *     Copyright 2010 Couchbase, Inc
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-#include "config.h"
-
-#include "priority.h"
-#undef NDEBUG
-
-int main(int argc, char **argv) {
-   (void)argc; (void)argv;
-
-   cb_assert(Priority::BgFetcherPriority > Priority::TapBgFetcherPriority);
-   cb_assert(Priority::TapBgFetcherPriority == Priority::VBucketDeletionPriority);
-   cb_assert(Priority::VBucketPersistHighPriority > Priority::VKeyStatBgFetcherPriority);
-   cb_assert(Priority::VKeyStatBgFetcherPriority > Priority::FlusherPriority);
-   cb_assert(Priority::FlusherPriority > Priority::ItemPagerPriority);
-   cb_assert(Priority::ItemPagerPriority > Priority::BackfillTaskPriority);
-
-   return 0;
-}