void ActiveStream::nextCheckpointItemTask() {
RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
if (vbucket) {
- std::deque<queued_item> items = getOutstandingItems(vbucket);
+ std::deque<queued_item> items;
+ getOutstandingItems(vbucket, items);
processItems(items);
} else {
/* The entity deleting the vbucket must set stream to dead,
}
}
-std::deque<queued_item> ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb) {
- std::deque<queued_item> items;
+void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
+ std::deque<queued_item> &items) {
vb->checkpointManager.getAllItemsForCursor(name_, items);
if (vb->checkpointManager.getNumCheckpoints() > 1) {
engine->getEpStore()->wakeUpCheckpointRemover();
}
-
- return items;
}
protected:
// Returns the outstanding items for the stream's checkpoint cursor.
- std::deque<queued_item> getOutstandingItems(RCPtr<VBucket> &vb);
+ void getOutstandingItems(RCPtr<VBucket> &vb, std::deque<queued_item> &items);
// Given a set of queued items, create mutation responses for each item,
// and pass onto the producer associated with this stream.