MB-22738: Move stripping of value from DCP stream back into ep-engine 18/76418/31
authorDaniel Owen <owend@couchbase.com>
Thu, 6 Apr 2017 21:12:28 +0000 (22:12 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 13 Apr 2017 13:29:45 +0000 (13:29 +0000)
A revert of the http://review.couchbase.org/#/c/72398/
with the addition that determining whether whether to retrieve only
is made on the connection level, (as opposed to the stream level).

Change-Id: I641978c2be6c67e6a9d96c0a229ff21688c74055
Reviewed-on: http://review.couchbase.org/76418
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
17 files changed:
src/dcp/backfill_disk.cc
src/dcp/dcpconnmap.cc
src/dcp/dcpconnmap.h
src/dcp/producer.cc
src/dcp/producer.h
src/dcp/response.h
src/dcp/stream.cc
src/dcp/stream.h
src/ep_engine.cc
src/stored-value.cc
src/stored-value.h
tests/mock/mock_dcp_producer.h
tests/mock/mock_stream.h
tests/module_tests/collections/evp_store_collections_test.cc
tests/module_tests/dcp_test.cc
tests/module_tests/evp_store_rollback_test.cc
tests/module_tests/evp_store_test.cc

index 3379ac8..2e3ba7c 100644 (file)
@@ -64,16 +64,19 @@ void CacheCallback::callback(CacheLookup& lookup) {
     if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
         std::unique_ptr<Item> it;
         try {
-            it = v->toItem(false, lookup.getVBucketId());
+            it = stream_->isKeyOnly() ?
+                    v->toItemWithNoValue(lookup.getVBucketId()) :
+                    v->toItem(false, lookup.getVBucketId());
         } catch (const std::bad_alloc&) {
             setStatus(ENGINE_ENOMEM);
             stream_->getLogger().log(
                     EXTENSION_LOG_WARNING,
                     "Alloc error when trying to create an "
                     "item copy from hash table. Item seqno:%" PRIi64
-                    ", vb:%" PRIu16,
+                    ", vb:%" PRIu16 " isKeyOnly:%s",
                     v->getBySeqno(),
-                    lookup.getVBucketId());
+                    lookup.getVBucketId(),
+                    stream_->isKeyOnly() ? "True" : "False");
             return;
         }
         hbl.getHTLock().unlock();
@@ -170,8 +173,12 @@ backfill_status_t DCPBackfillDisk::create() {
 
     KVStore* kvstore = engine.getKVBucket()->getROUnderlying(vbid);
     ValueFilter valFilter = ValueFilter::VALUES_DECOMPRESSED;
-    if (stream->isCompressionEnabled()) {
-        valFilter = ValueFilter::VALUES_COMPRESSED;
+    if (stream->isKeyOnly()) {
+        valFilter = ValueFilter::KEYS_ONLY;
+    } else {
+        if (stream->isCompressionEnabled()) {
+            valFilter = ValueFilter::VALUES_COMPRESSED;
+        }
     }
 
     std::shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
index 5fa9065..694b973 100644 (file)
@@ -122,7 +122,8 @@ ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler& conn,
 
 DcpProducer *DcpConnMap::newProducer(const void* cookie,
                                      const std::string &name,
-                                     bool notifyOnly)
+                                     bool notifyOnly,
+                                     bool isKeyOnly)
 {
     LockHolder lh(connsLock);
 
@@ -153,7 +154,9 @@ DcpProducer *DcpConnMap::newProducer(const void* cookie,
     }
 
     DcpProducer* dcp = new DcpProducer(
-            engine, cookie, conn_name, notifyOnly, true /*startTask*/);
+            engine, cookie, conn_name, notifyOnly, true /*startTask*/,
+            isKeyOnly ? DcpProducer::MutationType::KeyOnly :
+                    DcpProducer::MutationType::KeyAndValue);
     LOG(EXTENSION_LOG_INFO, "%s Connection created", dcp->logHeader());
     map_[cookie] = dcp;
 
index f24c71e..1e5aacb 100644 (file)
@@ -41,9 +41,15 @@ public:
     /**
      * Find or build a dcp connection for the given cookie and with
      * the given name.
+     * @param cookie The cookie representing the client
+     * @param name The name of the connection
+     * @param notifyOnly If true the producer to be created only notifies,
+     *        i.e. no data is sent.
+     * @param isKeyOnly States if items sent from active streams should contain
+     *        only the key.
      */
     DcpProducer *newProducer(const void* cookie, const std::string &name,
-                             bool notifyOnly);
+                             bool notifyOnly, bool isKeyOnly);
 
 
     /**
index e2b54a5..4ff6f8a 100644 (file)
@@ -132,14 +132,16 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine& e,
                          const void* cookie,
                          const std::string& name,
                          bool isNotifier,
-                         bool startTask)
+                         bool startTask,
+                         MutationType mutType)
     : Producer(e, cookie, name),
       rejectResp(NULL),
       notifyOnly(isNotifier),
       lastSendTime(ep_current_time()),
       log(*this),
       itemsSent(0),
-      totalBytesSent(0) {
+      totalBytesSent(0),
+      mutationType(mutType) {
     setSupportAck(true);
     setReserved(true);
     setPaused(true);
@@ -371,7 +373,8 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         s = new ActiveStream(&engine_, this, getName(), flags,
                              opaque, vbucket, start_seqno,
                              end_seqno, vbucket_uuid,
-                             snap_start_seqno, snap_end_seqno);
+                             snap_start_seqno, snap_end_seqno,
+                             (mutationType == MutationType::KeyOnly));
     }
 
     {
index 3ab37ab..ec6cdda 100644 (file)
@@ -29,6 +29,17 @@ class DcpResponse;
 
 class DcpProducer : public Producer {
 public:
+
+/*
+ * MutationType is used to state whether the active streams associated with the
+ * the DCPProducer need to send both the key and value (AllValue) or whether
+ * they can send just the key (KeyOnly).
+ */
+enum class MutationType {
+    KeyOnly,
+    KeyAndValue
+};
+
     /**
      * Construct a DCP Producer
      *
@@ -39,13 +50,16 @@ public:
      *        sent.
      * @param startTask If true an internal checkpoint task is created and
      *        started. Test code may wish to defer or manually handle the task
-     *         creation.
+     *        creation.
+     * @param mutType The MutationType to use for the items sent from the
+     *        DcpProducer.
      */
     DcpProducer(EventuallyPersistentEngine& e,
                 const void* cookie,
                 const std::string& n,
                 bool notifyOnly,
-                bool startTask);
+                bool startTask,
+                MutationType mutType);
 
     ~DcpProducer();
 
