MB-16181: Make ActiveStream track the current collections separator 02/75502/9
authorJim Walker <jim@couchbase.com>
Fri, 21 Apr 2017 18:31:42 +0000 (19:31 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 26 Apr 2017 11:18:33 +0000 (11:18 +0000)
This patch updates the ActiveStream so that it stores a copy of the current
separator and tracks changes to as they are transmitted through the checkpoint
via the separator changed SystemEvent.

Change-Id: Ie3ea87d006b0bbab3e0edd8895a4756c7c5d9fe8
Reviewed-on: http://review.couchbase.org/75502
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
src/collections/vbucket_manifest.cc
src/collections/vbucket_manifest.h
src/dcp/producer.cc
src/dcp/response.h
src/dcp/stream.cc
src/dcp/stream.h
src/systemevent.cc
src/systemevent.h
src/vbucket.h
tests/mock/mock_dcp_producer.h
tests/module_tests/collections/evp_store_collections_test.cc

index 98360fd..b4b862b 100644 (file)
@@ -350,12 +350,11 @@ std::unique_ptr<Item> Collections::VB::Manifest::createSystemEvent(
 std::unique_ptr<Item> Collections::VB::Manifest::createSeparatorChangedEvent(
         uint32_t revision, OptionalSeqno seqno) const {
     // Create an item (to be queued and written to disk) that represents
-    // the change of the separator. We serialise the state of this object
-    // into the Item's value.
-
+    // the change of the separator. The item always has the same key.
+    // We serialise the state of this object  into the Item's value.
     auto item =
             SystemEventFactory::make(SystemEvent::CollectionsSeparatorChanged,
-                                     separator + std::to_string(revision),
+                                     {/*empty string*/},
                                      getSerialisedDataSize(),
                                      seqno);
 
index 83fbd74..3567ba2 100644 (file)
@@ -90,6 +90,13 @@ public:
             return manifest.doesKeyContainValidCollection(key);
         }
 
+        /**
+         * @returns the current separator
+         */
+        std::string getSeparator() const {
+            return manifest.getSeparator();
+        }
+
     private:
         std::unique_lock<cb::ReaderLock> readLock;
         const Manifest& manifest;
@@ -385,6 +392,13 @@ private:
      */
     bool doesKeyContainValidCollection(const ::DocKey& key) const;
 
+    /**
+     * @returns the current separator
+     */
+    std::string getSeparator() const {
+        return separator;
+    }
+
 protected:
     /**
      * Add a collection entry to the manifest specifing the revision that it was
index d1153ab..411e0ea 100644 (file)
@@ -449,7 +449,7 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
     }
 
     Item* itmCpy = nullptr;
-    auto* mutationResponse = dynamic_cast<MutationResponse*>(resp);
+    auto* mutationResponse = dynamic_cast<MutationProducerResponse*>(resp);
     if (mutationResponse != nullptr) {
         try {
             itmCpy = mutationResponse->getItemCopy();
index 9abdfcb..6264dbc 100644 (file)
@@ -393,6 +393,29 @@ private:
 };
 
 /**
+ * Extend MutationResponse for the DcpProducer only so that it can store
+ * the collection length which we replicate to all collection-aware consumers.
+ */
+class MutationProducerResponse : public MutationResponse {
+public:
+    MutationProducerResponse(queued_item item,
+                             uint32_t opaque,
+                             bool isKeyOnly,
+                             uint8_t _collectionLen,
+                             ExtendedMetaData* e = NULL)
+        : MutationResponse(item, opaque, isKeyOnly, e),
+          collectionLen(_collectionLen) {
+    }
+
+    uint8_t getCollectionLen() const {
+        return collectionLen;
+    }
+
+private:
+    uint8_t collectionLen;
+};
+
+/**
  * SystemEventMessage defines the interface required by consumer and producer
  * message classes.
  */
index 64daeb9..cf3204f 100644 (file)
@@ -299,6 +299,9 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
         itemsReady.store(true);
         // lock is released on leaving the scope
     }
+
+    // Finally obtain a copy of the current separator
+    currentSeparator = vbucket->getManifest().lock().getSeparator();
 }
 
 ActiveStream::~ActiveStream() {
@@ -780,6 +783,9 @@ DcpResponse* ActiveStream::nextQueuedItem() {
                     itemsFromMemoryPhase++;
                 }
             }
