}
DcpResponse* ActiveStream::deadPhase() {
- return nextQueuedItem();
+ DcpResponse* resp = nextQueuedItem();
+ if (!resp) {
+ LOG(EXTENSION_LOG_WARNING,
+ "(vb %" PRIu16 ") Stream closed, "
+ "%" PRIu64 " items sent from backfill phase, "
+ "%" PRIu64 " items sent from memory phase, "
+ "%" PRIu64 " was last seqno sent",
+ vb_,
+ uint64_t(itemsFromBackfill),
+ uint64_t(itemsFromMemory),
+ lastSentSeqno.load());
+ }
+ return resp;
}
void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
}
transitionState(STREAM_DEAD);
- LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
- " from disk, %llu items sent from memory, %llu was last seqno sent"
- " %s is the reason", producer->logHeader(), vb_, itemsFromBackfill,
- itemsFromMemory.load(), lastSentSeqno.load(),
+ LOG(EXTENSION_LOG_WARNING,
+ "(vb %" PRIu16 ") Stream closing, "
+ "sent until seqno %" PRIu64 " "
+ "reason: %s",
+ vb_,
+ lastSentSeqno.load(),
getEndStreamStatusStr(reason));
}
}