MB-19843: Modify the end_seqno in DCP stream request after checking for rollback
[ep-engine.git] / src / dcp-producer.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "backfill.h"
21 #include "ep_engine.h"
22 #include "failover-table.h"
23 #include "dcp-producer.h"
24 #include "dcp-response.h"
25 #include "dcp-stream.h"
26
// Default DCP noop interval in seconds, used for compatibility with 3.0
// consumers that never send set_noop_interval (see constructor comment).
// NOTE(review): identifier misspells "Interval"; renaming requires a matching
// change in the header declaring it, so it is left as-is here.
const uint32_t DcpProducer::defaultNoopInerval = 20;
28
29 DcpProducer::BufferLog::State DcpProducer::BufferLog::getState_UNLOCKED() {
30     if (isEnabled_UNLOCKED()) {
31         if (isFull_UNLOCKED()) {
32             return Full;
33         } else {
34             return SpaceAvailable;
35         }
36     }
37     return Disabled;
38 }
39
40 void DcpProducer::BufferLog::setBufferSize(size_t maxBytes) {
41     WriterLockHolder lh(logLock);
42     this->maxBytes = maxBytes;
43     if (maxBytes == 0) {
44         bytesSent = 0;
45         ackedBytes = 0;
46     }
47 }
48
49 bool DcpProducer::BufferLog::insert(size_t bytes) {
50     WriterLockHolder wlh(logLock);
51     bool inserted = false;
52     // If the log is not enabled
53     // or there is space, allow the insert
54     if (!isEnabled_UNLOCKED() || !isFull_UNLOCKED()) {
55         bytesSent += bytes;
56         inserted = true;
57     }
58     return inserted;
59 }
60
61 void DcpProducer::BufferLog::release_UNLOCKED(size_t bytes) {
62     if (bytesSent >= bytes) {
63         bytesSent -= bytes;
64     } else {
65         bytesSent = 0;
66     }
67 }
68
69 bool DcpProducer::BufferLog::pauseIfFull() {
70     ReaderLockHolder rlh(logLock);
71     if (getState_UNLOCKED() == Full) {
72         producer.setPaused(true);
73         return true;
74     }
75     return false;
76 }
77
78 void DcpProducer::BufferLog::unpauseIfSpaceAvailable() {
79     ReaderLockHolder rlh(logLock);
80     if (getState_UNLOCKED() != Full) {
81         producer.notifyPaused(true);
82     }
83 }
84
85 void DcpProducer::BufferLog::acknowledge(size_t bytes) {
86     WriterLockHolder wlh(logLock);
87     State state = getState_UNLOCKED();
88     if (state != Disabled) {
89         release_UNLOCKED(bytes);
90         ackedBytes += bytes;
91         if (state == Full) {
92             producer.notifyPaused(true);
93         }
94     }
95 }
96
97 void DcpProducer::BufferLog::addStats(ADD_STAT add_stat, const void *c) {
98     ReaderLockHolder rlh(logLock);
99     if (isEnabled_UNLOCKED()) {
100         producer.addStat("max_buffer_bytes", maxBytes, add_stat, c);
101         producer.addStat("unacked_bytes", bytesSent, add_stat, c);
102         producer.addStat("total_acked_bytes", ackedBytes, add_stat, c);
103         producer.addStat("flow_control", "enabled", add_stat, c);
104     } else {
105         producer.addStat("flow_control", "disabled", add_stat, c);
106     }
107 }
108
/**
 * Construct a DCP producer connection (or a notifier-only connection when
 * isNotifier is set). Assigns a connection priority based on the consumer's
 * name, initialises the noop context and schedules the per-producer
 * checkpoint processor task.
 */
DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                         const std::string &name, bool isNotifier)
    : Producer(e, cookie, name), rejectResp(NULL),
      notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(*this),
      itemsSent(0), totalBytesSent(0) {
    setSupportAck(true);
    setReserved(true);
    // Start paused; step()/getNextItem() unpause once there is work to send.
    setPaused(true);

    if (notifyOnly) {
        setLogHeader("DCP (Notifier) " + getName() + " -");
    } else {
        setLogHeader("DCP (Producer) " + getName() + " -");
    }

    // Priority is inferred from the connection name: intra-cluster
    // replication is highest, XDCR and views are medium.
    if (getName().find("replication") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH);
    } else if (getName().find("xdcr") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
    } else if (getName().find("views") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
    }

    // The consumer assigns opaques starting at 0 so lets have the producer
    // start using opaques at 10M to prevent any opaque conflicts.
    noopCtx.opaque = 10000000;
    noopCtx.sendTime = ep_current_time();

    // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the
    // noop interval to 20 seconds by default, but in post 3.0 releases we set
    // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the
    // noop interval of the producer when connecting so in an all 3.0.1+ cluster
    // this value will be overriden. In 3.0 however we do not set the noop
    // interval so setting this value will make sure we don't disconnect on
    // accident due to the producer and the consumer having a different noop
    // interval.
    noopCtx.noopInterval = defaultNoopInerval;
    noopCtx.pendingRecv = false;
    noopCtx.enabled = false;

    // 1 task per producer
    checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e);
    ExecutorPool::get()->schedule(checkpointCreatorTask, AUXIO_TASK_IDX);
}
153
/**
 * Handle a DCP stream request for a vbucket.
 *
 * Validation order matters here and must not be rearranged:
 *   1. vbucket existence / backfill state checks,
 *   2. sanity of the client-supplied seqno ranges,
 *   3. duplicate-stream check,
 *   4. rollback check against the failover table,
 *   5. only THEN may end_seqno be rewritten for the LATEST/DISKONLY flags
 *      (MB-19843: the rollback decision must be made on the range the
 *      client actually asked for, not on an engine-adjusted end_seqno).
 *
 * On ENGINE_ROLLBACK, *rollback_seqno tells the client where to roll back to.
 * On success a NotifierStream or ActiveStream is registered for the vbucket.
 */
ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                             uint32_t opaque,
                                             uint16_t vbucket,
                                             uint64_t start_seqno,
                                             uint64_t end_seqno,
                                             uint64_t vbucket_uuid,
                                             uint64_t snap_start_seqno,
                                             uint64_t snap_end_seqno,
                                             uint64_t *rollback_seqno,
                                             dcp_add_failover_log callback) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
    if (!vb) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket doesn't exist", logHeader(), vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Open checkpoint id 0 indicates the vbucket is still being backfilled
    // and cannot serve a stream yet.
    if (vb->checkpointManager.getOpenCheckpointId() == 0) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket is in backfill state", logHeader(), vbucket);
        return ENGINE_TMPFAIL;
    }

    if (!notifyOnly && start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%llu) is larger than the end seqno (%llu); "
            "Incorrect params passed by the DCP client",
            logHeader(), vbucket, start_seqno, end_seqno);
        return ENGINE_ERANGE;
    }

    // start_seqno must lie within the snapshot the client claims to have.
    if (!notifyOnly && !(snap_start_seqno <= start_seqno &&
        start_seqno <= snap_end_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the snap start seqno (%llu) <= start seqno (%llu) <= snap end "
            "seqno (%llu) is required", logHeader(), vbucket, snap_start_seqno,
            start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    bool add_vb_conn_map = true;
    std::map<uint16_t, stream_t>::iterator itr;
    {
        WriterLockHolder wlh(streamsMutex);
        if ((itr = streams.find(vbucket)) != streams.end()) {
            if (itr->second->getState() != STREAM_DEAD) {
                LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
                    " because a stream already exists for this vbucket",
                    logHeader(), vbucket);
                return ENGINE_KEY_EEXISTS;
            } else {
                // Replacing a dead stream: the vbucket-to-conn mapping is
                // already in place, so don't add it again below.
                streams.erase(vbucket);

                // Don't need to add an entry to vbucket-to-conns map
                add_vb_conn_map = false;
            }
        }
    }

    // If we are a notify stream then we can't use the start_seqno supplied
    // since if it is greater than the current high seqno then it will always
    // trigger a rollback. As a result we should use the current high seqno for
    // rollback purposes.
    uint64_t notifySeqno = start_seqno;
    if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) {
        start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
    }

    if (vb->failovers->needsRollback(start_seqno, vb->getHighSeqno(),
                                     vbucket_uuid, snap_start_seqno,
                                     snap_end_seqno, vb->getPurgeSeqno(),
                                     rollback_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed "
            "because a rollback to seqno %llu is required (start seqno %llu, "
            "vb_uuid %llu, snapStartSeqno %llu, snapEndSeqno %llu)",
            logHeader(), vbucket, *rollback_seqno, start_seqno, vbucket_uuid,
            snap_start_seqno, snap_end_seqno);
        return ENGINE_ROLLBACK;
    }

    ENGINE_ERROR_CODE rv = vb->failovers->addFailoverLog(getCookie(), callback);
    if (rv != ENGINE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Couldn't add failover log to "
            "stream request due to error %d", logHeader(), vbucket, rv);
        return rv;
    }

    // MB-19843: end_seqno is only adjusted AFTER the rollback check above,
    // so the rollback decision was made on the client-requested range.
    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
        end_seqno = vb->getHighSeqno();
    }

    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
        end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
    }

    if (!notifyOnly && start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%llu) is larger than the end seqno (%llu), "
            "stream request flags %d, vb_uuid %llu, snapStartSeqno %llu, "
            "snapEndSeqno %llu; should have rolled back instead",
            logHeader(), vbucket, start_seqno, end_seqno, flags, vbucket_uuid,
            snap_start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    if (!notifyOnly && start_seqno > vb->getHighSeqno()) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%llu) is larger than the vb highSeqno (%llu), "
            "stream request flags is %d, vb_uuid %llu, snapStartSeqno %llu, "
            "snapEndSeqno %llu; should have rolled back instead",
            logHeader(), vbucket, start_seqno, vb->getHighSeqno(), flags,
            vbucket_uuid, snap_start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    stream_t s;
    if (notifyOnly) {
        // Notifier streams use the original (unclamped) start seqno.
        s = new NotifierStream(&engine_, this, getName(), flags,
                               opaque, vbucket, notifySeqno,
                               end_seqno, vbucket_uuid,
                               snap_start_seqno, snap_end_seqno);
    } else {
        s = new ActiveStream(&engine_, this, getName(), flags,
                             opaque, vbucket, start_seqno,
                             end_seqno, vbucket_uuid,
                             snap_start_seqno, snap_end_seqno);
        static_cast<ActiveStream*>(s.get())->setActive();
    }

    {
        WriterLockHolder wlh(streamsMutex);
        streams[vbucket] = s;
    }

    ready.pushUnique(vbucket);

    if (add_vb_conn_map) {
        connection_t conn(this);
        engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket);
    }

    return rv;
}
301
302 ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket,
303                                               dcp_add_failover_log callback) {
304     (void) opaque;
305     if (doDisconnect()) {
306         return ENGINE_DISCONNECT;
307     }
308
309     RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
310     if (!vb) {
311         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
312             "because this vbucket doesn't exist", logHeader(), vbucket);
313         return ENGINE_NOT_MY_VBUCKET;
314     }
315
316     return vb->failovers->addFailoverLog(getCookie(), callback);
317 }
318
/**
 * Drive one step of the producer: possibly send a noop, otherwise fetch the
 * next queued DcpResponse (or the previously-rejected one) and hand it to
 * the memcached-side `producers` callbacks.
 *
 * Returns ENGINE_WANT_MORE when an item was sent successfully, ENGINE_SUCCESS
 * when there is currently nothing to send, or an error code to propagate.
 */
ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
    setLastWalkTime();

    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    // ENGINE_FAILED is maybeSendNoop()'s "nothing to do" sentinel; any other
    // value (sent noop / disconnect) is returned to the caller directly.
    ENGINE_ERROR_CODE ret;
    if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
        return ret;
    }

    // Prefer an item the transport previously rejected with E2BIG over
    // pulling a fresh one off the ready streams.
    DcpResponse *resp;
    if (rejectResp) {
        resp = rejectResp;
        rejectResp = NULL;
    } else {
        resp = getNextItem();
        if (!resp) {
            return ENGINE_SUCCESS;
        }
    }

    ret = ENGINE_SUCCESS;

    // Mutations hand a copied Item to the callback; we own the copy until
    // the callback succeeds (see the cleanup after the switch).
    Item* itmCpy = NULL;
    if (resp->getEvent() == DCP_MUTATION) {
        itmCpy = static_cast<MutationResponse*>(resp)->getItemCopy();
    }

    // Detach from the engine's memory-tracking context while calling out to
    // the memcached-side producers, then re-attach afterwards.
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
                                                                     true);
    switch (resp->getEvent()) {
        case DCP_STREAM_END:
        {
            StreamEndResponse *se = static_cast<StreamEndResponse*>(resp);
            ret = producers->stream_end(getCookie(), se->getOpaque(),
                                        se->getVbucket(), se->getFlags());
            break;
        }
        case DCP_MUTATION:
        {
            MutationResponse *m = dynamic_cast<MutationResponse*> (resp);
            ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
                                      m->getVBucket(), m->getBySeqno(),
                                      m->getRevSeqno(), 0, NULL, 0,
                                      m->getItem()->getNRUValue());
            break;
        }
        case DCP_DELETION:
        {
            MutationResponse *m = static_cast<MutationResponse*>(resp);
            ret = producers->deletion(getCookie(), m->getOpaque(),
                                      m->getItem()->getKey().c_str(),
                                      m->getItem()->getNKey(),
                                      m->getItem()->getCas(),
                                      m->getVBucket(), m->getBySeqno(),
                                      m->getRevSeqno(), NULL, 0);
            break;
        }
        case DCP_SNAPSHOT_MARKER:
        {
            SnapshotMarker *s = static_cast<SnapshotMarker*>(resp);
            ret = producers->marker(getCookie(), s->getOpaque(),
                                    s->getVBucket(),
                                    s->getStartSeqno(),
                                    s->getEndSeqno(),
                                    s->getFlags());
            break;
        }
        case DCP_SET_VBUCKET:
        {
            SetVBucketState *s = static_cast<SetVBucketState*>(resp);
            ret = producers->set_vbucket_state(getCookie(), s->getOpaque(),
                                               s->getVBucket(), s->getState());
            break;
        }
        default:
        {
            LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%d), "
                "disconnecting", logHeader(), resp->getEvent());
            ret = ENGINE_DISCONNECT;
            break;
        }
    }

    ObjectRegistry::onSwitchThread(epe);
    // On success the callback took ownership of itmCpy; on failure we must
    // free the copy ourselves.
    if (resp->getEvent() == DCP_MUTATION && ret != ENGINE_SUCCESS) {
        delete itmCpy;
    }

    if (ret == ENGINE_E2BIG) {
        // Output buffer was full: stash the response and retry next step().
        rejectResp = resp;
    } else {
        delete resp;
    }

    lastSendTime.store(ep_current_time(), memory_order_relaxed);
    return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
}
419
420 ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
421                                                      uint16_t vbucket,
422                                                      uint32_t buffer_bytes) {
423     log.acknowledge(buffer_bytes);
424     return ENGINE_SUCCESS;
425 }
426
427 ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
428                                        uint16_t nkey, const void* value,
429                                        uint32_t nvalue) {
430     const char* param = static_cast<const char*>(key);
431     std::string keyStr(static_cast<const char*>(key), nkey);
432     std::string valueStr(static_cast<const char*>(value), nvalue);
433
434     if (strncmp(param, "connection_buffer_size", nkey) == 0) {
435         uint32_t size;
436         if (parseUint32(valueStr.c_str(), &size)) {
437             /* Size 0 implies the client (DCP consumer) does not support
438                flow control */
439             log.setBufferSize(size);
440             return ENGINE_SUCCESS;
441         }
442     } else if (strncmp(param, "stream_buffer_size", nkey) == 0) {
443         LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter stream_buffer_size is"
444             "not supported by this engine", logHeader());
445         return ENGINE_ENOTSUP;
446     } else if (strncmp(param, "enable_noop", nkey) == 0) {
447         if (valueStr.compare("true") == 0) {
448             noopCtx.enabled = true;
449         } else {
450             noopCtx.enabled = false;
451         }
452         return ENGINE_SUCCESS;
453     } else if (strncmp(param, "set_noop_interval", nkey) == 0) {
454         if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) {
455             return ENGINE_SUCCESS;
456         }
457     }
458
459     LOG(EXTENSION_LOG_WARNING, "%s Invalid ctrl parameter '%s' for %s",
460         logHeader(), valueStr.c_str(), keyStr.c_str());
461
462     return ENGINE_EINVAL;
463 }
464
/**
 * Handle a response packet from the consumer. ACKs for set-vbucket-state and
 * snapshot-marker are routed to the matching ActiveStream (located by
 * opaque); data-path responses are currently no-ops; a noop response clears
 * the pending-receive flag. Anything else forces a disconnect.
 */
