1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2013 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef SRC_DCP_STREAM_H_
19 #define SRC_DCP_STREAM_H_ 1
23 #include "ep_engine.h"
24 #include "ext_meta_parser.h"
25 #include "dcp/dcp-types.h"
26 #include "dcp/producer.h"
34 class EventuallyPersistentEngine;
35 class MutationResponse;
36 class SetVBucketState;
40 enum end_stream_status_t {
41 //! The stream ended due to all items being streamed
43 //! The stream closed early due to a close stream message
45 //! The stream closed early because the vbucket state changed
47 //! The stream closed early because the connection was disconnected
48 END_STREAM_DISCONNECTED,
49 //! The stream was closed early because it was too slow (currently unused,
50 //! but not deleted because it is part of the externally-visible API)
52 //! The stream closed early due to backfill failure
53 END_STREAM_BACKFILL_FAIL
56 enum process_items_error_t {
62 enum backfill_source_t {
67 class Stream : public RCValue {
82 Stream(const std::string &name,
89 uint64_t snap_start_seqno,
90 uint64_t snap_end_seqno,
95 uint32_t getFlags() { return flags_; }
97 uint16_t getVBucket() { return vb_; }
99 uint32_t getOpaque() { return opaque_; }
101 uint64_t getStartSeqno() { return start_seqno_; }
103 uint64_t getEndSeqno() { return end_seqno_; }
105 uint64_t getVBucketUUID() { return vb_uuid_; }
107 uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
109 uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
111 virtual void addStats(ADD_STAT add_stat, const void *c);
113 virtual DcpResponse* next() = 0;
115 virtual uint32_t setDead(end_stream_status_t status) = 0;
117 virtual void notifySeqnoAvailable(uint64_t seqno) {}
119 const std::string& getName() {
123 virtual void setActive() {
124 // Stream defaults to do nothing
127 Type getType() { return type_; }
129 /// @returns true if the stream type is Active
130 bool isTypeActive() const;
132 /// @returns true if state_ is not Dead
133 bool isActive() const;
135 /// @returns true if state_ is Backfilling
136 bool isBackfilling() const;
138 /// @returns true if state_ is InMemory
139 bool isInMemory() const;
141 /// @returns true if state_ is Pending
142 bool isPending() const;
144 /// @returns true if state_ is TakeoverSend
145 bool isTakeoverSend() const;
147 /// @returns true if state_ is TakeoverWait
148 bool isTakeoverWait() const;
151 LockHolder lh(streamMutex);
157 // The StreamState is protected as it needs to be accessed by sub-classes
158 enum class StreamState {
168 static const std::string to_string(Stream::StreamState type);
170 StreamState getState() const { return state_; }
172 void clear_UNLOCKED();
174 /* To be called after getting streamMutex lock */
175 void pushToReadyQ(DcpResponse* resp);
177 /* To be called after getting streamMutex lock */
178 void popFromReadyQ(void);
180 uint64_t getReadyQueueMemory(void);
182 const std::string &name_;
186 uint64_t start_seqno_;
189 uint64_t snap_start_seqno_;
190 uint64_t snap_end_seqno_;
191 std::atomic<StreamState> state_;
194 std::atomic<bool> itemsReady;
195 std::mutex streamMutex;
196 std::queue<DcpResponse*> readyQ;
198 // Number of items in the readyQ that are not meta items. Used for
199 // calculating getItemsRemaining(). Atomic so it can be safely read by
200 // getItemsRemaining() without acquiring streamMutex.
201 std::atomic<size_t> readyQ_non_meta_items;
203 const static uint64_t dcpMaxSeqno;
206 /* readyQueueMemory tracks the memory occupied by elements
207 * in the readyQ. It is an atomic because otherwise
208 getReadyQueueMemory would need to acquire streamMutex.
210 std::atomic <uint64_t> readyQueueMemory;
213 const char* to_string(Stream::Snapshot type);
214 const std::string to_string(Stream::Type type);
216 class ActiveStreamCheckpointProcessorTask;
218 class ActiveStream : public Stream {
220 ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
221 const std::string &name, uint32_t flags, uint32_t opaque,
222 uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
223 uint64_t vb_uuid, uint64_t snap_start_seqno,
224 uint64_t snap_end_seqno, bool isKeyOnly);
231 LockHolder lh(streamMutex);
233 transitionState(StreamState::Backfilling);
237 uint32_t setDead(end_stream_status_t status);
239 void notifySeqnoAvailable(uint64_t seqno);
241 void snapshotMarkerAckReceived();
243 void setVBucketStateAckRecieved();
245 void incrBackfillRemaining(size_t by) {
246 backfillRemaining.fetch_add(by, std::memory_order_relaxed);
249 void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
251 bool backfillReceived(std::unique_ptr<Item> itm,
252 backfill_source_t backfill_source);
254 void completeBackfill();
256 bool isCompressionEnabled();
258 void addStats(ADD_STAT add_stat, const void *c);
260 void addTakeoverStats(ADD_STAT add_stat, const void *c, const VBucket& vb);
262 /* Returns a count of how many items are outstanding to be sent for this
265 size_t getItemsRemaining();
267 uint64_t getLastReadSeqno() const;
269 uint64_t getLastSentSeqno() const;
271 const Logger& getLogger() const;
273 // Runs on ActiveStreamCheckpointProcessorTask
274 void nextCheckpointItemTask();
276 /* Function to handle a slow stream that is supposedly hogging memory in
277 checkpoint mgr. Currently we handle the slow stream by switching from
278 in-memory to backfilling */
279 void handleSlowStream();
281 /// @returns true if keyOnly is true, false otherwise
282 bool isKeyOnly() const {
287 // Returns the outstanding items for the stream's checkpoint cursor.
288 void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
290 // Given a set of queued items, create mutation responses for each item,
291 // and pass onto the producer associated with this stream.
292 void processItems(std::vector<queued_item>& items);
294 bool nextCheckpointItem();
296 DcpResponse* nextQueuedItem();
299 * @return a DcpResponse to represent the item. This will be either a
300 * MutationResponse or SystemEventProducerMessage.
302 std::unique_ptr<DcpResponse> makeResponseFromItem(queued_item& item);
304 /* The transitionState function is protected (as opposed to private) for
307 void transitionState(StreamState newState);
309 /* Indicates that a backfill has been scheduled and has not yet completed.
310 * Is protected (as opposed to private) for testing purposes.
312 std::atomic<bool> isBackfillTaskRunning;
314 /* Indicates if another backfill must be scheduled following the completion
315 * of current running backfill. Guarded by streamMutex.
316 * Is protected (as opposed to private) for testing purposes.
318 bool pendingBackfill;
320 //! Stats to track items read and sent from the backfill phase
322 std::atomic<size_t> memory;
323 std::atomic<size_t> disk;
324 std::atomic<size_t> sent;
327 /* The last sequence number queued from disk or memory that has been
328 snapshotted and put onto readyQ */
329 std::atomic<uint64_t> lastReadSeqno;
331 /* backfillRemaining is a stat recording the number of
332 * items remaining to be read from disk. It is an atomic
333 * because otherwise the function incrBackfillRemaining
334 * must acquire the streamMutex lock.
336 std::atomic<size_t> backfillRemaining;
340 DcpResponse* next(std::lock_guard<std::mutex>& lh);
342 DcpResponse* backfillPhase(std::lock_guard<std::mutex>& lh);
344 DcpResponse* inMemoryPhase();
346 DcpResponse* takeoverSendPhase();
348 DcpResponse* takeoverWaitPhase();
350 DcpResponse* deadPhase();
352 void snapshot(std::deque<DcpResponse*>& snapshot, bool mark);
354 void endStream(end_stream_status_t reason);
356 /* reschedule = FALSE ==> First backfill on the stream
357 * reschedule = TRUE ==> Schedules another backfill on the stream that has
358 * finished backfilling once and still in
359 * STREAM_BACKFILLING state or in STREAM_IN_MEMORY
361 * Note: Expects the streamMutex to be acquired when called
363 void scheduleBackfill_UNLOCKED(bool reschedule);
365 const char* getEndStreamStatusStr(end_stream_status_t status);
367 bool isCurrentSnapshotCompleted() const;
369 /* Drop the cursor registered with the checkpoint manager.
370 * Note: Expects the streamMutex to be acquired when called
372 void dropCheckpointCursor_UNLOCKED();
374 /* The last sequence number queued from disk or memory, but is yet to be
375 snapshotted and put onto readyQ */
376 std::atomic<uint64_t> lastReadSeqnoUnSnapshotted;
378 //! The last sequence number sent to the network layer
379 std::atomic<uint64_t> lastSentSeqno;
381 //! The last known seqno pointed to by the checkpoint cursor
382 std::atomic<uint64_t> curChkSeqno;
384 //! The current vbucket state to send in the takeover stream
385 vbucket_state_t takeoverState;
387 //! The number of items that have been sent during the memory phase
388 std::atomic<size_t> itemsFromMemoryPhase;
390 //! Whether or not this is the first snapshot marker sent
391 bool firstMarkerSent;
393 std::atomic<int> waitForSnapshot;
395 EventuallyPersistentEngine* engine;
396 dcp_producer_t producer;
399 std::atomic<size_t> bytes;
400 std::atomic<size_t> items;
403 std::atomic<rel_time_t> takeoverStart;
404 size_t takeoverSendMaxTime;
406 //! Last snapshot end seqno sent to the DCP client
407 std::atomic<uint64_t> lastSentSnapEndSeqno;
409 /* Flag used by checkpointCreatorTask that is set before all items are
410 extracted for given checkpoint cursor, and is unset after all retrieved
411 items are added to the readyQ */
412 std::atomic<bool> chkptItemsExtractionInProgress;
414 // Whether the responses sent using this stream should contain the key and
415 // value or just the key
421 class ActiveStreamCheckpointProcessorTask : public GlobalTask {
423 ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
424 : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
427 iterationsBeforeYield(e.getConfiguration()
428 .getDcpProducerSnapshotMarkerYieldLimit()) { }
430 cb::const_char_buffer getDescription() {
431 return "Process checkpoint(s) for DCP producer";
435 void schedule(const stream_t& stream);
439 LockHolder lh(workQueueLock);
445 stream_t queuePop() {
447 LockHolder lh(workQueueLock);
448 if (!queue.empty()) {
449 rval = queue.front();
451 queuedVbuckets.erase(rval->getVBucket());
457 LockHolder lh(workQueueLock);
458 return queue.empty();
461 void pushUnique(const stream_t& stream) {
462 LockHolder lh(workQueueLock);
463 if (queuedVbuckets.count(stream->getVBucket()) == 0) {
465 queuedVbuckets.insert(stream->getVBucket());
469 std::mutex workQueueLock;
472 * Maintain a queue of unique stream_t
473 * There's no need to have the same stream in the queue more than once
475 std::queue<stream_t> queue;
476 std::set<uint16_t> queuedVbuckets;
478 std::atomic<bool> notified;
479 size_t iterationsBeforeYield;
482 class NotifierStream : public Stream {
484 NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
485 const std::string &name, uint32_t flags, uint32_t opaque,
486 uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
487 uint64_t vb_uuid, uint64_t snap_start_seqno,
488 uint64_t snap_end_seqno);
491 transitionState(StreamState::Dead);
496 uint32_t setDead(end_stream_status_t status);
498 void notifySeqnoAvailable(uint64_t seqno);
502 void transitionState(StreamState newState);
504 dcp_producer_t producer;
507 class PassiveStream : public Stream {
509 PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
510 const std::string &name, uint32_t flags, uint32_t opaque,
511 uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
512 uint64_t vb_uuid, uint64_t snap_start_seqno,
513 uint64_t snap_end_seqno, uint64_t vb_high_seqno);
517 process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
522 uint32_t setDead(end_stream_status_t status);
525 * Place a StreamRequest message into the readyQueue, requesting a DCP
526 * stream for the given UUID.
528 * @param vb_uuid The UUID to use in the StreamRequest.
530 void streamRequest(uint64_t vb_uuid);
532 void acceptStream(uint16_t status, uint32_t add_opaque);
534 void reconnectStream(VBucketPtr &vb, uint32_t new_opaque,
535 uint64_t start_seqno);
537 ENGINE_ERROR_CODE messageReceived(std::unique_ptr<DcpResponse> response);
539 void addStats(ADD_STAT add_stat, const void *c);
541 static const size_t batchSize;
545 bool transitionState(StreamState newState);
547 ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
549 ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
552 * Handle DCP system events against this stream.
554 * @param event The system-event to process against the stream.
556 ENGINE_ERROR_CODE processSystemEvent(const SystemEventMessage& event);
559 * Process a create collection event, creating the collection on vb
561 * @param vb Vbucket onto which the collection is created.
562 * @param event The collection system event creating the collection.
564 ENGINE_ERROR_CODE processCreateCollection(VBucket& vb,
565 const CollectionsEvent& event);
568 * Process a begin delete collection event.
570 * @param vb Vbucket which we apply the delete on.
571 * @param event The collection system event deleting the collection.
573 ENGINE_ERROR_CODE processBeginDeleteCollection(
574 VBucket& vb, const CollectionsEvent& event);
577 * Process a collections change separator event.
579 * @param vb Vbucket on which the separator is changed.
580 * @param event The collection system event changing the separator.
582 ENGINE_ERROR_CODE processSeparatorChanged(VBucket& vb,
583 const CollectionsEvent& event);
585 void handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno);
587 void processMarker(SnapshotMarker* marker);
589 void processSetVBucketState(SetVBucketState* state);
591 uint32_t clearBuffer_UNLOCKED();
593 const char* getEndStreamStatusStr(end_stream_status_t status);
596 * Push a StreamRequest into the readyQueue. The StreamRequest is initialised
597 * from the object's state except for the uuid.
598 * This function assumes the caller is holding streamMutex.
600 * @param vb_uuid The VB UUID to use in the StreamRequest.
602 void streamRequest_UNLOCKED(uint64_t vb_uuid);
604 EventuallyPersistentEngine* engine;
605 dcp_consumer_t consumer;
607 std::atomic<uint64_t> last_seqno;
609 std::atomic<uint64_t> cur_snapshot_start;
610 std::atomic<uint64_t> cur_snapshot_end;
611 std::atomic<Snapshot> cur_snapshot_type;
612 bool cur_snapshot_ack;
615 Buffer() : bytes(0) {}
618 LockHolder lh(bufMutex);
619 return messages.empty();
622 void push(std::unique_ptr<DcpResponse> message) {
623 std::lock_guard<std::mutex> lg(bufMutex);
624 bytes += message->getMessageSize();
625 messages.push_back(std::move(message));
629 * Caller must have locked bufMutex and pass it as lh (not asserted)
631 std::unique_ptr<DcpResponse> pop_front(std::unique_lock<std::mutex>& lh) {
632 std::unique_ptr<DcpResponse> rval(std::move(messages.front()));
633 messages.pop_front();
634 bytes -= rval->getMessageSize();
639 * Caller must have locked bufMutex and pass it as lh (not asserted)
641 void push_front(std::unique_ptr<DcpResponse> message,
642 std::unique_lock<std::mutex>& lh) {
643 bytes += message->getMessageSize();
644 messages.push_front(std::move(message));
648 /* Lock ordering w.r.t. streamMutex:
649 First acquire bufMutex and then streamMutex */
650 mutable std::mutex bufMutex;
651 std::deque<std::unique_ptr<DcpResponse> > messages;
655 #endif // SRC_DCP_STREAM_H_