Merge branch 'watson'
[ep-engine.git] / src / dcp / stream.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include <platform/checked_snprintf.h>
21
22 #include "ep_engine.h"
23 #include "failover-table.h"
24 #include "kvstore.h"
25 #include "statwriter.h"
26 #include "dcp/backfill-manager.h"
27 #include "dcp/backfill.h"
28 #include "dcp/consumer.h"
29 #include "dcp/producer.h"
30 #include "dcp/response.h"
31 #include "dcp/stream.h"
32 #include "replicationthrottle.h"
33
34 #include <memory>
35
36
37 const char* to_string(Stream::Snapshot type) {
38     switch (type) {
39     case Stream::Snapshot::None:
40         return "none";
41     case Stream::Snapshot::Disk:
42         return "disk";
43     case Stream::Snapshot::Memory:
44         return "memory";
45     }
46     throw std::logic_error("to_string(Stream::Snapshot): called with invalid "
47             "Snapshot type:" + std::to_string(int(type)));
48 }
49
50 const std::string to_string(Stream::Type type) {
51     switch (type) {
52     case Stream::Type::Active:
53         return "Active";
54     case Stream::Type::Notifier:
55         return "Notifier";
56     case Stream::Type::Passive:
57         return "Passive";
58     }
59     throw std::logic_error("to_string(Stream::Type): called with invalid "
60             "type:" + std::to_string(int(type)));
61 }
62
// Largest possible seqno - used as an effectively unbounded end_seqno_
// (e.g. takeover streams, which must run until the hand-over completes).
const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
64
/* Base-class constructor: records the stream's identity (name, opaque,
 * vbucket, uuid) and the requested seqno/snapshot range. Every stream
 * starts life in the Pending state with an empty readyQ. */
Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
               uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
               uint64_t vb_uuid, uint64_t snap_start_seqno,
               uint64_t snap_end_seqno, Type type)
    : name_(name),
      flags_(flags),
      opaque_(opaque),
      vb_(vb),
      start_seqno_(start_seqno),
      end_seqno_(end_seqno),
      vb_uuid_(vb_uuid),
      snap_start_seqno_(snap_start_seqno),
      snap_end_seqno_(snap_end_seqno),
      state_(StreamState::Pending),
      type_(type),
      itemsReady(false),
      readyQ_non_meta_items(0),
      readyQueueMemory(0) {
}
84
Stream::~Stream() {
    // NB: reusing the "unlocked" method without a lock because we're
    // destructing and should not take any locks (no other thread may
    // legitimately hold a reference to a stream being destroyed).
    clear_UNLOCKED();
}
90
91 const std::string Stream::to_string(Stream::StreamState st) {
92     switch(st) {
93     case StreamState::Pending:
94         return "pending";
95     case StreamState::Backfilling:
96         return "backfilling";
97     case StreamState::InMemory:
98         return "in-memory";
99     case StreamState::TakeoverSend:
100         return "takeover-send";
101     case StreamState::TakeoverWait:
102         return "takeover-wait";
103     case StreamState::Reading:
104         return "reading";
105     case StreamState::Dead:
106         return "dead";
107     }
108     throw std::invalid_argument(
109         "Stream::to_string(StreamState): " + std::to_string(int(st)));
110 }
111
// True if this stream was constructed as an Active (producer-side) stream.
bool Stream::isTypeActive() const {
    return type_ == Type::Active;
}
115
// "Active" here means "not yet Dead" - note a Pending stream counts as
// active.
bool Stream::isActive() const {
    return state_.load() != StreamState::Dead;
}
119
// True while the stream is in the Backfilling state.
bool Stream::isBackfilling() const {
    return state_.load() == StreamState::Backfilling;
}
123
// True while the stream is in the InMemory state.
bool Stream::isInMemory() const {
    return state_.load() == StreamState::InMemory;
}
127
// True while the stream is still in its initial Pending state.
bool Stream::isPending() const {
    return state_.load() == StreamState::Pending;
}
131
// True while the stream is in the TakeoverSend state.
bool Stream::isTakeoverSend() const {
    return state_.load() == StreamState::TakeoverSend;
}
135
// True while the stream is in the TakeoverWait state.
bool Stream::isTakeoverWait() const {
    return state_.load() == StreamState::TakeoverWait;
}
139
140 void Stream::clear_UNLOCKED() {
141     while (!readyQ.empty()) {
142         DcpResponse* resp = readyQ.front();
143         popFromReadyQ();
144         delete resp;
145     }
146 }
147
148 void Stream::pushToReadyQ(DcpResponse* resp)
149 {
150    /* expect streamMutex.ownsLock() == true */
151     if (resp) {
152         readyQ.push(resp);
153         if (!resp->isMetaEvent()) {
154             readyQ_non_meta_items++;
155         }
156         readyQueueMemory.fetch_add(resp->getMessageSize(),
157                                    std::memory_order_relaxed);
158     }
159 }
160
161 void Stream::popFromReadyQ(void)
162 {
163     /* expect streamMutex.ownsLock() == true */
164     if (!readyQ.empty()) {
165         const auto& front = readyQ.front();
166         if (!front->isMetaEvent()) {
167             readyQ_non_meta_items--;
168         }
169         const uint32_t respSize = front->getMessageSize();
170         readyQ.pop();
171
172         /* Decrement the readyQ size */
173         if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
174             readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
175         } else {
176             LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
177                 "underflow, likely wrong stat calculation! curr size: %" PRIu64
178                 "; new size: %d",
179                 name_.c_str(), getVBucket(),
180                 readyQueueMemory.load(std::memory_order_relaxed), respSize);
181             readyQueueMemory.store(0, std::memory_order_relaxed);
182         }
183     }
184 }
185
// Approximate bytes consumed by responses currently queued on the readyQ.
uint64_t Stream::getReadyQueueMemory() {
    return readyQueueMemory.load(std::memory_order_relaxed);
}
189
190 void Stream::addStats(ADD_STAT add_stat, const void *c) {
191     try {
192         const int bsize = 1024;
193         char buffer[bsize];
194         checked_snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(),
195                          vb_);
196         add_casted_stat(buffer, flags_, add_stat, c);
197         checked_snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(),
198                          vb_);
199         add_casted_stat(buffer, opaque_, add_stat, c);
200         checked_snprintf(buffer, bsize, "%s:stream_%d_start_seqno",
201                          name_.c_str(), vb_);
202         add_casted_stat(buffer, start_seqno_, add_stat, c);
203         checked_snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(),
204                          vb_);
205         add_casted_stat(buffer, end_seqno_, add_stat, c);
206         checked_snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(),
207                          vb_);
208         add_casted_stat(buffer, vb_uuid_, add_stat, c);
209         checked_snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno",
210                          name_.c_str(), vb_);
211         add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
212         checked_snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno",
213                          name_.c_str(), vb_);
214         add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
215         checked_snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(),
216                          vb_);
217         add_casted_stat(buffer, to_string(state_.load()), add_stat, c);
218     } catch (std::exception& error) {
219         LOG(EXTENSION_LOG_WARNING,
220             "Stream::addStats: Failed to build stats: %s", error.what());
221     }
222 }
223
224 ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
225                            dcp_producer_t p,
226                            const std::string& n,
227                            uint32_t flags,
228                            uint32_t opaque,
229                            uint16_t vb,
230                            uint64_t st_seqno,
231                            uint64_t en_seqno,
232                            uint64_t vb_uuid,
233                            uint64_t snap_start_seqno,
234                            uint64_t snap_end_seqno,
235                            bool isKeyOnly)
236     : Stream(n,
237              flags,
238              opaque,
239              vb,
240              st_seqno,
241              en_seqno,
242              vb_uuid,
243              snap_start_seqno,
244              snap_end_seqno,
245              Type::Active),
246       isBackfillTaskRunning(false),
247       pendingBackfill(false),
248       lastReadSeqno(st_seqno),
249       backfillRemaining(0),
250       lastReadSeqnoUnSnapshotted(st_seqno),
251       lastSentSeqno(st_seqno),
252       curChkSeqno(st_seqno),
253       takeoverState(vbucket_state_pending),
254       itemsFromMemoryPhase(0),
255       firstMarkerSent(false),
256       waitForSnapshot(0),
257       engine(e),
258       producer(p),
259       lastSentSnapEndSeqno(0),
260       chkptItemsExtractionInProgress(false),
261       keyOnly(isKeyOnly) {
262     const char* type = "";
263     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
264         type = "takeover ";
265         end_seqno_ = dcpMaxSeqno;
266     }
267
268     VBucketPtr vbucket = engine->getVBucket(vb);
269     if (vbucket) {
270         ReaderLockHolder rlh(vbucket->getStateLock());
271         if (vbucket->getState() == vbucket_state_replica) {
272             snapshot_info_t info = vbucket->checkpointManager.getSnapshotInfo();
273             if (info.range.end > en_seqno) {
274                 end_seqno_ = info.range.end;
275             }
276         }
277     }
278
279     producer->getLogger().log(EXTENSION_LOG_NOTICE,
280         "(vb %" PRIu16 ") Creating %sstream with start seqno %" PRIu64
281         " and end seqno %" PRIu64, vb, type, st_seqno, en_seqno);
282
283     backfillItems.memory = 0;
284     backfillItems.disk = 0;
285     backfillItems.sent = 0;
286
287     bufferedBackfill.bytes = 0;
288     bufferedBackfill.items = 0;
289
290     takeoverStart = 0;
291     takeoverSendMaxTime = engine->getConfiguration().getDcpTakeoverMaxTime();
292
293     if (start_seqno_ >= end_seqno_) {
294         /* streamMutex lock needs to be acquired because endStream
295          * potentially makes call to pushToReadyQueue.
296          */
297         LockHolder lh(streamMutex);
298         endStream(END_STREAM_OK);
299         itemsReady.store(true);
300         // lock is released on leaving the scope
301     }
302
303     // Finally obtain a copy of the current separator
304     currentSeparator = vbucket->getManifest().lock().getSeparator();
305 }
306
ActiveStream::~ActiveStream() {
    // NOTE(review): relies on transitionState(Dead) performing all required
    // teardown (readyQ drain / cursor release) - its body is not visible in
    // this chunk; confirm it is safe to call from the destructor.
    transitionState(StreamState::Dead);
}
310
// Public entry point: acquires the stream lock and delegates to the
// lock-witnessing overload below.
DcpResponse* ActiveStream::next() {
    std::lock_guard<std::mutex> lh(streamMutex);
    return next(lh);
}
315
316 DcpResponse* ActiveStream::next(std::lock_guard<std::mutex>& lh) {
317     DcpResponse* response = NULL;
318
319     switch (state_.load()) {
320         case StreamState::Pending:
321             break;
322         case StreamState::Backfilling:
323             response = backfillPhase(lh);
324             break;
325         case StreamState::InMemory:
326             response = inMemoryPhase();
327             break;
328         case StreamState::TakeoverSend:
329             response = takeoverSendPhase();
330             break;
331         case StreamState::TakeoverWait:
332             response = takeoverWaitPhase();
333             break;
334         case StreamState::Reading:
335             // Not valid for an active stream.
336             throw std::logic_error("ActiveStream::next: Invalid state "
337                     "StreamReading for stream " +
338                     std::string(producer->logHeader()) + " vb " +
339                     std::to_string(vb_));
340             break;
341         case StreamState::Dead:
342             response = deadPhase();
343             break;
344     }
345
346     itemsReady.store(response ? true : false);
347     return response;
348 }
349
/* Called by the backfill task once the disk snapshot's seqno range is
 * known: emits a SnapshotMarker (MARKER_FLAG_DISK) and, unless this is a
 * disk-only stream, registers a checkpoint cursor so the memory phase can
 * follow on from where the backfill ends. */