ENGINE_ERROR_CODE DcpProducer::handleResponse(
                                        protocol_binary_response_header *resp) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    uint8_t opcode = resp->response.opcode;
    if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE ||
        opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
        protocol_binary_response_dcp_stream_req* pkt =
            reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
        uint32_t opaque = pkt->message.header.response.opaque;

        // Search for the active stream whose opaque matches; holding the
        // stream_t reference keeps the stream alive after the lock is
        // dropped.
        // NOTE(review): `itr` is compared against streams.end() below,
        // outside the lock scope — looks safe only because `active_stream`
        // pins the match; confirm map mutation cannot invalidate `itr`
        // between the scopes.
        stream_t active_stream;
        std::map<uint16_t, stream_t>::iterator itr;
        {
            ReaderLockHolder rlh(streamsMutex);
            for (itr = streams.begin() ; itr != streams.end(); ++itr) {
                active_stream = itr->second;
                Stream *str = active_stream.get();
                if (str && str->getType() == STREAM_ACTIVE) {
                    ActiveStream* as = static_cast<ActiveStream*>(str);
                    if (as && opaque == itr->second->getOpaque()) {
                        break;
                    }
                }
            }
        }

        if (itr != streams.end()) {
            ActiveStream *as = static_cast<ActiveStream*>(active_stream.get());
            if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
                as->setVBucketStateAckRecieved();
            } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
                as->snapshotMarkerAckReceived();
            }
        }

        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_DELETION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) {
        // TODO: When nacking is implemented we need to handle these responses
        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) {
        // Only accept the noop ack we are actually waiting for.
        if (noopCtx.opaque == resp->response.opaque) {
            noopCtx.pendingRecv = false;
            return ENGINE_SUCCESS;
        }
    }

    LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
        "disconnecting", logHeader(), opcode);

    return ENGINE_DISCONNECT;
}
522
523 ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
524     if (doDisconnect()) {
525         return ENGINE_DISCONNECT;
526     }
527
528     stream_t stream = findStreamByVbid(vbucket);
529     ENGINE_ERROR_CODE ret;
530     if (!stream) {
531         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
532             "stream exists for this vbucket", logHeader(), vbucket);
533         return ENGINE_KEY_ENOENT;
534     } else if (!stream->isActive()) {
535         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
536             "stream is already marked as dead", logHeader(), vbucket);
537         connection_t conn(this);
538         engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
539         ret = ENGINE_KEY_ENOENT;
540     } else {
541         stream->setDead(END_STREAM_CLOSED);
542         connection_t conn(this);
543         engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
544         ret = ENGINE_SUCCESS;
545     }
546
547     {
548         WriterLockHolder wlh(streamsMutex);
549         streams.erase(vbucket);
550     }
551
552     return ret;
553 }
554
555 void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
556     Producer::addStats(add_stat, c);
557
558     addStat("items_sent", getItemsSent(), add_stat, c);
559     addStat("items_remaining", getItemsRemaining(), add_stat, c);
560     addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
561     addStat("last_sent_time", lastSendTime.load(memory_order_relaxed), add_stat,
562             c);
563     addStat("noop_enabled", noopCtx.enabled, add_stat, c);
564     addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);
565
566     log.addStats(add_stat, c);
567
568     ReaderLockHolder rlh(streamsMutex);
569     std::map<uint16_t, stream_t>::iterator itr;
570     for (itr = streams.begin(); itr != streams.end(); ++itr) {
571         itr->second->addStats(add_stat, c);
572     }
573 }
574
575 void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
576                                    uint16_t vbid) {
577     stream_t stream = findStreamByVbid(vbid);
578     if (stream && stream->getType() == STREAM_ACTIVE) {
579         ActiveStream* as = static_cast<ActiveStream*>(stream.get());
580         if (as) {
581             as->addTakeoverStats(add_stat, c);
582         }
583     }
584 }
585
586 void DcpProducer::aggregateQueueStats(ConnCounter* aggregator) {
587     if (!aggregator) {
588         LOG(EXTENSION_LOG_WARNING, "%s Pointer to the queue stats aggregator"
589             " is NULL!!!", logHeader());
590         return;
591     }
592     aggregator->conn_queueDrain += itemsSent;
593     aggregator->conn_totalBytes += totalBytesSent;
594     aggregator->conn_queueRemaining += getItemsRemaining();
595     aggregator->conn_queueBackfillRemaining += totalBackfillBacklogs;
596 }
597
598 void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
599     stream_t stream = findStreamByVbid(vbucket);
600     if (stream && stream->isActive()) {
601         stream->notifySeqnoAvailable(seqno);
602     }
603 }
604
605 void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
606     stream_t stream = findStreamByVbid(vbucket);
607     if (stream) {
608         stream->setDead(END_STREAM_STATE);
609     }
610 }
611
612 void DcpProducer::closeAllStreams() {
613     std::list<uint16_t> vblist;
614     {
615         WriterLockHolder wlh(streamsMutex);
616         while (!streams.empty()) {
617             std::map<uint16_t, stream_t>::iterator itr = streams.begin();
618             uint16_t vbid = itr->first;
619             itr->second->setDead(END_STREAM_DISCONNECTED);
620             streams.erase(vbid);
621             vblist.push_back(vbid);
622         }
623     }
624
625     connection_t conn(this);
626     std::list<uint16_t>::iterator it = vblist.begin();
627     for (; it != vblist.end(); ++it) {
628         engine_.getDcpConnMap().removeVBConnByVBId(conn, *it);
629     }
630 }
631
632 const char* DcpProducer::getType() const {
633     if (notifyOnly) {
634         return "notifier";
635     } else {
636         return "producer";
637     }
638 }
639
/**
 * Pop the next DcpResponse from the ready vbuckets' streams, or NULL when
 * nothing is available (flow-control full, or all streams drained).
 *
 * The paused flag and the outer do/while deliberately bracket the scan:
 * paused is cleared while draining and re-set before re-checking the ready
 * queue, so a notifier that raced with the scan cannot be lost.
 */