@@ -315,6 +329,10 @@ protected:
     ExTask checkpointCreatorTask;
     static const std::chrono::seconds defaultDcpNoopTxInterval;
 
+    // mutationType i.e. KeyOnly or AllValue, used to determine how all
+    // active streams belonging to the DcpProducer should send their data.
+    MutationType mutationType;
+
 };
 
 #endif  // SRC_DCP_PRODUCER_H_
index 3b71076..9abdfcb 100644 (file)
@@ -324,17 +324,17 @@ private:
 
 class MutationResponse : public DcpResponse {
 public:
-    MutationResponse(queued_item item, uint32_t opaque,
+    MutationResponse(queued_item item, uint32_t opaque, bool isKeyOnly,
                      ExtendedMetaData *e = NULL)
         : DcpResponse(item->isDeleted() ? Event::Deletion : Event::Mutation, opaque),
-          item_(item), emd(e) {}
+          item_(item), keyOnly(isKeyOnly), emd(e) {}
 
     queued_item& getItem() {
         return item_;
     }
 
     Item* getItemCopy() {
-        return new Item(*item_);
+        return new Item(*item_, keyOnly);
     }
 
     uint16_t getVBucket() {
@@ -355,7 +355,12 @@ public:
     uint32_t getMessageSize() {
         const uint32_t base = item_->isDeleted() ? deletionBaseMsgBytes :
                         mutationBaseMsgBytes;
-        uint32_t body = item_->getKey().size() + item_->getNBytes();
+
+        uint32_t body = item_->getKey().size();
+        if (!keyOnly) {
+            // Add the size of the value
+            body += item_->getNBytes();
+        }
 
         if (emd) {
             body += emd->getExtMeta().second;
@@ -380,6 +385,10 @@ public:
 
 private:
     queued_item item_;
+
+    // Whether the response should contain the key and value or just the key
+    bool keyOnly;
+
     std::unique_ptr<ExtendedMetaData> emd;
 };
 
index 7bce4dd..85f96c0 100644 (file)
@@ -231,7 +231,8 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
                            uint64_t en_seqno,
                            uint64_t vb_uuid,
                            uint64_t snap_start_seqno,
-                           uint64_t snap_end_seqno)
+                           uint64_t snap_end_seqno,
+                           bool isKeyOnly)
     : Stream(n,
              flags,
              opaque,
@@ -256,7 +257,8 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
       engine(e),
       producer(p),
       lastSentSnapEndSeqno(0),
-      chkptItemsExtractionInProgress(false) {
+      chkptItemsExtractionInProgress(false),
+      keyOnly(isKeyOnly) {
     const char* type = "";
     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
         type = "takeover ";
@@ -887,7 +889,7 @@ void ActiveStream::getOutstandingItems(VBucketPtr &vb,
 std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
         queued_item& item) {
     if (item->getOperation() != queue_op::system_event) {
-        return std::make_unique<MutationResponse>(item, opaque_);
+        return std::make_unique<MutationResponse>(item, opaque_, isKeyOnly());
     } else {
         return SystemEventProducerMessage::make(opaque_, item);
     }
index 79b7906..3be5778 100644 (file)
@@ -219,7 +219,7 @@ public:
                  const std::string &name, uint32_t flags, uint32_t opaque,
                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
                  uint64_t vb_uuid, uint64_t snap_start_seqno,
-                 uint64_t snap_end_seqno);
+                 uint64_t snap_end_seqno, bool isKeyOnly);
 
     ~ActiveStream();
 
@@ -276,6 +276,11 @@ public:
        in-memory to backfilling */
     void handleSlowStream();
 
+    /// @Returns true if keyOnly is true and false if KeyOnly is false
+    bool isKeyOnly() const {
+        return keyOnly;
+    }
+
 protected:
     // Returns the outstanding items for the stream's checkpoint cursor.
     void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
@@ -404,6 +409,10 @@ private:
        items are added to the readyQ */
     std::atomic<bool> chkptItemsExtractionInProgress;
 
+    // Whether the responses sent using this stream should contain the key and
+    // value or just the key
+    bool keyOnly;
+
 };
 
 
index 226ddfe..a38d609 100644 (file)
@@ -6079,10 +6079,11 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(const void* cookie,
     }
 
     ConnHandler *handler = NULL;
+    const bool keyOnly = flags & DCP_OPEN_NO_VALUE;
     if (flags & DCP_OPEN_PRODUCER) {
-        handler = dcpConnMap_->newProducer(cookie, connName, false);
+        handler = dcpConnMap_->newProducer(cookie, connName, false, keyOnly);
     } else if (flags & DCP_OPEN_NOTIFIER) {
-        handler = dcpConnMap_->newProducer(cookie, connName, true);
+        handler = dcpConnMap_->newProducer(cookie, connName, true, keyOnly);
     } else {
         handler = dcpConnMap_->newConsumer(cookie, connName);
     }
index 97a8cbd..2bb5718 100644 (file)
@@ -171,6 +171,23 @@ std::unique_ptr<Item> StoredValue::toItem(bool lck, uint16_t vbucket) const {
     return itm;
 }
 
+std::unique_ptr<Item> StoredValue::toItemWithNoValue(uint16_t vbucket) const {
+    auto itm =
+            std::make_unique<Item>(getKey(),
+                                   getFlags(),
+                                   getExptime(),
+                                   /* valuePtr */ nullptr,
+                                   /* valuelen */ 0,
+                                   /* ext_meta*/ nullptr,
+                                   /* ext_len */ 0,
+                                   getCas(),
+                                   getBySeqno(),
+                                   vbucket,
+                                   getRevSeqno());
+
+    return itm;
+}
+
 void StoredValue::reallocate() {
     // Allocate a new Blob for this stored value; copy the existing Blob to
     // the new one and free the old.
index 5161c96..338cd81 100644 (file)
@@ -531,6 +531,14 @@ public:
     std::unique_ptr<Item> toItem(bool lck, uint16_t vbucket) const;
 
     /**
+     * Generate a new Item with only key and metadata out of this object.
+     * The item generated will not contain value
+     *
+     * @param vbucket the vbucket containing this item.
+     */
+    std::unique_ptr<Item> toItemWithNoValue(uint16_t vbucket) const;
+
+    /**
      * Set the memory threshold on the current bucket quota for accepting a new mutation
      */
     static void setMutationMemoryThreshold(double memThreshold);
index 5562515..20562a3 100644 (file)
@@ -30,8 +30,11 @@ public:
                     const void* cookie,
                     const std::string& name,
                     bool isNotifier,
-                    bool startTask = true)
-        : DcpProducer(theEngine, cookie, name, isNotifier, startTask) {
+                    bool startTask = true,
+                    DcpProducer::MutationType mutationType =
+                            DcpProducer::MutationType::KeyAndValue)
+        : DcpProducer(theEngine, cookie, name,
+                      isNotifier, startTask, mutationType) {
     }
 
     ENGINE_ERROR_CODE maybeDisconnect() {
@@ -85,4 +88,11 @@ public:
         return *static_cast<ActiveStreamCheckpointProcessorTask*>(
                 checkpointCreatorTask.get());
     }
+
+    /**
+     * Finds the stream for a given vbucket
+     */
+    SingleThreadedRCPtr<Stream> findStream(uint16_t vbid) {
+        return DcpProducer::findStream(vbid);
+    }
 };
index 07bf592..5e26a17 100644 (file)
@@ -35,7 +35,8 @@ public:
                      uint64_t en_seqno,
                      uint64_t vb_uuid,
                      uint64_t snap_start_seqno,
-                     uint64_t snap_end_seqno)
+                     uint64_t snap_end_seqno,
+                     bool isKeyOnly = false)
         : ActiveStream(e,
                        p,
                        name,
@@ -46,7 +47,8 @@ public:
                        en_seqno,
                        vb_uuid,
                        snap_start_seqno,
-                       snap_end_seqno) {
+                       snap_end_seqno,
+                       isKeyOnly) {
     }
 
     // Expose underlying protected ActiveStream methods as public
