07268989c594c83d7f82009430c63303e488d32a
[ep-engine.git] / src / connmap.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_TAPCONNMAP_H_
19 #define SRC_TAPCONNMAP_H_ 1
20
21 #include "config.h"
22
23 #include <iterator>
24 #include <list>
25 #include <map>
26 #include <set>
27 #include <string>
28 #include <vector>
29
30 #include "common.h"
31 #include "locks.h"
32 #include "syncobject.h"
33 #include "atomicqueue.h"
34 #include "dcp-consumer.h"
35 #include "dcp-producer.h"
36
37 // Forward declaration
38 class ConnNotifier;
39 class TapConsumer;
40 class TapProducer;
41 class Item;
42 class EventuallyPersistentEngine;
43
44 typedef SingleThreadedRCPtr<ConnHandler> connection_t;
45 /**
46  * Base class for operations performed on tap connections.
47  *
48  * @see TapConnMap::performTapOp
49  */
50 template <typename V>
51 class TapOperation {
52 public:
53     virtual ~TapOperation() {}
54     virtual void perform(TapProducer *tc, V arg) = 0;
55 };
56
57 /**
58  * Indicate the tap operation is complete.
59  */
60 class CompleteBackfillTapOperation : public TapOperation<void*> {
61 public:
62     void perform(TapProducer *tc, void* arg);
63 };
64
65 /**
66  * Indicate that we are going to schedule a tap disk backfill for a given vbucket.
67  */
68 class ScheduleDiskBackfillTapOperation : public TapOperation<void*> {
69 public:
70     void perform(TapProducer *tc, void* arg);
71 };
72
73 /**
74  * Indicate the tap backfill disk stream thing is complete for a given vbucket.
75  */
76 class CompleteDiskBackfillTapOperation : public TapOperation<void*> {
77 public:
78     void perform(TapProducer *tc, void* arg);
79 };
80
81 /**
82  * Complete a bg fetch job and give the item to the given tap connection.
83  */
84 class CompletedBGFetchTapOperation : public TapOperation<Item*> {
85 public:
86     CompletedBGFetchTapOperation(hrtime_t token, uint16_t vb, bool ie=false) :
87         connToken(token), vbid(vb), implicitEnqueue(ie) {}
88
89     void perform(TapProducer *tc, Item* arg);
90 private:
91     hrtime_t connToken;
92     uint16_t vbid;
93     bool implicitEnqueue;
94 };
95
96 class TAPSessionStats {
97 public:
98     TAPSessionStats() : normalShutdown(true) {}
99
100     bool wasReplicationCompleted(const std::string &name) const;
101
102     void clearStats(const std::string &name);
103
104     bool normalShutdown;
105     std::map<std::string, std::string> stats;
106 };
107
108
109 /**
110  * Connection notifier type.
111  */
112 typedef enum {
113     TAP_CONN_NOTIFIER, //!< TAP connection notifier
114     DCP_CONN_NOTIFIER  //!< DCP connection notifier
115 } conn_notifier_type;
116
117 /**
118  * A collection of tap or dcp connections.
119  */
120 class ConnMap {
121 public:
122     ConnMap(EventuallyPersistentEngine &theEngine);
123     virtual ~ConnMap();
124
125     void initialize(conn_notifier_type ntype);
126
127     Consumer *newConsumer(const void* c);
128
129     /**
130      * Disconnect a connection by its cookie.
131      */
132     virtual void disconnect(const void *cookie) = 0;
133
134     /**
135      * Call a function on each connection.
136      */
137     template <typename Fun>
138     void each(Fun f) {
139         LockHolder lh(connsLock);
140         each_UNLOCKED(f);
141     }
142
143     /**
144      * Call a function on each connection, but without the connsLock held
145      * Read Copy Update... the all list is copied under connsLock and then
146      * the function is applied against the copy without the lock held.
147      */
148     template <typename Fun>
149     void eachRCU(Fun f) const {
150         std::list<connection_t> currentConnections;
151         {
152             LockHolder lh(connsLock);
153             currentConnections = all;
154         }
155         std::for_each(currentConnections.begin(), currentConnections.end(), f);
156     }
157
158     /**
159      * Call a function on each connection *without* a lock.
160      */
161     template <typename Fun>
162     void each_UNLOCKED(Fun f) {
163         std::for_each(all.begin(), all.end(), f);
164     }
165
166     /**
167      * Return the number of connections for which this predicate is true.
168      */
169     template <typename Fun>
170     size_t count_if(Fun f) {
171         LockHolder lh(connsLock);
172         return count_if_UNLOCKED(f);
173     }
174
175     /**
176      * Return the number of connections for which this predicate is
177      * true *without* a lock.
178      */
179     template <typename Fun>
180     size_t count_if_UNLOCKED(Fun f) {
181         return static_cast<size_t>(std::count_if(all.begin(), all.end(), f));
182     }
183
184     /**
185      * Purge dead connections or identify paused connections that should send
186      * NOOP messages to their destinations.
187      */
188     virtual void manageConnections() = 0;
189
190     connection_t findByName(const std::string &name);
191
192     virtual void shutdownAllConnections() = 0;
193
194     void updateVBConnections(connection_t &conn,
195                              const std::vector<uint16_t> &vbuckets);
196
197     virtual void removeVBConnections(connection_t &conn);
198
199     void addVBConnByVBId(connection_t &conn, int16_t vbid);
200
201     void removeVBConnByVBId_UNLOCKED(connection_t &conn, int16_t vbid);
202
203     void removeVBConnByVBId(connection_t &conn, int16_t vbid);
204
205     /**
206      * Notify a given paused Producer.
207      *
208      * @param tc Producer to be notified
209      * @param schedule true if a notification event is pushed into a queue.
210      *        Otherwise, directly notify the paused connection.
211      */
212     void notifyPausedConnection(connection_t conn, bool schedule = false);
213
214     void notifyAllPausedConnections();
215     bool notificationQueueEmpty();
216
217     EventuallyPersistentEngine& getEngine() {
218         return engine;
219     }
220
221 protected:
222
223     connection_t findByName_UNLOCKED(const std::string &name);
224
225     // Synchronises notifying and releasing connections.
226     // Guards modifications to connection_t objects in {map_} / {all}.
227     // See also: {connLock}
228     Mutex                                    releaseLock;
229
230     // Synchonises access to the {map_} and {all} members, i.e. adding
231     // removing connections.
232     // Actual modification of the underlying
233     // ConnHandler objects is guarded by {releaseLock}.
234     mutable Mutex                            connsLock;
235
236     std::map<const void*, connection_t>      map_;
237     std::list<connection_t>                  all;
238
239     SpinLock *vbConnLocks;
240     std::vector<std::list<connection_t> > vbConns;
241
242     /* Handle to the engine who owns us */
243     EventuallyPersistentEngine &engine;
244
245     AtomicQueue<connection_t> pendingNotifications;
246     ConnNotifier *connNotifier_;
247
248     static size_t vbConnLockNum;
249 };
250
251 /**
252  * Connection notifier that wakes up paused connections.
253  */
254 class ConnNotifier {
255 public:
256     ConnNotifier(conn_notifier_type ntype, ConnMap &cm)
257         : notifier_type(ntype), connMap(cm), pendingNotification(false)  { }
258
259     void start();
260
261     void stop();
262
263     void wake();
264
265     void notifyMutationEvent();
266
267     bool notifyConnections();
268
269     conn_notifier_type getNotifierType() const {
270         return notifier_type;
271     }
272
273 private:
274     static const double DEFAULT_MIN_STIME;
275
276     conn_notifier_type notifier_type;
277     ConnMap &connMap;
278     AtomicValue<size_t> task;
279     AtomicValue<bool> pendingNotification;
280 };
281
282 class TapConnMap : public ConnMap {
283
284 public:
285
286     TapConnMap(EventuallyPersistentEngine &theEngine);
287
288     /**
289      * Find or build a tap connection for the given cookie and with
290      * the given name.
291      */
292     TapProducer *newProducer(const void* cookie,
293                              const std::string &name,
294                              uint32_t flags,
295                              uint64_t backfillAge,
296                              int tapKeepAlive,
297                              const std::vector<uint16_t> &vbuckets,
298                              const std::map<uint16_t, uint64_t> &lastCheckpointIds);
299
300
301     /**
302      * Create a new consumer and add it in the list of TapConnections
303      * @param e the engine
304      * @param c the cookie representing the client
305      * @return Pointer to the nw tap connection
306      */
307     TapConsumer *newConsumer(const void* c);
308
309     void manageConnections();
310
311     /**
312      * Notify the paused connections that are responsible for replicating
313      * a given vbucket.
314      * @param vbid vbucket id
315      */
316     void notifyVBConnections(uint16_t vbid);
317
318     /**
319      * Set some backfilled events for a named conn.
320      */
321     bool setEvents(const std::string &name, std::list<queued_item> *q);
322
323     void resetReplicaChain();
324
325     /**
326      * Get the size of the named backfill queue.
327      *
328      * @return the size, or -1 if we can't find the queue
329      */
330     ssize_t backfillQueueDepth(const std::string &name);
331
332     void incrBackfillRemaining(const std::string &name,
333                                size_t num_backfill_items);
334
335     void shutdownAllConnections();
336
337     void disconnect(const void *cookie);
338
339     void scheduleBackfill(const std::set<uint16_t> &backfillVBuckets);
340
341     bool isBackfillCompleted(std::string &name);
342
343     /**
344      * Add an event to all tap connections telling them to flush their
345      * items.
346      */
347     void addFlushEvent();
348
349     /**
350      * Change the vbucket filter for a given TAP producer
351      * @param name TAP producer name
352      * @param vbuckets a new vbucket filter
353      * @param checkpoints last closed checkpoint ids for a new vbucket filter
354      * @return true if the TAP producer's vbucket filter is changed successfully
355      */
356     bool changeVBucketFilter(const std::string &name,
357                              const std::vector<uint16_t> &vbuckets,
358                              const std::map<uint16_t, uint64_t> &checkpoints);
359
360     /**
361      * Load TAP-related stats from the previous engine sessions
362      *
363      * @param session_stats all the stats from the previous engine sessions
364      */
365     void loadPrevSessionStats(const std::map<std::string, std::string> &session_stats);
366
367     /**
368      * Check if the given TAP producer completed the replication before
369      * shutdown or crash.
370      *
371      * @param name TAP producer's name
372      * @return true if the replication from the given TAP producer was
373      * completed before shutdown or crash.
374      */
375     bool prevSessionReplicaCompleted(const std::string &name) {
376         return prevSessionStats.wasReplicationCompleted(name);
377     }
378
379     bool checkConnectivity(const std::string &name);
380
381     bool closeConnectionByName(const std::string &name);
382
383     bool mapped(connection_t &tc);
384
385     /**
386      * Perform a TapOperation for a named tap connection while holding
387      * appropriate locks.
388      *
389      * @param name the name of the tap connection to run the op
390      * @param tapop the operation to perform
391      * @param arg argument for the tap operation
392      *
393      * @return true if the tap connection was valid and the operation
394      *         was performed
395      */
396     template <typename V>
397     bool performOp(const std::string &name, TapOperation<V> &tapop, V arg) {
398         bool ret(true);
399         LockHolder lh(connsLock);
400
401         connection_t tc = findByName_UNLOCKED(name);
402         if (tc.get()) {
403             TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
404             cb_assert(tp != NULL);
405             tapop.perform(tp, arg);
406             lh.unlock();
407             notifyPausedConnection(tp, false);
408         } else {
409             ret = false;
410         }
411
412         return ret;
413     }
414
415     size_t getNoopInterval() const {
416         return noopInterval_;
417     }
418
419     void setNoopInterval(size_t value) {
420         noopInterval_ = value;
421         nextNoop_ = 0;
422     }
423
424 private:
425
426     /**
427      * Clear all the session stats for a given TAP producer
428      *
429      * @param name TAP producer's name
430      */
431     void clearPrevSessionStats(const std::string &name) {
432         prevSessionStats.clearStats(name);
433     }
434
435     void getExpiredConnections_UNLOCKED(std::list<connection_t> &deadClients);
436
437     void removeTapCursors_UNLOCKED(TapProducer *tp);
438
439     bool closeConnectionByName_UNLOCKED(const std::string &name);
440
441     TAPSessionStats prevSessionStats;
442     size_t noopInterval_;
443     size_t nextNoop_;
444
445 };
446
447
448 class DcpConnMap : public ConnMap {
449
450 public:
451
452     DcpConnMap(EventuallyPersistentEngine &engine);
453
454     /**
455      * Find or build a dcp connection for the given cookie and with
456      * the given name.
457      */
458     DcpProducer *newProducer(const void* cookie, const std::string &name,
459                              bool notifyOnly);
460
461
462     /**
463      * Create a new consumer and add it in the list of TapConnections
464      * @param e the engine
465      * @param c the cookie representing the client
466      * @return Pointer to the new dcp connection
467      */
468     DcpConsumer *newConsumer(const void* cookie, const std::string &name);
469
470     void notifyVBConnections(uint16_t vbid, uint64_t bySeqno);
471
472     void removeVBConnections(connection_t &conn);
473
474     void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
475
476     void shutdownAllConnections();
477
478     void disconnect(const void *cookie);
479
480     void manageConnections();
481
482     ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
483                                        uint16_t vbucket, uint32_t flags);
484
485     void addStats(ADD_STAT add_stat, const void *c);
486
487 protected:
488
489     bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
490
491     void closeAllStreams_UNLOCKED();
492
493     // Guarded by the parent's classes `connsLock`
494     std::list<connection_t> deadConnections;
495 };
496
497
498 #endif  // SRC_TAPCONNMAP_H_