26b493c8d950a99f7f2ceee657fa66f186afacf1
[ep-engine.git] / src / connmap.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <algorithm>
21 #include <limits>
22 #include <queue>
23 #include <set>
24 #include <string>
25 #include <vector>
26
27 #include "ep_engine.h"
28 #include "executorthread.h"
29 #include "tapconnection.h"
30 #include "connmap.h"
31 #include "dcp-consumer.h"
32 #include "dcp-producer.h"
33
34 size_t ConnMap::vbConnLockNum = 32;
35 const double ConnNotifier::DEFAULT_MIN_STIME = 1.0;
36
37 /**
38  * NonIO task to free the resource of a tap connection.
39  */
40 class ConnectionReaperCallback : public GlobalTask {
41 public:
42     ConnectionReaperCallback(EventuallyPersistentEngine &e, ConnMap& cm,
43                              connection_t &conn)
44         : GlobalTask(&e, Priority::TapConnectionReaperPriority),
45           connMap(cm), connection(conn) {
46         std::stringstream ss;
47         ss << "Reaping tap or dcp connection: " << connection->getName();
48         descr = ss.str();
49     }
50
51     bool run(void) {
52         TapProducer *tp = dynamic_cast<TapProducer*>(connection.get());
53         if (tp) {
54             tp->clearQueues();
55             connMap.removeVBConnections(connection);
56         }
57         return false;
58     }
59
60     std::string getDescription() {
61         return descr;
62     }
63
64 private:
65     ConnMap &connMap;
66     connection_t connection;
67     std::string descr;
68 };
69
70 /**
71  * A Callback task for Tap connection notifier
72  */
73 class ConnNotifierCallback : public GlobalTask {
74 public:
75     ConnNotifierCallback(EventuallyPersistentEngine *e, ConnNotifier *notifier)
76     : GlobalTask(e, Priority::TapConnNotificationPriority),
77       connNotifier(notifier) { }
78
79     bool run(void) {
80         return connNotifier->notifyConnections();
81     }
82
83     std::string getDescription() {
84         if (connNotifier->getNotifierType() == TAP_CONN_NOTIFIER) {
85             return std::string("TAP connection notifier");
86         } else {
87             return std::string("DCP connection notifier");
88         }
89     }
90
91 private:
92     ConnNotifier *connNotifier;
93 };
94
95 void ConnNotifier::start() {
96     bool inverse = false;
97     pendingNotification.compare_exchange_strong(inverse, true);
98     ExTask connotifyTask = new ConnNotifierCallback(&connMap.getEngine(), this);
99     task = ExecutorPool::get()->schedule(connotifyTask, NONIO_TASK_IDX);
100     cb_assert(task);
101 }
102
103 void ConnNotifier::stop() {
104     bool inverse = true;
105     pendingNotification.compare_exchange_strong(inverse, false);
106     ExecutorPool::get()->cancel(task);
107 }
108
109 void ConnNotifier::notifyMutationEvent(void) {
110     bool inverse = false;
111     if (pendingNotification.compare_exchange_strong(inverse, true)) {
112         cb_assert(task > 0);
113         ExecutorPool::get()->wake(task);
114     }
115 }
116
117 void ConnNotifier::wake() {
118     ExecutorPool::get()->wake(task);
119 }
120
121 bool ConnNotifier::notifyConnections() {
122     bool inverse = true;
123     pendingNotification.compare_exchange_strong(inverse, false);
124     connMap.notifyAllPausedConnections();
125
126     if (!pendingNotification.load()) {
127         ExecutorPool::get()->snooze(task, DEFAULT_MIN_STIME);
128         if (pendingNotification.load()) {
129             // Check again if a new notification is arrived right before
130             // calling snooze() above.
131             ExecutorPool::get()->snooze(task, 0);
132         }
133     }
134
135     return true;
136 }
137
138 /**
139  * A task to manage connections.
140  */
141 class ConnManager : public GlobalTask {
142 public:
143     ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
144         : GlobalTask(e, Priority::TapConnMgrPriority, MIN_SLEEP_TIME, false),
145           engine(e), connmap(cmap) { }
146
147     bool run(void) {
148         if (engine->getEpStats().isShutdown) {
149             return false;
150         }
151         connmap->manageConnections();
152         snooze(MIN_SLEEP_TIME);
153         return true;
154     }
155
156     std::string getDescription() {
157         return std::string("Connection Manager");
158     }
159
160 private:
161     EventuallyPersistentEngine *engine;
162     ConnMap *connmap;
163 };
164
165 class ConnMapValueChangeListener : public ValueChangedListener {
166 public:
167     ConnMapValueChangeListener(TapConnMap &tc)
168         : connmap_(tc) {
169     }
170
171     virtual void sizeValueChanged(const std::string &key, size_t value) {
172         if (key.compare("tap_noop_interval") == 0) {
173             connmap_.setNoopInterval(value);
174         }
175     }
176
177 private:
178     TapConnMap &connmap_;
179 };
180
181 ConnMap::ConnMap(EventuallyPersistentEngine &theEngine)
182     :  engine(theEngine),
183        connNotifier_(NULL) {
184
185     Configuration &config = engine.getConfiguration();
186     vbConnLocks = new SpinLock[vbConnLockNum];
187     size_t max_vbs = config.getMaxVbuckets();
188     for (size_t i = 0; i < max_vbs; ++i) {
189         vbConns.push_back(std::list<connection_t>());
190     }
191 }
192
193 void ConnMap::initialize(conn_notifier_type ntype) {
194     connNotifier_ = new ConnNotifier(ntype, *this);
195     connNotifier_->start();
196     ExTask connMgr = new ConnManager(&engine, this);
197     ExecutorPool::get()->schedule(connMgr, NONIO_TASK_IDX);
198 }
199
200 ConnMap::~ConnMap() {
201     delete [] vbConnLocks;
202     if (connNotifier_ != NULL) {
203         connNotifier_->stop();
204     }
205     delete connNotifier_;
206 }
207
208 connection_t ConnMap::findByName(const std::string &name) {
209     LockHolder lh(connsLock);
210     return findByName_UNLOCKED(name);
211 }
212
213 connection_t ConnMap::findByName_UNLOCKED(const std::string&name) {
214     connection_t rv(NULL);
215     std::list<connection_t>::iterator iter;
216     for (iter = all.begin(); iter != all.end(); ++iter) {
217         if ((*iter)->getName() == name) {
218             rv = *iter;
219         }
220     }
221     return rv;
222 }
223
224 void ConnMap::notifyPausedConnection(connection_t conn, bool schedule) {
225     if (engine.getEpStats().isShutdown) {
226         return;
227     }
228
229     Notifiable* tp = dynamic_cast<Notifiable*>(conn.get());
230     if (schedule) {
231         if (tp && tp->isPaused() && conn->isReserved() &&
232             tp->setNotificationScheduled(true)) {
233             pendingNotifications.push(conn);
234             connNotifier_->notifyMutationEvent(); // Wake up the connection notifier so that
235                                                   // it can notify the event to a given
236                                                   // paused connection.
237         }
238     } else {
239         LockHolder rlh(releaseLock);
240         if (tp && tp->isPaused() && conn->isReserved()) {
241             engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
242             tp->setNotifySent(true);
243         }
244     }
245 }
246
247 void ConnMap::notifyAllPausedConnections() {
248     std::queue<connection_t> queue;
249     pendingNotifications.getAll(queue);
250
251     LockHolder rlh(releaseLock);
252     while (!queue.empty()) {
253         connection_t &conn = queue.front();
254         Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
255         if (tp) {
256             tp->setNotificationScheduled(false);
257             if (tp->isPaused() && conn->isReserved()) {
258                 engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
259                 tp->setNotifySent(true);
260             }
261         }
262         queue.pop();
263     }
264 }
265
266 bool ConnMap::notificationQueueEmpty() {
267     return pendingNotifications.empty();
268 }
269
270 void ConnMap::updateVBConnections(connection_t &conn,
271                                         const std::vector<uint16_t> &vbuckets)
272 {
273     Producer *tp = dynamic_cast<Producer*>(conn.get());
274     if (!tp) {
275         return;
276     }
277
278     VBucketFilter new_filter(vbuckets);
279     VBucketFilter diff = tp->getVBucketFilter().filter_diff(new_filter);
280     const std::set<uint16_t> &vset = diff.getVBSet();
281
282     for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
283         size_t lock_num = (*it) % vbConnLockNum;
284         SpinLockHolder lh (&vbConnLocks[lock_num]);
285         // Remove the connection that is no longer for a given vbucket
286         if (!tp->vbucketFilter.empty() && tp->vbucketFilter(*it)) {
287             std::list<connection_t> &vb_conns = vbConns[*it];
288             std::list<connection_t>::iterator itr = vb_conns.begin();
289             for (; itr != vb_conns.end(); ++itr) {
290                 if (conn->getCookie() == (*itr)->getCookie()) {
291                     vb_conns.erase(itr);
292                     break;
293                 }
294             }
295         } else { // Add the connection to the vbucket replicator list.
296             std::list<connection_t> &vb_conns = vbConns[*it];
297             vb_conns.push_back(conn);
298         }
299     }
300 }
301
302 void ConnMap::removeVBConnections(connection_t &conn) {
303     Producer *tp = dynamic_cast<Producer*>(conn.get());
304     if (!tp) {
305         return;
306     }
307
308     const std::set<uint16_t> &vset = tp->vbucketFilter.getVBSet();
309     for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
310         size_t lock_num = (*it) % vbConnLockNum;
311         SpinLockHolder lh (&vbConnLocks[lock_num]);
312         std::list<connection_t> &vb_conns = vbConns[*it];
313         std::list<connection_t>::iterator itr = vb_conns.begin();
314         for (; itr != vb_conns.end(); ++itr) {
315             if (conn->getCookie() == (*itr)->getCookie()) {
316                 vb_conns.erase(itr);
317                 break;
318             }
319         }
320     }
321 }
322
323 void ConnMap::addVBConnByVBId(connection_t &conn, int16_t vbid) {
324     if (!conn.get()) {
325         return;
326     }
327
328     size_t lock_num = vbid % vbConnLockNum;
329     SpinLockHolder lh (&vbConnLocks[lock_num]);
330     std::list<connection_t> &vb_conns = vbConns[vbid];
331     vb_conns.push_back(conn);
332 }
333
334 void ConnMap::removeVBConnByVBId_UNLOCKED(connection_t &conn, int16_t vbid) {
335     if (!conn.get()) {
336         return;
337     }
338
339     std::list<connection_t> &vb_conns = vbConns[vbid];
340     std::list<connection_t>::iterator itr = vb_conns.begin();
341     for (; itr != vb_conns.end(); ++itr) {
342         if (conn->getCookie() == (*itr)->getCookie()) {
343             vb_conns.erase(itr);
344             break;
345         }
346     }
347 }
348
349 void ConnMap::removeVBConnByVBId(connection_t &conn, int16_t vbid) {
350     size_t lock_num = vbid % vbConnLockNum;
351     SpinLockHolder lh (&vbConnLocks[lock_num]);
352     removeVBConnByVBId_UNLOCKED(conn, vbid);
353 }
354
355
356 TapConnMap::TapConnMap(EventuallyPersistentEngine &e)
357     : ConnMap(e), nextNoop_(0) {
358
359     Configuration &config = engine.getConfiguration();
360     noopInterval_ = config.getTapNoopInterval();
361     config.addValueChangedListener("tap_noop_interval",
362                                    new ConnMapValueChangeListener(*this));
363 }
364
365 TapConsumer *TapConnMap::newConsumer(const void* cookie)
366 {
367     LockHolder lh(connsLock);
368     TapConsumer *tc = new TapConsumer(engine, cookie, ConnHandler::getAnonName());
369     connection_t tap(tc);
370     LOG(EXTENSION_LOG_INFO, "%s created", tap->logHeader());
371     all.push_back(tap);
372     map_[cookie] = tap;
373     return tc;
374 }
375
/**
 * Create, or re-attach to, the TAP producer called `name` for the memcached
 * connection identified by `cookie`. Holds connsLock for the whole call.
 *
 * If a TapProducer with the same name already exists, it is first detached
 * from its old cookie (erased from map_). Then one of two things happens:
 *  - It is discarded when tapKeepAlive == 0, or when it reports both
 *    mayCompleteDumpOrTakeover() and idle(). It is renamed to an anonymous
 *    name, flagged for disconnect, marked not connected and already
 *    expired, and a fresh producer is created in its place.
 *  - Otherwise it is re-used for the new cookie. An expired, disconnected
 *    dummy TapProducer is created for the old cookie, so the old memcached
 *    connection can still be released.
 *
 * @param cookie            memcached connection cookie of the new session
 * @param name              TAP name used to find an existing producer
 * @param flags             TAP connect flags (TAP_CONNECT_* bits)
 * @param backfillAge       forwarded to setBackfillAge()
 * @param tapKeepAlive      0 forces an existing producer to be discarded
 * @param vbuckets          vbucket filter for the producer
 * @param lastCheckpointIds forwarded to registerCursor()
 * @return the new or re-used producer, now mapped under `cookie` and stored
 *         as the cookie's engine-specific data.
 */
