MB-21617: Change CAS resolution to nanoseconds
[ep-engine.git] / src / hlc.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #pragma once
19
20 #include <chrono>
21
22 #include "atomic.h"
23 #include "statwriter.h"
24
25 #include <platform/checked_snprintf.h>
26
27 /*
28  * HLC manages a hybrid logical clock for 'time' stamping events.
29  * The class only implements the send logic of the HLC algorithm.
30  *   - http://www.cse.buffalo.edu/tech-reports/2014-04.pdf
31  *
32  * The class allows multiple threads to concurrently call
33  *  - nextHLC
34  *  - setMaxCas
35  *  - setMaxHLCAndTrackDrift
36  *
37  * The paired "drift" counter, which is used to monitor drift over time
38  * isn't atomic in that the total and update parts are not a consistent
39  * snapshot.
40  */
41 class HLC {
42 public:
43     struct DriftStats {
44         uint64_t total;
45         uint64_t updates;
46     };
47
48     struct DriftExceptions {
49         uint32_t ahead;
50         uint32_t behind;
51     };
52
53     /*
54      * @param initHLC a HLC value to start from
55      * @param aheadThresholdAhead threshold a peer can be ahead before we
56      *        increment driftAheadExceeded. Expressed in us.
57      * @param behindThresholdhreshold a peer can be ahead before we
58      *        increment driftBehindExceeded. Expressed in us.
59      */
60     HLC(uint64_t initHLC,
61         std::chrono::microseconds aheadThreshold,
62         std::chrono::microseconds behindThreshold)
63         : maxHLC(initHLC),
64           cummulativeDrift(0),
65           cummulativeDriftIncrements(0),
66           logicalClockTicks(0),
67           driftAheadExceeded(0),
68           driftBehindExceeded(0) {
69         setDriftAheadThreshold(aheadThreshold);
70         setDriftBehindThreshold(behindThreshold);
71     }
72
73     uint64_t nextHLC() {
74         // Create a monotonic timestamp using part of the HLC algorithm by.
75         // a) Reading system time
76         // b) dropping 16-bits (done by nowHLC)
77         // c) comparing it with the last known time (max_cas)
78         // d) returning either now or max_cas + 1
79         uint64_t timeNow = getMasked48(getTime());
80         uint64_t l = maxHLC.load();
81
82         if (timeNow > l) {
83             atomic_setIfBigger(maxHLC, timeNow);
84             return timeNow;
85         }
86         logicalClockTicks++;
87         atomic_setIfBigger(maxHLC, l + 1);
88         return l + 1;
89     }
90
91     void setMaxHLCAndTrackDrift(uint64_t hlc) {
92         auto timeNow = getMasked48(getTime());
93
94         // Track the +/- difference between our time and their time
95         int64_t difference = getMasked48(hlc) - timeNow;
96
97         // Accumulate the absolute drift in microseconds not nanoseconds.
98         // E.g. 5s drift then has ~3.6 trillion updates before overflow vs 3.6
99         // million if we tracked in nanoseconds.
100         const auto ns = std::chrono::nanoseconds(std::abs(difference));
101         cummulativeDrift +=
102             std::chrono::duration_cast<std::chrono::microseconds>(ns).count();
103         cummulativeDriftIncrements++;
104
105         // If the difference is greater, count peer ahead exeception
106         // If the difference is less than our -ve threshold.. count that
107         if (difference > int64_t(driftAheadThreshold)) {
108             driftAheadExceeded++;
109         } else if(difference < (0 - int64_t(driftBehindThreshold))) {
110             driftBehindExceeded++;
111         }
112
113         setMaxHLC(hlc);
114     }
115
116     void setMaxHLC(uint64_t hlc) {
117         atomic_setIfBigger(maxHLC, hlc);
118     }
119
120     void forceMaxHLC(uint64_t hlc) {
121         maxHLC = hlc;
122     }
123
124     uint64_t getMaxHLC() const {
125         return maxHLC;
126     }
127
128     DriftStats getDriftStats() const {
129         // Deliberately not locking to read this pair
130         return {cummulativeDrift, cummulativeDriftIncrements};
131     }
132
133     DriftExceptions getDriftExceptionCounters() const {
134         // Deliberately not locking to read this pair
135         return {driftAheadExceeded, driftBehindExceeded};
136     }
137
138     /*
139      * Set the drift threshold for ahead exception counting
140      * - externally we work in microseconds
141      * - internally we work in nanoseconds
142      */
143     void setDriftAheadThreshold(std::chrono::microseconds threshold) {
144         driftAheadThreshold =
145             std::chrono::duration_cast<std::chrono::nanoseconds>(threshold).count();
146     }
147
148     /*
149      * Set the drift threshold for behind exception counting
150      * - externally we work in (u) microseconds
151      * - internally we work in nanoseconds
152      */
153     void setDriftBehindThreshold(std::chrono::microseconds threshold) {
154         driftBehindThreshold =
155             std::chrono::duration_cast<std::chrono::nanoseconds>(threshold).count();
156     }
157
158     void addStats(const std::string& prefix, ADD_STAT add_stat, const void *c) const {
159         auto maxCas = getMaxHLC();
160         add_prefixed_stat(prefix.data(), "max_cas", maxCas, add_stat, c);
161
162         // Print max_cas as a UTC human readable string
163         auto nanoseconds = std::chrono::nanoseconds(maxCas);//duration
164         auto seconds = std::chrono::duration_cast<std::chrono::seconds>(nanoseconds);
165         time_t maxCasSeconds = seconds.count();
166
167         std::tm tm;
168         char timeString[100];
169         // Print as an ISO-8601 date format with nanosecond fractional part
170         if (cb_gmtime_r(&maxCasSeconds, &tm) == 0 &&
171             strftime(timeString, sizeof(timeString), "%Y-%m-%dT%H:%M:%S", &tm)) {
172             // Get the fractional nanosecond part
173             nanoseconds -= seconds;
174             try {
175                 checked_snprintf(timeString, sizeof(timeString), "%s.%lld",
176                                  timeString, nanoseconds.count());
177             } catch (...) { /* genuinely do nothing, we will just see that the
178                         fraction is missing and max_cas is already printed */
179             }
180             add_prefixed_stat(prefix.data(), "max_cas_str", timeString, add_stat, c);
181         } else {
182             add_prefixed_stat(prefix.data(), "max_cas_str", "could not get string", add_stat, c);
183         }
184
185         add_prefixed_stat(prefix.data(), "total_abs_drift", cummulativeDrift.load(), add_stat, c);
186         add_prefixed_stat(prefix.data(), "total_abs_drift_count", cummulativeDriftIncrements.load(), add_stat, c);
187         add_prefixed_stat(prefix.data(), "drift_ahead_threshold_exceeded", driftAheadExceeded.load(), add_stat, c);
188         add_prefixed_stat(prefix.data(), "drift_behind_threshold_exceeded", driftBehindExceeded.load(), add_stat, c);
189         add_prefixed_stat(prefix.data(), "logical_clock_ticks", logicalClockTicks.load(), add_stat, c);
190
191         // These are printed "as is" so we know what is being compared. Do not convert to microseconds
192         add_prefixed_stat(prefix.data(), "drift_ahead_threshold", driftAheadThreshold.load(), add_stat, c);
193         add_prefixed_stat(prefix.data(), "drift_behind_threshold", driftBehindThreshold.load(), add_stat, c);
194
195     }
196
197     void resetStats() {
198         // Don't clear max_cas or the threshold values.
199         cummulativeDrift = 0;
200         cummulativeDriftIncrements = 0;
201         driftAheadExceeded = 0;
202         driftBehindExceeded = 0;
203         logicalClockTicks = 0;
204     }
205
206 private:
207     /*
208      * Returns 48-bit of t (bottom 16-bit zero)
209      */
210     static int64_t getMasked48(int64_t t) {
211         return t & ~((1<<16)-1);
212     }
213
214     static int64_t getTime() {
215         auto now = std::chrono::system_clock::now();
216         return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
217     }
218
219     /*
220      * maxHLC tracks the current highest time, either our own or a peer who
221      * has a larger clock value. nextHLC and setMax* methods change this and can
222      * be called from different threads
223      */
224     std::atomic<uint64_t> maxHLC;
225
226     /*
227      * The following are used for stats/drift tracking.
228      * many threads could be setting cas so they need to be atomically
229      * updated for consisent totals.
230      */
231     std::atomic<uint64_t> cummulativeDrift;
232     std::atomic<uint64_t> cummulativeDriftIncrements;
233     std::atomic<uint64_t> logicalClockTicks;
234     std::atomic<uint32_t> driftAheadExceeded;
235     std::atomic<uint32_t> driftBehindExceeded;
236     std::atomic<uint64_t> driftAheadThreshold;
237     std::atomic<uint64_t> driftBehindThreshold;
238 };