This mirrors how notifications are already handled for TAP.
It effectively removes the notification lock overhead from
store/delete (front-end) operation latencies.
Change-Id: I32c3c26daf6ea8cebeecc2a81fb1f0e957ba3e3d
Reviewed-on: http://review.couchbase.org/56301
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
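
For orientation, here is a minimal standalone sketch of the deferral pattern this change introduces, using std:: primitives in place of ep-engine's LockHolder, AtomicValue and ExecutorPool; the names are illustrative only, and the real implementation (DcpConnMap, DcpProducerNotifier) follows in the hunks below.

#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>

struct Notification { uint16_t vbid; uint64_t seqno; };

class NotifierSketch {
public:
    // Front-end path: a small queue update plus one compare-exchange; no
    // iteration over producer connections on the caller's thread.
    void notify(uint16_t vbid, uint64_t seqno) {
        {
            std::lock_guard<std::mutex> lh(lock);
            bool merged = false;
            for (auto& n : pending) {
                if (n.vbid == vbid) {
                    n.seqno = seqno;
                    merged = true;
                    break;
                }
            }
            if (!merged) {
                pending.push_back({vbid, seqno});
            }
        }
        bool expected = false;
        if (notified.compare_exchange_strong(expected, true)) {
            // Wake the background task here (ExecutorPool::wake in the patch).
        }
    }

    // Background task body: drains the queue and performs the per-connection
    // notification work off the front-end path.
    void runOnce() {
        notified.store(false);
        Notification n;
        while (pop(n)) {
            // notifySeqnoAvailable(n.vbid, n.seqno) on each producer here.
        }
    }

private:
    bool pop(Notification& out) {
        std::lock_guard<std::mutex> lh(lock);
        if (pending.empty()) {
            return false;
        }
        out = pending.front();
        pending.pop_front();
        return true;
    }

    std::deque<Notification> pending;
    std::mutex lock;
    std::atomic<bool> notified{false};
};

The key point is that the front-end thread only touches a small deque and one atomic flag; the spinlocked iteration over per-vBucket connections moves onto the background task.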
"dynamic": false,
"type": "size_t"
},
+ "dcp_producer_notifier_yield_limit": {
+ "default": "50",
+ "descr": "The number of notifications before DcpProducerNotifier::run yields.",
+ "type": "size_t"
+ },
"vb0": {
"default": "false",
"type": "bool"
| | during warmup |
| ep_warmup_thread | The status of the warmup thread |
| ep_warmup_time | The amount of time warmup took |
+| ep_dcp_pending_notifications | Shows true/false if notifications are |
+| | pending for the DcpProducerNotifier |
+| | task. |
| ep_workload_pattern | Workload pattern (mixed, read_heavy, |
| | write_heavy) monitored at runtime |
connNotifier_->start();
ExTask connMgr = new ConnManager(&engine, this);
ExecutorPool::get()->schedule(connMgr, NONIO_TASK_IDX);
+ if (ntype == DCP_CONN_NOTIFIER) {
+ dynamic_cast<DcpConnMap *>(this)->startProducerNotifier();
+ }
}
ConnMap::~ConnMap() {
}
DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
- : ConnMap(e) {
-
+ : ConnMap(e),
+ producerNotifier(NULL) {
}
+DcpConnMap::~DcpConnMap() {
+ stopProducerNotifier();
+}
DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
const std::string &name)
void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
{
- size_t lock_num = vbid % vbConnLockNum;
- SpinLockHolder lh(&vbConnLocks[lock_num]);
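+ // Front-end path: record the notification and wake the notifier task;
+ // producers are notified from DcpProducerNotifier::run() instead.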
+ addNotification(vbid, bySeqno);
+ wakeProducerNotifier();
+}
- std::list<connection_t> &conns = vbConns[vbid];
- std::list<connection_t>::iterator it = conns.begin();
- for (; it != conns.end(); ++it) {
- DcpProducer *conn = static_cast<DcpProducer*>((*it).get());
- conn->notifySeqnoAvailable(vbid, bySeqno);
+void DcpConnMap::addNotification(uint16_t vbid, uint64_t bySeqno) {
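+ // Coalesce by vBucket: if a notification for this vbid is already queued,
+ // just bump its seqno, keeping at most one entry per vBucket.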
+ LockHolder lh(notificationsLock);
+ std::deque<DcpProducerNotification>::iterator it = notifications.begin();
+ for (; it != notifications.end(); it++) {
+ if ((*it).vbid == vbid) {
+ (*it).seqno = bySeqno;
+ return;
+ }
}
+
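+ // No entry for this vBucket yet; queue a new notification.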
+ notifications.push_back({vbid, bySeqno});
}
+
+bool DcpConnMap::getNextNotification(uint16_t& vbid, uint64_t& seqno) {
+ LockHolder lh(notificationsLock);
+ if (!notifications.empty()) {
+ vbid = notifications.front().vbid;
+ seqno = notifications.front().seqno;
+ notifications.pop_front();
+ return true;
+ }
+ return false;
+}
+
+bool DcpConnMap::notifyProducers() {
+ uint16_t vbid = 0;
+ uint64_t seqno = 0;
+
+ // NB: returns via reference parameters
+ if (getNextNotification(vbid, seqno)) {
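+ // Notify every producer registered against this vBucket; the striped
+ // vbConn spinlock is now taken on this background task rather than on
+ // the front-end thread.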
+ size_t lock_num = vbid % vbConnLockNum;
+ SpinLockHolder lh(&vbConnLocks[lock_num]);
+ std::list<connection_t> &conns = vbConns[vbid];
+ if (!conns.empty()) {
+ std::list<connection_t>::iterator connection = conns.begin();
+ for (; connection != conns.end(); ++connection) {
+ DcpProducer *conn = static_cast<DcpProducer*>((*connection).get());
+ if (conn) {
+ conn->notifySeqnoAvailable(vbid, seqno);
+ }
+ }
+ }
+ return true;
+ }
+ return false;
+}
+
+bool DcpConnMap::DcpProducerNotifier::run() {
+ if (engine->getEpStats().isShutdown) {
+ return false;
+ }
+
+ // Indicate that we intend to snooze forever (until explicitly woken).
+ snooze(INT_MAX);
+
+ // Clear the notification flag before draining, so a notify that arrives
+ // while we are processing is not lost (the flag is re-checked below).
+ notified.store(false);
+
+ // Process the incoming notification(s)
+ size_t iterations = 0;
+
+ while (dcpConnMap.notifyProducers()
+ && iterations < iterationsBeforeYield) {
+ iterations++;
+ }
+
+ // Check whether another notify arrived while we ran, or work is still queued.
+ bool expected = true;
+ if (notified.compare_exchange_strong(expected, false)
+ || dcpConnMap.notificationsPending()) {
+ // Snooze for 0 seconds, yielding this task but scheduling it to run again.
+ snooze(0.0);
+ }
+
+ return true;
+}
+
+void DcpConnMap::startProducerNotifier() {
+ producerNotifier = new DcpProducerNotifier(&engine, *this);
+ ExecutorPool::get()->schedule(producerNotifier, AUXIO_TASK_IDX);
+}
+
+void DcpConnMap::wakeProducerNotifier() {
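+ // Only pay for an ExecutorPool::wake() if this caller is the one that
+ // flipped the notifier's 'notified' flag from false to true.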
+ if (producerNotifier->wakeMeUp()) {
+ ExecutorPool::get()->wake(producerNotifier->getId());
+ }
+}
+
+void DcpConnMap::stopProducerNotifier() {
+ ExecutorPool::get()->cancel(producerNotifier->getId());
+}
\ No newline at end of file
class Item;
class EventuallyPersistentEngine;
-typedef SingleThreadedRCPtr<ConnHandler> connection_t;
+typedef RCPtr<ConnHandler> connection_t;
/**
* Base class for operations performed on tap connections.
*
};
-
class DcpConnMap : public ConnMap {
public:
DcpConnMap(EventuallyPersistentEngine &engine);
+ ~DcpConnMap();
+
/**
* Find or build a dcp connection for the given cookie and with
* the given name.
ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
uint16_t vbucket, uint32_t flags);
+
+ bool notifyProducers();
+ bool notificationsPending() {
+ LockHolder lh(notificationsLock);
+ return !notifications.empty();
+ }
+
+ void startProducerNotifier();
+ void wakeProducerNotifier();
+ void stopProducerNotifier();
+
private:
+ class DcpProducerNotifier : public GlobalTask {
+ public:
+ DcpProducerNotifier(EventuallyPersistentEngine *e,
+ DcpConnMap &dcm) :
+ GlobalTask(e, Priority::TapConnNotificationPriority, INT_MAX, true),
+ dcpConnMap(dcm),
+ notified(false),
+ iterationsBeforeYield(e->getConfiguration()
+ .getDcpProducerNotifierYieldLimit()) {}
+
+ std::string getDescription() {
+ std::string rv("Notifying DCP producers on store operations");
+ return rv;
+ }
+
+ bool run();
+
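+ // Returns true only for the caller that flips 'notified' from false to
+ // true, so at most one ExecutorPool wake is issued per notification burst.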
+ bool wakeMeUp() {
+ bool expected = false;
+ return notified.compare_exchange_strong(expected, true);
+ }
+
+ private:
+ DcpConnMap &dcpConnMap;
+ AtomicValue<bool> notified;
+ size_t iterationsBeforeYield;
+ };
+
+ struct DcpProducerNotification {
+ uint16_t vbid;
+ uint64_t seqno;
+ };
+
+ void addNotification(uint16_t vbid, uint64_t bySeqno);
+ bool getNextNotification(uint16_t& vbid, uint64_t& seqno);
+
bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
void disconnect_UNLOCKED(const void *cookie);
void closeAllStreams_UNLOCKED();
std::list<connection_t> deadConnections;
+
+ DcpProducerNotifier* producerNotifier;
+
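+ // Pending notifications, coalesced to at most one entry per vBucket and
+ // guarded by notificationsLock.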
+ std::deque<DcpProducerNotification> notifications;
+ Mutex notificationsLock;
};
add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
}
+ add_casted_stat("ep_dcp_pending_notifications",
+ dcpConnMap_->notificationsPending() ? "true" : "false",
+ add_stat, cookie);
add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
add_stat, cookie);
add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
}
wait_for_flusher_to_settle(h, h1);
- checkeq(num_keys, get_int_stat(h, h1, "ep_total_persisted"),
- "Expected ep_total_persisted equals 4");
+
+ std::stringstream error1, error2;
+ error1 << "Expected ep_total_persisted >= num_keys (" << num_keys << ")";
+ error2 << "Expected ep_total_persisted <= num_sets*num_keys ("
+ << num_sets*num_keys << ")";
+
+ // The flusher could have run more than once, so we can only assert that
+ // between num_keys and num_sets*num_keys items were persisted.
+ check(get_int_stat(h, h1, "ep_total_persisted") >= num_keys,
+ error1.str().c_str());
+ check(get_int_stat(h, h1, "ep_total_persisted") <= num_sets*num_keys,
+ error2.str().c_str());
return SUCCESS;
}
h1->release(h, NULL, i);
}
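+ // Notifications are now handled asynchronously by the DcpProducerNotifier
+ // task, so wait for the pending queue to drain before stepping the stream.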
+ wait_for_str_stat_to_be(h, h1, "ep_dcp_pending_notifications", "false", NULL);
// Should get a stream end
dcp_step(h, h1, cookie);
check(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_STREAM_END,