MB-19253: Fix race in void ExecutorPool::doWorkerStat
[ep-engine.git] / src / dcp-stream.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "kvstore.h"
23 #include "statwriter.h"
24 #include "dcp-stream.h"
25 #include "dcp-consumer.h"
26 #include "dcp-producer.h"
27 #include "dcp-response.h"
28
29 #define DCP_BACKFILL_SLEEP_TIME 2
30
31 static const char* snapshotTypeToString(snapshot_type_t type) {
32     static const char * const snapshotTypes[] = { "none", "disk", "memory" };
33     cb_assert(type >= none && type <= memory);
34     return snapshotTypes[type];
35 }
36
37 const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
38 const size_t PassiveStream::batchSize = 10;
39
40 class SnapshotMarkerCallback : public Callback<SeqnoRange> {
41 public:
42     SnapshotMarkerCallback(stream_t s)
43         : stream(s) {
44         cb_assert(s->getType() == STREAM_ACTIVE);
45     }
46
47     void callback(SeqnoRange &range) {
48         uint64_t st = range.getStartSeqno();
49         uint64_t en = range.getEndSeqno();
50         static_cast<ActiveStream*>(stream.get())->markDiskSnapshot(st, en);
51     }
52
53 private:
54     stream_t stream;
55 };
56
57 class CacheCallback : public Callback<CacheLookup> {
58 public:
59     CacheCallback(EventuallyPersistentEngine* e, stream_t &s)
60         : engine_(e), stream_(s) {
61         Stream *str = stream_.get();
62         if (str) {
63             cb_assert(str->getType() == STREAM_ACTIVE);
64         }
65     }
66
67     void callback(CacheLookup &lookup);
68
69 private:
70     EventuallyPersistentEngine* engine_;
71     stream_t stream_;
72 };
73
74 void CacheCallback::callback(CacheLookup &lookup) {
75     RCPtr<VBucket> vb = engine_->getEpStore()->getVBucket(lookup.getVBucketId());
76     if (!vb) {
77         setStatus(ENGINE_SUCCESS);
78         return;
79     }
80
81     int bucket_num(0);
82     LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
83     StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num, false, false);
84     if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
85         Item* it = v->toItem(false, lookup.getVBucketId());
86         lh.unlock();
87         static_cast<ActiveStream*>(stream_.get())->backfillReceived(it);
88         setStatus(ENGINE_KEY_EEXISTS);
89     } else {
90         setStatus(ENGINE_SUCCESS);
91     }
92 }
93
94 class DiskCallback : public Callback<GetValue> {
95 public:
96     DiskCallback(stream_t &s)
97         : stream_(s) {
98         Stream *str = stream_.get();
99         if (str) {
100             cb_assert(str->getType() == STREAM_ACTIVE);
101         }
102     }
103
104     void callback(GetValue &val) {
105         cb_assert(val.getValue());
106         ActiveStream* active_stream = static_cast<ActiveStream*>(stream_.get());
107         active_stream->backfillReceived(val.getValue());
108     }
109
110 private:
111     stream_t stream_;
112 };
113
114 class DCPBackfill : public GlobalTask {
115 public:
116     DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
117                 uint64_t start_seqno, uint64_t end_seqno, const Priority &p,
118                 double sleeptime = 0, bool shutdown = false)
119         : GlobalTask(e, p, sleeptime, shutdown), engine(e), stream(s),
120           startSeqno(start_seqno), endSeqno(end_seqno) {
121         cb_assert(stream->getType() == STREAM_ACTIVE);
122     }
123
124     bool run();
125
126     std::string getDescription();
127
128 private:
129     EventuallyPersistentEngine *engine;
130     stream_t                    stream;
131     uint64_t                    startSeqno;
132     uint64_t                    endSeqno;
133 };
134
135 bool DCPBackfill::run() {
136     uint16_t vbid = stream->getVBucket();
137
138     if (engine->getEpStore()->isMemoryUsageTooHigh()) {
139         LOG(EXTENSION_LOG_WARNING, "VBucket %d dcp backfill task temporarily "
140                 "suspended  because the current memory usage is too high",
141                 vbid);
142         snooze(DCP_BACKFILL_SLEEP_TIME);
143         return true;
144     }
145
146     uint64_t lastPersistedSeqno =
147         engine->getEpStore()->getLastPersistedSeqno(vbid);
148     uint64_t diskSeqno =
149         engine->getEpStore()->getRWUnderlying(vbid)->getLastPersistedSeqno(vbid);
150
151     if (lastPersistedSeqno < endSeqno) {
152         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Rescheduling backfill "
153             "because backfill up to seqno %llu is needed but only up to "
154             "%llu is persisted (disk %llu)",
155             static_cast<ActiveStream*>(stream.get())->logHeader(), vbid,
156             endSeqno, lastPersistedSeqno, diskSeqno);
157         snooze(DCP_BACKFILL_SLEEP_TIME);
158         return true;
159     }
160
161     KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
162     size_t numItems = kvstore->getNumItems(vbid, startSeqno,
163                                            std::numeric_limits<uint64_t>::max());
164     static_cast<ActiveStream*>(stream.get())->incrBackfillRemaining(numItems);
165
166     shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
167     shared_ptr<Callback<CacheLookup> > cl(new CacheCallback(engine, stream));
168     shared_ptr<Callback<SeqnoRange> > sr(new SnapshotMarkerCallback(stream));
169     kvstore->dump(vbid, startSeqno, cb, cl, sr);
170
171     static_cast<ActiveStream*>(stream.get())->completeBackfill();
172
173     LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") "
174         "Backfill task (%" PRIu64 " to %" PRIu64 ") "
175         "finished. disk seqno %" PRIu64 " memory seqno %" PRIu64 "",
176         static_cast<ActiveStream*>(stream.get())->logHeader(), vbid,
177         startSeqno, endSeqno, diskSeqno, lastPersistedSeqno);
178
179     return false;
180 }
181
182 std::string DCPBackfill::getDescription() {
183     std::stringstream ss;
184     ss << "DCP backfill for vbucket " << stream->getVBucket();
185     return ss.str();
186 }
187
188 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
189                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
190                uint64_t vb_uuid, uint64_t snap_start_seqno,
191                uint64_t snap_end_seqno)
192     : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
193       start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
194       snap_start_seqno_(snap_start_seqno),
195       snap_end_seqno_(snap_end_seqno),
196       state_(STREAM_PENDING), itemsReady(false), readyQueueMemory(0) {
197 }
198
199 void Stream::clear_UNLOCKED() {
200     while (!readyQ.empty()) {
201         DcpResponse* resp = readyQ.front();
202         popFromReadyQ();
203         delete resp;
204     }
205 }
206
207 void Stream::pushToReadyQ(DcpResponse* resp)
208 {
209     if (resp) {
210         readyQ.push(resp);
211         readyQueueMemory.fetch_add(resp->getMessageSize(),
212                                    memory_order_relaxed);
213     }
214 }
215
216 void Stream::popFromReadyQ(void)
217 {
218     if (!readyQ.empty()) {
219         uint32_t respSize = readyQ.front()->getMessageSize();
220         readyQ.pop();
221         /* Decrement the readyQ size */
222         if (respSize <= readyQueueMemory.load(memory_order_relaxed)) {
223             readyQueueMemory.fetch_sub(respSize, memory_order_relaxed);
224         } else {
225             LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
226                 "underflow, likely wrong stat calculation! curr size: %llu;"
227                 "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory.load(),
228                 respSize);
229             readyQueueMemory.store(0, memory_order_relaxed);
230         }
231     }
232 }
233
234 uint64_t Stream::getReadyQueueMemory() {
235     return readyQueueMemory.load(memory_order_relaxed);
236 }
237
238 const char * Stream::stateName(stream_state_t st) const {
239     static const char * const stateNames[] = {
240         "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
241         "reading", "dead"
242     };
243     cb_assert(st >= STREAM_PENDING && st <= STREAM_DEAD);
244     return stateNames[st];
245 }
246
247 void Stream::addStats(ADD_STAT add_stat, const void *c) {
248     const int bsize = 128;
249     char buffer[bsize];
250     snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_);
251     add_casted_stat(buffer, flags_, add_stat, c);
252     snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_);
253     add_casted_stat(buffer, opaque_, add_stat, c);
254     snprintf(buffer, bsize, "%s:stream_%d_start_seqno", name_.c_str(), vb_);
255     add_casted_stat(buffer, start_seqno_, add_stat, c);
256     snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(), vb_);
257     add_casted_stat(buffer, end_seqno_, add_stat, c);
258     snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(), vb_);
259     add_casted_stat(buffer, vb_uuid_, add_stat, c);
260     snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno", name_.c_str(), vb_);
261     add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
262     snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno", name_.c_str(), vb_);
263     add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
264     snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_);
265     add_casted_stat(buffer, stateName(state_), add_stat, c);
266 }
267
268 ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
269                            const std::string &n, uint32_t flags,
270                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
271                            uint64_t en_seqno, uint64_t vb_uuid,
272                            uint64_t snap_start_seqno, uint64_t snap_end_seqno)
273     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
274               snap_start_seqno, snap_end_seqno),
275        lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
276        takeoverState(vbucket_state_pending), backfillRemaining(0),
277        itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
278        waitForSnapshot(0), engine(e), producer(p),
279        isBackfillTaskRunning(false), lastSentSnapEndSeqno(0),
280        chkptItemsExtractionInProgress(false) {
281
282     const char* type = "";
283     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
284         type = "takeover ";
285         end_seqno_ = dcpMaxSeqno;
286     }
287
288     RCPtr<VBucket> vbucket = engine->getVBucket(vb);
289     if (vbucket) {
290         ReaderLockHolder rlh(vbucket->getStateLock());
291         if (vbucket->getState() == vbucket_state_replica) {
292             uint64_t snapshot_start, snapshot_end;
293             vbucket->getCurrentSnapshot(snapshot_start, snapshot_end);
294             if (snapshot_end > en_seqno) {
295                 end_seqno_ = snapshot_end;
296             }
297         }
298     }
299
300     if (start_seqno_ >= end_seqno_) {
301         endStream(END_STREAM_OK);
302         itemsReady.store(true);
303     }
304
305     type_ = STREAM_ACTIVE;
306
307     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) %sstream created with start seqno "
308         "%llu and end seqno %llu", producer->logHeader(), vb, type, st_seqno,
309         en_seqno);
310 }
311
/*
 * Produce the next response for this stream by dispatching to the handler
 * for the current state. Returns NULL when nothing can be sent right now.
 *
 * If a handler changed the state without producing a response, and the
 * stream is not dead, the lock is released and next() recurses so the new
 * state's handler gets a turn. itemsReady records whether a response was
 * returned. An unknown state is logged and aborts the process.
 */
