MB-23875: Remove old (now unused) gat/touch impl
[ep-engine.git] / src / ep_engine.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21
22 #include "backfill.h"
23 #include "common.h"
24 #include "connmap.h"
25 #include "dcp/consumer.h"
26 #include "dcp/dcpconnmap.h"
27 #include "dcp/flow-control-manager.h"
28 #include "dcp/producer.h"
29 #include "ep_bucket.h"
30 #include "ep_vb.h"
31 #include "ephemeral_bucket.h"
32 #include "failover-table.h"
33 #include "flusher.h"
34 #include "htresizer.h"
35 #include "logger.h"
36 #include "memory_tracker.h"
37 #include "replicationthrottle.h"
38 #include "stats-info.h"
39 #define STATWRITER_NAMESPACE core_engine
40 #include "statwriter.h"
41 #undef STATWRITER_NAMESPACE
42 #include "string_utils.h"
43 #include "tapconnmap.h"
44 #include "vb_count_visitor.h"
45 #include "warmup.h"
46
47 #include <JSON_checker.h>
48 #include <cJSON_utils.h>
49 #include <memcached/engine.h>
50 #include <memcached/extension.h>
51 #include <memcached/protocol_binary.h>
52 #include <memcached/server_api.h>
53 #include <memcached/util.h>
54 #include <platform/cb_malloc.h>
55 #include <platform/checked_snprintf.h>
56 #include <platform/make_unique.h>
57 #include <platform/platform.h>
58 #include <platform/processclock.h>
59 #include <xattr/utils.h>
60
#include <cctype>
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <fstream>
#include <iostream>
#include <limits>
#include <mutex>
#include <stdarg.h>
#include <string>
#include <vector>
71
/**
 * Multiply @p val by @p percent and truncate the product toward zero.
 * @p percent is used directly as a multiplier, so 0.75 yields 75% of @p val.
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
75
76 struct EPHandleReleaser {
77     void operator()(EventuallyPersistentEngine*) {
78         ObjectRegistry::onSwitchThread(nullptr);
79     }
80 };
81
82 using EPHandle = std::unique_ptr<EventuallyPersistentEngine, EPHandleReleaser>;
83
84 /**
85  * Helper function to acquire a handle to the engine which allows access to
86  * the engine while the handle is in scope.
87  * @param handle pointer to the engine
88  * @return EPHandle which is a unique_ptr to an EventuallyPersistentEngine
89  * with a custom deleter (EPHandleReleaser) which performs the required
90  * ObjectRegistry release.
91  */
92
93 static inline EPHandle acquireEngine(ENGINE_HANDLE* handle) {
94     auto ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
95     ObjectRegistry::onSwitchThread(ret);
96
97     return EPHandle(ret);
98 }
99
100 /**
101  * Call the response callback and return the appropriate value so that
102  * the core knows what to do..
103  */
104 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
105                                       uint16_t keylen,
106                                       const void *ext, uint8_t extlen,
107                                       const void *body, uint32_t bodylen,
108                                       uint8_t datatype, uint16_t status,
109                                       uint64_t cas, const void *cookie)
110 {
111     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
112     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
113     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
114                  status, cas, cookie)) {
115         rv = ENGINE_SUCCESS;
116     }
117     ObjectRegistry::onSwitchThread(e);
118     return rv;
119 }
120
/**
 * Ensure @p v lies in the inclusive range [@p l, @p h].
 * @throws std::runtime_error("Value out of range.") otherwise.
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool belowMin = v < l;
    const bool aboveMax = v > h;
    if (belowMin || aboveMax) {
        throw std::runtime_error("Value out of range.");
    }
}
127
128
/**
 * Verify that @p str is an optionally '-'-prefixed run of one or more
 * decimal digits.
 *
 * Each character is converted to unsigned char before std::isdigit().
 * Passing a negative char value (any byte >= 0x80 on signed-char platforms)
 * is undefined behaviour.
 *
 * @throws std::runtime_error("Value is not numeric") if @p str is null,
 *         empty, only "-", or contains any non-digit character.
 */
static void checkNumeric(const char* str) {
    if (str == nullptr) {
        throw std::runtime_error("Value is not numeric");
    }

    const char* cursor = str;
    if (*cursor == '-') {
        ++cursor;
    }

    // At least one digit is required: "" and "-" are not numbers.
    if (*cursor == '\0') {
        throw std::runtime_error("Value is not numeric");
    }

    for (; *cursor != '\0'; ++cursor) {
        if (!std::isdigit(static_cast<unsigned char>(*cursor))) {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
141
142 static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle) {
143     return acquireEngine(handle)->getInfo();
144 }
145
146 static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
147                                        const char* config_str) {
148     return acquireEngine(handle)->initialize(config_str);
149 }
150
151 static void EvpDestroy(ENGINE_HANDLE* handle, const bool force) {
152     auto eng = acquireEngine(handle);
153     eng->destroy(force);
154     delete eng.get();
155 }
156
157 static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
158                                          const void* cookie,
159                                          item** itm,
160                                          const DocKey& key,
161                                          const size_t nbytes,
162                                          const int flags,
163                                          const rel_time_t exptime,
164                                          uint8_t datatype,
165                                          uint16_t vbucket) {
166     if (!mcbp::datatype::is_valid(datatype)) {
167         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
168             " (ItemAllocate)");
169         return ENGINE_EINVAL;
170     }
171
172     return acquireEngine(handle)->itemAllocate(itm,
173                                                key,
174                                                nbytes,
175                                                0, // No privileged bytes
176                                                flags,
177                                                exptime,
178                                                datatype,
179                                                vbucket);
180 }
181
// Forward declarations. EvpItemAllocateEx (below) calls these before their
// definitions: EvpItemRelease is defined further down in this file, and
// EvpGetItemInfo is presumably defined later as well (not in this view).
static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
                           const item* itm, item_info *itm_info);
static void EvpItemRelease(ENGINE_HANDLE* handle, const void *cookie,
                           item* itm);
186
187
188 static std::pair<cb::unique_item_ptr, item_info> EvpItemAllocateEx(ENGINE_HANDLE* handle,
189                                                                    const void* cookie,
190                                                                    const DocKey& key,
191                                                                    const size_t nbytes,
192                                                                    const size_t priv_nbytes,
193                                                                    const int flags,
194                                                                    const rel_time_t exptime,
195                                                                    uint8_t datatype,
196                                                                    uint16_t vbucket) {
197
198     item* it = nullptr;
199     auto err = acquireEngine(handle)->itemAllocate(
200             &it, key, nbytes, priv_nbytes, flags, exptime, datatype, vbucket);
201
202     if (err != ENGINE_SUCCESS) {
203         throw cb::engine_error(cb::engine_errc(err),
204                                "EvpItemAllocateEx: failed to allocate memory");
205     }
206
207     item_info info;
208     if (!EvpGetItemInfo(handle, cookie, it, &info)) {
209         EvpItemRelease(handle, cookie, it);
210         throw cb::engine_error(cb::engine_errc::failed,
211                                "EvpItemAllocateEx: EvpGetItemInfo failed");
212     }
213
214     return std::make_pair(cb::unique_item_ptr{it, cb::ItemDeleter{handle}},
215                           info);
216 }
217
218 static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
219                                        const void* cookie,
220                                        const DocKey& key,
221                                        uint64_t* cas,
222                                        uint16_t vbucket,
223                                        mutation_descr_t* mut_info) {
224     if (!cas) {
225         LOG(EXTENSION_LOG_WARNING,
226             "EvpItemDelete(): cas ptr passed is null for vb: %" PRIu16,
227             vbucket);
228         return ENGINE_EINVAL;
229     }
230     return acquireEngine(handle)->itemDelete(
231             cookie, key, *cas, vbucket, nullptr, mut_info);
232 }
233
234 static void EvpItemRelease(ENGINE_HANDLE* handle,
235                            const void* cookie,
236                            item* itm) {
237     acquireEngine(handle)->itemRelease(cookie, itm);
238 }
239
240 static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
241                                 const void* cookie,
242                                 item** itm,
243                                 const DocKey& key,
244                                 uint16_t vbucket,
245                                 DocStateFilter documentStateFilter) {
246     get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
247                                                        HONOR_STATES |
248                                                        TRACK_REFERENCE |
249                                                        DELETE_TEMP |
250                                                        HIDE_LOCKED_CAS |
251                                                        TRACK_STATISTICS);
252
253     switch (documentStateFilter) {
254     case DocStateFilter::Alive:
255         break;
256     case DocStateFilter::Deleted:
257         // MB-23640 was caused by this bug as the frontend asked for
258         // Alive and Deleted documents. The internals don't have a
259         // way of requesting just deleted documents, and luckily for
260         // us no part of our code is using this yet. Return an error
261         // if anyone start using it
262         return ENGINE_ENOTSUP;
263     case DocStateFilter::AliveOrDeleted:
264         options = static_cast<get_options_t>(options | GET_DELETED_VALUE);
265         break;
266     }
267
268     return acquireEngine(handle)->get(cookie, itm, key, vbucket, options);
269 }
270
271 static cb::EngineErrorItemPair EvpGetIf(ENGINE_HANDLE* handle,
272                                         const void* cookie,
273                                         const DocKey& key,
274                                         uint16_t vbucket,
275                                         std::function<bool(
276                                             const item_info&)> filter) {
277     return acquireEngine(handle)->get_if(cookie, key, vbucket, filter);
278 }
279
280 static cb::EngineErrorItemPair EvpGetAndTouch(ENGINE_HANDLE* handle,
281                                               const void* cookie,
282                                               const DocKey& key,
283                                               uint16_t vbucket,
284                                               uint32_t expiry_time) {
285     return acquireEngine(handle)->get_and_touch(cookie, key, vbucket,
286                                                 expiry_time);
287 }
288
289 static ENGINE_ERROR_CODE EvpGetLocked(ENGINE_HANDLE* handle,
290                                       const void* cookie,
291                                       item** itm,
292                                       const DocKey& key,
293                                       uint16_t vbucket,
294                                       uint32_t lock_timeout) {
295     return acquireEngine(handle)->get_locked(
296             cookie, itm, key, vbucket, lock_timeout);
297 }
298
299 static ENGINE_ERROR_CODE EvpUnlock(ENGINE_HANDLE* handle,
300                                    const void* cookie,
301                                    const DocKey& key,
302                                    uint16_t vbucket,
303                                    uint64_t cas) {
304     return acquireEngine(handle)->unlock(cookie, key, vbucket, cas);
305 }
306
307 static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
308                                      const void* cookie,
309                                      const char* stat_key,
310                                      int nkey,
311                                      ADD_STAT add_stat) {
312     return acquireEngine(handle)->getStats(cookie, stat_key, nkey, add_stat);
313 }
314
315 static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
316                                   const void* cookie,
317                                   item* itm,
318                                   uint64_t* cas,
319                                   ENGINE_STORE_OPERATION operation,
320                                   DocumentState document_state) {
321     auto engine = acquireEngine(handle);
322
323     if (document_state == DocumentState::Deleted) {
324         Item* item = static_cast<Item*>(itm);
325         item->setDeleted();
326     }
327
328     return engine->store(cookie, itm, cas, operation);
329 }
330
331 static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
332                                   const void* cookie) {
333     return acquireEngine(handle)->flush(cookie);
334 }
335
336 static void EvpResetStats(ENGINE_HANDLE* handle, const void*) {
337     acquireEngine(handle)->resetStats();
338 }
339
340 protocol_binary_response_status EventuallyPersistentEngine::setTapParam(
341         const char* keyz, const char* valz, std::string& msg) {
342     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
343
344     try {
345         if (strcmp(keyz, "tap_keepalive") == 0) {
346             int v = std::stoi(valz);
347             validate(v, 0, MAX_TAP_KEEP_ALIVE);
348             getConfiguration().requirementsMetOrThrow("tap_keepalive");
349             setTapKeepAlive(static_cast<uint32_t>(v));
350         } else if (strcmp(keyz, "replication_throttle_threshold") == 0) {
351             getConfiguration().setReplicationThrottleThreshold(
352                     std::stoull(valz));
353         } else if (strcmp(keyz, "replication_throttle_queue_cap") == 0) {
354             getConfiguration().setReplicationThrottleQueueCap(std::stoll(valz));
355         } else if (strcmp(keyz, "replication_throttle_cap_pcnt") == 0) {
356             getConfiguration().setReplicationThrottleCapPcnt(std::stoull(valz));
357         } else {
358             msg = "Unknown config param";
359             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
360         }
361         // Handles exceptions thrown by the standard
362         // library stoi/stoul style functions when not numeric
363     } catch (std::invalid_argument&) {
364         msg = "Argument was not numeric";
365         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
366
367         // Handles exceptions thrown by the standard library stoi/stoul
368         // style functions when the conversion does not fit in the datatype
369     } catch (std::out_of_range&) {
370         msg = "Argument was out of range";
371         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
372
373         // Handles any miscellaenous exceptions in addition to the range_error
374         // exceptions thrown by the configuration::set<param>() methods
375     } catch (std::exception& error) {
376         msg = error.what();
377         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
378     }
379
380     return rv;
381 }
382
383 protocol_binary_response_status EventuallyPersistentEngine::setCheckpointParam(
384         const char* keyz, const char* valz, std::string& msg) {
385     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
386
387     try {
388         if (strcmp(keyz, "chk_max_items") == 0) {
389             size_t v = std::stoull(valz);
390             validate(v, size_t(MIN_CHECKPOINT_ITEMS),
391                      size_t(MAX_CHECKPOINT_ITEMS));
392             getConfiguration().setChkMaxItems(v);
393         } else if (strcmp(keyz, "chk_period") == 0) {
394             size_t v = std::stoull(valz);
395             validate(v, size_t(MIN_CHECKPOINT_PERIOD),
396                      size_t(MAX_CHECKPOINT_PERIOD));
397             getConfiguration().setChkPeriod(v);
398         } else if (strcmp(keyz, "max_checkpoints") == 0) {
399             size_t v = std::stoull(valz);
400             validate(v, size_t(DEFAULT_MAX_CHECKPOINTS),
401                      size_t(MAX_CHECKPOINTS_UPPER_BOUND));
402             getConfiguration().setMaxCheckpoints(v);
403         } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
404             getConfiguration().setItemNumBasedNewChk(cb_stob(valz));
405         } else if (strcmp(keyz, "keep_closed_chks") == 0) {
406             getConfiguration().setKeepClosedChks(cb_stob(valz));
407         } else if (strcmp(keyz, "enable_chk_merge") == 0) {
408             getConfiguration().setEnableChkMerge(cb_stob(valz));
409         } else {
410             msg = "Unknown config param";
411             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
412         }
413
414         // Handles exceptions thrown by the cb_stob function
415     } catch (invalid_argument_bool& error) {
416         msg = error.what();
417         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
418
419         // Handles exceptions thrown by the standard
420         // library stoi/stoul style functions when not numeric
421     } catch (std::invalid_argument&) {
422         msg = "Argument was not numeric";
423         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
424
425         // Handles exceptions thrown by the standard library stoi/stoul
426         // style functions when the conversion does not fit in the datatype
427     } catch (std::out_of_range&) {
428         msg = "Argument was out of range";
429         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
430
431         // Handles any miscellaenous exceptions in addition to the range_error
432         // exceptions thrown by the configuration::set<param>() methods
433     } catch (std::exception& error) {
434         msg = error.what();
435         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
436     }
437
438     return rv;
439 }
440
/**
 * Apply a "flush"-category engine parameter.
 *
 * Most keys parse valz and forward it to the matching Configuration setter.
 * Some keys have extra effects:
 *  - "max_size" also recomputes mem_low_wat / mem_high_wat.
 *  - the *_threads / max_num_* keys also resize the ExecutorPool.
 *  - "timing_log" (re)opens or disables the detailed timing log.
 *  - "defragmenter_run", "access_scanner_run" and "vb_state_persist_run"
 *    trigger tasks rather than store a setting.
 *
 * @param keyz NUL-terminated parameter name
 * @param valz NUL-terminated parameter value
 * @param msg  receives a human-readable reason when the update fails
 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS; KEY_ENOENT for an unknown key;
 *         ETMPFAIL if the access scanner could not be started; EINVAL if
 *         parsing, validation or a configuration setter throws
 */
protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
        const char* keyz, const char* valz, std::string& msg) {
    protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;

    // Handle the actual mutation.
    try {
        if (strcmp(keyz, "bg_fetch_delay") == 0) {
            getConfiguration().setBgFetchDelay(std::stoull(valz));
        } else if (strcmp(keyz, "flushall_enabled") == 0) {
            getConfiguration().setFlushallEnabled(cb_stob(valz));
        } else if (strcmp(keyz, "max_size") == 0) {
            size_t vsize = std::stoull(valz);

            // Changing the quota also moves both memory watermarks, derived
            // from the percentages held in EPStats.
            getConfiguration().setMaxSize(vsize);
            EPStats& st = getEpStats();
            getConfiguration().setMemLowWat(
                    percentOf(vsize, st.mem_low_wat_percent));
            getConfiguration().setMemHighWat(
                    percentOf(vsize, st.mem_high_wat_percent));
        } else if (strcmp(keyz, "mem_low_wat") == 0) {
            getConfiguration().setMemLowWat(std::stoull(valz));
        } else if (strcmp(keyz, "mem_high_wat") == 0) {
            getConfiguration().setMemHighWat(std::stoull(valz));
        } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
            getConfiguration().setBackfillMemThreshold(std::stoull(valz));
        } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
            getConfiguration().setCompactionExpMemThreshold(std::stoull(valz));
        } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
            getConfiguration().setMutationMemThreshold(std::stoull(valz));
        } else if (strcmp(keyz, "timing_log") == 0) {
            // Always close the current log first. "off" leaves it disabled;
            // any other value is treated as a file path to open.
            // NOTE(review): stats.timingLog is swapped and deleted here with
            // no visible synchronisation; confirm no other thread can be
            // writing to it concurrently.
            EPStats& stats = getEpStats();
            std::ostream* old = stats.timingLog;
            stats.timingLog = NULL;
            delete old;
            if (strcmp(valz, "off") == 0) {
                LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
            } else {
                std::ofstream* tmp(new std::ofstream(valz));
                if (tmp->good()) {
                    LOG(EXTENSION_LOG_INFO,
                        "Logging detailed timings to ``%s''.", valz);
                    stats.timingLog = tmp;
                } else {
                    LOG(EXTENSION_LOG_WARNING,
                        "Error setting detailed timing log to ``%s'':  %s",
                        valz, strerror(errno));
                    delete tmp;
                }
            }
        } else if (strcmp(keyz, "exp_pager_enabled") == 0) {
            getConfiguration().setExpPagerEnabled(cb_stob(valz));
        } else if (strcmp(keyz, "exp_pager_stime") == 0) {
            getConfiguration().setExpPagerStime(std::stoull(valz));
        } else if (strcmp(keyz, "exp_pager_initial_run_time") == 0) {
            getConfiguration().setExpPagerInitialRunTime(std::stoll(valz));
        } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
            getConfiguration().requirementsMetOrThrow("access_scanner_enabled");
            getConfiguration().setAccessScannerEnabled(cb_stob(valz));
        } else if (strcmp(keyz, "alog_sleep_time") == 0) {
            getConfiguration().requirementsMetOrThrow("alog_sleep_time");
            getConfiguration().setAlogSleepTime(std::stoull(valz));
        } else if (strcmp(keyz, "alog_task_time") == 0) {
            getConfiguration().requirementsMetOrThrow("alog_task_time");
            getConfiguration().setAlogTaskTime(std::stoull(valz));
        } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
            getConfiguration().setPagerActiveVbPcnt(std::stoull(valz));
        } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
            getConfiguration().setWarmupMinMemoryThreshold(std::stoull(valz));
        } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
            getConfiguration().setWarmupMinItemsThreshold(std::stoull(valz));
        // Thread-count keys: both the max_num_* and num_*_threads spellings
        // are accepted. The configuration is updated and the ExecutorPool is
        // resized immediately.
        } else if (strcmp(keyz, "max_num_readers") == 0 ||
                   strcmp(keyz, "num_reader_threads") == 0) {
            size_t value = std::stoull(valz);
            getConfiguration().setNumReaderThreads(value);
            ExecutorPool::get()->setNumReaders(value);
        } else if (strcmp(keyz, "max_num_writers") == 0 ||
                   strcmp(keyz, "num_writer_threads") == 0) {
            size_t value = std::stoull(valz);
            getConfiguration().setNumWriterThreads(value);
            ExecutorPool::get()->setNumWriters(value);
        } else if (strcmp(keyz, "max_num_auxio") == 0 ||
                   strcmp(keyz, "num_auxio_threads") == 0) {
            size_t value = std::stoull(valz);
            getConfiguration().setNumAuxioThreads(value);
            ExecutorPool::get()->setNumAuxIO(value);
        } else if (strcmp(keyz, "max_num_nonio") == 0 ||
                   strcmp(keyz, "num_nonio_threads") == 0) {
            size_t value = std::stoull(valz);
            getConfiguration().setNumNonioThreads(value);
            ExecutorPool::get()->setNumNonIO(value);
        } else if (strcmp(keyz, "bfilter_enabled") == 0) {
            getConfiguration().setBfilterEnabled(cb_stob(valz));
        } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
            getConfiguration().setBfilterResidencyThreshold(std::stof(valz));
        } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
            getConfiguration().setDefragmenterEnabled(cb_stob(valz));
        } else if (strcmp(keyz, "defragmenter_interval") == 0) {
            size_t v = std::stoull(valz);
            // Adding separate validation as external limit is minimum 1
            // to prevent setting defragmenter to constantly run
            validate(v, size_t(1), std::numeric_limits<size_t>::max());
            getConfiguration().setDefragmenterInterval(v);
        } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
            getConfiguration().setDefragmenterAgeThreshold(std::stoull(valz));
        } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
            getConfiguration().setDefragmenterChunkDuration(std::stoull(valz));
        } else if (strcmp(keyz, "defragmenter_run") == 0) {
            // Action key: valz is ignored.
            runDefragmenterTask();
        } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
            getConfiguration().setCompactionWriteQueueCap(std::stoull(valz));
        } else if (strcmp(keyz, "dcp_min_compression_ratio") == 0) {
            getConfiguration().setDcpMinCompressionRatio(std::stof(valz));
        } else if (strcmp(keyz, "access_scanner_run") == 0) {
            // Action key: valz is ignored. Report ETMPFAIL if the scanner
            // task could not be started.
            if (!(runAccessScannerTask())) {
                rv = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
            }
        } else if (strcmp(keyz, "vb_state_persist_run") == 0) {
            // Action key: valz is passed through as an int argument.
            runVbStatePersistTask(std::stoi(valz));
        } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
            getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
            getConfiguration().setEphemeralFullPolicy(valz);
        } else if (strcmp(keyz, "ephemeral_metadata_purge_age") == 0) {
            getConfiguration().requirementsMetOrThrow(
                    "ephemeral_metadata_purge_age");
            getConfiguration().setEphemeralMetadataPurgeAge(std::stoull(valz));
        } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
            getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
            getConfiguration().setEphemeralMetadataPurgeInterval(
                    std::stoull(valz));
        } else {
            msg = "Unknown config param";
            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
        }
        // Handles exceptions thrown by the cb_stob function
    } catch (invalid_argument_bool& error) {
        msg = error.what();
        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;

        // Handles exceptions thrown by the standard
        // library stoi/stoul style functions when not numeric
    } catch (std::invalid_argument&) {
        msg = "Argument was not numeric";
        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;

        // Handles exceptions thrown by the standard library stoi/stoul
        // style functions when the conversion does not fit in the datatype
    } catch (std::out_of_range&) {
        msg = "Argument was out of range";
        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;

        // Handles any miscellaneous exceptions in addition to the range_error
        // exceptions thrown by the configuration::set<param>() methods
        // (and the runtime_error thrown by validate() above)
    } catch (std::exception& error) {
        msg = error.what();
        rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
    }

    return rv;
}
600
601 protocol_binary_response_status EventuallyPersistentEngine::setDcpParam(
602         const char* keyz, const char* valz, std::string& msg) {
603     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
604     try {
605
606         if (strcmp(keyz,
607                    "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
608             size_t v = atoi(valz);
609             checkNumeric(valz);
610             validate(v, size_t(1), std::numeric_limits<size_t>::max());
611             getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(
612                     v);
613         } else if (
614             strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") ==
615             0) {
616             size_t v = atoi(valz);
617             checkNumeric(valz);
618             validate(v, size_t(1), std::numeric_limits<size_t>::max());
619             getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(
620                     v);
621         } else {
622             msg = "Unknown config param";
623             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
624         }
625     } catch (std::runtime_error& ex) {
626         msg = "Value out of range.";
627         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
628     }
629
630     return rv;
631 }
632
633 protocol_binary_response_status EventuallyPersistentEngine::setVbucketParam(
634         uint16_t vbucket,
635         const char* keyz,
636         const char* valz,
637         std::string& msg) {
638     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
639     try {
640         if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
641             uint64_t v = std::strtoull(valz, nullptr, 10);
642             checkNumeric(valz);
643             getConfiguration().setHlcDriftAheadThresholdUs(v);
644         } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
645             uint64_t v = std::strtoull(valz, nullptr, 10);
646             checkNumeric(valz);
647             getConfiguration().setHlcDriftBehindThresholdUs(v);
648         } else if (strcmp(keyz, "max_cas") == 0) {
649             uint64_t v = std::strtoull(valz, nullptr, 10);
650             checkNumeric(valz);
651             LOG(EXTENSION_LOG_WARNING, "setVbucketParam: max_cas:%" PRIu64 " "
652                 "vb:%" PRIu16 "\n", v, vbucket);
653             if (getKVBucket()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
654                 rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
655                 msg = "Not my vbucket";
656             }
657         } else {
658             msg = "Unknown config param";
659             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
660         }
661     } catch (std::runtime_error& ex) {
662         msg = "Value out of range.";
663         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
664     }
665     return rv;
666 }
667
668 static protocol_binary_response_status evictKey(
669     EventuallyPersistentEngine* e,
670     protocol_binary_request_header
671     * request,
672     const char** msg,
673     size_t* msg_size,
674     DocNamespace docNamespace) {
675     protocol_binary_request_no_extras* req =
676         (protocol_binary_request_no_extras*)request;
677
678     const uint8_t* keyPtr = reinterpret_cast<const uint8_t*>(request) +
679                             sizeof(*request);
680     size_t keylen = ntohs(req->message.header.request.keylen);
681     uint16_t vbucket = ntohs(request->request.vbucket);
682
683     LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key{%.*s}\n",
684         int(keylen), keyPtr);
685     msg_size = 0;
686     auto rv = e->evictKey(DocKey(keyPtr, keylen, docNamespace), vbucket, msg);
687     if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
688         rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
689         if (e->isDegradedMode()) {
690             return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
691         }
692     }
693     return rv;
694 }
695
696 protocol_binary_response_status EventuallyPersistentEngine::setParam(
697         protocol_binary_request_set_param* req, std::string& msg) {
698     size_t keylen = ntohs(req->message.header.request.keylen);
699     uint8_t extlen = req->message.header.request.extlen;
700     size_t vallen = ntohl(req->message.header.request.bodylen);
701     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
702     protocol_binary_engine_param_t paramtype =
703         static_cast<protocol_binary_engine_param_t>(ntohl(
704             req->message.body.param_type));
705
706     if (keylen == 0 || (vallen - keylen - extlen) == 0) {
707         return PROTOCOL_BINARY_RESPONSE_EINVAL;
708     }
709
710     const char* keyp = reinterpret_cast<const char*>(req->bytes)
711                        + sizeof(req->bytes);
712     const char* valuep = keyp + keylen;
713     vallen -= (keylen + extlen);
714
715     char keyz[128];
716     char valz[512];
717
718     // Read the key.
719     if (keylen >= sizeof(keyz)) {
720         msg = "Key is too large.";
721         return PROTOCOL_BINARY_RESPONSE_EINVAL;
722     }
723     memcpy(keyz, keyp, keylen);
724     keyz[keylen] = 0x00;
725
726     // Read the value.
727     if (vallen >= sizeof(valz)) {
728         msg = "Value is too large.";
729         return PROTOCOL_BINARY_RESPONSE_EINVAL;
730     }
731     memcpy(valz, valuep, vallen);
732     valz[vallen] = 0x00;
733
734     protocol_binary_response_status rv;
735
736     switch (paramtype) {
737     case protocol_binary_engine_param_flush:
738         rv = setFlushParam(keyz, valz, msg);
739         break;
740     case protocol_binary_engine_param_tap:
741         rv = setTapParam(keyz, valz, msg);
742         break;
743     case protocol_binary_engine_param_checkpoint:
744         rv = setCheckpointParam(keyz, valz, msg);
745         break;
746     case protocol_binary_engine_param_dcp:
747         rv = setDcpParam(keyz, valz, msg);
748         break;
749     case protocol_binary_engine_param_vbucket:
750         rv = setVbucketParam(vbucket, keyz, valz, msg);
751         break;
752     default:
753         rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
754     }
755
756     return rv;
757 }
758
759 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine* e,
760                                     const void* cookie,
761                                     protocol_binary_request_header* request,
762                                     ADD_RESPONSE response) {
763     protocol_binary_request_get_vbucket* req =
764         reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
765     if (req == nullptr) {
766         throw std::invalid_argument("getVBucket: Unable to convert req"
767                                         " to protocol_binary_request_get_vbucket");
768     }
769
770     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
771     VBucketPtr vb = e->getVBucket(vbucket);
772     if (!vb) {
773         return e->sendNotMyVBucketResponse(response, cookie, 0);
774     } else {
775         vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
776         return sendResponse(response, NULL, 0, NULL, 0, &state,
777                             sizeof(state),
778                             PROTOCOL_BINARY_RAW_BYTES,
779                             PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
780     }
781 }
782
783 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine* e,
784                                     const void* cookie,
785                                     protocol_binary_request_header* request,
786                                     ADD_RESPONSE response) {
787
788     protocol_binary_request_set_vbucket* req =
789         reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
790
791     uint64_t cas = ntohll(req->message.header.request.cas);
792
793     size_t bodylen = ntohl(req->message.header.request.bodylen)
794                      - ntohs(req->message.header.request.keylen);
795     if (bodylen != sizeof(vbucket_state_t)) {
796         const std::string msg("Incorrect packet format");
797         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
798                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
799                             PROTOCOL_BINARY_RESPONSE_EINVAL,
800                             cas, cookie);
801     }
802
803     vbucket_state_t state;
804     memcpy(&state, &req->message.body.state, sizeof(state));
805     state = static_cast<vbucket_state_t>(ntohl(state));
806
807     if (!is_valid_vbucket_state_t(state)) {
808         const std::string msg("Invalid vbucket state");
809         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
810                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
811                             PROTOCOL_BINARY_RESPONSE_EINVAL,
812                             cas, cookie);
813     }
814
815     uint16_t vb = ntohs(req->message.header.request.vbucket);
816     if (e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
817         const std::string msg("VBucket number too big");
818         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
819                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
820                             PROTOCOL_BINARY_RESPONSE_ERANGE,
821                             cas, cookie);
822     }
823     return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
824                         PROTOCOL_BINARY_RAW_BYTES,
825                         PROTOCOL_BINARY_RESPONSE_SUCCESS,
826                         cas, cookie);
827 }
828
829 static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine* e,
830                                     const void* cookie,
831                                     protocol_binary_request_header* req,
832                                     ADD_RESPONSE response) {
833
834     uint64_t cas = ntohll(req->request.cas);
835
836     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
837     uint16_t vbucket = ntohs(req->request.vbucket);
838
839     std::string msg = "";
840     if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
841         msg = "Incorrect packet format.";
842         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
843                             msg.length(),
844                             PROTOCOL_BINARY_RAW_BYTES,
845                             PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
846     }
847
848     bool sync = false;
849     uint32_t bodylen = ntohl(req->request.bodylen);
850     if (bodylen > 0) {
851         const char* ptr = reinterpret_cast<const char*>(req->bytes) +
852                           sizeof(req->bytes);
853         if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
854             sync = true;
855         }
856     }
857
858     ENGINE_ERROR_CODE err;
859     void* es = e->getEngineSpecific(cookie);
860     if (sync) {
861         if (es == NULL) {
862             err = e->deleteVBucket(vbucket, cookie);
863             e->storeEngineSpecific(cookie, e);
864         } else {
865             e->storeEngineSpecific(cookie, NULL);
866             LOG(EXTENSION_LOG_INFO,
867                 "Completed sync deletion of vbucket %u",
868                 (unsigned)vbucket);
869             err = ENGINE_SUCCESS;
870         }
871     } else {
872         err = e->deleteVBucket(vbucket);
873     }
874     switch (err) {
875     case ENGINE_SUCCESS:
876         LOG(EXTENSION_LOG_NOTICE,
877             "Deletion of vbucket %d was completed.", vbucket);
878         break;
879     case ENGINE_NOT_MY_VBUCKET:
880         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
881             "because the vbucket doesn't exist!!!", vbucket);
882         res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
883         break;
884     case ENGINE_EINVAL:
885         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
886             "because the vbucket is not in a dead state\n", vbucket);
887         msg = "Failed to delete vbucket.  Must be in the dead state.";
888         res = PROTOCOL_BINARY_RESPONSE_EINVAL;
889         break;
890     case ENGINE_EWOULDBLOCK:
891         LOG(EXTENSION_LOG_NOTICE, "Request for vbucket %d deletion is in"
892                 " EWOULDBLOCK until the database file is removed from disk",
893             vbucket);
894         e->storeEngineSpecific(cookie, req);
895         return ENGINE_EWOULDBLOCK;
896     default:
897         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
898             "because of unknown reasons\n", vbucket);
899         msg = "Failed to delete vbucket.  Unknown reason.";
900         res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
901     }
902
903     if (err != ENGINE_NOT_MY_VBUCKET) {
904         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
905                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
906                             res, cas, cookie);
907     } else {
908         return e->sendNotMyVBucketResponse(response, cookie, cas);
909     }
910 }
911
912 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine* e,
913                                        protocol_binary_request_header* request,
914                                        const void* cookie,
915                                        Item** it,
916                                        const char** msg,
917                                        protocol_binary_response_status* res,
918                                        DocNamespace docNamespace) {
919     KVBucketIface* kvb = e->getKVBucket();
920     protocol_binary_request_no_extras* req =
921         (protocol_binary_request_no_extras*)request;
922     int keylen = ntohs(req->message.header.request.keylen);
923     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
924     ENGINE_ERROR_CODE error_code;
925     DocKey key(reinterpret_cast<const uint8_t*>(request) + sizeof(*request),
926                keylen, docNamespace);
927
928     GetValue rv(kvb->getReplica(key, vbucket, cookie));
929
930     if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
931         if (error_code == ENGINE_NOT_MY_VBUCKET) {
932             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
933             return error_code;
934         } else if (error_code == ENGINE_TMPFAIL) {
935             *msg = "NOT_FOUND";
936             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
937         } else {
938             return error_code;
939         }
940     } else {
941         *it = rv.getValue();
942         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
943     }
944     ++(e->getEpStats().numOpsGet);
945     return ENGINE_SUCCESS;
946 }
947
948 static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine* e,
949                                    const void* cookie,
950                                    protocol_binary_request_compact_db* req,
951                                    ADD_RESPONSE response) {
952
953     std::string msg = "";
954     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
955     compaction_ctx compactreq;
956     uint64_t cas = ntohll(req->message.header.request.cas);
957
958     if (ntohs(req->message.header.request.keylen) > 0 ||
959         req->message.header.request.extlen != 24) {
960         LOG(EXTENSION_LOG_WARNING,
961             "Compaction received bad ext/key len %d/%d.",
962             req->message.header.request.extlen,
963             ntohs(req->message.header.request.keylen));
964         msg = "Incorrect packet format.";
965         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
966                             msg.length(),
967                             PROTOCOL_BINARY_RAW_BYTES,
968                             PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
969     }
970     EPStats& stats = e->getEpStats();
971     compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
972     compactreq.purge_before_seq =
973         ntohll(req->message.body.purge_before_seq);
974     compactreq.drop_deletes = req->message.body.drop_deletes;
975     compactreq.db_file_id = e->getKVBucket()->getDBFileId(*req);
976     uint16_t vbid = ntohs(req->message.header.request.vbucket);
977
978     ENGINE_ERROR_CODE err;
979     void* es = e->getEngineSpecific(cookie);
980     if (es == NULL) {
981         ++stats.pendingCompactions;
982         e->storeEngineSpecific(cookie, e);
983         err = e->compactDB(vbid, compactreq, cookie);
984     } else {
985         e->storeEngineSpecific(cookie, NULL);
986         err = ENGINE_SUCCESS;
987     }
988
989     switch (err) {
990     case ENGINE_SUCCESS:
991         LOG(EXTENSION_LOG_NOTICE,
992             "Compaction of db file id: %d completed.", compactreq.db_file_id);
993         break;
994     case ENGINE_NOT_MY_VBUCKET:
995         --stats.pendingCompactions;
996         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
997             "because the db file doesn't exist!!!", compactreq.db_file_id);
998         res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
999         break;
1000     case ENGINE_EINVAL:
1001         --stats.pendingCompactions;
1002         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1003             "because of an invalid argument", compactreq.db_file_id);
1004         res = PROTOCOL_BINARY_RESPONSE_EINVAL;
1005         break;
1006     case ENGINE_EWOULDBLOCK:
1007         LOG(EXTENSION_LOG_NOTICE,
1008             "Compaction of db file id: %d scheduled "
1009                 "(awaiting completion).", compactreq.db_file_id);
1010         e->storeEngineSpecific(cookie, req);
1011         return ENGINE_EWOULDBLOCK;
1012     case ENGINE_TMPFAIL:
1013         LOG(EXTENSION_LOG_WARNING, "Request to compact db file id: %d hit"
1014                 " a temporary failure and may need to be retried",
1015             compactreq.db_file_id);
1016         msg = "Temporary failure in compacting db file.";
1017         res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1018         break;
1019     default:
1020         --stats.pendingCompactions;
1021         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1022             "because of unknown reasons\n", compactreq.db_file_id);
1023         msg = "Failed to compact db file.  Unknown reason.";
1024         res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1025         break;
1026     }
1027
1028     if (err != ENGINE_NOT_MY_VBUCKET) {
1029         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1030                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1031                             res, cas, cookie);
1032     } else {
1033         return e->sendNotMyVBucketResponse(response, cookie, cas);
1034     }
1035 }
1036
/**
 * Dispatch an engine-specific (non-core) binary protocol command.
 *
 * Handlers that build and send their own response return directly from
 * the second switch. A few cases `break` out of it instead and share the
 * response code at the bottom of this function: STOP/START_PERSISTENCE,
 * SET_PARAM, EVICT_KEY, GET_REPLICA, and any opcode not listed, which
 * keeps res == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND.
 *
 * @param h            engine handling the request
 * @param cookie       connection cookie supplied by the core
 * @param request      raw request packet
 * @param response     callback used to emit the response
 * @param docNamespace namespace applied to keys decoded from the request
 * @return status for the core; ENGINE_EWOULDBLOCK when a handler (e.g.
 *         DEL_VBUCKET, COMPACT_DB) parked the request for later
 *         completion. GET_RANDOM_KEY with any key/extras/body returns
 *         ENGINE_EINVAL without sending a response.
 */
