MB-16181: Filters with deleted collections
[ep-engine.git] / src / collections / vbucket_filter.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "collections/vbucket_filter.h"
19 #include "collections/collections_dockey.h"
20 #include "collections/filter.h"
21 #include "collections/vbucket_manifest.h"
22 #include "dcp/response.h"
23 #include "statwriter.h"
24
25 #include <platform/checked_snprintf.h>
26 #include <platform/make_unique.h>
27
28 Collections::VB::Filter::Filter(const Collections::Filter& filter,
29                                 const Collections::VB::Manifest& manifest)
30     : defaultAllowed(false),
31       passthrough(filter.isPassthrough()),
32       systemEventsAllowed(filter.allowSystemEvents()) {
33     // Don't build a filter if all documents are allowed
34     if (passthrough) {
35         defaultAllowed = true;
36         return;
37     }
38
39     // Lock for reading and create a VB filter
40     auto rh = manifest.lock();
41     separator = rh.getSeparator();
42     if (filter.allowDefaultCollection()) {
43         if (rh.doesDefaultCollectionExist()) {
44             defaultAllowed = true;
45         } else {
46             // The VB::Manifest no longer has $default so don't filter it
47             LOG(EXTENSION_LOG_NOTICE,
48                 "VB::Filter::Filter: dropping $default as it's not in the "
49                 "VB::Manifest");
50         }
51     }
52
53     for (const auto& c : filter.getFilter()) {
54         if (rh.isCollectionOpen({c.data(), c.size()})) {
55             auto m = std::make_unique<std::string>(c);
56             cb::const_char_buffer b{m->data(), m->size()};
57             this->filter.emplace(b, std::move(m));
58         } else {
59             // The VB::Manifest doesn't gave the collection, or the collection
60             // is deleted
61             LOG(EXTENSION_LOG_NOTICE,
62                 "VB::Filter::Filter: dropping collection:%s as it's not open",
63                 c.c_str());
64         }
65     }
66
67     // All collections dropped
68     if (this->filter.empty() && !defaultAllowed) {
69         throw std::invalid_argument("VB::Filter::Filter: nothing to filter");
70     }
71 }
72
73 bool Collections::VB::Filter::allow(::DocKey key) const {
74     // passthrough, everything is allowed.
75     if (passthrough) {
76         return true;
77     }
78
79     // The presence of $default is a simple check against defaultAllowed
80     if (key.getDocNamespace() == DocNamespace::DefaultCollection &&
81         defaultAllowed) {
82         return true;
83     } else if (key.getDocNamespace() == DocNamespace::Collections &&
84                !filter.empty()) {
85         // Collections require a look up in the filter
86         const auto cKey = Collections::DocKey::make(key, separator);
87         return filter.count({reinterpret_cast<const char*>(cKey.data()),
88                              cKey.getCollectionLen()}) > 0;
89     } else if (key.getDocNamespace() == DocNamespace::System) {
90         // ::allow should only be called for the Default or Collection namespace
91         throw std::invalid_argument(
92                 "Collections::VB::Filter::allow namespace system invalid:" +
93                 std::to_string(int(key.getDocNamespace())));
94     }
95     return false;
96 }
97
98 bool Collections::VB::Filter::remove(cb::const_char_buffer collection) {
99     if (passthrough) {
100         // passthrough can never be empty, so return false
101         return false;
102     }
103
104     if (collection == DefaultCollectionIdentifier) {
105         defaultAllowed = false;
106     } else {
107         filter.erase(collection);
108     }
109
110     // If the map is empty and the defaultCollection isn't present, we're empty
111     return filter.empty() && !defaultAllowed;
112 }
113
114 bool Collections::VB::Filter::allowSystemEvent(
115         SystemEventMessage* response) const {
116     switch (response->getSystemEvent()) {
117     case SystemEvent::CreateCollection:
118     case SystemEvent::BeginDeleteCollection: {
119         if ((response->getKey() == DefaultCollectionIdentifier &&
120              defaultAllowed) ||
121             passthrough) {
122             return true;
123         } else {
124             // These events are sent only if they relate to a collection in the
125             // filter
126             return filter.count(response->getKey()) > 0;
127         }
128     }
129     case SystemEvent::CollectionsSeparatorChanged:
130         // The separator changed event is sent if system events are allowed
131         return systemEventsAllowed;
132     case SystemEvent::DeleteCollectionHard:
133     case SystemEvent::DeleteCollectionSoft:
134         break;
135     }
136     throw std::invalid_argument(
137             "SystemEventReplicate::filter event:" +
138             std::to_string(int(response->getSystemEvent())) +
139             " should not be present in SystemEventMessage");
140 }
141
142 void Collections::VB::Filter::addStats(ADD_STAT add_stat,
143                                        const void* c,
144                                        const std::string& prefix,
145                                        uint16_t vb) const {
146     try {
147         const int bsize = 1024;
148         char buffer[bsize];
149         checked_snprintf(
150                 buffer, bsize, "%s:filter_%d_passthrough", prefix.c_str(), vb);
151         add_casted_stat(buffer, passthrough, add_stat, c);
152
153         checked_snprintf(buffer,
154                          bsize,
155                          "%s:filter_%d_default_allowed",
156                          prefix.c_str(),
157                          vb);
158         add_casted_stat(buffer, defaultAllowed, add_stat, c);
159
160         checked_snprintf(
161                 buffer, bsize, "%s:filter_%d_size", prefix.c_str(), vb);
162         add_casted_stat(buffer, filter.size(), add_stat, c);
163     } catch (std::exception& error) {
164         LOG(EXTENSION_LOG_WARNING,
165             "Collections::VB::Filter::addStats: %s:vb:%" PRIu16
166             " exception.what:%s",
167             prefix.c_str(),
168             vb,
169             error.what());
170     }
171 }
172
173 void Collections::VB::Filter::dump() const {
174     std::cerr << *this << std::endl;
175 }
176
177 std::ostream& Collections::VB::operator<<(
178         std::ostream& os, const Collections::VB::Filter& filter) {
179     os << "VBucket::Filter"
180        << ": defaultAllowed:" << filter.defaultAllowed
181        << ", passthrough:" << filter.passthrough
182        << ", systemEventsAllowed:" << filter.systemEventsAllowed;
183
184     if (filter.separator.empty()) {
185         os << ", separator empty";
186     } else {
187         os << ", separator:" << filter.separator;
188     }
189
190     os << ", filter.size:" << filter.filter.size() << std::endl;
191     for (auto& m : filter.filter) {
192         os << *m.second << std::endl;
193     }
194     return os;
195 }