1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2015 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
21 #include "ep_engine.h"
23 #include "dcp/backfill-manager.h"
24 #include "dcp/backfill_disk.h"
25 #include "dcp/dcpconnmap.h"
26 #include "dcp/producer.h"
28 #include <phosphor/phosphor.h>
30 static const size_t sleepTime = 1;
32 class BackfillManagerTask : public GlobalTask {
34 BackfillManagerTask(EventuallyPersistentEngine& e,
35 std::weak_ptr<BackfillManager> mgr,
37 bool completeBeforeShutdown = false)
39 TaskId::BackfillManagerTask,
41 completeBeforeShutdown),
47 cb::const_char_buffer getDescription();
50 // A weak pointer to the backfill manager which owns this
51 // task. The manager is owned by the DcpProducer, but we need to
52 // give the BackfillManagerTask access to the manager as it runs
53 // concurrently in a different thread.
54 // If the manager is deleted (by the DcpProducer) then the
55 // ManagerTask simply cancels itself and stops running.
56 std::weak_ptr<BackfillManager> weak_manager;
59 bool BackfillManagerTask::run() {
60 TRACE_EVENT0("ep-engine/task", "BackFillManagerTask");
61 // Create a new shared_ptr to the manager for the duration of this
63 auto manager = weak_manager.lock();
65 // backfill manager no longer exists - cancel ourself and stop
71 backfill_status_t status = manager->backfill();
72 if (status == backfill_finished) {
74 } else if (status == backfill_snooze) {
78 if (engine->getEpStats().isShutdown) {
85 cb::const_char_buffer BackfillManagerTask::getDescription() {
86 return "Backfilling items for a DCP Connection";
89 BackfillManager::BackfillManager(EventuallyPersistentEngine& e)
90 : engine(e), managerTask(NULL) {
91 Configuration& config = e.getConfiguration();
93 scanBuffer.bytesRead = 0;
94 scanBuffer.itemsRead = 0;
95 scanBuffer.maxBytes = config.getDcpScanByteLimit();
96 scanBuffer.maxItems = config.getDcpScanItemLimit();
99 buffer.maxBytes = config.getDcpBackfillByteLimit();
100 buffer.nextReadSize = 0;
104 void BackfillManager::addStats(connection_t conn, ADD_STAT add_stat,
107 conn->addStat("backfill_buffer_bytes_read", buffer.bytesRead, add_stat, c);
108 conn->addStat("backfill_buffer_max_bytes", buffer.maxBytes, add_stat, c);
109 conn->addStat("backfill_buffer_full", buffer.full, add_stat, c);
110 conn->addStat("backfill_num_active", activeBackfills.size(), add_stat, c);
111 conn->addStat("backfill_num_snoozing", snoozingBackfills.size(), add_stat, c);
112 conn->addStat("backfill_num_pending", pendingBackfills.size(), add_stat, c);
115 BackfillManager::~BackfillManager() {
117 managerTask->cancel();
121 while (!activeBackfills.empty()) {
122 UniqueDCPBackfillPtr backfill = std::move(activeBackfills.front());
123 activeBackfills.pop_front();
125 engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
128 while (!snoozingBackfills.empty()) {
129 UniqueDCPBackfillPtr backfill =
130 std::move((snoozingBackfills.front()).second);
131 snoozingBackfills.pop_front();
133 engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
136 while (!pendingBackfills.empty()) {
137 UniqueDCPBackfillPtr backfill = std::move(pendingBackfills.front());
138 pendingBackfills.pop_front();
143 void BackfillManager::schedule(VBucket& vb,
144 const active_stream_t& stream,
148 UniqueDCPBackfillPtr backfill =
149 vb.createDCPBackfill(engine, stream, start, end);
150 if (engine.getDcpConnMap().canAddBackfillToActiveQ()) {
151 activeBackfills.push_back(std::move(backfill));
153 pendingBackfills.push_back(std::move(backfill));
156 if (managerTask && !managerTask->isdead()) {
157 ExecutorPool::get()->wake(managerTask->getId());
161 managerTask.reset(new BackfillManagerTask(engine, shared_from_this()));
162 ExecutorPool::get()->schedule(managerTask);
165 bool BackfillManager::bytesRead(uint32_t bytes) {
167 if (scanBuffer.itemsRead >= scanBuffer.maxItems) {
171 // Always allow an item to be backfilled if the scan buffer is empty,
172 // otherwise check to see if there is room for the item.
173 if (scanBuffer.bytesRead + bytes <= scanBuffer.maxBytes ||
174 scanBuffer.bytesRead == 0) {
175 scanBuffer.bytesRead += bytes;
177 /* Subsequent items for this backfill will be read in next run */
181 if (buffer.bytesRead == 0 || buffer.bytesRead + bytes <= buffer.maxBytes) {
182 buffer.bytesRead += bytes;
184 scanBuffer.bytesRead -= bytes;
186 buffer.nextReadSize = bytes;
190 scanBuffer.itemsRead++;
195 void BackfillManager::bytesSent(uint32_t bytes) {
197 if (bytes > buffer.bytesRead) {
198 throw std::invalid_argument("BackfillManager::bytesSent: bytes "
199 "(which is" + std::to_string(bytes) + ") is greater than "
200 "buffer.bytesRead (which is" + std::to_string(buffer.bytesRead) + ")");
202 buffer.bytesRead -= bytes;
205 uint32_t bufferSize = buffer.bytesRead;
206 bool canFitNext = buffer.maxBytes - bufferSize >= buffer.nextReadSize;
207 bool enoughCleared = bufferSize < (buffer.maxBytes * 3 / 4);
208 if (canFitNext && enoughCleared) {
209 buffer.nextReadSize = 0;
212 ExecutorPool::get()->wake(managerTask->getId());
218 backfill_status_t BackfillManager::backfill() {
219 std::unique_lock<std::mutex> lh(lock);
221 if (activeBackfills.empty() && snoozingBackfills.empty()
222 && pendingBackfills.empty()) {
224 return backfill_finished;
227 if (engine.getKVBucket()->isMemoryUsageTooHigh()) {
228 LOG(EXTENSION_LOG_NOTICE, "DCP backfilling task temporarily suspended "
229 "because the current memory usage is too high");
230 return backfill_snooze;
235 if (activeBackfills.empty()) {
236 return backfill_snooze;
240 // If the buffer is full check to make sure we don't have any backfills
241 // that no longer have active streams and remove them. This prevents an
242 // issue where we have dead backfills taking up buffer space.
243 std::list<UniqueDCPBackfillPtr> toDelete;
244 for (auto a_itr = activeBackfills.begin();
245 a_itr != activeBackfills.end();) {
246 if ((*a_itr)->isStreamDead()) {
248 toDelete.push_back(std::move(*a_itr));
249 a_itr = activeBackfills.erase(a_itr);
250 engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
257 bool reschedule = !toDelete.empty();
258 while (!toDelete.empty()) {
259 UniqueDCPBackfillPtr backfill = std::move(toDelete.front());
260 toDelete.pop_front();
262 return reschedule ? backfill_success : backfill_snooze;
265 UniqueDCPBackfillPtr backfill = std::move(activeBackfills.front());
266 activeBackfills.pop_front();
269 backfill_status_t status = backfill->run();
272 scanBuffer.bytesRead = 0;
273 scanBuffer.itemsRead = 0;
276 case backfill_success:
277 activeBackfills.push_back(std::move(backfill));
279 case backfill_finished:
281 engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
283 case backfill_snooze: {
284 uint16_t vbid = backfill->getVBucketId();
285 VBucketPtr vb = engine.getVBucket(vbid);
287 snoozingBackfills.push_back(
288 std::make_pair(ep_current_time(), std::move(backfill)));
291 LOG(EXTENSION_LOG_WARNING, "Deleting the backfill, as vbucket %d "
292 "seems to have been deleted!", vbid);
294 engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
300 return backfill_success;
303 void BackfillManager::moveToActiveQueue() {
304 // Order in below AND is important
305 while (!pendingBackfills.empty() &&
306 engine.getDcpConnMap().canAddBackfillToActiveQ()) {
307 activeBackfills.splice(activeBackfills.end(),
309 pendingBackfills.begin());
312 while (!snoozingBackfills.empty()) {
313 std::pair<rel_time_t, UniqueDCPBackfillPtr> snoozer =
314 std::move(snoozingBackfills.front());
315 snoozingBackfills.pop_front();
316 // If snoozing task is found to be sleeping for greater than
317 // allowed snoozetime, push into active queue
318 if (snoozer.first + sleepTime <= ep_current_time()) {
319 activeBackfills.push_back(std::move(snoozer.second));
321 // Push back the popped snoozing backfill
322 snoozingBackfills.push_back(std::move(snoozer));
328 void BackfillManager::wakeUpTask() {
331 ExecutorPool::get()->wake(managerTask->getId());