MB-23936: Use Threadlocal variables to accumulate stats 54/76854/10
authorPremkumar Thangamani <premkumar.thangamani@couchbase.com>
Tue, 25 Apr 2017 18:19:56 +0000 (11:19 -0700)
committerDave Rigby <daver@couchbase.com>
Wed, 26 Apr 2017 12:59:11 +0000 (12:59 +0000)
Currently when we allocate/deallocate memory, we update the per-bucket
variable `totalMemory`. Multiple threads contend heavily on this variable,
as memory allocation/deallocation happens often. The primary idea of
this commit is to maintain thread-local mem counters for each bucket and
merge them into `totalMemory` once a local counter reaches a threshold,
based either on size or on the number of times the counter has been updated.

Performance Improvement (30%-35%):
--------------------------------
- Tests were run on a cluster of 2 nodes; spec: 40 cores/2.2 GHz/64 GB memory
- On a high-throughput, read-heavy test of 256B values we noticed a
  [30%] increase in total ops (3.5M ops/s)
- On a similar write-heavy test, we noticed a [35%] increase in ops (1.9M ops/s)

Limitations:
-----------
- We create one thread-local variable (TLV) per bucket. Different OSes seem
  to enforce different limits on the number of TLVs. Although we have a hard
  limit of 10 buckets, I'm noting this here for future reference.
  -> [NetBSD:256 Linux:1024 OSX:512 Windows:1088]

- Because we merge local mem stats after certain thresholds are reached, there
  might be some small delays in reflecting the precise memory usage of a
  bucket. On an active system, the delay should be minimal and not noticeable.

- Windows does not seem to provide an API for releasing the memory
  allocated for a thread-local on thread exit the way pthreads does.
  So there will be a small leak of about (3*long) on every bucket
  unload on each thread.

- Valgrind might report that the unit tests leak the thread-locals.
  This is because, in the tests, bucket init happens on the main
  thread, and when the main thread exits the program it does not
  call pthread_exit, so the dtors are not called.

Change-Id: Id14ced2776a29afae18831b372140dd028136b32
Reviewed-on: http://review.couchbase.org/76854
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
configuration.json
docs/stats.org
src/ep_engine.cc
src/objectregistry.cc
src/objectregistry.h
src/stats.h
src/threadlocal.h
src/threadlocal_posix.h
src/threadlocal_win32.h
tests/ep_testsuite.cc

index 76e9368..0b10aa8 100644 (file)
                 }
             }
         },
+       "mem_merge_count_threshold" : {
+            "default": "100",
+            "descr": "No.of mem changes after which the thread-local mem is merged to the bucket counter",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 1000000,
+                    "min": 1
+                }
+            }
+        },
+       "mem_merge_bytes_threshold" : {
+            "default": "102400",
+            "descr": "Amount of mem changes after which the thread-local mem is merged to the bucket counter",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 104857600,
+                    "min": 1
+                }
+            }
+        },
         "num_reader_threads": {
             "default": "0",
             "descr": "Throttle max number of reader threads",
index 089af59..21193d1 100644 (file)
@@ -1091,6 +1091,12 @@ Note that tcmalloc stats are not available on some operating systems
 | ep_mem_low_wat_percent              | Low water mark (as a percentage)       |
 | ep_mem_high_wat                     | High water mark for auto-evictions   |
 | ep_mem_high_wat_percent             | High water mark (as a percentage)      |
+| ep_mem_merge_bytes_threshold        | The amount of thread-local memory    |
+|                                     | accumulation at which the local ctr  |
+|                                     | is to be merged with bucket level ctr|
+| ep_mem_merge_count_threshold        | No.of modifications to thread-local  |
+|                                     | mem ctr after which the ctr is to be |
+|                                     | merged with bucket level ctr         |
 | ep_oom_errors                       | Number of times unrecoverable OOMs   |
 |                                     | happened while processing operations |
 | ep_tmp_oom_errors                   | Number of times temporary OOMs       |
index 1499f43..97c2c15 100644 (file)
@@ -567,6 +567,10 @@ protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
             getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
             getConfiguration().setEphemeralMetadataPurgeInterval(
                     std::stoull(valz));
+        } else if (strcmp(keyz, "mem_merge_count_threshold") == 0) {
+            getConfiguration().setMemMergeCountThreshold(std::stoul(valz));
+        } else if (strcmp(keyz, "mem_merge_bytes_threshold") == 0) {
+            getConfiguration().setMemMergeBytesThreshold(std::stoul(valz));
         } else {
             msg = "Unknown config param";
             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
@@ -1871,6 +1875,10 @@ public:
             engine.setMaxItemSize(value);
         } else if (key.compare("max_item_privileged_bytes") == 0) {
             engine.setMaxItemPrivilegedBytes(value);
+        } else if (key.compare("mem_merge_count_threshold") == 0) {
+            engine.stats.mem_merge_count_threshold = value;
+        } else if (key.compare("mem_merge_bytes_threshold") == 0) {
+            engine.stats.mem_merge_bytes_threshold = value;
         }
     }
 
