Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / dcp-stream.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "kvstore.h"
23 #include "statwriter.h"
24 #include "dcp-backfill-manager.h"
25 #include "dcp-backfill.h"
26 #include "dcp-consumer.h"
27 #include "dcp-producer.h"
28 #include "dcp-response.h"
29 #include "dcp-stream.h"
30
31 static const char* snapshotTypeToString(snapshot_type_t type) {
32     static const char * const snapshotTypes[] = { "none", "disk", "memory" };
33     cb_assert(type >= none && type <= memory);
34     return snapshotTypes[type];
35 }
36
37 const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
38
39 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
40                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
41                uint64_t vb_uuid, uint64_t snap_start_seqno,
42                uint64_t snap_end_seqno)
43     : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
44       start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
45       snap_start_seqno_(snap_start_seqno),
46       snap_end_seqno_(snap_end_seqno),
47       state_(STREAM_PENDING), itemsReady(false), readyQueueMemory(0) {
48 }
49
50 Stream::~Stream() {
51     // NB: reusing the "unlocked" method without a lock because we're
52     // destructing and should not take any locks.
53     clear_UNLOCKED();
54 }
55
56 void Stream::clear_UNLOCKED() {
57     while (!readyQ.empty()) {
58         DcpResponse* resp = readyQ.front();
59         popFromReadyQ();
60         delete resp;
61     }
62 }
63
64 void Stream::pushToReadyQ(DcpResponse* resp)
65 {
66     if (resp) {
67         readyQ.push(resp);
68         readyQueueMemory.fetch_add(resp->getMessageSize(),
69                                    std::memory_order_relaxed);
70     }
71 }
72
73 void Stream::popFromReadyQ(void)
74 {
75     if (!readyQ.empty()) {
76         uint32_t respSize = readyQ.front()->getMessageSize();
77         readyQ.pop();
78         /* Decrement the readyQ size */
79         if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
80             readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
81         } else {
82             LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
83                 "underflow, likely wrong stat calculation! curr size: %llu;"
84                 "new size: %d", name_.c_str(), getVBucket(),
85                 readyQueueMemory.load(std::memory_order_relaxed), respSize);
86             readyQueueMemory.store(0, std::memory_order_relaxed);
87         }
88     }
89 }
90
91 uint64_t Stream::getReadyQueueMemory() {
92     return readyQueueMemory.load(std::memory_order_relaxed);
93 }
94
95 const char * Stream::stateName(stream_state_t st) const {
96     static const char * const stateNames[] = {
97         "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
98         "reading", "dead"
99     };
100     cb_assert(st >= STREAM_PENDING && st <= STREAM_DEAD);
101     return stateNames[st];
102 }
103
104 void Stream::addStats(ADD_STAT add_stat, const void *c) {
105     const int bsize = 1024;
106     char buffer[bsize];
107     snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_);
108     add_casted_stat(buffer, flags_, add_stat, c);
109     snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_);
110     add_casted_stat(buffer, opaque_, add_stat, c);
111     snprintf(buffer, bsize, "%s:stream_%d_start_seqno", name_.c_str(), vb_);
112     add_casted_stat(buffer, start_seqno_, add_stat, c);
113     snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(), vb_);
114     add_casted_stat(buffer, end_seqno_, add_stat, c);
115     snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(), vb_);
116     add_casted_stat(buffer, vb_uuid_, add_stat, c);
117     snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno", name_.c_str(), vb_);
118     add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
119     snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno", name_.c_str(), vb_);
120     add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
121     snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_);
122     add_casted_stat(buffer, stateName(state_), add_stat, c);
123 }
124
125 ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
126                            const std::string &n, uint32_t flags,
127                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
128                            uint64_t en_seqno, uint64_t vb_uuid,
129                            uint64_t snap_start_seqno, uint64_t snap_end_seqno)
130     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
131               snap_start_seqno, snap_end_seqno),
132        lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
133        takeoverState(vbucket_state_pending), backfillRemaining(0),
134        itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
135        engine(e), producer(p), isBackfillTaskRunning(false),
136        lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
137
138     const char* type = "";
139     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
140         type = "takeover ";
141         end_seqno_ = dcpMaxSeqno;
142     }
143
144     RCPtr<VBucket> vbucket = engine->getVBucket(vb);
145     if (vbucket) {
146         ReaderLockHolder rlh(vbucket->getStateLock());
147         if (vbucket->getState() == vbucket_state_replica) {
148             snapshot_info_t info = vbucket->checkpointManager.getSnapshotInfo();
149             if (info.range.end > en_seqno) {
150                 end_seqno_ = info.range.end;
151             }
152         }
153     }
154
155     LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Creating %sstream "
156         "with start seqno %" PRIu64 " and end seqno %" PRIu64"",
157         producer->logHeader(), vb, type, st_seqno, en_seqno);
158
159     backfillItems.memory = 0;
160     backfillItems.disk = 0;
161     backfillItems.sent = 0;
162
163     type_ = STREAM_ACTIVE;
164
165     bufferedBackfill.bytes = 0;
166     bufferedBackfill.items = 0;
167
168     if (start_seqno_ >= end_seqno_) {
169         endStream(END_STREAM_OK);
170         itemsReady.store(true);
171     } else {
172         LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") %sstream created!",
173             producer->logHeader(), vb, type);
174     }
175 }
176
177 ActiveStream::~ActiveStream() {
178     transitionState(STREAM_DEAD);
179 }
180
181 DcpResponse* ActiveStream::next() {
182     LockHolder lh(streamMutex);
183
184     stream_state_t initState = state_;
185
186     DcpResponse* response = NULL;
187
188     switch (state_) {
189         case STREAM_PENDING:
190             break;
191         case STREAM_BACKFILLING:
192             response = backfillPhase();
193             break;
194         case STREAM_IN_MEMORY:
195             response = inMemoryPhase();
196             break;
197         case STREAM_TAKEOVER_SEND:
198             response = takeoverSendPhase();
199             break;
200         case STREAM_TAKEOVER_WAIT:
201             response = takeoverWaitPhase();
202             break;
203         case STREAM_DEAD:
204             response = deadPhase();
205             break;
206         default:
207             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
208                 producer->logHeader(), vb_, stateName(state_));
209             abort();
210     }
211
212     if (state_ != STREAM_DEAD && initState != state_ && !response) {
213         lh.unlock();
214         return next();
215     }
216
217     itemsReady.store(response ? true : false);
218     return response;
219 }
220
221 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
222     LockHolder lh(streamMutex);
223     uint64_t chkCursorSeqno = endSeqno;
224
225     if (state_ != STREAM_BACKFILLING) {
226         return;
227     }
228
229     startSeqno = std::min(snap_start_seqno_, startSeqno);
230     firstMarkerSent = true;
231
232     RCPtr<VBucket> vb = engine->getVBucket(vb_);
233     // An atomic read of vbucket state without acquiring the
234     // reader lock for state should suffice here.
235     if (vb && vb->getState() == vbucket_state_replica) {
236         if (end_seqno_ > endSeqno) {
237             /* We possibly have items in the open checkpoint
238                (incomplete snapshot) */
239             snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
240             LOG(EXTENSION_LOG_WARNING,
241                 "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
242                 "replica vbucket, backfill start seqno %" PRIu64 ", "
243                 "backfill end seqno %" PRIu64 ", "
244                 "snapshot end seqno after merge %" PRIu64,
245                 vb_, startSeqno, endSeqno, info.range.end);
246             endSeqno = info.range.end;
247         }
248     }
249
250     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
251         "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
252         endSeqno);
253     pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
254                                     MARKER_FLAG_DISK));
255     lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
256
257     if (!vb) {
258         endStream(END_STREAM_STATE);
259     } else {
260         // Only re-register the cursor if we still need to get memory snapshots
261         CursorRegResult result =
262             vb->checkpointManager.registerCursorBySeqno(name_, chkCursorSeqno);
263         curChkSeqno = result.first;
264     }
265
266     lh.unlock();
267     bool inverse = false;
268     if (itemsReady.compare_exchange_strong(inverse, true)) {
269         producer->notifyStreamReady(vb_, false);
270     }
271 }
272
273 bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source) {
274     LockHolder lh(streamMutex);
275     if (state_ == STREAM_BACKFILLING) {
276         if (!producer->getBackfillManager()->bytesRead(itm->size())) {
277             delete itm;
278             return false;
279         }
280
281         bufferedBackfill.bytes.fetch_add(itm->size());
282         bufferedBackfill.items++;
283
284         pushToReadyQ(new MutationResponse(itm, opaque_,
285                           prepareExtendedMetaData(itm->getVBucketId(),
286                                                   itm->getConflictResMode())));
287         lastReadSeqno = itm->getBySeqno();
288         lh.unlock();
289         bool inverse = false;
290         if (itemsReady.compare_exchange_strong(inverse, true)) {
291             producer->notifyStreamReady(vb_, false);
292         }
293
294         if (backfill_source == BACKFILL_FROM_MEMORY) {
295             backfillItems.memory++;
296         } else {
297             backfillItems.disk++;
298         }
299     } else {
300         delete itm;
301     }
302
303     return true;
304 }
305
306 void ActiveStream::completeBackfill() {
307     {
308         LockHolder lh(streamMutex);
309         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
310             " from disk %d from memory, last seqno read: %llu",
311             producer->logHeader(), vb_, backfillItems.disk.load(),
312             backfillItems.memory.load(), lastReadSeqno.load());
313     }
314
315     isBackfillTaskRunning.store(false);
316     bool inverse = false;
317     if (itemsReady.compare_exchange_strong(inverse, true)) {
318         producer->notifyStreamReady(vb_, false);
319     }
320 }
321
322 void ActiveStream::snapshotMarkerAckReceived() {
323     bool inverse = false;
324     if (--waitForSnapshot == 0 &&
325         itemsReady.compare_exchange_strong(inverse, true)) {
326         producer->notifyStreamReady(vb_, true);
327     }
328 }
329
330 void ActiveStream::setVBucketStateAckRecieved() {
331     LockHolder lh(streamMutex);
332     if (state_ == STREAM_TAKEOVER_WAIT) {
333         if (takeoverState == vbucket_state_pending) {
334             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
335                 "vbucket state to pending message", producer->logHeader(), vb_);
336
337             takeoverState = vbucket_state_active;
338             transitionState(STREAM_TAKEOVER_SEND);
339             lh.unlock();
340
341             engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
342                                                   false, false);
343             RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
344             LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Vbucket marked as "
345                 "dead, last sent seqno: %" PRIu64 ", high seqno: %" PRIu64 "",
346                 producer->logHeader(), vb_, lastSentSeqno.load(),
347                 vbucket->getHighSeqno());
348         } else {
349             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
350                 "vbucket state to active message", producer->logHeader(), vb_);
351             endStream(END_STREAM_OK);
352             lh.unlock();
353         }
354
355         bool inverse = false;
356         if (itemsReady.compare_exchange_strong(inverse, true)) {
357             producer->notifyStreamReady(vb_, true);
358         }
359     } else {
360         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
361             "op on stream '%s' state '%s'", producer->logHeader(), vb_,
362             name_.c_str(), stateName(state_));
363     }
364
365 }
366
367 DcpResponse* ActiveStream::backfillPhase() {
368     DcpResponse* resp = nextQueuedItem();
369
370     if (resp && (resp->getEvent() == DCP_MUTATION ||
371          resp->getEvent() == DCP_DELETION ||
372          resp->getEvent() == DCP_EXPIRATION)) {
373         MutationResponse* m = static_cast<MutationResponse*>(resp);
374         producer->getBackfillManager()->bytesSent(m->getItem()->size());
375         bufferedBackfill.bytes.fetch_sub(m->getItem()->size());
376         bufferedBackfill.items--;
377         if (backfillRemaining > 0) {
378             backfillRemaining--;
379         }
380     }
381
382     if (!isBackfillTaskRunning && readyQ.empty()) {
383         backfillRemaining.store(0, std::memory_order_relaxed);
384         if (lastReadSeqno.load() >= end_seqno_) {
385             endStream(END_STREAM_OK);
386         } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
387             transitionState(STREAM_TAKEOVER_SEND);
388         } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
389             endStream(END_STREAM_OK);
390         } else {
391             transitionState(STREAM_IN_MEMORY);
392         }
393
394         if (!resp) {
395             resp = nextQueuedItem();
396         }
397     }
398
399     return resp;
400 }
401
402 DcpResponse* ActiveStream::inMemoryPhase() {
403     if (lastSentSeqno.load() >= end_seqno_) {
404         endStream(END_STREAM_OK);
405     } else if (readyQ.empty()) {
406         if (nextCheckpointItem()) {
407             return NULL;
408         }
409     }
410
411     return nextQueuedItem();
412 }
413
414 DcpResponse* ActiveStream::takeoverSendPhase() {
415     if (!readyQ.empty()) {
416         return nextQueuedItem();
417     } else {
418         if (nextCheckpointItem()) {
419             return NULL;
420         }
421     }
422
423     if (waitForSnapshot != 0) {
424         return NULL;
425     }
426     DcpResponse* resp = NULL;
427     if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
428         resp = new SetVBucketState(opaque_, vb_, takeoverState);
429         transitionState(STREAM_TAKEOVER_WAIT);
430     }
431     return resp;
432 }
433
434 DcpResponse* ActiveStream::takeoverWaitPhase() {
435     return nextQueuedItem();
436 }
437
438 DcpResponse* ActiveStream::deadPhase() {
439     return nextQueuedItem();
440 }
441
442 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
443     Stream::addStats(add_stat, c);
444
445     const int bsize = 1024;
446     char buffer[bsize];
447     snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
448              name_.c_str(), vb_);
449     add_casted_stat(buffer, backfillItems.disk, add_stat, c);
450     snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
451              name_.c_str(), vb_);
452     add_casted_stat(buffer, backfillItems.memory, add_stat, c);
453     snprintf(buffer, bsize, "%s:stream_%d_backfill_sent", name_.c_str(), vb_);
454     add_casted_stat(buffer, backfillItems.sent, add_stat, c);
455     snprintf(buffer, bsize, "%s:stream_%d_memory_phase", name_.c_str(), vb_);
456     add_casted_stat(buffer, itemsFromMemoryPhase, add_stat, c);
457     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
458     add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
459     snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
460              name_.c_str(), vb_);
461     add_casted_stat(buffer,
462                     lastSentSnapEndSeqno.load(std::memory_order_relaxed),
463                     add_stat, c);
464     snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
465     add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
466     snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
467     add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
468     snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
469     add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat, c);
470     snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes", name_.c_str(), vb_);
471     add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
472     snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items", name_.c_str(), vb_);
473     add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
474 }
475
476 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
477     LockHolder lh(streamMutex);
478
479     RCPtr<VBucket> vb = engine->getVBucket(vb_);
480     add_casted_stat("name", name_, add_stat, cookie);
481     if (!vb || state_ == STREAM_DEAD) {
482         add_casted_stat("status", "completed", add_stat, cookie);
483         add_casted_stat("estimate", 0, add_stat, cookie);
484         add_casted_stat("backfillRemaining", 0, add_stat, cookie);
485         add_casted_stat("estimate", 0, add_stat, cookie);
486         return;
487     }
488
489     size_t total = backfillRemaining;
490     if (state_ == STREAM_BACKFILLING) {
491         add_casted_stat("status", "backfilling", add_stat, cookie);
492     } else {
493         add_casted_stat("status", "in-memory", add_stat, cookie);
494     }
495     add_casted_stat("backfillRemaining", backfillRemaining, add_stat, cookie);
496
497     item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
498     size_t vb_items = vb->getNumItems(iep);
499     size_t chk_items = vb_items > 0 ?
500                 vb->checkpointManager.getNumItemsForCursor(name_) : 0;
501     size_t del_items = engine->getEpStore()->getRWUnderlying(vb_)->
502                                                     getNumPersistedDeletes(vb_);
503
504     if (end_seqno_ < curChkSeqno) {
505         chk_items = 0;
506     } else if ((end_seqno_ - curChkSeqno) < chk_items) {
507         chk_items = end_seqno_ - curChkSeqno + 1;
508     }
509     total += chk_items;
510
511     add_casted_stat("estimate", total, add_stat, cookie);
512     add_casted_stat("chk_items", chk_items, add_stat, cookie);
513     add_casted_stat("vb_items", vb_items, add_stat, cookie);
514     add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
515 }
516
517 DcpResponse* ActiveStream::nextQueuedItem() {
518     if (!readyQ.empty()) {
519         DcpResponse* response = readyQ.front();
520         if (producer->bufferLogInsert(response->getMessageSize())) {
521             if (response->getEvent() == DCP_MUTATION ||
522                 response->getEvent() == DCP_DELETION ||
523                 response->getEvent() == DCP_EXPIRATION) {
524                 lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
525
526                 if (state_ == STREAM_BACKFILLING) {
527                     backfillItems.sent++;
528                 } else {
529                     itemsFromMemoryPhase++;
530                 }
531             }
532             popFromReadyQ();
533             return response;
534         }
535     }
536     return NULL;
537 }
538
539 bool ActiveStream::nextCheckpointItem() {
540     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
541     if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
542         // schedule this stream to build the next checkpoint
543         producer->scheduleCheckpointProcessorTask(this);
544         return true;
545     } else if (chkptItemsExtractionInProgress) {
546         return true;
547     }
548     return false;
549 }
550
551 bool ActiveStreamCheckpointProcessorTask::run() {
552     if (engine->getEpStats().isShutdown) {
553         return false;
554     }
555
556     // Setup that we will sleep forever when done.
557     snooze(INT_MAX);
558
559     // Clear the notfification flag
560     notified.store(false);
561
562     size_t iterations = 0;
563     do {
564         stream_t nextStream = queuePop();
565         ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());
566
567         if (stream) {
568             stream->nextCheckpointItemTask();
569         } else {
570             break;
571         }
572         iterations++;
573     } while(!queueEmpty()
574             && iterations < iterationsBeforeYield);
575
576     // Now check if we were re-notified or there are still checkpoints
577     bool expected = true;
578     if (notified.compare_exchange_strong(expected, false)
579         || !queueEmpty()) {
580         // snooze for 0, essentially yielding and allowing other tasks a go
581         snooze(0.0);
582     }
583
584     return true;
585 }
586
587 void ActiveStreamCheckpointProcessorTask::wakeup() {
588     ExecutorPool::get()->wake(getId());
589 }
590
591 void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
592     pushUnique(stream);
593
594     bool expected = false;
595     if (notified.compare_exchange_strong(expected, true)) {
596         wakeup();
597     }
598 }
599
600 void ActiveStreamCheckpointProcessorTask::clearQueues() {
601     LockHolder lh(workQueueLock);
602     while (!queue.empty()) {
603         queue.pop();
604     }
605     queuedVbuckets.clear();
606 }
607
608 void ActiveStream::nextCheckpointItemTask() {
609     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
610     if (vbucket) {
611         std::vector<queued_item> items;
612         getOutstandingItems(vbucket, items);
613         processItems(items);
614     } else {
615         /* The entity deleting the vbucket must set stream to dead,
616            calling setDead(END_STREAM_STATE) will cause deadlock because
617            it will try to grab streamMutex which is already acquired at this
618            point here */
619         return;
620     }
621 }
622
623 void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
624                                        std::vector<queued_item> &items) {
625     // Commencing item processing - set guard flag.
626     chkptItemsExtractionInProgress.store(true);
627
628     vb->checkpointManager.getAllItemsForCursor(name_, items);
629     if (vb->checkpointManager.getNumCheckpoints() > 1) {
630         engine->getEpStore()->wakeUpCheckpointRemover();
631     }
632 }
633
634
635 void ActiveStream::processItems(std::vector<queued_item>& items) {
636     if (!items.empty()) {
637         bool mark = false;
638         if (items.front()->getOperation() == queue_op_checkpoint_start) {
639             mark = true;
640         }
641
642         std::deque<MutationResponse*> mutations;
643         std::vector<queued_item>::iterator itr = items.begin();
644         for (; itr != items.end(); ++itr) {
645             queued_item& qi = *itr;
646
647             if (qi->getOperation() == queue_op_set ||
648                 qi->getOperation() == queue_op_del) {
649                 curChkSeqno = qi->getBySeqno();
650                 lastReadSeqno = qi->getBySeqno();
651
652                 mutations.push_back(new MutationResponse(qi, opaque_,
653                             prepareExtendedMetaData(qi->getVBucketId(),
654                                                     qi->getConflictResMode())));
655             } else if (qi->getOperation() == queue_op_checkpoint_start) {
656                 cb_assert(mutations.empty());
657                 mark = true;
658             }
659         }
660
661         if (mutations.empty()) {
662             // If we only got checkpoint start or ends check to see if there are
663             // any more snapshots before pausing the stream.
664             nextCheckpointItemTask();
665         } else {
666             snapshot(mutations, mark);
667         }
668     }
669
670     // Completed item processing - clear guard flag and notify producer.
671     chkptItemsExtractionInProgress.store(false);
672     producer->notifyStreamReady(vb_, true);
673 }
674
675 void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
676     if (items.empty()) {
677         return;
678     }
679
680     LockHolder lh(streamMutex);
681
682     if (isCurrentSnapshotCompleted()) {
683         uint32_t flags = MARKER_FLAG_MEMORY;
684         uint64_t snapStart = items.front()->getBySeqno();
685         uint64_t snapEnd = items.back()->getBySeqno();
686
687         if (mark) {
688             flags |= MARKER_FLAG_CHK;
689         }
690
691         if (state_ == STREAM_TAKEOVER_SEND) {
692             waitForSnapshot++;
693             flags |= MARKER_FLAG_ACK;
694         }
695
696         if (!firstMarkerSent) {
697             snapStart = std::min(snap_start_seqno_, snapStart);
698             firstMarkerSent = true;
699         }
700         pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
701                                         flags));
702         lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
703     }
704
705     std::deque<MutationResponse*>::iterator itemItr;
706     for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
707         pushToReadyQ(*itemItr);
708     }
709 }
710
711 uint32_t ActiveStream::setDead(end_stream_status_t status) {
712     {
713         LockHolder lh(streamMutex);
714         endStream(status);
715     }
716
717     bool inverse = false;
718     if (status != END_STREAM_DISCONNECTED &&
719         itemsReady.compare_exchange_strong(inverse, true)) {
720         producer->notifyStreamReady(vb_, true);
721     }
722     return 0;
723 }
724
725 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
726     if (state_ != STREAM_DEAD) {
727         bool inverse = false;
728         if (itemsReady.compare_exchange_strong(inverse, true)) {
729             producer->notifyStreamReady(vb_, true);
730         }
731     }
732 }
733
734 void ActiveStream::endStream(end_stream_status_t reason) {
735     if (state_ != STREAM_DEAD) {
736         if (state_ == STREAM_BACKFILLING) {
737             // If Stream were in Backfilling state, clear out the
738             // backfilled items to clear up the backfill buffer.
739             clear_UNLOCKED();
740             producer->getBackfillManager()->bytesSent(bufferedBackfill.bytes);
741             bufferedBackfill.bytes = 0;
742             bufferedBackfill.items = 0;
743         }
744         if (reason != END_STREAM_DISCONNECTED) {
745             pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
746         }
747         transitionState(STREAM_DEAD);
748         LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Stream closing, "
749             "%" PRIu64 " items sent from backfill phase, %" PRIu64 " items "
750             "sent from memory phase, %" PRIu64 " was last seqno sent, "
751             "reason: %s", producer->logHeader(), vb_,
752             uint64_t(backfillItems.sent.load()),
753             uint64_t(itemsFromMemoryPhase), lastSentSeqno.load(),
754             getEndStreamStatusStr(reason));
755     }
756 }
757
758 void ActiveStream::scheduleBackfill() {
759     if (!isBackfillTaskRunning) {
760         RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
761         if (!vbucket) {
762             return;
763         }
764
765         CursorRegResult result =
766             vbucket->checkpointManager.registerCursorBySeqno(name_,
767                                                              lastReadSeqno);
768
769         curChkSeqno = result.first;
770         bool isFirstItem = result.second;
771
772         cb_assert(lastReadSeqno <= curChkSeqno);
773         uint64_t backfillStart = lastReadSeqno + 1;
774
775         /* We need to find the minimum seqno that needs to be backfilled in
776          * order to make sure that we don't miss anything when transitioning
777          * to a memory snapshot. The backfill task will always make sure that
778          * the backfill end seqno is contained in the backfill.
779          */
780         uint64_t backfillEnd = 0;
781         if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) { // disk backfill only
782             backfillEnd = end_seqno_;
783         } else { // disk backfill + in-memory streaming
784             if (backfillStart < curChkSeqno) {
785                 if (curChkSeqno > end_seqno_) {
786                     backfillEnd = end_seqno_;
787                 } else {
788                     backfillEnd = curChkSeqno - 1;
789                 }
790             }
791         }
792
793         bool tryBackfill = isFirstItem || flags_ & DCP_ADD_STREAM_FLAG_DISKONLY;
794
795         if (backfillStart <= backfillEnd && tryBackfill) {
796             BackfillManager* backfillMgr = producer->getBackfillManager();
797             backfillMgr->schedule(this, backfillStart, backfillEnd);
798             isBackfillTaskRunning.store(true);
799         } else {
800             if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
801                 endStream(END_STREAM_OK);
802             } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
803                 transitionState(STREAM_TAKEOVER_SEND);
804             } else {
805                 transitionState(STREAM_IN_MEMORY);
806             }
807             itemsReady.store(true);
808         }
809     }
810 }
811
812 const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
813 {
814     switch (status) {
815     case END_STREAM_OK:
816         return "The stream ended due to all items being streamed";
817     case END_STREAM_CLOSED:
818         return "The stream closed early due to a close stream message";
819     case END_STREAM_STATE:
820         return "The stream closed early because the vbucket state changed";
821     case END_STREAM_DISCONNECTED:
822         return "The stream closed early because the conn was disconnected";
823     default:
824         break;
825     }
826     return "Status unknown; this should not happen";
827 }
828
/*
 * Move this active stream from state_ to newState, validating the transition
 * and then running the entry action associated with the new state.
 *
 * Requesting the current state is a no-op (nothing is validated or run).
 *
 * Allowed transitions (any other target fails the cb_assert):
 *   PENDING       -> BACKFILLING | DEAD
 *   BACKFILLING   -> IN_MEMORY | TAKEOVER_SEND | DEAD
 *   IN_MEMORY     -> BACKFILLING | DEAD
 *   TAKEOVER_SEND -> TAKEOVER_WAIT | DEAD
 *   TAKEOVER_WAIT -> TAKEOVER_SEND | DEAD
 * Attempting to leave any other state (STREAM_DEAD, STREAM_READING, or an
 * unrecognised value) logs a warning and abort()s the process.
 *
 * Entry actions, run after state_ has been updated:
 *   BACKFILLING   - scheduleBackfill()
 *   IN_MEMORY     - endStream(END_STREAM_OK) if everything up to end_seqno_
 *                   has already been sent, otherwise nextCheckpointItem()
 *   TAKEOVER_SEND - nextCheckpointItem()
 *   DEAD          - remove this stream's checkpoint cursor (keyed by name_)
 *                   if the vbucket still exists
 *   TAKEOVER_WAIT, PENDING - nothing
 *   READING       - only logs a warning (not a valid state for an active
 *                   stream; normally unreachable because no cb_assert above
 *                   permits it)
 */
