Merge "Merge remote-tracking branch 'couchbase/3.0.x' into sherlock" into sherlock
[ep-engine.git] / src / dcp-stream.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "kvstore.h"
23 #include "statwriter.h"
24 #include "dcp-backfill-manager.h"
25 #include "dcp-backfill.h"
26 #include "dcp-consumer.h"
27 #include "dcp-producer.h"
28 #include "dcp-response.h"
29 #include "dcp-stream.h"
30
31 static const char* snapshotTypeToString(snapshot_type_t type) {
32     static const char * const snapshotTypes[] = { "none", "disk", "memory" };
33     cb_assert(type >= none && type <= memory);
34     return snapshotTypes[type];
35 }
36
37 const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
38
39 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
40                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
41                uint64_t vb_uuid, uint64_t snap_start_seqno,
42                uint64_t snap_end_seqno)
43     : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
44       start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
45       snap_start_seqno_(snap_start_seqno),
46       snap_end_seqno_(snap_end_seqno),
47       state_(STREAM_PENDING), itemsReady(false), readyQueueMemory(0) {
48 }
49
50 Stream::~Stream() {
51     // NB: reusing the "unlocked" method without a lock because we're
52     // destructing and should not take any locks.
53     clear_UNLOCKED();
54 }
55
56 void Stream::clear_UNLOCKED() {
57     while (!readyQ.empty()) {
58         DcpResponse* resp = readyQ.front();
59         popFromReadyQ();
60         delete resp;
61     }
62 }
63
64 void Stream::pushToReadyQ(DcpResponse* resp)
65 {
66     if (resp) {
67         readyQ.push(resp);
68         readyQueueMemory += resp->getMessageSize();
69     }
70 }
71
72 void Stream::popFromReadyQ(void)
73 {
74     if (!readyQ.empty()) {
75         uint32_t respSize = readyQ.front()->getMessageSize();
76         readyQ.pop();
77         /* Decrement the readyQ size */
78         if ((readyQueueMemory - respSize) <= readyQueueMemory) {
79             readyQueueMemory -= respSize;
80         } else {
81             LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
82                 "underflow, likely wrong stat calculation! curr size: %llu;"
83                 "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory,
84                 respSize);
85             readyQueueMemory = 0;
86         }
87     }
88 }
89
90 uint64_t Stream::getReadyQueueMemory() {
91     return readyQueueMemory;
92 }
93
94 const char * Stream::stateName(stream_state_t st) const {
95     static const char * const stateNames[] = {
96         "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
97         "reading", "dead"
98     };
99     cb_assert(st >= STREAM_PENDING && st <= STREAM_DEAD);
100     return stateNames[st];
101 }
102
103 void Stream::addStats(ADD_STAT add_stat, const void *c) {
104     const int bsize = 1024;
105     char buffer[bsize];
106     snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_);
107     add_casted_stat(buffer, flags_, add_stat, c);
108     snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_);
109     add_casted_stat(buffer, opaque_, add_stat, c);
110     snprintf(buffer, bsize, "%s:stream_%d_start_seqno", name_.c_str(), vb_);
111     add_casted_stat(buffer, start_seqno_, add_stat, c);
112     snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(), vb_);
113     add_casted_stat(buffer, end_seqno_, add_stat, c);
114     snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(), vb_);
115     add_casted_stat(buffer, vb_uuid_, add_stat, c);
116     snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno", name_.c_str(), vb_);
117     add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
118     snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno", name_.c_str(), vb_);
119     add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
120     snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_);
121     add_casted_stat(buffer, stateName(state_), add_stat, c);
122 }
123
124 ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
125                            const std::string &n, uint32_t flags,
126                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
127                            uint64_t en_seqno, uint64_t vb_uuid,
128                            uint64_t snap_start_seqno, uint64_t snap_end_seqno)
129     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
130               snap_start_seqno, snap_end_seqno),
131        lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
132        takeoverState(vbucket_state_pending), backfillRemaining(0),
133        itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
134        engine(e), producer(p), isBackfillTaskRunning(false),
135        lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
136
137     const char* type = "";
138     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
139         type = "takeover ";
140         end_seqno_ = dcpMaxSeqno;
141     }
142
143     RCPtr<VBucket> vbucket = engine->getVBucket(vb);
144     if (vbucket) {
145         ReaderLockHolder rlh(vbucket->getStateLock());
146         if (vbucket->getState() == vbucket_state_replica) {
147             snapshot_info_t info = vbucket->checkpointManager.getSnapshotInfo();
148             if (info.range.end > en_seqno) {
149                 end_seqno_ = info.range.end;
150             }
151         }
152     }
153
154     LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Creating %sstream "
155         "with start seqno %" PRIu64 " and end seqno %" PRIu64"",
156         producer->logHeader(), vb, type, st_seqno, en_seqno);
157
158     backfillItems.memory = 0;
159     backfillItems.disk = 0;
160     backfillItems.sent = 0;
161
162     type_ = STREAM_ACTIVE;
163
164     bufferedBackfill.bytes = 0;
165     bufferedBackfill.items = 0;
166
167     if (start_seqno_ >= end_seqno_) {
168         endStream(END_STREAM_OK);
169         itemsReady.store(true);
170     } else {
171         LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") %sstream created!",
172             producer->logHeader(), vb, type);
173     }
174 }
175
176 ActiveStream::~ActiveStream() {
177     transitionState(STREAM_DEAD);
178 }
179
180 DcpResponse* ActiveStream::next() {
181     LockHolder lh(streamMutex);
182
183     stream_state_t initState = state_;
184
185     DcpResponse* response = NULL;
186
187     switch (state_) {
188         case STREAM_PENDING:
189             break;
190         case STREAM_BACKFILLING:
191             response = backfillPhase();
192             break;
193         case STREAM_IN_MEMORY:
194             response = inMemoryPhase();
195             break;
196         case STREAM_TAKEOVER_SEND:
197             response = takeoverSendPhase();
198             break;
199         case STREAM_TAKEOVER_WAIT:
200             response = takeoverWaitPhase();
201             break;
202         case STREAM_DEAD:
203             response = deadPhase();
204             break;
205         default:
206             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
207                 producer->logHeader(), vb_, stateName(state_));
208             abort();
209     }
210
211     if (state_ != STREAM_DEAD && initState != state_ && !response) {
212         lh.unlock();
213         return next();
214     }
215
216     itemsReady.store(response ? true : false);
217     return response;
218 }
219
220 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
221     LockHolder lh(streamMutex);
222     uint64_t chkCursorSeqno = endSeqno;
223
224     if (state_ != STREAM_BACKFILLING) {
225         return;
226     }
227
228     startSeqno = std::min(snap_start_seqno_, startSeqno);
229     firstMarkerSent = true;
230
231     RCPtr<VBucket> vb = engine->getVBucket(vb_);
232     // An atomic read of vbucket state without acquiring the
233     // reader lock for state should suffice here.
234     if (vb && vb->getState() == vbucket_state_replica) {
235         if (end_seqno_ > endSeqno) {
236             /* We possibly have items in the open checkpoint
237                (incomplete snapshot) */
238             snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
239             LOG(EXTENSION_LOG_WARNING,
240                 "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
241                 "replica vbucket, backfill start seqno %" PRIu64 ", "
242                 "backfill end seqno %" PRIu64 ", "
243                 "snapshot end seqno after merge %" PRIu64,
244                 vb_, startSeqno, endSeqno, info.range.end);
245             endSeqno = info.range.end;
246         }
247     }
248
249     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
250         "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
251         endSeqno);
252     pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
253                                     MARKER_FLAG_DISK));
254     lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
255
256     if (!vb) {
257         endStream(END_STREAM_STATE);
258     } else {
259         // Only re-register the cursor if we still need to get memory snapshots
260         CursorRegResult result =
261             vb->checkpointManager.registerCursorBySeqno(name_, chkCursorSeqno);
262         curChkSeqno = result.first;
263     }
264
265     lh.unlock();
266     bool inverse = false;
267     if (itemsReady.compare_exchange_strong(inverse, true)) {
268         producer->notifyStreamReady(vb_, false);
269     }
270 }
271
272 bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source) {
273     LockHolder lh(streamMutex);
274     if (state_ == STREAM_BACKFILLING) {
275         if (!producer->getBackfillManager()->bytesRead(itm->size())) {
276             delete itm;
277             return false;
278         }
279
280         bufferedBackfill.bytes.fetch_add(itm->size());
281         bufferedBackfill.items++;
282
283         pushToReadyQ(new MutationResponse(itm, opaque_,
284                           prepareExtendedMetaData(itm->getVBucketId(),
285                                                   itm->getConflictResMode())));
286         lastReadSeqno = itm->getBySeqno();
287         lh.unlock();
288         bool inverse = false;
289         if (itemsReady.compare_exchange_strong(inverse, true)) {
290             producer->notifyStreamReady(vb_, false);
291         }
292
293         if (backfill_source == BACKFILL_FROM_MEMORY) {
294             backfillItems.memory++;
295         } else {
296             backfillItems.disk++;
297         }
298     } else {
299         delete itm;
300     }
301
302     return true;
303 }
304
305 void ActiveStream::completeBackfill() {
306     {
307         LockHolder lh(streamMutex);
308         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
309             " from disk %d from memory, last seqno read: %llu",
310             producer->logHeader(), vb_, backfillItems.disk.load(),
311             backfillItems.memory.load(), lastReadSeqno);
312     }
313
314     isBackfillTaskRunning.store(false);
315     bool inverse = false;
316     if (itemsReady.compare_exchange_strong(inverse, true)) {
317         producer->notifyStreamReady(vb_, false);
318     }
319 }
320
321 void ActiveStream::snapshotMarkerAckReceived() {
322     bool inverse = false;
323     if (--waitForSnapshot == 0 &&
324         itemsReady.compare_exchange_strong(inverse, true)) {
325         producer->notifyStreamReady(vb_, true);
326     }
327 }
328
329 void ActiveStream::setVBucketStateAckRecieved() {
330     LockHolder lh(streamMutex);
331     if (state_ == STREAM_TAKEOVER_WAIT) {
332         if (takeoverState == vbucket_state_pending) {
333             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
334                 "vbucket state to pending message", producer->logHeader(), vb_);
335
336             takeoverState = vbucket_state_active;
337             transitionState(STREAM_TAKEOVER_SEND);
338             lh.unlock();
339
340             engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
341                                                   false, false);
342             RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
343             LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Vbucket marked as "
344                 "dead, last sent seqno: %" PRIu64 ", high seqno: %" PRIu64 "",
345                 producer->logHeader(), vb_, lastSentSeqno,
346                 vbucket->getHighSeqno());
347         } else {
348             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
349                 "vbucket state to active message", producer->logHeader(), vb_);
350             endStream(END_STREAM_OK);
351             lh.unlock();
352         }
353
354         bool inverse = false;
355         if (itemsReady.compare_exchange_strong(inverse, true)) {
356             producer->notifyStreamReady(vb_, true);
357         }
358     } else {
359         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
360             "op on stream '%s' state '%s'", producer->logHeader(), vb_,
361             name_.c_str(), stateName(state_));
362     }
363
364 }
365
366 DcpResponse* ActiveStream::backfillPhase() {
367     DcpResponse* resp = nextQueuedItem();
368
369     if (resp && (resp->getEvent() == DCP_MUTATION ||
370          resp->getEvent() == DCP_DELETION ||
371          resp->getEvent() == DCP_EXPIRATION)) {
372         MutationResponse* m = static_cast<MutationResponse*>(resp);
373         producer->getBackfillManager()->bytesSent(m->getItem()->size());
374         bufferedBackfill.bytes.fetch_sub(m->getItem()->size());
375         bufferedBackfill.items--;
376         if (backfillRemaining > 0) {
377             backfillRemaining--;
378         }
379     }
380
381     if (!isBackfillTaskRunning && readyQ.empty()) {
382         backfillRemaining = 0;
383         if (lastReadSeqno >= end_seqno_) {
384             endStream(END_STREAM_OK);
385         } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
386             transitionState(STREAM_TAKEOVER_SEND);
387         } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
388             endStream(END_STREAM_OK);
389         } else {
390             transitionState(STREAM_IN_MEMORY);
391         }
392
393         if (!resp) {
394             resp = nextQueuedItem();
395         }
396     }
397
398     return resp;
399 }
400
401 DcpResponse* ActiveStream::inMemoryPhase() {
402     if (lastSentSeqno >= end_seqno_) {
403         endStream(END_STREAM_OK);
404     } else if (readyQ.empty()) {
405         if (nextCheckpointItem()) {
406             return NULL;
407         }
408     }
409
410     return nextQueuedItem();
411 }
412
413 DcpResponse* ActiveStream::takeoverSendPhase() {
414     if (!readyQ.empty()) {
415         return nextQueuedItem();
416     } else {
417         if (nextCheckpointItem()) {
418             return NULL;
419         }
420     }
421
422     if (waitForSnapshot != 0) {
423         return NULL;
424     }
425     DcpResponse* resp = NULL;
426     if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
427         resp = new SetVBucketState(opaque_, vb_, takeoverState);
428         transitionState(STREAM_TAKEOVER_WAIT);
429     }
430     return resp;
431 }
432
433 DcpResponse* ActiveStream::takeoverWaitPhase() {
434     return nextQueuedItem();
435 }
436
437 DcpResponse* ActiveStream::deadPhase() {
438     return nextQueuedItem();
439 }
440
441 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
442     Stream::addStats(add_stat, c);
443
444     const int bsize = 1024;
445     char buffer[bsize];
446     snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
447              name_.c_str(), vb_);
448     add_casted_stat(buffer, backfillItems.disk, add_stat, c);
449     snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
450              name_.c_str(), vb_);
451     add_casted_stat(buffer, backfillItems.memory, add_stat, c);
452     snprintf(buffer, bsize, "%s:stream_%d_backfill_sent", name_.c_str(), vb_);
453     add_casted_stat(buffer, backfillItems.sent, add_stat, c);
454     snprintf(buffer, bsize, "%s:stream_%d_memory_phase", name_.c_str(), vb_);
455     add_casted_stat(buffer, itemsFromMemoryPhase, add_stat, c);
456     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
457     add_casted_stat(buffer, lastSentSeqno, add_stat, c);
458     snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
459              name_.c_str(), vb_);
460     add_casted_stat(buffer,
461                     lastSentSnapEndSeqno.load(std::memory_order_relaxed),
462                     add_stat, c);
463     snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
464     add_casted_stat(buffer, lastReadSeqno, add_stat, c);
465     snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
466     add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
467     snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
468     add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat, c);
469     snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes", name_.c_str(), vb_);
470     add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
471     snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items", name_.c_str(), vb_);
472     add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
473 }
474
475 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
476     LockHolder lh(streamMutex);
477
478     RCPtr<VBucket> vb = engine->getVBucket(vb_);
479     add_casted_stat("name", name_, add_stat, cookie);
480     if (!vb || state_ == STREAM_DEAD) {
481         add_casted_stat("status", "completed", add_stat, cookie);
482         add_casted_stat("estimate", 0, add_stat, cookie);
483         add_casted_stat("backfillRemaining", 0, add_stat, cookie);
484         add_casted_stat("estimate", 0, add_stat, cookie);
485         return;
486     }
487
488     size_t total = backfillRemaining;
489     if (state_ == STREAM_BACKFILLING) {
490         add_casted_stat("status", "backfilling", add_stat, cookie);
491     } else {
492         add_casted_stat("status", "in-memory", add_stat, cookie);
493     }
494     add_casted_stat("backfillRemaining", backfillRemaining, add_stat, cookie);
495
496     item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
497     size_t vb_items = vb->getNumItems(iep);
498     size_t chk_items = vb_items > 0 ?
499                 vb->checkpointManager.getNumItemsForCursor(name_) : 0;
500     size_t del_items = engine->getEpStore()->getRWUnderlying(vb_)->
501                                                     getNumPersistedDeletes(vb_);
502
503     if (end_seqno_ < curChkSeqno) {
504         chk_items = 0;
505     } else if ((end_seqno_ - curChkSeqno) < chk_items) {
506         chk_items = end_seqno_ - curChkSeqno + 1;
507     }
508     total += chk_items;
509
510     add_casted_stat("estimate", total, add_stat, cookie);
511     add_casted_stat("chk_items", chk_items, add_stat, cookie);
512     add_casted_stat("vb_items", vb_items, add_stat, cookie);
513     add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
514 }
515
516 DcpResponse* ActiveStream::nextQueuedItem() {
517     if (!readyQ.empty()) {
518         DcpResponse* response = readyQ.front();
519         if (producer->bufferLogInsert(response->getMessageSize())) {
520             if (response->getEvent() == DCP_MUTATION ||
521                 response->getEvent() == DCP_DELETION ||
522                 response->getEvent() == DCP_EXPIRATION) {
523                 lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
524
525                 if (state_ == STREAM_BACKFILLING) {
526                     backfillItems.sent++;
527                 } else {
528                     itemsFromMemoryPhase++;
529                 }
530             }
531             popFromReadyQ();
532             return response;
533         }
534     }
535     return NULL;
536 }
537
538 bool ActiveStream::nextCheckpointItem() {
539     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
540     if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
541         // schedule this stream to build the next checkpoint
542         producer->scheduleCheckpointProcessorTask(this);
543         return true;
544     } else if (chkptItemsExtractionInProgress) {
545         return true;
546     }
547     return false;
548 }
549
550 bool ActiveStreamCheckpointProcessorTask::run() {
551     if (engine->getEpStats().isShutdown) {
552         return false;
553     }
554
555     // Setup that we will sleep forever when done.
556     snooze(INT_MAX);
557
558     // Clear the notfification flag
559     notified.store(false);
560
561     size_t iterations = 0;
562     do {
563         stream_t nextStream = queuePop();
564         ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());
565
566         if (stream) {
567             stream->nextCheckpointItemTask();
568         } else {
569             break;
570         }
571         iterations++;
572     } while(!queueEmpty()
573             && iterations < iterationsBeforeYield);
574
575     // Now check if we were re-notified or there are still checkpoints
576     bool expected = true;
577     if (notified.compare_exchange_strong(expected, false)
578         || !queueEmpty()) {
579         // snooze for 0, essentially yielding and allowing other tasks a go
580         snooze(0.0);
581     }
582
583     return true;
584 }
585
586 void ActiveStreamCheckpointProcessorTask::wakeup() {
587     ExecutorPool::get()->wake(getId());
588 }
589
590 void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
591     pushUnique(stream);
592
593     bool expected = false;
594     if (notified.compare_exchange_strong(expected, true)) {
595         wakeup();
596     }
597 }
598
599 void ActiveStreamCheckpointProcessorTask::clearQueues() {
600     LockHolder lh(workQueueLock);
601     while (!queue.empty()) {
602         queue.pop();
603     }
604     queuedVbuckets.clear();
605 }
606
607 void ActiveStream::nextCheckpointItemTask() {
608     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
609     if (vbucket) {
610         std::vector<queued_item> items;
611         getOutstandingItems(vbucket, items);
612         processItems(items);
613     } else {
614         /* The entity deleting the vbucket must set stream to dead,
615            calling setDead(END_STREAM_STATE) will cause deadlock because
616            it will try to grab streamMutex which is already acquired at this
617            point here */
618         return;
619     }
620 }
621
622 void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
623                                        std::vector<queued_item> &items) {
624     // Commencing item processing - set guard flag.
625     chkptItemsExtractionInProgress.store(true);
626
627     vb->checkpointManager.getAllItemsForCursor(name_, items);
628     if (vb->checkpointManager.getNumCheckpoints() > 1) {
629         engine->getEpStore()->wakeUpCheckpointRemover();
630     }
631 }
632
633
634 void ActiveStream::processItems(std::vector<queued_item>& items) {
635     if (!items.empty()) {
636         bool mark = false;
637         if (items.front()->getOperation() == queue_op_checkpoint_start) {
638             mark = true;
639         }
640
641         std::deque<MutationResponse*> mutations;
642         std::vector<queued_item>::iterator itr = items.begin();
643         for (; itr != items.end(); ++itr) {
644             queued_item& qi = *itr;
645
646             if (qi->getOperation() == queue_op_set ||
647                 qi->getOperation() == queue_op_del) {
648                 curChkSeqno = qi->getBySeqno();
649                 lastReadSeqno = qi->getBySeqno();
650
651                 mutations.push_back(new MutationResponse(qi, opaque_,
652                             prepareExtendedMetaData(qi->getVBucketId(),
653                                                     qi->getConflictResMode())));
654             } else if (qi->getOperation() == queue_op_checkpoint_start) {
655                 cb_assert(mutations.empty());
656                 mark = true;
657             }
658         }
659
660         if (mutations.empty()) {
661             // If we only got checkpoint start or ends check to see if there are
662             // any more snapshots before pausing the stream.
663             nextCheckpointItemTask();
664         } else {
665             snapshot(mutations, mark);
666         }
667     }
668
669     // Completed item processing - clear guard flag and notify producer.
670     chkptItemsExtractionInProgress.store(false);
671     producer->notifyStreamReady(vb_, true);
672 }
673
674 void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
675     if (items.empty()) {
676         return;
677     }
678
679     LockHolder lh(streamMutex);
680
681     if (isCurrentSnapshotCompleted()) {
682         uint32_t flags = MARKER_FLAG_MEMORY;
683         uint64_t snapStart = items.front()->getBySeqno();
684         uint64_t snapEnd = items.back()->getBySeqno();
685
686         if (mark) {
687             flags |= MARKER_FLAG_CHK;
688         }
689
690         if (state_ == STREAM_TAKEOVER_SEND) {
691             waitForSnapshot++;
692             flags |= MARKER_FLAG_ACK;
693         }
694
695         if (!firstMarkerSent) {
696             snapStart = std::min(snap_start_seqno_, snapStart);
697             firstMarkerSent = true;
698         }
699         pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
700                                         flags));
701         lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
702     }
703
704     std::deque<MutationResponse*>::iterator itemItr;
705     for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
706         pushToReadyQ(*itemItr);
707     }
708 }
709
710 uint32_t ActiveStream::setDead(end_stream_status_t status) {
711     {
712         LockHolder lh(streamMutex);
713         endStream(status);
714     }
715
716     bool inverse = false;
717     if (status != END_STREAM_DISCONNECTED &&
718         itemsReady.compare_exchange_strong(inverse, true)) {
719         producer->notifyStreamReady(vb_, true);
720     }
721     return 0;
722 }
723
724 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
725     if (state_ != STREAM_DEAD) {
726         bool inverse = false;
727         if (itemsReady.compare_exchange_strong(inverse, true)) {
728             producer->notifyStreamReady(vb_, true);
729         }
730     }
731 }
732
733 void ActiveStream::endStream(end_stream_status_t reason) {
734     if (state_ != STREAM_DEAD) {
735         if (state_ == STREAM_BACKFILLING) {
736             // If Stream were in Backfilling state, clear out the
737             // backfilled items to clear up the backfill buffer.
738             clear_UNLOCKED();
739             producer->getBackfillManager()->bytesSent(bufferedBackfill.bytes);
740             bufferedBackfill.bytes = 0;
741             bufferedBackfill.items = 0;
742         }
743         if (reason != END_STREAM_DISCONNECTED) {
744             pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
745         }
746         transitionState(STREAM_DEAD);
747         LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Stream closing, "
748             "%" PRIu64 " items sent from backfill phase, %" PRIu64 " items "
749             "sent from memory phase, %" PRIu64 " was last seqno sent, "
750             "reason: %s", producer->logHeader(), vb_,
751             uint64_t(backfillItems.sent.load()),
752             uint64_t(itemsFromMemoryPhase), lastSentSeqno,
753             getEndStreamStatusStr(reason));
754     }
755 }
756
757 void ActiveStream::scheduleBackfill() {
758     if (!isBackfillTaskRunning) {
759         RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
760         if (!vbucket) {
761             return;
762         }
763
764         CursorRegResult result =
765             vbucket->checkpointManager.registerCursorBySeqno(name_,
766                                                              lastReadSeqno);
767
768         curChkSeqno = result.first;
769         bool isFirstItem = result.second;
770
771         cb_assert(lastReadSeqno <= curChkSeqno);
772         uint64_t backfillStart = lastReadSeqno + 1;
773
774         /* We need to find the minimum seqno that needs to be backfilled in
775          * order to make sure that we don't miss anything when transitioning
776          * to a memory snapshot. The backfill task will always make sure that
777          * the backfill end seqno is contained in the backfill.
778          */
779         uint64_t backfillEnd = 0;
780         if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) { // disk backfill only
781             backfillEnd = end_seqno_;
782         } else { // disk backfill + in-memory streaming
783             if (backfillStart < curChkSeqno) {
784                 if (curChkSeqno > end_seqno_) {
785                     backfillEnd = end_seqno_;
786                 } else {
787                     backfillEnd = curChkSeqno - 1;
788                 }
789             }
790         }
791
792         bool tryBackfill = isFirstItem || flags_ & DCP_ADD_STREAM_FLAG_DISKONLY;
793
794         if (backfillStart <= backfillEnd && tryBackfill) {
795             BackfillManager* backfillMgr = producer->getBackfillManager();
796             backfillMgr->schedule(this, backfillStart, backfillEnd);
797             isBackfillTaskRunning.store(true);
798         } else {
799             if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
800                 endStream(END_STREAM_OK);
801             } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
802                 transitionState(STREAM_TAKEOVER_SEND);
803             } else {
804                 transitionState(STREAM_IN_MEMORY);
805             }
806             itemsReady.store(true);
807         }
808     }
809 }
810
811 const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
812 {
813     switch (status) {
814     case END_STREAM_OK:
815         return "The stream ended due to all items being streamed";
816     case END_STREAM_CLOSED:
817         return "The stream closed early due to a close stream message";
818     case END_STREAM_STATE:
819         return "The stream closed early because the vbucket state changed";
820     case END_STREAM_DISCONNECTED:
821         return "The stream closed early because the conn was disconnected";
822     default:
823         break;
824     }
825     return "Status unknown; this should not happen";
826 }
827
/**
 * Move this active stream to newState.
 *
 * The requested transition is first validated against the current state:
 *   PENDING       -> BACKFILLING | DEAD
 *   BACKFILLING   -> IN_MEMORY | TAKEOVER_SEND | DEAD
 *   IN_MEMORY     -> BACKFILLING | DEAD
 *   TAKEOVER_SEND -> TAKEOVER_WAIT | DEAD
 *   TAKEOVER_WAIT -> TAKEOVER_SEND | DEAD
 * A disallowed target trips cb_assert. Leaving any state not listed above
 * (e.g. STREAM_DEAD) is logged and aborts the process. Requesting the
 * current state is a no-op.
 *
 * After state_ is updated, the entry action for the new state runs:
 * BACKFILLING schedules a backfill. IN_MEMORY either ends the stream (when
 * everything up to end_seqno_ has already been sent) or pulls checkpoint
 * items. TAKEOVER_SEND pulls checkpoint items. DEAD removes this stream's
 * checkpoint cursor from the vbucket, if the vbucket still exists.
 *
 * NOTE(review): no lock is taken here; presumably callers hold the stream's
 * mutex — confirm against call sites.
 */