@@ -98,6 +100,11 @@ public:
     int getNumBackfillItemsRemaining() const {
         return backfillRemaining;
     }
+
+    std::unique_ptr<DcpResponse> public_makeResponseFromItem(
+            queued_item& item) {
+        return makeResponseFromItem(item);
+    }
 };
 
 /* Mock of the PassiveStream class. Wraps the real PassiveStream, but exposes
index 0d92a85..a71706e 100644 (file)
@@ -636,7 +636,8 @@ public:
                                        cookieP,
                                        "test_producer",
                                        /*notifyOnly*/ false,
-                                       /*startTask*/ false);
+                                       /*startTask*/ false,
+                                       DcpProducer::MutationType::KeyAndValue);
 
         // Create the task object, but don't schedule
         producer->createCheckpointProcessorTask();
index eeeb691..0249d80 100644 (file)
@@ -107,12 +107,14 @@ protected:
     }
 
     // Setup a DCP producer and attach a stream and cursor to it.
-    void setup_dcp_stream() {
-        producer = new DcpProducer(*engine,
+    void setup_dcp_stream(DcpProducer::MutationType mutationType =
+            DcpProducer::MutationType::KeyAndValue) {
+        producer = new MockDcpProducer(*engine,
                                    /*cookie*/ nullptr,
                                    "test_producer",
                                    /*notifyOnly*/ false,
-                                   /*startTask*/ true);
+                                   /*startTask*/ true,
+                                   mutationType);
         stream = new MockActiveStream(engine, producer,
                                       producer->getName(), /*flags*/0,
                                       /*opaque*/0, vbid,
@@ -120,7 +122,9 @@ protected:
                                       /*en_seqno*/~0,
                                       /*vb_uuid*/0xabcd,
                                       /*snap_start_seqno*/0,
-                                      /*snap_end_seqno*/~0);
+                                      /*snap_end_seqno*/~0,
+                                      mutationType == DcpProducer::MutationType::KeyOnly ?
+                                              true : false);
 
         EXPECT_FALSE(vb0->checkpointManager.registerCursor(
                                                            producer->getName(),
@@ -129,11 +133,126 @@ protected:
             << "Found an existing TAP cursor when attempting to register ours";
     }
 
+    /*
+     * Fake callback emulating dcp_add_failover_log
+     */
+    static ENGINE_ERROR_CODE fakeDcpAddFailoverLog(vbucket_failover_t* entry,
+                                                   size_t nentries,
+                                                   const void *cookie) {
+        return ENGINE_SUCCESS;
+    }
+
     dcp_producer_t producer;
     stream_t stream;
     VBucketPtr vb0;
 };
 
+/*
+ * Test that when have a producer with MutationType set to KeyOnly an active
+ * stream created via a streamRequest returns true for isKeyOnly.
+ */
+TEST_P(StreamTest, test_streamSetMutationTypeKeyOnly) {
+    setup_dcp_stream(DcpProducer::MutationType::KeyOnly);
+    uint64_t rollbackSeqno;
+    auto err = producer->streamRequest(/*flags*/0,
+                                       /*opaque*/0,
+                                       /*vbucket*/0,
+                                       /*start_seqno*/0,
+                                       /*end_seqno*/0,
+                                       /*vb_uuid*/0,
+                                       /*snap_start*/0,
+                                       /*snap_end*/0,
+                                       &rollbackSeqno,
+                                       StreamTest::fakeDcpAddFailoverLog);
+    ASSERT_EQ(ENGINE_SUCCESS, err)
+        << "stream request did not return ENGINE_SUCCESS";
+
+    stream = dynamic_cast<MockDcpProducer*>(producer.get())->findStream(0);
+    EXPECT_TRUE(dynamic_cast<ActiveStream*>(stream.get())->isKeyOnly());
+    producer.get()->closeAllStreams();
+}
+
+/*
+ * Test that when have a producer with MutationType set to KeyAndValue an active
+ * stream created via a streamRequest returns false for isKeyOnly.
+ */
+TEST_P(StreamTest, test_streamSettMutationTypeAllValue) {
+    setup_dcp_stream(DcpProducer::MutationType::KeyAndValue);
+    uint64_t rollbackSeqno;
+    auto err = producer->streamRequest(/*flags*/0,
+                                       /*opaque*/0,
+                                       /*vbucket*/0,
+                                       /*start_seqno*/0,
+                                       /*end_seqno*/0,
+                                       /*vb_uuid*/0,
+                                       /*snap_start*/0,
+                                       /*snap_end*/0,
+                                       &rollbackSeqno,
+                                       StreamTest::fakeDcpAddFailoverLog);
+    ASSERT_EQ(ENGINE_SUCCESS, err)
+        << "stream request did not return ENGINE_SUCCESS";
+
+    stream = dynamic_cast<MockDcpProducer*>(producer.get())->findStream(0);
+    EXPECT_FALSE(dynamic_cast<ActiveStream*>(stream.get())->isKeyOnly());
+    producer.get()->closeAllStreams();
+}
+
+/*
+ * Test for a dcpResponse retrieved from a stream where isKeyOnly is true, that
+ * the message size does not include the size of the body.
+ */
+TEST_P(StreamTest, test_keyOnlyMessageSize) {
+    std::string value("value");
+    uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_DATATYPE_JSON};
+    SingleThreadedRCPtr<Item> item = std::make_unique<Item>(
+            makeStoredDocKey("key"),
+            0,
+            0,
+            value.c_str(),
+            value.size(),
+            ext_meta,
+            sizeof(ext_meta));
+
+    auto keyOnlyMessageSize = MutationResponse::mutationBaseMsgBytes +
+            item.get()->getKey().size();
+    queued_item qi(item);
+
+  setup_dcp_stream(DcpProducer::MutationType::KeyOnly);
+  std::unique_ptr<DcpResponse> dcpResponse =
+          dynamic_cast<MockActiveStream*>(
+                  stream.get())->public_makeResponseFromItem(qi);
+
+  EXPECT_EQ(keyOnlyMessageSize, dcpResponse->getMessageSize());
+}
+
+/*
+ * Test for a dcpResponse retrieved from a stream where isKeyOnly is false, that
+ * the message size includes the size of the body.
+ */
+TEST_P(StreamTest, test_keyAndValueMessageSize) {
+    std::string value("value");
+    uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_DATATYPE_JSON};
+    SingleThreadedRCPtr<Item> item = std::make_unique<Item>(
+            makeStoredDocKey("key"),
+            0,
+            0,
+            value.c_str(),
+            value.size(),
+            ext_meta,
+            sizeof(ext_meta));
+
+    auto keyAndValueMessageSize = MutationResponse::mutationBaseMsgBytes +
+            item.get()->getKey().size() + item->getNBytes();
+    queued_item qi(item);
+
+  setup_dcp_stream(DcpProducer::MutationType::KeyAndValue);
+  std::unique_ptr<DcpResponse> dcpResponse =
+          dynamic_cast<MockActiveStream*>(
+                  stream.get())->public_makeResponseFromItem(qi);
+
+  EXPECT_EQ(keyAndValueMessageSize, dcpResponse->getMessageSize());
+}
+
 /* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
  * queued items to be sent out via a DCP consumer, that nextCheckpointItem()
  * doesn't incorrectly return false (meaning that there are no more checkpoint
@@ -493,7 +612,7 @@ TEST_F(ConnectionTest, test_deadConnections) {
     const void *cookie = create_mock_cookie();
     // Create a new Dcp producer
     dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
-                                    /*notifyOnly*/false);
+                                    /*notifyOnly*/false, /*isKeyOnly*/false);
 
     // Disconnect the producer connection
     connMap.disconnect(cookie);