@@ -1922,6 +1930,16 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
                 configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
     }
 
+    stats.mem_merge_count_threshold = configuration.getMemMergeCountThreshold();
+    configuration.addValueChangedListener(
+            "mem_merge_count_threshold",
+            new EpEngineValueChangeListener(*this));
+
+    stats.mem_merge_bytes_threshold = configuration.getMemMergeBytesThreshold();
+    configuration.addValueChangedListener(
+            "mem_merge_bytes_threshold",
+            new EpEngineValueChangeListener(*this));
+
     maxItemSize = configuration.getMaxItemSize();
     configuration.addValueChangedListener("max_item_size",
                                        new EpEngineValueChangeListener(*this));
@@ -6309,3 +6327,55 @@ void EpEngineTaskable::logRunTime(TaskId id,
                                   const ProcessClock::duration runTime) {
     myEngine->getKVBucket()->logRunTime(id, runTime);
 }
+
+void EPStats::memAllocated(size_t sz) {  // account sz bytes as allocated
+    if (isShutdown) {
+        return;
+    }
+
+    if (localMemCounter.get() == nullptr) {
+        // No counter yet on this thread: create it. This HAS to be a
+        // non-bucket allocation, or else the callbacks would try to call
+        // this function again & it would become an infinite loop
+        SystemAllocationGuard system_alloc_guard;
+        localMemCounter.set(new TLMemCounter());
+    }
+
+    if (0 == sz) {
+        return;
+    }
+
+    localMemCounter.get()->used += sz;
+    mergeMemCounter();  // folds into totalMemory once a threshold is hit
+}
+
+void EPStats::memDeallocated(size_t sz) {  // account sz bytes as freed
+    if (isShutdown) {
+        return;
+    }
+
+    if (localMemCounter.get() == nullptr) {
+        // No counter yet on this thread: create it. This HAS to be a
+        // non-bucket allocation, or else the callbacks would try to call
+        // this function again & it would become an infinite loop
+        SystemAllocationGuard system_alloc_guard;
+        localMemCounter.set(new TLMemCounter());
+    }
+
+    if (0 == sz) {
+        return;
+    }
+
+    localMemCounter.get()->used -= sz;
+    mergeMemCounter();  // folds into totalMemory once a threshold is hit
+}
+
+void EPStats::mergeMemCounter(bool force) {
+    auto& counter = *(localMemCounter.get());  // NOTE(review): no null check
+    counter.count++;  // incremented on every call, forced or not
+    if (force || counter.count % mem_merge_count_threshold == 0 ||
+        std::abs(counter.used) > (long)mem_merge_bytes_threshold) {
+        totalMemory->fetch_add(counter.used);  // publish this thread's delta
+        counter.used = 0;  // only the byte delta is reset, not count
+    }
+}
index 55ba2d3..190a675 100644 (file)
@@ -169,12 +169,13 @@ EventuallyPersistentEngine *ObjectRegistry::getCurrentEngine() {
 
 EventuallyPersistentEngine *ObjectRegistry::onSwitchThread(
                                             EventuallyPersistentEngine *engine,
-                                            bool want_old_thread_local)
-{
+                                            bool want_old_thread_local) {
     EventuallyPersistentEngine *old_engine = NULL;
+
     if (want_old_thread_local) {
         old_engine = th->get();
     }
+
     th->set(engine);
     return old_engine;
 }
@@ -192,7 +193,7 @@ bool ObjectRegistry::memoryAllocated(size_t mem) {
         return false;
     }
     EPStats &stats = engine->getEpStats();
-    stats.totalMemory->fetch_add(mem);
+    stats.memAllocated(mem);
     return true;
 }
 