+
+            // See if the currentSeparator needs changing
+            maybeChangeSeparator(response);
             popFromReadyQ();
             return response;
         }
@@ -889,7 +895,9 @@ void ActiveStream::getOutstandingItems(VBucketPtr &vb,
 std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
         queued_item& item) {
     if (item->getOperation() != queue_op::system_event) {
-        return std::make_unique<MutationResponse>(item, opaque_, isKeyOnly());
+        auto cKey = Collections::DocKey::make(item->getKey(), currentSeparator);
+        return std::make_unique<MutationProducerResponse>(
+                item, opaque_, isKeyOnly(), cKey.getCollectionLen());
     } else {
         return SystemEventProducerMessage::make(opaque_, item);
     }
@@ -1442,6 +1450,16 @@ void ActiveStream::dropCheckpointCursor_UNLOCKED()
     vbucket->checkpointManager.removeCursor(name_);
 }
 
+void ActiveStream::maybeChangeSeparator(DcpResponse* response) {
+    if (response->getEvent() == DcpResponse::Event::SystemEvent) {
+        auto se = static_cast<SystemEventProducerMessage*>(response);
+        if (se->getSystemEvent() == SystemEvent::CollectionsSeparatorChanged) {
+            currentSeparator =
+                    std::string(se->getKey().data(), se->getKey().size());
+        }
+    }
+}
+
 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                                const std::string &name, uint32_t flags,
                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
index dacaa2b..d7646a2 100644 (file)
@@ -278,11 +278,16 @@ public:
        in-memory to backfilling */
     void handleSlowStream();
 
-    /// @Returns true if keyOnly is true and false if KeyOnly is false
+    /// @returns true if keyOnly is true and false if KeyOnly is false
     bool isKeyOnly() const {
         return keyOnly;
     }
 
+    /// @returns a copy of the current collections separator.
+    std::string getCurrentSeparator() const {
+        return currentSeparator;
+    }
+
 protected:
     // Returns the outstanding items for the stream's checkpoint cursor.
     void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
@@ -306,6 +311,14 @@ protected:
      */
     void transitionState(StreamState newState);
 
+    /**
+     * Check to see if the response is a CollectionsSeparatorChanged event
+     * which would update the separator.
+     *
+     * @param response A DcpResponse that is about to be sent to a client
+     */
+    void maybeChangeSeparator(DcpResponse* response);
+
     /* Indicates that a backfill has been scheduled and has not yet completed.
      * Is protected (as opposed to private) for testing purposes.
      */
@@ -415,6 +428,15 @@ private:
     // value or just the key
     bool keyOnly;
 
+    /**
+     * A copy of the collections separator so we can generate MutationResponse
+     * instances that embed the collection/document-name data so we can
+     * replicate that collection information (as a length).
+     *
+     * As checkpoints/backfills are processed, we will monitor for
+     * CollectionsSeparatorChanged events and update the copy accordingly.
+     */
+    std::string currentSeparator;
 };
 
 
index b71ef8c..332b529 100644 (file)
@@ -59,9 +59,10 @@ std::unique_ptr<Item> SystemEventFactory::make(SystemEvent se,
     case SystemEvent::CollectionsSeparatorChanged: {
         // CollectionSeparatorChanged SystemEvent results in:
         // An update to the persisted collection manifest (updating the
-        // "separator" field).
-        // No document is persisted.
-        key = Collections::SeparatorChangedKey + keyExtra;
+        // "separator" field) and a document is persisted.
+        // Note: the key of this document is fixed so only 1 document exists
+        // regardless of the changes made.
+        key = Collections::SeparatorChangedKey;
         break;
     }
     }
