MB-21650: Prevent false sharing of frequently modified memory stats 51/69951/8
authorDave Rigby <daver@couchbase.com>
Wed, 16 Nov 2016 17:59:55 +0000 (17:59 +0000)
committerDave Rigby <daver@couchbase.com>
Fri, 18 Nov 2016 13:14:37 +0000 (13:14 +0000)
We record the memory usage, memory overhead and number of Items per
bucket, in the EPStats object. These stats are very frequently updated
(on every memory allocation/deallocation, and every Item
creation/destruction), and they are updated from all threads. This can
cause false sharing between these values if they co-exist in the same
cache line.

A recent change (66882e8 - MB-20852 [17/N]: Serialize VB state
changes) added a new element to EPStats which resulted in the layout
of that class changing such that memory stat variables (memOverhead,
numItem, totalMemory) ended up on the same cache line. This resulted
in a significant regression (1.9M op/s -> 1.4M op/s) in the performance
of the 'kv_max_ops_reads_512_avg_ops_hera_kv' test, due to false
sharing between these variables.

To address this, ensure that each variable is placed on its own cache
line to prevent false sharing, by using the CachelinePadded template
class.

Change-Id: I8316637a7a0c6fd05fd6f6ba24a1df44c43390c5
Reviewed-on: http://review.couchbase.org/69951
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
Tested-by: buildbot <build@couchbase.com>
12 files changed:
src/checkpoint.cc
src/checkpoint.h
src/ep.cc
src/ep_engine.cc
src/ep_engine.h
src/objectregistry.cc
src/stats.h
src/stored-value.cc
src/tapconnection.cc
src/tapconnection.h
src/vbucket.cc
src/vbucket.h

index f306147..1486af7 100644 (file)
@@ -111,11 +111,11 @@ Checkpoint::~Checkpoint() {
     LOG(EXTENSION_LOG_INFO,
         "Checkpoint %" PRIu64 " for vbucket %d is purged from memory",
         checkpointId, vbucketId);
-    stats.memOverhead.fetch_sub(memorySize());
-    if (stats.memOverhead.load() >= GIGANTOR) {
+    stats.memOverhead->fetch_sub(memorySize());
+    if (stats.memOverhead->load() >= GIGANTOR) {
         LOG(EXTENSION_LOG_WARNING,
             "Checkpoint::~Checkpoint: stats.memOverhead (which is %" PRId64
-            ") is greater than %" PRId64, uint64_t(stats.memOverhead.load()),
+            ") is greater than %" PRId64, uint64_t(stats.memOverhead->load()),
             uint64_t(GIGANTOR));
     }
 }
@@ -249,11 +249,11 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
             size_t newEntrySize = qi->getNKey() + sizeof(index_entry) +
                                   sizeof(queued_item);
             memOverhead += newEntrySize;
-            stats.memOverhead.fetch_add(newEntrySize);
-            if (stats.memOverhead.load() >= GIGANTOR) {
+            stats.memOverhead->fetch_add(newEntrySize);
+            if (stats.memOverhead->load() >= GIGANTOR) {
                 LOG(EXTENSION_LOG_WARNING,
                     "Checkpoint::queueDirty: stats.memOverhead (which is %" PRId64
-                    ") is greater than %" PRId64, uint64_t(stats.memOverhead.load()),
+                    ") is greater than %" PRId64, uint64_t(stats.memOverhead->load()),
                     uint64_t(GIGANTOR));
             }
         }
@@ -365,10 +365,10 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
     setSnapshotStartSeqno(getLowSeqno());
 
     memOverhead += newEntryMemOverhead;
-    stats.memOverhead.fetch_add(newEntryMemOverhead);
+    stats.memOverhead->fetch_add(newEntryMemOverhead);
     LOG(EXTENSION_LOG_WARNING,
         "Checkpoint::mergePrevCheckpoint: stats.memOverhead (which is %" PRId64
-        ") is greater than %" PRId64, uint64_t(stats.memOverhead.load()),
+        ") is greater than %" PRId64, uint64_t(stats.memOverhead->load()),
         uint64_t(GIGANTOR));
     return numNewItems;
 }
