MB-19673: Log the actual last seqno sent before closing the stream.
[ep-engine.git] / src / dcp-stream.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "kvstore.h"
23 #include "statwriter.h"
24 #include "dcp-stream.h"
25 #include "dcp-consumer.h"
26 #include "dcp-producer.h"
27 #include "dcp-response.h"
28
29 #define DCP_BACKFILL_SLEEP_TIME 2
30
31 static const char* snapshotTypeToString(snapshot_type_t type) {
32     static const char * const snapshotTypes[] = { "none", "disk", "memory" };
33     cb_assert(type >= none && type <= memory);
34     return snapshotTypes[type];
35 }
36
// Largest representable sequence number. The ActiveStream constructor uses it
// as the end seqno of takeover streams.
const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
// Batch size constant for PassiveStream; it is not used in this part of the
// file.
const size_t PassiveStream::batchSize = 10;
39
40 class SnapshotMarkerCallback : public Callback<SeqnoRange> {
41 public:
42     SnapshotMarkerCallback(stream_t s)
43         : stream(s) {
44         cb_assert(s->getType() == STREAM_ACTIVE);
45     }
46
47     void callback(SeqnoRange &range) {
48         uint64_t st = range.getStartSeqno();
49         uint64_t en = range.getEndSeqno();
50         static_cast<ActiveStream*>(stream.get())->markDiskSnapshot(st, en);
51     }
52
53 private:
54     stream_t stream;
55 };
56
57 class CacheCallback : public Callback<CacheLookup> {
58 public:
59     CacheCallback(EventuallyPersistentEngine* e, stream_t &s)
60         : engine_(e), stream_(s) {
61         Stream *str = stream_.get();
62         if (str) {
63             cb_assert(str->getType() == STREAM_ACTIVE);
64         }
65     }
66
67     void callback(CacheLookup &lookup);
68
69 private:
70     EventuallyPersistentEngine* engine_;
71     stream_t stream_;
72 };
73
74 void CacheCallback::callback(CacheLookup &lookup) {
75     RCPtr<VBucket> vb = engine_->getEpStore()->getVBucket(lookup.getVBucketId());
76     if (!vb) {
77         setStatus(ENGINE_SUCCESS);
78         return;
79     }
80
81     int bucket_num(0);
82     LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
83     StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num, false, false);
84     if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
85         Item* it = v->toItem(false, lookup.getVBucketId());
86         lh.unlock();
87         static_cast<ActiveStream*>(stream_.get())->backfillReceived(it);
88         setStatus(ENGINE_KEY_EEXISTS);
89     } else {
90         setStatus(ENGINE_SUCCESS);
91     }
92 }
93
94 class DiskCallback : public Callback<GetValue> {
95 public:
96     DiskCallback(stream_t &s)
97         : stream_(s) {
98         Stream *str = stream_.get();
99         if (str) {
100             cb_assert(str->getType() == STREAM_ACTIVE);
101         }
102     }
103
104     void callback(GetValue &val) {
105         cb_assert(val.getValue());
106         ActiveStream* active_stream = static_cast<ActiveStream*>(stream_.get());
107         active_stream->backfillReceived(val.getValue());
108     }
109
110 private:
111     stream_t stream_;
112 };
113
114 class DCPBackfill : public GlobalTask {
115 public:
116     DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
117                 uint64_t start_seqno, uint64_t end_seqno, const Priority &p,
118                 double sleeptime = 0, bool shutdown = false)
119         : GlobalTask(e, p, sleeptime, shutdown), engine(e), stream(s),
120           startSeqno(start_seqno), endSeqno(end_seqno) {
121         cb_assert(stream->getType() == STREAM_ACTIVE);
122     }
123
124     bool run();
125
126     std::string getDescription();
127
128 private:
129     EventuallyPersistentEngine *engine;
130     stream_t                    stream;
131     uint64_t                    startSeqno;
132     uint64_t                    endSeqno;
133 };
134
135 bool DCPBackfill::run() {
136     uint16_t vbid = stream->getVBucket();
137
138     if (engine->getEpStore()->isMemoryUsageTooHigh()) {
139         LOG(EXTENSION_LOG_WARNING, "VBucket %d dcp backfill task temporarily "
140                 "suspended  because the current memory usage is too high",
141                 vbid);
142         snooze(DCP_BACKFILL_SLEEP_TIME);
143         return true;
144     }
145
146     uint64_t lastPersistedSeqno =
147         engine->getEpStore()->getLastPersistedSeqno(vbid);
148     uint64_t diskSeqno =
149         engine->getEpStore()->getRWUnderlying(vbid)->getLastPersistedSeqno(vbid);
150
151     if (lastPersistedSeqno < endSeqno) {
152         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Rescheduling backfill "
153             "because backfill up to seqno %llu is needed but only up to "
154             "%llu is persisted (disk %llu)",
155             static_cast<ActiveStream*>(stream.get())->logHeader(), vbid,
156             endSeqno, lastPersistedSeqno, diskSeqno);
157         snooze(DCP_BACKFILL_SLEEP_TIME);
158         return true;
159     }
160
161     KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
162     size_t numItems = kvstore->getNumItems(vbid, startSeqno,
163                                            std::numeric_limits<uint64_t>::max());
164     static_cast<ActiveStream*>(stream.get())->incrBackfillRemaining(numItems);
165
166     shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
167     shared_ptr<Callback<CacheLookup> > cl(new CacheCallback(engine, stream));
168     shared_ptr<Callback<SeqnoRange> > sr(new SnapshotMarkerCallback(stream));
169     kvstore->dump(vbid, startSeqno, cb, cl, sr);
170
171     static_cast<ActiveStream*>(stream.get())->completeBackfill();
172
173     LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") "
174         "Backfill task (%" PRIu64 " to %" PRIu64 ") "
175         "finished. disk seqno %" PRIu64 " memory seqno %" PRIu64 "",
176         static_cast<ActiveStream*>(stream.get())->logHeader(), vbid,
177         startSeqno, endSeqno, diskSeqno, lastPersistedSeqno);
178
179     return false;
180 }
181
182 std::string DCPBackfill::getDescription() {
183     std::stringstream ss;
184     ss << "DCP backfill for vbucket " << stream->getVBucket();
185     return ss.str();
186 }
187
188 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
189                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
190                uint64_t vb_uuid, uint64_t snap_start_seqno,
191                uint64_t snap_end_seqno)
192     : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
193       start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
194       snap_start_seqno_(snap_start_seqno),
195       snap_end_seqno_(snap_end_seqno),
196       state_(STREAM_PENDING), itemsReady(false), readyQueueMemory(0) {
197 }
198
199 void Stream::clear_UNLOCKED() {
200     while (!readyQ.empty()) {
201         DcpResponse* resp = readyQ.front();
202         popFromReadyQ();
203         delete resp;
204     }
205 }
206
207 void Stream::pushToReadyQ(DcpResponse* resp)
208 {
209     if (resp) {
210         readyQ.push(resp);
211         readyQueueMemory.fetch_add(resp->getMessageSize(),
212                                    memory_order_relaxed);
213     }
214 }
215
216 void Stream::popFromReadyQ(void)
217 {
218     if (!readyQ.empty()) {
219         uint32_t respSize = readyQ.front()->getMessageSize();
220         readyQ.pop();
221         /* Decrement the readyQ size */
222         if (respSize <= readyQueueMemory.load(memory_order_relaxed)) {
223             readyQueueMemory.fetch_sub(respSize, memory_order_relaxed);
224         } else {
225             LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
226                 "underflow, likely wrong stat calculation! curr size: %llu;"
227                 "new size: %d", name_.c_str(), getVBucket(), readyQueueMemory.load(),
228                 respSize);
229             readyQueueMemory.store(0, memory_order_relaxed);
230         }
231     }
232 }
233
234 uint64_t Stream::getReadyQueueMemory() {
235     return readyQueueMemory.load(memory_order_relaxed);
236 }
237
238 const char * Stream::stateName(stream_state_t st) const {
239     static const char * const stateNames[] = {
240         "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
241         "reading", "dead"
242     };
243     cb_assert(st >= STREAM_PENDING && st <= STREAM_DEAD);
244     return stateNames[st];
245 }
246
247 void Stream::addStats(ADD_STAT add_stat, const void *c) {
248     const int bsize = 128;
249     char buffer[bsize];
250     snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_);
251     add_casted_stat(buffer, flags_, add_stat, c);
252     snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_);
253     add_casted_stat(buffer, opaque_, add_stat, c);
254     snprintf(buffer, bsize, "%s:stream_%d_start_seqno", name_.c_str(), vb_);
255     add_casted_stat(buffer, start_seqno_, add_stat, c);
256     snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(), vb_);
257     add_casted_stat(buffer, end_seqno_, add_stat, c);
258     snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(), vb_);
259     add_casted_stat(buffer, vb_uuid_, add_stat, c);
260     snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno", name_.c_str(), vb_);
261     add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
262     snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno", name_.c_str(), vb_);
263     add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
264     snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_);
265     add_casted_stat(buffer, stateName(state_), add_stat, c);
266 }
267
268 ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
269                            const std::string &n, uint32_t flags,
270                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
271                            uint64_t en_seqno, uint64_t vb_uuid,
272                            uint64_t snap_start_seqno, uint64_t snap_end_seqno)
273     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
274               snap_start_seqno, snap_end_seqno),
275        lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
276        takeoverState(vbucket_state_pending), backfillRemaining(0),
277        itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
278        waitForSnapshot(0), engine(e), producer(p),
279        isBackfillTaskRunning(false), lastSentSnapEndSeqno(0),
280        chkptItemsExtractionInProgress(false) {
281
282     const char* type = "";
283     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
284         type = "takeover ";
285         end_seqno_ = dcpMaxSeqno;
286     }
287
288     RCPtr<VBucket> vbucket = engine->getVBucket(vb);
289     if (vbucket) {
290         // An atomic read of vbucket state without acquiring the
291         // reader lock for state should suffice here.
292         if (vbucket->getState() == vbucket_state_replica) {
293             uint64_t snapshot_start, snapshot_end;
294             vbucket->getCurrentSnapshot(snapshot_start, snapshot_end);
295             if (snapshot_end > en_seqno) {
296                 end_seqno_ = snapshot_end;
297             }
298         }
299     }
300
301     if (start_seqno_ >= end_seqno_) {
302         endStream(END_STREAM_OK);
303         itemsReady.store(true);
304     }
305
306     type_ = STREAM_ACTIVE;
307
308     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) %sstream created with start seqno "
309         "%llu and end seqno %llu", producer->logHeader(), vb, type, st_seqno,
310         en_seqno);
311 }
312
313 DcpResponse* ActiveStream::next() {
314     LockHolder lh(streamMutex);
315
316     stream_state_t initState = state_;
317
318     DcpResponse* response = NULL;
319
320     switch (initState) {
321         case STREAM_PENDING:
322             break;
323         case STREAM_BACKFILLING:
324             response = backfillPhase();
325             break;
326         case STREAM_IN_MEMORY:
327             response = inMemoryPhase();
328             break;
329         case STREAM_TAKEOVER_SEND:
330             response = takeoverSendPhase();
331             break;
332         case STREAM_TAKEOVER_WAIT:
333             response = takeoverWaitPhase();
334             break;
335         case STREAM_DEAD:
336             response = deadPhase();
337             break;
338         default:
339             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
340                 producer->logHeader(), vb_, stateName(state_));
341             abort();
342     }
343
344     stream_state_t newState = state_;
345
346     if (newState != STREAM_DEAD && newState != state_ && !response) {
347         lh.unlock();
348         return next();
349     }
350
351     itemsReady.store(response ? true : false);
352     return response;
353 }
354
355 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
356     LockHolder lh(streamMutex);
357     uint64_t chkCursorSeqno = endSeqno;
358
359     if (state_ != STREAM_BACKFILLING) {
360         return;
361     }
362
363     startSeqno = std::min(snap_start_seqno_, startSeqno);
364     firstMarkerSent = true;
365
366     RCPtr<VBucket> vb = engine->getVBucket(vb_);
367     // An atomic read of vbucket state without acquiring the
368     // reader lock for state should suffice here.
369     if (vb && vb->getState() == vbucket_state_replica) {
370         if (end_seqno_ > endSeqno) {
371             /* We possibly have items in the open checkpoint
372                (incomplete snapshot) */
373             LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Merging backfill"
374                 " and memory snapshot for a replica vbucket, start seqno "
375                 "%" PRIu64 " and end seqno %" PRIu64 "",
376                 producer->logHeader(), vb_, startSeqno, endSeqno);
377             endSeqno = end_seqno_;
378         }
379     }
380
381     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
382         "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
383         endSeqno);
384     pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
385                                     MARKER_FLAG_DISK));
386     lastSentSnapEndSeqno = endSeqno;
387
388     if (!vb) {
389         endStream(END_STREAM_STATE);
390     } else {
391         if (endSeqno > end_seqno_) {
392             chkCursorSeqno = end_seqno_;
393         }
394         // Only re-register the cursor if we still need to get memory snapshots
395         CursorRegResult result =
396             vb->checkpointManager.registerTAPCursorBySeqno(name_,
397                                                            chkCursorSeqno);
398         curChkSeqno = result.first;
399     }
400
401     lh.unlock();
402     bool inverse = false;
403     if (itemsReady.compare_exchange_strong(inverse, true)) {
404         producer->notifyStreamReady(vb_, false);
405     }
406 }
407
408 void ActiveStream::backfillReceived(Item* itm) {
409     LockHolder lh(streamMutex);
410     if (state_ == STREAM_BACKFILLING) {
411         pushToReadyQ(new MutationResponse(itm, opaque_));
412         lastReadSeqno = itm->getBySeqno();
413         lh.unlock();
414         bool inverse = false;
415         if (itemsReady.compare_exchange_strong(inverse, true)) {
416             producer->notifyStreamReady(vb_, false);
417         }
418     } else {
419         delete itm;
420     }
421 }
422
423 void ActiveStream::completeBackfill() {
424     {
425         LockHolder lh(streamMutex);
426         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
427             " from disk, last seqno read: %ld", producer->logHeader(), vb_,
428             itemsFromBackfill, lastReadSeqno.load());
429     }
430
431     isBackfillTaskRunning.store(false);
432     bool inverse = false;
433     if (itemsReady.compare_exchange_strong(inverse, true)) {
434         producer->notifyStreamReady(vb_, false);
435     }
436 }
437
438 void ActiveStream::snapshotMarkerAckReceived() {
439     bool inverse = false;
440     if (--waitForSnapshot == 0 &&
441         itemsReady.compare_exchange_strong(inverse, true)) {
442         producer->notifyStreamReady(vb_, true);
443     }
444 }
445
446 void ActiveStream::setVBucketStateAckRecieved() {
447     LockHolder lh(streamMutex);
448     if (state_ == STREAM_TAKEOVER_WAIT) {
449         if (takeoverState == vbucket_state_pending) {
450             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
451                 "vbucket state to pending message", producer->logHeader(), vb_);
452
453             takeoverState = vbucket_state_active;
454             transitionState(STREAM_TAKEOVER_SEND);
455             lh.unlock();
456
457             engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
458                                                   false, false);
459             RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
460             LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Vbucket marked as "
461                 "dead, last sent seqno: %" PRIu64 ", high seqno: %" PRIu64 "",
462                 producer->logHeader(), vb_, lastSentSeqno.load(),
463                 vbucket->getHighSeqno());
464         } else {
465             LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
466                 "vbucket state to active message", producer->logHeader(), vb_);
467             endStream(END_STREAM_OK);
468             lh.unlock();
469         }
470
471         bool inverse = false;
472         if (itemsReady.compare_exchange_strong(inverse, true)) {
473             producer->notifyStreamReady(vb_, true);
474         }
475     } else {
476         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
477             "op on stream '%s' state '%s'", producer->logHeader(), vb_,
478             name_.c_str(), stateName(state_));
479     }
480
481 }
482
483 DcpResponse* ActiveStream::backfillPhase() {
484     DcpResponse* resp = nextQueuedItem();
485
486     if (resp && backfillRemaining > 0 &&
487         (resp->getEvent() == DCP_MUTATION ||
488          resp->getEvent() == DCP_DELETION ||
489          resp->getEvent() == DCP_EXPIRATION)) {
490         backfillRemaining--;
491     }
492
493     if (!isBackfillTaskRunning && readyQ.empty()) {
494         backfillRemaining.store(0, memory_order_relaxed);
495         if (lastReadSeqno.load() >= end_seqno_) {
496             endStream(END_STREAM_OK);
497         } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
498             transitionState(STREAM_TAKEOVER_SEND);
499         } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
500             endStream(END_STREAM_OK);
501         } else {
502             transitionState(STREAM_IN_MEMORY);
503         }
504
505         if (!resp) {
506             resp = nextQueuedItem();
507         }
508     }
509
510     return resp;
511 }
512
513 DcpResponse* ActiveStream::inMemoryPhase() {
514     if (lastSentSeqno.load() >= end_seqno_) {
515         endStream(END_STREAM_OK);
516     } else if (readyQ.empty()) {
517         if (nextCheckpointItem()) {
518             return NULL;
519         }
520     }
521
522     return nextQueuedItem();
523 }
524
525 DcpResponse* ActiveStream::takeoverSendPhase() {
526     if (!readyQ.empty()) {
527         return nextQueuedItem();
528     } else {
529         if (nextCheckpointItem()) {
530             return NULL;
531         }
532     }
533
534     if (waitForSnapshot != 0) {
535         return NULL;
536     }
537     DcpResponse* resp = NULL;
538     if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
539         resp = new SetVBucketState(opaque_, vb_, takeoverState);
540         transitionState(STREAM_TAKEOVER_WAIT);
541     }
542     return resp;
543 }
544
545 DcpResponse* ActiveStream::takeoverWaitPhase() {
546     return nextQueuedItem();
547 }
548
549 DcpResponse* ActiveStream::deadPhase() {
550     DcpResponse* resp = nextQueuedItem();
551     if (!resp) {
552         LOG(EXTENSION_LOG_WARNING,
553             "(vb %" PRIu16 ") Stream closed, "
554             "%" PRIu64 " items sent from backfill phase, "
555             "%" PRIu64 " items sent from memory phase, "
556             "%" PRIu64 " was last seqno sent",
557             vb_,
558             uint64_t(itemsFromBackfill),
559             uint64_t(itemsFromMemory),
560             lastSentSeqno.load());
561     }
562     return resp;
563 }
564
565 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
566     Stream::addStats(add_stat, c);
567
568     const int bsize = 128;
569     char buffer[bsize];
570     snprintf(buffer, bsize, "%s:stream_%d_backfilled", name_.c_str(), vb_);
571     add_casted_stat(buffer, itemsFromBackfill, add_stat, c);
572     snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
573     add_casted_stat(buffer, itemsFromMemory, add_stat, c);
574     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
575     add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
576     snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
577     add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
578     snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
579     add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
580     snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
581     add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat, c);
582 }
583
584 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
585     LockHolder lh(streamMutex);
586
587     RCPtr<VBucket> vb = engine->getVBucket(vb_);
588     add_casted_stat("name", name_, add_stat, cookie);
589     if (!vb || state_ == STREAM_DEAD) {
590         add_casted_stat("status", "completed", add_stat, cookie);
591         add_casted_stat("estimate", 0, add_stat, cookie);
592         add_casted_stat("backfillRemaining", 0, add_stat, cookie);
593         add_casted_stat("estimate", 0, add_stat, cookie);
594         return;
595     }
596
597     size_t total = backfillRemaining;
598     if (state_ == STREAM_BACKFILLING) {
599         add_casted_stat("status", "backfilling", add_stat, cookie);
600     } else {
601         add_casted_stat("status", "in-memory", add_stat, cookie);
602     }
603     add_casted_stat("backfillRemaining", backfillRemaining, add_stat, cookie);
604
605     item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
606     size_t vb_items = vb->getNumItems(iep);
607     size_t chk_items = vb_items > 0 ?
608                 vb->checkpointManager.getNumItemsForTAPConnection(name_) : 0;
609     size_t del_items = engine->getEpStore()->getRWUnderlying(vb_)->
610                                                     getNumPersistedDeletes(vb_);
611
612     if (end_seqno_ < curChkSeqno) {
613         chk_items = 0;
614     } else if ((end_seqno_ - curChkSeqno) < chk_items) {
615         chk_items = end_seqno_ - curChkSeqno + 1;
616     }
617     total += chk_items;
618
619     add_casted_stat("estimate", total, add_stat, cookie);
620     add_casted_stat("chk_items", chk_items, add_stat, cookie);
621     add_casted_stat("vb_items", vb_items, add_stat, cookie);
622     add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
623 }
624
625 DcpResponse* ActiveStream::nextQueuedItem() {
626     if (!readyQ.empty()) {
627         DcpResponse* response = readyQ.front();
628         if (producer->bufferLogInsert(response->getMessageSize())) {
629             if (response->getEvent() == DCP_MUTATION ||
630                 response->getEvent() == DCP_DELETION ||
631                 response->getEvent() == DCP_EXPIRATION) {
632                 lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
633
634                 if (state_ == STREAM_BACKFILLING) {
635                     itemsFromBackfill++;
636                 } else {
637                     itemsFromMemory++;
638                 }
639             }
640             popFromReadyQ();
641             return response;
642         }
643     }
644     return NULL;
645 }
646
647 bool ActiveStream::nextCheckpointItem() {
648     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
649     if (vbucket && vbucket->checkpointManager.getNumItemsForTAPConnection(name_) > 0) {
650         // schedule this stream to build the next checkpoint
651         producer->scheduleCheckpointProcessorTask(this);
652         return true;
653     } else if (chkptItemsExtractionInProgress) {
654         return true;
655     }
656     return false;
657 }
658
659 bool ActiveStreamCheckpointProcessorTask::run() {
660     if (engine->getEpStats().isShutdown) {
661         return false;
662     }
663
664     // Setup that we will sleep forever when done.
665     snooze(INT_MAX);
666
667     // Clear the notfification flag
668     notified.store(false);
669
670     size_t iterations = 0;
671     do {
672         stream_t nextStream = queuePop();
673         ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());
674
675         if (stream) {
676             stream->nextCheckpointItemTask();
677         } else {
678             break;
679         }
680         iterations++;
681     } while(!queueEmpty()
682             && iterations < iterationsBeforeYield);
683
684     // Now check if we were re-notified or there are still checkpoints
685     bool expected = true;
686     if (notified.compare_exchange_strong(expected, false)
687         || !queueEmpty()) {
688         // snooze for 0, essentially yielding and allowing other tasks a go
689         snooze(0.0);
690     }
691
692     return true;
693 }
694
695 void ActiveStreamCheckpointProcessorTask::wakeup() {
696     ExecutorPool::get()->wake(getId());
697 }
698
699 void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
700     pushUnique(stream);
701
702     bool expected = false;
703     if (notified.compare_exchange_strong(expected, true)) {
704         wakeup();
705     }
706 }
707
708 void ActiveStreamCheckpointProcessorTask::clearQueues() {
709     LockHolder lh(workQueueLock);
710     while (!queue.empty()) {
711         queue.pop();
712     }
713     queuedVbuckets.clear();
714 }
715
716 void ActiveStream::nextCheckpointItemTask() {
717     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
718     if (vbucket) {
719         std::deque<queued_item> items;
720         getOutstandingItems(vbucket, items);
721         processItems(items);
722     } else {
723         /* The entity deleting the vbucket must set stream to dead,
724            calling setDead(END_STREAM_STATE) will cause deadlock because
725            it will try to grab streamMutex which is already acquired at this
726            point here */
727         return;
728     }
729 }
730
731 void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
732                                        std::deque<queued_item> &items) {
733     // Commencing item processing - set guard flag.
734     chkptItemsExtractionInProgress.store(true);
735
736     vb->checkpointManager.getAllItemsForCursor(name_, items);
737     if (vb->checkpointManager.getNumCheckpoints() > 1) {
738         engine->getEpStore()->wakeUpCheckpointRemover();
739     }
740 }
741
742
743 void ActiveStream::processItems(std::deque<queued_item>& items) {
744     if (!items.empty()) {
745         bool mark = false;
746         if (items.front()->getOperation() == queue_op_checkpoint_start) {
747             mark = true;
748         }
749
750         std::deque<MutationResponse*> mutations;
751         std::deque<queued_item>::iterator itemItr;
752         for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
753             queued_item& qi = *itemItr;
754
755             if (qi->getOperation() == queue_op_set ||
756                 qi->getOperation() == queue_op_del) {
757                 curChkSeqno = qi->getBySeqno();
758                 lastReadSeqno = qi->getBySeqno();
759
760                 mutations.push_back(new MutationResponse(qi, opaque_));
761             } else if (qi->getOperation() == queue_op_checkpoint_start) {
762                 snapshot(mutations, mark);
763                 mark = true;
764             }
765         }
766
767         if (mutations.empty()) {
768             // If we only got checkpoint start or ends check to see if there are
769             // any more snapshots before pausing the stream.
770             nextCheckpointItemTask();
771         } else {
772             snapshot(mutations, mark);
773         }
774     }
775
776     // Completed item processing - clear guard flag and notify producer.
777     chkptItemsExtractionInProgress.store(false);
778     producer->notifyStreamReady(vb_, true);
779 }
780
781 void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
782     if (items.empty()) {
783         return;
784     }
785
786     LockHolder lh(streamMutex);
787
788     if (isCurrentSnapshotCompleted()) {
789         uint32_t flags = MARKER_FLAG_MEMORY;
790         uint64_t snapStart = items.front()->getBySeqno();
791         uint64_t snapEnd = items.back()->getBySeqno();
792
793         if (mark) {
794             flags |= MARKER_FLAG_CHK;
795         }
796
797         if (state_ == STREAM_TAKEOVER_SEND) {
798             waitForSnapshot++;
799             flags |= MARKER_FLAG_ACK;
800         }
801
802         if (!firstMarkerSent) {
803             snapStart = std::min(snap_start_seqno_, snapStart);
804             firstMarkerSent = true;
805         }
806         pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
807                                         flags));
808         lastSentSnapEndSeqno = snapEnd;
809     }
810
811     std::deque<MutationResponse*>::iterator itemItr;
812     for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
813         pushToReadyQ(*itemItr);
814     }
815 }
816
817 uint32_t ActiveStream::setDead(end_stream_status_t status) {
818     {
819         LockHolder lh(streamMutex);
820         endStream(status);
821     }
822
823     bool inverse = false;
824     if (status != END_STREAM_DISCONNECTED &&
825         itemsReady.compare_exchange_strong(inverse, true)) {
826         producer->notifyStreamReady(vb_, true);
827     }
828     return 0;
829 }
830
831 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
832     if (state_ != STREAM_DEAD) {
833         bool inverse = false;
834         if (itemsReady.compare_exchange_strong(inverse, true)) {
835             producer->notifyStreamReady(vb_, true);
836         }
837     }
838 }
839
840 void ActiveStream::endStream(end_stream_status_t reason) {
841     if (state_ != STREAM_DEAD) {
842         if (reason != END_STREAM_DISCONNECTED) {
843             pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
844         }
845         transitionState(STREAM_DEAD);
846         LOG(EXTENSION_LOG_WARNING,
847             "(vb %" PRIu16 ") Stream closing, "
848             "sent until seqno %" PRIu64 " "
849             "reason: %s",
850             vb_,
851             lastSentSeqno.load(),
852             getEndStreamStatusStr(reason));
853     }
854 }
855
856 void ActiveStream::scheduleBackfill() {
857     if (!isBackfillTaskRunning) {
858         RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
859         if (!vbucket) {
860             return;
861         }
862
863         CursorRegResult result =
864             vbucket->checkpointManager.registerTAPCursorBySeqno(name_,
865                                                                 lastReadSeqno);
866
867         curChkSeqno = result.first;
868         bool isFirstItem = result.second;
869
870         cb_assert(lastReadSeqno <= curChkSeqno);
871         uint64_t backfillStart = lastReadSeqno + 1;
872
873         /* We need to find the minimum seqno that needs to be backfilled in
874          * order to make sure that we don't miss anything when transitioning
875          * to a memory snapshot. The backfill task will always make sure that
876          * the backfill end seqno is contained in the backfill.
877          */
878         uint64_t backfillEnd = 0;
879         if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) { // disk backfill only
880             backfillEnd = end_seqno_;
881         } else { // disk backfill + in-memory streaming
882             if (backfillStart < curChkSeqno) {
883                 if (curChkSeqno > end_seqno_) {
884                     backfillEnd = end_seqno_;
885                 } else {
886                     backfillEnd = curChkSeqno - 1;
887                 }
888             }
889         }
890
891         bool tryBackfill = isFirstItem || flags_ & DCP_ADD_STREAM_FLAG_DISKONLY;
892
893         if (backfillStart <= backfillEnd && tryBackfill) {
894             ExTask task = new DCPBackfill(engine, this, backfillStart, backfillEnd,
895                                           Priority::TapBgFetcherPriority, 0, false);
896             ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
897             isBackfillTaskRunning.store(true);
898         } else {
899             if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
900                 endStream(END_STREAM_OK);
901             } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
902                 transitionState(STREAM_TAKEOVER_SEND);
903             } else {
904                 transitionState(STREAM_IN_MEMORY);
905             }
906             itemsReady.store(true);
907         }
908     }
909 }
910
911 const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
912 {
913     switch (status) {
914         case END_STREAM_OK:
915             return "The stream ended due to all items being streamed";
916         case END_STREAM_CLOSED:
917             return "The stream closed early due to a close stream message";
918         case END_STREAM_STATE:
919             return "The stream closed early because the vbucket state changed";
920         case END_STREAM_DISCONNECTED:
921             return "The stream closed early because the conn was disconnected";
922         default:
923             break;
924     }
925     return "Status unknown; this should not happen";
926 }
927
928 void ActiveStream::transitionState(stream_state_t newState) {
929     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
930         producer->logHeader(), vb_, stateName(state_), stateName(newState));
931
932     if (state_ == newState) {
933         return;
934     }
935
936     switch (state_.load()) {
937         case STREAM_PENDING:
938             cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
939             break;
940         case STREAM_BACKFILLING:
941             cb_assert(newState == STREAM_IN_MEMORY ||
942                    newState == STREAM_TAKEOVER_SEND ||
943                    newState == STREAM_DEAD);
944             break;
945         case STREAM_IN_MEMORY:
946             cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
947             break;
948         case STREAM_TAKEOVER_SEND:
949             cb_assert(newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD);
950             break;
951         case STREAM_TAKEOVER_WAIT:
952             cb_assert(newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD);
953             break;
954         default:
955             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
956                 "to %s", producer->logHeader(), vb_, stateName(state_),
957                 stateName(newState));
958             abort();
959     }
960
961     state_ = newState;
962
963     switch (newState) {
964         case STREAM_BACKFILLING:
965             scheduleBackfill();
966             break;
967         case STREAM_IN_MEMORY:
968             // Check if the producer has sent up till the last requested
969             // sequence number already, if not - move checkpoint items into
970             // the ready queue.
971             if (lastSentSeqno >= end_seqno_) {
972                 // Stream transitioning to DEAD state
973                 endStream(END_STREAM_OK);
974             } else {
975                 nextCheckpointItem();
976             }
977             break;
978         case STREAM_TAKEOVER_SEND:
979             nextCheckpointItem();
980             break;
981         case STREAM_DEAD:
982             {
983                 RCPtr<VBucket> vb = engine->getVBucket(vb_);
984                 if (vb) {
985                     vb->checkpointManager.removeTAPCursor(name_);
986                 }
987                 break;
988             }
989         case STREAM_TAKEOVER_WAIT:
990         case STREAM_PENDING:
991             break;
992         case STREAM_READING:
993             LOG(EXTENSION_LOG_WARNING,
994                 "ActiveStream::transitionState: newState can't be "
995                 "STREAM_READING!");
996             break;
997     }
998 }
999
1000 size_t ActiveStream::getItemsRemaining() {
1001     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1002
1003     if (!vbucket || state_ == STREAM_DEAD) {
1004         return 0;
1005     }
1006
1007     uint64_t high_seqno = vbucket->getHighSeqno();
1008
1009     if (end_seqno_ < high_seqno) {
1010         if (end_seqno_ > lastSentSeqno.load()) {
1011             return (end_seqno_ - lastSentSeqno.load());
1012         }
1013     } else {
1014         if (high_seqno > lastSentSeqno.load()) {
1015             return (high_seqno - lastSentSeqno.load());
1016         }
1017     }
1018
1019     return 0;
1020 }
1021
1022 const char* ActiveStream::logHeader()
1023 {
1024     return producer->logHeader();
1025 }
1026
1027 bool ActiveStream::isCurrentSnapshotCompleted() const
1028 {
1029     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1030     // An atomic read of vbucket state without acquiring the
1031     // reader lock for state should suffice here.
1032     if (vbucket && vbucket->getState() == vbucket_state_replica) {
1033         if (lastSentSnapEndSeqno >= lastReadSeqno) {
1034             return false;
1035         }
1036     }
1037     return true;
1038 }
1039
1040 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
1041                                const std::string &name, uint32_t flags,
1042                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1043                                uint64_t en_seqno, uint64_t vb_uuid,
1044                                uint64_t snap_start_seqno,
1045                                uint64_t snap_end_seqno)
1046     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1047              snap_start_seqno, snap_end_seqno),
1048       producer(p) {
1049     LockHolder lh(streamMutex);
1050     RCPtr<VBucket> vbucket = e->getVBucket(vb_);
1051     if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
1052         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1053         transitionState(STREAM_DEAD);
1054         itemsReady.store(true);
1055     }
1056
1057     type_ = STREAM_NOTIFIER;
1058
1059     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
1060         "%llu and end seqno %llu", producer->logHeader(), vb, st_seqno,
1061         en_seqno);
1062 }
1063
1064 uint32_t NotifierStream::setDead(end_stream_status_t status) {
1065     LockHolder lh(streamMutex);
1066     if (state_ != STREAM_DEAD) {
1067         transitionState(STREAM_DEAD);
1068         if (status != END_STREAM_DISCONNECTED) {
1069             pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
1070             lh.unlock();
1071             bool inverse = false;
1072             if (itemsReady.compare_exchange_strong(inverse, true)) {
1073                 producer->notifyStreamReady(vb_, true);
1074             }
1075         }
1076     }
1077     return 0;
1078 }
1079
1080 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1081     LockHolder lh(streamMutex);
1082     if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1083         pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1084         transitionState(STREAM_DEAD);
1085         lh.unlock();
1086         bool inverse = false;
1087         if (itemsReady.compare_exchange_strong(inverse, true)) {
1088             producer->notifyStreamReady(vb_, true);
1089         }
1090     }
1091 }
1092
1093 DcpResponse* NotifierStream::next() {
1094     LockHolder lh(streamMutex);
1095
1096     if (readyQ.empty()) {
1097         itemsReady.store(false);
1098         return NULL;
1099     }
1100
1101     DcpResponse* response = readyQ.front();
1102     if (producer->bufferLogInsert(response->getMessageSize())) {
1103         popFromReadyQ();
1104     } else {
1105         response = NULL;
1106     }
1107
1108     return response;
1109 }
1110
1111 void NotifierStream::transitionState(stream_state_t newState) {
1112     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1113         producer->logHeader(), vb_, stateName(state_), stateName(newState));
1114
1115     if (state_ == newState) {
1116         return;
1117     }
1118
1119     switch (state_.load()) {
1120         case STREAM_PENDING:
1121             cb_assert(newState == STREAM_DEAD);
1122             break;
1123         default:
1124             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1125                 "to %s", producer->logHeader(), vb_, stateName(state_),
1126                 stateName(newState));
1127             abort();
1128     }
1129
1130     state_ = newState;
1131 }
1132
1133 PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
1134                              const std::string &name, uint32_t flags,
1135                              uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1136                              uint64_t en_seqno, uint64_t vb_uuid,
1137                              uint64_t snap_start_seqno, uint64_t snap_end_seqno,
1138                              uint64_t vb_high_seqno)
1139     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1140              snap_start_seqno, snap_end_seqno),
1141       engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
1142       cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false),
1143       saveSnapshot(false) {
1144     LockHolder lh(streamMutex);
1145     pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
1146                                   vb_uuid, snap_start_seqno, snap_end_seqno));
1147     itemsReady.store(true);
1148     type_ = STREAM_PASSIVE;
1149
1150     const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
1151     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to add %s stream with "
1152         "start seqno %llu, end seqno %llu, vbucket uuid %llu, snap start seqno "
1153         "%llu, snap end seqno %llu, and vb_high_seqno %llu",
1154         consumer->logHeader(), vb, type, st_seqno, en_seqno, vb_uuid,
1155         snap_start_seqno, snap_end_seqno, vb_high_seqno);
1156 }
1157
1158 PassiveStream::~PassiveStream() {
1159     LockHolder lh(streamMutex);
1160     clear_UNLOCKED();
1161     cb_assert(state_ == STREAM_DEAD);
1162     cb_assert(buffer.bytes == 0);
1163 }
1164
1165 uint32_t PassiveStream::setDead(end_stream_status_t status) {
1166     LockHolder lh(streamMutex);
1167     transitionState(STREAM_DEAD);
1168     lh.unlock();
1169     uint32_t unackedBytes = clearBuffer();
1170     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Setting stream to dead state,"
1171         " last_seqno is %llu, unackedBytes is %u, status is %s",
1172         consumer->logHeader(), vb_, last_seqno.load(), unackedBytes,
1173         getEndStreamStatusStr(status));
1174     return unackedBytes;
1175 }
1176
1177 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1178     LockHolder lh(streamMutex);
1179     if (state_ == STREAM_PENDING) {
1180         if (status == ENGINE_SUCCESS) {
1181             transitionState(STREAM_READING);
1182         } else {
1183             transitionState(STREAM_DEAD);
1184         }
1185         pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1186         lh.unlock();
1187         bool inverse = false;
1188         if (itemsReady.compare_exchange_strong(inverse, true)) {
1189             consumer->notifyStreamReady(vb_);
1190         }
1191     }
1192 }
1193
1194 void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1195                                     uint32_t new_opaque,
1196                                     uint64_t start_seqno) {
1197     vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1198     vb->getCurrentSnapshot(snap_start_seqno_, snap_end_seqno_);
1199
1200     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to reconnect stream "
1201         "with opaque %ld, start seq no %llu, end seq no %llu, snap start seqno "
1202         "%llu, and snap end seqno %llu", consumer->logHeader(), vb_, new_opaque,
1203         start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
1204
1205     LockHolder lh(streamMutex);
1206     last_seqno.store(start_seqno);
1207     pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1208                                   end_seqno_, vb_uuid_, snap_start_seqno_,
1209                                   snap_end_seqno_));
1210     lh.unlock();
1211     bool inverse = false;
1212     if (itemsReady.compare_exchange_strong(inverse, true)) {
1213         consumer->notifyStreamReady(vb_);
1214     }
1215 }
1216
1217 ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1218     LockHolder lh(buffer.bufMutex);
1219     cb_assert(resp);
1220
1221     if (state_ == STREAM_DEAD) {
1222         delete resp;
1223         return ENGINE_KEY_ENOENT;
1224     }
1225
1226     switch (resp->getEvent()) {
1227         case DCP_MUTATION:
1228         case DCP_DELETION:
1229         case DCP_EXPIRATION:
1230         {
1231             MutationResponse* m = static_cast<MutationResponse*>(resp);
1232             uint64_t bySeqno = m->getBySeqno();
1233             if (bySeqno <= last_seqno.load()) {
1234                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous (out of "
1235                     "sequence) mutation received, with opaque: %ld, its "
1236                     "seqno (%llu) is not greater than last received seqno "
1237                     "(%llu); Dropping mutation!", consumer->logHeader(),
1238                     vb_, opaque_, bySeqno, last_seqno.load());
1239                 delete m;
1240                 return ENGINE_ERANGE;
1241             }
1242             last_seqno.store(bySeqno);
1243             break;
1244         }
1245         case DCP_SNAPSHOT_MARKER:
1246         {
1247             SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
1248             uint64_t snapStart = s->getStartSeqno();
1249             uint64_t snapEnd = s->getEndSeqno();
1250             if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
1251                 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous snapshot "
1252                     "marker received, with opaque: %ld, its start (%llu), and"
1253                     "end (%llu) are less than last received seqno (%llu); "
1254                     "Dropping marker!", consumer->logHeader(), vb_, opaque_,
1255                     snapStart, snapEnd, last_seqno.load());
1256                 delete s;
1257                 return ENGINE_ERANGE;
1258             }
1259             break;
1260         }
1261         case DCP_SET_VBUCKET:
1262         case DCP_STREAM_END:
1263         {
1264             /* No validations necessary */
1265             break;
1266         }
1267         default:
1268         {
1269             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unknown DCP op received: %d;"
1270                 " Disconnecting connection..",
1271                 consumer->logHeader(), vb_, resp->getEvent());
1272             return ENGINE_DISCONNECT;
1273         }
1274     }
1275
1276     buffer.messages.push(resp);
1277     buffer.items++;
1278     buffer.bytes += resp->getMessageSize();
1279
1280     return ENGINE_SUCCESS;
1281 }
1282
1283 process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes) {
1284     LockHolder lh(buffer.bufMutex);
1285     uint32_t count = 0;
1286     uint32_t message_bytes = 0;
1287     uint32_t total_bytes_processed = 0;
1288     bool failed = false;
1289
1290     if (buffer.messages.empty()) {
1291         return all_processed;
1292     }
1293
1294     while (count < PassiveStream::batchSize && !buffer.messages.empty()) {
1295         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1296         DcpResponse *response = buffer.messages.front();
1297         message_bytes = response->getMessageSize();
1298
1299         switch (response->getEvent()) {
1300             case DCP_MUTATION:
1301                 ret = processMutation(static_cast<MutationResponse*>(response));
1302                 break;
1303             case DCP_DELETION:
1304             case DCP_EXPIRATION:
1305                 ret = processDeletion(static_cast<MutationResponse*>(response));
1306                 break;
1307             case DCP_SNAPSHOT_MARKER:
1308                 processMarker(static_cast<SnapshotMarker*>(response));
1309                 break;
1310             case DCP_SET_VBUCKET:
1311                 processSetVBucketState(static_cast<SetVBucketState*>(response));
1312                 break;
1313             case DCP_STREAM_END:
1314                 transitionState(STREAM_DEAD);
1315                 break;
1316             default:
1317                 abort();
1318         }
1319
1320         if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1321             failed = true;
1322             break;
1323         }
1324
1325         delete response;
1326         buffer.messages.pop();
1327         buffer.items--;
1328         buffer.bytes -= message_bytes;
1329         count++;
1330         if (ret != ENGINE_ERANGE) {
1331             total_bytes_processed += message_bytes;
1332         }
1333     }
1334
1335     processed_bytes = total_bytes_processed;
1336
1337     if (failed) {
1338         return cannot_process;
1339     }
1340
1341     return all_processed;
1342 }
1343
1344 ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1345     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1346     if (!vb) {
1347         return ENGINE_NOT_MY_VBUCKET;
1348     }
1349
1350     if (mutation->getBySeqno() > cur_snapshot_end.load()) {
1351         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous mutation [sequence "
1352             "number (%llu) greater than current snapshot end seqno (%llu)] "
1353             "being processed; Dropping the mutation!", consumer->logHeader(),
1354             vb_, mutation->getBySeqno(), cur_snapshot_end.load());
1355         return ENGINE_ERANGE;
1356     }
1357
1358     ENGINE_ERROR_CODE ret;
1359     if (saveSnapshot) {
1360         LockHolder lh = vb->getSnapshotLock();
1361         ret = commitMutation(mutation, vb->isBackfillPhase());
1362         vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start.load(),
1363                                         cur_snapshot_end.load());
1364         saveSnapshot = false;
1365         lh.unlock();
1366     } else {
1367         ret = commitMutation(mutation, vb->isBackfillPhase());
1368     }
1369
1370     // We should probably handle these error codes in a better way, but since
1371     // the producer side doesn't do anything with them anyways let's just log
1372     // them for now until we come up with a better solution.
1373     if (ret != ENGINE_SUCCESS) {
1374         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1375             "process  mutation", consumer->logHeader(), ret);
1376     } else {
1377         handleSnapshotEnd(vb, mutation->getBySeqno());
1378     }
1379
1380     return ret;
1381 }
1382
1383 ENGINE_ERROR_CODE PassiveStream::commitMutation(MutationResponse* mutation,
1384                                                 bool backfillPhase) {
1385     if (backfillPhase) {
1386         return engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1387                                                         INITIAL_NRU_VALUE,
1388                                                         false);
1389     } else {
1390         return engine->getEpStore()->setWithMeta(*mutation->getItem(), 0,
1391                                                  consumer->getCookie(), true,
1392                                                  true, INITIAL_NRU_VALUE, false,
1393                                                  true);
1394     }
1395 }
1396
1397 ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1398     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1399     if (!vb) {
1400         return ENGINE_NOT_MY_VBUCKET;
1401     }
1402
1403     if (deletion->getBySeqno() > cur_snapshot_end.load()) {
1404         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous deletion [sequence "
1405             "number (%llu) greater than current snapshot end seqno (%llu)] "
1406             "being processed; Dropping the deletion!", consumer->logHeader(),
1407             vb_, deletion->getBySeqno(), cur_snapshot_end.load());
1408         return ENGINE_ERANGE;
1409     }
1410
1411     ENGINE_ERROR_CODE ret;
1412     if (saveSnapshot) {
1413         LockHolder lh = vb->getSnapshotLock();
1414         ret = commitDeletion(deletion, vb->isBackfillPhase());
1415         vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start.load(),
1416                                         cur_snapshot_end.load());
1417         saveSnapshot = false;
1418         lh.unlock();
1419     } else {
1420         ret = commitDeletion(deletion, vb->isBackfillPhase());
1421     }
1422
1423     if (ret == ENGINE_KEY_ENOENT) {
1424         ret = ENGINE_SUCCESS;
1425     }
1426
1427     // We should probably handle these error codes in a better way, but since
1428     // the producer side doesn't do anything with them anyways let's just log
1429     // them for now until we come up with a better solution.
1430     if (ret != ENGINE_SUCCESS) {
1431         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1432             "process  deletion", consumer->logHeader(), ret);
1433     } else {
1434         handleSnapshotEnd(vb, deletion->getBySeqno());
1435     }
1436
1437     return ret;
1438 }
1439
1440 ENGINE_ERROR_CODE PassiveStream::commitDeletion(MutationResponse* deletion,
1441                                                 bool backfillPhase) {
1442     uint64_t delCas = 0;
1443     ItemMetaData meta = deletion->getItem()->getMetaData();
1444     return engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1445                                                 &delCas, deletion->getVBucket(),
1446                                                 consumer->getCookie(), true,
1447                                                 &meta, backfillPhase, false,
1448                                                 deletion->getBySeqno(), true);
1449 }
1450
1451 void PassiveStream::processMarker(SnapshotMarker* marker) {
1452     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1453
1454     cur_snapshot_start.store(marker->getStartSeqno());
1455     cur_snapshot_end.store(marker->getEndSeqno());
1456     cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
1457     saveSnapshot = true;
1458
1459     if (vb) {
1460         if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1461             vb->setBackfillPhase(true);
1462             /* When replica vb is in backfill phase, then open checkpoint id
1463                is set to 0. */
1464             vb->checkpointManager.setOpenCheckpointId(0);
1465         } else {
1466             if (marker->getFlags() & MARKER_FLAG_CHK ||
1467                 vb->checkpointManager.getOpenCheckpointId() == 0) {
1468                 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1469                 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1470             }
1471             vb->setBackfillPhase(false);
1472         }
1473
1474         if (marker->getFlags() & MARKER_FLAG_ACK) {
1475             cur_snapshot_ack = true;
1476         }
1477     }
1478 }
1479
1480 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1481     engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1482
1483     LockHolder lh (streamMutex);
1484     pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1485     lh.unlock();
1486     bool inverse = false;
1487     if (itemsReady.compare_exchange_strong(inverse, true)) {
1488         consumer->notifyStreamReady(vb_);
1489     }
1490 }
1491
1492 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1493     if (byseqno == cur_snapshot_end.load()) {
1494         if (cur_snapshot_type.load() == disk && vb->isBackfillPhase()) {
1495             vb->setBackfillPhase(false);
1496             uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1497             vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1498         } else {
1499             double maxSize = static_cast<double>(engine->getEpStats().getMaxDataSize());
1500             double mem_threshold = StoredValue::getMutationMemThreshold();
1501             double mem_used = static_cast<double>(engine->getEpStats().getTotalMemoryUsed());
1502             if (maxSize * mem_threshold < mem_used) {
1503                 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1504                 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1505             }
1506         }
1507
1508         if (cur_snapshot_ack) {
1509             LockHolder lh(streamMutex);
1510             pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1511             lh.unlock();
1512             bool inverse = false;
1513             if (itemsReady.compare_exchange_strong(inverse, true)) {
1514                 consumer->notifyStreamReady(vb_);
1515             }
1516             cur_snapshot_ack = false;
1517         }
1518         cur_snapshot_type.store(none);
1519         vb->setCurrentSnapshot(byseqno, byseqno);
1520     }
1521 }
1522
1523 void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1524     Stream::addStats(add_stat, c);
1525
1526     const int bsize = 128;
1527     char buf[bsize];
1528     size_t buffer_bytes;
1529     size_t buffer_items;
1530     {
1531         LockHolder lh(buffer.bufMutex);
1532         buffer_bytes = buffer.bytes;
1533         buffer_items = buffer.items;
1534     }
1535     snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
1536     add_casted_stat(buf, buffer_items, add_stat, c);
1537     snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
1538     add_casted_stat(buf, buffer_bytes, add_stat, c);
1539     snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
1540     add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1541     snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
1542     add_casted_stat(buf, last_seqno.load(), add_stat, c);
1543     snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
1544     add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1545
1546     snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
1547     add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type.load()), add_stat, c);
1548
1549     if (cur_snapshot_type.load() != none) {
1550         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
1551         add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
1552         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
1553         add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
1554     }
1555 }
1556
1557 DcpResponse* PassiveStream::next() {
1558     LockHolder lh(streamMutex);
1559
1560     if (readyQ.empty()) {
1561         itemsReady.store(false);
1562         return NULL;
1563     }
1564
1565     DcpResponse* response = readyQ.front();
1566     popFromReadyQ();
1567     return response;
1568 }
1569
1570 uint32_t PassiveStream::clearBuffer() {
1571     LockHolder lh(buffer.bufMutex);
1572     uint32_t unackedBytes = buffer.bytes;
1573
1574     while (!buffer.messages.empty()) {
1575         DcpResponse* resp = buffer.messages.front();
1576         buffer.messages.pop();
1577         delete resp;
1578     }
1579
1580     buffer.bytes = 0;
1581     buffer.items = 0;
1582     return unackedBytes;
1583 }
1584
1585 void PassiveStream::transitionState(stream_state_t newState) {
1586     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1587         consumer->logHeader(), vb_, stateName(state_), stateName(newState));
1588
1589     if (state_ == newState) {
1590         return;
1591     }
1592
1593     switch (state_.load()) {
1594         case STREAM_PENDING:
1595             cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
1596             break;
1597         case STREAM_READING:
1598             cb_assert(newState == STREAM_PENDING || newState == STREAM_DEAD);
1599             break;
1600         default:
1601             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1602                 "to %s", consumer->logHeader(), vb_, stateName(state_),
1603                 stateName(newState));
1604             abort();
1605     }
1606
1607     state_ = newState;
1608 }
1609
1610 const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
1611 {
1612     switch (status) {
1613         case END_STREAM_CLOSED:
1614             return "The stream closed due to a close stream message";
1615         case END_STREAM_DISCONNECTED:
1616             return "The stream closed early because the conn was disconnected";
1617         default:
1618             break;
1619     }
1620     return "Status unknown; this should not happen";
1621 }