@@ -87,17 +88,14 @@ ProcessStatus SystemEventFlush::process(const queued_item& item) {
     switch (SystemEvent(item->getFlags())) {
     case SystemEvent::CreateCollection:
     case SystemEvent::DeleteCollectionHard:
-    case SystemEvent::DeleteCollectionSoft: {
-        // CreateCollection updates the manifest and writes a SystemEvent
-        // DeleteCollection* both update the manifest and write a SystemEvent
-        saveCollectionsManifestItem(item);
-        return ProcessStatus::Continue;
-    }
-    case SystemEvent::BeginDeleteCollection:
+    case SystemEvent::DeleteCollectionSoft:
     case SystemEvent::CollectionsSeparatorChanged: {
-        // These two will update the manifest but should not write an Item
-        saveCollectionsManifestItem(item);
-        return ProcessStatus::Skip;
+        saveCollectionsManifestItem(item); // Updates manifest
+        return ProcessStatus::Continue; // And flushes an item
+    }
+    case SystemEvent::BeginDeleteCollection: {
+        saveCollectionsManifestItem(item); // Updates manifest
+        return ProcessStatus::Skip; // But skips flushing the item
     }
     }
 
@@ -107,18 +105,20 @@ ProcessStatus SystemEventFlush::process(const queued_item& item) {
 
 bool SystemEventFlush::isUpsert(const Item& item) {
     if (item.getOperation() == queue_op::system_event) {
-        // CreateCollection and DeleteCollection* are the only valid events.
+        // CreateCollection, CollectionsSeparatorChanged and DeleteCollection*
+        // are the only valid events.(returning true/false)
         // The ::process function should of skipped BeginDeleteCollection and
-        // CollectionsSeparatorChanged
+        // thus is an error if calling this method with such an event.
         switch (SystemEvent(item.getFlags())) {
-        case SystemEvent::CreateCollection: {
+        case SystemEvent::CreateCollection:
+        case SystemEvent::CollectionsSeparatorChanged: {
             return true;
         }
-        case SystemEvent::BeginDeleteCollection:
-        case SystemEvent::CollectionsSeparatorChanged: {
-            throw std::logic_error("SystemEventFlush::isUpsert event " +
-                                   to_string(SystemEvent(item.getFlags())) +
-                                   " should neither delete or upsert ");
+        case SystemEvent::BeginDeleteCollection: {
+            throw std::invalid_argument(
+                    "SystemEventFlush::isUpsert event " +
+                    to_string(SystemEvent(item.getFlags())) +
+                    " should neither delete or upsert ");
         }
         case SystemEvent::DeleteCollectionHard:
         case SystemEvent::DeleteCollectionSoft: {
index 534a46d..05adcea 100644 (file)
@@ -40,7 +40,7 @@ enum class SystemEvent : uint32_t {
 
     /**
      * The BeginDeleteCollection system event is generated when a VBucket
-     * receives a manifest that removes a collection. The events's purpose is to
+     * receives a manifest that removes a collection. The event's purpose is to
      * carry data to the flusher so we can persist a new collections JSON
      * manifest that indicates the collection is now in the process of being
      * removed. This is indicated by changing the end-seqno of a collection's
@@ -72,7 +72,8 @@ enum class SystemEvent : uint32_t {
     /**
      * The CollectionsSeparatorChanged system event is generated when a VBucket
      * changes the separator used for identifying collections in keys. This
-     * must result in a vbucket manifest update but no item is stored.
+     * must result in a vbucket manifest update and a SystemEvent document is
+     * stored. All separator changes write to the same SystemEvent document.
      */
     CollectionsSeparatorChanged
 };
index 5669bbc..374f1e1 100644 (file)
@@ -660,6 +660,15 @@ public:
                 *this, separator, revision, bySeqno);
     }
 
+    /**
+     * Get the collection manifest
+     *
+     * @return const reference to the manifest
+     */
+    const Collections::VB::Manifest& getManifest() const {
+        return manifest;
+    }
+
     static const vbucket_state_t ACTIVE;
     static const vbucket_state_t REPLICA;
     static const vbucket_state_t PENDING;
index 20562a3..0503ceb 100644 (file)
@@ -95,4 +95,13 @@ public:
     SingleThreadedRCPtr<Stream> findStream(uint16_t vbid) {
         return DcpProducer::findStream(vbid);
     }
+
+    std::string getCurrentSeparatorForStream(uint16_t vbid) {
+        auto stream = findStream(vbid);
+        if (stream) {
+            auto as = static_cast<ActiveStream*>(stream.get());
+            return as->getCurrentSeparator();
+        }
+        return {};
+    }
 };
index a71706e..78f17f3 100644 (file)
@@ -495,12 +495,13 @@ public:
 TEST_F(CollectionsWarmupTest, warmup) {
     VBucketPtr vb = store->getVBucket(vbid);
 
-    // Add the meat collection
+    // Add the meat collection *and* change the separator
     vb->updateFromManifest(
         {R"({"revision":1,"separator":"-+-","collections":["$default","meat"]})"});
 
-    // Trigger a flush to disk. Flushes the meat create event and 1 item
-    flush_vbucket_to_disk(vbid, 1);
+    // Trigger a flush to disk. Flushes the meat create event and a separator
+    // changed event.
+    flush_vbucket_to_disk(vbid, 2);
 
     // Now we can write to beef
     store_item(vbid, {"meat-+-beef", DocNamespace::Collections}, "value");
@@ -789,9 +790,13 @@ TEST_F(CollectionsDcpTest, test_dcp_separator) {
     // Step which will notify the snapshot task
     EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
 
+    // The producer should start with the old separator
+    EXPECT_EQ("::", producer->getCurrentSeparatorForStream(vbid));
+
     EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
 
     // Now call run on the snapshot task to move checkpoint into DCP stream
+    // this will trigger the stream to update the separator
     producer->getCheckpointSnapshotTask().run();
 
     // Next step which should process a snapshot marker
@@ -799,9 +804,18 @@ TEST_F(CollectionsDcpTest, test_dcp_separator) {
 
     VBucketPtr replica = store->getVBucket(replicaVB);
 
+    // The replica should have the :: separator
+    EXPECT_EQ("::", replica->lockCollections().getSeparator());
+
     // Now step the producer to transfer the separator
     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
 
+    // The producer should now have the new separator
+    EXPECT_EQ("@@", producer->getCurrentSeparatorForStream(vbid));
+
+    // The replica should now have the new separator
+    EXPECT_EQ("@@", replica->lockCollections().getSeparator());
+
     // Now step the producer to transfer the collection
     EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
 
@@ -812,3 +826,63 @@ TEST_F(CollectionsDcpTest, test_dcp_separator) {
     // And done
     EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
 }
+
+TEST_F(CollectionsDcpTest, test_dcp_separator_many) {
+    auto vb = store->getVBucket(vbid);
+
+    // Change the separator
+    vb->updateFromManifest(
+            {R"({"revision":1,"separator":"@@","collections":["$default"]})"});
+    // Change the separator
+    vb->updateFromManifest(
+            {R"({"revision":2,"separator":":","collections":["$default"]})"});
+    // Change the separator
+    vb->updateFromManifest(
+            {R"({"revision":3,"separator":",","collections":["$default"]})"});
+    // Add a collection
+    vb->updateFromManifest(
+            {R"({"revision":4,"separator":",","collections":["$default","meat"]})"});
+
+    // All the changes will be collapsed into one update and we will expect
+    // to see , as the separator once DCP steps through the checkpoint
+    producer->notifySeqnoAvailable(vb->getId(), vb->getHighSeqno());
+
+    // Step which will notify the snapshot task
+    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
+
+    // The producer should start with the initial separator
+    EXPECT_EQ("::", producer->getCurrentSeparatorForStream(vbid));
+
+    EXPECT_EQ(1, producer->getCheckpointSnapshotTask().queueSize());
+
+    // Now call run on the snapshot task to move checkpoint into DCP stream
+    // this will trigger the stream to update the separator
+    producer->getCheckpointSnapshotTask().run();
+
+    // Next step which should process a snapshot marker
+    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
+
+    auto replica = store->getVBucket(replicaVB);
+
+    // The replica should have the :: separator
+    EXPECT_EQ("::", replica->lockCollections().getSeparator());
+
+    // Now step the producer to transfer the separator
+    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
+
+    // The producer should now have the new separator
+    EXPECT_EQ(",", producer->getCurrentSeparatorForStream(vbid));
+
+    // The replica should now have the new separator
+    EXPECT_EQ(",", replica->lockCollections().getSeparator());
+
+    // Now step the producer to transfer the collection
+    EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
+
+    // Collection should now be live on the replica with the final separator
+    EXPECT_TRUE(replica->lockCollections().doesKeyContainValidCollection(
+            {"meat,bacon", DocNamespace::Collections}));
+
+    // And done
+    EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
+}