DcpResponse* ActiveStream::next() {
    LockHolder lh(streamMutex);

    // State on entry, used below to detect transitions made by a handler.
    stream_state_t initState = state_;

    DcpResponse* response = NULL;

    switch (state_) {
        case STREAM_PENDING:
            break;
        case STREAM_BACKFILLING:
            response = backfillPhase();
            break;
        case STREAM_IN_MEMORY:
            response = inMemoryPhase();
            break;
        case STREAM_TAKEOVER_SEND:
            response = takeoverSendPhase();
            break;
        case STREAM_TAKEOVER_WAIT:
            response = takeoverWaitPhase();
            break;
        case STREAM_DEAD:
            response = deadPhase();
            break;
        default:
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
                producer->logHeader(), vb_, stateName(state_));
            abort();
    }

    // State moved on but produced nothing: retry with the new state's handler.
    if (state_ != STREAM_DEAD && initState != state_ && !response) {
        lh.unlock();
        return next();
    }

    itemsReady.store(response ? true : false);
    return response;
}
351
352 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
353     LockHolder lh(streamMutex);
354     uint64_t chkCursorSeqno = endSeqno;
355
356     if (state_ != STREAM_BACKFILLING) {
357         return;
358     }
359
360     startSeqno = std::min(snap_start_seqno_, startSeqno);
361     firstMarkerSent = true;
362
363     RCPtr<VBucket> vb = engine->getVBucket(vb_);
364     // An atomic read of vbucket state without acquiring the
365     // reader lock for state should suffice here.
366     if (vb && vb->getState() == vbucket_state_replica) {
367         if (end_seqno_ > endSeqno) {
368             /* We possibly have items in the open checkpoint
369                (incomplete snapshot) */
370             LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Merging backfill"
371                 " and memory snapshot for a replica vbucket, start seqno "
372                 "%" PRIu64 " and end seqno %" PRIu64 "",
373                 producer->logHeader(), vb_, startSeqno, endSeqno);
374             endSeqno = end_seqno_;
375         }
376     }
377
378     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
379         "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
380         endSeqno);
381     pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
382                                     MARKER_FLAG_DISK));
383     lastSentSnapEndSeqno = endSeqno;
384
385     if (!vb) {
386         endStream(END_STREAM_STATE);
387     } else {
388         if (endSeqno > end_seqno_) {
389             chkCursorSeqno = end_seqno_;
390         }
391         // Only re-register the cursor if we still need to get memory snapshots
392         CursorRegResult result =
393             vb->checkpointManager.registerTAPCursorBySeqno(name_,
394                                                            chkCursorSeqno);
395         curChkSeqno = result.first;
396     }
397
398     lh.unlock();
399     bool inverse = false;
400     if (itemsReady.compare_exchange_strong(inverse, true)) {
401         producer->notifyStreamReady(vb_, false);
402     }
403 }
404
405 void ActiveStream::backfillReceived(Item* itm) {
406     LockHolder lh(streamMutex);
407     if (state_ == STREAM_BACKFILLING) {
408         pushToReadyQ(new MutationResponse(itm, opaque_));
409         lastReadSeqno = itm->getBySeqno();
410         lh.unlock();
411         bool inverse = false;
412         if (itemsReady.compare_exchange_strong(inverse, true)) {
413             producer->notifyStreamReady(vb_, false);
414         }
415     } else {
416         delete itm;
417     }
418 }
419
420 void ActiveStream::completeBackfill() {
421     {
422         LockHolder lh(streamMutex);
423         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
424             " from disk, last seqno read: %ld", producer->logHeader(), vb_,
425             itemsFromBackfill, lastReadSeqno.load());
426     }
427
428     isBackfillTaskRunning.store(false);
429     bool inverse = false;
430     if (itemsReady.compare_exchange_strong(inverse, true)) {
431         producer->notifyStreamReady(vb_, false);
432     }
433 }
434
435 void ActiveStream::snapshotMarkerAckReceived() {
436     bool inverse = false;
437     if (--waitForSnapshot == 0 &&
438         itemsReady.compare_exchange_strong(inverse, true)) {
439         producer->notifyStreamReady(vb_, true);
440     }
441 }
442
443 void ActiveStream::setVBucketStateAckRecieved() {
444     LockHolder lh(streamMutex);
445     if (state_ == STREAM_TAKEOVER_WAIT) {
446         if (takeoverState == vbucket_state_pending) {
447             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
448                 "vbucket state to pending message", producer->logHeader(), vb_);
449
450             takeoverState = vbucket_state_active;
451             transitionState(STREAM_TAKEOVER_SEND);
452             lh.unlock();
453
454             engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
455                                                   false, false);
456             RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
457             LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Vbucket marked as "
458                 "dead, last sent seqno: %" PRIu64 ", high seqno: %" PRIu64 "",
459                 producer->logHeader(), vb_, lastSentSeqno.load(),
460                 vbucket->getHighSeqno());
461         } else {
462             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
463                 "vbucket state to active message", producer->logHeader(), vb_);
464             endStream(END_STREAM_OK);
465             lh.unlock();
466         }
467
468         bool inverse = false;
469         if (itemsReady.compare_exchange_strong(inverse, true)) {
470             producer->notifyStreamReady(vb_, true);
471         }
472     } else {
473         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
474             "op on stream '%s' state '%s'", producer->logHeader(), vb_,
475             name_.c_str(), stateName(state_));
476     }
477
478 }
479
/*
 * Handler for STREAM_BACKFILLING; called from next() under streamMutex.
 *
 * Returns the next queued response. For mutations, deletions and
 * expirations it decrements backfillRemaining while that count is positive.
 *
 * Once the backfill task is no longer running and readyQ is drained,
 * backfillRemaining is reset and the stream moves on:
 * - everything up to end_seqno_ was read -> end the stream (OK);
 * - takeover stream -> STREAM_TAKEOVER_SEND;
 * - disk-only stream -> end the stream (OK);
 * - otherwise -> STREAM_IN_MEMORY.
 *
 * If no response was taken yet, the queue is checked again, since
 * endStream() may just have queued a stream-end message.
 */