@@ -511,7 +630,7 @@ TEST_F(ConnectionTest, test_mb23637_findByNameWithConnectionDoDisconnect) {
     const void *cookie = create_mock_cookie();
     // Create a new Dcp producer
     dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
-                                    /*notifyOnly*/false);
+                                    /*notifyOnly*/false, /*isKeyOnly*/false);
     // should be able to find the connection
     ASSERT_NE(connection_t(nullptr),
               connMap.findByName("eq_dcpq:test_producer"));
@@ -536,7 +655,7 @@ TEST_F(ConnectionTest, test_mb23637_findByNameWithDuplicateConnections) {
     const void* cookie2 = create_mock_cookie();
     // Create a new Dcp producer
     dcp_producer_t producer = connMap.newProducer(cookie1, "test_producer",
-                                    /*notifyOnly*/false);
+                                    /*notifyOnly*/false, /*isKeyOnly*/false);
     ASSERT_NE(0, producer) << "producer is null";
     // should be able to find the connection
     ASSERT_NE(connection_t(nullptr),
@@ -544,7 +663,7 @@ TEST_F(ConnectionTest, test_mb23637_findByNameWithDuplicateConnections) {
 
     // Create a duplicate Dcp producer
     dcp_producer_t duplicateproducer = connMap.newProducer(
-            cookie2, "test_producer", /*notifyOnly*/false);
+            cookie2, "test_producer", /*notifyOnly*/false, /*isKeyOnly*/false);
     ASSERT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
     ASSERT_NE(0, duplicateproducer) << "duplicateproducer is null";
 
@@ -574,12 +693,14 @@ TEST_F(ConnectionTest, test_mb17042_duplicate_name_producer_connections) {
     const void* cookie2 = create_mock_cookie();
     // Create a new Dcp producer
     dcp_producer_t producer = connMap.newProducer(cookie1, "test_producer",
-                                                  /*notifyOnly*/false);
+                                                  /*notifyOnly*/false,
+                                                  /*isKeyOnly*/false);
     EXPECT_NE(0, producer) << "producer is null";
 
     // Create a duplicate Dcp producer
     dcp_producer_t duplicateproducer = connMap.newProducer(cookie2, "test_producer",
-                                                           /*notifyOnly*/false);
+                                                           /*notifyOnly*/false,
+                                                           /*isKeyOnly*/false);
     EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
     EXPECT_NE(0, duplicateproducer) << "duplicateproducer is null";
 
