MB-16181: Add collections.set_manifest support 36/77436/16
authorJim Walker <jim@couchbase.com>
Fri, 10 Mar 2017 16:34:47 +0000 (16:34 +0000)
committerDave Rigby <daver@couchbase.com>
Tue, 23 May 2017 16:49:07 +0000 (16:49 +0000)
Add a method which will accept the new manifest and apply it to
all active vbuckets.

The latest manifest is saved in memory and also used for when any VB
is set to active

Change-Id: Ic6a339bc5af279d105b679f528ff3675d1f16ac7
Reviewed-on: http://review.couchbase.org/77436
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
CMakeLists.txt
src/collections/manager.cc [new file with mode: 0644]
src/collections/manager.h [new file with mode: 0644]
src/ep_engine.cc
src/kv_bucket.cc
src/kv_bucket.h
src/kv_bucket_iface.h
tests/module_tests/collections/evp_store_collections_test.cc

index 60d7d24..2dc99c8 100644 (file)
@@ -151,7 +151,8 @@ SET(OBJECTREGISTRY_SOURCE src/objectregistry.cc)
 SET(CONFIG_SOURCE src/configuration.cc
   ${CMAKE_CURRENT_BINARY_DIR}/src/generated_configuration.cc)
 
-SET(COLLECTIONS_SOURCE src/collections/manifest.cc
+SET(COLLECTIONS_SOURCE src/collections/manager.cc
+                       src/collections/manifest.cc
                        src/collections/vbucket_manifest.cc
                        src/collections/vbucket_manifest_entry.cc)
 
diff --git a/src/collections/manager.cc b/src/collections/manager.cc
new file mode 100644 (file)
index 0000000..88f79ad
--- /dev/null
@@ -0,0 +1,76 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "collections/manager.h"
+#include "collections/manifest.h"
+#include "kv_bucket.h"
+#include "vbucket.h"
+
+Collections::Manager::Manager() : current(std::make_unique<Manifest>()) {
+}
+
+cb::engine_error Collections::Manager::update(KVBucket& bucket,
+                                              const std::string& json) {
+    std::unique_lock<std::mutex> ul(lock, std::try_to_lock);
+    if (!ul.owns_lock()) {
+        // Make concurrent updates fail, in realiy there should only be one
+        // admin connection making changes.
+        return cb::engine_error(cb::engine_errc::temporary_failure,
+                                "Collections::Manager::update already locked");
+    }
+
+    std::unique_ptr<Manifest> newManifest;
+    // Construct a newManifest (will throw if JSON was illegal)
+    try {
+        newManifest = std::make_unique<Manifest>(json);
+    } catch (std::exception& e) {
+        LOG(EXTENSION_LOG_NOTICE,
+            "Collections::Manager::update can't construct manifest e.what:%s",
+            e.what());
+        return cb::engine_error(
+                cb::engine_errc::invalid_arguments,
+                "Collections::Manager::update manifest json invalid:" + json);
+    }
+
+    // Validate manifest revision is increasing
+    if (newManifest->getRevision() <= current->getRevision()) {
+        return cb::engine_error(
+                cb::engine_errc::invalid_arguments,
+                "Collections::Manager::update manifest revision:" +
+                        std::to_string(current->getRevision()) + " json:" +
+                        json);
+    }
+
+    current = std::move(newManifest);
+
+    for (int i = 0; i < bucket.getVBuckets().getSize(); i++) {
+        auto vb = bucket.getVBuckets().getBucket(i);
+
+        if (vb && vb->getState() == vbucket_state_active) {
+            vb->updateFromManifest(*current);
+        }
+    }
+
+    return cb::engine_error(cb::engine_errc::success,
+                            "Collections::Manager::update");
+}
+
+void Collections::Manager::update(VBucket& vb) const {
+    // Lock manager updates
+    std::lock_guard<std::mutex> ul(lock);
+    vb.updateFromManifest(*current);
+}
diff --git a/src/collections/manager.h b/src/collections/manager.h
new file mode 100644 (file)
index 0000000..8988684
--- /dev/null
@@ -0,0 +1,65 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include <memcached/engine_error.h>
+
+#include <memory>
+#include <mutex>
+
+class KVBucket;
+class VBucket;
+
+namespace Collections {
+
+class Manifest;
+
+/**
+ * Collections::Manager provides some bucket level management functions
+ * such as the code which enables the MCBP set_collections command.
+ */
+class Manager {
+public:
+    Manager();
+
+    /**
+     * Update the bucket with the latest JSON collections manifest.
+     *
+     * Locks the Manager and prevents concurrent updates, concurrent updates
+     * are failed with TMPFAIL as in reality there should be 1 admin connection.
+     *
+     * @param bucket the bucket receiving a set-collections command.
+     * @param json the json manifest form a set-collections command.
+     * @returns engine_error indicating why the update failed.
+     */
+    cb::engine_error update(KVBucket& bucket, const std::string& json);
+
+    /**
+     * Update the vbucket's manifest with the current Manifest
+     * The Manager is locked to prevent current changing whilst this update
+     * occurs.
+     */
+    void update(VBucket& vb) const;
+
+private:
+    mutable std::mutex lock;
+
+    /// Store the most recent (current) manifest received
+    std::unique_ptr<Manifest> current;
+};
+}
\ No newline at end of file
index adc936b..5557245 100644 (file)
@@ -1764,6 +1764,12 @@ static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
     return callback(cookie, config, len);
 }
 
