MB-16632: Use a background task to handle snapshot creation 48/57148/17
authorJim Walker <jim@couchbase.com>
Thu, 12 Nov 2015 11:14:11 +0000 (11:14 +0000)
committerChiyoung Seo <chiyoung@couchbase.com>
Fri, 8 Jan 2016 15:59:40 +0000 (15:59 +0000)
Frontend threads are delayed by large snapshots due to the time taken
in processing the items into the readyQ.

Moving this work to a background task frees frontend threads to
do other work.

Change-Id: Ic399ef06be996b7b7e179c4c8934a0f5a74cb8f7
Reviewed-on: http://review.couchbase.org/57148
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
configuration.json
src/dcp-producer.cc
src/dcp-producer.h
src/dcp-stream.cc
src/dcp-stream.h
src/priority.cc
src/priority.h

index b233cd4..1a255b9 100644 (file)
             "descr": "The number of notifications before DcpProducerNotifier::run yields.",
             "type": "size_t"
         },
+        "dcp_producer_snapshot_marker_yield_limit": {
+            "default": "10",
+            "descr": "The number of snapshots before ActiveStreamCheckpointProcessorTask::run yields.",
+            "type": "size_t"
+        },
         "vb0": {
             "default": "false",
             "type": "bool"
index c2e4812..f484334 100644 (file)
@@ -146,6 +146,14 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
     noopCtx.noopInterval = defaultNoopInerval;
     noopCtx.pendingRecv = false;
     noopCtx.enabled = false;
+
+    // 1 task per producer
+    checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e);
+    ExecutorPool::get()->schedule(checkpointCreatorTask, AUXIO_TASK_IDX);
+}
+
+DcpProducer::~DcpProducer() {
+    ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
 }
 
 ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
@@ -257,7 +265,8 @@ ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
         streams[vbucket] = new ActiveStream(&engine_, this, getName(), flags,
                                             opaque, vbucket, start_seqno,
                                             end_seqno, vbucket_uuid,
-                                            snap_start_seqno, snap_end_seqno);
+                                            snap_start_seqno, snap_end_seqno,
+                                            checkpointCreatorTask);
         static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
     }
     vbReady[vbucket].store(true);
index 0554893..223cf3b 100644 (file)
@@ -23,8 +23,6 @@
 #include "tapconnection.h"
 #include "dcp-stream.h"
 
-typedef SingleThreadedRCPtr<Stream> stream_t;
-
 class DcpResponse;
 
 class DcpProducer : public Producer {
@@ -33,6 +31,8 @@ public:
     DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                 const std::string &n, bool notifyOnly);
 
+    ~DcpProducer();
+
     ENGINE_ERROR_CODE streamRequest(uint32_t flags, uint32_t opaque,
                                     uint16_t vbucket, uint64_t start_seqno,
                                     uint64_t end_seqno, uint64_t vbucket_uuid,
@@ -219,6 +219,7 @@ private:
     AtomicValue<size_t> totalBytesSent;
 
     size_t roundRobinVbReady;
+    ExTask checkpointCreatorTask;
     static const uint32_t defaultNoopInerval;
 };
 
index b6a467c..f70de4b 100644 (file)
@@ -268,14 +268,16 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                            const std::string &n, uint32_t flags,
                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                            uint64_t en_seqno, uint64_t vb_uuid,
-                           uint64_t snap_start_seqno, uint64_t snap_end_seqno)
+                           uint64_t snap_start_seqno, uint64_t snap_end_seqno,
+                           ExTask task)
     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
               snap_start_seqno, snap_end_seqno),
        lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
        takeoverState(vbucket_state_pending), backfillRemaining(0),
        itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
        waitForSnapshot(0), engine(e), producer(p),
