/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2013 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "config.h"

#include "backfill.h"
#include "ep_engine.h"
#include "failover-table.h"
#include "dcp-producer.h"
#include "dcp-response.h"
#include "dcp-stream.h"

const uint32_t DcpProducer::defaultNoopInerval = 20;

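/*
 * BufferLog tracks the flow-control window for this producer. The consumer
 * advertises its buffer size with the "connection_buffer_size" control
 * message (see DcpProducer::control below), which sets maxBytes; a size of 0
 * disables flow control entirely. Bytes are added with insert() as messages
 * are sent and released with acknowledge() as the consumer sends buffer
 * acknowledgements, giving three states:
 *
 *   Disabled       - flow control is not in use (maxBytes == 0).
 *   SpaceAvailable - the window still has room; sending may continue.
 *   Full           - the window is exhausted; the producer pauses until an
 *                    acknowledgement frees space (see pauseIfFull() and
 *                    acknowledge()).
 */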
DcpProducer::BufferLog::State DcpProducer::BufferLog::getState_UNLOCKED() {
    if (isEnabled_UNLOCKED()) {
        if (isFull_UNLOCKED()) {
            return Full;
        } else {
            return SpaceAvailable;
        }
    }
    return Disabled;
}

void DcpProducer::BufferLog::setBufferSize(size_t maxBytes) {
    WriterLockHolder lh(logLock);
    this->maxBytes = maxBytes;
    if (maxBytes == 0) {
        bytesSent = 0;
        ackedBytes = 0;
    }
}

bool DcpProducer::BufferLog::insert(size_t bytes) {
    WriterLockHolder wlh(logLock);
    bool inserted = false;
    // If the log is not enabled or there is space, allow the insert
    if (!isEnabled_UNLOCKED() || !isFull_UNLOCKED()) {
        bytesSent += bytes;
        inserted = true;
    }
    return inserted;
}

void DcpProducer::BufferLog::release_UNLOCKED(size_t bytes) {
    if (bytesSent >= bytes) {
        bytesSent -= bytes;
    } else {
        bytesSent = 0;
    }
}

bool DcpProducer::BufferLog::pauseIfFull() {
    ReaderLockHolder rlh(logLock);
    if (getState_UNLOCKED() == Full) {
        producer.setPaused(true);
        return true;
    }
    return false;
}

void DcpProducer::BufferLog::unpauseIfSpaceAvailable() {
    ReaderLockHolder rlh(logLock);
    if (getState_UNLOCKED() != Full) {
        producer.notifyPaused(true);
    }
}

void DcpProducer::BufferLog::acknowledge(size_t bytes) {
    WriterLockHolder wlh(logLock);
    State state = getState_UNLOCKED();
    if (state != Disabled) {
        release_UNLOCKED(bytes);
        ackedBytes += bytes;
        if (state == Full) {
            producer.notifyPaused(true);
        }
    }
}

void DcpProducer::BufferLog::addStats(ADD_STAT add_stat, const void *c) {
    ReaderLockHolder rlh(logLock);
    if (isEnabled_UNLOCKED()) {
        producer.addStat("max_buffer_bytes", maxBytes, add_stat, c);
        producer.addStat("unacked_bytes", bytesSent, add_stat, c);
        producer.addStat("total_acked_bytes", ackedBytes, add_stat, c);
        producer.addStat("flow_control", "enabled", add_stat, c);
    } else {
        producer.addStat("flow_control", "disabled", add_stat, c);
    }
}

DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                         const std::string &name, bool isNotifier)
    : Producer(e, cookie, name), rejectResp(NULL),
      notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(*this),
      itemsSent(0), totalBytesSent(0) {
    setSupportAck(true);
    setReserved(true);
    setPaused(true);

    if (notifyOnly) {
        setLogHeader("DCP (Notifier) " + getName() + " -");
    } else {
        setLogHeader("DCP (Producer) " + getName() + " -");
    }

    if (getName().find("replication") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH);
    } else if (getName().find("xdcr") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
    } else if (getName().find("views") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
    }

    // The consumer assigns opaques starting at 0, so have the producer start
    // using opaques at 10M to prevent any opaque conflicts.
    noopCtx.opaque = 10000000;
    noopCtx.sendTime = ep_current_time();

    // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the
    // noop interval to 20 seconds by default, but in post-3.0 releases we set
    // it higher by default. Starting in 3.0.1 the DCP consumer sets the noop
    // interval of the producer when connecting, so in an all-3.0.1+ cluster
    // this value will be overridden. In 3.0 however we do not set the noop
    // interval, so setting this value makes sure we don't disconnect by
    // accident due to the producer and the consumer having different noop
    // intervals.
    noopCtx.noopInterval = defaultNoopInerval;
    noopCtx.pendingRecv = false;
    noopCtx.enabled = false;

    // 1 task per producer
    checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e);
    ExecutorPool::get()->schedule(checkpointCreatorTask, AUXIO_TASK_IDX);
}

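/*
 * Handles a DCP stream request from a consumer. The request is validated
 * (the vbucket must exist and not be in backfill state, the seqno range must
 * be sane, and no live stream may already exist for the vbucket), the
 * failover table is consulted to decide whether the consumer must roll back,
 * and the failover log is returned via the supplied callback. If everything
 * checks out, an ActiveStream (or a NotifierStream for notifier connections)
 * is created, registered in the streams map and pushed onto the ready queue.
 */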
ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                             uint32_t opaque,
                                             uint16_t vbucket,
                                             uint64_t start_seqno,
                                             uint64_t end_seqno,
                                             uint64_t vbucket_uuid,
                                             uint64_t snap_start_seqno,
                                             uint64_t snap_end_seqno,
                                             uint64_t *rollback_seqno,
                                             dcp_add_failover_log callback) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
    if (!vb) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket doesn't exist", logHeader(), vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    if (vb->checkpointManager.getOpenCheckpointId() == 0) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket is in backfill state", logHeader(), vbucket);
        return ENGINE_TMPFAIL;
    }

    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
        end_seqno = vb->getHighSeqno();
    }

    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
        end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
    }

    if (!notifyOnly && start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%llu) is larger than the end seqno (%llu)",
            logHeader(), vbucket, start_seqno, end_seqno);
        return ENGINE_ERANGE;
    }

    if (!notifyOnly && !(snap_start_seqno <= start_seqno &&
        start_seqno <= snap_end_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the snap start seqno (%llu) <= start seqno (%llu) <= snap end "
            "seqno (%llu) is required", logHeader(), vbucket, snap_start_seqno,
            start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    bool add_vb_conn_map = true;
    std::map<uint16_t, stream_t>::iterator itr;
    {
        WriterLockHolder wlh(streamsMutex);
        if ((itr = streams.find(vbucket)) != streams.end()) {
            if (itr->second->getState() != STREAM_DEAD) {
                LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
                    " because a stream already exists for this vbucket",
                    logHeader(), vbucket);
                return ENGINE_KEY_EEXISTS;
            } else {
                streams.erase(vbucket);

                // Don't need to add an entry to vbucket-to-conns map
                add_vb_conn_map = false;
            }
        }
    }

    // If we are a notify stream then we can't use the start_seqno supplied
    // since if it is greater than the current high seqno then it will always
    // trigger a rollback. As a result we should use the current high seqno for
    // rollback purposes.
    uint64_t notifySeqno = start_seqno;
    if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) {
        start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
    }

    if (vb->failovers->needsRollback(start_seqno, vb->getHighSeqno(),
                                     vbucket_uuid, snap_start_seqno,
                                     snap_end_seqno, vb->getPurgeSeqno(),
                                     rollback_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed "
            "because a rollback to seqno %llu is required (start seqno %llu, "
            "vb_uuid %llu, snapStartSeqno %llu, snapEndSeqno %llu)",
            logHeader(), vbucket, *rollback_seqno, start_seqno, vbucket_uuid,
            snap_start_seqno, snap_end_seqno);
        return ENGINE_ROLLBACK;
    }

    ENGINE_ERROR_CODE rv = vb->failovers->addFailoverLog(getCookie(), callback);
    if (rv != ENGINE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Couldn't add failover log to "
            "stream request due to error %d", logHeader(), vbucket, rv);
        return rv;
    }

    stream_t s;
    if (notifyOnly) {
        s = new NotifierStream(&engine_, this, getName(), flags,
                               opaque, vbucket, notifySeqno,
                               end_seqno, vbucket_uuid,
                               snap_start_seqno, snap_end_seqno);
    } else {
        s = new ActiveStream(&engine_, this, getName(), flags,
                             opaque, vbucket, start_seqno,
                             end_seqno, vbucket_uuid,
                             snap_start_seqno, snap_end_seqno);
        static_cast<ActiveStream*>(s.get())->setActive();
    }

    {
        WriterLockHolder wlh(streamsMutex);
        streams[vbucket] = s;
    }

    ready.pushUnique(vbucket);

    if (add_vb_conn_map) {
        connection_t conn(this);
        engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket);
    }

    return rv;
}

ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket,
                                              dcp_add_failover_log callback) {
    (void) opaque;
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
    if (!vb) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
            "because this vbucket doesn't exist", logHeader(), vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    return vb->failovers->addFailoverLog(getCookie(), callback);
}

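/*
 * Advances the producer by one message. This is called repeatedly for the
 * connection; each call either sends a pending noop, resends a response that
 * was previously rejected with ENGINE_E2BIG, or pulls the next item from a
 * ready stream and hands it to the matching dcp_message_producers callback.
 * Returns ENGINE_WANT_MORE while there may be more data to send and
 * ENGINE_SUCCESS when the producer currently has nothing to do.
 */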
ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
    setLastWalkTime();

    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    ENGINE_ERROR_CODE ret;
    if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
        return ret;
    }

    DcpResponse *resp;
    if (rejectResp) {
        resp = rejectResp;
        rejectResp = NULL;
    } else {
        resp = getNextItem();
        if (!resp) {
            return ENGINE_SUCCESS;
        }
    }

    ret = ENGINE_SUCCESS;

    Item* itmCpy = NULL;
    if (resp->getEvent() == DCP_MUTATION) {
        itmCpy = static_cast<MutationResponse*>(resp)->getItemCopy();
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
                                                                     true);
    switch (resp->getEvent()) {
        case DCP_STREAM_END:
        {
            StreamEndResponse *se = static_cast<StreamEndResponse*>(resp);
            ret = producers->stream_end(getCookie(), se->getOpaque(),
                                        se->getVbucket(), se->getFlags());
            break;
        }
        case DCP_MUTATION:
        {
            MutationResponse *m = dynamic_cast<MutationResponse*>(resp);
            ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
                                      m->getVBucket(), m->getBySeqno(),
                                      m->getRevSeqno(), 0, NULL, 0,
                                      m->getItem()->getNRUValue());
            break;
        }
        case DCP_DELETION:
        {
            MutationResponse *m = static_cast<MutationResponse*>(resp);
            ret = producers->deletion(getCookie(), m->getOpaque(),
                                      m->getItem()->getKey().c_str(),
                                      m->getItem()->getNKey(),
                                      m->getItem()->getCas(),
                                      m->getVBucket(), m->getBySeqno(),
                                      m->getRevSeqno(), NULL, 0);
            break;
        }
        case DCP_SNAPSHOT_MARKER:
        {
            SnapshotMarker *s = static_cast<SnapshotMarker*>(resp);
            ret = producers->marker(getCookie(), s->getOpaque(),
                                    s->getVBucket(),
                                    s->getStartSeqno(),
                                    s->getEndSeqno(),
                                    s->getFlags());
            break;
        }
        case DCP_SET_VBUCKET:
        {
            SetVBucketState *s = static_cast<SetVBucketState*>(resp);
            ret = producers->set_vbucket_state(getCookie(), s->getOpaque(),
                                               s->getVBucket(), s->getState());
            break;
        }
        default:
        {
            LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%d), "
                "disconnecting", logHeader(), resp->getEvent());
            ret = ENGINE_DISCONNECT;
            break;
        }
    }

    ObjectRegistry::onSwitchThread(epe);
    if (resp->getEvent() == DCP_MUTATION && ret != ENGINE_SUCCESS) {
        delete itmCpy;
    }

    if (ret == ENGINE_E2BIG) {
        rejectResp = resp;
    } else {
        delete resp;
    }

    lastSendTime.store(ep_current_time(), memory_order_relaxed);
    return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
}

ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
                                                     uint16_t vbucket,
                                                     uint32_t buffer_bytes) {
    (void) opaque;
    (void) vbucket;
    log.acknowledge(buffer_bytes);
    return ENGINE_SUCCESS;
}

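/*
 * Handles DCP_CONTROL messages, typically sent by the consumer right after
 * the connection is opened. Recognised keys:
 *
 *   "connection_buffer_size" - flow-control buffer size in bytes; 0 disables
 *                              flow control (see BufferLog above).
 *   "stream_buffer_size"     - not supported by this engine (ENOTSUP).
 *   "enable_noop"            - "true"/"false"; turns the noop keepalive on or
 *                              off (see maybeSendNoop below).
 *   "set_noop_interval"      - noop interval in seconds.
 *
 * Any other key (or an unparsable value) is rejected with ENGINE_EINVAL.
 */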
ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
                                       uint16_t nkey, const void* value,
                                       uint32_t nvalue) {
    (void) opaque;
    const char* param = static_cast<const char*>(key);
    std::string keyStr(static_cast<const char*>(key), nkey);
    std::string valueStr(static_cast<const char*>(value), nvalue);

    if (strncmp(param, "connection_buffer_size", nkey) == 0) {
        uint32_t size;
        if (parseUint32(valueStr.c_str(), &size)) {
            /* Size 0 implies the client (DCP consumer) does not support
               flow control */
            log.setBufferSize(size);
            return ENGINE_SUCCESS;
        }
    } else if (strncmp(param, "stream_buffer_size", nkey) == 0) {
        LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter stream_buffer_size "
            "is not supported by this engine", logHeader());
        return ENGINE_ENOTSUP;
    } else if (strncmp(param, "enable_noop", nkey) == 0) {
        if (valueStr.compare("true") == 0) {
            noopCtx.enabled = true;
        } else {
            noopCtx.enabled = false;
        }
        return ENGINE_SUCCESS;
    } else if (strncmp(param, "set_noop_interval", nkey) == 0) {
        if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) {
            return ENGINE_SUCCESS;
        }
    }

    LOG(EXTENSION_LOG_WARNING, "%s Invalid ctrl parameter '%s' with value '%s'",
        logHeader(), keyStr.c_str(), valueStr.c_str());

    return ENGINE_EINVAL;
}

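/*
 * Processes a response sent back by the consumer. Acks for set-vbucket-state
 * and snapshot-marker messages are routed to the owning ActiveStream by
 * matching the response opaque; responses to data messages are ignored until
 * nacking is implemented; a noop response with the expected opaque clears
 * noopCtx.pendingRecv. Any other opcode causes the connection to be dropped.
 */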
ENGINE_ERROR_CODE DcpProducer::handleResponse(
                                        protocol_binary_response_header *resp) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    uint8_t opcode = resp->response.opcode;
    if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE ||
        opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
        protocol_binary_response_dcp_stream_req* pkt =
            reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
        uint32_t opaque = pkt->message.header.response.opaque;

        stream_t active_stream;
        std::map<uint16_t, stream_t>::iterator itr;
        {
            ReaderLockHolder rlh(streamsMutex);
            for (itr = streams.begin(); itr != streams.end(); ++itr) {
                active_stream = itr->second;
                Stream *str = active_stream.get();
                if (str && str->getType() == STREAM_ACTIVE) {
                    ActiveStream* as = static_cast<ActiveStream*>(str);
                    if (as && opaque == itr->second->getOpaque()) {
                        break;
                    }
                }
            }
        }

        if (itr != streams.end()) {
            ActiveStream *as = static_cast<ActiveStream*>(active_stream.get());
            if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
                as->setVBucketStateAckRecieved();
            } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
                as->snapshotMarkerAckReceived();
            }
        }

        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_DELETION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) {
        // TODO: When nacking is implemented we need to handle these responses
        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) {
        if (noopCtx.opaque == resp->response.opaque) {
            noopCtx.pendingRecv = false;
            return ENGINE_SUCCESS;
        }
    }

    LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
        "disconnecting", logHeader(), opcode);

    return ENGINE_DISCONNECT;
}

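/*
 * Closes the stream for the given vbucket: the stream is marked dead with
 * END_STREAM_CLOSED, the vbucket-to-connection mapping is removed and the
 * stream is erased from the streams map. Returns ENGINE_KEY_ENOENT if no
 * stream exists for the vbucket, or if the stream is already dead.
 */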
ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
    (void) opaque;
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    stream_t stream = findStreamByVbid(vbucket);
    ENGINE_ERROR_CODE ret;
    if (!stream) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
            "stream exists for this vbucket", logHeader(), vbucket);
        return ENGINE_KEY_ENOENT;
    } else if (!stream->isActive()) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
            "stream is already marked as dead", logHeader(), vbucket);
        connection_t conn(this);
        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
        ret = ENGINE_KEY_ENOENT;
    } else {
        stream->setDead(END_STREAM_CLOSED);
        connection_t conn(this);
        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
        ret = ENGINE_SUCCESS;
    }

    {
        WriterLockHolder wlh(streamsMutex);
        streams.erase(vbucket);
    }

    return ret;
}

void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
    Producer::addStats(add_stat, c);

    addStat("items_sent", getItemsSent(), add_stat, c);
    addStat("items_remaining", getItemsRemaining(), add_stat, c);
    addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
    addStat("last_sent_time", lastSendTime.load(memory_order_relaxed), add_stat,
            c);
    addStat("noop_enabled", noopCtx.enabled, add_stat, c);
    addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);

    log.addStats(add_stat, c);

    ReaderLockHolder rlh(streamsMutex);
    std::map<uint16_t, stream_t>::iterator itr;
    for (itr = streams.begin(); itr != streams.end(); ++itr) {
        itr->second->addStats(add_stat, c);
    }
}

void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
                                   uint16_t vbid) {
    stream_t stream = findStreamByVbid(vbid);
    if (stream && stream->getType() == STREAM_ACTIVE) {
        ActiveStream* as = static_cast<ActiveStream*>(stream.get());
        if (as) {
            as->addTakeoverStats(add_stat, c);
        }
    }
}

void DcpProducer::aggregateQueueStats(ConnCounter* aggregator) {
    if (!aggregator) {
        LOG(EXTENSION_LOG_WARNING, "%s Pointer to the queue stats aggregator"
            " is NULL!!!", logHeader());
        return;
    }
    aggregator->conn_queueDrain += itemsSent;
    aggregator->conn_totalBytes += totalBytesSent;
    aggregator->conn_queueRemaining += getItemsRemaining();
    aggregator->conn_queueBackfillRemaining += totalBackfillBacklogs;
}

void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
    stream_t stream = findStreamByVbid(vbucket);
    if (stream && stream->isActive()) {
        stream->notifySeqnoAvailable(seqno);
    }
}

void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
    (void) state;
    stream_t stream = findStreamByVbid(vbucket);
    if (stream) {
        stream->setDead(END_STREAM_STATE);
    }
}

void DcpProducer::closeAllStreams() {
    std::list<uint16_t> vblist;
    {
        WriterLockHolder wlh(streamsMutex);
        while (!streams.empty()) {
            std::map<uint16_t, stream_t>::iterator itr = streams.begin();
            uint16_t vbid = itr->first;
            itr->second->setDead(END_STREAM_DISCONNECTED);
            streams.erase(vbid);
            vblist.push_back(vbid);
        }
    }

    connection_t conn(this);
    std::list<uint16_t>::iterator it = vblist.begin();
    for (; it != vblist.end(); ++it) {
        engine_.getDcpConnMap().removeVBConnByVBId(conn, *it);
    }
}

const char* DcpProducer::getType() const {
    if (notifyOnly) {
        return "notifier";
    } else {
        return "producer";
    }
}

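/*
 * Returns the next response to send, round-robining over the vbuckets in the
 * ready queue. Returns NULL (and leaves the producer paused) when the
 * flow-control log is full or no stream currently has anything to send. After
 * setting the paused flag the ready queue is checked one more time so that a
 * notification racing with the pause is not lost.
 */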
DcpResponse* DcpProducer::getNextItem() {
    do {
        setPaused(false);

        uint16_t vbucket = 0;
        while (ready.popFront(vbucket)) {
            if (log.pauseIfFull()) {
                ready.pushUnique(vbucket);
                return NULL;
            }

            DcpResponse* op = NULL;
            stream_t stream;
            {
                ReaderLockHolder rlh(streamsMutex);
                std::map<uint16_t, stream_t>::iterator it = streams.find(vbucket);
                if (it == streams.end()) {
                    continue;
                }
                stream.reset(it->second);
            }

            op = stream->next();

            if (!op) {
                // stream is empty, try another vbucket.
                continue;
            }

            switch (op->getEvent()) {
                case DCP_SNAPSHOT_MARKER:
                case DCP_MUTATION:
                case DCP_DELETION:
                case DCP_EXPIRATION:
                case DCP_STREAM_END:
                case DCP_SET_VBUCKET:
                    break;
                default:
                    LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
                        " an unexpected event %d", logHeader(), op->getEvent());
                    abort();
            }

            ready.pushUnique(vbucket);

            if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
                op->getEvent() == DCP_EXPIRATION) {
                itemsSent++;
            }

            totalBytesSent.fetch_add(op->getMessageSize());

            return op;
        }

        // flag that we are paused
        setPaused(true);

        // Re-check the ready queue: a new vbucket could have become ready and
        // the notifier could have seen paused = false, so loop again to make
        // sure we don't miss an operation.
    } while(!ready.empty());

    return NULL;
}

void DcpProducer::setDisconnect(bool disconnect) {
    ConnHandler::setDisconnect(disconnect);

    if (disconnect) {
        ReaderLockHolder rlh(streamsMutex);
        std::map<uint16_t, stream_t>::iterator itr = streams.begin();
        for (; itr != streams.end(); ++itr) {
            itr->second->setDead(END_STREAM_DISCONNECTED);
        }
    }
}

void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
    (void) schedule;
    if (ready.pushUnique(vbucket)) {
        log.unpauseIfSpaceAvailable();
    }
}

void DcpProducer::notifyPaused(bool schedule) {
    engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
}

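/*
 * Sends a noop to the consumer if noops are enabled and noopCtx.noopInterval
 * seconds have passed since the last one. If the previous noop is still
 * unacknowledged after that interval the connection is considered dead and
 * ENGINE_DISCONNECT is returned. ENGINE_FAILED is used as a sentinel meaning
 * "no noop was needed", telling step() to carry on with normal processing.
 */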
ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) {
    if (noopCtx.enabled) {
        size_t sinceTime = ep_current_time() - noopCtx.sendTime;
        if (noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
            LOG(EXTENSION_LOG_WARNING, "%s Disconnected because the connection"
                " appears to be dead", logHeader());
            return ENGINE_DISCONNECT;
        } else if (!noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
            ENGINE_ERROR_CODE ret;
            EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
            ret = producers->noop(getCookie(), ++noopCtx.opaque);
            ObjectRegistry::onSwitchThread(epe);

            if (ret == ENGINE_SUCCESS) {
                ret = ENGINE_WANT_MORE;
                noopCtx.pendingRecv = true;
                noopCtx.sendTime = ep_current_time();
                lastSendTime.store(ep_current_time(), memory_order_relaxed);
            }
            return ret;
        }
    }
    return ENGINE_FAILED;
}

bool DcpProducer::isTimeForNoop() {
    // Not Implemented
    return false;
}

void DcpProducer::setTimeForNoop() {
    // Not Implemented
}

void DcpProducer::clearQueues() {
    WriterLockHolder wlh(streamsMutex);
    std::map<uint16_t, stream_t>::iterator itr = streams.begin();
    for (; itr != streams.end(); ++itr) {
        itr->second->clear();
    }
}

void DcpProducer::appendQueue(std::list<queued_item> *q) {
    (void) q;
    abort(); // Not Implemented
}

size_t DcpProducer::getBackfillQueueSize() {
    return totalBackfillBacklogs;
}

size_t DcpProducer::getItemsSent() {
    return itemsSent;
}

size_t DcpProducer::getItemsRemaining() {
    size_t remainingSize = 0;
    ReaderLockHolder rlh(streamsMutex);
    std::map<uint16_t, stream_t>::iterator itr = streams.begin();
    for (; itr != streams.end(); ++itr) {
        Stream *s = (itr->second).get();

        if (s->getType() == STREAM_ACTIVE) {
            ActiveStream *as = static_cast<ActiveStream *>(s);
            remainingSize += as->getItemsRemaining();
        }
    }

    return remainingSize;
}

size_t DcpProducer::getTotalBytes() {
    return totalBytesSent;
}

stream_t DcpProducer::findStreamByVbid(uint16_t vbid) {
    ReaderLockHolder rlh(streamsMutex);
    stream_t stream;
    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
    if (itr != streams.end()) {
        stream = itr->second;
    }
    return stream;
}

std::list<uint16_t> DcpProducer::getVBList() {
    ReaderLockHolder rlh(streamsMutex);
    std::list<uint16_t> vblist;
    std::map<uint16_t, stream_t>::iterator itr = streams.begin();
    for (; itr != streams.end(); ++itr) {
        vblist.push_back(itr->first);
    }
    return vblist;
}

bool DcpProducer::windowIsFull() {
    abort(); // Not Implemented
}

void DcpProducer::flush() {
    abort(); // Not Implemented
}

bool DcpProducer::bufferLogInsert(size_t bytes) {
    return log.insert(bytes);
}

void DcpProducer::scheduleCheckpointProcessorTask(stream_t s) {
    static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
        ->schedule(s);
}

void DcpProducer::cancelCheckpointProcessorTask() {
    static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
        ->clearQueues();
    ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
}