MB-18452: Force DcpConsumer processor task to yield
author     Jim Walker <jim@couchbase.com>
           Fri, 3 Jun 2016 10:49:55 +0000 (11:49 +0100)
committer  Dave Rigby <daver@couchbase.com>
           Thu, 9 Jun 2016 17:35:36 +0000 (17:35 +0000)
Introduce two config tunables that stop the DCP consumer's processor
task from running 'forever'.

* dcp_consumer_process_buffered_messages_yield_limit
* dcp_consumer_process_buffered_messages_batch_size

The yield parameter forces the NONIO processor task to yield once it
has made that many passes over a stream's buffer; the batch size caps
how many buffered messages each pass consumes.
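
For intuition, a minimal sketch of how the Processor task's run loop
might consume the tri-state result of processBufferedItems(). Only the
more_to_process -> snooze(0.0) pairing is taken from this change; the
other snooze values and the surrounding body are assumptions:

    bool Processer::run() {
        switch (consumer->processBufferedItems()) {
        case all_processed:
            snooze(INT_MAX); // sleep until new data is buffered
            break;
        case more_to_process:
            snooze(0.0);     // yield: requeue immediately so other NONIO tasks run
            break;
        case cannot_process:
            snooze(5.0);     // throttled or failed; back off before retrying
            break;
        }
        return true;
    }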

Change-Id: Ifce5a18fc807285471b08e9737cedb5db2b7923f
Reviewed-on: http://review.couchbase.org/64718
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
configuration.json
management/cbepctl
management/memcacheConstants.py
src/connmap.cc
src/connmap.h
src/dcp-consumer.cc
src/dcp-consumer.h
src/dcp-stream.cc
src/dcp-stream.h
src/ep_engine.cc
tests/ep_testsuite.cc

diff --git a/configuration.json b/configuration.json
index 5de8147..5607590 100644
         "dcp_producer_snapshot_marker_yield_limit": {
             "default": "10",
             "descr": "The number of snapshots before ActiveStreamCheckpointProcessorTask::run yields.",
-            "type": "size_t"
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 100000000,
+                    "min": 1
+                }
+            }
+        },
+        "dcp_consumer_process_buffered_messages_yield_limit" : {
+            "default": "10",
+            "descr": "The number of processBufferedMessages iterations before forcing the task to yield.",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 100000000,
+                    "min": 1
+                }
+            }
+        },
+        "dcp_consumer_process_buffered_messages_batch_size" : {
+            "default": "10",
+            "descr": "The maximum number of items stream->processBufferedMessages will consume.",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 100000000,
+                    "min": 1
+                }
+            }
         },
         "vb0": {
             "default": "false",
diff --git a/management/cbepctl b/management/cbepctl
index 7a9a829..2e1a53f 100755
@@ -49,6 +49,8 @@ def set_param(mc, type, key, val):
         engine_param = memcacheConstants.ENGINE_PARAM_FLUSH
     elif type == 'tap_param':
         engine_param = memcacheConstants.ENGINE_PARAM_TAP
+    elif type == 'dcp_param':
+        engine_param = memcacheConstants.ENGINE_PARAM_DCP
     else:
         print 'Error: Bad parameter %s' % type
 
@@ -222,6 +224,15 @@ Available params for "set":
                                    which we throttle tap input
     tap_throttle_threshold       - Percentage of memory in use to throttle tap
                                    streams.
+  Available params for "set dcp_param":
+    dcp_consumer_process_buffered_messages_yield_limit - The threshold at which
+                                                         the Consumer will yield
+                                                         when processing items.
+
+    dcp_consumer_process_buffered_messages_batch_size - The number of items the
+                                                        DCP processor will consume
+                                                        in a single batch.
+
     """)
 
     c.addCommand('drain', drain, "drain")
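
With the dcp_param type wired into cbepctl, the tunables can be changed
on a live node. A hypothetical invocation (host and port are
placeholders):

    cbepctl localhost:11210 set dcp_param dcp_consumer_process_buffered_messages_yield_limit 20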
diff --git a/management/memcacheConstants.py b/management/memcacheConstants.py
index 7364f2b..527e464 100644
@@ -105,6 +105,7 @@ VB_STATE_NAMES={'active': VB_STATE_ACTIVE,
 ENGINE_PARAM_FLUSH      = 1
 ENGINE_PARAM_TAP        = 2
 ENGINE_PARAM_CHECKPOINT = 3
+ENGINE_PARAM_DCP        = 4
 
 
 COMMAND_NAMES = dict(((globals()[k], k) for k in globals() if k.startswith("CMD_")))
diff --git a/src/connmap.cc b/src/connmap.cc
index d0b3445..6fd4c05 100644
@@ -936,6 +936,14 @@ DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
       aggrDcpConsumerBufferSize(0) {
     numActiveSnoozingBackfills = 0;
     updateMaxActiveSnoozingBackfills(engine.getEpStats().getMaxDataSize());
+
+    // Note: these allocations are deleted by ~Configuration
+    engine.getConfiguration().
+        addValueChangedListener("dcp_consumer_process_buffered_messages_yield_limit",
+                                new DcpConfigChangeListener(*this));
+    engine.getConfiguration().
+        addValueChangedListener("dcp_consumer_process_buffered_messages_batch_size",
+                                new DcpConfigChangeListener(*this));
 }
 
 DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
@@ -997,6 +1005,45 @@ ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler* conn,
     return conn->addStream(opaque, vbucket, flags);
 }
 
+
+DcpConnMap::DcpConfigChangeListener::DcpConfigChangeListener(DcpConnMap& connMap)
+    : myConnMap(connMap){}
+
+void DcpConnMap::DcpConfigChangeListener::sizeValueChanged(const std::string &key,
+                                                           size_t value) {
+    if (key == "dcp_consumer_process_buffered_messages_yield_limit") {
+        myConnMap.consumerYieldConfigChanged(value);
+    } else if (key == "dcp_consumer_process_buffered_messages_batch_size") {
+        myConnMap.consumerBatchSizeConfigChanged(value);
+    }
+}
+
+/*
+ * Find all DcpConsumers and set the yield threshold
+ */
+void DcpConnMap::consumerYieldConfigChanged(size_t newValue) {
+    LockHolder lh(connsLock);
+    for (auto it : all) {
+        DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it.get());
+        if (dcpConsumer) {
+            dcpConsumer->setProcessorYieldThreshold(newValue);
+        }
+    }
+}
+
+/*
+ * Find all DcpConsumers and set the processor batchsize
+ */
+void DcpConnMap::consumerBatchSizeConfigChanged(size_t newValue) {
+    LockHolder lh(connsLock);
+    for (auto it : all) {
+        DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it.get());
+        if (dcpConsumer) {
+            dcpConsumer->setProcessBufferedMessagesBatchSize(newValue);
+        }
+    }
+}
+
 DcpProducer *DcpConnMap::newProducer(const void* cookie,
                                      const std::string &name,
                                      bool notifyOnly)
diff --git a/src/connmap.h b/src/connmap.h
index b6bd183..f6ba910 100644
@@ -430,9 +430,7 @@ private:
 
 };
 
-
 class DcpConnMap : public ConnMap {
-
 public:
 
     DcpConnMap(EventuallyPersistentEngine &engine);
@@ -496,6 +494,16 @@ public:
     ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
                                        uint16_t vbucket, uint32_t flags);
 
+    /*
+     * Change the value at which a DcpConsumer::Processor task will yield
+     */
+    void consumerYieldConfigChanged(size_t newValue);
+
+    /*
+     * Change the batch size that the DcpConsumer::Processor operates with
+     */
+    void consumerBatchSizeConfigChanged(size_t newValue);
+
 private:
 
     bool isPassiveStreamConnected_UNLOCKED(uint16_t vbucket);
@@ -518,6 +526,14 @@ private:
     /* Total memory used by all DCP consumer buffers */
     AtomicValue<size_t> aggrDcpConsumerBufferSize;
 
+    class DcpConfigChangeListener : public ValueChangedListener {
+    public:
+        DcpConfigChangeListener(DcpConnMap& connMap);
+        virtual ~DcpConfigChangeListener() { }
+        virtual void sizeValueChanged(const std::string &key, size_t value);
+    private:
+        DcpConnMap& myConnMap;
+    };
 };
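
Tracing the calls this patch adds, a runtime change to either size_t
key fans out as follows (the Configuration-side listener dispatch is
pre-existing machinery):

    config.setDcpConsumerProcessBufferedMessagesYieldLimit(20);
    // -> DcpConfigChangeListener::sizeValueChanged("dcp_consumer_process_buffered_messages_yield_limit", 20)
    // -> DcpConnMap::consumerYieldConfigChanged(20)     // takes connsLock
    // -> consumer->setProcessorYieldThreshold(20)       // for each DcpConsumer in 'all'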
 
 
diff --git a/src/dcp-consumer.cc b/src/dcp-consumer.cc
index 8c7f6a7..b856629 100644
@@ -70,10 +70,18 @@ std::string Processer::getDescription() {
 
 DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
                          const std::string &name)
-    : Consumer(engine, cookie, name), opaqueCounter(0), processTaskId(0),
-          itemsToProcess(false), lastNoopTime(ep_current_time()), backoffs(0) {
+    : Consumer(engine, cookie, name),
+      opaqueCounter(0),
+      processTaskId(0),
+      itemsToProcess(false),
+      lastNoopTime(ep_current_time()),
+      backoffs(0),
+      processBufferedMessagesYieldThreshold(engine.getConfiguration().
+                                                getDcpConsumerProcessBufferedMessagesYieldLimit()),
+      processBufferedMessagesBatchSize(engine.getConfiguration().
+                                            getDcpConsumerProcessBufferedMessagesBatchSize()) {
     Configuration& config = engine.getConfiguration();
-    streams = new passive_stream_t[config.getMaxVbuckets()];
+    streams.resize(config.getMaxVbuckets());
     setSupportAck(false);
     setLogHeader("DCP (Consumer) " + getName() + " -");
     setReserved(true);
@@ -141,7 +149,6 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
 }
 
 DcpConsumer::~DcpConsumer() {
-    delete[] streams;
     engine_.getDcpConnMap().decAggrDcpConsumerBufferSize
                                 (flowControl.bufferSize);
 }
@@ -697,6 +704,46 @@ void DcpConsumer::aggregateQueueStats(ConnCounter* aggregator) {
     aggregator->conn_queueBackoff += backoffs;
 }
 
+bool DcpConsumer::tryAndAssignVbucketsStream(uint16_t vbid, passive_stream_t& stream) {
+    if (streams[vbid]) {
+        // This assignment acquires a spinlock, so we only want to do it
+        // if streams[vbid] owns a pointer
+        stream = streams[vbid];
+        // Now we've done the 'expensive' copy, recheck as it's possible
+        // the stream went to null after our first cheap check.
+        return stream.get() != nullptr;
+    }
+    return false;
+}
+
+process_items_error_t DcpConsumer::drainStreamsBufferedItems(passive_stream_t& stream,
+                                                             size_t yieldThreshold) {
+    process_items_error_t rval = all_processed;
+    uint32_t bytesProcessed = 0;
+    size_t iterations = 0;
+    do {
+        if (!engine_.getTapThrottle().shouldProcess()) {
+            backoffs++;
+            return cannot_process;
+        }
+
+        bytesProcessed = 0;
+        rval = stream->processBufferedMessages(bytesProcessed,
+                                               processBufferedMessagesBatchSize);
+        flowControl.freedBytes.fetch_add(bytesProcessed);
+        iterations++;
+    } while (bytesProcessed > 0 &&
+             rval == all_processed &&
+             iterations <= yieldThreshold);
+
+    // The stream may not be done yet so must go back in the ready queue
+    if (bytesProcessed > 0) {
+        rval = more_to_process; // Return more_to_process to force a snooze(0.0)
+    }
+
+    return rval;
+}
+
 process_items_error_t DcpConsumer::processBufferedItems() {
     itemsToProcess.store(false);
     process_items_error_t process_ret = all_processed;
@@ -705,31 +752,15 @@ process_items_error_t DcpConsumer::processBufferedItems() {
     for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
 
         passive_stream_t stream;
-        if (streams[vbucket]) {
-            // only assign a stream if there is one present to reduce
-            // the amount of cycles the RCPtr spinlock will use.
-            stream = streams[vbucket];
-        }
 
-        // Now that we think there's a stream, check again in-case it was
-        // removed after our first "cheap" if (streams[vb]) test and the actual
-        // copy construction onto stream.
-        if (!stream) {
+        if (!tryAndAssignVbucketsStream(vbucket, stream)) {
+            // No buffered stream for this vbucket; try the next one
             continue;
         }
 
-        uint32_t bytes_processed;
-
-        do {
-            if (!engine_.getTapThrottle().shouldProcess()) {
-                backoffs++;
-                return cannot_process;
-            }
+        process_ret = drainStreamsBufferedItems(stream,
+                                                processBufferedMessagesYieldThreshold);
 
-            bytes_processed = 0;
-            process_ret = stream->processBufferedMessages(bytes_processed);
-            flowControl.freedBytes.fetch_add(bytes_processed);
-        } while (bytes_processed > 0 && process_ret != cannot_process);
     }
 
     if (isBufferSufficientlyDrained(flowControl.freedBytes.load())) {
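
A note on the bound (my reading of the loop above, not a claim made by
the commit): the do/while tests iterations <= yieldThreshold after the
increment, so one drain call can make yieldThreshold + 1 passes, each
consuming up to batchSize items. A hypothetical helper makes the worst
case explicit:

    size_t maxItemsPerDrain(size_t yieldThreshold, size_t batchSize) {
        return (yieldThreshold + 1) * batchSize; // defaults 10 and 10 -> 110 items per stream
    }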
diff --git a/src/dcp-consumer.h b/src/dcp-consumer.h
index 40748bf..5710781 100644
@@ -92,6 +92,14 @@ public:
 
     bool isStreamPresent(uint16_t vbucket);
 
+    void setProcessorYieldThreshold(size_t newValue) {
+        processBufferedMessagesYieldThreshold = newValue;
+    }
+
+    void setProcessBufferedMessagesBatchSize(size_t newValue) {
+        processBufferedMessagesBatchSize = newValue;
+    }
+
 private:
 
     DcpResponse* getNextItem();
@@ -118,6 +126,23 @@ private:
     ENGINE_ERROR_CODE handleExtMetaData(struct dcp_message_producers* producers);
     inline bool isBufferSufficientlyDrained(uint32_t ackable_bytes);
 
+    /**
+     * Try to assign the vbucket's stream.
+     * Returns true if one was assigned, else false
+     */
+    bool tryAndAssignVbucketsStream(uint16_t vbid, passive_stream_t& stream);
+
+    /**
+     * Drain the stream of buffered items.
+     * The function will stop draining:
+     *  - if there's no more data - all_processed
+     *  - if the replication throttle says no more - cannot_process
+     *  - if there's an error, e.g. ETMPFAIL/ENOMEM - cannot_process
+     *  - if we hit the yieldThreshold - more_to_process
+     */
+    process_items_error_t drainStreamsBufferedItems(passive_stream_t& stream,
+                                                    size_t yieldThreshold);
+
     uint64_t opaqueCounter;
     size_t processTaskId;
     AtomicValue<bool> itemsToProcess;
@@ -125,7 +150,7 @@ private:
     Mutex readyMutex;
     std::list<uint16_t> ready;
 
-    passive_stream_t* streams;
+    std::vector<passive_stream_t> streams;
     opaque_map opaqueMap_;
 
     rel_time_t lastNoopTime;
@@ -148,6 +173,20 @@ private:
         AtomicValue<uint32_t> freedBytes;
         AtomicValue<uint64_t> ackedBytes;
     } flowControl;
+
+    /**
+     * An upper bound on how many times drainStreamsBufferedItems will
+     * call into processBufferedMessages before returning and triggering
+     * Processor to yield. Initialised from the configuration
+     *  'dcp_consumer_process_buffered_messages_yield_limit'
+     */
+    size_t processBufferedMessagesYieldThreshold;
+
+    /**
+     * An upper bound on how many items a single consumer stream will process
+     * in one call of stream->processBufferedMessages()
+     */
+    size_t processBufferedMessagesBatchSize;
 };
 
 /*
diff --git a/src/dcp-stream.cc b/src/dcp-stream.cc
index 4962bf1..7dbc338 100644
@@ -35,7 +35,6 @@ static const char* snapshotTypeToString(snapshot_type_t type) {
 }
 
 const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
-const size_t PassiveStream::batchSize = 10;
 
 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
@@ -1211,7 +1210,6 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
             return ENGINE_DISCONNECT;
         }
     }
-
     buffer.messages.push(resp);
     buffer.items++;
     buffer.bytes += resp->getMessageSize();
@@ -1219,18 +1217,18 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
     return ENGINE_SUCCESS;
 }
 
-process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes) {
+process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
+                                                             size_t batchSize) {
     LockHolder lh(buffer.bufMutex);
     uint32_t count = 0;
     uint32_t message_bytes = 0;
     uint32_t total_bytes_processed = 0;
     bool failed = false;
-
     if (buffer.messages.empty()) {
         return all_processed;
     }
 
-    while (count < PassiveStream::batchSize && !buffer.messages.empty()) {
+    while (count < batchSize && !buffer.messages.empty()) {
         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
         DcpResponse *response = buffer.messages.front();
         message_bytes = response->getMessageSize();
@@ -1253,7 +1251,12 @@ process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed
                 transitionState(STREAM_DEAD);
                 break;
             default:
-                abort();
+                LOG(EXTENSION_LOG_WARNING,
+                    "PassiveStream::processBufferedMessages: "
+                    "(vb %d) PassiveStream failing "
+                    "unknown message type %d",
+                    vb_, response->getEvent());
+                failed = true;
         }
 
         if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
diff --git a/src/dcp-stream.h b/src/dcp-stream.h
index b6c9e0d..e443961 100644
@@ -396,7 +396,8 @@ public:
 
     ~PassiveStream();
 
-    process_items_error_t processBufferedMessages(uint32_t &processed_bytes);
+    process_items_error_t processBufferedMessages(uint32_t &processed_bytes,
+                                                  size_t batchSize);
 
     DcpResponse* next();
 
@@ -411,8 +412,6 @@ public:
 
     void addStats(ADD_STAT add_stat, const void *c);
 
-    static const size_t batchSize;
-
 private:
 
     ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
diff --git a/src/ep_engine.cc b/src/ep_engine.cc
index 8b3f6cb..d481f99 100644
@@ -564,6 +564,37 @@ extern "C" {
         return rv;
     }
 
+    static protocol_binary_response_status setDcpParam(
+                                                    EventuallyPersistentEngine *e,
+                                                    const char *keyz,
+                                                    const char *valz,
+                                                    const char **msg,
+                                                    size_t *) {
+        protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
+        try {
+
+            if (strcmp(keyz, "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
+                size_t v = atoi(valz);
+                checkNumeric(valz);
+                validate(v, size_t(1), std::numeric_limits<size_t>::max());
+                e->getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(v);
+            } else if (strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") == 0) {
+                size_t v = atoi(valz);
+                checkNumeric(valz);
+                validate(v, size_t(1), std::numeric_limits<size_t>::max());
+                e->getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(v);
+            } else {
+                *msg = "Unknown config param";
+                rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
+            }
+        } catch (std::runtime_error& ex) {
+            *msg = "Value out of range.";
+            rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
+        }
+
+        return rv;
+    }
+
     static protocol_binary_response_status evictKey(
                                                  EventuallyPersistentEngine *e,
                                                  protocol_binary_request_header
@@ -757,7 +788,7 @@ extern "C" {
         const char *valuep = keyp + keylen;
         vallen -= (keylen + extlen);
 
-        char keyz[32];
+        char keyz[128];
         char valz[512];
 
         // Read the key.
@@ -788,6 +819,9 @@ extern "C" {
         case protocol_binary_engine_param_checkpoint:
             rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
             break;
+        case protocol_binary_engine_param_dcp:
+            rv = setDcpParam(e, keyz, valz, msg, msg_size);
+            break;
         default:
             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
         }
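
The keyz buffer grows from 32 to 128 bytes because the new keys no
longer fit: dcp_consumer_process_buffered_messages_yield_limit alone is
50 characters, well past the previous 31-characters-plus-NUL limit.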
diff --git a/tests/ep_testsuite.cc b/tests/ep_testsuite.cc
index bff29f6..9f8d6f0 100644
@@ -14237,6 +14237,209 @@ static enum test_result test_mb19635_upgrade_from_25x(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
+static enum test_result test_set_dcp_param(ENGINE_HANDLE *h,
+                                           ENGINE_HANDLE_V1 *h1)
+{
+    auto func = [h, h1](std::string key, size_t newValue, bool expectedSetParam){
+        std::string statKey = "ep_" + key;
+        size_t param = get_int_stat(h,
+                                    h1,
+                                    statKey.c_str());
+        std::string value = std::to_string(newValue);
+        check(expectedSetParam == set_param(h, h1,
+                                            protocol_binary_engine_param_dcp,
+                                            key.c_str(),
+                                            value.c_str()),
+                "Set param not expected");
+        check(newValue != param,
+              "Forcing failure as nothing will change");
+
+        if (expectedSetParam) {
+            checkeq(newValue,
+                    size_t(get_int_stat(h,
+                                        h1,
+                                        statKey.c_str())),
+                "Incorrect dcp param value after calling set_param");
+        }
+    };
+
+    func("dcp_consumer_process_buffered_messages_yield_limit", 1000, true);
+    func("dcp_consumer_process_buffered_messages_batch_size", 1000, true);
+    func("dcp_consumer_process_buffered_messages_yield_limit", 0, false);
+    func("dcp_consumer_process_buffered_messages_batch_size", 0, false);
+    return SUCCESS;
+}
+
+/*
+ * Test MB-18452
+ * Drive the DCP consumer by halting all NONIO tasks, writing numItems
+ * mutations (they get buffered), then re-enabling the NONIO tasks, which
+ * triggers the DCP consumer to consume the buffered items.
+ * If the DCP consumer is friendly and not hogging the NONIO threads we
+ * should see it being scheduled many times.
+ * This test function returns the number of times the processor task was
+ * dispatched.
+ */
+static int test_mb18452(ENGINE_HANDLE *h,
+                        ENGINE_HANDLE_V1 *h1,
+                        size_t numItems,
+                        size_t yieldValue,
+                        size_t batchSize) {
+
+    // 1. Setup the consumer params.
+    std::string value = std::to_string(yieldValue);
+    set_param(h, h1, protocol_binary_engine_param_dcp,
+              "dcp_consumer_process_buffered_messages_yield_limit",
+              value.c_str());
+    value = std::to_string(batchSize);
+    set_param(h, h1, protocol_binary_engine_param_dcp,
+              "dcp_consumer_process_buffered_messages_batch_size",
+              value.c_str());
+
+    const uint16_t vbid = 0;
+    const uint32_t opaque = 0xFFFF0000;
+    const uint32_t flags = 0;
+    const void* cookie = testHarness.create_cookie();
+
+    // 2. We need to use a replica
+    check(set_vbucket_state(h, h1, vbid, vbucket_state_replica),
+          "Failed to set vbucket state.");
+
+    // 3. Force the engine to not run any NONIO tasks whilst we 'load up'
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "0");
+
+    // 4. Create a consumer and one stream for the vbucket
+    std::string consumer("unittest");
+    checkeq(h1->dcp.open(h,
+                         cookie,
+                         opaque,
+                         0/*seqno*/,
+                         flags,
+                         (void*)consumer.c_str(),
+                         consumer.length()),
+            ENGINE_SUCCESS,
+            "Failed dcp Consumer open connection.");
+    add_stream_for_consumer(h, h1, cookie, opaque + 1, vbid, flags,
+                            PROTOCOL_BINARY_RESPONSE_SUCCESS);
+
+    uint32_t stream_opaque = get_int_stat(h, h1,
+                                          "eq_dcpq:unittest:stream_0_opaque",
+                                          "dcp");
+    checkeq(ENGINE_SUCCESS,
+            h1->dcp.snapshot_marker(h,
+                                    cookie,
+                                    stream_opaque,
+                                    vbid,
+                                    1,//snap start
+                                    numItems,//snap end
+                                    2), //flags
+            "Failed to send snapshot marker");
+
+    for (uint64_t seqno = 1; seqno <= numItems; seqno++) {
+        std::string key = "key" + std::to_string(seqno);
+        checkeq(ENGINE_SUCCESS,
+                h1->dcp.mutation(h,
+                                 cookie,
+                                 stream_opaque,
+                                 key.c_str(),
+                                 key.length(),
+                                 "value", // item value
+                                 sizeof("value"), // item value length
+                                 seqno, // cas
+                                 vbid, // vbucket
+                                 0, // flags
+                                 PROTOCOL_BINARY_RAW_BYTES,
+                                 seqno, // bySeqno
+                                 1, // revSeqno
+                                 0, // expiration
+                                 0, // locktime
+                                 "", //meta
+                                 0, // metalen
+                                 INITIAL_NRU_VALUE),
+                "Failed to send dcp mutation");
+
+        // At n - 1, enable NONIO tasks, the nth mutation will wake up the task.
+        if (seqno == (numItems - 1)) {
+            set_param(h, h1, protocol_binary_engine_param_flush,
+                      "max_num_nonio",
+                      "1");
+        }
+    }
+
+    wait_for_stat_to_be(h, h1, "vb_replica_curr_items", numItems);
+
+    // 5. Halt the NONIO tasks again whilst we 'count up'
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "0");
+
+    // 6. Count how many times the NONIO task ran.
+    // This is slightly racy, but if the task is yielding we expect many
+    // runs from it, not a small number.
+    check(h1->get_stats(h, NULL, "dispatcher",
+                        strlen("dispatcher"), add_stats) == ENGINE_SUCCESS,
+                        "Failed to get worker stats");
+
+    // Count up how many times the Processing task was logged
+    int count = 0;
+    const std::string key1 = "nonio_worker_";
+    const std::string key2 = "Processing buffered items for eq_dcpq:unittest";
+    for (auto kv : vals) {
+        if (kv.first.find(key1) != std::string::npos &&
+            kv.second.find(key2) != std::string::npos) {
+            count++;
+        }
+    }
+
+    // 7. Re-enable NONIO so we can shutdown
+    set_param(h, h1, protocol_binary_engine_param_flush,
+              "max_num_nonio",
+              "1");
+    return count;
+}
+
+/**
+ * Test the behaviour of the DCP consumer under load.
+ * The consumer uses a NONIO task to process data from an input buffer.
+ * When given lots of data, this task should voluntarily yield if it finds
+ * itself running for n iterations...
+ */
+static enum test_result test_mb18452_smallYield(ENGINE_HANDLE* h,
+                                                 ENGINE_HANDLE_V1* h1) {
+    const int batchSize = 10;
+    const int numItems = 1000;
+    const int yield = 10;
+
+    int processorRuns = test_mb18452(h, h1, numItems, yield, batchSize);
+
+    // Before this change the processor run count was usually 1 or 2; with
+    // the fix it's up around 80 (it appears to saturate the task log).
+
+    // So we check that it ran at least numItems / (yield * batchSize) times.
+    check(processorRuns >= (numItems / (yield * batchSize)),
+          "DCP Processor ran less times than expected.");
+    return SUCCESS;
+}
+
+static enum test_result test_mb18452_largeYield(ENGINE_HANDLE* h,
+                                                ENGINE_HANDLE_V1* h1) {
+    const int batchSize = 10;
+    const int numItems = 10000;
+    const int yield = 10000;
+    int processorRuns =  test_mb18452(h, h1, numItems, yield, batchSize);
+    // Here we expect very few yields, hence very few runs (definitely not
+    // enough to fill the task log, TASK_LOG_SIZE).
+    check(processorRuns < 80,
+          "DCP Processor ran more times than expected.");
+
+    return SUCCESS;
+}
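
For scale: the small-yield case buffers 1000 items with yield 10 and
batch 10, so the processor must be dispatched at least
1000 / (10 * 10) = 10 times; the large-yield case raises the yield
limit to 10000 for only 10000 items, so the buffer drains in roughly
one dispatch and the run count stays far below the task-log capacity.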
+
 static enum test_result prepare(engine_test_t *test) {
 #ifdef __sun
         // Some of the tests doesn't work on Solaris.. Don't know why yet..
@@ -15289,10 +15492,21 @@ engine_test_t* get_tests(void) {
                  test_mb17517_tap_with_locked_key, test_setup, teardown, NULL,
                  prepare, cleanup),
 
-         TestCase("test_mb19635_upgrade_from_25x",
+        TestCase("test_mb19635_upgrade_from_25x",
                  test_mb19635_upgrade_from_25x, test_setup, teardown, NULL,
                  prepare, cleanup),
 
+        TestCase("test_set_dcp_param",
+                 test_set_dcp_param, test_setup, teardown, NULL,
+                 prepare, cleanup),
+
+        TestCase("test_mb18452_largeYield",
+                 test_mb18452_largeYield, test_setup, teardown, "max_num_nonio=1",
+                 prepare, cleanup),
+
+        TestCase("test_mb18452_smallYield",
+                 test_mb18452_smallYield, test_setup, teardown, "max_num_nonio=1",
+                 prepare, cleanup),
 
         TestCase(NULL, NULL, NULL, NULL, NULL, prepare, cleanup)
     };