DcpResponse* ActiveStream::backfillPhase() {
    DcpResponse* resp = nextQueuedItem();

    // Only item-carrying responses count against the backfill estimate.
    if (resp && backfillRemaining > 0 &&
        (resp->getEvent() == DCP_MUTATION ||
         resp->getEvent() == DCP_DELETION ||
         resp->getEvent() == DCP_EXPIRATION)) {
        backfillRemaining--;
    }

    if (!isBackfillTaskRunning && readyQ.empty()) {
        backfillRemaining.store(0, memory_order_relaxed);
        if (lastReadSeqno.load() >= end_seqno_) {
            endStream(END_STREAM_OK);
        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
            transitionState(STREAM_TAKEOVER_SEND);
        } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
            endStream(END_STREAM_OK);
        } else {
            transitionState(STREAM_IN_MEMORY);
        }

        if (!resp) {
            resp = nextQueuedItem();
        }
    }

    return resp;
}
509
510 DcpResponse* ActiveStream::inMemoryPhase() {
511     if (lastSentSeqno.load() >= end_seqno_) {
512         endStream(END_STREAM_OK);
513     } else if (readyQ.empty()) {
514         if (nextCheckpointItem()) {
515             return NULL;
516         }
517     }
518
519     return nextQueuedItem();
520 }
521
522 DcpResponse* ActiveStream::takeoverSendPhase() {
523     if (!readyQ.empty()) {
524         return nextQueuedItem();
525     } else {
526         if (nextCheckpointItem()) {
527             return NULL;
528         }
529     }
530
531     if (waitForSnapshot != 0) {
532         return NULL;
533     }
534     DcpResponse* resp = NULL;
535     if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
536         resp = new SetVBucketState(opaque_, vb_, takeoverState);
537         transitionState(STREAM_TAKEOVER_WAIT);
538     }
539     return resp;
540 }
541
542 DcpResponse* ActiveStream::takeoverWaitPhase() {
543     return nextQueuedItem();
544 }
545
546 DcpResponse* ActiveStream::deadPhase() {
547     return nextQueuedItem();
548 }
549
550 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
551     Stream::addStats(add_stat, c);
552
553     const int bsize = 128;
554     char buffer[bsize];
555     snprintf(buffer, bsize, "%s:stream_%d_backfilled", name_.c_str(), vb_);
556     add_casted_stat(buffer, itemsFromBackfill, add_stat, c);
557     snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
558     add_casted_stat(buffer, itemsFromMemory, add_stat, c);
559     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
560     add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
561     snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
562     add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
563     snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
564     add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
565     snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
566     add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat, c);
567 }
568
569 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
570     LockHolder lh(streamMutex);
571
572     RCPtr<VBucket> vb = engine->getVBucket(vb_);
573     add_casted_stat("name", name_, add_stat, cookie);
574     if (!vb || state_ == STREAM_DEAD) {
575         add_casted_stat("status", "completed", add_stat, cookie);
576         add_casted_stat("estimate", 0, add_stat, cookie);
577         add_casted_stat("backfillRemaining", 0, add_stat, cookie);
578         add_casted_stat("estimate", 0, add_stat, cookie);
579         return;
580     }
581
582     size_t total = backfillRemaining;
583     if (state_ == STREAM_BACKFILLING) {
584         add_casted_stat("status", "backfilling", add_stat, cookie);
585     } else {
586         add_casted_stat("status", "in-memory", add_stat, cookie);
587     }
588     add_casted_stat("backfillRemaining", backfillRemaining, add_stat, cookie);
589
590     item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
591     size_t vb_items = vb->getNumItems(iep);
592     size_t chk_items = vb_items > 0 ?
593                 vb->checkpointManager.getNumItemsForTAPConnection(name_) : 0;
594     size_t del_items = engine->getEpStore()->getRWUnderlying(vb_)->
595                                                     getNumPersistedDeletes(vb_);
596
597     if (end_seqno_ < curChkSeqno) {
598         chk_items = 0;
599     } else if ((end_seqno_ - curChkSeqno) < chk_items) {
600         chk_items = end_seqno_ - curChkSeqno + 1;
601     }
602     total += chk_items;
603
604     add_casted_stat("estimate", total, add_stat, cookie);
605     add_casted_stat("chk_items", chk_items, add_stat, cookie);
606     add_casted_stat("vb_items", vb_items, add_stat, cookie);
607     add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
608 }
609
610 DcpResponse* ActiveStream::nextQueuedItem() {
611     if (!readyQ.empty()) {
612         DcpResponse* response = readyQ.front();
613         if (producer->bufferLogInsert(response->getMessageSize())) {
614             if (response->getEvent() == DCP_MUTATION ||
615                 response->getEvent() == DCP_DELETION ||
616                 response->getEvent() == DCP_EXPIRATION) {
617                 lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
618
619                 if (state_ == STREAM_BACKFILLING) {
620                     itemsFromBackfill++;
621                 } else {
622                     itemsFromMemory++;
623                 }
624             }
625             popFromReadyQ();
626             return response;
627         }
628     }
629     return NULL;
630 }
631
632 bool ActiveStream::nextCheckpointItem() {
633     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
634     if (vbucket && vbucket->checkpointManager.getNumItemsForTAPConnection(name_) > 0) {
635         // schedule this stream to build the next checkpoint
636         producer->scheduleCheckpointProcessorTask(this);
637         return true;
638     } else if (chkptItemsExtractionInProgress) {
639         return true;
640     }
641     return false;
642 }
643
/*
 * Process queued streams: for each popped stream, fetch and process its
 * outstanding checkpoint items.
 *
 * - Handles at most iterationsBeforeYield streams per run, stopping early
 *   when the queue empties or a NULL stream is popped.
 * - Returns false (task done) once the engine is shutting down.
 * - The task otherwise snoozes "forever" and relies on schedule()/wakeup()
 *   to be woken. If it was re-notified during this run, or work remains, it
 *   snoozes for 0 to yield and run again promptly.
 */
