1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2016 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
23 #include "statwriter.h"
26 * HLC manages a hybrid logical clock for 'time' stamping events.
27 * The class only implements the send logic of the HLC algorithm.
28 * - http://www.cse.buffalo.edu/tech-reports/2014-04.pdf
30 * The class allows multiple threads to concurrently call
33 * - setMaxHLCAndTrackDrift
35 * The paired "drift" counter, which is used to monitor drift over time
36 * isn't atomic in that the total and update parts are not a consistent
46 struct DriftExceptions {
52 * @param initHLC a HLC value to start from
 * @param aheadThreshold Threshold a peer can be ahead before we
 *        increment driftAheadExceeded
 * @param behindThreshold Threshold a peer can be behind before we
 *        increment driftBehindExceeded
58 HLC(uint64_t initHLC, uint64_t aheadThreshold, uint64_t behindThreshold)
61 cummulativeDriftIncrements(0),
63 driftAheadExceeded(0),
64 driftBehindExceeded(0),
65 driftAheadThreshold(aheadThreshold),
66 driftBehindThreshold(behindThreshold) {}
// Create a monotonic timestamp using part of the HLC algorithm by:
// a) reading system time
// b) dropping the bottom 16 bits (done by getMasked48)
// c) comparing it with the last known time (maxHLC)
// d) returning either now or maxHLC + 1
74 uint64_t timeNow = getMasked48(getTime());
75 uint64_t l = maxHLC.load();
78 atomic_setIfBigger(maxHLC, timeNow);
82 atomic_setIfBigger(maxHLC, l + 1);
86 void setMaxHLCAndTrackDrift(uint64_t hlc) {
87 auto timeNow = getMasked48(getTime());
89 // Track the +/- difference between our time and their time
90 int64_t difference = getMasked48(hlc) - timeNow;
92 // Accumulate the absolute drift
93 cummulativeDrift += std::abs(difference);
94 cummulativeDriftIncrements++;
// If the difference is greater than our ahead threshold, count a peer-ahead exception.
// If the difference is less than our -ve behind threshold, count a peer-behind exception.
98 if (difference > int64_t(driftAheadThreshold)) {
100 } else if(difference < (0 - int64_t(driftBehindThreshold))) {
101 driftBehindExceeded++;
107 void setMaxHLC(uint64_t hlc) {
108 atomic_setIfBigger(maxHLC, hlc);
111 void forceMaxHLC(uint64_t hlc) {
115 uint64_t getMaxHLC() const {
119 DriftStats getDriftStats() const {
120 // Deliberately not locking to read this pair
121 return {cummulativeDrift, cummulativeDriftIncrements};
124 DriftExceptions getDriftExceptionCounters() const {
125 // Deliberately not locking to read this pair
126 return {driftAheadExceeded, driftBehindExceeded};
129 void setDriftAheadThreshold(uint64_t threshold) {
130 driftAheadThreshold = threshold;
133 void setDriftBehindThreshold(uint64_t threshold) {
134 driftBehindThreshold = threshold;
137 void addStats(const std::string& prefix, ADD_STAT add_stat, const void *c) const {
138 add_prefixed_stat(prefix.data(), "max_cas", getMaxHLC(), add_stat, c);
139 add_prefixed_stat(prefix.data(), "total_abs_drift", cummulativeDrift.load(), add_stat, c);
140 add_prefixed_stat(prefix.data(), "total_abs_drift_count", cummulativeDriftIncrements.load(), add_stat, c);
141 add_prefixed_stat(prefix.data(), "drift_ahead_threshold_exceeded", driftAheadExceeded.load(), add_stat, c);
142 add_prefixed_stat(prefix.data(), "drift_ahead_threshold", driftAheadThreshold.load(), add_stat, c);
143 add_prefixed_stat(prefix.data(), "drift_behind_threshold_exceeded", driftBehindExceeded.load(), add_stat, c);
144 add_prefixed_stat(prefix.data(), "drift_behind_threshold", driftBehindThreshold.load(), add_stat, c);
145 add_prefixed_stat(prefix.data(), "logical_clock_ticks", logicalClockTicks.load(), add_stat, c);
149 // Don't clear max_cas or the threshold values.
150 cummulativeDrift = 0;
151 cummulativeDriftIncrements = 0;
152 driftAheadExceeded = 0;
153 driftBehindExceeded = 0;
154 logicalClockTicks = 0;
 * Returns the top 48 bits of t (the bottom 16 bits are zeroed)
/**
 * Clear the bottom 16 bits of a 64-bit value, keeping the upper 48.
 *
 * @param t the value to mask
 * @return t with bits 0..15 set to zero
 */
static int64_t getMasked48(int64_t t) {
    const int64_t lowSixteenBits = 0xFFFF;
    return t & ~lowSixteenBits;
}
/**
 * Read the wall clock.
 *
 * @return microseconds elapsed since the std::chrono::system_clock epoch
 */
static int64_t getTime() {
    using std::chrono::microseconds;
    const auto sinceEpoch =
            std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<microseconds>(sinceEpoch).count();
}
171 * maxHLC tracks the current highest time, either our own or a peer who
172 * has a larger clock value. nextHLC and setMax* methods change this and can
173 * be called from different threads
175 std::atomic<uint64_t> maxHLC;
 * The following are used for stats/drift tracking.
 * Many threads could be setting cas, so they need to be atomically
 * updated for consistent totals.
182 std::atomic<uint64_t> cummulativeDrift;
183 std::atomic<uint64_t> cummulativeDriftIncrements;
184 std::atomic<uint64_t> logicalClockTicks;
185 std::atomic<uint32_t> driftAheadExceeded;
186 std::atomic<uint32_t> driftBehindExceeded;
187 std::atomic<uint64_t> driftAheadThreshold;
188 std::atomic<uint64_t> driftBehindThreshold;