static ENGINE_ERROR_CODE processUnknownCommand(
    EventuallyPersistentEngine* h,
    const void* cookie,
    protocol_binary_request_header* request,
    ADD_RESPONSE response,
    DocNamespace docNamespace) {
    // Inputs to the shared response path at the end of the function.
    protocol_binary_response_status res =
        PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
    std::string dynamic_msg; // owns the text `msg` points at for SET_PARAM
    const char* msg = NULL;
    size_t msg_size = 0;     // 0 => length is taken with strlen(msg)
    Item* itm = NULL;        // only set by GET_REPLICA; deleted below

    EPStats& stats = h->getEpStats();
    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;

    /**
     * Session validation
     * (For ns_server commands only)
     *
     * Skipped when engine-specific data is already stored for the cookie,
     * presumably because the command is being resumed after it returned
     * EWOULDBLOCK and was already validated on its first pass.
     */
    switch (request->request.opcode) {
    case PROTOCOL_BINARY_CMD_SET_PARAM:
    case PROTOCOL_BINARY_CMD_SET_VBUCKET:
    case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
    case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
    case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
    case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
    case PROTOCOL_BINARY_CMD_COMPACT_DB: {
        if (h->getEngineSpecific(cookie) == NULL) {
            // The session token is carried in the request CAS field.
            uint64_t cas = ntohll(request->request.cas);
            if (!h->validateSessionCas(cas)) {
                const std::string message("Invalid session token");
                return sendResponse(response, NULL, 0, NULL, 0,
                                    message.c_str(), message.length(),
                                    PROTOCOL_BINARY_RAW_BYTES,
                                    PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
                                    cas, cookie);
            }
        }
        break;
    }
    default:
        break;
    }

    // NOTE(review): the decrementSessionCtr() calls below presumably
    // release the session counter taken by validateSessionCas() above.
    // Confirm before changing when they run. DEL_VBUCKET and COMPACT_DB
    // only release it once the command no longer returns EWOULDBLOCK.
    switch (request->request.opcode) {
    case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
        return h->getAllVBucketSequenceNumbers(cookie, request, response);

    case PROTOCOL_BINARY_CMD_GET_VBUCKET: {
        BlockTimer timer(&stats.getVbucketCmdHisto);
        rv = getVBucket(h, cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_DEL_VBUCKET: {
        BlockTimer timer(&stats.delVbucketCmdHisto);
        rv = delVBucket(h, cookie, request, response);
        if (rv != ENGINE_EWOULDBLOCK) {
            h->decrementSessionCtr();
            h->storeEngineSpecific(cookie, NULL);
        }
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SET_VBUCKET: {
        BlockTimer timer(&stats.setVbucketCmdHisto);
        rv = setVBucket(h, cookie, request, response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
        res = h->stopFlusher(&msg, &msg_size);
        break;
    case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
        res = h->startFlusher(&msg, &msg_size);
        break;
    case PROTOCOL_BINARY_CMD_SET_PARAM:
        res = h->setParam(
                reinterpret_cast<protocol_binary_request_set_param*>(request),
                dynamic_msg);
        msg = dynamic_msg.c_str();
        msg_size = dynamic_msg.length();
        h->decrementSessionCtr();
        break;
    case PROTOCOL_BINARY_CMD_EVICT_KEY:
        res = evictKey(h, request, &msg, &msg_size, docNamespace);
        break;
    case PROTOCOL_BINARY_CMD_OBSERVE:
        return h->observe(cookie, request, response, docNamespace);
    case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
        return h->observe_seqno(cookie, request, response);
    case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT: {
        rv = h->deregisterTapClient(cookie, request, response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN: {
        rv = h->resetReplicationChain(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER: {
        rv = h->changeTapVBFilter(cookie, request, response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
    case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
    case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE: {
        rv = h->handleCheckpointCmds(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE: {
        rv = h->handleSeqnoCmds(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_GET_META:
    case PROTOCOL_BINARY_CMD_GETQ_META: {
        rv = h->getMeta(cookie,
                        reinterpret_cast<protocol_binary_request_get_meta*>
                        (request), response,
                        docNamespace);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SET_WITH_META:
    case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
    case PROTOCOL_BINARY_CMD_ADD_WITH_META:
    case PROTOCOL_BINARY_CMD_ADDQ_WITH_META: {
        rv = h->setWithMeta(cookie,
                            reinterpret_cast<protocol_binary_request_set_with_meta*>
                            (request), response,
                            docNamespace);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_DEL_WITH_META:
    case PROTOCOL_BINARY_CMD_DELQ_WITH_META: {
        rv = h->deleteWithMeta(cookie,
                               reinterpret_cast<protocol_binary_request_delete_with_meta*>
                               (request), response,
                               docNamespace);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_RETURN_META: {
        return h->returnMeta(cookie,
                             reinterpret_cast<protocol_binary_request_return_meta*>
                             (request), response,
                             docNamespace);
    }
    case PROTOCOL_BINARY_CMD_GET_REPLICA:
        // NOT_MY_VBUCKET falls through to the shared response below, which
        // sends a not-my-vbucket reply; any other failure is returned raw.
        rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res, docNamespace);
        if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
            return rv;
        }
        break;
    case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
    case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC: {
        rv = h->handleTrafficControlCmd(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG: {
        rv = h->setClusterConfig(cookie,
                                 reinterpret_cast<protocol_binary_request_set_cluster_config*>
                                 (request), response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
        return h->getClusterConfig(cookie,
                                   reinterpret_cast<protocol_binary_request_get_cluster_config*>
                                   (request), response);
    case PROTOCOL_BINARY_CMD_COMPACT_DB: {
        rv = compactDB(h, cookie,
                       (protocol_binary_request_compact_db*)(request),
                       response);
        if (rv != ENGINE_EWOULDBLOCK) {
            h->decrementSessionCtr();
            h->storeEngineSpecific(cookie, NULL);
        }
        return rv;
    }
    case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY: {
        // Comparing against 0 needs no byte-order conversion.
        if (request->request.extlen != 0 ||
            request->request.keylen != 0 ||
            request->request.bodylen != 0) {
            return ENGINE_EINVAL;
        }
        return h->getRandomKey(cookie, response);
    }
    case PROTOCOL_BINARY_CMD_GET_KEYS: {
        return h->getAllKeys(cookie,
                             reinterpret_cast<protocol_binary_request_get_keys*>
                             (request), response,
                             docNamespace);
    }
        // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
    case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
    case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE: {
        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
                            PROTOCOL_BINARY_RAW_BYTES,
                            PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
                            cookie);
    }
    }

    // Shared response path for the cases that `break` above.
    if (itm) {
        // GET_REPLICA hit: reply with key, 4-byte flags as extras, and the
        // value, then release the Item obtained from getReplicaCmd().
        uint32_t flags = itm->getFlags();
        rv = sendResponse(response,
                          static_cast<const void*>(itm->getKey().data()),
                          itm->getKey().size(),
                          (const void*)&flags, sizeof(uint32_t),
                          static_cast<const void*>(itm->getData()),
                          itm->getNBytes(), itm->getDataType(),
                          static_cast<uint16_t>(res), itm->getCas(),
                          cookie);
        delete itm;
    } else if (rv == ENGINE_NOT_MY_VBUCKET) {
        return h->sendNotMyVBucketResponse(response, cookie, 0);
    } else {
        // A zero msg_size means "NUL-terminated": measure it here.
        // NOTE(review): the length is narrowed to uint16_t, so a message
        // longer than 65535 bytes would be truncated.
        msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
        rv = sendResponse(response, NULL, 0, NULL, 0,
                          msg, static_cast<uint16_t>(msg_size),
                          PROTOCOL_BINARY_RAW_BYTES,
                          static_cast<uint16_t>(res), 0, cookie);

    }
    return rv;
}
1262
1263 static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1264                                            const void* cookie,
1265                                            protocol_binary_request_header
1266                                            * request,
1267                                            ADD_RESPONSE response,
1268                                            DocNamespace doc_namespace) {
1269     auto engine = acquireEngine(handle);
1270     auto ret = processUnknownCommand(
1271             engine.get(), cookie, request, response, doc_namespace);
1272     return ret;
1273 }
1274
1275 static void EvpItemSetCas(ENGINE_HANDLE*, const void*,
1276                           item* itm, uint64_t cas) {
1277     static_cast<Item*>(itm)->setCas(cas);
1278 }
1279
1280 static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1281                                       const void* cookie,
1282                                       void* engine_specific,
1283                                       uint16_t nengine,
1284                                       uint8_t ttl,
1285                                       uint16_t tap_flags,
1286                                       tap_event_t tap_event,
1287                                       uint32_t tap_seqno,
1288                                       const void* key,
1289                                       size_t nkey,
1290                                       uint32_t flags,
1291                                       uint32_t exptime,
1292                                       uint64_t cas,
1293                                       uint8_t datatype,
1294                                       const void* data,
1295                                       size_t ndata,
1296                                       uint16_t vbucket) {
1297     if (!mcbp::datatype::is_valid(datatype)) {
1298         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1299             " (TapNotify)");
1300         return ENGINE_EINVAL;
1301     }
1302
1303     return acquireEngine(handle)->tapNotify(cookie,
1304                                             engine_specific,
1305                                             nengine,
1306                                             ttl,
1307                                             tap_flags,
1308                                             (uint16_t)tap_event,
1309                                             tap_seqno,
1310                                             key,
1311                                             nkey,
1312                                             flags,
1313                                             exptime,
1314                                             cas,
1315                                             datatype,
1316                                             data,
1317                                             ndata,
1318                                             vbucket);
1319 }
1320
1321 static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1322                                   const void* cookie, item** itm,
1323                                   void** es, uint16_t* nes, uint8_t* ttl,
1324                                   uint16_t* flags, uint32_t* seqno,
1325                                   uint16_t* vbucket) {
1326     uint16_t tap_event = acquireEngine(handle)->walkTapQueue(
1327             cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1328     return static_cast<tap_event_t>(tap_event);
1329 }
1330
1331 static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1332                                       const void* cookie,
1333                                       const void* client,
1334                                       size_t nclient,
1335                                       uint32_t flags,
1336                                       const void* userdata,
1337                                       size_t nuserdata) {
1338     auto engine = acquireEngine(handle);
1339     TAP_ITERATOR iterator = NULL;
1340     {
1341         std::string c(static_cast<const char*>(client), nclient);
1342         // Figure out what we want from the userdata before adding it to
1343         // the API to the handle
1344         if (engine->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1345             iterator = EvpTapIterator;
1346         }
1347     }
1348     return iterator;
1349 }
1350
1351
1352 static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1353                                     const void* cookie,
1354                                     struct dcp_message_producers* producers) {
1355     auto engine = acquireEngine(handle);
1356     ConnHandler* conn = engine->getConnHandler(cookie);
1357     if (conn) {
1358         return conn->step(producers);
1359     }
1360     return ENGINE_DISCONNECT;
1361 }
1362
1363
1364 static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1365                                     const void* cookie,
1366                                     uint32_t opaque,
1367                                     uint32_t seqno,
1368                                     uint32_t flags,
1369                                     void* name,
1370                                     uint16_t nname) {
1371     return acquireEngine(handle)->dcpOpen(
1372             cookie, opaque, seqno, flags, name, nname);
1373 }
1374
1375 static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1376                                          const void* cookie,
1377                                          uint32_t opaque,
1378                                          uint16_t vbucket,
1379                                          uint32_t flags) {
1380     return acquireEngine(handle)->dcpAddStream(cookie, opaque, vbucket, flags);
1381 }
1382
1383 static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1384                                            const void* cookie,
1385                                            uint32_t opaque,
1386                                            uint16_t vbucket) {
1387     auto engine = acquireEngine(handle);
1388     ConnHandler* conn = engine->getConnHandler(cookie);
1389     if (conn) {
1390         return conn->closeStream(opaque, vbucket);
1391     }
1392     return ENGINE_DISCONNECT;
1393 }
1394
1395
1396 static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1397                                          const void* cookie,
1398                                          uint32_t flags,
1399                                          uint32_t opaque,
1400                                          uint16_t vbucket,
1401                                          uint64_t startSeqno,
1402                                          uint64_t endSeqno,
1403                                          uint64_t vbucketUuid,
1404                                          uint64_t snapStartSeqno,
1405                                          uint64_t snapEndSeqno,
1406                                          uint64_t* rollbackSeqno,
1407                                          dcp_add_failover_log callback) {
1408     auto engine = acquireEngine(handle);
1409     ConnHandler* conn = engine->getConnHandler(cookie);
1410     if (conn) {
1411         return conn->streamRequest(flags,
1412                                    opaque,
1413                                    vbucket,
1414                                    startSeqno,
1415                                    endSeqno,
1416                                    vbucketUuid,
1417                                    snapStartSeqno,
1418                                    snapEndSeqno,
1419                                    rollbackSeqno,
1420                                    callback);
1421     }
1422     return ENGINE_DISCONNECT;
1423 }
1424
1425 static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1426                                               const void* cookie,
1427                                               uint32_t opaque,
1428                                               uint16_t vbucket,
1429                                               dcp_add_failover_log callback) {
1430     auto engine = acquireEngine(handle);
1431     ConnHandler* conn = engine->getConnHandler(cookie);
1432     if (conn) {
1433         return conn->getFailoverLog(opaque, vbucket, callback);
1434     }
1435     return ENGINE_DISCONNECT;
1436 }
1437
1438
1439 static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1440                                          const void* cookie,
1441                                          uint32_t opaque,
1442                                          uint16_t vbucket,
1443                                          uint32_t flags) {
1444     auto engine = acquireEngine(handle);
1445     ConnHandler* conn = engine->getConnHandler(cookie);
1446     if (conn) {
1447         return conn->streamEnd(opaque, vbucket, flags);
1448     }
1449     return ENGINE_DISCONNECT;
1450 }
1451
1452
1453 static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1454                                               const void* cookie,
1455                                               uint32_t opaque,
1456                                               uint16_t vbucket,
1457                                               uint64_t start_seqno,
1458                                               uint64_t end_seqno,
1459                                               uint32_t flags) {
1460     auto engine = acquireEngine(handle);
1461     ConnHandler* conn = engine->getConnHandler(cookie);
1462     if (conn) {
1463         return conn->snapshotMarker(
1464                 opaque, vbucket, start_seqno, end_seqno, flags);
1465     }
1466     return ENGINE_DISCONNECT;
1467 }
1468
1469 static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1470                                         const void* cookie,
1471                                         uint32_t opaque,
1472                                         const DocKey& key,
1473                                         cb::const_byte_buffer value,
1474                                         size_t priv_bytes,
1475                                         uint8_t datatype,
1476                                         uint64_t cas,
1477                                         uint16_t vbucket,
1478                                         uint32_t flags,
1479                                         uint64_t by_seqno,
1480                                         uint64_t rev_seqno,
1481                                         uint32_t expiration,
1482                                         uint32_t lock_time,
1483                                         cb::const_byte_buffer meta,
1484                                         uint8_t nru) {
1485     if (!mcbp::datatype::is_valid(datatype)) {
1486         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1487             " (DCPMutation)");
1488         return ENGINE_EINVAL;
1489     }
1490     auto engine = acquireEngine(handle);
1491     ConnHandler* conn = engine->getConnHandler(cookie);
1492     if (conn) {
1493         return conn->mutation(opaque, key, value, priv_bytes, datatype, cas,
1494                               vbucket, flags, by_seqno, rev_seqno, expiration,
1495                               lock_time, meta, nru);
1496     }
1497     return ENGINE_DISCONNECT;
1498 }
1499
1500 static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1501                                         const void* cookie,
1502                                         uint32_t opaque,
1503                                         const DocKey& key,
1504                                         cb::const_byte_buffer value,
1505                                         size_t priv_bytes,
1506                                         uint8_t datatype,
1507                                         uint64_t cas,
1508                                         uint16_t vbucket,
1509                                         uint64_t by_seqno,
1510                                         uint64_t rev_seqno,
1511                                         cb::const_byte_buffer meta) {
1512     auto engine = acquireEngine(handle);
1513     ConnHandler* conn = engine->getConnHandler(cookie);
1514     if (conn) {
1515         return conn->deletion(opaque, key, value, priv_bytes, datatype, cas,
1516                               vbucket, by_seqno, rev_seqno, meta);
1517     }
1518     return ENGINE_DISCONNECT;
1519 }
1520
1521 static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1522                                           const void* cookie,
1523                                           uint32_t opaque,
1524                                           const DocKey& key,
1525                                           cb::const_byte_buffer value,
1526                                           size_t priv_bytes,
1527                                           uint8_t datatype,
1528                                           uint64_t cas,
1529                                           uint16_t vbucket,
1530                                           uint64_t by_seqno,
1531                                           uint64_t rev_seqno,
1532                                           cb::const_byte_buffer meta) {
1533     auto engine = acquireEngine(handle);
1534     ConnHandler* conn = engine->getConnHandler(cookie);
1535     if (conn) {
1536         return conn->expiration(opaque, key, value, priv_bytes, datatype, cas,
1537                                 vbucket, by_seqno, rev_seqno, meta);
1538     }
1539     return ENGINE_DISCONNECT;
1540 }
1541
1542 static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1543                                      const void* cookie,
1544                                      uint32_t opaque,
1545                                      uint16_t vbucket) {
1546     auto engine = acquireEngine(handle);
1547     ConnHandler* conn = engine->getConnHandler(cookie);
1548     if (conn) {
1549         return conn->flushall(opaque, vbucket);
1550     }
1551     return ENGINE_DISCONNECT;
1552 }
1553
1554 static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1555                                                const void* cookie,
1556                                                uint32_t opaque,
1557                                                uint16_t vbucket,
1558                                                vbucket_state_t state) {
1559     auto engine = acquireEngine(handle);
1560     ConnHandler* conn = engine->getConnHandler(cookie);
1561     if (conn) {
1562         return conn->setVBucketState(opaque, vbucket, state);
1563     }
1564     return ENGINE_DISCONNECT;
1565 }
1566
1567 static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1568                                     const void* cookie,
1569                                     uint32_t opaque) {
1570     auto engine = acquireEngine(handle);
1571     ConnHandler* conn = engine->getConnHandler(cookie);
1572     if (conn) {
1573         return conn->noop(opaque);
1574     }
1575     return ENGINE_DISCONNECT;
1576 }
1577
1578 static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1579                                                      const void* cookie,
1580                                                      uint32_t opaque,
1581                                                      uint16_t vbucket,
1582                                                      uint32_t buffer_bytes) {
1583     auto engine = acquireEngine(handle);
1584     ConnHandler* conn = engine->getConnHandler(cookie);
1585     if (conn) {
1586         return conn->bufferAcknowledgement(opaque, vbucket, buffer_bytes);
1587     }
1588     return ENGINE_DISCONNECT;
1589 }
1590
1591 static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1592                                        const void* cookie,
1593                                        uint32_t opaque,
1594                                        const void* key,
1595                                        uint16_t nkey,
1596                                        const void* value,
1597                                        uint32_t nvalue) {
1598     auto engine = acquireEngine(handle);
1599     ConnHandler* conn = engine->getConnHandler(cookie);
1600     if (conn) {
1601         return conn->control(opaque, key, nkey, value, nvalue);
1602     }
1603     return ENGINE_DISCONNECT;
1604 }
1605
1606 static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1607                                                const void* cookie,
1608                                                protocol_binary_response_header* response) {
1609     auto engine = acquireEngine(handle);
1610     ConnHandler* conn = engine->getConnHandler(cookie);
1611     if (conn) {
1612         if (conn->handleResponse(response)) {
1613             return ENGINE_SUCCESS;
1614         }
1615     }
1616     return ENGINE_DISCONNECT;
1617 }
1618
1619 static void EvpHandleDisconnect(const void* cookie,
1620                                 ENGINE_EVENT_TYPE type,
1621                                 const void* event_data,
1622                                 const void* cb_data) {
1623     if (type != ON_DISCONNECT) {
1624         throw std::invalid_argument("EvpHandleDisconnect: type "
1625                                         "(which is" + std::to_string(type) +
1626                                     ") is not ON_DISCONNECT");
1627     }
1628     if (event_data != nullptr) {
1629         throw std::invalid_argument("EvpHandleDisconnect: event_data "
1630                                         "is not NULL");
1631     }
1632     void* c = const_cast<void*>(cb_data);
1633     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1634 }
1635
1636 static void EvpHandleDeleteBucket(const void* cookie,
1637                                   ENGINE_EVENT_TYPE type,
1638                                   const void* event_data,
1639                                   const void* cb_data) {
1640     if (type != ON_DELETE_BUCKET) {
1641         throw std::invalid_argument("EvpHandleDeleteBucket: type "
1642                                         "(which is" + std::to_string(type) +
1643                                     ") is not ON_DELETE_BUCKET");
1644     }
1645     if (event_data != nullptr) {
1646         throw std::invalid_argument("EvpHandleDeleteBucket: event_data "
1647                                         "is not NULL");
1648     }
1649     void* c = const_cast<void*>(cb_data);
1650     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDeleteBucket(cookie);
1651 }
1652
1653 void EvpSetLogLevel(ENGINE_HANDLE* handle, EXTENSION_LOG_LEVEL level) {
1654     Logger::setGlobalLogLevel(level);
1655 }
1656
1657 /**
1658  * The only public interface to the eventually persistent engine.
1659  * Allocate a new instance and initialize it
1660  * @param interface the highest interface the server supports (we only
1661  *                  support interface 1)
1662  * @param get_server_api callback function to get the server exported API
1663  *                  functions
1664  * @param handle Where to return the new instance
1665  * @return ENGINE_SUCCESS on success
1666  */
1667 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1668                                   GET_SERVER_API get_server_api,
1669                                   ENGINE_HANDLE** handle) {
1670     SERVER_HANDLE_V1* api = get_server_api();
1671     if (interface != 1 || api == NULL) {
1672         return ENGINE_ENOTSUP;
1673     }
1674
1675     Logger::setLoggerAPI(api->log);
1676
1677     MemoryTracker::getInstance(*api->alloc_hooks);
1678     ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1679
1680     std::atomic<size_t>* inital_tracking = new std::atomic<size_t>();
1681
1682     ObjectRegistry::setStats(inital_tracking);
1683     EventuallyPersistentEngine* engine;
1684     engine = new EventuallyPersistentEngine(get_server_api);
1685     ObjectRegistry::setStats(NULL);
1686
1687     if (engine == NULL) {
1688         return ENGINE_ENOMEM;
1689     }
1690
1691     if (MemoryTracker::trackingMemoryAllocations()) {
1692         engine->getEpStats().memoryTrackerEnabled.store(true);
1693         engine->getEpStats().totalMemory->store(inital_tracking->load());
1694     }
1695     delete inital_tracking;
1696
1697     initialize_time_functions(api->core);
1698
1699     *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1700
1701     return ENGINE_SUCCESS;
1702 }
1703
1704 /*
1705     This method is called prior to unloading of the shared-object.
1706     Global clean-up should be performed from this method.
1707 */
1708 void destroy_engine() {
1709     ExecutorPool::shutdown();
1710     // A single MemoryTracker exists for *all* buckets
1711     // and must be destroyed before unloading the shared object.
1712     MemoryTracker::destroyInstance();
1713     ObjectRegistry::reset();
1714 }
1715
1716 static bool EvpGetItemInfo(ENGINE_HANDLE* handle, const void*,
1717                            const item* itm, item_info* itm_info) {
1718     const Item* it = reinterpret_cast<const Item*>(itm);
1719     auto engine = acquireEngine(handle);
1720     VBucketPtr vb = engine->getKVBucket()->getVBucket(it->getVBucketId());
1721     uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
1722     *itm_info = it->toItemInfo(vb_uuid);
1723     return true;
1724 }
1725
1726 static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1727                            item* itm, const item_info* itm_info) {
1728     Item* it = reinterpret_cast<Item*>(itm);
1729     if (!it) {
1730         return false;
1731     }
1732     it->setDataType(itm_info->datatype);
1733     return true;
1734 }
1735
1736 static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1737                                              const void* cookie,
1738                                              engine_get_vb_map_cb callback) {
1739     auto engine = acquireEngine(handle);
1740     LockHolder lh(engine->clusterConfig.lock);
1741     const char* config = engine->clusterConfig.config.data();
1742     uint32_t len = engine->clusterConfig.config.size();
1743     engine.reset(); // Want to release the engine before the callback
1744     return callback(cookie, config, len);
1745 }
1746
1747 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1748     va_list va;
1749     va_start(va, fmt);
1750     global_logger.vlog(severity, fmt, va);
1751     va_end(va);
1752 }
1753
/**
 * Construct the engine.
 *
 * Owned components (workload, DCP/TAP conn maps, flow-control manager,
 * checkpoint config, ...) start out null here and are created later by
 * initialize(). The constructor does three things:
 *  - fills the ENGINE_HANDLE_V1 function table with the static Evp* shims,
 *  - fetches the server API via `get_server_api`,
 *  - zeroes `info` and advertises the engine's feature list.
 *
 * @param get_server_api callback returning the server-exported API; stored
 *        in getServerApiFunc and invoked once here to set serverApi.
 */