bool ActiveStreamCheckpointProcessorTask::run() {
    if (engine->getEpStats().isShutdown) {
        return false;
    }

    // Setup that we will sleep forever when done.
    snooze(INT_MAX);

    // Clear the notification flag (schedule() sets it before waking us).
    notified.store(false);

    size_t iterations = 0;
    do {
        stream_t nextStream = queuePop();
        ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());

        if (stream) {
            stream->nextCheckpointItemTask();
        } else {
            break;
        }
        iterations++;
    } while(!queueEmpty()
            && iterations < iterationsBeforeYield);

    // Now check if we were re-notified or there are still checkpoints
    bool expected = true;
    if (notified.compare_exchange_strong(expected, false)
        || !queueEmpty()) {
        // snooze for 0, essentially yielding and allowing other tasks a go
        snooze(0.0);
    }

    return true;
}
679
680 void ActiveStreamCheckpointProcessorTask::wakeup() {
681     ExecutorPool::get()->wake(getId());
682 }
683
684 void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
685     pushUnique(stream);
686
687     bool expected = false;
688     if (notified.compare_exchange_strong(expected, true)) {
689         wakeup();
690     }
691 }
692
693 void ActiveStreamCheckpointProcessorTask::clearQueues() {
694     LockHolder lh(workQueueLock);
695     while (!queue.empty()) {
696         queue.pop();
697     }
698     queuedVbuckets.clear();
699 }
700
701 void ActiveStream::nextCheckpointItemTask() {
702     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
703     if (vbucket) {
704         std::deque<queued_item> items;
705         getOutstandingItems(vbucket, items);
706         processItems(items);
707     } else {
708         /* The entity deleting the vbucket must set stream to dead,
709            calling setDead(END_STREAM_STATE) will cause deadlock because
710            it will try to grab streamMutex which is already acquired at this
711            point here */
712         return;
713     }
714 }
715
716 void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
717                                        std::deque<queued_item> &items) {
718     // Commencing item processing - set guard flag.
719     chkptItemsExtractionInProgress.store(true);
720
721     vb->checkpointManager.getAllItemsForCursor(name_, items);
722     if (vb->checkpointManager.getNumCheckpoints() > 1) {
723         engine->getEpStore()->wakeUpCheckpointRemover();
724     }
725 }
726
727
728 void ActiveStream::processItems(std::deque<queued_item>& items) {
729     if (!items.empty()) {
730         bool mark = false;
731         if (items.front()->getOperation() == queue_op_checkpoint_start) {
732             mark = true;
733         }
734
735         std::deque<MutationResponse*> mutations;
736         std::deque<queued_item>::iterator itemItr;
737         for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
738             queued_item& qi = *itemItr;
739
740             if (qi->getOperation() == queue_op_set ||
741                 qi->getOperation() == queue_op_del) {
742                 curChkSeqno = qi->getBySeqno();
743                 lastReadSeqno = qi->getBySeqno();
744
745                 mutations.push_back(new MutationResponse(qi, opaque_));
746             } else if (qi->getOperation() == queue_op_checkpoint_start) {
747                 snapshot(mutations, mark);
748                 mark = true;
749             }
750         }
751
752         if (mutations.empty()) {
753             // If we only got checkpoint start or ends check to see if there are
754             // any more snapshots before pausing the stream.
755             nextCheckpointItemTask();
756         } else {
757             snapshot(mutations, mark);
758         }
759     }
760
761     // Completed item processing - clear guard flag and notify producer.
762     chkptItemsExtractionInProgress.store(false);
763     producer->notifyStreamReady(vb_, true);
764 }
765
/*
 * Queue one batch of in-memory mutations on readyQ, under streamMutex.
 * An empty batch is a no-op.
 *
 * If isCurrentSnapshotCompleted(), a memory snapshot marker spanning the
 * batch's first..last seqno is queued first:
 * - MARKER_FLAG_CHK is added when mark is set;
 * - during STREAM_TAKEOVER_SEND, MARKER_FLAG_ACK is added and
 *   waitForSnapshot is incremented;
 * - the stream's very first marker starts no later than snap_start_seqno_.
 *
 * Every response in items is then pushed to readyQ. Note that items itself
 * is NOT cleared; callers must not pass the same pointers again.
 */
void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
    if (items.empty()) {
        return;
    }

    LockHolder lh(streamMutex);

    if (isCurrentSnapshotCompleted()) {
        uint32_t flags = MARKER_FLAG_MEMORY;
        uint64_t snapStart = items.front()->getBySeqno();
        uint64_t snapEnd = items.back()->getBySeqno();

        if (mark) {
            flags |= MARKER_FLAG_CHK;
        }

        if (state_ == STREAM_TAKEOVER_SEND) {
            // Takeover waits for the consumer to ack this marker.
            waitForSnapshot++;
            flags |= MARKER_FLAG_ACK;
        }

        if (!firstMarkerSent) {
            snapStart = std::min(snap_start_seqno_, snapStart);
            firstMarkerSent = true;
        }
        pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
                                        flags));
        lastSentSnapEndSeqno = snapEnd;
    }

    std::deque<MutationResponse*>::iterator itemItr;
    for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
        pushToReadyQ(*itemItr);
    }
}
801
802 uint32_t ActiveStream::setDead(end_stream_status_t status) {
803     {
804         LockHolder lh(streamMutex);
805         endStream(status);
806     }
807
808     bool inverse = false;
809     if (status != END_STREAM_DISCONNECTED &&
810         itemsReady.compare_exchange_strong(inverse, true)) {
811         producer->notifyStreamReady(vb_, true);
812     }
813     return 0;
814 }
815
816 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
817     if (state_ != STREAM_DEAD) {
818         bool inverse = false;
819         if (itemsReady.compare_exchange_strong(inverse, true)) {
820             producer->notifyStreamReady(vb_, true);
821         }
822     }
823 }
824
825 void ActiveStream::endStream(end_stream_status_t reason) {
826     if (state_ != STREAM_DEAD) {
827         if (reason != END_STREAM_DISCONNECTED) {
828             pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
829         }
830         transitionState(STREAM_DEAD);
831         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
832             " from disk, %llu items sent from memory, %llu was last seqno sent"
833             " %s is the reason", producer->logHeader(), vb_, itemsFromBackfill,
834             itemsFromMemory.load(), lastSentSeqno.load(),
835             getEndStreamStatusStr(reason));
836     }
837 }
838
/*
 * Decide how the stream obtains its initial data. Does nothing while a
 * backfill task is already running or when the vbucket no longer exists.
 *
 * The checkpoint cursor is registered at lastReadSeqno:
 * - result.first becomes curChkSeqno (presumably the first seqno still
 *   available from memory);
 * - result.second is kept as isFirstItem (semantics defined by
 *   registerTAPCursorBySeqno; confirm there).
 *
 * A DCPBackfill task for [lastReadSeqno + 1, backfillEnd] is scheduled when
 * that range is non-empty and either isFirstItem is set or the stream is
 * disk-only. isBackfillTaskRunning is set here and cleared by
 * completeBackfill().
 *
 * Otherwise the stream skips the backfill:
 * - disk-only -> ended (OK);
 * - takeover -> STREAM_TAKEOVER_SEND;
 * - otherwise -> STREAM_IN_MEMORY.
 * itemsReady is set in all three cases.
 */