+static cb::engine_error EvpCollectionsSetManifest(ENGINE_HANDLE* handle,
+                                                  cb::const_char_buffer json) {
+    auto engine = acquireEngine(handle);
+    return engine->getKVBucket()->setCollections(json);
+}
+
 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
     va_list va;
     va_start(va, fmt);
@@ -1832,6 +1838,7 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(
     ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
     ENGINE_HANDLE_V1::dcp.system_event = EvpDcpSystemEvent;
     ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
+    ENGINE_HANDLE_V1::collections.set_manifest = EvpCollectionsSetManifest;
 
     serverApi = getServerApiFunc();
     memset(&info, 0, sizeof(info));
index 6ab13d1..6acb324 100644 (file)
@@ -34,6 +34,7 @@
 
 #include "access_scanner.h"
 #include "checkpoint_remover.h"
+#include "collections/manager.h"
 #include "conflict_resolution.h"
 #include "connmap.h"
 #include "dcp/dcpconnmap.h"
@@ -351,7 +352,8 @@ KVBucket::KVBucket(EventuallyPersistentEngine& theEngine)
       bgFetchDelay(0),
       backfillMemoryThreshold(0.95),
       statsSnapshotTaskId(0),
-      lastTransTimePerItem(0) {
+      lastTransTimePerItem(0),
+      collectionsManager(std::make_unique<Collections::Manager>()) {
     cachedResidentRatio.activeRatio.store(0);
     cachedResidentRatio.replicaRatio.store(0);
 
@@ -818,6 +820,11 @@ ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(
         // Before adding the VB to the map increment the revision
         getRWUnderlying(vbid)->incrementRevision(vbid);
 
+        // If active, update the VB from the bucket's collection state
+        if (to == vbucket_state_active) {
+            collectionsManager->update(*newvb);
+        }
+
         if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
             return ENGINE_ERANGE;
         }
@@ -2791,3 +2798,13 @@ void KVBucket::initializeExpiryPager(Configuration& config) {
     config.addValueChangedListener("exp_pager_initial_run_time",
                                    new EPStoreValueChangeListener(*this));
 }
+
+cb::engine_error KVBucket::setCollections(cb::const_char_buffer json) {
+    // cJSON can't accept a size so we must create a string
+    std::string manifest(json.data(), json.size());
+
+    // Inhibit VB state changes whilst updating the vbuckets
+    LockHolder lh(vbsetMutex);
+
+    return collectionsManager->update(*this, manifest);
+}
index 5283021..beffd41 100644 (file)
@@ -30,6 +30,9 @@
 #include <deque>
 
 class VBucketCountVisitor;
+namespace Collections {
+class Manager;
+}
 
 /**
  * VBucket visitor callback adaptor.
@@ -740,6 +743,12 @@ public:
             uint64_t maxCas = 0,
             const std::string& collectionsManifest = "") = 0;
 
+    /**
+     * Method to handle set_collections commands
+     * @param json a buffer containing a JSON manifest to apply to the bucket
+     */
+    cb::engine_error setCollections(cb::const_char_buffer json);
+
 protected:
     // During the warmup phase we might want to enable external traffic
     // at a given point in time.. The LoadStorageKvPairCallback will be
@@ -860,6 +869,8 @@ protected:
     std::mutex compactionLock;
     std::list<CompTaskEntry> compactionTasks;
 
+    std::unique_ptr<Collections::Manager> collectionsManager;
+
     friend class KVBucketTest;
 
     DISALLOW_COPY_AND_ASSIGN(KVBucket);
index b3d0422..edf965c 100644 (file)
@@ -822,6 +822,12 @@ public:
      */
     virtual size_t getNumPersistedDeletes(uint16_t vbid) = 0;
 
+    /**
+     * Method to handle set_collections commands
+     * @param json a buffer containing a JSON manifest to apply to the bucket
+     */
+    virtual cb::engine_error setCollections(cb::const_char_buffer json) = 0;
+
 protected:
 
     // Methods called during warmup
index 8e6ae79..a652769 100644 (file)
@@ -880,3 +880,69 @@ TEST_F(CollectionsDcpTest, test_dcp_separator_many) {
     // And done
     EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
 }
+
+class CollectionsManagerTest : public CollectionsTest {};
+
+/**
+ * Test checks that setCollections propagates the collection data to active
+ * vbuckets.
+ */
+TEST_F(CollectionsManagerTest, basic) {
+    // Add some more VBuckets just so there's some iteration happening
+    const int extraVbuckets = 2;
+    for (int vb = vbid + 1; vb <= (vbid + extraVbuckets); vb++) {
+        store->setVBucketState(vb, vbucket_state_active, false);
+    }
+
+    store->setCollections(
+            {R"({"revision":1,"separator":"@@","collections":["$default", "meat"]})"});
+
+    // Check all vbuckets got the collections
+    for (int vb = vbid; vb <= (vbid + extraVbuckets); vb++) {
+        auto vbp = store->getVBucket(vb);
+        EXPECT_EQ("@@", vbp->lockCollections().getSeparator());
+        EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+                {"meat@@bacon", DocNamespace::Collections}));
+        EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+                {"anykey", DocNamespace::DefaultCollection}));
+    }
+}
+
+/**
+ * Test checks that setCollections propagates the collection data to active
+ * vbuckets and not the replicas
+ */
+TEST_F(CollectionsManagerTest, basic2) {
+    // Add some more VBuckets just so there's some iteration happening
+    const int extraVbuckets = 2;
+    // Add active and replica
+    for (int vb = vbid + 1; vb <= (vbid + extraVbuckets); vb++) {
+        if (vb & 1) {
+            store->setVBucketState(vb, vbucket_state_active, false);
+        } else {
+            store->setVBucketState(vb, vbucket_state_replica, false);
+        }
+    }
+
+    store->setCollections(
+            {R"({"revision":1,"separator":"@@","collections":["$default", "meat"]})"});
+
+    // Check all vbuckets got the collections
+    for (int vb = vbid; vb <= (vbid + extraVbuckets); vb++) {
+        auto vbp = store->getVBucket(vb);
+        if (vbp->getState() == vbucket_state_active) {
+            EXPECT_EQ("@@", vbp->lockCollections().getSeparator());
+            EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+                    {"meat@@bacon", DocNamespace::Collections}));
+            EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+                    {"anykey", DocNamespace::DefaultCollection}));
+        } else {
+            // Replica will be in default constructed settings
+            EXPECT_EQ("::", vbp->lockCollections().getSeparator());
+            EXPECT_FALSE(vbp->lockCollections().doesKeyContainValidCollection(
+                    {"meat@@bacon", DocNamespace::Collections}));
+            EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+                    {"anykey", DocNamespace::DefaultCollection}));
+        }
+    }
+}