if (nullptr == itm) {
return false;
}
- LockHolder lh(streamMutex);
- if (state_ == STREAM_BACKFILLING) {
- if (!producer->recordBackfillManagerBytesRead(itm->size())) {
- delete itm;
- return false;
- }
- bufferedBackfill.bytes.fetch_add(itm->size());
- bufferedBackfill.items++;
+ if (itm->shouldReplicate()) {
+ LockHolder lh(streamMutex);
+ if (state_ == STREAM_BACKFILLING) {
+ if (!producer->recordBackfillManagerBytesRead(itm->size())) {
+ delete itm;
+ return false;
+ }
- pushToReadyQ(new MutationResponse(itm, opaque_,
- prepareExtendedMetaData(itm->getVBucketId(),
- itm->getConflictResMode())));
+ bufferedBackfill.bytes.fetch_add(itm->size());
+ bufferedBackfill.items++;
- lastReadSeqno.store(itm->getBySeqno());
- lh.unlock();
- bool inverse = false;
- if (itemsReady.compare_exchange_strong(inverse, true)) {
- producer->notifyStreamReady(vb_, false);
- }
+ pushToReadyQ(new MutationResponse(itm, opaque_,
+ prepareExtendedMetaData(itm->getVBucketId(),
+ itm->getConflictResMode())));
+
+ lastReadSeqno.store(itm->getBySeqno());
+ lh.unlock();
+ bool inverse = false;
+ if (itemsReady.compare_exchange_strong(inverse, true)) {
+ producer->notifyStreamReady(vb_, false);
+ }
- if (backfill_source == BACKFILL_FROM_MEMORY) {
- backfillItems.memory++;
+ if (backfill_source == BACKFILL_FROM_MEMORY) {
+ backfillItems.memory++;
+ } else {
+ backfillItems.disk++;
+ }
} else {
- backfillItems.disk++;
+ delete itm;
}
} else {
delete itm;
for (; itr != items.end(); ++itr) {
queued_item& qi = *itr;
- if (qi->getOperation() == queue_op_set ||
- qi->getOperation() == queue_op_del) {
+ if (qi->shouldReplicate()) {
curChkSeqno = qi->getBySeqno();
lastReadSeqnoUnSnapshotted = qi->getBySeqno();
uint64_t maxDeletedRevSeqno = 0;
std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
std::vector<queued_item>::iterator it = items.begin();
+
for(; it != items.end(); ++it) {
- if ((*it)->getOperation() != queue_op_set &&
- (*it)->getOperation() != queue_op_del) {
+
+ if (!(*it)->shouldPersist()) {
continue;
} else if (!prev || prev->getKey() != (*it)->getKey()) {
prev = (*it).get();
ObjectRegistry::onCreateItem(this);
}
- Item(const std::string &k, const uint16_t vb,
- enum queue_operation o, const uint64_t revSeq,
- const int64_t bySeq, uint8_t nru_value = INITIAL_NRU_VALUE,
- uint8_t conflict_res_value = revision_seqno) :
- metaData(),
- key(k),
- bySeqno(bySeq),
- queuedTime(ep_current_time()),
- vbucketId(vb),
- op(static_cast<uint16_t>(o)),
- nru(nru_value),
- conflictResMode(conflict_res_value)
+ Item(const std::string &k, const uint16_t vb,
+ enum queue_operation o, const uint64_t revSeq,
+ const int64_t bySeq, uint8_t nru_value = INITIAL_NRU_VALUE,
+ uint8_t conflict_res_value = revision_seqno) :
+ metaData(),
+ key(k),
+ bySeqno(bySeq),
+ queuedTime(ep_current_time()),
+ vbucketId(vb),
+ op(static_cast<uint16_t>(o)),
+ nru(nru_value),
+ conflictResMode(conflict_res_value)
{
if (bySeqno < 0) {
throw std::invalid_argument("Item(): bySeqno must be non-negative");
return static_cast<enum queue_operation>(op);
}
+ /*
+ * Should this item be persisted?
+ */
+ bool shouldPersist() const {
+ return (op == queue_op_set) ||
+ (op == queue_op_del);
+ }
+
+ /*
+ * Should this item be replicated (e.g. by DCP)
+ */
+ bool shouldReplicate() const {
+ return (op == queue_op_set) ||
+ (op == queue_op_del);
+ }
+
void setOperation(enum queue_operation o) {
op = static_cast<uint8_t>(o);
}