Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / connmap.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_TAPCONNMAP_H_
19 #define SRC_TAPCONNMAP_H_ 1
20
21 #include "config.h"
22
23 #include <iterator>
24 #include <list>
25 #include <map>
26 #include <set>
27 #include <string>
28 #include <vector>
29
30 #include "common.h"
31 #include "locks.h"
32 #include "syncobject.h"
33 #include "atomicqueue.h"
34 #include "dcp-consumer.h"
35 #include "dcp-producer.h"
36
37 // Forward declaration
38 class ConnNotifier;
39 class TapConsumer;
40 class TapProducer;
41 class Item;
42 class EventuallyPersistentEngine;
43
44 typedef SingleThreadedRCPtr<ConnHandler> connection_t;
45 /**
46  * Base class for operations performed on tap connections.
47  *
48  * @see TapConnMap::performTapOp
49  */
50 template <typename V>
51 class TapOperation {
52 public:
53     virtual ~TapOperation() {}
54     virtual void perform(TapProducer *tc, V arg) = 0;
55 };
56
57 /**
58  * Indicate the tap operation is complete.
59  */
60 class CompleteBackfillTapOperation : public TapOperation<void*> {
61 public:
62     void perform(TapProducer *tc, void* arg);
63 };
64
65 /**
66  * Indicate that we are going to schedule a tap disk backfill for a given vbucket.
67  */
68 class ScheduleDiskBackfillTapOperation : public TapOperation<void*> {
69 public:
70     void perform(TapProducer *tc, void* arg);
71 };
72
73 /**
74  * Indicate the tap backfill disk stream thing is complete for a given vbucket.
75  */
76 class CompleteDiskBackfillTapOperation : public TapOperation<void*> {
77 public:
78     void perform(TapProducer *tc, void* arg);
79 };
80
81 /**
82  * Complete a bg fetch job and give the item to the given tap connection.
83  */
84 class CompletedBGFetchTapOperation : public TapOperation<Item*> {
85 public:
86     CompletedBGFetchTapOperation(hrtime_t token, uint16_t vb, bool ie=false) :
87         connToken(token), vbid(vb), implicitEnqueue(ie) {}
88
89     void perform(TapProducer *tc, Item* arg);
90 private:
91     hrtime_t connToken;
92     uint16_t vbid;
93     bool implicitEnqueue;
94 };
95
96 class TAPSessionStats {
97 public:
98     TAPSessionStats() : normalShutdown(true) {}
99
100     bool wasReplicationCompleted(const std::string &name) const;
101
102     void clearStats(const std::string &name);
103
104     bool normalShutdown;
105     std::map<std::string, std::string> stats;
106 };
107
108
109 /**
110  * Connection notifier type.
111  */
112 typedef enum {
113     TAP_CONN_NOTIFIER, //!< TAP connection notifier
114     DCP_CONN_NOTIFIER  //!< DCP connection notifier
115 } conn_notifier_type;
116
117 /**
118  * A collection of tap or dcp connections.
119  */
120 class ConnMap {
121 public:
122     ConnMap(EventuallyPersistentEngine &theEngine);
123     virtual ~ConnMap();
124
125     void initialize(conn_notifier_type ntype);
126
127     Consumer *newConsumer(const void* c);
128
129     /**
130      * Disconnect a connection by its cookie.
131      */
132     virtual void disconnect(const void *cookie) = 0;
133
134     /**
135      * Call a function on each connection.
136      */
137     template <typename Fun>
138     void each(Fun f) {
139         LockHolder lh(connsLock);
140         each_UNLOCKED(f);
141     }
142
143     /**
144      * Call a function on each connection, but without the connsLock held
145      * Read Copy Update... the all list is copied under connsLock and then
146      * the function is applied against the copy without the lock held.
147      */
148     template <typename Fun>
149     void eachRCU(Fun f) const {
150         std::list<connection_t> currentConnections;
151         {
152             LockHolder lh(connsLock);
153             currentConnections = all;
154         }
155         std::for_each(currentConnections.begin(), currentConnections.end(), f);
156     }
157
158     /**
159      * Call a function on each connection *without* a lock.
160      */
161     template <typename Fun>
162     void each_UNLOCKED(Fun f) {
163         std::for_each(all.begin(), all.end(), f);
164     }
165
166     /**
167      * Return the number of connections for which this predicate is true.
168      */
169     template <typename Fun>
170     size_t count_if(Fun f) {
171         LockHolder lh(connsLock);
172         return count_if_UNLOCKED(f);
173     }
174
175     /**
176      * Return the number of connections for which this predicate is
177      * true *without* a lock.
178      */
179     template <typename Fun>
180     size_t count_if_UNLOCKED(Fun f) {
181         return static_cast<size_t>(std::count_if(all.begin(), all.end(), f));
182     }
183
184     /**
185      * Purge dead connections or identify paused connections that should send
186      * NOOP messages to their destinations.
187      */
188     virtual void manageConnections() = 0;
189
190     connection_t findByName(const std::string &name);
191
192     virtual void shutdownAllConnections() = 0;
193
194     void updateVBConnections(connection_t &conn,
195                              const std::vector<uint16_t> &vbuckets);
196
197     virtual void removeVBConnections(connection_t &conn);
198
199     void addVBConnByVBId(connection_t &conn, int16_t vbid);
200
201     void removeVBConnByVBId_UNLOCKED(connection_t &conn, int16_t vbid);
202
203     void removeVBConnByVBId(connection_t &conn, int16_t vbid);
204
205     /**
206      * Notify a given paused Producer.
207      *
208      * @param tc Producer to be notified
209      * @param schedule true if a notification event is pushed into a queue.
210      *        Otherwise, directly notify the paused connection.
211      */
212     void notifyPausedConnection(connection_t conn, bool schedule = false);
213
214     void notifyAllPausedConnections();
215     bool notificationQueueEmpty();
216
217     EventuallyPersistentEngine& getEngine() {
218         return engine;
219     }
220
221 protected:
222
223     connection_t findByName_UNLOCKED(const std::string &name);
224
225     // Synchronises notifying and releasing connections.
226     // Guards modifications to connection_t objects in {map_} / {all}.
227     // See also: {connLock}
228     Mutex                                    releaseLock;
229
230     // Synchonises access to the {map_} and {all} members, i.e. adding
231     // removing connections.
232     // Actual modification of the underlying
233     // ConnHandler objects is guarded by {releaseLock}.
234     mutable Mutex                            connsLock;
235
236     std::map<const void*, connection_t>      map_;
237     std::list<connection_t>                  all;
238
239     SpinLock *vbConnLocks;
240     std::vector<std::list<connection_t> > vbConns;
241
242     /* Handle to the engine who owns us */
243     EventuallyPersistentEngine &engine;
244
245     AtomicQueue<connection_t> pendingNotifications;
246     ConnNotifier *connNotifier_;
247
248     static size_t vbConnLockNum;
249 };
250
251 /**
252  * Connection notifier that wakes up paused connections.
253  */
254 class ConnNotifier {
255 public:
256     ConnNotifier(conn_notifier_type ntype, ConnMap &cm)
257         : notifier_type(ntype), connMap(cm), task(0),
258           pendingNotification(false)  { }
259
260     void start();
261
262     void stop();
263
264     void wake();
265
266     void notifyMutationEvent();
267
268     bool notifyConnections();
269
270     conn_notifier_type getNotifierType() const {
271         return notifier_type;
272     }
273
274 private:
275     static const double DEFAULT_MIN_STIME;
276
277     conn_notifier_type notifier_type;
278     ConnMap &connMap;
279     AtomicValue<size_t> task;
280     AtomicValue<bool> pendingNotification;
281 };
282
283 class TapConnMap : public ConnMap {
284
285 public:
286
287     TapConnMap(EventuallyPersistentEngine &theEngine);
288
289     /**
290      * Find or build a tap connection for the given cookie and with
291      * the given name.
292      */
293     TapProducer *newProducer(const void* cookie,
294                              const std::string &name,
295                              uint32_t flags,
296                              uint64_t backfillAge,
297                              int tapKeepAlive,
298                              const std::vector<uint16_t> &vbuckets,
299                              const std::map<uint16_t, uint64_t> &lastCheckpointIds);
300
301
302     /**
303      * Create a new consumer and add it in the list of TapConnections
304      * @param e the engine
305      * @param c the cookie representing the client
306      * @return Pointer to the nw tap connection
307      */
308     TapConsumer *newConsumer(const void* c);
309
310     void manageConnections();
311
312     /**
313      * Notify the paused connections that are responsible for replicating
314      * a given vbucket.
315      * @param vbid vbucket id
316      */
317     void notifyVBConnections(uint16_t vbid);
318
319     /**
320      * Set some backfilled events for a named conn.
321      */
322     bool setEvents(const std::string &name, std::list<queued_item> *q);
323
324     void resetReplicaChain();
325
326     /**
327      * Get the size of the named backfill queue.
328      *
329      * @return the size, or -1 if we can't find the queue
330      */
331     ssize_t backfillQueueDepth(const std::string &name);
332
333     void incrBackfillRemaining(const std::string &name,
334                                size_t num_backfill_items);
335
336     void shutdownAllConnections();
337
338     void disconnect(const void *cookie);
339
340     void scheduleBackfill(const std::set<uint16_t> &backfillVBuckets);
341
342     bool isBackfillCompleted(std::string &name);
343
344     /**
345      * Add an event to all tap connections telling them to flush their
346      * items.
347      */
348     void addFlushEvent();
349
350     /**
351      * Change the vbucket filter for a given TAP producer
352      * @param name TAP producer name
353      * @param vbuckets a new vbucket filter
354      * @param checkpoints last closed checkpoint ids for a new vbucket filter
355      * @return true if the TAP producer's vbucket filter is changed successfully
356      */
357     bool changeVBucketFilter(const std::string &name,
358                              const std::vector<uint16_t> &vbuckets,
359                              const std::map<uint16_t, uint64_t> &checkpoints);
360
361     /**
362      * Load TAP-related stats from the previous engine sessions
363      *
364      * @param session_stats all the stats from the previous engine sessions
365      */
366     void loadPrevSessionStats(const std::map<std::string, std::string> &session_stats);
367
368     /**
369      * Check if the given TAP producer completed the replication before
370      * shutdown or crash.
371      *
372      * @param name TAP producer's name
373      * @return true if the replication from the given TAP producer was
374      * completed before shutdown or crash.
375      */
376     bool prevSessionReplicaCompleted(const std::string &name) {
377         return prevSessionStats.wasReplicationCompleted(name);
378     }
379
380     bool checkConnectivity(const std::string &name);
381
382     bool closeConnectionByName(const std::string &name);
383
384     bool mapped(connection_t &tc);
385
386     /**
387      * Perform a TapOperation for a named tap connection while holding
388      * appropriate locks.
389      *
390      * @param name the name of the tap connection to run the op
391      * @param tapop the operation to perform
392      * @param arg argument for the tap operation
393      *
394      * @return true if the tap connection was valid and the operation
395      *         was performed
396      */
397     template <typename V>
398     bool performOp(const std::string &name, TapOperation<V> &tapop, V arg) {
399         bool ret(true);
400         LockHolder lh(connsLock);
401
402         connection_t tc = findByName_UNLOCKED(name);
403         if (tc.get()) {
404             TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
405             cb_assert(tp != NULL);
406             tapop.perform(tp, arg);
407             lh.unlock();
408             notifyPausedConnection(tp, false);
409         } else {
410             ret = false;
411         }
412
413         return ret;
414     }
415
416     size_t getNoopInterval() const {
417         return noopInterval_;
418     }
419
420     void setNoopInterval(size_t value) {
421         noopInterval_ = value;
422         nextNoop_ = 0;
423     }
424
425 private:
426
427     /**
428      * Clear all the session stats for a given TAP producer
429      *
430      * @param name TAP producer's name
431      */
432     void clearPrevSessionStats(const std::string &name) {
433         prevSessionStats.clearStats(name);
434     }
435
436     void getExpiredConnections_UNLOCKED(std::list<connection_t> &deadClients);
437
438     void removeTapCursors_UNLOCKED(TapProducer *tp);
439
440     bool closeConnectionByName_UNLOCKED(const std::string &name);
441
442     TAPSessionStats prevSessionStats;
443     size_t noopInterval_;
444     size_t nextNoop_;
445
446 };
447
448 class DcpConnMap : public ConnMap {
449 public:
450
451     DcpConnMap(EventuallyPersistentEngine &engine);
452
453     /**
454      * Find or build a dcp connection for the given cookie and with
455      * the given name.
456      */
457     DcpProducer *newProducer(const void* cookie, const std::string &name,
458                              bool notifyOnly);
459
460
461     /**
462      * Create a new consumer and add it in the list of TapConnections
463      * @param e the engine
464      * @param c the cookie representing the client
465      * @return Pointer to the new dcp connection
466      */
467     DcpConsumer *newConsumer(const void* cookie, const std::string &name);
468
469     void notifyVBConnections(uint16_t vbid, uint64_t bySeqno);
470
471     void notifyBackfillManagerTasks();
472
473     void removeVBConnections(connection_t &conn);
474
475     void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
476
477     void shutdownAllConnections();
478
479     void disconnect(const void *cookie);
480
481     void manageConnections();
482
483     bool canAddBackfillToActiveQ();
484
485     void decrNumActiveSnoozingBackfills();
486
487     void updateMaxActiveSnoozingBackfills(size_t maxDataSize);
488
489     uint16_t getNumActiveSnoozingBackfills () const {
490         return numActiveSnoozingBackfills;
491     }
492
493     uint16_t getMaxActiveSnoozingBackfills () const {
494         return maxActiveSnoozingBackfills;
495     }
496
497     size_t getAggrDcpConsumerBufferSize () const {
498         return aggrDcpConsumerBufferSize.load();
499     }
500
501     void incAggrDcpConsumerBufferSize (size_t bufSize) {
502         aggrDcpConsumerBufferSize.fetch_add(bufSize);
503     }
504
505     void decAggrDcpConsumerBufferSize (size_t bufSize) {
506         aggrDcpConsumerBufferSize.fetch_sub(bufSize);
507     }
508
509     ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
510                                        uint16_t vbucket, uint32_t flags);
511
512     /*
513      * Change the value at which a DcpConsumer::Processor task will yield
514      */
515     void consumerYieldConfigChanged(size_t newValue);
516
517     /*
518      * Change the batchsize that the DcpConsumer::Processor operates with
519      */
520     void consumerBatchSizeConfigChanged(size_t newValue);
521
522 private:
523
524     bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
525
526     void closeAllStreams_UNLOCKED();
527
528     // Guarded by the parent's classes `connsLock`
529     std::list<connection_t> deadConnections;
530
531     SpinLock numBackfillsLock;
532     /* Db file memory */
533     static const uint32_t dbFileMem;
534     uint16_t numActiveSnoozingBackfills;
535     uint16_t maxActiveSnoozingBackfills;
536     /* Max num of backfills we want to have irrespective of memory */
537     static const uint16_t numBackfillsThreshold;
538     /* Max percentage of memory we want backfills to occupy */
539     static const uint8_t numBackfillsMemThreshold;
540     /* Total memory used by all DCP consumer buffers */
541     AtomicValue<size_t> aggrDcpConsumerBufferSize;
542
543     class DcpConfigChangeListener : public ValueChangedListener {
544     public:
545         DcpConfigChangeListener(DcpConnMap& connMap);
546         virtual ~DcpConfigChangeListener() { }
547         virtual void sizeValueChanged(const std::string &key, size_t value);
548     private:
549         DcpConnMap& myConnMap;
550     };
551 };
552
553
554 #endif  // SRC_TAPCONNMAP_H_