void ActiveStream::transitionState(stream_state_t newState) {
    LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
        producer->logHeader(), vb_, stateName(state_), stateName(newState));

    // Re-entering the current state is a no-op.
    if (state_ == newState) {
        return;
    }

    // Validate the (state_, newState) pair; see the table above.
    switch (state_) {
        case STREAM_PENDING:
            cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
            break;
        case STREAM_BACKFILLING:
            cb_assert(newState == STREAM_IN_MEMORY ||
                   newState == STREAM_TAKEOVER_SEND ||
                   newState == STREAM_DEAD);
            break;
        case STREAM_IN_MEMORY:
            cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
            break;
        case STREAM_TAKEOVER_SEND:
            cb_assert(newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD);
            break;
        case STREAM_TAKEOVER_WAIT:
            cb_assert(newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD);
            break;
        default:
            // STREAM_DEAD / STREAM_READING (or garbage): no way out; fatal.
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
                "to %s", producer->logHeader(), vb_, stateName(state_),
                stateName(newState));
            abort();
    }

    // Commit the new state before running its entry action, so the action
    // observes the stream already in newState.
    state_ = newState;

    switch (newState) {
        case STREAM_BACKFILLING:
            scheduleBackfill();
            break;
        case STREAM_IN_MEMORY:
            // Check if the producer has sent up till the last requested
            // sequence number already, if not - move checkpoint items into
            // the ready queue.
            if (lastSentSeqno >= end_seqno_) {
                // Stream transitioning to DEAD state
                endStream(END_STREAM_OK);
            } else {
                nextCheckpointItem();
            }
            break;
        case STREAM_TAKEOVER_SEND:
            nextCheckpointItem();
            break;
        case STREAM_DEAD:
            {
                // Release this stream's checkpoint cursor; the vbucket may
                // already be gone, in which case there is nothing to remove.
                RCPtr<VBucket> vb = engine->getVBucket(vb_);
                if (vb) {
                    vb->checkpointManager.removeCursor(name_);
                }
                break;
            }
        case STREAM_TAKEOVER_WAIT:
        case STREAM_PENDING:
            break;
        case STREAM_READING:
            // READING is a passive-stream state; log if we ever get here.
            LOG(EXTENSION_LOG_WARNING,
                "ActiveStream::transitionState: newState can't be "
                "STREAM_READING!");
            break;
    }
}
900
901 size_t ActiveStream::getItemsRemaining() {
902     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
903
904     if (!vbucket || state_ == STREAM_DEAD) {
905         return 0;
906     }
907
908     uint64_t high_seqno = vbucket->getHighSeqno();
909
910     if (end_seqno_ < high_seqno) {
911         if (end_seqno_ > lastSentSeqno.load()) {
912             return (end_seqno_ - lastSentSeqno.load());
913         }
914     } else {
915         if (high_seqno > lastSentSeqno.load()) {
916             return (high_seqno - lastSentSeqno.load());
917         }
918     }
919
920     return 0;
921 }
922
923 ExtendedMetaData* ActiveStream::prepareExtendedMetaData(uint16_t vBucketId,
924                                                         uint8_t conflictResMode)
925 {
926     ExtendedMetaData *emd = NULL;
927     if (producer->isExtMetaDataEnabled()) {
928         RCPtr<VBucket> vb = engine->getVBucket(vBucketId);
929         if (vb && vb->isTimeSyncEnabled()) {
930             int64_t adjustedTime = gethrtime() + vb->getDriftCounter();
931             emd = new ExtendedMetaData(adjustedTime, conflictResMode);
932         } else {
933             emd = new ExtendedMetaData(conflictResMode);
934         }
935     }
936     return emd;
937 }
938
939 const char* ActiveStream::logHeader()
940 {
941     return producer->logHeader();
942 }
943
944 bool ActiveStream::isCurrentSnapshotCompleted() const
945 {
946     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
947     // An atomic read of vbucket state without acquiring the
948     // reader lock for state should suffice here.
949     if (vbucket && vbucket->getState() == vbucket_state_replica) {
950         if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
951             lastReadSeqno) {
952             return false;
953         }
954     }
955     return true;
956 }
957
958 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
959                                const std::string &name, uint32_t flags,
960                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
961                                uint64_t en_seqno, uint64_t vb_uuid,
962                                uint64_t snap_start_seqno,
963                                uint64_t snap_end_seqno)
964     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
965              snap_start_seqno, snap_end_seqno),
966       producer(p) {
967     LockHolder lh(streamMutex);
968     RCPtr<VBucket> vbucket = e->getVBucket(vb_);
969     if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
970         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
971         transitionState(STREAM_DEAD);
972         itemsReady.store(true);
973     }
974
975     type_ = STREAM_NOTIFIER;
976
977     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
978         "%llu and end seqno %llu", producer->logHeader(), vb, st_seqno,
979         en_seqno);
980 }
981
982 uint32_t NotifierStream::setDead(end_stream_status_t status) {
983     LockHolder lh(streamMutex);
984     if (state_ != STREAM_DEAD) {
985         transitionState(STREAM_DEAD);
986         if (status != END_STREAM_DISCONNECTED) {
987             pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
988             lh.unlock();
989             bool inverse = false;
990             if (itemsReady.compare_exchange_strong(inverse, true)) {
991                 producer->notifyStreamReady(vb_, true);
992             }
993         }
994     }
995     return 0;
996 }
997
998 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
999     LockHolder lh(streamMutex);
1000     if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1001         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1002         transitionState(STREAM_DEAD);
1003         lh.unlock();
1004         bool inverse = false;
1005         if (itemsReady.compare_exchange_strong(inverse, true)) {
1006             producer->notifyStreamReady(vb_, true);
1007         }
1008     }
1009 }
1010
1011 DcpResponse* NotifierStream::next() {
1012     LockHolder lh(streamMutex);
1013
1014     if (readyQ.empty()) {
1015         itemsReady.store(false);
1016         return NULL;
1017     }
1018
1019     DcpResponse* response = readyQ.front();
1020     if (producer->bufferLogInsert(response->getMessageSize())) {
1021         popFromReadyQ();
1022     } else {
1023         response = NULL;
1024     }
1025
1026     return response;
1027 }
1028
1029 void NotifierStream::transitionState(stream_state_t newState) {
1030     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1031         producer->logHeader(), vb_, stateName(state_), stateName(newState));
1032
1033     if (state_ == newState) {
1034         return;
1035     }
1036
1037     switch (state_) {
1038         case STREAM_PENDING:
1039             cb_assert(newState == STREAM_DEAD);
1040             break;
1041         default:
1042             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1043                 "to %s", producer->logHeader(), vb_, stateName(state_),
1044                 stateName(newState));
1045             abort();
1046     }
1047
1048     state_ = newState;
1049 }
1050
1051 PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
1052                              const std::string &name, uint32_t flags,
1053                              uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1054                              uint64_t en_seqno, uint64_t vb_uuid,
1055                              uint64_t snap_start_seqno, uint64_t snap_end_seqno,
1056                              uint64_t vb_high_seqno)
1057     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1058              snap_start_seqno, snap_end_seqno),
1059       engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
1060       cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false) {
1061     LockHolder lh(streamMutex);
1062     pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
1063                                   vb_uuid, snap_start_seqno, snap_end_seqno));
1064     itemsReady.store(true);
1065     type_ = STREAM_PASSIVE;
1066
1067     const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
1068     LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Attempting to add %s stream"
1069         " with start seqno %" PRIu64 ", end seqno %" PRIu64 ","
1070         " vbucket uuid %" PRIu64 ", snap start seqno %" PRIu64 ","
1071         " snap end seqno %" PRIu64 ", and vb_high_seqno %" PRIu64 "",
1072         consumer->logHeader(), vb, type, st_seqno, en_seqno, vb_uuid,
1073         snap_start_seqno, snap_end_seqno, vb_high_seqno);
1074 }
1075
1076 PassiveStream::~PassiveStream() {
1077     uint32_t unackedBytes = clearBuffer();
1078     if (transitionState(STREAM_DEAD)) {
1079         // Destructed a "live" stream, log it.
1080         LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Destructing stream."
1081             " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
1082             consumer->logHeader(), vb_, last_seqno,
1083             unackedBytes);
1084     }
1085 }
1086
1087 uint32_t PassiveStream::setDead_UNLOCKED(end_stream_status_t status,
1088                                          LockHolder *slh) {
1089     transitionState(STREAM_DEAD);
1090     slh->unlock();
1091     uint32_t unackedBytes = clearBuffer();
1092     LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRId16 ") Setting stream to dead"
1093         " state, last_seqno is %" PRIu64 ", unackedBytes is %" PRIu32 ","
1094         " status is %s",
1095         consumer->logHeader(), vb_, last_seqno, unackedBytes,
1096         getEndStreamStatusStr(status));
1097     return unackedBytes;
1098 }
1099
1100 uint32_t PassiveStream::setDead(end_stream_status_t status) {
1101     LockHolder lh(streamMutex);
1102     return setDead_UNLOCKED(status, &lh);
1103 }
1104
1105 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1106     LockHolder lh(streamMutex);
1107     if (state_ == STREAM_PENDING) {
1108         if (status == ENGINE_SUCCESS) {
1109             transitionState(STREAM_READING);
1110         } else {
1111             transitionState(STREAM_DEAD);
1112         }
1113         pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1114         lh.unlock();
1115         bool inverse = false;
1116         if (itemsReady.compare_exchange_strong(inverse, true)) {
1117             consumer->notifyStreamReady(vb_);
1118         }
1119     }
1120 }
1121
1122 void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1123                                     uint32_t new_opaque,
1124                                     uint64_t start_seqno) {
1125     vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1126
1127     snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
1128     if (info.range.end == info.start) {
1129         info.range.start = info.start;
1130     }
1131
1132     snap_start_seqno_ = info.range.start;
1133     start_seqno_ = info.start;
1134     snap_end_seqno_ = info.range.end;
1135
1136     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to reconnect stream "
1137         "with opaque %u, start seq no %llu, end seq no %llu, snap start seqno "
1138         "%llu, and snap end seqno %llu", consumer->logHeader(), vb_, new_opaque,
1139         start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
1140
1141     LockHolder lh(streamMutex);
1142     last_seqno = start_seqno;
1143     pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1144                                   end_seqno_, vb_uuid_, snap_start_seqno_,
1145                                   snap_end_seqno_));
1146     lh.unlock();
1147     bool inverse = false;
1148     if (itemsReady.compare_exchange_strong(inverse, true)) {
1149         consumer->notifyStreamReady(vb_);
1150     }
1151 }
1152
1153 ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1154     if(nullptr == resp) {
1155         return ENGINE_EINVAL;
1156     }
1157
1158     LockHolder lh(buffer.bufMutex);
1159
1160     if (state_ == STREAM_DEAD) {
1161         delete resp;
1162         return ENGINE_KEY_ENOENT;
1163     }
1164
1165     switch (resp->getEvent()) {
1166         case DCP_MUTATION:
1167         case DCP_DELETION:
1168         case DCP_EXPIRATION:
1169         {
1170             MutationResponse* m = static_cast<MutationResponse*>(resp);
1171             uint64_t bySeqno = m->getBySeqno();
1172             if (bySeqno <= last_seqno) {
1173                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous (out of "
1174                     "sequence) mutation received, with opaque: %u, its "
1175                     "seqno (%llu) is not greater than last received seqno "
1176                     "(%llu); Dropping mutation!", consumer->logHeader(),
1177                     vb_, opaque_, bySeqno, last_seqno);
1178                 delete m;
1179                 return ENGINE_ERANGE;
1180             }
1181             last_seqno = bySeqno;
1182             break;
1183         }
1184         case DCP_SNAPSHOT_MARKER:
1185         {
1186             SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
1187             uint64_t snapStart = s->getStartSeqno();
1188             uint64_t snapEnd = s->getEndSeqno();
1189             if (snapStart < last_seqno && snapEnd <= last_seqno) {
1190                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous snapshot "
1191                     "marker received, with opaque: %u, its start (%llu), and"
1192                     "end (%llu) are less than last received seqno (%llu); "
1193                     "Dropping marker!", consumer->logHeader(), vb_, opaque_,
1194                     snapStart, snapEnd, last_seqno);
1195                 delete s;
1196                 return ENGINE_ERANGE;
1197             }
1198             break;
1199         }
1200         case DCP_SET_VBUCKET:
1201         case DCP_STREAM_END:
1202         {
1203             /* No validations necessary */
1204             break;
1205         }
1206         default:
1207         {
1208             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unknown DCP op received: %d;"
1209                 " Disconnecting connection..",
1210                 consumer->logHeader(), vb_, resp->getEvent());
1211             return ENGINE_DISCONNECT;
1212         }
1213     }
1214     buffer.messages.push(resp);
1215     buffer.items++;
1216     buffer.bytes += resp->getMessageSize();
1217
1218     return ENGINE_SUCCESS;
1219 }
1220
/*
 * Apply up to batchSize messages from the buffer filled by messageReceived(),
 * consuming them in FIFO order. buffer.bufMutex is held for the whole batch.
 *
 * @param processed_bytes [out] total size of the messages consumed in this
 *        call, excluding those whose processing returned ENGINE_ERANGE. Left
 *        unmodified when the buffer is empty on entry.
 * @param batchSize maximum number of messages to consume.
 * @return cannot_process if a mutation/deletion returned ENGINE_TMPFAIL or
 *         ENGINE_ENOMEM (that message stays at the head of the buffer so a
 *         later call retries it), or if an unknown message type was seen
 *         (that message is still consumed). Otherwise all_processed, which
 *         is also returned when the batch limit was hit with messages still
 *         buffered.
 */