void ActiveStream::scheduleBackfill() {
    if (!isBackfillTaskRunning) {
        RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
        if (!vbucket) {
            return;
        }

        CursorRegResult result =
            vbucket->checkpointManager.registerTAPCursorBySeqno(name_,
                                                                lastReadSeqno);

        curChkSeqno = result.first;
        bool isFirstItem = result.second;

        cb_assert(lastReadSeqno <= curChkSeqno);
        uint64_t backfillStart = lastReadSeqno + 1;

        /* We need to find the minimum seqno that needs to be backfilled in
         * order to make sure that we don't miss anything when transitioning
         * to a memory snapshot. The backfill task will always make sure that
         * the backfill end seqno is contained in the backfill.
         */
        uint64_t backfillEnd = 0;
        if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) { // disk backfill only
            backfillEnd = end_seqno_;
        } else { // disk backfill + in-memory streaming
            // Backfill only what precedes the cursor, capped at end_seqno_.
            if (backfillStart < curChkSeqno) {
                if (curChkSeqno > end_seqno_) {
                    backfillEnd = end_seqno_;
                } else {
                    backfillEnd = curChkSeqno - 1;
                }
            }
        }

        bool tryBackfill = isFirstItem || flags_ & DCP_ADD_STREAM_FLAG_DISKONLY;

        if (backfillStart <= backfillEnd && tryBackfill) {
            ExTask task = new DCPBackfill(engine, this, backfillStart, backfillEnd,
                                          Priority::TapBgFetcherPriority, 0, false);
            ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
            isBackfillTaskRunning.store(true);
        } else {
            if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
                endStream(END_STREAM_OK);
            } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
                transitionState(STREAM_TAKEOVER_SEND);
            } else {
                transitionState(STREAM_IN_MEMORY);
            }
            itemsReady.store(true);
        }
    }
}
893
894 const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
895 {
896     switch (status) {
897         case END_STREAM_OK:
898             return "The stream ended due to all items being streamed";
899         case END_STREAM_CLOSED:
900             return "The stream closed early due to a close stream message";
901         case END_STREAM_STATE:
902             return "The stream closed early because the vbucket state changed";
903         case END_STREAM_DISCONNECTED:
904             return "The stream closed early because the conn was disconnected";
905         default:
906             break;
907     }
908     return "Status unknown; this should not happen";
909 }
910
/*
 * Move the active stream from state_ to newState.
 *
 * A transition to the current state is a no-op. Any other transition is
 * checked against the allowed edges in the first switch:
 *   - a disallowed target trips cb_assert;
 *   - a source state with no listed edges (e.g. STREAM_DEAD) is logged and
 *     abort()s.
 * After the check, state_ is updated and the entry action for the new state
 * runs:
 *   - STREAM_BACKFILLING:   scheduleBackfill()
 *   - STREAM_IN_MEMORY:     endStream(END_STREAM_OK) if lastSentSeqno has
 *                           already reached end_seqno_, else nextCheckpointItem()
 *   - STREAM_TAKEOVER_SEND: nextCheckpointItem()
 *   - STREAM_DEAD:          remove this stream's checkpoint cursor (name_)
 *
 * state_ must be assigned before the entry action runs. Entry actions can
 * call transitionState() again: scheduleBackfill() may move straight from
 * STREAM_BACKFILLING to STREAM_IN_MEMORY, which is only a legal edge once
 * state_ is already STREAM_BACKFILLING.
 */