void ActiveStream::transitionState(stream_state_t newState) {
    LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
        producer->logHeader(), vb_, stateName(state_), stateName(newState));

    // Same-state requests are silently ignored (no entry action re-runs).
    if (state_ == newState) {
        return;
    }

    // Validate the transition against the current state.
    switch (state_) {
        case STREAM_PENDING:
            cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
            break;
        case STREAM_BACKFILLING:
            cb_assert(newState == STREAM_IN_MEMORY ||
                   newState == STREAM_TAKEOVER_SEND ||
                   newState == STREAM_DEAD);
            break;
        case STREAM_IN_MEMORY:
            cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
            break;
        case STREAM_TAKEOVER_SEND:
            cb_assert(newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD);
            break;
        case STREAM_TAKEOVER_WAIT:
            cb_assert(newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD);
            break;
        default:
            // No transition out of any other state (e.g. STREAM_DEAD or
            // STREAM_READING) is permitted for an active stream.
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
                "to %s", producer->logHeader(), vb_, stateName(state_),
                stateName(newState));
            abort();
    }

    state_ = newState;

    // Run the entry action for the state just entered.
    switch (newState) {
        case STREAM_BACKFILLING:
            scheduleBackfill();
            break;
        case STREAM_IN_MEMORY:
            // Check if the producer has sent up till the last requested
            // sequence number already, if not - move checkpoint items into
            // the ready queue.
            if (lastSentSeqno >= end_seqno_) {
                // Stream transitioning to DEAD state
                endStream(END_STREAM_OK);
            } else {
                nextCheckpointItem();
            }
            break;
        case STREAM_TAKEOVER_SEND:
            nextCheckpointItem();
            break;
        case STREAM_DEAD:
            {
                // Release this stream's checkpoint cursor (keyed by name_).
                RCPtr<VBucket> vb = engine->getVBucket(vb_);
                if (vb) {
                    vb->checkpointManager.removeCursor(name_);
                }
                break;
            }
        case STREAM_TAKEOVER_WAIT:
        case STREAM_PENDING:
            break;
        case STREAM_READING:
            // Should be unreachable given the validation above; logged
            // rather than asserted.
            LOG(EXTENSION_LOG_WARNING,
                "ActiveStream::transitionState: newState can't be "
                "STREAM_READING!");
            break;
    }
}
899
900 size_t ActiveStream::getItemsRemaining() {
901     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
902
903     if (!vbucket || state_ == STREAM_DEAD) {
904         return 0;
905     }
906
907     uint64_t high_seqno = vbucket->getHighSeqno();
908
909     if (end_seqno_ < high_seqno) {
910         if (end_seqno_ > lastSentSeqno) {
911             return (end_seqno_ - lastSentSeqno);
912         }
913     } else {
914         if (high_seqno > lastSentSeqno) {
915             return (high_seqno - lastSentSeqno);
916         }
917     }
918
919     return 0;
920 }
921
922 ExtendedMetaData* ActiveStream::prepareExtendedMetaData(uint16_t vBucketId,
923                                                         uint8_t conflictResMode)
924 {
925     ExtendedMetaData *emd = NULL;
926     if (producer->isExtMetaDataEnabled()) {
927         RCPtr<VBucket> vb = engine->getVBucket(vBucketId);
928         if (vb && vb->isTimeSyncEnabled()) {
929             int64_t adjustedTime = gethrtime() + vb->getDriftCounter();
930             emd = new ExtendedMetaData(adjustedTime, conflictResMode);
931         } else {
932             emd = new ExtendedMetaData(conflictResMode);
933         }
934     }
935     return emd;
936 }
937
938 const char* ActiveStream::logHeader()
939 {
940     return producer->logHeader();
941 }
942
943 bool ActiveStream::isCurrentSnapshotCompleted() const
944 {
945     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
946     // An atomic read of vbucket state without acquiring the
947     // reader lock for state should suffice here.
948     if (vbucket && vbucket->getState() == vbucket_state_replica) {
949         if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
950             lastReadSeqno) {
951             return false;
952         }
953     }
954     return true;
955 }
956
957 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
958                                const std::string &name, uint32_t flags,
959                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
960                                uint64_t en_seqno, uint64_t vb_uuid,
961                                uint64_t snap_start_seqno,
962                                uint64_t snap_end_seqno)
963     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
964              snap_start_seqno, snap_end_seqno),
965       producer(p) {
966     LockHolder lh(streamMutex);
967     RCPtr<VBucket> vbucket = e->getVBucket(vb_);
968     if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
969         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
970         transitionState(STREAM_DEAD);
971         itemsReady.store(true);
972     }
973
974     type_ = STREAM_NOTIFIER;
975
976     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
977         "%llu and end seqno %llu", producer->logHeader(), vb, st_seqno,
978         en_seqno);
979 }
980
981 uint32_t NotifierStream::setDead(end_stream_status_t status) {
982     LockHolder lh(streamMutex);
983     if (state_ != STREAM_DEAD) {
984         transitionState(STREAM_DEAD);
985         if (status != END_STREAM_DISCONNECTED) {
986             pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
987             lh.unlock();
988             bool inverse = false;
989             if (itemsReady.compare_exchange_strong(inverse, true)) {
990                 producer->notifyStreamReady(vb_, true);
991             }
992         }
993     }
994     return 0;
995 }
996
997 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
998     LockHolder lh(streamMutex);
999     if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1000         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1001         transitionState(STREAM_DEAD);
1002         lh.unlock();
1003         bool inverse = false;
1004         if (itemsReady.compare_exchange_strong(inverse, true)) {
1005             producer->notifyStreamReady(vb_, true);
1006         }
1007     }
1008 }
1009
1010 DcpResponse* NotifierStream::next() {
1011     LockHolder lh(streamMutex);
1012
1013     if (readyQ.empty()) {
1014         itemsReady.store(false);
1015         return NULL;
1016     }
1017
1018     DcpResponse* response = readyQ.front();
1019     if (producer->bufferLogInsert(response->getMessageSize())) {
1020         popFromReadyQ();
1021     } else {
1022         response = NULL;
1023     }
1024
1025     return response;
1026 }
1027
1028 void NotifierStream::transitionState(stream_state_t newState) {
1029     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1030         producer->logHeader(), vb_, stateName(state_), stateName(newState));
1031
1032     if (state_ == newState) {
1033         return;
1034     }
1035
1036     switch (state_) {
1037         case STREAM_PENDING:
1038             cb_assert(newState == STREAM_DEAD);
1039             break;
1040         default:
1041             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1042                 "to %s", producer->logHeader(), vb_, stateName(state_),
1043                 stateName(newState));
1044             abort();
1045     }
1046
1047     state_ = newState;
1048 }
1049
1050 PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
1051                              const std::string &name, uint32_t flags,
1052                              uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1053                              uint64_t en_seqno, uint64_t vb_uuid,
1054                              uint64_t snap_start_seqno, uint64_t snap_end_seqno,
1055                              uint64_t vb_high_seqno)
1056     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1057              snap_start_seqno, snap_end_seqno),
1058       engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
1059       cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false) {
1060     LockHolder lh(streamMutex);
1061     pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
1062                                   vb_uuid, snap_start_seqno, snap_end_seqno));
1063     itemsReady.store(true);
1064     type_ = STREAM_PASSIVE;
1065
1066     const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
1067     LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Attempting to add %s stream"
1068         " with start seqno %" PRIu64 ", end seqno %" PRIu64 ","
1069         " vbucket uuid %" PRIu64 ", snap start seqno %" PRIu64 ","
1070         " snap end seqno %" PRIu64 ", and vb_high_seqno %" PRIu64 "",
1071         consumer->logHeader(), vb, type, st_seqno, en_seqno, vb_uuid,
1072         snap_start_seqno, snap_end_seqno, vb_high_seqno);
1073 }
1074
1075 PassiveStream::~PassiveStream() {
1076     uint32_t unackedBytes = clearBuffer();
1077     if (transitionState(STREAM_DEAD)) {
1078         // Destructed a "live" stream, log it.
1079         LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Destructing stream."
1080             " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
1081             consumer->logHeader(), vb_, last_seqno,
1082             unackedBytes);
1083     }
1084 }
1085
1086 uint32_t PassiveStream::setDead_UNLOCKED(end_stream_status_t status,
1087                                          LockHolder *slh) {
1088     transitionState(STREAM_DEAD);
1089     slh->unlock();
1090     uint32_t unackedBytes = clearBuffer();
1091     LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Setting stream to dead"
1092         " state, last_seqno is %" PRIu64 ", unackedBytes is %" PRIu32 ","
1093         " status is %s",
1094         consumer->logHeader(), vb_, last_seqno, unackedBytes,
1095         getEndStreamStatusStr(status));
1096     return unackedBytes;
1097 }
1098
1099 uint32_t PassiveStream::setDead(end_stream_status_t status) {
1100     LockHolder lh(streamMutex);
1101     return setDead_UNLOCKED(status, &lh);
1102 }
1103
1104 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1105     LockHolder lh(streamMutex);
1106     if (state_ == STREAM_PENDING) {
1107         if (status == ENGINE_SUCCESS) {
1108             transitionState(STREAM_READING);
1109         } else {
1110             transitionState(STREAM_DEAD);
1111         }
1112         pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1113         lh.unlock();
1114         bool inverse = false;
1115         if (itemsReady.compare_exchange_strong(inverse, true)) {
1116             consumer->notifyStreamReady(vb_);
1117         }
1118     }
1119 }
1120
1121 void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1122                                     uint32_t new_opaque,
1123                                     uint64_t start_seqno) {
1124     vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1125
1126     snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
1127     if (info.range.end == info.start) {
1128         info.range.start = info.start;
1129     }
1130
1131     snap_start_seqno_ = info.range.start;
1132     start_seqno_ = info.start;
1133     snap_end_seqno_ = info.range.end;
1134
1135     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to reconnect stream "
1136         "with opaque %u, start seq no %llu, end seq no %llu, snap start seqno "
1137         "%llu, and snap end seqno %llu", consumer->logHeader(), vb_, new_opaque,
1138         start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
1139
1140     LockHolder lh(streamMutex);
1141     last_seqno = start_seqno;
1142     pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1143                                   end_seqno_, vb_uuid_, snap_start_seqno_,
1144                                   snap_end_seqno_));
1145     lh.unlock();
1146     bool inverse = false;
1147     if (itemsReady.compare_exchange_strong(inverse, true)) {
1148         consumer->notifyStreamReady(vb_);
1149     }
1150 }
1151
1152 ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1153     if(nullptr == resp) {
1154         return ENGINE_EINVAL;
1155     }
1156
1157     LockHolder lh(buffer.bufMutex);
1158
1159     if (state_ == STREAM_DEAD) {
1160         delete resp;
1161         return ENGINE_KEY_ENOENT;
1162     }
1163
1164     switch (resp->getEvent()) {
1165         case DCP_MUTATION:
1166         case DCP_DELETION:
1167         case DCP_EXPIRATION:
1168         {
1169             MutationResponse* m = static_cast<MutationResponse*>(resp);
1170             uint64_t bySeqno = m->getBySeqno();
1171             if (bySeqno <= last_seqno) {
1172                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous (out of "
1173                     "sequence) mutation received, with opaque: %u, its "
1174                     "seqno (%llu) is not greater than last received seqno "
1175                     "(%llu); Dropping mutation!", consumer->logHeader(),
1176                     vb_, opaque_, bySeqno, last_seqno);
1177                 delete m;
1178                 return ENGINE_ERANGE;
1179             }
1180             last_seqno = bySeqno;
1181             break;
1182         }
1183         case DCP_SNAPSHOT_MARKER:
1184         {
1185             SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
1186             uint64_t snapStart = s->getStartSeqno();
1187             uint64_t snapEnd = s->getEndSeqno();
1188             if (snapStart < last_seqno && snapEnd <= last_seqno) {
1189                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous snapshot "
1190                     "marker received, with opaque: %u, its start (%llu), and"
1191                     "end (%llu) are less than last received seqno (%llu); "
1192                     "Dropping marker!", consumer->logHeader(), vb_, opaque_,
1193                     snapStart, snapEnd, last_seqno);
1194                 delete s;
1195                 return ENGINE_ERANGE;
1196             }
1197             break;
1198         }
1199         case DCP_SET_VBUCKET:
1200         case DCP_STREAM_END:
1201         {
1202             /* No validations necessary */
1203             break;
1204         }
1205         default:
1206         {
1207             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unknown DCP op received: %d;"
1208                 " Disconnecting connection..",
1209                 consumer->logHeader(), vb_, resp->getEvent());
1210             return ENGINE_DISCONNECT;
1211         }
1212     }
1213     buffer.messages.push(resp);
1214     buffer.items++;
1215     buffer.bytes += resp->getMessageSize();
1216
1217     return ENGINE_SUCCESS;
1218 }
1219
1220 process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
1221                                                              size_t batchSize) {
1222     LockHolder lh(buffer.bufMutex);
1223     uint32_t count = 0;
1224     uint32_t message_bytes = 0;
1225     uint32_t total_bytes_processed = 0;
1226     bool failed = false;
1227     if (buffer.messages.empty()) {
1228         return all_processed;
1229     }
1230
1231     while (count < batchSize && !buffer.messages.empty()) {
1232         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1233         DcpResponse *response = buffer.messages.front();
1234         message_bytes = response->getMessageSize();
1235
1236         switch (response->getEvent()) {
1237             case DCP_MUTATION:
1238                 ret = processMutation(static_cast<MutationResponse*>(response));
1239                 break;
1240             case DCP_DELETION:
1241             case DCP_EXPIRATION:
1242                 ret = processDeletion(static_cast<MutationResponse*>(response));
1243                 break;
1244             case DCP_SNAPSHOT_MARKER:
1245                 processMarker(static_cast<SnapshotMarker*>(response));
1246                 break;
1247             case DCP_SET_VBUCKET:
1248                 processSetVBucketState(static_cast<SetVBucketState*>(response));
1249                 break;
1250             case DCP_STREAM_END:
1251                 transitionState(STREAM_DEAD);
1252                 break;
1253             default:
1254                 LOG(EXTENSION_LOG_WARNING,
1255                     "PassiveStream::processBufferedMessages: "
1256                     "(vb %d) PassiveStream failing "
1257                     "unknown message type %d",
1258                     vb_, response->getEvent());
1259                 failed = true;
1260         }
1261
1262         if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1263             failed = true;
1264             break;
1265         }
1266
1267         delete response;
1268         buffer.messages.pop();
1269         buffer.items--;
1270         buffer.bytes -= message_bytes;
1271         count++;
1272         if (ret != ENGINE_ERANGE) {
1273             total_bytes_processed += message_bytes;
1274         }
1275     }
1276
1277     processed_bytes = total_bytes_processed;
1278
1279     if (failed) {
1280         return cannot_process;
1281     }
1282
1283     return all_processed;
1284 }
1285
1286 ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1287     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1288     if (!vb) {
1289         return ENGINE_NOT_MY_VBUCKET;
1290     }
1291
1292     if (mutation->getBySeqno() > cur_snapshot_end) {
1293         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous mutation [sequence "
1294             "number (%llu) greater than current snapshot end seqno (%llu)] "
1295             "being processed; Dropping the mutation!", consumer->logHeader(),
1296             vb_, mutation->getBySeqno(), cur_snapshot_end);
1297         return ENGINE_ERANGE;
1298     }
1299
1300     // MB-17517: Check for the incoming item's CAS validity. We /shouldn't/
1301     // receive anything without a valid CAS, however given that versions without
1302     // this check may send us "bad" CAS values, we should regenerate them (which
1303     // is better than rejecting the data entirely).
1304     if (!Item::isValidCas(mutation->getItem()->getCas())) {
1305         LOG(EXTENSION_LOG_WARNING,
1306             "%s Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
1307             ", seqno:%" PRId64 "}. Regenerating new CAS",
1308             consumer->logHeader(),
1309             mutation->getItem()->getCas(), vb_,
1310             mutation->getItem()->getBySeqno());
1311         mutation->getItem()->setCas();
1312     }
1313
1314     ENGINE_ERROR_CODE ret;
1315     if (vb->isBackfillPhase()) {
1316         ret = engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1317                                                     INITIAL_NRU_VALUE,
1318                                                     false,
1319                                                     mutation->getExtMetaData());
1320     } else {
1321         ret = engine->getEpStore()->setWithMeta(*mutation->getItem(), 0, NULL,
1322                                                 consumer->getCookie(), true,
1323                                                 true, INITIAL_NRU_VALUE, false,
1324                                                 mutation->getExtMetaData(),
1325                                                 true);
1326     }
1327
1328     // We should probably handle these error codes in a better way, but since
1329     // the producer side doesn't do anything with them anyways let's just log
1330     // them for now until we come up with a better solution.
1331     if (ret != ENGINE_SUCCESS) {
1332         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1333             "process  mutation", consumer->logHeader(), ret);
1334     } else {
1335         handleSnapshotEnd(vb, mutation->getBySeqno());
1336     }
1337
1338     return ret;
1339 }
1340
1341 ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1342     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1343     if (!vb) {
1344         return ENGINE_NOT_MY_VBUCKET;
1345     }
1346
1347     if (deletion->getBySeqno() > cur_snapshot_end) {
1348         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous deletion [sequence "
1349             "number (%llu) greater than current snapshot end seqno (%llu)] "
1350             "being processed; Dropping the deletion!", consumer->logHeader(),
1351             vb_, deletion->getBySeqno(), cur_snapshot_end);
1352         return ENGINE_ERANGE;
1353     }
1354
1355     uint64_t delCas = 0;
1356     ENGINE_ERROR_CODE ret;
1357     ItemMetaData meta = deletion->getItem()->getMetaData();
1358
1359     // MB-17517: Check for the incoming item's CAS validity.
1360     if (!Item::isValidCas(meta.cas)) {
1361         LOG(EXTENSION_LOG_WARNING,
1362             "%s Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
1363             ", seqno:%" PRId64 "}. Regenerating new CAS",
1364             consumer->logHeader(), meta.cas, vb_, deletion->getBySeqno());
1365         meta.cas = Item::nextCas();
1366     }
1367
1368     ret = engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1369                                                &delCas, NULL, deletion->getVBucket(),
1370                                                consumer->getCookie(), true,
1371                                                &meta, vb->isBackfillPhase(),
1372                                                false, deletion->getBySeqno(),
1373                                                deletion->getExtMetaData(),
1374                                                true);
1375     if (ret == ENGINE_KEY_ENOENT) {
1376         ret = ENGINE_SUCCESS;
1377     }
1378
1379     // We should probably handle these error codes in a better way, but since
1380     // the producer side doesn't do anything with them anyways let's just log
1381     // them for now until we come up with a better solution.
1382     if (ret != ENGINE_SUCCESS) {
1383         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1384             "process  deletion", consumer->logHeader(), ret);
1385     } else {
1386         handleSnapshotEnd(vb, deletion->getBySeqno());
1387     }
1388
1389     return ret;
1390 }
1391
1392 void PassiveStream::processMarker(SnapshotMarker* marker) {
1393     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1394
1395     cur_snapshot_start = marker->getStartSeqno();
1396     cur_snapshot_end = marker->getEndSeqno();
1397     cur_snapshot_type = (marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory;
1398
1399     if (vb) {
1400         if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1401             vb->setBackfillPhase(true);
1402             vb->checkpointManager.setBackfillPhase(cur_snapshot_start,
1403                                                    cur_snapshot_end);
1404         } else {
1405             if (marker->getFlags() & MARKER_FLAG_CHK ||
1406                 vb->checkpointManager.getOpenCheckpointId() == 0) {
1407                 vb->checkpointManager.createSnapshot(cur_snapshot_start,
1408                                                      cur_snapshot_end);
1409             } else {
1410                 vb->checkpointManager.updateCurrentSnapshotEnd(cur_snapshot_end);
1411             }
1412             vb->setBackfillPhase(false);
1413         }
1414
1415         if (marker->getFlags() & MARKER_FLAG_ACK) {
1416             cur_snapshot_ack = true;
1417         }
1418     }
1419 }
1420
1421 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1422     engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1423
1424     LockHolder lh (streamMutex);
1425     pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1426     lh.unlock();
1427     bool inverse = false;
1428     if (itemsReady.compare_exchange_strong(inverse, true)) {
1429         consumer->notifyStreamReady(vb_);
1430     }
1431 }
1432
1433 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1434     if (byseqno == cur_snapshot_end) {
1435         if (cur_snapshot_type == disk && vb->isBackfillPhase()) {
1436             vb->setBackfillPhase(false);
1437             uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1438             vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1439         } else {
1440             double maxSize = static_cast<double>(engine->getEpStats().getMaxDataSize());
1441             double mem_threshold = StoredValue::getMutationMemThreshold();
1442             double mem_used = static_cast<double>(engine->getEpStats().getTotalMemoryUsed());
1443             if (maxSize * mem_threshold < mem_used) {
1444                 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1445                 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1446             }
1447         }
1448
1449         if (cur_snapshot_ack) {
1450             LockHolder lh(streamMutex);
1451             pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1452             lh.unlock();
1453             bool inverse = false;
1454             if (itemsReady.compare_exchange_strong(inverse, true)) {
1455                 consumer->notifyStreamReady(vb_);
1456             }
1457             cur_snapshot_ack = false;
1458         }
1459         cur_snapshot_type = none;
1460     }
1461 }
1462
1463 void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1464     Stream::addStats(add_stat, c);
1465
1466     const int bsize = 1024;
1467     char buf[bsize];
1468     snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
1469     add_casted_stat(buf, buffer.items, add_stat, c);
1470     snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
1471     add_casted_stat(buf, buffer.bytes, add_stat, c);
1472     snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
1473     add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1474     snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
1475     add_casted_stat(buf, last_seqno, add_stat, c);
1476     snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
1477     add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1478
1479     snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
1480     add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type), add_stat, c);
1481
1482     if (cur_snapshot_type != none) {
1483         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
1484         add_casted_stat(buf, cur_snapshot_start, add_stat, c);
1485         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
1486         add_casted_stat(buf, cur_snapshot_end, add_stat, c);
1487     }
1488 }
1489
1490 DcpResponse* PassiveStream::next() {
1491     LockHolder lh(streamMutex);
1492
1493     if (readyQ.empty()) {
1494         itemsReady.store(false);
1495         return NULL;
1496     }
1497
1498     DcpResponse* response = readyQ.front();
1499     popFromReadyQ();
1500     return response;
1501 }
1502
1503 uint32_t PassiveStream::clearBuffer() {
1504     LockHolder lh(buffer.bufMutex);
1505     uint32_t unackedBytes = buffer.bytes;
1506
1507     while (!buffer.messages.empty()) {
1508         DcpResponse* resp = buffer.messages.front();
1509         buffer.messages.pop();
1510         delete resp;
1511     }
1512
1513     buffer.bytes = 0;
1514     buffer.items = 0;
1515     return unackedBytes;
1516 }
1517
1518 bool PassiveStream::transitionState(stream_state_t newState) {
1519     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1520         consumer->logHeader(), vb_, stateName(state_), stateName(newState));
1521
1522     if (state_ == newState) {
1523         return false;
1524     }
1525
1526     switch (state_) {
1527         case STREAM_PENDING:
1528             cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
1529             break;
1530         case STREAM_READING:
1531             cb_assert(newState == STREAM_PENDING || newState == STREAM_DEAD);
1532             break;
1533         default:
1534             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1535                 "to %s", consumer->logHeader(), vb_, stateName(state_),
1536                 stateName(newState));
1537             abort();
1538     }
1539
1540     state_ = newState;
1541     return true;
1542 }
1543
1544 const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
1545 {
1546     switch (status) {
1547         case END_STREAM_CLOSED:
1548             return "The stream closed due to a close stream message";
1549         case END_STREAM_DISCONNECTED:
1550             return "The stream closed early because the conn was disconnected";
1551         default:
1552             break;
1553     }
1554     return "Status unknown; this should not happen";
1555 }