@@ -625,11 +746,13 @@ TEST_F(ConnectionTest, test_mb17042_duplicate_cookie_producer_connections) {
     const void* cookie = create_mock_cookie();
     // Create a new Dcp producer
     dcp_producer_t producer = connMap.newProducer(cookie, "test_producer1",
-                                                   /*notifyOnly*/false);
+                                                   /*notifyOnly*/false,
+                                                   /*isKeyOnly*/false);
 
     // Create a duplicate Dcp producer
     dcp_producer_t duplicateproducer = connMap.newProducer(cookie, "test_producer2",
-                                                            /*notifyOnly*/false);
+                                                            /*notifyOnly*/false,
+                                                            /*isKeyOnly*/false);
     EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
     EXPECT_EQ(0, duplicateproducer) << "duplicateproducer is not null";
 
@@ -791,7 +914,7 @@ TEST_F(ConnectionTest, test_mb20645_stats_after_closeAllStreams) {
     const void *cookie = create_mock_cookie();
     // Create a new Dcp producer
     dcp_producer_t producer = connMap.newProducer(cookie, "test_producer",
-                                    /*notifyOnly*/false);
+                                    /*notifyOnly*/false, /*isKeyOnly*/false);
 
     // Disconnect the producer connection
     connMap.disconnect(cookie);
