MB-20054: Regression test - bucket is deleted with DCPBackfill running
[ep-engine.git] / src / connmap.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <algorithm>
21 #include <limits>
22 #include <queue>
23 #include <set>
24 #include <string>
25 #include <vector>
26
27 #include "ep_engine.h"
28 #include "executorthread.h"
29 #include "tapconnection.h"
30 #include "connmap.h"
31 #include "dcp-consumer.h"
32 #include "dcp-producer.h"
33
34 size_t ConnMap::vbConnLockNum = 32;
35 const double ConnNotifier::DEFAULT_MIN_STIME = 1.0;
36
37 /**
38  * NonIO task to free the resource of a tap connection.
39  */
40 class ConnectionReaperCallback : public GlobalTask {
41 public:
42     ConnectionReaperCallback(EventuallyPersistentEngine &e, ConnMap& cm,
43                              connection_t &conn)
44         : GlobalTask(&e, Priority::TapConnectionReaperPriority),
45           connMap(cm), connection(conn) {
46         std::stringstream ss;
47         ss << "Reaping tap or dcp connection: " << connection->getName();
48         descr = ss.str();
49     }
50
51     bool run(void) {
52         TapProducer *tp = dynamic_cast<TapProducer*>(connection.get());
53         if (tp) {
54             tp->clearQueues();
55             connMap.removeVBConnections(connection);
56         }
57         return false;
58     }
59
60     std::string getDescription() {
61         return descr;
62     }
63
64 private:
65     ConnMap &connMap;
66     connection_t connection;
67     std::string descr;
68 };
69
70 /**
71  * A Callback task for Tap connection notifier
72  */
73 class ConnNotifierCallback : public GlobalTask {
74 public:
75     ConnNotifierCallback(EventuallyPersistentEngine *e, ConnNotifier *notifier)
76     : GlobalTask(e, Priority::TapConnNotificationPriority),
77       connNotifier(notifier) { }
78
79     bool run(void) {
80         return connNotifier->notifyConnections();
81     }
82
83     std::string getDescription() {
84         if (connNotifier->getNotifierType() == TAP_CONN_NOTIFIER) {
85             return std::string("TAP connection notifier");
86         } else {
87             return std::string("DCP connection notifier");
88         }
89     }
90
91 private:
92     ConnNotifier *connNotifier;
93 };
94
95 void ConnNotifier::start() {
96     bool inverse = false;
97     pendingNotification.compare_exchange_strong(inverse, true);
98     ExTask connotifyTask = new ConnNotifierCallback(&connMap.getEngine(), this);
99     task = ExecutorPool::get()->schedule(connotifyTask, NONIO_TASK_IDX);
100     cb_assert(task);
101 }
102
103 void ConnNotifier::stop() {
104     bool inverse = true;
105     pendingNotification.compare_exchange_strong(inverse, false);
106     ExecutorPool::get()->cancel(task);
107 }
108
109 void ConnNotifier::notifyMutationEvent(void) {
110     bool inverse = false;
111     if (pendingNotification.compare_exchange_strong(inverse, true)) {
112         if (task > 0) {
113             ExecutorPool::get()->wake(task);
114         }
115     }
116 }
117
118 void ConnNotifier::wake() {
119     ExecutorPool::get()->wake(task);
120 }
121
122 bool ConnNotifier::notifyConnections() {
123     bool inverse = true;
124     pendingNotification.compare_exchange_strong(inverse, false);
125     connMap.notifyAllPausedConnections();
126
127     if (!pendingNotification.load()) {
128         ExecutorPool::get()->snooze(task, DEFAULT_MIN_STIME);
129         if (pendingNotification.load()) {
130             // Check again if a new notification is arrived right before
131             // calling snooze() above.
132             ExecutorPool::get()->snooze(task, 0);
133         }
134     }
135
136     return true;
137 }
138
139 /**
140  * A task to manage connections.
141  */
142 class ConnManager : public GlobalTask {
143 public:
144     ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
145         : GlobalTask(e, Priority::TapConnMgrPriority, MIN_SLEEP_TIME, false),
146           engine(e), connmap(cmap) { }
147
148     bool run(void) {
149         if (engine->getEpStats().isShutdown) {
150             return false;
151         }
152         connmap->manageConnections();
153         snooze(MIN_SLEEP_TIME);
154         return true;
155     }
156
157     std::string getDescription() {
158         return std::string("Connection Manager");
159     }
160
161 private:
162     EventuallyPersistentEngine *engine;
163     ConnMap *connmap;
164 };
165
166 class ConnMapValueChangeListener : public ValueChangedListener {
167 public:
168     ConnMapValueChangeListener(TapConnMap &tc)
169         : connmap_(tc) {
170     }
171
172     virtual void sizeValueChanged(const std::string &key, size_t value) {
173         if (key.compare("tap_noop_interval") == 0) {
174             connmap_.setNoopInterval(value);
175         }
176     }
177
178 private:
179     TapConnMap &connmap_;
180 };
181
182 ConnMap::ConnMap(EventuallyPersistentEngine &theEngine)
183     :  engine(theEngine),
184        connNotifier_(NULL) {
185
186     Configuration &config = engine.getConfiguration();
187     vbConnLocks = new SpinLock[vbConnLockNum];
188     size_t max_vbs = config.getMaxVbuckets();
189     for (size_t i = 0; i < max_vbs; ++i) {
190         vbConns.push_back(std::list<connection_t>());
191     }
192 }
193
194 void ConnMap::initialize(conn_notifier_type ntype) {
195     connNotifier_ = new ConnNotifier(ntype, *this);
196     connNotifier_->start();
197     ExTask connMgr = new ConnManager(&engine, this);
198     ExecutorPool::get()->schedule(connMgr, NONIO_TASK_IDX);
199 }
200
201 ConnMap::~ConnMap() {
202     delete [] vbConnLocks;
203     if (connNotifier_ != NULL) {
204         connNotifier_->stop();
205     }
206     delete connNotifier_;
207 }
208
209 connection_t ConnMap::findByName(const std::string &name) {
210     LockHolder lh(connsLock);
211     return findByName_UNLOCKED(name);
212 }
213
214 connection_t ConnMap::findByName_UNLOCKED(const std::string&name) {
215     connection_t rv(NULL);
216     std::list<connection_t>::iterator iter;
217     for (iter = all.begin(); iter != all.end(); ++iter) {
218         if ((*iter)->getName() == name) {
219             rv = *iter;
220         }
221     }
222     return rv;
223 }
224
225 void ConnMap::notifyPausedConnection(connection_t conn, bool schedule) {
226     if (engine.getEpStats().isShutdown) {
227         return;
228     }
229
230     Notifiable* tp = dynamic_cast<Notifiable*>(conn.get());
231     if (schedule) {
232         if (tp && tp->isPaused() && conn->isReserved() &&
233             tp->setNotificationScheduled(true)) {
234             pendingNotifications.push(conn);
235             connNotifier_->notifyMutationEvent(); // Wake up the connection notifier so that
236                                                   // it can notify the event to a given
237                                                   // paused connection.
238         }
239     } else {
240         LockHolder rlh(releaseLock);
241         if (tp && tp->isPaused() && conn->isReserved()) {
242             engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
243             tp->setNotifySent(true);
244         }
245     }
246 }
247
248 void ConnMap::notifyAllPausedConnections() {
249     std::queue<connection_t> queue;
250     pendingNotifications.getAll(queue);
251
252     LockHolder rlh(releaseLock);
253     while (!queue.empty()) {
254         connection_t &conn = queue.front();
255         Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
256         if (tp) {
257             tp->setNotificationScheduled(false);
258             if (tp->isPaused() && conn->isReserved()) {
259                 engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
260                 tp->setNotifySent(true);
261             }
262         }
263         queue.pop();
264     }
265 }
266
267 bool ConnMap::notificationQueueEmpty() {
268     return pendingNotifications.empty();
269 }
270
271 void ConnMap::updateVBConnections(connection_t &conn,
272                                         const std::vector<uint16_t> &vbuckets)
273 {
274     Producer *tp = dynamic_cast<Producer*>(conn.get());
275     if (!tp) {
276         return;
277     }
278
279     VBucketFilter new_filter(vbuckets);
280     VBucketFilter diff = tp->getVBucketFilter().filter_diff(new_filter);
281     const std::set<uint16_t> &vset = diff.getVBSet();
282
283     for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
284         size_t lock_num = (*it) % vbConnLockNum;
285         SpinLockHolder lh (&vbConnLocks[lock_num]);
286         // Remove the connection that is no longer for a given vbucket
287         if (!tp->vbucketFilter.empty() && tp->vbucketFilter(*it)) {
288             std::list<connection_t> &vb_conns = vbConns[*it];
289             std::list<connection_t>::iterator itr = vb_conns.begin();
290             for (; itr != vb_conns.end(); ++itr) {
291                 if (conn->getCookie() == (*itr)->getCookie()) {
292                     vb_conns.erase(itr);
293                     break;
294                 }
295             }
296         } else { // Add the connection to the vbucket replicator list.
297             std::list<connection_t> &vb_conns = vbConns[*it];
298             vb_conns.push_back(conn);
299         }
300     }
301 }
302
303 void ConnMap::removeVBConnections(connection_t &conn) {
304     Producer *tp = dynamic_cast<Producer*>(conn.get());
305     if (!tp) {
306         return;
307     }
308
309     const std::set<uint16_t> &vset = tp->vbucketFilter.getVBSet();
310     for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
311         size_t lock_num = (*it) % vbConnLockNum;
312         SpinLockHolder lh (&vbConnLocks[lock_num]);
313         std::list<connection_t> &vb_conns = vbConns[*it];
314         std::list<connection_t>::iterator itr = vb_conns.begin();
315         for (; itr != vb_conns.end(); ++itr) {
316             if (conn->getCookie() == (*itr)->getCookie()) {
317                 vb_conns.erase(itr);
318                 break;
319             }
320         }
321     }
322 }
323
324 void ConnMap::addVBConnByVBId(connection_t &conn, int16_t vbid) {
325     if (!conn.get()) {
326         return;
327     }
328
329     size_t lock_num = vbid % vbConnLockNum;
330     SpinLockHolder lh (&vbConnLocks[lock_num]);
331     std::list<connection_t> &vb_conns = vbConns[vbid];
332     vb_conns.push_back(conn);
333 }
334
335 void ConnMap::removeVBConnByVBId_UNLOCKED(connection_t &conn, int16_t vbid) {
336     if (!conn.get()) {
337         return;
338     }
339
340     std::list<connection_t> &vb_conns = vbConns[vbid];
341     std::list<connection_t>::iterator itr = vb_conns.begin();
342     for (; itr != vb_conns.end(); ++itr) {
343         if (conn->getCookie() == (*itr)->getCookie()) {
344             vb_conns.erase(itr);
345             break;
346         }
347     }
348 }
349
350 void ConnMap::removeVBConnByVBId(connection_t &conn, int16_t vbid) {
351     size_t lock_num = vbid % vbConnLockNum;
352     SpinLockHolder lh (&vbConnLocks[lock_num]);
353     removeVBConnByVBId_UNLOCKED(conn, vbid);
354 }
355
356
357 TapConnMap::TapConnMap(EventuallyPersistentEngine &e)
358     : ConnMap(e), nextNoop_(0) {
359
360     Configuration &config = engine.getConfiguration();
361     noopInterval_ = config.getTapNoopInterval();
362     config.addValueChangedListener("tap_noop_interval",
363                                    new ConnMapValueChangeListener(*this));
364 }
365
366 TapConsumer *TapConnMap::newConsumer(const void* cookie)
367 {
368     LockHolder lh(connsLock);
369     TapConsumer *tc = new TapConsumer(engine, cookie, ConnHandler::getAnonName());
370     connection_t tap(tc);
371     LOG(EXTENSION_LOG_INFO, "%s created", tap->logHeader());
372     all.push_back(tap);
373     map_[cookie] = tap;
374     return tc;
375 }
376
377 TapProducer *TapConnMap::newProducer(const void* cookie,
378                                      const std::string &name,
379                                      uint32_t flags,
380                                      uint64_t backfillAge,
381                                      int tapKeepAlive,
382                                      const std::vector<uint16_t> &vbuckets,
383                                      const std::map<uint16_t, uint64_t> &lastCheckpointIds)
384 {
385     LockHolder lh(connsLock);
386     TapProducer *producer(NULL);
387
388     std::list<connection_t>::iterator iter;
389     for (iter = all.begin(); iter != all.end(); ++iter) {
390         producer = dynamic_cast<TapProducer*>((*iter).get());
391         if (producer && producer->getName() == name) {
392             producer->setExpiryTime((rel_time_t)-1);
393             producer->reconnected();
394             break;
395         }
396         else {
397             producer = NULL;
398         }
399     }
400
401     if (producer != NULL) {
402         const void *old_cookie = producer->getCookie();
403         cb_assert(old_cookie);
404         map_.erase(old_cookie);
405
406         if (tapKeepAlive == 0 || (producer->mayCompleteDumpOrTakeover() && producer->idle())) {
407             LOG(EXTENSION_LOG_INFO,
408                 "%s keep alive timed out, should be nuked", producer->logHeader());
409             producer->setName(ConnHandler::getAnonName());
410             producer->setDisconnect(true);
411             producer->setConnected(false);
412             producer->setPaused(true);
413             producer->setExpiryTime(ep_current_time() - 1);
414             producer = NULL;
415         }
416         else {
417             LOG(EXTENSION_LOG_INFO, "%s exists... grabbing the channel",
418                 producer->logHeader());
419             // Create the dummy expired producer connection for the old connection cookie.
420             // This dummy producer connection will be used for releasing the corresponding
421             // memcached connection.
422
423             // dliao: TODO no need to deal with tap or dcp separately here for the dummy?
424             TapProducer *n = new TapProducer(engine,
425                                              old_cookie,
426                                              ConnHandler::getAnonName(),
427                                              0);
428             n->setDisconnect(true);
429             n->setConnected(false);
430             n->setPaused(true);
431             n->setExpiryTime(ep_current_time() - 1);
432             all.push_back(connection_t(n));
433         }
434     }
435
436     bool reconnect = false;
437     if (producer == NULL) {
438         producer = new TapProducer(engine, cookie, name, flags);
439         LOG(EXTENSION_LOG_INFO, "%s created", producer->logHeader());
440         all.push_back(connection_t(producer));
441     } else {
442         producer->setCookie(cookie);
443         producer->setReserved(true);
444         producer->setConnected(true);
445         producer->setDisconnect(false);
446         reconnect = true;
447     }
448     producer->evaluateFlags();
449
450     connection_t conn(producer);
451     updateVBConnections(conn, vbuckets);
452
453     producer->setFlagByteorderSupport((flags & TAP_CONNECT_TAP_FIX_FLAG_BYTEORDER) != 0);
454     producer->setBackfillAge(backfillAge, reconnect);
455     producer->setVBucketFilter(vbuckets);
456     producer->registerCursor(lastCheckpointIds);
457
458     if (reconnect) {
459         producer->rollback();
460     }
461
462     map_[cookie] = conn;
463     engine.storeEngineSpecific(cookie, producer);
464     // Clear all previous session stats for this producer.
465     clearPrevSessionStats(producer->getName());
466
467     return producer;
468
469 }
470
471 void TapConnMap::manageConnections() {
472     // To avoid connections to be stucked in a bogus state forever, we're going
473     // to ping all connections that hasn't tried to walk the tap queue
474     // for this amount of time..
475     const int maxIdleTime = 5;
476
477     bool addNoop = false;
478
479     rel_time_t now = ep_current_time();
480     if (now > nextNoop_ && noopInterval_ != (size_t)-1) {
481         addNoop = true;
482         nextNoop_ = now + noopInterval_;
483     }
484
485     std::list<connection_t> deadClients;
486
487     LockHolder lh(connsLock);
488     // We should pause unless we purged some connections or
489     // all queues have items.
490     getExpiredConnections_UNLOCKED(deadClients);
491
492     // see if I have some channels that I have to signal..
493     std::map<const void*, connection_t>::iterator iter;
494     for (iter = map_.begin(); iter != map_.end(); ++iter) {
495         TapProducer *tp = dynamic_cast<TapProducer*>(iter->second.get());
496         if (tp != NULL) {
497             if (tp->supportsAck() && (tp->getExpiryTime() < now) && tp->windowIsFull()) {
498                 LOG(EXTENSION_LOG_WARNING,
499                     "%s Expired and ack windows is full. Disconnecting...",
500                     tp->logHeader());
501                 tp->setDisconnect(true);
502             } else if (addNoop) {
503                 tp->setTimeForNoop();
504             }
505         }
506     }
507
508     // Collect the list of connections that need to be signaled.
509     std::list<connection_t> toNotify;
510     for (iter = map_.begin(); iter != map_.end(); ++iter) {
511         TapProducer *tp = dynamic_cast<TapProducer*>(iter->second.get());
512         if (tp && (tp->isPaused() || tp->doDisconnect()) && !tp->isSuspended()
513             && tp->isReserved()) {
514             if (!tp->sentNotify() ||
515                 (tp->getLastWalkTime() + maxIdleTime < now)) {
516                 toNotify.push_back(iter->second);
517             }
518         }
519     }
520
521     lh.unlock();
522
523     LockHolder rlh(releaseLock);
524     std::list<connection_t>::iterator it;
525     for (it = toNotify.begin(); it != toNotify.end(); ++it) {
526         TapProducer *tp = dynamic_cast<TapProducer*>((*it).get());
527         if (tp && tp->isReserved()) {
528             engine.notifyIOComplete(tp->getCookie(), ENGINE_SUCCESS);
529             tp->setNotifySent(true);
530         }
531     }
532
533     // Delete all of the dead clients
534     std::list<connection_t>::iterator ii;
535     for (ii = deadClients.begin(); ii != deadClients.end(); ++ii) {
536         LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
537         (*ii)->releaseReference();
538         TapProducer *tp = dynamic_cast<TapProducer*>((*ii).get());
539         if (tp) {
540             ExTask reapTask = new ConnectionReaperCallback(engine, *this, *ii);
541             ExecutorPool::get()->schedule(reapTask, NONIO_TASK_IDX);
542         }
543     }
544 }
545
546 void TapConnMap::notifyVBConnections(uint16_t vbid)
547 {
548     size_t lock_num = vbid % vbConnLockNum;
549     SpinLockHolder lh(&vbConnLocks[lock_num]);
550
551     std::list<connection_t> &conns = vbConns[vbid];
552     std::list<connection_t>::iterator it = conns.begin();
553     for (; it != conns.end(); ++it) {
554         Notifiable *conn = dynamic_cast<Notifiable*>((*it).get());
555         if (conn && conn->isPaused() && (*it)->isReserved() &&
556             conn->setNotificationScheduled(true)) {
557             pendingNotifications.push(*it);
558             connNotifier_->notifyMutationEvent();
559         }
560     }
561     lh.unlock();
562 }
563
564 bool TapConnMap::setEvents(const std::string &name, std::list<queued_item> *q) {
565     bool found(false);
566     LockHolder lh(connsLock);
567
568     connection_t tc = findByName_UNLOCKED(name);
569     if (tc.get()) {
570         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
571         cb_assert(tp);
572         found = true;
573         tp->appendQueue(q);
574         lh.unlock();
575         notifyPausedConnection(tp, false);
576     }
577
578     return found;
579 }
580
581 void TapConnMap::incrBackfillRemaining(const std::string &name,
582                                        size_t num_backfill_items) {
583     LockHolder lh(connsLock);
584
585     connection_t tc = findByName_UNLOCKED(name);
586     if (tc.get()) {
587         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
588         cb_assert(tp);
589         tp->incrBackfillRemaining(num_backfill_items);
590     }
591 }
592
593 ssize_t TapConnMap::backfillQueueDepth(const std::string &name) {
594     ssize_t rv(-1);
595     LockHolder lh(connsLock);
596
597     connection_t tc = findByName_UNLOCKED(name);
598     if (tc.get()) {
599         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
600         cb_assert(tp);
601         rv = tp->getBackfillQueueSize();
602     }
603
604     return rv;
605 }
606
607 void TapConnMap::resetReplicaChain() {
608     LockHolder lh(connsLock);
609     rel_time_t now = ep_current_time();
610     std::list<connection_t>::iterator it = all.begin();
611     for (; it != all.end(); ++it) {
612         connection_t &tc = *it;
613         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
614         if (!(tp && (tp->isConnected() || tp->getExpiryTime() > now))) {
615             continue;
616         }
617         LOG(EXTENSION_LOG_INFO, "%s Reset the replication chain",
618             tp->logHeader());
619         // Get the list of vbuckets that each TAP producer is replicating
620         VBucketFilter vbfilter = tp->getVBucketFilter();
621         std::vector<uint16_t> vblist (vbfilter.getVBSet().begin(), vbfilter.getVBSet().end());
622         // TAP producer sends INITIAL_VBUCKET_STREAM messages to the destination to reset
623         // replica vbuckets, and then backfills items to the destination.
624         tp->scheduleBackfill(vblist);
625         notifyPausedConnection(tp, true);
626     }
627 }
628
629 bool TapConnMap::isBackfillCompleted(std::string &name) {
630     LockHolder lh(connsLock);
631     connection_t tc = findByName_UNLOCKED(name);
632     if (tc.get()) {
633         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
634         if (tp) {
635             return tp->isBackfillCompleted();
636         }
637     }
638     return false;
639 }
640
641 void TapConnMap::addFlushEvent() {
642     LockHolder lh(connsLock);
643     std::list<connection_t>::iterator iter;
644     for (iter = all.begin(); iter != all.end(); iter++) {
645         TapProducer *tp = dynamic_cast<TapProducer*>((*iter).get());
646         if (tp && !tp->dumpQueue) {
647             tp->flush();
648         }
649     }
650 }
651
652 void TapConnMap::scheduleBackfill(const std::set<uint16_t> &backfillVBuckets) {
653     LockHolder lh(connsLock);
654     rel_time_t now = ep_current_time();
655     std::list<connection_t>::iterator it = all.begin();
656     for (; it != all.end(); ++it) {
657         connection_t &tc = *it;
658         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
659         if (!(tp && (tp->isConnected() || tp->getExpiryTime() > now))) {
660             continue;
661         }
662
663         std::vector<uint16_t> vblist;
664         std::set<uint16_t>::const_iterator vb_it = backfillVBuckets.begin();
665         for (; vb_it != backfillVBuckets.end(); ++vb_it) {
666             if (tp->checkVBucketFilter(*vb_it)) {
667                 vblist.push_back(*vb_it);
668             }
669         }
670         if (!vblist.empty()) {
671             tp->scheduleBackfill(vblist);
672             notifyPausedConnection(tp, true);
673         }
674     }
675 }
676
677 void TapConnMap::loadPrevSessionStats(const std::map<std::string, std::string> &session_stats) {
678     LockHolder lh(connsLock);
679     std::map<std::string, std::string>::const_iterator it =
680         session_stats.find("ep_force_shutdown");
681
682     if (it != session_stats.end()) {
683         if (it->second.compare("true") == 0) {
684             prevSessionStats.normalShutdown = false;
685         }
686     } else if (!session_stats.empty()) { // possible crash on the previous session.
687         prevSessionStats.normalShutdown = false;
688     }
689
690     std::string tap_prefix("eq_tapq:");
691     for (it = session_stats.begin(); it != session_stats.end(); ++it) {
692         const std::string &stat_name = it->first;
693         if (stat_name.substr(0, 8).compare(tap_prefix) == 0) {
694             if (stat_name.find("backfill_completed") != std::string::npos ||
695                 stat_name.find("idle") != std::string::npos) {
696                 prevSessionStats.stats[stat_name] = it->second;
697             }
698         }
699     }
700 }
701
702 bool TapConnMap::changeVBucketFilter(const std::string &name,
703                                      const std::vector<uint16_t> &vbuckets,
704                                      const std::map<uint16_t, uint64_t> &checkpoints) {
705     bool rv = false;
706     LockHolder lh(connsLock);
707     connection_t tc = findByName_UNLOCKED(name);
708     if (tc.get()) {
709         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
710         if (tp && (tp->isConnected() || tp->getExpiryTime() > ep_current_time())) {
711             LOG(EXTENSION_LOG_INFO, "%s Change the vbucket filter",
712                 tp->logHeader());
713             updateVBConnections(tc, vbuckets);
714             tp->setVBucketFilter(vbuckets, true);
715             tp->registerCursor(checkpoints);
716             rv = true;
717             lh.unlock();
718             notifyPausedConnection(tp, true);
719         }
720     }
721     return rv;
722 }
723
724 bool TapConnMap::checkConnectivity(const std::string &name) {
725     LockHolder lh(connsLock);
726     rel_time_t now = ep_current_time();
727     connection_t tc = findByName_UNLOCKED(name);
728     if (tc.get()) {
729         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
730         if (tp && (tp->isConnected() || tp->getExpiryTime() > now)) {
731             return true;
732         }
733     }
734     return false;
735 }
736
737 bool TapConnMap::closeConnectionByName(const std::string &name) {
738
739     LockHolder lh(connsLock);
740     return closeConnectionByName_UNLOCKED(name);
741 }
742
743 bool TapConnMap::mapped(connection_t &tc) {
744     bool rv = false;
745     std::map<const void*, connection_t>::iterator it;
746     for (it = map_.begin(); it != map_.end(); ++it) {
747         if (it->second.get() == tc.get()) {
748             rv = true;
749             break;
750         }
751     }
752     return rv;
753 }
754
755 void TapConnMap::shutdownAllConnections() {
756     LOG(EXTENSION_LOG_WARNING, "Shutting down tap connections!");
757
758     if (connNotifier_ != NULL) {
759         connNotifier_->stop();
760     }
761
762     // Not safe to acquire both connsLock and releaseLock at the same time
763     // (can trigger deadlock), so first acquire releaseLock to release all
764     // the connections (without changing the list/map), then drop releaseLock,
765     // acquire connsLock and actually clear out the list/map.
766     LockHolder rlh(releaseLock);
767     std::list<connection_t>::iterator ii;
768     for (ii = all.begin(); ii != all.end(); ++ii) {
769         LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
770         (*ii)->releaseReference();
771         TapProducer *tp = dynamic_cast<TapProducer*>((*ii).get());
772         if (tp) {
773             tp->clearQueues();
774         }
775     }
776     rlh.unlock();
777
778     LockHolder lh(connsLock);
779     all.clear();
780     map_.clear();
781 }
782
783 void TapConnMap::disconnect(const void *cookie) {
784     LockHolder lh(connsLock);
785
786     Configuration& config = engine.getConfiguration();
787     int tapKeepAlive = static_cast<int>(config.getTapKeepalive());
788     std::map<const void*, connection_t>::iterator iter(map_.find(cookie));
789     if (iter != map_.end()) {
790         if (iter->second.get()) {
791             rel_time_t now = ep_current_time();
792             TapConsumer *tc = dynamic_cast<TapConsumer*>(iter->second.get());
793             if (tc || iter->second->doDisconnect()) {
794                 iter->second->setExpiryTime(now - 1);
795                 LOG(EXTENSION_LOG_WARNING, "%s disconnected",
796                     iter->second->logHeader());
797             }
798             else { // must be producer
799                 iter->second->setExpiryTime(now + tapKeepAlive);
800                 LOG(EXTENSION_LOG_WARNING,
801                     "%s disconnected, keep alive for %d seconds",
802                     iter->second->logHeader(), tapKeepAlive);
803             }
804             iter->second->setConnected(false);
805         }
806         else {
807             LOG(EXTENSION_LOG_WARNING,
808                 "Found half-linked tap connection at: %p", cookie);
809         }
810         map_.erase(iter);
811     }
812 }
813
814 bool TapConnMap::closeConnectionByName_UNLOCKED(const std::string &name) {
815     bool rv = false;
816     connection_t tc = findByName_UNLOCKED(name);
817     if (tc.get()) {
818         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
819         if (tp) {
820             LOG(EXTENSION_LOG_WARNING, "%s Connection is closed by force",
821                 tp->logHeader());
822             removeTapCursors_UNLOCKED(tp);
823
824             tp->setExpiryTime(ep_current_time() - 1);
825             tp->setName(ConnHandler::getAnonName());
826             tp->setDisconnect(true);
827             tp->setPaused(true);
828             rv = true;
829         }
830     }
831     return rv;
832 }
833
834 void TapConnMap::getExpiredConnections_UNLOCKED(std::list<connection_t> &deadClients) {
835     rel_time_t now = ep_current_time();
836
837     std::list<connection_t>::iterator iter;
838     for (iter = all.begin(); iter != all.end();) {
839         connection_t &tc = *iter;
840         if (tc->isConnected()) {
841             ++iter;
842             continue;
843         }
844
845         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
846
847         bool is_dead = false;
848         if (tc->getExpiryTime() <= now && !mapped(tc)) {
849             if (tp) {
850                 if (!tp->isSuspended()) {
851                     deadClients.push_back(tc);
852                     removeTapCursors_UNLOCKED(tp);
853                     iter = all.erase(iter);
854                     is_dead = true;
855                 }
856             } else {
857                 deadClients.push_back(tc);
858                 iter = all.erase(iter);
859                 is_dead = true;
860             }
861         }
862
863         if (!is_dead) {
864             ++iter;
865         }
866     }
867 }
868
869 void TapConnMap::removeTapCursors_UNLOCKED(TapProducer *tp) {
870     // Remove all the checkpoint cursors belonging to the TAP connection.
871     if (tp) {
872         const VBucketMap &vbuckets = engine.getEpStore()->getVBuckets();
873         size_t numOfVBuckets = vbuckets.getSize();
874         // Remove all the cursors belonging to the TAP connection to be purged.
875         for (size_t i = 0; i < numOfVBuckets; ++i) {
876             cb_assert(i <= std::numeric_limits<uint16_t>::max());
877             uint16_t vbid = static_cast<uint16_t>(i);
878             RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
879             if (!vb) {
880                 continue;
881             }
882             if (tp->vbucketFilter(vbid)) {
883                 LOG(EXTENSION_LOG_INFO,
884                     "%s Remove the TAP cursor from vbucket %d",
885                     tp->logHeader(), vbid);
886                 vb->checkpointManager.removeTAPCursor(tp->getName());
887             }
888         }
889     }
890 }
891
892 void CompleteBackfillTapOperation::perform(TapProducer *tc, void *) {
893     tc->completeBackfill();
894 }
895
896 void CompleteDiskBackfillTapOperation::perform(TapProducer *tc, void *) {
897     tc->completeDiskBackfill();
898 }
899
900 void ScheduleDiskBackfillTapOperation::perform(TapProducer *tc, void *) {
901     tc->scheduleDiskBackfill();
902 }
903
904 void CompletedBGFetchTapOperation::perform(TapProducer *tc, Item *arg) {
905     if (connToken != tc->getConnectionToken() && !tc->isReconnected()) {
906         delete arg;
907         return;
908     }
909     tc->completeBGFetchJob(arg, vbid, implicitEnqueue);
910 }
911
912 bool TAPSessionStats::wasReplicationCompleted(const std::string &name) const {
913     bool rv = true;
914
915     std::string backfill_stat(name + ":backfill_completed");
916     std::map<std::string, std::string>::const_iterator it = stats.find(backfill_stat);
917     if (it != stats.end() && (it->second == "false" || !normalShutdown)) {
918         rv = false;
919     }
920     std::string idle_stat(name + ":idle");
921     it = stats.find(idle_stat);
922     if (it != stats.end() && (it->second == "false" || !normalShutdown)) {
923         rv = false;
924     }
925
926     return rv;
927 }
928
929 void TAPSessionStats::clearStats(const std::string &name) {
930     std::string backfill_stat(name + ":backfill_completed");
931     stats.erase(backfill_stat);
932     std::string idle_stat(name + ":idle");
933     stats.erase(idle_stat);
934 }
935
936 DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
937     : ConnMap(e) {
938
939 }
940
941 DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
942                                      const std::string &name)
943 {
944     LockHolder lh(connsLock);
945
946     std::string conn_name("eq_dcpq:");
947     conn_name.append(name);
948
949     std::list<connection_t>::iterator iter;
950     for (iter = all.begin(); iter != all.end(); ++iter) {
951         if ((*iter)->getName() == conn_name) {
952             (*iter)->setDisconnect(true);
953             all.erase(iter);
954             break;
955         }
956     }
957
958     DcpConsumer *dcp = new DcpConsumer(engine, cookie, conn_name);
959     connection_t dc(dcp);
960     LOG(EXTENSION_LOG_INFO, "%s Connection created", dc->logHeader());
961     all.push_back(dc);
962     map_[cookie] = dc;
963     return dcp;
964
965 }
966
967 bool DcpConnMap::isPassiveStreamConnected_UNLOCKED(uint16_t vbucket) {
968     std::list<connection_t>::iterator it;
969     for(it = all.begin(); it != all.end(); it++) {
970         DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it->get());
971         if (dcpConsumer && dcpConsumer->isStreamPresent(vbucket)) {
972                 LOG(EXTENSION_LOG_DEBUG, "(vb %d) A DCP passive stream "
973                     "is already exists for the vbucket in connection: %s",
974                     vbucket, dcpConsumer->logHeader());
975                 return true;
976         }
977     }
978     return false;
979 }
980
981 ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler* conn,
982                                                uint32_t opaque,
983                                                uint16_t vbucket,
984                                                uint32_t flags)
985 {
986     cb_assert(conn);
987
988     LockHolder lh(connsLock);
989     /* Check if a stream (passive) for the vbucket is already present */
990     if (isPassiveStreamConnected_UNLOCKED(vbucket)) {
991         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Failing to add passive stream, "
992             "as one already exists for the vbucket!",
993             conn->logHeader(), vbucket);
994         return ENGINE_KEY_EEXISTS;
995     }
996
997     return conn->addStream(opaque, vbucket, flags);
998 }
999
1000 DcpProducer *DcpConnMap::newProducer(const void* cookie,
1001                                      const std::string &name,
1002                                      bool notifyOnly)
1003 {
1004     LockHolder lh(connsLock);
1005
1006     std::string conn_name("eq_dcpq:");
1007     conn_name.append(name);
1008
1009     std::list<connection_t>::iterator iter;
1010     for (iter = all.begin(); iter != all.end(); ++iter) {
1011         if ((*iter)->getName() == conn_name) {
1012             (*iter)->setDisconnect(true);
1013             all.erase(iter);
1014             break;
1015         }
1016     }
1017
1018     DcpProducer *dcp = new DcpProducer(engine, cookie, conn_name, notifyOnly);
1019     LOG(EXTENSION_LOG_INFO, "%s Connection created", dcp->logHeader());
1020     all.push_back(connection_t(dcp));
1021     map_[cookie] = dcp;
1022
1023     return dcp;
1024 }
1025
1026 void DcpConnMap::shutdownAllConnections() {
1027     LOG(EXTENSION_LOG_WARNING, "Shutting down dcp connections!");
1028
1029     if (connNotifier_ != NULL) {
1030         connNotifier_->stop();
1031     }
1032
1033     // Not safe to acquire both connsLock and releaseLock at the same time
1034     // (can trigger deadlock), so first acquire releaseLock to release all
1035     // the connections (without changing the list/map), then drop releaseLock,
1036     // acquire connsLock and actually clear out the list/map.
1037     LockHolder rlh(releaseLock);
1038     std::list<connection_t>::iterator ii;
1039     for (ii = all.begin(); ii != all.end(); ++ii) {
1040         LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
1041         (*ii)->releaseReference();
1042     }
1043     rlh.unlock();
1044
1045     LockHolder lh(connsLock);
1046     closeAllStreams_UNLOCKED();
1047     all.clear();
1048     map_.clear();
1049 }
1050
1051 void DcpConnMap::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
1052     LockHolder lh(connsLock);
1053     std::map<const void*, connection_t>::iterator itr = map_.begin();
1054     for (; itr != map_.end(); ++itr) {
1055         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1056         if (producer) {
1057             producer->vbucketStateChanged(vbucket, state);
1058         }
1059     }
1060 }
1061
1062 void DcpConnMap::closeAllStreams_UNLOCKED() {
1063     std::map<const void*, connection_t>::iterator itr = map_.begin();
1064     for (; itr != map_.end(); ++itr) {
1065         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1066         if (producer) {
1067             producer->closeAllStreams();
1068             producer->cancelCheckpointProcessorTask();
1069         } else {
1070             static_cast<DcpConsumer*>(itr->second.get())->closeAllStreams();
1071         }
1072     }
1073 }
1074
1075 void DcpConnMap::disconnect(const void *cookie) {
1076     // Move the connection matching this cookie from the `all` and map_
1077     // data structures (under connsLock).
1078     connection_t conn;
1079     {
1080         LockHolder lh(connsLock);
1081         std::list<connection_t>::iterator iter;
1082         for (iter = all.begin(); iter != all.end(); ++iter) {
1083             if ((*iter)->getCookie() == cookie) {
1084                 (*iter)->setDisconnect(true);
1085                 all.erase(iter);
1086                 break;
1087             }
1088         }
1089         std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
1090         if (itr != map_.end()) {
1091             conn = itr->second;
1092             if (conn.get()) {
1093                 LOG(EXTENSION_LOG_INFO, "%s Removing connection",
1094                     conn->logHeader());
1095                 map_.erase(itr);
1096             }
1097         }
1098     }
1099
1100     // Note we shutdown the stream *not* under the connsLock; this is
1101     // because as part of closing a DcpConsumer stream we need to
1102     // acquire PassiveStream::buffer.bufMutex; and that could deadlock
1103     // in EventuallyPersistentStore::setVBucketState, via
1104     // PassiveStream::processBufferedMessages.
1105     if (conn) {
1106         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
1107         if (producer) {
1108             producer->closeAllStreams();
1109             producer->cancelCheckpointProcessorTask();
1110         } else {
1111             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
1112         }
1113     }
1114
1115     // Finished disconnecting the stream; add it to the
1116     // deadConnections list.
1117     if (conn) {
1118         LockHolder lh(connsLock);
1119         deadConnections.push_back(conn);
1120     }
1121 }
1122
1123 void DcpConnMap::manageConnections() {
1124     std::list<connection_t> release;
1125
1126     LockHolder lh(connsLock);
1127     while (!deadConnections.empty()) {
1128         connection_t conn = deadConnections.front();
1129         release.push_back(conn);
1130         deadConnections.pop_front();
1131     }
1132
1133     const int maxIdleTime = 5;
1134     rel_time_t now = ep_current_time();
1135
1136     // Collect the list of connections that need to be signaled.
1137     std::list<connection_t> toNotify;
1138     std::map<const void*, connection_t>::iterator iter;
1139     for (iter = map_.begin(); iter != map_.end(); ++iter) {
1140         connection_t conn = iter->second;
1141         Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
1142         if (tp && (tp->isPaused() || conn->doDisconnect()) &&
1143             conn->isReserved()) {
1144             if (!tp->sentNotify() ||
1145                 (conn->getLastWalkTime() + maxIdleTime < now)) {
1146                 toNotify.push_back(iter->second);
1147             }
1148         }
1149     }
1150
1151     lh.unlock();
1152
1153     LockHolder rlh(releaseLock);
1154     std::list<connection_t>::iterator it;
1155     for (it = toNotify.begin(); it != toNotify.end(); ++it) {
1156         Notifiable *tp =
1157             static_cast<Notifiable*>(static_cast<Producer*>((*it).get()));
1158         if (tp && (*it)->isReserved()) {
1159             engine.notifyIOComplete((*it)->getCookie(), ENGINE_SUCCESS);
1160             tp->setNotifySent(true);
1161         }
1162     }
1163
1164     while (!release.empty()) {
1165         connection_t conn = release.front();
1166         conn->releaseReference();
1167         release.pop_front();
1168         removeVBConnections(conn);
1169     }
1170 }
1171
1172 void DcpConnMap::removeVBConnections(connection_t &conn) {
1173     Producer *tp = dynamic_cast<Producer*>(conn.get());
1174     if (!tp) {
1175         return;
1176     }
1177
1178     DcpProducer *prod = static_cast<DcpProducer*>(tp);
1179     std::list<uint16_t> vblist = prod->getVBList();
1180     std::list<uint16_t>::iterator it = vblist.begin();
1181     for (; it != vblist.end(); ++it) {
1182         uint16_t vbid = *it;
1183         size_t lock_num = vbid % vbConnLockNum;
1184         SpinLockHolder lh (&vbConnLocks[lock_num]);
1185         std::list<connection_t> &vb_conns = vbConns[vbid];
1186         std::list<connection_t>::iterator itr = vb_conns.begin();
1187         for (; itr != vb_conns.end(); ++itr) {
1188             if (conn->getCookie() == (*itr)->getCookie()) {
1189                 vb_conns.erase(itr);
1190                 break;
1191             }
1192         }
1193     }
1194 }
1195
1196 void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
1197 {
1198     size_t lock_num = vbid % vbConnLockNum;
1199     SpinLockHolder lh(&vbConnLocks[lock_num]);
1200
1201     std::list<connection_t> &conns = vbConns[vbid];
1202     std::list<connection_t>::iterator it = conns.begin();
1203     for (; it != conns.end(); ++it) {
1204         DcpProducer *conn = static_cast<DcpProducer*>((*it).get());
1205         conn->notifySeqnoAvailable(vbid, bySeqno);
1206     }
1207 }
1208
1209 void DcpConnMap::addStats(ADD_STAT add_stat, const void *c) {
1210     LockHolder lh(connsLock);
1211     add_casted_stat("ep_dcp_dead_conn_count", deadConnections.size(),
1212                     add_stat, c);
1213 }