void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
    {
        LockHolder lh(streamMutex);
        // Remember the pre-merge backfill end: the cursor (below) must be
        // registered here even if endSeqno is later extended.
        uint64_t chkCursorSeqno = endSeqno;

        if (!isBackfilling()) {
            producer->getLogger().log(EXTENSION_LOG_WARNING,
                                      "(vb %" PRIu16 ") ActiveStream::"
                                      "markDiskSnapshot: Unexpected state_:%s",
                                      vb_, to_string(state_.load()).c_str());
            return;
        }

        // Widen the marker's start so it never exceeds the client's
        // requested snapshot start.
        startSeqno = std::min(snap_start_seqno_, startSeqno);
        firstMarkerSent = true;

        VBucketPtr vb = engine->getVBucket(vb_);
        if (!vb) {
            producer->getLogger().log(EXTENSION_LOG_WARNING,"(vb %" PRIu16 ") "
                                      "ActiveStream::markDiskSnapshot, vbucket "
                                      "does not exist", vb_);
            return;
        }
        // An atomic read of vbucket state without acquiring the
        // reader lock for state should suffice here.
        if (vb->getState() == vbucket_state_replica) {
            if (end_seqno_ > endSeqno) {
                /* We possibly have items in the open checkpoint
                   (incomplete snapshot) */
                snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
                producer->getLogger().log(EXTENSION_LOG_NOTICE,
                    "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
                    "replica vbucket, backfill start seqno %" PRIu64 ", "
                    "backfill end seqno %" PRIu64 ", "
                    "snapshot end seqno after merge %" PRIu64,
                    vb_, startSeqno, endSeqno, info.range.end);
                endSeqno = info.range.end;
            }
        }

        producer->getLogger().log(EXTENSION_LOG_NOTICE,
            "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
            " and end seqno %" PRIu64, vb_, startSeqno, endSeqno);
        pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
                                        MARKER_FLAG_DISK));
        lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);

        if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
            // Only re-register the cursor if we still need to get memory
            // snapshots
            try {
                CursorRegResult result =
                        vb->checkpointManager.registerCursorBySeqno(
                                name_, chkCursorSeqno,
                                MustSendCheckpointEnd::NO);

                curChkSeqno = result.first;
            } catch(std::exception& error) {
                producer->getLogger().log(EXTENSION_LOG_WARNING,
                        "(vb %" PRIu16 ") Failed to register cursor: %s",
                        vb_, error.what());
                endStream(END_STREAM_STATE);
            }
        }
    }
    // Notify outside the lock; only wake the producer on the false->true
    // transition of itemsReady.
    bool inverse = false;
    if (itemsReady.compare_exchange_strong(inverse, true)) {
        producer->notifyStreamReady(vb_);
    }
}
420
/* Accepts one item produced by the backfill task.
 * Returns false when the item is null or when the backfill manager's byte
 * quota is exhausted (item rejected - caller should back off); returns
 * true when the item was queued, or was skipped because it should not be
 * replicated / the stream is no longer backfilling. */
bool ActiveStream::backfillReceived(std::unique_ptr<Item> itm,
                                    backfill_source_t backfill_source,
                                    bool force) {
    if (!itm) {
        return false;
    }

    if (itm->shouldReplicate()) {
        std::unique_lock<std::mutex> lh(streamMutex);
        if (isBackfilling()) {
            queued_item qi(std::move(itm));
            std::unique_ptr<DcpResponse> resp(makeResponseFromItem(qi));
            if (!producer->recordBackfillManagerBytesRead(
                        resp->getApproximateSize(), force)) {
                // Deleting resp may also delete itm (which is owned by resp)
                resp.reset();
                return false;
            }

            bufferedBackfill.bytes.fetch_add(resp->getApproximateSize());
            bufferedBackfill.items++;
            lastReadSeqno.store(uint64_t(*resp->getBySeqno()));

            pushToReadyQ(resp.release());

            // Notify outside the lock; only wake the producer on the
            // false->true transition of itemsReady.
            lh.unlock();
            bool inverse = false;
            if (itemsReady.compare_exchange_strong(inverse, true)) {
                producer->notifyStreamReady(vb_);
            }

            if (backfill_source == BACKFILL_FROM_MEMORY) {
                backfillItems.memory++;
            } else {
                backfillItems.disk++;
            }
        }
    }

    return true;
}
462
/* Invoked by the backfill task when it finishes: logs the outcome, clears
 * the task-running flag and wakes the producer so next()/backfillPhase()
 * can decide the follow-on state. */
void ActiveStream::completeBackfill() {
    {
        LockHolder lh(streamMutex);
        if (isBackfilling()) {
            producer->getLogger().log(EXTENSION_LOG_NOTICE,
                    "(vb %" PRIu16 ") Backfill complete, %" PRIu64 " items "
                    "read from disk, %" PRIu64 " from memory, last seqno read: "
                    "%" PRIu64 ", pendingBackfill : %s",
                    vb_, uint64_t(backfillItems.disk.load()),
                    uint64_t(backfillItems.memory.load()),
                    lastReadSeqno.load(),
                    pendingBackfill ? "True" : "False");
        } else {
            producer->getLogger().log(EXTENSION_LOG_WARNING,
                    "(vb %" PRIu16 ") ActiveStream::completeBackfill: "
                    "Unexpected state_:%s",
                    vb_, to_string(state_.load()).c_str());
        }
    }

    // Clear the running flag only if it was set, then wake the producer on
    // the false->true transition of itemsReady.
    bool inverse = true;
    isBackfillTaskRunning.compare_exchange_strong(inverse, false);
    inverse = false;
    if (itemsReady.compare_exchange_strong(inverse, true)) {
        producer->notifyStreamReady(vb_);
    }
}
490
/* Consumer acked a snapshot marker: when the last outstanding ack arrives
 * (counter reaches zero), wake the producer.
 * NOTE(review): --waitForSnapshot executes unconditionally, so an ack with
 * no pending wait would wrap the counter - assumed the protocol guarantees
 * one ack per requested wait; confirm with callers. */
void ActiveStream::snapshotMarkerAckReceived() {
    bool inverse = false;
    if (--waitForSnapshot == 0 &&
        itemsReady.compare_exchange_strong(inverse, true)) {
        producer->notifyStreamReady(vb_);
    }
}
498
/* Handles the consumer's ack of a SetVBucketState message during takeover.
 * First ack (takeoverState == pending): mark the local vbucket dead and
 * arrange to send the "active" state next. Second ack (active): takeover
 * is complete - end the stream.
 * (The misspelling "Recieved" is kept: the name is part of the interface.) */
void ActiveStream::setVBucketStateAckRecieved() {
    std::unique_lock<std::mutex> lh(streamMutex);
    if (isTakeoverWait()) {
        if (takeoverState == vbucket_state_pending) {
            producer->getLogger().log(EXTENSION_LOG_INFO,
                "(vb %" PRIu16 ") Receive ack for set vbucket state to pending "
                "message", vb_);

            takeoverState = vbucket_state_active;
            transitionState(StreamState::TakeoverSend);
            // Drop the stream lock before calling back into the bucket.
            lh.unlock();

            engine->getKVBucket()->setVBucketState(vb_, vbucket_state_dead,
                                                   false, false);
            VBucketPtr vbucket = engine->getVBucket(vb_);
            producer->getLogger().log(EXTENSION_LOG_NOTICE,
                "(vb %" PRIu16 ") Vbucket marked as dead, last sent seqno: %"
                PRIu64 ", high seqno: %" PRIu64,
                vb_, lastSentSeqno.load(), vbucket->getHighSeqno());
        } else {
            producer->getLogger().log(EXTENSION_LOG_INFO,
                "(vb %" PRIu16 ") Receive ack for set vbucket state to active "
                "message", vb_);
            endStream(END_STREAM_OK);
            lh.unlock();
        }

        // Wake the producer on the false->true transition of itemsReady.
        bool inverse = false;
        if (itemsReady.compare_exchange_strong(inverse, true)) {
            producer->notifyStreamReady(vb_);
        }
    } else {
        producer->getLogger().log(EXTENSION_LOG_WARNING,
            "(vb %" PRIu16 ") Unexpected ack for set vbucket op on stream '%s' "
            "state '%s'", vb_, name_.c_str(), to_string(state_.load()).c_str());
    }

}
537
/* Runs while the stream is Backfilling (called from next() with
 * streamMutex held): returns the next queued response, and once the
 * backfill task has finished and the readyQ is drained decides the
 * follow-on: another backfill, takeover, in-memory, or end-of-stream. */
DcpResponse* ActiveStream::backfillPhase(std::lock_guard<std::mutex>& lh) {
    DcpResponse* resp = nextQueuedItem();

    if (resp) {
        // Release this response's share of the backfill buffer quota.
        producer->recordBackfillManagerBytesSent(resp->getApproximateSize());
        bufferedBackfill.bytes.fetch_sub(resp->getApproximateSize());
        bufferedBackfill.items--;

        // Only DcpResponse objects representing items from "disk" have a size
        // so only update backfillRemaining when non-zero
        if (resp->getApproximateSize()) {
            if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
                backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
            }
        }
    }

    if (!isBackfillTaskRunning && readyQ.empty()) {
        // Given readyQ.empty() is True resp will be NULL
        backfillRemaining.store(0, std::memory_order_relaxed);
        // The previous backfill has completed.  Check to see if another
        // backfill needs to be scheduled.
        if (pendingBackfill) {
            scheduleBackfill_UNLOCKED(true);
            pendingBackfill = false;
        } else {
            if (lastReadSeqno.load() >= end_seqno_) {
                endStream(END_STREAM_OK);
            } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
                transitionState(StreamState::TakeoverSend);
            } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
                endStream(END_STREAM_OK);
            } else {
                transitionState(StreamState::InMemory);
            }

            // endStream/transitionState may have queued a response
            // (e.g. a stream-end message); return it straight away.
            if (!resp) {
                resp = nextQueuedItem();
            }
        }
    }

    return resp;
}
582
583 DcpResponse* ActiveStream::inMemoryPhase() {
584     if (lastSentSeqno.load() >= end_seqno_) {
585         endStream(END_STREAM_OK);
586     } else if (readyQ.empty()) {
587         if (pendingBackfill) {
588             // Moving the state from InMemory to Backfilling will result in a
589             // backfill being scheduled
590             transitionState(StreamState::Backfilling);
591             pendingBackfill = false;
592             return NULL;
593         } else if (nextCheckpointItem()) {
594             return NULL;
595         }
596     }
597     return nextQueuedItem();
598 }
599
/* Streams items during the takeover send phase. If the phase has run
 * longer than takeoverSendMaxTime, the vbucket is flagged "takeover backed
 * up" (presumably to throttle incoming mutations - confirm in VBucket).
 * Once everything queued is sent and acked, emits SetVBucketState and
 * moves to TakeoverWait. */
