Merge branch 'watson'
[ep-engine.git] / src / dcp / producer.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include <vector>
19 #include <memcached/server_api.h>
20
21 #include "dcp/producer.h"
22
23 #include "backfill.h"
24 #include "common.h"
25 #include "ep_engine.h"
26 #include "failover-table.h"
27 #include "dcp/backfill-manager.h"
28 #include "dcp/dcpconnmap.h"
29 #include "dcp/response.h"
30 #include "dcp/stream.h"
31
// Default interval between noop messages sent to the consumer; matches the
// Couchbase 3.0 default for backward compatibility (see noopCtx setup in the
// DcpProducer constructor).
const std::chrono::seconds DcpProducer::defaultDcpNoopTxInterval(20);
33
34 DcpProducer::BufferLog::State DcpProducer::BufferLog::getState_UNLOCKED() {
35     if (isEnabled_UNLOCKED()) {
36         if (isFull_UNLOCKED()) {
37             return Full;
38         } else {
39             return SpaceAvailable;
40         }
41     }
42     return Disabled;
43 }
44
/**
 * (Re)size the flow-control buffer.
 *
 * @param maxBytes new buffer capacity; 0 disables flow control and resets
 *                 the sent/acked counters (the consumer signalled it does
 *                 not support flow control).
 */
void DcpProducer::BufferLog::setBufferSize(size_t maxBytes) {
    WriterLockHolder lh(logLock);
    // Parameter shadows the member; this-> disambiguates.
    this->maxBytes = maxBytes;
    if (maxBytes == 0) {
        bytesSent = 0;
        ackedBytes = 0;
    }
}
53
54 bool DcpProducer::BufferLog::insert(size_t bytes) {
55     WriterLockHolder wlh(logLock);
56     bool inserted = false;
57     // If the log is not enabled
58     // or there is space, allow the insert
59     if (!isEnabled_UNLOCKED() || !isFull_UNLOCKED()) {
60         bytesSent += bytes;
61         inserted = true;
62     }
63     return inserted;
64 }
65
66 void DcpProducer::BufferLog::release_UNLOCKED(size_t bytes) {
67     if (bytesSent >= bytes) {
68         bytesSent -= bytes;
69     } else {
70         bytesSent = 0;
71     }
72 }
73
74 bool DcpProducer::BufferLog::pauseIfFull() {
75     ReaderLockHolder rlh(logLock);
76     if (getState_UNLOCKED() == Full) {
77         producer.setPaused(true);
78         return true;
79     }
80     return false;
81 }
82
83 void DcpProducer::BufferLog::unpauseIfSpaceAvailable() {
84     ReaderLockHolder rlh(logLock);
85     if (getState_UNLOCKED() == Full) {
86         LOG(EXTENSION_LOG_NOTICE,
87             "%s Unable to notify paused connection "
88             "because DcpProducer::BufferLog is full; ackedBytes:%lx, "
89             "bytesSent:%lx, maxBytes:%lx",
90             producer.logHeader(),
91             ackedBytes,
92             bytesSent,
93             maxBytes);
94     } else {
95         producer.notifyPaused(true);
96     }
97 }
98
99 void DcpProducer::BufferLog::acknowledge(size_t bytes) {
100     WriterLockHolder wlh(logLock);
101     State state = getState_UNLOCKED();
102     if (state != Disabled) {
103         release_UNLOCKED(bytes);
104         ackedBytes += bytes;
105         if (state == Full) {
106             LOG(EXTENSION_LOG_NOTICE,
107                 "%s Notifying paused connection now that "
108                 "DcpProducer::Bufferlog is no longer full; ackedBytes:%lx, "
109                 "bytesSent:%lx, maxBytes:%lx",
110                 producer.logHeader(),
111                 ackedBytes,
112                 bytesSent,
113                 maxBytes);
114             producer.notifyPaused(true);
115         }
116     }
117 }
118
119 void DcpProducer::BufferLog::addStats(ADD_STAT add_stat, const void *c) {
120     ReaderLockHolder rlh(logLock);
121     if (isEnabled_UNLOCKED()) {
122         producer.addStat("max_buffer_bytes", maxBytes, add_stat, c);
123         producer.addStat("unacked_bytes", bytesSent, add_stat, c);
124         producer.addStat("total_acked_bytes", ackedBytes, add_stat, c);
125         producer.addStat("flow_control", "enabled", add_stat, c);
126     } else {
127         producer.addStat("flow_control", "disabled", add_stat, c);
128     }
129 }
130
/**
 * Construct a DCP producer (or notifier) connection.
 *
 * @param e         owning engine.
 * @param cookie    memcached connection cookie.
 * @param name      connection name (used for log headers and to detect
 *                  view-engine / replication connections).
 * @param isNotifier true to create a notifier-only connection (no data
 *                  streams, just stream-ready notifications).
 * @param startTask whether to create and schedule the checkpoint processor
 *                  task immediately.
 * @param mutType   whether streams send full mutations or key-only.
 */
