DCP Backfill: Use size_t instead of uint32_t to record mem usage
[ep-engine.git] / src / dcp / stream.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_DCP_STREAM_H_
19 #define SRC_DCP_STREAM_H_ 1
20
21 #include "config.h"
22
23 #include "ep_engine.h"
24 #include "ext_meta_parser.h"
25 #include "dcp/dcp-types.h"
26 #include "dcp/producer.h"
27 #include "response.h"
28 #include "vbucket.h"
29
30 #include <atomic>
31 #include <climits>
32 #include <queue>
33
34 class EventuallyPersistentEngine;
35 class MutationResponse;
36 class SetVBucketState;
37 class SnapshotMarker;
38 class DcpResponse;
39
40 enum end_stream_status_t {
41     //! The stream ended due to all items being streamed
42     END_STREAM_OK,
43     //! The stream closed early due to a close stream message
44     END_STREAM_CLOSED,
45     //! The stream closed early because the vbucket state changed
46     END_STREAM_STATE,
47     //! The stream closed early because the connection was disconnected
48     END_STREAM_DISCONNECTED,
49     //! The stream was closed early because it was too slow (currently unused,
50     //! but not deleted because it is part of the externally-visible API)
51     END_STREAM_SLOW,
52     //! The stream closed early due to backfill failure
53     END_STREAM_BACKFILL_FAIL
54 };
55
56 enum process_items_error_t {
57     all_processed,
58     more_to_process,
59     cannot_process
60 };
61
62 enum backfill_source_t {
63     BACKFILL_FROM_MEMORY,
64     BACKFILL_FROM_DISK
65 };
66
67 class Stream : public RCValue {
68 public:
69
70     enum class Type {
71         Active,
72         Notifier,
73         Passive
74     };
75
76     enum class Snapshot {
77            None,
78            Disk,
79            Memory
80     };
81
82     Stream(const std::string &name,
83            uint32_t flags,
84            uint32_t opaque,
85            uint16_t vb,
86            uint64_t start_seqno,
87            uint64_t end_seqno,
88            uint64_t vb_uuid,
89            uint64_t snap_start_seqno,
90            uint64_t snap_end_seqno,
91            Type type);
92
93     virtual ~Stream();
94
95     uint32_t getFlags() { return flags_; }
96
97     uint16_t getVBucket() { return vb_; }
98
99     uint32_t getOpaque() { return opaque_; }
100
101     uint64_t getStartSeqno() { return start_seqno_; }
102
103     uint64_t getEndSeqno() { return end_seqno_; }
104
105     uint64_t getVBucketUUID() { return vb_uuid_; }
106
107     uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
108
109     uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
110
111     virtual void addStats(ADD_STAT add_stat, const void *c);
112
113     virtual DcpResponse* next() = 0;
114
115     virtual uint32_t setDead(end_stream_status_t status) = 0;
116
117     virtual void notifySeqnoAvailable(uint64_t seqno) {}
118
119     const std::string& getName() {
120         return name_;
121     }
122
123     virtual void setActive() {
124         // Stream defaults to do nothing
125     }
126
127     Type getType() { return type_; }
128
129     /// @returns true if the stream type is Active
130     bool isTypeActive() const;
131
132     /// @returns true if state_ is not Dead
133     bool isActive() const;
134
135     /// @Returns true if state_ is Backfilling
136     bool isBackfilling() const;
137
138     /// @Returns true if state_ is InMemory
139     bool isInMemory() const;
140
141     /// @Returns true if state_ is Pending
142     bool isPending() const;
143
144     /// @Returns true if state_ is TakeoverSend
145     bool isTakeoverSend() const;
146
147     /// @Returns true if state_ is TakeoverWait
148     bool isTakeoverWait() const;
149
150     void clear() {
151         LockHolder lh(streamMutex);
152         clear_UNLOCKED();
153     }
154
155 protected:
156
157     // The StreamState is protected as it needs to be accessed by sub-classes
158     enum class StreamState {
159           Pending,
160           Backfilling,
161           InMemory,
162           TakeoverSend,
163           TakeoverWait,
164           Reading,
165           Dead
166       };
167
168     static const std::string to_string(Stream::StreamState type);
169
170     StreamState getState() const { return state_; }
171
172     void clear_UNLOCKED();
173
174     /* To be called after getting streamMutex lock */
175     void pushToReadyQ(DcpResponse* resp);
176
177     /* To be called after getting streamMutex lock */
178     void popFromReadyQ(void);
179
180     uint64_t getReadyQueueMemory(void);
181
182     const std::string &name_;
183     uint32_t flags_;
184     uint32_t opaque_;
185     uint16_t vb_;
186     uint64_t start_seqno_;
187     uint64_t end_seqno_;
188     uint64_t vb_uuid_;
189     uint64_t snap_start_seqno_;
190     uint64_t snap_end_seqno_;
191     std::atomic<StreamState> state_;
192     Type type_;
193
194     std::atomic<bool> itemsReady;
195     std::mutex streamMutex;
196     std::queue<DcpResponse*> readyQ;
197
198     // Number of items in the readyQ that are not meta items. Used for
199     // calculating getItemsRemaining(). Atomic so it can be safely read by
200     // getItemsRemaining() without acquiring streamMutex.
201     std::atomic<size_t> readyQ_non_meta_items;
202
203     const static uint64_t dcpMaxSeqno;
204
205 private:
206     /* readyQueueMemory tracks the memory occupied by elements
207      * in the readyQ.  It is an atomic because otherwise
208        getReadyQueueMemory would need to acquire streamMutex.
209      */
210     std::atomic <uint64_t> readyQueueMemory;
211 };
212
213 const char* to_string(Stream::Snapshot type);
214 const std::string to_string(Stream::Type type);
215
216 class ActiveStreamCheckpointProcessorTask;
217
218 class ActiveStream : public Stream {
219 public:
220     ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
221                  const std::string &name, uint32_t flags, uint32_t opaque,
222                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
223                  uint64_t vb_uuid, uint64_t snap_start_seqno,
224                  uint64_t snap_end_seqno, bool isKeyOnly);
225
226     ~ActiveStream();
227
228     DcpResponse* next();
229
230     void setActive() {
231         LockHolder lh(streamMutex);
232         if (isPending()) {
233             transitionState(StreamState::Backfilling);
234         }
235     }
236
237     uint32_t setDead(end_stream_status_t status);
238
239     void notifySeqnoAvailable(uint64_t seqno);
240
241     void snapshotMarkerAckReceived();
242
243     void setVBucketStateAckRecieved();
244
245     void incrBackfillRemaining(size_t by) {
246         backfillRemaining.fetch_add(by, std::memory_order_relaxed);
247     }
248
249     void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
250
251     bool backfillReceived(std::unique_ptr<Item> itm,
252                           backfill_source_t backfill_source);
253
254     void completeBackfill();
255
256     bool isCompressionEnabled();
257
258     void addStats(ADD_STAT add_stat, const void *c);
259
260     void addTakeoverStats(ADD_STAT add_stat, const void *c, const VBucket& vb);
261
262     /* Returns a count of how many items are outstanding to be sent for this
263      * stream's vBucket.
264      */
265     size_t getItemsRemaining();
266
267     uint64_t getLastReadSeqno() const;
268
269     uint64_t getLastSentSeqno() const;
270
271     const Logger& getLogger() const;
272
273     // Runs on ActiveStreamCheckpointProcessorTask
274     void nextCheckpointItemTask();
275
276     /* Function to handle a slow stream that is supposedly hogging memory in
277        checkpoint mgr. Currently we handle the slow stream by switching from
278        in-memory to backfilling */
279     void handleSlowStream();
280
281     /// @Returns true if keyOnly is true and false if KeyOnly is false
282     bool isKeyOnly() const {
283         return keyOnly;
284     }
285
286 protected:
287     // Returns the outstanding items for the stream's checkpoint cursor.
288     void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
289
290     // Given a set of queued items, create mutation responses for each item,
291     // and pass onto the producer associated with this stream.
292     void processItems(std::vector<queued_item>& items);
293
294     bool nextCheckpointItem();
295
296     DcpResponse* nextQueuedItem();
297
298     /**
299      * @return a DcpResponse to represent the item. This will be either a
300      *         MutationResponse or SystemEventProducerMessage.
301      */
302     std::unique_ptr<DcpResponse> makeResponseFromItem(queued_item& item);
303
304     /* The transitionState function is protected (as opposed to private) for
305      * testing purposes.
306      */
307     void transitionState(StreamState newState);
308
309     /* Indicates that a backfill has been scheduled and has not yet completed.
310      * Is protected (as opposed to private) for testing purposes.
311      */
312     std::atomic<bool> isBackfillTaskRunning;
313
314     /* Indicates if another backfill must be scheduled following the completion
315      * of current running backfill.  Guarded by streamMutex.
316      * Is protected (as opposed to private) for testing purposes.
317      */
318     bool pendingBackfill;
319
320     //! Stats to track items read and sent from the backfill phase
321     struct {
322         std::atomic<size_t> memory;
323         std::atomic<size_t> disk;
324         std::atomic<size_t> sent;
325     } backfillItems;
326
327     /* The last sequence number queued from disk or memory and is
328        snapshotted and put onto readyQ */
329     std::atomic<uint64_t> lastReadSeqno;
330
331     /* backfillRemaining is a stat recording the amount of
332      * items remaining to be read from disk.  It is an atomic
333      * because otherwise the function incrBackfillRemaining
334      * must acquire the streamMutex lock.
335      */
336     std::atomic<size_t> backfillRemaining;
337
338 private:
339
340     DcpResponse* next(std::lock_guard<std::mutex>& lh);
341
342     DcpResponse* backfillPhase(std::lock_guard<std::mutex>& lh);
343
344     DcpResponse* inMemoryPhase();
345
346     DcpResponse* takeoverSendPhase();
347
348     DcpResponse* takeoverWaitPhase();
349
350     DcpResponse* deadPhase();
351
352     void snapshot(std::deque<DcpResponse*>& snapshot, bool mark);
353
354     void endStream(end_stream_status_t reason);
355
356     /* reschedule = FALSE ==> First backfill on the stream
357      * reschedule = TRUE ==> Schedules another backfill on the stream that has
358      *                       finished backfilling once and still in
359      *                       STREAM_BACKFILLING state or in STREAM_IN_MEMORY
360      *                       state.
361      * Note: Expects the streamMutex to be acquired when called
362      */
363     void scheduleBackfill_UNLOCKED(bool reschedule);
364
365     const char* getEndStreamStatusStr(end_stream_status_t status);
366
367     bool isCurrentSnapshotCompleted() const;
368
369     /* Drop the cursor registered with the checkpoint manager.
370      * Note: Expects the streamMutex to be acquired when called
371      */
372     void dropCheckpointCursor_UNLOCKED();
373
374     /* The last sequence number queued from disk or memory, but is yet to be
375        snapshotted and put onto readyQ */
376     std::atomic<uint64_t> lastReadSeqnoUnSnapshotted;
377
378     //! The last sequence number sent to the network layer
379     std::atomic<uint64_t> lastSentSeqno;
380
381     //! The last known seqno pointed to by the checkpoint cursor
382     std::atomic<uint64_t> curChkSeqno;
383
384     //! The current vbucket state to send in the takeover stream
385     vbucket_state_t takeoverState;
386
387     //! The amount of items that have been sent during the memory phase
388     std::atomic<size_t> itemsFromMemoryPhase;
389
390     //! Whether ot not this is the first snapshot marker sent
391     bool firstMarkerSent;
392
393     std::atomic<int> waitForSnapshot;
394
395     EventuallyPersistentEngine* engine;
396     dcp_producer_t producer;
397
398     struct {
399         std::atomic<size_t> bytes;
400         std::atomic<size_t> items;
401     } bufferedBackfill;
402
403     std::atomic<rel_time_t> takeoverStart;
404     size_t takeoverSendMaxTime;
405
406     //! Last snapshot end seqno sent to the DCP client
407     std::atomic<uint64_t> lastSentSnapEndSeqno;
408
409     /* Flag used by checkpointCreatorTask that is set before all items are
410        extracted for given checkpoint cursor, and is unset after all retrieved
411        items are added to the readyQ */
412     std::atomic<bool> chkptItemsExtractionInProgress;
413
414     // Whether the responses sent using this stream should contain the key and
415     // value or just the key
416     bool keyOnly;
417
418 };
419
420
421 class ActiveStreamCheckpointProcessorTask : public GlobalTask {
422 public:
423     ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
424         : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
425                      INT_MAX, false),
426       notified(false),
427       iterationsBeforeYield(e.getConfiguration()
428                             .getDcpProducerSnapshotMarkerYieldLimit()) { }
429
430     cb::const_char_buffer getDescription() {
431         return "Process checkpoint(s) for DCP producer";
432     }
433
434     bool run();
435     void schedule(const stream_t& stream);
436     void wakeup();
437     void clearQueues();
438     size_t queueSize() {
439         LockHolder lh(workQueueLock);
440         return queue.size();
441     }
442
443 private:
444
445     stream_t queuePop() {
446         stream_t rval;
447         LockHolder lh(workQueueLock);
448         if (!queue.empty()) {
449             rval = queue.front();
450             queue.pop();
451             queuedVbuckets.erase(rval->getVBucket());
452         }
453         return rval;
454     }
455
456     bool queueEmpty() {
457         LockHolder lh(workQueueLock);
458         return queue.empty();
459     }
460
461     void pushUnique(const stream_t& stream) {
462         LockHolder lh(workQueueLock);
463         if (queuedVbuckets.count(stream->getVBucket()) == 0) {
464             queue.push(stream);
465             queuedVbuckets.insert(stream->getVBucket());
466         }
467     }
468
469     std::mutex workQueueLock;
470
471     /**
472      * Maintain a queue of unique stream_t
473      * There's no need to have the same stream in the queue more than once
474      */
475     std::queue<stream_t> queue;
476     std::set<uint16_t> queuedVbuckets;
477
478     std::atomic<bool> notified;
479     size_t iterationsBeforeYield;
480 };
481
482 class NotifierStream : public Stream {
483 public:
484     NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
485                    const std::string &name, uint32_t flags, uint32_t opaque,
486                    uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
487                    uint64_t vb_uuid, uint64_t snap_start_seqno,
488                    uint64_t snap_end_seqno);
489
490     ~NotifierStream() {
491         transitionState(StreamState::Dead);
492     }
493
494     DcpResponse* next();
495
496     uint32_t setDead(end_stream_status_t status);
497
498     void notifySeqnoAvailable(uint64_t seqno);
499
500 private:
501
502     void transitionState(StreamState newState);
503
504     dcp_producer_t producer;
505 };
506
507 class PassiveStream : public Stream {
508 public:
509     PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
510                   const std::string &name, uint32_t flags, uint32_t opaque,
511                   uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
512                   uint64_t vb_uuid, uint64_t snap_start_seqno,
513                   uint64_t snap_end_seqno, uint64_t vb_high_seqno);
514
515     ~PassiveStream();
516
517     process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
518                                                   size_t batchSize);
519
520     DcpResponse* next();
521
522     uint32_t setDead(end_stream_status_t status);
523
524     /**
525      * Place a StreamRequest message into the readyQueue, requesting a DCP
526      * stream for the given UUID.
527      *
528      * @params vb_uuid The UUID to use in the StreamRequest.
529      */
530     void streamRequest(uint64_t vb_uuid);
531
532     void acceptStream(uint16_t status, uint32_t add_opaque);
533
534     void reconnectStream(VBucketPtr &vb, uint32_t new_opaque,
535                          uint64_t start_seqno);
536
537     ENGINE_ERROR_CODE messageReceived(std::unique_ptr<DcpResponse> response);
538
539     void addStats(ADD_STAT add_stat, const void *c);
540
541     static const size_t batchSize;
542
543 protected:
544
545     bool transitionState(StreamState newState);
546
547     ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
548
549     ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
550
551     /**
552      * Handle DCP system events against this stream.
553      *
554      * @param event The system-event to process against the stream.
555      */
556     ENGINE_ERROR_CODE processSystemEvent(const SystemEventMessage& event);
557
558     /**
559      * Process a create collection event, creating the collection on vb
560      *
561      * @param vb Vbucket onto which the collection is created.
562      * @param event The collection system event creating the collection.
563      */
564     ENGINE_ERROR_CODE processCreateCollection(VBucket& vb,
565                                               const CollectionsEvent& event);
566
567     /**
568      * Process a begin delete collection event.
569      *
570      * @param vb Vbucket which we apply the delete on.
571      * @param event The collection system event deleting the collection.
572      */
573     ENGINE_ERROR_CODE processBeginDeleteCollection(
574             VBucket& vb, const CollectionsEvent& event);
575
576     /**
577      * Process a collections change separator event.
578      *
579      * @param vb Vbucket which we apply the delete on.
580      * @param event The collection system event changing the separator.
581      */
582     ENGINE_ERROR_CODE processSeparatorChanged(VBucket& vb,
583                                               const CollectionsEvent& event);
584
585     void handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno);
586
587     void processMarker(SnapshotMarker* marker);
588
589     void processSetVBucketState(SetVBucketState* state);
590
591     uint32_t clearBuffer_UNLOCKED();
592
593     const char* getEndStreamStatusStr(end_stream_status_t status);
594
595     /**
596      * Push a StreamRequest into the readyQueue. The StreamRequest is initiaised
597      * from the object's state except for the uuid.
598      * This function assumes the caller is holding streamMutex.
599      *
600      * @params vb_uuid The VB UUID to use in the StreamRequest.
601      */
602     void streamRequest_UNLOCKED(uint64_t vb_uuid);
603
604     EventuallyPersistentEngine* engine;
605     dcp_consumer_t consumer;
606
607     std::atomic<uint64_t> last_seqno;
608
609     std::atomic<uint64_t> cur_snapshot_start;
610     std::atomic<uint64_t> cur_snapshot_end;
611     std::atomic<Snapshot> cur_snapshot_type;
612     bool cur_snapshot_ack;
613
614     struct Buffer {
615         Buffer() : bytes(0) {}
616
617         bool empty() const {
618             LockHolder lh(bufMutex);
619             return messages.empty();
620         }
621
622         void push(std::unique_ptr<DcpResponse> message) {
623             std::lock_guard<std::mutex> lg(bufMutex);
624             bytes += message->getMessageSize();
625             messages.push_back(std::move(message));
626         }
627
628         /*
629          * Caller must of locked bufMutex and pass as lh (not asserted)
630          */
631         std::unique_ptr<DcpResponse> pop_front(std::unique_lock<std::mutex>& lh) {
632             std::unique_ptr<DcpResponse> rval(std::move(messages.front()));
633             messages.pop_front();
634             bytes -= rval->getMessageSize();
635             return rval;
636         }
637
638         /*
639          * Caller must of locked bufMutex and pass as lh (not asserted)
640          */
641         void push_front(std::unique_ptr<DcpResponse> message,
642                         std::unique_lock<std::mutex>& lh) {
643             bytes += message->getMessageSize();
644             messages.push_front(std::move(message));
645         }
646
647         size_t bytes;
648         /* Lock ordering w.r.t to streamMutex:
649            First acquire bufMutex and then streamMutex */
650         mutable std::mutex bufMutex;
651         std::deque<std::unique_ptr<DcpResponse> > messages;
652     } buffer;
653 };
654
655 #endif  // SRC_DCP_STREAM_H_