TapProducer *TapConnMap::newProducer(const void* cookie,
                                     const std::string &name,
                                     uint32_t flags,
                                     uint64_t backfillAge,
                                     int tapKeepAlive,
                                     const std::vector<uint16_t> &vbuckets,
                                     const std::map<uint16_t, uint64_t> &lastCheckpointIds)
{
    LockHolder lh(connsLock);
    TapProducer *producer(NULL);

    // Look for an existing producer with the same name.
    std::list<connection_t>::iterator iter;
    for (iter = all.begin(); iter != all.end(); ++iter) {
        producer = dynamic_cast<TapProducer*>((*iter).get());
        if (producer && producer->getName() == name) {
            // NOTE(review): (rel_time_t)-1 is the maximum value, presumably
            // "never expire" while the producer is being re-attached.
            producer->setExpiryTime((rel_time_t)-1);
            producer->reconnected();
            break;
        }
        else {
            producer = NULL;
        }
    }

    if (producer != NULL) {
        // Detach the existing producer from its previous cookie.
        const void *old_cookie = producer->getCookie();
        cb_assert(old_cookie);
        map_.erase(old_cookie);

        if (tapKeepAlive == 0 || (producer->mayCompleteDumpOrTakeover() && producer->idle())) {
            // Retire the old producer. It stays in `all`, unmapped and
            // already expired, so getExpiredConnections_UNLOCKED() can pick
            // it up later.
            LOG(EXTENSION_LOG_INFO,
                "%s keep alive timed out, should be nuked", producer->logHeader());
            producer->setName(ConnHandler::getAnonName());
            producer->setDisconnect(true);
            producer->setConnected(false);
            producer->setPaused(true);
            producer->setExpiryTime(ep_current_time() - 1);
            producer = NULL;
        }
        else {
            LOG(EXTENSION_LOG_INFO, "%s exists... grabbing the channel",
                producer->logHeader());
            // Create the dummy expired producer connection for the old connection cookie.
            // This dummy producer connection will be used for releasing the corresponding
            // memcached connection.

            // dliao: TODO no need to deal with tap or dcp separately here for the dummy?
            TapProducer *n = new TapProducer(engine,
                                             old_cookie,
                                             ConnHandler::getAnonName(),
                                             0);
            n->setDisconnect(true);
            n->setConnected(false);
            n->setPaused(true);
            n->setExpiryTime(ep_current_time() - 1);
            all.push_back(connection_t(n));
        }
    }

    bool reconnect = false;
    if (producer == NULL) {
        producer = new TapProducer(engine, cookie, name, flags);
        LOG(EXTENSION_LOG_INFO, "%s created", producer->logHeader());
        all.push_back(connection_t(producer));
    } else {
        // Re-attach the surviving producer to the new cookie.
        producer->setCookie(cookie);
        producer->setReserved(true);
        producer->setConnected(true);
        producer->setDisconnect(false);
        reconnect = true;
    }
    producer->evaluateFlags();

    // Must run before setVBucketFilter(): updateVBConnections() diffs the
    // producer's current filter against `vbuckets`.
    connection_t conn(producer);
    updateVBConnections(conn, vbuckets);

    producer->setFlagByteorderSupport((flags & TAP_CONNECT_TAP_FIX_FLAG_BYTEORDER) != 0);
    producer->setBackfillAge(backfillAge, reconnect);
    producer->setVBucketFilter(vbuckets);
    producer->registerCursor(lastCheckpointIds);

    if (reconnect) {
        producer->rollback();
    }

    map_[cookie] = conn;
    engine.storeEngineSpecific(cookie, producer);
    // Clear all previous session stats for this producer.
    clearPrevSessionStats(producer->getName());

    return producer;

}
469
/**
 * Periodic housekeeping for TAP connections (driven by the ConnManager task).
 *
 * Steps, in order:
 *  1. Decide whether this round schedules NOOPs. That happens once `now`
 *     passes nextNoop_, unless noopInterval_ is (size_t)-1, which this code
 *     treats as "disabled".
 *  2. Under connsLock:
 *     - unlink expired connections into deadClients;
 *     - flag mapped ack-mode producers that are expired with a full window
 *       for disconnect, or else mark them for a NOOP;
 *     - collect paused or disconnecting, unsuspended, reserved producers
 *       that need a wake-up.
 *  3. Drop connsLock, take releaseLock (the two are never held together)
 *     and:
 *     - call notifyIOComplete() for each collected producer;
 *     - release every dead client and schedule a ConnectionReaperCallback
 *       for dead TapProducers.
 */
