{
if (resp) {
readyQ.push(resp);
- readyQueueMemory += resp->getMessageSize();
+ readyQueueMemory.fetch_add(resp->getMessageSize(),
- memory_order_relaxed);
++ std::memory_order_relaxed);
}
}
uint32_t respSize = readyQ.front()->getMessageSize();
readyQ.pop();
/* Decrement the readyQ size */
- if ((readyQueueMemory - respSize) <= readyQueueMemory) {
- readyQueueMemory -= respSize;
- if (respSize <= readyQueueMemory.load(memory_order_relaxed)) {
- readyQueueMemory.fetch_sub(respSize, memory_order_relaxed);
++ if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
++ readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
} else {
LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
"underflow, likely wrong stat calculation! curr size: %llu;"
- "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory,
- "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory.load(),
-- respSize);
- readyQueueMemory = 0;
- readyQueueMemory.store(0, memory_order_relaxed);
++ "new size: %d", name_.c_str(), getVBucket(),
++ readyQueueMemory.load(std::memory_order_relaxed), respSize);
++ readyQueueMemory.store(0, std::memory_order_relaxed);
}
}
}
uint64_t Stream::getReadyQueueMemory() {
- return readyQueueMemory;
- return readyQueueMemory.load(memory_order_relaxed);
++ return readyQueueMemory.load(std::memory_order_relaxed);
}
const char * Stream::stateName(stream_state_t st) const {
{
LockHolder lh(streamMutex);
LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
- " from disk, last seqno read: %ld", producer->logHeader(), vb_,
- itemsFromBackfill, lastReadSeqno.load());
+ " from disk %d from memory, last seqno read: %llu",
+ producer->logHeader(), vb_, backfillItems.disk.load(),
- backfillItems.memory.load(), lastReadSeqno);
++ backfillItems.memory.load(), lastReadSeqno.load());
}
isBackfillTaskRunning.store(false);
}
if (!isBackfillTaskRunning && readyQ.empty()) {
- backfillRemaining = 0;
- if (lastReadSeqno >= end_seqno_) {
- backfillRemaining.store(0, memory_order_relaxed);
++ backfillRemaining.store(0, std::memory_order_relaxed);
+ if (lastReadSeqno.load() >= end_seqno_) {
endStream(END_STREAM_OK);
} else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
transitionState(STREAM_TAKEOVER_SEND);
void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
Stream::addStats(add_stat, c);
- const int bsize = 128;
+ const int bsize = 1024;
char buffer[bsize];
- snprintf(buffer, bsize, "%s:stream_%d_backfilled", name_.c_str(), vb_);
- add_casted_stat(buffer, itemsFromBackfill, add_stat, c);
- snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
- add_casted_stat(buffer, itemsFromMemory, add_stat, c);
+ snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
+ name_.c_str(), vb_);
+ add_casted_stat(buffer, backfillItems.disk, add_stat, c);
+ snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
+ name_.c_str(), vb_);
+ add_casted_stat(buffer, backfillItems.memory, add_stat, c);
+ snprintf(buffer, bsize, "%s:stream_%d_backfill_sent", name_.c_str(), vb_);
+ add_casted_stat(buffer, backfillItems.sent, add_stat, c);
+ snprintf(buffer, bsize, "%s:stream_%d_memory_phase", name_.c_str(), vb_);
+ add_casted_stat(buffer, itemsFromMemoryPhase, add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
- add_casted_stat(buffer, lastSentSeqno, add_stat, c);
+ add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
+ snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
+ name_.c_str(), vb_);
+ add_casted_stat(buffer,
+ lastSentSnapEndSeqno.load(std::memory_order_relaxed),
+ add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
- add_casted_stat(buffer, lastReadSeqno, add_stat, c);
+ add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
}
transitionState(STREAM_DEAD);
- LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
- " from disk, %llu items sent from memory, %llu was last seqno sent"
- " %s is the reason", producer->logHeader(), vb_, itemsFromBackfill,
- itemsFromMemory.load(), lastSentSeqno.load(),
+ LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Stream closing, "
+ "%" PRIu64 " items sent from backfill phase, %" PRIu64 " items "
+ "sent from memory phase, %" PRIu64 " was last seqno sent, "
+ "reason: %s", producer->logHeader(), vb_,
+ uint64_t(backfillItems.sent.load()),
- uint64_t(itemsFromMemoryPhase), lastSentSeqno,
++ uint64_t(itemsFromMemoryPhase), lastSentSeqno.load(),
getEndStreamStatusStr(reason));
}
}
const static uint64_t dcpMaxSeqno;
private:
- /* This tracks the memory occupied by elements in the readyQ */
- uint64_t readyQueueMemory;
+ /* readyQueueMemory tracks the memory occupied by elements
+ * in the readyQ. It is an atomic because otherwise
+ getReadyQueueMemory would need to acquire streamMutex.
+ */
+ AtomicValue <uint64_t> readyQueueMemory;
};
-typedef RCPtr<Stream> stream_t;
class ActiveStreamCheckpointProcessorTask;
uint64_t curChkSeqno;
//! The current vbucket state to send in the takeover stream
vbucket_state_t takeoverState;
- //! The amount of items remaining to be read from disk
- size_t backfillRemaining;
+ /* backfillRemaining is a stat recording the amount of
+ * items remaining to be read from disk. It is an atomic
+ * because otherwise the function incrBackfillRemaining
+ * must acquire the streamMutex lock.
+ */
+ AtomicValue <size_t> backfillRemaining;
-
- //! The amount of items that have been read from disk
- size_t itemsFromBackfill;
- //! The amount of items that have been read from memory
- AtomicValue<size_t> itemsFromMemory;
+ //! Stats to track items read and sent from the backfill phase
+ struct {
+ AtomicValue<size_t> memory;
+ AtomicValue<size_t> disk;
+ AtomicValue<size_t> sent;
+ } backfillItems;
+ //! The amount of items that have been sent during the memory phase
+ size_t itemsFromMemoryPhase;
//! Whether ot not this is the first snapshot marker sent
bool firstMarkerSent;