a1d2695583333018daf715dee7e898d245295b33
[ep-engine.git] / src / hlc.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #pragma once
19
20 #include <chrono>
21
22 #include "atomic.h"
23 #include "statwriter.h"
24
25 /*
26  * HLC manages a hybrid logical clock for 'time' stamping events.
27  * The class only implements the send logic of the HLC algorithm.
28  *   - http://www.cse.buffalo.edu/tech-reports/2014-04.pdf
29  *
30  * The class allows multiple threads to concurrently call
31  *  - nextHLC
32  *  - setMaxCas
33  *  - setMaxHLCAndTrackDrift
34  *
35  * The paired "drift" counter, which is used to monitor drift over time
36  * isn't atomic in that the total and update parts are not a consistent
37  * snapshot.
38  */
39 class HLC {
40 public:
41     struct DriftStats {
42         uint64_t total;
43         uint64_t updates;
44     };
45
46     struct DriftExceptions {
47         uint32_t ahead;
48         uint32_t behind;
49     };
50
51     /*
52      * @param initHLC a HLC value to start from
53      * @param aheadThresholdAhead threshold a peer can be ahead before we
54      *        increment driftAheadExceeded
55      * @param behindThresholdhreshold a peer can be ahead before we
56      *        increment driftBehindExceeded
57      */
58     HLC(uint64_t initHLC, uint64_t aheadThreshold, uint64_t behindThreshold)
59         : maxHLC(initHLC),
60           cummulativeDrift(0),
61           cummulativeDriftIncrements(0),
62           logicalClockTicks(0),
63           driftAheadExceeded(0),
64           driftBehindExceeded(0),
65           driftAheadThreshold(aheadThreshold),
66           driftBehindThreshold(behindThreshold) {}
67
68     uint64_t nextHLC() {
69         // Create a monotonic timestamp using part of the HLC algorithm by.
70         // a) Reading system time
71         // b) dropping 16-bits (done by nowHLC)
72         // c) comparing it with the last known time (max_cas)
73         // d) returning either now or max_cas + 1
74         uint64_t timeNow = getMasked48(getTime());
75         uint64_t l = maxHLC.load();
76
77         if (timeNow > l) {
78             atomic_setIfBigger(maxHLC, timeNow);
79             return timeNow;
80         }
81         logicalClockTicks++;
82         atomic_setIfBigger(maxHLC, l + 1);
83         return l + 1;
84     }
85
86     void setMaxHLCAndTrackDrift(uint64_t hlc) {
87         auto timeNow = getMasked48(getTime());
88
89         // Track the +/- difference between our time and their time
90         int64_t difference = getMasked48(hlc) - timeNow;
91
92         // Accumulate the absolute drift
93         cummulativeDrift += std::abs(difference);
94         cummulativeDriftIncrements++;
95
96         // If the difference is greater, count peer ahead exeception
97         // If the difference is less than our -ve threshold.. count that
98         if (difference > int64_t(driftAheadThreshold)) {
99             driftAheadExceeded++;
100         } else if(difference < (0 - int64_t(driftBehindThreshold))) {
101             driftBehindExceeded++;
102         }
103
104         setMaxHLC(hlc);
105     }
106
107     void setMaxHLC(uint64_t hlc) {
108         atomic_setIfBigger(maxHLC, hlc);
109     }
110
111     void forceMaxHLC(uint64_t hlc) {
112         maxHLC = hlc;
113     }
114
115     uint64_t getMaxHLC() const {
116         return maxHLC;
117     }
118
119     DriftStats getDriftStats() const {
120         // Deliberately not locking to read this pair
121         return {cummulativeDrift, cummulativeDriftIncrements};
122     }
123
124     DriftExceptions getDriftExceptionCounters() const {
125         // Deliberately not locking to read this pair
126         return {driftAheadExceeded, driftBehindExceeded};
127     }
128
129     void setDriftAheadThreshold(uint64_t threshold) {
130         driftAheadThreshold = threshold;
131     }
132
133     void setDriftBehindThreshold(uint64_t threshold) {
134         driftBehindThreshold = threshold;
135     }
136
137     void addStats(const std::string& prefix, ADD_STAT add_stat, const void *c) const {
138         add_prefixed_stat(prefix.data(), "max_cas", getMaxHLC(), add_stat, c);
139         add_prefixed_stat(prefix.data(), "total_abs_drift", cummulativeDrift.load(), add_stat, c);
140         add_prefixed_stat(prefix.data(), "total_abs_drift_count", cummulativeDriftIncrements.load(), add_stat, c);
141         add_prefixed_stat(prefix.data(), "drift_ahead_threshold_exceeded", driftAheadExceeded.load(), add_stat, c);
142         add_prefixed_stat(prefix.data(), "drift_ahead_threshold", driftAheadThreshold.load(), add_stat, c);
143         add_prefixed_stat(prefix.data(), "drift_behind_threshold_exceeded", driftBehindExceeded.load(), add_stat, c);
144         add_prefixed_stat(prefix.data(), "drift_behind_threshold", driftBehindThreshold.load(), add_stat, c);
145         add_prefixed_stat(prefix.data(), "logical_clock_ticks", logicalClockTicks.load(), add_stat, c);
146     }
147
148     void resetStats() {
149         // Don't clear max_cas or the threshold values.
150         cummulativeDrift = 0;
151         cummulativeDriftIncrements = 0;
152         driftAheadExceeded = 0;
153         driftBehindExceeded = 0;
154         logicalClockTicks = 0;
155     }
156
157 private:
158     /*
159      * Returns 48-bit of t (bottom 16-bit zero)
160      */
161     static int64_t getMasked48(int64_t t) {
162         return t & ~((1<<16)-1);
163     }
164
165     static int64_t getTime() {
166         auto now = std::chrono::system_clock::now();
167         return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
168     }
169
170     /*
171      * maxHLC tracks the current highest time, either our own or a peer who
172      * has a larger clock value. nextHLC and setMax* methods change this and can
173      * be called from different threads
174      */
175     std::atomic<uint64_t> maxHLC;
176
177     /*
178      * The following are used for stats/drift tracking.
179      * many threads could be setting cas so they need to be atomically
180      * updated for consisent totals.
181      */
182     std::atomic<uint64_t> cummulativeDrift;
183     std::atomic<uint64_t> cummulativeDriftIncrements;
184     std::atomic<uint64_t> logicalClockTicks;
185     std::atomic<uint32_t> driftAheadExceeded;
186     std::atomic<uint32_t> driftBehindExceeded;
187     std::atomic<uint64_t> driftAheadThreshold;
188     std::atomic<uint64_t> driftBehindThreshold;
189 };