void TapConnMap::manageConnections() {
    // To keep connections from getting stuck in a bogus state forever,
    // re-notify every connection that hasn't walked the tap queue within
    // this many time units (rel_time_t units, presumably seconds).
    const int maxIdleTime = 5;

    bool addNoop = false;

    rel_time_t now = ep_current_time();
    if (now > nextNoop_ && noopInterval_ != (size_t)-1) {
        addNoop = true;
        nextNoop_ = now + noopInterval_;
    }

    std::list<connection_t> deadClients;

    LockHolder lh(connsLock);
    // Unlink expired connections; they are released further down, after
    // connsLock has been dropped.
    // NOTE(review): an older comment here spoke of "pausing unless we purged
    // some connections"; no such pause logic exists in this function.
    getExpiredConnections_UNLOCKED(deadClients);

    // Check mapped producers for expired full ack windows / pending NOOPs.
    std::map<const void*, connection_t>::iterator iter;
    for (iter = map_.begin(); iter != map_.end(); ++iter) {
        TapProducer *tp = dynamic_cast<TapProducer*>(iter->second.get());
        if (tp != NULL) {
            if (tp->supportsAck() && (tp->getExpiryTime() < now) && tp->windowIsFull()) {
                LOG(EXTENSION_LOG_WARNING,
                    "%s Expired and ack windows is full. Disconnecting...",
                    tp->logHeader());
                tp->setDisconnect(true);
            } else if (addNoop) {
                tp->setTimeForNoop();
            }
        }
    }

    // Collect the list of connections that need to be signaled: not yet
    // notified, or idle for longer than maxIdleTime.
    std::list<connection_t> toNotify;
    for (iter = map_.begin(); iter != map_.end(); ++iter) {
        TapProducer *tp = dynamic_cast<TapProducer*>(iter->second.get());
        if (tp && (tp->isPaused() || tp->doDisconnect()) && !tp->isSuspended()
            && tp->isReserved()) {
            if (!tp->sentNotify() ||
                (tp->getLastWalkTime() + maxIdleTime < now)) {
                toNotify.push_back(iter->second);
            }
        }
    }

    lh.unlock();

    // releaseLock stays held until the end of the function, covering both
    // the notifications and the dead-client cleanup below.
    LockHolder rlh(releaseLock);
    std::list<connection_t>::iterator it;
    for (it = toNotify.begin(); it != toNotify.end(); ++it) {
        TapProducer *tp = dynamic_cast<TapProducer*>((*it).get());
        // Re-check: the reservation may have been dropped since collection.
        if (tp && tp->isReserved()) {
            engine.notifyIOComplete(tp->getCookie(), ENGINE_SUCCESS);
            tp->setNotifySent(true);
        }
    }

    // Delete all of the dead clients
    std::list<connection_t>::iterator ii;
    for (ii = deadClients.begin(); ii != deadClients.end(); ++ii) {
        LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
        (*ii)->releaseReference();
        TapProducer *tp = dynamic_cast<TapProducer*>((*ii).get());
        if (tp) {
            // Queue clearing and vbucket unlinking happen asynchronously.
            ExTask reapTask = new ConnectionReaperCallback(engine, *this, *ii);
            ExecutorPool::get()->schedule(reapTask, NONIO_TASK_IDX);
        }
    }
}
544
545 void TapConnMap::notifyVBConnections(uint16_t vbid)
546 {
547     size_t lock_num = vbid % vbConnLockNum;
548     SpinLockHolder lh(&vbConnLocks[lock_num]);
549
550     std::list<connection_t> &conns = vbConns[vbid];
551     std::list<connection_t>::iterator it = conns.begin();
552     for (; it != conns.end(); ++it) {
553         Notifiable *conn = dynamic_cast<Notifiable*>((*it).get());
554         if (conn && conn->isPaused() && (*it)->isReserved() &&
555             conn->setNotificationScheduled(true)) {
556             pendingNotifications.push(*it);
557             connNotifier_->notifyMutationEvent();
558         }
559     }
560     lh.unlock();
561 }
562
563 bool TapConnMap::setEvents(const std::string &name, std::list<queued_item> *q) {
564     bool found(false);
565     LockHolder lh(connsLock);
566
567     connection_t tc = findByName_UNLOCKED(name);
568     if (tc.get()) {
569         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
570         cb_assert(tp);
571         found = true;
572         tp->appendQueue(q);
573         lh.unlock();
574         notifyPausedConnection(tp, false);
575     }
576
577     return found;
578 }
579
580 void TapConnMap::incrBackfillRemaining(const std::string &name,
581                                        size_t num_backfill_items) {
582     LockHolder lh(connsLock);
583
584     connection_t tc = findByName_UNLOCKED(name);
585     if (tc.get()) {
586         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
587         cb_assert(tp);
588         tp->incrBackfillRemaining(num_backfill_items);
589     }
590 }
591
592 ssize_t TapConnMap::backfillQueueDepth(const std::string &name) {
593     ssize_t rv(-1);
594     LockHolder lh(connsLock);
595
596     connection_t tc = findByName_UNLOCKED(name);
597     if (tc.get()) {
598         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
599         cb_assert(tp);
600         rv = tp->getBackfillQueueSize();
601     }
602
603     return rv;
604 }
605
606 void TapConnMap::resetReplicaChain() {
607     LockHolder lh(connsLock);
608     rel_time_t now = ep_current_time();
609     std::list<connection_t>::iterator it = all.begin();
610     for (; it != all.end(); ++it) {
611         connection_t &tc = *it;
612         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
613         if (!(tp && (tp->isConnected() || tp->getExpiryTime() > now))) {
614             continue;
615         }
616         LOG(EXTENSION_LOG_INFO, "%s Reset the replication chain",
617             tp->logHeader());
618         // Get the list of vbuckets that each TAP producer is replicating
619         VBucketFilter vbfilter = tp->getVBucketFilter();
620         std::vector<uint16_t> vblist (vbfilter.getVBSet().begin(), vbfilter.getVBSet().end());
621         // TAP producer sends INITIAL_VBUCKET_STREAM messages to the destination to reset
622         // replica vbuckets, and then backfills items to the destination.
623         tp->scheduleBackfill(vblist);
624         notifyPausedConnection(tp, true);
625     }
626 }
627
628 bool TapConnMap::isBackfillCompleted(std::string &name) {
629     LockHolder lh(connsLock);
630     connection_t tc = findByName_UNLOCKED(name);
631     if (tc.get()) {
632         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
633         if (tp) {
634             return tp->isBackfillCompleted();
635         }
636     }
637     return false;
638 }
639
640 void TapConnMap::addFlushEvent() {
641     LockHolder lh(connsLock);
642     std::list<connection_t>::iterator iter;
643     for (iter = all.begin(); iter != all.end(); iter++) {
644         TapProducer *tp = dynamic_cast<TapProducer*>((*iter).get());
645         if (tp && !tp->dumpQueue) {
646             tp->flush();
647         }
648     }
649 }
650
651 void TapConnMap::scheduleBackfill(const std::set<uint16_t> &backfillVBuckets) {
652     LockHolder lh(connsLock);
653     rel_time_t now = ep_current_time();
654     std::list<connection_t>::iterator it = all.begin();
655     for (; it != all.end(); ++it) {
656         connection_t &tc = *it;
657         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
658         if (!(tp && (tp->isConnected() || tp->getExpiryTime() > now))) {
659             continue;
660         }
661
662         std::vector<uint16_t> vblist;
663         std::set<uint16_t>::const_iterator vb_it = backfillVBuckets.begin();
664         for (; vb_it != backfillVBuckets.end(); ++vb_it) {
665             if (tp->checkVBucketFilter(*vb_it)) {
666                 vblist.push_back(*vb_it);
667             }
668         }
669         if (!vblist.empty()) {
670             tp->scheduleBackfill(vblist);
671             notifyPausedConnection(tp, true);
672         }
673     }
674 }
675
676 void TapConnMap::loadPrevSessionStats(const std::map<std::string, std::string> &session_stats) {
677     LockHolder lh(connsLock);
678     std::map<std::string, std::string>::const_iterator it =
679         session_stats.find("ep_force_shutdown");
680
681     if (it != session_stats.end()) {
682         if (it->second.compare("true") == 0) {
683             prevSessionStats.normalShutdown = false;
684         }
685     } else if (!session_stats.empty()) { // possible crash on the previous session.
686         prevSessionStats.normalShutdown = false;
687     }
688
689     std::string tap_prefix("eq_tapq:");
690     for (it = session_stats.begin(); it != session_stats.end(); ++it) {
691         const std::string &stat_name = it->first;
692         if (stat_name.substr(0, 8).compare(tap_prefix) == 0) {
693             if (stat_name.find("backfill_completed") != std::string::npos ||
694                 stat_name.find("idle") != std::string::npos) {
695                 prevSessionStats.stats[stat_name] = it->second;
696             }
697         }
698     }
699 }
700
701 bool TapConnMap::changeVBucketFilter(const std::string &name,
702                                      const std::vector<uint16_t> &vbuckets,
703                                      const std::map<uint16_t, uint64_t> &checkpoints) {
704     bool rv = false;
705     LockHolder lh(connsLock);
706     connection_t tc = findByName_UNLOCKED(name);
707     if (tc.get()) {
708         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
709         if (tp && (tp->isConnected() || tp->getExpiryTime() > ep_current_time())) {
710             LOG(EXTENSION_LOG_INFO, "%s Change the vbucket filter",
711                 tp->logHeader());
712             updateVBConnections(tc, vbuckets);
713             tp->setVBucketFilter(vbuckets, true);
714             tp->registerCursor(checkpoints);
715             rv = true;
716             lh.unlock();
717             notifyPausedConnection(tp, true);
718         }
719     }
720     return rv;
721 }
722
723 bool TapConnMap::checkConnectivity(const std::string &name) {
724     LockHolder lh(connsLock);
725     rel_time_t now = ep_current_time();
726     connection_t tc = findByName_UNLOCKED(name);
727     if (tc.get()) {
728         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
729         if (tp && (tp->isConnected() || tp->getExpiryTime() > now)) {
730             return true;
731         }
732     }
733     return false;
734 }
735
736 bool TapConnMap::closeConnectionByName(const std::string &name) {
737
738     LockHolder lh(connsLock);
739     return closeConnectionByName_UNLOCKED(name);
740 }
741
742 bool TapConnMap::mapped(connection_t &tc) {
743     bool rv = false;
744     std::map<const void*, connection_t>::iterator it;
745     for (it = map_.begin(); it != map_.end(); ++it) {
746         if (it->second.get() == tc.get()) {
747             rv = true;
748             break;
749         }
750     }
751     return rv;
752 }
753
754 void TapConnMap::shutdownAllConnections() {
755     LOG(EXTENSION_LOG_WARNING, "Shutting down tap connections!");
756
757     connNotifier_->stop();
758
759     // Not safe to acquire both connsLock and releaseLock at the same time
760     // (can trigger deadlock), so first acquire releaseLock to release all
761     // the connections (without changing the list/map), then drop releaseLock,
762     // acquire connsLock and actually clear out the list/map.
763     LockHolder rlh(releaseLock);
764     std::list<connection_t>::iterator ii;
765     for (ii = all.begin(); ii != all.end(); ++ii) {
766         LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
767         (*ii)->releaseReference();
768         TapProducer *tp = dynamic_cast<TapProducer*>((*ii).get());
769         if (tp) {
770             tp->clearQueues();
771         }
772     }
773     rlh.unlock();
774
775     LockHolder lh(connsLock);
776     all.clear();
777     map_.clear();
778 }
779
780 void TapConnMap::disconnect(const void *cookie) {
781     LockHolder lh(connsLock);
782
783     Configuration& config = engine.getConfiguration();
784     int tapKeepAlive = static_cast<int>(config.getTapKeepalive());
785     std::map<const void*, connection_t>::iterator iter(map_.find(cookie));
786     if (iter != map_.end()) {
787         if (iter->second.get()) {
788             rel_time_t now = ep_current_time();
789             TapConsumer *tc = dynamic_cast<TapConsumer*>(iter->second.get());
790             if (tc || iter->second->doDisconnect()) {
791                 iter->second->setExpiryTime(now - 1);
792                 LOG(EXTENSION_LOG_WARNING, "%s disconnected",
793                     iter->second->logHeader());
794             }
795             else { // must be producer
796                 iter->second->setExpiryTime(now + tapKeepAlive);
797                 LOG(EXTENSION_LOG_WARNING,
798                     "%s disconnected, keep alive for %d seconds",
799                     iter->second->logHeader(), tapKeepAlive);
800             }
801             iter->second->setConnected(false);
802         }
803         else {
804             LOG(EXTENSION_LOG_WARNING,
805                 "Found half-linked tap connection at: %p", cookie);
806         }
807         map_.erase(iter);
808     }
809 }
810
811 bool TapConnMap::closeConnectionByName_UNLOCKED(const std::string &name) {
812     bool rv = false;
813     connection_t tc = findByName_UNLOCKED(name);
814     if (tc.get()) {
815         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
816         if (tp) {
817             LOG(EXTENSION_LOG_WARNING, "%s Connection is closed by force",
818                 tp->logHeader());
819             removeTapCursors_UNLOCKED(tp);
820
821             tp->setExpiryTime(ep_current_time() - 1);
822             tp->setName(ConnHandler::getAnonName());
823             tp->setDisconnect(true);
824             tp->setPaused(true);
825             rv = true;
826         }
827     }
828     return rv;
829 }
830
831 void TapConnMap::getExpiredConnections_UNLOCKED(std::list<connection_t> &deadClients) {
832     rel_time_t now = ep_current_time();
833
834     std::list<connection_t>::iterator iter;
835     for (iter = all.begin(); iter != all.end();) {
836         connection_t &tc = *iter;
837         if (tc->isConnected()) {
838             ++iter;
839             continue;
840         }
841
842         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
843
844         bool is_dead = false;
845         if (tc->getExpiryTime() <= now && !mapped(tc)) {
846             if (tp) {
847                 if (!tp->isSuspended()) {
848                     deadClients.push_back(tc);
849                     removeTapCursors_UNLOCKED(tp);
850                     iter = all.erase(iter);
851                     is_dead = true;
852                 }
853             } else {
854                 deadClients.push_back(tc);
855                 iter = all.erase(iter);
856                 is_dead = true;
857             }
858         }
859
860         if (!is_dead) {
861             ++iter;
862         }
863     }
864 }
865
866 void TapConnMap::removeTapCursors_UNLOCKED(TapProducer *tp) {
867     // Remove all the checkpoint cursors belonging to the TAP connection.
868     if (tp) {
869         const VBucketMap &vbuckets = engine.getEpStore()->getVBuckets();
870         size_t numOfVBuckets = vbuckets.getSize();
871         // Remove all the cursors belonging to the TAP connection to be purged.
872         for (size_t i = 0; i < numOfVBuckets; ++i) {
873             cb_assert(i <= std::numeric_limits<uint16_t>::max());
874             uint16_t vbid = static_cast<uint16_t>(i);
875             RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
876             if (!vb) {
877                 continue;
878             }
879             if (tp->vbucketFilter(vbid)) {
880                 LOG(EXTENSION_LOG_INFO,
881                     "%s Remove the TAP cursor from vbucket %d",
882                     tp->logHeader(), vbid);
883                 vb->checkpointManager.removeTAPCursor(tp->getName());
884             }
885         }
886     }
887 }
888
889 void CompleteBackfillTapOperation::perform(TapProducer *tc, void *) {
890     tc->completeBackfill();
891 }
892
893 void CompleteDiskBackfillTapOperation::perform(TapProducer *tc, void *) {
894     tc->completeDiskBackfill();
895 }
896
897 void ScheduleDiskBackfillTapOperation::perform(TapProducer *tc, void *) {
898     tc->scheduleDiskBackfill();
899 }
900
901 void CompletedBGFetchTapOperation::perform(TapProducer *tc, Item *arg) {
902     if (connToken != tc->getConnectionToken() && !tc->isReconnected()) {
903         delete arg;
904         return;
905     }
906     tc->completeBGFetchJob(arg, vbid, implicitEnqueue);
907 }
908
909 bool TAPSessionStats::wasReplicationCompleted(const std::string &name) const {
910     bool rv = true;
911
912     std::string backfill_stat(name + ":backfill_completed");
913     std::map<std::string, std::string>::const_iterator it = stats.find(backfill_stat);
914     if (it != stats.end() && (it->second == "false" || !normalShutdown)) {
915         rv = false;
916     }
917     std::string idle_stat(name + ":idle");
918     it = stats.find(idle_stat);
919     if (it != stats.end() && (it->second == "false" || !normalShutdown)) {
920         rv = false;
921     }
922
923     return rv;
924 }
925
926 void TAPSessionStats::clearStats(const std::string &name) {
927     std::string backfill_stat(name + ":backfill_completed");
928     stats.erase(backfill_stat);
929     std::string idle_stat(name + ":idle");
930     stats.erase(idle_stat);
931 }
932
933 DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
934     : ConnMap(e) {
935
936 }
937
938 DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
939                                      const std::string &name)
940 {
941     LockHolder lh(connsLock);
942
943     std::string conn_name("eq_dcpq:");
944     conn_name.append(name);
945
946     std::list<connection_t>::iterator iter;
947     for (iter = all.begin(); iter != all.end(); ++iter) {
948         if ((*iter)->getName() == conn_name) {
949             (*iter)->setDisconnect(true);
950             all.erase(iter);
951             break;
952         }
953     }
954
955     DcpConsumer *dcp = new DcpConsumer(engine, cookie, conn_name);
956     connection_t dc(dcp);
957     LOG(EXTENSION_LOG_INFO, "%s Connection created", dc->logHeader());
958     all.push_back(dc);
959     map_[cookie] = dc;
960     return dcp;
961
962 }
963
964 bool DcpConnMap::isPassiveStreamConnected_UNLOCKED(uint16_t vbucket) {
965     std::list<connection_t>::iterator it;
966     for(it = all.begin(); it != all.end(); it++) {
967         DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it->get());
968         if (dcpConsumer && dcpConsumer->isStreamPresent(vbucket)) {
969                 LOG(EXTENSION_LOG_DEBUG, "(vb %d) A DCP passive stream "
970                     "is already exists for the vbucket in connection: %s",
971                     vbucket, dcpConsumer->logHeader());
972                 return true;
973         }
974     }
975     return false;
976 }
977
978 ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler* conn,
979                                                uint32_t opaque,
980                                                uint16_t vbucket,
981                                                uint32_t flags)
982 {
983     cb_assert(conn);
984
985     LockHolder lh(connsLock);
986     /* Check if a stream (passive) for the vbucket is already present */
987     if (isPassiveStreamConnected_UNLOCKED(vbucket)) {
988         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Failing to add passive stream, "
989             "as one already exists for the vbucket!",
990             conn->logHeader(), vbucket);
991         return ENGINE_KEY_EEXISTS;
992     }
993
994     return conn->addStream(opaque, vbucket, flags);
995 }
996
997 DcpProducer *DcpConnMap::newProducer(const void* cookie,
998                                      const std::string &name,
999                                      bool notifyOnly)
1000 {
1001     LockHolder lh(connsLock);
1002
1003     std::string conn_name("eq_dcpq:");
1004     conn_name.append(name);
1005
1006     std::list<connection_t>::iterator iter;
1007     for (iter = all.begin(); iter != all.end(); ++iter) {
1008         if ((*iter)->getName() == conn_name) {
1009             (*iter)->setDisconnect(true);
1010             all.erase(iter);
1011             break;
1012         }
1013     }
1014
1015     DcpProducer *dcp = new DcpProducer(engine, cookie, conn_name, notifyOnly);
1016     LOG(EXTENSION_LOG_INFO, "%s Connection created", dcp->logHeader());
1017     all.push_back(connection_t(dcp));
1018     map_[cookie] = dcp;
1019
1020     return dcp;
1021 }
1022
1023 void DcpConnMap::shutdownAllConnections() {
1024     LOG(EXTENSION_LOG_WARNING, "Shutting down dcp connections!");
1025
1026     connNotifier_->stop();
1027
1028     // Not safe to acquire both connsLock and releaseLock at the same time
1029     // (can trigger deadlock), so first acquire releaseLock to release all
1030     // the connections (without changing the list/map), then drop releaseLock,
1031     // acquire connsLock and actually clear out the list/map.
1032     LockHolder rlh(releaseLock);
1033     std::list<connection_t>::iterator ii;
1034     for (ii = all.begin(); ii != all.end(); ++ii) {
1035         LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
1036         (*ii)->releaseReference();
1037     }
1038     rlh.unlock();
1039
1040     LockHolder lh(connsLock);
1041     closeAllStreams_UNLOCKED();
1042     all.clear();
1043     map_.clear();
1044 }
1045
1046 void DcpConnMap::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
1047     LockHolder lh(connsLock);
1048     std::map<const void*, connection_t>::iterator itr = map_.begin();
1049     for (; itr != map_.end(); ++itr) {
1050         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1051         if (producer) {
1052             producer->vbucketStateChanged(vbucket, state);
1053         }
1054     }
1055 }
1056
1057 void DcpConnMap::closeAllStreams_UNLOCKED() {
1058     std::map<const void*, connection_t>::iterator itr = map_.begin();
1059     for (; itr != map_.end(); ++itr) {
1060         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1061         if (producer) {
1062             producer->closeAllStreams();
1063             producer->cancelCheckpointProcessorTask();
1064         } else {
1065             static_cast<DcpConsumer*>(itr->second.get())->closeAllStreams();
1066         }
1067     }
1068 }
1069
1070 void DcpConnMap::disconnect(const void *cookie) {
1071     // Move the connection matching this cookie from the `all` and map_
1072     // data structures (under connsLock).
1073     connection_t conn;
1074     {
1075         LockHolder lh(connsLock);
1076         std::list<connection_t>::iterator iter;
1077         for (iter = all.begin(); iter != all.end(); ++iter) {
1078             if ((*iter)->getCookie() == cookie) {
1079                 (*iter)->setDisconnect(true);
1080                 all.erase(iter);
1081                 break;
1082             }
1083         }
1084         std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
1085         if (itr != map_.end()) {
1086             conn = itr->second;
1087             if (conn.get()) {
1088                 LOG(EXTENSION_LOG_INFO, "%s Removing connection",
1089                     conn->logHeader());
1090                 map_.erase(itr);
1091             }
1092         }
1093     }
1094
1095     // Note we shutdown the stream *not* under the connsLock; this is
1096     // because as part of closing a DcpConsumer stream we need to
1097     // acquire PassiveStream::buffer.bufMutex; and that could deadlock
1098     // in EventuallyPersistentStore::setVBucketState, via
1099     // PassiveStream::processBufferedMessages.
1100     if (conn) {
1101         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
1102         if (producer) {
1103             producer->closeAllStreams();
1104             producer->cancelCheckpointProcessorTask();
1105         } else {
1106             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
1107         }
1108     }
1109
1110     // Finished disconnecting the stream; add it to the
1111     // deadConnections list.
1112     if (conn) {
1113         LockHolder lh(connsLock);
1114         deadConnections.push_back(conn);
1115     }
1116 }
1117
1118 void DcpConnMap::manageConnections() {
1119     std::list<connection_t> release;
1120
1121     LockHolder lh(connsLock);
1122     while (!deadConnections.empty()) {
1123         connection_t conn = deadConnections.front();
1124         release.push_back(conn);
1125         deadConnections.pop_front();
1126     }
1127
1128     const int maxIdleTime = 5;
1129     rel_time_t now = ep_current_time();
1130
1131     // Collect the list of connections that need to be signaled.
1132     std::list<connection_t> toNotify;
1133     std::map<const void*, connection_t>::iterator iter;
1134     for (iter = map_.begin(); iter != map_.end(); ++iter) {
1135         connection_t conn = iter->second;
1136         Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
1137         if (tp && (tp->isPaused() || conn->doDisconnect()) &&
1138             conn->isReserved()) {
1139             if (!tp->sentNotify() ||
1140                 (conn->getLastWalkTime() + maxIdleTime < now)) {
1141                 toNotify.push_back(iter->second);
1142             }
1143         }
1144     }
1145
1146     lh.unlock();
1147
1148     LockHolder rlh(releaseLock);
1149     std::list<connection_t>::iterator it;
1150     for (it = toNotify.begin(); it != toNotify.end(); ++it) {
1151         Notifiable *tp =
1152             static_cast<Notifiable*>(static_cast<Producer*>((*it).get()));
1153         if (tp && (*it)->isReserved()) {
1154             engine.notifyIOComplete((*it)->getCookie(), ENGINE_SUCCESS);
1155             tp->setNotifySent(true);
1156         }
1157     }
1158
1159     while (!release.empty()) {
1160         connection_t conn = release.front();
1161         conn->releaseReference();
1162         release.pop_front();
1163         removeVBConnections(conn);
1164     }
1165 }
1166
1167 void DcpConnMap::removeVBConnections(connection_t &conn) {
1168     Producer *tp = dynamic_cast<Producer*>(conn.get());
1169     if (!tp) {
1170         return;
1171     }
1172
1173     DcpProducer *prod = static_cast<DcpProducer*>(tp);
1174     std::list<uint16_t> vblist = prod->getVBList();
1175     std::list<uint16_t>::iterator it = vblist.begin();
1176     for (; it != vblist.end(); ++it) {
1177         uint16_t vbid = *it;
1178         size_t lock_num = vbid % vbConnLockNum;
1179         SpinLockHolder lh (&vbConnLocks[lock_num]);
1180         std::list<connection_t> &vb_conns = vbConns[vbid];
1181         std::list<connection_t>::iterator itr = vb_conns.begin();
1182         for (; itr != vb_conns.end(); ++itr) {
1183             if (conn->getCookie() == (*itr)->getCookie()) {
1184                 vb_conns.erase(itr);
1185                 break;
1186             }
1187         }
1188     }
1189 }
1190
1191 void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
1192 {
1193     size_t lock_num = vbid % vbConnLockNum;
1194     SpinLockHolder lh(&vbConnLocks[lock_num]);
1195
1196     std::list<connection_t> &conns = vbConns[vbid];
1197     std::list<connection_t>::iterator it = conns.begin();
1198     for (; it != conns.end(); ++it) {
1199         DcpProducer *conn = static_cast<DcpProducer*>((*it).get());
1200         conn->notifySeqnoAvailable(vbid, bySeqno);
1201     }
1202 }
1203
1204 void DcpConnMap::addStats(ADD_STAT add_stat, const void *c) {
1205     LockHolder lh(connsLock);
1206     add_casted_stat("ep_dcp_dead_conn_count", deadConnections.size(),
1207                     add_stat, c);
1208 }