Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / dcp-stream.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_DCP_STREAM_H_
19 #define SRC_DCP_STREAM_H_ 1
20
#include "config.h"

#include "atomic.h"
#include "dcp-stream.h"
#include "dcp-producer.h"
#include "ep_engine.h"
#include "ext_meta_parser.h"
#include "vbucket.h"

#include <climits>
#include <deque>
#include <queue>
#include <set>
#include <string>
#include <vector>
31
32 class EventuallyPersistentEngine;
33 class MutationResponse;
34 class SetVBucketState;
35 class SnapshotMarker;
36 class DcpResponse;
37
38 class DcpConsumer;
39 typedef SingleThreadedRCPtr<DcpConsumer> dcp_consumer_t;
40
41 class DcpProducer;
42 typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
43
44 class Stream;
45 typedef SingleThreadedRCPtr<Stream> stream_t;
46
/**
 * Lifecycle state of a DCP stream.
 *
 * Stream::isActive() treats every state other than STREAM_DEAD as active;
 * ActiveStream::setActive() moves a stream from STREAM_PENDING to
 * STREAM_BACKFILLING. The remaining states are entered by the concrete
 * stream implementations.
 */
enum stream_state_t {
    STREAM_PENDING,
    STREAM_BACKFILLING,
    STREAM_IN_MEMORY,
    STREAM_TAKEOVER_SEND,
    STREAM_TAKEOVER_WAIT,
    STREAM_READING,
    STREAM_DEAD
};
56
/**
 * Reason a stream was ended.
 *
 * The numeric values are spelled out so they cannot shift by accident.
 * NOTE(review): they may be sent to peers (e.g. as the flags of a DCP
 * stream-end message) -- confirm before renumbering or reordering.
 */
enum end_stream_status_t {
    //! The stream ended due to all items being streamed
    END_STREAM_OK = 0,
    //! The stream closed early due to a close stream message
    END_STREAM_CLOSED = 1,
    //! The stream closed early because the vbucket state changed
    END_STREAM_STATE = 2,
    //! The stream closed early because the connection was disconnected
    END_STREAM_DISCONNECTED = 3
};
67
/**
 * Kind of stream; stored in Stream::type_ and reported by Stream::getType().
 * This header defines ActiveStream, NotifierStream and PassiveStream.
 */
enum stream_type_t {
    STREAM_ACTIVE,
    STREAM_NOTIFIER,
    STREAM_PASSIVE
};
73
/**
 * Origin of the snapshot a PassiveStream is currently receiving
 * (stored in PassiveStream::cur_snapshot_type).
 *
 * NOTE(review): the enumerators are unscoped, so `none`, `disk` and
 * `memory` are injected into the enclosing (global) namespace.
 */
enum snapshot_type_t {
    none,
    disk,
    memory
};
79
/**
 * Outcome returned by PassiveStream::processBufferedMessages().
 */
enum process_items_error_t {
    all_processed,
    more_to_process,
    cannot_process
};
85
/**
 * Where a backfilled item was read from; passed to
 * ActiveStream::backfillReceived().
 */
