{
if (resp) {
readyQ.push(resp);
- readyQueueMemory += resp->getMessageSize();
+ readyQueueMemory.fetch_add(resp->getMessageSize(),
+ memory_order_relaxed);
}
}
uint32_t respSize = readyQ.front()->getMessageSize();
readyQ.pop();
/* Decrement the readyQ size */
- if ((readyQueueMemory - respSize) <= readyQueueMemory) {
- readyQueueMemory -= respSize;
+ if (respSize <= readyQueueMemory.load(memory_order_relaxed)) {
+ readyQueueMemory.fetch_sub(respSize, memory_order_relaxed);
} else {
LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
"underflow, likely wrong stat calculation! curr size: %llu;"
- "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory,
+ "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory.load(),
respSize);
- readyQueueMemory = 0;
+ readyQueueMemory.store(0, memory_order_relaxed);
}
}
}
uint64_t Stream::getReadyQueueMemory() {
- return readyQueueMemory;
+ return readyQueueMemory.load(memory_order_relaxed);
}
const char * Stream::stateName(stream_state_t st) const {
const static uint64_t dcpMaxSeqno;
private:
- /* This tracks the memory occupied by elements in the readyQ */
- uint64_t readyQueueMemory;
+ /* readyQueueMemory tracks the memory occupied by elements
+ * in the readyQ. It is an atomic because otherwise
+ getReadyQueueMemory would need to acquire streamMutex.
+ */
+ AtomicValue <uint64_t> readyQueueMemory;
};
typedef RCPtr<Stream> stream_t;