MB-19259: Fix data race on DcpConsumer::backoffs
[ep-engine.git] / src / dcp-consumer.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_DCP_CONSUMER_H_
19 #define SRC_DCP_CONSUMER_H_ 1
20
#include "config.h"

#include "tapconnection.h"
#include "dcp-stream.h"

#include <relaxed_atomic.h>

#include <sstream>
#include <string>
27
28 typedef RCPtr<PassiveStream> passive_stream_t;
29
30 class DcpResponse;
31
32 class DcpConsumer : public Consumer, public Notifiable {
33 typedef std::map<uint32_t, std::pair<uint32_t, uint16_t> > opaque_map;
34 public:
35
36     DcpConsumer(EventuallyPersistentEngine &e, const void *cookie,
37                 const std::string &n);
38
39     ~DcpConsumer();
40
41     ENGINE_ERROR_CODE addStream(uint32_t opaque, uint16_t vbucket,
42                                 uint32_t flags);
43
44     ENGINE_ERROR_CODE closeStream(uint32_t opaque, uint16_t vbucket);
45
46     ENGINE_ERROR_CODE streamEnd(uint32_t opaque, uint16_t vbucket,
47                                 uint32_t flags);
48
49     ENGINE_ERROR_CODE mutation(uint32_t opaque, const void* key, uint16_t nkey,
50                                const void* value, uint32_t nvalue, uint64_t cas,
51                                uint16_t vbucket, uint32_t flags,
52                                uint8_t datatype, uint32_t locktime,
53                                uint64_t bySeqno, uint64_t revSeqno,
54                                uint32_t exptime, uint8_t nru, const void* meta,
55                                uint16_t nmeta);
56
57     ENGINE_ERROR_CODE deletion(uint32_t opaque, const void* key, uint16_t nkey,
58                                uint64_t cas, uint16_t vbucket, uint64_t bySeqno,
59                                uint64_t revSeqno, const void* meta,
60                                uint16_t nmeta);
61
62     ENGINE_ERROR_CODE expiration(uint32_t opaque, const void* key,
63                                  uint16_t nkey, uint64_t cas, uint16_t vbucket,
64                                  uint64_t bySeqno, uint64_t revSeqno,
65                                  const void* meta, uint16_t nmeta);
66
67     ENGINE_ERROR_CODE snapshotMarker(uint32_t opaque,
68                                      uint16_t vbucket,
69                                      uint64_t start_seqno,
70                                      uint64_t end_seqno,
71                                      uint32_t flags);
72
73     ENGINE_ERROR_CODE noop(uint32_t opaque);
74
75     ENGINE_ERROR_CODE flush(uint32_t opaque, uint16_t vbucket);
76
77     ENGINE_ERROR_CODE setVBucketState(uint32_t opaque, uint16_t vbucket,
78                                       vbucket_state_t state);
79
80     ENGINE_ERROR_CODE step(struct dcp_message_producers* producers);
81
82     ENGINE_ERROR_CODE handleResponse(protocol_binary_response_header *resp);
83
84     bool doRollback(uint32_t opaque, uint16_t vbid, uint64_t rollbackSeqno);
85
86     void addStats(ADD_STAT add_stat, const void *c);
87
88     void aggregateQueueStats(ConnCounter* aggregator);
89
90     void notifyStreamReady(uint16_t vbucket);
91
92     void closeAllStreams();
93
94     process_items_error_t processBufferedItems();
95
96     bool isStreamPresent(uint16_t vbucket);
97
98 private:
99
100     DcpResponse* getNextItem();
101
102     /**
103      * Check if the provided opaque id is one of the
104      * current open "session" id's
105      *
106      * @param opaque the provided opaque
107      * @param vbucket the provided vbucket
108      * @return true if the session is open, false otherwise
109      */
110     bool isValidOpaque(uint32_t opaque, uint16_t vbucket);
111
112     void streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body,
113                         uint32_t bodylen);
114
115     ENGINE_ERROR_CODE handleNoop(struct dcp_message_producers* producers);
116
117     ENGINE_ERROR_CODE handleFlowCtl(struct dcp_message_producers* producers);
118
119     inline bool isBufferSufficientlyDrained(uint32_t ackable_bytes);
120
121     uint64_t opaqueCounter;
122     size_t processTaskId;
123     AtomicValue<bool> itemsToProcess;
124
125     Mutex readyMutex;
126     std::list<uint16_t> ready;
127
128     passive_stream_t* streams;
129     opaque_map opaqueMap_;
130
131     rel_time_t lastNoopTime;
132     Couchbase::RelaxedAtomic<uint32_t> backoffs;
133     uint32_t noopInterval;
134     bool enableNoop;
135     bool sendNoopInterval;
136
137     struct FlowControl {
138         FlowControl() : enabled(true), pendingControl(true), bufferSize(0),
139                         maxUnackedBytes(0), lastBufferAck(ep_current_time()),
140                         freedBytes(0), ackedBytes(0) {}
141         bool enabled;
142         bool pendingControl;
143         uint32_t bufferSize;
144         uint32_t maxUnackedBytes;
145         rel_time_t lastBufferAck;
146         AtomicValue<uint32_t> freedBytes;
147         AtomicValue<uint64_t> ackedBytes;
148     } flowControl;
149 };
150
151 /*
152  * Task that orchestrates rollback on Consumer,
153  * runs in background.
154  */
155 class RollbackTask : public GlobalTask {
156 public:
157     RollbackTask(EventuallyPersistentEngine* e,
158                  uint32_t opaque_, uint16_t vbid_,
159                  uint64_t rollbackSeqno_, dcp_consumer_t conn,
160                  const Priority &p):
161         GlobalTask(e, p, 0, false), engine(e),
162         opaque(opaque_), vbid(vbid_), rollbackSeqno(rollbackSeqno_),
163         cons(conn) { }
164
165     std::string getDescription() {
166         return std::string("Running rollback task for vbucket %d", vbid);
167     }
168
169     bool run();
170
171 private:
172     EventuallyPersistentEngine *engine;
173     uint32_t opaque;
174     uint16_t vbid;
175     uint64_t rollbackSeqno;
176     dcp_consumer_t cons;
177 };
178
179 #endif  // SRC_DCP_CONSUMER_H_