MB-16181: Transfer the collection name over DCP 35/78135/12
authorJim Walker <jim@couchbase.com>
Tue, 21 Mar 2017 15:15:16 +0000 (15:15 +0000)
committerDave Rigby <daver@couchbase.com>
Tue, 23 May 2017 17:58:05 +0000 (17:58 +0000)
The DCP mutation/deletion callbacks now take a collection_len field,
the data in this field will be sent over DCP streams when a client
has signalled they want collection-aware DCP.

For example "dairy::cheese" will set a collection length of 5,
default collection documents, set a collection length of 0.

Change-Id: I303d9b18bc5d0fd0968708d84e461ee59577c003
Reviewed-on: http://review.couchbase.org/78135
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
CMakeLists.txt
src/dcp/producer.cc
src/ep_engine.cc
src/ep_engine.h
src/stats.cc [new file with mode: 0644]
tests/mock/mock_dcp.cc

index 2dc99c8..1f8eeb0 100644 (file)
@@ -211,6 +211,7 @@ ADD_LIBRARY(ep_objs OBJECT
             src/replicationthrottle.cc
             src/linked_list.cc
             src/seqlist.cc
+            src/stats.cc
             src/string_utils.cc
             src/storeddockey.cc
             src/stored-value.cc
index 9483d49..1626e1c 100644 (file)
@@ -517,7 +517,7 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
                     meta.first,
                     meta.second,
                     mutationResponse->getItem()->getNRUValue(),
-                    0);
+                    mutationResponse->getCollectionLen());
             break;
         }
         case DcpResponse::Event::Deletion:
@@ -538,7 +538,7 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
                                       mutationResponse->getRevSeqno(),
                                       meta.first,
                                       meta.second,
-                                      0);
+                                      mutationResponse->getCollectionLen());
             break;
         }
         case DcpResponse::Event::SnapshotMarker:
index 5557245..5fb3af9 100644 (file)
@@ -6349,55 +6349,3 @@ void EpEngineTaskable::logRunTime(TaskId id,
                                   const ProcessClock::duration runTime) {
     myEngine->getKVBucket()->logRunTime(id, runTime);
 }
-
-void EPStats::memAllocated(size_t sz) {
-    if (isShutdown) {
-        return;
-    }
-
-    if (localMemCounter.get() == nullptr) {
-        // this HAS to be a non-bucket allocation
-        // or else the callbacks would try to call this
-        // function again & it would become an infinite loop
-        SystemAllocationGuard system_alloc_guard;
-        localMemCounter.set(new TLMemCounter());
-    }
-
-    if (0 == sz) {
-        return;
-    }
-
-    localMemCounter.get()->used += sz;
-    mergeMemCounter();
-}
-
-void EPStats::memDeallocated(size_t sz) {
-    if (isShutdown) {
-        return;
-    }
-
-    if (localMemCounter.get() == nullptr) {
-        // this HAS to be a non-bucket allocation
-        // or else the callbacks would try to call this
-        // function again & it would become an infinite loop
-        SystemAllocationGuard system_alloc_guard;
-        localMemCounter.set(new TLMemCounter());
-    }
-
-    if (0 == sz) {
-        return;
-    }
-
-    localMemCounter.get()->used -= sz;
-    mergeMemCounter();
-}
-
-void EPStats::mergeMemCounter(bool force) {
-    auto& counter = *(localMemCounter.get());
-    counter.count++;
-    if (force || counter.count % mem_merge_count_threshold == 0 ||
-        std::abs(counter.used) > (long)mem_merge_bytes_threshold) {
-        totalMemory->fetch_add(counter.used);
-        counter.used = 0;
-    }
-}
index 3d6eaae..6d384da 100644 (file)
@@ -373,6 +373,14 @@ public:
         return isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_XATTR);
     }
 
+    bool isCollectionsSupported(const void* cookie) {
+        EventuallyPersistentEngine* epe =
+                ObjectRegistry::onSwitchThread(NULL, true);
+        bool isSupported = serverApi->cookie->is_collections_supported(cookie);
+        ObjectRegistry::onSwitchThread(epe);
+        return isSupported;
+    }
+
     uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
         uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