void ActiveStream::transitionState(stream_state_t newState) {
    LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
        producer->logHeader(), vb_, stateName(state_), stateName(newState));

    if (state_ == newState) {
        return;
    }

    // Validate the edge state_ -> newState.
    switch (state_) {
        case STREAM_PENDING:
            cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
            break;
        case STREAM_BACKFILLING:
            cb_assert(newState == STREAM_IN_MEMORY ||
                   newState == STREAM_TAKEOVER_SEND ||
                   newState == STREAM_DEAD);
            break;
        case STREAM_IN_MEMORY:
            cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
            break;
        case STREAM_TAKEOVER_SEND:
            cb_assert(newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD);
            break;
        case STREAM_TAKEOVER_WAIT:
            cb_assert(newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD);
            break;
        default:
            // No outgoing transitions from here (e.g. STREAM_DEAD).
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
                "to %s", producer->logHeader(), vb_, stateName(state_),
                stateName(newState));
            abort();
    }

    state_ = newState;

    // Entry actions for the new state.
    switch (newState) {
        case STREAM_BACKFILLING:
            scheduleBackfill();
            break;
        case STREAM_IN_MEMORY:
            // Check if the producer has sent up till the last requested
            // sequence number already, if not - move checkpoint items into
            // the ready queue.
            if (lastSentSeqno >= end_seqno_) {
                // Stream transitioning to DEAD state
                endStream(END_STREAM_OK);
            } else {
                nextCheckpointItem();
            }
            break;
        case STREAM_TAKEOVER_SEND:
            nextCheckpointItem();
            break;
        case STREAM_DEAD:
            {
                // Drop this stream's checkpoint cursor so it no longer
                // references the vbucket's checkpoints.
                RCPtr<VBucket> vb = engine->getVBucket(vb_);
                if (vb) {
                    vb->checkpointManager.removeTAPCursor(name_);
                }
                break;
            }
        case STREAM_TAKEOVER_WAIT:
        case STREAM_PENDING:
            break;
        case STREAM_READING:
            // STREAM_READING is never an allowed target above, so this is only
            // reachable if cb_assert is compiled out (TODO confirm). Note that
            // state_ has already been updated at this point.
            LOG(EXTENSION_LOG_WARNING,
                "ActiveStream::transitionState: newState can't be "
                "STREAM_READING!");
            break;
    }
}
982
983 size_t ActiveStream::getItemsRemaining() {
984     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
985
986     if (!vbucket || state_ == STREAM_DEAD) {
987         return 0;
988     }
989
990     uint64_t high_seqno = vbucket->getHighSeqno();
991
992     if (end_seqno_ < high_seqno) {
993         if (end_seqno_ > lastSentSeqno.load()) {
994             return (end_seqno_ - lastSentSeqno.load());
995         }
996     } else {
997         if (high_seqno > lastSentSeqno.load()) {
998             return (high_seqno - lastSentSeqno.load());
999         }
1000     }
1001
1002     return 0;
1003 }
1004
1005 const char* ActiveStream::logHeader()
1006 {
1007     return producer->logHeader();
1008 }
1009
1010 bool ActiveStream::isCurrentSnapshotCompleted() const
1011 {
1012     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1013     // An atomic read of vbucket state without acquiring the
1014     // reader lock for state should suffice here.
1015     if (vbucket && vbucket->getState() == vbucket_state_replica) {
1016         if (lastSentSnapEndSeqno >= lastReadSeqno) {
1017             return false;
1018         }
1019     }
1020     return true;
1021 }
1022
1023 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
1024                                const std::string &name, uint32_t flags,
1025                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1026                                uint64_t en_seqno, uint64_t vb_uuid,
1027                                uint64_t snap_start_seqno,
1028                                uint64_t snap_end_seqno)
1029     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1030              snap_start_seqno, snap_end_seqno),
1031       producer(p) {
1032     LockHolder lh(streamMutex);
1033     RCPtr<VBucket> vbucket = e->getVBucket(vb_);
1034     if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
1035         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1036         transitionState(STREAM_DEAD);
1037         itemsReady.store(true);
1038     }
1039
1040     type_ = STREAM_NOTIFIER;
1041
1042     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
1043         "%llu and end seqno %llu", producer->logHeader(), vb, st_seqno,
1044         en_seqno);
1045 }
1046
1047 uint32_t NotifierStream::setDead(end_stream_status_t status) {
1048     LockHolder lh(streamMutex);
1049     if (state_ != STREAM_DEAD) {
1050         transitionState(STREAM_DEAD);
1051         if (status != END_STREAM_DISCONNECTED) {
1052             pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
1053             lh.unlock();
1054             bool inverse = false;
1055             if (itemsReady.compare_exchange_strong(inverse, true)) {
1056                 producer->notifyStreamReady(vb_, true);
1057             }
1058         }
1059     }
1060     return 0;
1061 }
1062
1063 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1064     LockHolder lh(streamMutex);
1065     if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1066         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1067         transitionState(STREAM_DEAD);
1068         lh.unlock();
1069         bool inverse = false;
1070         if (itemsReady.compare_exchange_strong(inverse, true)) {
1071             producer->notifyStreamReady(vb_, true);
1072         }
1073     }
1074 }
1075
1076 DcpResponse* NotifierStream::next() {
1077     LockHolder lh(streamMutex);
1078
1079     if (readyQ.empty()) {
1080         itemsReady.store(false);
1081         return NULL;
1082     }
1083
1084     DcpResponse* response = readyQ.front();
1085     if (producer->bufferLogInsert(response->getMessageSize())) {
1086         popFromReadyQ();
1087     } else {
1088         response = NULL;
1089     }
1090
1091     return response;
1092 }
1093
1094 void NotifierStream::transitionState(stream_state_t newState) {
1095     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1096         producer->logHeader(), vb_, stateName(state_), stateName(newState));
1097
1098     if (state_ == newState) {
1099         return;
1100     }
1101
1102     switch (state_) {
1103         case STREAM_PENDING:
1104             cb_assert(newState == STREAM_DEAD);
1105             break;
1106         default:
1107             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1108                 "to %s", producer->logHeader(), vb_, stateName(state_),
1109                 stateName(newState));
1110             abort();
1111     }
1112
1113     state_ = newState;
1114 }
1115
1116 PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
1117                              const std::string &name, uint32_t flags,
1118                              uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1119                              uint64_t en_seqno, uint64_t vb_uuid,
1120                              uint64_t snap_start_seqno, uint64_t snap_end_seqno,
1121                              uint64_t vb_high_seqno)
1122     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1123              snap_start_seqno, snap_end_seqno),
1124       engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
1125       cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false),
1126       saveSnapshot(false) {
1127     LockHolder lh(streamMutex);
1128     pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
1129                                   vb_uuid, snap_start_seqno, snap_end_seqno));
1130     itemsReady.store(true);
1131     type_ = STREAM_PASSIVE;
1132
1133     const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
1134     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to add %s stream with "
1135         "start seqno %llu, end seqno %llu, vbucket uuid %llu, snap start seqno "
1136         "%llu, snap end seqno %llu, and vb_high_seqno %llu",
1137         consumer->logHeader(), vb, type, st_seqno, en_seqno, vb_uuid,
1138         snap_start_seqno, snap_end_seqno, vb_high_seqno);
1139 }
1140
/*
 * Destroy a passive stream.
 *
 * clear_UNLOCKED() (defined elsewhere; presumably it releases whatever is
 * left in the ready queue) is called with streamMutex held. The destructor
 * then requires the stream to be dead and its receive buffer to hold no
 * bytes. Presumably setDead(), which transitions to STREAM_DEAD and calls
 * clearBuffer(), has run before destruction; TODO confirm with callers.
 */
PassiveStream::~PassiveStream() {
    LockHolder lh(streamMutex);
    clear_UNLOCKED();
    cb_assert(state_ == STREAM_DEAD);
    cb_assert(buffer.bytes == 0);
}
1147
1148 uint32_t PassiveStream::setDead(end_stream_status_t status) {
1149     LockHolder lh(streamMutex);
1150     transitionState(STREAM_DEAD);
1151     lh.unlock();
1152     uint32_t unackedBytes = clearBuffer();
1153     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Setting stream to dead state,"
1154         " last_seqno is %llu, unackedBytes is %u, status is %s",
1155         consumer->logHeader(), vb_, last_seqno, unackedBytes,
1156         getEndStreamStatusStr(status));
1157     return unackedBytes;
1158 }
1159
1160 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1161     LockHolder lh(streamMutex);
1162     if (state_ == STREAM_PENDING) {
1163         if (status == ENGINE_SUCCESS) {
1164             transitionState(STREAM_READING);
1165         } else {
1166             transitionState(STREAM_DEAD);
1167         }
1168         pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1169         lh.unlock();
1170         bool inverse = false;
1171         if (itemsReady.compare_exchange_strong(inverse, true)) {
1172             consumer->notifyStreamReady(vb_);
1173         }
1174     }
1175 }
1176
1177 void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1178                                     uint32_t new_opaque,
1179                                     uint64_t start_seqno) {
1180     vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1181     vb->getCurrentSnapshot(snap_start_seqno_, snap_end_seqno_);
1182
1183     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to reconnect stream "
1184         "with opaque %ld, start seq no %llu, end seq no %llu, snap start seqno "
1185         "%llu, and snap end seqno %llu", consumer->logHeader(), vb_, new_opaque,
1186         start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
1187
1188     LockHolder lh(streamMutex);
1189     last_seqno = start_seqno;
1190     pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1191                                   end_seqno_, vb_uuid_, snap_start_seqno_,
1192                                   snap_end_seqno_));
1193     lh.unlock();
1194     bool inverse = false;
1195     if (itemsReady.compare_exchange_strong(inverse, true)) {
1196         consumer->notifyStreamReady(vb_);
1197     }
1198 }
1199
1200 ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1201     LockHolder lh(buffer.bufMutex);
1202     cb_assert(resp);
1203
1204     if (state_ == STREAM_DEAD) {
1205         delete resp;
1206         return ENGINE_KEY_ENOENT;
1207     }
1208
1209     switch (resp->getEvent()) {
1210         case DCP_MUTATION:
1211         case DCP_DELETION:
1212         case DCP_EXPIRATION:
1213         {
1214             MutationResponse* m = static_cast<MutationResponse*>(resp);
1215             uint64_t bySeqno = m->getBySeqno();
1216             if (bySeqno <= last_seqno) {
1217                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous (out of "
1218                     "sequence) mutation received, with opaque: %ld, its "
1219                     "seqno (%llu) is not greater than last received seqno "
1220                     "(%llu); Dropping mutation!", consumer->logHeader(),
1221                     vb_, opaque_, bySeqno, last_seqno);
1222                 delete m;
1223                 return ENGINE_ERANGE;
1224             }
1225             last_seqno = bySeqno;
1226             break;
1227         }
1228         case DCP_SNAPSHOT_MARKER:
1229         {
1230             SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
1231             uint64_t snapStart = s->getStartSeqno();
1232             uint64_t snapEnd = s->getEndSeqno();
1233             if (snapStart < last_seqno && snapEnd <= last_seqno) {
1234                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous snapshot "
1235                     "marker received, with opaque: %ld, its start (%llu), and"
1236                     "end (%llu) are less than last received seqno (%llu); "
1237                     "Dropping marker!", consumer->logHeader(), vb_, opaque_,
1238                     snapStart, snapEnd, last_seqno);
1239                 delete s;
1240                 return ENGINE_ERANGE;
1241             }
1242             break;
1243         }
1244         case DCP_SET_VBUCKET:
1245         case DCP_STREAM_END:
1246         {
1247             /* No validations necessary */
1248             break;
1249         }
1250         default:
1251         {
1252             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unknown DCP op received: %d;"
1253                 " Disconnecting connection..",
1254                 consumer->logHeader(), vb_, resp->getEvent());
1255             return ENGINE_DISCONNECT;
1256         }
1257     }
1258
1259     buffer.messages.push(resp);
1260     buffer.items++;
1261     buffer.bytes += resp->getMessageSize();
1262
1263     return ENGINE_SUCCESS;
1264 }
1265
/*
 * Apply up to PassiveStream::batchSize buffered messages, in arrival order,
 * while holding buffer.bufMutex.
 *
 * Dispatch by event type:
 *   - mutation, deletion/expiration, snapshot marker, set-vbucket-state and
 *     stream end each have a handler;
 *   - any other type abort()s.
 *
 * Outcomes per message:
 *   - ENGINE_TMPFAIL or ENGINE_ENOMEM stops processing. That message stays at
 *     the front of the buffer (to be retried) and cannot_process is returned.
 *   - Every other outcome, including other errors, removes and deletes the
 *     message.
 *
 * processed_bytes is set to the total size of the removed messages, except
 * those whose handler returned ENGINE_ERANGE (dropped as outside the current
 * snapshot); those are removed but not counted. When the buffer is empty on
 * entry, the function returns all_processed immediately and processed_bytes
 * is left untouched.
 *
 * all_processed is also returned when the batch ends without a temporary
 * failure. Messages may still remain if the batch limit was hit.
 */
