****Per Stream Stats
-| backfilled | The amount of items sent from disk |
-| end_seqno | The seqno send mutations up to |
-| flags | The flags supplied in the stream request |
-| items_ready | Whether the stream has items ready to send |
-| last_sent_seqno | The last seqno sent by this stream |
-| last_read_seqno | The last seqno read by this stream from disk or memory |
-| ready_queue_memory | Memory occupied by elements in the DCP readyQ |
-| memory | The amount of items sent from memory |
-| opaque | The unique stream identifier |
-| snap_end_seqno | The last snapshot end seqno (Used if a consumer is |
-| | resuming a stream) |
-| snap_start_seqno | The last snapshot start seqno (Used if a consumer is |
-| | resuming a stream) |
-| start_seqno | The seqno to start sending mutations from |
-| state | The stream state (pending, backfilling, in-memory, |
-| | takeover-send, takeover-wait, or dead) |
-| vb_uuid | The vb uuid used in the stream request |
-| cur_snapshot_type | The type of the current snapshot being received |
-| cur_snapshot_start | The start seqno of the current snapshot being received |
-| cur_snapshot_end | The end seqno of the current snapshot being received |
+| backfilled | The amount of items sent from disk |
+| end_seqno | The seqno send mutations up to |
+| flags | The flags supplied in the stream request |
+| items_ready | Whether the stream has items ready to send |
+| last_sent_seqno | The last seqno sent by this stream |
+| last_sent_snap_end_seqno | The last snapshot end seqno sent by active stream |
+| last_read_seqno | The last seqno read by this stream from disk or memory |
+| ready_queue_memory | Memory occupied by elements in the DCP readyQ |
+| memory | The amount of items sent from memory |
+| opaque | The unique stream identifier |
+| snap_end_seqno | The last snapshot end seqno (Used if a consumer is |
+| | resuming a stream) |
+| snap_start_seqno | The last snapshot start seqno (Used if a consumer is |
+| | resuming a stream) |
+| start_seqno | The seqno to start sending mutations from |
+| state | The stream state (pending, backfilling, in-memory, |
+| | takeover-send, takeover-wait, or dead) |
+| vb_uuid | The vb uuid used in the stream request |
+| cur_snapshot_type | The type of the current snapshot being received |
+| cur_snapshot_start | The start seqno of the current snapshot being received |
+| cur_snapshot_end | The end seqno of the current snapshot being received |
** Dcp Aggregated Stats
if (end_seqno_ > endSeqno) {
/* We possibly have items in the open checkpoint
(incomplete snapshot) */
- LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Merging backfill"
- " and memory snapshot for a replica vbucket, start seqno "
- "%" PRIu64 " and end seqno %" PRIu64 "",
- producer->logHeader(), vb_, startSeqno, endSeqno);
- endSeqno = end_seqno_;
+ uint64_t snapshot_start, snapshot_end;
+ vb->getCurrentSnapshot(snapshot_start, snapshot_end);
+ LOG(EXTENSION_LOG_WARNING,
+ "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
+ "replica vbucket, backfill start seqno %" PRIu64 ", "
+ "backfill end seqno %" PRIu64 ", "
+ "snapshot end seqno after merge %" PRIu64,
+ vb_, startSeqno, endSeqno, snapshot_end);
+ endSeqno = snapshot_end;
}
}
endSeqno);
pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
MARKER_FLAG_DISK));
- lastSentSnapEndSeqno = endSeqno;
+ lastSentSnapEndSeqno.store(endSeqno);
if (!vb) {
endStream(END_STREAM_STATE);
} else {
- if (endSeqno > end_seqno_) {
- chkCursorSeqno = end_seqno_;
- }
// Only re-register the cursor if we still need to get memory snapshots
CursorRegResult result =
vb->checkpointManager.registerTAPCursorBySeqno(name_,
add_casted_stat(buffer, itemsFromMemory, add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
+ snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
+ name_.c_str(), vb_);
+ add_casted_stat(buffer, lastSentSnapEndSeqno.load(), add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
}
pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
flags));
- lastSentSnapEndSeqno = snapEnd;
+ lastSentSnapEndSeqno.store(snapEnd);
}
std::deque<MutationResponse*>::iterator itemItr;
// An atomic read of vbucket state without acquiring the
// reader lock for state should suffice here.
if (vbucket && vbucket->getState() == vbucket_state_replica) {
- if (lastSentSnapEndSeqno >= lastReadSeqno) {
+ if (lastSentSnapEndSeqno.load() >= lastReadSeqno) {
return false;
}
}