DCP Backfill: Use size_t instead of uint32_t to record mem usage
[ep-engine.git] / src / dcp / producer.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_DCP_PRODUCER_H_
19 #define SRC_DCP_PRODUCER_H_ 1
20
21 #include "config.h"
22
23 #include "atomic_unordered_map.h"
24 #include "dcp/dcp-types.h"
25 #include "tapconnection.h"
26
27 class BackfillManager;
28 class DcpResponse;
29
30 class DcpProducer : public Producer {
31 public:
32
33 /*
34  * MutationType is used to state whether the active streams associated with the
35  * the DCPProducer need to send both the key and value (AllValue) or whether
36  * they can send just the key (KeyOnly).
37  */
38 enum class MutationType {
39     KeyOnly,
40     KeyAndValue
41 };
42
43     /**
44      * Construct a DCP Producer
45      *
46      * @param e The engine.
47      * @param cookie Cookie of the connection creating the producer.
48      * @param n A name chosen by the client.
49      * @param notifyOnly If true the producer only notifies, i.e. no data is
50      *        sent.
51      * @param startTask If true an internal checkpoint task is created and
52      *        started. Test code may wish to defer or manually handle the task
53      *        creation.
54      * @param mutType The MutationType to use for the items sent from the
55      *        DcpProducer.
56      */
57     DcpProducer(EventuallyPersistentEngine& e,
58                 const void* cookie,
59                 const std::string& n,
60                 bool notifyOnly,
61                 bool startTask,
62                 MutationType mutType);
63
64     ~DcpProducer();
65
66     ENGINE_ERROR_CODE streamRequest(uint32_t flags, uint32_t opaque,
67                                     uint16_t vbucket, uint64_t start_seqno,
68                                     uint64_t end_seqno, uint64_t vbucket_uuid,
69                                     uint64_t last_seqno, uint64_t next_seqno,
70                                     uint64_t *rollback_seqno,
71                                     dcp_add_failover_log callback);
72
73     ENGINE_ERROR_CODE getFailoverLog(uint32_t opaque, uint16_t vbucket,
74                                      dcp_add_failover_log callback);
75
76     ENGINE_ERROR_CODE step(struct dcp_message_producers* producers);
77
78     ENGINE_ERROR_CODE bufferAcknowledgement(uint32_t opaque, uint16_t vbucket,
79                                             uint32_t buffer_bytes);
80
81     ENGINE_ERROR_CODE control(uint32_t opaque, const void* key, uint16_t nkey,
82                               const void* value, uint32_t nvalue);
83
84     /**
85      * Sub-classes must implement a method that processes a response
86      * to a request initiated by itself.
87      *
88      * @param resp A mcbp response message to process.
89      * @returns true/false which will be converted to SUCCESS/DISCONNECT by the
90      *          engine.
91      */
92     bool handleResponse(protocol_binary_response_header* resp);
93
94     void addStats(ADD_STAT add_stat, const void *c);
95
96     void addTakeoverStats(ADD_STAT add_stat, const void* c, const VBucket& vb);
97
98     void aggregateQueueStats(ConnCounter& aggregator);
99
100     void setDisconnect(bool disconnect);
101
102     void notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno);
103
104     void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
105
106     /* This function handles a stream that is detected as slow by the checkpoint
107        remover. Currently we handle the slow stream by switching from in-memory
108        to backfilling */
109     bool handleSlowStream(uint16_t vbid, const std::string &name);
110
111     void closeAllStreams();
112
113     const char *getType() const;
114
115     bool isTimeForNoop();
116
117     void setTimeForNoop();
118
119     void clearQueues();
120
121     size_t getBackfillQueueSize();
122
123     size_t getItemsSent();
124
125     size_t getTotalBytes();
126
127     std::vector<uint16_t> getVBVector(void);
128
129     /**
130      * Close the stream for given vbucket stream
131      *
132      * @param vbucket the if for the vbucket to close
133      * @return ENGINE_SUCCESS upon a successful close
134      *         ENGINE_NOT_MY_VBUCKET the vbucket stream doesn't exist
135      */
136     ENGINE_ERROR_CODE closeStream(uint32_t opaque, uint16_t vbucket);
137
138     void notifyStreamReady(uint16_t vbucket);
139
140     void notifyBackfillManager();
141     bool recordBackfillManagerBytesRead(size_t bytes);
142     void recordBackfillManagerBytesSent(size_t bytes);
143     void scheduleBackfillManager(VBucket& vb,
144                                  const active_stream_t& s,
145                                  uint64_t start,
146                                  uint64_t end);
147
148     bool isExtMetaDataEnabled () {
149         return enableExtMetaData;
150     }
151
152     bool isValueCompressionEnabled() {
153         return enableValueCompression;
154     }
155
156     void notifyPaused(bool schedule);
157
158     class BufferLog {
159     public:
160
161         /*
162             BufferLog has 3 states.
163             Disabled - Flow-control is not in-use.
164              This is indicated by setting the size to 0 (i.e. setBufferSize(0)).
165
166             SpaceAvailable - There is *some* space available. You can always
167              insert n-bytes even if there's n-1 bytes spare.
168
169             Full - inserts have taken the number of bytes available equal or
170              over the buffer size.
171         */
172         enum State {
173             Disabled,
174             Full,
175             SpaceAvailable
176         };
177
178         BufferLog(DcpProducer& p)
179             : producer(p), maxBytes(0), bytesSent(0), ackedBytes(0) {}
180
181         void setBufferSize(size_t maxBytes);
182
183         void addStats(ADD_STAT add_stat, const void *c);
184
185         /*
186             Return false if the log is full.
187
188             Returns true if the bytes fit or if the buffer log is disabled.
189               The tracked bytes is increased.
190         */
191         bool insert(size_t bytes);
192
193         /*
194             Acknowledge the bytes and unpause the producer if full.
195               The tracked bytes is decreased.
196         */
197         void acknowledge(size_t bytes);
198
199         /*
200             Pause the producer if full.
201         */
202         bool pauseIfFull();
203
204         /*
205             Unpause the producer if there's space (or disabled).
206         */
207         void unpauseIfSpaceAvailable();
208
209 private:
210
211         bool isEnabled_UNLOCKED() {
212             return maxBytes != 0;
213         }
214
215         bool isFull_UNLOCKED() {
216             return bytesSent >= maxBytes;
217         }
218
219         void release_UNLOCKED(size_t bytes);
220
221         State getState_UNLOCKED();
222
223         cb::RWLock logLock;
224         DcpProducer& producer;
225         size_t maxBytes;
226         size_t bytesSent;
227         size_t ackedBytes;
228     };
229
230     /*
231         Insert bytes into this producer's buffer log.
232
233         If the log is disabled or the insert was successful returns true.
234         Else return false.
235     */
236     bool bufferLogInsert(size_t bytes);
237
238     /*
239         Schedules active stream checkpoint processor task
240         for given stream.
241     */
242     void scheduleCheckpointProcessorTask(const stream_t& s);
243
244     /*
245         Clears active stream checkpoint processor task's queue.
246     */
247     void clearCheckpointProcessorTaskQueues();
248
249 protected:
250     /** Searches the streams map for a stream for vbucket ID. Returns the
251      *  found stream, or an empty pointer if none found.
252      */
253     SingleThreadedRCPtr<Stream> findStream(uint16_t vbid);
254
255     /** We may disconnect if noop messages are enabled and the last time we
256      *  received any message (including a noop) exceeds the dcpTimeout.
257      *  Returns ENGINE_DISCONNECT if noop messages are enabled and the timeout
258      *  is exceeded.
259      *  Returns ENGINE_FAILED if noop messages are disabled, or if the timeout
260      *  is not exceeded.  In this case continue without disconnecting.
261      */
262     ENGINE_ERROR_CODE maybeDisconnect();
263
264     /** We may send a noop if a noop acknowledgement is not pending and
265      *  we have exceeded the dcpNoopTxInterval since we last sent a noop.
266      *  Returns ENGINE_WANT_MORE if a noop was sent.
267      *  Returns ENGINE_FAILED if a noop is not required to be sent.
268      *  This occurs if noop messages are disabled, or because we have already
269      *  sent a noop and we are awaiting a receive, or because the time interval
270      *  has not passed.
271      */
272     ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers);
273
274     /**
275      * Create the ActiveStreamCheckpointProcessorTask and assign to
276      * checkpointCreatorTask
277      */
278     void createCheckpointProcessorTask();
279
280     /**
281      * Schedule the checkpointCreatorTask on the ExecutorPool
282      */
283     void scheduleCheckpointProcessorTask();
284
285     struct {
286         rel_time_t sendTime;
287         uint32_t opaque;
288         std::chrono::seconds dcpIdleTimeout;
289         std::chrono::seconds dcpNoopTxInterval;
290         Couchbase::RelaxedAtomic<bool> pendingRecv;
291         Couchbase::RelaxedAtomic<bool> enabled;
292     } noopCtx;
293
294     Couchbase::RelaxedAtomic<rel_time_t> lastReceiveTime;
295
296     DcpResponse* getNextItem();
297
298     size_t getItemsRemaining();
299     stream_t findStreamByVbid(uint16_t vbid);
300
301     std::string priority;
302
303     DcpResponse *rejectResp; // stash response for retry if E2BIG was hit
304
305     bool notifyOnly;
306
307     Couchbase::RelaxedAtomic<bool> enableExtMetaData;
308     Couchbase::RelaxedAtomic<bool> enableValueCompression;
309     Couchbase::RelaxedAtomic<bool> supportsCursorDropping;
310
311     Couchbase::RelaxedAtomic<rel_time_t> lastSendTime;
312     BufferLog log;
313
314     // backfill manager object is owned by this class, but use a
315     // shared_ptr as the lifetime of the manager is shared between the
316     // producer (this class) and BackfillManagerTask (which has a
317     // weak_ptr) to this.
318     std::shared_ptr<BackfillManager> backfillMgr;
319
320     DcpReadyQueue ready;
321
322     // Map of vbid -> stream. Map itself is atomic (thread-safe).
323     typedef AtomicUnorderedMap<uint16_t, SingleThreadedRCPtr<Stream>> StreamsMap;
324     StreamsMap streams;
325
326     std::atomic<size_t> itemsSent;
327     std::atomic<size_t> totalBytesSent;
328
329     ExTask checkpointCreatorTask;
330     static const std::chrono::seconds defaultDcpNoopTxInterval;
331
332     // mutationType i.e. KeyOnly or AllValue, used to determine how all
333     // active streams belonging to the DcpProducer should send their data.
334     MutationType mutationType;
335
336 };
337
338 #endif  // SRC_DCP_PRODUCER_H_