DcpProducer::DcpProducer(EventuallyPersistentEngine& e,
                         const void* cookie,
                         const std::string& name,
                         bool isNotifier,
                         bool startTask,
                         MutationType mutType)
    : Producer(e, cookie, name),
      rejectResp(NULL),
      notifyOnly(isNotifier),
      lastSendTime(ep_current_time()),
      log(*this),
      itemsSent(0),
      totalBytesSent(0),
      mutationType(mutType) {
    setSupportAck(true);
    setReserved(true);
    // Start paused; notifyStreamReady()/step() will wake the connection.
    setPaused(true);

    logger.setId(e.getServerApi()->cookie->get_log_info(cookie).first);
    if (notifyOnly) {
        setLogHeader("DCP (Notifier) " + getName() + " -");
    } else {
        setLogHeader("DCP (Producer) " + getName() + " -");
    }
    // Reduce the minimum log level of view engine DCP streams as they are
    // extremely noisy due to creating new stream, per vbucket,per design doc
    // every ~10s.
    if (name.find("eq_dcpq:mapreduce_view") != std::string::npos ||
        name.find("eq_dcpq:spatial_view") != std::string::npos) {
        logger.min_log_level = EXTENSION_LOG_WARNING;
    }

    engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
    priority.assign("medium");

    // The consumer assigns opaques starting at 0 so lets have the producer
    //start using opaques at 10M to prevent any opaque conflicts.
    noopCtx.opaque = 10000000;
    noopCtx.sendTime = ep_current_time();

    // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the
    // noop interval to 20 seconds by default, but in post 3.0 releases we set
    // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the
    // noop interval of the producer when connecting so in an all 3.0.1+ cluster
    // this value will be overridden. In 3.0 however we do not set the noop
    // interval so setting this value will make sure we don't disconnect on
    // accident due to the producer and the consumer having a different noop
    // interval.
    noopCtx.dcpNoopTxInterval = defaultDcpNoopTxInterval;
    noopCtx.dcpIdleTimeout = std::chrono::seconds(
            engine_.getConfiguration().getDcpIdleTimeout());
    noopCtx.pendingRecv = false;
    noopCtx.enabled = false;

    enableExtMetaData = false;
    enableValueCompression = false;

    // Cursor dropping is disabled for replication connections by default,
    // but will be enabled through a control message to support backward
    // compatibility. For all other type of DCP connections, cursor dropping
    // will be enabled by default.
    // NOTE(review): `find(...) < name.length()` is an unconventional spelling
    // of `find(...) != std::string::npos` (equivalent for a non-empty needle).
    if (name.find("replication") < name.length()) {
        supportsCursorDropping = false;
    } else {
        supportsCursorDropping = true;
    }

    backfillMgr.reset(new BackfillManager(engine_));

    if (startTask) {
        createCheckpointProcessorTask();
        scheduleCheckpointProcessorTask();
    }
}
205
DcpProducer::~DcpProducer() {
    // Release the backfill manager first so no backfill work can observe a
    // partially-destroyed producer.
    backfillMgr.reset();
    // rejectResp is an owning raw pointer to a response that failed to send
    // (ENGINE_E2BIG) and was never retried.
    delete rejectResp;

    // Cancel the checkpoint processor task (if one was created) so it does
    // not fire against this destroyed connection.
    if (checkpointCreatorTask) {
        ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
    }
}
214
/**
 * Handle a DCP stream-request from the client: validate the requested seqno
 * ranges against the vbucket and its failover table, decide whether the
 * client must roll back, and if not create (and register) an Active or
 * Notifier stream for the vbucket.
 *
 * @param rollback_seqno [out] seqno the client must roll back to when
 *        ENGINE_ROLLBACK is returned.
 * @param callback invoked with this vbucket's failover log on success.
 * @return ENGINE_SUCCESS and a registered stream, or an error code
 *         (NOT_MY_VBUCKET / TMPFAIL / ERANGE / ROLLBACK / KEY_EEXISTS /
 *         DISCONNECT) describing why the request was refused.
 */
ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                             uint32_t opaque,
                                             uint16_t vbucket,
                                             uint64_t start_seqno,
                                             uint64_t end_seqno,
                                             uint64_t vbucket_uuid,
                                             uint64_t snap_start_seqno,
                                             uint64_t snap_end_seqno,
                                             uint64_t *rollback_seqno,
                                             dcp_add_failover_log callback) {

    lastReceiveTime = ep_current_time();
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    VBucketPtr vb = engine_.getVBucket(vbucket);
    if (!vb) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket doesn't exist", logHeader(), vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Client asked for active vbuckets only; refuse replicas/pending/dead.
    if ((flags & DCP_ADD_STREAM_ACTIVE_VB_ONLY) &&
        (vb->getState() != vbucket_state_active)) {
        LOG(EXTENSION_LOG_NOTICE, "%s (vb %d) Stream request failed because "
            "the vbucket is in %s state, only active vbuckets were requested",
            logHeader(), vbucket, vb->toString(vb->getState()));
        return ENGINE_NOT_MY_VBUCKET;
    }
    // Open checkpoint id 0 means the vbucket is still receiving its initial
    // backfill; the client should retry later.
    if (vb->checkpointManager.getOpenCheckpointId() == 0) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket is in backfill state", logHeader(), vbucket);
        return ENGINE_TMPFAIL;
    }

    if (!notifyOnly && start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%" PRIu64 ") is larger than the end seqno "
            "(%" PRIu64 "); "
            "Incorrect params passed by the DCP client",
            logHeader(), vbucket, start_seqno, end_seqno);
        return ENGINE_ERANGE;
    }

    // start_seqno must lie inside the snapshot the client claims to hold.
    if (!notifyOnly && !(snap_start_seqno <= start_seqno &&
        start_seqno <= snap_end_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the snap start seqno (%" PRIu64 ") <= start seqno (%" PRIu64 ")"
            " <= snap end seqno (%" PRIu64 ") is required", logHeader(), vbucket,
            snap_start_seqno, start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    bool add_vb_conn_map = true;
    {
        // Need to synchronise the search and conditional erase,
        // therefore use external locking here.
        std::lock_guard<StreamsMap> guard(streams);
        auto it = streams.find(vbucket, guard);
        if (it.second) {
            auto& stream = it.first;
            if (stream->isActive()) {
                LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
                    " because a stream already exists for this vbucket",
                    logHeader(), vbucket);
                return ENGINE_KEY_EEXISTS;
            } else {
                // Replacing a dead stream: drop it from the map first.
                streams.erase(vbucket, guard);

                // Don't need to add an entry to vbucket-to-conns map
                add_vb_conn_map = false;
            }
        }
    }

    // If we are a notify stream then we can't use the start_seqno supplied
    // since if it is greater than the current high seqno then it will always
    // trigger a rollback. As a result we should use the current high seqno for
    // rollback purposes.
    uint64_t notifySeqno = start_seqno;
    if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) {
        start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
    }

    // Ask the failover table whether the client's (uuid, seqno, snapshot)
    // history is consistent with ours; if not the client must roll back.
    std::pair<bool, std::string> need_rollback =
            vb->failovers->needsRollback(start_seqno,
                                         vb->getHighSeqno(),
                                         vbucket_uuid,
                                         snap_start_seqno,
                                         snap_end_seqno,
                                         vb->getPurgeSeqno(),
                                         rollback_seqno);

    if (need_rollback.first) {
        LOG(EXTENSION_LOG_WARNING,
            "%s (vb %d) Stream request requires rollback to seqno:%" PRIu64
            " because %s. Client requested"
            " seqnos:{%" PRIu64 ",%" PRIu64 "}"
            " snapshot:{%" PRIu64 ",%" PRIu64 "}"
            " uuid:%" PRIu64,
            logHeader(),
            vbucket,
            *rollback_seqno,
            need_rollback.second.c_str(),
            start_seqno,
            end_seqno,
            snap_start_seqno,
            snap_end_seqno,
            vbucket_uuid);
        return ENGINE_ROLLBACK;
    }

    ENGINE_ERROR_CODE rv = vb->failovers->addFailoverLog(getCookie(), callback);
    if (rv != ENGINE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Couldn't add failover log to "
            "stream request due to error %d", logHeader(), vbucket, rv);
        return rv;
    }

    // FLAG_LATEST / FLAG_DISKONLY pin the stream's end to the current high
    // seqno / last persisted seqno respectively.
    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
        end_seqno = vb->getHighSeqno();
    }

    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
        end_seqno = engine_.getKVBucket()->getLastPersistedSeqno(vbucket);
    }

    // Re-check the range: the flag handling above may have moved end_seqno
    // below start_seqno.
    if (!notifyOnly && start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%" PRIu64 ") is larger than the end seqno (%"
            PRIu64 "), stream request flags %d, vb_uuid %" PRIu64
            ", snapStartSeqno %" PRIu64 ", snapEndSeqno %" PRIu64
            "; should have rolled back instead",
            logHeader(), vbucket, start_seqno, end_seqno, flags, vbucket_uuid,
            snap_start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    if (!notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno()))
    {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%" PRIu64 ") is larger than the vb highSeqno (%"
            PRId64 "), stream request flags is %d, vb_uuid %" PRIu64
            ", snapStartSeqno %" PRIu64 ", snapEndSeqno %" PRIu64
            "; should have rolled back instead",
            logHeader(), vbucket, start_seqno, vb->getHighSeqno(), flags,
            vbucket_uuid, snap_start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    stream_t s;
    if (notifyOnly) {
        // Notifier streams use the original (pre-clamp) start seqno.
        s = new NotifierStream(&engine_, this, getName(), flags,
                               opaque, vbucket, notifySeqno,
                               end_seqno, vbucket_uuid,
                               snap_start_seqno, snap_end_seqno);
    } else {
        s = new ActiveStream(&engine_, this, getName(), flags,
                             opaque, vbucket, start_seqno,
                             end_seqno, vbucket_uuid,
                             snap_start_seqno, snap_end_seqno,
                             (mutationType == MutationType::KeyOnly));
    }

    {
        // Hold the vbucket state lock so the vbucket cannot transition to
        // dead between the check and the stream registration.
        ReaderLockHolder rlh(vb->getStateLock());
        if (vb->getState() == vbucket_state_dead) {
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
                    "this vbucket is in dead state", logHeader(), vbucket);
            return ENGINE_NOT_MY_VBUCKET;
        }

        if (!notifyOnly) {
            // MB-19428: Only activate the stream if we are adding it to the
            // streams map.
            static_cast<ActiveStream*>(s.get())->setActive();
        }
        streams.insert(std::make_pair(vbucket, s));
    }

    notifyStreamReady(vbucket);

    if (add_vb_conn_map) {
        connection_t conn(this);
        engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket);
    }

    return rv;
}
405
406 ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket,
407                                               dcp_add_failover_log callback) {
408     (void) opaque;
409     lastReceiveTime = ep_current_time();
410     if (doDisconnect()) {
411         return ENGINE_DISCONNECT;
412     }
413
414     VBucketPtr vb = engine_.getVBucket(vbucket);
415     if (!vb) {
416         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
417             "because this vbucket doesn't exist", logHeader(), vbucket);
418         return ENGINE_NOT_MY_VBUCKET;
419     }
420
421     return vb->failovers->addFailoverLog(getCookie(), callback);
422 }
423
/**
 * Produce (at most) one DCP message to the client.
 *
 * Order of precedence: disconnect checks, noop transmission, then the next
 * queued DcpResponse (a previously-rejected response is retried first).
 * Mutations/deletions are copied (and optionally snappy-compressed) before
 * being handed to the memcached-side producers callbacks.
 *
 * @param producers memcached callback table used to emit the message.
 * @return ENGINE_WANT_MORE if a message was sent and more may follow,
 *         ENGINE_SUCCESS if there is currently nothing to send, or an
 *         error/disconnect code.
 */
ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
    setLastWalkTime();

    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    ENGINE_ERROR_CODE ret;
    // maybeDisconnect/maybeSendNoop return ENGINE_FAILED to mean
    // "nothing to do here, keep going".
    if ((ret = maybeDisconnect()) != ENGINE_FAILED) {
          return ret;
    }

    if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
        return ret;
    }

    DcpResponse *resp;
    if (rejectResp) {
        // Retry the response that previously failed with E2BIG.
        resp = rejectResp;
        rejectResp = NULL;
    } else {
        resp = getNextItem();
        if (!resp) {
            return ENGINE_SUCCESS;
        }
    }

    Item* itmCpy = nullptr;
    auto* mutationResponse = dynamic_cast<MutationProducerResponse*>(resp);
    if (mutationResponse != nullptr) {
        try {
            // Copy the item: the callbacks below take ownership of itmCpy.
            itmCpy = mutationResponse->getItemCopy();
        } catch (const std::bad_alloc&) {
            // Keep the response for retry and report temporary OOM.
            rejectResp = resp;
            LOG(EXTENSION_LOG_WARNING,
                "%s (vb %d) ENOMEM while trying to copy "
                "item with seqno %" PRIu64 "before streaming it",
                logHeader(),
                mutationResponse->getVBucket(),
                *mutationResponse->getBySeqno());
            return ENGINE_ENOMEM;
        }

        if (enableValueCompression) {
            /**
             * If value compression is enabled, the producer will need
             * to snappy-compress the document before transmitting.
             * Compression will obviously be done only if the datatype
             * indicates that the value isn't compressed already.
             */
            uint32_t sizeBefore = itmCpy->getNBytes();
            if (!itmCpy->compressValue(
                            engine_.getDcpConnMap().getMinCompressionRatio())) {
                LOG(EXTENSION_LOG_WARNING,
                    "%s Failed to snappy compress an uncompressed value!",
                    logHeader());
            }
            uint32_t sizeAfter = itmCpy->getNBytes();

            // The flow-control log was charged for the uncompressed size;
            // credit back the bytes saved by compression.
            if (sizeAfter < sizeBefore) {
                log.acknowledge(sizeBefore - sizeAfter);
            }
        }
    }

    // The producers callbacks run in memcached context; switch the
    // ObjectRegistry thread association for the duration of the call.
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
                                                                     true);
    switch (resp->getEvent()) {
        case DcpResponse::Event::StreamEnd:
        {
            StreamEndResponse *se = static_cast<StreamEndResponse*>(resp);
            ret = producers->stream_end(getCookie(), se->getOpaque(),
                                        se->getVbucket(), se->getFlags());
            break;
        }
        case DcpResponse::Event::Mutation:
        {
            if (itmCpy == nullptr) {
                throw std::logic_error(
                    "DcpProducer::step(Mutation): itmCpy must be != nullptr");
            }
            std::pair<const char*, uint16_t> meta{nullptr, 0};
            if (mutationResponse->getExtMetaData()) {
                meta = mutationResponse->getExtMetaData()->getExtMeta();
            }
            ret = producers->mutation(getCookie(),
                                      mutationResponse->getOpaque(),
                                      itmCpy,
                                      mutationResponse->getVBucket(),
                                      *mutationResponse->getBySeqno(),
                                      mutationResponse->getRevSeqno(),
                                      0 /* lock time */,
                                      meta.first, meta.second,
                                      mutationResponse->getItem()->getNRUValue());
            break;
        }
        case DcpResponse::Event::Deletion:
        {
            if (itmCpy == nullptr) {
                throw std::logic_error(
                    "DcpProducer::step(Deletion): itmCpy must be != nullptr");
            }
            std::pair<const char*, uint16_t> meta{nullptr, 0};
            if (mutationResponse->getExtMetaData()) {
                meta = mutationResponse->getExtMetaData()->getExtMeta();
            }
            ret = producers->deletion(getCookie(),
                                      mutationResponse->getOpaque(),
                                      itmCpy,
                                      mutationResponse->getVBucket(),
                                      *mutationResponse->getBySeqno(),
                                      mutationResponse->getRevSeqno(),
                                      meta.first, meta.second);
            break;
        }
        case DcpResponse::Event::SnapshotMarker:
        {
            SnapshotMarker *s = static_cast<SnapshotMarker*>(resp);
            ret = producers->marker(getCookie(), s->getOpaque(),
                                    s->getVBucket(),
                                    s->getStartSeqno(),
                                    s->getEndSeqno(),
                                    s->getFlags());
            break;
        }
        case DcpResponse::Event::SetVbucket:
        {
            SetVBucketState *s = static_cast<SetVBucketState*>(resp);
            ret = producers->set_vbucket_state(getCookie(), s->getOpaque(),
                                               s->getVBucket(), s->getState());
            break;
        }
        case DcpResponse::Event::SystemEvent: {
            SystemEventProducerMessage* s =
                    static_cast<SystemEventProducerMessage*>(resp);
            ret = producers->system_event(
                    getCookie(),
                    s->getOpaque(),
                    s->getVBucket(),
                    uint32_t(s->getSystemEvent()),
                    *s->getBySeqno(),
                    {reinterpret_cast<const uint8_t*>(s->getKey().data()),
                     s->getKey().size()},
                    s->getEventData());
            break;
        }
        default:
        {
            LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%s), "
                "disconnecting", logHeader(),
                resp->to_string());
            ret = ENGINE_DISCONNECT;
            break;
        }
    }

    // Restore the original ObjectRegistry thread association.
    ObjectRegistry::onSwitchThread(epe);

    if (ret == ENGINE_E2BIG) {
        // Output buffer full: keep the response and retry on the next step.
        rejectResp = resp;
    } else {
        delete resp;
    }

    lastSendTime = ep_current_time();
    return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
}
591
/**
 * Handle a flow-control buffer-ack from the consumer: credit the
 * acknowledged bytes back to the buffer log (which may unpause the
 * connection). The opaque and vbucket are not needed for accounting.
 */
ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
                                                     uint16_t vbucket,
                                                     uint32_t buffer_bytes) {
    lastReceiveTime = ep_current_time();
    log.acknowledge(buffer_bytes);
    return ENGINE_SUCCESS;
}
599
600 ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
601                                        uint16_t nkey, const void* value,
602                                        uint32_t nvalue) {
603     lastReceiveTime = ep_current_time();
604     const char* param = static_cast<const char*>(key);
605     std::string keyStr(static_cast<const char*>(key), nkey);
606     std::string valueStr(static_cast<const char*>(value), nvalue);
607
608     if (strncmp(param, "connection_buffer_size", nkey) == 0) {
609         uint32_t size;
610         if (parseUint32(valueStr.c_str(), &size)) {
611             /* Size 0 implies the client (DCP consumer) does not support
612                flow control */
613             log.setBufferSize(size);
614             return ENGINE_SUCCESS;
615         }
616     } else if (strncmp(param, "stream_buffer_size", nkey) == 0) {
617         LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter stream_buffer_size is"
618             "not supported by this engine", logHeader());
619         return ENGINE_ENOTSUP;
620     } else if (strncmp(param, "enable_noop", nkey) == 0) {
621         if (valueStr == "true") {
622             noopCtx.enabled = true;
623         } else {
624             noopCtx.enabled = false;
625         }
626         return ENGINE_SUCCESS;
627     } else if (strncmp(param, "enable_ext_metadata", nkey) == 0) {
628         if (valueStr == "true") {
629             enableExtMetaData = true;
630         } else {
631             enableExtMetaData = false;
632         }
633         return ENGINE_SUCCESS;
634     } else if (strncmp(param, "enable_value_compression", nkey) == 0) {
635         if (valueStr == "true") {
636             enableValueCompression = true;
637         } else {
638             enableValueCompression = false;
639         }
640         return ENGINE_SUCCESS;
641     } else if (strncmp(param, "supports_cursor_dropping", nkey) == 0) {
642         if (valueStr == "true") {
643             supportsCursorDropping = true;
644         } else {
645             supportsCursorDropping = false;
646         }
647         return ENGINE_SUCCESS;
648     } else if (strncmp(param, "set_noop_interval", nkey) == 0) {
649         uint32_t noopInterval;
650         if (parseUint32(valueStr.c_str(), &noopInterval)) {
651             /*
652              * We need to ensure that we only set the noop interval to a value
653              * that is a multiple of the connection manager interval. The reason
654              * is that if there is no DCP traffic we snooze for the connection
655              * manager interval before sending the noop.
656              */
657             if (noopInterval % engine_.getConfiguration().
658                     getConnectionManagerInterval() == 0) {
659                 noopCtx.dcpNoopTxInterval = std::chrono::seconds(noopInterval);
660                 return ENGINE_SUCCESS;
661             } else {
662                 LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter "
663                     "set_noop_interval is being set to %" PRIu32 " seconds."
664                     "This is not a multiple of the connectionManagerInterval "
665                     "of %" PRIu64 " seconds, and so is not supported.",
666                     logHeader(), noopInterval,
667                     uint64_t(engine_.getConfiguration().
668                              getConnectionManagerInterval()));
669                 return ENGINE_EINVAL;
670             }
671         }
672     } else if(strncmp(param, "set_priority", nkey) == 0) {
673         if (valueStr == "high") {
674             engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH);
675             priority.assign("high");
676             return ENGINE_SUCCESS;
677         } else if (valueStr == "medium") {
678             engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
679             priority.assign("medium");
680             return ENGINE_SUCCESS;
681         } else if (valueStr == "low") {
682             engine_.setDCPPriority(getCookie(), CONN_PRIORITY_LOW);
683             priority.assign("low");
684             return ENGINE_SUCCESS;
685         }
686     }
687
688     LOG(EXTENSION_LOG_WARNING, "%s Invalid ctrl parameter '%s' for %s",
689         logHeader(), valueStr.c_str(), keyStr.c_str());
690
691     return ENGINE_EINVAL;
692 }
693
/**
 * Handle a response packet from the DCP client.
 *
 * set_vbucket_state / snapshot_marker acks are routed to the matching
 * ActiveStream (by opaque); mutation/deletion/expiration/stream_end
 * responses are currently ignored; noop responses clear the pending flag.
 *
 * @return true if the response was recognised, false to disconnect.
 */
