62135b0142f62147f72a31a8e128c9133649851f
[ep-engine.git] / src / collections / vbucket_filter.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "collections/vbucket_filter.h"
19 #include "collections/collections_dockey.h"
20 #include "collections/filter.h"
21 #include "collections/vbucket_manifest.h"
22 #include "dcp/response.h"
23 #include "statwriter.h"
24
25 #include <platform/checked_snprintf.h>
26 #include <platform/make_unique.h>
27
28 Collections::VB::Filter::Filter(const Collections::Filter& filter,
29                                 const Collections::VB::Manifest& manifest)
30     : defaultAllowed(false),
31       passthrough(filter.isPassthrough()),
32       systemEventsAllowed(filter.allowSystemEvents()) {
33     // Don't build a filter if all documents are allowed
34     if (passthrough) {
35         defaultAllowed = true;
36         return;
37     }
38
39     // Lock for reading and create a VB filter
40     auto rh = manifest.lock();
41     separator = rh.getSeparator();
42     if (filter.allowDefaultCollection()) {
43         if (rh.doesDefaultCollectionExist()) {
44             defaultAllowed = true;
45         } else {
46             // The VB::Manifest no longer has $default so don't filter it
47             LOG(EXTENSION_LOG_NOTICE,
48                 "VB::Filter::Filter: dropping $default as it's not in the "
49                 "VB::Manifest");
50         }
51     }
52
53     for (const auto& c : filter.getFilter()) {
54         if (rh.doesCollectionExist({c.data(), c.size()})) {
55             auto m = std::make_unique<std::string>(c);
56             cb::const_char_buffer b{m->data(), m->size()};
57             this->filter.emplace(b, std::move(m));
58         } else {
59             // The VB::Manifest no longer has the collection so we won't filter
60             // it
61             LOG(EXTENSION_LOG_NOTICE,
62                 "VB::Filter::Filter: dropping collection:%s as it's not in the "
63                 "VB::Manifest",
64                 c.c_str());
65         }
66     }
67 }
68
69 bool Collections::VB::Filter::allow(::DocKey key) const {
70     // passthrough, everything is allowed.
71     if (passthrough) {
72         return true;
73     }
74
75     // The presence of $default is a simple check against defaultAllowed
76     if (key.getDocNamespace() == DocNamespace::DefaultCollection &&
77         defaultAllowed) {
78         return true;
79     } else if (key.getDocNamespace() == DocNamespace::Collections &&
80                !filter.empty()) {
81         // Collections require a look up in the filter
82         const auto cKey = Collections::DocKey::make(key, separator);
83         return filter.count({reinterpret_cast<const char*>(cKey.data()),
84                              cKey.getCollectionLen()}) > 0;
85     } else if (key.getDocNamespace() == DocNamespace::System) {
86         // ::allow should only be called for the Default or Collection namespace
87         throw std::invalid_argument(
88                 "Collections::VB::Filter::allow namespace system invalid:" +
89                 std::to_string(int(key.getDocNamespace())));
90     }
91     return false;
92 }
93
94 bool Collections::VB::Filter::remove(cb::const_char_buffer collection) {
95     if (passthrough) {
96         // passthrough can never be empty, so return false
97         return false;
98     }
99
100     if (collection == DefaultCollectionIdentifier) {
101         defaultAllowed = false;
102     } else {
103         filter.erase(collection);
104     }
105
106     // If the map is empty and the defaultCollection isn't present, we're empty
107     return filter.empty() && !defaultAllowed;
108 }
109
110 bool Collections::VB::Filter::allowSystemEvent(
111         SystemEventMessage* response) const {
112     switch (response->getSystemEvent()) {
113     case SystemEvent::CreateCollection:
114     case SystemEvent::BeginDeleteCollection: {
115         if ((response->getKey() == DefaultCollectionIdentifier &&
116              defaultAllowed) ||
117             passthrough) {
118             return true;
119         } else {
120             // These events are sent only if they relate to a collection in the
121             // filter
122             return filter.count(response->getKey()) > 0;
123         }
124     }
125     case SystemEvent::CollectionsSeparatorChanged:
126         // The separator changed event is sent if system events are allowed
127         return systemEventsAllowed;
128     case SystemEvent::DeleteCollectionHard:
129     case SystemEvent::DeleteCollectionSoft:
130         break;
131     }
132     throw std::invalid_argument(
133             "SystemEventReplicate::filter event:" +
134             std::to_string(int(response->getSystemEvent())) +
135             " should not be present in SystemEventMessage");
136 }
137
138 void Collections::VB::Filter::addStats(ADD_STAT add_stat,
139                                        const void* c,
140                                        const std::string& prefix,
141                                        uint16_t vb) const {
142     try {
143         const int bsize = 1024;
144         char buffer[bsize];
145         checked_snprintf(
146                 buffer, bsize, "%s:filter_%d_passthrough", prefix.c_str(), vb);
147         add_casted_stat(buffer, passthrough, add_stat, c);
148
149         checked_snprintf(buffer,
150                          bsize,
151                          "%s:filter_%d_default_allowed",
152                          prefix.c_str(),
153                          vb);
154         add_casted_stat(buffer, defaultAllowed, add_stat, c);
155
156         checked_snprintf(
157                 buffer, bsize, "%s:filter_%d_size", prefix.c_str(), vb);
158         add_casted_stat(buffer, filter.size(), add_stat, c);
159     } catch (std::exception& error) {
160         LOG(EXTENSION_LOG_WARNING,
161             "Collections::VB::Filter::addStats: %s:vb:%" PRIu16
162             " exception.what:%s",
163             prefix.c_str(),
164             vb,
165             error.what());
166     }
167 }
168
169 void Collections::VB::Filter::dump() const {
170     std::cerr << *this << std::endl;
171 }
172
173 std::ostream& Collections::VB::operator<<(
174         std::ostream& os, const Collections::VB::Filter& filter) {
175     os << "VBucket::Filter"
176        << ": defaultAllowed:" << filter.defaultAllowed
177        << ", passthrough:" << filter.passthrough
178        << ", systemEventsAllowed:" << filter.systemEventsAllowed;
179
180     if (filter.separator.empty()) {
181         os << ", separator empty";
182     } else {
183         os << ", separator:" << filter.separator;
184     }
185
186     os << ", filter.size:" << filter.filter.size() << std::endl;
187     for (auto& m : filter.filter) {
188         os << *m.second << std::endl;
189     }
190     return os;
191 }