opaqueCounter(0),
processTaskId(0),
itemsToProcess(false),
- lastNoopTime(ep_current_time()),
+ lastMessageTime(ep_current_time()),
backoffs(0),
processBufferedMessagesYieldThreshold(engine.getConfiguration().
getDcpConsumerProcessBufferedMessagesYieldLimit()),
ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
uint32_t flags) {
+ lastMessageTime = ep_current_time();
LockHolder lh(readyMutex);
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
streams[vbucket].reset();
return ENGINE_DISCONNECT;
ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque, uint16_t vbucket,
uint32_t flags) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
uint64_t bySeqno, uint64_t revSeqno,
uint32_t exptime, uint8_t nru,
const void* meta, uint16_t nmeta) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
uint16_t vbucket, uint64_t bySeqno,
uint64_t revSeqno, const void* meta,
uint16_t nmeta) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
uint16_t vbucket, uint64_t bySeqno,
uint64_t revSeqno, const void* meta,
uint16_t nmeta) {
+ // lastMessageTime is set in deletion function
return deletion(opaque, key, nkey, cas, vbucket, bySeqno, revSeqno, meta,
nmeta);
}
uint64_t start_seqno,
uint64_t end_seqno,
uint32_t flags) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
}
ENGINE_ERROR_CODE DcpConsumer::noop(uint32_t opaque) {
- lastNoopTime = ep_current_time();
+ lastMessageTime = ep_current_time();
return ENGINE_SUCCESS;
}
ENGINE_ERROR_CODE DcpConsumer::flush(uint32_t opaque, uint16_t vbucket) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
ENGINE_ERROR_CODE DcpConsumer::setVBucketState(uint32_t opaque,
uint16_t vbucket,
vbucket_state_t state) {
+ lastMessageTime = ep_current_time();
if (doDisconnect()) {
return ENGINE_DISCONNECT;
}
return ret;
}
- if ((ep_current_time() - lastNoopTime) > (noopInterval * 2)) {
- LOG(EXTENSION_LOG_WARNING, "%s Disconnecting because noop message has "
- "not been received for %u seconds", logHeader(), (noopInterval * 2));
+ if ((ep_current_time() - lastMessageTime) > (noopInterval * 2)) {
+ LOG(EXTENSION_LOG_WARNING, "%s Disconnecting because a message has "
+ "not been received for %u seconds. lastMessageTime was %u seconds ago.",
+ logHeader(), (noopInterval * 2), (ep_current_time() - lastMessageTime));
return ENGINE_DISCONNECT;
}