-       isBackfillTaskRunning(false), lastSentSnapEndSeqno(0) {
+       isBackfillTaskRunning(false), lastSentSnapEndSeqno(0),
+       checkpointCreatorTask(task) {
 
     const char* type = "";
     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
@@ -507,7 +509,9 @@ DcpResponse* ActiveStream::inMemoryPhase() {
     if (lastSentSeqno >= end_seqno_) {
         endStream(END_STREAM_OK);
     } else if (readyQ.empty()) {
-        nextCheckpointItem();
+        if (nextCheckpointItem()) {
+            return NULL;
+        }
     }
 
     return nextQueuedItem();
@@ -517,9 +521,8 @@ DcpResponse* ActiveStream::takeoverSendPhase() {
     if (!readyQ.empty()) {
         return nextQueuedItem();
     } else {
-        nextCheckpointItem();
-        if (!readyQ.empty()) {
-            return nextQueuedItem();
+        if (nextCheckpointItem()) {
+            return NULL;
         }
     }
 
@@ -624,7 +627,70 @@ DcpResponse* ActiveStream::nextQueuedItem() {
     return NULL;
 }
 
-void ActiveStream::nextCheckpointItem() {
+bool ActiveStream::nextCheckpointItem() {
+    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+    if (vbucket && vbucket->checkpointManager.getNumItemsForTAPConnection(name_) > 0) {
+        // Items are outstanding: queue this stream on the background task and report that work was scheduled
+        static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
+        ->schedule(this);
+        return true;
+    }
+    return false;
+}
+
+bool ActiveStreamCheckpointProcessorTask::run() {
+    if (engine->getEpStats().isShutdown) {
+        return false;
+    }
+
+    // Default to sleeping indefinitely; overridden by snooze(0.0) below if more work is pending.
+    snooze(INT_MAX);
+
+    // Clear the notification flag so that a later schedule() call issues a fresh wakeup
+    notified.store(false);
+
+    size_t iterations = 0;
+    do {
+        stream_t nextStream = queuePop();
+        ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());
+
+        if (stream) {
+            stream->nextCheckpointItemTask();
+        } else {
+            break;
+        }
+        iterations++;
+    } while(!queueEmpty()
+            && iterations < iterationsBeforeYield);
+
+    // Now check if we were re-notified while running, or if streams are still queued
+    bool expected = true;
+    if (notified.compare_exchange_strong(expected, false)
+        || !queueEmpty()) {
+        // snooze for 0, essentially yielding and allowing other tasks a go before we run again
+        snooze(0.0);
+    }
+
+    return true;
+}
+
+void ActiveStreamCheckpointProcessorTask::wakeup() {
+    ExecutorPool::get()->wake(getId());
+}
+
+void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
+    {
+        LockHolder lh(workQueueLock);
+        queue.push_back(stream);
+    }
+
+    bool expected = false;
+    if (notified.compare_exchange_strong(expected, true)) {
+        wakeup();
+    }
+}
+
+void ActiveStream::nextCheckpointItemTask() {
     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
     if (!vbucket) {
         /* The entity deleting the vbucket must set stream to dead,
@@ -633,7 +699,6 @@ void ActiveStream::nextCheckpointItem() {
            point here */
         return;
     }
-
     bool mark = false;
     std::deque<queued_item> items;
     std::deque<MutationResponse*> mutations;
@@ -643,6 +708,7 @@ void ActiveStream::nextCheckpointItem() {
     }
 
     if (items.empty()) {
+        producer->notifyStreamReady(vb_, true);
         return;
     }
 
@@ -669,10 +735,12 @@ void ActiveStream::nextCheckpointItem() {
     if (mutations.empty()) {
         // If we only got checkpoint start or ends check to see if there are
         // any more snapshots before pausing the stream.
-        nextCheckpointItem();
+        nextCheckpointItemTask();
     } else {
         snapshot(mutations, mark);
     }
+    // ...notify...
+    producer->notifyStreamReady(vb_, true);
 }
 
 void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
@@ -680,6 +748,8 @@ void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
         return;
     }
 
+    LockHolder lh(streamMutex);
+
     if (isCurrentSnapshotCompleted()) {
         uint32_t flags = MARKER_FLAG_MEMORY;
         uint64_t snapStart = items.front()->getBySeqno();
@@ -1205,7 +1275,6 @@ process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed
                 break;
             case DCP_STREAM_END:
                 transitionState(STREAM_DEAD);
-                delete response;
                 break;
             default:
                 abort();
@@ -1216,6 +1285,7 @@ process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed
             break;
         }
 
+        delete response;
         buffer.messages.pop();
         buffer.items--;
         buffer.bytes -= message_bytes;
@@ -1245,7 +1315,6 @@ ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
             "number (%llu) greater than current snapshot end seqno (%llu)] "
             "being processed; Dropping the mutation!", consumer->logHeader(),
             vb_, mutation->getBySeqno(), cur_snapshot_end);
-        delete mutation;
         return ENGINE_ERANGE;
     }
 
@@ -1270,10 +1339,6 @@ ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
         handleSnapshotEnd(vb, mutation->getBySeqno());
     }
 
-    if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
-        delete mutation;
-    }
-
     return ret;
 }
 
@@ -1302,7 +1367,6 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
             "number (%llu) greater than current snapshot end seqno (%llu)] "
             "being processed; Dropping the deletion!", consumer->logHeader(),
             vb_, deletion->getBySeqno(), cur_snapshot_end);
-        delete deletion;
         return ENGINE_ERANGE;
     }
 
@@ -1331,10 +1395,6 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
         handleSnapshotEnd(vb, deletion->getBySeqno());
     }
 
-    if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
-        delete deletion;
-    }
-
     return ret;
 }
 
@@ -1376,12 +1436,10 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
             cur_snapshot_ack = true;
         }
     }