diff --git a/src/stats.cc b/src/stats.cc
new file mode 100644 (file)
index 0000000..ba4bd4c
--- /dev/null
@@ -0,0 +1,70 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "stats.h"
+
+void EPStats::memAllocated(size_t sz) {
+    if (isShutdown) {
+        return;
+    }
+
+    if (localMemCounter.get() == nullptr) {
+        // this HAS to be a non-bucket allocation
+        // or else the callbacks would try to call this
+        // function again & it would become an infinite loop
+        SystemAllocationGuard system_alloc_guard;
+        localMemCounter.set(new TLMemCounter());
+    }
+
+    if (0 == sz) {
+        return;
+    }
+
+    localMemCounter.get()->used += sz;
+    mergeMemCounter();
+}
+
+void EPStats::memDeallocated(size_t sz) {
+    if (isShutdown) {
+        return;
+    }
+
+    if (localMemCounter.get() == nullptr) {
+        // this HAS to be a non-bucket allocation
+        // or else the callbacks would try to call this
+        // function again & it would become an infinite loop
+        SystemAllocationGuard system_alloc_guard;
+        localMemCounter.set(new TLMemCounter());
+    }
+
+    if (0 == sz) {
+        return;
+    }
+
+    localMemCounter.get()->used -= sz;
+    mergeMemCounter();
+}
+
+void EPStats::mergeMemCounter(bool force) {
+    auto& counter = *(localMemCounter.get());
+    counter.count++;
+    if (force || counter.count % mem_merge_count_threshold == 0 ||
+        std::abs(counter.used) > (long)mem_merge_bytes_threshold) {
+        totalMemory->fetch_add(counter.used);
+        counter.used = 0;
+    }
+}
\ No newline at end of file
index d583dc7..144706a 100644 (file)
@@ -39,6 +39,7 @@ uint64_t dcp_last_snap_start_seqno;
 uint64_t dcp_last_snap_end_seqno;
 uint64_t dcp_last_byseqno;
 uint64_t dcp_last_revseqno;
+uint8_t dcp_last_collection_len;
 std::string dcp_last_meta;
 std::string dcp_last_value;
 std::string dcp_last_key;
@@ -188,8 +189,17 @@ static ENGINE_ERROR_CODE mock_mutation(const void* cookie,
     dcp_last_value.assign(static_cast<const char*>(item->getData()),
                           item->getNBytes());
     dcp_last_nru = nru;
-    dcp_last_packet_size = 55 + dcp_last_key.length() +
+
+    // @todo: MB-24391: We are querying the header length with collections
+    // off, which if we extended our testapp tests to do collections may not be
+    // correct. For now collections testing is done via GTEST tests and isn't
+    // reliant on dcp_last_packet_size so this doesn't cause any problems.
+    dcp_last_packet_size =
+            protocol_binary_request_dcp_mutation::getHeaderLength(false);
+    dcp_last_packet_size = dcp_last_packet_size + dcp_last_key.length() +
                            item->getNBytes() + nmeta;
+
+    dcp_last_collection_len = collectionLen;
     if (engine_handle_v1 && engine_handle) {
         engine_handle_v1->release(engine_handle, NULL, item);
     }
@@ -216,10 +226,15 @@ static ENGINE_ERROR_CODE mock_deletion(const void* cookie,
     dcp_last_byseqno = by_seqno;
     dcp_last_revseqno = rev_seqno;
     dcp_last_meta.assign(static_cast<const char*>(meta), nmeta);
-    dcp_last_packet_size =
-            42 + dcp_last_key.length() + item->getNBytes() + nmeta;
+
+    // @todo: MB-24391 as above.
+    dcp_last_packet_size = dcp_last_key.length() + item->getNBytes() + nmeta;
+    dcp_last_packet_size +=
+            protocol_binary_request_dcp_deletion::getHeaderLength(false);
+
     dcp_last_value.assign(static_cast<const char*>(item->getData()),
                           item->getNBytes());
+    dcp_last_collection_len = collectionLen;
 
     if (engine_handle_v1 && engine_handle) {
         engine_handle_v1->release(engine_handle, nullptr, item);