src/failover-table.cc src/flusher.cc src/htresizer.cc
src/item.cc src/item_pager.cc src/kvshard.cc
src/memory_tracker.cc src/murmurhash3.cc
- src/mutex.cc src/priority.cc
+ src/mutex.cc
src/executorthread.cc
src/sizes.cc
${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
src/mutation_log.cc
src/mutex.cc
src/objectregistry.cc
- src/priority.cc
src/tapconnection.cc
src/stored-value.cc
src/tapthrottle.cc
tests/module_tests/mutex_test.cc src/testlogger.cc src/mutex.cc)
TARGET_LINK_LIBRARIES(ep-engine_mutex_test platform)
-ADD_EXECUTABLE(ep-engine_priority_test tests/module_tests/priority_test.cc src/priority.cc)
-TARGET_LINK_LIBRARIES(ep-engine_priority_test platform)
-
ADD_EXECUTABLE(ep-engine_ringbuffer_test tests/module_tests/ringbuffer_test.cc)
TARGET_LINK_LIBRARIES(ep-engine_ringbuffer_test platform)
src/mutation_log.cc
src/mutex.cc
src/objectregistry.cc
- src/priority.cc
src/tapconnection.cc
src/stored-value.cc
src/tapthrottle.cc
ADD_TEST(ep-engine_hrtime_test ep-engine_hrtime_test)
ADD_TEST(ep-engine_misc_test ep-engine_misc_test)
ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
-ADD_TEST(ep-engine_priority_test ep-engine_priority_test)
ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
ADD_TEST(ep-engine_stream_test ep-engine_stream_test)
ADD_TEST(ep-engine_kvstore_test ep-engine_kvstore_test)
--- /dev/null
+# Eventually Persistent Engine
+## Threads
+Code in ep-engine is executing in a multithreaded environment, two classes of
+thread exist.
+
+1. memcached's threads, for servicing a client and calling in via the
+[engine API] (https://github.com/couchbase/memcached/blob/master/include/memcached/engine.h)
+2. ep-engine's threads, for running tasks such as the document expiry pager
+(see subclasses of `GlobalTasks`).
+
+## Synchronisation Primitives
+
+There are three mutual-exclusion primitives available in ep-engine.
+
+1. `Mutex` exclusive lock - [mutex.h](./src/mutex.h)
+2. `RWLock` shared, reader/writer lock - [rwlock.h](./src/rwlock.h)
+3. `SpinLock` 1-byte exclusive lock - [atomix.h](./src/atomic.h)
+
+A conditional-variable is also available called `SyncObject`
+[syncobject.h](./src/syncobject.h). `SyncObject` glues a `Mutex` and
+conditional-variable together in one object.
+
+These primitives are managed via RAII wrappers - [locks.h](./src/locks.h).
+
+1. `LockHolder` - for acquiring a `Mutex` or `SyncObject`.
+2. `MultiLockHolder` - for acquiring an array of `Mutex` or `SyncObject`.
+3. `WriterLockHolder` - for acquiring write access to a `RWLock`.
+4. `ReaderLockHolder` - for acquiring read access to a `RWLock`.
+5. `SpinLockHolder` - for acquiring a `SpinLock`.
+
+## Mutex
+The general style is to create a `LockHolder` when you need to acquire a
+`Mutex`, the constructor will acquire and when the `LockHolder` goes out of
+scope, the destructor will release the `Mutex`. For certain use-cases the
+caller can explicitly lock/unlock a `Mutex` via the `LockHolder` class.
+
+```c++
+Mutex mutex;
+void example1() {
+ LockHolder lockHolder(&mutex);
+ ...
+ return;
+}
+
+void example2() {
+ LockHolder lockHolder(&mutex);
+ ...
+ lockHolder.unlock();
+ ...
+ lockHolder.lock();
+ ...
+ return;
+}
+```
+
+A `MultiLockHolder` allows an array of locks to be conveniently acquired and
+released, and similarly to `LockHolder` the caller can choose to manually
+lock/unlock at any time (with all locks locked/unlocked via one call).
+
+```c++
+Mutex mutexes[10];
+Object objects[10];
+void foo() {
+ MultiLockHolder lockHolder(&mutexes, 10);
+ for (int ii = 0; ii < 10; ii++) {
+ objects[ii].doStuff();
+ }
+ return;
+}
+```
+
+## RWLock
+
+`RWLock` allows many readers to acquire it and exclusive access for a writer.
+`ReadLockHolder` acquires the lock for a reader and `WriteLockHolder` acquires
+the lock for a writer. Neither classes enable manual lock/unlock, all
+acquisitions and release are performed via the constructor and destructor.
+
+```c++
+RWLock rwLock;
+Object thing;
+
+void foo1() {
+ ReaderLockHolder rlh(&rwLock);
+ if (thing.getData()) {
+ ...
+ }
+}
+
+void foo2() {
+ WriterLockHolder wlh(&rwLock);
+ thing.setData(...);
+}
+```
+
+## SyncObject
+
+`SyncObject` inherits from `Mutex` and is thus managed via a `LockHolder` or
+`MultiLockHolder`. The `SyncObject` provides the conditional-variable
+synchronisation primitive enabling threads to block and be woken.
+
+The wait/wakeOne/wake method is provided by the `SyncObject`.
+
+Note that `wake` will wake up a single blocking thread, `wakeOne` will wake up
+every thread that is blocking on the `SyncObject`.
+
+```c++
+SyncObject syncObject;
+bool sleeping = false;
+void foo1() {
+ LockHolder lockHolder(&syncObject);
+ sleeping = true;
+ syncObject.wait(); // the mutex is released and the thread put to sleep
+ // when wait returns the mutex is reacquired
+ sleeping = false;
+}
+
+void foo2() {
+ LockHolder lockHolder(&syncObject);
+ if (sleeping) {
+ syncObject.notifyOne();
+ }
+}
+```
+
+## SpinLock
+
+A `SpinLock` uses a single byte for the lock and our own code to spin until the
+lock is acquired. The intention for this lock is for low contention locks.
+
+The RAII pattern is just like for a Mutex.
+
+
+```c++
+SpinLock spinLock;
+void example1() {
+ SpinLockHolder lockHolder(&spinLock);
+ ...
+ return;
+}
+```
+
+## _UNLOCKED convention
+
+ep-engine has a function naming convention that indicates the function should
+be called with a lock acquired.
+
+For example the following `doStuff_UNLOCKED` method indicates that it expect a
+lock to be held before the function is called. What lock should be acquired
+before calling is not defined by the convention.
+
+```c++
+void Object::doStuff_UNLOCKED() {
+}
+
+void Object::run() {
+ LockHolder lockHolder(&mutex);
+ doStuff_UNLOCKED();
+ return;
+}
+```
+## Thread Local Storage (ObjectRegistry).
+
+Threads in ep-engine are servicing buckets and when a thread is dispatched to
+serve a bucket, the pointer to the `EventuallyPersistentEngine` representing
+the bucket is placed into thread local storage, this avoids the need for the
+pointer to be passed along the chain of execution as a formal parameter.
+
+Both threads servicing frontend operations (memcached's threads) and ep-engine's
+own task threads will save the bucket's engine pointer before calling down into
+engine code.
+
+Calling `ObjectRegistry::onSwitchThread(enginePtr)` will save the `enginePtr`
+in thread-local-storage so that subsequent task code can retrieve the pointer
+with `ObjectRegistry::getCurrentEngine()`.
+
+## Tasks
+
+A task is created by creating a sub-class (the `run()` method is the entry point
+of the task) of the `GlobalTask` class and it is scheduled onto one of 4 task
+queue types. Each task should be declared in `src/tasks.defs.h` using the TASK
+macro. Using this macro ensures correct generation of a task-type ID, priority,
+task name and ultimately ensures each task gets its own scheduling statistics.
+
+The recipe is simple.
+
+### Add your task's class name with its priority into `src/tasks.defs.h`
+ * A lower value priority is 'higher'.
+```
+TASK(MyNewTask, 1) // MyNewTask has priority 1.
+```
+
+### Create your class and set its ID using `MY_TASK_ID`.
+
+```
+class MyNewTask : public GlobalTask {
+public:
+ MyNewTask(EventuallyPersistentEngine* e)
+ : GlobalTask(e/*engine/,
+ MY_TASK_ID(MyNewTask),
+ 0.0/*snooze*/){}
+...
+```
+
+### Define pure-virtual methods in `MyNewTask`
+* run method
+
+The run method is invoked when the task is executed. The method should return
+true if it should be scheduled again. If false is returned, the instance of the
+task is never re-scheduled and will deleted once all references to the instance are
+gone.
+
+```
+bool run() {
+ // Task code here
+ return schedule again?;
+}
+```
+
+* Define the `getDescription` method to aid debugging and statistics.
+```
+std::string getDescription() {
+ return "A brief description of what MyNewTask does";
+}
+```
+
+### Schedule your task to the desired queue.
+```
+ExTask myNewTask = new MyNewTask(&engine);
+myNewTaskId = ExecutorPool::get()->schedule(myNewTask, NONIO_TASK_IDX);
+```
+
+The 4 task queue types are:
+* Readers - `READER_TASK_IDX`
+ * Tasks that should primarily only read from 'disk'. They generally read from
+the vbucket database files, for example background fetch of a non-resident document.
+* Writers (they are allowed to read too) `WRITER_TASK_IDX`
+ * Tasks that should primarily only write to 'disk'. They generally write to
+the vbucket database files, for example when flushing the write queue.
+* Auxilliary IO `AUXIO_TASK_IDX`
+ * Tasks that read and write 'disk', but not necessarily the vbucket data files.
+* Non IO `NONIO_TASK_IDX`
+ * Tasks that do not perform 'disk' I/O.
+
+### Utilise `snooze`
+
+The snooze value of the task sets when the task should be executed. The initial snooze
+value is set when constructing `GlobalTask`. A value of 0.0 means attempt to execute
+the task as soon as scheduled and 5.0 would be 5 seconds from being scheduled
+(scheduled meaning when `ExecutorPool::get()->schedule(...)` is called).
+
+The `run()` function can also call `snooze(double snoozeAmount)` to set how long
+before the task is rescheduled.
+
+It is **best practice** for most tasks to actually do a sleep forever from their run function:
+
+```
+ snooze(INT_MAX);
+```
+
+Using `INT_MAX` means sleep forever and tasks should always sleep until they have
+real work todo. Tasks **should not periodically poll for work** with a snooze of
+n seconds.
+
+### Utilise `wake()`
+When a task has work todo, some other function should be waking the task using the wake method.
+
+```
+ExecutorPool::get()->wake(myNewTaskId)`
+```
friend class AccessScannerValueChangeListener;
public:
AccessScanner(EventuallyPersistentStore &_store, EPStats &st,
- const Priority &p, double sleeptime = 0,
+ double sleeptime = 0,
bool completeBeforeShutdown = false)
- : GlobalTask(&_store.getEPEngine(), p, sleeptime,
+ : GlobalTask(&_store.getEPEngine(), TaskId::AccessScanner, sleeptime,
completeBeforeShutdown),
completedCount(0), store(_store), stats(st), sleepTime(sleeptime),
available(true) { }
"Schedule a full backfill from disk for vbucket %d.", vb->getId());
ExTask task = new BackfillDiskLoad(name, engine, connMap,
underlying, vb->getId(), 0, connToken,
- Priority::TapBgFetcherPriority,
0, false);
ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
}
bool BackfillTask::run(void) {
engine->getEpStore()->visit(bfv, "Backfill task", NONIO_TASK_IDX,
- Priority::BackfillTaskPriority, 1);
+ TaskId::BackfillVisitorTask, 1);
return false;
}
BackfillDiskLoad(const std::string &n, EventuallyPersistentEngine* e,
TapConnMap &cm, KVStore *s, uint16_t vbid,
- uint64_t start_seqno, hrtime_t token, const Priority &p,
+ uint64_t start_seqno, hrtime_t token,
double sleeptime = 0, bool shutdown = false)
- : GlobalTask(e, p, sleeptime, shutdown),
+ : GlobalTask(e, TaskId::BackfillDiskLoad, sleeptime, shutdown),
name(n), engine(e), connMap(cm), store(s), vbucket(vbid),
startSeqno(start_seqno), connToken(token) {
ScheduleDiskBackfillTapOperation tapop;
BackfillTask(EventuallyPersistentEngine *e, TapConnMap &cm, Producer *tc,
const VBucketFilter &backfillVBFilter):
- GlobalTask(e, Priority::BackfillTaskPriority, 0, false),
+ GlobalTask(e, TaskId::BackfillTask, 0, false),
bfv(new BackFillVisitor(e, cm, tc, backfillVBFilter)), engine(e) {}
virtual ~BackfillTask() {}
bool inverse = false;
pendingFetch.compare_exchange_strong(inverse, true);
ExecutorPool* iom = ExecutorPool::get();
- ExTask task = new BgFetcherTask(&(store->getEPEngine()), this,
- Priority::BgFetcherPriority, false);
+ ExTask task = new MultiBGFetcherTask(&(store->getEPEngine()), this, false);
this->setTaskId(task->getId());
iom->schedule(task, READER_TASK_IDX);
cb_assert(taskId > 0);
shared_ptr<CheckpointVisitor> pv(new CheckpointVisitor(store, stats,
available));
store->visit(pv, "Checkpoint Remover", NONIO_TASK_IDX,
- Priority::CheckpointRemoverPriority);
+ TaskId::ClosedUnrefCheckpointRemoverVisitorTask);
}
snooze(sleepTime);
return true;
*/
ClosedUnrefCheckpointRemoverTask(EventuallyPersistentEngine *e,
EPStats &st, size_t interval) :
- GlobalTask(e, Priority::CheckpointRemoverPriority, interval, false),
+ GlobalTask(e, TaskId::ClosedUnrefCheckpointRemoverTask, interval, false),
engine(e), stats(st), sleepTime(interval), available(true) {}
bool run(void);
public:
ConnectionReaperCallback(EventuallyPersistentEngine &e, ConnMap& cm,
connection_t &conn)
- : GlobalTask(&e, Priority::TapConnectionReaperPriority),
+ : GlobalTask(&e, TaskId::ConnectionReaperCallback),
connMap(cm), connection(conn) {
- std::stringstream ss;
- ss << "Reaping tap or dcp connection: " << connection->getName();
- descr = ss.str();
}
bool run(void) {
}
std::string getDescription() {
- return descr;
+ return "Reaping tap or dcp connection: " + connection->getName();
}
private:
ConnMap &connMap;
connection_t connection;
- std::string descr;
};
/**
class ConnNotifierCallback : public GlobalTask {
public:
ConnNotifierCallback(EventuallyPersistentEngine *e, ConnNotifier *notifier)
- : GlobalTask(e, Priority::TapConnNotificationPriority),
+ : GlobalTask(e, TaskId::ConnNotifierCallback),
connNotifier(notifier) { }
bool run(void) {
class ConnManager : public GlobalTask {
public:
ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
- : GlobalTask(e, Priority::TapConnMgrPriority, MIN_SLEEP_TIME, false),
+ : GlobalTask(e, TaskId::ConnManager, MIN_SLEEP_TIME, false),
engine(e), connmap(cmap) { }
bool run(void) {
class BackfillManagerTask : public GlobalTask {
public:
- BackfillManagerTask(EventuallyPersistentEngine* e, BackfillManager* mgr,
- const Priority &p, double sleeptime = 0,
+ BackfillManagerTask(EventuallyPersistentEngine* e,
+ BackfillManager* mgr,
+ double sleeptime = 0,
bool shutdown = false)
- : GlobalTask(e, p, sleeptime, shutdown), manager(mgr) {}
+ : GlobalTask(e, TaskId::BackfillManagerTask, sleeptime, shutdown),
+ manager(mgr) {}
bool run();
return;
}
- managerTask.reset(new BackfillManagerTask(engine, this,
- Priority::BackfillTaskPriority));
+ managerTask.reset(new BackfillManagerTask(engine, this));
ExecutorPool::get()->schedule(managerTask, AUXIO_TASK_IDX);
}
#include "dcp-response.h"
#include "dcp-stream.h"
-class Processer : public GlobalTask {
+class Processor : public GlobalTask {
public:
- Processer(EventuallyPersistentEngine* e, connection_t c,
- const Priority &p, double sleeptime = 1, bool shutdown = false)
- : GlobalTask(e, p, sleeptime, shutdown), conn(c) {}
+ Processor(EventuallyPersistentEngine* e, connection_t c,
+ double sleeptime = 1, bool shutdown = false)
+ : GlobalTask(e, TaskId::Processor, sleeptime, shutdown), conn(c) {}
- bool run();
-
- std::string getDescription();
+ bool run() {
+ DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
+ if (consumer->doDisconnect()) {
+ return false;
+ }
-private:
- connection_t conn;
-};
+ switch (consumer->processBufferedItems()) {
+ case all_processed:
+ snooze(1);
+ break;
+ case more_to_process:
+ snooze(0);
+ break;
+ case cannot_process:
+ snooze(5);
+ break;
+ default:
+ abort();
+ }
-bool Processer::run() {
- DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
- if (consumer->doDisconnect()) {
- return false;
+ return true;
}
- switch (consumer->processBufferedItems()) {
- case all_processed:
- snooze(1);
- break;
- case more_to_process:
- snooze(0);
- break;
- case cannot_process:
- snooze(5);
- break;
- default:
- abort();
+ std::string getDescription() {
+ std::stringstream ss;
+ ss << "Processing buffered items for " << conn->getName();
+ return ss.str();
}
- return true;
-}
-
-std::string Processer::getDescription() {
- std::stringstream ss;
- ss << "Processing buffered items for " << conn->getName();
- return ss.str();
-}
+private:
+ connection_t conn;
+};
DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
const std::string &name)
setPriority = true;
enableExtMetaData = true;
- ExTask task = new Processer(&engine, this, Priority::PendingOpsPriority, 1);
+ ExTask task = new Processor(&engine, this, 1);
processTaskId = ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
}
"to rollback seq no. %llu", logHeader(), vbid, rollbackSeqno);
ExTask task = new RollbackTask(&engine_, opaque, vbid,
- rollbackSeqno, this,
- Priority::TapBgFetcherPriority);
+ rollbackSeqno, this);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
return ENGINE_SUCCESS;
}
RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
vb->failovers->replaceFailoverLog(body, bodylen);
EventuallyPersistentStore* st = engine_.getEpStore();
- st->scheduleVBSnapshot(Priority::VBucketPersistHighPriority,
+ st->scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH,
st->getVBuckets().getShard(vbucket)->getId());
}
LOG(EXTENSION_LOG_INFO, "%s (vb %d) Add stream for opaque %ld"
public:
RollbackTask(EventuallyPersistentEngine* e,
uint32_t opaque_, uint16_t vbid_,
- uint64_t rollbackSeqno_, dcp_consumer_t conn,
- const Priority &p):
- GlobalTask(e, p, 0, false), engine(e),
+ uint64_t rollbackSeqno_, dcp_consumer_t conn):
+ GlobalTask(e, TaskId::RollbackTask, 0, false), engine(e),
opaque(opaque_), vbid(vbid_), rollbackSeqno(rollbackSeqno_),
cons(conn) { }
class ActiveStreamCheckpointProcessorTask : public GlobalTask {
public:
ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
- : GlobalTask(&e, Priority::ActiveStreamCheckpointProcessor, INT_MAX, false),
+ : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
+ INT_MAX, false),
notified(false),
iterationsBeforeYield(e.getConfiguration()
.getDcpProducerSnapshotMarkerYieldLimit()) { }
DefragmenterTask::DefragmenterTask(EventuallyPersistentEngine* e,
EPStats& stats_)
- : GlobalTask(e, Priority::DefragmenterTaskPriority, false),
+ : GlobalTask(e, TaskId::DefragmenterTask, false),
stats(stats_),
epstore_position(engine->getEpStore()->startPosition()),
visitor(NULL) {
}
void DefragmenterTask::stop(void) {
- if (taskId) {
- ExecutorPool::get()->cancel(taskId);
+ if (uid) {
+ ExecutorPool::get()->cancel(uid);
}
}
VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
RCPtr<VBucket> &vb, double delay) :
GlobalTask(&eng,
- Priority::VBMemoryDeletionPriority, delay, true),
+ TaskId::VBucketMemoryDeletionTask, delay, true),
e(eng), vbucket(vb), vbid(vb->getId()) { }
std::string getDescription() {
class PendingOpsNotification : public GlobalTask {
public:
PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
- GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
+ GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
engine(e), vbucket(vb) { }
std::string getDescription() {
storageProperties = new StorageProperties(true, true, true, true);
- stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
- stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
+ stats.schedulingHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
+ stats.taskRuntimeHisto = new Histogram<hrtime_t>[GlobalTask::allTaskIds.size()];
- for (size_t i = 0; i < MAX_TYPE_ID; i++) {
+ for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
stats.schedulingHisto[i].reset();
stats.taskRuntimeHisto[i].reset();
}
EventuallyPersistentStore *epstore;
};
-void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
+void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
uint16_t shardId) {
class VBucketStateVisitor : public VBucketVisitor {
};
KVShard *shard = vbMap.shards[shardId];
- if (priority == Priority::VBucketPersistLowPriority) {
+ if (prio == VBSnapshotTask::Priority::LOW) {
shard->setLowPriorityVbSnapshotFlag(false);
} else {
shard->setHighPriorityVbSnapshotFlag(false);
break;
}
- if (priority == Priority::VBucketPersistHighPriority) {
+ if (prio == VBSnapshotTask::Priority::HIGH) {
if (vbMap.setBucketCreation(iter->first, false)) {
LOG(EXTENSION_LOG_INFO, "VBucket %d created", iter->first);
}
}
if (!success) {
- scheduleVBSnapshot(priority, shard->getId());
+ scheduleVBSnapshot(prio, shard->getId());
} else {
stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
}
}
-bool EventuallyPersistentStore::persistVBState(const Priority &priority,
- uint16_t vbid) {
+bool EventuallyPersistentStore::persistVBState(uint16_t vbid) {
schedule_vbstate_persist[vbid] = false;
RCPtr<VBucket> vb = getVBucket(vbid);
ExTask notifyTask = new PendingOpsNotification(engine, vb);
ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
}
- scheduleVBStatePersist(Priority::VBucketPersistLowPriority, vbid);
+ scheduleVBStatePersist(VBStatePersistTask::Priority::LOW, vbid);
} else {
FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
KVShard* shard = vbMap.getShard(vbid);
vbMap.setPersistenceSeqno(vbid, 0);
vbMap.setBucketCreation(vbid, true);
lh.unlock();
- scheduleVBStatePersist(Priority::VBucketPersistHighPriority, vbid);
+ scheduleVBStatePersist(VBStatePersistTask::Priority::HIGH, vbid);
}
return ENGINE_SUCCESS;
}
-bool EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
+bool EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio) {
KVShard *shard = NULL;
- if (p == Priority::VBucketPersistHighPriority) {
+ if (prio == VBSnapshotTask::Priority::HIGH) {
for (size_t i = 0; i < vbMap.numShards; ++i) {
shard = vbMap.shards[i];
if (shard->setHighPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTask(&engine, p, i, true);
+ ExTask task = new VBSnapshotTaskHigh(&engine, i, true);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
}
for (size_t i = 0; i < vbMap.numShards; ++i) {
shard = vbMap.shards[i];
if (shard->setLowPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTask(&engine, p, i, true);
+ ExTask task = new VBSnapshotTaskLow(&engine, i, true);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
}
return true;
}
-void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
+void EventuallyPersistentStore::scheduleVBSnapshot(VBSnapshotTask::Priority prio,
uint16_t shardId,
bool force) {
KVShard *shard = vbMap.shards[shardId];
- if (p == Priority::VBucketPersistHighPriority) {
+ if (prio == VBSnapshotTask::Priority::HIGH) {
if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTask(&engine, p, shardId, true);
+ ExTask task = new VBSnapshotTaskHigh(&engine, shardId, true);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
} else {
if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
- ExTask task = new VBSnapshotTask(&engine, p, shardId, true);
+ ExTask task = new VBSnapshotTaskLow(&engine, shardId, true);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
}
}
-void EventuallyPersistentStore::scheduleVBStatePersist(const Priority &priority,
+void EventuallyPersistentStore::scheduleVBStatePersist(VBStatePersistTask::Priority priority,
uint16_t vbid,
bool force) {
bool inverse = false;
if (force ||
schedule_vbstate_persist[vbid].compare_exchange_strong(inverse, true)) {
- ExTask task = new VBStatePersistTask(&engine, priority, vbid, true);
- ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
+ if (priority == VBStatePersistTask::Priority::HIGH) {
+ ExecutorPool::get()->schedule(new VBStatePersistTaskHigh(&engine, vbid, true), WRITER_TASK_IDX);
+ } else {
+ ExecutorPool::get()->schedule(new VBStatePersistTaskLow(&engine, vbid, true), WRITER_TASK_IDX);
+ }
}
}
ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
if (vbMap.setBucketDeletion(vb->getId(), true)) {
- ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie,
- Priority::VBucketDeletionPriority);
+ ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
}
}
}
LockHolder lh(compactionLock);
- ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
- vbid, c, cookie);
+ ExTask task = new CompactVBucketTask(&engine, vbid, c, cookie);
compactionTasks.push_back(std::make_pair(vbid, task));
if (compactionTasks.size() > 1) {
if ((stats.diskQueueSize > compactionWriteQueueCap &&
stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
bgFetchQueue.load());
ExecutorPool* iom = ExecutorPool::get();
- ExTask task = new BGFetchTask(&engine, key, vbucket, cookie,
- isMeta,
- Priority::BgFetcherGetMetaPriority,
- bgFetchDelay, false);
+ ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
+ isMeta, bgFetchDelay, false);
iom->schedule(task, READER_TASK_IDX);
ss << "Queued a background fetch, now at " << bgFetchQueue.load()
<< std::endl;
ExecutorPool* iom = ExecutorPool::get();
ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
v->getBySeqno(), cookie,
- Priority::VKeyStatBgFetcherPriority,
bgFetchDelay, false);
iom->schedule(task, READER_TASK_IDX);
return ENGINE_EWOULDBLOCK;
ExecutorPool* iom = ExecutorPool::get();
ExTask task = new VKeyStatBGFetchTask(&engine, key,
vbucket, -1, cookie,
- Priority::VKeyStatBgFetcherPriority,
bgFetchDelay, false);
iom->schedule(task, READER_TASK_IDX);
}
void EventuallyPersistentStore::warmupCompleted() {
// Run the vbucket state snapshot job once after the warmup
- scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
+ scheduleVBSnapshot(VBSnapshotTask::Priority::HIGH);
if (engine.getConfiguration().getAlogPath().length() > 0) {
// right after warmup. Subsequent snapshot tasks will be scheduled every
// 60 sec by default.
ExecutorPool *iom = ExecutorPool::get();
- ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
+ ExTask task = new StatSnap(&engine, 0, false);
statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
}
accessScanner.sleeptime = alogSleepTime * 60;
if (accessScanner.sleeptime != 0) {
ExTask task = new AccessScanner(*this, stats,
- Priority::AccessScannerPriority,
accessScanner.sleeptime);
accessScanner.task = ExecutorPool::get()->schedule(task,
AUXIO_TASK_IDX);
accessScanner.sleeptime = val * 60;
if (accessScanner.sleeptime != 0) {
ExTask task = new AccessScanner(*this, stats,
- Priority::AccessScannerPriority,
accessScanner.sleeptime);
accessScanner.task = ExecutorPool::get()->schedule(task,
AUXIO_TASK_IDX);
ExecutorPool::get()->cancel(accessScanner.task);
// re-schedule task according to the new task start hour
ExTask task = new AccessScanner(*this, stats,
- Priority::AccessScannerPriority,
accessScanner.sleeptime);
accessScanner.task = ExecutorPool::get()->schedule(task,
AUXIO_TASK_IDX);
return EventuallyPersistentStore::Position(vbMap.getSize());
}
-VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
+VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s, TaskId id,
shared_ptr<VBucketVisitor> v,
- const char *l, const Priority &p, double sleep) :
- GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
+ const char *l, double sleep) :
+ GlobalTask(&s->getEPEngine(), id, 0, false), store(s),
visitor(v), label(l), sleepTime(sleep), currentvb(0)
{
const VBucketFilter &vbFilter = visitor->getVBucketFilter();
VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
shared_ptr<VBucketVisitor> v,
uint16_t sh, const char *l,
- double sleep, bool shutdown):
- GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
- 0, shutdown),
- store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
- shardID(sh)
-{
+ double sleep, bool shutdown)
+ : GlobalTask(&(s->getEPEngine()), TaskId::VBucketVisitorTask, 0, shutdown),
+ store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
+ shardID(sh) {
const VBucketFilter &vbFilter = visitor->getVBucketFilter();
std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
shard->getROUnderlying()->resetStats();
}
- for (size_t i = 0; i < MAX_TYPE_ID; i++) {
+ for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
stats.schedulingHisto[i].reset();
stats.taskRuntimeHisto[i].reset();
}
class VBCBAdaptor : public GlobalTask {
public:
- VBCBAdaptor(EventuallyPersistentStore *s,
- shared_ptr<VBucketVisitor> v, const char *l, const Priority &p,
+ VBCBAdaptor(EventuallyPersistentStore *s, TaskId id,
+ shared_ptr<VBucketVisitor> v, const char *l,
double sleep=0);
std::string getDescription() {
return vbMap.getPersistenceSeqno(vb);
}
- void snapshotVBuckets(const Priority &priority, uint16_t shardId);
+ void snapshotVBuckets(VBSnapshotTask::Priority prio, uint16_t shardId);
/* transfer should be set to true *only* if this vbucket is becoming master
* as the result of the previous master cleanly handing off contorol. */
* Note that this is asynchronous.
*/
size_t visit(shared_ptr<VBucketVisitor> visitor, const char *lbl,
- task_type_t taskGroup, const Priority &prio,
+ task_type_t taskGroup, TaskId id,
double sleepTime=0) {
- return ExecutorPool::get()->schedule(new VBCBAdaptor(this, visitor,
- lbl, prio, sleepTime), taskGroup);
+ return ExecutorPool::get()->schedule(new VBCBAdaptor(this, id, visitor,
+ lbl, sleepTime), taskGroup);
}
/**
/**
* schedule a vb_state snapshot task for all the shards.
*/
- bool scheduleVBSnapshot(const Priority &priority);
+ bool scheduleVBSnapshot(VBSnapshotTask::Priority prio);
/**
* schedule a vb_state snapshot task for a given shard.
*/
- void scheduleVBSnapshot(const Priority &priority, uint16_t shardId,
+ void scheduleVBSnapshot(VBSnapshotTask::Priority prio, uint16_t shardId,
bool force = false);
/**
* Schedule a vbstate persistence task for a given vbucket.
*/
- void scheduleVBStatePersist(const Priority &priority, uint16_t vbid,
+ void scheduleVBStatePersist(VBStatePersistTask::Priority prio, uint16_t vbid,
bool force = false);
/**
* Persist a vbucket's state.
*/
- bool persistVBState(const Priority &priority, uint16_t vbid);
+ bool persistVBState(uint16_t vbid);
const VBucketMap &getVBuckets() {
return vbMap;
++vb->numExpiredItems;
}
- void logQTime(type_id_t taskType, hrtime_t enqTime) {
- stats.schedulingHisto[taskType].add(enqTime);
+ void logQTime(TaskId taskType, hrtime_t enqTime) {
+ stats.schedulingHisto[static_cast<int>(taskType)].add(enqTime);
}
- void logRunTime(type_id_t taskType, hrtime_t runTime) {
- stats.taskRuntimeHisto[taskType].add(runTime);
+ void logRunTime(TaskId taskType, hrtime_t runTime) {
+ stats.taskRuntimeHisto[static_cast<int>(taskType)].add(runTime);
}
bool multiBGFetchEnabled() {
class StatCheckpointTask : public GlobalTask {
public:
StatCheckpointTask(EventuallyPersistentEngine *e, const void *c,
- ADD_STAT a) : GlobalTask(e, Priority::CheckpointStatsPriority,
+ ADD_STAT a) : GlobalTask(e, TaskId::StatCheckpointTask,
0, false),
ep(e), cookie(c), add_stat(a) { }
bool run(void) {
*cookie,
ADD_STAT
add_stat) {
- for (size_t i = 0; i < MAX_TYPE_ID; ++i) {
- add_casted_stat(Priority::getTypeName(static_cast<type_id_t>(i)),
- stats.schedulingHisto[i],
+ for (TaskId id : GlobalTask::allTaskIds) {
+ add_casted_stat(GlobalTask::getTaskName(id),
+ stats.schedulingHisto[static_cast<int>(id)],
add_stat, cookie);
}
*cookie,
ADD_STAT
add_stat) {
- for (size_t i = 0; i < MAX_TYPE_ID; ++i) {
- add_casted_stat(Priority::getTypeName(static_cast<type_id_t>(i)),
- stats.taskRuntimeHisto[i],
+ for (TaskId id : GlobalTask::allTaskIds) {
+ add_casted_stat(GlobalTask::getTaskName(id),
+ stats.taskRuntimeHisto[static_cast<int>(id)],
add_stat, cookie);
}
public:
FetchAllKeysTask(EventuallyPersistentEngine *e, const void *c,
ADD_RESPONSE resp, const std::string &start_key_,
- uint16_t vbucket, uint32_t count_, const Priority &p) :
- GlobalTask(e, p, 0, false), engine(e), cookie(c),
+ uint16_t vbucket, uint32_t count_) :
+ GlobalTask(e, TaskId::FetchAllKeysTask, 0, false), engine(e), cookie(c),
response(resp), start_key(start_key_), vbid(vbucket),
count(count_) { }
std::string start_key(keyptr, keylen);
ExTask task = new FetchAllKeysTask(this, cookie, response, start_key,
- vbucket, count,
- Priority::BgFetcherPriority);
+ vbucket, count);
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
return ENGINE_EWOULDBLOCK;
}
if (currentTask->isdead()) {
// release capacity back to TaskQueue
manager->doneWork(curTaskType);
- manager->cancel(currentTask->taskId, true);
+ manager->cancel(currentTask->uid, true);
continue;
}
if (!again || currentTask->isdead()) {
// release capacity back to TaskQueue
manager->doneWork(curTaskType);
- manager->cancel(currentTask->taskId, true);
+ manager->cancel(currentTask->uid, true);
} else {
hrtime_t new_waketime;
// if a task has not set snooze, update its waketime to now
void Flusher::schedule_UNLOCKED() {
ExecutorPool* iom = ExecutorPool::get();
ExTask task = new FlusherTask(ObjectRegistry::getCurrentEngine(),
- this, Priority::FlusherPriority,
+ this,
shard->getId());
this->setTaskId(task->getId());
iom->schedule(task, WRITER_TASK_IDX);
bool HashtableResizerTask::run(void) {
shared_ptr<ResizingVisitor> pv(new ResizingVisitor);
store->visit(pv, "Hashtable resizer", NONIO_TASK_IDX,
- Priority::ItemPagerPriority);
+ TaskId::HashtableResizerVisitorTask);
snooze(FREQUENCY);
return true;
public:
HashtableResizerTask(EventuallyPersistentStore *s, double sleepTime) :
- GlobalTask(&s->getEPEngine(), Priority::HTResizePriority, sleepTime, false),
+ GlobalTask(&s->getEPEngine(), TaskId::HashtableResizerTask, sleepTime, false),
store(s) {}
bool run(void);
available,
false, bias, &phase));
store->visit(pv, "Item pager", NONIO_TASK_IDX,
- Priority::ItemPagerPriority);
+ TaskId::ItemPagerVisitor);
}
snooze(sleepTime);
true, 1, NULL));
// track spawned tasks for shutdown..
store->visit(pv, "Expired item remover", NONIO_TASK_IDX,
- Priority::ItemPagerPriority, 10);
+ TaskId::ExpiredItemPagerVisitor, 10);
}
snooze(sleepTime);
return true;
* @param st the stats
*/
ItemPager(EventuallyPersistentEngine *e, EPStats &st) :
- GlobalTask(e, Priority::ItemPagerPriority, 10, false),
+ GlobalTask(e, TaskId::ItemPager, 10, false),
engine(e), stats(st), available(true), phase(PAGING_UNREFERENCED),
doEvict(false) {}
*/
ExpiredItemPager(EventuallyPersistentEngine *e, EPStats &st,
size_t stime) :
- GlobalTask(e, Priority::ItemPagerPriority, static_cast<double>(stime),
+ GlobalTask(e, TaskId::ExpiredItemPager, static_cast<double>(stime),
false), engine(e), stats(st), sleepTime(static_cast<double>(stime)),
available(true) { }
+++ /dev/null
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- * Copyright 2010 Couchbase, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "config.h"
-
-#include "priority.h"
-
-// Priorities for Read-only IO tasks
-const Priority Priority::BgFetcherPriority(BGFETCHER_ID, 0);
-const Priority Priority::BgFetcherGetMetaPriority(BGFETCHER_GET_META_ID, 1);
-const Priority Priority::WarmupPriority(WARMUP_ID, 0);
-const Priority Priority::VKeyStatBgFetcherPriority(VKEY_STAT_BGFETCHER_ID, 3);
-
-// Priorities for Auxiliary IO tasks
-const Priority Priority::TapBgFetcherPriority(TAP_BGFETCHER_ID, 1);
-const Priority Priority::AccessScannerPriority(ACCESS_SCANNER_ID, 3);
-const Priority Priority::BackfillTaskPriority(BACKFILL_TASK_ID, 8);
-
-// Priorities for Read-Write IO tasks
-const Priority Priority::VBucketDeletionPriority(VBUCKET_DELETION_ID, 1);
-const Priority Priority::CompactorPriority(COMPACTOR_ID, 2);
-const Priority Priority::VBucketPersistHighPriority(VBUCKET_PERSIST_HIGH_ID, 2);
-const Priority Priority::FlushAllPriority(FLUSHALL_ID, 3);
-const Priority Priority::FlusherPriority(FLUSHER_ID, 5);
-const Priority Priority::VBucketPersistLowPriority(VBUCKET_PERSIST_LOW_ID, 9);
-const Priority Priority::StatSnapPriority(STAT_SNAP_ID, 9);
-const Priority Priority::MutationLogCompactorPriority(MUTATION_LOG_COMPACTOR_ID, 9);
-
-// Priorities for NON-IO tasks
-const Priority Priority::PendingOpsPriority(PENDING_OPS_ID, 0);
-const Priority Priority::TapConnNotificationPriority(TAP_CONN_NOTIFICATION_ID, 5);
-const Priority Priority::CheckpointRemoverPriority(CHECKPOINT_REMOVER_ID, 6);
-const Priority Priority::TapConnectionReaperPriority(TAP_CONNECTION_REAPER_ID, 6);
-const Priority Priority::VBMemoryDeletionPriority(VB_MEMORY_DELETION_ID, 6);
-const Priority Priority::CheckpointStatsPriority(CHECKPOINT_STATS_ID, 7);
-const Priority Priority::ItemPagerPriority(ITEM_PAGER_ID, 7);
-const Priority Priority::DefragmenterTaskPriority(DEFRAGMENTER_ID, 7);
-const Priority Priority::TapConnMgrPriority(TAP_CONN_MGR_ID, 8);
-const Priority Priority::WorkLoadMonitorPriority(WORKLOAD_MONITOR_TASK_ID, 10);
-const Priority Priority::HTResizePriority(HT_RESIZER_ID, 211);
-const Priority Priority::TapResumePriority(TAP_RESUME_ID, 316);
-const Priority Priority::ActiveStreamCheckpointProcessor(ACTIVE_STREAM_CHKPT_PROCESSOR_ID, 5);
-
-const char *Priority::getTypeName(const type_id_t i) {
- switch (i) {
- case BGFETCHER_ID:
- return "bg_fetcher_tasks";
- case BGFETCHER_GET_META_ID:
- return "bg_fetcher_meta_tasks";
- case TAP_BGFETCHER_ID:
- return "tap_bg_fetcher_tasks";
- case VKEY_STAT_BGFETCHER_ID:
- return "vkey_stat_bg_fetcher_tasks";
- case WARMUP_ID:
- return "warmup_tasks";
- case VBUCKET_PERSIST_HIGH_ID:
- return "vbucket_persist_high_tasks";
- case VBUCKET_DELETION_ID:
- return "vbucket_deletion_tasks";
- case FLUSHER_ID:
- return "flusher_tasks";
- case FLUSHALL_ID:
- return "flush_all_tasks";
- case COMPACTOR_ID:
- return "compactor_tasks";
- case VBUCKET_PERSIST_LOW_ID:
- return "vbucket_persist_low_tasks";
- case STAT_SNAP_ID:
- return "statsnap_tasks";
- case MUTATION_LOG_COMPACTOR_ID:
- return "mutation_log_compactor_tasks";
- case ACCESS_SCANNER_ID:
- return "access_scanner_tasks";
- case TAP_CONN_NOTIFICATION_ID:
- return "conn_notification_tasks";
- case CHECKPOINT_REMOVER_ID:
- return "checkpoint_remover_tasks";
- case VB_MEMORY_DELETION_ID:
- return "vb_memory_deletion_tasks";
- case CHECKPOINT_STATS_ID:
- return "checkpoint_stats_tasks";
- case ITEM_PAGER_ID:
- return "item_pager_tasks";
- case BACKFILL_TASK_ID:
- return "backfill_tasks_tasks";
- case WORKLOAD_MONITOR_TASK_ID:
- return "workload_monitor_tasks";
- case TAP_RESUME_ID:
- return "tap_resume_tasks";
- case TAP_CONNECTION_REAPER_ID:
- return "tapconnection_reaper_tasks";
- case HT_RESIZER_ID:
- return "hashtable_resize_tasks";
- case PENDING_OPS_ID:
- return "pending_ops_tasks";
- case TAP_CONN_MGR_ID:
- return "conn_manager_tasks";
- case DEFRAGMENTER_ID:
- return "defragmenter_tasks";
- case ACTIVE_STREAM_CHKPT_PROCESSOR_ID:
- return "activestream_chkpt_processor_tasks";
- default: break;
- }
-
- return "error";
- }
+++ /dev/null
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- * Copyright 2010 Couchbase, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef SRC_PRIORITY_H_
-#define SRC_PRIORITY_H_ 1
-
-#include "config.h"
-
-#include <string>
-
-#include "common.h"
-
-typedef enum {
- BGFETCHER_ID=0,
- BGFETCHER_GET_META_ID,
- TAP_BGFETCHER_ID,
- VKEY_STAT_BGFETCHER_ID,
- WARMUP_ID,
- VBUCKET_PERSIST_HIGH_ID,
- VBUCKET_DELETION_ID,
- FLUSHER_ID,
- FLUSHALL_ID,
- COMPACTOR_ID,
- VBUCKET_PERSIST_LOW_ID,
- STAT_SNAP_ID,
- MUTATION_LOG_COMPACTOR_ID,
- ACCESS_SCANNER_ID,
- TAP_CONN_NOTIFICATION_ID,
- CHECKPOINT_REMOVER_ID,
- VB_MEMORY_DELETION_ID,
- CHECKPOINT_STATS_ID,
- ITEM_PAGER_ID,
- BACKFILL_TASK_ID,
- WORKLOAD_MONITOR_TASK_ID,
- TAP_RESUME_ID,
- TAP_CONNECTION_REAPER_ID,
- HT_RESIZER_ID,
- PENDING_OPS_ID,
- TAP_CONN_MGR_ID,
- DEFRAGMENTER_ID,
- ACTIVE_STREAM_CHKPT_PROCESSOR_ID,
-
- MAX_TYPE_ID // Keep this as the last enum value
-} type_id_t;
-
-/**
- * Task priority definition.
- */
-class Priority {
-public:
- // Priorities for Read-only tasks
- static const Priority BgFetcherPriority;
- static const Priority BgFetcherGetMetaPriority;
- static const Priority TapBgFetcherPriority;
- static const Priority VKeyStatBgFetcherPriority;
- static const Priority WarmupPriority;
-
- // Priorities for Read-Write tasks
- static const Priority VBucketPersistHighPriority;
- static const Priority VBucketDeletionPriority;
- static const Priority FlusherPriority;
- static const Priority FlushAllPriority;
- static const Priority CompactorPriority;
- static const Priority VBucketPersistLowPriority;
- static const Priority StatSnapPriority;
- static const Priority MutationLogCompactorPriority;
- static const Priority AccessScannerPriority;
-
- // Priorities for NON-IO tasks
- static const Priority TapConnNotificationPriority;
- static const Priority CheckpointRemoverPriority;
- static const Priority VBMemoryDeletionPriority;
- static const Priority CheckpointStatsPriority;
- static const Priority ItemPagerPriority;
- static const Priority BackfillTaskPriority;
- static const Priority WorkLoadMonitorPriority;
- static const Priority TapResumePriority;
- static const Priority TapConnectionReaperPriority;
- static const Priority HTResizePriority;
- static const Priority PendingOpsPriority;
- static const Priority TapConnMgrPriority;
- static const Priority DefragmenterTaskPriority;
- static const Priority ActiveStreamCheckpointProcessor;
-
- bool operator==(const Priority &other) const {
- return other.getPriorityValue() == this->priority;
- }
-
- bool operator<(const Priority &other) const {
- return this->priority > other.getPriorityValue();
- }
-
- bool operator>(const Priority &other) const {
- return this->priority < other.getPriorityValue();
- }
-
- /**
- * Return the task name.
- *
- * @return a task name
- */
- static const char *getTypeName(const type_id_t i);
-
- /**
- * Return the type id representing a task
- *
- * @return type id
- */
- type_id_t getTypeId() const {
- return t_id;
- }
-
- /**
- * Return an integer value that represents a priority.
- *
- * @return a priority value
- */
- int getPriorityValue() const {
- return priority;
- }
-
- // gcc didn't like the idea of having a class with no constructor
- // available to anyone.. let's make it protected instead to shut
- // gcc up :(
-protected:
- Priority(type_id_t id, int p) : t_id(id), priority(p) { }
- type_id_t t_id;
- int priority;
-
-private:
- DISALLOW_COPY_AND_ASSIGN(Priority);
-};
-
-#endif // SRC_PRIORITY_H_
public:
ResumeCallback(EventuallyPersistentEngine &e, Producer *c,
double sleepTime)
- : GlobalTask(&e, Priority::TapResumePriority, sleepTime),
+ : GlobalTask(&e, TaskId::ResumeCallback, sleepTime),
engine(e), conn(c) {
std::stringstream ss;
ss << "Resuming suspended tap connection: " << conn->getName();
void TapProducer::queueBGFetch_UNLOCKED(const std::string &key, uint64_t id, uint16_t vb) {
ExTask task = new BGFetchCallback(&engine(), getName(), key, vb,
- getConnectionToken(),
- Priority::TapBgFetcherPriority, 0);
+ getConnectionToken(), 0);
ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
++bgJobIssued;
std::map<uint16_t, CheckpointState>::iterator it = checkpointState_.find(vb);
public:
BGFetchCallback(EventuallyPersistentEngine *e, const std::string &n,
const std::string &k, uint16_t vbid, hrtime_t token,
- const Priority &p, double sleeptime = 0) :
- GlobalTask(e, p, sleeptime, false), name(n), key(k), epe(e),
+ double sleeptime = 0) :
+ GlobalTask(e, TaskId::BGFetchCallback, sleeptime, false), name(n), key(k), epe(e),
init(gethrtime()), connToken(token), vbucket(vbid)
{
cb_assert(epe);
#include "tasks.h"
#include "warmup.h"
+#include <type_traits>
+
static const double VBSTATE_SNAPSHOT_FREQ(300.0);
static const double WORKLOAD_MONITOR_FREQ(5.0);
+GlobalTask::GlobalTask(EventuallyPersistentEngine *e,
+ TaskId taskId,
+ double sleeptime,
+ bool completeBeforeShutdown)
+ : RCValue(),
+ blockShutdown(completeBeforeShutdown),
+ state(TASK_RUNNING),
+ uid(nextTaskId()),
+ typeId(taskId),
+ engine(e) {
+ priority = getTaskPriority(taskId);
+ snooze(sleeptime);
+}
+
void GlobalTask::snooze(const double secs) {
if (secs == INT_MAX) {
setState(TASK_SNOOZED, TASK_RUNNING);
}
}
+// These static_asserts previously were in priority_test.cc
+static_assert(TaskPriority::MultiBGFetcherTask < TaskPriority::BGFetchCallback,
+ "MultiBGFetcherTask not less than BGFetchCallback");
+
+static_assert(TaskPriority::BGFetchCallback == TaskPriority::VBDeleteTask,
+ "BGFetchCallback not equal VBDeleteTask");
+
+static_assert(TaskPriority::VBStatePersistTaskHigh <
+ TaskPriority::VKeyStatBGFetchTask,
+ "VBStatePersistTaskHigh not less than VKeyStatBGFetchTask");
+
+static_assert(TaskPriority::VKeyStatBGFetchTask < TaskPriority::FlusherTask,
+ "VKeyStatBGFetchTask not less than FlusherTask");
+
+static_assert(TaskPriority::FlusherTask < TaskPriority::ItemPager,
+ "FlusherTask not less than ItemPager");
+
+static_assert(TaskPriority::ItemPager < TaskPriority::BackfillManagerTask,
+ "ItemPager not less than BackfillManagerTask");
+
+/*
+ * Generate a switch statement from tasks.def.h that maps TaskId to a
+ * stringified value of the task's name.
+ */
+const char* GlobalTask::getTaskName(TaskId id) {
+ switch(id) {
+#define TASK(name, prio) case TaskId::name: {return #name;}
+#include "tasks.def.h"
+#undef TASK
+ case TaskId::TASK_COUNT: {
+ throw std::invalid_argument("GlobalTask::getTaskName(TaskId::TASK_COUNT) called.");
+ }
+ }
+ throw std::logic_error("GlobalTask::getTaskName() unknown id " +
+ std::to_string(static_cast<int>(id)));
+ return nullptr;
+}
+
+/*
+ * Generate a switch statement from tasks.def.h that maps TaskId to priority
+ */
+TaskPriority GlobalTask::getTaskPriority(TaskId id) {
+ switch(id) {
+#define TASK(name, prio) case TaskId::name: {return TaskPriority::name;}
+#include "tasks.def.h"
+#undef TASK
+ case TaskId::TASK_COUNT: {
+ throw std::invalid_argument("GlobalTask::getTaskPriority(TaskId::TASK_COUNT) called.");
+ }
+ }
+ throw std::logic_error("GlobalTask::getTaskPriority() unknown id " +
+ std::to_string(static_cast<int>(id)));
+ return TaskPriority::PRIORITY_COUNT;
+}
+
+std::array<TaskId, static_cast<int>(TaskId::TASK_COUNT)> GlobalTask::allTaskIds = {{
+#define TASK(name, prio) TaskId::name,
+#include "tasks.def.h"
+#undef TASK
+}};
+
+
bool FlusherTask::run() {
return flusher->step(this);
}
}
DaemonVBSnapshotTask::DaemonVBSnapshotTask(EventuallyPersistentEngine *e,
- bool completeBeforeShutdown) :
- GlobalTask(e, Priority::VBucketPersistLowPriority, VBSTATE_SNAPSHOT_FREQ,
- completeBeforeShutdown) {
- desc = "Snapshotting vbucket states";
+ bool completeBeforeShutdown)
+ : GlobalTask(e, TaskId::DaemonVBSnapshotTask,
+ VBSTATE_SNAPSHOT_FREQ, completeBeforeShutdown) {
+ desc = "Snapshotting vbucket states";
}
bool DaemonVBSnapshotTask::run() {
- bool ret = engine->getEpStore()->scheduleVBSnapshot(
- Priority::VBucketPersistLowPriority);
+ bool ret = engine->getEpStore()->scheduleVBSnapshot(VBSnapshotTask::Priority::LOW);
snooze(VBSTATE_SNAPSHOT_FREQ);
return ret;
}
bool VBStatePersistTask::run() {
- return engine->getEpStore()->persistVBState(priority, vbid);
+ return engine->getEpStore()->persistVBState(vbid);
}
bool VBDeleteTask::run() {
if (runOnce) {
return false;
}
- ExecutorPool::get()->snooze(taskId, 60);
+ ExecutorPool::get()->snooze(uid, 60);
return true;
}
-bool BgFetcherTask::run() {
+bool MultiBGFetcherTask::run() {
return bgfetcher->run(this);
}
}
-bool BGFetchTask::run() {
+bool SingleBGFetcherTask::run() {
engine->getEpStore()->completeBGFetch(key, vbucket, cookie, init,
metaFetch);
return false;
WorkLoadMonitor::WorkLoadMonitor(EventuallyPersistentEngine *e,
bool completeBeforeShutdown) :
- GlobalTask(e, Priority::WorkLoadMonitorPriority, WORKLOAD_MONITOR_FREQ,
+ GlobalTask(e, TaskId::WorkLoadMonitor, WORKLOAD_MONITOR_FREQ,
completeBeforeShutdown) {
prevNumMutations = getNumMutations();
prevNumGets = getNumGets();
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2016 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Every task within ep-engine is declared in this file
+ *
+ * The TASK(name, priority) macro will be pre-processed to generate
+ * - a unique std::string name
+ * - a unique type-id
+ * - a unique priority object
+ *
+ * task.h and .cc include this file with a customised TASK macro.
+ */
+
+// Read IO tasks
+TASK(MultiBGFetcherTask, 0)
+TASK(FetchAllKeysTask, 0)
+TASK(Warmup, 0)
+TASK(WarmupInitialize, 0)
+TASK(WarmupCreateVBuckets, 0)
+TASK(WarmupEstimateDatabaseItemCount, 0)
+TASK(WarmupKeyDump, 0)
+TASK(WarmupCheckforAccessLog, 0)
+TASK(WarmupLoadAccessLog, 0)
+TASK(WarmupLoadingKVPairs, 0)
+TASK(WarmupLoadingData, 0)
+TASK(WarmupCompletion, 0)
+TASK(SingleBGFetcherTask, 1)
+TASK(VKeyStatBGFetchTask, 3)
+
+// Aux IO tasks
+TASK(BackfillDiskLoad, 1)
+TASK(BGFetchCallback, 1)
+TASK(AccessScanner, 3)
+TASK(VBucketVisitorTask, 3)
+TASK(ActiveStreamCheckpointProcessorTask, 5)
+TASK(BackfillManagerTask, 8)
+TASK(BackfillTask, 8)
+TASK(BackfillVisitorTask, 8)
+
+// Read/Write IO tasks
+TASK(VBDeleteTask, 1)
+TASK(RollbackTask, 1)
+TASK(CompactVBucketTask, 2)
+TASK(VBStatePersistTaskHigh, 2)
+TASK(VBSnapshotTaskHigh, 2)
+TASK(FlushAllTask, 3)
+TASK(FlusherTask, 5)
+TASK(VBStatePersistTaskLow, 9)
+TASK(VBSnapshotTaskLow, 9)
+TASK(DaemonVBSnapshotTask, 9)
+TASK(StatSnap, 9)
+
+// Non-IO tasks
+TASK(PendingOpsNotification, 0)
+TASK(Processor, 0)
+TASK(ConnNotifierCallback, 5)
+TASK(ConnectionReaperCallback, 6)
+TASK(ClosedUnrefCheckpointRemoverTask, 6)
+TASK(ClosedUnrefCheckpointRemoverVisitorTask, 6)
+TASK(VBucketMemoryDeletionTask, 6)
+TASK(StatCheckpointTask, 7)
+TASK(ItemPager, 7)
+TASK(ExpiredItemPager, 7)
+TASK(ItemPagerVisitor, 7)
+TASK(ExpiredItemPagerVisitor, 7)
+TASK(DefragmenterTask, 7)
+TASK(ConnManager, 8)
+TASK(WorkLoadMonitor, 10)
+TASK(ResumeCallback, 316)
+TASK(HashtableResizerTask, 211)
+TASK(HashtableResizerVisitorTask, 7)
#include "config.h"
+#include <array>
#include <list>
#include <string>
#include <utility>
#include "atomic.h"
-#include "priority.h"
typedef enum {
TASK_RUNNING,
TASK_DEAD
} task_state_t;
+enum class TaskId : int {
+#define TASK(name, prio) name,
+#include "tasks.def.h"
+#undef TASK
+ TASK_COUNT
+};
+
+typedef int queue_priority_t;
+
+enum class TaskPriority : int {
+#define TASK(name, prio) name = prio,
+#include "tasks.def.h"
+#undef TASK
+ PRIORITY_COUNT
+};
+
class BfilterCB;
class BgFetcher;
class CompareTasksByDueDate;
friend class ExecutorThread;
friend class TaskQueue;
public:
- GlobalTask(EventuallyPersistentEngine *e, const Priority &p,
- double sleeptime = 0, bool completeBeforeShutdown = true) :
- RCValue(), priority(p),
- blockShutdown(completeBeforeShutdown),
- state(TASK_RUNNING), taskId(nextTaskId()), engine(e) {
- snooze(sleeptime);
- }
+ GlobalTask(EventuallyPersistentEngine *e,
+ TaskId taskId,
+ double sleeptime = 0,
+ bool completeBeforeShutdown = true);
/* destructor */
- virtual ~GlobalTask(void) {
- }
+ virtual ~GlobalTask(void) {}
/**
* The invoked function when the task is executed.
/**
* test if a task is dead
*/
- bool isdead(void) {
+ bool isdead(void) {
return (state == TASK_DEAD);
- }
-
+ }
/**
* Cancels this task by marking it dead.
*
* @return A unique task id number.
*/
- size_t getId() { return taskId; }
+ size_t getId() const { return uid; }
/**
* Returns the type id of this task.
*
* @return A type id of the task.
*/
- type_id_t getTypeId() { return priority.getTypeId(); }
+ TaskId getTypeId() const { return typeId; }
/**
* Gets the engine that this task was scheduled from
state.compare_exchange_strong(expected, tstate);
}
+ queue_priority_t getQueuePriority() const {
+ return static_cast<queue_priority_t>(priority);
+ }
+
+ /*
+ * Lookup the task name for TaskId id.
+ * The data used is generated from tasks.def.h
+ */
+ static const char* getTaskName(TaskId id);
+
+ /*
+ * Lookup the task priority for TaskId id.
+ * The data used is generated from tasks.def.h
+ */
+ static TaskPriority getTaskPriority(TaskId id);
+
+ /*
+ * A vector of all TaskId generated from tasks.def.h
+ */
+ static std::array<TaskId, static_cast<int>(TaskId::TASK_COUNT)> allTaskIds;
+
protected:
- const Priority &priority;
bool blockShutdown;
AtomicValue<task_state_t> state;
- const size_t taskId;
+ const size_t uid;
+ TaskId typeId;
+ TaskPriority priority;
EventuallyPersistentEngine *engine;
static AtomicValue<size_t> task_id_counter;
*/
class FlusherTask : public GlobalTask {
public:
- FlusherTask(EventuallyPersistentEngine *e, Flusher* f, const Priority &p,
- uint16_t shardid, bool completeBeforeShutdown = true) :
- GlobalTask(e, p, 0, completeBeforeShutdown), flusher(f) {
+ FlusherTask(EventuallyPersistentEngine *e, Flusher* f, uint16_t shardid,
+ bool completeBeforeShutdown = true)
+ : GlobalTask(e, TaskId::FlusherTask, 0, completeBeforeShutdown),
+ flusher(f) {
std::stringstream ss;
ss<<"Running a flusher loop: shard "<<shardid;
desc = ss.str();
*/
class VBSnapshotTask : public GlobalTask {
public:
- VBSnapshotTask(EventuallyPersistentEngine *e, const Priority &p,
- uint16_t sID = 0, bool completeBeforeShutdown = true) :
- GlobalTask(e, p, 0, completeBeforeShutdown), shardID(sID) {
- std::stringstream ss;
- ss<<"Snapshotting vbucket states for the shard: "<<shardID;
- desc = ss.str();
- }
+ enum class Priority {
+ HIGH,
+ LOW
+ };
bool run();
std::string getDescription() {
- return desc;
+ return "Snapshotting vbucket states for the shard: " + std::to_string(shardID);
}
+protected:
+ VBSnapshotTask(EventuallyPersistentEngine *e,
+ TaskId id,
+ uint16_t sID = 0,
+ bool completeBeforeShutdown = true)
+ : GlobalTask(e, id, 0, completeBeforeShutdown),
+ shardID(sID) {}
+
private:
uint16_t shardID;
- std::string desc;
+ Priority priority;
+};
+
+class VBSnapshotTaskHigh : public VBSnapshotTask {
+public:
+ VBSnapshotTaskHigh(EventuallyPersistentEngine *e,
+ uint16_t sID = 0,
+ bool completeBeforeShutdown = true)
+ : VBSnapshotTask(e, TaskId::VBSnapshotTaskHigh,
+ sID, completeBeforeShutdown){}
+};
+
+class VBSnapshotTaskLow : public VBSnapshotTask {
+public:
+ VBSnapshotTaskLow(EventuallyPersistentEngine *e,
+ uint16_t sID = 0,
+ bool completeBeforeShutdown = true)
+ : VBSnapshotTask(e, TaskId::VBSnapshotTaskLow,
+ sID, completeBeforeShutdown){}
};
/**
*/
class VBStatePersistTask : public GlobalTask {
public:
- VBStatePersistTask(EventuallyPersistentEngine *e, const Priority &p,
- uint16_t vbucket, bool completeBeforeShutdown = true) :
- GlobalTask(e, p, 0, completeBeforeShutdown), vbid(vbucket) {
- std::stringstream ss;
- ss<<"Persisting a vbucket state for vbucket: "<< vbid;
- desc = ss.str();
- }
+ enum class Priority {
+ HIGH,
+ LOW
+ };
bool run();
std::string getDescription() {
- return desc;
+ return "Persisting a vbucket state for vbucket: " + std::to_string(vbid);
+ }
+
+protected:
+ VBStatePersistTask(EventuallyPersistentEngine *e,
+ TaskId taskId,
+ uint16_t vbucket,
+ bool completeBeforeShutdown = true)
+ : GlobalTask(e, taskId, 0, completeBeforeShutdown),
+ vbid(vbucket) {
}
private:
uint16_t vbid;
- std::string desc;
+};
+
+class VBStatePersistTaskHigh : public VBStatePersistTask {
+public:
+ VBStatePersistTaskHigh(EventuallyPersistentEngine *e,
+ uint16_t vbucket,
+ bool completeBeforeShutdown = true)
+ : VBStatePersistTask(e, TaskId::VBStatePersistTaskHigh,
+ vbucket, completeBeforeShutdown) {}
+};
+
+class VBStatePersistTaskLow : public VBStatePersistTask {
+public:
+ VBStatePersistTaskLow(EventuallyPersistentEngine *e,
+ uint16_t vbucket,
+ bool completeBeforeShutdown = true)
+ : VBStatePersistTask(e, TaskId::VBStatePersistTaskLow,
+ vbucket, completeBeforeShutdown) {}
};
/**
class VBDeleteTask : public GlobalTask {
public:
VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c,
- const Priority &p, bool completeBeforeShutdown = true) :
- GlobalTask(e, p, 0, completeBeforeShutdown),
- vbucketId(vbid), cookie(c) { }
+ bool completeBeforeShutdown = true)
+ : GlobalTask(e, TaskId::VBDeleteTask, 0, completeBeforeShutdown),
+ vbucketId(vbid), cookie(c) {}
bool run();
*/
class CompactVBucketTask : public GlobalTask {
public:
- CompactVBucketTask(EventuallyPersistentEngine *e, const Priority &p,
- uint16_t vbucket, compaction_ctx c, const void *ck,
+ CompactVBucketTask(EventuallyPersistentEngine *e, uint16_t vbucket,
+ compaction_ctx c, const void *ck,
bool completeBeforeShutdown = false) :
- GlobalTask(e, p, 0, completeBeforeShutdown),
+ GlobalTask(e, TaskId::CompactVBucketTask, 0, completeBeforeShutdown),
vbid(vbucket), compactCtx(c), cookie(ck)
{
std::stringstream ss;
*/
class StatSnap : public GlobalTask {
public:
- StatSnap(EventuallyPersistentEngine *e, const Priority &p,
- bool runOneTimeOnly = false, bool sleeptime = 0,
- bool completeBeforeShutdown = false) :
- GlobalTask(e, p, sleeptime, completeBeforeShutdown),
- runOnce(runOneTimeOnly) { }
+ StatSnap(EventuallyPersistentEngine *e, bool runOneTimeOnly = false,
+ bool sleeptime = 0, bool completeBeforeShutdown = false)
+ : GlobalTask(e, TaskId::StatSnap, sleeptime, completeBeforeShutdown),
+ runOnce(runOneTimeOnly) {}
bool run();
/**
* A task for fetching items from disk.
+ * This task is used if EventuallyPersistentStore::multiBGFetchEnabled is true.
*/
-class BgFetcherTask : public GlobalTask {
+class MultiBGFetcherTask : public GlobalTask {
public:
- BgFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b,
- const Priority &p, bool sleeptime = 0,
- bool completeBeforeShutdown = false) :
- GlobalTask(e, p, sleeptime, completeBeforeShutdown),
- bgfetcher(b) { }
+ MultiBGFetcherTask(EventuallyPersistentEngine *e, BgFetcher *b, bool sleeptime = 0,
+ bool completeBeforeShutdown = false)
+ : GlobalTask(e, TaskId::MultiBGFetcherTask, sleeptime, completeBeforeShutdown),
+ bgfetcher(b) {}
bool run();
*/
class FlushAllTask : public GlobalTask {
public:
- FlushAllTask(EventuallyPersistentEngine *e, double when) :
- GlobalTask(e, Priority::FlushAllPriority, when, false) { }
+ FlushAllTask(EventuallyPersistentEngine *e, double when)
+ : GlobalTask(e, TaskId::FlushAllTask, when, false) {}
bool run();
class VKeyStatBGFetchTask : public GlobalTask {
public:
VKeyStatBGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
- uint16_t vbid, uint64_t s, const void *c,
- const Priority &p, int sleeptime = 0,
- bool completeBeforeShutdown = false) :
- GlobalTask(e, p, sleeptime, completeBeforeShutdown), key(k),
- vbucket(vbid), bySeqNum(s), cookie(c) { }
+ uint16_t vbid, uint64_t s, const void *c, int sleeptime = 0,
+ bool completeBeforeShutdown = false)
+ : GlobalTask(e, TaskId::VKeyStatBGFetchTask, sleeptime, completeBeforeShutdown),
+ key(k),
+ vbucket(vbid),
+ bySeqNum(s),
+ cookie(c) {}
bool run();
/**
* A task that performs disk fetches for non-resident get requests.
+ * This task is used if EventuallyPersistentStore::multiBGFetchEnabled is false.
*/
-class BGFetchTask : public GlobalTask {
+class SingleBGFetcherTask : public GlobalTask {
public:
- BGFetchTask(EventuallyPersistentEngine *e, const std::string &k,
- uint16_t vbid, const void *c, bool isMeta,
- const Priority &p, int sleeptime = 0,
- bool completeBeforeShutdown = false) :
- GlobalTask(e, p, sleeptime, completeBeforeShutdown),
- key(k), vbucket(vbid), cookie(c), metaFetch(isMeta),
- init(gethrtime()) { }
+ SingleBGFetcherTask(EventuallyPersistentEngine *e, const std::string &k,
+ uint16_t vbid, const void *c, bool isMeta,
+ int sleeptime = 0, bool completeBeforeShutdown = false)
+ : GlobalTask(e, TaskId::SingleBGFetcherTask, sleeptime, completeBeforeShutdown),
+ key(k),
+ vbucket(vbid),
+ cookie(c),
+ metaFetch(isMeta),
+ init(gethrtime()) {}
bool run();
/**
* Order tasks by their priority and taskId (try to ensure FIFO)
+ * @return true if t2 should have priority over t1
*/
class CompareByPriority {
public:
bool operator()(ExTask &t1, ExTask &t2) {
- return (t1->priority == t2->priority) ?
- (t1->taskId > t2->taskId) :
- (t1->priority < t2->priority);
+ return (t1->getQueuePriority() == t2->getQueuePriority()) ?
+ (t1->uid > t2->uid) :
+ (t1->getQueuePriority() > t2->getQueuePriority());
}
};
/**
* Order tasks by their ready date.
+ * @return true if t2 should have priority over t1
*/
class CompareByDueDate {
public:
void Warmup::scheduleInitialize()
{
- ExTask task = new WarmupInitialize(*store, this,
- Priority::WarmupPriority);
+ ExTask task = new WarmupInitialize(*store, this);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
{
threadtask_count = 0;
for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
- ExTask task = new WarmupCreateVBuckets(*store, i, this,
- Priority::WarmupPriority);
+ ExTask task = new WarmupCreateVBuckets(*store, i, this);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
estimateTime = 0;
estimatedItemCount = 0;
for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
- ExTask task = new WarmupEstimateDatabaseItemCount(
- *store, i, this, Priority::WarmupPriority);
+ ExTask task = new WarmupEstimateDatabaseItemCount(*store, i, this);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
{
threadtask_count = 0;
for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
- ExTask task = new WarmupKeyDump(*store, this,
- i, Priority::WarmupPriority);
+ ExTask task = new WarmupKeyDump(*store, this, i);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
void Warmup::scheduleCheckForAccessLog()
{
- ExTask task = new WarmupCheckforAccessLog(*store, this,
- Priority::WarmupPriority);
+ ExTask task = new WarmupCheckforAccessLog(*store, this);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
{
threadtask_count = 0;
for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
- ExTask task = new WarmupLoadAccessLog(*store, this, i,
- Priority::WarmupPriority);
+ ExTask task = new WarmupLoadAccessLog(*store, this, i);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
threadtask_count = 0;
for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
- ExTask task = new WarmupLoadingKVPairs(*store, this,
- i, Priority::WarmupPriority);
+ ExTask task = new WarmupLoadingKVPairs(*store, this, i);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
threadtask_count = 0;
for (size_t i = 0; i < store->vbMap.shards.size(); i++) {
- ExTask task = new WarmupLoadingData(*store, this,
- i, Priority::WarmupPriority);
+ ExTask task = new WarmupLoadingData(*store, this, i);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
}
void Warmup::scheduleCompletion() {
- ExTask task = new WarmupCompletion(*store, this,
- Priority::WarmupPriority);
+ ExTask task = new WarmupCompletion(*store, this);
taskId = task->getId();
ExecutorPool::get()->schedule(task, READER_TASK_IDX);
}
class WarmupInitialize : public GlobalTask {
public:
WarmupInitialize(EventuallyPersistentStore &st,
- Warmup *w, const Priority &p) :
- GlobalTask(&st.getEPEngine(), p, 0, false), _warmup(w) { }
+ Warmup *w) :
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupInitialize, 0, false),
+ _warmup(w) {}
std::string getDescription() {
std::stringstream ss;
class WarmupCreateVBuckets : public GlobalTask {
public:
- WarmupCreateVBuckets(EventuallyPersistentStore &st, uint16_t sh,
- Warmup *w, const Priority &p):
- GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) {}
+ WarmupCreateVBuckets(EventuallyPersistentStore &st,
+ uint16_t sh, Warmup *w):
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupCreateVBuckets, 0, false),
+ _shardId(sh),
+ _warmup(w) {}
std::string getDescription() {
std::stringstream ss;
class WarmupEstimateDatabaseItemCount : public GlobalTask {
public:
WarmupEstimateDatabaseItemCount(EventuallyPersistentStore &st,
- uint16_t sh, Warmup *w, const Priority &p):
- GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) {}
+ uint16_t sh, Warmup *w):
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupEstimateDatabaseItemCount, 0, false),
+ _shardId(sh),
+ _warmup(w) {}
std::string getDescription() {
std::stringstream ss;
class WarmupKeyDump : public GlobalTask {
public:
- WarmupKeyDump(EventuallyPersistentStore &st, Warmup* w,
- uint16_t sh, const Priority &p) :
- GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) {}
+ WarmupKeyDump(EventuallyPersistentStore &st, Warmup *w, uint16_t sh) :
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupKeyDump, 0, false),
+ _shardId(sh),
+ _warmup(w) {}
std::string getDescription() {
std::stringstream ss;
}
bool run() {
-
_warmup->keyDumpforShard(_shardId);
return false;
}
class WarmupCheckforAccessLog : public GlobalTask {
public:
WarmupCheckforAccessLog(EventuallyPersistentStore &st,
- Warmup *w, const Priority &p) :
- GlobalTask(&st.getEPEngine(), p, 0, false), _warmup(w) { }
+ Warmup *w) :
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupCheckforAccessLog, 0, false),
+ _warmup(w) {}
std::string getDescription() {
std::stringstream ss;
class WarmupLoadAccessLog : public GlobalTask {
public:
- WarmupLoadAccessLog(EventuallyPersistentStore &st,
- Warmup *w, uint16_t sh, const Priority &p) :
- GlobalTask(&st.getEPEngine(), p, 0, false), _warmup(w), _shardId(sh) { }
+ WarmupLoadAccessLog(EventuallyPersistentStore &st, Warmup *w, uint16_t sh) :
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupLoadAccessLog, 0, false),
+ _warmup(w), _shardId(sh) {}
std::string getDescription() {
std::stringstream ss;
class WarmupLoadingKVPairs : public GlobalTask {
public:
- WarmupLoadingKVPairs(EventuallyPersistentStore &st, Warmup* w,
- uint16_t sh, const Priority &p) :
- GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) { }
+ WarmupLoadingKVPairs(EventuallyPersistentStore &st, Warmup *w, uint16_t sh) :
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupLoadingKVPairs, 0, false),
+ _shardId(sh),
+ _warmup(w) {}
std::string getDescription() {
std::stringstream ss;
class WarmupLoadingData : public GlobalTask {
public:
- WarmupLoadingData(EventuallyPersistentStore &st, Warmup* w,
- uint16_t sh, const Priority &p) :
- GlobalTask(&st.getEPEngine(), p, 0, false), _shardId(sh), _warmup(w) {}
+ WarmupLoadingData(EventuallyPersistentStore &st, Warmup *w, uint16_t sh) :
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupLoadingData, 0, false),
+ _shardId(sh),
+ _warmup(w) {}
std::string getDescription() {
std::stringstream ss;
class WarmupCompletion : public GlobalTask {
public:
WarmupCompletion(EventuallyPersistentStore &st,
- Warmup *w, const Priority &p) :
- GlobalTask(&st.getEPEngine(), p, 0, false), _warmup(w) { }
+ Warmup *w) :
+ GlobalTask(&st.getEPEngine(), TaskId::WarmupCompletion, 0, false),
+ _warmup(w) {}
std::string getDescription() {
std::stringstream ss;
TestCase("test observe seqno error", test_observe_seqno_error,
test_setup, teardown, NULL, prepare, cleanup),
TestCase("test item pager", test_item_pager, test_setup,
- teardown, "max_size=2048000", prepare, cleanup),
+ teardown, "max_size=2621440", prepare, cleanup),
TestCase("warmup conf", test_warmup_conf, test_setup,
teardown, NULL, prepare, cleanup),
TestCase("bloomfilter conf", test_bloomfilter_conf, test_setup,
+++ /dev/null
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- * Copyright 2010 Couchbase, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "config.h"
-
-#include "priority.h"
-#undef NDEBUG
-
-int main(int argc, char **argv) {
- (void)argc; (void)argv;
-
- cb_assert(Priority::BgFetcherPriority > Priority::TapBgFetcherPriority);
- cb_assert(Priority::TapBgFetcherPriority == Priority::VBucketDeletionPriority);
- cb_assert(Priority::VBucketPersistHighPriority > Priority::VKeyStatBgFetcherPriority);
- cb_assert(Priority::VKeyStatBgFetcherPriority > Priority::FlusherPriority);
- cb_assert(Priority::FlusherPriority > Priority::ItemPagerPriority);
- cb_assert(Priority::ItemPagerPriority > Priority::BackfillTaskPriority);
-
- return 0;
-}