bool DcpProducer::handleResponse(protocol_binary_response_header* resp) {
    lastReceiveTime = ep_current_time();
    if (doDisconnect()) {
        return false;
    }

    uint8_t opcode = resp->response.opcode;
    if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE ||
        opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
        protocol_binary_response_dcp_stream_req* pkt =
            reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
        uint32_t opaque = pkt->message.header.response.opaque;


        // Search for an active stream with the same opaque as the response.
        auto itr = streams.find_if(
            [opaque](const StreamsMap::value_type& s) {
                const auto& stream = s.second;
                if (stream && stream->isTypeActive()) {
                    ActiveStream* as = static_cast<ActiveStream*>(stream.get());
                    return (as && opaque == stream->getOpaque());
                } else {
                    return false;
                }
            }
        );

        if (itr.second) {
            ActiveStream *as = static_cast<ActiveStream*>(itr.first.get());
            if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
                // NOTE: "Recieved" spelling is part of ActiveStream's
                // public interface; do not "fix" it here.
                as->setVBucketStateAckRecieved();
            } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
                as->snapshotMarkerAckReceived();
            }
        }

        // An ack for a no-longer-existing stream is still a valid response.
        return true;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_DELETION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) {
        // TODO: When nacking is implemented we need to handle these responses
        return true;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) {
        // Only accept the noop we are actually waiting for (opaque match).
        if (noopCtx.opaque == resp->response.opaque) {
            noopCtx.pendingRecv = false;
            return true;
        }
    }

    LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
        "disconnecting", logHeader(), opcode);

    return false;
}
749
750 ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
751     lastReceiveTime = ep_current_time();
752     if (doDisconnect()) {
753         return ENGINE_DISCONNECT;
754     }
755
756     auto it = streams.erase(vbucket);
757
758     ENGINE_ERROR_CODE ret;
759     if (!it.second) {
760         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
761             "stream exists for this vbucket", logHeader(), vbucket);
762         return ENGINE_KEY_ENOENT;
763     } else {
764         auto& stream = it.first;
765         if (!stream->isActive()) {
766             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
767                 "stream is already marked as dead", logHeader(), vbucket);
768             connection_t conn(this);
769             engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
770             ret = ENGINE_KEY_ENOENT;
771         } else {
772             stream->setDead(END_STREAM_CLOSED);
773             connection_t conn(this);
774             engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
775             ret = ENGINE_SUCCESS;
776         }
777     }
778
779     return ret;
780 }
781
782 void DcpProducer::notifyBackfillManager() {
783     backfillMgr->wakeUpTask();
784 }
785
// Account 'bytes' read by a backfill against the backfill manager.
// When 'force' is set the bytes are recorded unconditionally and true is
// returned; otherwise the result of bytesCheckAndRead() is returned
// (presumably indicating whether the read fitted within the manager's
// limits — confirm against BackfillManager).
// NOTE(review): backfillMgr is reset() by closeAllStreams(); confirm callers
// cannot reach here afterwards, else this dereferences null.
bool DcpProducer::recordBackfillManagerBytesRead(size_t bytes, bool force) {
    if (force) {
        backfillMgr->bytesForceRead(bytes);
        return true;
    }
    return backfillMgr->bytesCheckAndRead(bytes);
}
793
// Inform the backfill manager that 'bytes' of backfilled data have been
// sent to the consumer, so it can release buffer quota.
// NOTE(review): backfillMgr is reset() by closeAllStreams(); confirm callers
// cannot reach here afterwards, else this dereferences null.
void DcpProducer::recordBackfillManagerBytesSent(size_t bytes) {
    backfillMgr->bytesSent(bytes);
}
797
// Queue a backfill of seqno range start..end for stream 's' on vbucket 'vb'
// with this producer's backfill manager.
void DcpProducer::scheduleBackfillManager(VBucket& vb,
                                          const active_stream_t& s,
                                          uint64_t start,
                                          uint64_t end) {
    backfillMgr->schedule(vb, s, start, end);
}
804
// Emit this producer's statistics via the supplied add_stat callback:
// base connection stats, producer counters, noop/flow-control state,
// backfill-manager stats (if still present) and per-stream stats.
void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
    // Base ConnHandler/Producer stats first.
    Producer::addStats(add_stat, c);

    addStat("items_sent", getItemsSent(), add_stat, c);
    addStat("items_remaining", getItemsRemaining(), add_stat, c);
    addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
    addStat("last_sent_time", lastSendTime, add_stat, c);
    addStat("last_receive_time", lastReceiveTime, add_stat, c);
    addStat("noop_enabled", noopCtx.enabled, add_stat, c);
    addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);
    addStat("priority", priority.c_str(), add_stat, c);
    addStat("enable_ext_metadata", enableExtMetaData ? "enabled" : "disabled",
            add_stat, c);
    addStat("enable_value_compression",
            enableValueCompression ? "enabled" : "disabled",
            add_stat, c);
    addStat("cursor_dropping",
            supportsCursorDropping ? "ELIGIBLE" : "NOT_ELIGIBLE",
            add_stat, c);

    // Possible that the producer has had its streams closed and hence doesn't
    // have a backfill manager anymore.
    if (backfillMgr) {
        backfillMgr->addStats(this, add_stat, c);
    }

    // Flow-control buffer log stats.
    log.addStats(add_stat, c);

    addStat("num_streams", streams.size(), add_stat, c);

    // Make a copy of all valid streams (under lock), and then call addStats
    // for each one. (Done in two stages to minimise how long we have the
    // streams map locked for).
    std::vector<StreamsMap::mapped_type> valid_streams;

    streams.for_each(
        [&valid_streams](const StreamsMap::value_type& element) {
            valid_streams.push_back(element.second);
        }
    );
    for (const auto& stream : valid_streams) {
        stream->addStats(add_stat, c);
    }
}
849
850 void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
851                                    const VBucket& vb) {
852
853     auto stream = findStream(vb.getId());
854     if (stream) {
855         if (stream->isTypeActive()) {
856             ActiveStream* as = static_cast<ActiveStream*>(stream.get());
857             as->addTakeoverStats(add_stat, c, vb);
858         } else {
859             LOG(EXTENSION_LOG_WARNING, "%s (vb:%" PRIu16 ") "
860                 "DcpProducer::addTakeoverStats Stream type is %s and not the "
861                 "expected Active",
862                 logHeader(), vb.getId(), to_string(stream->getType()).c_str());
863         }
864     } else {
865         LOG(EXTENSION_LOG_NOTICE, "%s (vb:%" PRIu16 ") "
866             "DcpProducer::addTakeoverStats Unable to find stream",
867             logHeader(), vb.getId());
868     }
869 }
870
871 void DcpProducer::aggregateQueueStats(ConnCounter& aggregator) {
872     aggregator.conn_queueDrain += itemsSent;
873     aggregator.conn_totalBytes += totalBytesSent;
874     aggregator.conn_queueRemaining += getItemsRemaining();
875     aggregator.conn_queueBackfillRemaining += totalBackfillBacklogs;
876 }
877
878 void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
879     auto stream = findStream(vbucket);
880     if (stream && stream->isActive()) {
881         stream->notifySeqnoAvailable(seqno);
882     }
883 }
884
885 void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
886     auto stream = findStream(vbucket);
887     if (stream) {
888         LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") State changed to "
889             "%s, closing active stream!",
890             logHeader(), vbucket, VBucket::toString(state));
891         stream->setDead(END_STREAM_STATE);
892     }
893 }
894
895 bool DcpProducer::handleSlowStream(uint16_t vbid,
896                                    const std::string &name) {
897     if (supportsCursorDropping) {
898         auto stream = findStream(vbid);
899         if (stream) {
900             if (stream->getName().compare(name) == 0) {
901                 ActiveStream* as = static_cast<ActiveStream*>(stream.get());
902                 as->handleSlowStream();
903                 return true;
904             }
905         }
906     }
907     return false;
908 }
909
// Tear down every stream owned by this producer: mark them dead, clear the
// streams map, detach the producer from the per-vbucket connection map, and
// finally destroy the backfill manager.
void DcpProducer::closeAllStreams() {
    lastReceiveTime = ep_current_time();
    std::vector<uint16_t> vbvector;
    {
        // Need to synchronise the disconnect and clear, therefore use
        // external locking here.
        std::lock_guard<StreamsMap> guard(streams);

        // Record each vbucket id (so the conn-map entries can be removed
        // after the lock is released) and kill the stream.
        streams.for_each(
            [&vbvector](StreamsMap::value_type& iter) {
                vbvector.push_back(iter.first);
                iter.second->setDead(END_STREAM_DISCONNECTED);
            },
            guard);

        streams.clear(guard);
    }
    connection_t conn(this);
    for (const auto vbid: vbvector) {
         engine_.getDcpConnMap().removeVBConnByVBId(conn, vbid);
    }

    // Destroy the backfillManager. (BackfillManager task also
    // may hold a weak reference to it while running, but that is
    // guaranteed to decay and free the BackfillManager once it
    // completes run().
    // This will terminate any tasks and delete any backfills
    // associated with this Producer.  This is necessary as if we
    // don't, then the RCPtr references which exist between
    // DcpProducer and ActiveStream result in us leaking DcpProducer
    // objects (and Couchstore vBucket files, via DCPBackfill task).
    backfillMgr.reset();
}
943
944 const char* DcpProducer::getType() const {
945     if (notifyOnly) {
946         return "notifier";
947     } else {
948         return "producer";
949     }
950 }
951
// Pop the next DcpResponse to send from the ready queue's streams, or NULL
// if there is nothing to send (in which case the producer is left in the
// paused state). Ownership of the returned response passes to the caller.
DcpResponse* DcpProducer::getNextItem() {
    do {
        setPaused(false);

        uint16_t vbucket = 0;
        while (ready.popFront(vbucket)) {
            // If the flow-control log is full, requeue the vbucket and pause
            // until the consumer acks some bytes.
            if (log.pauseIfFull()) {
                ready.pushUnique(vbucket);
                return NULL;
            }

            DcpResponse* op = NULL;
            stream_t stream;
            {
                stream = findStream(vbucket);
                if (!stream) {
                    continue;
                }
            }

            op = stream->next();

            if (!op) {
                // stream is empty, try another vbucket.
                continue;
            }

            // Sanity-check: only these event types may be written to a
            // producer connection.
            switch (op->getEvent()) {
                case DcpResponse::Event::SnapshotMarker:
                case DcpResponse::Event::Mutation:
                case DcpResponse::Event::Deletion:
                case DcpResponse::Event::Expiration:
                case DcpResponse::Event::StreamEnd:
                case DcpResponse::Event::SetVbucket:
                case DcpResponse::Event::SystemEvent:
                    break;
                default:
                    throw std::logic_error(
                            std::string("DcpProducer::getNextItem: "
                            "Producer (") + logHeader() + ") is attempting to "
                            "write an unexpected event:" +
                            op->to_string());
            }

            // Stream may have more items; keep its vbucket in the queue.
            ready.pushUnique(vbucket);

            // Count only data-carrying events towards items_sent.
            if (op->getEvent() == DcpResponse::Event::Mutation ||
                op->getEvent() == DcpResponse::Event::Deletion ||
                op->getEvent() == DcpResponse::Event::Expiration ||
                op->getEvent() == DcpResponse::Event::SystemEvent) {
                itemsSent++;
            }

            totalBytesSent.fetch_add(op->getMessageSize());

            return op;
        }

        // flag we are paused
        setPaused(true);

        // re-check the ready queue.
        // A new vbucket could have become ready and the notifier could have
        // seen paused = false, so reloop so we don't miss an operation.
    } while(!ready.empty());

    return NULL;
}
1020
1021 void DcpProducer::setDisconnect(bool disconnect) {
1022     ConnHandler::setDisconnect(disconnect);
1023
1024     if (disconnect) {
1025         streams.for_each(
1026             [](StreamsMap::value_type& iter){
1027                 iter.second->setDead(END_STREAM_DISCONNECTED);
1028             }
1029         );
1030     }
1031 }
1032
1033 void DcpProducer::notifyStreamReady(uint16_t vbucket) {
1034     if (ready.pushUnique(vbucket)) {
1035         log.unpauseIfSpaceAvailable();
1036     }
1037 }
1038
1039 void DcpProducer::notifyPaused(bool schedule) {
1040     engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
1041 }
1042
1043 ENGINE_ERROR_CODE DcpProducer::maybeDisconnect() {
1044     std::chrono::seconds elapsedTime(ep_current_time() - lastReceiveTime);
1045     if (noopCtx.enabled && elapsedTime > noopCtx.dcpIdleTimeout) {
1046         LOG(EXTENSION_LOG_NOTICE, "%s Disconnecting because the connection"
1047             " appears to be dead", logHeader());
1048             return ENGINE_DISCONNECT;
1049         }
1050         // Returning ENGINE_FAILED means ignore and continue
1051         // without disconnecting
1052         return ENGINE_FAILED;
1053 }
1054
1055 ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(
1056         struct dcp_message_producers* producers) {
1057     if (!noopCtx.enabled) {
1058         // Returning ENGINE_FAILED means ignore and continue
1059         // without sending a noop
1060         return ENGINE_FAILED;
1061     }
1062     std::chrono::seconds elapsedTime(ep_current_time() - noopCtx.sendTime);
1063
1064     // Check to see if waiting for a noop reply.
1065     // If not try to send a noop to the consumer if the interval has passed
1066     if (!noopCtx.pendingRecv && elapsedTime >= noopCtx.dcpNoopTxInterval) {
1067         EventuallyPersistentEngine *epe = ObjectRegistry::
1068                 onSwitchThread(NULL, true);
1069         ENGINE_ERROR_CODE ret = producers->noop(getCookie(), ++noopCtx.opaque);
1070         ObjectRegistry::onSwitchThread(epe);
1071
1072         if (ret == ENGINE_SUCCESS) {
1073             ret = ENGINE_WANT_MORE;
1074             noopCtx.pendingRecv = true;
1075             noopCtx.sendTime = ep_current_time();
1076             lastSendTime = noopCtx.sendTime;
1077         }
1078       return ret;
1079     }
1080     // We have already sent a noop and are awaiting a receive or
1081     // the time interval has not passed.  In either case continue
1082     // without sending a noop.
1083     return ENGINE_FAILED;
1084 }
1085
// Stub required by the Producer interface; noop timing for DCP producers is
// handled by maybeSendNoop() instead.
bool DcpProducer::isTimeForNoop() {
    // Not Implemented
    return false;
}
1090
// Stub required by the Producer interface; intentionally a no-op (see
// maybeSendNoop() for the DCP noop mechanism).
void DcpProducer::setTimeForNoop() {
    // Not Implemented
}
1094
1095 void DcpProducer::clearQueues() {
1096     streams.for_each(
1097         [](StreamsMap::value_type& iter) {
1098             iter.second->clear();
1099         }
1100     );
1101 }
1102
// Number of backfill items outstanding for this producer
// (totalBackfillBacklogs is maintained elsewhere in this class).
size_t DcpProducer::getBackfillQueueSize() {
    return totalBackfillBacklogs;
}
1106
// Cumulative count of data items sent on this connection; incremented in
// getNextItem() for mutation/deletion/expiration/system events.
size_t DcpProducer::getItemsSent() {
    return itemsSent;
}
1110
1111 size_t DcpProducer::getItemsRemaining() {
1112     size_t remainingSize = 0;
1113     streams.for_each(
1114         [&remainingSize](const StreamsMap::value_type& iter) {
1115             if (iter.second->isTypeActive()) {
1116                 ActiveStream *as = static_cast<ActiveStream *>(iter.second.get());
1117                 remainingSize += as->getItemsRemaining();
1118             }
1119         }
1120     );
1121
1122     return remainingSize;
1123 }
1124
// Total bytes sent on this connection; accumulated in getNextItem() via
// totalBytesSent.fetch_add().
size_t DcpProducer::getTotalBytes() {
    return totalBytesSent;
}
1128
1129 std::vector<uint16_t> DcpProducer::getVBVector() {
1130     std::vector<uint16_t> vbvector;
1131     streams.for_each(
1132         [&vbvector](StreamsMap::value_type& iter) {
1133         vbvector.push_back(iter.first);
1134     });
1135     return vbvector;
1136 }
1137
// Account 'bytes' against the flow-control buffer log. Returns false when
// the log is enabled and full (the send should be deferred); true otherwise.
bool DcpProducer::bufferLogInsert(size_t bytes) {
    return log.insert(bytes);
}
1141
// Create the task which processes checkpoints on behalf of ActiveStreams;
// it is started separately via scheduleCheckpointProcessorTask().
void DcpProducer::createCheckpointProcessorTask() {
    checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(engine_);
}
1145
// Hand the checkpoint-processor task to the executor pool for running.
void DcpProducer::scheduleCheckpointProcessorTask() {
    ExecutorPool::get()->schedule(checkpointCreatorTask);
}
1149
1150 void DcpProducer::scheduleCheckpointProcessorTask(const stream_t& s) {
1151     if (!checkpointCreatorTask) {
1152         throw std::logic_error(
1153                 "DcpProducer::scheduleCheckpointProcessorTask task is null");
1154     }
1155     static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
1156         ->schedule(s);
1157 }
1158
1159 void DcpProducer::clearCheckpointProcessorTaskQueues() {
1160     if (!checkpointCreatorTask) {
1161         throw std::logic_error(
1162                 "DcpProducer::clearCheckpointProcessorTaskQueues task is null");
1163     }
1164     static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
1165         ->clearQueues();
1166 }
1167
1168 SingleThreadedRCPtr<Stream> DcpProducer::findStream(uint16_t vbid) {
1169     auto it = streams.find(vbid);
1170     if (it.second) {
1171         return it.first;
1172     } else {
1173         return SingleThreadedRCPtr<Stream>();
1174     }
1175 }