cb_assert(err == ENGINE_SUCCESS);
RCPtr<VBucket> vb = engine_.getVBucket(vbid);
- streams[vbid]->reconnectStream(vb, opaque, vb->getHighSeqno());
+ passive_stream_t stream = streams[vbid];
+ if (stream) {
+ stream->reconnectStream(vb, opaque, vb->getHighSeqno());
+ }
return false;
}
public:
RollbackTask(EventuallyPersistentEngine* e,
uint32_t opaque_, uint16_t vbid_,
- uint64_t rollbackSeqno_, DcpConsumer *conn,
+ uint64_t rollbackSeqno_, dcp_consumer_t conn,
const Priority &p):
GlobalTask(e, p, 0, false), engine(e),
opaque(opaque_), vbid(vbid_), rollbackSeqno(rollbackSeqno_),
uint32_t opaque;
uint16_t vbid;
uint64_t rollbackSeqno;
- DcpConsumer* cons;
+ dcp_consumer_t cons;
};
#endif // SRC_DCP_CONSUMER_H_