Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / connmap.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <algorithm>
21 #include <limits>
22 #include <queue>
23 #include <set>
24 #include <string>
25 #include <vector>
26
27 #include "ep_engine.h"
28 #include "executorthread.h"
29 #include "tapconnection.h"
30 #include "connmap.h"
31 #include "dcp-backfill-manager.h"
32 #include "dcp-consumer.h"
33 #include "dcp-producer.h"
34
35 size_t ConnMap::vbConnLockNum = 32;
36 const double ConnNotifier::DEFAULT_MIN_STIME = 1.0;
37 const uint32_t DcpConnMap::dbFileMem = 10 * 1024;
38 const uint16_t DcpConnMap::numBackfillsThreshold = 4096;
39 const uint8_t DcpConnMap::numBackfillsMemThreshold = 1;
40
41 /**
42  * NonIO task to free the resource of a tap connection.
43  */
44 class ConnectionReaperCallback : public GlobalTask {
45 public:
46     ConnectionReaperCallback(EventuallyPersistentEngine &e, ConnMap& cm,
47                              connection_t &conn)
48         : GlobalTask(&e, TaskId::ConnectionReaperCallback),
49           connMap(cm), connection(conn) {
50     }
51
52     bool run(void) {
53         TapProducer *tp = dynamic_cast<TapProducer*>(connection.get());
54         if (tp) {
55             tp->clearQueues();
56             connMap.removeVBConnections(connection);
57         }
58         return false;
59     }
60
61     std::string getDescription() {
62         return "Reaping tap or dcp connection: " + connection->getName();
63     }
64
65 private:
66     ConnMap &connMap;
67     connection_t connection;
68 };
69
70 /**
71  * A Callback task for Tap connection notifier
72  */
73 class ConnNotifierCallback : public GlobalTask {
74 public:
75     ConnNotifierCallback(EventuallyPersistentEngine *e, ConnNotifier *notifier)
76     : GlobalTask(e, TaskId::ConnNotifierCallback),
77       connNotifier(notifier) { }
78
79     bool run(void) {
80         return connNotifier->notifyConnections();
81     }
82
83     std::string getDescription() {
84         if (connNotifier->getNotifierType() == TAP_CONN_NOTIFIER) {
85             return std::string("TAP connection notifier");
86         } else {
87             return std::string("DCP connection notifier");
88         }
89     }
90
91 private:
92     ConnNotifier *connNotifier;
93 };
94
95 void ConnNotifier::start() {
96     bool inverse = false;
97     pendingNotification.compare_exchange_strong(inverse, true);
98     ExTask connotifyTask = new ConnNotifierCallback(&connMap.getEngine(), this);
99     task = ExecutorPool::get()->schedule(connotifyTask, NONIO_TASK_IDX);
100     cb_assert(task);
101 }
102
103 void ConnNotifier::stop() {
104     bool inverse = true;
105     pendingNotification.compare_exchange_strong(inverse, false);
106     ExecutorPool::get()->cancel(task);
107 }
108
109 void ConnNotifier::notifyMutationEvent(void) {
110     bool inverse = false;
111     if (pendingNotification.compare_exchange_strong(inverse, true)) {
112         cb_assert(task > 0);
113         ExecutorPool::get()->wake(task);
114     }
115 }
116
/**
 * Wake the notifier task unconditionally, without touching the
 * pendingNotification flag.
 */
void ConnNotifier::wake() {
    ExecutorPool::get()->wake(task);
}
120
121 bool ConnNotifier::notifyConnections() {
122     bool inverse = true;
123     pendingNotification.compare_exchange_strong(inverse, false);
124     connMap.notifyAllPausedConnections();
125
126     if (!pendingNotification.load()) {
127         ExecutorPool::get()->snooze(task, DEFAULT_MIN_STIME);
128         if (pendingNotification.load()) {
129             // Check again if a new notification is arrived right before
130             // calling snooze() above.
131             ExecutorPool::get()->snooze(task, 0);
132         }
133     }
134
135     return true;
136 }
137
138 /**
139  * A task to manage connections.
140  */
141 class ConnManager : public GlobalTask {
142 public:
143     ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
144         : GlobalTask(e, TaskId::ConnManager, MIN_SLEEP_TIME, false),
145           engine(e), connmap(cmap) { }
146
147     bool run(void) {
148         if (engine->getEpStats().isShutdown) {
149             return false;
150         }
151         connmap->manageConnections();
152         snooze(MIN_SLEEP_TIME);
153         return true;
154     }
155
156     std::string getDescription() {
157         return std::string("Connection Manager");
158     }
159
160 private:
161     EventuallyPersistentEngine *engine;
162     ConnMap *connmap;
163 };
164
165 class ConnMapValueChangeListener : public ValueChangedListener {
166 public:
167     ConnMapValueChangeListener(TapConnMap &tc)
168         : connmap_(tc) {
169     }
170
171     virtual void sizeValueChanged(const std::string &key, size_t value) {
172         if (key.compare("tap_noop_interval") == 0) {
173             connmap_.setNoopInterval(value);
174         }
175     }
176
177 private:
178     TapConnMap &connmap_;
179 };
180
181 ConnMap::ConnMap(EventuallyPersistentEngine &theEngine)
182     :  engine(theEngine) {
183
184     Configuration &config = engine.getConfiguration();
185     vbConnLocks = new SpinLock[vbConnLockNum];
186     size_t max_vbs = config.getMaxVbuckets();
187     for (size_t i = 0; i < max_vbs; ++i) {
188         vbConns.push_back(std::list<connection_t>());
189     }
190 }
191
192 void ConnMap::initialize(conn_notifier_type ntype) {
193     connNotifier_ = new ConnNotifier(ntype, *this);
194     connNotifier_->start();
195     ExTask connMgr = new ConnManager(&engine, this);
196     ExecutorPool::get()->schedule(connMgr, NONIO_TASK_IDX);
197 }
198
199 ConnMap::~ConnMap() {
200     delete [] vbConnLocks;
201     connNotifier_->stop();
202     delete connNotifier_;
203 }
204
205 connection_t ConnMap::findByName(const std::string &name) {
206     LockHolder lh(connsLock);
207     return findByName_UNLOCKED(name);
208 }
209
210 connection_t ConnMap::findByName_UNLOCKED(const std::string&name) {
211     connection_t rv(NULL);
212     std::list<connection_t>::iterator iter;
213     for (iter = all.begin(); iter != all.end(); ++iter) {
214         if ((*iter)->getName() == name) {
215             rv = *iter;
216         }
217     }
218     return rv;
219 }
220
221 void ConnMap::notifyPausedConnection(connection_t conn, bool schedule) {
222     if (engine.getEpStats().isShutdown) {
223         return;
224     }
225
226     Notifiable* tp = dynamic_cast<Notifiable*>(conn.get());
227     if (schedule) {
228         if (tp && tp->isPaused() && conn->isReserved() &&
229             tp->setNotificationScheduled(true)) {
230             pendingNotifications.push(conn);
231             connNotifier_->notifyMutationEvent(); // Wake up the connection notifier so that
232                                                   // it can notify the event to a given
233                                                   // paused connection.
234         }
235     } else {
236         LockHolder rlh(releaseLock);
237         if (tp && tp->isPaused() && conn->isReserved()) {
238             engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
239             tp->setNotifySent(true);
240         }
241     }
242 }
243
244 void ConnMap::notifyAllPausedConnections() {
245     std::queue<connection_t> queue;
246     pendingNotifications.getAll(queue);
247
248     LockHolder rlh(releaseLock);
249     while (!queue.empty()) {
250         connection_t &conn = queue.front();
251         Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
252         if (tp) {
253             tp->setNotificationScheduled(false);
254             if (tp->isPaused() && conn->isReserved()) {
255                 engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
256                 tp->setNotifySent(true);
257             }
258         }
259         queue.pop();
260     }
261 }
262
/**
 * @return true when no connection is waiting in the pending-notification
 *         queue
 */