DcpResponse* DcpProducer::getNextItem() {
    do {
        setPaused(false);

        uint16_t vbucket = 0;
        while (ready.popFront(vbucket)) {
            // Flow-control window full: put the vbucket back and give up
            // until the consumer acks some bytes.
            if (log.pauseIfFull()) {
                ready.pushUnique(vbucket);
                return NULL;
            }

            DcpResponse* op = NULL;
            stream_t stream;
            {
                // Take a reference under the lock; stream->next() is called
                // after the lock is released.
                ReaderLockHolder rlh(streamsMutex);
                std::map<uint16_t, stream_t>::iterator it = streams.find(vbucket);
                if (it == streams.end()) {
                    continue;
                }
                stream.reset(it->second);
            }

            op = stream->next();

            if (!op) {
                // stream is empty, try another vbucket.
                continue;
            }

            // Sanity-check the event type before handing it to step().
            switch (op->getEvent()) {
                case DCP_SNAPSHOT_MARKER:
                case DCP_MUTATION:
                case DCP_DELETION:
                case DCP_EXPIRATION:
                case DCP_STREAM_END:
                case DCP_SET_VBUCKET:
                    break;
                default:
                    LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
                        " an unexpected event %d", logHeader(), op->getEvent());
                    abort();
            }

            // The stream may have more items; keep its vbucket ready.
            ready.pushUnique(vbucket);

            if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
                op->getEvent() == DCP_EXPIRATION) {
                itemsSent++;
            }

            totalBytesSent.fetch_add(op->getMessageSize());

            return op;
        }

        // flag we are paused
        setPaused(true);

        // re-check the ready queue.
        // A new vbucket could of became ready and the notifier could of seen
        // paused = false, so reloop so we don't miss an operation.
    } while(!ready.empty());

    return NULL;
}
705
706 void DcpProducer::setDisconnect(bool disconnect) {
707     ConnHandler::setDisconnect(disconnect);
708
709     if (disconnect) {
710         ReaderLockHolder rlh(streamsMutex);
711         std::map<uint16_t, stream_t>::iterator itr = streams.begin();
712         for (; itr != streams.end(); ++itr) {
713             itr->second->setDead(END_STREAM_DISCONNECTED);
714         }
715     }
716 }
717
718 void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
719     if (ready.pushUnique(vbucket)) {
720         log.unpauseIfSpaceAvailable();
721     }
722 }
723
// Wake this paused connection via the connection map; `schedule` is passed
// through to decide how the notification is dispatched.
void DcpProducer::notifyPaused(bool schedule) {
    engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
}
727
/**
 * Send a DCP noop if one is due, or disconnect a dead connection.
 *
 * Returns ENGINE_DISCONNECT when a previous noop went unanswered for longer
 * than the noop interval; ENGINE_WANT_MORE (or the producer callback's
 * error) when a noop was sent; ENGINE_FAILED as the sentinel meaning
 * "nothing noop-related to do" so step() continues with normal items.
 */
ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) {
    if (noopCtx.enabled) {
        size_t sinceTime = ep_current_time() - noopCtx.sendTime;
        if (noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
            // The last noop was never acked: treat the connection as dead.
            LOG(EXTENSION_LOG_WARNING, "%s Disconnected because the connection"
                " appears to be dead", logHeader());
            return ENGINE_DISCONNECT;
        } else if (!noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
            ENGINE_ERROR_CODE ret;
            // Detach from the engine context while calling the transport.
            EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
            ret = producers->noop(getCookie(), ++noopCtx.opaque);
            ObjectRegistry::onSwitchThread(epe);

            if (ret == ENGINE_SUCCESS) {
                ret = ENGINE_WANT_MORE;
                noopCtx.pendingRecv = true;
                noopCtx.sendTime = ep_current_time();
                lastSendTime.store(ep_current_time(), memory_order_relaxed);
            }
            return ret;
        }
    }
    // Sentinel: noops disabled or not yet due; caller proceeds normally.
    return ENGINE_FAILED;
}
752
// ConnHandler hook; noop timing for producers is handled in maybeSendNoop()
// instead, so this always reports false.
bool DcpProducer::isTimeForNoop() {
    // Not Implemented
    return false;
}
757
// ConnHandler hook; intentionally a no-op for producers (see maybeSendNoop).
void DcpProducer::setTimeForNoop() {
    // Not Implemented
}
761
762 void DcpProducer::clearQueues() {
763     WriterLockHolder wlh(streamsMutex);
764     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
765     for (; itr != streams.end(); ++itr) {
766         itr->second->clear();
767     }
768 }
769
// TAP-era interface point that has no DCP equivalent; reaching it is a
// programming error, hence the abort.
void DcpProducer::appendQueue(std::list<queued_item> *q) {
    (void) q;
    abort(); // Not Implemented
}
774
// Number of items still expected from backfill across this connection.
size_t DcpProducer::getBackfillQueueSize() {
    return totalBackfillBacklogs;
}
778
// Total mutations/deletions/expirations sent on this connection.
size_t DcpProducer::getItemsSent() {
    return itemsSent;
}
782
783 size_t DcpProducer::getItemsRemaining() {
784     size_t remainingSize = 0;
785     ReaderLockHolder rlh(streamsMutex);
786     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
787     for (; itr != streams.end(); ++itr) {
788         Stream *s = (itr->second).get();
789
790         if (s->getType() == STREAM_ACTIVE) {
791             ActiveStream *as = static_cast<ActiveStream *>(s);
792             remainingSize += as->getItemsRemaining();
793         }
794     }
795
796     return remainingSize;
797 }
798
// Total bytes of DCP messages sent on this connection.
size_t DcpProducer::getTotalBytes() {
    return totalBytesSent;
}
802
803 stream_t DcpProducer::findStreamByVbid(uint16_t vbid) {
804     ReaderLockHolder rlh(streamsMutex);
805     stream_t stream;
806     std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
807     if (itr != streams.end()) {
808         stream = itr->second;
809     }
810     return stream;
811 }
812
813 std::list<uint16_t> DcpProducer::getVBList() {
814     ReaderLockHolder rlh(streamsMutex);
815     std::list<uint16_t> vblist;
816     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
817     for (; itr != streams.end(); ++itr) {
818         vblist.push_back(itr->first);
819     }
820     return vblist;
821 }
822
// TAP-era interface point with no DCP equivalent (flow control is handled
// by BufferLog); reaching it is a programming error.
bool DcpProducer::windowIsFull() {
    abort(); // Not Implemented
}
826
// TAP-era interface point with no DCP equivalent; reaching it is a
// programming error.
void DcpProducer::flush() {
    abort(); // Not Implemented
}
830
// Record `bytes` against the flow-control log; false means the window is
// full and the caller must not send yet.
bool DcpProducer::bufferLogInsert(size_t bytes) {
    return log.insert(bytes);
}
834
// Queue a stream on this producer's checkpoint processor task so its
// checkpoint items get converted to DCP messages.
void DcpProducer::scheduleCheckpointProcessorTask(stream_t s) {
    static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
        ->schedule(s);
}
839
// Drain the checkpoint processor task's queues and cancel it in the
// executor pool; used when tearing the producer down.
void DcpProducer::cancelCheckpointProcessorTask() {
    static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
        ->clearQueues();
    ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
}