EventuallyPersistentEngine::EventuallyPersistentEngine(
        GET_SERVER_API get_server_api)
    : clusterConfig(),
      kvBucket(nullptr),
      workload(NULL),
      workloadPriority(NO_BUCKET_PRIORITY),
      replicationThrottle(NULL),
      getServerApiFunc(get_server_api),
      dcpConnMap_(NULL),
      dcpFlowControlManager_(NULL),
      tapConnMap(NULL),
      tapConfig(NULL),
      checkpointConfig(NULL),
      trafficEnabled(false),
      deleteAllEnabled(false),
      startupTime(0),
      taskable(this) {
    // Engine API version 1: route every entry point to its static shim.
    interface.interface = 1;
    ENGINE_HANDLE_V1::get_info = EvpGetInfo;
    ENGINE_HANDLE_V1::initialize = EvpInitialize;
    ENGINE_HANDLE_V1::destroy = EvpDestroy;
    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
    ENGINE_HANDLE_V1::allocate_ex = EvpItemAllocateEx;
    ENGINE_HANDLE_V1::remove = EvpItemDelete;
    ENGINE_HANDLE_V1::release = EvpItemRelease;
    ENGINE_HANDLE_V1::get = EvpGet;
    ENGINE_HANDLE_V1::get_if = EvpGetIf;
    ENGINE_HANDLE_V1::get_and_touch = EvpGetAndTouch;
    ENGINE_HANDLE_V1::get_locked = EvpGetLocked;
    ENGINE_HANDLE_V1::unlock = EvpUnlock;
    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
    ENGINE_HANDLE_V1::store = EvpStore;
    ENGINE_HANDLE_V1::flush = EvpFlush;
    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
    ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
    ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
    ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;

    // DCP entry points.
    ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
    ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
    ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
    ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
    ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
    ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
    ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
    ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
    ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
    ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
    ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
    ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
    ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
    ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
    ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
    ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
    ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;

    serverApi = getServerApiFunc();
    // Describe the engine and append each supported feature, bumping
    // num_features as we go.
    memset(&info, 0, sizeof(info));
    info.info.description = "EP engine v" VERSION;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
    info.info.features[info.info.num_features++].feature =
                                             ENGINE_FEATURE_PERSISTENT_STORAGE;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
}
1824
1825 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1826 {
1827     EventuallyPersistentEngine *epe =
1828                                     ObjectRegistry::onSwitchThread(NULL, true);
1829     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1830     ObjectRegistry::onSwitchThread(epe);
1831     return rv;
1832 }
1833
1834 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1835 {
1836     EventuallyPersistentEngine *epe =
1837                                     ObjectRegistry::onSwitchThread(NULL, true);
1838     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1839     ObjectRegistry::onSwitchThread(epe);
1840     return rv;
1841 }
1842
1843 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1844                                                         EVENT_CALLBACK cb,
1845                                                         const void *cb_data) {
1846     EventuallyPersistentEngine *epe =
1847                                     ObjectRegistry::onSwitchThread(NULL, true);
1848     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1849     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1850                             type, cb, cb_data);
1851     ObjectRegistry::onSwitchThread(epe);
1852 }
1853
1854 /**
1855  * A configuration value changed listener that responds to ep-engine
1856  * parameter changes by invoking engine-specific methods on
1857  * configuration change events.
1858  */
1859 class EpEngineValueChangeListener : public ValueChangedListener {
1860 public:
1861     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
1862         // EMPTY
1863     }
1864
1865     virtual void sizeValueChanged(const std::string &key, size_t value) {
1866         if (key.compare("getl_max_timeout") == 0) {
1867             engine.setGetlMaxTimeout(value);
1868         } else if (key.compare("getl_default_timeout") == 0) {
1869             engine.setGetlDefaultTimeout(value);
1870         } else if (key.compare("max_item_size") == 0) {
1871             engine.setMaxItemSize(value);
1872         } else if (key.compare("max_item_privileged_bytes") == 0) {
1873             engine.setMaxItemPrivilegedBytes(value);
1874         }
1875     }
1876
1877     virtual void booleanValueChanged(const std::string &key, bool value) {
1878         if (key.compare("flushall_enabled") == 0) {
1879             engine.setDeleteAll(value);
1880         }
1881     }
1882 private:
1883     EventuallyPersistentEngine &engine;
1884 };
1885
1886
1887
/**
 * Second-phase engine setup, run after construction.
 *
 * Parses `config` (if given) into `configuration`, applies defaults derived
 * from it, registers listeners for runtime-tunable parameters, creates the
 * owned components (workload policy, DCP/TAP connection maps, flow-control
 * manager, replication throttle, checkpoint config, KV bucket) and finally
 * initialises the KV bucket and connection notifiers.
 *
 * @param config configuration string, or NULL to use the current
 *        configuration unchanged.
 * @return ENGINE_SUCCESS on success; ENGINE_FAILED if the config cannot be
 *         parsed, the shard count exceeds max_vbuckets, or the KV bucket
 *         fails to initialise.
 * NOTE(review): on the ENGINE_FAILED paths, objects already allocated here
 * (e.g. `workload`) are not freed in this function; presumably they are
 * released by the destructor -- confirm.
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
    resetStats();
    if (config != NULL) {
        LOG(EXTENSION_LOG_NOTICE, "EPEngine::initialize: parsing config:\"%s\"",
            config);
        if (!configuration.parseConfiguration(config, serverApi)) {
            LOG(EXTENSION_LOG_WARNING, "Failed to parse the configuration config "
                "during bucket initialization.  config=%s", config);
            return ENGINE_FAILED;
        }
    }

    name = configuration.getCouchBucket();
    maxFailoverEntries = configuration.getMaxFailoverEntries();

    // Start updating the variables from the config!
    HashTable::setDefaultNumBuckets(configuration.getHtSize());
    HashTable::setDefaultNumLocks(configuration.getHtLocks());
    StoredValue::setMutationMemoryThreshold(
                                      configuration.getMutationMemThreshold());

    // A max_size of 0 is replaced by the largest representable size.
    if (configuration.getMaxSize() == 0) {
        configuration.setMaxSize(std::numeric_limits<size_t>::max());
    }

    // Watermarks left at the numeric_limits<size_t>::max() sentinel default
    // to 75% (low) and 85% (high) of max_size.
    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
        stats.mem_low_wat_percent.store(0.75);
        configuration.setMemLowWat(percentOf(
                configuration.getMaxSize(), stats.mem_low_wat_percent.load()));
    }

    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
        stats.mem_high_wat_percent.store(0.85);
        configuration.setMemHighWat(percentOf(
                configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
    }

    // Cache each tunable locally and register a listener so later
    // configuration changes are pushed into the cached value.
    maxItemSize = configuration.getMaxItemSize();
    configuration.addValueChangedListener("max_item_size",
                                       new EpEngineValueChangeListener(*this));

    maxItemPrivilegedBytes = configuration.getMaxItemPrivilegedBytes();
    configuration.addValueChangedListener(
            "max_item_privileged_bytes",
            new EpEngineValueChangeListener(*this));

    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
    configuration.addValueChangedListener("getl_default_timeout",
                                       new EpEngineValueChangeListener(*this));
    getlMaxTimeout = configuration.getGetlMaxTimeout();
    configuration.addValueChangedListener("getl_max_timeout",
                                       new EpEngineValueChangeListener(*this));

    deleteAllEnabled = configuration.isFlushallEnabled();
    configuration.addValueChangedListener("flushall_enabled",
                                       new EpEngineValueChangeListener(*this));

    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
                                  configuration.getMaxNumShards());
    if ((unsigned int)workload->getNumShards() >
                                              configuration.getMaxVbuckets()) {
        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
            "equal or less than max number of vbuckets");
        return ENGINE_FAILED;
    }

    dcpConnMap_ = new DcpConnMap(*this);

    /* Get the flow control policy */
    std::string flowCtlPolicy = configuration.getDcpFlowControlPolicy();

    // Any policy name other than static/dynamic/aggressive falls back to the
    // base manager, i.e. flow control disabled.
    if (!flowCtlPolicy.compare("static")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerStatic(*this);
    } else if (!flowCtlPolicy.compare("dynamic")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerDynamic(*this);
    } else if (!flowCtlPolicy.compare("aggressive")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerAggressive(*this);
    } else {
        /* Flow control is not enabled */
        dcpFlowControlManager_ = new DcpFlowControlManager(*this);
    }

    tapConnMap = new TapConnMap(*this);
    tapConfig = new TapConfig(*this);
    replicationThrottle = new ReplicationThrottle(configuration, stats);
    TapConfig::addConfigChangeListener(*this);

    checkpointConfig = new CheckpointConfig(*this);
    CheckpointConfig::addConfigChangeListener(*this);

    kvBucket = makeBucket(configuration);

    initializeEngineCallbacks();

    // Complete the initialization of the ep-store
    if (!kvBucket->initialize()) {
        return ENGINE_FAILED;
    }

    if(configuration.isDataTrafficEnabled()) {
        enableTraffic(true);
    }

    tapConnMap->initialize(TAP_CONN_NOTIFIER);
    dcpConnMap_->initialize(DCP_CONN_NOTIFIER);

    // record engine initialization time
    startupTime.store(ep_real_time());

    LOG(EXTENSION_LOG_NOTICE,
        "EP Engine: Initialization of %s bucket complete",
        configuration.getBucketType().c_str());

    return ENGINE_SUCCESS;
}
2003
/**
 * Begin shutting the engine down: mark stats as shut down, snapshot stats,
 * then shut down all TAP and DCP connections. Each component is skipped if
 * it was never created (null).
 *
 * @param force recorded in stats.forceShutdown.
 */
