1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2013 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 #include "ep_engine.h"
21 #include "failover-table.h"
23 #include "statwriter.h"
24 #include "dcp-stream.h"
25 #include "dcp-consumer.h"
26 #include "dcp-producer.h"
27 #include "dcp-response.h"
// Delay passed to snooze() when a DCPBackfill task has to back off (memory
// usage too high, or the seqno it needs is not yet persisted).
// NOTE(review): unit presumably seconds (snooze() is defined elsewhere) — confirm.
29 #define DCP_BACKFILL_SLEEP_TIME 2
// Returns a printable name for a snapshot type. The table is indexed directly
// by the enum value, so the value is bounds-checked (none..memory) first and
// the table order must match the snapshot_type_t declaration.
31 static const char* snapshotTypeToString(snapshot_type_t type) {
32 static const char * const snapshotTypes[] = { "none", "disk", "memory" };
33 cb_assert(type >= none && type <= memory);
34 return snapshotTypes[type];
// Largest representable seqno. The ActiveStream constructor assigns it to
// end_seqno_ for takeover streams, i.e. effectively "no upper bound".
37 const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
// NOTE(review): batchSize is not referenced in this part of the file; check
// its meaning where PassiveStream uses it.
38 const size_t PassiveStream::batchSize = 10;
// Snapshot-range callback handed to KVStore::dump() by DCPBackfill::run().
// It forwards the seqno range it receives to
// ActiveStream::markDiskSnapshot(), which may queue a disk snapshot marker.
// Only valid for active streams (asserted in the constructor).
40 class SnapshotMarkerCallback : public Callback<SeqnoRange> {
42 SnapshotMarkerCallback(stream_t s)
44 cb_assert(s->getType() == STREAM_ACTIVE);
47 void callback(SeqnoRange &range) {
48 uint64_t st = range.getStartSeqno();
49 uint64_t en = range.getEndSeqno();
// The downcast is safe because the constructor asserted STREAM_ACTIVE.
50 static_cast<ActiveStream*>(stream.get())->markDiskSnapshot(st, en);
// Cache-lookup callback handed to KVStore::dump() by DCPBackfill::run(); see
// CacheCallback::callback below for what a single lookup does.
57 class CacheCallback : public Callback<CacheLookup> {
59 CacheCallback(EventuallyPersistentEngine* e, stream_t &s)
60 : engine_(e), stream_(s) {
61 Stream *str = stream_.get();
// Only active streams are backfilled.
63 cb_assert(str->getType() == STREAM_ACTIVE);
67 void callback(CacheLookup &lookup);
// Used to reach the EP store and the vbucket hash tables.
70 EventuallyPersistentEngine* engine_;
// Backfill cache lookup. If the key is resident in the vbucket hash table
// with the same by-seqno as the lookup, the in-memory copy is handed straight
// to the stream (backfillReceived) and the status is set to
// ENGINE_KEY_EEXISTS. Otherwise the status is ENGINE_SUCCESS.
// NOTE(review): presumably EEXISTS tells KVStore::dump() to skip reading this
// item from disk — confirm against the KVStore implementation.
74 void CacheCallback::callback(CacheLookup &lookup) {
75 RCPtr<VBucket> vb = engine_->getEpStore()->getVBucket(lookup.getVBucketId());
77 setStatus(ENGINE_SUCCESS);
// Hold the hash-bucket lock while inspecting the StoredValue.
82 LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
83 StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num, false, false);
84 if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
85 Item* it = v->toItem(false, lookup.getVBucketId());
87 static_cast<ActiveStream*>(stream_.get())->backfillReceived(it);
88 setStatus(ENGINE_KEY_EEXISTS);
90 setStatus(ENGINE_SUCCESS);
// Disk-read callback handed to KVStore::dump() by DCPBackfill::run(). Every
// value read from disk is passed to ActiveStream::backfillReceived().
94 class DiskCallback : public Callback<GetValue> {
96 DiskCallback(stream_t &s)
98 Stream *str = stream_.get();
100 cb_assert(str->getType() == STREAM_ACTIVE);
104 void callback(GetValue &val) {
// A dump callback is expected to always carry an item.
105 cb_assert(val.getValue());
106 ActiveStream* active_stream = static_cast<ActiveStream*>(stream_.get());
107 active_stream->backfillReceived(val.getValue());
// Task that backfills an ActiveStream from disk, starting at startSeqno.
// endSeqno is the seqno that must already be persisted before the scan is
// allowed to run (see DCPBackfill::run). Scheduled by
// ActiveStream::scheduleBackfill().
114 class DCPBackfill : public GlobalTask {
116 DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
117 uint64_t start_seqno, uint64_t end_seqno, const Priority &p,
118 double sleeptime = 0, bool shutdown = false)
119 : GlobalTask(e, p, sleeptime, shutdown), engine(e), stream(s),
120 startSeqno(start_seqno), endSeqno(end_seqno) {
121 cb_assert(stream->getType() == STREAM_ACTIVE);
126 std::string getDescription();
129 EventuallyPersistentEngine *engine;
// Body of the DCP backfill task.
//  - If engine memory usage is too high, snooze for DCP_BACKFILL_SLEEP_TIME
//    and try again later.
//  - If endSeqno is not yet persisted (per the EP store's last persisted
//    seqno), log and snooze likewise.
//  - Otherwise add the KVStore item count from startSeqno onward to the
//    stream's backfillRemaining, dump the vbucket from startSeqno through the
//    disk / cache-lookup / snapshot-marker callbacks, and call
//    completeBackfill() on the stream.
135 bool DCPBackfill::run() {
136 uint16_t vbid = stream->getVBucket();
138 if (engine->getEpStore()->isMemoryUsageTooHigh()) {
139 LOG(EXTENSION_LOG_WARNING, "VBucket %d dcp backfill task temporarily "
140 "suspended because the current memory usage is too high",
142 snooze(DCP_BACKFILL_SLEEP_TIME);
146 uint64_t lastPersistedSeqno =
147 engine->getEpStore()->getLastPersistedSeqno(vbid);
149 engine->getEpStore()->getRWUnderlying(vbid)->getLastPersistedSeqno(vbid);
// The backfill must cover endSeqno, so wait until it has been persisted.
151 if (lastPersistedSeqno < endSeqno) {
// Seqnos are uint64_t: PRIu64 (as used by the completion log below) matches
// on every platform, while "%llu" mismatches uint64_t on LP64 targets.
152 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Rescheduling backfill "
153 "because backfill up to seqno %" PRIu64 " is needed but only up to "
154 "%" PRIu64 " is persisted (disk %" PRIu64 ")",
155 static_cast<ActiveStream*>(stream.get())->logHeader(), vbid,
156 endSeqno, lastPersistedSeqno, diskSeqno);
157 snooze(DCP_BACKFILL_SLEEP_TIME);
161 KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
162 size_t numItems = kvstore->getNumItems(vbid, startSeqno,
163 std::numeric_limits<uint64_t>::max());
164 static_cast<ActiveStream*>(stream.get())->incrBackfillRemaining(numItems);
166 shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
167 shared_ptr<Callback<CacheLookup> > cl(new CacheCallback(engine, stream));
168 shared_ptr<Callback<SeqnoRange> > sr(new SnapshotMarkerCallback(stream));
169 kvstore->dump(vbid, startSeqno, cb, cl, sr);
171 static_cast<ActiveStream*>(stream.get())->completeBackfill();
173 LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") "
174 "Backfill task (%" PRIu64 " to %" PRIu64 ") "
175 "finished. disk seqno %" PRIu64 " memory seqno %" PRIu64 "",
176 static_cast<ActiveStream*>(stream.get())->logHeader(), vbid,
177 startSeqno, endSeqno, diskSeqno, lastPersistedSeqno);
// Human-readable task description, e.g. "DCP backfill for vbucket 5".
182 std::string DCPBackfill::getDescription() {
183 std::stringstream ss;
184 ss << "DCP backfill for vbucket " << stream->getVBucket();
// Base stream constructor: records the stream-request parameters. Every
// stream starts in STREAM_PENDING with itemsReady == false and
// readyQueueMemory == 0.
188 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
189 uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
190 uint64_t vb_uuid, uint64_t snap_start_seqno,
191 uint64_t snap_end_seqno)
192 : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
193 start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
194 snap_start_seqno_(snap_start_seqno),
195 snap_end_seqno_(snap_end_seqno),
196 state_(STREAM_PENDING), itemsReady(false), readyQueueMemory(0) {
// Drains readyQ, taking each response from the front.
// NOTE(review): the _UNLOCKED suffix presumably means the caller must already
// hold streamMutex — confirm at the call sites.
199 void Stream::clear_UNLOCKED() {
200 while (!readyQ.empty()) {
201 DcpResponse* resp = readyQ.front();
// Adds resp's message size to readyQueueMemory (a byte counter updated with
// relaxed ordering). Presumably also pushes resp onto readyQ — confirm.
207 void Stream::pushToReadyQ(DcpResponse* resp)
211 readyQueueMemory.fetch_add(resp->getMessageSize(),
212 memory_order_relaxed);
// Accounts for the response at the front of readyQ being removed. Its message
// size is subtracted from readyQueueMemory. If that would underflow, the
// counter is clamped to 0 and a debug message is logged. Presumably also pops
// the front entry — confirm.
216 void Stream::popFromReadyQ(void)
218 if (!readyQ.empty()) {
219 uint32_t respSize = readyQ.front()->getMessageSize();
221 /* Decrement the readyQ size */
222 if (respSize <= readyQueueMemory.load(memory_order_relaxed)) {
223 readyQueueMemory.fetch_sub(respSize, memory_order_relaxed);
// The adjacent string literals previously lacked separating spaces
// ("(vb %d)underflow", ";new size"), and "%llu" did not portably match the
// uint64_t counter.
225 LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d) "
226 "underflow, likely wrong stat calculation! curr size: %" PRIu64 "; "
227 "new size: %d", name_.c_str(), getVBucket(), uint64_t(readyQueueMemory.load()),
229 readyQueueMemory.store(0, memory_order_relaxed);
// Current byte count of responses held in readyQ, as maintained by
// pushToReadyQ() / popFromReadyQ() (relaxed load, matching those updates).
234 uint64_t Stream::getReadyQueueMemory() {
235 return readyQueueMemory.load(memory_order_relaxed);
// Printable name of a stream state. The table is indexed by the enum value,
// so its order must match stream_state_t (STREAM_PENDING .. STREAM_DEAD).
238 const char * Stream::stateName(stream_state_t st) const {
239 static const char * const stateNames[] = {
240 "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
243 cb_assert(st >= STREAM_PENDING && st <= STREAM_DEAD);
244 return stateNames[st];
// Emits the stats common to every stream type. Keys have the form
// "<name>:stream_<vb>_<stat>".
// NOTE(review): snprintf silently truncates keys longer than bsize - 1 bytes.
247 void Stream::addStats(ADD_STAT add_stat, const void *c) {
248 const int bsize = 128;
250 snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_);
251 add_casted_stat(buffer, flags_, add_stat, c);
252 snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_);
253 add_casted_stat(buffer, opaque_, add_stat, c);
254 snprintf(buffer, bsize, "%s:stream_%d_start_seqno", name_.c_str(), vb_);
255 add_casted_stat(buffer, start_seqno_, add_stat, c);
256 snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(), vb_);
257 add_casted_stat(buffer, end_seqno_, add_stat, c);
258 snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(), vb_);
259 add_casted_stat(buffer, vb_uuid_, add_stat, c);
260 snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno", name_.c_str(), vb_);
261 add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
262 snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno", name_.c_str(), vb_);
263 add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
264 snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_);
265 add_casted_stat(buffer, stateName(state_), add_stat, c);
// Creates an active (producer-side) stream for vbucket `vb`.
//  - Takeover streams have end_seqno_ widened to dcpMaxSeqno.
//  - For a replica vbucket whose current snapshot ends beyond en_seqno,
//    end_seqno_ is extended to that snapshot end. This is presumably so a
//    partial snapshot is never streamed — confirm.
//  - If start_seqno_ >= end_seqno_ there is nothing to stream, and the stream
//    is ended immediately with END_STREAM_OK.
268 ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
269 const std::string &n, uint32_t flags,
270 uint32_t opaque, uint16_t vb, uint64_t st_seqno,
271 uint64_t en_seqno, uint64_t vb_uuid,
272 uint64_t snap_start_seqno, uint64_t snap_end_seqno)
273 : Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
274 snap_start_seqno, snap_end_seqno),
275 lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
276 takeoverState(vbucket_state_pending), backfillRemaining(0),
277 itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
278 waitForSnapshot(0), engine(e), producer(p),
279 isBackfillTaskRunning(false), lastSentSnapEndSeqno(0),
280 chkptItemsExtractionInProgress(false) {
282 const char* type = "";
283 if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
285 end_seqno_ = dcpMaxSeqno;
288 RCPtr<VBucket> vbucket = engine->getVBucket(vb);
290 // An atomic read of vbucket state without acquiring the
291 // reader lock for state should suffice here.
292 if (vbucket->getState() == vbucket_state_replica) {
293 uint64_t snapshot_start, snapshot_end;
294 vbucket->getCurrentSnapshot(snapshot_start, snapshot_end);
295 if (snapshot_end > en_seqno) {
296 end_seqno_ = snapshot_end;
301 if (start_seqno_ >= end_seqno_) {
302 endStream(END_STREAM_OK);
303 itemsReady.store(true);
306 type_ = STREAM_ACTIVE;
// Seqnos are uint64_t: PRIu64 matches on every platform, "%llu" does not.
308 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) %sstream created with start seqno "
309 "%" PRIu64 " and end seqno %" PRIu64, producer->logHeader(), vb, type, st_seqno,
// Returns the next response for the producer by dispatching to the handler of
// the current state. Suppose that handler changed the state without producing
// a response and the stream is not DEAD. Then the new state's handler should
// get a chance; the follow-up is presumably a re-dispatch — confirm. Finally
// itemsReady is set to whether a response is being returned.
313 DcpResponse* ActiveStream::next() {
314 LockHolder lh(streamMutex);
// State before dispatch, used below to detect a transition.
316 stream_state_t initState = state_;
318 DcpResponse* response = NULL;
323 case STREAM_BACKFILLING:
324 response = backfillPhase();
326 case STREAM_IN_MEMORY:
327 response = inMemoryPhase();
329 case STREAM_TAKEOVER_SEND:
330 response = takeoverSendPhase();
332 case STREAM_TAKEOVER_WAIT:
333 response = takeoverWaitPhase();
336 response = deadPhase();
339 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
340 producer->logHeader(), vb_, stateName(state_));
344 stream_state_t newState = state_;
// Compare against the state *before* dispatch. The previous test,
// `newState != state_`, compared newState with the value it had just been
// copied from. It was therefore effectively always false, so a transition
// that produced no response was never followed up.
346 if (newState != STREAM_DEAD && newState != initState && !response) {
351 itemsReady.store(response ? true : false);
// Called by the backfill (via SnapshotMarkerCallback) with the seqno range of
// the disk snapshot about to be streamed.
//  - While the stream is BACKFILLING, it queues a disk snapshot marker. The
//    start is widened back to snap_start_seqno_, presumably for the first
//    marker only. For replica vbuckets the end is extended to end_seqno_, so
//    the backfill and the in-memory items form a single snapshot.
//  - It may re-register the checkpoint cursor at an end capped to end_seqno_,
//    so in-memory snapshots can follow the backfill.
//  - It wakes the producer if itemsReady was not already set.
355 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
356 LockHolder lh(streamMutex);
357 uint64_t chkCursorSeqno = endSeqno;
359 if (state_ != STREAM_BACKFILLING) {
363 startSeqno = std::min(snap_start_seqno_, startSeqno);
364 firstMarkerSent = true;
366 RCPtr<VBucket> vb = engine->getVBucket(vb_);
367 // An atomic read of vbucket state without acquiring the
368 // reader lock for state should suffice here.
369 if (vb && vb->getState() == vbucket_state_replica) {
370 if (end_seqno_ > endSeqno) {
371 /* We possibly have items in the open checkpoint
372 (incomplete snapshot) */
373 LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Merging backfill"
374 " and memory snapshot for a replica vbucket, start seqno "
375 "%" PRIu64 " and end seqno %" PRIu64 "",
376 producer->logHeader(), vb_, startSeqno, endSeqno);
377 endSeqno = end_seqno_;
// PRIu64 for the uint64_t seqnos, consistent with the log just above
// ("%llu" mismatches uint64_t on LP64 platforms).
381 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
382 "seqno %" PRIu64 " and end seqno %" PRIu64, producer->logHeader(), vb_, startSeqno,
384 pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
386 lastSentSnapEndSeqno = endSeqno;
389 endStream(END_STREAM_STATE);
// Never position the checkpoint cursor past the requested end of the stream.
391 if (endSeqno > end_seqno_) {
392 chkCursorSeqno = end_seqno_;
394 // Only re-register the cursor if we still need to get memory snapshots
395 CursorRegResult result =
396 vb->checkpointManager.registerTAPCursorBySeqno(name_,
398 curChkSeqno = result.first;
402 bool inverse = false;
403 if (itemsReady.compare_exchange_strong(inverse, true)) {
404 producer->notifyStreamReady(vb_, false);
// Receives one backfilled item, either from DiskCallback or from
// CacheCallback. While the stream is still BACKFILLING, the item is queued as
// a MutationResponse and lastReadSeqno advances to its seqno. The producer is
// notified if itemsReady was not already set.
408 void ActiveStream::backfillReceived(Item* itm) {
409 LockHolder lh(streamMutex);
410 if (state_ == STREAM_BACKFILLING) {
411 pushToReadyQ(new MutationResponse(itm, opaque_));
412 lastReadSeqno = itm->getBySeqno();
// Only the caller that flips itemsReady false->true notifies the producer,
// which avoids redundant wakeups.
414 bool inverse = false;
415 if (itemsReady.compare_exchange_strong(inverse, true)) {
416 producer->notifyStreamReady(vb_, false);
// Called by DCPBackfill::run() once KVStore::dump() has finished. It:
//  - logs the number of items read from disk;
//  - clears isBackfillTaskRunning, which lets backfillPhase() leave the
//    BACKFILLING state once readyQ has drained;
//  - wakes the producer if itemsReady was not already set.
423 void ActiveStream::completeBackfill() {
425 LockHolder lh(streamMutex);
// "%d" / "%ld" did not match the argument types (an item counter and a
// uint64_t seqno). Cast explicitly and use PRIu64, as deadPhase() does.
426 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %" PRIu64 " items read"
427 " from disk, last seqno read: %" PRIu64, producer->logHeader(), vb_,
428 uint64_t(itemsFromBackfill), uint64_t(lastReadSeqno.load()));
431 isBackfillTaskRunning.store(false);
432 bool inverse = false;
433 if (itemsReady.compare_exchange_strong(inverse, true)) {
434 producer->notifyStreamReady(vb_, false);
// Called when the consumer acknowledges a snapshot marker. waitForSnapshot
// tracks outstanding acks; takeoverSendPhase() waits for it to reach 0. The
// ack that brings it to zero wakes the producer, but only if itemsReady was
// not already set (short-circuit: no CAS otherwise).
// NOTE(review): an unexpected extra ack decrements past zero; worth guarding
// if waitForSnapshot is unsigned.
438 void ActiveStream::snapshotMarkerAckReceived() {
439 bool inverse = false;
440 if (--waitForSnapshot == 0 &&
441 itemsReady.compare_exchange_strong(inverse, true)) {
442 producer->notifyStreamReady(vb_, true);
// Handles the consumer's ack of a SetVBucketState message during takeover.
//  - First ack (takeoverState == pending): takeoverState becomes active and
//    the stream returns to TAKEOVER_SEND. takeoverSendPhase() then sends the
//    remaining items and a SetVBucketState(active).
//  - Otherwise: the local vbucket is set to dead and the stream is ended
//    with END_STREAM_OK.
// An ack received outside TAKEOVER_WAIT is logged as unexpected.
// NOTE(review): "Recieved" is misspelled, but the name is part of the class
// interface and is left unchanged.
446 void ActiveStream::setVBucketStateAckRecieved() {
447 LockHolder lh(streamMutex);
448 if (state_ == STREAM_TAKEOVER_WAIT) {
449 if (takeoverState == vbucket_state_pending) {
450 LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
451 "vbucket state to pending message", producer->logHeader(), vb_);
453 takeoverState = vbucket_state_active;
454 transitionState(STREAM_TAKEOVER_SEND);
457 engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
459 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
460 LOG(EXTENSION_LOG_WARNING, "%s (vb %" PRIu16 ") Vbucket marked as "
461 "dead, last sent seqno: %" PRIu64 ", high seqno: %" PRIu64 "",
462 producer->logHeader(), vb_, lastSentSeqno.load(),
463 vbucket->getHighSeqno());
465 LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") Receive ack for set "
466 "vbucket state to active message", producer->logHeader(), vb_);
467 endStream(END_STREAM_OK);
471 bool inverse = false;
472 if (itemsReady.compare_exchange_strong(inverse, true)) {
473 producer->notifyStreamReady(vb_, true);
476 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
477 "op on stream '%s' state '%s'", producer->logHeader(), vb_,
478 name_.c_str(), stateName(state_));
// BACKFILLING state: returns the next queued backfill response. Sending a
// mutation/deletion/expiration presumably decrements backfillRemaining —
// confirm.
// Once the backfill task has finished and readyQ is empty, the stream moves
// on:
//  - END_STREAM_OK if everything up to end_seqno_ was read, or if the stream
//    is disk-only;
//  - TAKEOVER_SEND for takeover streams;
//  - IN_MEMORY otherwise.
// After such a transition it tries again to fetch a queued response.
483 DcpResponse* ActiveStream::backfillPhase() {
484 DcpResponse* resp = nextQueuedItem();
486 if (resp && backfillRemaining > 0 &&
487 (resp->getEvent() == DCP_MUTATION ||
488 resp->getEvent() == DCP_DELETION ||
489 resp->getEvent() == DCP_EXPIRATION)) {
493 if (!isBackfillTaskRunning && readyQ.empty()) {
494 backfillRemaining.store(0, memory_order_relaxed);
495 if (lastReadSeqno.load() >= end_seqno_) {
496 endStream(END_STREAM_OK);
497 } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
498 transitionState(STREAM_TAKEOVER_SEND);
499 } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
500 endStream(END_STREAM_OK);
502 transitionState(STREAM_IN_MEMORY);
506 resp = nextQueuedItem();
// IN_MEMORY state: ends the stream once lastSentSeqno has reached end_seqno_.
// Otherwise, when readyQ is empty, it asks for more checkpoint items;
// nextCheckpointItem() schedules the checkpoint processor task. Returns the
// next queued response, if any.
513 DcpResponse* ActiveStream::inMemoryPhase() {
514 if (lastSentSeqno.load() >= end_seqno_) {
515 endStream(END_STREAM_OK);
516 } else if (readyQ.empty()) {
517 if (nextCheckpointItem()) {
522 return nextQueuedItem();
// TAKEOVER_SEND state: first drains readyQ, then pulls any remaining
// checkpoint items. It presumably also waits while snapshot-marker acks are
// outstanding (waitForSnapshot != 0). Once nothing is pending, it sends
// SetVBucketState(takeoverState), provided the producer's buffer log has
// room, and moves to TAKEOVER_WAIT for the consumer's ack.
525 DcpResponse* ActiveStream::takeoverSendPhase() {
526 if (!readyQ.empty()) {
527 return nextQueuedItem();
529 if (nextCheckpointItem()) {
534 if (waitForSnapshot != 0) {
537 DcpResponse* resp = NULL;
538 if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
539 resp = new SetVBucketState(opaque_, vb_, takeoverState);
540 transitionState(STREAM_TAKEOVER_WAIT);
// TAKEOVER_WAIT state: only drains already-queued responses. Leaving this
// state happens in setVBucketStateAckRecieved().
545 DcpResponse* ActiveStream::takeoverWaitPhase() {
546 return nextQueuedItem();
// DEAD state: drains whatever is left in readyQ, e.g. the StreamEnd queued by
// endStream(). The "Stream closed" summary is presumably logged once the
// queue is exhausted — confirm.
549 DcpResponse* ActiveStream::deadPhase() {
550 DcpResponse* resp = nextQueuedItem();
552 LOG(EXTENSION_LOG_WARNING,
553 "(vb %" PRIu16 ") Stream closed, "
554 "%" PRIu64 " items sent from backfill phase, "
555 "%" PRIu64 " items sent from memory phase, "
556 "%" PRIu64 " was last seqno sent",
558 uint64_t(itemsFromBackfill),
559 uint64_t(itemsFromMemory),
560 lastSentSeqno.load());
// Adds active-stream stats on top of the common Stream stats: items sent from
// backfill and from memory, last sent/read seqnos, ready-queue bytes and the
// itemsReady flag. Keys use the same "<name>:stream_<vb>_<stat>" form.
565 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
566 Stream::addStats(add_stat, c);
568 const int bsize = 128;
570 snprintf(buffer, bsize, "%s:stream_%d_backfilled", name_.c_str(), vb_);
571 add_casted_stat(buffer, itemsFromBackfill, add_stat, c);
572 snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
573 add_casted_stat(buffer, itemsFromMemory, add_stat, c);
574 snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
575 add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
576 snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
577 add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
578 snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
579 add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
580 snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
581 add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat, c);
// Emits takeover progress stats for this stream. Always emitted: "name",
// "status", "backfillRemaining" and "estimate". While the vbucket exists and
// the stream is alive, also emitted: "chk_items", "vb_items" and
// "on_disk_deletes".
// "estimate" starts from backfillRemaining, presumably plus the checkpoint
// item count. That count is capped so it does not reach past end_seqno_.
584 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
585 LockHolder lh(streamMutex);
587 RCPtr<VBucket> vb = engine->getVBucket(vb_);
588 add_casted_stat("name", name_, add_stat, cookie);
589 if (!vb || state_ == STREAM_DEAD) {
// Each key is emitted exactly once. "estimate" used to be emitted a second
// time after "backfillRemaining", producing a duplicate stat key.
590 add_casted_stat("status", "completed", add_stat, cookie);
591 add_casted_stat("estimate", 0, add_stat, cookie);
592 add_casted_stat("backfillRemaining", 0, add_stat, cookie);
597 size_t total = backfillRemaining;
598 if (state_ == STREAM_BACKFILLING) {
599 add_casted_stat("status", "backfilling", add_stat, cookie);
601 add_casted_stat("status", "in-memory", add_stat, cookie);
603 add_casted_stat("backfillRemaining", backfillRemaining, add_stat, cookie);
605 item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
606 size_t vb_items = vb->getNumItems(iep);
607 size_t chk_items = vb_items > 0 ?
608 vb->checkpointManager.getNumItemsForTAPConnection(name_) : 0;
609 size_t del_items = engine->getEpStore()->getRWUnderlying(vb_)->
610 getNumPersistedDeletes(vb_);
// Do not count checkpoint items beyond the requested end seqno.
612 if (end_seqno_ < curChkSeqno) {
614 } else if ((end_seqno_ - curChkSeqno) < chk_items) {
615 chk_items = end_seqno_ - curChkSeqno + 1;
619 add_casted_stat("estimate", total, add_stat, cookie);
620 add_casted_stat("chk_items", chk_items, add_stat, cookie);
621 add_casted_stat("vb_items", vb_items, add_stat, cookie);
622 add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
// Returns the front of readyQ if the producer's buffer log accepts its size.
// Presumably it returns NULL otherwise, so the caller retries later.
// For mutations, deletions and expirations, lastSentSeqno is advanced. The
// backfill/memory sent counters are presumably bumped according to state_ —
// confirm.
// NOTE(review): the dynamic_cast result is dereferenced without a null check.
// If these events always carry a MutationResponse, a static_cast states that
// intent; otherwise a check is needed.
625 DcpResponse* ActiveStream::nextQueuedItem() {
626 if (!readyQ.empty()) {
627 DcpResponse* response = readyQ.front();
628 if (producer->bufferLogInsert(response->getMessageSize())) {
629 if (response->getEvent() == DCP_MUTATION ||
630 response->getEvent() == DCP_DELETION ||
631 response->getEvent() == DCP_EXPIRATION) {
632 lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
634 if (state_ == STREAM_BACKFILLING) {
// Requests more in-memory items. If this stream's checkpoint cursor has items
// available, the checkpoint processor task is scheduled for this stream. The
// return value presumably reports whether more items are on the way (items
// available, or an extraction already in progress) — confirm.
647 bool ActiveStream::nextCheckpointItem() {
648 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
649 if (vbucket && vbucket->checkpointManager.getNumItemsForTAPConnection(name_) > 0) {
650 // schedule this stream to build the next checkpoint
651 producer->scheduleCheckpointProcessorTask(this);
653 } else if (chkptItemsExtractionInProgress) {
// Checkpoint processor task body. It pops queued streams and calls
// nextCheckpointItemTask() on each, at most iterationsBeforeYield per run.
// If it was re-notified meanwhile (or work remains), it snoozes for 0 to
// yield and run again; otherwise it sleeps until woken. Does nothing useful
// once the engine is shutting down.
659 bool ActiveStreamCheckpointProcessorTask::run() {
660 if (engine->getEpStats().isShutdown) {
664 // Setup that we will sleep forever when done.
667 // Clear the notfification flag
668 notified.store(false);
670 size_t iterations = 0;
672 stream_t nextStream = queuePop();
673 ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());
676 stream->nextCheckpointItemTask();
681 } while(!queueEmpty()
682 && iterations < iterationsBeforeYield);
684 // Now check if we were re-notified or there are still checkpoints
685 bool expected = true;
686 if (notified.compare_exchange_strong(expected, false)
688 // snooze for 0, essentially yielding and allowing other tasks a go
// Asks the executor pool to run this task now.
695 void ActiveStreamCheckpointProcessorTask::wakeup() {
696 ExecutorPool::get()->wake(getId());
// Queues `stream` for checkpoint processing (presumably a unique push).
// The CAS on `notified` lets only the first caller since the last run
// proceed, presumably to wake the task. Concurrent schedule() calls
// therefore cause a single wakeup.
699 void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
702 bool expected = false;
703 if (notified.compare_exchange_strong(expected, true)) {
// Empties the pending-stream queue and the set of queued vbuckets, under
// workQueueLock.
708 void ActiveStreamCheckpointProcessorTask::clearQueues() {
709 LockHolder lh(workQueueLock);
710 while (!queue.empty()) {
713 queuedVbuckets.clear();
// Run from the checkpoint processor task. It fetches all outstanding
// checkpoint items for this stream's cursor and (presumably) hands them to
// processItems(). The comment below explains why a missing vbucket must not
// be handled by calling setDead() here.
716 void ActiveStream::nextCheckpointItemTask() {
717 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
719 std::deque<queued_item> items;
720 getOutstandingItems(vbucket, items);
723 /* The entity deleting the vbucket must set stream to dead,
724 calling setDead(END_STREAM_STATE) will cause deadlock because
725 it will try to grab streamMutex which is already acquired at this
// Pulls every item available to this stream's checkpoint cursor into
// `items`. chkptItemsExtractionInProgress is set here and cleared at the end
// of processItems(); nextCheckpointItem() consults it meanwhile. When more
// than one checkpoint exists, the checkpoint remover is woken, presumably so
// closed checkpoints can be released.
731 void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
732 std::deque<queued_item> &items) {
733 // Commencing item processing - set guard flag.
734 chkptItemsExtractionInProgress.store(true);
736 vb->checkpointManager.getAllItemsForCursor(name_, items);
737 if (vb->checkpointManager.getNumCheckpoints() > 1) {
738 engine->getEpStore()->wakeUpCheckpointRemover();
// Converts checkpoint items into DCP responses.
//  - Sets and deletes become MutationResponses and advance curChkSeqno and
//    lastReadSeqno.
//  - A checkpoint_start item flushes the mutations gathered so far as one
//    snapshot (see snapshot()).
//  - `mark` is presumably set when the batch begins at a checkpoint start;
//    snapshot() uses it to add MARKER_FLAG_CHK.
// If only meta items were seen, it immediately looks for more checkpoint
// items; otherwise the remaining mutations are emitted as a snapshot.
// Finally it clears chkptItemsExtractionInProgress and notifies the producer.
743 void ActiveStream::processItems(std::deque<queued_item>& items) {
744 if (!items.empty()) {
746 if (items.front()->getOperation() == queue_op_checkpoint_start) {
750 std::deque<MutationResponse*> mutations;
751 std::deque<queued_item>::iterator itemItr;
752 for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
753 queued_item& qi = *itemItr;
755 if (qi->getOperation() == queue_op_set ||
756 qi->getOperation() == queue_op_del) {
757 curChkSeqno = qi->getBySeqno();
758 lastReadSeqno = qi->getBySeqno();
760 mutations.push_back(new MutationResponse(qi, opaque_));
761 } else if (qi->getOperation() == queue_op_checkpoint_start) {
762 snapshot(mutations, mark);
767 if (mutations.empty()) {
768 // If we only got checkpoint start or ends check to see if there are
769 // any more snapshots before pausing the stream.
770 nextCheckpointItemTask();
772 snapshot(mutations, mark);
776 // Completed item processing - clear guard flag and notify producer.
777 chkptItemsExtractionInProgress.store(false);
778 producer->notifyStreamReady(vb_, true);
// Queues a batch of in-memory mutations, preceded by a snapshot marker when
// the previous snapshot is complete (isCurrentSnapshotCompleted()).
// The marker:
//  - spans the first to last seqno of the batch;
//  - has its start widened back to snap_start_seqno_ on the first marker;
//  - carries MARKER_FLAG_MEMORY, plus MARKER_FLAG_CHK (presumably when
//    `mark`) and MARKER_FLAG_ACK while in TAKEOVER_SEND.
// Presumably returns early for an empty batch — confirm.
781 void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
786 LockHolder lh(streamMutex);
788 if (isCurrentSnapshotCompleted()) {
789 uint32_t flags = MARKER_FLAG_MEMORY;
790 uint64_t snapStart = items.front()->getBySeqno();
791 uint64_t snapEnd = items.back()->getBySeqno();
794 flags |= MARKER_FLAG_CHK;
797 if (state_ == STREAM_TAKEOVER_SEND) {
799 flags |= MARKER_FLAG_ACK;
802 if (!firstMarkerSent) {
803 snapStart = std::min(snap_start_seqno_, snapStart);
804 firstMarkerSent = true;
806 pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
808 lastSentSnapEndSeqno = snapEnd;
811 std::deque<MutationResponse*>::iterator itemItr;
812 for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
813 pushToReadyQ(*itemItr);
// Kills the stream with `status`; ending the stream presumably happens under
// streamMutex. Unless the connection was disconnected, the producer is woken
// if itemsReady was not already set.
817 uint32_t ActiveStream::setDead(end_stream_status_t status) {
819 LockHolder lh(streamMutex);
823 bool inverse = false;
824 if (status != END_STREAM_DISCONNECTED &&
825 itemsReady.compare_exchange_strong(inverse, true)) {
826 producer->notifyStreamReady(vb_, true);
// Presumably called when a new seqno is available on the vbucket. It wakes
// the producer if the stream is alive and itemsReady was not already set.
// The `seqno` argument is not used by active streams.
831 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
832 if (state_ != STREAM_DEAD) {
833 bool inverse = false;
834 if (itemsReady.compare_exchange_strong(inverse, true)) {
835 producer->notifyStreamReady(vb_, true);
// Ends the stream (no-op if already DEAD). A StreamEnd response carrying
// `reason` is queued unless the connection was disconnected (presumably
// because nobody would receive it). The stream then moves to DEAD, and the
// last sent seqno and the reason are logged.
840 void ActiveStream::endStream(end_stream_status_t reason) {
841 if (state_ != STREAM_DEAD) {
842 if (reason != END_STREAM_DISCONNECTED) {
843 pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
845 transitionState(STREAM_DEAD);
846 LOG(EXTENSION_LOG_WARNING,
847 "(vb %" PRIu16 ") Stream closing, "
848 "sent until seqno %" PRIu64 " "
851 lastSentSeqno.load(),
852 getEndStreamStatusStr(reason));
// Decides whether a disk backfill is needed and schedules it. Does nothing if
// a backfill task is already running.
// It registers this stream's checkpoint cursor; result.first is the seqno the
// cursor starts at (curChkSeqno), result.second is isFirstItem. The backfill
// range is [lastReadSeqno + 1, backfillEnd]:
//  - disk-only streams backfill up to end_seqno_;
//  - other streams backfill up to just before curChkSeqno, capped at
//    end_seqno_.
// A DCPBackfill task is scheduled only when the range is non-empty and either
// isFirstItem is set or the stream is disk-only. Otherwise no backfill runs:
// disk-only streams end, takeover streams go to TAKEOVER_SEND, and all others
// go to IN_MEMORY.
856 void ActiveStream::scheduleBackfill() {
857 if (!isBackfillTaskRunning) {
858 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
863 CursorRegResult result =
864 vbucket->checkpointManager.registerTAPCursorBySeqno(name_,
867 curChkSeqno = result.first;
868 bool isFirstItem = result.second;
870 cb_assert(lastReadSeqno <= curChkSeqno);
871 uint64_t backfillStart = lastReadSeqno + 1;
873 /* We need to find the minimum seqno that needs to be backfilled in
874 * order to make sure that we don't miss anything when transitioning
875 * to a memory snapshot. The backfill task will always make sure that
876 * the backfill end seqno is contained in the backfill.
878 uint64_t backfillEnd = 0;
879 if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) { // disk backfill only
880 backfillEnd = end_seqno_;
881 } else { // disk backfill + in-memory streaming
882 if (backfillStart < curChkSeqno) {
883 if (curChkSeqno > end_seqno_) {
884 backfillEnd = end_seqno_;
886 backfillEnd = curChkSeqno - 1;
891 bool tryBackfill = isFirstItem || flags_ & DCP_ADD_STREAM_FLAG_DISKONLY;
893 if (backfillStart <= backfillEnd && tryBackfill) {
894 ExTask task = new DCPBackfill(engine, this, backfillStart, backfillEnd,
895 Priority::TapBgFetcherPriority, 0, false);
896 ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
897 isBackfillTaskRunning.store(true);
899 if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
900 endStream(END_STREAM_OK);
901 } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
902 transitionState(STREAM_TAKEOVER_SEND);
904 transitionState(STREAM_IN_MEMORY);
906 itemsReady.store(true);
// Human-readable description of an end_stream_status_t, used in log
// messages. Unknown values fall through to a catch-all string.
911 const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
915 return "The stream ended due to all items being streamed";
916 case END_STREAM_CLOSED:
917 return "The stream closed early due to a close stream message";
918 case END_STREAM_STATE:
919 return "The stream closed early because the vbucket state changed";
920 case END_STREAM_DISCONNECTED:
921 return "The stream closed early because the conn was disconnected";
925 return "Status unknown; this should not happen";
// Validates and performs a state transition (same-state transitions return
// early). Allowed transitions, enforced by cb_assert:
//   PENDING (presumably) -> BACKFILLING | DEAD
//   BACKFILLING          -> IN_MEMORY | TAKEOVER_SEND | DEAD
//   IN_MEMORY            -> BACKFILLING | DEAD
//   TAKEOVER_SEND        -> TAKEOVER_WAIT | DEAD
//   TAKEOVER_WAIT        -> TAKEOVER_SEND | DEAD
// Entry actions for the new state:
//  - BACKFILLING presumably schedules the backfill;
//  - IN_MEMORY ends the stream if everything up to end_seqno_ was already
//    sent, otherwise pulls checkpoint items;
//  - TAKEOVER_SEND pulls checkpoint items;
//  - DEAD removes this stream's checkpoint cursor.
928 void ActiveStream::transitionState(stream_state_t newState) {
929 LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
930 producer->logHeader(), vb_, stateName(state_), stateName(newState));
932 if (state_ == newState) {
936 switch (state_.load()) {
938 cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
940 case STREAM_BACKFILLING:
941 cb_assert(newState == STREAM_IN_MEMORY ||
942 newState == STREAM_TAKEOVER_SEND ||
943 newState == STREAM_DEAD);
945 case STREAM_IN_MEMORY:
946 cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
948 case STREAM_TAKEOVER_SEND:
949 cb_assert(newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD);
951 case STREAM_TAKEOVER_WAIT:
952 cb_assert(newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD);
955 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
956 "to %s", producer->logHeader(), vb_, stateName(state_),
957 stateName(newState));
964 case STREAM_BACKFILLING:
967 case STREAM_IN_MEMORY:
968 // Check if the producer has sent up till the last requested
969 // sequence number already, if not - move checkpoint items into
971 if (lastSentSeqno >= end_seqno_) {
972 // Stream transitioning to DEAD state
973 endStream(END_STREAM_OK);
975 nextCheckpointItem();
978 case STREAM_TAKEOVER_SEND:
979 nextCheckpointItem();
983 RCPtr<VBucket> vb = engine->getVBucket(vb_);
985 vb->checkpointManager.removeTAPCursor(name_);
989 case STREAM_TAKEOVER_WAIT:
993 LOG(EXTENSION_LOG_WARNING,
994 "ActiveStream::transitionState: newState can't be "
// Estimate of items still to send: the distance from lastSentSeqno to the
// smaller of end_seqno_ and the vbucket's high seqno. Presumably 0 when that
// bound is not ahead of lastSentSeqno, when the vbucket is missing, or when
// the stream is dead.
1000 size_t ActiveStream::getItemsRemaining() {
1001 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1003 if (!vbucket || state_ == STREAM_DEAD) {
1007 uint64_t high_seqno = vbucket->getHighSeqno();
1009 if (end_seqno_ < high_seqno) {
1010 if (end_seqno_ > lastSentSeqno.load()) {
1011 return (end_seqno_ - lastSentSeqno.load());
1014 if (high_seqno > lastSentSeqno.load()) {
1015 return (high_seqno - lastSentSeqno.load());
// Log prefix of the owning producer connection. Also used by DCPBackfill
// for its own log lines.
1022 const char* ActiveStream::logHeader()
1024 return producer->logHeader();
// Whether a new memory snapshot marker may be sent (used by snapshot()).
// For replica vbuckets, this is the case once the last marker's end has been
// reached, i.e. lastSentSnapEndSeqno >= lastReadSeqno.
// NOTE(review): the non-replica path presumably always reports "completed" —
// confirm.
1027 bool ActiveStream::isCurrentSnapshotCompleted() const
1029 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1030 // An atomic read of vbucket state without acquiring the
1031 // reader lock for state should suffice here.
1032 if (vbucket && vbucket->getState() == vbucket_state_replica) {
1033 if (lastSentSnapEndSeqno >= lastReadSeqno) {
// Creates a notifier stream. As far as this file shows, it only ever queues a
// StreamEnd, once the vbucket's high seqno moves past st_seqno. If that is
// already the case at construction, StreamEnd(END_STREAM_OK) is queued
// immediately and the stream goes straight to DEAD.
1040 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
1041 const std::string &name, uint32_t flags,
1042 uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1043 uint64_t en_seqno, uint64_t vb_uuid,
1044 uint64_t snap_start_seqno,
1045 uint64_t snap_end_seqno)
1046 : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1047 snap_start_seqno, snap_end_seqno),
1049 LockHolder lh(streamMutex);
1050 RCPtr<VBucket> vbucket = e->getVBucket(vb_);
1051 if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
1052 pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1053 transitionState(STREAM_DEAD);
1054 itemsReady.store(true);
1057 type_ = STREAM_NOTIFIER;
// Seqnos are uint64_t: PRIu64 matches on every platform, "%llu" does not.
1059 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
1060 "%" PRIu64 " and end seqno %" PRIu64, producer->logHeader(), vb, st_seqno,
// Moves a live notifier stream to DEAD. A StreamEnd with `status` is queued
// unless the connection was disconnected, and the producer is woken if
// itemsReady was not already set.
1064 uint32_t NotifierStream::setDead(end_stream_status_t status) {
1065 LockHolder lh(streamMutex);
1066 if (state_ != STREAM_DEAD) {
1067 transitionState(STREAM_DEAD);
1068 if (status != END_STREAM_DISCONNECTED) {
1069 pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
1071 bool inverse = false;
1072 if (itemsReady.compare_exchange_strong(inverse, true)) {
1073 producer->notifyStreamReady(vb_, true);
// A notifier stream waits only for the vbucket to move past start_seqno_.
// As soon as a larger seqno is reported, it queues StreamEnd(END_STREAM_OK),
// goes DEAD and wakes the producer if itemsReady was not already set.
1080 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1081 LockHolder lh(streamMutex);
1082 if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1083 pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1084 transitionState(STREAM_DEAD);
1086 bool inverse = false;
1087 if (itemsReady.compare_exchange_strong(inverse, true)) {
1088 producer->notifyStreamReady(vb_, true);
// Returns the next queued response if the producer's buffer log has room for
// its size. When readyQ is empty, itemsReady is cleared (and presumably NULL
// is returned).
1093 DcpResponse* NotifierStream::next() {
1094 LockHolder lh(streamMutex);
1096 if (readyQ.empty()) {
1097 itemsReady.store(false);
1101 DcpResponse* response = readyQ.front();
1102 if (producer->bufferLogInsert(response->getMessageSize())) {
// Notifier streams have a single legal transition, PENDING -> DEAD (asserted).
// A transition out of any other state is logged as invalid.
1111 void NotifierStream::transitionState(stream_state_t newState) {
1112 LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1113 producer->logHeader(), vb_, stateName(state_), stateName(newState));
1115 if (state_ == newState) {
1119 switch (state_.load()) {
1120 case STREAM_PENDING:
1121 cb_assert(newState == STREAM_DEAD);
1124 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1125 "to %s", producer->logHeader(), vb_, stateName(state_),
1126 stateName(newState));
// Creates a passive (consumer-side) stream. A StreamRequest with the given
// parameters is queued for the consumer to send to the producer, and
// itemsReady is set so the request goes out. last_seqno starts at
// vb_high_seqno.
1133 PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
1134 const std::string &name, uint32_t flags,
1135 uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1136 uint64_t en_seqno, uint64_t vb_uuid,
1137 uint64_t snap_start_seqno, uint64_t snap_end_seqno,
1138 uint64_t vb_high_seqno)
1139 : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1140 snap_start_seqno, snap_end_seqno),
1141 engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
1142 cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false),
1143 saveSnapshot(false) {
1144 LockHolder lh(streamMutex);
1145 pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
1146 vb_uuid, snap_start_seqno, snap_end_seqno));
1147 itemsReady.store(true);
1148 type_ = STREAM_PASSIVE;
1150 const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
// All six values are uint64_t: use PRIu64 instead of "%llu" (mismatched on
// LP64 platforms).
1151 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to add %s stream with "
1152 "start seqno %" PRIu64 ", end seqno %" PRIu64 ", vbucket uuid %" PRIu64
", snap start seqno "
1153 "%" PRIu64 ", snap end seqno %" PRIu64 ", and vb_high_seqno %" PRIu64 "",
1154 consumer->logHeader(), vb, type, st_seqno, en_seqno, vb_uuid,
1155 snap_start_seqno, snap_end_seqno, vb_high_seqno);
// Destructor. Under streamMutex, asserts that the stream was set DEAD and
// that its message buffer holds no bytes (buffer.bytes == 0).
1158 PassiveStream::~PassiveStream() {
1159 LockHolder lh(streamMutex);
1161 cb_assert(state_ == STREAM_DEAD);
1162 cb_assert(buffer.bytes == 0);
// Moves the stream to STREAM_DEAD and calls clearBuffer(). Returns the byte
// count clearBuffer() reports, which is logged as the unacknowledged bytes.
1165 uint32_t PassiveStream::setDead(end_stream_status_t status) {
1166 LockHolder lh(streamMutex);
1167 transitionState(STREAM_DEAD);
1169 uint32_t unackedBytes = clearBuffer();
// last_seqno holds a uint64_t: PRIu64 plus an explicit cast instead of
// "%llu", which mismatches uint64_t on LP64 platforms.
1170 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Setting stream to dead state,"
1171 " last_seqno is %" PRIu64 ", unackedBytes is %u, status is %s",
1172 consumer->logHeader(), vb_, uint64_t(last_seqno.load()), unackedBytes,
1173 getEndStreamStatusStr(status));
1174 return unackedBytes;
// Complete the add-stream handshake for this stream with the given status.
//
// Only acts while the stream is STREAM_PENDING. On ENGINE_SUCCESS the stream
// moves to STREAM_READING; otherwise it moves to STREAM_DEAD (the `else`
// line is not visible in this copy). An AddStreamResponse carrying
// add_opaque, this stream's opaque_ and status is then queued for the
// consumer. It is presumably still inside the STREAM_PENDING branch, but the
// closing braces are not visible here.
//
// @param status     result code reported back in the AddStreamResponse.
// @param add_opaque opaque placed in the AddStreamResponse next to opaque_.
//
// NOTE(review): one short line between the push and the notification is
// absent from this copy, so whether streamMutex is still held while
// notifying cannot be confirmed from here.
1177 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1178 LockHolder lh(streamMutex);
1179 if (state_ == STREAM_PENDING) {
1180 if (status == ENGINE_SUCCESS) {
1181 transitionState(STREAM_READING);
1183 transitionState(STREAM_DEAD);
1185 pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
// Only the caller that flips itemsReady from false to true notifies the
// consumer, so it is notified once per false->true edge.
1187 bool inverse = false;
1188 if (itemsReady.compare_exchange_strong(inverse, true)) {
1189 consumer->notifyStreamReady(vb_);
// Queue a fresh StreamRequest for this stream, starting at start_seqno.
//
// The vbucket uuid is refreshed from the latest failover-log entry of `vb`,
// and the snapshot bounds from vb's current snapshot. last_seqno is reset to
// start_seqno, so messageReceived() accepts seqnos above it. The request is
// queued while holding streamMutex. The consumer is notified only if this
// call flips itemsReady from false to true.
//
// @param vb          vbucket whose failover log and current snapshot are read.
// @param new_opaque  opaque carried by the new StreamRequest.
// @param start_seqno seqno the new request starts from.
//
// NOTE(review): vb_uuid_, snap_start_seqno_ and snap_end_seqno_ are written
// before streamMutex is taken. Confirm no concurrent reader relies on that
// lock for these members. The last StreamRequest argument line and the
// closing braces are absent from this copy (the embedded numbering has gaps).
1194 void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1195 uint32_t new_opaque,
1196 uint64_t start_seqno) {
1197 vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1198 vb->getCurrentSnapshot(snap_start_seqno_, snap_end_seqno_);
// new_opaque is uint32_t, so it is formatted with %u. A %ld conversion does
// not match the argument actually passed through the varargs.
1200 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to reconnect stream "
1201 "with opaque %u, start seq no %llu, end seq no %llu, snap start seqno "
1202 "%llu, and snap end seqno %llu", consumer->logHeader(), vb_, new_opaque,
1203 start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
1205 LockHolder lh(streamMutex);
1206 last_seqno.store(start_seqno);
1207 pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1208 end_seqno_, vb_uuid_, snap_start_seqno_,
// Only the caller that flips itemsReady from false to true notifies the
// consumer.
1211 bool inverse = false;
1212 if (itemsReady.compare_exchange_strong(inverse, true)) {
1213 consumer->notifyStreamReady(vb_);
// Validate a DCP message received for this stream and append it to the
// buffer that processBufferedMessages() drains.
//
// Runs under buffer.bufMutex. Validation by event type:
//  - mutation-type events (the visible label is DCP_EXPIRATION; the other
//    labels sharing this branch are not present in this copy) are rejected
//    with ENGINE_ERANGE unless their seqno is greater than last_seqno. When
//    accepted, last_seqno is advanced to that seqno.
//  - DCP_SNAPSHOT_MARKER is rejected with ENGINE_ERANGE when its start is
//    below last_seqno and its end is not above it.
//  - DCP_SET_VBUCKET and DCP_STREAM_END are accepted as-is.
//  - any other event is presumably handled by the default branch: it is
//    logged and ENGINE_DISCONNECT is returned.
// Accepted messages are pushed onto buffer.messages and their size is added
// to buffer.bytes.
//
// @param resp message to validate and buffer.
// @return ENGINE_KEY_ENOENT if the stream is already dead; ENGINE_ERANGE or
//         ENGINE_DISCONNECT as described above; otherwise ENGINE_SUCCESS.
//
// NOTE(review): short lines (labels, `break`s, braces, and any cleanup of
// rejected messages) are absent from this copy (the embedded numbering has
// gaps). Ownership of rejected messages therefore cannot be confirmed from
// here.
1217 ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1218 LockHolder lh(buffer.bufMutex);
// Messages arriving after the stream has died are refused.
1221 if (state_ == STREAM_DEAD) {
1223 return ENGINE_KEY_ENOENT;
1226 switch (resp->getEvent()) {
1229 case DCP_EXPIRATION:
1231 MutationResponse* m = static_cast<MutationResponse*>(resp);
1232 uint64_t bySeqno = m->getBySeqno();
// Seqnos must strictly increase. Anything at or below the last accepted
// seqno is out of sequence and is dropped.
1233 if (bySeqno <= last_seqno.load()) {
// opaque_ holds the uint32_t opaque handed to the Stream constructor, so it
// is formatted with %u rather than %ld (confirm the member's declared type in
// the header).
1234 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous (out of "
1235 "sequence) mutation received, with opaque: %u, its "
1236 "seqno (%llu) is not greater than last received seqno "
1237 "(%llu); Dropping mutation!", consumer->logHeader(),
1238 vb_, opaque_, bySeqno, last_seqno.load());
1240 return ENGINE_ERANGE;
1242 last_seqno.store(bySeqno);
1245 case DCP_SNAPSHOT_MARKER:
1247 SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
1248 uint64_t snapStart = s->getStartSeqno();
1249 uint64_t snapEnd = s->getEndSeqno();
// A marker whose start is below, and whose end is not above, the last
// received seqno only covers seqnos already received, so it is dropped.
1250 if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
// %u matches the uint32_t opaque_. The trailing space after "and" keeps the
// concatenated literals from reading "andend".
1251 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous snapshot "
1252 "marker received, with opaque: %u, its start (%llu), and "
1253 "end (%llu) are less than last received seqno (%llu); "
1254 "Dropping marker!", consumer->logHeader(), vb_, opaque_,
1255 snapStart, snapEnd, last_seqno.load());
1257 return ENGINE_ERANGE;
1261 case DCP_SET_VBUCKET:
1262 case DCP_STREAM_END:
1264 /* No validations necessary */
1269 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unknown DCP op received: %d;"
1270 " Disconnecting connection..",
1271 consumer->logHeader(), vb_, resp->getEvent());
1272 return ENGINE_DISCONNECT;
// Accepted: queue for processBufferedMessages() and account for its size.
1276 buffer.messages.push(resp);
1278 buffer.bytes += resp->getMessageSize();
1280 return ENGINE_SUCCESS;
// Apply up to PassiveStream::batchSize buffered messages, oldest first.
//
// Runs under buffer.bufMutex. Each message is dispatched on its event:
//  - mutations (label not visible in this copy) go to processMutation();
//  - DCP_EXPIRATION (and presumably deletions) go to processDeletion();
//  - snapshot markers go to processMarker();
//  - set-vbucket messages go to processSetVBucketState();
//  - DCP_STREAM_END moves the stream to STREAM_DEAD.
// Only processMutation() and processDeletion() produce a status; the other
// handlers leave `ret` at ENGINE_SUCCESS.
//
// A message whose result is ENGINE_TMPFAIL or ENGINE_ENOMEM is handled
// specially. The lines doing so are not visible here; presumably the batch
// stops and the message stays buffered for a retry. Every other message is
// popped and its size subtracted from buffer.bytes. Its size is also counted
// in processed_bytes, unless it was rejected with ENGINE_ERANGE.
//
// @param[out] processed_bytes total size of the messages counted as
//             processed. It is not written when the buffer is empty on entry.
// @return cannot_process when a message could not be applied (presumably
//         signalled through `failed`, whose assignment is not visible here);
//         otherwise all_processed.
//
// NOTE(review): bufMutex is held here while processSetVBucketState() and
// handleSnapshotEnd() (reached via processMutation()/processDeletion()) may
// acquire streamMutex. setDead() takes streamMutex first and then bufMutex
// via clearBuffer(). Confirm these paths cannot run concurrently; the
// inverted lock order is otherwise a deadlock risk.
// NOTE(review): bytes of messages dropped with ENGINE_ERANGE are removed from
// buffer.bytes but are not reported in processed_bytes. If callers use
// processed_bytes for flow-control acknowledgement, confirm this is intended.
// The declaration of `count` and other short lines are absent from this copy.
1283 process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes) {
1284 LockHolder lh(buffer.bufMutex);
1286 uint32_t message_bytes = 0;
1287 uint32_t total_bytes_processed = 0;
1288 bool failed = false;
1290 if (buffer.messages.empty()) {
1291 return all_processed;
1294 while (count < PassiveStream::batchSize && !buffer.messages.empty()) {
1295 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1296 DcpResponse *response = buffer.messages.front();
1297 message_bytes = response->getMessageSize();
1299 switch (response->getEvent()) {
1301 ret = processMutation(static_cast<MutationResponse*>(response));
1304 case DCP_EXPIRATION:
1305 ret = processDeletion(static_cast<MutationResponse*>(response));
1307 case DCP_SNAPSHOT_MARKER:
1308 processMarker(static_cast<SnapshotMarker*>(response));
1310 case DCP_SET_VBUCKET:
1311 processSetVBucketState(static_cast<SetVBucketState*>(response));
1313 case DCP_STREAM_END:
1314 transitionState(STREAM_DEAD);
// Transient failures (temporary failure / out of memory) are not consumed
// from the buffer.
1320 if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1326 buffer.messages.pop();
1328 buffer.bytes -= message_bytes;
// Messages rejected with ENGINE_ERANGE are dropped but not counted as
// processed.
1330 if (ret != ENGINE_ERANGE) {
1331 total_bytes_processed += message_bytes;
1335 processed_bytes = total_bytes_processed;
1338 return cannot_process;
1341 return all_processed;
// Apply one buffered mutation to the local vbucket.
//
// Returns ENGINE_NOT_MY_VBUCKET when the vbucket is not found; the guarding
// `if` is not visible in this copy. Returns ENGINE_ERANGE, without applying
// the mutation, when its seqno is beyond the current snapshot end.
//
// The mutation is committed through commitMutation(), in backfill mode when
// the vbucket reports it is in its backfill phase. On one path (its condition
// is not visible here; presumably `saveSnapshot`) the commit happens while
// holding the vbucket's snapshot lock. On that path the current snapshot
// range is also recorded on the vbucket in the same critical section, and
// saveSnapshot is cleared. Commit errors are only logged. handleSnapshotEnd()
// is then called with the mutation's seqno.
//
// NOTE(review): the final `return` is absent from this copy; presumably it
// returns `ret` -- TODO confirm.
1344 ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1345 RCPtr<VBucket> vb = engine->getVBucket(vb_);
1347 return ENGINE_NOT_MY_VBUCKET;
// Mutations must fall inside the snapshot announced by the last marker.
1350 if (mutation->getBySeqno() > cur_snapshot_end.load()) {
1351 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous mutation [sequence "
1352 "number (%llu) greater than current snapshot end seqno (%llu)] "
1353 "being processed; Dropping the mutation!", consumer->logHeader(),
1354 vb_, mutation->getBySeqno(), cur_snapshot_end.load());
1355 return ENGINE_ERANGE;
1358 ENGINE_ERROR_CODE ret;
// Snapshot-saving path: commit and record the snapshot range under the
// vbucket's snapshot lock. The _UNLOCKED setter is used because `lh` already
// holds that lock.
1360 LockHolder lh = vb->getSnapshotLock();
1361 ret = commitMutation(mutation, vb->isBackfillPhase());
1362 vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start.load(),
1363 cur_snapshot_end.load());
1364 saveSnapshot = false;
// Other path (the selecting `else` is not visible): commit without taking the
// snapshot lock.
1367 ret = commitMutation(mutation, vb->isBackfillPhase());
1370 // We should probably handle these error codes in a better way, but since
1371 // the producer side doesn't do anything with them anyways let's just log
1372 // them for now until we come up with a better solution.
1373 if (ret != ENGINE_SUCCESS) {
1374 LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1375 "process mutation", consumer->logHeader(), ret);
1377 handleSnapshotEnd(vb, mutation->getBySeqno());
// Store a received mutation's item in the engine.
//
// @param mutation      message whose item is stored.
// @param backfillPhase when true, the item is stored with addTAPBackfillItem();
//                      otherwise it is stored with setWithMeta() on behalf of
//                      the consumer's cookie.
// @return the result code of the store call.
//
// NOTE(review): the trailing arguments of both calls are absent from this
// copy (the embedded numbering has gaps), so they are not described here.
1383 ENGINE_ERROR_CODE PassiveStream::commitMutation(MutationResponse* mutation,
1384 bool backfillPhase) {
1385 if (backfillPhase) {
1386 return engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1390 return engine->getEpStore()->setWithMeta(*mutation->getItem(), 0,
1391 consumer->getCookie(), true,
1392 true, INITIAL_NRU_VALUE, false,
// Apply one buffered deletion (or expiration) to the local vbucket.
//
// Returns ENGINE_NOT_MY_VBUCKET when the vbucket is not found; the guarding
// `if` is not visible in this copy. Returns ENGINE_ERANGE, without applying
// the deletion, when its seqno is beyond the current snapshot end.
//
// The deletion is committed through commitDeletion(), in backfill mode when
// the vbucket reports it is in its backfill phase. On one path (its condition
// is not visible here; presumably `saveSnapshot`) the commit and the
// recording of the current snapshot range happen under the vbucket's
// snapshot lock, and saveSnapshot is cleared. ENGINE_KEY_ENOENT from the
// commit (key not present) is treated as success. Other errors are only
// logged. handleSnapshotEnd() is then called with the deletion's seqno.
//
// NOTE(review): the final `return` is absent from this copy; presumably it
// returns `ret` -- TODO confirm.
1397 ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1398 RCPtr<VBucket> vb = engine->getVBucket(vb_);
1400 return ENGINE_NOT_MY_VBUCKET;
// Deletions must fall inside the snapshot announced by the last marker.
1403 if (deletion->getBySeqno() > cur_snapshot_end.load()) {
1404 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Erroneous deletion [sequence "
1405 "number (%llu) greater than current snapshot end seqno (%llu)] "
1406 "being processed; Dropping the deletion!", consumer->logHeader(),
1407 vb_, deletion->getBySeqno(), cur_snapshot_end.load());
1408 return ENGINE_ERANGE;
1411 ENGINE_ERROR_CODE ret;
// Snapshot-saving path: commit and record the snapshot range under the
// vbucket's snapshot lock. `lh` holds it, hence the _UNLOCKED setter.
1413 LockHolder lh = vb->getSnapshotLock();
1414 ret = commitDeletion(deletion, vb->isBackfillPhase());
1415 vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start.load(),
1416 cur_snapshot_end.load());
1417 saveSnapshot = false;
// Other path (the selecting `else` is not visible): commit without the lock.
1420 ret = commitDeletion(deletion, vb->isBackfillPhase());
// Deleting a key that is already absent is not reported as an error.
1423 if (ret == ENGINE_KEY_ENOENT) {
1424 ret = ENGINE_SUCCESS;
1427 // We should probably handle these error codes in a better way, but since
1428 // the producer side doesn't do anything with them anyways let's just log
1429 // them for now until we come up with a better solution.
1430 if (ret != ENGINE_SUCCESS) {
1431 LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1432 "process deletion", consumer->logHeader(), ret);
1434 handleSnapshotEnd(vb, deletion->getBySeqno());
// Delete the received key in the engine via deleteWithMeta().
//
// The call uses the item's metadata, the message's vbucket and seqno, and
// the consumer's cookie. delCas receives an output value that this function
// discards.
//
// @param deletion      deletion/expiration message whose key is removed.
// @param backfillPhase forwarded to deleteWithMeta().
// @return the result code of deleteWithMeta().
1440 ENGINE_ERROR_CODE PassiveStream::commitDeletion(MutationResponse* deletion,
1441 bool backfillPhase) {
1442 uint64_t delCas = 0;
1443 ItemMetaData meta = deletion->getItem()->getMetaData();
1444 return engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1445 &delCas, deletion->getVBucket(),
1446 consumer->getCookie(), true,
1447 &meta, backfillPhase, false,
1448 deletion->getBySeqno(), true);
// Start a new snapshot described by `marker`.
//
// Records the marker's start/end seqnos and the snapshot type: disk when
// MARKER_FLAG_DISK is set, memory otherwise. It also sets saveSnapshot,
// presumably so the next processMutation()/processDeletion() records the
// range on the vbucket.
//
// Vbucket handling (any null check on `vb` is not visible in this copy):
//  - a disk snapshot on a vbucket whose high seqno is 0 puts the vbucket in
//    backfill phase and sets its open checkpoint id to 0;
//  - otherwise (the `else` is not visible here), a new checkpoint is opened
//    when the marker carries MARKER_FLAG_CHK or the open checkpoint id is 0,
//    and backfill phase is cleared.
// If MARKER_FLAG_ACK is set, cur_snapshot_ack is set. handleSnapshotEnd()
// then queues a SnapshotMarkerResponse once the snapshot's end seqno is
// processed.
//
// NOTE(review): the block comment opened on the `1462` line is not closed in
// this copy; its closing line and several other short lines are absent
// (the embedded numbering has gaps).
1451 void PassiveStream::processMarker(SnapshotMarker* marker) {
1452 RCPtr<VBucket> vb = engine->getVBucket(vb_);
1454 cur_snapshot_start.store(marker->getStartSeqno());
1455 cur_snapshot_end.store(marker->getEndSeqno());
1456 cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
1457 saveSnapshot = true;
// '&' binds tighter than '&&': this reads
// (flags & MARKER_FLAG_DISK) && (high seqno == 0).
1460 if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1461 vb->setBackfillPhase(true);
1462 /* When replica vb is in backfill phase, then open checkpoint id
1464 vb->checkpointManager.setOpenCheckpointId(0);
1466 if (marker->getFlags() & MARKER_FLAG_CHK ||
1467 vb->checkpointManager.getOpenCheckpointId() == 0) {
1468 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1469 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1471 vb->setBackfillPhase(false);
1474 if (marker->getFlags() & MARKER_FLAG_ACK) {
1475 cur_snapshot_ack = true;
// Apply a set-vbucket-state message to the local vbucket and queue a
// SetVBucketStateResponse for the consumer.
//
// The return value of setVBucketState() is not inspected; the queued
// response always carries ENGINE_SUCCESS. The consumer is notified only if
// this call flips itemsReady from false to true.
//
// NOTE(review): processBufferedMessages() calls this while holding
// buffer.bufMutex, so streamMutex is taken after bufMutex here. That is the
// reverse of setDead(), which takes streamMutex and then bufMutex. Confirm
// this cannot deadlock.
1480 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1481 engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1483 LockHolder lh (streamMutex);
1484 pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1486 bool inverse = false;
1487 if (itemsReady.compare_exchange_strong(inverse, true)) {
1488 consumer->notifyStreamReady(vb_);
// Finish snapshot bookkeeping when `byseqno` is the current snapshot's end.
//
// Does nothing unless byseqno equals cur_snapshot_end. In that case:
//  - a disk snapshot whose vbucket is still in backfill phase leaves backfill
//    phase and opens a new checkpoint;
//  - otherwise (the `else` is not visible in this copy), a new checkpoint is
//    opened when total memory used exceeds the max data size multiplied by
//    StoredValue::getMutationMemThreshold();
//  - if the marker requested an ack, a SnapshotMarkerResponse is queued under
//    streamMutex, the consumer is notified on a false->true itemsReady flip,
//    and the ack request is cleared;
//  - the snapshot type is reset to none, and the vbucket's current snapshot
//    is set to [byseqno, byseqno].
//
// NOTE(review): closing braces and one short line after the push are absent
// from this copy (the embedded numbering has gaps).
1492 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1493 if (byseqno == cur_snapshot_end.load()) {
1494 if (cur_snapshot_type.load() == disk && vb->isBackfillPhase()) {
1495 vb->setBackfillPhase(false);
1496 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1497 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
// Open a new checkpoint once memory use crosses the mutation memory
// threshold (a fraction of the bucket's max data size).
1499 double maxSize = static_cast<double>(engine->getEpStats().getMaxDataSize());
1500 double mem_threshold = StoredValue::getMutationMemThreshold();
1501 double mem_used = static_cast<double>(engine->getEpStats().getTotalMemoryUsed());
1502 if (maxSize * mem_threshold < mem_used) {
1503 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1504 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
// Acknowledge the snapshot if the marker that started it asked for an ack.
1508 if (cur_snapshot_ack) {
1509 LockHolder lh(streamMutex);
1510 pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1512 bool inverse = false;
1513 if (itemsReady.compare_exchange_strong(inverse, true)) {
1514 consumer->notifyStreamReady(vb_);
1516 cur_snapshot_ack = false;
1518 cur_snapshot_type.store(none);
1519 vb->setCurrentSnapshot(byseqno, byseqno);
// Add this stream's statistics on top of Stream::addStats().
//
// Stat keys have the form "<name>:stream_<vb>_<stat>". Emitted stats:
//  - buffer_items and buffer_bytes, read while holding buffer.bufMutex;
//  - items_ready, last_received_seqno, ready_queue_memory and
//    cur_snapshot_type;
//  - cur_snapshot_start and cur_snapshot_end, only while a snapshot is in
//    progress (type is not none).
//
// NOTE(review): the declaration of `buf` (presumably `char buf[bsize]`) and
// the brace ending the bufMutex scope are absent from this copy.
1523 void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1524 Stream::addStats(add_stat, c);
1526 const int bsize = 128;
1528 size_t buffer_bytes;
1529 size_t buffer_items;
// Take a consistent copy of the buffer counters under the buffer lock.
1531 LockHolder lh(buffer.bufMutex);
1532 buffer_bytes = buffer.bytes;
1533 buffer_items = buffer.items;
1535 snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
1536 add_casted_stat(buf, buffer_items, add_stat, c);
1537 snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
1538 add_casted_stat(buf, buffer_bytes, add_stat, c);
1539 snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
1540 add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1541 snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
1542 add_casted_stat(buf, last_seqno.load(), add_stat, c);
1543 snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
1544 add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1546 snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
1547 add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type.load()), add_stat, c);
// Snapshot bounds are only meaningful while a snapshot is in progress.
1549 if (cur_snapshot_type.load() != none) {
1550 snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
1551 add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
1552 snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
1553 add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
// Return the next response queued for the consumer.
//
// Runs under streamMutex. When the ready queue is empty, itemsReady is
// cleared, so a later push followed by the false->true compare-exchange used
// elsewhere in this class notifies the consumer again. Otherwise the front
// response is taken.
//
// NOTE(review): the empty-queue return value and the removal of the front
// element are absent from this copy -- TODO confirm against the full file.
1557 DcpResponse* PassiveStream::next() {
1558 LockHolder lh(streamMutex);
1560 if (readyQ.empty()) {
1561 itemsReady.store(false);
1565 DcpResponse* response = readyQ.front();
// Drop every message still waiting in the buffer.
//
// Under buffer.bufMutex, records buffer.bytes and then pops each message.
//
// @return buffer.bytes as it was on entry: the size of messages that were
//         received but never processed.
//
// NOTE(review): the per-message cleanup inside the loop and any reset of
// buffer.bytes / buffer.items are absent from this copy (the embedded
// numbering has gaps). The destructor asserts buffer.bytes == 0, so a reset
// is presumably present -- TODO confirm.
1570 uint32_t PassiveStream::clearBuffer() {
1571 LockHolder lh(buffer.bufMutex);
1572 uint32_t unackedBytes = buffer.bytes;
1574 while (!buffer.messages.empty()) {
1575 DcpResponse* resp = buffer.messages.front();
1576 buffer.messages.pop();
1582 return unackedBytes;
// Move this PassiveStream to newState.
//
// Every request is logged at DEBUG level. The visible code enforces these
// transitions with cb_assert:
//  - STREAM_PENDING -> STREAM_READING or STREAM_DEAD;
//  - STREAM_READING -> STREAM_PENDING or STREAM_DEAD.
// Other starting states are presumably reported at WARNING level as an
// invalid transition; the `default:` label is not visible in this copy.
//
// NOTE(review): the body of the same-state check, the `break`s and the final
// state assignment are absent from this copy (the embedded numbering has
// gaps).
1585 void PassiveStream::transitionState(stream_state_t newState) {
1586 LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1587 consumer->logHeader(), vb_, stateName(state_), stateName(newState));
// Same-state requests are checked first (handling not visible here).
1589 if (state_ == newState) {
1593 switch (state_.load()) {
1594 case STREAM_PENDING:
1595 cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
1597 case STREAM_READING:
1598 cb_assert(newState == STREAM_PENDING || newState == STREAM_DEAD);
1601 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1602 "to %s", consumer->logHeader(), vb_, stateName(state_),
1603 stateName(newState));
// Return a human-readable description of an end-of-stream status. setDead()
// uses it in its log line.
//
// END_STREAM_CLOSED and END_STREAM_DISCONNECTED have dedicated messages. Any
// other value falls through to "Status unknown; this should not happen".
1610 const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
1613 case END_STREAM_CLOSED:
1614 return "The stream closed due to a close stream message";
1615 case END_STREAM_DISCONNECTED:
1616 return "The stream closed early because the conn was disconnected";
1620 return "Status unknown; this should not happen";