1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2010 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef SRC_TAPCONNMAP_H_
19 #define SRC_TAPCONNMAP_H_ 1
32 #include "syncobject.h"
33 #include "atomicqueue.h"
34 #include "dcp-consumer.h"
35 #include "dcp-producer.h"
37 // Forward declaration
// Forward declaration only: this header refers to the engine by reference.
42 class EventuallyPersistentEngine;
// connection_t is the SingleThreadedRCPtr<ConnHandler> handle type stored in
// the connection containers (map_, all, vbConns) declared later in this header.
44 typedef SingleThreadedRCPtr<ConnHandler> connection_t;
46 * Base class for operations performed on tap connections.
48 * @see TapConnMap::performTapOp
// Virtual destructor with an empty body, so concrete operations can be
// destroyed through a TapOperation pointer or reference.
53 virtual ~TapOperation() {}
// Pure virtual hook that each concrete operation implements.
//   tc  - the TapProducer the operation is applied to
//   arg - operation-specific argument of the template type V
54 virtual void perform(TapProducer *tc, V arg) = 0;
58 * Indicate the tap operation is complete.
// Concrete TapOperation whose argument type is void*.
60 class CompleteBackfillTapOperation : public TapOperation<void*> {
// Declaration only; matches the base-class perform() signature with V = void*.
62 void perform(TapProducer *tc, void* arg);
66 * Indicate that we are going to schedule a tap disk backfill for a given vbucket.
// Concrete TapOperation whose argument type is void*.
68 class ScheduleDiskBackfillTapOperation : public TapOperation<void*> {
// Declaration only; matches the base-class perform() signature with V = void*.
70 void perform(TapProducer *tc, void* arg);
74 * Indicate that the tap disk backfill stream is complete for a given vbucket.
// Concrete TapOperation whose argument type is void*.
76 class CompleteDiskBackfillTapOperation : public TapOperation<void*> {
// Declaration only; matches the base-class perform() signature with V = void*.
78 void perform(TapProducer *tc, void* arg);
82 * Complete a bg fetch job and give the item to the given tap connection.
// Concrete TapOperation whose argument type is Item*.
84 class CompletedBGFetchTapOperation : public TapOperation<Item*> {
// Stores its three arguments in connToken, vbid and implicitEnqueue
// respectively; ie defaults to false when the caller omits it.
86 CompletedBGFetchTapOperation(hrtime_t token, uint16_t vb, bool ie=false) :
87 connToken(token), vbid(vb), implicitEnqueue(ie) {}
// Declaration only; matches the base-class perform() signature with V = Item*.
89 void perform(TapProducer *tc, Item* arg);
// Holds session statistics as a string -> string map together with a
// normalShutdown flag. TapConnMap keeps one instance (prevSessionStats) and
// forwards TAP producer names to the two name-based members below.
96 class TAPSessionStats {
// A default-constructed object starts with normalShutdown set to true.
98 TAPSessionStats() : normalShutdown(true) {}
// Const query keyed by name; declaration only.
100 bool wasReplicationCompleted(const std::string &name) const;
// Mutating operation keyed by name; declaration only.
102 void clearStats(const std::string &name);
// Raw statistics storage: both keys and values are std::string.
105 std::map<std::string, std::string> stats;
110 * Connection notifier type.
// Enumerators of conn_notifier_type, the tag accepted by
// ConnMap::initialize() and by the ConnNotifier constructor.
113 TAP_CONN_NOTIFIER, //!< TAP connection notifier
114 DCP_CONN_NOTIFIER //!< DCP connection notifier
115 } conn_notifier_type;
118 * A collection of tap or dcp connections.
// Takes the owning engine by reference.
// NOTE(review): presumably bound to the `engine` reference member - confirm
// in the implementation file.
122 ConnMap(EventuallyPersistentEngine &theEngine);
// Takes the notifier type (TAP or DCP) to use.
// NOTE(review): presumably creates connNotifier_ - confirm in the
// implementation file.
125 void initialize(conn_notifier_type ntype);
// Declared without `virtual`; TapConnMap declares a member with the same
// parameter list that returns TapConsumer*.
127 Consumer *newConsumer(const void* c);
130 * Disconnect a connection by its cookie.
// Pure virtual: TapConnMap and DcpConnMap each declare their own disconnect().
132 virtual void disconnect(const void *cookie) = 0;
135 * Call a function on each connection.
// Fun is the callable type applied to the connections.
137 template <typename Fun>
// Acquires connsLock through a LockHolder before the connections are visited.
139 LockHolder lh(connsLock);
144 * Call a function on each connection, but without the connsLock held.
145 * Read-Copy-Update style: the {all} list is copied under connsLock and then
146 * the function is applied against the copy without the lock held.
// Applies f to a private copy of the connection list instead of to `all`.
// Const member: it can still lock connsLock because that mutex is declared
// mutable.
148 template <typename Fun>
149 void eachRCU(Fun f) const {
// Local snapshot; it holds connection_t handles, not the ConnHandler objects.
150 std::list<connection_t> currentConnections;
// The copy of `all` is taken while connsLock is held.
// NOTE(review): for f to run unlocked, lh's scope must end before the
// for_each below - confirm.
152 LockHolder lh(connsLock);
153 currentConnections = all;
// f is invoked once per element of the snapshot, in list order.
155 std::for_each(currentConnections.begin(), currentConnections.end(), f);
159 * Call a function on each connection *without* a lock.
// Applies f to every element of `all` in list order. No lock is taken here.
// NOTE(review): callers are presumably expected to hold connsLock already,
// as the _UNLOCKED suffix suggests - confirm at call sites.
161 template <typename Fun>
162 void each_UNLOCKED(Fun f) {
163 std::for_each(all.begin(), all.end(), f);
167 * Return the number of connections for which this predicate is true.
// Locking wrapper: acquires connsLock, then delegates to count_if_UNLOCKED
// and returns its result.
169 template <typename Fun>
170 size_t count_if(Fun f) {
171 LockHolder lh(connsLock);
172 return count_if_UNLOCKED(f);
176 * Return the number of connections for which this predicate is
177 * true *without* a lock.
// Counts the elements of `all` for which predicate f returns true. No lock is
// taken here.
179 template <typename Fun>
180 size_t count_if_UNLOCKED(Fun f) {
// std::count_if returns a signed difference type; it is cast to size_t to
// match the declared return type.
181 return static_cast<size_t>(std::count_if(all.begin(), all.end(), f));
185 * Purge dead connections or identify paused connections that should send
186 * NOOP messages to their destinations.
// Pure virtual: TapConnMap and DcpConnMap each declare manageConnections().
188 virtual void manageConnections() = 0;
// Name-based lookup returning a connection_t handle.
// NOTE(review): presumably takes connsLock and defers to
// findByName_UNLOCKED - confirm in the implementation file.
190 connection_t findByName(const std::string &name);
// Pure virtual: TapConnMap and DcpConnMap each declare
// shutdownAllConnections().
192 virtual void shutdownAllConnections() = 0;
// Takes the connection handle and the list of vbucket ids to associate with it.
194 void updateVBConnections(connection_t &conn,
195 const std::vector<uint16_t> &vbuckets);
// Virtual with a base declaration; DcpConnMap declares a member with the same
// signature.
197 virtual void removeVBConnections(connection_t &conn);
// NOTE(review): vbid is int16_t in the next three members, while the other
// vbucket ids in this header are uint16_t - confirm the signed type is
// intended.
199 void addVBConnByVBId(connection_t &conn, int16_t vbid);
201 void removeVBConnByVBId_UNLOCKED(connection_t &conn, int16_t vbid);
203 void removeVBConnByVBId(connection_t &conn, int16_t vbid);
206 * Notify a given paused Producer.
208 * @param conn Producer to be notified
209 * @param schedule true if a notification event is pushed into a queue.
210 * Otherwise, directly notify the paused connection.
// conn is taken by value (a copy of the connection_t handle); schedule
// defaults to false.
212 void notifyPausedConnection(connection_t conn, bool schedule = false);
// Declarations only.
// NOTE(review): presumably these operate on the pendingNotifications
// queue - confirm in the implementation file.
214 void notifyAllPausedConnections();
215 bool notificationQueueEmpty();
// Non-const accessor returning a reference to an EventuallyPersistentEngine.
217 EventuallyPersistentEngine& getEngine() {
// Name-based lookup that takes no lock itself; TapConnMap::performOp calls it
// after acquiring connsLock.
223 connection_t findByName_UNLOCKED(const std::string &name);
225 // Synchronises notifying and releasing connections.
226 // Guards modifications to connection_t objects in {map_} / {all}.
227 // See also: {connsLock}
230 // Synchronises access to the {map_} and {all} members, i.e. adding or
231 // removing connections.
232 // Actual modification of the underlying
233 // ConnHandler objects is guarded by {releaseLock}.
// Declared mutable so that const members such as eachRCU() can lock it.
234 mutable Mutex connsLock;
// Connections keyed by an opaque const void* pointer.
// NOTE(review): presumably the connection cookie, as in
// disconnect(const void *cookie) - confirm.
236 std::map<const void*, connection_t> map_;
// Flat list of connection handles; the each*/count_if* templates iterate it.
237 std::list<connection_t> all;
// Raw pointer to SpinLock storage.
// NOTE(review): ownership and element count are not visible in this header;
// presumably vbConnLockNum entries - confirm.
239 SpinLock *vbConnLocks;
// One list of connection handles per vector slot.
// NOTE(review): presumably indexed by vbucket id - confirm.
240 std::vector<std::list<connection_t> > vbConns;
242 /* Handle to the engine who owns us */
243 EventuallyPersistentEngine &engine;
// Queue of connection handles awaiting notification.
245 AtomicQueue<connection_t> pendingNotifications;
// Raw pointer to the notifier.
// NOTE(review): lifetime management is not visible in this header - confirm.
246 ConnNotifier *connNotifier_;
// Class-wide (static) value; the definition lives outside this header.
248 static size_t vbConnLockNum;
252 * Connection notifier that wakes up paused connections.
// Records the notifier type and the ConnMap it serves. Starts with task set
// to 0 and pendingNotification set to false.
256 ConnNotifier(conn_notifier_type ntype, ConnMap &cm)
257 : notifier_type(ntype), connMap(cm), task(0),
258 pendingNotification(false) { }
// Declarations only; notifyConnections() reports a bool result.
266 void notifyMutationEvent();
268 bool notifyConnections();
// Returns the type tag supplied at construction.
270 conn_notifier_type getNotifierType() const {
271 return notifier_type;
// Class-wide constant; its value is defined outside this header.
275 static const double DEFAULT_MIN_STIME;
// Type tag set once in the constructor.
277 conn_notifier_type notifier_type;
// Atomic wrappers, initialised in the constructor to 0 and false respectively.
279 AtomicValue<size_t> task;
280 AtomicValue<bool> pendingNotification;
// ConnMap specialisation for TAP connections. It adds TAP producer/consumer
// creation, backfill bookkeeping, previous-session statistics and the NOOP
// interval setting.
283 class TapConnMap : public ConnMap {
// Takes the owning engine by reference, like the ConnMap constructor.
287 TapConnMap(EventuallyPersistentEngine &theEngine);
290 * Find or build a tap connection for the given cookie and with
// Returns a raw TapProducer pointer; vbuckets and lastCheckpointIds are taken
// by const reference.
293 TapProducer *newProducer(const void* cookie,
294 const std::string &name,
296 uint64_t backfillAge,
298 const std::vector<uint16_t> &vbuckets,
299 const std::map<uint16_t, uint64_t> &lastCheckpointIds);
303 * Create a new consumer and add it in the list of TapConnections
304 * @param e the engine
305 * @param c the cookie representing the client
306 * @return Pointer to the new tap connection
// Same parameter list as ConnMap::newConsumer(const void*), which is declared
// without `virtual` in this header. This member therefore hides the base one
// rather than overriding it.
// NOTE(review): confirm that is intended.
308 TapConsumer *newConsumer(const void* c);
// Matches ConnMap's pure virtual manageConnections().
310 void manageConnections();
313 * Notify the paused connections that are responsible for replicating
315 * @param vbid vbucket id
// Takes the vbucket id whose connections should be notified.
317 void notifyVBConnections(uint16_t vbid);
320 * Set some backfilled events for a named conn.
// q is passed as a raw pointer to a list of queued_item.
// NOTE(review): ownership of *q after the call is not visible here - confirm.
322 bool setEvents(const std::string &name, std::list<queued_item> *q);
324 void resetReplicaChain();
327 * Get the size of the named backfill queue.
329 * @return the size, or -1 if we can't find the queue
// ssize_t (signed) return type accommodates the documented -1 result.
331 ssize_t backfillQueueDepth(const std::string &name);
333 void incrBackfillRemaining(const std::string &name,
334 size_t num_backfill_items);
// The next two match ConnMap's pure virtual shutdownAllConnections() and
// disconnect(const void*).
336 void shutdownAllConnections();
338 void disconnect(const void *cookie);
340 void scheduleBackfill(const std::set<uint16_t> &backfillVBuckets);
// NOTE(review): name is a non-const reference here, unlike the
// const std::string& used by the other name-based members - confirm whether
// it is modified.
342 bool isBackfillCompleted(std::string &name);
345 * Add an event to all tap connections telling them to flush their
// Takes no arguments and returns nothing.
348 void addFlushEvent();
351 * Change the vbucket filter for a given TAP producer
352 * @param name TAP producer name
353 * @param vbuckets a new vbucket filter
354 * @param checkpoints last closed checkpoint ids for a new vbucket filter
355 * @return true if the TAP producer's vbucket filter is changed successfully
// checkpoints maps a uint16_t key to a uint64_t value; per the @param text the
// values are last closed checkpoint ids.
357 bool changeVBucketFilter(const std::string &name,
358 const std::vector<uint16_t> &vbuckets,
359 const std::map<uint16_t, uint64_t> &checkpoints);
362 * Load TAP-related stats from the previous engine sessions
364 * @param session_stats all the stats from the previous engine sessions
// Accepts the same string -> string map type that TAPSessionStats stores.
// NOTE(review): presumably populates prevSessionStats - confirm.
366 void loadPrevSessionStats(const std::map<std::string, std::string> &session_stats);
369 * Check if the given TAP producer completed the replication before
372 * @param name TAP producer's name
373 * @return true if the replication from the given TAP producer was
374 * completed before shutdown or crash.
// Inline forwarder: returns prevSessionStats.wasReplicationCompleted(name)
// unchanged.
376 bool prevSessionReplicaCompleted(const std::string &name) {
377 return prevSessionStats.wasReplicationCompleted(name);
// Name-based operations reporting a bool result; declarations only.
380 bool checkConnectivity(const std::string &name);
382 bool closeConnectionByName(const std::string &name);
// Takes a connection handle by non-const reference and reports a bool.
384 bool mapped(connection_t &tc);
387 * Perform a TapOperation for a named tap connection while holding
390 * @param name the name of the tap connection to run the op
391 * @param tapop the operation to perform
392 * @param arg argument for the tap operation
394 * @return true if the tap connection was valid and the operation
// Steps, in order: acquire connsLock, look the connection up by name with
// findByName_UNLOCKED, downcast it to TapProducer (asserting that the cast
// succeeded), run tapop.perform(tp, arg), then call
// notifyPausedConnection(tp, false).
397 template <typename V>
398 bool performOp(const std::string &name, TapOperation<V> &tapop, V arg) {
400 LockHolder lh(connsLock);
// The lookup does no locking of its own; connsLock was acquired just above.
402 connection_t tc = findByName_UNLOCKED(name);
// dynamic_cast yields NULL when the connection found is not a TapProducer;
// the cb_assert turns that case into an assertion failure instead of a null
// dereference in perform().
404 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
405 cb_assert(tp != NULL);
406 tapop.perform(tp, arg);
// tp is a raw pointer while the parameter type is connection_t, so this call
// relies on an implicit conversion to the ref-counted handle. schedule is
// passed as false.
408 notifyPausedConnection(tp, false);
// Returns the current value of noopInterval_.
// NOTE(review): noopInterval_ is a plain size_t and no lock is taken here -
// confirm that concurrent access with setNoopInterval() is not a concern.
416 size_t getNoopInterval() const {
417 return noopInterval_;
// Overwrites noopInterval_ with value; no validation or locking is performed
// here.
420 void setNoopInterval(size_t value) {
421 noopInterval_ = value;
428 * Clear all the session stats for a given TAP producer
430 * @param name TAP producer's name
// Inline forwarder: calls prevSessionStats.clearStats(name) and returns
// nothing.
432 void clearPrevSessionStats(const std::string &name) {
433 prevSessionStats.clearStats(name);
// Helpers with the _UNLOCKED suffix.
// NOTE(review): presumably the caller must already hold connsLock, by analogy
// with findByName_UNLOCKED in performOp - confirm.
// deadClients is an output list passed by non-const reference.
436 void getExpiredConnections_UNLOCKED(std::list<connection_t> &deadClients);
438 void removeTapCursors_UNLOCKED(TapProducer *tp);
440 bool closeConnectionByName_UNLOCKED(const std::string &name);
// Backing store for prevSessionReplicaCompleted() and clearPrevSessionStats().
442 TAPSessionStats prevSessionStats;
// Backing store for getNoopInterval() and setNoopInterval().
443 size_t noopInterval_;
// ConnMap specialisation for DCP connections. It adds DCP producer/consumer
// creation, snoozing-backfill accounting and aggregate consumer buffer-size
// tracking.
448 class DcpConnMap : public ConnMap {
// Takes the owning engine by reference.
451 DcpConnMap(EventuallyPersistentEngine &engine);
454 * Find or build a dcp connection for the given cookie and with
// Returns a raw DcpProducer pointer; the parameter list continues on the
// following line(s).
457 DcpProducer *newProducer(const void* cookie, const std::string &name,
462 * Create a new consumer and add it in the list of TapConnections
463 * @param cookie the cookie representing the client
464 * @param name the name of the new consumer connection
465 * @return Pointer to the new dcp connection
// Two-argument overload (cookie and name) returning a raw DcpConsumer pointer.
// Its parameter list differs from ConnMap::newConsumer(const void*).
467 DcpConsumer *newConsumer(const void* cookie, const std::string &name);
// Takes a vbucket id together with a by-sequence number.
469 void notifyVBConnections(uint16_t vbid, uint64_t bySeqno);
471 void notifyBackfillManagerTasks();
// Same signature as ConnMap's virtual removeVBConnections(), so it overrides
// it.
473 void removeVBConnections(connection_t &conn);
475 void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
// The next three match ConnMap's pure virtual declarations.
477 void shutdownAllConnections();
479 void disconnect(const void *cookie);
481 void manageConnections();
// Backfill accounting entry points; declarations only.
// NOTE(review): presumably these use numBackfillsLock together with the
// numActive/maxActive counters - confirm in the implementation file.
483 bool canAddBackfillToActiveQ();
485 void decrNumActiveSnoozingBackfills();
487 void updateMaxActiveSnoozingBackfills(size_t maxDataSize);
// Returns the current numActiveSnoozingBackfills counter.
// NOTE(review): the plain uint16_t is read without taking numBackfillsLock
// here - confirm whether that lock is meant to guard this counter.
489 uint16_t getNumActiveSnoozingBackfills () const {
490 return numActiveSnoozingBackfills;
// Returns the current maxActiveSnoozingBackfills limit.
// NOTE(review): the plain uint16_t is read without taking numBackfillsLock
// here - confirm whether that lock is meant to guard this value.
493 uint16_t getMaxActiveSnoozingBackfills () const {
494 return maxActiveSnoozingBackfills;
// Returns the current value of the aggrDcpConsumerBufferSize counter, read
// with load().
497 size_t getAggrDcpConsumerBufferSize () const {
498 return aggrDcpConsumerBufferSize.load();
// Adds bufSize to aggrDcpConsumerBufferSize with fetch_add(); the previous
// value returned by fetch_add is discarded.
501 void incAggrDcpConsumerBufferSize (size_t bufSize) {
502 aggrDcpConsumerBufferSize.fetch_add(bufSize);
// Subtracts bufSize from aggrDcpConsumerBufferSize with fetch_sub(). No
// underflow check is made here, and the counter is an unsigned size_t.
505 void decAggrDcpConsumerBufferSize (size_t bufSize) {
506 aggrDcpConsumerBufferSize.fetch_sub(bufSize);
// Takes a raw ConnHandler pointer plus opaque, vbucket and flags, and reports
// an ENGINE_ERROR_CODE.
509 ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
510 uint16_t vbucket, uint32_t flags);
513 * Change the value at which a DcpConsumer::Processor task will yield
515 void consumerYieldConfigChanged(size_t newValue);
518 * Change the batchsize that the DcpConsumer::Processor operates with
520 void consumerBatchSizeConfigChanged(size_t newValue);
// Helpers with the _UNLOCKED suffix.
// NOTE(review): presumably the caller must already hold connsLock - confirm
// at call sites.
524 bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
526 void closeAllStreams_UNLOCKED();
528 // Guarded by the parent class's `connsLock`
// List of connection handles kept separately from ConnMap's `all` list.
529 std::list<connection_t> deadConnections;
// Spin lock declared alongside the backfill counters below.
// NOTE(review): presumably guards numActiveSnoozingBackfills and
// maxActiveSnoozingBackfills - confirm.
531 SpinLock numBackfillsLock;
// Class-wide constant; its value is defined outside this header.
533 static const uint32_t dbFileMem;
// Plain (non-atomic) 16-bit counters exposed through the two getters above.
534 uint16_t numActiveSnoozingBackfills;
535 uint16_t maxActiveSnoozingBackfills;
536 /* Max num of backfills we want to have irrespective of memory */
537 static const uint16_t numBackfillsThreshold;
538 /* Max percentage of memory we want backfills to occupy */
539 static const uint8_t numBackfillsMemThreshold;
540 /* Total memory used by all DCP consumer buffers */
541 AtomicValue<size_t> aggrDcpConsumerBufferSize;
// ValueChangedListener implementation that holds a reference to a DcpConnMap.
// NOTE(review): presumably forwards size-valued configuration changes to
// consumerYieldConfigChanged() and consumerBatchSizeConfigChanged() - confirm
// in the implementation file.
543 class DcpConfigChangeListener : public ValueChangedListener {
// Takes the DcpConnMap to act on, by reference.
545 DcpConfigChangeListener(DcpConnMap& connMap);
// Virtual destructor with an empty body.
546 virtual ~DcpConfigChangeListener() { }
// Callback receiving the configuration key and its new size_t value.
547 virtual void sizeValueChanged(const std::string &key, size_t value);
// Reference member: the referenced DcpConnMap must outlive this listener.
549 DcpConnMap& myConnMap;
554 #endif // SRC_TAPCONNMAP_H_