void EventuallyPersistentEngine::destroy(bool force) {
    stats.forceShutdown = force;
    stats.isShutdown = true;

    // Perform a snapshot of the stats before shutting down so we can persist
    // the type of shutdown (stats.forceShutdown), and consequently on the
    // next warmup can determine whether there was a clean shutdown - see
    // Warmup::cleanShutdown
    if (kvBucket) {
        kvBucket->snapshotStats();
    }
    if (tapConnMap) {
        tapConnMap->shutdownAllConnections();
    }
    if (dcpConnMap_) {
        dcpConnMap_->shutdownAllConnections();
    }
}
2022
2023 ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
2024         item** itm,
2025         const DocKey& key,
2026         const size_t nbytes,
2027         const size_t priv_nbytes,
2028         const int flags,
2029         const rel_time_t exptime,
2030         uint8_t datatype,
2031         uint16_t vbucket) {
2032     if (priv_nbytes > maxItemPrivilegedBytes) {
2033         return ENGINE_E2BIG;
2034     }
2035
2036     if ((nbytes - priv_nbytes) > maxItemSize) {
2037         return ENGINE_E2BIG;
2038     }
2039
2040     if (!hasAvailableSpace(sizeof(Item) + sizeof(Blob) + key.size() + nbytes)) {
2041         return memoryCondition();
2042     }
2043
2044     time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
2045
2046     uint8_t ext_meta[1];
2047     uint8_t ext_len = EXT_META_LEN;
2048     *(ext_meta) = datatype;
2049     *itm = new Item(key,
2050                     flags,
2051                     expiretime,
2052                     nullptr,
2053                     nbytes,
2054                     ext_meta,
2055                     ext_len,
2056                     0 /*cas*/,
2057                     -1 /*seq*/,
2058                     vbucket);
2059     if (*itm == NULL) {
2060         return memoryCondition();
2061     } else {
2062         stats.itemAllocSizeHisto.add(nbytes);
2063         return ENGINE_SUCCESS;
2064     }
2065 }
2066
2067 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
2068     if (!deleteAllEnabled) {
2069         return ENGINE_ENOTSUP;
2070     }
2071
2072     if (!isDegradedMode()) {
2073         return ENGINE_TMPFAIL;
2074     }
2075
2076     /*
2077      * Supporting only a SYNC operation for bucket flush
2078      */
2079
2080     void* es = getEngineSpecific(cookie);
2081     if (es == NULL) {
2082         // Check if diskDeleteAll was false and set it to true
2083         // if yes, if the atomic variable weren't false, then
2084         // we will assume that a deleteAll has been scheduled
2085         // already and return TMPFAIL.
2086         if (kvBucket->scheduleDeleteAllTask(cookie)) {
2087             storeEngineSpecific(cookie, this);
2088             return ENGINE_EWOULDBLOCK;
2089         } else {
2090             LOG(EXTENSION_LOG_INFO,
2091                 "Tried to trigger a bucket deleteAll, but"
2092                 "there seems to be a task running already!");
2093             return ENGINE_TMPFAIL;
2094         }
2095
2096     } else {
2097         storeEngineSpecific(cookie, NULL);
2098         LOG(EXTENSION_LOG_NOTICE, "Completed bucket deleteAll operation");
2099         return ENGINE_SUCCESS;
2100     }
2101 }
2102
2103 cb::EngineErrorItemPair EventuallyPersistentEngine::get_and_touch(const void* cookie,
2104                                                            const DocKey& key,
2105                                                            uint16_t vbucket,
2106                                                            uint32_t exptime) {
2107     auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2108
2109     time_t expiry_time = exptime;
2110     if (exptime != 0) {
2111         auto* core = serverApi->core;
2112         expiry_time = core->abstime(core->realtime(exptime));
2113     }
2114     GetValue gv(kvBucket->getAndUpdateTtl(key, vbucket, cookie, expiry_time));
2115
2116     auto rv = gv.getStatus();
2117     if (rv == ENGINE_SUCCESS) {
2118         ++stats.numOpsGet;
2119         ++stats.numOpsStore;
2120         return std::make_pair(cb::engine_errc::success,
2121                               cb::unique_item_ptr{gv.getValue(),
2122                                                   cb::ItemDeleter{handle}});
2123     }
2124
2125     if (isDegradedMode()) {
2126         // Remap all some of the error codes
2127         switch (rv) {
2128         case ENGINE_KEY_EEXISTS:
2129         case ENGINE_KEY_ENOENT:
2130         case ENGINE_NOT_MY_VBUCKET:
2131             rv = ENGINE_TMPFAIL;
2132             break;
2133         default:
2134             break;
2135         }
2136     }
2137
2138     if (rv == ENGINE_KEY_EEXISTS) {
2139         rv = ENGINE_LOCKED;
2140     }
2141
2142     return std::make_pair(cb::engine_errc(rv),
2143                           cb::unique_item_ptr{nullptr,
2144                                               cb::ItemDeleter{handle}});
2145 }
2146
2147 cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
2148                                                        const DocKey& key,
2149                                                        uint16_t vbucket,
2150                                                        std::function<bool(const item_info&)>filter) {
2151
2152     auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2153
2154     // Fetch an item from the hashtable (without trying to schedule a bg-fetch
2155     // and pass it through the filter. If the filter accepts the document
2156     // based on the metadata, return the document. If the document's data
2157     // isn't resident we run another iteration in the loop and retries the
2158     // action but this time we _do_ schedule a bg-fetch.
2159     for (int ii = 0; ii < 2; ++ii) {
2160         auto options = static_cast<get_options_t>(HONOR_STATES |
2161                                                   TRACK_REFERENCE |
2162                                                   DELETE_TEMP |
2163                                                   HIDE_LOCKED_CAS |
2164                                                   ALLOW_META_ONLY);
2165         if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
2166             options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
2167         }
2168
2169         BlockTimer timer(&stats.getCmdHisto);
2170         GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2171         ENGINE_ERROR_CODE status = gv.getStatus();
2172
2173         switch (status) {
2174         case ENGINE_SUCCESS:
2175             break;
2176
2177         case ENGINE_KEY_ENOENT: // FALLTHROUGH
2178         case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
2179             if (isDegradedMode()) {
2180                 status = ENGINE_TMPFAIL;
2181             }
2182             // FALLTHROUGH
2183         default:
2184             return std::make_pair(cb::engine_errc(status),
2185                                   cb::unique_item_ptr{nullptr,
2186                                                       cb::ItemDeleter{handle}});
2187         }
2188
2189         auto* item = gv.getValue();
2190         cb::unique_item_ptr ret{item, cb::ItemDeleter{handle}};
2191
2192         const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
2193         const uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
2194
2195         // Currently
2196         if (filter(item->toItemInfo(vb_uuid))) {
2197             if (!gv.isPartial()) {
2198                 return std::make_pair(cb::engine_errc::success,
2199                                cb::unique_item_ptr{ret.release(),
2200                                                    cb::ItemDeleter{handle}});
2201             }
2202             // We want this item, but we need to fetch it off disk
2203         } else {
2204             // the client don't care about this thing..
2205             ret.reset(nullptr);
2206             return std::make_pair(cb::engine_errc::success,
2207                                   cb::unique_item_ptr{ret.release(),
2208                                                       cb::ItemDeleter{handle}});
2209         }
2210     }
2211
2212     // It should not be possible to get as the second iteration in the loop
2213     // SHOULD handle backround fetches an the item should NOT be partial!
2214     throw std::logic_error("EventuallyPersistentEngine::get_if: loop terminated");
2215 }
2216
2217 ENGINE_ERROR_CODE EventuallyPersistentEngine::get_locked(const void* cookie,
2218                                                          item** itm,
2219                                                          const DocKey& key,
2220                                                          uint16_t vbucket,
2221                                                          uint32_t lock_timeout) {
2222
2223     auto default_timeout = static_cast<uint32_t>(getGetlDefaultTimeout());
2224
2225     if (lock_timeout == 0) {
2226         lock_timeout = default_timeout;
2227     } else if (lock_timeout > static_cast<uint32_t>(getGetlMaxTimeout())) {
2228         LOG(EXTENSION_LOG_WARNING,
2229             "EventuallyPersistentEngine::get_locked: "
2230             "Illegal value for lock timeout specified %u. "
2231             "Using default value: %u", lock_timeout, default_timeout);
2232         lock_timeout = default_timeout;
2233     }
2234
2235     auto result = kvBucket->getLocked(key, vbucket, ep_current_time(),
2236                                       lock_timeout, cookie);
2237
2238     if (result.getStatus() == ENGINE_SUCCESS) {
2239         ++stats.numOpsGet;
2240         *itm = result.getValue();
2241     }
2242
2243     return result.getStatus();
2244 }
2245
2246 ENGINE_ERROR_CODE EventuallyPersistentEngine::unlock(const void* cookie,
2247                                                      const DocKey& key,
2248                                                      uint16_t vbucket,
2249                                                      uint64_t cas) {
2250     return kvBucket->unlockKey(key, vbucket, cas, ep_current_time());
2251 }
2252
2253
2254 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2255                                                     item* itm,
2256                                                     uint64_t *cas,
2257                                                     ENGINE_STORE_OPERATION
2258                                                                      operation) {
2259     BlockTimer timer(&stats.storeCmdHisto);
2260     ENGINE_ERROR_CODE ret;
2261     Item *it = static_cast<Item*>(itm);
2262
2263     switch (operation) {
2264     case OPERATION_CAS:
2265         if (it->getCas() == 0) {
2266             // Using a cas command with a cas wildcard doesn't make sense
2267             ret = ENGINE_NOT_STORED;
2268             break;
2269         }
2270         // FALLTHROUGH
2271     case OPERATION_SET:
2272         if (isDegradedMode()) {
2273             return ENGINE_TMPFAIL;
2274         }
2275         ret = kvBucket->set(*it, cookie);
2276         if (ret == ENGINE_SUCCESS) {
2277             *cas = it->getCas();
2278         }
2279
2280         break;
2281
2282     case OPERATION_ADD:
2283         if (isDegradedMode()) {
2284             return ENGINE_TMPFAIL;
2285         }
2286
2287         if (it->getCas() != 0) {
2288             // Adding an item with a cas value doesn't really make sense...
2289             return ENGINE_KEY_EEXISTS;
2290         }
2291
2292         ret = kvBucket->add(*it, cookie);
2293         if (ret == ENGINE_SUCCESS) {
2294             *cas = it->getCas();
2295         }
2296         break;
2297
2298     case OPERATION_REPLACE:
2299         ret = kvBucket->replace(*it, cookie);
2300         if (ret == ENGINE_SUCCESS) {
2301             *cas = it->getCas();
2302         }
2303         break;
2304     default:
2305         ret = ENGINE_ENOTSUP;
2306     }
2307
2308     switch (ret) {
2309     case ENGINE_SUCCESS:
2310         ++stats.numOpsStore;
2311         break;
2312     case ENGINE_ENOMEM:
2313         ret = memoryCondition();
2314         break;
2315     case ENGINE_NOT_STORED:
2316     case ENGINE_NOT_MY_VBUCKET:
2317         if (isDegradedMode()) {
2318             return ENGINE_TMPFAIL;
2319         }
2320         break;
2321     default:
2322         break;
2323     }
2324
2325     return ret;
2326 }
2327
/**
 * Produce the next TAP message for @p connection (one step of the TAP
 * producer's queue walk).
 *
 * All output parameters are reset first (*ttl to (uint8_t)-1, the others to
 * zero / NULL). The checks then run in this order:
 *   1. a pending flush                       -> TAP_FLUSH
 *   2. the noop interval has elapsed         -> TAP_NOOP
 *   3. connection suspended or window full   -> TAP_PAUSE
 *   4. a high-priority vbucket event (TAP_VBUCKET_SET or TAP_OPAQUE), whose
 *      payload is encoded into *es / *nes / *vbucket
 *   5. still waiting for an opaque ack       -> TAP_PAUSE
 *   6. otherwise queue a backfill if runBackfill() asks for one, then take
 *      the next item from the connection via getNextItem().
 * If step 6 yields TAP_PAUSE while the connection is dumping or taking over,
 * the dump/takeover completion event is returned instead.
 *
 * @param itm     receives the item for mutation, deletion and checkpoint
 *                start/end events.
 * @param es,nes  receive the engine-specific payload and its size.
 * @param retry   set to true when the caller is expected to call again
 *                (walkTapQueue loops while it is set).
 * @return the TAP event opcode to send.
 */
inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
                                                           item **itm,
                                                           void **es,
                                                           uint16_t *nes,
                                                           uint8_t *ttl,
                                                           uint16_t *flags,
                                                           uint32_t *seqno,
                                                           uint16_t *vbucket,
                                                           TapProducer
                                                                   *connection,
                                                           bool &retry) {
    // Reset every output parameter before deciding what to send.
    *es = NULL;
    *nes = 0;
    *ttl = (uint8_t)-1;
    *seqno = 0;
    *flags = 0;
    *vbucket = 0;

    retry = false;

    if (connection->shouldFlush()) {
        return TAP_FLUSH;
    }

    if (connection->isTimeForNoop()) {
        LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
            connection->logHeader());
        return TAP_NOOP;
    }

    if (connection->isSuspended() || connection->windowIsFull()) {
        LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
            " suspended state or its ack windows is full.\n",
            connection->logHeader());
        return TAP_PAUSE;
    }

    // High-priority vbucket events are sent before any queued items.
    uint16_t ret = TAP_PAUSE;
    VBucketEvent ev = connection->nextVBucketHighPriority();
    if (ev.event != TAP_PAUSE) {
        switch (ev.event) {
        case TAP_VBUCKET_SET:
            LOG(EXTENSION_LOG_NOTICE,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), ev.vbucket,
                VBucket::toString(ev.state));
            connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
            break;
        case TAP_OPAQUE:
            LOG(EXTENSION_LOG_NOTICE,
                "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
                connection->logHeader(),
                TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
                ev.vbucket);
            // The opaque command code is stored on the connection so that *es
            // can point at it after this function returns.
            connection->opaqueCommandCode = (uint32_t) ev.state;
            *vbucket = ev.vbucket;
            *es = &connection->opaqueCommandCode;
            *nes = sizeof(connection->opaqueCommandCode);
            break;

        default:
            throw std::logic_error("EventuallyPersistentEngine::doWalkTapQueue:"
                    " Unknown VBucketEvent message type:" +
                    std::to_string(ev.event) + " for connection:" +
                    connection->logHeader());
        }
        return ev.event;
    }

    if (connection->waitForOpaqueMsgAck()) {
        return TAP_PAUSE;
    }

    VBucketFilter backFillVBFilter;
    if (connection->runBackfill(backFillVBFilter)) {
        queueBackfill(backFillVBFilter, connection);
    }

    // getNextItem() sets ret to the event type of the returned item.
    uint8_t nru = INITIAL_NRU_VALUE;
    Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
    switch (ret) {
    case TAP_CHECKPOINT_START:
    case TAP_CHECKPOINT_END:
    case TAP_MUTATION:
    case TAP_DELETION:
        *itm = it;
        if (ret == TAP_MUTATION) {
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno(), nru);
            *es = connection->specificData;
        } else if (ret == TAP_DELETION) {
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno());
            *es = connection->specificData;
        } else if (ret == TAP_CHECKPOINT_START) {
            // Send the current value of the max deleted seqno
            VBucketPtr vb = getVBucket(*vbucket);
            if (!vb) {
                // NOTE(review): *itm was already set to `it` above; on this
                // retry path the caller loops and *itm may be overwritten.
                // Confirm who owns `it` here (possible leak).
                retry = true;
                return TAP_NOOP;
            }
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                               vb->ht.getMaxDeletedRevSeqno());
            *es = connection->specificData;
        }
        break;
    case TAP_NOOP:
        retry = true;
        break;
    default:
        break;
    }

    // Nothing to send: check whether a dump / takeover has just completed.
    if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
        VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
        if (vbev.event == TAP_VBUCKET_SET) {
            LOG(EXTENSION_LOG_NOTICE,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), vbev.vbucket,
                VBucket::toString(vbev.state));
            connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
        }
        ret = vbev.event;
    }

    return ret;
}
2455
2456 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2457                                                   item **itm,
2458                                                   void **es,
2459                                                   uint16_t *nes,
2460                                                   uint8_t *ttl,
2461                                                   uint16_t *flags,
2462                                                   uint32_t *seqno,
2463                                                   uint16_t *vbucket) {
2464     TapProducer *connection = getTapProducer(cookie);
2465     if (!connection) {
2466         LOG(EXTENSION_LOG_WARNING,
2467             "Failed to lookup TAP connection.. Disconnecting\n");
2468         return TAP_DISCONNECT;
2469     }
2470
2471     connection->setPaused(false);
2472
2473     bool retry = false;
2474     uint16_t ret;
2475
2476     connection->setLastWalkTime();
2477     do {
2478         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2479                              seqno, vbucket, connection, retry);
2480     } while (retry);
2481
2482     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2483         connection->lastMsgTime = ep_current_time();
2484         if (ret == TAP_NOOP) {
2485             *seqno = 0;
2486         } else {
2487             ++stats.numTapFetched;
2488             *seqno = connection->getSeqno();
2489             if (connection->requestAck(ret, *vbucket)) {
2490                 *flags = TAP_FLAG_ACK;
2491                 connection->seqnoAckRequested = *seqno;
2492             }
2493
2494             if (ret == TAP_MUTATION) {
2495                 if (connection->haveFlagByteorderSupport()) {
2496                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2497                 }
2498             }
2499         }
2500     } else {
2501         connection->setPaused(true);
2502         connection->setNotifySent(false);
2503     }
2504
2505     return ret;
2506 }
2507
2508 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2509                                                 std::string &client,
2510                                                 uint32_t flags,
2511                                                 const void *userdata,
2512                                                 size_t nuserdata) {
2513     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2514         return false;
2515     }
2516
2517     std::string tapName = "eq_tapq:";
2518     if (client.length() == 0) {
2519         tapName.assign(ConnHandler::getAnonName());
2520     } else {
2521         tapName.append(client);
2522     }
2523
2524     // Decoding the userdata section of the packet and update the filters
2525     const char *ptr = static_cast<const char*>(userdata);
2526     uint64_t backfillAge = 0;
2527     std::vector<uint16_t> vbuckets;
2528     std::map<uint16_t, uint64_t> lastCheckpointIds;
2529
2530     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2531         if (nuserdata < sizeof(backfillAge)) {
2532             LOG(EXTENSION_LOG_WARNING,
2533                 "Backfill age is missing. Reject connection request from %s\n",
2534                 tapName.c_str());
2535             return false;
2536         }
2537         // use memcpy to avoid alignemt issues
2538         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2539         backfillAge = ntohll(backfillAge);
2540         nuserdata -= sizeof(backfillAge);
2541         ptr += sizeof(backfillAge);
2542     }
2543
2544     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2545         uint16_t nvbuckets;
2546         if (nuserdata < sizeof(nvbuckets)) {
2547             LOG(EXTENSION_LOG_WARNING,
2548             "Number of vbuckets is missing. Reject connection request from %s"
2549             "\n", tapName.c_str());
2550             return false;
2551         }
2552         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2553         nuserdata -= sizeof(nvbuckets);
2554         ptr += sizeof(nvbuckets);
2555         nvbuckets = ntohs(nvbuckets);
2556         if (nvbuckets > 0) {
2557             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2558                 LOG(EXTENSION_LOG_WARNING,
2559                 "# of vbuckets not matched. Reject connection request from %s"
2560                 "\n", tapName.c_str());
2561                 return false;
2562             }
2563             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2564                 uint16_t val;
2565                 memcpy(&val, ptr, sizeof(nvbuckets));
2566                 ptr += sizeof(uint16_t);
2567                 vbuckets.push_back(ntohs(val));
2568             }
2569             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2570         }
2571     }
2572
2573     if (flags & TAP_CONNECT_CHECKPOINT) {
2574         uint16_t nCheckpoints = 0;
2575         if (nuserdata >= sizeof(nCheckpoints)) {
2576             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2577             nuserdata -= sizeof(nCheckpoints);
2578             ptr += sizeof(nCheckpoints);
2579             nCheckpoints = ntohs(nCheckpoints);
2580         }
2581         if (nCheckpoints > 0) {
2582             if (nuserdata <
2583                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2584                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2585                     "Reject connection request from %s\n", tapName.c_str());
2586                 return false;
2587             }
2588             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2589                 uint16_t vbid;
2590                 uint64_t checkpointId;
2591                 memcpy(&vbid, ptr, sizeof(vbid));
2592                 ptr += sizeof(uint16_t);
2593                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2594                 ptr += sizeof(uint64_t);
2595                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2596             }
2597         }
2598     }
2599
2600     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2601                                  backfillAge,
2602                                  static_cast<int>(
2603                                  configuration.getTapKeepalive()),
2604                                  vbuckets,
2605                                  lastCheckpointIds);
2606
2607     tapConnMap->notifyPausedConnection(tp, true);
2608     return true;
2609 }
2610
2611 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2612                                                         void *engine_specific,
2613                                                         uint16_t nengine,
2614                                                         uint8_t ttl,
2615                                                         uint16_t tap_flags,
2616                                                         uint16_t tap_event,
2617                                                         uint32_t tap_seqno,
2618                                                         const void *key,
2619                                                         size_t nkey,
2620                                                         uint32_t flags,
2621                                                         uint32_t exptime,
2622                                                         uint64_t cas,
2623                                                         uint8_t datatype,
2624                                                         const void *data,
2625                                                         size_t ndata,
2626                                                         uint16_t vbucket)
2627 {
2628     (void) ttl;
2629     void *specific = getEngineSpecific(cookie);
2630     ConnHandler *connection = NULL;
2631     if (specific == NULL) {
2632         if (tap_event == TAP_ACK) {
2633             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2634                 "exist. Force disconnect...\n", (char *) cookie);
2635             // tap producer is no longer connected..
2636             return ENGINE_DISCONNECT;
2637         } else {
2638             connection = tapConnMap->newConsumer(cookie);
2639             if (connection == NULL) {
2640                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2641                     " Force disconnect\n");
2642                 return ENGINE_DISCONNECT;
2643             }
2644             storeEngineSpecific(cookie, connection);
2645         }
2646     } else {
2647         connection = reinterpret_cast<ConnHandler *>(specific);
2648     }
2649
2650
2651     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2652
2653     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2654         if (!replicationThrottle->shouldProcess()) {
2655             ++stats.replicationThrottled;
2656             if (connection->supportsAck()) {
2657                 ret = ENGINE_TMPFAIL;
2658             } else {
2659                 ret = ENGINE_DISCONNECT;
2660                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2661                     "ack support. Force disconnect...\n",
2662                     connection->logHeader());
2663             }
2664             return ret;
2665         }
2666     }
2667
2668     switch (tap_event) {
2669     case TAP_ACK:
2670         // TAP works only with the DefaultCollection
2671         ret = processTapAck(cookie, tap_seqno, tap_flags,
2672                             DocKey(static_cast<const uint8_t*>(key), nkey,
2673                                    DocNamespace::DefaultCollection));
2674         break;
2675     case TAP_FLUSH:
2676         ret = flush(cookie);
2677         LOG(EXTENSION_LOG_NOTICE, "%s Received flush.\n",
2678             connection->logHeader());
2679         break;
2680     case TAP_DELETION:
2681         {
2682             uint64_t revSeqno;
2683             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2684                                                 nengine, &revSeqno);
2685
2686             // Create key in DefaultCollection since TAP won't support collections
2687             const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2688                                 DocNamespace::DefaultCollection};
2689             ret = connection->deletion(0, docKey, {}, 0,
2690                                        PROTOCOL_BINARY_RAW_BYTES, cas, vbucket,
2691                                        0, revSeqno, {});
2692         }
2693         break;
2694
2695     case TAP_CHECKPOINT_START:
2696     case TAP_CHECKPOINT_END:
2697         {
2698             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2699             if (tc) {
2700                 if (tap_event == TAP_CHECKPOINT_START &&
2701                     nengine == TapEngineSpecific::sizeRevSeqno) {
2702                     // Set the current value for the max deleted seqno
2703                     VBucketPtr vb = getVBucket(vbucket);
2704                     if (!vb) {
2705                         return ENGINE_TMPFAIL;
2706                     }
2707                     uint64_t seqnum;
2708                     TapEngineSpecific::readSpecificData(tap_event,
2709                                                         engine_specific,
2710                                                         nengine,
2711                                                         &seqnum);
2712                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2713                 }
2714
2715                 if (data) {
2716                     uint64_t checkpointId;
2717                     memcpy(&checkpointId, data, sizeof(checkpointId));
2718                     checkpointId = ntohll(checkpointId);
2719                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2720                                           checkpointId);
2721                 }
2722                 else {
2723                     ret = ENGINE_DISCONNECT;
2724                     LOG(EXTENSION_LOG_WARNING,
2725                         "%s Checkpoint Id is missing in "
2726                         "CHECKPOINT messages. Force disconnect...\n",
2727                         connection->logHeader());
2728                 }
2729             }
2730             else {
2731                 ret = ENGINE_DISCONNECT;
2732                 LOG(EXTENSION_LOG_WARNING,
2733                     "%s not a consumer! Force disconnect\n",
2734                     connection->logHeader());
2735             }
2736         }
2737
2738         break;
2739
2740     case TAP_MUTATION:
2741         {
2742             uint8_t nru = INITIAL_NRU_VALUE;
2743             uint64_t revSeqno = 0;
2744             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2745                                                 nengine, &revSeqno, &nru);
2746
2747             if (!isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_JSON)) {
2748                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2749                 const unsigned char *dat = (const unsigned char*)data;
2750                 const int datlen = ndata;
2751                 if (checkUTF8JSON(dat, datlen)) {
2752                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2753                 }
2754             }
2755             // Create key in DefaultCollection since TAP won't support collections
2756             const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2757                                 DocNamespace::DefaultCollection};
2758             ret = connection->mutation(0, docKey,
2759                                        {static_cast<const uint8_t*>(data), ndata},
2760                                        0, datatype, cas,
2761                                        vbucket, flags, 0, revSeqno, exptime, 0,
2762                                        {}, nru);
2763         }
2764
2765         break;
2766
2767     case TAP_OPAQUE:
2768         if (nengine == sizeof(uint32_t)) {
2769             uint32_t cc;
2770             memcpy(&cc, engine_specific, sizeof(cc));
2771             cc = ntohl(cc);
2772
2773             switch (cc) {
2774             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2775                 // @todo: the memcached core will _ALWAYS_ send nack
2776                 //        if it encounter an error. This should be
2777                 // set as the default when we move to .next after 2.0
2778                 // (currently we need to allow the message for
2779                 // backwards compatibility)
2780                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2781                     connection->logHeader());
2782                 break;
2783             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2784                 connection->setSupportCheckpointSync(true);
2785                 LOG(EXTENSION_LOG_INFO,
2786                     "%s Enable checkpoint synchronization\n",
2787                     connection->logHeader());
2788                 break;
2789             case TAP_OPAQUE_OPEN_CHECKPOINT:
2790                 /**
2791                  * This event is only received by the TAP client that wants to
2792                  * get mutations from closed checkpoints only. At this time,
2793                  * only incremental backup client receives this event so that
2794                  * it can close the connection and reconnect later.
2795                  */
2796                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2797                     connection->logHeader());
2798                 break;
2799             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2800                 {
2801                     LOG(EXTENSION_LOG_INFO,
2802                         "%s Backfill started for vbucket %d.\n",
2803                         connection->logHeader(), vbucket);
2804                     BlockTimer timer(&stats.tapVbucketResetHisto);
2805                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2806                                                   ENGINE_DISCONNECT;
2807                     if (ret == ENGINE_DISCONNECT) {
2808                         LOG(EXTENSION_LOG_WARNING,
2809                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2810                             connection->logHeader(), vbucket);
2811                     } else {
2812                         LOG(EXTENSION_LOG_NOTICE,
2813                          "%s Reset vbucket %d was completed succecssfully.\n",
2814                             connection->logHeader(), vbucket);
2815                     }
2816
2817                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2818                     if (tc) {
2819                         tc->setBackfillPhase(true, vbucket);
2820                     } else {
2821                         ret = ENGINE_DISCONNECT;
2822                         LOG(EXTENSION_LOG_WARNING,
2823                             "TAP consumer doesn't exists. Force disconnect\n");
2824                     }
2825                 }
2826                 break;
2827             case TAP_OPAQUE_CLOSE_BACKFILL:
2828                 {
2829                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2830                         connection->logHeader());
2831                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2832                     if (tc) {
2833                         tc->setBackfillPhase(false, vbucket);
2834                     } else {
2835                         ret = ENGINE_DISCONNECT;
2836                         LOG(EXTENSION_LOG_WARNING,
2837                             "%s not a consumer! Force disconnect\n",
2838                             connection->logHeader());
2839                     }
2840                 }
2841                 break;
2842             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2843                 /**
2844                  * This event is sent by the eVBucketMigrator to notify that
2845                  * the source node closes the tap replication stream and
2846                  * switches to TAKEOVER_VBUCKETS phase.
2847                  * This is just an informative message and doesn't require any
2848                  * action.
2849                  */
2850                 LOG(EXTENSION_LOG_INFO,
2851                 "%s Received close tap stream. Switching to takeover phase.\n",
2852                     connection->logHeader());
2853                 break;
2854             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2855                 /**
2856                  * This opaque message is just for notifying that the source
2857                  * node receives change_vbucket_filter request and processes
2858                  * it successfully.
2859                  */
2860                 LOG(EXTENSION_LOG_INFO,
2861                 "%s Notified that the source node changed a vbucket filter.\n",
2862                     connection->logHeader());
2863                 break;
2864             default:
2865                 LOG(EXTENSION_LOG_WARNING,
2866                     "%s Received an unknown opaque command\n",
2867                     connection->logHeader());
2868             }
2869         } else {
2870             LOG(EXTENSION_LOG_WARNING,
2871                 "%s Received tap opaque with unknown size %d\n",
2872                 connection->logHeader(), nengine);
2873         }
2874         break;
2875
2876     case TAP_VBUCKET_SET:
2877         {
2878             BlockTimer timer(&stats.tapVbucketSetHisto);
2879
2880             if (nengine != sizeof(vbucket_state_t)) {
2881                 // illegal datasize
2882                 LOG(EXTENSION_LOG_WARNING,
2883                     "%s Received TAP_VBUCKET_SET with illegal size."
2884                     " Force disconnect\n", connection->logHeader());
2885                 ret = ENGINE_DISCONNECT;
2886                 break;
2887             }
2888
2889             vbucket_state_t state;
2890             memcpy(&state, engine_specific, nengine);
2891             state = (vbucket_state_t)ntohl(state);
2892
2893             ret = connection->setVBucketState(0, vbucket, state);
2894         }
2895         break;
2896
2897     default:
2898         // Unknown command
2899         LOG(EXTENSION_LOG_WARNING,
2900             "%s Recieved bad opcode, ignoring message\n",
2901             connection->logHeader());
2902     }
2903
2904     connection->processedEvent(tap_event, ret);
2905     return ret;
2906 }
2907
2908 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2909                                                       TapConsumer *consumer,
2910                                                       uint8_t event,
2911                                                       uint16_t vbucket,
2912                                                       uint64_t checkpointId) {
2913     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2914
2915     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2916         getKVBucket()->wakeUpFlusher();
2917         ret = ENGINE_SUCCESS;
2918     }
2919     else {
2920         ret = ENGINE_DISCONNECT;
2921         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2922             "checkpoint %" PRIu64 ". Force disconnect\n",
2923             consumer->logHeader(), checkpointId);
2924     }
2925
2926     return ret;
2927 }
2928
2929 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2930     TapProducer *rv =
2931         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2932     if (!(rv && rv->isConnected())) {
2933         LOG(EXTENSION_LOG_WARNING,
2934             "Walking a non-existent tap queue, disconnecting\n");
2935         return NULL;
2936     }
2937
2938     if (rv->doDisconnect()) {
2939         LOG(EXTENSION_LOG_WARNING,
2940             "%s Disconnecting pending connection\n", rv->logHeader());
2941         return NULL;
2942     }
2943     return rv;
2944 }
2945
2946 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2947     // Register the ON_DISCONNECT callback
2948     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2949     // Register the ON_DELETE_BUCKET callback
2950     registerEngineCallback(ON_DELETE_BUCKET, EvpHandleDeleteBucket, this);
2951 }
2952
2953 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2954                                                             uint32_t seqno,
2955                                                             uint16_t status,
2956                                                             const DocKey& key)
2957 {
2958     TapProducer *connection = getTapProducer(cookie);
2959     if (!connection) {
2960         LOG(EXTENSION_LOG_WARNING,
2961             "Unable to process tap ack. No producer found\n");
2962         return ENGINE_DISCONNECT;
2963     }
2964
2965     return connection->processAck(seqno, status, key);
2966 }
2967
2968 ENGINE_ERROR_CODE EventuallyPersistentEngine::memoryCondition() {
2969     // Do we think it's possible we could free something?
2970     bool haveEvidenceWeCanFreeMemory =
2971         (stats.getMaxDataSize() > stats.memOverhead->load());
2972     if (haveEvidenceWeCanFreeMemory) {
2973         // Look for more evidence by seeing if we have resident items.
2974         VBucketCountVisitor countVisitor(vbucket_state_active);
2975         kvBucket->visit(countVisitor);
2976
2977         haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
2978             countVisitor.getNumItems();
2979     }
2980     if (haveEvidenceWeCanFreeMemory) {
2981         ++stats.tmp_oom_errors;
2982         // Wake up the item pager task as memory usage
2983         // seems to have exceeded high water mark
2984         getKVBucket()->wakeUpItemPager();
2985         return ENGINE_TMPFAIL;
2986     } else {
2987         ++stats.oom_errors;
2988         return ENGINE_ENOMEM;
2989     }
2990 }
2991
2992 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2993                                                              &backfillVBFilter,
2994                                                Producer *tc)
2995 {
2996     auto bfv = std::make_unique<BackFillVisitor>(
2997             this, *tapConnMap, tc, backfillVBFilter);
2998     getKVBucket()->visit(std::move(bfv),
2999                          "Backfill task",
3000                          TaskId::BackfillVisitorTask,
3001                          1);
3002 }
3003
3004 void VBucketCountAggregator::visitBucket(VBucketPtr &vb) {
3005     std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3006     it = visitorMap.find(vb->getState());
3007     if ( it != visitorMap.end() ) {
3008         it->second->visitBucket(vb);
3009     }
3010 }
3011
3012 void VBucketCountAggregator::addVisitor(VBucketCountVisitor* visitor) {
3013     visitorMap[visitor->getVBucketState()] = visitor;
3014 }
3015
3016 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3017                                                            ADD_STAT add_stat) {
3018
3019     configuration.addStats(add_stat, cookie);
3020
3021     EPStats &epstats = getEpStats();
3022     add_casted_stat("ep_version", VERSION, add_stat, cookie);
3023     add_casted_stat("ep_storage_age",
3024                     epstats.dirtyAge, add_stat, cookie);
3025     add_casted_stat("ep_storage_age_highwat",
3026                     epstats.dirtyAgeHighWat, add_stat, cookie);
3027     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3028                     add_stat, cookie);
3029
3030     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3031         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3032     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3033         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3034     }
3035
3036     add_casted_stat("ep_total_enqueued",
3037                     epstats.totalEnqueued, add_stat, cookie);
3038     add_casted_stat("ep_expired_access", epstats.expired_access,
3039                     add_stat, cookie);
3040     add_casted_stat("ep_expired_compactor", epstats.expired_compactor,
3041                     add_stat, cookie);
3042     add_casted_stat("ep_expired_pager", epstats.expired_pager,
3043                     add_stat, cookie);
3044     add_casted_stat("ep_queue_size",
3045                     epstats.diskQueueSize, add_stat, cookie);
3046     add_casted_stat("ep_diskqueue_items",
3047                     epstats.diskQueueSize, add_stat, cookie);
3048     add_casted_stat("ep_vb_backfill_queue_size",
3049                     epstats.vbBackfillQueueSize,
3050                     add_stat,
3051                     cookie);
3052     auto* flusher = kvBucket->getFlusher(EP_PRIMARY_SHARD);
3053     if (flusher) {
3054         add_casted_stat("ep_commit_num", epstats.flusherCommits,
3055                         add_stat, cookie);
3056         add_casted_stat("ep_commit_time",
3057                         epstats.commit_time, add_stat, cookie);
3058         add_casted_stat("ep_commit_time_total",
3059                         epstats.cumulativeCommitTime, add_stat, cookie);
3060         add_casted_stat("ep_item_begin_failed",
3061                         epstats.beginFailed, add_stat, cookie);
3062         add_casted_stat("ep_item_commit_failed",
3063                         epstats.commitFailed, add_stat, cookie);
3064         add_casted_stat("ep_item_flush_expired",
3065                         epstats.flushExpired, add_stat, cookie);
3066         add_casted_stat("ep_item_flush_failed",
3067                         epstats.flushFailed, add_stat, cookie);
3068         add_casted_stat("ep_flusher_state",
3069                         flusher->stateName(), add_stat, cookie);
3070         add_casted_stat("ep_flusher_todo",
3071                         epstats.flusher_todo, add_stat, cookie);
3072         add_casted_stat("ep_total_persisted",
3073                         epstats.totalPersisted, add_stat, cookie);
3074         add_casted_stat("ep_uncommitted_items",
3075                         epstats.flusher_todo, add_stat, cookie);
3076         add_casted_stat("ep_chk_persistence_timeout",
3077                         VBucket::getCheckpointFlushTimeout(),
3078                         add_stat,
3079                         cookie);
3080     }
3081     add_casted_stat("ep_vbucket_del",
3082                     epstats.vbucketDeletions, add_stat, cookie);
3083     add_casted_stat("ep_vbucket_del_fail",
3084                     epstats.vbucketDeletionFail, add_stat, cookie);
3085     add_casted_stat("ep_flush_duration_total",
3086                     epstats.cumulativeFlushTime, add_stat, cookie);
3087
3088     kvBucket->getAggregatedVBucketStats(cookie, add_stat);
3089
3090     kvBucket->getFileStats(cookie, add_stat);
3091
3092     add_casted_stat("ep_persist_vbstate_total",
3093                     epstats.totalPersistVBState, add_stat, cookie);
3094
3095     size_t memUsed =  stats.getTotalMemoryUsed();
3096     add_casted_stat("mem_used", memUsed, add_stat, cookie);
3097     add_casted_stat("ep_mem_low_wat_percent", stats.mem_low_wat_percent,
3098                     add_stat, cookie);
3099     add_casted_stat("ep_mem_high_wat_percent", stats.mem_high_wat_percent,
3100                     add_stat, cookie);
3101     add_casted_stat("bytes", memUsed, add_stat, cookie);
3102     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3103     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3104 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3105     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3106 #else
3107     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3108 #endif
3109     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3110     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3111                     add_stat, cookie);
3112 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3113     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3114 #else
3115     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3116 #endif
3117     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3118     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3119     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3120
3121     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3122     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3123                     add_stat, cookie);
3124     add_casted_stat("ep_mem_tracker_enabled", stats.memoryTrackerEnabled,
3125                     add_stat, cookie);
3126     add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3127                     add_stat, cookie);
3128     add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3129                     add_stat, cookie);
3130     add_casted_stat("ep_bg_remaining_items", epstats.numRemainingBgItems,
3131                     add_stat, cookie);
3132     add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3133                     add_stat, cookie);
3134     add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3135                     add_stat, cookie);
3136     add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3137                     add_stat, cookie);
3138     add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3139                     add_stat, cookie);
3140     add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3141                     add_stat, cookie);
3142     add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3143                     add_stat, cookie);
3144     add_casted_stat("ep_items_rm_from_checkpoints",
3145                     epstats.itemsRemovedFromCheckpoints,
3146                     add_stat, cookie);
3147     add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3148                     add_stat, cookie);
3149     add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3150                     add_stat, cookie);
3151     add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3152                     add_stat, cookie);
3153
3154     add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3155     add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3156                     add_stat, cookie);
3157     add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3158                     add_stat, cookie);
3159     add_casted_stat("ep_pending_ops_max_duration",
3160                     epstats.pendingOpsMaxDuration,
3161                     add_stat, cookie);
3162
3163     add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3164                     add_stat, cookie);
3165     add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3166                     add_stat, cookie);
3167
3168     size_t vbDeletions = epstats.vbucketDeletions.load();
3169     if (vbDeletions > 0) {
3170         add_casted_stat("ep_vbucket_del_max_walltime",
3171                         epstats.vbucketDelMaxWalltime,
3172                         add_stat, cookie);
3173         add_casted_stat("ep_vbucket_del_avg_walltime",
3174                         epstats.vbucketDelTotWalltime / vbDeletions,
3175                         add_stat, cookie);
3176     }
3177
3178     size_t numBgOps = epstats.bgNumOperations.load();
3179     if (numBgOps > 0) {
3180         add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3181                         add_stat, cookie);
3182         add_casted_stat("ep_bg_min_wait",
3183                         epstats.bgMinWait,
3184                         add_stat, cookie);
3185         add_casted_stat("ep_bg_max_wait",
3186                         epstats.bgMaxWait,
3187                         add_stat, cookie);
3188         add_casted_stat("ep_bg_wait_avg",
3189                         epstats.bgWait / numBgOps,
3190                         add_stat, cookie);
3191         add_casted_stat("ep_bg_min_load",
3192                         epstats.bgMinLoad,
3193                         add_stat, cookie);
3194         add_casted_stat("ep_bg_max_load",
3195                         epstats.bgMaxLoad,
3196                         add_stat, cookie);
3197         add_casted_stat("ep_bg_load_avg",
3198                         epstats.bgLoad / numBgOps,
3199                         add_stat, cookie);
3200         add_casted_stat("ep_bg_wait",
3201                         epstats.bgWait,
3202                         add_stat, cookie);
3203         add_casted_stat("ep_bg_load",
3204                         epstats.bgLoad,
3205                         add_stat, cookie);
3206     }
3207
3208     add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3209
3210     add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3211                     add_stat, cookie);
3212     add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3213                     add_stat, cookie);
3214     add_casted_stat("ep_num_access_scanner_skips",
3215                     epstats.accessScannerSkips, add_stat, cookie);
3216     add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3217                     add_stat, cookie);
3218     add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3219                     add_stat, cookie);
3220
3221     if (kvBucket->isAccessScannerEnabled() && epstats.alogTime.load() != 0)
3222     {
3223         char timestr[20];
3224         struct tm alogTim;
3225         hrtime_t alogTime = epstats.alogTime.load();
3226         if (cb_gmtime_r((time_t *)&alogTime, &alogTim) == -1) {
3227             add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3228                             cookie);
3229         } else {
3230             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3231             add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3232                             cookie);
3233         }
3234     } else {
3235         add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3236                         add_stat, cookie);
3237     }
3238
3239     if (kvBucket->isExpPagerEnabled()) {
3240         char timestr[20];
3241         struct tm expPagerTim;
3242         hrtime_t expPagerTime = epstats.expPagerTime.load();
3243         if (cb_gmtime_r((time_t *)&expPagerTime, &expPagerTim) == -1) {
3244             add_casted_stat("ep_expiry_pager_task_time", "UNKNOWN", add_stat,
3245                             cookie);
3246         } else {
3247             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &expPagerTim);
3248             add_casted_stat("ep_expiry_pager_task_time", timestr, add_stat,
3249                             cookie);
3250         }
3251     } else {
3252         add_casted_stat("ep_expiry_pager_task_time", "NOT_SCHEDULED",
3253                         add_stat, cookie);
3254     }
3255
3256     add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
3257
3258     if (getConfiguration().isWarmup()) {
3259         Warmup *wp = kvBucket->getWarmup();
3260         if (wp == nullptr) {
3261             throw std::logic_error("EPEngine::doEngineStats: warmup is NULL");
3262         }
3263         if (!kvBucket->isWarmingUp()) {
3264             add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
3265         } else {
3266             add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);
3267         }</