LOG(EXTENSION_LOG_INFO,
"Checkpoint %" PRIu64 " for vbucket %d is purged from memory",
checkpointId, vbucketId);
- stats.memOverhead.fetch_sub(memorySize());
- if (stats.memOverhead.load() >= GIGANTOR) {
+ stats.memOverhead->fetch_sub(memorySize());
+ if (stats.memOverhead->load() >= GIGANTOR) {
LOG(EXTENSION_LOG_WARNING,
"Checkpoint::~Checkpoint: stats.memOverhead (which is %" PRId64
- ") is greater than %" PRId64, uint64_t(stats.memOverhead.load()),
+ ") is greater than %" PRId64, uint64_t(stats.memOverhead->load()),
uint64_t(GIGANTOR));
}
}
size_t newEntrySize = qi->getNKey() + sizeof(index_entry) +
sizeof(queued_item);
memOverhead += newEntrySize;
- stats.memOverhead.fetch_add(newEntrySize);
- if (stats.memOverhead.load() >= GIGANTOR) {
+ stats.memOverhead->fetch_add(newEntrySize);
+ if (stats.memOverhead->load() >= GIGANTOR) {
LOG(EXTENSION_LOG_WARNING,
"Checkpoint::queueDirty: stats.memOverhead (which is %" PRId64
- ") is greater than %" PRId64, uint64_t(stats.memOverhead.load()),
+ ") is greater than %" PRId64, uint64_t(stats.memOverhead->load()),
uint64_t(GIGANTOR));
}
}
setSnapshotStartSeqno(getLowSeqno());
memOverhead += newEntryMemOverhead;
- stats.memOverhead.fetch_add(newEntryMemOverhead);
+ stats.memOverhead->fetch_add(newEntryMemOverhead);
LOG(EXTENSION_LOG_WARNING,
"Checkpoint::mergePrevCheckpoint: stats.memOverhead (which is %" PRId64
- ") is greater than %" PRId64, uint64_t(stats.memOverhead.load()),
+ ") is greater than %" PRId64, uint64_t(stats.memOverhead->load()),
uint64_t(GIGANTOR));
return numNewItems;
}
numMetaItems(0),
memOverhead(0),
effectiveMemUsage(0) {
- stats.memOverhead.fetch_add(memorySize());
- if (stats.memOverhead.load() >= GIGANTOR) {
+ stats.memOverhead->fetch_add(memorySize());
+ if (stats.memOverhead->load() >= GIGANTOR) {
LOG(EXTENSION_LOG_WARNING,
"Checkpoint::Checkpoint: stats.memOverhead (which is %" PRId64
- ") is greater than %" PRId64, uint64_t(stats.memOverhead.load()),
+ ") is greater than %" PRId64, uint64_t(stats.memOverhead->load()),
uint64_t(GIGANTOR));
}
}
size_t num_vbs = config.getMaxVbuckets();
vb_mutexes = new Mutex[num_vbs];
- stats.memOverhead = sizeof(EventuallyPersistentStore);
+ *stats.memOverhead = sizeof(EventuallyPersistentStore);
if (config.getConflictResolutionType().compare("lww") == 0) {
conflictResolver.reset(new LastWriteWinsResolution());
if (MemoryTracker::trackingMemoryAllocations()) {
engine->getEpStats().memoryTrackerEnabled.store(true);
- engine->getEpStats().totalMemory.store(inital_tracking->load());
+ engine->getEpStats().totalMemory->store(inital_tracking->load());
}
delete inital_tracking;
*/
ENGINE_ERROR_CODE memoryCondition() {
// Do we think it's possible we could free something?
- bool haveEvidenceWeCanFreeMemory(stats.getMaxDataSize() > stats.memOverhead);
+ bool haveEvidenceWeCanFreeMemory =
+ (stats.getMaxDataSize() > stats.memOverhead->load());
if (haveEvidenceWeCanFreeMemory) {
// Look for more evidence by seeing if we have resident items.
VBucketCountVisitor countVisitor(*this, vbucket_state_active);
EventuallyPersistentEngine *engine = th->get();
if (verifyEngine(engine)) {
EPStats &stats = engine->getEpStats();
- stats.memOverhead.fetch_add(pItem->size() - pItem->getValMemSize());
- stats.numItem++;
+ stats.memOverhead->fetch_add(pItem->size() - pItem->getValMemSize());
+ ++(*stats.numItem);
}
}
EventuallyPersistentEngine *engine = th->get();
if (verifyEngine(engine)) {
EPStats &stats = engine->getEpStats();
- stats.memOverhead.fetch_sub(pItem->size() - pItem->getValMemSize());
- stats.numItem--;
+ stats.memOverhead->fetch_sub(pItem->size() - pItem->getValMemSize());
+ ++(*stats.numItem);
}
}
return false;
}
EPStats &stats = engine->getEpStats();
- stats.totalMemory.fetch_add(mem);
+ stats.totalMemory->fetch_add(mem);
return true;
}
return false;
}
EPStats &stats = engine->getEpStats();
- stats.totalMemory.fetch_sub(mem);
+ stats.totalMemory->fetch_sub(mem);
return true;
}
#include <map>
#include "atomic.h"
+#include <platform/cacheline_padded.h>
#include <platform/histogram.h>
#include <platform/processclock.h>
#include "memory_tracker.h"
size_t getTotalMemoryUsed() {
if (memoryTrackerEnabled.load()) {
- return totalMemory.load();
+ return totalMemory->load();
}
- return currentSize.load() + memOverhead.load();
+ return currentSize.load() + memOverhead->load();
}
bool decrDiskQueueSize(size_t decrementBy) {
//! Total size of StoredVal memory overhead
AtomicValue<size_t> storedValOverhead;
//! Amount of memory used to track items and what-not.
- AtomicValue<size_t> memOverhead;
+ cb::CachelinePadded<AtomicValue<size_t>> memOverhead;
//! Total number of Item objects
- AtomicValue<size_t> numItem;
+ cb::CachelinePadded<AtomicValue<size_t>> numItem;
//! The total amount of memory used by this bucket (From memory tracking)
- AtomicValue<size_t> totalMemory;
+ cb::CachelinePadded<AtomicValue<size_t>> totalMemory;
//! True if the memory usage tracker is enabled.
AtomicValue<bool> memoryTrackerEnabled;
//! Whether or not to force engine shutdown.
return;
}
- stats.memOverhead.fetch_sub(memorySize());
+ stats.memOverhead->fetch_sub(memorySize());
++numResizes;
// Set the new size so all the hashy stuff works.
cb_free(values);
values = newValues;
- stats.memOverhead.fetch_add(memorySize());
+ stats.memOverhead->fetch_add(memorySize());
}
static size_t distance(size_t a, size_t b) {
mem_overhead += (ackLog_.size() * sizeof(TapLogElement));
ackLog_.clear();
- stats.memOverhead.fetch_sub(mem_overhead);
+ stats.memOverhead->fetch_sub(mem_overhead);
logger.log(EXTENSION_LOG_WARNING, "Clear the tap queues by force");
}
++ackLogSize;
}
- stats.memOverhead.fetch_sub(ackLogSize * sizeof(TapLogElement));
+ stats.memOverhead->fetch_sub(ackLogSize * sizeof(TapLogElement));
seqnoReceived = seqno - 1;
seqnoAckRequested = seqno - 1;
ret = ENGINE_DISCONNECT;
}
- stats.memOverhead.fetch_sub(num_logs * sizeof(TapLogElement));
+ stats.memOverhead->fetch_sub(num_logs * sizeof(TapLogElement));
return ret;
}
if (it != checkpointState_.end()) {
++(it->second.bgResultSize);
}
- stats.memOverhead.fetch_add(sizeof(Item *));
+ stats.memOverhead->fetch_add(sizeof(Item *));
} else {
delete itm;
}
--(it->second.bgResultSize);
}
- stats.memOverhead.fetch_sub(sizeof(Item *));
+ stats.memOverhead->fetch_sub(sizeof(Item *));
return rv;
}
} else {
queueMemSize.store(0);
}
- stats.memOverhead.fetch_sub(sizeof(queued_item));
+ stats.memOverhead->fetch_sub(sizeof(queued_item));
++recordsFetched;
return qi;
}
queue->push_back(it);
++queueSize;
queueMemSize.fetch_add(sizeof(queued_item));
- stats.memOverhead.fetch_add(sizeof(queued_item));
+ stats.memOverhead->fetch_add(sizeof(queued_item));
return wasEmpty;
} else {
return queue->empty();
if (supportsAck()) {
TapLogElement log(seqno, qi);
ackLog_.push_back(log);
- stats.memOverhead.fetch_add(sizeof(TapLogElement));
+ stats.memOverhead->fetch_add(sizeof(TapLogElement));
}
}
// add to the log!
TapLogElement log(seqno, e);
ackLog_.push_back(log);
- stats.memOverhead.fetch_add(sizeof(TapLogElement));
+ stats.memOverhead->fetch_add(sizeof(TapLogElement));
}
}
// Clear out the bloomfilter(s)
clearFilter();
- stats.memOverhead.fetch_sub(sizeof(VBucket) + ht.memorySize() +
+ stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
sizeof(CheckpointManager));
LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
{
backfill.isBackfillPhase = false;
pendingOpsStart = 0;
- stats.memOverhead.fetch_add(sizeof(VBucket)
+ stats.memOverhead->fetch_add(sizeof(VBucket)
+ ht.memorySize() + sizeof(CheckpointManager));
LOG(EXTENSION_LOG_NOTICE,
"VBucket: created vbucket:%" PRIu16 " with state:%s "
++stats.diskQueueSize;
++stats.totalEnqueued;
doStatsForQueueing(*qi, qi->size());
- stats.memOverhead.fetch_add(sizeof(queued_item));
+ stats.memOverhead->fetch_add(sizeof(queued_item));
return true;
}
void getBackfillItems(std::vector<queued_item> &items) {
items.push_back(backfill.items.front());
backfill.items.pop();
}
- stats.memOverhead.fetch_sub(num_items * sizeof(queued_item));
+ stats.memOverhead->fetch_sub(num_items * sizeof(queued_item));
}
bool isBackfillPhase() {
LockHolder lh(backfill.mutex);