Merge "Merge remote-tracking branch 'couchbase/3.0.x' into sherlock" into sherlock
[ep-engine.git] / src / dcp-stream.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_DCP_STREAM_H_
19 #define SRC_DCP_STREAM_H_ 1
20
21 #include "config.h"
22
23 #include "atomic.h"
24 #include "dcp-stream.h"
25 #include "dcp-producer.h"
26 #include "ep_engine.h"
27 #include "ext_meta_parser.h"
28 #include "vbucket.h"
29
30 #include <queue>
31
32 class EventuallyPersistentEngine;
33 class MutationResponse;
34 class SetVBucketState;
35 class SnapshotMarker;
36 class DcpResponse;
37
38 class DcpConsumer;
39 typedef SingleThreadedRCPtr<DcpConsumer> dcp_consumer_t;
40
41 class DcpProducer;
42 typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
43
44 class Stream;
45 typedef SingleThreadedRCPtr<Stream> stream_t;
46
//! States stored in Stream::state_ and reported by Stream::getState().
enum stream_state_t {
    //! ActiveStream::setActive() moves a stream out of this state
    STREAM_PENDING,
    //! Target state of ActiveStream::setActive()
    STREAM_BACKFILLING,
    STREAM_IN_MEMORY,
    STREAM_TAKEOVER_SEND,
    STREAM_TAKEOVER_WAIT,
    STREAM_READING,
    //! Stream::isActive() returns false only in this state
    STREAM_DEAD
};
56
//! Reason codes passed to Stream::setDead() when a stream is ended.
enum end_stream_status_t {
    //! Every item was streamed; the stream ended normally
    END_STREAM_OK,
    //! Ended early because a close stream message was received
    END_STREAM_CLOSED,
    //! Ended early because the vbucket changed state
    END_STREAM_STATE,
    //! Ended early because the connection was disconnected
    END_STREAM_DISCONNECTED
};
67
//! Kind of stream, stored in Stream::type_ and returned by Stream::getType().
//! NOTE(review): the enumerators presumably correspond to ActiveStream,
//! NotifierStream and PassiveStream respectively - confirm in the .cc file.
enum stream_type_t {
    STREAM_ACTIVE,
    STREAM_NOTIFIER,
    STREAM_PASSIVE
};
73
//! Type of the snapshot tracked in PassiveStream::cur_snapshot_type.
enum snapshot_type_t {
    none,
    disk,
    memory
};
79
//! Result type of PassiveStream::processBufferedMessages().
enum process_items_error_t {
    all_processed,
    more_to_process,
    cannot_process
};
85
//! Origin of an item handed to ActiveStream::backfillReceived().
enum backfill_source_t {
    BACKFILL_FROM_MEMORY,
    BACKFILL_FROM_DISK
};
90
91 class Stream : public RCValue {
92 public:
93     Stream(const std::string &name, uint32_t flags, uint32_t opaque,
94            uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
95            uint64_t vb_uuid, uint64_t snap_start_seqno,
96            uint64_t snap_end_seqno);
97
98     virtual ~Stream();
99
100     uint32_t getFlags() { return flags_; }
101
102     uint16_t getVBucket() { return vb_; }
103
104     uint32_t getOpaque() { return opaque_; }
105
106     uint64_t getStartSeqno() { return start_seqno_; }
107
108     uint64_t getEndSeqno() { return end_seqno_; }
109
110     uint64_t getVBucketUUID() { return vb_uuid_; }
111
112     uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
113
114     uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
115
116     stream_state_t getState() { return state_; }
117
118     stream_type_t getType() { return type_; }
119
120     virtual void addStats(ADD_STAT add_stat, const void *c);
121
122     virtual DcpResponse* next() = 0;
123
124     virtual uint32_t setDead(end_stream_status_t status) = 0;
125
126     virtual void notifySeqnoAvailable(uint64_t seqno) {}
127
128     bool isActive() {
129         return state_ != STREAM_DEAD;
130     }
131
132     void clear() {
133         LockHolder lh(streamMutex);
134         clear_UNLOCKED();
135     }
136
137 protected:
138
139     const char* stateName(stream_state_t st) const;
140
141     void clear_UNLOCKED();
142
143     /* To be called after getting streamMutex lock */
144     void pushToReadyQ(DcpResponse* resp);
145
146     /* To be called after getting streamMutex lock */
147     void popFromReadyQ(void);
148
149     uint64_t getReadyQueueMemory(void);
150
151     const std::string &name_;
152     uint32_t flags_;
153     uint32_t opaque_;
154     uint16_t vb_;
155     uint64_t start_seqno_;
156     uint64_t end_seqno_;
157     uint64_t vb_uuid_;
158     uint64_t snap_start_seqno_;
159     uint64_t snap_end_seqno_;
160     stream_state_t state_;
161     stream_type_t type_;
162
163     AtomicValue<bool> itemsReady;
164     Mutex streamMutex;
165     std::queue<DcpResponse*> readyQ;
166
167     const static uint64_t dcpMaxSeqno;
168
169 private:
170     /* This tracks the memory occupied by elements in the readyQ */
171     uint64_t readyQueueMemory;
172 };
173
174
175 class ActiveStreamCheckpointProcessorTask;
176
177 class ActiveStream : public Stream {
178 public:
179     ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
180                  const std::string &name, uint32_t flags, uint32_t opaque,
181                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
182                  uint64_t vb_uuid, uint64_t snap_start_seqno,
183                  uint64_t snap_end_seqno);
184
185     ~ActiveStream();
186
187     DcpResponse* next();
188
189     void setActive() {
190         LockHolder lh(streamMutex);
191         if (state_ == STREAM_PENDING) {
192             transitionState(STREAM_BACKFILLING);
193         }
194     }
195
196     uint32_t setDead(end_stream_status_t status);
197
198     void notifySeqnoAvailable(uint64_t seqno);
199
200     void snapshotMarkerAckReceived();
201
202     void setVBucketStateAckRecieved();
203
204     void incrBackfillRemaining(size_t by) {
205         backfillRemaining += by;
206     }
207
208     void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
209
210     bool backfillReceived(Item* itm, backfill_source_t backfill_source);
211
212     void completeBackfill();
213
214     void addStats(ADD_STAT add_stat, const void *c);
215
216     void addTakeoverStats(ADD_STAT add_stat, const void *c);
217
218     size_t getItemsRemaining();
219
220     const char* logHeader();
221
222     // Runs on ActiveStreamCheckpointProcessorTask
223     void nextCheckpointItemTask();
224
225 protected:
226     // Returns the outstanding items for the stream's checkpoint cursor.
227     void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
228
229     // Given a set of queued items, create mutation responses for each item,
230     // and pass onto the producer associated with this stream.
231     void processItems(std::vector<queued_item>& items);
232
233     bool nextCheckpointItem();
234
235 private:
236
237     void transitionState(stream_state_t newState);
238
239     DcpResponse* backfillPhase();
240
241     DcpResponse* inMemoryPhase();
242
243     DcpResponse* takeoverSendPhase();
244
245     DcpResponse* takeoverWaitPhase();
246
247     DcpResponse* deadPhase();
248
249     DcpResponse* nextQueuedItem();
250
251     void snapshot(std::deque<MutationResponse*>& snapshot, bool mark);
252
253     void endStream(end_stream_status_t reason);
254
255     void scheduleBackfill();
256
257     const char* getEndStreamStatusStr(end_stream_status_t status);
258
259     ExtendedMetaData* prepareExtendedMetaData(uint16_t vBucketId,
260                                               uint8_t conflictResMode);
261
262     bool isCurrentSnapshotCompleted() const;
263
264     //! The last sequence number queued from disk or memory
265     uint64_t lastReadSeqno;
266     //! The last sequence number sent to the network layer
267     uint64_t lastSentSeqno;
268     //! The last known seqno pointed to by the checkpoint cursor
269     uint64_t curChkSeqno;
270     //! The current vbucket state to send in the takeover stream
271     vbucket_state_t takeoverState;
272     //! The amount of items remaining to be read from disk
273     size_t backfillRemaining;
274     //! Stats to track items read and sent from the backfill phase
275     struct {
276         AtomicValue<size_t> memory;
277         AtomicValue<size_t> disk;
278         AtomicValue<size_t> sent;
279     } backfillItems;
280     //! The amount of items that have been sent during the memory phase
281     size_t itemsFromMemoryPhase;
282     //! Whether ot not this is the first snapshot marker sent
283     bool firstMarkerSent;
284
285     AtomicValue<int> waitForSnapshot;
286
287     EventuallyPersistentEngine* engine;
288     dcp_producer_t producer;
289     AtomicValue<bool> isBackfillTaskRunning;
290
291     struct {
292         AtomicValue<uint32_t> bytes;
293         AtomicValue<uint32_t> items;
294     } bufferedBackfill;
295
296     //! Last snapshot end seqno sent to the DCP client
297     AtomicValue<uint64_t> lastSentSnapEndSeqno;
298
299     /* Flag used by checkpointCreatorTask that is set before all items are
300        extracted for given checkpoint cursor, and is unset after all retrieved
301        items are added to the readyQ */
302     AtomicValue<bool> chkptItemsExtractionInProgress;
303
304 };
305
306
307 class ActiveStreamCheckpointProcessorTask : public GlobalTask {
308 public:
309     ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
310       : GlobalTask(&e, Priority::ActiveStreamCheckpointProcessor, INT_MAX, false),
311       notified(false),
312       iterationsBeforeYield(e.getConfiguration()
313                             .getDcpProducerSnapshotMarkerYieldLimit()) { }
314
315     std::string getDescription() {
316         std::string rv("Process checkpoint(s) for DCP producer");
317         return rv;
318     }
319
320     bool run();
321     void schedule(stream_t stream);
322     void wakeup();
323     void clearQueues();
324
325 private:
326
327     stream_t queuePop() {
328         stream_t rval;
329         LockHolder lh(workQueueLock);
330         if (!queue.empty()) {
331             rval = queue.front();
332             queue.pop();
333             queuedVbuckets.erase(rval->getVBucket());
334         }
335         return rval;
336     }
337
338     bool queueEmpty() {
339         LockHolder lh(workQueueLock);
340         return queue.empty();
341     }
342
343     void pushUnique(stream_t stream) {
344         LockHolder lh(workQueueLock);
345         if (queuedVbuckets.count(stream->getVBucket()) == 0) {
346             queue.push(stream);
347             queuedVbuckets.insert(stream->getVBucket());
348         }
349     }
350
351     Mutex workQueueLock;
352
353     /**
354      * Maintain a queue of unique stream_t
355      * There's no need to have the same stream in the queue more than once
356      */
357     std::queue<stream_t> queue;
358     std::set<uint16_t> queuedVbuckets;
359
360     AtomicValue<bool> notified;
361     size_t iterationsBeforeYield;
362 };
363
364 class NotifierStream : public Stream {
365 public:
366     NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
367                    const std::string &name, uint32_t flags, uint32_t opaque,
368                    uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
369                    uint64_t vb_uuid, uint64_t snap_start_seqno,
370                    uint64_t snap_end_seqno);
371
372     ~NotifierStream() {
373         transitionState(STREAM_DEAD);
374     }
375
376     DcpResponse* next();
377
378     uint32_t setDead(end_stream_status_t status);
379
380     void notifySeqnoAvailable(uint64_t seqno);
381
382 private:
383
384     void transitionState(stream_state_t newState);
385
386     dcp_producer_t producer;
387 };
388
389 class PassiveStream : public Stream {
390 public:
391     PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
392                   const std::string &name, uint32_t flags, uint32_t opaque,
393                   uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
394                   uint64_t vb_uuid, uint64_t snap_start_seqno,
395                   uint64_t snap_end_seqno, uint64_t vb_high_seqno);
396
397     ~PassiveStream();
398
399     process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
400                                                   size_t batchSize);
401
402     DcpResponse* next();
403
404     uint32_t setDead(end_stream_status_t status);
405
406     void acceptStream(uint16_t status, uint32_t add_opaque);
407
408     void reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque,
409                          uint64_t start_seqno);
410
411     ENGINE_ERROR_CODE messageReceived(DcpResponse* response);
412
413     void addStats(ADD_STAT add_stat, const void *c);
414
415 private:
416
417     ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
418
419     ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
420
421     void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
422
423     void processMarker(SnapshotMarker* marker);
424
425     void processSetVBucketState(SetVBucketState* state);
426
427     bool transitionState(stream_state_t newState);
428
429     uint32_t clearBuffer();
430
431     uint32_t setDead_UNLOCKED(end_stream_status_t status,
432                               LockHolder *slh);
433
434     const char* getEndStreamStatusStr(end_stream_status_t status);
435
436     EventuallyPersistentEngine* engine;
437     dcp_consumer_t consumer;
438     uint64_t last_seqno;
439
440     uint64_t cur_snapshot_start;
441     uint64_t cur_snapshot_end;
442     snapshot_type_t cur_snapshot_type;
443     bool cur_snapshot_ack;
444
445     struct Buffer {
446         Buffer() : bytes(0), items(0) {}
447         size_t bytes;
448         size_t items;
449         Mutex bufMutex;
450         std::queue<DcpResponse*> messages;
451     } buffer;
452 };
453
454 typedef RCPtr<PassiveStream> passive_stream_t;
455
456 #endif  // SRC_DCP_STREAM_H_