void ActiveStream::nextCheckpointItemTask() {
RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
- if (!vbucket) {
+ if (vbucket) {
+ std::deque<queued_item> items = getOutstandingItems(vbucket);
+ processItems(items);
+ } else {
/* The entity deleting the vbucket must set stream to dead,
calling setDead(END_STREAM_STATE) will cause deadlock because
it will try to grab streamMutex which is already acquired at this
point here */
return;
}
- bool mark = false;
+}
+
+std::deque<queued_item> ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb) {
std::deque<queued_item> items;
- std::deque<MutationResponse*> mutations;
- vbucket->checkpointManager.getAllItemsForCursor(name_, items);
- if (vbucket->checkpointManager.getNumCheckpoints() > 1) {
+ vb->checkpointManager.getAllItemsForCursor(name_, items);
+ if (vb->checkpointManager.getNumCheckpoints() > 1) {
engine->getEpStore()->wakeUpCheckpointRemover();
}
+ return items;
+}
+
+
+void ActiveStream::processItems(std::deque<queued_item>& items) {
+ bool mark = false;
if (items.empty()) {
producer->notifyStreamReady(vb_, true);
return;
mark = true;
}
+ std::deque<MutationResponse*> mutations;
std::deque<queued_item>::iterator itemItr;
for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
queued_item& qi = *itemItr;
// Runs on ActiveStreamCheckpointProcessorTask
void nextCheckpointItemTask();
+protected:
+ // Returns the outstanding items for the stream's checkpoint cursor.
+ std::deque<queued_item> getOutstandingItems(RCPtr<VBucket> &vb);
+
+ // Given a set of queued items, create mutation responses for each item,
+ // and pass onto the producer associated with this stream.
+ void processItems(std::deque<queued_item>& items);
+
private:
void transitionState(stream_state_t newState);