process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
                                                             size_t batchSize) {
    LockHolder lh(buffer.bufMutex);
    uint32_t count = 0;
    uint32_t message_bytes = 0;
    uint32_t total_bytes_processed = 0;
    bool failed = false;
    if (buffer.messages.empty()) {
        return all_processed;
    }

    while (count < batchSize && !buffer.messages.empty()) {
        // Only mutation/deletion processing produces a status; every other
        // message type leaves ret as ENGINE_SUCCESS.
        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
        DcpResponse *response = buffer.messages.front();
        message_bytes = response->getMessageSize();

        switch (response->getEvent()) {
            case DCP_MUTATION:
                ret = processMutation(static_cast<MutationResponse*>(response));
                break;
            case DCP_DELETION:
            case DCP_EXPIRATION:
                ret = processDeletion(static_cast<MutationResponse*>(response));
                break;
            case DCP_SNAPSHOT_MARKER:
                processMarker(static_cast<SnapshotMarker*>(response));
                break;
            case DCP_SET_VBUCKET:
                processSetVBucketState(static_cast<SetVBucketState*>(response));
                break;
            case DCP_STREAM_END:
                // NOTE(review): state_ is changed here while holding only
                // buffer.bufMutex, not streamMutex.
                transitionState(STREAM_DEAD);
                break;
            default:
                LOG(EXTENSION_LOG_WARNING,
                    "PassiveStream::processBufferedMessages: "
                    "(vb %d) PassiveStream failing "
                    "unknown message type %d",
                    vb_, response->getEvent());
                failed = true;
        }

        // Transient failure: keep the message buffered and stop this batch.
        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
            failed = true;
            break;
        }

        // Message consumed (successfully or dropped): release it.
        delete response;
        buffer.messages.pop();
        buffer.items--;
        buffer.bytes -= message_bytes;
        count++;
        // ERANGE messages were rejected as outside the current snapshot and
        // are not reported back in processed_bytes.
        if (ret != ENGINE_ERANGE) {
            total_bytes_processed += message_bytes;
        }
    }

    processed_bytes = total_bytes_processed;

    if (failed) {
        return cannot_process;
    }

    return all_processed;
}
1286
1287 ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1288     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1289     if (!vb) {
1290         return ENGINE_NOT_MY_VBUCKET;
1291     }
1292
1293     if (mutation->getBySeqno() > cur_snapshot_end) {
1294         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous mutation [sequence "
1295             "number (%llu) greater than current snapshot end seqno (%llu)] "
1296             "being processed; Dropping the mutation!", consumer->logHeader(),
1297             vb_, mutation->getBySeqno(), cur_snapshot_end);
1298         return ENGINE_ERANGE;
1299     }
1300
1301     // MB-17517: Check for the incoming item's CAS validity. We /shouldn't/
1302     // receive anything without a valid CAS, however given that versions without
1303     // this check may send us "bad" CAS values, we should regenerate them (which
1304     // is better than rejecting the data entirely).
1305     if (!Item::isValidCas(mutation->getItem()->getCas())) {
1306         LOG(EXTENSION_LOG_WARNING,
1307             "%s Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
1308             ", seqno:%" PRId64 "}. Regenerating new CAS",
1309             consumer->logHeader(),
1310             mutation->getItem()->getCas(), vb_,
1311             mutation->getItem()->getBySeqno());
1312         mutation->getItem()->setCas();
1313     }
1314
1315     ENGINE_ERROR_CODE ret;
1316     if (vb->isBackfillPhase()) {
1317         ret = engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1318                                                     INITIAL_NRU_VALUE,
1319                                                     false,
1320                                                     mutation->getExtMetaData());
1321     } else {
1322         ret = engine->getEpStore()->setWithMeta(*mutation->getItem(), 0, NULL,
1323                                                 consumer->getCookie(), true,
1324                                                 true, INITIAL_NRU_VALUE, false,
1325                                                 mutation->getExtMetaData(),
1326                                                 true);
1327     }
1328
1329     // We should probably handle these error codes in a better way, but since
1330     // the producer side doesn't do anything with them anyways let's just log
1331     // them for now until we come up with a better solution.
1332     if (ret != ENGINE_SUCCESS) {
1333         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1334             "process  mutation", consumer->logHeader(), ret);
1335     } else {
1336         handleSnapshotEnd(vb, mutation->getBySeqno());
1337     }
1338
1339     return ret;
1340 }
1341
1342 ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1343     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1344     if (!vb) {
1345         return ENGINE_NOT_MY_VBUCKET;
1346     }
1347
1348     if (deletion->getBySeqno() > cur_snapshot_end) {
1349         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous deletion [sequence "
1350             "number (%llu) greater than current snapshot end seqno (%llu)] "
1351             "being processed; Dropping the deletion!", consumer->logHeader(),
1352             vb_, deletion->getBySeqno(), cur_snapshot_end);
1353         return ENGINE_ERANGE;
1354     }
1355
1356     uint64_t delCas = 0;
1357     ENGINE_ERROR_CODE ret;
1358     ItemMetaData meta = deletion->getItem()->getMetaData();
1359
1360     // MB-17517: Check for the incoming item's CAS validity.
1361     if (!Item::isValidCas(meta.cas)) {
1362         LOG(EXTENSION_LOG_WARNING,
1363             "%s Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
1364             ", seqno:%" PRId64 "}. Regenerating new CAS",
1365             consumer->logHeader(), meta.cas, vb_, deletion->getBySeqno());
1366         meta.cas = Item::nextCas();
1367     }
1368
1369     ret = engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1370                                                &delCas, NULL, deletion->getVBucket(),
1371                                                consumer->getCookie(), true,
1372                                                &meta, vb->isBackfillPhase(),
1373                                                false, deletion->getBySeqno(),
1374                                                deletion->getExtMetaData(),
1375                                                true);
1376     if (ret == ENGINE_KEY_ENOENT) {
1377         ret = ENGINE_SUCCESS;
1378     }
1379
1380     // We should probably handle these error codes in a better way, but since
1381     // the producer side doesn't do anything with them anyways let's just log
1382     // them for now until we come up with a better solution.
1383     if (ret != ENGINE_SUCCESS) {
1384         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1385             "process  deletion", consumer->logHeader(), ret);
1386     } else {
1387         handleSnapshotEnd(vb, deletion->getBySeqno());
1388     }
1389
1390     return ret;
1391 }
1392
1393 void PassiveStream::processMarker(SnapshotMarker* marker) {
1394     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1395
1396     cur_snapshot_start = marker->getStartSeqno();
1397     cur_snapshot_end = marker->getEndSeqno();
1398     cur_snapshot_type = (marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory;
1399
1400     if (vb) {
1401         if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1402             vb->setBackfillPhase(true);
1403             vb->checkpointManager.setBackfillPhase(cur_snapshot_start,
1404                                                    cur_snapshot_end);
1405         } else {
1406             if (marker->getFlags() & MARKER_FLAG_CHK ||
1407                 vb->checkpointManager.getOpenCheckpointId() == 0) {
1408                 vb->checkpointManager.createSnapshot(cur_snapshot_start,
1409                                                      cur_snapshot_end);
1410             } else {
1411                 vb->checkpointManager.updateCurrentSnapshotEnd(cur_snapshot_end);
1412             }
1413             vb->setBackfillPhase(false);
1414         }
1415
1416         if (marker->getFlags() & MARKER_FLAG_ACK) {
1417             cur_snapshot_ack = true;
1418         }
1419     }
1420 }
1421
1422 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1423     engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1424
1425     LockHolder lh (streamMutex);
1426     pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1427     lh.unlock();
1428     bool inverse = false;
1429     if (itemsReady.compare_exchange_strong(inverse, true)) {
1430         consumer->notifyStreamReady(vb_);
1431     }
1432 }
1433
1434 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1435     if (byseqno == cur_snapshot_end) {
1436         if (cur_snapshot_type == disk && vb->isBackfillPhase()) {
1437             vb->setBackfillPhase(false);
1438             uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1439             vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1440         } else {
1441             double maxSize = static_cast<double>(engine->getEpStats().getMaxDataSize());
1442             double mem_threshold = StoredValue::getMutationMemThreshold();
1443             double mem_used = static_cast<double>(engine->getEpStats().getTotalMemoryUsed());
1444             if (maxSize * mem_threshold < mem_used) {
1445                 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1446                 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1447             }
1448         }
1449
1450         if (cur_snapshot_ack) {
1451             LockHolder lh(streamMutex);
1452             pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1453             lh.unlock();
1454             bool inverse = false;
1455             if (itemsReady.compare_exchange_strong(inverse, true)) {
1456                 consumer->notifyStreamReady(vb_);
1457             }
1458             cur_snapshot_ack = false;
1459         }
1460         cur_snapshot_type = none;
1461     }
1462 }
1463
1464 void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1465     Stream::addStats(add_stat, c);
1466
1467     const int bsize = 1024;
1468     char buf[bsize];
1469     snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
1470     add_casted_stat(buf, buffer.items, add_stat, c);
1471     snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
1472     add_casted_stat(buf, buffer.bytes, add_stat, c);
1473     snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
1474     add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1475     snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
1476     add_casted_stat(buf, last_seqno, add_stat, c);
1477     snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
1478     add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1479
1480     snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
1481     add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type), add_stat, c);
1482
1483     if (cur_snapshot_type != none) {
1484         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
1485         add_casted_stat(buf, cur_snapshot_start, add_stat, c);
1486         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
1487         add_casted_stat(buf, cur_snapshot_end, add_stat, c);
1488     }
1489 }
1490
1491 DcpResponse* PassiveStream::next() {
1492     LockHolder lh(streamMutex);
1493
1494     if (readyQ.empty()) {
1495         itemsReady.store(false);
1496         return NULL;
1497     }
1498
1499     DcpResponse* response = readyQ.front();
1500     popFromReadyQ();
1501     return response;
1502 }
1503
1504 uint32_t PassiveStream::clearBuffer() {
1505     LockHolder lh(buffer.bufMutex);
1506     uint32_t unackedBytes = buffer.bytes;
1507
1508     while (!buffer.messages.empty()) {
1509         DcpResponse* resp = buffer.messages.front();
1510         buffer.messages.pop();
1511         delete resp;
1512     }
1513
1514     buffer.bytes = 0;
1515     buffer.items = 0;
1516     return unackedBytes;
1517 }
1518
1519 bool PassiveStream::transitionState(stream_state_t newState) {
1520     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1521         consumer->logHeader(), vb_, stateName(state_), stateName(newState));
1522
1523     if (state_ == newState) {
1524         return false;
1525     }
1526
1527     switch (state_) {
1528         case STREAM_PENDING:
1529             cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
1530             break;
1531         case STREAM_READING:
1532             cb_assert(newState == STREAM_PENDING || newState == STREAM_DEAD);
1533             break;
1534         default:
1535             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1536                 "to %s", consumer->logHeader(), vb_, stateName(state_),
1537                 stateName(newState));
1538             abort();
1539     }
1540
1541     state_ = newState;
1542     return true;
1543 }
1544
1545 const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
1546 {
1547     switch (status) {
1548         case END_STREAM_CLOSED:
1549             return "The stream closed due to a close stream message";
1550         case END_STREAM_DISCONNECTED:
1551             return "The stream closed early because the conn was disconnected";
1552         default:
1553             break;
1554     }
1555     return "Status unknown; this should not happen";
1556 }