1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2013 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef SRC_DCP_PRODUCER_H_
19 #define SRC_DCP_PRODUCER_H_ 1
23 #include "atomic_unordered_map.h"
24 #include "dcp/dcp-types.h"
25 #include "tapconnection.h"
27 class BackfillManager;
30 class DcpProducer : public Producer {
34 * MutationType is used to state whether the active streams associated with
35 * the DcpProducer need to send both the key and value (AllValue) or whether
36 * they can send just the key (KeyOnly).
38 enum class MutationType {
44 * Construct a DCP Producer
46 * @param e The engine.
47 * @param cookie Cookie of the connection creating the producer.
48 * @param n A name chosen by the client.
49 * @param notifyOnly If true the producer only notifies, i.e. no data is
51 * @param startTask If true an internal checkpoint task is created and
52 * started. Test code may wish to defer or manually handle the task
54 * @param mutType The MutationType to use for the items sent from the
57 DcpProducer(EventuallyPersistentEngine& e,
62 MutationType mutType);
66 ENGINE_ERROR_CODE streamRequest(uint32_t flags, uint32_t opaque,
67 uint16_t vbucket, uint64_t start_seqno,
68 uint64_t end_seqno, uint64_t vbucket_uuid,
69 uint64_t last_seqno, uint64_t next_seqno,
70 uint64_t *rollback_seqno,
71 dcp_add_failover_log callback);
73 ENGINE_ERROR_CODE getFailoverLog(uint32_t opaque, uint16_t vbucket,
74 dcp_add_failover_log callback);
76 ENGINE_ERROR_CODE step(struct dcp_message_producers* producers);
78 ENGINE_ERROR_CODE bufferAcknowledgement(uint32_t opaque, uint16_t vbucket,
79 uint32_t buffer_bytes);
81 ENGINE_ERROR_CODE control(uint32_t opaque, const void* key, uint16_t nkey,
82 const void* value, uint32_t nvalue);
85 * Sub-classes must implement a method that processes a response
86 * to a request initiated by itself.
88 * @param resp A mcbp response message to process.
89 * @returns true/false which will be converted to SUCCESS/DISCONNECT by the
92 bool handleResponse(protocol_binary_response_header* resp);
94 void addStats(ADD_STAT add_stat, const void *c);
96 void addTakeoverStats(ADD_STAT add_stat, const void* c, const VBucket& vb);
98 void aggregateQueueStats(ConnCounter& aggregator);
100 void setDisconnect(bool disconnect);
102 void notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno);
104 void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
106 /* This function handles a stream that is detected as slow by the checkpoint
107 remover. Currently we handle the slow stream by switching from in-memory
109 bool handleSlowStream(uint16_t vbid, const std::string &name);
111 void closeAllStreams();
113 const char *getType() const;
115 bool isTimeForNoop();
117 void setTimeForNoop();
121 size_t getBackfillQueueSize();
123 size_t getItemsSent();
125 size_t getTotalBytes();
127 std::vector<uint16_t> getVBVector(void);
130 * Close the stream for the given vbucket
132 * @param vbucket the id of the vbucket whose stream should be closed
133 * @return ENGINE_SUCCESS upon a successful close
134 * ENGINE_NOT_MY_VBUCKET the vbucket stream doesn't exist
136 ENGINE_ERROR_CODE closeStream(uint32_t opaque, uint16_t vbucket);
138 void notifyStreamReady(uint16_t vbucket);
140 void notifyBackfillManager();
141 bool recordBackfillManagerBytesRead(size_t bytes);
142 void recordBackfillManagerBytesSent(size_t bytes);
143 void scheduleBackfillManager(VBucket& vb,
144 const active_stream_t& s,
// Returns the current value of the enableExtMetaData flag (declared in this
// class as a Couchbase::RelaxedAtomic<bool>).
148 bool isExtMetaDataEnabled () {
149 return enableExtMetaData;
// Returns the current value of the enableValueCompression flag (declared in
// this class as a Couchbase::RelaxedAtomic<bool>).
152 bool isValueCompressionEnabled() {
153 return enableValueCompression;
156 void notifyPaused(bool schedule);
162 BufferLog has 3 states.
163 Disabled - Flow-control is not in-use.
164 This is indicated by setting the size to 0 (i.e. setBufferSize(0)).
166 SpaceAvailable - There is *some* space available. You can always
167 insert n-bytes even if there's n-1 bytes spare.
169 Full - inserts have brought the number of bytes sent up to or
170 over the buffer size.
// Constructs a buffer log bound to the producer `p`. maxBytes, bytesSent and
// ackedBytes all start at zero; since isEnabled_UNLOCKED() treats a maxBytes
// of 0 as "not enabled", a freshly constructed log starts out disabled.
// NOTE(review): this single-argument constructor is not `explicit`, so a
// DcpProducer& converts implicitly to a BufferLog - confirm that is intended.
178 BufferLog(DcpProducer& p)
179 : producer(p), maxBytes(0), bytesSent(0), ackedBytes(0) {}
181 void setBufferSize(size_t maxBytes);
183 void addStats(ADD_STAT add_stat, const void *c);
186 Returns false if the log is full.
188 Returns true if the bytes fit or if the buffer log is disabled.
189 The tracked byte count is increased.
191 bool insert(size_t bytes);
194 Acknowledge the bytes and unpause the producer if full.
195 The tracked byte count is decreased.
197 void acknowledge(size_t bytes);
200 Pause the producer if full.
205 Unpause the producer if there's space (or disabled).
207 void unpauseIfSpaceAvailable();
// True when maxBytes is non-zero, i.e. a buffer size has been configured.
// NOTE(review): the _UNLOCKED suffix suggests the caller is expected to
// already hold the log's lock - confirm against the implementation.
211 bool isEnabled_UNLOCKED() {
212 return maxBytes != 0;
// True once bytesSent has reached or exceeded maxBytes.
// NOTE(review): when maxBytes is 0 (log not enabled) this comparison holds
// for any non-negative bytesSent, so presumably callers check
// isEnabled_UNLOCKED() first - confirm. The _UNLOCKED suffix likewise
// suggests the caller already holds the log's lock - confirm.
215 bool isFull_UNLOCKED() {
216 return bytesSent >= maxBytes;
219 void release_UNLOCKED(size_t bytes);
221 State getState_UNLOCKED();
224 DcpProducer& producer;
231 Insert bytes into this producer's buffer log.
233 If the log is disabled or the insert was successful returns true.
236 bool bufferLogInsert(size_t bytes);
239 Schedules active stream checkpoint processor task
242 void scheduleCheckpointProcessorTask(const stream_t& s);
245 Clears active stream checkpoint processor task's queue.
247 void clearCheckpointProcessorTaskQueues();
250 /** Searches the streams map for a stream for vbucket ID. Returns the
251 * found stream, or an empty pointer if none found.
253 SingleThreadedRCPtr<Stream> findStream(uint16_t vbid);
255 /** We may disconnect if noop messages are enabled and the last time we
256 * received any message (including a noop) exceeds the dcpTimeout.
257 * Returns ENGINE_DISCONNECT if noop messages are enabled and the timeout
259 * Returns ENGINE_FAILED if noop messages are disabled, or if the timeout
260 * is not exceeded. In this case continue without disconnecting.
262 ENGINE_ERROR_CODE maybeDisconnect();
264 /** We may send a noop if a noop acknowledgement is not pending and
265 * we have exceeded the dcpNoopTxInterval since we last sent a noop.
266 * Returns ENGINE_WANT_MORE if a noop was sent.
267 * Returns ENGINE_FAILED if a noop is not required to be sent.
268 * This occurs if noop messages are disabled, or because we have already
269 * sent a noop and we are awaiting a receive, or because the time interval
272 ENGINE_ERROR_CODE maybeSendNoop(struct dcp_message_producers* producers);
275 * Create the ActiveStreamCheckpointProcessorTask and assign to
276 * checkpointCreatorTask
278 void createCheckpointProcessorTask();
281 * Schedule the checkpointCreatorTask on the ExecutorPool
283 void scheduleCheckpointProcessorTask();
288 std::chrono::seconds dcpIdleTimeout;
289 std::chrono::seconds dcpNoopTxInterval;
290 Couchbase::RelaxedAtomic<bool> pendingRecv;
291 Couchbase::RelaxedAtomic<bool> enabled;
294 Couchbase::RelaxedAtomic<rel_time_t> lastReceiveTime;
296 DcpResponse* getNextItem();
298 size_t getItemsRemaining();
299 stream_t findStreamByVbid(uint16_t vbid);
301 std::string priority;
303 DcpResponse *rejectResp; // stash response for retry if E2BIG was hit
307 Couchbase::RelaxedAtomic<bool> enableExtMetaData;
308 Couchbase::RelaxedAtomic<bool> enableValueCompression;
309 Couchbase::RelaxedAtomic<bool> supportsCursorDropping;
311 Couchbase::RelaxedAtomic<rel_time_t> lastSendTime;
314 // backfill manager object is owned by this class, but use a
315 // shared_ptr as the lifetime of the manager is shared between the
316 // producer (this class) and BackfillManagerTask (which has a
317 // weak_ptr to this).
318 std::shared_ptr<BackfillManager> backfillMgr;
322 // Map of vbid -> stream. Map itself is atomic (thread-safe).
323 typedef AtomicUnorderedMap<uint16_t, SingleThreadedRCPtr<Stream>> StreamsMap;
326 std::atomic<size_t> itemsSent;
327 std::atomic<size_t> totalBytesSent;
329 ExTask checkpointCreatorTask;
330 static const std::chrono::seconds defaultDcpNoopTxInterval;
332 // mutationType i.e. KeyOnly or AllValue, used to determine how all
333 // active streams belonging to the DcpProducer should send their data.
334 MutationType mutationType;
338 #endif // SRC_DCP_PRODUCER_H_