MB-19253: Fix race in void ExecutorPool::doWorkerStat
[ep-engine.git] / src / dcp-stream.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_DCP_STREAM_H_
19 #define SRC_DCP_STREAM_H_ 1
20
21 #include "config.h"
22
23 #include "ep_engine.h"
24
25 #include <queue>
26
27 class EventuallyPersistentEngine;
28 class MutationResponse;
29 class SetVBucketState;
30 class SnapshotMarker;
31 class DcpResponse;
32
33 class DcpConsumer;
34 typedef SingleThreadedRCPtr<DcpConsumer> dcp_consumer_t;
35
36 class DcpProducer;
37 typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
38
/**
 * States a DCP stream can be in.
 *
 * Stream::isActive() reports every state other than STREAM_DEAD as active.
 * The enumerators take the implicit values 0..6 in declaration order.
 */
enum stream_state_t {
    STREAM_PENDING,
    STREAM_BACKFILLING,
    STREAM_IN_MEMORY,
    STREAM_TAKEOVER_SEND,
    STREAM_TAKEOVER_WAIT,
    STREAM_READING,
    STREAM_DEAD
};
48
/**
 * Reason codes describing why a stream came to an end.
 * The enumerators take the implicit values 0..3 in declaration order.
 */
enum end_stream_status_t {
    //! Every item was streamed; the stream finished normally
    END_STREAM_OK,
    //! Ended early in response to a close stream message
    END_STREAM_CLOSED,
    //! Ended early because the state of the vbucket changed
    END_STREAM_STATE,
    //! Ended early because the connection was disconnected
    END_STREAM_DISCONNECTED
};
59
/**
 * Kind of stream, as reported by Stream::getType().
 * NOTE(review): presumably one value per concrete class (ActiveStream,
 * NotifierStream, PassiveStream) -- confirm against the constructors.
 */
enum stream_type_t {
    STREAM_ACTIVE,
    STREAM_NOTIFIER,
    STREAM_PASSIVE
};
65
/**
 * Snapshot classification; PassiveStream records one of these in
 * cur_snapshot_type.  Note that the enumerators are plain, unscoped names
 * in the enclosing namespace.
 */
enum snapshot_type_t {
    none,
    disk,
    memory
};
71
/**
 * Result codes returned by PassiveStream::processBufferedMessages().
 * The enumerators take the implicit values 0..2 in declaration order.
 */
