std::unique_ptr<Item> Collections::VB::Manifest::createSeparatorChangedEvent(
uint32_t revision, OptionalSeqno seqno) const {
// Create an item (to be queued and written to disk) that represents
- // the change of the separator. We serialise the state of this object
- // into the Item's value.
-
+ // the change of the separator. The item always has the same key.
+ // We serialise the state of this object into the Item's value.
auto item =
SystemEventFactory::make(SystemEvent::CollectionsSeparatorChanged,
- separator + std::to_string(revision),
+ {/*empty string*/},
getSerialisedDataSize(),
seqno);
return manifest.doesKeyContainValidCollection(key);
}
+ /**
+ * @returns the current separator
+ */
+ std::string getSeparator() const {
+ return manifest.getSeparator();
+ }
+
private:
std::unique_lock<cb::ReaderLock> readLock;
const Manifest& manifest;
*/
bool doesKeyContainValidCollection(const ::DocKey& key) const;
+ /**
+ * @returns the current separator
+ */
+ std::string getSeparator() const {
+ return separator;
+ }
+
protected:
/**
* Add a collection entry to the manifest specifing the revision that it was
}
Item* itmCpy = nullptr;
- auto* mutationResponse = dynamic_cast<MutationResponse*>(resp);
+ auto* mutationResponse = dynamic_cast<MutationProducerResponse*>(resp);
if (mutationResponse != nullptr) {
try {
itmCpy = mutationResponse->getItemCopy();
};
/**
+ * Extend MutationResponse for the DcpProducer only so that it can store
+ * the collection length which we replicate to all collection-aware consumers.
+ */
+class MutationProducerResponse : public MutationResponse {
+public:
+ MutationProducerResponse(queued_item item,
+ uint32_t opaque,
+ bool isKeyOnly,
+ uint8_t _collectionLen,
+ ExtendedMetaData* e = NULL)
+ : MutationResponse(item, opaque, isKeyOnly, e),
+ collectionLen(_collectionLen) {
+ }
+
+ uint8_t getCollectionLen() const {
+ return collectionLen;
+ }
+
+private:
+ uint8_t collectionLen;
+};
+
+/**
* SystemEventMessage defines the interface required by consumer and producer
* message classes.
*/
itemsReady.store(true);
// lock is released on leaving the scope
}
+
+ // Finally obtain a copy of the current separator
+ currentSeparator = vbucket->getManifest().lock().getSeparator();
}
ActiveStream::~ActiveStream() {
itemsFromMemoryPhase++;
}
}
+
+ // See if the currentSeparator needs changing
+ maybeChangeSeparator(response);
popFromReadyQ();
return response;
}
std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
queued_item& item) {
if (item->getOperation() != queue_op::system_event) {
- return std::make_unique<MutationResponse>(item, opaque_, isKeyOnly());
+ auto cKey = Collections::DocKey::make(item->getKey(), currentSeparator);
+ return std::make_unique<MutationProducerResponse>(
+ item, opaque_, isKeyOnly(), cKey.getCollectionLen());
} else {
return SystemEventProducerMessage::make(opaque_, item);
}
vbucket->checkpointManager.removeCursor(name_);
}
+void ActiveStream::maybeChangeSeparator(DcpResponse* response) {
+ if (response->getEvent() == DcpResponse::Event::SystemEvent) {
+ auto se = static_cast<SystemEventProducerMessage*>(response);
+ if (se->getSystemEvent() == SystemEvent::CollectionsSeparatorChanged) {
+ currentSeparator =
+ std::string(se->getKey().data(), se->getKey().size());
+ }
+ }
+}
+
NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
const std::string &name, uint32_t flags,
uint32_t opaque, uint16_t vb, uint64_t st_seqno,
in-memory to backfilling */
void handleSlowStream();
- /// @Returns true if keyOnly is true and false if KeyOnly is false
+ /// @returns true if keyOnly is true and false if KeyOnly is false
bool isKeyOnly() const {
return keyOnly;
}
+ /// @returns a copy of the current collections separator.
+ std::string getCurrentSeparator() const {
+ return currentSeparator;
+ }
+
protected:
// Returns the outstanding items for the stream's checkpoint cursor.
void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
*/
void transitionState(StreamState newState);
+ /**
+ * Check to see if the response is a CollectionsSeparatorChanged event
+ * which would update the separator.
+ *
+ * @param response A DcpResponse that is about to be sent to a client
+ */
+ void maybeChangeSeparator(DcpResponse* response);
+
/* Indicates that a backfill has been scheduled and has not yet completed.
* Is protected (as opposed to private) for testing purposes.
*/
// value or just the key
bool keyOnly;
+ /**
+ * A copy of the collections separator so we can generate MutationResponse
+ * instances that embed the collection/document-name data so we can
+ * replicate that collection information (as a length).
+ *
+ * As checkpoints/backfills are processed, we will monitor for
+ * CollectionsSeparatorChanged events and update the copy accordingly.
+ */
+ std::string currentSeparator;
};
case SystemEvent::CollectionsSeparatorChanged: {
// CollectionSeparatorChanged SystemEvent results in:
// An update to the persisted collection manifest (updating the
- // "separator" field).
- // No document is persisted.
- key = Collections::SeparatorChangedKey + keyExtra;
+ // "separator" field) and a document is persisted.
+ // Note: the key of this document is fixed so only 1 document exists
+ // regardless of the changes made.
+ key = Collections::SeparatorChangedKey;
break;
}
}
switch (SystemEvent(item->getFlags())) {
case SystemEvent::CreateCollection:
case SystemEvent::DeleteCollectionHard:
- case SystemEvent::DeleteCollectionSoft: {
- // CreateCollection updates the manifest and writes a SystemEvent
- // DeleteCollection* both update the manifest and write a SystemEvent
- saveCollectionsManifestItem(item);
- return ProcessStatus::Continue;
- }
- case SystemEvent::BeginDeleteCollection:
+ case SystemEvent::DeleteCollectionSoft:
case SystemEvent::CollectionsSeparatorChanged: {
- // These two will update the manifest but should not write an Item
- saveCollectionsManifestItem(item);
- return ProcessStatus::Skip;
+ saveCollectionsManifestItem(item); // Updates manifest
+ return ProcessStatus::Continue; // And flushes an item
+ }
+ case SystemEvent::BeginDeleteCollection: {
+ saveCollectionsManifestItem(item); // Updates manifest
+ return ProcessStatus::Skip; // But skips flushing the item
}
}
bool SystemEventFlush::isUpsert(const Item& item) {
if (item.getOperation() == queue_op::system_event) {
- // CreateCollection and DeleteCollection* are the only valid events.
+ // CreateCollection, CollectionsSeparatorChanged and DeleteCollection*
+ // are the only valid events.(returning true/false)
// The ::process function should of skipped BeginDeleteCollection and
- // CollectionsSeparatorChanged
+ // thus is an error if calling this method with such an event.
switch (SystemEvent(item.getFlags())) {
- case SystemEvent::CreateCollection: {
+ case SystemEvent::CreateCollection:
+ case SystemEvent::CollectionsSeparatorChanged: {
return true;
}
- case SystemEvent::BeginDeleteCollection:
- case SystemEvent::CollectionsSeparatorChanged: {
- throw std::logic_error("SystemEventFlush::isUpsert event " +
- to_string(SystemEvent(item.getFlags())) +
- " should neither delete or upsert ");
+ case SystemEvent::BeginDeleteCollection: {
+ throw std::invalid_argument(
+ "SystemEventFlush::isUpsert event " +
+ to_string(SystemEvent(item.getFlags())) +
+ " should neither delete or upsert ");
}
case SystemEvent::DeleteCollectionHard:
case SystemEvent::DeleteCollectionSoft: {
/**
* The BeginDeleteCollection system event is generated when a VBucket
- * receives a manifest that removes a collection. The events's purpose is to
+ * receives a manifest that removes a collection. The event's purpose is to
* carry data to the flusher so we can persist a new collections JSON
* manifest that indicates the collection is now in the process of being
* removed. This is indicated by changing the end-seqno of a collection's
/**
* The CollectionsSeparatorChanged system event is generated when a VBucket
* changes the separator used for identifying collections in keys. This
- * must result in a vbucket manifest update but no item is stored.
+ * must result in a vbucket manifest update and a SystemEvent document is
+ * stored. All separator changes write to the same SystemEvent document.
*/
CollectionsSeparatorChanged
};
*this, separator, revision, bySeqno);
}
+ /**
+ * Get the collection manifest
+ *
+ * @return const reference to the manifest
+ */
+ const Collections::VB::Manifest& getManifest() const {
+ return manifest;
+ }
+
static const vbucket_state_t ACTIVE;
static const vbucket_state_t REPLICA;
static const vbucket_state_t PENDING;
SingleThreadedRCPtr<Stream> findStream(uint16_t vbid) {
return DcpProducer::findStream(vbid);
}
+
+ std::string getCurrentSeparatorForStream(uint16_t vbid) {
+ auto stream = findStream(vbid);
+ if (stream) {
+ auto as = static_cast<ActiveStream*>(stream.get());
+ return as->getCurrentSeparator();
+ }
+ return {};
+ }
};
TEST_F(CollectionsWarmupTest, warmup) {
VBucketPtr vb = store->getVBucket(vbid);
- // Add the meat collection
+ // Add the meat collection *and* change the separator
vb->updateFromManifest(
{R"({"revision":1,"separator":"-+-","collections":["$default","meat"]})"});
- // Trigger a flush to disk. Flushes the meat create event and 1 item
- flush_vbucket_to_disk(vbid, 1);
+ // Trigger a flush to disk. Flushes the meat create event and a separator
+ // changed event.
+ flush_vbucket_to_disk(vbid, 2);
// Now we can write to beef
store_item(vbid, {"meat-+-beef", DocNamespace::Collections}, "value");
// Step which will notify the snapshot task
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
+ // The producer should start with the old separator
+ EXPECT_EQ("::", producer->getCurrentSeparatorForStream(vbid));
+
EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
// Now call run on the snapshot task to move checkpoint into DCP stream
+ // this will trigger the stream to update the separator
producer->getCheckpointSnapshotTask().run();
// Next step which should process a snapshot marker
VBucketPtr replica = store->getVBucket(replicaVB);
+ // The replica should have the :: separator
+ EXPECT_EQ("::", replica->lockCollections().getSeparator());
+
// Now step the producer to transfer the separator
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
+ // The producer should now have the new separator
+ EXPECT_EQ("@@", producer->getCurrentSeparatorForStream(vbid));
+
+ // The replica should now have the new separator
+ EXPECT_EQ("@@", replica->lockCollections().getSeparator());
+
// Now step the producer to transfer the collection
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
// And done
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
}
+
+TEST_F(CollectionsDcpTest, test_dcp_separator_many) {
+ auto vb = store->getVBucket(vbid);
+
+ // Change the separator
+ vb->updateFromManifest(
+ {R"({"revision":1,"separator":"@@","collections":["$default"]})"});
+ // Change the separator
+ vb->updateFromManifest(
+ {R"({"revision":2,"separator":":","collections":["$default"]})"});
+ // Change the separator
+ vb->updateFromManifest(
+ {R"({"revision":3,"separator":",","collections":["$default"]})"});
+ // Add a collection
+ vb->updateFromManifest(
+ {R"({"revision":4,"separator":",","collections":["$default","meat"]})"});
+
+ // All the changes will be collapsed into one update and we will expect
+ // to see , as the separator once DCP steps through the checkpoint
+ producer->notifySeqnoAvailable(vb->getId(), vb->getHighSeqno());
+
+ // Step which will notify the snapshot task
+ EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
+
+ // The producer should start with the initial separator
+ EXPECT_EQ("::", producer->getCurrentSeparatorForStream(vbid));
+
+ EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
+
+ // Now call run on the snapshot task to move checkpoint into DCP stream
+ // this will trigger the stream to update the separator
+ producer->getCheckpointSnapshotTask().run();
+
+ // Next step which should process a snapshot marker
+ EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
+
+ auto replica = store->getVBucket(replicaVB);
+
+ // The replica should have the :: separator
+ EXPECT_EQ("::", replica->lockCollections().getSeparator());
+
+ // Now step the producer to transfer the separator
+ EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
+
+ // The producer should now have the new separator
+ EXPECT_EQ(",", producer->getCurrentSeparatorForStream(vbid));
+
+ // The replica should now have the new separator
+ EXPECT_EQ(",", replica->lockCollections().getSeparator());
+
+ // Now step the producer to transfer the collection
+ EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
+
+ // Collection should now be live on the replica with the final separator
+ EXPECT_TRUE(replica->lockCollections().doesKeyContainValidCollection(
+ {"meat,bacon", DocNamespace::Collections}));
+
+ // And done
+ EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
+}