-    delete marker;
 }
 
 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
     engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
-    delete state;
 
     LockHolder lh (streamMutex);
     pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
index e94e472..d41774f 100644 (file)
@@ -156,13 +156,17 @@ private:
     uint64_t readyQueueMemory;
 };
 
+typedef RCPtr<Stream> stream_t;
+
+class ActiveStreamCheckpointProcessorTask;
+
 class ActiveStream : public Stream {
 public:
     ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                  const std::string &name, uint32_t flags, uint32_t opaque,
                  uint16_t vb, uint64_t st_seqno, uint64_t en_seqno,
                  uint64_t vb_uuid, uint64_t snap_start_seqno,
-                 uint64_t snap_end_seqno);
+                 uint64_t snap_end_seqno, ExTask task);
 
     ~ActiveStream() {
         LockHolder lh(streamMutex);
@@ -205,6 +209,9 @@ public:
 
     const char* logHeader();
 
+    // Runs on ActiveStreamCheckpointProcessorTask
+    void nextCheckpointItemTask();
+
 private:
 
     void transitionState(stream_state_t newState);
@@ -221,7 +228,7 @@ private:
 
     DcpResponse* nextQueuedItem();
 
-    void nextCheckpointItem();
+    bool nextCheckpointItem();
 
     void snapshot(std::deque<MutationResponse*>& snapshot, bool mark);
 
@@ -259,6 +266,48 @@ private:
 
     //! Last snapshot end seqno sent to the DCP client
     uint64_t lastSentSnapEndSeqno;
+    ExTask checkpointCreatorTask;
+};
+
+
+class ActiveStreamCheckpointProcessorTask : public GlobalTask {
+public:
+    ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
+      : GlobalTask(&e, Priority::ActiveStreamCheckpointProcessor, INT_MAX, false),
+      notified(false),
+      iterationsBeforeYield(e.getConfiguration()
+                            .getDcpProducerSnapshotMarkerYieldLimit()) { }
+
+    std::string getDescription() {
+        std::string rv("Process checkpoint(s) for DCP producer");
+        return rv;
+    }
+
+    bool run();
+    void schedule(stream_t stream);
+    void wakeup();
+
+private:
+
+    stream_t queuePop() {
+        stream_t rval;
+        LockHolder lh(workQueueLock);
+        if (!queue.empty()) {
+            rval = queue.front();
+            queue.pop_front();
+        }
+        return rval;
+    }
+
+    bool queueEmpty() {
+        LockHolder lh(workQueueLock);
+        return queue.empty();
+    }
+
+    Mutex workQueueLock;
+    std::deque<stream_t> queue;
+    AtomicValue<bool> notified;
+    size_t iterationsBeforeYield;
 };
 
 class NotifierStream : public Stream {
@@ -358,4 +407,6 @@ private:
     } buffer;
 };
 
+typedef RCPtr<PassiveStream> passive_stream_t;
+
 #endif  // SRC_DCP_STREAM_H_
index 780940a..071f15f 100644 (file)
@@ -52,6 +52,7 @@ const Priority Priority::BackfillTaskPriority(BACKFILL_TASK_ID, 8);
 const Priority Priority::WorkLoadMonitorPriority(WORKLOAD_MONITOR_TASK_ID, 10);
 const Priority Priority::HTResizePriority(HT_RESIZER_ID, 211);
 const Priority Priority::TapResumePriority(TAP_RESUME_ID, 316);
+const Priority Priority::ActiveStreamCheckpointProcessor(ACTIVE_STREAM_CHKPT_PROCESSOR_ID, 5);
 
 const char *Priority::getTypeName(const type_id_t i) {
         switch (i) {
@@ -107,6 +108,8 @@ const char *Priority::getTypeName(const type_id_t i) {
                 return "pending_ops_tasks";
             case TAP_CONN_MGR_ID:
                 return "conn_manager_tasks";
+            case ACTIVE_STREAM_CHKPT_PROCESSOR_ID:
+                return "activestream_chkpt_processor_tasks";
             default: break;
         }
 
index a838480..f40c635 100644 (file)
@@ -51,6 +51,7 @@ typedef enum {
     HT_RESIZER_ID,
     PENDING_OPS_ID,
     TAP_CONN_MGR_ID,
+    ACTIVE_STREAM_CHKPT_PROCESSOR_ID,
 
     MAX_TYPE_ID // Keep this as the last enum value
 } type_id_t;
@@ -91,6 +92,7 @@ public:
     static const Priority HTResizePriority;
     static const Priority PendingOpsPriority;
     static const Priority TapConnMgrPriority;
+    static const Priority ActiveStreamCheckpointProcessor;
 
     bool operator==(const Priority &other) const {
         return other.getPriorityValue() == this->priority;