if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
std::unique_ptr<Item> it;
try {
- it = v->toItem(false, lookup.getVBucketId());
+ it = stream_->isKeyOnly() ?
+ v->toItemWithNoValue(lookup.getVBucketId()) :
+ v->toItem(false, lookup.getVBucketId());
} catch (const std::bad_alloc&) {
setStatus(ENGINE_ENOMEM);
stream_->getLogger().log(
EXTENSION_LOG_WARNING,
"Alloc error when trying to create an "
"item copy from hash table. Item seqno:%" PRIi64
- ", vb:%" PRIu16,
+ ", vb:%" PRIu16 " isKeyOnly:%s",
v->getBySeqno(),
- lookup.getVBucketId());
+ lookup.getVBucketId(),
+ stream_->isKeyOnly() ? "True" : "False");
return;
}
hbl.getHTLock().unlock();
KVStore* kvstore = engine.getKVBucket()->getROUnderlying(vbid);
ValueFilter valFilter = ValueFilter::VALUES_DECOMPRESSED;
- if (stream->isCompressionEnabled()) {
- valFilter = ValueFilter::VALUES_COMPRESSED;
+ if (stream->isKeyOnly()) {
+ valFilter = ValueFilter::KEYS_ONLY;
+ } else {
+ if (stream->isCompressionEnabled()) {
+ valFilter = ValueFilter::VALUES_COMPRESSED;
+ }
}
std::shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
DcpProducer *DcpConnMap::newProducer(const void* cookie,
const std::string &name,
- bool notifyOnly)
+ bool notifyOnly,
+ bool isKeyOnly)
{
LockHolder lh(connsLock);
}
DcpProducer* dcp = new DcpProducer(
- engine, cookie, conn_name, notifyOnly, true /*startTask*/);
+ engine, cookie, conn_name, notifyOnly, true /*startTask*/,
+ isKeyOnly ? DcpProducer::MutationType::KeyOnly :
+ DcpProducer::MutationType::KeyAndValue);
LOG(EXTENSION_LOG_INFO, "%s Connection created", dcp->logHeader());
map_[cookie] = dcp;
/**
* Find or build a dcp connection for the given cookie and with
* the given name.
+ * @param cookie The cookie representing the client
+ * @param name The name of the connection
+ * @param notifyOnly If true the producer to be created only notifies,
+ * i.e. no data is sent.
+ * @param isKeyOnly States if items sent from active streams should contain
+ * only the key.
*/
DcpProducer *newProducer(const void* cookie, const std::string &name,
- bool notifyOnly);
+ bool notifyOnly, bool isKeyOnly);
/**
const void* cookie,
const std::string& name,
bool isNotifier,
- bool startTask)
+ bool startTask,
+ MutationType mutType)
: Producer(e, cookie, name),
rejectResp(NULL),
notifyOnly(isNotifier),
lastSendTime(ep_current_time()),
log(*this),
itemsSent(0),
- totalBytesSent(0) {
+ totalBytesSent(0),
+ mutationType(mutType) {
setSupportAck(true);
setReserved(true);
setPaused(true);
s = new ActiveStream(&engine_, this, getName(), flags,
opaque, vbucket, start_seqno,
end_seqno, vbucket_uuid,
- snap_start_seqno, snap_end_seqno);
+ snap_start_seqno, snap_end_seqno,
+ (mutationType == MutationType::KeyOnly));
}
{
class DcpProducer : public Producer {
public:
+
+/*
+ * MutationType is used to state whether the active streams associated with the
+ * the DCPProducer need to send both the key and value (AllValue) or whether
+ * they can send just the key (KeyOnly).
+ */
+enum class MutationType {
+ KeyOnly,
+ KeyAndValue
+};
+
/**
* Construct a DCP Producer
*
* sent.
* @param startTask If true an internal checkpoint task is created and
* started. Test code may wish to defer or manually handle the task
- * creation.
+ * creation.
+ * @param mutType The MutationType to use for the items sent from the
+ * DcpProducer.
*/
DcpProducer(EventuallyPersistentEngine& e,
const void* cookie,
const std::string& n,
bool notifyOnly,
- bool startTask);
+ bool startTask,
+ MutationType mutType);
~DcpProducer();
ExTask checkpointCreatorTask;
static const std::chrono::seconds defaultDcpNoopTxInterval;
+ // mutationType i.e. KeyOnly or AllValue, used to determine how all
+ // active streams belonging to the DcpProducer should send their data.
+ MutationType mutationType;
+
};
#endif // SRC_DCP_PRODUCER_H_
class MutationResponse : public DcpResponse {
public:
- MutationResponse(queued_item item, uint32_t opaque,
+ MutationResponse(queued_item item, uint32_t opaque, bool isKeyOnly,
ExtendedMetaData *e = NULL)
: DcpResponse(item->isDeleted() ? Event::Deletion : Event::Mutation, opaque),
- item_(item), emd(e) {}
+ item_(item), keyOnly(isKeyOnly), emd(e) {}
queued_item& getItem() {
return item_;
}
Item* getItemCopy() {
- return new Item(*item_);
+ return new Item(*item_, keyOnly);
}
uint16_t getVBucket() {
uint32_t getMessageSize() {
const uint32_t base = item_->isDeleted() ? deletionBaseMsgBytes :
mutationBaseMsgBytes;
- uint32_t body = item_->getKey().size() + item_->getNBytes();
+
+ uint32_t body = item_->getKey().size();
+ if (!keyOnly) {
+ // Add the size of the value
+ body += item_->getNBytes();
+ }
if (emd) {
body += emd->getExtMeta().second;
private:
queued_item item_;
+
+ // Whether the response should contain the key and value or just the key
+ bool keyOnly;
+
std::unique_ptr<ExtendedMetaData> emd;
};
uint64_t en_seqno,
uint64_t vb_uuid,
uint64_t snap_start_seqno,
- uint64_t snap_end_seqno)
+ uint64_t snap_end_seqno,
+ bool isKeyOnly)
: Stream(n,
flags,
opaque,
engine(e),
producer(p),
lastSentSnapEndSeqno(0),
- chkptItemsExtractionInProgress(false) {
+ chkptItemsExtractionInProgress(false),
+ keyOnly(isKeyOnly) {
const char* type = "";
if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
type = "takeover ";
std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
queued_item& item) {
if (item->getOperation() != queue_op::system_event) {
- return std::make_unique<MutationResponse>(item, opaque_);
+ return std::make_unique<MutationResponse>(item, opaque_, isKeyOnly());
} else {
return SystemEventProducerMessage::make(opaque_, item);
}
const std::string &name, uint32_t flags, uint32_t opaque,
uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
uint64_t vb_uuid, uint64_t snap_start_seqno,
- uint64_t snap_end_seqno);
+ uint64_t snap_end_seqno, bool isKeyOnly);
~ActiveStream();
in-memory to backfilling */
void handleSlowStream();
+ /// @Returns true if keyOnly is true and false if KeyOnly is false
+ bool isKeyOnly() const {
+ return keyOnly;
+ }
+
protected:
// Returns the outstanding items for the stream's checkpoint cursor.
void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
items are added to the readyQ */
std::atomic<bool> chkptItemsExtractionInProgress;
+ // Whether the responses sent using this stream should contain the key and
+ // value or just the key
+ bool keyOnly;
+
};
}
ConnHandler *handler = NULL;
+ const bool keyOnly = flags & DCP_OPEN_NO_VALUE;
if (flags & DCP_OPEN_PRODUCER) {
- handler = dcpConnMap_->newProducer(cookie, connName, false);
+ handler = dcpConnMap_->newProducer(cookie, connName, false, keyOnly);
} else if (flags & DCP_OPEN_NOTIFIER) {
- handler = dcpConnMap_->newProducer(cookie, connName, true);
+ handler = dcpConnMap_->newProducer(cookie, connName, true, keyOnly);
} else {
handler = dcpConnMap_->newConsumer(cookie, connName);
}
return itm;
}
+std::unique_ptr<Item> StoredValue::toItemWithNoValue(uint16_t vbucket) const {
+ auto itm =
+ std::make_unique<Item>(getKey(),
+ getFlags(),
+ getExptime(),
+ /* valuePtr */ nullptr,
+ /* valuelen */ 0,
+ /* ext_meta*/ nullptr,
+ /* ext_len */ 0,
+ getCas(),
+ getBySeqno(),
+ vbucket,
+ getRevSeqno());
+
+ return itm;
+}
+
void StoredValue::reallocate() {
// Allocate a new Blob for this stored value; copy the existing Blob to
// the new one and free the old.
std::unique_ptr<Item> toItem(bool lck, uint16_t vbucket) const;
/**
+ * Generate a new Item with only key and metadata out of this object.
+ * The item generated will not contain value
+ *
+ * @param vbucket the vbucket containing this item.
+ */
+ std::unique_ptr<Item> toItemWithNoValue(uint16_t vbucket) const;
+
+ /**
* Set the memory threshold on the current bucket quota for accepting a new mutation
*/
static void setMutationMemoryThreshold(double memThreshold);
const void* cookie,
const std::string& name,
bool isNotifier,
- bool startTask = true)
- : DcpProducer(theEngine, cookie, name, isNotifier, startTask) {
+ bool startTask = true,
+ DcpProducer::MutationType mutationType =
+ DcpProducer::MutationType::KeyAndValue)
+ : DcpProducer(theEngine, cookie, name,
+ isNotifier, startTask, mutationType) {
}
ENGINE_ERROR_CODE maybeDisconnect() {
return *static_cast<ActiveStreamCheckpointProcessorTask*>(
checkpointCreatorTask.get());
}
+
+ /**
+ * Finds the stream for a given vbucket
+ */
+ SingleThreadedRCPtr<Stream> findStream(uint16_t vbid) {
+ return DcpProducer::findStream(vbid);
+ }
};
uint64_t en_seqno,
uint64_t vb_uuid,
uint64_t snap_start_seqno,
- uint64_t snap_end_seqno)
+ uint64_t snap_end_seqno,
+ bool isKeyOnly = false)
: ActiveStream(e,
p,
name,
en_seqno,
vb_uuid,
snap_start_seqno,
- snap_end_seqno) {
+ snap_end_seqno,
+ isKeyOnly) {
}
// Expose underlying protected ActiveStream methods as public
int getNumBackfillItemsRemaining() const {
return backfillRemaining;
}
+
+ std::unique_ptr<DcpResponse> public_makeResponseFromItem(
+ queued_item& item) {
+ return makeResponseFromItem(item);
+ }
};
/* Mock of the PassiveStream class. Wraps the real PassiveStream, but exposes
cookieP,
"test_producer",
/*notifyOnly*/ false,
- /*startTask*/ false);
+ /*startTask*/ false,
+ DcpProducer::MutationType::KeyAndValue);
// Create the task object, but don't schedule
producer->createCheckpointProcessorTask();
}
// Setup a DCP producer and attach a stream and cursor to it.
- void setup_dcp_stream() {
- producer = new DcpProducer(*engine,
+ void setup_dcp_stream(DcpProducer::MutationType mutationType =
+ DcpProducer::MutationType::KeyAndValue) {
+ producer = new MockDcpProducer(*engine,
/*cookie*/ nullptr,
"test_producer",
/*notifyOnly*/ false,
- /*startTask*/ true);
+ /*startTask*/ true,
+ mutationType);
stream = new MockActiveStream(engine, producer,
producer->getName(), /*flags*/0,
/*opaque*/0, vbid,
/*en_seqno*/~0,
/*vb_uuid*/0xabcd,
/*snap_start_seqno*/0,
- /*snap_end_seqno*/~0);
+ /*snap_end_seqno*/~0,
+ mutationType == DcpProducer::MutationType::KeyOnly ?
+ true : false);
EXPECT_FALSE(vb0->checkpointManager.registerCursor(
producer->getName(),
<< "Found an existing TAP cursor when attempting to register ours";
}
+ /*
+ * Fake callback emulating dcp_add_failover_log
+ */
+ static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(vbucket_failover_t* entry,
+ size_t nentries,
+ const void *cookie) {
+ return ENGINE_SUCCESS;
+ }
+
dcp_producer_t producer;
stream_t stream;
VBucketPtr vb0;
};
+/*
+ * Test that when have a producer with MutationType set to KeyOnly an active
+ * stream created via a streamRequest returns true for isKeyOnly.
+ */
+TEST_P(StreamTest, test_streamSetMutationTypeKeyOnly) {
+ setup_dcp_stream(DcpProducer::MutationType::KeyOnly);
+ uint64_t rollbackSeqno;
+ auto err = producer->streamRequest(/*flags*/0,
+ /*opaque*/0,
+ /*vbucket*/0,
+ /*start_seqno*/0,
+ /*end_seqno*/0,
+ /*vb_uuid*/0,
+ /*snap_start*/0,
+ /*snap_end*/0,
+ &rollbackSeqno,
+ StreamTest::fakeDcpAddFailoverLog);
+ ASSERT_EQ(ENGINE_SUCCESS, err)
+ << "stream request did not return ENGINE_SUCCESS";
+
+ stream = dynamic_cast<MockDcpProducer*>(producer.get())->findStream(0);
+ EXPECT_TRUE(dynamic_cast<ActiveStream*>(stream.get())->isKeyOnly());
+ producer.get()->closeAllStreams();
+}
+
+/*
+ * Test that when have a producer with MutationType set to KeyAndValue an active
+ * stream created via a streamRequest returns false for isKeyOnly.
+ */
+TEST_P(StreamTest, test_streamSettMutationTypeAllValue) {
+ setup_dcp_stream(DcpProducer::MutationType::KeyAndValue);
+ uint64_t rollbackSeqno;
+ auto err = producer->streamRequest(/*flags*/0,
+ /*opaque*/0,
+ /*vbucket*/0,
+ /*start_seqno*/0,
+ /*end_seqno*/0,
+ /*vb_uuid*/0,
+ /*snap_start*/0,
+ /*snap_end*/0,
+ &rollbackSeqno,
+ StreamTest::fakeDcpAddFailoverLog);
+ ASSERT_EQ(ENGINE_SUCCESS, err)
+ << "stream request did not return ENGINE_SUCCESS";
+
+ stream = dynamic_cast<MockDcpProducer*>(producer.get())->findStream(0);
+ EXPECT_FALSE(dynamic_cast<ActiveStream*>(stream.get())->isKeyOnly());
+ producer.get()->closeAllStreams();
+}
+
+/*
+ * Test for a dcpResponse retrieved from a stream where isKeyOnly is true, that
+ * the message size does not include the size of the body.
+ */
+TEST_P(StreamTest, test_keyOnlyMessageSize) {
+ std::string value("value");
+ uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_DATATYPE_JSON};
+ SingleThreadedRCPtr<Item> item = std::make_unique<Item>(
+ makeStoredDocKey("key"),
+ 0,
+ 0,
+ value.c_str(),
+ value.size(),
+ ext_meta,
+ sizeof(ext_meta));
+
+ auto keyOnlyMessageSize = MutationResponse::mutationBaseMsgBytes +
+ item.get()->getKey().size();
+ queued_item qi(item);
+
+ setup_dcp_stream(DcpProducer::MutationType::KeyOnly);
+ std::unique_ptr<DcpResponse> dcpResponse =
+ dynamic_cast<MockActiveStream*>(
+ stream.get())->public_makeResponseFromItem(qi);
+
+ EXPECT_EQ(keyOnlyMessageSize, dcpResponse->getMessageSize());
+}
+
+/*
+ * Test for a dcpResponse retrieved from a stream where isKeyOnly is false, that
+ * the message size includes the size of the body.
+ */
+TEST_P(StreamTest, test_keyAndValueMessageSize) {
+ std::string value("value");
+ uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_DATATYPE_JSON};
+ SingleThreadedRCPtr<Item> item = std::make_unique<Item>(
+ makeStoredDocKey("key"),
+ 0,
+ 0,
+ value.c_str(),
+ value.size(),
+ ext_meta,
+ sizeof(ext_meta));
+
+ auto keyAndValueMessageSize = MutationResponse::mutationBaseMsgBytes +
+ item.get()->getKey().size() + item->getNBytes();
+ queued_item qi(item);
+
+ setup_dcp_stream(DcpProducer::MutationType::KeyAndValue);
+ std::unique_ptr<DcpResponse> dcpResponse =
+ dynamic_cast<MockActiveStream*>(
+ stream.get())->public_makeResponseFromItem(qi);
+
+ EXPECT_EQ(keyAndValueMessageSize, dcpResponse->getMessageSize());
+}
+
/* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
* queued items to be sent out via a DCP consumer, that nextCheckpointItem()
* doesn't incorrectly return false (meaning that there are no more checkpoint
const void *cookie = create_mock_cookie();
// Create a new Dcp producer
dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
- /*notifyOnly*/false);
+ /*notifyOnly*/false, /*isKeyOnly*/false);
// Disconnect the producer connection
connMap.disconnect(cookie);
const void *cookie = create_mock_cookie();
// Create a new Dcp producer
dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
- /*notifyOnly*/false);
+ /*notifyOnly*/false, /*isKeyOnly*/false);
// should be able to find the connection
ASSERT_NE(connection_t(nullptr),
connMap.findByName("eq_dcpq:test_producer"));
const void* cookie2 = create_mock_cookie();
// Create a new Dcp producer
dcp_producer_t producer = connMap.newProducer(cookie1, "test_producer",
- /*notifyOnly*/false);
+ /*notifyOnly*/false, /*isKeyOnly*/false);
ASSERT_NE(0, producer) << "producer is null";
// should be able to find the connection
ASSERT_NE(connection_t(nullptr),
// Create a duplicate Dcp producer
dcp_producer_t duplicateproducer = connMap.newProducer(
- cookie2, "test_producer", /*notifyOnly*/false);
+ cookie2, "test_producer", /*notifyOnly*/false, /*isKeyOnly*/false);
ASSERT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
ASSERT_NE(0, duplicateproducer) << "duplicateproducer is null";
const void* cookie2 = create_mock_cookie();
// Create a new Dcp producer
dcp_producer_t producer = connMap.newProducer(cookie1, "test_producer",
- /*notifyOnly*/false);
+ /*notifyOnly*/false,
+ /*isKeyOnly*/false);
EXPECT_NE(0, producer) << "producer is null";
// Create a duplicate Dcp producer
dcp_producer_t duplicateproducer = connMap.newProducer(cookie2, "test_producer",
- /*notifyOnly*/false);
+ /*notifyOnly*/false,
+ /*isKeyOnly*/false);
EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
EXPECT_NE(0, duplicateproducer) << "duplicateproducer is null";
const void* cookie = create_mock_cookie();
// Create a new Dcp producer
dcp_producer_t producer = connMap.newProducer(cookie, "test_producer1",
- /*notifyOnly*/false);
+ /*notifyOnly*/false,
+ /*isKeyOnly*/false);
// Create a duplicate Dcp producer
dcp_producer_t duplicateproducer = connMap.newProducer(cookie, "test_producer2",
- /*notifyOnly*/false);
+ /*notifyOnly*/false,
+ /*isKeyOnly*/false);
EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
EXPECT_EQ(0, duplicateproducer) << "duplicateproducer is not null";
const void *cookie = create_mock_cookie();
// Create a new Dcp producer
dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
- /*notifyOnly*/false);
+ /*notifyOnly*/false, /*isKeyOnly*/false);
// Disconnect the producer connection
connMap.disconnect(cookie);
const void *cookie = create_mock_cookie();
// Create a new Dcp producer.
dcp_producer_t producer = connMap.newProducer(cookie, "mb_20716r",
- /*notifyOnly*/false);
+ /*notifyOnly*/false,
+ /*isKeyOnly*/false);
// Check preconditions.
EXPECT_TRUE(producer->isPaused());
// Use 'this' instead of a mock cookie
producer = connMap->newProducer(static_cast<void*>(this),
"test_producer",
- /*notifyOnly*/false);
+ /*notifyOnly*/false,
+ /*isKeyOnly*/false);
}
void notify() {
// Create a new Dcp producer, reserving its cookie.
get_mock_server_api()->cookie->reserve(cookie);
dcp_producer_t producer = engine->getDcpConnMap().newProducer(
- cookie, "test_producer", /*notifyOnly*/false);
+ cookie, "test_producer", /*notifyOnly*/false, /*isKeyOnly*/false);
uint64_t rollbackSeqno;
auto err = producer->streamRequest(/*flags*/0,
// Create a new Dcp producer, reserving its cookie.
get_mock_server_api()->cookie->reserve(cookie);
dcp_producer_t producer = engine->getDcpConnMap().newProducer(
- cookie, "mb_20716r", /*notifyOnly*/false);
+ cookie, "mb_20716r", /*notifyOnly*/false, /*isKeyOnly*/false);
// Check preconditions.
EXPECT_TRUE(producer->isPaused());