process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes) {
    LockHolder lh(buffer.bufMutex);
    uint32_t count = 0;
    uint32_t message_bytes = 0;
    uint32_t total_bytes_processed = 0;
    bool failed = false;

    if (buffer.messages.empty()) {
        return all_processed;
    }

    while (count < PassiveStream::batchSize && !buffer.messages.empty()) {
        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
        DcpResponse *response = buffer.messages.front();
        message_bytes = response->getMessageSize();

        switch (response->getEvent()) {
            case DCP_MUTATION:
                ret = processMutation(static_cast<MutationResponse*>(response));
                break;
            case DCP_DELETION:
            case DCP_EXPIRATION:
                ret = processDeletion(static_cast<MutationResponse*>(response));
                break;
            case DCP_SNAPSHOT_MARKER:
                processMarker(static_cast<SnapshotMarker*>(response));
                break;
            case DCP_SET_VBUCKET:
                processSetVBucketState(static_cast<SetVBucketState*>(response));
                break;
            case DCP_STREAM_END:
                // NOTE(review): transitionState() is called here holding only
                // buffer.bufMutex, whereas setDead()/acceptStream() call it
                // under streamMutex. Confirm this is safe.
                transitionState(STREAM_DEAD);
                break;
            default:
                abort();
        }

        // Temporary failure: leave the message queued and retry later.
        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
            failed = true;
            break;
        }

        delete response;
        buffer.messages.pop();
        buffer.items--;
        buffer.bytes -= message_bytes;
        count++;
        // NOTE(review): bytes of messages dropped with ENGINE_ERANGE are not
        // reported as processed. Confirm whether they should still be
        // acknowledged to the producer.
        if (ret != ENGINE_ERANGE) {
            total_bytes_processed += message_bytes;
        }
    }

    processed_bytes = total_bytes_processed;

    if (failed) {
        return cannot_process;
    }

    return all_processed;
}
1326
/*
 * Apply one buffered mutation to the vbucket.
 *
 * Early returns:
 *   - ENGINE_NOT_MY_VBUCKET if the vbucket no longer exists;
 *   - ENGINE_ERANGE (mutation dropped, warning logged) if its seqno is beyond
 *     cur_snapshot_end.
 *
 * Otherwise the item is written via commitMutation(). If processMarker() has
 * flagged a new snapshot (saveSnapshot), the write and the recording of
 * [cur_snapshot_start, cur_snapshot_end] on the vbucket happen together under
 * the vbucket's snapshot lock, and the flag is cleared.
 *
 * On success, handleSnapshotEnd() checks whether this seqno closes the
 * current snapshot. Any other result is logged and returned to the caller.
 */
ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
    RCPtr<VBucket> vb = engine->getVBucket(vb_);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    if (mutation->getBySeqno() > cur_snapshot_end) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous mutation [sequence "
            "number (%llu) greater than current snapshot end seqno (%llu)] "
            "being processed; Dropping the mutation!", consumer->logHeader(),
            vb_, mutation->getBySeqno(), cur_snapshot_end);
        return ENGINE_ERANGE;
    }

    ENGINE_ERROR_CODE ret;
    if (saveSnapshot) {
        // First item after a new marker: commit it and record the snapshot
        // range under the same snapshot lock.
        LockHolder lh = vb->getSnapshotLock();
        ret = commitMutation(mutation, vb->isBackfillPhase());
        vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
        saveSnapshot = false;
        lh.unlock();
    } else {
        ret = commitMutation(mutation, vb->isBackfillPhase());
    }

    // We should probably handle these error codes in a better way, but since
    // the producer side doesn't do anything with them anyways let's just log
    // them for now until we come up with a better solution.
    if (ret != ENGINE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
            "process  mutation", consumer->logHeader(), ret);
    } else {
        handleSnapshotEnd(vb, mutation->getBySeqno());
    }

    return ret;
}
1364
1365 ENGINE_ERROR_CODE PassiveStream::commitMutation(MutationResponse* mutation,
1366                                                 bool backfillPhase) {
1367     if (backfillPhase) {
1368         return engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1369                                                         INITIAL_NRU_VALUE,
1370                                                         false);
1371     } else {
1372         return engine->getEpStore()->setWithMeta(*mutation->getItem(), 0,
1373                                                  consumer->getCookie(), true,
1374                                                  true, INITIAL_NRU_VALUE, false,
1375                                                  true);
1376     }
1377 }
1378
/*
 * Apply one buffered deletion (or expiration) to the vbucket.
 *
 * Early returns:
 *   - ENGINE_NOT_MY_VBUCKET if the vbucket no longer exists;
 *   - ENGINE_ERANGE (deletion dropped, warning logged) if its seqno is beyond
 *     cur_snapshot_end.
 *
 * Otherwise the delete is issued via commitDeletion(). If processMarker() has
 * flagged a new snapshot (saveSnapshot), the delete and the recording of
 * [cur_snapshot_start, cur_snapshot_end] happen together under the vbucket's
 * snapshot lock, and the flag is cleared.
 *
 * ENGINE_KEY_ENOENT from the delete is treated as success. On success,
 * handleSnapshotEnd() checks whether this seqno closes the snapshot. Any
 * other result is logged and returned.
 */
ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
    RCPtr<VBucket> vb = engine->getVBucket(vb_);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    if (deletion->getBySeqno() > cur_snapshot_end) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous deletion [sequence "
            "number (%llu) greater than current snapshot end seqno (%llu)] "
            "being processed; Dropping the deletion!", consumer->logHeader(),
            vb_, deletion->getBySeqno(), cur_snapshot_end);
        return ENGINE_ERANGE;
    }

    ENGINE_ERROR_CODE ret;
    if (saveSnapshot) {
        // First item after a new marker: delete and record the snapshot range
        // under the same snapshot lock.
        LockHolder lh = vb->getSnapshotLock();
        ret = commitDeletion(deletion, vb->isBackfillPhase());
        vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
        saveSnapshot = false;
        lh.unlock();
    } else {
        ret = commitDeletion(deletion, vb->isBackfillPhase());
    }

    // Deleting a key that is not present is not treated as an error here.
    if (ret == ENGINE_KEY_ENOENT) {
        ret = ENGINE_SUCCESS;
    }

    // We should probably handle these error codes in a better way, but since
    // the producer side doesn't do anything with them anyways let's just log
    // them for now until we come up with a better solution.
    if (ret != ENGINE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
            "process  deletion", consumer->logHeader(), ret);
    } else {
        handleSnapshotEnd(vb, deletion->getBySeqno());
    }

    return ret;
}
1420
1421 ENGINE_ERROR_CODE PassiveStream::commitDeletion(MutationResponse* deletion,
1422                                                 bool backfillPhase) {
1423     uint64_t delCas = 0;
1424     ItemMetaData meta = deletion->getItem()->getMetaData();
1425     return engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1426                                                 &delCas, deletion->getVBucket(),
1427                                                 consumer->getCookie(), true,
1428                                                 &meta, backfillPhase, false,
1429                                                 deletion->getBySeqno(), true);
1430 }
1431
/*
 * Adopt a snapshot marker as the current snapshot.
 *
 * cur_snapshot_start/end/type are recorded, and saveSnapshot is set so the
 * next applied item also records the range on the vbucket.
 *
 * If the vbucket still exists, its checkpoint/backfill state is adjusted:
 *   - A disk snapshot arriving while the vbucket's high seqno is 0 puts the
 *     vbucket into backfill phase with open checkpoint id 0.
 *   - Otherwise a new checkpoint is opened when the marker has
 *     MARKER_FLAG_CHK or the open checkpoint id is 0, and backfill phase is
 *     cleared.
 *
 * If the marker carries MARKER_FLAG_ACK, cur_snapshot_ack is set, and
 * handleSnapshotEnd() then queues a SnapshotMarkerResponse when the snapshot
 * completes. The ack flag is only set when the vbucket exists.
 */
