LockHolder lh(streamMutex);
LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
" from disk, last seqno read: %ld", producer->logHeader(), vb_,
- itemsFromBackfill, lastReadSeqno);
+ itemsFromBackfill, lastReadSeqno.load());
}
isBackfillTaskRunning.store(false);
RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Vbucket marked as "
"dead, last sent seqno: %" PRIu64 ", high seqno: %" PRIu64 "",
- producer->logHeader(), vb_, lastSentSeqno,
+ producer->logHeader(), vb_, lastSentSeqno.load(),
vbucket->getHighSeqno());
} else {
LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
}
if (!isBackfillTaskRunning && readyQ.empty()) {
- backfillRemaining = 0;
- if (lastReadSeqno >= end_seqno_) {
+ backfillRemaining.store(0, memory_order_relaxed);
+ if (lastReadSeqno.load() >= end_seqno_) {
endStream(END_STREAM_OK);
} else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
transitionState(STREAM_TAKEOVER_SEND);
}
DcpResponse* ActiveStream::inMemoryPhase() {
- if (lastSentSeqno >= end_seqno_) {
+ if (lastSentSeqno.load() >= end_seqno_) {
endStream(END_STREAM_OK);
} else if (readyQ.empty()) {
if (nextCheckpointItem()) {
snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
add_casted_stat(buffer, itemsFromMemory, add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
- add_casted_stat(buffer, lastSentSeqno, add_stat, c);
+ add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
- add_casted_stat(buffer, lastReadSeqno, add_stat, c);
+ add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
" from disk, %llu items sent from memory, %llu was last seqno sent"
" %s is the reason", producer->logHeader(), vb_, itemsFromBackfill,
- itemsFromMemory, lastSentSeqno, getEndStreamStatusStr(reason));
+ itemsFromMemory.load(), lastSentSeqno.load(),
+ getEndStreamStatusStr(reason));
}
}
uint64_t high_seqno = vbucket->getHighSeqno();
if (end_seqno_ < high_seqno) {
- if (end_seqno_ > lastSentSeqno) {
- return (end_seqno_ - lastSentSeqno);
+ if (end_seqno_ > lastSentSeqno.load()) {
+ return (end_seqno_ - lastSentSeqno.load());
}
} else {
- if (high_seqno > lastSentSeqno) {
- return (high_seqno - lastSentSeqno);
+ if (high_seqno > lastSentSeqno.load()) {
+ return (high_seqno - lastSentSeqno.load());
}
}
void setVBucketStateAckRecieved();
void incrBackfillRemaining(size_t by) {
- backfillRemaining += by;
+ backfillRemaining.fetch_add(by);
}
void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
bool isCurrentSnapshotCompleted() const;
//! The last sequence number queued from disk or memory
- uint64_t lastReadSeqno;
+ AtomicValue<uint64_t> lastReadSeqno;
//! The last sequence number sent to the network layer
- uint64_t lastSentSeqno;
+ AtomicValue<uint64_t> lastSentSeqno;
//! The last known seqno pointed to by the checkpoint cursor
uint64_t curChkSeqno;
//! The current vbucket state to send in the takeover stream
vbucket_state_t takeoverState;
- //! The amount of items remaining to be read from disk
- size_t backfillRemaining;
+ /* backfillRemaining is a stat recording the amount of
+ * items remaining to be read from disk. It is an atomic
+ * because otherwise the function incrBackfillRemaining
+ * must acquire the streamMutex lock.
+ */
+ AtomicValue <size_t> backfillRemaining;
//! The amount of items that have been read from disk
size_t itemsFromBackfill;
//! The amount of items that have been read from memory
- size_t itemsFromMemory;
+ AtomicValue<size_t> itemsFromMemory;
//! Whether ot not this is the first snapshot marker sent
bool firstMarkerSent;