SET(CONFIG_SOURCE src/configuration.cc
${CMAKE_CURRENT_BINARY_DIR}/src/generated_configuration.cc)
-SET(COLLECTIONS_SOURCE src/collections/manifest.cc
+SET(COLLECTIONS_SOURCE src/collections/manager.cc
+ src/collections/manifest.cc
src/collections/vbucket_manifest.cc
src/collections/vbucket_manifest_entry.cc)
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2017 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "collections/manager.h"
+#include "collections/manifest.h"
+#include "kv_bucket.h"
+#include "vbucket.h"
+
+Collections::Manager::Manager() : current(std::make_unique<Manifest>()) {
+}
+
+cb::engine_error Collections::Manager::update(KVBucket& bucket,
+ const std::string& json) {
+ std::unique_lock<std::mutex> ul(lock, std::try_to_lock);
+ if (!ul.owns_lock()) {
+ // Make concurrent updates fail, in realiy there should only be one
+ // admin connection making changes.
+ return cb::engine_error(cb::engine_errc::temporary_failure,
+ "Collections::Manager::update already locked");
+ }
+
+ std::unique_ptr<Manifest> newManifest;
+ // Construct a newManifest (will throw if JSON was illegal)
+ try {
+ newManifest = std::make_unique<Manifest>(json);
+ } catch (std::exception& e) {
+ LOG(EXTENSION_LOG_NOTICE,
+ "Collections::Manager::update can't construct manifest e.what:%s",
+ e.what());
+ return cb::engine_error(
+ cb::engine_errc::invalid_arguments,
+ "Collections::Manager::update manifest json invalid:" + json);
+ }
+
+ // Validate manifest revision is increasing
+ if (newManifest->getRevision() <= current->getRevision()) {
+ return cb::engine_error(
+ cb::engine_errc::invalid_arguments,
+ "Collections::Manager::update manifest revision:" +
+ std::to_string(current->getRevision()) + " json:" +
+ json);
+ }
+
+ current = std::move(newManifest);
+
+ for (int i = 0; i < bucket.getVBuckets().getSize(); i++) {
+ auto vb = bucket.getVBuckets().getBucket(i);
+
+ if (vb && vb->getState() == vbucket_state_active) {
+ vb->updateFromManifest(*current);
+ }
+ }
+
+ return cb::engine_error(cb::engine_errc::success,
+ "Collections::Manager::update");
+}
+
+void Collections::Manager::update(VBucket& vb) const {
+ // Lock manager updates
+ std::lock_guard<std::mutex> ul(lock);
+ vb.updateFromManifest(*current);
+}
--- /dev/null
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Copyright 2017 Couchbase, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memcached/engine_error.h>
+
+#include <memory>
+#include <mutex>
+
+class KVBucket;
+class VBucket;
+
+namespace Collections {
+
+class Manifest;
+
+/**
+ * Collections::Manager provides some bucket level management functions
+ * such as the code which enables the MCBP set_collections command.
+ */
+class Manager {
+public:
+ Manager();
+
+ /**
+ * Update the bucket with the latest JSON collections manifest.
+ *
+ * Locks the Manager and prevents concurrent updates, concurrent updates
+ * are failed with TMPFAIL as in reality there should be 1 admin connection.
+ *
+ * @param bucket the bucket receiving a set-collections command.
+ * @param json the json manifest form a set-collections command.
+ * @returns engine_error indicating why the update failed.
+ */
+ cb::engine_error update(KVBucket& bucket, const std::string& json);
+
+ /**
+ * Update the vbucket's manifest with the current Manifest
+ * The Manager is locked to prevent current changing whilst this update
+ * occurs.
+ */
+ void update(VBucket& vb) const;
+
+private:
+ mutable std::mutex lock;
+
+ /// Store the most recent (current) manifest received
+ std::unique_ptr<Manifest> current;
+};
+}
\ No newline at end of file
return callback(cookie, config, len);
}
+static cb::engine_error EvpCollectionsSetManifest(ENGINE_HANDLE* handle,
+ cb::const_char_buffer json) {
+ auto engine = acquireEngine(handle);
+ return engine->getKVBucket()->setCollections(json);
+}
+
void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
va_list va;
va_start(va, fmt);
ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
ENGINE_HANDLE_V1::dcp.system_event = EvpDcpSystemEvent;
ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
+ ENGINE_HANDLE_V1::collections.set_manifest = EvpCollectionsSetManifest;
serverApi = getServerApiFunc();
memset(&info, 0, sizeof(info));
#include "access_scanner.h"
#include "checkpoint_remover.h"
+#include "collections/manager.h"
#include "conflict_resolution.h"
#include "connmap.h"
#include "dcp/dcpconnmap.h"
bgFetchDelay(0),
backfillMemoryThreshold(0.95),
statsSnapshotTaskId(0),
- lastTransTimePerItem(0) {
+ lastTransTimePerItem(0),
+ collectionsManager(std::make_unique<Collections::Manager>()) {
cachedResidentRatio.activeRatio.store(0);
cachedResidentRatio.replicaRatio.store(0);
// Before adding the VB to the map increment the revision
getRWUnderlying(vbid)->incrementRevision(vbid);
+ // If active, update the VB from the bucket's collection state
+ if (to == vbucket_state_active) {
+ collectionsManager->update(*newvb);
+ }
+
if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
return ENGINE_ERANGE;
}
config.addValueChangedListener("exp_pager_initial_run_time",
new EPStoreValueChangeListener(*this));
}
+
+cb::engine_error KVBucket::setCollections(cb::const_char_buffer json) {
+ // cJSON can't accept a size so we must create a string
+ std::string manifest(json.data(), json.size());
+
+ // Inhibit VB state changes whilst updating the vbuckets
+ LockHolder lh(vbsetMutex);
+
+ return collectionsManager->update(*this, manifest);
+}
#include <deque>
class VBucketCountVisitor;
+namespace Collections {
+class Manager;
+}
/**
* VBucket visitor callback adaptor.
uint64_t maxCas = 0,
const std::string& collectionsManifest = "") = 0;
+ /**
+ * Method to handle set_collections commands
+ * @param json a buffer containing a JSON manifest to apply to the bucket
+ */
+ cb::engine_error setCollections(cb::const_char_buffer json);
+
protected:
// During the warmup phase we might want to enable external traffic
// at a given point in time.. The LoadStorageKvPairCallback will be
std::mutex compactionLock;
std::list<CompTaskEntry> compactionTasks;
+ std::unique_ptr<Collections::Manager> collectionsManager;
+
friend class KVBucketTest;
DISALLOW_COPY_AND_ASSIGN(KVBucket);
*/
virtual size_t getNumPersistedDeletes(uint16_t vbid) = 0;
+ /**
+ * Method to handle set_collections commands
+ * @param json a buffer containing a JSON manifest to apply to the bucket
+ */
+ virtual cb::engine_error setCollections(cb::const_char_buffer json) = 0;
+
protected:
// Methods called during warmup
// And done
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
}
+
+class CollectionsManagerTest : public CollectionsTest {};
+
+/**
+ * Test checks that setCollections propagates the collection data to active
+ * vbuckets.
+ */
+TEST_F(CollectionsManagerTest, basic) {
+ // Add some more VBuckets just so there's some iteration happening
+ const int extraVbuckets = 2;
+ for (int vb = vbid + 1; vb <= (vbid + extraVbuckets); vb++) {
+ store->setVBucketState(vb, vbucket_state_active, false);
+ }
+
+ store->setCollections(
+ {R"({"revision":1,"separator":"@@","collections":["$default", "meat"]})"});
+
+ // Check all vbuckets got the collections
+ for (int vb = vbid; vb <= (vbid + extraVbuckets); vb++) {
+ auto vbp = store->getVBucket(vb);
+ EXPECT_EQ("@@", vbp->lockCollections().getSeparator());
+ EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+ {"meat@@bacon", DocNamespace::Collections}));
+ EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+ {"anykey", DocNamespace::DefaultCollection}));
+ }
+}
+
+/**
+ * Test checks that setCollections propagates the collection data to active
+ * vbuckets and not the replicas
+ */
+TEST_F(CollectionsManagerTest, basic2) {
+ // Add some more VBuckets just so there's some iteration happening
+ const int extraVbuckets = 2;
+ // Add active and replica
+ for (int vb = vbid + 1; vb <= (vbid + extraVbuckets); vb++) {
+ if (vb & 1) {
+ store->setVBucketState(vb, vbucket_state_active, false);
+ } else {
+ store->setVBucketState(vb, vbucket_state_replica, false);
+ }
+ }
+
+ store->setCollections(
+ {R"({"revision":1,"separator":"@@","collections":["$default", "meat"]})"});
+
+ // Check all vbuckets got the collections
+ for (int vb = vbid; vb <= (vbid + extraVbuckets); vb++) {
+ auto vbp = store->getVBucket(vb);
+ if (vbp->getState() == vbucket_state_active) {
+ EXPECT_EQ("@@", vbp->lockCollections().getSeparator());
+ EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+ {"meat@@bacon", DocNamespace::Collections}));
+ EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+ {"anykey", DocNamespace::DefaultCollection}));
+ } else {
+ // Replica will be in default constructed settings
+ EXPECT_EQ("::", vbp->lockCollections().getSeparator());
+ EXPECT_FALSE(vbp->lockCollections().doesKeyContainValidCollection(
+ {"meat@@bacon", DocNamespace::Collections}));
+ EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
+ {"anykey", DocNamespace::DefaultCollection}));
+ }
+ }
+}