void PassiveStream::processMarker(SnapshotMarker* marker) {
    RCPtr<VBucket> vb = engine->getVBucket(vb_);

    cur_snapshot_start = marker->getStartSeqno();
    cur_snapshot_end = marker->getEndSeqno();
    cur_snapshot_type = (marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory;
    saveSnapshot = true;

    if (vb) {
        // '&' binds tighter than '&&' / '||' in the conditions below.
        if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
            vb->setBackfillPhase(true);
            /* When replica vb is in backfill phase, then open checkpoint id
               is set to 0. */
            vb->checkpointManager.setOpenCheckpointId(0);
        } else {
            if (marker->getFlags() & MARKER_FLAG_CHK ||
                vb->checkpointManager.getOpenCheckpointId() == 0) {
                uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
                vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
            }
            vb->setBackfillPhase(false);
        }

        if (marker->getFlags() & MARKER_FLAG_ACK) {
            cur_snapshot_ack = true;
        }
    }
}
1460
1461 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1462     engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1463
1464     LockHolder lh (streamMutex);
1465     pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1466     lh.unlock();
1467     bool inverse = false;
1468     if (itemsReady.compare_exchange_strong(inverse, true)) {
1469         consumer->notifyStreamReady(vb_);
1470     }
1471 }
1472
1473 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1474     if (byseqno == cur_snapshot_end) {
1475         if (cur_snapshot_type == disk && vb->isBackfillPhase()) {
1476             vb->setBackfillPhase(false);
1477             uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1478             vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1479         } else {
1480             double maxSize = static_cast<double>(engine->getEpStats().getMaxDataSize());
1481             double mem_threshold = StoredValue::getMutationMemThreshold();
1482             double mem_used = static_cast<double>(engine->getEpStats().getTotalMemoryUsed());
1483             if (maxSize * mem_threshold < mem_used) {
1484                 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1485                 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1486             }
1487         }
1488
1489         if (cur_snapshot_ack) {
1490             LockHolder lh(streamMutex);
1491             pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1492             lh.unlock();
1493             bool inverse = false;
1494             if (itemsReady.compare_exchange_strong(inverse, true)) {
1495                 consumer->notifyStreamReady(vb_);
1496             }
1497             cur_snapshot_ack = false;
1498         }
1499         cur_snapshot_type = none;
1500         vb->setCurrentSnapshot(byseqno, byseqno);
1501     }
1502 }
1503
1504 void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1505     Stream::addStats(add_stat, c);
1506
1507     const int bsize = 128;
1508     char buf[bsize];
1509     snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
1510     add_casted_stat(buf, buffer.items, add_stat, c);
1511     snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
1512     add_casted_stat(buf, buffer.bytes, add_stat, c);
1513     snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
1514     add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1515     snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
1516     add_casted_stat(buf, last_seqno, add_stat, c);
1517     snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
1518     add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1519
1520     snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
1521     add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type), add_stat, c);
1522
1523     if (cur_snapshot_type != none) {
1524         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
1525         add_casted_stat(buf, cur_snapshot_start, add_stat, c);
1526         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
1527         add_casted_stat(buf, cur_snapshot_end, add_stat, c);
1528     }
1529 }
1530
1531 DcpResponse* PassiveStream::next() {
1532     LockHolder lh(streamMutex);
1533
1534     if (readyQ.empty()) {
1535         itemsReady.store(false);
1536         return NULL;
1537     }
1538
1539     DcpResponse* response = readyQ.front();
1540     popFromReadyQ();
1541     return response;
1542 }
1543
1544 uint32_t PassiveStream::clearBuffer() {
1545     LockHolder lh(buffer.bufMutex);
1546     uint32_t unackedBytes = buffer.bytes;
1547
1548     while (!buffer.messages.empty()) {
1549         DcpResponse* resp = buffer.messages.front();
1550         buffer.messages.pop();
1551         delete resp;
1552     }
1553
1554     buffer.bytes = 0;
1555     buffer.items = 0;
1556     return unackedBytes;
1557 }
1558
1559 void PassiveStream::transitionState(stream_state_t newState) {
1560     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1561         consumer->logHeader(), vb_, stateName(state_), stateName(newState));
1562
1563     if (state_ == newState) {
1564         return;
1565     }
1566
1567     switch (state_) {
1568         case STREAM_PENDING:
1569             cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
1570             break;
1571         case STREAM_READING:
1572             cb_assert(newState == STREAM_PENDING || newState == STREAM_DEAD);
1573             break;
1574         default:
1575             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1576                 "to %s", consumer->logHeader(), vb_, stateName(state_),
1577                 stateName(newState));
1578             abort();
1579     }
1580
1581     state_ = newState;
1582 }
1583
1584 const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
1585 {
1586     switch (status) {
1587         case END_STREAM_CLOSED:
1588             return "The stream closed due to a close stream message";
1589         case END_STREAM_DISCONNECTED:
1590             return "The stream closed early because the conn was disconnected";
1591         default:
1592             break;
1593     }
1594     return "Status unknown; this should not happen";
1595 }