DcpResponse* ActiveStream::takeoverSendPhase() {

    VBucketPtr vb = engine->getVBucket(vb_);
    if (vb && takeoverStart != 0 &&
        !vb->isTakeoverBackedUp() &&
        (ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
        vb->setTakeoverBackedUpState(true);
    }

    if (!readyQ.empty()) {
        return nextQueuedItem();
    } else {
        if (nextCheckpointItem()) {
            // More checkpoint items are coming - nothing to send yet.
            return NULL;
        }
    }

    if (waitForSnapshot != 0) {
        // Still waiting for the consumer to ack outstanding snapshot(s).
        return NULL;
    }

    // Everything sent: lift the backed-up state before handing over.
    if (vb) {
        vb->setTakeoverBackedUpState(false);
        takeoverStart = 0;
    }

    // Only send SetVBucketState if the flow-control buffer log has room.
    DcpResponse* resp = NULL;
    if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
        resp = new SetVBucketState(opaque_, vb_, takeoverState);
        transitionState(StreamState::TakeoverWait);
    }
    return resp;
}
633
// While waiting for the consumer's SetVBucketState ack we only drain what
// is already queued; no new items are pulled from checkpoints.
DcpResponse* ActiveStream::takeoverWaitPhase() {
    return nextQueuedItem();
}
637
638 DcpResponse* ActiveStream::deadPhase() {
639     DcpResponse* resp = nextQueuedItem();
640     if (!resp) {
641         producer->getLogger().log(EXTENSION_LOG_NOTICE,
642                                   "(vb %" PRIu16 ") Stream closed, "
643                                   "%" PRIu64 " items sent from backfill phase, "
644                                   "%" PRIu64 " items sent from memory phase, "
645                                   "%" PRIu64 " was last seqno sent",
646                                   vb_,
647                                   uint64_t(backfillItems.sent.load()),
648                                   uint64_t(itemsFromMemoryPhase.load()),
649                                   lastSentSeqno.load());
650     }
651     return resp;
652 }
653
// Compression is a producer-level setting; delegate to our producer.
bool ActiveStream::isCompressionEnabled() {
    return producer->isValueCompressionEnabled();
}
657
658 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
659     Stream::addStats(add_stat, c);
660
661     try {
662         const int bsize = 1024;
663         char buffer[bsize];
664         checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
665                          name_.c_str(), vb_);
666         add_casted_stat(buffer, backfillItems.disk, add_stat, c);
667         checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
668                          name_.c_str(), vb_);
669         add_casted_stat(buffer, backfillItems.memory, add_stat, c);
670         checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_sent",
671                          name_.c_str(), vb_);
672         add_casted_stat(buffer, backfillItems.sent, add_stat, c);
673         checked_snprintf(buffer, bsize, "%s:stream_%d_memory_phase",
674                          name_.c_str(), vb_);
675         add_casted_stat(buffer, itemsFromMemoryPhase.load(), add_stat, c);
676         checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno",
677                          name_.c_str(), vb_);
678         add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
679         checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
680                          name_.c_str(), vb_);
681         add_casted_stat(buffer,
682                         lastSentSnapEndSeqno.load(std::memory_order_relaxed),
683                         add_stat, c);
684         checked_snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno",
685                          name_.c_str(), vb_);
686         add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
687         checked_snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory",
688                          name_.c_str(), vb_);
689         add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
690         checked_snprintf(buffer, bsize, "%s:stream_%d_items_ready",
691                          name_.c_str(), vb_);
692         add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat,
693                         c);
694         checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes",
695                          name_.c_str(), vb_);
696         add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
697         checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items",
698                          name_.c_str(), vb_);
699         add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
700
701         if (isTakeoverSend() && takeoverStart != 0) {
702             checked_snprintf(buffer, bsize, "%s:stream_%d_takeover_since",
703                              name_.c_str(), vb_);
704             add_casted_stat(buffer, ep_current_time() - takeoverStart, add_stat,
705                             c);
706         }
707     } catch (std::exception& error) {
708         LOG(EXTENSION_LOG_WARNING,
709             "ActiveStream::addStats: Failed to build stats: %s", error.what());
710     }
711 }
712
/* Reports rebalance/takeover progress to ns-server: stream status plus an
 * "estimate" of the items still to send (remaining backfill + outstanding
 * checkpoint items for our cursor, clamped to the stream's end seqno). */
void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie,
                                    const VBucket& vb) {
    LockHolder lh(streamMutex);

    add_casted_stat("name", name_, add_stat, cookie);
    if (!isActive()) {
        /**
         *  There is not a legitimate case where the stream is already dead.
         *  ns-server cleans-up old streams towards the end of a vbucket move.
         *
         *  But just in case it does happen: log the event and return status of
         *  does_not_exist to ensure rebalance does not hang.
         */
        producer->getLogger().log(EXTENSION_LOG_WARNING,
                                  "(vb %" PRIu16 ") "
                                  "ActiveStream::addTakeoverStats: Stream has "
                                  "status StreamDead", vb_);
        add_casted_stat("status", "does_not_exist", add_stat, cookie);
        add_casted_stat("estimate", 0, add_stat, cookie);
        add_casted_stat("backfillRemaining", 0, add_stat, cookie);
        return;
    }

    // NOTE(review): backfillRemaining is loaded twice (for 'total' and for
    // the stat below); a concurrent update could make them differ slightly.
    size_t total = backfillRemaining.load(std::memory_order_relaxed);
    if (isBackfilling()) {
        add_casted_stat("status", "backfilling", add_stat, cookie);
    } else {
        add_casted_stat("status", "in-memory", add_stat, cookie);
    }
    add_casted_stat("backfillRemaining",
                    backfillRemaining.load(std::memory_order_relaxed),
                    add_stat, cookie);

    size_t vb_items = vb.getNumItems();
    size_t chk_items = vb_items > 0 ?
                vb.checkpointManager.getNumItemsForCursor(name_) : 0;

    // Persisted deletes are best-effort: treat a lookup failure as zero.
    size_t del_items = 0;
    try {
        del_items = engine->getKVBucket()->getNumPersistedDeletes(vb_);
    } catch (std::runtime_error& e) {
        producer->getLogger().log(EXTENSION_LOG_WARNING,
            "ActiveStream:addTakeoverStats: exception while getting num persisted "
            "deletes for vbucket:%" PRIu16 " - treating as 0 deletes. "
            "Details: %s", vb_, e.what());
    }

    // Clamp the checkpoint-item estimate to the stream's end seqno.
    if (end_seqno_ < curChkSeqno) {
        chk_items = 0;
    } else if ((end_seqno_ - curChkSeqno) < chk_items) {
        chk_items = end_seqno_ - curChkSeqno + 1;
    }
    total += chk_items;

    add_casted_stat("estimate", total, add_stat, cookie);
    add_casted_stat("chk_items", chk_items, add_stat, cookie);
    add_casted_stat("vb_items", vb_items, add_stat, cookie);
    add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
}
772
/* Pops and returns the front of the readyQ if the producer's flow-control
 * buffer log can accommodate the message; otherwise leaves the queue
 * untouched and returns NULL (back-pressure). Updates the last-sent seqno
 * and the per-phase sent counters for responses that carry a seqno. */
DcpResponse* ActiveStream::nextQueuedItem() {
    if (!readyQ.empty()) {
        DcpResponse* response = readyQ.front();
        if (producer->bufferLogInsert(response->getMessageSize())) {
            auto seqno = response->getBySeqno();
            if (seqno) {
                lastSentSeqno.store(*seqno);

                if (isBackfilling()) {
                    backfillItems.sent++;
                } else {
                    itemsFromMemoryPhase++;
                }
            }

            // See if the currentSeparator needs changing
            maybeChangeSeparator(response);
            popFromReadyQ();
            return response;
        }
    }
    return NULL;
}
796
/* Returns true if more checkpoint items are (or will shortly be)
 * available: either our cursor still has outstanding items (in which case
 * the processor task is scheduled to extract them), or an extraction is
 * already in flight. */
bool ActiveStream::nextCheckpointItem() {
    VBucketPtr vbucket = engine->getVBucket(vb_);
    if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
        // schedule this stream to build the next checkpoint
        producer->scheduleCheckpointProcessorTask(this);
        return true;
    } else if (chkptItemsExtractionInProgress) {
        return true;
    }
    return false;
}
808
/* Task entry point: drains queued streams, running each stream's
 * checkpoint-item extraction, up to iterationsBeforeYield per invocation.
 * Returns false (cancel) only on engine shutdown. */