bool ConnMap::notificationQueueEmpty() {
    return pendingNotifications.empty();
}
266
267 void ConnMap::updateVBConnections(connection_t &conn,
268                                         const std::vector<uint16_t> &vbuckets)
269 {
270     Producer *tp = dynamic_cast<Producer*>(conn.get());
271     if (!tp) {
272         return;
273     }
274
275     VBucketFilter new_filter(vbuckets);
276     VBucketFilter diff = tp->getVBucketFilter().filter_diff(new_filter);
277     const std::set<uint16_t> &vset = diff.getVBSet();
278
279     for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
280         size_t lock_num = (*it) % vbConnLockNum;
281         SpinLockHolder lh (&vbConnLocks[lock_num]);
282         // Remove the connection that is no longer for a given vbucket
283         if (!tp->vbucketFilter.empty() && tp->vbucketFilter(*it)) {
284             std::list<connection_t> &vb_conns = vbConns[*it];
285             std::list<connection_t>::iterator itr = vb_conns.begin();
286             for (; itr != vb_conns.end(); ++itr) {
287                 if (conn->getCookie() == (*itr)->getCookie()) {
288                     vb_conns.erase(itr);
289                     break;
290                 }
291             }
292         } else { // Add the connection to the vbucket replicator list.
293             std::list<connection_t> &vb_conns = vbConns[*it];
294             vb_conns.push_back(conn);
295         }
296     }
297 }
298
299 void ConnMap::removeVBConnections(connection_t &conn) {
300     Producer *tp = dynamic_cast<Producer*>(conn.get());
301     if (!tp) {
302         return;
303     }
304
305     const std::set<uint16_t> &vset = tp->vbucketFilter.getVBSet();
306     for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
307         size_t lock_num = (*it) % vbConnLockNum;
308         SpinLockHolder lh (&vbConnLocks[lock_num]);
309         std::list<connection_t> &vb_conns = vbConns[*it];
310         std::list<connection_t>::iterator itr = vb_conns.begin();
311         for (; itr != vb_conns.end(); ++itr) {
312             if (conn->getCookie() == (*itr)->getCookie()) {
313                 vb_conns.erase(itr);
314                 break;
315             }
316         }
317     }
318 }
319
320 void ConnMap::addVBConnByVBId(connection_t &conn, int16_t vbid) {
321     if (!conn.get()) {
322         return;
323     }
324
325     size_t lock_num = vbid % vbConnLockNum;
326     SpinLockHolder lh (&vbConnLocks[lock_num]);
327     std::list<connection_t> &vb_conns = vbConns[vbid];
328     vb_conns.push_back(conn);
329 }
330
331 void ConnMap::removeVBConnByVBId_UNLOCKED(connection_t &conn, int16_t vbid) {
332     if (!conn.get()) {
333         return;
334     }
335
336     std::list<connection_t> &vb_conns = vbConns[vbid];
337     std::list<connection_t>::iterator itr = vb_conns.begin();
338     for (; itr != vb_conns.end(); ++itr) {
339         if (conn->getCookie() == (*itr)->getCookie()) {
340             vb_conns.erase(itr);
341             break;
342         }
343     }
344 }
345
346 void ConnMap::removeVBConnByVBId(connection_t &conn, int16_t vbid) {
347     size_t lock_num = vbid % vbConnLockNum;
348     SpinLockHolder lh (&vbConnLocks[lock_num]);
349     removeVBConnByVBId_UNLOCKED(conn, vbid);
350 }
351
352
353 TapConnMap::TapConnMap(EventuallyPersistentEngine &e)
354     : ConnMap(e), nextNoop_(0) {
355
356     Configuration &config = engine.getConfiguration();
357     noopInterval_ = config.getTapNoopInterval();
358     config.addValueChangedListener("tap_noop_interval",
359                                    new ConnMapValueChangeListener(*this));
360 }
361
362 TapConsumer *TapConnMap::newConsumer(const void* cookie)
363 {
364     LockHolder lh(connsLock);
365     TapConsumer *tc = new TapConsumer(engine, cookie, ConnHandler::getAnonName());
366     connection_t tap(tc);
367     LOG(EXTENSION_LOG_INFO, "%s created", tap->logHeader());
368     all.push_back(tap);
369     map_[cookie] = tap;
370     return tc;
371 }
372
373 TapProducer *TapConnMap::newProducer(const void* cookie,
374                                      const std::string &name,
375                                      uint32_t flags,
376                                      uint64_t backfillAge,
377                                      int tapKeepAlive,
378                                      const std::vector<uint16_t> &vbuckets,
379                                      const std::map<uint16_t, uint64_t> &lastCheckpointIds)
380 {
381     LockHolder lh(connsLock);
382     TapProducer *producer(NULL);
383
384     std::list<connection_t>::iterator iter;
385     for (iter = all.begin(); iter != all.end(); ++iter) {
386         producer = dynamic_cast<TapProducer*>((*iter).get());
387         if (producer && producer->getName() == name) {
388             producer->setExpiryTime((rel_time_t)-1);
389             producer->reconnected();
390             break;
391         }
392         else {
393             producer = NULL;
394         }
395     }
396
397     if (producer != NULL) {
398         const void *old_cookie = producer->getCookie();
399         cb_assert(old_cookie);
400         map_.erase(old_cookie);
401
402         if (tapKeepAlive == 0 || (producer->mayCompleteDumpOrTakeover() && producer->idle())) {
403             LOG(EXTENSION_LOG_INFO,
404                 "%s keep alive timed out, should be nuked", producer->logHeader());
405             producer->setName(ConnHandler::getAnonName());
406             producer->setDisconnect(true);
407             producer->setConnected(false);
408             producer->setPaused(true);
409             producer->setExpiryTime(ep_current_time() - 1);
410             producer = NULL;
411         }
412         else {
413             LOG(EXTENSION_LOG_INFO, "%s exists... grabbing the channel",
414                 producer->logHeader());
415             // Create the dummy expired producer connection for the old connection cookie.
416             // This dummy producer connection will be used for releasing the corresponding
417             // memcached connection.
418
419             // dliao: TODO no need to deal with tap or dcp separately here for the dummy?
420             TapProducer *n = new TapProducer(engine,
421                                              old_cookie,
422                                              ConnHandler::getAnonName(),
423                                              0);
424             n->setDisconnect(true);
425             n->setConnected(false);
426             n->setPaused(true);
427             n->setExpiryTime(ep_current_time() - 1);
428             all.push_back(connection_t(n));
429         }
430     }
431
432     bool reconnect = false;
433     if (producer == NULL) {
434         producer = new TapProducer(engine, cookie, name, flags);
435         LOG(EXTENSION_LOG_INFO, "%s created", producer->logHeader());
436         all.push_back(connection_t(producer));
437     } else {
438         producer->setCookie(cookie);
439         producer->setReserved(true);
440         producer->setConnected(true);
441         producer->setDisconnect(false);
442         reconnect = true;
443     }
444     producer->evaluateFlags();
445
446     connection_t conn(producer);
447     updateVBConnections(conn, vbuckets);
448
449     producer->setFlagByteorderSupport((flags & TAP_CONNECT_TAP_FIX_FLAG_BYTEORDER) != 0);
450     producer->setBackfillAge(backfillAge, reconnect);
451     producer->setVBucketFilter(vbuckets);
452     producer->registerCursor(lastCheckpointIds);
453
454     if (reconnect) {
455         producer->rollback();
456     }
457
458     map_[cookie] = conn;
459     engine.storeEngineSpecific(cookie, producer);
460     // Clear all previous session stats for this producer.
461     clearPrevSessionStats(producer->getName());
462
463     return producer;
464
465 }
466
467 void TapConnMap::manageConnections() {
468     // To avoid connections to be stucked in a bogus state forever, we're going
469     // to ping all connections that hasn't tried to walk the tap queue
470     // for this amount of time..
471     const int maxIdleTime = 5;
472
473     bool addNoop = false;
474
475     rel_time_t now = ep_current_time();
476     if (now > nextNoop_ && noopInterval_ != (size_t)-1) {
477         addNoop = true;
478         nextNoop_ = now + noopInterval_;
479     }
480
481     std::list<connection_t> deadClients;
482
483     LockHolder lh(connsLock);
484     // We should pause unless we purged some connections or
485     // all queues have items.
486     getExpiredConnections_UNLOCKED(deadClients);
487
488     // see if I have some channels that I have to signal..
489     std::map<const void*, connection_t>::iterator iter;
490     for (iter = map_.begin(); iter != map_.end(); ++iter) {
491         TapProducer *tp = dynamic_cast<TapProducer*>(iter->second.get());
492         if (tp != NULL) {
493             if (tp->supportsAck() && (tp->getExpiryTime() < now) && tp->windowIsFull()) {
494                 LOG(EXTENSION_LOG_WARNING,
495                     "%s Expired and ack windows is full. Disconnecting...",
496                     tp->logHeader());
497                 tp->setDisconnect(true);
498             } else if (addNoop) {
499                 tp->setTimeForNoop();
500             }
501         }
502     }
503
504     // Collect the list of connections that need to be signaled.
505     std::list<connection_t> toNotify;
506     for (iter = map_.begin(); iter != map_.end(); ++iter) {
507         TapProducer *tp = dynamic_cast<TapProducer*>(iter->second.get());
508         if (tp && (tp->isPaused() || tp->doDisconnect()) && !tp->isSuspended()
509             && tp->isReserved()) {
510             if (!tp->sentNotify() ||
511                 (tp->getLastWalkTime() + maxIdleTime < now)) {
512                 toNotify.push_back(iter->second);
513             }
514         }
515     }
516
517     lh.unlock();
518
519     LockHolder rlh(releaseLock);
520     std::list<connection_t>::iterator it;
521     for (it = toNotify.begin(); it != toNotify.end(); ++it) {
522         TapProducer *tp = dynamic_cast<TapProducer*>((*it).get());
523         if (tp && tp->isReserved()) {
524             engine.notifyIOComplete(tp->getCookie(), ENGINE_SUCCESS);
525             tp->setNotifySent(true);
526         }
527     }
528
529     // Delete all of the dead clients
530     std::list<connection_t>::iterator ii;
531     for (ii = deadClients.begin(); ii != deadClients.end(); ++ii) {
532         LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
533         (*ii)->releaseReference();
534         TapProducer *tp = dynamic_cast<TapProducer*>((*ii).get());
535         if (tp) {
536             ExTask reapTask = new ConnectionReaperCallback(engine, *this, *ii);
537             ExecutorPool::get()->schedule(reapTask, NONIO_TASK_IDX);
538         }
539     }
540 }
541
542 void TapConnMap::notifyVBConnections(uint16_t vbid)
543 {
544     size_t lock_num = vbid % vbConnLockNum;
545     SpinLockHolder lh(&vbConnLocks[lock_num]);
546
547     std::list<connection_t> &conns = vbConns[vbid];
548     std::list<connection_t>::iterator it = conns.begin();
549     for (; it != conns.end(); ++it) {
550         Notifiable *conn = dynamic_cast<Notifiable*>((*it).get());
551         if (conn && conn->isPaused() && (*it)->isReserved() &&
552             conn->setNotificationScheduled(true)) {
553             pendingNotifications.push(*it);
554             connNotifier_->notifyMutationEvent();
555         }
556     }
557     lh.unlock();
558 }
559
560 bool TapConnMap::setEvents(const std::string &name, std::list<queued_item> *q) {
561     bool found(false);
562     LockHolder lh(connsLock);
563
564     connection_t tc = findByName_UNLOCKED(name);
565     if (tc.get()) {
566         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
567         cb_assert(tp);
568         found = true;
569         tp->appendQueue(q);
570         lh.unlock();
571         notifyPausedConnection(tp, false);
572     }
573
574     return found;
575 }
576
577 void TapConnMap::incrBackfillRemaining(const std::string &name,
578                                        size_t num_backfill_items) {
579     LockHolder lh(connsLock);
580
581     connection_t tc = findByName_UNLOCKED(name);
582     if (tc.get()) {
583         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
584         cb_assert(tp);
585         tp->incrBackfillRemaining(num_backfill_items);
586     }
587 }
588
589 ssize_t TapConnMap::backfillQueueDepth(const std::string &name) {
590     ssize_t rv(-1);
591     LockHolder lh(connsLock);
592
593     connection_t tc = findByName_UNLOCKED(name);
594     if (tc.get()) {
595         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
596         cb_assert(tp);
597         rv = tp->getBackfillQueueSize();
598     }
599
600     return rv;
601 }
602
603 void TapConnMap::resetReplicaChain() {
604     LockHolder lh(connsLock);
605     rel_time_t now = ep_current_time();
606     std::list<connection_t>::iterator it = all.begin();
607     for (; it != all.end(); ++it) {
608         connection_t &tc = *it;
609         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
610         if (!(tp && (tp->isConnected() || tp->getExpiryTime() > now))) {
611             continue;
612         }
613         LOG(EXTENSION_LOG_INFO, "%s Reset the replication chain",
614             tp->logHeader());
615         // Get the list of vbuckets that each TAP producer is replicating
616         VBucketFilter vbfilter = tp->getVBucketFilter();
617         std::vector<uint16_t> vblist (vbfilter.getVBSet().begin(), vbfilter.getVBSet().end());
618         // TAP producer sends INITIAL_VBUCKET_STREAM messages to the destination to reset
619         // replica vbuckets, and then backfills items to the destination.
620         tp->scheduleBackfill(vblist);
621         notifyPausedConnection(tp, true);
622     }
623 }
624
625 bool TapConnMap::isBackfillCompleted(std::string &name) {
626     LockHolder lh(connsLock);
627     connection_t tc = findByName_UNLOCKED(name);
628     if (tc.get()) {
629         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
630         if (tp) {
631             return tp->isBackfillCompleted();
632         }
633     }
634     return false;
635 }
636
637 void TapConnMap::addFlushEvent() {
638     LockHolder lh(connsLock);
639     std::list<connection_t>::iterator iter;
640     for (iter = all.begin(); iter != all.end(); iter++) {
641         TapProducer *tp = dynamic_cast<TapProducer*>((*iter).get());
642         if (tp && !tp->dumpQueue) {
643             tp->flush();
644         }
645     }
646 }
647
648 void TapConnMap::scheduleBackfill(const std::set<uint16_t> &backfillVBuckets) {
649     LockHolder lh(connsLock);
650     rel_time_t now = ep_current_time();
651     std::list<connection_t>::iterator it = all.begin();
652     for (; it != all.end(); ++it) {
653         connection_t &tc = *it;
654         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
655         if (!(tp && (tp->isConnected() || tp->getExpiryTime() > now))) {
656             continue;
657         }
658
659         std::vector<uint16_t> vblist;
660         std::set<uint16_t>::const_iterator vb_it = backfillVBuckets.begin();
661         for (; vb_it != backfillVBuckets.end(); ++vb_it) {
662             if (tp->checkVBucketFilter(*vb_it)) {
663                 vblist.push_back(*vb_it);
664             }
665         }
666         if (!vblist.empty()) {
667             tp->scheduleBackfill(vblist);
668             notifyPausedConnection(tp, true);
669         }
670     }
671 }
672
673 void TapConnMap::loadPrevSessionStats(const std::map<std::string, std::string> &session_stats) {
674     LockHolder lh(connsLock);
675     std::map<std::string, std::string>::const_iterator it =
676         session_stats.find("ep_force_shutdown");
677
678     if (it != session_stats.end()) {
679         if (it->second.compare("true") == 0) {
680             prevSessionStats.normalShutdown = false;
681         }
682     } else if (!session_stats.empty()) { // possible crash on the previous session.
683         prevSessionStats.normalShutdown = false;
684     }
685
686     std::string tap_prefix("eq_tapq:");
687     for (it = session_stats.begin(); it != session_stats.end(); ++it) {
688         const std::string &stat_name = it->first;
689         if (stat_name.substr(0, 8).compare(tap_prefix) == 0) {
690             if (stat_name.find("backfill_completed") != std::string::npos ||
691                 stat_name.find("idle") != std::string::npos) {
692                 prevSessionStats.stats[stat_name] = it->second;
693             }
694         }
695     }
696 }
697
698 bool TapConnMap::changeVBucketFilter(const std::string &name,
699                                      const std::vector<uint16_t> &vbuckets,
700                                      const std::map<uint16_t, uint64_t> &checkpoints) {
701     bool rv = false;
702     LockHolder lh(connsLock);
703     connection_t tc = findByName_UNLOCKED(name);
704     if (tc.get()) {
705         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
706         if (tp && (tp->isConnected() || tp->getExpiryTime() > ep_current_time())) {
707             LOG(EXTENSION_LOG_INFO, "%s Change the vbucket filter",
708                 tp->logHeader());
709             updateVBConnections(tc, vbuckets);
710             tp->setVBucketFilter(vbuckets, true);
711             tp->registerCursor(checkpoints);
712             rv = true;
713             lh.unlock();
714             notifyPausedConnection(tp, true);
715         }
716     }
717     return rv;
718 }
719
720 bool TapConnMap::checkConnectivity(const std::string &name) {
721     LockHolder lh(connsLock);
722     rel_time_t now = ep_current_time();
723     connection_t tc = findByName_UNLOCKED(name);
724     if (tc.get()) {
725         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
726         if (tp && (tp->isConnected() || tp->getExpiryTime() > now)) {
727             return true;
728         }
729     }
730     return false;
731 }
732
/**
 * Locking wrapper: takes connsLock and delegates to
 * closeConnectionByName_UNLOCKED().
 *
 * @return the result of closeConnectionByName_UNLOCKED(name)
 */