enum process_items_error_t {
    all_processed,
    more_to_process,
    cannot_process
};
77
78 class Stream : public RCValue {
79 public:
80     Stream(const std::string &name, uint32_t flags, uint32_t opaque,
81            uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
82            uint64_t vb_uuid, uint64_t snap_start_seqno,
83            uint64_t snap_end_seqno);
84
85     virtual ~Stream() {}
86
87     uint32_t getFlags() { return flags_; }
88
89     uint16_t getVBucket() { return vb_; }
90
91     uint32_t getOpaque() { return opaque_; }
92
93     uint64_t getStartSeqno() { return start_seqno_; }
94
95     uint64_t getEndSeqno() { return end_seqno_; }
96
97     uint64_t getVBucketUUID() { return vb_uuid_; }
98
99     uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
100
101     uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
102
103     stream_state_t getState() { return state_; }
104
105     stream_type_t getType() { return type_; }
106
107     virtual void addStats(ADD_STAT add_stat, const void *c);
108
109     virtual DcpResponse* next() = 0;
110
111     virtual uint32_t setDead(end_stream_status_t status) = 0;
112
113     virtual void notifySeqnoAvailable(uint64_t seqno) {}
114
115     bool isActive() {
116         return state_ != STREAM_DEAD;
117     }
118
119     void clear() {
120         LockHolder lh(streamMutex);
121         clear_UNLOCKED();
122     }
123
124 protected:
125
126     const char* stateName(stream_state_t st) const;
127
128     void clear_UNLOCKED();
129
130     /* To be called after getting streamMutex lock */
131     void pushToReadyQ(DcpResponse* resp);
132
133     /* To be called after getting streamMutex lock */
134     void popFromReadyQ(void);
135
136     uint64_t getReadyQueueMemory(void);
137
138     const std::string &name_;
139     uint32_t flags_;
140     uint32_t opaque_;
141     uint16_t vb_;
142     uint64_t start_seqno_;
143     uint64_t end_seqno_;
144     uint64_t vb_uuid_;
145     uint64_t snap_start_seqno_;
146     uint64_t snap_end_seqno_;
147     stream_state_t state_;
148     stream_type_t type_;
149
150     AtomicValue<bool> itemsReady;
151     Mutex streamMutex;
152     std::queue<DcpResponse*> readyQ;
153
154     const static uint64_t dcpMaxSeqno;
155
156 private:
157     /* readyQueueMemory tracks the memory occupied by elements
158      * in the readyQ.  It is an atomic because otherwise
159        getReadyQueueMemory would need to acquire streamMutex.
160      */
161     AtomicValue <uint64_t> readyQueueMemory;
162 };
163
164 typedef RCPtr<Stream> stream_t;
165
166 class ActiveStreamCheckpointProcessorTask;
167
168 class ActiveStream : public Stream {
169 public:
170     ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
171                  const std::string &name, uint32_t flags, uint32_t opaque,
172                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
173                  uint64_t vb_uuid, uint64_t snap_start_seqno,
174                  uint64_t snap_end_seqno);
175
176     ~ActiveStream() {
177         LockHolder lh(streamMutex);
178         transitionState(STREAM_DEAD);
179         clear_UNLOCKED();
180     }
181
182     DcpResponse* next();
183
184     void setActive() {
185         LockHolder lh(streamMutex);
186         if (state_ == STREAM_PENDING) {
187             transitionState(STREAM_BACKFILLING);
188         }
189     }
190
191     uint32_t setDead(end_stream_status_t status);
192
193     void notifySeqnoAvailable(uint64_t seqno);
194
195     void snapshotMarkerAckReceived();
196
197     void setVBucketStateAckRecieved();
198
199     void incrBackfillRemaining(size_t by) {
200         backfillRemaining.fetch_add(by);
201     }
202
203     void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
204
205     void backfillReceived(Item* itm);
206
207     void completeBackfill();
208
209     void addStats(ADD_STAT add_stat, const void *c);
210
211     void addTakeoverStats(ADD_STAT add_stat, const void *c);
212
213     size_t getItemsRemaining();
214
215     const char* logHeader();
216
217     // Runs on ActiveStreamCheckpointProcessorTask
218     void nextCheckpointItemTask();
219
220 protected:
221     // Returns the outstanding items for the stream's checkpoint cursor.
222     void getOutstandingItems(RCPtr<VBucket> &vb, std::deque<queued_item> &items);
223
224     // Given a set of queued items, create mutation responses for each item,
225     // and pass onto the producer associated with this stream.
226     void processItems(std::deque<queued_item>& items);
227
228     bool nextCheckpointItem();
229
230 private:
231
232     void transitionState(stream_state_t newState);
233
234     DcpResponse* backfillPhase();
235
236     DcpResponse* inMemoryPhase();
237
238     DcpResponse* takeoverSendPhase();
239
240     DcpResponse* takeoverWaitPhase();
241
242     DcpResponse* deadPhase();
243
244     DcpResponse* nextQueuedItem();
245
246     void snapshot(std::deque<MutationResponse*>& snapshot, bool mark);
247
248     void endStream(end_stream_status_t reason);
249
250     void scheduleBackfill();
251
252     const char* getEndStreamStatusStr(end_stream_status_t status);
253
254     bool isCurrentSnapshotCompleted() const;
255
256     //! The last sequence number queued from disk or memory
257     AtomicValue<uint64_t> lastReadSeqno;
258     //! The last sequence number sent to the network layer
259     AtomicValue<uint64_t> lastSentSeqno;
260     //! The last known seqno pointed to by the checkpoint cursor
261     uint64_t curChkSeqno;
262     //! The current vbucket state to send in the takeover stream
263     vbucket_state_t takeoverState;
264     /* backfillRemaining is a stat recording the amount of
265      * items remaining to be read from disk.  It is an atomic
266      * because otherwise the function incrBackfillRemaining
267      * must acquire the streamMutex lock.
268      */
269     AtomicValue <size_t> backfillRemaining;
270
271     //! The amount of items that have been read from disk
272     size_t itemsFromBackfill;
273     //! The amount of items that have been read from memory
274     AtomicValue<size_t> itemsFromMemory;
275     //! Whether ot not this is the first snapshot marker sent
276     bool firstMarkerSent;
277
278     AtomicValue<int> waitForSnapshot;
279
280     EventuallyPersistentEngine* engine;
281     dcp_producer_t producer;
282     AtomicValue<bool> isBackfillTaskRunning;
283
284     //! Last snapshot end seqno sent to the DCP client
285     uint64_t lastSentSnapEndSeqno;
286
287     /* Flag used by checkpointCreatorTask that is set before all items are
288        extracted for given checkpoint cursor, and is unset after all retrieved
289        items are added to the readyQ */
290     AtomicValue<bool> chkptItemsExtractionInProgress;
291
292 };
293
294
295 class ActiveStreamCheckpointProcessorTask : public GlobalTask {
296 public:
297     ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
298       : GlobalTask(&e, Priority::ActiveStreamCheckpointProcessor, INT_MAX, false),
299       notified(false),
300       iterationsBeforeYield(e.getConfiguration()
301                             .getDcpProducerSnapshotMarkerYieldLimit()) { }
302
303     std::string getDescription() {
304         std::string rv("Process checkpoint(s) for DCP producer");
305         return rv;
306     }
307
308     bool run();
309     void schedule(stream_t stream);
310     void wakeup();
311     void clearQueues();
312
313 private:
314
315     stream_t queuePop() {
316         stream_t rval;
317         LockHolder lh(workQueueLock);
318         if (!queue.empty()) {
319             rval = queue.front();
320             queue.pop();
321             queuedVbuckets.erase(rval->getVBucket());
322         }
323         return rval;
324     }
325
326     bool queueEmpty() {
327         LockHolder lh(workQueueLock);
328         return queue.empty();
329     }
330
331     void pushUnique(stream_t stream) {
332         LockHolder lh(workQueueLock);
333         if (queuedVbuckets.count(stream->getVBucket()) == 0) {
334             queue.push(stream);
335             queuedVbuckets.insert(stream->getVBucket());
336         }
337     }
338
339     Mutex workQueueLock;
340
341     /**
342      * Maintain a queue of unique stream_t
343      * There's no need to have the same stream in the queue more than once
344      */
345     std::queue<stream_t> queue;
346     std::set<uint16_t> queuedVbuckets;
347
348     AtomicValue<bool> notified;
349     size_t iterationsBeforeYield;
350 };
351
352 class NotifierStream : public Stream {
353 public:
354     NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
355                    const std::string &name, uint32_t flags, uint32_t opaque,
356                    uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
357                    uint64_t vb_uuid, uint64_t snap_start_seqno,
358                    uint64_t snap_end_seqno);
359
360     ~NotifierStream() {
361         LockHolder lh(streamMutex);
362         transitionState(STREAM_DEAD);
363         clear_UNLOCKED();
364     }
365
366     DcpResponse* next();
367
368     uint32_t setDead(end_stream_status_t status);
369
370     void notifySeqnoAvailable(uint64_t seqno);
371
372 private:
373
374     void transitionState(stream_state_t newState);
375
376     dcp_producer_t producer;
377 };
378
379 class PassiveStream : public Stream {
380 public:
381     PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
382                   const std::string &name, uint32_t flags, uint32_t opaque,
383                   uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
384                   uint64_t vb_uuid, uint64_t snap_start_seqno,
385                   uint64_t snap_end_seqno, uint64_t vb_high_seqno);
386
387     ~PassiveStream();
388
389     process_items_error_t processBufferedMessages(uint32_t &processed_bytes);
390
391     DcpResponse* next();
392
393     uint32_t setDead(end_stream_status_t status);
394
395     void acceptStream(uint16_t status, uint32_t add_opaque);
396
397     void reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque,
398                          uint64_t start_seqno);
399
400     ENGINE_ERROR_CODE messageReceived(DcpResponse* response);
401
402     void addStats(ADD_STAT add_stat, const void *c);
403
404     static const size_t batchSize;
405
406 private:
407
408     ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
409
410     ENGINE_ERROR_CODE commitMutation(MutationResponse* mutation,
411                                      bool backfillPhase);
412
413     ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
414
415     ENGINE_ERROR_CODE commitDeletion(MutationResponse* deletion,
416                                      bool backfillPhase);
417
418     void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
419
420     void processMarker(SnapshotMarker* marker);
421
422     void processSetVBucketState(SetVBucketState* state);
423
424     void transitionState(stream_state_t newState);
425
426     uint32_t clearBuffer();
427
428     const char* getEndStreamStatusStr(end_stream_status_t status);
429
430     EventuallyPersistentEngine* engine;
431     dcp_consumer_t consumer;
432     uint64_t last_seqno;
433
434     uint64_t cur_snapshot_start;
435     uint64_t cur_snapshot_end;
436     snapshot_type_t cur_snapshot_type;
437     bool cur_snapshot_ack;
438     bool saveSnapshot;
439
440     struct Buffer {
441         Buffer() : bytes(0), items(0) {}
442         size_t bytes;
443         size_t items;
444         Mutex bufMutex;
445         std::queue<DcpResponse*> messages;
446     } buffer;
447 };
448
449 typedef RCPtr<PassiveStream> passive_stream_t;
450
451 #endif  // SRC_DCP_STREAM_H_