bool ActiveStreamCheckpointProcessorTask::run() {
    if (engine->getEpStats().isShutdown) {
        return false;
    }

    // Setup that we will sleep forever when done.
    snooze(INT_MAX);

    // Clear the notification flag
    notified.store(false);

    size_t iterations = 0;
    do {
        stream_t nextStream = queuePop();
        ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());

        if (stream) {
            stream->nextCheckpointItemTask();
        } else {
            // Queue drained - nothing more to do this run.
            break;
        }
        iterations++;
    } while(!queueEmpty()
            && iterations < iterationsBeforeYield);

    // Now check if we were re-notified or there are still checkpoints
    bool expected = true;
    if (notified.compare_exchange_strong(expected, false)
        || !queueEmpty()) {
        // snooze for 0, essentially yielding and allowing other tasks a go
        snooze(0.0);
    }

    return true;
}
844
// Wake this task in the executor pool so run() is scheduled again.
void ActiveStreamCheckpointProcessorTask::wakeup() {
    ExecutorPool::get()->wake(getId());
}
848
849 void ActiveStreamCheckpointProcessorTask::schedule(const stream_t& stream) {
850     pushUnique(stream);
851
852     bool expected = false;
853     if (notified.compare_exchange_strong(expected, true)) {
854         wakeup();
855     }
856 }
857
858 void ActiveStreamCheckpointProcessorTask::clearQueues() {
859     LockHolder lh(workQueueLock);
860     while (!queue.empty()) {
861         queue.pop();
862     }
863     queuedVbuckets.clear();
864 }
865
866 void ActiveStream::nextCheckpointItemTask() {
867     VBucketPtr vbucket = engine->getVBucket(vb_);
868     if (vbucket) {
869         std::vector<queued_item> items;
870         getOutstandingItems(vbucket, items);
871         processItems(items);
872     } else {
873         /* The entity deleting the vbucket must set stream to dead,
874            calling setDead(END_STREAM_STATE) will cause deadlock because
875            it will try to grab streamMutex which is already acquired at this
876            point here */
877         return;
878     }
879 }
880
881 void ActiveStream::getOutstandingItems(VBucketPtr &vb,
882                                        std::vector<queued_item> &items) {
883     // Commencing item processing - set guard flag.
884     chkptItemsExtractionInProgress.store(true);
885
886     hrtime_t _begin_ = gethrtime();
887     vb->checkpointManager.getAllItemsForCursor(name_, items);
888     engine->getEpStats().dcpCursorsGetItemsHisto.add(
889                                             (gethrtime() - _begin_) / 1000);
890
891     if (vb->checkpointManager.getNumCheckpoints() > 1) {
892         engine->getKVBucket()->wakeUpCheckpointRemover();
893     }
894 }
895
896 std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
897         queued_item& item) {
898     if (item->getOperation() != queue_op::system_event) {
899         auto cKey = Collections::DocKey::make(item->getKey(), currentSeparator);
900         return std::make_unique<MutationProducerResponse>(
901                 item, opaque_, isKeyOnly(), cKey.getCollectionLen());
902     } else {
903         return SystemEventProducerMessage::make(opaque_, item);
904     }
905 }
906
// Convert a batch of items drained from the checkpoint manager into DCP
// responses, splitting the batch into snapshots at checkpoint_start
// boundaries. Finally clears the extraction guard and notifies the
// producer that this stream may have data ready.
void ActiveStream::processItems(std::vector<queued_item>& items) {
    if (!items.empty()) {
        // 'mark' is true when the snapshot being built began at a
        // checkpoint boundary (marker then carries MARKER_FLAG_CHK).
        bool mark = false;
        if (items.front()->getOperation() == queue_op::checkpoint_start) {
            mark = true;
        }

        std::deque<DcpResponse*> mutations;
        std::vector<queued_item>::iterator itr = items.begin();
        for (; itr != items.end(); ++itr) {
            queued_item& qi = *itr;

            if (SystemEventReplicate::process(*qi) == ProcessStatus::Continue) {
                curChkSeqno = qi->getBySeqno();
                lastReadSeqnoUnSnapshotted = qi->getBySeqno();
                // Ownership of the raw response is handed to 'mutations'
                // and ultimately to readyQ via snapshot().
                mutations.push_back(makeResponseFromItem(qi).release());
            } else if (qi->getOperation() == queue_op::checkpoint_start) {
                /* if there are already other mutations, then they belong to the
                   previous checkpoint and hence we must create a snapshot and
                   put them onto readyQ */
                if (!mutations.empty()) {
                    snapshot(mutations, mark);
                    /* clear out all the mutations since they are already put
                       onto the readyQ */
                    mutations.clear();
                }
                /* mark true as it indicates a new checkpoint snapshot */
                mark = true;
            }
        }

        if (mutations.empty()) {
            // If we only got checkpoint start or ends check to see if there are
            // any more snapshots before pausing the stream.
            nextCheckpointItemTask();
        } else {
            snapshot(mutations, mark);
        }
    }

    // Completed item processing - clear guard flag and notify producer.
    chkptItemsExtractionInProgress.store(false);
    producer->notifyStreamReady(vb_);
}
951
// Push a snapshot marker (when the previous snapshot is complete)
// followed by the given responses onto readyQ. If the stream can no
// longer accept data the responses are freed and dropped instead.
// Takes streamMutex; 'items' ownership transfers to readyQ on success.
void ActiveStream::snapshot(std::deque<DcpResponse*>& items, bool mark) {
    if (items.empty()) {
        return;
    }

    LockHolder lh(streamMutex);

    if (!isActive() || isBackfilling()) {
        // If stream was closed forcefully by the time the checkpoint items
        // retriever task completed, or if we decided to switch the stream to
        // backfill state from in-memory state, none of the acquired mutations
        // should be added on the stream's readyQ. We must drop items in case
        // we switch state from in-memory to backfill because we schedule
        // backfill from lastReadSeqno + 1
        for (auto& item : items) {
            delete item;
        }
        items.clear();
        return;
    }

    /* This assumes that all items in the "items deque" is put onto readyQ */
    lastReadSeqno.store(lastReadSeqnoUnSnapshotted);

    if (isCurrentSnapshotCompleted()) {
        uint32_t flags = MARKER_FLAG_MEMORY;

        // Get OptionalSeqnos which for the items list types should have values
        auto seqnoStart = items.front()->getBySeqno();
        auto seqnoEnd = items.back()->getBySeqno();
        if (!seqnoStart || !seqnoEnd) {
            throw std::logic_error(
                    "ActiveStream::snapshot incorrect DcpEvent, missing a "
                    "seqno " +
                    std::string(items.front()->to_string()) + " " +
                    std::string(items.back()->to_string()));
        }

        uint64_t snapStart = *seqnoStart;
        uint64_t snapEnd = *seqnoEnd;

        if (mark) {
            flags |= MARKER_FLAG_CHK;
        }

        if (isTakeoverSend()) {
            // Takeover streams require the replica to ack each snapshot.
            waitForSnapshot++;
            flags |= MARKER_FLAG_ACK;
        }

        if (!firstMarkerSent) {
            // First marker is widened to cover the requested snap start.
            snapStart = std::min(snap_start_seqno_, snapStart);
            firstMarkerSent = true;
        }
        pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
                                        flags));
        lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
    }

    for (const auto& item : items) {
        pushToReadyQ(item);
    }
}
1015
1016 uint32_t ActiveStream::setDead(end_stream_status_t status) {
1017     {
1018         LockHolder lh(streamMutex);
1019         endStream(status);
1020     }
1021
1022     bool inverse = false;
1023     if (status != END_STREAM_DISCONNECTED &&
1024         itemsReady.compare_exchange_strong(inverse, true)) {
1025         producer->notifyStreamReady(vb_);
1026     }
1027     return 0;
1028 }
1029
1030 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
1031     if (isActive()) {
1032         bool inverse = false;
1033         if (itemsReady.compare_exchange_strong(inverse, true)) {
1034             producer->notifyStreamReady(vb_);
1035         }
1036     }
1037 }
1038
// Transition the stream to Dead, releasing backfill buffer accounting
// and queueing a StreamEndResponse (unless the connection is gone).
// Caller must hold streamMutex.
void ActiveStream::endStream(end_stream_status_t reason) {
    if (isActive()) {
        pendingBackfill = false;
        if (isBackfilling()) {
            // If Stream were in Backfilling state, clear out the
            // backfilled items to clear up the backfill buffer.
            clear_UNLOCKED();
            producer->recordBackfillManagerBytesSent(bufferedBackfill.bytes);
            bufferedBackfill.bytes = 0;
            bufferedBackfill.items = 0;
        }
        transitionState(StreamState::Dead);
        if (reason != END_STREAM_DISCONNECTED) {
            // Queue an explicit end message so the client learns why.
            pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
        }
        producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                  "(vb %" PRIu16 ") Stream closing, "
                                  "sent until seqno %" PRIu64 " "
                                  "remaining items %" PRIu64 ", "
                                  "reason: %s",
                                  vb_,
                                  lastSentSeqno.load(),
                                  uint64_t(readyQ_non_meta_items.load()),
                                  getEndStreamStatusStr(reason));
    }
}
1065
// Decide whether a disk backfill is needed for this stream and, if so,
// schedule it via the producer's backfill manager; otherwise transition
// directly to in-memory / takeover streaming. 'reschedule' is true when
// a backfill has run before (e.g. after cursor dropping). Caller holds
// streamMutex.
void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
    if (isBackfillTaskRunning) {
        producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                  "(vb %" PRIu16 ") Skipping "
                                  "scheduleBackfill_UNLOCKED; "
                                  "lastReadSeqno %" PRIu64 ", reschedule flag "
                                  ": %s", vb_, lastReadSeqno.load(),
                                  reschedule ? "True" : "False");
        return;
    }

    VBucketPtr vbucket = engine->getVBucket(vb_);
    if (!vbucket) {
        producer->getLogger().log(EXTENSION_LOG_WARNING,
                                  "(vb %" PRIu16 ") Failed to schedule "
                                  "backfill as unable to get vbucket; "
                                  "lastReadSeqno : %" PRIu64 ", "
                                  "reschedule : %s",
                                  vb_, lastReadSeqno.load(),
                                  reschedule ? "True" : "False");
        return;
    }

    uint64_t backfillStart = lastReadSeqno.load() + 1;
    uint64_t backfillEnd = 0;
    bool tryBackfill = false;

    if ((flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) || reschedule) {
        uint64_t vbHighSeqno = static_cast<uint64_t>(vbucket->getHighSeqno());
        if (lastReadSeqno.load() > vbHighSeqno) {
            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
                                   "lastReadSeqno (which is " +
                                   std::to_string(lastReadSeqno.load()) +
                                   " ) is greater than vbHighSeqno (which is " +
                                   std::to_string(vbHighSeqno) + " ). " +
                                   "for stream " + producer->logHeader() +
                                   "; vb " + std::to_string(vb_));
        }
        if (reschedule) {
            /* We need to do this for reschedule because in case of
               DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
               set to last persisted seqno before calling
               scheduleBackfill_UNLOCKED() */
            backfillEnd = engine->getKVBucket()->getLastPersistedSeqno(vb_);
        } else {
            backfillEnd = end_seqno_;
        }
        tryBackfill = true;
    } else {
        // Normal (non-diskonly, first-time) path: try to register a
        // checkpoint cursor; a backfill is only required if the cursor
        // could not cover lastReadSeqno.
        try {
            std::tie(curChkSeqno, tryBackfill) =
                    vbucket->checkpointManager.registerCursorBySeqno(
                            name_, lastReadSeqno.load(),
                            MustSendCheckpointEnd::NO);
        } catch(std::exception& error) {
            producer->getLogger().log(EXTENSION_LOG_WARNING,
                                      "(vb %" PRIu16 ") Failed to register "
                                      "cursor: %s", vb_, error.what());
            endStream(END_STREAM_STATE);
        }

        // NOTE(review): if registerCursorBySeqno threw above we have
        // already called endStream, yet still fall through to the checks
        // below with curChkSeqno unchanged - confirm this is intended.
        if (lastReadSeqno.load() > curChkSeqno) {
            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
                                   "lastReadSeqno (which is " +
                                   std::to_string(lastReadSeqno.load()) +
                                   " ) is greater than curChkSeqno (which is " +
                                   std::to_string(curChkSeqno) + " ). " +
                                   "for stream " + producer->logHeader() +
                                   "; vb " + std::to_string(vb_));
        }

        /* We need to find the minimum seqno that needs to be backfilled in
         * order to make sure that we don't miss anything when transitioning
         * to a memory snapshot. The backfill task will always make sure that
         * the backfill end seqno is contained in the backfill.
         */
        if (backfillStart < curChkSeqno) {
            if (curChkSeqno > end_seqno_) {
                /* Backfill only is enough */
                backfillEnd = end_seqno_;
            } else {
                /* Backfill + in-memory streaming */
                backfillEnd = curChkSeqno - 1;
            }
        }
    }

    if (backfillStart <= backfillEnd && tryBackfill) {
        producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                  "(vb %" PRIu16 ") Scheduling backfill "
                                  "from %" PRIu64 " to %" PRIu64 ", reschedule "
                                  "flag : %s", vb_, backfillStart, backfillEnd,
                                  reschedule ? "True" : "False");
        producer->scheduleBackfillManager(
                *vbucket, this, backfillStart, backfillEnd);
        isBackfillTaskRunning.store(true);
    } else {
        if (reschedule) {
            // Infrequent code path, see comment below.
            producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                      "(vb %" PRIu16 ") Did not schedule "
                                      "backfill with reschedule : True, "
                                      "tryBackfill : True; "
                                      "backfillStart : %" PRIu64 ", "
                                      "backfillEnd : %" PRIu64 ", "
                                      "flags_ : %" PRIu32 ", "
                                      "start_seqno_ : %" PRIu64 ", "
                                      "end_seqno_ : %" PRIu64 ", "
                                      "lastReadSeqno : %" PRIu64 ", "
                                      "lastSentSeqno : %" PRIu64 ", "
                                      "curChkSeqno : %" PRIu64 ", "
                                      "itemsReady : %s",
                                      vb_, backfillStart, backfillEnd, flags_,
                                      start_seqno_, end_seqno_,
                                      lastReadSeqno.load(),
                                      lastSentSeqno.load(), curChkSeqno.load(),
                                      itemsReady ? "True" : "False");

            /* Cursor was dropped, but we will not do backfill.
             * This may happen in a corner case where, the memory usage is high
             * due to other vbuckets and persistence cursor moves ahead of
             * replication cursor to new checkpoint open but does not persist
             * items yet.
             *
             * Because we dropped the cursor but did not do a backfill (and
             * therefore did not re-register a cursor in markDiskSnapshot) we
             * must re-register the cursor here.
             */
            try {
                CursorRegResult result =
                            vbucket->checkpointManager.registerCursorBySeqno(
                            name_, lastReadSeqno.load(),
                            MustSendCheckpointEnd::NO);

                    curChkSeqno = result.first;
            } catch (std::exception& error) {
                producer->getLogger().log(EXTENSION_LOG_WARNING,
                                          "(vb %" PRIu16 ") Failed to register "
                                          "cursor: %s", vb_, error.what());
                endStream(END_STREAM_STATE);
            }
        }
        // No backfill needed: move straight to the streaming phase
        // dictated by the stream flags.
        if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
            endStream(END_STREAM_OK);
        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
            transitionState(StreamState::TakeoverSend);
        } else {
            transitionState(StreamState::InMemory);
        }
        if (reschedule) {
            /*
             * It is not absolutely necessary to notify immediately as conn
             * manager or an incoming item will cause a notification eventually,
             * but wouldn't hurt to do so.
             *
             * Note: must not notify when we schedule a backfill for the first
             * time (i.e. when reschedule is false) because the stream is not
             * yet in producer conn list of streams.
             */
            bool inverse = false;
            if (itemsReady.compare_exchange_strong(inverse, true)) {
                producer->notifyStreamReady(vb_);
            }
        }
    }
}
1232
// Called when this stream is identified as slow (cursor dropping):
// drop the checkpoint cursor and flag a pending backfill so the stream
// catches up from disk instead of pinning checkpoint memory.
void ActiveStream::handleSlowStream()
{
    LockHolder lh(streamMutex);
    producer->getLogger().log(EXTENSION_LOG_NOTICE,
                              "(vb %" PRIu16 ") Handling slow stream; "
                              "state_ : %s, "
                              "lastReadSeqno : %" PRIu64 ", "
                              "lastSentSeqno : %" PRIu64 ", "
                              "isBackfillTaskRunning : %s",
                              vb_, to_string(state_.load()).c_str(),
                              lastReadSeqno.load(),
                              lastSentSeqno.load(),
                              isBackfillTaskRunning.load() ? "True" : "False");
    switch (state_.load()) {
        case StreamState::Backfilling:
        case StreamState::InMemory:
            /* Drop the existing cursor and set pending backfill */
            dropCheckpointCursor_UNLOCKED();
            pendingBackfill = true;
            break;
        case StreamState::TakeoverSend:
            /* To be handled later if needed */
        case StreamState::TakeoverWait:
            /* To be handled later if needed */
        case StreamState::Dead:
            /* To be handled later if needed */
            break;
        case StreamState::Pending:
        case StreamState::Reading:
            // These states are invalid for an active stream.
            throw std::logic_error("ActiveStream::handleSlowStream: "
                                   "called with state " +
                                   to_string(state_.load()) + " "
                                   "for stream " + producer->logHeader() +
                                   "; vb " + std::to_string(vb_));
    }
}
1269
1270 const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
1271 {
1272     switch (status) {
1273     case END_STREAM_OK:
1274         return "The stream ended due to all items being streamed";
1275     case END_STREAM_CLOSED:
1276         return "The stream closed early due to a close stream message";
1277     case END_STREAM_STATE:
1278         return "The stream closed early because the vbucket state changed";
1279     case END_STREAM_DISCONNECTED:
1280         return "The stream closed early because the conn was disconnected";
1281     case END_STREAM_SLOW:
1282         return "The stream was closed early because it was too slow";
1283     case END_STREAM_BACKFILL_FAIL:
1284         return "The stream closed early due to backfill failure";
1285     }
1286     std::string msg("Status unknown: " + std::to_string(status) +
1287                     "; this should not have happened!");
1288     return msg.c_str();
1289 }
1290
// Validate and perform a state-machine transition for the stream, then
// run the entry actions for the new state (schedule backfill, pump
// checkpoint items, remove cursor, ...). Throws std::invalid_argument
// on an illegal transition.
void ActiveStream::transitionState(StreamState newState) {
    producer->getLogger().log(EXTENSION_LOG_DEBUG,
                              "(vb %d) Transitioning from %s to %s",
                              vb_, to_string(state_.load()).c_str(),
                              to_string(newState).c_str());

    if (state_ == newState) {
        return;
    }

    // Table of legal transitions from each current state.
    bool validTransition = false;
    switch (state_.load()) {
        case StreamState::Pending:
            if (newState == StreamState::Backfilling ||
                    newState == StreamState::Dead) {
                validTransition = true;
            }
            break;
        case StreamState::Backfilling:
            if(newState == StreamState::InMemory ||
               newState == StreamState::TakeoverSend ||
               newState == StreamState::Dead) {
                validTransition = true;
            }
            break;
        case StreamState::InMemory:
            if (newState == StreamState::Backfilling ||
                    newState == StreamState::Dead) {
                validTransition = true;
            }
            break;
        case StreamState::TakeoverSend:
            if (newState == StreamState::TakeoverWait ||
                    newState == StreamState::Dead) {
                validTransition = true;
            }
            break;
        case StreamState::TakeoverWait:
            if (newState == StreamState::TakeoverSend ||
                    newState == StreamState::Dead) {
                validTransition = true;
            }
            break;
        case StreamState::Reading:
            // Active stream should never be in READING state.
            validTransition = false;
            break;
        case StreamState::Dead:
            // Once DEAD, no other transitions should occur.
            validTransition = false;
            break;
    }

    if (!validTransition) {
        throw std::invalid_argument("ActiveStream::transitionState:"
                " newState (which is " + to_string(newState) +
                ") is not valid for current state (which is " +
                to_string(state_.load()) + ")");
    }

    StreamState oldState = state_.load();
    state_ = newState;

    // Entry actions for the new state.
    switch (newState) {
        case StreamState::Backfilling:
            if (StreamState::Pending == oldState) {
                scheduleBackfill_UNLOCKED(false /* reschedule */);
            } else if (StreamState::InMemory == oldState) {
                scheduleBackfill_UNLOCKED(true /* reschedule */);
            }
            break;
        case StreamState::InMemory:
            // Check if the producer has sent up till the last requested
            // sequence number already, if not - move checkpoint items into
            // the ready queue.
            if (lastSentSeqno.load() >= end_seqno_) {
                // Stream transitioning to DEAD state
                endStream(END_STREAM_OK);
                bool inverse = false;
                if (itemsReady.compare_exchange_strong(inverse, true)) {
                    producer->notifyStreamReady(vb_);
                }
            } else {
                nextCheckpointItem();
            }
            break;
        case StreamState::TakeoverSend:
            takeoverStart = ep_current_time();
            nextCheckpointItem();
            break;
        case StreamState::Dead:
            {
                // Release this stream's checkpoint cursor.
                VBucketPtr vb = engine->getVBucket(vb_);
                if (vb) {
                    vb->checkpointManager.removeCursor(name_);
                }
                break;
            }
        case StreamState::TakeoverWait:
        case StreamState::Pending:
            break;
        case StreamState::Reading:
            throw std::logic_error("ActiveStream::transitionState:"
                    " newState can't be " + to_string(newState) +
                    "!");
    }
}
1398
1399 size_t ActiveStream::getItemsRemaining() {
1400     VBucketPtr vbucket = engine->getVBucket(vb_);
1401
1402     if (!vbucket || !isActive()) {
1403         return 0;
1404     }
1405
1406     // Items remaining is the sum of:
1407     // (a) Items outstanding in checkpoints
1408     // (b) Items pending in our readyQ, excluding any meta items.
1409     return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1410             readyQ_non_meta_items;
1411 }
1412
// Highest seqno read from the vbucket that has been snapshotted.
uint64_t ActiveStream::getLastReadSeqno() const {
    return lastReadSeqno.load();
}
1416
// Highest seqno actually handed to the producer for sending.
uint64_t ActiveStream::getLastSentSeqno() const {
    return lastSentSeqno.load();
}
1420
// Streams log through their owning producer's logger.
const Logger& ActiveStream::getLogger() const
{
    return producer->getLogger();
}
1425
1426 bool ActiveStream::isCurrentSnapshotCompleted() const
1427 {
1428     VBucketPtr vbucket = engine->getVBucket(vb_);
1429     // An atomic read of vbucket state without acquiring the
1430     // reader lock for state should suffice here.
1431     if (vbucket && vbucket->getState() == vbucket_state_replica) {
1432         if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
1433             lastReadSeqno) {
1434             return false;
1435         }
1436     }
1437     return true;
1438 }
1439
1440 void ActiveStream::dropCheckpointCursor_UNLOCKED()
1441 {
1442     VBucketPtr vbucket = engine->getVBucket(vb_);
1443     if (!vbucket) {
1444         endStream(END_STREAM_STATE);
1445         bool inverse = false;
1446         if (itemsReady.compare_exchange_strong(inverse, true)) {
1447             producer->notifyStreamReady(vb_);
1448         }
1449     }
1450     /* Drop the existing cursor */
1451     vbucket->checkpointManager.removeCursor(name_);
1452 }
1453
1454 void ActiveStream::maybeChangeSeparator(DcpResponse* response) {
1455     if (response->getEvent() == DcpResponse::Event::SystemEvent) {
1456         auto se = static_cast<SystemEventProducerMessage*>(response);
1457         if (se->getSystemEvent() == SystemEvent::CollectionsSeparatorChanged) {
1458             currentSeparator =
1459                     std::string(se->getKey().data(), se->getKey().size());
1460         }
1461     }
1462 }
1463
// A notifier stream exists only to tell the client when the vbucket's
// high seqno passes st_seqno. If it already has, the stream is ended
// immediately at construction time.
NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                               const std::string &name, uint32_t flags,
                               uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                               uint64_t en_seqno, uint64_t vb_uuid,
                               uint64_t snap_start_seqno,
                               uint64_t snap_end_seqno)
    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
             snap_start_seqno, snap_end_seqno, Type::Notifier),
      producer(p) {
    LockHolder lh(streamMutex);
    VBucketPtr vbucket = e->getVBucket(vb_);
    // Already past the requested seqno: end the stream straight away.
    if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
        transitionState(StreamState::Dead);
        itemsReady.store(true);
    }

    producer->getLogger().log(EXTENSION_LOG_NOTICE,
        "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
        PRIu64, vb, st_seqno, en_seqno);
}
1485
// End the notifier stream. The mutex is released before notifying the
// producer to avoid holding streamMutex across the notification.
uint32_t NotifierStream::setDead(end_stream_status_t status) {
    std::unique_lock<std::mutex> lh(streamMutex);
    if (isActive()) {
        transitionState(StreamState::Dead);
        if (status != END_STREAM_DISCONNECTED) {
            pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
            lh.unlock();
            bool inverse = false;
            if (itemsReady.compare_exchange_strong(inverse, true)) {
                producer->notifyStreamReady(vb_);
            }
        }
    }
    return 0;
}
1501
// The watched seqno has been reached: queue a StreamEnd, kill the
// stream, and (outside the lock) wake the producer to deliver it.
void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
    std::unique_lock<std::mutex> lh(streamMutex);
    if (isActive() && start_seqno_ < seqno) {
        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
        transitionState(StreamState::Dead);
        lh.unlock();
        bool inverse = false;
        if (itemsReady.compare_exchange_strong(inverse, true)) {
            producer->notifyStreamReady(vb_);
        }
    }
}
1514
1515 DcpResponse* NotifierStream::next() {
1516     LockHolder lh(streamMutex);
1517
1518     if (readyQ.empty()) {
1519         itemsReady.store(false);
1520         return NULL;
1521     }
1522
1523     DcpResponse* response = readyQ.front();
1524     if (producer->bufferLogInsert(response->getMessageSize())) {
1525         popFromReadyQ();
1526     } else {
1527         response = NULL;
1528     }
1529
1530     return response;
1531 }
1532
1533 void NotifierStream::transitionState(StreamState newState) {
1534     producer->getLogger().log(EXTENSION_LOG_DEBUG,
1535         "(vb %d) Transitioning from %s to %s", vb_,
1536         to_string(state_.load()).c_str(), to_string(newState).c_str());
1537
1538     if (state_ == newState) {
1539         return;
1540     }
1541
1542     bool validTransition = false;
1543     switch (state_.load()) {
1544         case StreamState::Pending:
1545             if (newState == StreamState::Dead) {
1546                 validTransition = true;
1547             }
1548             break;
1549
1550         case StreamState::Backfilling:
1551         case StreamState::InMemory:
1552         case StreamState::TakeoverSend:
1553         case StreamState::TakeoverWait:
1554         case StreamState::Reading:
1555         case StreamState::Dead:
1556             // No other state transitions are valid for a notifier stream.
1557             break;
1558     }
1559
1560     if (!validTransition) {
1561         throw std::invalid_argument("NotifierStream::transitionState:"
1562                 " newState (which is " + to_string(newState) +
1563                 ") is not valid for current state (which is " +
1564                 to_string(state_.load()) + ")");
1565     }
1566     state_ = newState;
1567 }
1568
// Construct a passive (consumer-side) stream. last_seqno is seeded from the
// vbucket's current high seqno; the snapshot tracking members start cleared.
PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
                             const std::string &name, uint32_t flags,
                             uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                             uint64_t en_seqno, uint64_t vb_uuid,
                             uint64_t snap_start_seqno, uint64_t snap_end_seqno,
                             uint64_t vb_high_seqno)
    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
             snap_start_seqno, snap_end_seqno, Type::Passive),
      engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
      cur_snapshot_end(0), cur_snapshot_type(Snapshot::None), cur_snapshot_ack(false) {
    LockHolder lh(streamMutex);
    // Queue the initial StreamRequest message under the stream lock and mark
    // the stream as having items ready to be sent.
    streamRequest_UNLOCKED(vb_uuid);
    itemsReady.store(true);
}
1583
1584 PassiveStream::~PassiveStream() {
1585     uint32_t unackedBytes = clearBuffer_UNLOCKED();
1586     if (transitionState(StreamState::Dead)) {
1587         // Destructed a "live" stream, log it.
1588         consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1589             "(vb %" PRId16 ") Destructing stream."
1590             " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
1591             vb_, last_seqno.load(), unackedBytes);
1592     }
1593 }
1594
1595 void PassiveStream::streamRequest(uint64_t vb_uuid) {
1596     {
1597         std::unique_lock<std::mutex> lh(streamMutex);
1598         streamRequest_UNLOCKED(vb_uuid);
1599     }
1600
1601     bool expected = false;
1602     if (itemsReady.compare_exchange_strong(expected, true)) {
1603         consumer->notifyStreamReady(vb_);
1604     }
1605 }
1606
1607 void PassiveStream::streamRequest_UNLOCKED(uint64_t vb_uuid) {
1608     pushToReadyQ(new StreamRequest(vb_,
1609                                    opaque_,
1610                                    flags_,
1611                                    start_seqno_,
1612                                    end_seqno_,
1613                                    vb_uuid,
1614                                    snap_start_seqno_,
1615                                    snap_end_seqno_));
1616
1617     const char* type = (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER)
1618         ? "takeover stream" : "stream";
1619     consumer->getLogger().log(
1620             EXTENSION_LOG_NOTICE,
1621             "(vb %" PRId16 ") Attempting to add %s: opaque_:%" PRIu32 ", "
1622             "start_seqno_:%" PRIu64 ", end_seqno_:%" PRIu64 ", "
1623             "vb_uuid:%" PRIu64 ", snap_start_seqno_:%" PRIu64 ", "
1624             "snap_end_seqno_:%" PRIu64 ", last_seqno:%" PRIu64,
1625             vb_,
1626             type,
1627             opaque_,
1628             start_seqno_,
1629             end_seqno_,
1630             vb_uuid,
1631             snap_start_seqno_,
1632             snap_end_seqno_,
1633             last_seqno.load());
1634 }
1635
// Transition this stream to the Dead state, discarding any buffered
// messages. Returns the number of buffered bytes that had not been
// acknowledged/processed.
uint32_t PassiveStream::setDead(end_stream_status_t status) {
    /* Hold buffer lock so that we clear out all items before we set the stream
       to dead state. We do not want to add any new message to the buffer or
       process any items in the buffer once we set the stream state to dead. */
    std::unique_lock<std::mutex> lg(buffer.bufMutex);
    uint32_t unackedBytes = clearBuffer_UNLOCKED();
    bool killed = false;

    // Note the lock order here: bufMutex is acquired before streamMutex.
    LockHolder slh(streamMutex);
    if (transitionState(StreamState::Dead)) {
        killed = true;
    }

    if (killed) {
        // Only log when the stream was actually alive; a disconnect is
        // reported more loudly (WARNING) than a normal close (NOTICE).
        EXTENSION_LOG_LEVEL logLevel = EXTENSION_LOG_NOTICE;
        if (END_STREAM_DISCONNECTED == status) {
            logLevel = EXTENSION_LOG_WARNING;
        }
        consumer->getLogger().log(logLevel,
            "(vb %" PRId16 ") Setting stream to dead state, last_seqno is %"
            PRIu64 ", unAckedBytes is %" PRIu32 ", status is %s",
            vb_, last_seqno.load(), unackedBytes, getEndStreamStatusStr(status));
    }
    return unackedBytes;
}
1661
1662 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1663     std::unique_lock<std::mutex> lh(streamMutex);
1664     if (isPending()) {
1665         if (status == ENGINE_SUCCESS) {
1666             transitionState(StreamState::Reading);
1667         } else {
1668             transitionState(StreamState::Dead);
1669         }
1670         pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1671         lh.unlock();
1672         bool inverse = false;
1673         if (itemsReady.compare_exchange_strong(inverse, true)) {
1674             consumer->notifyStreamReady(vb_);
1675         }
1676     }
1677 }
1678
// Re-issue a StreamRequest for this stream (e.g. after the connection to the
// producer was re-established), refreshing the vb_uuid and snapshot range
// from the vbucket's current state.
void PassiveStream::reconnectStream(VBucketPtr &vb,
                                    uint32_t new_opaque,
                                    uint64_t start_seqno) {
    // Pick up the most recent failover entry's uuid for the new request.
    vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;

    snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
    // If the current snapshot has been fully received (its end matches the
    // current seqno), collapse the snapshot range onto the current seqno.
    // NOTE(review): assumes getSnapshotInfo() can return range.start !=
    // start mid-snapshot — confirm against CheckpointManager semantics.
    if (info.range.end == info.start) {
        info.range.start = info.start;
    }

    snap_start_seqno_ = info.range.start;
    start_seqno_ = info.start;
    snap_end_seqno_ = info.range.end;

    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
        "(vb %d) Attempting to reconnect stream with opaque %" PRIu32
        ", start seq no %" PRIu64 ", end seq no %" PRIu64
        ", snap start seqno %" PRIu64 ", and snap end seqno %" PRIu64,
        vb_, new_opaque, start_seqno, end_seqno_,
        snap_start_seqno_, snap_end_seqno_);
    {
        // Queue the new StreamRequest under the stream lock; last_seqno is
        // rewound to the seqno we are reconnecting from.
        LockHolder lh(streamMutex);
        last_seqno.store(start_seqno);
        pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
                                      end_seqno_, vb_uuid_, snap_start_seqno_,
                                      snap_end_seqno_));
    }
    bool inverse = false;
    if (itemsReady.compare_exchange_strong(inverse, true)) {
        consumer->notifyStreamReady(vb_);
    }
}
1711
// Entry point for a message arriving from the producer. Validates sequence
// ordering, then either processes the message immediately (if the
// replication throttle allows and nothing is already buffered) or buffers it
// for later processing by processBufferedMessages().
// Returns:
//   ENGINE_EINVAL      - null message
//   ENGINE_KEY_ENOENT  - stream is no longer active
//   ENGINE_ERANGE      - out-of-sequence message / bad snapshot marker
//   ENGINE_TMPFAIL     - message was buffered (or dropped if stream died)
//   other              - result of immediate processing
ENGINE_ERROR_CODE PassiveStream::messageReceived(std::unique_ptr<DcpResponse> dcpResponse) {
    if (!dcpResponse) {
        return ENGINE_EINVAL;
    }

    if (!isActive()) {
        return ENGINE_KEY_ENOENT;
    }

    // Seqno-carrying messages must arrive in strictly increasing order.
    auto seqno = dcpResponse->getBySeqno();
    if (seqno) {
        if (uint64_t(*seqno) <= last_seqno.load()) {
            consumer->getLogger().log(EXTENSION_LOG_WARNING,
                "(vb %d) Erroneous (out of sequence) message (%s) received, "
                "with opaque: %" PRIu32 ", its seqno (%" PRIu64 ") is not "
                "greater than last received seqno (%" PRIu64 "); "
                "Dropping mutation!",
                vb_, dcpResponse->to_string(), opaque_, *seqno, last_seqno.load());
            return ENGINE_ERANGE;
        }
        last_seqno.store(*seqno);
    } else if(dcpResponse->getEvent() == DcpResponse::Event::SnapshotMarker) {
        // Snapshot markers carry no seqno; sanity-check their range against
        // the last received seqno instead.
        auto s = static_cast<SnapshotMarker*>(dcpResponse.get());
        uint64_t snapStart = s->getStartSeqno();
        uint64_t snapEnd = s->getEndSeqno();
        if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
            consumer->getLogger().log(EXTENSION_LOG_WARNING,
                "(vb %d) Erroneous snapshot marker received, with "
                "opaque: %" PRIu32 ", its start "
                "(%" PRIu64 "), and end (%" PRIu64 ") are less than last "
                "received seqno (%" PRIu64 "); Dropping marker!",
                vb_, opaque_, snapStart, snapEnd, last_seqno.load());
            return ENGINE_ERANGE;
        }
    }

    // Fast path: process inline only if the throttle allows it AND nothing
    // is buffered (preserving FIFO order with already-buffered messages).
    if (engine->getReplicationThrottle().shouldProcess() && buffer.empty()) {
        /* Process the response here itself rather than buffering it */
        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
        switch (dcpResponse->getEvent()) {
            case DcpResponse::Event::Mutation:
                ret = processMutation(static_cast<MutationResponse*>(dcpResponse.get()));
                break;
            case DcpResponse::Event::Deletion:
            case DcpResponse::Event::Expiration:
                ret = processDeletion(static_cast<MutationResponse*>(dcpResponse.get()));
                break;
            case DcpResponse::Event::SnapshotMarker:
                processMarker(static_cast<SnapshotMarker*>(dcpResponse.get()));
                break;
            case DcpResponse::Event::SetVbucket:
                processSetVBucketState(static_cast<SetVBucketState*>(dcpResponse.get()));
                break;
            case DcpResponse::Event::StreamEnd:
                {
                    LockHolder lh(streamMutex);
                    transitionState(StreamState::Dead);
                }
                break;
            case DcpResponse::Event::SystemEvent: {
                    ret = processSystemEvent(*static_cast<SystemEventMessage*>(
                            dcpResponse.get()));
                    break;
                }
            default:
                // Unknown message type: drop the connection.
                consumer->getLogger().log(
                        EXTENSION_LOG_WARNING,
                        "(vb %d) Unknown event:%d, opaque:%" PRIu32,
                        vb_,
                        int(dcpResponse->getEvent()),
                        opaque_);
                return ENGINE_DISCONNECT;
        }
        // TMPFAIL/ENOMEM mean "couldn't process right now" - fall through
        // and buffer the message for a retry; anything else is final.
        if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
            return ret;
        }
    }

    // Only buffer if the stream is not dead
    if (isActive()) {
        buffer.push(std::move(dcpResponse));
    }
    return ENGINE_TMPFAIL;
}
1796
// Drain up to batchSize messages from the stream's buffer, processing each
// one. On a temporary failure (TMPFAIL/ENOMEM) the failing message is pushed
// back to the front of the buffer and cannot_process is returned so the
// caller retries later. processed_bytes is set to the number of bytes
// successfully consumed from the buffer.
process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
                                                             size_t batchSize) {
    std::unique_lock<std::mutex> lh(buffer.bufMutex);
    uint32_t count = 0;
    uint32_t message_bytes = 0;
    uint32_t total_bytes_processed = 0;
    bool failed = false;
    while (count < batchSize && !buffer.messages.empty()) {
        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
        /* If the stream is in dead state we should not process any remaining
           items in the buffer, we should rather clear them */
        if (!isActive()) {
            total_bytes_processed += clearBuffer_UNLOCKED();
            processed_bytes = total_bytes_processed;
            return all_processed;
        }

        std::unique_ptr<DcpResponse> response = buffer.pop_front(lh);

        // Release bufMutex whilst we attempt to process the message
        // a lock inversion exists with connManager if we hold this.
        lh.unlock();

        message_bytes = response->getMessageSize();

        // Dispatch on message type; only mutation/deletion/system-event
        // handlers can report a (possibly temporary) failure via 'ret'.
        switch (response->getEvent()) {
            case DcpResponse::Event::Mutation:
                ret = processMutation(static_cast<MutationResponse*>(response.get()));
                break;
            case DcpResponse::Event::Deletion:
            case DcpResponse::Event::Expiration:
                ret = processDeletion(static_cast<MutationResponse*>(response.get()));
                break;
            case DcpResponse::Event::SnapshotMarker:
                processMarker(static_cast<SnapshotMarker*>(response.get()));
                break;
            case DcpResponse::Event::SetVbucket:
                processSetVBucketState(static_cast<SetVBucketState*>(response.get()));
                break;
            case DcpResponse::Event::StreamEnd:
                {
                    LockHolder lh(streamMutex);
                    transitionState(StreamState::Dead);
                }
                break;
            case DcpResponse::Event::SystemEvent: {
                    ret = processSystemEvent(
                            *static_cast<SystemEventMessage*>(response.get()));
                    break;
                }
            default:
                // Unknown message type: skip it. NOTE: 'continue' re-tests
                // the while condition with bufMutex NOT held; it is
                // re-acquired below only on the normal path.
                consumer->getLogger().log(EXTENSION_LOG_WARNING,
                                          "PassiveStream::processBufferedMessages:"
                                          "(vb %" PRIu16 ") PassiveStream ignoring "
                                          "unknown message type %s",
                                          vb_,
                                          response->to_string());
                continue;
        }

        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
            failed = true;
        }

        // Re-acquire bufMutex so that
        // 1) we can update the buffer
        // 2) safely re-check the while conditional statement
        lh.lock();

        // If we failed and the stream is not dead, stash the DcpResponse at the
        // front of the queue and break the loop.
        if (failed && isActive()) {
            buffer.push_front(std::move(response), lh);
            break;
        }

        count++;
        // ERANGE (out-of-snapshot-range message) does not count towards the
        // bytes "processed" from the buffer.
        if (ret != ENGINE_ERANGE) {
            total_bytes_processed += message_bytes;
        }
    }

    processed_bytes = total_bytes_processed;

    if (failed) {
        return cannot_process;
    }

    return all_processed;
}
1887
// Apply a received mutation to the vbucket. The mutation's seqno must fall
// within the current snapshot range. Returns ENGINE_ERANGE for
// out-of-snapshot mutations, ENGINE_NOT_MY_VBUCKET if the vbucket is gone,
// otherwise the result of the KVBucket store operation.
ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
    VBucketPtr vb = engine->getVBucket(vb_);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Reject mutations outside [cur_snapshot_start, cur_snapshot_end].
    if (uint64_t(*mutation->getBySeqno()) < cur_snapshot_start.load() ||
        uint64_t(*mutation->getBySeqno()) > cur_snapshot_end.load()) {
        consumer->getLogger().log(EXTENSION_LOG_WARNING,
            "(vb %d) Erroneous mutation [sequence "
            "number does not fall in the expected snapshot range : "
            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
            "snapshot_end (%" PRIu64 ")]; Dropping the mutation!",
            vb_, cur_snapshot_start.load(),
            *mutation->getBySeqno(), cur_snapshot_end.load());
        return ENGINE_ERANGE;
    }

    // MB-17517: Check for the incoming item's CAS validity. We /shouldn't/
    // receive anything without a valid CAS, however given that versions without
    // this check may send us "bad" CAS values, we should regenerate them (which
    // is better than rejecting the data entirely).
    if (!Item::isValidCas(mutation->getItem()->getCas())) {
        LOG(EXTENSION_LOG_WARNING,
            "%s Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
            ", seqno:%" PRId64 "}. Regenerating new CAS",
            consumer->logHeader(),
            mutation->getItem()->getCas(), vb_,
            mutation->getItem()->getBySeqno());
        mutation->getItem()->setCas();
    }

    // In backfill phase items are added directly; otherwise use setWithMeta,
    // preserving the producer-assigned seqno and CAS (GenerateBySeqno::No,
    // GenerateCas::No).
    ENGINE_ERROR_CODE ret;
    if (vb->isBackfillPhase()) {
        ret = engine->getKVBucket()->addBackfillItem(
                *mutation->getItem(),
                GenerateBySeqno::No,
                mutation->getExtMetaData());
    } else {
        ret = engine->getKVBucket()->setWithMeta(*mutation->getItem(), 0,
                                                 NULL,
                                                 consumer->getCookie(),
                                                 true, true,
                                                 GenerateBySeqno::No,
                                                 GenerateCas::No,
                                                 mutation->getExtMetaData(),
                                                 true);
    }

    if (ret != ENGINE_SUCCESS) {
        consumer->getLogger().log(EXTENSION_LOG_WARNING,
            "Got an error code %d while trying to process mutation", ret);
    } else {
        // On success, check whether this seqno completes the snapshot.
        handleSnapshotEnd(vb, *mutation->getBySeqno());
    }

    return ret;
}
1946
// Apply a received deletion (or expiration) to the vbucket. Mirrors
// processMutation: seqno must fall in the current snapshot range; CAS is
// regenerated if invalid; ENGINE_KEY_ENOENT (already deleted) is treated
// as success.
ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
    VBucketPtr vb = engine->getVBucket(vb_);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Reject deletions outside [cur_snapshot_start, cur_snapshot_end].
    if (uint64_t(*deletion->getBySeqno()) < cur_snapshot_start.load() ||
        uint64_t(*deletion->getBySeqno()) > cur_snapshot_end.load()) {
        consumer->getLogger().log(EXTENSION_LOG_WARNING,
            "(vb %d) Erroneous deletion [sequence "
            "number does not fall in the expected snapshot range : "
            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
            "snapshot_end (%" PRIu64 ")]; Dropping the deletion!",
            vb_, cur_snapshot_start.load(),
            *deletion->getBySeqno(), cur_snapshot_end.load());
        return ENGINE_ERANGE;
    }

    uint64_t delCas = 0;
    ENGINE_ERROR_CODE ret;
    ItemMetaData meta = deletion->getItem()->getMetaData();

    // MB-17517: Check for the incoming item's CAS validity.
    if (!Item::isValidCas(meta.cas)) {
        LOG(EXTENSION_LOG_WARNING,
            "%s Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
            ", seqno:%" PRId64 "}. Regenerating new CAS",
            consumer->logHeader(), meta.cas, vb_, *deletion->getBySeqno());
        meta.cas = Item::nextCas();
    }

    // Delete preserving the producer-assigned seqno/CAS.
    ret = engine->getKVBucket()->deleteWithMeta(deletion->getItem()->getKey(),
                                                delCas,
                                                nullptr,
                                                deletion->getVBucket(),
                                                consumer->getCookie(),
                                                true,
                                                meta,
                                                vb->isBackfillPhase(),
                                                GenerateBySeqno::No,
                                                GenerateCas::No,
                                                *deletion->getBySeqno(),
                                                deletion->getExtMetaData(),
                                                true);
    // The item may already be gone on the replica; that is not an error.
    if (ret == ENGINE_KEY_ENOENT) {
        ret = ENGINE_SUCCESS;
    }

    if (ret != ENGINE_SUCCESS) {
        consumer->getLogger().log(EXTENSION_LOG_WARNING,
            "Got an error code %d while trying to process deletion", ret);
    } else {
        // On success, check whether this seqno completes the snapshot.
        handleSnapshotEnd(vb, *deletion->getBySeqno());
    }

    return ret;
}
2004
2005 ENGINE_ERROR_CODE PassiveStream::processSystemEvent(
2006         const SystemEventMessage& event) {
2007     VBucketPtr vb = engine->getVBucket(vb_);
2008
2009     if (!vb) {
2010         return ENGINE_NOT_MY_VBUCKET;
2011     }
2012
2013     ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
2014     // Depending on the event, extras is different and key may even be empty
2015     // The specific handler will know how to interpret.
2016     switch (event.getSystemEvent()) {
2017     case SystemEvent::CreateCollection: {
2018         rv = processCreateCollection(*vb, {event});
2019         break;
2020     }
2021     case SystemEvent::BeginDeleteCollection: {
2022         rv = processBeginDeleteCollection(*vb, {event});
2023         break;
2024     }
2025     case SystemEvent::CollectionsSeparatorChanged: {
2026         rv = processSeparatorChanged(*vb, {event});
2027         break;
2028     }
2029     case SystemEvent::DeleteCollectionSoft:
2030     case SystemEvent::DeleteCollectionHard: {
2031         rv = ENGINE_EINVAL; // Producer won't send
2032         break;
2033     }
2034     }
2035
2036     if (rv != ENGINE_SUCCESS) {
2037         consumer->getLogger().log(EXTENSION_LOG_WARNING,
2038             "Got an error code %d while trying to process system event", rv);
2039     } else {
2040         handleSnapshotEnd(vb, *event.getBySeqno());
2041     }
2042
2043     return rv;
2044 }
2045
2046 ENGINE_ERROR_CODE PassiveStream::processCreateCollection(
2047         VBucket& vb, const CollectionsEvent& event) {
2048     try {
2049         vb.replicaAddCollection(event.getKey(),
2050                                 event.getRevision(),
2051                                 event.getBySeqno());
2052     } catch (std::exception& e) {
2053         LOG(EXTENSION_LOG_WARNING,
2054             "PassiveStream::processCreateCollection exception %s",
2055             e.what());
2056         return ENGINE_EINVAL;
2057     }
2058     return ENGINE_SUCCESS;
2059 }
2060
2061 ENGINE_ERROR_CODE PassiveStream::processBeginDeleteCollection(
2062         VBucket& vb, const CollectionsEvent& event) {
2063     try {
2064         vb.replicaBeginDeleteCollection(event.getKey(),
2065                                         event.getRevision(),
2066                                         event.getBySeqno());
2067     } catch (std::exception& e) {
2068         LOG(EXTENSION_LOG_WARNING,
2069             "PassiveStream::processBeginDeleteCollection exception %s",
2070             e.what());
2071         return ENGINE_EINVAL;
2072     }
2073     return ENGINE_SUCCESS;
2074 }
2075
2076 ENGINE_ERROR_CODE PassiveStream::processSeparatorChanged(
2077         VBucket& vb, const CollectionsEvent& event) {
2078     try {
2079         vb.replicaChangeCollectionSeparator(
2080                 event.getKey(), event.getRevision(), event.getBySeqno());
2081     } catch (std::exception& e) {
2082         LOG(EXTENSION_LOG_WARNING,
2083             "PassiveStream::processSeparatorChanged exception %s",
2084             e.what());
2085         return ENGINE_EINVAL;
2086     }
2087     return ENGINE_SUCCESS;
2088 }
2089
// Record the start of a new snapshot and prepare the vbucket's checkpoint
// manager to receive it.
void PassiveStream::processMarker(SnapshotMarker* marker) {
    VBucketPtr vb = engine->getVBucket(vb_);

    // Track the announced snapshot range/type; subsequent mutations and
    // deletions are validated against this range.
    cur_snapshot_start.store(marker->getStartSeqno());
    cur_snapshot_end.store(marker->getEndSeqno());
    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ?
            Snapshot::Disk : Snapshot::Memory);

    if (vb) {
        // A disk snapshot into an empty vbucket means a backfill.
        if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
            vb->setBackfillPhase(true);
            // calling checkpointManager.setBackfillPhase sets the
            // openCheckpointId to zero
            vb->checkpointManager.setBackfillPhase(cur_snapshot_start.load(),
                                                   cur_snapshot_end.load());
        } else {
            // Either open a fresh checkpoint (when the producer asks via
            // MARKER_FLAG_CHK, or when still at the backfill checkpoint id 0)
            // or extend the current snapshot to the new end seqno.
            if (marker->getFlags() & MARKER_FLAG_CHK ||
                    vb->checkpointManager.getOpenCheckpointId() == 0) {
                vb->checkpointManager.createSnapshot(cur_snapshot_start.load(),
                                                     cur_snapshot_end.load());
            } else {
                vb->checkpointManager.updateCurrentSnapshotEnd(cur_snapshot_end.load());
            }
            vb->setBackfillPhase(false);
        }

        // Remember that the producer wants an ack once this snapshot is
        // fully received (sent from handleSnapshotEnd).
        if (marker->getFlags() & MARKER_FLAG_ACK) {
            cur_snapshot_ack = true;
        }
    }
}
2121
2122 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
2123     engine->getKVBucket()->setVBucketState(vb_, state->getState(), true);
2124     {
2125         LockHolder lh (streamMutex);
2126         pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
2127     }
2128     bool inverse = false;
2129     if (itemsReady.compare_exchange_strong(inverse, true)) {
2130         consumer->notifyStreamReady(vb_);
2131     }
2132 }
2133
// Called after each successfully-processed seqno; performs end-of-snapshot
// bookkeeping once byseqno reaches the announced snapshot end.
void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
    if (byseqno == cur_snapshot_end.load()) {
        // A completed disk/backfill snapshot ends the backfill phase and
        // starts a fresh checkpoint.
        if (cur_snapshot_type.load() == Snapshot::Disk &&
                vb->isBackfillPhase()) {
            vb->setBackfillPhase(false);
            uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
            vb->checkpointManager.checkAndAddNewCheckpoint(id, *vb);
        } else {
            size_t mem_threshold = engine->getEpStats().mem_high_wat.load();
            size_t mem_used = engine->getEpStats().getTotalMemoryUsed();
            /* We want to add a new replica checkpoint if the mem usage is above
               high watermark (85%) */
            if (mem_threshold < mem_used) {
                uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
                vb->checkpointManager.checkAndAddNewCheckpoint(id, *vb);
            }
        }

        // If the producer requested an ack for this snapshot (MARKER_FLAG_ACK
        // seen in processMarker), queue a SnapshotMarkerResponse and notify.
        if (cur_snapshot_ack) {
            {
                LockHolder lh(streamMutex);
                pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
            }
            bool inverse = false;
            if (itemsReady.compare_exchange_strong(inverse, true)) {
                consumer->notifyStreamReady(vb_);
            }
            cur_snapshot_ack = false;
        }
        // Snapshot fully consumed; reset the type until the next marker.
        cur_snapshot_type.store(Snapshot::None);
    }
}
2166
// Emit this stream's statistics (buffer occupancy, ready state, seqnos and
// current snapshot details) on top of the base Stream stats.
void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
    Stream::addStats(add_stat, c);

    try {
        const int bsize = 1024;
        char buf[bsize];
        size_t bufferItems = 0;
        size_t bufferBytes = 0;
        {
            // Snapshot the buffer counters under bufMutex, then release it
            // before calling out to the stats callbacks.
            std::lock_guard<std::mutex> lg(buffer.bufMutex);
            bufferItems = buffer.messages.size();
            bufferBytes = buffer.bytes;
        }
        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(),
                         vb_);
        add_casted_stat(buf, bufferItems, add_stat, c);
        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(),
                         vb_);
        add_casted_stat(buf, bufferBytes, add_stat, c);
        checked_snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(),
                         vb_);
        add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
        checked_snprintf(buf, bsize, "%s:stream_%d_last_received_seqno",
                         name_.c_str(), vb_);
        add_casted_stat(buf, last_seqno.load(), add_stat, c);
        checked_snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory",
                         name_.c_str(), vb_);
        add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);

        checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type",
                         name_.c_str(), vb_);
        add_casted_stat(buf, ::to_string(cur_snapshot_type.load()),
                        add_stat, c);

        // Snapshot start/end are only meaningful while a snapshot is open.
        if (cur_snapshot_type.load() != Snapshot::None) {
            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start",
                             name_.c_str(), vb_);
            add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end",
                             name_.c_str(), vb_);
            add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
        }
    } catch (std::exception& error) {
        // checked_snprintf throws on truncation/encoding errors; degrade to
        // a warning rather than failing the whole stats request.
        LOG(EXTENSION_LOG_WARNING,
            "PassiveStream::addStats: Failed to build stats: %s", error.what());
    }
}
2214
2215 DcpResponse* PassiveStream::next() {
2216     LockHolder lh(streamMutex);
2217
2218     if (readyQ.empty()) {
2219         itemsReady.store(false);
2220         return NULL;
2221     }
2222
2223     DcpResponse* response = readyQ.front();
2224     popFromReadyQ();
2225     return response;
2226 }
2227
2228 uint32_t PassiveStream::clearBuffer_UNLOCKED() {
2229     uint32_t unackedBytes = buffer.bytes;
2230     buffer.messages.clear();
2231     buffer.bytes = 0;
2232     return unackedBytes;
2233 }
2234
2235 bool PassiveStream::transitionState(StreamState newState) {
2236     consumer->getLogger().log(EXTENSION_LOG_DEBUG,
2237         "(vb %d) Transitioning from %s to %s",
2238         vb_, to_string(state_.load()).c_str(), to_string(newState).c_str());
2239
2240     if (state_ == newState) {
2241         return false;
2242     }
2243
2244     bool validTransition = false;
2245     switch (state_.load()) {
2246         case StreamState::Pending:
2247             if (newState == StreamState::Reading ||
2248                     newState == StreamState::Dead) {
2249                 validTransition = true;
2250             }
2251             break;
2252
2253         case StreamState::Backfilling:
2254         case StreamState::InMemory:
2255         case StreamState::TakeoverSend:
2256         case StreamState::TakeoverWait:
2257             // Not valid for passive streams
2258             break;
2259
2260         case StreamState::Reading:
2261             if (newState == StreamState::Pending ||
2262                     newState == StreamState::Dead) {
2263                 validTransition = true;
2264             }
2265             break;
2266
2267         case StreamState::Dead:
2268             // Once 'dead' shouldn't transition away from it.
2269             break;
2270     }
2271
2272     if (!validTransition) {
2273         throw std::invalid_argument("PassiveStream::transitionState:"
2274                 " newState (which is" + to_string(newState) +
2275                 ") is not valid for current state (which is " +
2276                 to_string(state_.load()) + ")");
2277     }
2278
2279     state_ = newState;
2280     return true;
2281 }
2282
2283 const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
2284 {
2285     switch (status) {
2286         case END_STREAM_OK:
2287             return "The stream closed as part of normal operation";
2288         case END_STREAM_CLOSED:
2289             return "The stream closed due to a close stream message";
2290         case END_STREAM_DISCONNECTED:
2291             return "The stream closed early because the conn was disconnected";
2292         case END_STREAM_STATE:
2293             return "The stream closed early because the vbucket state changed";
2294         default:
2295             break;
2296     }
2297     std::string msg("Status unknown: " + std::to_string(status) +
2298                     "; this should not have happened!");
2299     return msg.c_str();
2300 }