****Per Stream Stats
-| backfill_disk_items | The amount of items read during backfill from disk |
-| backfill_mem_items | The amount of items read during backfill from memory |
-| backfill_sent | The amount of items sent to the consumer during the |
-| end_seqno | The seqno send mutations up to |
-| flags | The flags supplied in the stream request |
-| items_ready | Whether the stream has items ready to send |
-| last_sent_seqno | The last seqno sent by this stream |
-| last_read_seqno | The last seqno read by this stream from disk or memory|
-| ready_queue_memory | Memory occupied by elements in the DCP readyQ |
-| memory_phase | The amount of items sent during the memory phase |
-| opaque | The unique stream identifier |
-| snap_end_seqno | The last snapshot end seqno (Used if a consumer is |
-| | resuming a stream) |
-| snap_start_seqno | The last snapshot start seqno (Used if a consumer is |
-| | resuming a stream) |
-| start_seqno | The seqno to start sending mutations from |
-| state | The stream state (pending, backfilling, in-memory, |
-| | takeover-send, takeover-wait, or dead) |
-| vb_uuid | The vb uuid used in the stream request |
-| cur_snapshot_type | The type of the current snapshot being received |
-| cur_snapshot_start | The start seqno of the current snapshot being |
-| | received |
-| cur_snapshot_end | The end seqno of the current snapshot being received |
+| backfill_disk_items | The amount of items read during backfill from disk |
+| backfill_mem_items | The amount of items read during backfill from memory |
+| backfill_sent | The amount of items sent to the consumer during the |
+| end_seqno | The seqno send mutations up to |
+| flags | The flags supplied in the stream request |
+| items_ready | Whether the stream has items ready to send |
+| last_sent_seqno | The last seqno sent by this stream |
+| last_sent_snap_end_seqno | The last snapshot end seqno sent by active stream |
+| last_read_seqno | The last seqno read by this stream from disk or memory|
+| ready_queue_memory | Memory occupied by elements in the DCP readyQ |
+| memory_phase | The amount of items sent during the memory phase |
+| opaque | The unique stream identifier |
+| snap_end_seqno | The last snapshot end seqno (Used if a consumer is |
+| | resuming a stream) |
+| snap_start_seqno | The last snapshot start seqno (Used if a consumer is |
+| | resuming a stream) |
+| start_seqno | The seqno to start sending mutations from |
+| state | The stream state (pending, backfilling, in-memory, |
+| | takeover-send, takeover-wait, or dead) |
+| vb_uuid | The vb uuid used in the stream request |
+| cur_snapshot_type | The type of the current snapshot being received |
+| cur_snapshot_start | The start seqno of the current snapshot being |
+| | received |
+| cur_snapshot_end | The end seqno of the current snapshot being received |
** Dcp Aggregated Stats
if (end_seqno_ > endSeqno) {
/* We possibly have items in the open checkpoint
(incomplete snapshot) */
- LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Merging backfill"
- " and memory snapshot for a replica vbucket, start seqno "
- "%" PRIu64 " and end seqno %" PRIu64 "",
- producer->logHeader(), vb_, startSeqno, endSeqno);
- endSeqno = end_seqno_;
+ snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
+ LOG(EXTENSION_LOG_WARNING,
+ "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
+ "replica vbucket, backfill start seqno %" PRIu64 ", "
+ "backfill end seqno %" PRIu64 ", "
+ "snapshot end seqno after merge %" PRIu64,
+ vb_, startSeqno, endSeqno, info.range.end);
+ endSeqno = info.range.end;
}
}
endSeqno);
pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
MARKER_FLAG_DISK));
- lastSentSnapEndSeqno = endSeqno;
+ lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
if (!vb) {
endStream(END_STREAM_STATE);
} else {
- if (endSeqno > end_seqno_) {
- chkCursorSeqno = end_seqno_;
- }
// Only re-register the cursor if we still need to get memory snapshots
CursorRegResult result =
vb->checkpointManager.registerCursorBySeqno(name_, chkCursorSeqno);
add_casted_stat(buffer, itemsFromMemoryPhase, add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
add_casted_stat(buffer, lastSentSeqno, add_stat, c);
+ snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
+ name_.c_str(), vb_);
+ add_casted_stat(buffer,
+ lastSentSnapEndSeqno.load(std::memory_order_relaxed),
+ add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
add_casted_stat(buffer, lastReadSeqno, add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
}
pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
flags));
- lastSentSnapEndSeqno = snapEnd;
+ lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
}
std::deque<MutationResponse*>::iterator itemItr;
// An atomic read of vbucket state without acquiring the
// reader lock for state should suffice here.
if (vbucket && vbucket->getState() == vbucket_state_replica) {
- if (lastSentSnapEndSeqno >= lastReadSeqno) {
+ if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
+ lastReadSeqno) {
return false;
}
}