@@ -205,7 +206,17 @@ bool ObjectRegistry::memoryDeallocated(size_t mem) {
         return false;
     }
     EPStats &stats = engine->getEpStats();
-    stats.totalMemory->fetch_sub(mem);
+    stats.memDeallocated(mem);
     return true;
 }
+
+SystemAllocationGuard::SystemAllocationGuard() {
+    engine = th->get();  // remember the engine currently set on this thread
+    th->set(nullptr);  // clear it until the destructor runs
+}
+
+SystemAllocationGuard::~SystemAllocationGuard() {
+    th->set(engine);  // put back the engine saved by the constructor
+}
+
 #endif
index e1a9273..1bf5a50 100644 (file)
@@ -62,4 +62,15 @@ public:
     static bool memoryDeallocated(size_t mem);
 };
 
+/**
+ * Scoped guard to avoid mem accounting within a block (from ctor to dtor)
+ */
+class SystemAllocationGuard {
+public:
+    SystemAllocationGuard();
+    ~SystemAllocationGuard();
+private:
+    EventuallyPersistentEngine* engine = nullptr;  // set by ctor, used by dtor
+};
+
 #endif  // SRC_OBJECTREGISTRY_H_
index 949e011..9658b3d 100644 (file)
@@ -31,6 +31,8 @@
 #include <relaxed_atomic.h>
 #include <atomic>
 #include "memory_tracker.h"
+#include "objectregistry.h"
+#include "threadlocal.h"
 #include "utility.h"
 
 #ifndef DEFAULT_MAX_DATA_SIZE
@@ -173,6 +175,17 @@ public:
         diskCommitHisto(GrowingWidthGenerator<hrtime_t>(0, ONE_SECOND, 1.4), 25),
         mlogCompactorHisto(GrowingWidthGenerator<hrtime_t>(0, ONE_SECOND, 1.4), 25),
         timingLog(NULL),
