DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
const std::string &name)
: Consumer(engine, cookie, name), opaqueCounter(0), processTaskId(0),
- itemsToProcess(false), lastNoopTime(ep_current_time()), backoffs(0) {
+ itemsToProcess(false), lastMessageTime(ep_current_time()), backoffs(0) {
Configuration& config = engine.getConfiguration();
streams = new passive_stream_t[config.getMaxVbuckets()];
setSupportAck(false);
ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
uint32_t flags) {
+ lastMessageTime = ep_current_time();
LockHolder lh(readyMutex);
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
streams[vbucket].reset();
return ENGINE_DISCONNECT;
ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque, uint16_t vbucket,
uint32_t flags) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
uint64_t bySeqno, uint64_t revSeqno,
uint32_t exptime, uint8_t nru,
const void* meta, uint16_t nmeta) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
uint16_t vbucket, uint64_t bySeqno,
uint64_t revSeqno, const void* meta,
uint16_t nmeta) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
uint16_t vbucket, uint64_t bySeqno,
uint64_t revSeqno, const void* meta,
uint16_t nmeta) {
+ // lastMessageTime is set in deletion function
return deletion(opaque, key, nkey, cas, vbucket, bySeqno, revSeqno, meta,
nmeta);
}
uint64_t start_seqno,
uint64_t end_seqno,
uint32_t flags) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
}
ENGINE_ERROR_CODE DcpConsumer::noop(uint32_t opaque) {
- lastNoopTime = ep_current_time();
+ lastMessageTime = ep_current_time();
return ENGINE_SUCCESS;
}
ENGINE_ERROR_CODE DcpConsumer::flush(uint32_t opaque, uint16_t vbucket) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
ENGINE_ERROR_CODE DcpConsumer::setVBucketState(uint32_t opaque,
uint16_t vbucket,
vbucket_state_t state) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
return ret;
}
- if ((ep_current_time() - lastNoopTime) > (noopInterval * 2)) {
- LOG(EXTENSION_LOG_WARNING, "%s Disconnecting because noop message has "
- "no been received for %u seconds", logHeader(), (noopInterval * 2));
+ if ((ep_current_time() - lastMessageTime) > (noopInterval * 2)) {
+ LOG(EXTENSION_LOG_WARNING, "%s Disconnecting because a message has "
+ "not been received for %u seconds. lastMessageTime was %u seconds ago.",
+ logHeader(), (noopInterval * 2), (ep_current_time() - lastMessageTime));
return ENGINE_DISCONNECT;
}