9c543349a12abd528d754788a443124e0dcc725b
[ep-engine.git] / src / dcp-consumer.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_DCP_CONSUMER_H_
19 #define SRC_DCP_CONSUMER_H_ 1
20
#include "config.h"

#include <sstream>

#include "tapconnection.h"
#include "dcp-stream.h"
25 typedef RCPtr<PassiveStream> passive_stream_t;
26
27 class DcpResponse;
28
29 class DcpConsumer : public Consumer, public Notifiable {
30 typedef std::map<uint32_t, std::pair<uint32_t, uint16_t> > opaque_map;
31 public:
32
33     DcpConsumer(EventuallyPersistentEngine &e, const void *cookie,
34                 const std::string &n);
35
36     ~DcpConsumer();
37
38     ENGINE_ERROR_CODE addStream(uint32_t opaque, uint16_t vbucket,
39                                 uint32_t flags);
40
41     ENGINE_ERROR_CODE closeStream(uint32_t opaque, uint16_t vbucket);
42
43     ENGINE_ERROR_CODE streamEnd(uint32_t opaque, uint16_t vbucket,
44                                 uint32_t flags);
45
46     ENGINE_ERROR_CODE mutation(uint32_t opaque, const void* key, uint16_t nkey,
47                                const void* value, uint32_t nvalue, uint64_t cas,
48                                uint16_t vbucket, uint32_t flags,
49                                uint8_t datatype, uint32_t locktime,
50                                uint64_t bySeqno, uint64_t revSeqno,
51                                uint32_t exptime, uint8_t nru, const void* meta,
52                                uint16_t nmeta);
53
54     ENGINE_ERROR_CODE deletion(uint32_t opaque, const void* key, uint16_t nkey,
55                                uint64_t cas, uint16_t vbucket, uint64_t bySeqno,
56                                uint64_t revSeqno, const void* meta,
57                                uint16_t nmeta);
58
59     ENGINE_ERROR_CODE expiration(uint32_t opaque, const void* key,
60                                  uint16_t nkey, uint64_t cas, uint16_t vbucket,
61                                  uint64_t bySeqno, uint64_t revSeqno,
62                                  const void* meta, uint16_t nmeta);
63
64     ENGINE_ERROR_CODE snapshotMarker(uint32_t opaque,
65                                      uint16_t vbucket,
66                                      uint64_t start_seqno,
67                                      uint64_t end_seqno,
68                                      uint32_t flags);
69
70     ENGINE_ERROR_CODE noop(uint32_t opaque);
71
72     ENGINE_ERROR_CODE flush(uint32_t opaque, uint16_t vbucket);
73
74     ENGINE_ERROR_CODE setVBucketState(uint32_t opaque, uint16_t vbucket,
75                                       vbucket_state_t state);
76
77     ENGINE_ERROR_CODE step(struct dcp_message_producers* producers);
78
79     ENGINE_ERROR_CODE handleResponse(protocol_binary_response_header *resp);
80
81     bool doRollback(uint32_t opaque, uint16_t vbid, uint64_t rollbackSeqno);
82
83     void addStats(ADD_STAT add_stat, const void *c);
84
85     void aggregateQueueStats(ConnCounter* aggregator);
86
87     void notifyStreamReady(uint16_t vbucket);
88
89     void closeAllStreams();
90
91     process_items_error_t processBufferedItems();
92
93     bool isStreamPresent(uint16_t vbucket);
94
95 private:
96
97     DcpResponse* getNextItem();
98
99     /**
100      * Check if the provided opaque id is one of the
101      * current open "session" id's
102      *
103      * @param opaque the provided opaque
104      * @param vbucket the provided vbucket
105      * @return true if the session is open, false otherwise
106      */
107     bool isValidOpaque(uint32_t opaque, uint16_t vbucket);
108
109     void streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body,
110                         uint32_t bodylen);
111
112     ENGINE_ERROR_CODE handleNoop(struct dcp_message_producers* producers);
113
114     ENGINE_ERROR_CODE handleFlowCtl(struct dcp_message_producers* producers);
115
116     inline bool isBufferSufficientlyDrained(uint32_t ackable_bytes);
117
118     uint64_t opaqueCounter;
119     size_t processTaskId;
120     AtomicValue<bool> itemsToProcess;
121
122     Mutex readyMutex;
123     std::list<uint16_t> ready;
124
125     passive_stream_t* streams;
126     opaque_map opaqueMap_;
127
128     rel_time_t lastNoopTime;
129     uint32_t backoffs;
130     uint32_t noopInterval;
131     bool enableNoop;
132     bool sendNoopInterval;
133
134     struct FlowControl {
135         FlowControl() : enabled(true), pendingControl(true), bufferSize(0),
136                         maxUnackedBytes(0), lastBufferAck(ep_current_time()),
137                         freedBytes(0), ackedBytes(0) {}
138         bool enabled;
139         bool pendingControl;
140         uint32_t bufferSize;
141         uint32_t maxUnackedBytes;
142         rel_time_t lastBufferAck;
143         AtomicValue<uint32_t> freedBytes;
144         AtomicValue<uint64_t> ackedBytes;
145     } flowControl;
146 };
147
148 /*
149  * Task that orchestrates rollback on Consumer,
150  * runs in background.
151  */
152 class RollbackTask : public GlobalTask {
153 public:
154     RollbackTask(EventuallyPersistentEngine* e,
155                  uint32_t opaque_, uint16_t vbid_,
156                  uint64_t rollbackSeqno_, dcp_consumer_t conn,
157                  const Priority &p):
158         GlobalTask(e, p, 0, false), engine(e),
159         opaque(opaque_), vbid(vbid_), rollbackSeqno(rollbackSeqno_),
160         cons(conn) { }
161
162     std::string getDescription() {
163         return std::string("Running rollback task for vbucket %d", vbid);
164     }
165
166     bool run();
167
168 private:
169     EventuallyPersistentEngine *engine;
170     uint32_t opaque;
171     uint16_t vbid;
172     uint64_t rollbackSeqno;
173     dcp_consumer_t cons;
174 };
175
176 #endif  // SRC_DCP_CONSUMER_H_