index 8f03ee8..0698dd3 100644 (file)
@@ -339,11 +339,11 @@ public:
         numMetaItems(0),
         memOverhead(0),
         effectiveMemUsage(0) {
-        stats.memOverhead.fetch_add(memorySize());
-        if (stats.memOverhead.load() >= GIGANTOR) {
+        stats.memOverhead->fetch_add(memorySize());
+        if (stats.memOverhead->load() >= GIGANTOR) {
             LOG(EXTENSION_LOG_WARNING,
                 "Checkpoint::Checkpoint: stats.memOverhead (which is %" PRId64
-                ") is greater than %" PRId64, uint64_t(stats.memOverhead.load()),
+                ") is greater than %" PRId64, uint64_t(stats.memOverhead->load()),
                 uint64_t(GIGANTOR));
         }
     }
index d48485b..08b1a54 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -304,7 +304,7 @@ EventuallyPersistentStore::EventuallyPersistentStore(
     size_t num_vbs = config.getMaxVbuckets();
     vb_mutexes = new Mutex[num_vbs];
 
-    stats.memOverhead = sizeof(EventuallyPersistentStore);
+    *stats.memOverhead = sizeof(EventuallyPersistentStore);
 
     if (config.getConflictResolutionType().compare("lww") == 0) {
         conflictResolver.reset(new LastWriteWinsResolution());
index 3d9eeb6..358f3b4 100644 (file)
@@ -1906,7 +1906,7 @@ extern "C" {
 
         if (MemoryTracker::trackingMemoryAllocations()) {
             engine->getEpStats().memoryTrackerEnabled.store(true);
-            engine->getEpStats().totalMemory.store(inital_tracking->load());
+            engine->getEpStats().totalMemory->store(inital_tracking->load());
         }
         delete inital_tracking;
 
index 1fea559..d16b735 100644 (file)
@@ -887,7 +887,8 @@ protected:
      */
     ENGINE_ERROR_CODE memoryCondition() {
         // Do we think it's possible we could free something?
-        bool haveEvidenceWeCanFreeMemory(stats.getMaxDataSize() > stats.memOverhead);
+        bool haveEvidenceWeCanFreeMemory =
+            (stats.getMaxDataSize() > stats.memOverhead->load());
         if (haveEvidenceWeCanFreeMemory) {
             // Look for more evidence by seeing if we have resident items.
             VBucketCountVisitor countVisitor(*this, vbucket_state_active);
index dcd79b8..dd5b53b 100644 (file)
@@ -143,8 +143,8 @@ void ObjectRegistry::onCreateItem(const Item *pItem)
    EventuallyPersistentEngine *engine = th->get();
    if (verifyEngine(engine)) {
        EPStats &stats = engine->getEpStats();
-       stats.memOverhead.fetch_add(pItem->size() - pItem->getValMemSize());
-       stats.numItem++;
+       stats.memOverhead->fetch_add(pItem->size() - pItem->getValMemSize());
+       ++(*stats.numItem);
    }
 }
 
@@ -153,8 +153,8 @@ void ObjectRegistry::onDeleteItem(const Item *pItem)
    EventuallyPersistentEngine *engine = th->get();
    if (verifyEngine(engine)) {
        EPStats &stats = engine->getEpStats();
-       stats.memOverhead.fetch_sub(pItem->size() - pItem->getValMemSize());
-       stats.numItem--;
+       stats.memOverhead->fetch_sub(pItem->size() - pItem->getValMemSize());
+       --(*stats.numItem);
    }
 }
 
@@ -187,7 +187,7 @@ bool ObjectRegistry::memoryAllocated(size_t mem) {
         return false;
     }
     EPStats &stats = engine->getEpStats();
-    stats.totalMemory.fetch_add(mem);
+    stats.totalMemory->fetch_add(mem);
     return true;
 }
 
@@ -200,6 +200,6 @@ bool ObjectRegistry::memoryDeallocated(size_t mem) {
         return false;
     }
     EPStats &stats = engine->getEpStats();
-    stats.totalMemory.fetch_sub(mem);
+    stats.totalMemory->fetch_sub(mem);
     return true;
 }
index c1f5837..77fea5f 100644 (file)
@@ -25,6 +25,7 @@
 #include <map>
 
 #include "atomic.h"
+#include <platform/cacheline_padded.h>
 #include <platform/histogram.h>
 #include <platform/processclock.h>
 #include "memory_tracker.h"
@@ -177,9 +178,9 @@ public:
 
     size_t getTotalMemoryUsed() {
         if (memoryTrackerEnabled.load()) {
-            return totalMemory.load();
+            return totalMemory->load();
         }
-        return currentSize.load() + memOverhead.load();
+        return currentSize.load() + memOverhead->load();
     }
 
     bool decrDiskQueueSize(size_t decrementBy) {
@@ -310,11 +311,11 @@ public:
     //! Total size of StoredVal memory overhead
     AtomicValue<size_t> storedValOverhead;
     //! Amount of memory used to track items and what-not.
-    AtomicValue<size_t> memOverhead;
+    cb::CachelinePadded<AtomicValue<size_t>> memOverhead;
     //! Total number of Item objects
-    AtomicValue<size_t> numItem;
+    cb::CachelinePadded<AtomicValue<size_t>> numItem;
     //! The total amount of memory used by this bucket (From memory tracking)
-    AtomicValue<size_t> totalMemory;
+    cb::CachelinePadded<AtomicValue<size_t>> totalMemory;
     //! True if the memory usage tracker is enabled.
     AtomicValue<bool> memoryTrackerEnabled;
     //! Whether or not to force engine shutdown.
index 11d4182..b575b2e 100644 (file)
@@ -365,7 +365,7 @@ void HashTable::resize(size_t newSize) {
         return;
     }
 
-    stats.memOverhead.fetch_sub(memorySize());
+    stats.memOverhead->fetch_sub(memorySize());
     ++numResizes;
 
     // Set the new size so all the hashy stuff works.
@@ -389,7 +389,7 @@ void HashTable::resize(size_t newSize) {
     cb_free(values);
     values = newValues;
 
-    stats.memOverhead.fetch_add(memorySize());
+    stats.memOverhead->fetch_add(memorySize());
 }
 
 static size_t distance(size_t a, size_t b) {
index a7411df..65c3c41 100644 (file)
@@ -565,7 +565,7 @@ void TapProducer::clearQueues_UNLOCKED() {
     mem_overhead += (ackLog_.size() * sizeof(TapLogElement));
     ackLog_.clear();
 
-    stats.memOverhead.fetch_sub(mem_overhead);
+    stats.memOverhead->fetch_sub(mem_overhead);
 
     logger.log(EXTENSION_LOG_WARNING, "Clear the tap queues by force");
 }
@@ -648,7 +648,7 @@ void TapProducer::rollback() {
         ++ackLogSize;
     }
 
-    stats.memOverhead.fetch_sub(ackLogSize * sizeof(TapLogElement));
+    stats.memOverhead->fetch_sub(ackLogSize * sizeof(TapLogElement));
 
     seqnoReceived = seqno - 1;
     seqnoAckRequested = seqno - 1;
@@ -886,7 +886,7 @@ ENGINE_ERROR_CODE TapProducer::processAck(uint32_t s,
         ret = ENGINE_DISCONNECT;
     }
 
-    stats.memOverhead.fetch_sub(num_logs * sizeof(TapLogElement));
+    stats.memOverhead->fetch_sub(num_logs * sizeof(TapLogElement));
 
     return ret;
 }
@@ -1082,7 +1082,7 @@ void TapProducer::completeBGFetchJob(Item *itm, uint16_t vbid, bool implicitEnqu
         if (it != checkpointState_.end()) {
             ++(it->second.bgResultSize);
         }
-        stats.memOverhead.fetch_add(sizeof(Item *));
+        stats.memOverhead->fetch_add(sizeof(Item *));
     } else {
         delete itm;
     }
@@ -1107,7 +1107,7 @@ Item* TapProducer::nextBgFetchedItem_UNLOCKED() {
         --(it->second.bgResultSize);
     }
 
-    stats.memOverhead.fetch_sub(sizeof(Item *));
+    stats.memOverhead->fetch_sub(sizeof(Item *));
 
     return rv;
 }
@@ -1323,7 +1323,7 @@ queued_item TapProducer::nextFgFetched_UNLOCKED(bool &shouldPause) {
         } else {
             queueMemSize.store(0);
         }
-        stats.memOverhead.fetch_sub(sizeof(queued_item));
+        stats.memOverhead->fetch_sub(sizeof(queued_item));
         ++recordsFetched;
         return qi;
     }
@@ -1465,7 +1465,7 @@ bool TapProducer::addEvent_UNLOCKED(const queued_item &it) {
         queue->push_back(it);
         ++queueSize;
         queueMemSize.fetch_add(sizeof(queued_item));
-        stats.memOverhead.fetch_add(sizeof(queued_item));
+        stats.memOverhead->fetch_add(sizeof(queued_item));
         return wasEmpty;
     } else {
         return queue->empty();
index 83754e5..b814c24 100644 (file)
@@ -998,7 +998,7 @@ protected:
         if (supportsAck()) {
             TapLogElement log(seqno, qi);
             ackLog_.push_back(log);
-            stats.memOverhead.fetch_add(sizeof(TapLogElement));
+            stats.memOverhead->fetch_add(sizeof(TapLogElement));
         }
     }
 
@@ -1012,7 +1012,7 @@ protected:
             // add to the log!
             TapLogElement log(seqno, e);
             ackLog_.push_back(log);
-            stats.memOverhead.fetch_add(sizeof(TapLogElement));
+            stats.memOverhead->fetch_add(sizeof(TapLogElement));
         }
     }
 