enum backfill_source_t {
    BACKFILL_FROM_MEMORY,
    BACKFILL_FROM_DISK
};
90
91 class Stream : public RCValue {
92 public:
93     Stream(const std::string &name, uint32_t flags, uint32_t opaque,
94            uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
95            uint64_t vb_uuid, uint64_t snap_start_seqno,
96            uint64_t snap_end_seqno);
97
98     virtual ~Stream();
99
100     uint32_t getFlags() { return flags_; }
101
102     uint16_t getVBucket() { return vb_; }
103
104     uint32_t getOpaque() { return opaque_; }
105
106     uint64_t getStartSeqno() { return start_seqno_; }
107
108     uint64_t getEndSeqno() { return end_seqno_; }
109
110     uint64_t getVBucketUUID() { return vb_uuid_; }
111
112     uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
113
114     uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
115
116     stream_state_t getState() { return state_; }
117
118     stream_type_t getType() { return type_; }
119
120     virtual void addStats(ADD_STAT add_stat, const void *c);
121
122     virtual DcpResponse* next() = 0;
123
124     virtual uint32_t setDead(end_stream_status_t status) = 0;
125
126     virtual void notifySeqnoAvailable(uint64_t seqno) {}
127
128     bool isActive() {
129         return state_ != STREAM_DEAD;
130     }
131
132     void clear() {
133         LockHolder lh(streamMutex);
134         clear_UNLOCKED();
135     }
136
137 protected:
138
139     const char* stateName(stream_state_t st) const;
140
141     void clear_UNLOCKED();
142
143     /* To be called after getting streamMutex lock */
144     void pushToReadyQ(DcpResponse* resp);
145
146     /* To be called after getting streamMutex lock */
147     void popFromReadyQ(void);
148
149     uint64_t getReadyQueueMemory(void);
150
151     const std::string &name_;
152     uint32_t flags_;
153     uint32_t opaque_;
154     uint16_t vb_;
155     uint64_t start_seqno_;
156     uint64_t end_seqno_;
157     uint64_t vb_uuid_;
158     uint64_t snap_start_seqno_;
159     uint64_t snap_end_seqno_;
160     AtomicValue<stream_state_t> state_;
161     stream_type_t type_;
162
163     AtomicValue<bool> itemsReady;
164     Mutex streamMutex;
165     std::queue<DcpResponse*> readyQ;
166
167     const static uint64_t dcpMaxSeqno;
168
169 private:
170     /* readyQueueMemory tracks the memory occupied by elements
171      * in the readyQ.  It is an atomic because otherwise
172        getReadyQueueMemory would need to acquire streamMutex.
173      */
174     AtomicValue <uint64_t> readyQueueMemory;
175 };
176
177
178 class ActiveStreamCheckpointProcessorTask;
179
180 class ActiveStream : public Stream {
181 public:
182     ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
183                  const std::string &name, uint32_t flags, uint32_t opaque,
184                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
185                  uint64_t vb_uuid, uint64_t snap_start_seqno,
186                  uint64_t snap_end_seqno);
187
188     ~ActiveStream();
189
190     DcpResponse* next();
191
192     void setActive() {
193         LockHolder lh(streamMutex);
194         if (state_ == STREAM_PENDING) {
195             transitionState(STREAM_BACKFILLING);
196         }
197     }
198
199     uint32_t setDead(end_stream_status_t status);
200
201     void notifySeqnoAvailable(uint64_t seqno);
202
203     void snapshotMarkerAckReceived();
204
205     void setVBucketStateAckRecieved();
206
207     void incrBackfillRemaining(size_t by) {
208         backfillRemaining.fetch_add(by);
209     }
210
211     void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
212
213     bool backfillReceived(Item* itm, backfill_source_t backfill_source);
214
215     void completeBackfill();
216
217     void addStats(ADD_STAT add_stat, const void *c);
218
219     void addTakeoverStats(ADD_STAT add_stat, const void *c);
220
221     size_t getItemsRemaining();
222
223     const char* logHeader();
224
225     // Runs on ActiveStreamCheckpointProcessorTask
226     void nextCheckpointItemTask();
227
228 protected:
229     // Returns the outstanding items for the stream's checkpoint cursor.
230     void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
231
232     // Given a set of queued items, create mutation responses for each item,
233     // and pass onto the producer associated with this stream.
234     void processItems(std::vector<queued_item>& items);
235
236     bool nextCheckpointItem();
237
238 private:
239
240     void transitionState(stream_state_t newState);
241
242     DcpResponse* backfillPhase();
243
244     DcpResponse* inMemoryPhase();
245
246     DcpResponse* takeoverSendPhase();
247
248     DcpResponse* takeoverWaitPhase();
249
250     DcpResponse* deadPhase();
251
252     DcpResponse* nextQueuedItem();
253
254     void snapshot(std::deque<MutationResponse*>& snapshot, bool mark);
255
256     void endStream(end_stream_status_t reason);
257
258     void scheduleBackfill();
259
260     const char* getEndStreamStatusStr(end_stream_status_t status);
261
262     ExtendedMetaData* prepareExtendedMetaData(uint16_t vBucketId,
263                                               uint8_t conflictResMode);
264
265     bool isCurrentSnapshotCompleted() const;
266
267     //! The last sequence number queued from disk or memory
268     AtomicValue<uint64_t> lastReadSeqno;
269     //! The last sequence number sent to the network layer
270     AtomicValue<uint64_t> lastSentSeqno;
271     //! The last known seqno pointed to by the checkpoint cursor
272     AtomicValue<uint64_t> curChkSeqno;
273     //! The current vbucket state to send in the takeover stream
274     vbucket_state_t takeoverState;
275     /* backfillRemaining is a stat recording the amount of
276      * items remaining to be read from disk.  It is an atomic
277      * because otherwise the function incrBackfillRemaining
278      * must acquire the streamMutex lock.
279      */
280     AtomicValue <size_t> backfillRemaining;
281     //! Stats to track items read and sent from the backfill phase
282     struct {
283         AtomicValue<size_t> memory;
284         AtomicValue<size_t> disk;
285         AtomicValue<size_t> sent;
286     } backfillItems;
287     //! The amount of items that have been sent during the memory phase
288     size_t itemsFromMemoryPhase;
289     //! Whether ot not this is the first snapshot marker sent
290     bool firstMarkerSent;
291
292     AtomicValue<int> waitForSnapshot;
293
294     EventuallyPersistentEngine* engine;
295     dcp_producer_t producer;
296     AtomicValue<bool> isBackfillTaskRunning;
297
298     struct {
299         AtomicValue<uint32_t> bytes;
300         AtomicValue<uint32_t> items;
301     } bufferedBackfill;
302
303     //! Last snapshot end seqno sent to the DCP client
304     AtomicValue<uint64_t> lastSentSnapEndSeqno;
305
306     /* Flag used by checkpointCreatorTask that is set before all items are
307        extracted for given checkpoint cursor, and is unset after all retrieved
308        items are added to the readyQ */
309     AtomicValue<bool> chkptItemsExtractionInProgress;
310
311 };
312
313
314 class ActiveStreamCheckpointProcessorTask : public GlobalTask {
315 public:
316     ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
317         : GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
318                      INT_MAX, false),
319       notified(false),
320       iterationsBeforeYield(e.getConfiguration()
321                             .getDcpProducerSnapshotMarkerYieldLimit()) { }
322
323     std::string getDescription() {
324         std::string rv("Process checkpoint(s) for DCP producer");
325         return rv;
326     }
327
328     bool run();
329     void schedule(stream_t stream);
330     void wakeup();
331     void clearQueues();
332
333 private:
334
335     stream_t queuePop() {
336         stream_t rval;
337         LockHolder lh(workQueueLock);
338         if (!queue.empty()) {
339             rval = queue.front();
340             queue.pop();
341             queuedVbuckets.erase(rval->getVBucket());
342         }
343         return rval;
344     }
345
346     bool queueEmpty() {
347         LockHolder lh(workQueueLock);
348         return queue.empty();
349     }
350
351     void pushUnique(stream_t stream) {
352         LockHolder lh(workQueueLock);
353         if (queuedVbuckets.count(stream->getVBucket()) == 0) {
354             queue.push(stream);
355             queuedVbuckets.insert(stream->getVBucket());
356         }
357     }
358
359     Mutex workQueueLock;
360
361     /**
362      * Maintain a queue of unique stream_t
363      * There's no need to have the same stream in the queue more than once
364      */
365     std::queue<stream_t> queue;
366     std::set<uint16_t> queuedVbuckets;
367
368     AtomicValue<bool> notified;
369     size_t iterationsBeforeYield;
370 };
371
372 class NotifierStream : public Stream {
373 public:
374     NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
375                    const std::string &name, uint32_t flags, uint32_t opaque,
376                    uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
377                    uint64_t vb_uuid, uint64_t snap_start_seqno,
378                    uint64_t snap_end_seqno);
379
380     ~NotifierStream() {
381         transitionState(STREAM_DEAD);
382     }
383
384     DcpResponse* next();
385
386     uint32_t setDead(end_stream_status_t status);
387
388     void notifySeqnoAvailable(uint64_t seqno);
389
390 private:
391
392     void transitionState(stream_state_t newState);
393
394     dcp_producer_t producer;
395 };
396
397 class PassiveStream : public Stream {
398 public:
399     PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
400                   const std::string &name, uint32_t flags, uint32_t opaque,
401                   uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
402                   uint64_t vb_uuid, uint64_t snap_start_seqno,
403                   uint64_t snap_end_seqno, uint64_t vb_high_seqno);
404
405     ~PassiveStream();
406
407     process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
408                                                   size_t batchSize);
409
410     DcpResponse* next();
411
412     uint32_t setDead(end_stream_status_t status);
413
414     void acceptStream(uint16_t status, uint32_t add_opaque);
415
416     void reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque,
417                          uint64_t start_seqno);
418
419     ENGINE_ERROR_CODE messageReceived(DcpResponse* response);
420
421     void addStats(ADD_STAT add_stat, const void *c);
422
423 private:
424
425     ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
426
427     ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
428
429     void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
430
431     void processMarker(SnapshotMarker* marker);
432
433     void processSetVBucketState(SetVBucketState* state);
434
435     bool transitionState(stream_state_t newState);
436
437     uint32_t clearBuffer();
438
439     uint32_t setDead_UNLOCKED(end_stream_status_t status,
440                               LockHolder *slh);
441
442     const char* getEndStreamStatusStr(end_stream_status_t status);
443
444     EventuallyPersistentEngine* engine;
445     dcp_consumer_t consumer;
446     AtomicValue<uint64_t> last_seqno;
447
448     AtomicValue<uint64_t> cur_snapshot_start;
449     AtomicValue<uint64_t> cur_snapshot_end;
450     AtomicValue<snapshot_type_t> cur_snapshot_type;
451     bool cur_snapshot_ack;
452
453     struct Buffer {
454         Buffer() : bytes(0), items(0) {}
455         size_t bytes;
456         size_t items;
457         Mutex bufMutex;
458         std::queue<DcpResponse*> messages;
459     } buffer;
460 };
461
462 typedef RCPtr<PassiveStream> passive_stream_t;
463
464 #endif  // SRC_DCP_STREAM_H_