MB-19278: Fix lock-order inversion on ActiveStream::streamMutex
[ep-engine.git] / src / dcp-stream.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_DCP_STREAM_H_
19 #define SRC_DCP_STREAM_H_ 1
20
21 #include "config.h"
22
23 #include "ep_engine.h"
24
25 #include <queue>
26
27 class EventuallyPersistentEngine;
28 class MutationResponse;
29 class SetVBucketState;
30 class SnapshotMarker;
31 class DcpResponse;
32
33 class DcpConsumer;
34 typedef SingleThreadedRCPtr<DcpConsumer> dcp_consumer_t;
35
36 class DcpProducer;
37 typedef SingleThreadedRCPtr<DcpProducer> dcp_producer_t;
38
/*
 * Lifecycle states of a DCP stream. Stream::isActive() reports every state
 * except STREAM_DEAD as active.
 */
typedef enum {
    STREAM_PENDING = 0,
    STREAM_BACKFILLING = 1,
    STREAM_IN_MEMORY = 2,
    STREAM_TAKEOVER_SEND = 3,
    STREAM_TAKEOVER_WAIT = 4,
    STREAM_READING = 5,
    STREAM_DEAD = 6
} stream_state_t;
48
/* Why a stream ended. */
typedef enum {
    //! Normal end: all items were streamed
    END_STREAM_OK = 0,
    //! Early end: a close-stream message was received
    END_STREAM_CLOSED = 1,
    //! Early end: the vbucket changed state
    END_STREAM_STATE = 2,
    //! Early end: the connection was disconnected
    END_STREAM_DISCONNECTED = 3
} end_stream_status_t;
59
/* Kind of stream, as reported by Stream::getType(). */
typedef enum {
    STREAM_ACTIVE = 0,
    STREAM_NOTIFIER = 1,
    STREAM_PASSIVE = 2
} stream_type_t;
65
/* Snapshot source kind; used e.g. for PassiveStream::cur_snapshot_type. */
typedef enum {
    none = 0,
    disk = 1,
    memory = 2
} snapshot_type_t;
71
/* Result type of PassiveStream::processBufferedMessages(). */
typedef enum {
    all_processed = 0,
    more_to_process = 1,
    cannot_process = 2
} process_items_error_t;
77
78 class Stream : public RCValue {
79 public:
80     Stream(const std::string &name, uint32_t flags, uint32_t opaque,
81            uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
82            uint64_t vb_uuid, uint64_t snap_start_seqno,
83            uint64_t snap_end_seqno);
84
85     virtual ~Stream() {}
86
87     uint32_t getFlags() { return flags_; }
88
89     uint16_t getVBucket() { return vb_; }
90
91     uint32_t getOpaque() { return opaque_; }
92
93     uint64_t getStartSeqno() { return start_seqno_; }
94
95     uint64_t getEndSeqno() { return end_seqno_; }
96
97     uint64_t getVBucketUUID() { return vb_uuid_; }
98
99     uint64_t getSnapStartSeqno() { return snap_start_seqno_; }
100
101     uint64_t getSnapEndSeqno() { return snap_end_seqno_; }
102
103     stream_state_t getState() { return state_; }
104
105     stream_type_t getType() { return type_; }
106
107     virtual void addStats(ADD_STAT add_stat, const void *c);
108
109     virtual DcpResponse* next() = 0;
110
111     virtual uint32_t setDead(end_stream_status_t status) = 0;
112
113     virtual void notifySeqnoAvailable(uint64_t seqno) {}
114
115     bool isActive() {
116         return state_ != STREAM_DEAD;
117     }
118
119     void clear() {
120         LockHolder lh(streamMutex);
121         clear_UNLOCKED();
122     }
123
124 protected:
125
126     const char* stateName(stream_state_t st) const;
127
128     void clear_UNLOCKED();
129
130     /* To be called after getting streamMutex lock */
131     void pushToReadyQ(DcpResponse* resp);
132
133     /* To be called after getting streamMutex lock */
134     void popFromReadyQ(void);
135
136     uint64_t getReadyQueueMemory(void);
137
138     const std::string &name_;
139     uint32_t flags_;
140     uint32_t opaque_;
141     uint16_t vb_;
142     uint64_t start_seqno_;
143     uint64_t end_seqno_;
144     uint64_t vb_uuid_;
145     uint64_t snap_start_seqno_;
146     uint64_t snap_end_seqno_;
147     AtomicValue<stream_state_t> state_;
148     stream_type_t type_;
149
150     AtomicValue<bool> itemsReady;
151     Mutex streamMutex;
152     std::queue<DcpResponse*> readyQ;
153
154     const static uint64_t dcpMaxSeqno;
155
156 private:
157     /* readyQueueMemory tracks the memory occupied by elements
158      * in the readyQ.  It is an atomic because otherwise
159        getReadyQueueMemory would need to acquire streamMutex.
160      */
161     AtomicValue <uint64_t> readyQueueMemory;
162 };
163
164 typedef RCPtr<Stream> stream_t;
165
166 class ActiveStreamCheckpointProcessorTask;
167
168 class ActiveStream : public Stream {
169 public:
170     ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
171                  const std::string &name, uint32_t flags, uint32_t opaque,
172                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
173                  uint64_t vb_uuid, uint64_t snap_start_seqno,
174                  uint64_t snap_end_seqno);
175
176     ~ActiveStream() {
177         transitionState(STREAM_DEAD);
178         clear_UNLOCKED();
179     }
180
181     DcpResponse* next();
182
183     void setActive() {
184         LockHolder lh(streamMutex);
185         if (state_ == STREAM_PENDING) {
186             transitionState(STREAM_BACKFILLING);
187         }
188     }
189
190     uint32_t setDead(end_stream_status_t status);
191
192     void notifySeqnoAvailable(uint64_t seqno);
193
194     void snapshotMarkerAckReceived();
195
196     void setVBucketStateAckRecieved();
197
198     void incrBackfillRemaining(size_t by) {
199         backfillRemaining.fetch_add(by);
200     }
201
202     void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
203
204     void backfillReceived(Item* itm);
205
206     void completeBackfill();
207
208     void addStats(ADD_STAT add_stat, const void *c);
209
210     void addTakeoverStats(ADD_STAT add_stat, const void *c);
211
212     size_t getItemsRemaining();
213
214     const char* logHeader();
215
216     // Runs on ActiveStreamCheckpointProcessorTask
217     void nextCheckpointItemTask();
218
219 protected:
220     // Returns the outstanding items for the stream's checkpoint cursor.
221     void getOutstandingItems(RCPtr<VBucket> &vb, std::deque<queued_item> &items);
222
223     // Given a set of queued items, create mutation responses for each item,
224     // and pass onto the producer associated with this stream.
225     void processItems(std::deque<queued_item>& items);
226
227     bool nextCheckpointItem();
228
229 private:
230
231     void transitionState(stream_state_t newState);
232
233     DcpResponse* backfillPhase();
234
235     DcpResponse* inMemoryPhase();
236
237     DcpResponse* takeoverSendPhase();
238
239     DcpResponse* takeoverWaitPhase();
240
241     DcpResponse* deadPhase();
242
243     DcpResponse* nextQueuedItem();
244
245     void snapshot(std::deque<MutationResponse*>& snapshot, bool mark);
246
247     void endStream(end_stream_status_t reason);
248
249     void scheduleBackfill();
250
251     const char* getEndStreamStatusStr(end_stream_status_t status);
252
253     bool isCurrentSnapshotCompleted() const;
254
255     //! The last sequence number queued from disk or memory
256     AtomicValue<uint64_t> lastReadSeqno;
257     //! The last sequence number sent to the network layer
258     AtomicValue<uint64_t> lastSentSeqno;
259     //! The last known seqno pointed to by the checkpoint cursor
260     uint64_t curChkSeqno;
261     //! The current vbucket state to send in the takeover stream
262     vbucket_state_t takeoverState;
263     /* backfillRemaining is a stat recording the amount of
264      * items remaining to be read from disk.  It is an atomic
265      * because otherwise the function incrBackfillRemaining
266      * must acquire the streamMutex lock.
267      */
268     AtomicValue <size_t> backfillRemaining;
269
270     //! The amount of items that have been read from disk
271     size_t itemsFromBackfill;
272     //! The amount of items that have been read from memory
273     AtomicValue<size_t> itemsFromMemory;
274     //! Whether ot not this is the first snapshot marker sent
275     bool firstMarkerSent;
276
277     AtomicValue<int> waitForSnapshot;
278
279     EventuallyPersistentEngine* engine;
280     dcp_producer_t producer;
281     AtomicValue<bool> isBackfillTaskRunning;
282
283     //! Last snapshot end seqno sent to the DCP client
284     uint64_t lastSentSnapEndSeqno;
285
286     /* Flag used by checkpointCreatorTask that is set before all items are
287        extracted for given checkpoint cursor, and is unset after all retrieved
288        items are added to the readyQ */
289     AtomicValue<bool> chkptItemsExtractionInProgress;
290
291 };
292
293
294 class ActiveStreamCheckpointProcessorTask : public GlobalTask {
295 public:
296     ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
297       : GlobalTask(&e, Priority::ActiveStreamCheckpointProcessor, INT_MAX, false),
298       notified(false),
299       iterationsBeforeYield(e.getConfiguration()
300                             .getDcpProducerSnapshotMarkerYieldLimit()) { }
301
302     std::string getDescription() {
303         std::string rv("Process checkpoint(s) for DCP producer");
304         return rv;
305     }
306
307     bool run();
308     void schedule(stream_t stream);
309     void wakeup();
310     void clearQueues();
311
312 private:
313
314     stream_t queuePop() {
315         stream_t rval;
316         LockHolder lh(workQueueLock);
317         if (!queue.empty()) {
318             rval = queue.front();
319             queue.pop();
320             queuedVbuckets.erase(rval->getVBucket());
321         }
322         return rval;
323     }
324
325     bool queueEmpty() {
326         LockHolder lh(workQueueLock);
327         return queue.empty();
328     }
329
330     void pushUnique(stream_t stream) {
331         LockHolder lh(workQueueLock);
332         if (queuedVbuckets.count(stream->getVBucket()) == 0) {
333             queue.push(stream);
334             queuedVbuckets.insert(stream->getVBucket());
335         }
336     }
337
338     Mutex workQueueLock;
339
340     /**
341      * Maintain a queue of unique stream_t
342      * There's no need to have the same stream in the queue more than once
343      */
344     std::queue<stream_t> queue;
345     std::set<uint16_t> queuedVbuckets;
346
347     AtomicValue<bool> notified;
348     size_t iterationsBeforeYield;
349 };
350
351 class NotifierStream : public Stream {
352 public:
353     NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t producer,
354                    const std::string &name, uint32_t flags, uint32_t opaque,
355                    uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
356                    uint64_t vb_uuid, uint64_t snap_start_seqno,
357                    uint64_t snap_end_seqno);
358
359     ~NotifierStream() {
360         LockHolder lh(streamMutex);
361         transitionState(STREAM_DEAD);
362         clear_UNLOCKED();
363     }
364
365     DcpResponse* next();
366
367     uint32_t setDead(end_stream_status_t status);
368
369     void notifySeqnoAvailable(uint64_t seqno);
370
371 private:
372
373     void transitionState(stream_state_t newState);
374
375     dcp_producer_t producer;
376 };
377
378 class PassiveStream : public Stream {
379 public:
380     PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t consumer,
381                   const std::string &name, uint32_t flags, uint32_t opaque,
382                   uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
383                   uint64_t vb_uuid, uint64_t snap_start_seqno,
384                   uint64_t snap_end_seqno, uint64_t vb_high_seqno);
385
386     ~PassiveStream();
387
388     process_items_error_t processBufferedMessages(uint32_t &processed_bytes);
389
390     DcpResponse* next();
391
392     uint32_t setDead(end_stream_status_t status);
393
394     void acceptStream(uint16_t status, uint32_t add_opaque);
395
396     void reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque,
397                          uint64_t start_seqno);
398
399     ENGINE_ERROR_CODE messageReceived(DcpResponse* response);
400
401     void addStats(ADD_STAT add_stat, const void *c);
402
403     static const size_t batchSize;
404
405 private:
406
407     ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
408
409     ENGINE_ERROR_CODE commitMutation(MutationResponse* mutation,
410                                      bool backfillPhase);
411
412     ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
413
414     ENGINE_ERROR_CODE commitDeletion(MutationResponse* deletion,
415                                      bool backfillPhase);
416
417     void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
418
419     void processMarker(SnapshotMarker* marker);
420
421     void processSetVBucketState(SetVBucketState* state);
422
423     void transitionState(stream_state_t newState);
424
425     uint32_t clearBuffer();
426
427     const char* getEndStreamStatusStr(end_stream_status_t status);
428
429     EventuallyPersistentEngine* engine;
430     dcp_consumer_t consumer;
431     uint64_t last_seqno;
432
433     uint64_t cur_snapshot_start;
434     uint64_t cur_snapshot_end;
435     snapshot_type_t cur_snapshot_type;
436     bool cur_snapshot_ack;
437     bool saveSnapshot;
438
439     struct Buffer {
440         Buffer() : bytes(0), items(0) {}
441         size_t bytes;
442         size_t items;
443         Mutex bufMutex;
444         std::queue<DcpResponse*> messages;
445     } buffer;
446 };
447
448 typedef RCPtr<PassiveStream> passive_stream_t;
449
450 #endif  // SRC_DCP_STREAM_H_