1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2010 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 #include "ep_engine.h"
28 #include "executorthread.h"
29 #include "tapconnection.h"
31 #include "dcp-consumer.h"
32 #include "dcp-producer.h"
// Number of spin locks guarding the per-vbucket connection lists. A vbucket
// id is mapped to its lock with `vbid % vbConnLockNum`.
34 size_t ConnMap::vbConnLockNum = 32;
// Snooze interval that ConnNotifier::notifyConnections() passes to
// ExecutorPool::snooze() when no notification is pending.
35 const double ConnNotifier::DEFAULT_MIN_STIME = 1.0;
38 * NonIO task to free the resources of a tap or dcp connection.
// Task that reaps one connection: it detaches the connection from the
// ConnMap's per-vbucket connection lists via removeVBConnections().
40 class ConnectionReaperCallback : public GlobalTask {
42 ConnectionReaperCallback(EventuallyPersistentEngine &e, ConnMap& cm,
44 : GlobalTask(&e, Priority::TapConnectionReaperPriority),
45 connMap(cm), connection(conn) {
// Text naming the connection being reaped (TAP and DCP share this task).
47 ss << "Reaping tap or dcp connection: " << connection->getName();
// The connection is not necessarily a TapProducer, hence the checked cast.
52 TapProducer *tp = dynamic_cast<TapProducer*>(connection.get());
55 connMap.removeVBConnections(connection);
60 std::string getDescription() {
// Connection being reaped.
66 connection_t connection;
71 * A callback task for the TAP or DCP connection notifier.
// Task that drives a ConnNotifier: it forwards to
// ConnNotifier::notifyConnections() and describes itself according to the
// notifier type (TAP or DCP).
73 class ConnNotifierCallback : public GlobalTask {
75 ConnNotifierCallback(EventuallyPersistentEngine *e, ConnNotifier *notifier)
76 : GlobalTask(e, Priority::TapConnNotificationPriority),
77 connNotifier(notifier) { }
// The returned value comes straight from the notifier.
80 return connNotifier->notifyConnections();
83 std::string getDescription() {
84 if (connNotifier->getNotifierType() == TAP_CONN_NOTIFIER) {
85 return std::string("TAP connection notifier");
// Any other notifier type is reported as DCP.
87 return std::string("DCP connection notifier");
// Notifier driven by this task; ConnNotifier::start() passes `this` here.
92 ConnNotifier *connNotifier;
// Starts the notifier: sets pendingNotification to true (only if it currently
// equals `inverse`), then schedules a ConnNotifierCallback with
// NONIO_TASK_IDX and keeps the returned handle in `task` for the later
// wake/snooze/cancel calls.
95 void ConnNotifier::start() {
97 pendingNotification.compare_exchange_strong(inverse, true);
98 ExTask connotifyTask = new ConnNotifierCallback(&connMap.getEngine(), this);
99 task = ExecutorPool::get()->schedule(connotifyTask, NONIO_TASK_IDX);
// Stops the notifier: sets pendingNotification to false (only if it currently
// equals `inverse`) and cancels the scheduled task.
103 void ConnNotifier::stop() {
105 pendingNotification.compare_exchange_strong(inverse, false);
106 ExecutorPool::get()->cancel(task);
// Signals that there is something to notify. The task is woken on the
// false -> true transition of pendingNotification, so events arriving while a
// notification is already pending do not issue additional wake-ups.
109 void ConnNotifier::notifyMutationEvent(void) {
110 bool inverse = false;
111 if (pendingNotification.compare_exchange_strong(inverse, true)) {
113 ExecutorPool::get()->wake(task);
// Wakes the notifier task without consulting or changing the
// pendingNotification flag.
117 void ConnNotifier::wake() {
118 ExecutorPool::get()->wake(task);
// One pass of the notifier task: resets the pending flag, notifies every
// queued paused connection, then snoozes the task unless new work arrived in
// the meantime.
121 bool ConnNotifier::notifyConnections() {
123 pendingNotification.compare_exchange_strong(inverse, false);
124 connMap.notifyAllPausedConnections();
// Nothing new arrived while notifying: sleep for the default interval.
126 if (!pendingNotification.load()) {
127 ExecutorPool::get()->snooze(task, DEFAULT_MIN_STIME);
128 if (pendingNotification.load()) {
129 // A new notification may have arrived right before the snooze()
130 // call above; re-check and reset the snooze time to 0 if so.
131 ExecutorPool::get()->snooze(task, 0);
139 * A task to manage connections.
// Recurring task that calls ConnMap::manageConnections() and then snoozes for
// MIN_SLEEP_TIME. It is constructed with MIN_SLEEP_TIME as its initial sleep
// time as well.
141 class ConnManager : public GlobalTask {
143 ConnManager(EventuallyPersistentEngine *e, ConnMap *cmap)
144 : GlobalTask(e, Priority::TapConnMgrPriority, MIN_SLEEP_TIME, false),
145 engine(e), connmap(cmap) { }
// Engine shutdown is checked first, before any connection management.
148 if (engine->getEpStats().isShutdown) {
151 connmap->manageConnections();
152 snooze(MIN_SLEEP_TIME);
156 std::string getDescription() {
157 return std::string("Connection Manager");
// Engine whose shutdown flag is consulted on each run.
161 EventuallyPersistentEngine *engine;
// Configuration listener that forwards changes of the "tap_noop_interval"
// setting to TapConnMap::setNoopInterval().
165 class ConnMapValueChangeListener : public ValueChangedListener {
167 ConnMapValueChangeListener(TapConnMap &tc)
171 virtual void sizeValueChanged(const std::string &key, size_t value) {
// Only this one key is acted upon here.
172 if (key.compare("tap_noop_interval") == 0) {
173 connmap_.setNoopInterval(value);
// Connection map that receives the updated interval.
178 TapConnMap &connmap_;
// Allocates vbConnLockNum spin locks and one empty connection list per
// configured vbucket. The notifier stays NULL until initialize() creates it.
181 ConnMap::ConnMap(EventuallyPersistentEngine &theEngine)
183 connNotifier_(NULL) {
185 Configuration &config = engine.getConfiguration();
186 vbConnLocks = new SpinLock[vbConnLockNum];
187 size_t max_vbs = config.getMaxVbuckets();
// vbConns is indexed by vbucket id, so it needs max_vbs entries.
188 for (size_t i = 0; i < max_vbs; ++i) {
189 vbConns.push_back(std::list<connection_t>());
// Creates and starts the connection notifier of the given type, then
// schedules a ConnManager task for this map with NONIO_TASK_IDX.
193 void ConnMap::initialize(conn_notifier_type ntype) {
194 connNotifier_ = new ConnNotifier(ntype, *this);
195 connNotifier_->start();
196 ExTask connMgr = new ConnManager(&engine, this);
197 ExecutorPool::get()->schedule(connMgr, NONIO_TASK_IDX);
// Frees the spin lock array, stops the notifier if initialize() created one,
// and deletes it.
200 ConnMap::~ConnMap() {
201 delete [] vbConnLocks;
// connNotifier_ is still NULL when initialize() was never called.
202 if (connNotifier_ != NULL) {
203 connNotifier_->stop();
205 delete connNotifier_;
// Looks up a connection by name while holding connsLock; the search itself
// is done by findByName_UNLOCKED().
208 connection_t ConnMap::findByName(const std::string &name) {
209 LockHolder lh(connsLock);
210 return findByName_UNLOCKED(name);
// Linear search of `all` for a connection whose name equals `name`.
// Takes no lock itself; findByName() calls it with connsLock held.
213 connection_t ConnMap::findByName_UNLOCKED(const std::string&name) {
// Starts out as a NULL connection.
214 connection_t rv(NULL);
215 std::list<connection_t>::iterator iter;
216 for (iter = all.begin(); iter != all.end(); ++iter) {
217 if ((*iter)->getName() == name) {
// Notifies a paused, reserved connection. Two paths are visible below:
// queueing the connection for the notifier task, or signalling the engine
// directly under releaseLock.
// NOTE(review): which path runs presumably depends on `schedule` - confirm.
224 void ConnMap::notifyPausedConnection(connection_t conn, bool schedule) {
// Checked first, before any notification work.
225 if (engine.getEpStats().isShutdown) {
// Only connections that implement Notifiable can be notified.
229 Notifiable* tp = dynamic_cast<Notifiable*>(conn.get());
// Queued path: setNotificationScheduled(true) is the last condition, so the
// connection is pushed only when that call returns true.
231 if (tp && tp->isPaused() && conn->isReserved() &&
232 tp->setNotificationScheduled(true)) {
233 pendingNotifications.push(conn);
234 connNotifier_->notifyMutationEvent(); // Wake up the connection notifier so that
235 // it can notify the event to a given
236 // paused connection.
// Direct path: signal the engine while holding releaseLock.
239 LockHolder rlh(releaseLock);
240 if (tp && tp->isPaused() && conn->isReserved()) {
241 engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
242 tp->setNotifySent(true);
// Drains pendingNotifications into a local queue and, under releaseLock,
// clears each connection's "notification scheduled" flag and signals the
// engine for those that are still paused and reserved.
247 void ConnMap::notifyAllPausedConnections() {
248 std::queue<connection_t> queue;
// Take everything queued so far in one call.
249 pendingNotifications.getAll(queue);
251 LockHolder rlh(releaseLock);
252 while (!queue.empty()) {
253 connection_t &conn = queue.front();
254 Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
// Clearing the flag allows the connection to be queued again later.
256 tp->setNotificationScheduled(false);
257 if (tp->isPaused() && conn->isReserved()) {
258 engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
259 tp->setNotifySent(true);
// Returns true when no connection is waiting in pendingNotifications.
266 bool ConnMap::notificationQueueEmpty() {
267 return pendingNotifications.empty();
// Brings the per-vbucket connection lists in line with a producer's new
// vbucket set. Only the vbuckets reported by filter_diff() between the
// producer's current filter and `vbuckets` are visited, each under its own
// spin lock.
270 void ConnMap::updateVBConnections(connection_t &conn,
271 const std::vector<uint16_t> &vbuckets)
273 Producer *tp = dynamic_cast<Producer*>(conn.get());
278 VBucketFilter new_filter(vbuckets);
279 VBucketFilter diff = tp->getVBucketFilter().filter_diff(new_filter);
280 const std::set<uint16_t> &vset = diff.getVBSet();
282 for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
// Locks are shared between vbuckets: vbucket id modulo the lock count.
283 size_t lock_num = (*it) % vbConnLockNum;
284 SpinLockHolder lh (&vbConnLocks[lock_num]);
285 // Remove the connection that is no longer for a given vbucket
// (the current, non-empty filter still accepts this vbucket)
286 if (!tp->vbucketFilter.empty() && tp->vbucketFilter(*it)) {
287 std::list<connection_t> &vb_conns = vbConns[*it];
288 std::list<connection_t>::iterator itr = vb_conns.begin();
289 for (; itr != vb_conns.end(); ++itr) {
// Entries are matched by cookie.
290 if (conn->getCookie() == (*itr)->getCookie()) {
295 } else { // Add the connection to the vbucket replicator list.
296 std::list<connection_t> &vb_conns = vbConns[*it];
297 vb_conns.push_back(conn);
// For every vbucket in the producer's vbucket filter, scans that vbucket's
// connection list (under its spin lock) for the entry that has the same
// cookie as `conn`.
302 void ConnMap::removeVBConnections(connection_t &conn) {
303 Producer *tp = dynamic_cast<Producer*>(conn.get());
308 const std::set<uint16_t> &vset = tp->vbucketFilter.getVBSet();
309 for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
310 size_t lock_num = (*it) % vbConnLockNum;
311 SpinLockHolder lh (&vbConnLocks[lock_num]);
312 std::list<connection_t> &vb_conns = vbConns[*it];
313 std::list<connection_t>::iterator itr = vb_conns.begin();
314 for (; itr != vb_conns.end(); ++itr) {
// Entries are matched by cookie.
315 if (conn->getCookie() == (*itr)->getCookie()) {
// Appends `conn` to the connection list of vbucket `vbid`, holding that
// vbucket's spin lock.
323 void ConnMap::addVBConnByVBId(connection_t &conn, int16_t vbid) {
328 size_t lock_num = vbid % vbConnLockNum;
329 SpinLockHolder lh (&vbConnLocks[lock_num]);
330 std::list<connection_t> &vb_conns = vbConns[vbid];
331 vb_conns.push_back(conn);
// Scans the connection list of vbucket `vbid` for the entry with the same
// cookie as `conn`. Takes no lock itself; removeVBConnByVBId() calls it with
// the vbucket's spin lock held.
334 void ConnMap::removeVBConnByVBId_UNLOCKED(connection_t &conn, int16_t vbid) {
339 std::list<connection_t> &vb_conns = vbConns[vbid];
340 std::list<connection_t>::iterator itr = vb_conns.begin();
341 for (; itr != vb_conns.end(); ++itr) {
342 if (conn->getCookie() == (*itr)->getCookie()) {
// Locking wrapper: takes the spin lock selected by `vbid % vbConnLockNum`
// and delegates to removeVBConnByVBId_UNLOCKED().
349 void ConnMap::removeVBConnByVBId(connection_t &conn, int16_t vbid) {
350 size_t lock_num = vbid % vbConnLockNum;
351 SpinLockHolder lh (&vbConnLocks[lock_num]);
352 removeVBConnByVBId_UNLOCKED(conn, vbid);
// Reads the initial noop interval from the engine configuration and
// registers a listener so that later changes to "tap_noop_interval" reach
// this map.
356 TapConnMap::TapConnMap(EventuallyPersistentEngine &e)
357 : ConnMap(e), nextNoop_(0) {
359 Configuration &config = engine.getConfiguration();
360 noopInterval_ = config.getTapNoopInterval();
361 config.addValueChangedListener("tap_noop_interval",
362 new ConnMapValueChangeListener(*this));
// Creates a TapConsumer for `cookie` under connsLock. The consumer is given
// a name from ConnHandler::getAnonName().
365 TapConsumer *TapConnMap::newConsumer(const void* cookie)
367 LockHolder lh(connsLock);
368 TapConsumer *tc = new TapConsumer(engine, cookie, ConnHandler::getAnonName());
369 connection_t tap(tc);
370 LOG(EXTENSION_LOG_INFO, "%s created", tap->logHeader());
// Creates a TAP producer for `cookie`, or re-attaches an already registered
// producer that has the same name. Runs under connsLock. The producer is then
// configured from the arguments (vbucket filter, backfill age, checkpoint
// cursors) and stored as the cookie's engine-specific data.
376 TapProducer *TapConnMap::newProducer(const void* cookie,
377 const std::string &name,
379 uint64_t backfillAge,
381 const std::vector<uint16_t> &vbuckets,
382 const std::map<uint16_t, uint64_t> &lastCheckpointIds)
384 LockHolder lh(connsLock);
385 TapProducer *producer(NULL);
// Look for an existing TapProducer registered under this name.
387 std::list<connection_t>::iterator iter;
388 for (iter = all.begin(); iter != all.end(); ++iter) {
389 producer = dynamic_cast<TapProducer*>((*iter).get());
390 if (producer && producer->getName() == name) {
391 producer->setExpiryTime((rel_time_t)-1);
392 producer->reconnected();
// An existing producer was found: drop the mapping for its old cookie.
400 if (producer != NULL) {
401 const void *old_cookie = producer->getCookie();
402 cb_assert(old_cookie);
403 map_.erase(old_cookie);
// Retire the old producer: anonymous name, flagged for disconnect, paused,
// and an expiry time that is already in the past.
405 if (tapKeepAlive == 0 || (producer->mayCompleteDumpOrTakeover() && producer->idle())) {
406 LOG(EXTENSION_LOG_INFO,
407 "%s keep alive timed out, should be nuked", producer->logHeader());
408 producer->setName(ConnHandler::getAnonName());
409 producer->setDisconnect(true);
410 producer->setConnected(false);
411 producer->setPaused(true);
412 producer->setExpiryTime(ep_current_time() - 1);
416 LOG(EXTENSION_LOG_INFO, "%s exists... grabbing the channel",
417 producer->logHeader());
418 // Create the dummy expired producer connection for the old connection cookie.
419 // This dummy producer connection will be used for releasing the corresponding
420 // memcached connection.
422 // dliao: TODO no need to deal with tap or dcp separately here for the dummy?
423 TapProducer *n = new TapProducer(engine,
425 ConnHandler::getAnonName(),
427 n->setDisconnect(true);
428 n->setConnected(false);
430 n->setExpiryTime(ep_current_time() - 1);
431 all.push_back(connection_t(n));
// No producer to reuse: create a new one and track it in `all`.
435 bool reconnect = false;
436 if (producer == NULL) {
437 producer = new TapProducer(engine, cookie, name, flags);
438 LOG(EXTENSION_LOG_INFO, "%s created", producer->logHeader());
439 all.push_back(connection_t(producer));
441 producer->setCookie(cookie);
442 producer->setReserved(true);
443 producer->setConnected(true);
444 producer->setDisconnect(false);
447 producer->evaluateFlags();
// The per-vbucket lists are updated before setVBucketFilter() below replaces
// the filter that updateVBConnections() diffs against.
449 connection_t conn(producer);
450 updateVBConnections(conn, vbuckets);
452 producer->setFlagByteorderSupport((flags & TAP_CONNECT_TAP_FIX_FLAG_BYTEORDER) != 0);
453 producer->setBackfillAge(backfillAge, reconnect);
454 producer->setVBucketFilter(vbuckets);
455 producer->registerCursor(lastCheckpointIds);
458 producer->rollback();
462 engine.storeEngineSpecific(cookie, producer);
463 // Clear all previous session stats for this producer.
464 clearPrevSessionStats(producer->getName());
// Periodic housekeeping for TAP connections: decides whether a noop is due,
// collects expired connections, flags producers whose ack window is full
// past their expiry time for disconnect, re-notifies paused connections, and
// schedules a reaper task for each dead connection.
470 void TapConnMap::manageConnections() {
471 // To keep connections from getting stuck in a bogus state forever, we
472 // ping every connection that hasn't tried to walk the tap queue
473 // for this amount of time.
474 const int maxIdleTime = 5;
476 bool addNoop = false;
478 rel_time_t now = ep_current_time();
// A noop interval of (size_t)-1 excludes this branch entirely.
479 if (now > nextNoop_ && noopInterval_ != (size_t)-1) {
481 nextNoop_ = now + noopInterval_;
484 std::list<connection_t> deadClients;
486 LockHolder lh(connsLock);
487 // We should pause unless we purged some connections or
488 // all queues have items.
489 getExpiredConnections_UNLOCKED(deadClients);
491 // see if I have some channels that I have to signal..
492 std::map<const void*, connection_t>::iterator iter;
493 for (iter = map_.begin(); iter != map_.end(); ++iter) {
494 TapProducer *tp = dynamic_cast<TapProducer*>(iter->second.get());
496 if (tp->supportsAck() && (tp->getExpiryTime() < now) && tp->windowIsFull()) {
497 LOG(EXTENSION_LOG_WARNING,
498 "%s Expired and ack windows is full. Disconnecting...",
500 tp->setDisconnect(true);
501 } else if (addNoop) {
502 tp->setTimeForNoop();
507 // Collect the list of connections that need to be signaled.
508 std::list<connection_t> toNotify;
509 for (iter = map_.begin(); iter != map_.end(); ++iter) {
510 TapProducer *tp = dynamic_cast<TapProducer*>(iter->second.get());
511 if (tp && (tp->isPaused() || tp->doDisconnect()) && !tp->isSuspended()
512 && tp->isReserved()) {
// Signal if never notified, or if idle for longer than maxIdleTime.
513 if (!tp->sentNotify() ||
514 (tp->getLastWalkTime() + maxIdleTime < now)) {
515 toNotify.push_back(iter->second);
// The engine is signalled under releaseLock.
522 LockHolder rlh(releaseLock);
523 std::list<connection_t>::iterator it;
524 for (it = toNotify.begin(); it != toNotify.end(); ++it) {
525 TapProducer *tp = dynamic_cast<TapProducer*>((*it).get());
526 if (tp && tp->isReserved()) {
527 engine.notifyIOComplete(tp->getCookie(), ENGINE_SUCCESS);
528 tp->setNotifySent(true);
532 // Delete all of the dead clients
533 std::list<connection_t>::iterator ii;
534 for (ii = deadClients.begin(); ii != deadClients.end(); ++ii) {
535 LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
536 (*ii)->releaseReference();
537 TapProducer *tp = dynamic_cast<TapProducer*>((*ii).get());
// The actual reaping is deferred to a ConnectionReaperCallback task.
539 ExTask reapTask = new ConnectionReaperCallback(engine, *this, *ii);
540 ExecutorPool::get()->schedule(reapTask, NONIO_TASK_IDX);
// Queues a notification for every paused, reserved connection registered for
// vbucket `vbid` and wakes the notifier. Runs under the vbucket's spin lock.
545 void TapConnMap::notifyVBConnections(uint16_t vbid)
547 size_t lock_num = vbid % vbConnLockNum;
548 SpinLockHolder lh(&vbConnLocks[lock_num]);
550 std::list<connection_t> &conns = vbConns[vbid];
551 std::list<connection_t>::iterator it = conns.begin();
552 for (; it != conns.end(); ++it) {
553 Notifiable *conn = dynamic_cast<Notifiable*>((*it).get());
// setNotificationScheduled(true) is evaluated last; the connection is queued
// only when that call returns true.
554 if (conn && conn->isPaused() && (*it)->isReserved() &&
555 conn->setNotificationScheduled(true)) {
556 pendingNotifications.push(*it);
557 connNotifier_->notifyMutationEvent();
// Looks up the TAP producer called `name` under connsLock and notifies it
// directly (schedule == false).
// NOTE(review): how `q` is handed to the producer is not visible here.
563 bool TapConnMap::setEvents(const std::string &name, std::list<queued_item> *q) {
565 LockHolder lh(connsLock);
567 connection_t tc = findByName_UNLOCKED(name);
569 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
574 notifyPausedConnection(tp, false);
// Adds `num_backfill_items` to the backfill-remaining counter of the TAP
// producer called `name`. The lookup is done under connsLock.
580 void TapConnMap::incrBackfillRemaining(const std::string &name,
581 size_t num_backfill_items) {
582 LockHolder lh(connsLock);
584 connection_t tc = findByName_UNLOCKED(name);
586 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
588 tp->incrBackfillRemaining(num_backfill_items);
// Reads the backfill queue size of the TAP producer called `name`. The
// lookup is done under connsLock.
592 ssize_t TapConnMap::backfillQueueDepth(const std::string &name) {
594 LockHolder lh(connsLock);
596 connection_t tc = findByName_UNLOCKED(name);
598 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
600 rv = tp->getBackfillQueueSize();
// Walks all connections under connsLock and schedules a backfill of every
// vbucket in a TAP producer's filter, then queues a notification for that
// producer.
606 void TapConnMap::resetReplicaChain() {
607 LockHolder lh(connsLock);
608 rel_time_t now = ep_current_time();
609 std::list<connection_t>::iterator it = all.begin();
610 for (; it != all.end(); ++it) {
611 connection_t &tc = *it;
612 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
// True for anything that is not a TapProducer, and for producers that are
// neither connected nor still within their expiry time.
613 if (!(tp && (tp->isConnected() || tp->getExpiryTime() > now))) {
616 LOG(EXTENSION_LOG_INFO, "%s Reset the replication chain",
618 // Get the list of vbuckets that each TAP producer is replicating
619 VBucketFilter vbfilter = tp->getVBucketFilter();
620 std::vector<uint16_t> vblist (vbfilter.getVBSet().begin(), vbfilter.getVBSet().end());
621 // TAP producer sends INITIAL_VBUCKET_STREAM messages to the destination to reset
622 // replica vbuckets, and then backfills items to the destination.
623 tp->scheduleBackfill(vblist);
624 notifyPausedConnection(tp, true);
// Reports the backfill-completed state of the TAP producer called `name`.
// The lookup is done under connsLock.
628 bool TapConnMap::isBackfillCompleted(std::string &name) {
629 LockHolder lh(connsLock);
630 connection_t tc = findByName_UNLOCKED(name);
632 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
634 return tp->isBackfillCompleted();
// Walks all connections under connsLock and selects the TAP producers whose
// dumpQueue flag is not set.
// NOTE(review): the action taken on them is not visible here; presumably a
// flush event is added, going by the function name - confirm.
640 void TapConnMap::addFlushEvent() {
641 LockHolder lh(connsLock);
642 std::list<connection_t>::iterator iter;
643 for (iter = all.begin(); iter != all.end(); iter++) {
644 TapProducer *tp = dynamic_cast<TapProducer*>((*iter).get());
645 if (tp && !tp->dumpQueue) {
// For each TAP producer, schedules a backfill of the vbuckets in
// `backfillVBuckets` that pass the producer's vbucket filter, then queues a
// notification for it. Runs under connsLock.
651 void TapConnMap::scheduleBackfill(const std::set<uint16_t> &backfillVBuckets) {
652 LockHolder lh(connsLock);
653 rel_time_t now = ep_current_time();
654 std::list<connection_t>::iterator it = all.begin();
655 for (; it != all.end(); ++it) {
656 connection_t &tc = *it;
657 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
// True for anything that is not a TapProducer, and for producers that are
// neither connected nor still within their expiry time.
658 if (!(tp && (tp->isConnected() || tp->getExpiryTime() > now))) {
// Keep only the vbuckets this producer's filter accepts.
662 std::vector<uint16_t> vblist;
663 std::set<uint16_t>::const_iterator vb_it = backfillVBuckets.begin();
664 for (; vb_it != backfillVBuckets.end(); ++vb_it) {
665 if (tp->checkVBucketFilter(*vb_it)) {
666 vblist.push_back(*vb_it);
// Nothing is scheduled when no vbucket passed the filter.
669 if (!vblist.empty()) {
670 tp->scheduleBackfill(vblist);
671 notifyPausedConnection(tp, true);
// Loads the previous session's stats into prevSessionStats: decides whether
// the previous shutdown was normal, and copies the per-connection
// "backfill_completed" and "idle" stats of entries whose name starts with
// "eq_tapq:". Runs under connsLock.
676 void TapConnMap::loadPrevSessionStats(const std::map<std::string, std::string> &session_stats) {
677 LockHolder lh(connsLock);
678 std::map<std::string, std::string>::const_iterator it =
679 session_stats.find("ep_force_shutdown");
// "ep_force_shutdown" == "true" marks the previous shutdown as not normal.
681 if (it != session_stats.end()) {
682 if (it->second.compare("true") == 0) {
683 prevSessionStats.normalShutdown = false;
685 } else if (!session_stats.empty()) { // possible crash on the previous session.
686 prevSessionStats.normalShutdown = false;
689 std::string tap_prefix("eq_tapq:");
690 for (it = session_stats.begin(); it != session_stats.end(); ++it) {
691 const std::string &stat_name = it->first;
// 8 is the length of tap_prefix ("eq_tapq:").
692 if (stat_name.substr(0, 8).compare(tap_prefix) == 0) {
693 if (stat_name.find("backfill_completed") != std::string::npos ||
694 stat_name.find("idle") != std::string::npos) {
695 prevSessionStats.stats[stat_name] = it->second;
// Replaces the vbucket filter of the TAP producer called `name`, provided the
// producer is connected or has not expired yet: updates the per-vbucket
// connection lists, installs the new filter, registers the checkpoint
// cursors and queues a notification. Runs under connsLock.
701 bool TapConnMap::changeVBucketFilter(const std::string &name,
702 const std::vector<uint16_t> &vbuckets,
703 const std::map<uint16_t, uint64_t> &checkpoints) {
705 LockHolder lh(connsLock);
706 connection_t tc = findByName_UNLOCKED(name);
708 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
709 if (tp && (tp->isConnected() || tp->getExpiryTime() > ep_current_time())) {
710 LOG(EXTENSION_LOG_INFO, "%s Change the vbucket filter",
// The lists are updated first, while the producer still holds the old filter
// that updateVBConnections() diffs against.
712 updateVBConnections(tc, vbuckets);
713 tp->setVBucketFilter(vbuckets, true);
714 tp->registerCursor(checkpoints);
717 notifyPausedConnection(tp, true);
// Checks whether the connection called `name` is a TAP producer that is
// either connected or not yet past its expiry time. Runs under connsLock.
723 bool TapConnMap::checkConnectivity(const std::string &name) {
724 LockHolder lh(connsLock);
725 rel_time_t now = ep_current_time();
726 connection_t tc = findByName_UNLOCKED(name);
728 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
729 if (tp && (tp->isConnected() || tp->getExpiryTime() > now)) {
// Locking wrapper: takes connsLock and delegates to
// closeConnectionByName_UNLOCKED(), returning its result.
736 bool TapConnMap::closeConnectionByName(const std::string &name) {
738 LockHolder lh(connsLock);
739 return closeConnectionByName_UNLOCKED(name);
// Scans map_ for an entry whose connection is the same object as `tc`.
// The comparison is by pointer identity, not by name or cookie.
742 bool TapConnMap::mapped(connection_t &tc) {
744 std::map<const void*, connection_t>::iterator it;
745 for (it = map_.begin(); it != map_.end(); ++it) {
746 if (it->second.get() == tc.get()) {
// Shuts down all TAP connections: stops the notifier, releases the reference
// of every connection under releaseLock, and then takes connsLock.
754 void TapConnMap::shutdownAllConnections() {
755 LOG(EXTENSION_LOG_WARNING, "Shutting down tap connections!");
757 connNotifier_->stop();
759 // Not safe to acquire both connsLock and releaseLock at the same time
760 // (can trigger deadlock), so first acquire releaseLock to release all
761 // the connections (without changing the list/map), then drop releaseLock,
762 // acquire connsLock and actually clear out the list/map.
763 LockHolder rlh(releaseLock);
764 std::list<connection_t>::iterator ii;
765 for (ii = all.begin(); ii != all.end(); ++ii) {
766 LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
767 (*ii)->releaseReference();
768 TapProducer *tp = dynamic_cast<TapProducer*>((*ii).get());
775 LockHolder lh(connsLock);
// Handles the disconnect of the connection mapped to `cookie`. Consumers and
// connections already flagged for disconnect expire immediately; any other
// connection is kept alive for the configured tap keepalive period. Runs
// under connsLock.
780 void TapConnMap::disconnect(const void *cookie) {
781 LockHolder lh(connsLock);
783 Configuration& config = engine.getConfiguration();
784 int tapKeepAlive = static_cast<int>(config.getTapKeepalive());
785 std::map<const void*, connection_t>::iterator iter(map_.find(cookie));
786 if (iter != map_.end()) {
787 if (iter->second.get()) {
788 rel_time_t now = ep_current_time();
789 TapConsumer *tc = dynamic_cast<TapConsumer*>(iter->second.get());
// Expire right away: the expiry time is set to one tick in the past.
790 if (tc || iter->second->doDisconnect()) {
791 iter->second->setExpiryTime(now - 1);
792 LOG(EXTENSION_LOG_WARNING, "%s disconnected",
793 iter->second->logHeader());
795 else { // must be producer
796 iter->second->setExpiryTime(now + tapKeepAlive);
797 LOG(EXTENSION_LOG_WARNING,
798 "%s disconnected, keep alive for %d seconds",
799 iter->second->logHeader(), tapKeepAlive);
801 iter->second->setConnected(false);
// Logged when the cookie is mapped but has no connection object.
804 LOG(EXTENSION_LOG_WARNING,
805 "Found half-linked tap connection at: %p", cookie);
// Force-closes the TAP producer called `name`: removes its checkpoint
// cursors, sets its expiry time in the past, gives it an anonymous name and
// flags it for disconnect. Takes no lock itself; closeConnectionByName()
// calls it with connsLock held.
811 bool TapConnMap::closeConnectionByName_UNLOCKED(const std::string &name) {
813 connection_t tc = findByName_UNLOCKED(name);
815 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
817 LOG(EXTENSION_LOG_WARNING, "%s Connection is closed by force",
// The cursors are removed before the rename below; removeTapCursors_UNLOCKED()
// looks them up by the producer's name.
819 removeTapCursors_UNLOCKED(tp);
821 tp->setExpiryTime(ep_current_time() - 1);
822 tp->setName(ConnHandler::getAnonName());
823 tp->setDisconnect(true);
// Moves expired connections that are no longer in map_ from `all` into
// `deadClients`. For a non-suspended producer the checkpoint cursors are
// removed as well. The name suggests the caller holds connsLock, as
// manageConnections() does.
831 void TapConnMap::getExpiredConnections_UNLOCKED(std::list<connection_t> &deadClients) {
832 rel_time_t now = ep_current_time();
834 std::list<connection_t>::iterator iter;
// No increment in the loop header: erase() below supplies the next iterator.
835 for (iter = all.begin(); iter != all.end();) {
836 connection_t &tc = *iter;
837 if (tc->isConnected()) {
842 TapProducer *tp = dynamic_cast<TapProducer*>(tc.get());
844 bool is_dead = false;
845 if (tc->getExpiryTime() <= now && !mapped(tc)) {
847 if (!tp->isSuspended()) {
848 deadClients.push_back(tc);
849 removeTapCursors_UNLOCKED(tp);
850 iter = all.erase(iter);
854 deadClients.push_back(tc);
855 iter = all.erase(iter);
// Removes the producer's checkpoint cursor from every vbucket that passes
// its vbucket filter. Cursors are identified by the producer's name.
866 void TapConnMap::removeTapCursors_UNLOCKED(TapProducer *tp) {
867 // Remove all the checkpoint cursors belonging to the TAP connection.
869 const VBucketMap &vbuckets = engine.getEpStore()->getVBuckets();
870 size_t numOfVBuckets = vbuckets.getSize();
871 // Remove all the cursors belonging to the TAP connection to be purged.
872 for (size_t i = 0; i < numOfVBuckets; ++i) {
// Guards the narrowing cast to uint16_t below.
873 cb_assert(i <= std::numeric_limits<uint16_t>::max());
874 uint16_t vbid = static_cast<uint16_t>(i);
875 RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
879 if (tp->vbucketFilter(vbid)) {
880 LOG(EXTENSION_LOG_INFO,
881 "%s Remove the TAP cursor from vbucket %d",
882 tp->logHeader(), vbid);
883 vb->checkpointManager.removeTAPCursor(tp->getName());
// Forwards to TapProducer::completeBackfill(); the second argument is unused.
889 void CompleteBackfillTapOperation::perform(TapProducer *tc, void *) {
890 tc->completeBackfill();
// Forwards to TapProducer::completeDiskBackfill(); the second argument is
// unused.
893 void CompleteDiskBackfillTapOperation::perform(TapProducer *tc, void *) {
894 tc->completeDiskBackfill();
// Forwards to TapProducer::scheduleDiskBackfill(); the second argument is
// unused.
897 void ScheduleDiskBackfillTapOperation::perform(TapProducer *tc, void *) {
898 tc->scheduleDiskBackfill();
// Hands a completed background fetch to the producer via
// completeBGFetchJob().
901 void CompletedBGFetchTapOperation::perform(TapProducer *tc, Item *arg) {
// The fetch was issued under a different connection token and the producer
// has not reconnected.
// NOTE(review): presumably the item is dropped in this case - confirm.
902 if (connToken != tc->getConnectionToken() && !tc->isReconnected()) {
906 tc->completeBGFetchJob(arg, vbid, implicitEnqueue);
// Consults the saved "<name>:backfill_completed" and "<name>:idle" stats of
// the previous session. Each check fires when the stat exists and is either
// "false" or the previous shutdown was not normal.
// NOTE(review): presumably a firing check means replication was not
// completed - confirm.
909 bool TAPSessionStats::wasReplicationCompleted(const std::string &name) const {
912 std::string backfill_stat(name + ":backfill_completed");
913 std::map<std::string, std::string>::const_iterator it = stats.find(backfill_stat);
914 if (it != stats.end() && (it->second == "false" || !normalShutdown)) {
917 std::string idle_stat(name + ":idle");
918 it = stats.find(idle_stat);
919 if (it != stats.end() && (it->second == "false" || !normalShutdown)) {
// Erases the two saved stats kept for the connection `name`:
// "<name>:backfill_completed" and "<name>:idle".
926 void TAPSessionStats::clearStats(const std::string &name) {
927 std::string backfill_stat(name + ":backfill_completed");
928 stats.erase(backfill_stat);
929 std::string idle_stat(name + ":idle");
930 stats.erase(idle_stat);
933 DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
// Creates a DcpConsumer named "eq_dcpq:" + name for `cookie`. Any existing
// connection with that name is flagged for disconnect first. Runs under
// connsLock.
938 DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
939 const std::string &name)
941 LockHolder lh(connsLock);
943 std::string conn_name("eq_dcpq:");
944 conn_name.append(name);
// Flag a same-named connection for disconnect.
946 std::list<connection_t>::iterator iter;
947 for (iter = all.begin(); iter != all.end(); ++iter) {
948 if ((*iter)->getName() == conn_name) {
949 (*iter)->setDisconnect(true);
955 DcpConsumer *dcp = new DcpConsumer(engine, cookie, conn_name);
956 connection_t dc(dcp);
957 LOG(EXTENSION_LOG_INFO, "%s Connection created", dc->logHeader());
// Scans all connections for a DcpConsumer that already has a stream for
// `vbucket`. Takes no lock itself; addPassiveStream() calls it with
// connsLock held.
964 bool DcpConnMap::isPassiveStreamConnected_UNLOCKED(uint16_t vbucket) {
965 std::list<connection_t>::iterator it;
966 for(it = all.begin(); it != all.end(); it++) {
// Connections that are not DcpConsumers are ignored.
967 DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it->get());
968 if (dcpConsumer && dcpConsumer->isStreamPresent(vbucket)) {
969 LOG(EXTENSION_LOG_DEBUG, "(vb %d) A DCP passive stream "
970 "is already exists for the vbucket in connection: %s",
971 vbucket, dcpConsumer->logHeader());
// Adds a passive stream for `vbucket` on `conn`, unless a DCP consumer
// already has a stream for that vbucket, in which case ENGINE_KEY_EEXISTS is
// returned. Otherwise the result of conn->addStream() is returned. Runs
// under connsLock.
978 ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler* conn,
985 LockHolder lh(connsLock);
986 /* Check if a stream (passive) for the vbucket is already present */
987 if (isPassiveStreamConnected_UNLOCKED(vbucket)) {
988 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Failing to add passive stream, "
989 "as one already exists for the vbucket!",
990 conn->logHeader(), vbucket);
991 return ENGINE_KEY_EEXISTS;
994 return conn->addStream(opaque, vbucket, flags);
// Creates a DcpProducer named "eq_dcpq:" + name for `cookie` and adds it to
// `all`. Any existing connection with that name is flagged for disconnect
// first. Runs under connsLock.
997 DcpProducer *DcpConnMap::newProducer(const void* cookie,
998 const std::string &name,
1001 LockHolder lh(connsLock);
1003 std::string conn_name("eq_dcpq:");
1004 conn_name.append(name);
// Flag a same-named connection for disconnect.
1006 std::list<connection_t>::iterator iter;
1007 for (iter = all.begin(); iter != all.end(); ++iter) {
1008 if ((*iter)->getName() == conn_name) {
1009 (*iter)->setDisconnect(true);
1015 DcpProducer *dcp = new DcpProducer(engine, cookie, conn_name, notifyOnly);
1016 LOG(EXTENSION_LOG_INFO, "%s Connection created", dcp->logHeader());
1017 all.push_back(connection_t(dcp));
// Shuts down all DCP connections: stops the notifier, releases the reference
// of every connection under releaseLock, then takes connsLock and closes all
// streams.
1023 void DcpConnMap::shutdownAllConnections() {
1024 LOG(EXTENSION_LOG_WARNING, "Shutting down dcp connections!");
1026 connNotifier_->stop();
1028 // Not safe to acquire both connsLock and releaseLock at the same time
1029 // (can trigger deadlock), so first acquire releaseLock to release all
1030 // the connections (without changing the list/map), then drop releaseLock,
1031 // acquire connsLock and actually clear out the list/map.
1032 LockHolder rlh(releaseLock);
1033 std::list<connection_t>::iterator ii;
1034 for (ii = all.begin(); ii != all.end(); ++ii) {
1035 LOG(EXTENSION_LOG_WARNING, "Clean up \"%s\"", (*ii)->getName().c_str());
1036 (*ii)->releaseReference();
1040 LockHolder lh(connsLock);
1041 closeAllStreams_UNLOCKED();
// Forwards a vbucket state change to the DCP producers registered in map_.
// Runs under connsLock.
1046 void DcpConnMap::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
1047 LockHolder lh(connsLock);
1048 std::map<const void*, connection_t>::iterator itr = map_.begin();
1049 for (; itr != map_.end(); ++itr) {
// map_ also holds connections that are not producers, hence the checked cast.
1050 DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1052 producer->vbucketStateChanged(vbucket, state);
// Closes the streams of every connection in map_. Producers also get their
// checkpoint processor task cancelled. Takes no lock itself;
// shutdownAllConnections() calls it with connsLock held.
1057 void DcpConnMap::closeAllStreams_UNLOCKED() {
1058 std::map<const void*, connection_t>::iterator itr = map_.begin();
1059 for (; itr != map_.end(); ++itr) {
1060 DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
1062 producer->closeAllStreams();
1063 producer->cancelCheckpointProcessorTask();
// Unchecked cast: this line treats the connection as a DcpConsumer.
1065 static_cast<DcpConsumer*>(itr->second.get())->closeAllStreams();
// Disconnects the connection identified by `cookie` in three steps: flag it
// for disconnect and look it up in map_ under connsLock, close its streams,
// then add it to deadConnections under connsLock again.
1070 void DcpConnMap::disconnect(const void *cookie) {
1071 // Move the connection matching this cookie from the `all` and map_
1072 // data structures (under connsLock).
1075 LockHolder lh(connsLock);
1076 std::list<connection_t>::iterator iter;
1077 for (iter = all.begin(); iter != all.end(); ++iter) {
1078 if ((*iter)->getCookie() == cookie) {
1079 (*iter)->setDisconnect(true);
1084 std::map<const void*, connection_t>::iterator itr(map_.find(cookie));
1085 if (itr != map_.end()) {
1088 LOG(EXTENSION_LOG_INFO, "%s Removing connection",
1095 // Note we shutdown the stream *not* under the connsLock; this is
1096 // because as part of closing a DcpConsumer stream we need to
1097 // acquire PassiveStream::buffer.bufMutex; and that could deadlock
1098 // in EventuallyPersistentStore::setVBucketState, via
1099 // PassiveStream::processBufferedMessages.
1101 DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
1103 producer->closeAllStreams();
1104 producer->cancelCheckpointProcessorTask();
// Unchecked cast: this line treats the connection as a DcpConsumer.
1106 static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
1110 // Finished disconnecting the stream; add it to the
1111 // deadConnections list.
1113 LockHolder lh(connsLock);
1114 deadConnections.push_back(conn);
// Periodic housekeeping for DCP connections: takes over the dead-connection
// list, re-notifies paused or disconnecting connections, then releases the
// dead connections and removes them from the per-vbucket lists.
1118 void DcpConnMap::manageConnections() {
1119 std::list<connection_t> release;
1121 LockHolder lh(connsLock);
// Move the dead connections into the local `release` list.
1122 while (!deadConnections.empty()) {
1123 connection_t conn = deadConnections.front();
1124 release.push_back(conn);
1125 deadConnections.pop_front();
1128 const int maxIdleTime = 5;
1129 rel_time_t now = ep_current_time();
1131 // Collect the list of connections that need to be signaled.
1132 std::list<connection_t> toNotify;
1133 std::map<const void*, connection_t>::iterator iter;
1134 for (iter = map_.begin(); iter != map_.end(); ++iter) {
1135 connection_t conn = iter->second;
1136 Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
1137 if (tp && (tp->isPaused() || conn->doDisconnect()) &&
1138 conn->isReserved()) {
// Signal if never notified, or if idle for longer than maxIdleTime.
1139 if (!tp->sentNotify() ||
1140 (conn->getLastWalkTime() + maxIdleTime < now)) {
1141 toNotify.push_back(iter->second);
// The engine is signalled under releaseLock.
1148 LockHolder rlh(releaseLock);
1149 std::list<connection_t>::iterator it;
1150 for (it = toNotify.begin(); it != toNotify.end(); ++it) {
1152 static_cast<Notifiable*>(static_cast<Producer*>((*it).get()));
1153 if (tp && (*it)->isReserved()) {
1154 engine.notifyIOComplete((*it)->getCookie(), ENGINE_SUCCESS);
1155 tp->setNotifySent(true);
// Release each dead connection and drop it from the per-vbucket lists.
1159 while (!release.empty()) {
1160 connection_t conn = release.front();
1161 conn->releaseReference();
1162 release.pop_front();
1163 removeVBConnections(conn);
// Removes `conn` from the connection list of every vbucket in the producer's
// vbucket list. Entries are matched by cookie, and each list is modified
// under its vbucket's spin lock.
1167 void DcpConnMap::removeVBConnections(connection_t &conn) {
1168 Producer *tp = dynamic_cast<Producer*>(conn.get());
// Unchecked downcast: the Producer is treated as a DcpProducer.
1173 DcpProducer *prod = static_cast<DcpProducer*>(tp);
1174 std::list<uint16_t> vblist = prod->getVBList();
1175 std::list<uint16_t>::iterator it = vblist.begin();
1176 for (; it != vblist.end(); ++it) {
1177 uint16_t vbid = *it;
1178 size_t lock_num = vbid % vbConnLockNum;
1179 SpinLockHolder lh (&vbConnLocks[lock_num]);
1180 std::list<connection_t> &vb_conns = vbConns[vbid];
1181 std::list<connection_t>::iterator itr = vb_conns.begin();
1182 for (; itr != vb_conns.end(); ++itr) {
1183 if (conn->getCookie() == (*itr)->getCookie()) {
1184 vb_conns.erase(itr);
// Tells every connection registered for vbucket `vbid` that sequence number
// `bySeqno` is available. Runs under the vbucket's spin lock.
1191 void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
1193 size_t lock_num = vbid % vbConnLockNum;
1194 SpinLockHolder lh(&vbConnLocks[lock_num]);
1196 std::list<connection_t> &conns = vbConns[vbid];
1197 std::list<connection_t>::iterator it = conns.begin();
1198 for (; it != conns.end(); ++it) {
// Unchecked cast: every entry in vbConns is treated as a DcpProducer.
1199 DcpProducer *conn = static_cast<DcpProducer*>((*it).get());
1200 conn->notifySeqnoAvailable(vbid, bySeqno);
1204 void DcpConnMap::addStats(ADD_STAT add_stat, const void *c) {
1205 LockHolder lh(connsLock);
1206 add_casted_stat("ep_dcp_dead_conn_count", deadConnections.size(),