index a532c6f..5056fff 100644 (file)
@@ -146,7 +146,7 @@ VBucket::~VBucket() {
     // Clear out the bloomfilter(s)
     clearFilter();
 
-    stats.memOverhead.fetch_sub(sizeof(VBucket) + ht.memorySize() +
+    stats.memOverhead->fetch_sub(sizeof(VBucket) + ht.memorySize() +
                                 sizeof(CheckpointManager));
 
     LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
index 04c2fdd..ede95f4 100644 (file)
@@ -187,7 +187,7 @@ public:
     {
         backfill.isBackfillPhase = false;
         pendingOpsStart = 0;
-        stats.memOverhead.fetch_add(sizeof(VBucket)
+        stats.memOverhead->fetch_add(sizeof(VBucket)
                                + ht.memorySize() + sizeof(CheckpointManager));
         LOG(EXTENSION_LOG_NOTICE,
             "VBucket: created vbucket:%" PRIu16 " with state:%s "
@@ -346,7 +346,7 @@ public:
         ++stats.diskQueueSize;
         ++stats.totalEnqueued;
         doStatsForQueueing(*qi, qi->size());
-        stats.memOverhead.fetch_add(sizeof(queued_item));
+        stats.memOverhead->fetch_add(sizeof(queued_item));
         return true;
     }
     void getBackfillItems(std::vector<queued_item> &items) {
@@ -356,7 +356,7 @@ public:
             items.push_back(backfill.items.front());
             backfill.items.pop();
         }
-        stats.memOverhead.fetch_sub(num_items * sizeof(queued_item));
+        stats.memOverhead->fetch_sub(num_items * sizeof(queued_item));
     }
     bool isBackfillPhase() {
         LockHolder lh(backfill.mutex);