MB-17502: DCP performance regression fixed.
author    Jim Walker <jim@couchbase.com>
          Wed, 6 Jan 2016 15:40:57 +0000 (15:40 +0000)
committer abhinav dangeti <abhinav@couchbase.com>
          Wed, 27 Jan 2016 22:37:24 +0000 (22:37 +0000)
Many patches were added to speed up DCP; however, some of that
performance was lost during code tidying that was done without
re-profiling.

With all the DCP performance patches applied (particularly 87869fd39),
straight DCP performance is a touch slower. This is because DCP used to
take one lock and then do its work. The new code takes more locks, but
holds them for fewer lines of code. This means that DCP is
friendlier/fairer to the other threads interacting with a DCP producer:
the front-end operation threads are no longer stalled for long periods
while DCP holds the one lock.

Frontend latency before locking changes:

 === Latency [With background DCP] - 100000 items
                                 Percentile
                   Median     95th     99th  Std Dev
 Add               16.337   34.894   45.241   25.627
 Get                1.226    1.524    1.745    0.435
 Replace           16.311   34.386   42.097    8.435
 Delete            15.636   32.915   41.999    7.408

Frontend latency after locking changes:

 === Latency [With background DCP] - 100000 items
                                 Percentile
                   Median     95th     99th  Std Dev
 Add                3.996   12.159   20.724   11.376
 Get                1.299    1.629    1.730    0.634
 Replace            4.274   12.831   22.988    4.523
 Delete             3.142   10.302   14.292    3.350

The median and 95th/99th percentile latencies are all improved for Add,
Replace and Delete.

Fix details:

The roundRobin/vbReady code has a bufferLog.pauseIfFull call in the
"hot" part of the loop; this is the main cause of the regression.

With that fixed, CPU profiling and benchmarking showed that DCP is back
to 3.1.3 levels, but also highlighted that:

1. DcpProducer::getNextItem was hot (5% of a DcpProducer thread).
2. DcpConsumer::processBufferedItems was hitting the SpinLock hard;
   at times 20 to 30% of the time was consumed by SpinLock code.
3. Snapshot creation was frequently yielding even though it had work
   to do.

To address 1., the fix is to remove the roundRobin/vbReady code
entirely. It is no better, and in some cases a little slower, than the
original. The code is replaced with std:: structures, *but* the Mutex
used has a much smaller scope.

Note that the DcpProducerReadyQueue has been profiled, proving that a
std::set powering exists()/find() is much faster than searching the
queue. This is important because the lookup is performed on the
front-end operation threads.
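As a rough illustration of the queue-plus-set pairing (a sketch only,
not the patch itself: the real class lives in src/dcp-producer.h below
and uses the engine's Mutex/LockHolder, which std::mutex stands in for
here):

```cpp
#include <cstdint>
#include <mutex>
#include <queue>
#include <set>

// Sketch of the DcpProducerReadyQueue idea: a FIFO of vbuckets that are
// ready for producing, rejecting duplicates, with a std::set giving fast
// membership tests for the front-end threads.
class ReadyQueue {
public:
    bool exists(uint16_t vb) {
        std::lock_guard<std::mutex> lh(lock);
        return queuedValues.count(vb) != 0;
    }

    // Returns true and sets frontValue if the queue was not empty.
    bool popFront(uint16_t& frontValue) {
        std::lock_guard<std::mutex> lh(lock);
        if (readyQueue.empty()) {
            return false;
        }
        frontValue = readyQueue.front();
        readyQueue.pop();
        queuedValues.erase(frontValue);
        return true;
    }

    // Push vb only if it is not already queued; true if it was added.
    bool pushUnique(uint16_t vb) {
        std::lock_guard<std::mutex> lh(lock);
        if (!queuedValues.insert(vb).second) {
            return false;  // already queued, keep the FIFO duplicate-free
        }
        readyQueue.push(vb);
        return true;
    }

private:
    std::mutex lock;
    std::queue<uint16_t> readyQueue;  // FIFO order for fairness
    std::set<uint16_t> queuedValues;  // mirrors readyQueue for fast exists()
};
```

The lock is taken only for the few lines inside each method, which is
the "much smaller scope" the fix relies on.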

To address 2., it was observed that the consumer code constructs a
passive_stream_t on every iteration and only then tests whether it
holds a pointer. That construction goes through the SpinLock code; it
can be avoided by testing streams[vbucket] directly first and only
then constructing a copy of the passive_stream_t. This keeps the
SpinLock code off the empty-stream iterations of the for loop in the
affected function.
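The cheap-test-then-copy pattern can be sketched as follows (all names
here are hypothetical stand-ins: std::shared_ptr plus a copy counter
models the RCPtr and the SpinLock traffic its copy constructor incurs):

```cpp
#include <memory>
#include <vector>

struct Stream { int vbucket; };

static int copies = 0;  // instruments how often the "expensive" copy runs

// Stand-in for passive_stream_t: copy construction bumps `copies`,
// modelling the SpinLock cost of copying an RCPtr.
struct Handle {
    std::shared_ptr<Stream> p;
    Handle() = default;
    Handle(std::shared_ptr<Stream> sp) : p(std::move(sp)) {}
    Handle(const Handle& other) : p(other.p) { ++copies; }
    explicit operator bool() const { return static_cast<bool>(p); }
};

// Mirrors the fixed processBufferedItems loop: test each slot cheaply
// first, and pay for a copy only when a stream actually looks present.
int processLiveStreams(const std::vector<Handle>& streams) {
    int processed = 0;
    for (const auto& slot : streams) {
        if (!slot) {
            continue;  // cheap test: empty slot, no copy made
        }
        Handle stream(slot);  // counted copy, taken only for live slots
        if (!stream) {
            continue;  // re-check: the stream could have been removed
                       // between the cheap test and the copy
        }
        ++processed;
    }
    return processed;
}
```

With mostly-empty slots, the copy cost is paid only for the live ones
instead of max_vbuckets times per pass.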

To address 3., ensure that the snapshot task's work queue contains no
duplicates; there is no need for them. Then raise the number of
snapshots processed before yielding. Various rebalances showed that
around 250 was enough, so 256 was chosen.
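The yield pattern itself amounts to the following sketch (hypothetical
names; in the patch the limit comes from the
dcp_producer_snapshot_marker_yield_limit configuration value):

```cpp
#include <cstddef>
#include <queue>

// Sketch of a task run() that drains at most `yieldLimit` snapshots per
// invocation and then yields so other tasks on the scheduler can run.
// Returns the number of snapshots processed this invocation; a real
// task would re-schedule itself if work remains.
std::size_t processSnapshots(std::queue<int>& workQueue,
                             std::size_t yieldLimit) {
    std::size_t processed = 0;
    while (!workQueue.empty() && processed < yieldLimit) {
        workQueue.pop();  // a real task would create the snapshot here
        ++processed;
    }
    return processed;
}
```

A limit that is too low (the old default of 10) makes the task yield
while it still has work to do; 256 lets a typical rebalance's backlog
drain in one pass.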

Change-Id: I8fb0bd30f8e07d000192675de425726ad26e403a
Reviewed-on: http://review.couchbase.org/58886
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: abhinav dangeti <abhinav@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
configuration.json
src/dcp-consumer.cc
src/dcp-producer.cc
src/dcp-producer.h
src/dcp-stream.cc
src/dcp-stream.h

index 1a255b9..a2a0a67 100644 (file)
             "dynamic": false,
             "type": "size_t"
         },
-        "dcp_producer_notifier_yield_limit": {
-            "default": "50",
-            "descr": "The number of notifications before DcpProducerNotifier::run yields.",
-            "type": "size_t"
-        },
         "dcp_producer_snapshot_marker_yield_limit": {
-            "default": "10",
+            "default": "256",
             "descr": "The number of snapshots before ActiveStreamCheckpointProcessorTask::run yields.",
             "type": "size_t"
         },
index 3c8a55c..e8d7282 100644 (file)
@@ -573,7 +573,16 @@ process_items_error_t DcpConsumer::processBufferedItems() {
     int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
     for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
 
-        passive_stream_t stream = streams[vbucket];
+        passive_stream_t stream;
+        if (streams[vbucket]) {
+            // only assign a stream if there is one present to reduce
+            // the amount of cycles the RCPtr spinlock will use.
+            stream = streams[vbucket];
+        }
+
+        // Now that we think there's a stream, check again in-case it was
+        // removed after our first "cheap" if (streams[vb]) test and the actual
+        // copy construction onto stream.
         if (!stream) {
             continue;
         }
index f484334..3d53924 100644 (file)
@@ -110,8 +110,7 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                          const std::string &name, bool isNotifier)
     : Producer(e, cookie, name), rejectResp(NULL),
       notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(*this),
-      vbReady(e.getConfiguration().getMaxVbuckets()), notifiedVbReady(false),
-      itemsSent(0), totalBytesSent(0), roundRobinVbReady(0) {
+      itemsSent(0), totalBytesSent(0) {
     setSupportAck(true);
     setReserved(true);
     setPaused(true);
@@ -219,7 +218,7 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                 return ENGINE_KEY_EEXISTS;
             } else {
                 streams.erase(vbucket);
-                vbReady[vbucket].store(false);
+
                 // Don't need to add an entry to vbucket-to-conns map
                 add_vb_conn_map = false;
             }
@@ -269,11 +268,8 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                             checkpointCreatorTask);
         static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
     }
-    vbReady[vbucket].store(true);
-    bool inverse = false;
-    if (notifiedVbReady.compare_exchange_strong(inverse, true)) {
-        log.unpauseIfSpaceAvailable();
-    }
+
+    notifyStreamReady(vbucket, false/*unused for DCP*/);
 
     if (add_vb_conn_map) {
         connection_t conn(this);
@@ -622,74 +618,56 @@ const char* DcpProducer::getType() const {
 
 DcpResponse* DcpProducer::getNextItem() {
     setPaused(false);
-    bool inverse = true;
-    do {
-        if (roundRobinVbReady >= vbReady.size()) {
-            roundRobinVbReady = 0;
+    uint16_t vbucket = 0;
+    while (ready.popFront(vbucket)) {
+        if (log.pauseIfFull()) {
+            ready.pushUnique(vbucket);
+            return NULL;
         }
-        for (; roundRobinVbReady < vbReady.size(); roundRobinVbReady++) {
 
-            if (log.pauseIfFull()) {
-                return NULL;
+        DcpResponse* op = NULL;
+        {
+            ReaderLockHolder rlh(streamsMutex);
+            std::map<uint16_t, stream_t>::iterator it = streams.find(vbucket);
+            if (it == streams.end()) {
+                continue;
             }
+            op = it->second->next();
+        }
 
-            bool expected = true;
-            if (vbReady[roundRobinVbReady].compare_exchange_strong(expected, false)) {
-                uint16_t vbucket = roundRobinVbReady;
-                DcpResponse *op = NULL;
-                std::map<uint16_t, stream_t>::iterator it;
-                stream_t stream;
-                {
-                    ReaderLockHolder rlh(streamsMutex);
-                    it = streams.find(vbucket);
-                    if (it == streams.end()) {
-                        continue;
-                    }
-                    stream.reset(it->second);
-                }
-
-                // Return the next operation
-                // When an op is returned it is assumed
-                // our bufferLog has been updated.
-                op = stream->next();
-
-                if (!op) {
-                    continue;
-                }
+        if (!op) {
+            // stream is empty, try another vbucket.
+            continue;
+        }
 
-                switch (op->getEvent()) {
-                    case DCP_SNAPSHOT_MARKER:
-                    case DCP_MUTATION:
-                    case DCP_DELETION:
-                    case DCP_EXPIRATION:
-                    case DCP_STREAM_END:
-                    case DCP_SET_VBUCKET:
-                        break;
-                    default:
-                        LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to "
-                                "write an unexpected event %d",
-                                logHeader(), op->getEvent());
-                        abort();
-                }
+        switch (op->getEvent()) {
+            case DCP_SNAPSHOT_MARKER:
+            case DCP_MUTATION:
+            case DCP_DELETION:
+            case DCP_EXPIRATION:
+            case DCP_STREAM_END:
+            case DCP_SET_VBUCKET:
+                break;
+            default:
+                LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
+                    " an unexpected event %d", logHeader(), op->getEvent());
+                abort();
+        }
 
-                vbReady[vbucket].store(true);
-                notifiedVbReady.store(true);
-                ++roundRobinVbReady;
+        ready.pushUnique(vbucket);
 
-                if (op->getEvent() == DCP_MUTATION ||
-                    op->getEvent() == DCP_DELETION ||
-                    op->getEvent() == DCP_EXPIRATION) {
-                    itemsSent++;
-                }
+        if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
+            op->getEvent() == DCP_EXPIRATION) {
+            itemsSent++;
+        }
 
-                totalBytesSent.fetch_add(op->getMessageSize());
+        totalBytesSent.fetch_add(op->getMessageSize());
 
-                return op;
-            }
-        }
-    } while (notifiedVbReady.compare_exchange_strong(inverse, false));
+        return op;
+    }
 
     setPaused(true);
+
     return NULL;
 }
 
@@ -706,9 +684,7 @@ void DcpProducer::setDisconnect(bool disconnect) {
 }
 
 void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
-    bool expected = false;
-    if (vbReady[vbucket].compare_exchange_strong(expected, true) &&
-        notifiedVbReady.compare_exchange_strong(expected, true)) {
+    if (ready.pushUnique(vbucket)) {
         log.unpauseIfSpaceAvailable();
     }
 }
index 223cf3b..6397041 100644 (file)
@@ -184,6 +184,90 @@ public:
 
 private:
 
+    /**
+     * DcpProducerReadyQueue is a std::queue wrapper for managing a
+     * queue of vbucket's that are ready for a DCP producer to process.
+     * The queue does not allow duplicates and the push_unique method enforces
+     * this. The interface is generally customised for the needs of getNextItem
+     * and is thread safe as the frontend operations and DCPProducer threads
+     * are accessing this data.
+     *
+     * Internally a std::queue and std::set track the contents and the std::set
+     * enables a fast exists method which is used by front-end threads.
+     */
+    class DcpProducerReadyQueue {
+    public:
+        bool exists(uint16_t vbucket) {
+            LockHolder lh(lock);
+            return (queuedValues.count(vbucket) != 0);
+        }
+
+        /**
+         * Return true and set the ref-param 'frontValue' if the queue is not
+         * empty. frontValue is set to the front of the queue.
+         */
+        bool popFront(uint16_t &frontValue) {
+            LockHolder lh(lock);
+            if (!readyQueue.empty()) {
+                frontValue = readyQueue.front();
+                readyQueue.pop();
+                queuedValues.erase(frontValue);
+                return true;
+            }
+            return false;
+        }
+
+        /**
+         * Pop the front item.
+         * Safe to call on an empty list
+         */
+        void pop() {
+            LockHolder lh(lock);
+            if (!readyQueue.empty()) {
+                queuedValues.erase(readyQueue.front());
+                readyQueue.pop();
+            }
+        }
+
+        /**
+         * Push the vbucket only if it's not already in the queue
+         * Return true if the vbucket was added to the queue.
+         */
+        bool pushUnique(uint16_t vbucket) {
+            LockHolder lh(lock);
+            if (queuedValues.count(vbucket) == 0) {
+                readyQueue.push(vbucket);
+                queuedValues.insert(vbucket);
+                return true;
+            }
+            return false;
+        }
+
+        /**
+         * Move the front item to the back of the queue
+         */
+        void moveFrontToback() {
+            LockHolder lh(lock);
+            if (readyQueue.size() > 1) {
+                readyQueue.push(readyQueue.front());
+                readyQueue.pop();
+            }
+        }
+
+    private:
+        Mutex lock;
+
+        /* a queue of vbuckets that are ready for producing */
+        std::queue<uint16_t> readyQueue;
+
+        /**
+         * maintain a std::set of values that are in the readyQueue. find() is
+         * performed by front-end threads so we want it to be efficient so just
+         * a set lookup is required.
+         */
+        std::set<uint16_t> queuedValues;
+    };
+
     DcpResponse* getNextItem();
 
     size_t getItemsRemaining();
@@ -209,16 +293,13 @@ private:
     // (i.e. not adding / removing elements) then can acquire ReadLock, even
     // if a non-const method is called on stream_t.
     RWLock streamsMutex;
-
-    std::vector<AtomicValue<bool> > vbReady;
-    AtomicValue<bool> notifiedVbReady;
+    DcpProducerReadyQueue ready;
 
     std::map<uint16_t, stream_t> streams;
 
     AtomicValue<size_t> itemsSent;
     AtomicValue<size_t> totalBytesSent;
 
-    size_t roundRobinVbReady;
     ExTask checkpointCreatorTask;
     static const uint32_t defaultNoopInerval;
 };
index f70de4b..80f5c3f 100644 (file)
@@ -679,10 +679,7 @@ void ActiveStreamCheckpointProcessorTask::wakeup() {
 }
 
 void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
-    {
-        LockHolder lh(workQueueLock);
-        queue.push_back(stream);
-    }
+    pushUnique(stream);
 
     bool expected = false;
     if (notified.compare_exchange_strong(expected, true)) {
index d41774f..0db3223 100644 (file)
@@ -294,7 +294,8 @@ private:
         LockHolder lh(workQueueLock);
         if (!queue.empty()) {
             rval = queue.front();
-            queue.pop_front();
+            queue.pop();
+            queuedVbuckets.erase(rval->getVBucket());
         }
         return rval;
     }
@@ -304,8 +305,23 @@ private:
         return queue.empty();
     }
 
+    void pushUnique(stream_t stream) {
+        LockHolder lh(workQueueLock);
+        if (queuedVbuckets.count(stream->getVBucket()) == 0) {
+            queue.push(stream);
+            queuedVbuckets.insert(stream->getVBucket());
+        }
+    }
+
     Mutex workQueueLock;
-    std::deque<stream_t> queue;
+
+    /**
+     * Maintain a queue of unique stream_t
+     * There's no need to have the same stream in the queue more than once
+     */
+    std::queue<stream_t> queue;
+    std::set<uint16_t> queuedVbuckets;
+
     AtomicValue<bool> notified;
     size_t iterationsBeforeYield;
 };