DCP Backfill: Use size_t instead of uint32_t to record mem usage
[ep-engine.git] / src / dcp / backfill-manager.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "connmap.h"
21 #include "ep_engine.h"
22
23 #include "dcp/backfill-manager.h"
24 #include "dcp/backfill_disk.h"
25 #include "dcp/dcpconnmap.h"
26 #include "dcp/producer.h"
27
28 #include <phosphor/phosphor.h>
29
30 static const size_t sleepTime = 1;
31
32 class BackfillManagerTask : public GlobalTask {
33 public:
34     BackfillManagerTask(EventuallyPersistentEngine& e,
35                         std::weak_ptr<BackfillManager> mgr,
36                         double sleeptime = 0,
37                         bool completeBeforeShutdown = false)
38         : GlobalTask(&e,
39                      TaskId::BackfillManagerTask,
40                      sleeptime,
41                      completeBeforeShutdown),
42           weak_manager(mgr) {
43     }
44
45     bool run();
46
47     cb::const_char_buffer getDescription();
48
49 private:
50     // A weak pointer to the backfill manager which owns this
51     // task. The manager is owned by the DcpProducer, but we need to
52     // give the BackfillManagerTask access to the manager as it runs
53     // concurrently in a different thread.
54     // If the manager is deleted (by the DcpProducer) then the
55     // ManagerTask simply cancels itself and stops running.
56     std::weak_ptr<BackfillManager> weak_manager;
57 };
58
59 bool BackfillManagerTask::run() {
60     TRACE_EVENT0("ep-engine/task", "BackFillManagerTask");
61     // Create a new shared_ptr to the manager for the duration of this
62     // execution.
63     auto manager = weak_manager.lock();
64     if (!manager) {
65         // backfill manager no longer exists - cancel ourself and stop
66         // running.
67         cancel();
68         return false;
69     }
70
71     backfill_status_t status = manager->backfill();
72     if (status == backfill_finished) {
73         return false;
74     } else if (status == backfill_snooze) {
75         snooze(sleepTime);
76     }
77
78     if (engine->getEpStats().isShutdown) {
79         return false;
80     }
81
82     return true;
83 }
84
85 cb::const_char_buffer BackfillManagerTask::getDescription() {
86     return "Backfilling items for a DCP Connection";
87 }
88
89 BackfillManager::BackfillManager(EventuallyPersistentEngine& e)
90     : engine(e), managerTask(NULL) {
91     Configuration& config = e.getConfiguration();
92
93     scanBuffer.bytesRead = 0;
94     scanBuffer.itemsRead = 0;
95     scanBuffer.maxBytes = config.getDcpScanByteLimit();
96     scanBuffer.maxItems = config.getDcpScanItemLimit();
97
98     buffer.bytesRead = 0;
99     buffer.maxBytes = config.getDcpBackfillByteLimit();
100     buffer.nextReadSize = 0;
101     buffer.full = false;
102 }
103
104 void BackfillManager::addStats(connection_t conn, ADD_STAT add_stat,
105                                const void *c) {
106     LockHolder lh(lock);
107     conn->addStat("backfill_buffer_bytes_read", buffer.bytesRead, add_stat, c);
108     conn->addStat("backfill_buffer_max_bytes", buffer.maxBytes, add_stat, c);
109     conn->addStat("backfill_buffer_full", buffer.full, add_stat, c);
110     conn->addStat("backfill_num_active", activeBackfills.size(), add_stat, c);
111     conn->addStat("backfill_num_snoozing", snoozingBackfills.size(), add_stat, c);
112     conn->addStat("backfill_num_pending", pendingBackfills.size(), add_stat, c);
113 }
114
115 BackfillManager::~BackfillManager() {
116     if (managerTask) {
117         managerTask->cancel();
118         managerTask.reset();
119     }
120
121     while (!activeBackfills.empty()) {
122         UniqueDCPBackfillPtr backfill = std::move(activeBackfills.front());
123         activeBackfills.pop_front();
124         backfill->cancel();
125         engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
126     }
127
128     while (!snoozingBackfills.empty()) {
129         UniqueDCPBackfillPtr backfill =
130                 std::move((snoozingBackfills.front()).second);
131         snoozingBackfills.pop_front();
132         backfill->cancel();
133         engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
134     }
135
136     while (!pendingBackfills.empty()) {
137         UniqueDCPBackfillPtr backfill = std::move(pendingBackfills.front());
138         pendingBackfills.pop_front();
139         backfill->cancel();
140     }
141 }
142
143 void BackfillManager::schedule(VBucket& vb,
144                                const active_stream_t& stream,
145                                uint64_t start,
146                                uint64_t end) {
147     LockHolder lh(lock);
148     UniqueDCPBackfillPtr backfill =
149             vb.createDCPBackfill(engine, stream, start, end);
150     if (engine.getDcpConnMap().canAddBackfillToActiveQ()) {
151         activeBackfills.push_back(std::move(backfill));
152     } else {
153         pendingBackfills.push_back(std::move(backfill));
154     }
155
156     if (managerTask && !managerTask->isdead()) {
157         ExecutorPool::get()->wake(managerTask->getId());
158         return;
159     }
160
161     managerTask.reset(new BackfillManagerTask(engine, shared_from_this()));
162     ExecutorPool::get()->schedule(managerTask);
163 }
164
165 bool BackfillManager::bytesRead(size_t bytes) {
166     LockHolder lh(lock);
167     if (scanBuffer.itemsRead >= scanBuffer.maxItems) {
168         return false;
169     }
170
171     // Always allow an item to be backfilled if the scan buffer is empty,
172     // otherwise check to see if there is room for the item.
173     if (scanBuffer.bytesRead + bytes <= scanBuffer.maxBytes ||
174         scanBuffer.bytesRead == 0) {
175         scanBuffer.bytesRead += bytes;
176     } else {
177         /* Subsequent items for this backfill will be read in next run */
178         return false;
179     }
180
181     if (buffer.bytesRead == 0 || buffer.bytesRead + bytes <= buffer.maxBytes) {
182         buffer.bytesRead += bytes;
183     } else {
184         scanBuffer.bytesRead -= bytes;
185         buffer.full = true;
186         buffer.nextReadSize = bytes;
187         return false;
188     }
189
190     scanBuffer.itemsRead++;
191
192     return true;
193 }
194
195 void BackfillManager::bytesSent(size_t bytes) {
196     LockHolder lh(lock);
197     if (bytes > buffer.bytesRead) {
198         throw std::invalid_argument("BackfillManager::bytesSent: bytes "
199                 "(which is" + std::to_string(bytes) + ") is greater than "
200                 "buffer.bytesRead (which is" + std::to_string(buffer.bytesRead) + ")");
201     }
202     buffer.bytesRead -= bytes;
203
204     if (buffer.full) {
205         uint32_t bufferSize = buffer.bytesRead;
206         bool canFitNext = buffer.maxBytes - bufferSize >= buffer.nextReadSize;
207         bool enoughCleared = bufferSize < (buffer.maxBytes * 3 / 4);
208         if (canFitNext && enoughCleared) {
209             buffer.nextReadSize = 0;
210             buffer.full = false;
211             if (managerTask) {
212                 ExecutorPool::get()->wake(managerTask->getId());
213             }
214         }
215     }
216 }
217
218 backfill_status_t BackfillManager::backfill() {
219     std::unique_lock<std::mutex> lh(lock);
220
221     if (activeBackfills.empty() && snoozingBackfills.empty()
222         && pendingBackfills.empty()) {
223         managerTask.reset();
224         return backfill_finished;
225     }
226
227     if (engine.getKVBucket()->isMemoryUsageTooHigh()) {
228         LOG(EXTENSION_LOG_NOTICE, "DCP backfilling task temporarily suspended "
229             "because the current memory usage is too high");
230         return backfill_snooze;
231     }
232
233     moveToActiveQueue();
234
235     if (activeBackfills.empty()) {
236         return backfill_snooze;
237     }
238
239     if (buffer.full) {
240         // If the buffer is full check to make sure we don't have any backfills
241         // that no longer have active streams and remove them. This prevents an
242         // issue where we have dead backfills taking up buffer space.
243         std::list<UniqueDCPBackfillPtr> toDelete;
244         for (auto a_itr = activeBackfills.begin();
245              a_itr != activeBackfills.end();) {
246             if ((*a_itr)->isStreamDead()) {
247                 (*a_itr)->cancel();
248                 toDelete.push_back(std::move(*a_itr));
249                 a_itr = activeBackfills.erase(a_itr);
250                 engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
251             } else {
252                 ++a_itr;
253             }
254         }
255
256         lh.unlock();
257         bool reschedule = !toDelete.empty();
258         while (!toDelete.empty()) {
259             UniqueDCPBackfillPtr backfill = std::move(toDelete.front());
260             toDelete.pop_front();
261         }
262         return reschedule ? backfill_success : backfill_snooze;
263     }
264
265     UniqueDCPBackfillPtr backfill = std::move(activeBackfills.front());
266     activeBackfills.pop_front();
267
268     lh.unlock();
269     backfill_status_t status = backfill->run();
270     lh.lock();
271
272     scanBuffer.bytesRead = 0;
273     scanBuffer.itemsRead = 0;
274
275     switch (status) {
276         case backfill_success:
277             activeBackfills.push_back(std::move(backfill));
278             break;
279         case backfill_finished:
280             lh.unlock();
281             engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
282             break;
283         case backfill_snooze: {
284             uint16_t vbid = backfill->getVBucketId();
285             VBucketPtr vb = engine.getVBucket(vbid);
286             if (vb) {
287                 snoozingBackfills.push_back(
288                         std::make_pair(ep_current_time(), std::move(backfill)));
289             } else {
290                 lh.unlock();
291                 LOG(EXTENSION_LOG_WARNING, "Deleting the backfill, as vbucket %d "
292                     "seems to have been deleted!", vbid);
293                 backfill->cancel();
294                 engine.getDcpConnMap().decrNumActiveSnoozingBackfills();
295             }
296             break;
297         }
298     }
299
300     return backfill_success;
301 }
302
303 void BackfillManager::moveToActiveQueue() {
304     // Order in below AND is important
305     while (!pendingBackfills.empty() &&
306            engine.getDcpConnMap().canAddBackfillToActiveQ()) {
307         activeBackfills.splice(activeBackfills.end(),
308                                pendingBackfills,
309                                pendingBackfills.begin());
310     }
311
312     while (!snoozingBackfills.empty()) {
313         std::pair<rel_time_t, UniqueDCPBackfillPtr> snoozer =
314                 std::move(snoozingBackfills.front());
315         snoozingBackfills.pop_front();
316         // If snoozing task is found to be sleeping for greater than
317         // allowed snoozetime, push into active queue
318         if (snoozer.first + sleepTime <= ep_current_time()) {
319             activeBackfills.push_back(std::move(snoozer.second));
320         } else {
321             // Push back the popped snoozing backfill
322             snoozingBackfills.push_back(std::move(snoozer));
323             break;
324         }
325     }
326 }
327
328 void BackfillManager::wakeUpTask() {
329     LockHolder lh(lock);
330     if (managerTask) {
331         ExecutorPool::get()->wake(managerTask->getId());
332     }
333 }