bool TapConnMap::closeConnectionByName(const std::string &name) {

    LockHolder lh(connsLock);
    return closeConnectionByName_UNLOCKED(name);
}
738
739 bool TapConnMap::mapped(connection_t &tc) {
740     bool rv = false;
741     std::map<const void*, connection_t>::iterator it;
742     for (it = map_.begin(); it != map_.end(); ++it) {
743         if (it->second.get() == tc.get()) {
744             rv = true;
745             break;
746         }
747     }
748     return rv;
749 }
750
751 void TapConnMap::shutdownAllConnections() {
752     LOG(EXTENSION_LOG_WARNING, "Shutting down tap connections!");
753
754     connNotifier_->stop();
755
756     // Not safe to acquire both connsLock and releaseLock at the same time
757     // (can trigger deadlock), so first acquire releaseLock to release all
758     // the connections (without changing the list/map), then drop releaseLock,
759     // acquire connsLock and actually clear out the list/map.
760     LockHolder rlh(releaseLock);
761     std::list<connection_t>::iterator ii;
762     for (ii = all.begin(); ii != all.end(); ++ii) {
763         LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
764         (*ii)->releaseReference();
765         TapProducer *tp = dynamic_cast<TapProducer*>((*ii).get());
766         if (tp) {
767             tp->clearQueues();
768         }
769     }
770     rlh.unlock();
771
772     LockHolder lh(connsLock);
773     all.clear();
774     map_.clear();
775 }
776
777 void TapConnMap::disconnect(const void *cookie) {
778     LockHolder lh(connsLock);
779
780     Configuration& config = engine.getConfiguration();
781     int tapKeepAlive = static_cast<int>(config.getTapKeepalive());
782     std::map<const void*, connection_t>::iterator iter(map_.find(cookie));
783     if (iter != map_.end()) {
784         if (iter->second.get()) {
785             rel_time_t now = ep_current_time();
786             TapConsumer *tc = dynamic_cast<TapConsumer*>(iter->second.get());
787             if (tc || iter->second->doDisconnect()) {
788                 iter->second->setExpiryTime(now - 1);
789                 LOG(EXTENSION_LOG_WARNING, "%s disconnected",
790                     iter->second->logHeader());
791             }
792             else { // must be producer
793                 iter->second->setExpiryTime(now + tapKeepAlive);
794                 LOG(EXTENSION_LOG_WARNING,
795                     "%s disconnected, keep alive for %d seconds",
796                     iter->second->logHeader(), tapKeepAlive);
797             }
798             iter->second->setConnected(false);
799         }
800         else {
801             LOG(EXTENSION_LOG_WARNING,
802                 "Found half-linked tap connection at: %p", cookie);
803         }
804         map_.erase(iter);
805     }
806 }
807
808 bool TapConnMap::closeConnectionByName_UNLOCKED(const std::string &name) {
809     bool rv = false;
810     connection_t tc = findByName_UNLOCKED(name);
811     if (tc.get()) {
812         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
813         if (tp) {
814             LOG(EXTENSION_LOG_WARNING, "%s Connection is closed by force",
815                 tp->logHeader());
816             removeTapCursors_UNLOCKED(tp);
817
818             tp->setExpiryTime(ep_current_time() - 1);
819             tp->setName(ConnHandler::getAnonName());
820             tp->setDisconnect(true);
821             tp->setPaused(true);
822             rv = true;
823         }
824     }
825     return rv;
826 }
827
828 void TapConnMap::getExpiredConnections_UNLOCKED(std::list<connection_t> &deadClients) {
829     rel_time_t now = ep_current_time();
830
831     std::list<connection_t>::iterator iter;
832     for (iter = all.begin(); iter != all.end();) {
833         connection_t &tc = *iter;
834         if (tc->isConnected()) {
835             ++iter;
836             continue;
837         }
838
839         TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
840
841         bool is_dead = false;
842         if (tc->getExpiryTime() <= now && !mapped(tc)) {
843             if (tp) {
844                 if (!tp->isSuspended()) {
845                     deadClients.push_back(tc);
846                     removeTapCursors_UNLOCKED(tp);
847                     iter = all.erase(iter);
848                     is_dead = true;
849                 }
850             } else {
851                 deadClients.push_back(tc);
852                 iter = all.erase(iter);
853                 is_dead = true;
854             }
855         }
856
857         if (!is_dead) {
858             ++iter;
859         }
860     }
861 }
862
863 void TapConnMap::removeTapCursors_UNLOCKED(TapProducer *tp) {
864     // Remove all the checkpoint cursors belonging to the TAP connection.
865     if (tp) {
866         const VBucketMap &vbuckets = engine.getEpStore()->getVBuckets();
867         size_t numOfVBuckets = vbuckets.getSize();
868         // Remove all the cursors belonging to the TAP connection to be purged.
869         for (size_t i = 0; i < numOfVBuckets; ++i) {
870             cb_assert(i <= std::numeric_limits<uint16_t>::max());
871             uint16_t vbid = static_cast<uint16_t>(i);
872             RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
873             if (!vb) {
874                 continue;
875             }
876             if (tp->vbucketFilter(vbid)) {
877                 LOG(EXTENSION_LOG_INFO,
878                     "%s Remove the TAP cursor from vbucket %d",
879                     tp->logHeader(), vbid);
880                 vb->checkpointManager.removeCursor(tp->getName());
881             }
882         }
883     }
884 }
885
886 void CompleteBackfillTapOperation::perform(TapProducer *tc, void *) {
887     tc->completeBackfill();
888 }
889
890 void CompleteDiskBackfillTapOperation::perform(TapProducer *tc, void *) {
891     tc->completeDiskBackfill();
892 }
893
894 void ScheduleDiskBackfillTapOperation::perform(TapProducer *tc, void *) {
895     tc->scheduleDiskBackfill();
896 }
897
898 void CompletedBGFetchTapOperation::perform(TapProducer *tc, Item *arg) {
899     if (connToken != tc->getConnectionToken() && !tc->isReconnected()) {
900         delete arg;
901         return;
902     }
903     tc->completeBGFetchJob(arg, vbid, implicitEnqueue);
904 }
905
906 bool TAPSessionStats::wasReplicationCompleted(const std::string &name) const {
907     bool rv = true;
908
909     std::string backfill_stat(name + ":backfill_completed");
910     std::map<std::string, std::string>::const_iterator it = stats.find(backfill_stat);
911     if (it != stats.end() && (it->second == "false" || !normalShutdown)) {
912         rv = false;
913     }
914     std::string idle_stat(name + ":idle");
915     it = stats.find(idle_stat);
916     if (it != stats.end() && (it->second == "false" || !normalShutdown)) {
917         rv = false;
918     }
919
920     return rv;
921 }
922
923 void TAPSessionStats::clearStats(const std::string &name) {
924     std::string backfill_stat(name + ":backfill_completed");
925     stats.erase(backfill_stat);
926     std::string idle_stat(name + ":idle");
927     stats.erase(idle_stat);
928 }
929
930 DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
931     : ConnMap(e),
932       aggrDcpConsumerBufferSize(0) {
933     numActiveSnoozingBackfills = 0;
934     updateMaxActiveSnoozingBackfills(engine.getEpStats().getMaxDataSize());
935
936     // Note: these allocations are deleted by ~Configuration
937     engine.getConfiguration().
938         addValueChangedListener("dcp_consumer_process_buffered_messages_yield_limit",
939                                 new DcpConfigChangeListener(*this));
940     engine.getConfiguration().
941         addValueChangedListener("dcp_consumer_process_buffered_messages_batch_size",
942                                 new DcpConfigChangeListener(*this));
943 }
944
945 DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
946                                      const std::string &name)
947 {
948     LockHolder lh(connsLock);
949
950     std::string conn_name("eq_dcpq:");
951     conn_name.append(name);
952
953     std::list<connection_t>::iterator iter;
954     for (iter = all.begin(); iter != all.end(); ++iter) {
955         if ((*iter)->getName() == conn_name) {
956             (*iter)->setDisconnect(true);
957             all.erase(iter);
958             break;
959         }
960     }
961
962     DcpConsumer *dcp = new DcpConsumer(engine, cookie, conn_name);
963     connection_t dc(dcp);
964     LOG(EXTENSION_LOG_INFO, "%s Connection created", dc->logHeader());
965     all.push_back(dc);
966     map_[cookie] = dc;
967     return dcp;
968
969 }
970
971 bool DcpConnMap::isPassiveStreamConnected_UNLOCKED(uint16_t vbucket) {
972     std::list<connection_t>::iterator it;
973     for(it = all.begin(); it != all.end(); it++) {
974         DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it->get());
975         if (dcpConsumer && dcpConsumer->isStreamPresent(vbucket)) {
976                 LOG(EXTENSION_LOG_DEBUG, "(vb %d) A DCP passive stream "
977                     "is already exists for the vbucket in connection: %s",
978                     vbucket, dcpConsumer->logHeader());
979                 return true;
980         }
981     }
982     return false;
983 }
984
985 ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler* conn,
986                                                uint32_t opaque,
987                                                uint16_t vbucket,
988                                                uint32_t flags)
989 {
990     cb_assert(conn);
991
992     LockHolder lh(connsLock);
993     /* Check if a stream (passive) for the vbucket is already present */
994     if (isPassiveStreamConnected_UNLOCKED(vbucket)) {
995         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Failing to add passive stream, "
996             "as one already exists for the vbucket!",
997             conn->logHeader(), vbucket);
998         return ENGINE_KEY_EEXISTS;
999     }
1000
1001     return conn->addStream(opaque, vbucket, flags);
1002 }
1003
1004
1005 DcpConnMap::DcpConfigChangeListener::DcpConfigChangeListener(DcpConnMap& connMap)
1006     : myConnMap(connMap){}
1007
1008 void DcpConnMap::DcpConfigChangeListener::sizeValueChanged(const std::string &key,
1009                                                            size_t value) {
1010     if (key == "dcp_consumer_process_buffered_messages_yield_limit") {
1011         myConnMap.consumerYieldConfigChanged(value);
1012     } else if (key == "dcp_consumer_process_buffered_messages_batch_size") {
1013         myConnMap.consumerBatchSizeConfigChanged(value);
1014     }
1015 }
1016
1017 /*
1018  * Find all DcpConsumers and set the yield threshold
1019  */
1020 void DcpConnMap::consumerYieldConfigChanged(size_t newValue) {
1021     LockHolder lh(connsLock);
1022     for (auto it : all) {
1023         DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it.get());
1024         if (dcpConsumer) {
1025             dcpConsumer->setProcessorYieldThreshold(newValue);
1026         }
1027     }
1028 }
1029
1030 /*
1031  * Find all DcpConsumers and set the processor batchsize
1032  */
1033 void DcpConnMap::consumerBatchSizeConfigChanged(size_t newValue) {
1034     LockHolder lh(connsLock);
1035     for (auto it : all) {
1036         DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it.get());
1037         if (dcpConsumer) {
1038             dcpConsumer->setProcessBufferedMessagesBatchSize(newValue);
1039         }
1040     }
1041 }
1042
1043 DcpProducer *DcpConnMap::newProducer(const void* cookie,
1044                                      const std::string &name,
1045                                      bool notifyOnly)
1046 {
1047     LockHolder lh(connsLock);
1048
1049     std::string conn_name("eq_dcpq:");
1050     conn_name.append(name);
1051
1052     std::list<connection_t>::iterator iter;
1053     for (iter = all.begin(); iter != all.end(); ++iter) {
1054         if ((*iter)->getName() == conn_name) {
1055             (*iter)->setDisconnect(true);
1056             all.erase(iter);
1057             break;
1058         }
1059     }
1060
1061     DcpProducer *dcp = new DcpProducer(engine, cookie, conn_name, notifyOnly);
1062     LOG(EXTENSION_LOG_INFO, "%s Connection created", dcp->logHeader());
1063     all.push_back(connection_t(dcp));
1064     map_[cookie] = dcp;
1065
1066     return dcp;
1067 }
1068
1069 void DcpConnMap::shutdownAllConnections() {
1070     LOG(EXTENSION_LOG_WARNING, "Shutting down dcp connections!");
1071
1072     connNotifier_->stop();
1073
1074     // Not safe to acquire both connsLock and releaseLock at the same time
1075     // (can trigger deadlock), so first acquire releaseLock to release all
1076     // the connections (without changing the list/map), then drop releaseLock,
1077     // acquire connsLock and actually clear out the list/map.
1078     LockHolder rlh(releaseLock);
1079     std::list<connection_t>::iterator ii;
1080     for (ii = all.begin(); ii != all.end(); ++ii) {
1081         LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
1082         (*ii)->releaseReference();
1083     }
1084     rlh.unlock();
1085
1086     LockHolder lh(connsLock);
1087     closeAllStreams_UNLOCKED();
1088     all.clear();
1089     map_.clear();
1090 }
1091
1092 void DcpConnMap::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
1093     LockHolder lh(connsLock);
1094     std::map<const void*, connection_t>::iterator itr = map_.begin();
1095     for (; itr != map_.end(); ++itr) {
1096         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1097         if (producer) {
1098             producer->vbucketStateChanged(vbucket, state);
1099         }
1100     }
1101 }
1102
1103 void DcpConnMap::closeAllStreams_UNLOCKED() {
1104     std::map<const void*, connection_t>::iterator itr = map_.begin();
1105     for (; itr != map_.end(); ++itr) {
1106         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1107         if (producer) {
1108             producer->closeAllStreams();
1109             producer->clearCheckpointProcessorTaskQueues();
1110         } else {
1111             static_cast<DcpConsumer*>(itr->second.get())->closeAllStreams();
1112         }
1113     }
1114 }
1115
1116 void DcpConnMap::disconnect(const void *cookie) {
1117     // Move the connection matching this cookie from the `all` and map_
1118     // data structures (under connsLock).
1119     connection_t conn;
1120     {
1121         LockHolder lh(connsLock);
1122         std::list<connection_t>::iterator iter;
1123         for (iter = all.begin(); iter != all.end(); ++iter) {
1124             if ((*iter)->getCookie() == cookie) {
1125                 (*iter)->setDisconnect(true);
1126                 all.erase(iter);
1127                 break;
1128             }
1129         }
1130         std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
1131         if (itr != map_.end()) {
1132             conn = itr->second;
1133             if (conn.get()) {
1134                 LOG(EXTENSION_LOG_INFO, "%s Removing connection",
1135                     conn->logHeader());
1136                 map_.erase(itr);
1137             }
1138         }
1139     }
1140
1141     // Note we shutdown the stream *not* under the connsLock; this is
1142     // because as part of closing a DcpConsumer stream we need to
1143     // acquire PassiveStream::buffer.bufMutex; and that could deadlock
1144     // in EventuallyPersistentStore::setVBucketState, via
1145     // PassiveStream::processBufferedMessages.
1146     if (conn) {
1147         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
1148         if (producer) {
1149             producer->closeAllStreams();
1150             producer->clearCheckpointProcessorTaskQueues();
1151         } else {
1152             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
1153         }
1154     }
1155
1156     // Finished disconnecting the stream; add it to the
1157     // deadConnections list.
1158     if (conn) {
1159         LockHolder lh(connsLock);
1160         deadConnections.push_back(conn);
1161     }
1162 }
1163
1164 void DcpConnMap::manageConnections() {
1165     std::list<connection_t> release;
1166
1167     LockHolder lh(connsLock);
1168     while (!deadConnections.empty()) {
1169         connection_t conn = deadConnections.front();
1170         release.push_back(conn);
1171         deadConnections.pop_front();
1172     }
1173
1174     const int maxIdleTime = 5;
1175     rel_time_t now = ep_current_time();
1176
1177     // Collect the list of connections that need to be signaled.
1178     std::list<connection_t> toNotify;
1179     std::map<const void*, connection_t>::iterator iter;
1180     for (iter = map_.begin(); iter != map_.end(); ++iter) {
1181         connection_t conn = iter->second;
1182         Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
1183         if (tp && (tp->isPaused() || conn->doDisconnect()) &&
1184             conn->isReserved()) {
1185             if (!tp->sentNotify() ||
1186                 (conn->getLastWalkTime() + maxIdleTime < now)) {
1187                 toNotify.push_back(iter->second);
1188             }
1189         }
1190     }
1191
1192     lh.unlock();
1193
1194     LockHolder rlh(releaseLock);
1195     std::list<connection_t>::iterator it;
1196     for (it = toNotify.begin(); it != toNotify.end(); ++it) {
1197         Notifiable *tp =
1198             static_cast<Notifiable*>(static_cast<Producer*>((*it).get()));
1199         if (tp && (*it)->isReserved()) {
1200             engine.notifyIOComplete((*it)->getCookie(), ENGINE_SUCCESS);
1201             tp->setNotifySent(true);
1202         }
1203     }
1204
1205     while (!release.empty()) {
1206         connection_t conn = release.front();
1207         conn->releaseReference();
1208         release.pop_front();
1209         removeVBConnections(conn);
1210     }
1211 }
1212
1213 void DcpConnMap::removeVBConnections(connection_t &conn) {
1214     Producer *tp = dynamic_cast<Producer*>(conn.get());
1215     if (!tp) {
1216         return;
1217     }
1218
1219     DcpProducer *prod = static_cast<DcpProducer*>(tp);
1220     std::list<uint16_t> vblist = prod->getVBList();
1221     std::list<uint16_t>::iterator it = vblist.begin();
1222     for (; it != vblist.end(); ++it) {
1223         uint16_t vbid = *it;
1224         size_t lock_num = vbid % vbConnLockNum;
1225         SpinLockHolder lh (&vbConnLocks[lock_num]);
1226         std::list<connection_t> &vb_conns = vbConns[vbid];
1227         std::list<connection_t>::iterator itr = vb_conns.begin();
1228         for (; itr != vb_conns.end(); ++itr) {
1229             if (conn->getCookie() == (*itr)->getCookie()) {
1230                 vb_conns.erase(itr);
1231                 break;
1232             }
1233         }
1234     }
1235 }
1236
1237 void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno) {
1238     size_t lock_num = vbid % vbConnLockNum;
1239     SpinLockHolder lh(&vbConnLocks[lock_num]);
1240
1241     std::list<connection_t> &conns = vbConns[vbid];
1242     std::list<connection_t>::iterator it = conns.begin();
1243     for (; it != conns.end(); ++it) {
1244         DcpProducer *conn = static_cast<DcpProducer*>((*it).get());
1245         conn->notifySeqnoAvailable(vbid, bySeqno);
1246     }
1247 }
1248
1249 void DcpConnMap::notifyBackfillManagerTasks() {
1250     LockHolder lh(connsLock);
1251     std::map<const void*, connection_t>::iterator itr = map_.begin();
1252     for (; itr != map_.end(); ++itr) {
1253         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1254         if (producer) {
1255             producer->getBackfillManager()->wakeUpTask();
1256         }
1257     }
1258 }
1259
1260 bool DcpConnMap::canAddBackfillToActiveQ()
1261 {
1262     SpinLockHolder lh(&numBackfillsLock);
1263     if (numActiveSnoozingBackfills < maxActiveSnoozingBackfills) {
1264         ++numActiveSnoozingBackfills;
1265         return true;
1266     }
1267     return false;
1268 }
1269
1270 void DcpConnMap::decrNumActiveSnoozingBackfills()
1271 {
1272     SpinLockHolder lh(&numBackfillsLock);
1273     if (numActiveSnoozingBackfills > 0) {
1274         --numActiveSnoozingBackfills;
1275     } else {
1276         LOG(EXTENSION_LOG_WARNING, "ActiveSnoozingBackfills already zero!!!");
1277     }
1278 }
1279
1280 void DcpConnMap::updateMaxActiveSnoozingBackfills(size_t maxDataSize)
1281 {
1282     double numBackfillsMemThresholdPercent =
1283                          static_cast<double>(numBackfillsMemThreshold)/100;
1284     size_t max = maxDataSize * numBackfillsMemThresholdPercent / dbFileMem;
1285     /* We must have atleast one active/snoozing backfill */
1286     SpinLockHolder lh(&numBackfillsLock);
1287     maxActiveSnoozingBackfills =
1288         std::max(static_cast<size_t>(1),
1289                  std::min(max, static_cast<size_t>(numBackfillsThreshold)));
1290     LOG(EXTENSION_LOG_DEBUG, "Max active snoozing backfills set to %d",
1291         maxActiveSnoozingBackfills);
1292 }