const std::string &name, bool isNotifier)
: Producer(e, cookie, name), rejectResp(NULL),
notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(*this),
- vbReady(e.getConfiguration().getMaxVbuckets()),
+ vbReady(e.getConfiguration().getMaxVbuckets()), notifiedVbReady(false),
itemsSent(0), totalBytesSent(0), roundRobinVbReady(0) {
setSupportAck(true);
setReserved(true);
static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
}
vbReady[vbucket].store(true);
+ bool inverse = false;
+ if (notifiedVbReady.compare_exchange_strong(inverse, true)) {
+ log.unpauseIfSpaceAvailable();
+ }
if (add_vb_conn_map) {
connection_t conn(this);
{
WriterLockHolder wlh(streamsMutex);
streams.erase(vbucket);
- vbReady[vbucket].store(false);
}
return ret;
uint16_t vbid = itr->first;
itr->second->setDead(END_STREAM_DISCONNECTED);
streams.erase(vbid);
- vbReady[vbid].store(false);
vblist.push_back(vbid);
}
}
DcpResponse* DcpProducer::getNextItem() {
setPaused(false);
- if (roundRobinVbReady >= vbReady.size()) {
- roundRobinVbReady = 0;
- }
- for (; roundRobinVbReady < vbReady.size(); roundRobinVbReady++) {
-
- if (log.pauseIfFull()) {
- return NULL;
+ bool inverse = true;
+ do {
+ if (roundRobinVbReady >= vbReady.size()) {
+ roundRobinVbReady = 0;
}
+ for (; roundRobinVbReady < vbReady.size(); roundRobinVbReady++) {
- bool expected = true;
- if (vbReady[roundRobinVbReady].compare_exchange_strong(expected, false)) {
- uint16_t vbucket = roundRobinVbReady;
- DcpResponse *op = NULL;
- std::map<uint16_t, stream_t>::iterator it;
- stream_t stream;
- {
- ReaderLockHolder rlh(streamsMutex);
- it = streams.find(vbucket);
- if (it == streams.end()) {
- continue;
- }
- stream.reset(it->second);
+ if (log.pauseIfFull()) {
+ return NULL;
}
- // Return the next operation
- // When an op is returned it is assumed
- // our bufferLog has been updated.
- op = stream->next();
+ bool expected = true;
+ if (vbReady[roundRobinVbReady].compare_exchange_strong(expected, false)) {
+ uint16_t vbucket = roundRobinVbReady;
+ DcpResponse *op = NULL;
+ std::map<uint16_t, stream_t>::iterator it;
+ stream_t stream;
+ {
+ ReaderLockHolder rlh(streamsMutex);
+ it = streams.find(vbucket);
+ if (it == streams.end()) {
+ continue;
+ }
+ stream.reset(it->second);
+ }
- if (!op) {
- continue;
- }
+ // Return the next operation
+ // When an op is returned it is assumed
+ // our bufferLog has been updated.
+ op = stream->next();
- switch (op->getEvent()) {
- case DCP_SNAPSHOT_MARKER:
- case DCP_MUTATION:
- case DCP_DELETION:
- case DCP_EXPIRATION:
- case DCP_STREAM_END:
- case DCP_SET_VBUCKET:
- break;
- default:
- LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to "
- "write an unexpected event %d",
- logHeader(), op->getEvent());
- abort();
- }
+ if (!op) {
+ continue;
+ }
- vbReady[vbucket].store(true);
+ switch (op->getEvent()) {
+ case DCP_SNAPSHOT_MARKER:
+ case DCP_MUTATION:
+ case DCP_DELETION:
+ case DCP_EXPIRATION:
+ case DCP_STREAM_END:
+ case DCP_SET_VBUCKET:
+ break;
+ default:
+ LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to "
+ "write an unexpected event %d",
+ logHeader(), op->getEvent());
+ abort();
+ }
- if (op->getEvent() == DCP_MUTATION ||
- op->getEvent() == DCP_DELETION ||
- op->getEvent() == DCP_EXPIRATION) {
- itemsSent++;
- }
+ vbReady[vbucket].store(true);
+ notifiedVbReady.store(true);
+ ++roundRobinVbReady;
- totalBytesSent.fetch_add(op->getMessageSize());
+ if (op->getEvent() == DCP_MUTATION ||
+ op->getEvent() == DCP_DELETION ||
+ op->getEvent() == DCP_EXPIRATION) {
+ itemsSent++;
+ }
+
+ totalBytesSent.fetch_add(op->getMessageSize());
- return op;
+ return op;
+ }
}
- }
+ } while (notifiedVbReady.compare_exchange_strong(inverse, false));
setPaused(true);
return NULL;
void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
bool expected = false;
- if (vbReady[vbucket].compare_exchange_strong(expected, true)) {
+ if (vbReady[vbucket].compare_exchange_strong(expected, true) &&
+ notifiedVbReady.compare_exchange_strong(expected, true)) {
log.unpauseIfSpaceAvailable();
}
}