DefragmenterTask: Don't wait task to complete before shutdown
[ep-engine.git] / src / defragmenter.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "defragmenter.h"
19
20 #include <phosphor/phosphor.h>
21
22 #include "defragmenter_visitor.h"
23 #include "ep_engine.h"
24 #include "stored-value.h"
25
26 DefragmenterTask::DefragmenterTask(EventuallyPersistentEngine* e,
27                                    EPStats& stats_)
28   : GlobalTask(e, TaskId::DefragmenterTask, 0, false),
29     stats(stats_),
30     epstore_position(engine->getKVBucket()->startPosition()),
31     visitor(NULL) {
32 }
33
34 DefragmenterTask::~DefragmenterTask() {
35     delete visitor;
36 }
37
38 bool DefragmenterTask::run(void) {
39     TRACE_EVENT0("ep-engine/task", "DefragmenterTask");
40     if (engine->getConfiguration().isDefragmenterEnabled()) {
41         // Get our visitor. If we didn't finish the previous pass,
42         // then resume from where we last were, otherwise create a new visitor and
43         // reset the position.
44         if (visitor == NULL) {
45             visitor = new DefragmentVisitor(getAgeThreshold());
46             epstore_position = engine->getKVBucket()->startPosition();
47         }
48
49         // Print start status.
50         std::stringstream ss;
51         ss << to_string(getDescription()) << " for bucket '"
52            << engine->getName() << "'";
53         if (epstore_position == engine->getKVBucket()->startPosition()) {
54             ss << " starting. ";
55         } else {
56             ss << " resuming from " << epstore_position << ", ";
57             ss << visitor->getHashtablePosition() << ".";
58         }
59         ss << " Using chunk_duration=" << getChunkDurationMS() << " ms."
60            << " mem_used=" << stats.getTotalMemoryUsed()
61            << ", mapped_bytes=" << getMappedBytes();
62         LOG(EXTENSION_LOG_INFO, "%s", ss.str().c_str());
63
64         // Disable thread-caching (as we are about to defragment, and hence don't
65         // want any of the new Blobs in tcache).
66         ALLOCATOR_HOOKS_API* alloc_hooks = engine->getServerApi()->alloc_hooks;
67         bool old_tcache = alloc_hooks->enable_thread_cache(false);
68
69         // Prepare the visitor.
70         hrtime_t start = gethrtime();
71         hrtime_t deadline = start + (getChunkDurationMS() * 1000 * 1000);
72         visitor->setDeadline(deadline);
73         visitor->clearStats();
74
75         // Do it - set off the visitor.
76         epstore_position = engine->getKVBucket()->pauseResumeVisit(
77                                                             *visitor,
78                                                             epstore_position);
79         hrtime_t end = gethrtime();
80
81         // Defrag complete. Restore thread caching.
82         alloc_hooks->enable_thread_cache(old_tcache);
83
84         // Update stats
85         stats.defragNumMoved.fetch_add(visitor->getDefragCount());
86         stats.defragNumVisited.fetch_add(visitor->getVisitedCount());
87
88         // Release any free memory we now have in the allocator back to the OS.
89         // TODO: Benchmark this - is it necessary? How much of a slowdown does it
90         // add? How much memory does it return?
91         alloc_hooks->release_free_memory();
92
93         // Check if the visitor completed a full pass.
94         bool completed = (epstore_position ==
95                                     engine->getKVBucket()->endPosition());
96
97         // Print status.
98         ss.str("");
99         ss << to_string(getDescription()) << " for bucket '"
100            << engine->getName() << "'";
101         if (completed) {
102             ss << " finished.";
103         } else {
104             ss << " paused at position " << epstore_position << ".";
105         }
106         ss << " Took " << (end - start) / 1024 << " us."
107            << " moved " << visitor->getDefragCount() << "/"
108            << visitor->getVisitedCount() << " visited documents."
109            << " mem_used=" << stats.getTotalMemoryUsed()
110            << ", mapped_bytes=" << getMappedBytes()
111            << ". Sleeping for " << getSleepTime() << " seconds.";
112         LOG(EXTENSION_LOG_INFO, "%s", ss.str().c_str());
113
114         // Delete visitor if it finished.
115         if (completed) {
116             delete visitor;
117             visitor = NULL;
118         }
119     }
120
121     snooze(getSleepTime());
122     if (engine->getEpStats().isShutdown) {
123             return false;
124     }
125     return true;
126 }
127
128 void DefragmenterTask::stop(void) {
129     if (uid) {
130         ExecutorPool::get()->cancel(uid);
131     }
132 }
133
134 cb::const_char_buffer DefragmenterTask::getDescription() {
135     return "Memory defragmenter";
136 }
137
138 size_t DefragmenterTask::getSleepTime() const {
139     return engine->getConfiguration().getDefragmenterInterval();
140 }
141
142 size_t DefragmenterTask::getAgeThreshold() const {
143     return engine->getConfiguration().getDefragmenterAgeThreshold();
144 }
145
146 size_t DefragmenterTask::getChunkDurationMS() const {
147     return engine->getConfiguration().getDefragmenterChunkDuration();
148 }
149
150 size_t DefragmenterTask::getMappedBytes() {
151     ALLOCATOR_HOOKS_API* alloc_hooks = engine->getServerApi()->alloc_hooks;
152
153     allocator_stats stats = {0};
154     stats.ext_stats_size = alloc_hooks->get_extra_stats_size();
155     stats.ext_stats = new allocator_ext_stat[stats.ext_stats_size];
156     alloc_hooks->get_allocator_stats(&stats);
157
158     size_t mapped_bytes = stats.heap_size - stats.free_mapped_size -
159                           stats.free_unmapped_size;
160     delete[] stats.ext_stats;
161     return mapped_bytes;
162 }