@@ -815,7 +938,8 @@ TEST_F(ConnectionTest, test_mb20716_connmap_notify_on_delete) {
     const void *cookie = create_mock_cookie();
     // Create a new Dcp producer.
     dcp_producer_t producer = connMap.newProducer(cookie, "mb_20716r",
-                                                  /*notifyOnly*/false);
+                                                  /*notifyOnly*/false,
+                                                  /*isKeyOnly*/false);
 
     // Check preconditions.
     EXPECT_TRUE(producer->isPaused());
@@ -1000,7 +1124,8 @@ public:
         // Use 'this' instead of a mock cookie
         producer = connMap->newProducer(static_cast<void*>(this),
                                         "test_producer",
-                                        /*notifyOnly*/false);
+                                        /*notifyOnly*/false,
+                                        /*isKeyOnly*/false);
     }
 
     void notify() {
index 95f71c5..06d4aa7 100644 (file)
@@ -310,7 +310,7 @@ TEST_P(RollbackTest, MB21784) {
     // Create a new Dcp producer, reserving its cookie.
     get_mock_server_api()->cookie->reserve(cookie);
     dcp_producer_t producer = engine->getDcpConnMap().newProducer(
-            cookie, "test_producer", /*notifyOnly*/false);
+            cookie, "test_producer", /*notifyOnly*/false, /*isKeyOnly*/false);
 
     uint64_t rollbackSeqno;
     auto err = producer->streamRequest(/*flags*/0,
index 4ebaa42..d0a6189 100644 (file)
@@ -56,7 +56,7 @@ TEST_F(EPBucketTest, test_mb20751_deadlock_on_disconnect_delete) {
     // Create a new Dcp producer, reserving its cookie.
     get_mock_server_api()->cookie->reserve(cookie);
     dcp_producer_t producer = engine->getDcpConnMap().newProducer(
-        cookie, "mb_20716r", /*notifyOnly*/false);
+        cookie, "mb_20716r", /*notifyOnly*/false, /*isKeyOnly*/false);
 
     // Check preconditions.
     EXPECT_TRUE(producer->isPaused());