+        mem_merge_count_threshold(1),
+        mem_merge_bytes_threshold(0),
+        localMemCounter([](void* ptr) -> void {
+                if (ptr != nullptr) {
+                    // This HAS to be a non-bucket deallocation
+                    // or else the callbacks could try to update counters
+                    // that no longer exist
+                    SystemAllocationGuard system_alloc_guard;
+                    delete (TLMemCounter*)ptr;
+                }
+            }),
         maxDataSize(DEFAULT_MAX_DATA_SIZE) {}
 
     ~EPStats() {
@@ -191,11 +204,24 @@ public:
 
     size_t getTotalMemoryUsed() {
         if (memoryTrackerEnabled.load()) {
-            return totalMemory->load();
+            auto val = totalMemory->load();
+            return val >= 0 ? val : 0;
         }
         return currentSize.load() + memOverhead->load();
     }
 
+    // account for allocated mem
+    void memAllocated(size_t sz);
+
+    // account for deallocated mem
+    void memDeallocated(size_t sz);
+
+    // merge accumulated local memory to the bucket variable
+    // the boolean force, if set to true, will skip checking
+    // threshold constraints and merge the local counters
+    // immediately
+    void mergeMemCounter(bool force = false);
+
     //! Number of keys warmed up during key-only loading.
     Counter warmedUpKeys;
     //! Number of key-values warmed up during data loading.
@@ -315,12 +341,13 @@ public:
     //! Total number of Item objects
     cb::CachelinePadded<Counter> numItem;
     //! The total amount of memory used by this bucket (From memory tracking)
-    cb::CachelinePadded<Counter> totalMemory;
+    // This is a signed variable because, depending on how/when the
+    // thread-local counters merge their info, this could be negative
+    cb::CachelinePadded<Couchbase::RelaxedAtomic<long long> > totalMemory;
     //! True if the memory usage tracker is enabled.
     std::atomic<bool> memoryTrackerEnabled;
     //! Whether or not to force engine shutdown.
     std::atomic<bool> forceShutdown;
-
     //! Number of times unrecoverable oom errors happened while processing operations.
     Counter oom_errors;
     //! Number of times temporary oom errors encountered while processing operations.
@@ -656,7 +683,21 @@ public:
     // Used by stats logging infrastructure.
     std::ostream *timingLog;
 
+    //! These 2 thresholds define when the thread local
+    //  mem counters are merged to the bucket counter
+    size_t mem_merge_count_threshold;
+    size_t mem_merge_bytes_threshold;
+
 private:
+    struct TLMemCounter {  // element type of localMemCounter
+        // accumulated mem: bytes allocated minus bytes freed, not yet merged
+        long long used = 0;
+
+        // number of times mem accounting has happened
+        size_t count = 0;
+    };
+
+    ThreadLocalPtr<TLMemCounter> localMemCounter;
 
     //! Max allowable memory size.
     std::atomic<size_t> maxDataSize;
index 741f28c..85a7e47 100644 (file)
@@ -18,6 +18,9 @@
 #ifndef SRC_THREADLOCAL_H_
 #define SRC_THREADLOCAL_H_ 1
 
+// thread local variable dtor
+using ThreadLocalDestructor = void (*)(void*);
+
 #ifdef WIN32
 #include "threadlocal_win32.h"
 template<typename T>
@@ -34,7 +37,9 @@ using ThreadLocal = ThreadLocalPosix<T>;
 template <typename T>
 class ThreadLocalPtr : public ThreadLocal<T*> {
 public:
-    ThreadLocalPtr() : ThreadLocal<T*>() {}
+    ThreadLocalPtr(ThreadLocalDestructor dtor = nullptr)
+        : ThreadLocal<T*>(dtor) {
+    }
 
     ~ThreadLocalPtr() {}
 
index d7c48e2..d3a469d 100644 (file)
@@ -37,8 +37,8 @@
 template<typename T>
 class ThreadLocalPosix {
 public:
-    ThreadLocalPosix() {
-        int rc = pthread_key_create(&key, NULL);
+    ThreadLocalPosix(ThreadLocalDestructor dtor = nullptr) : dtor(dtor) {
+        int rc = pthread_key_create(&key, dtor);
         if (rc != 0) {
             std::string msg = "Failed to create a thread-specific key: ";
             msg.append(strerror(rc));
@@ -78,6 +78,7 @@ public:
 
 private:
     pthread_key_t key;
+    ThreadLocalDestructor dtor;
 };
 
 #endif  // SRC_THREADLOCAL_POSIX_H_
index 391708b..c5614ea 100644 (file)
@@ -37,7 +37,7 @@
 template<typename T>
 class ThreadLocalWin32 {
 public:
-    ThreadLocalWin32() {
+    ThreadLocalWin32(ThreadLocalDestructor dtor = nullptr) {
         tlsIndex = TlsAlloc();
         if (tlsIndex == TLS_OUT_OF_INDEXES) {
             DWORD err = GetLastError();
@@ -77,6 +77,8 @@ public:
     }
 
 private:
+    // No use in Win32. Only for compatibility
+    ThreadLocalDestructor dtor;
     DWORD tlsIndex;
 };
 
index 98be806..d3a927d 100644 (file)
@@ -71,6 +71,7 @@
 // away ;)
 typedef void (*UNLOCK_COOKIE_T)(const void *cookie);
 
+#define IMMEDIATE_MEM_STATS ";mem_merge_count_threshold=1"
 #define MULTI_DISPATCHER_CONFIG \
     "ht_size=129;ht_locks=3;chk_remover_stime=1;chk_period=60"
 
@@ -6500,6 +6501,8 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "ep_max_vbuckets",
                 "ep_mem_high_wat",
                 "ep_mem_low_wat",
+                "ep_mem_merge_bytes_threshold",
+                "ep_mem_merge_count_threshold",
                 "ep_mutation_mem_threshold",
                 "ep_num_auxio_threads",
                 "ep_num_nonio_threads",
@@ -6693,6 +6696,8 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "ep_mem_high_wat_percent",
                 "ep_mem_low_wat",
                 "ep_mem_low_wat_percent",
+                "ep_mem_merge_bytes_threshold",
+                "ep_mem_merge_count_threshold",
                 "ep_mem_tracker_enabled",
                 "ep_meta_data_disk",
                 "ep_meta_data_memory",
@@ -7546,7 +7551,7 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("bg meta stats", test_bg_meta_stats, test_setup, teardown,
                  NULL, prepare_ep_bucket, cleanup),
         TestCase("mem stats", test_mem_stats, test_setup, teardown,
-                 "chk_remover_stime=1;chk_period=60", prepare, cleanup),
+                 "chk_remover_stime=1;chk_period=60" IMMEDIATE_MEM_STATS, prepare, cleanup),
         TestCase("stats key", test_key_stats, test_setup, teardown,
                  NULL, prepare, cleanup),
         TestCase("stats vkey", test_vkey_stats, test_setup,
@@ -7647,11 +7652,11 @@ BaseTestCase testsuite_testcases[] = {
         // disk>RAM tests
         TestCase("disk>RAM golden path", test_disk_gt_ram_golden,
                  test_setup, teardown,
-                 "chk_remover_stime=1;chk_period=60", prepare_ep_bucket,
+                 "chk_remover_stime=1;chk_period=60" IMMEDIATE_MEM_STATS, prepare_ep_bucket,
                  cleanup),
         TestCase("disk>RAM paged-out rm", test_disk_gt_ram_paged_rm,
                  test_setup, teardown,
-                 "chk_remover_stime=1;chk_period=60", prepare_ep_bucket,
+                 "chk_remover_stime=1;chk_period=60" IMMEDIATE_MEM_STATS, prepare_ep_bucket,
                  cleanup),
         TestCase("disk>RAM update paged-out", test_disk_gt_ram_update_paged_out,
                  test_setup, teardown, NULL, prepare_ep_bucket, cleanup),
@@ -7663,10 +7668,10 @@ BaseTestCase testsuite_testcases[] = {
                  test_setup, teardown, NULL, prepare_ep_bucket, cleanup, true),
         // disk>RAM tests with WAL
         TestCase("disk>RAM golden path (wal)", test_disk_gt_ram_golden,
-                 test_setup, teardown, MULTI_DISPATCHER_CONFIG,
+                 test_setup, teardown, MULTI_DISPATCHER_CONFIG IMMEDIATE_MEM_STATS,
                  prepare_ep_bucket, cleanup),
         TestCase("disk>RAM paged-out rm (wal)", test_disk_gt_ram_paged_rm,
-                 test_setup, teardown, MULTI_DISPATCHER_CONFIG,
+                 test_setup, teardown, MULTI_DISPATCHER_CONFIG IMMEDIATE_MEM_STATS,
                  prepare_ep_bucket, cleanup),
         TestCase("disk>RAM update paged-out (wal)",
                  test_disk_gt_ram_update_paged_out, test_setup,