1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2013 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef SRC_DCP_CONSUMER_H_
19 #define SRC_DCP_CONSUMER_H_ 1
23 #include "tapconnection.h"
24 #include "dcp-stream.h"
25 typedef RCPtr<PassiveStream> passive_stream_t;
29 class DcpConsumer : public Consumer, public Notifiable {
30 typedef std::map<uint32_t, std::pair<uint32_t, uint16_t> > opaque_map;
33 DcpConsumer(EventuallyPersistentEngine &e, const void *cookie,
34 const std::string &n);
38 ENGINE_ERROR_CODE addStream(uint32_t opaque, uint16_t vbucket,
41 ENGINE_ERROR_CODE closeStream(uint32_t opaque, uint16_t vbucket);
43 ENGINE_ERROR_CODE streamEnd(uint32_t opaque, uint16_t vbucket,
46 ENGINE_ERROR_CODE mutation(uint32_t opaque, const void* key, uint16_t nkey,
47 const void* value, uint32_t nvalue, uint64_t cas,
48 uint16_t vbucket, uint32_t flags,
49 uint8_t datatype, uint32_t locktime,
50 uint64_t bySeqno, uint64_t revSeqno,
51 uint32_t exptime, uint8_t nru, const void* meta,
54 ENGINE_ERROR_CODE deletion(uint32_t opaque, const void* key, uint16_t nkey,
55 uint64_t cas, uint16_t vbucket, uint64_t bySeqno,
56 uint64_t revSeqno, const void* meta,
59 ENGINE_ERROR_CODE expiration(uint32_t opaque, const void* key,
60 uint16_t nkey, uint64_t cas, uint16_t vbucket,
61 uint64_t bySeqno, uint64_t revSeqno,
62 const void* meta, uint16_t nmeta);
64 ENGINE_ERROR_CODE snapshotMarker(uint32_t opaque,
70 ENGINE_ERROR_CODE noop(uint32_t opaque);
72 ENGINE_ERROR_CODE flush(uint32_t opaque, uint16_t vbucket);
74 ENGINE_ERROR_CODE setVBucketState(uint32_t opaque, uint16_t vbucket,
75 vbucket_state_t state);
77 ENGINE_ERROR_CODE step(struct dcp_message_producers* producers);
79 ENGINE_ERROR_CODE handleResponse(protocol_binary_response_header *resp);
81 bool doRollback(uint32_t opaque, uint16_t vbid, uint64_t rollbackSeqno);
83 void addStats(ADD_STAT add_stat, const void *c);
85 void aggregateQueueStats(ConnCounter* aggregator);
87 void notifyStreamReady(uint16_t vbucket);
89 void closeAllStreams();
91 process_items_error_t processBufferedItems();
93 bool isStreamPresent(uint16_t vbucket);
97 DcpResponse* getNextItem();
100 * Check if the provided opaque id is one of the
101 * current open "session" id's
103 * @param opaque the provided opaque
104 * @param vbucket the provided vbucket
105 * @return true if the session is open, false otherwise
107 bool isValidOpaque(uint32_t opaque, uint16_t vbucket);
109 void streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body,
112 ENGINE_ERROR_CODE handleNoop(struct dcp_message_producers* producers);
114 ENGINE_ERROR_CODE handleFlowCtl(struct dcp_message_producers* producers);
116 inline bool isBufferSufficientlyDrained(uint32_t ackable_bytes);
118 uint64_t opaqueCounter;
119 size_t processTaskId;
120 AtomicValue<bool> itemsToProcess;
123 std::list<uint16_t> ready;
125 passive_stream_t* streams;
126 opaque_map opaqueMap_;
128 rel_time_t lastNoopTime;
130 uint32_t noopInterval;
132 bool sendNoopInterval;
135 FlowControl() : enabled(true), pendingControl(true), bufferSize(0),
136 maxUnackedBytes(0), lastBufferAck(ep_current_time()),
137 freedBytes(0), ackedBytes(0) {}
141 uint32_t maxUnackedBytes;
142 rel_time_t lastBufferAck;
143 AtomicValue<uint32_t> freedBytes;
144 AtomicValue<uint64_t> ackedBytes;
149 * Task that orchestrates rollback on Consumer,
150 * runs in background.
152 class RollbackTask : public GlobalTask {
154 RollbackTask(EventuallyPersistentEngine* e,
155 uint32_t opaque_, uint16_t vbid_,
156 uint64_t rollbackSeqno_, dcp_consumer_t conn,
158 GlobalTask(e, p, 0, false), engine(e),
159 opaque(opaque_), vbid(vbid_), rollbackSeqno(rollbackSeqno_),
162 std::string getDescription() {
163 return std::string("Running rollback task for vbucket %d", vbid);
169 EventuallyPersistentEngine *engine;
172 uint64_t rollbackSeqno;
176 #endif // SRC_DCP_CONSUMER_H_