MB-24221: Don't fetch deleted values as part of get_if
[ep-engine.git] / src / ep_engine.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21
22 #include "backfill.h"
23 #include "common.h"
24 #include "connmap.h"
25 #include "dcp/consumer.h"
26 #include "dcp/dcpconnmap.h"
27 #include "dcp/flow-control-manager.h"
28 #include "dcp/producer.h"
29 #include "ep_bucket.h"
30 #include "ep_vb.h"
31 #include "ephemeral_bucket.h"
32 #include "failover-table.h"
33 #include "flusher.h"
34 #include "htresizer.h"
35 #include "logger.h"
36 #include "memory_tracker.h"
37 #include "replicationthrottle.h"
38 #include "stats-info.h"
39 #define STATWRITER_NAMESPACE core_engine
40 #include "statwriter.h"
41 #undef STATWRITER_NAMESPACE
42 #include "string_utils.h"
43 #include "tapconnmap.h"
44 #include "vb_count_visitor.h"
45 #include "warmup.h"
46
47 #include <JSON_checker.h>
48 #include <cJSON_utils.h>
49 #include <memcached/engine.h>
50 #include <memcached/extension.h>
51 #include <memcached/protocol_binary.h>
52 #include <memcached/server_api.h>
53 #include <memcached/util.h>
54 #include <platform/cb_malloc.h>
55 #include <platform/checked_snprintf.h>
56 #include <platform/make_unique.h>
57 #include <platform/platform.h>
58 #include <platform/processclock.h>
59 #include <xattr/utils.h>
60
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <fstream>
#include <iostream>
#include <limits>
#include <mutex>
#include <stdarg.h>
#include <string>
#include <utility>
#include <vector>
71
/**
 * Scale `val` by the fraction `percent` (e.g. 0.75 for 75%), truncating the
 * result towards zero when converting back to size_t.
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = percent * static_cast<double>(val);
    return static_cast<size_t>(scaled);
}
75
76 struct EPHandleReleaser {
77     void operator()(EventuallyPersistentEngine*) {
78         ObjectRegistry::onSwitchThread(nullptr);
79     }
80 };
81
82 using EPHandle = std::unique_ptr<EventuallyPersistentEngine, EPHandleReleaser>;
83
84 /**
85  * Helper function to acquire a handle to the engine which allows access to
86  * the engine while the handle is in scope.
87  * @param handle pointer to the engine
88  * @return EPHandle which is a unique_ptr to an EventuallyPersistentEngine
89  * with a custom deleter (EPHandleReleaser) which performs the required
90  * ObjectRegistry release.
91  */
92
93 static inline EPHandle acquireEngine(ENGINE_HANDLE* handle) {
94     auto ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
95     ObjectRegistry::onSwitchThread(ret);
96
97     return EPHandle(ret);
98 }
99
100 /**
101  * Call the response callback and return the appropriate value so that
102  * the core knows what to do..
103  */
104 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
105                                       uint16_t keylen,
106                                       const void *ext, uint8_t extlen,
107                                       const void *body, uint32_t bodylen,
108                                       uint8_t datatype, uint16_t status,
109                                       uint64_t cas, const void *cookie)
110 {
111     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
112     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
113     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
114                  status, cas, cookie)) {
115         rv = ENGINE_SUCCESS;
116     }
117     ObjectRegistry::onSwitchThread(e);
118     return rv;
119 }
120
/**
 * Check that `v` lies in the closed interval [l, h].
 *
 * @throws std::runtime_error("Value out of range.") when it does not.
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool outOfRange = (v < l) || (h < v);
    if (outOfRange) {
        throw std::runtime_error("Value out of range.");
    }
}
127
128
/**
 * Verify that `str` is a decimal integer: an optional leading '-' followed by
 * one or more ASCII digits '0'..'9'.
 *
 * @param str NUL-terminated string to check (must not be null)
 * @throws std::runtime_error("Value is not numeric") if `str` is empty, is a
 *         lone "-", or contains any other character.
 */
static void checkNumeric(const char* str) {
    const char* cursor = str;
    if (*cursor == '-') {
        ++cursor;
    }
    // "" and "-" contain no digits and must not be reported as numeric.
    if (*cursor == '\0') {
        throw std::runtime_error("Value is not numeric");
    }
    for (; *cursor != '\0'; ++cursor) {
        // Compare against '0'..'9' directly. isdigit() on a plain char is
        // undefined behaviour for negative values (bytes >= 0x80 where char
        // is signed), and isdigit() only ever matches '0'..'9' anyway.
        if (*cursor < '0' || *cursor > '9') {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
141
142 static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle) {
143     return acquireEngine(handle)->getInfo();
144 }
145
146 static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
147                                        const char* config_str) {
148     return acquireEngine(handle)->initialize(config_str);
149 }
150
151 static void EvpDestroy(ENGINE_HANDLE* handle, const bool force) {
152     auto eng = acquireEngine(handle);
153     eng->destroy(force);
154     delete eng.get();
155 }
156
157 static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
158                                          const void* cookie,
159                                          item** itm,
160                                          const DocKey& key,
161                                          const size_t nbytes,
162                                          const int flags,
163                                          const rel_time_t exptime,
164                                          uint8_t datatype,
165                                          uint16_t vbucket) {
166     if (!mcbp::datatype::is_valid(datatype)) {
167         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
168             " (ItemAllocate)");
169         return ENGINE_EINVAL;
170     }
171
172     return acquireEngine(handle)->itemAllocate(itm,
173                                                key,
174                                                nbytes,
175                                                0, // No privileged bytes
176                                                flags,
177                                                exptime,
178                                                datatype,
179                                                vbucket);
180 }
181
182 static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
183                            const item* itm, item_info *itm_info);
184 static void EvpItemRelease(ENGINE_HANDLE* handle, const void *cookie,
185                            item* itm);
186
187
188 static std::pair<cb::unique_item_ptr, item_info> EvpItemAllocateEx(ENGINE_HANDLE* handle,
189                                                                    const void* cookie,
190                                                                    const DocKey& key,
191                                                                    const size_t nbytes,
192                                                                    const size_t priv_nbytes,
193                                                                    const int flags,
194                                                                    const rel_time_t exptime,
195                                                                    uint8_t datatype,
196                                                                    uint16_t vbucket) {
197
198     item* it = nullptr;
199     auto err = acquireEngine(handle)->itemAllocate(
200             &it, key, nbytes, priv_nbytes, flags, exptime, datatype, vbucket);
201
202     if (err != ENGINE_SUCCESS) {
203         throw cb::engine_error(cb::engine_errc(err),
204                                "EvpItemAllocateEx: failed to allocate memory");
205     }
206
207     item_info info;
208     if (!EvpGetItemInfo(handle, cookie, it, &info)) {
209         EvpItemRelease(handle, cookie, it);
210         throw cb::engine_error(cb::engine_errc::failed,
211                                "EvpItemAllocateEx: EvpGetItemInfo failed");
212     }
213
214     return std::make_pair(cb::unique_item_ptr{it, cb::ItemDeleter{handle}},
215                           info);
216 }
217
218 static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
219                                        const void* cookie,
220                                        const DocKey& key,
221                                        uint64_t* cas,
222                                        uint16_t vbucket,
223                                        mutation_descr_t* mut_info) {
224     if (!cas) {
225         LOG(EXTENSION_LOG_WARNING,
226             "EvpItemDelete(): cas ptr passed is null for vb: %" PRIu16,
227             vbucket);
228         return ENGINE_EINVAL;
229     }
230     return acquireEngine(handle)->itemDelete(
231             cookie, key, *cas, vbucket, nullptr, mut_info);
232 }
233
234 static void EvpItemRelease(ENGINE_HANDLE* handle,
235                            const void* cookie,
236                            item* itm) {
237     acquireEngine(handle)->itemRelease(cookie, itm);
238 }
239
240 static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
241                                 const void* cookie,
242                                 item** itm,
243                                 const DocKey& key,
244                                 uint16_t vbucket,
245                                 DocStateFilter documentStateFilter) {
246     get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
247                                                        HONOR_STATES |
248                                                        TRACK_REFERENCE |
249                                                        DELETE_TEMP |
250                                                        HIDE_LOCKED_CAS |
251                                                        TRACK_STATISTICS);
252
253     switch (documentStateFilter) {
254     case DocStateFilter::Alive:
255         break;
256     case DocStateFilter::Deleted:
257         // MB-23640 was caused by this bug as the frontend asked for
258         // Alive and Deleted documents. The internals don't have a
259         // way of requesting just deleted documents, and luckily for
260         // us no part of our code is using this yet. Return an error
261         // if anyone start using it
262         return ENGINE_ENOTSUP;
263     case DocStateFilter::AliveOrDeleted:
264         options = static_cast<get_options_t>(options | GET_DELETED_VALUE);
265         break;
266     }
267
268     return acquireEngine(handle)->get(cookie, itm, key, vbucket, options);
269 }
270
271 static cb::EngineErrorItemPair EvpGetIf(ENGINE_HANDLE* handle,
272                                         const void* cookie,
273                                         const DocKey& key,
274                                         uint16_t vbucket,
275                                         std::function<bool(
276                                             const item_info&)> filter) {
277     return acquireEngine(handle)->get_if(cookie, key, vbucket, filter);
278 }
279
280 static cb::EngineErrorItemPair EvpGetAndTouch(ENGINE_HANDLE* handle,
281                                               const void* cookie,
282                                               const DocKey& key,
283                                               uint16_t vbucket,
284                                               uint32_t expiry_time) {
285     return acquireEngine(handle)->get_and_touch(cookie, key, vbucket,
286                                                 expiry_time);
287 }
288
289 static ENGINE_ERROR_CODE EvpGetLocked(ENGINE_HANDLE* handle,
290                                       const void* cookie,
291                                       item** itm,
292                                       const DocKey& key,
293                                       uint16_t vbucket,
294                                       uint32_t lock_timeout) {
295     return acquireEngine(handle)->get_locked(
296             cookie, itm, key, vbucket, lock_timeout);
297 }
298
299 static ENGINE_ERROR_CODE EvpUnlock(ENGINE_HANDLE* handle,
300                                    const void* cookie,
301                                    const DocKey& key,
302                                    uint16_t vbucket,
303                                    uint64_t cas) {
304     return acquireEngine(handle)->unlock(cookie, key, vbucket, cas);
305 }
306
307 static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
308                                      const void* cookie,
309                                      const char* stat_key,
310                                      int nkey,
311                                      ADD_STAT add_stat) {
312     return acquireEngine(handle)->getStats(cookie, stat_key, nkey, add_stat);
313 }
314
315 static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
316                                   const void* cookie,
317                                   item* itm,
318                                   uint64_t* cas,
319                                   ENGINE_STORE_OPERATION operation,
320                                   DocumentState document_state) {
321     auto engine = acquireEngine(handle);
322
323     if (document_state == DocumentState::Deleted) {
324         Item* item = static_cast<Item*>(itm);
325         item->setDeleted();
326     }
327
328     return engine->store(cookie, itm, cas, operation);
329 }
330
331 static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
332                                   const void* cookie) {
333     return acquireEngine(handle)->flush(cookie);
334 }
335
336 static void EvpResetStats(ENGINE_HANDLE* handle, const void*) {
337     acquireEngine(handle)->resetStats();
338 }
339
340 protocol_binary_response_status EventuallyPersistentEngine::setTapParam(
341         const char* keyz, const char* valz, std::string& msg) {
342     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
343
344     try {
345         if (strcmp(keyz, "tap_keepalive") == 0) {
346             int v = std::stoi(valz);
347             validate(v, 0, MAX_TAP_KEEP_ALIVE);
348             getConfiguration().requirementsMetOrThrow("tap_keepalive");
349             setTapKeepAlive(static_cast<uint32_t>(v));
350         } else if (strcmp(keyz, "replication_throttle_threshold") == 0) {
351             getConfiguration().setReplicationThrottleThreshold(
352                     std::stoull(valz));
353         } else if (strcmp(keyz, "replication_throttle_queue_cap") == 0) {
354             getConfiguration().setReplicationThrottleQueueCap(std::stoll(valz));
355         } else if (strcmp(keyz, "replication_throttle_cap_pcnt") == 0) {
356             getConfiguration().setReplicationThrottleCapPcnt(std::stoull(valz));
357         } else {
358             msg = "Unknown config param";
359             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
360         }
361         // Handles exceptions thrown by the standard
362         // library stoi/stoul style functions when not numeric
363     } catch (std::invalid_argument&) {
364         msg = "Argument was not numeric";
365         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
366
367         // Handles exceptions thrown by the standard library stoi/stoul
368         // style functions when the conversion does not fit in the datatype
369     } catch (std::out_of_range&) {
370         msg = "Argument was out of range";
371         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
372
373         // Handles any miscellaenous exceptions in addition to the range_error
374         // exceptions thrown by the configuration::set<param>() methods
375     } catch (std::exception& error) {
376         msg = error.what();
377         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
378     }
379
380     return rv;
381 }
382
383 protocol_binary_response_status EventuallyPersistentEngine::setCheckpointParam(
384         const char* keyz, const char* valz, std::string& msg) {
385     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
386
387     try {
388         if (strcmp(keyz, "chk_max_items") == 0) {
389             size_t v = std::stoull(valz);
390             validate(v, size_t(MIN_CHECKPOINT_ITEMS),
391                      size_t(MAX_CHECKPOINT_ITEMS));
392             getConfiguration().setChkMaxItems(v);
393         } else if (strcmp(keyz, "chk_period") == 0) {
394             size_t v = std::stoull(valz);
395             validate(v, size_t(MIN_CHECKPOINT_PERIOD),
396                      size_t(MAX_CHECKPOINT_PERIOD));
397             getConfiguration().setChkPeriod(v);
398         } else if (strcmp(keyz, "max_checkpoints") == 0) {
399             size_t v = std::stoull(valz);
400             validate(v, size_t(DEFAULT_MAX_CHECKPOINTS),
401                      size_t(MAX_CHECKPOINTS_UPPER_BOUND));
402             getConfiguration().setMaxCheckpoints(v);
403         } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
404             getConfiguration().setItemNumBasedNewChk(cb_stob(valz));
405         } else if (strcmp(keyz, "keep_closed_chks") == 0) {
406             getConfiguration().setKeepClosedChks(cb_stob(valz));
407         } else if (strcmp(keyz, "enable_chk_merge") == 0) {
408             getConfiguration().setEnableChkMerge(cb_stob(valz));
409         } else {
410             msg = "Unknown config param";
411             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
412         }
413
414         // Handles exceptions thrown by the cb_stob function
415     } catch (invalid_argument_bool& error) {
416         msg = error.what();
417         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
418
419         // Handles exceptions thrown by the standard
420         // library stoi/stoul style functions when not numeric
421     } catch (std::invalid_argument&) {
422         msg = "Argument was not numeric";
423         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
424
425         // Handles exceptions thrown by the standard library stoi/stoul
426         // style functions when the conversion does not fit in the datatype
427     } catch (std::out_of_range&) {
428         msg = "Argument was out of range";
429         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
430
431         // Handles any miscellaenous exceptions in addition to the range_error
432         // exceptions thrown by the configuration::set<param>() methods
433     } catch (std::exception& error) {
434         msg = error.what();
435         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
436     }
437
438     return rv;
439 }
440
441 protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
442         const char* keyz, const char* valz, std::string& msg) {
443     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
444
445     // Handle the actual mutation.
446     try {
447         if (strcmp(keyz, "bg_fetch_delay") == 0) {
448             getConfiguration().setBgFetchDelay(std::stoull(valz));
449         } else if (strcmp(keyz, "flushall_enabled") == 0) {
450             getConfiguration().setFlushallEnabled(cb_stob(valz));
451         } else if (strcmp(keyz, "max_size") == 0) {
452             size_t vsize = std::stoull(valz);
453
454             getConfiguration().setMaxSize(vsize);
455             EPStats& st = getEpStats();
456             getConfiguration().setMemLowWat(
457                     percentOf(vsize, st.mem_low_wat_percent));
458             getConfiguration().setMemHighWat(
459                     percentOf(vsize, st.mem_high_wat_percent));
460         } else if (strcmp(keyz, "mem_low_wat") == 0) {
461             getConfiguration().setMemLowWat(std::stoull(valz));
462         } else if (strcmp(keyz, "mem_high_wat") == 0) {
463             getConfiguration().setMemHighWat(std::stoull(valz));
464         } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
465             getConfiguration().setBackfillMemThreshold(std::stoull(valz));
466         } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
467             getConfiguration().setCompactionExpMemThreshold(std::stoull(valz));
468         } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
469             getConfiguration().setMutationMemThreshold(std::stoull(valz));
470         } else if (strcmp(keyz, "timing_log") == 0) {
471             EPStats& stats = getEpStats();
472             std::ostream* old = stats.timingLog;
473             stats.timingLog = NULL;
474             delete old;
475             if (strcmp(valz, "off") == 0) {
476                 LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
477             } else {
478                 std::ofstream* tmp(new std::ofstream(valz));
479                 if (tmp->good()) {
480                     LOG(EXTENSION_LOG_INFO,
481                         "Logging detailed timings to ``%s''.", valz);
482                     stats.timingLog = tmp;
483                 } else {
484                     LOG(EXTENSION_LOG_WARNING,
485                         "Error setting detailed timing log to ``%s'':  %s",
486                         valz, strerror(errno));
487                     delete tmp;
488                 }
489             }
490         } else if (strcmp(keyz, "exp_pager_enabled") == 0) {
491             getConfiguration().setExpPagerEnabled(cb_stob(valz));
492         } else if (strcmp(keyz, "exp_pager_stime") == 0) {
493             getConfiguration().setExpPagerStime(std::stoull(valz));
494         } else if (strcmp(keyz, "exp_pager_initial_run_time") == 0) {
495             getConfiguration().setExpPagerInitialRunTime(std::stoll(valz));
496         } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
497             getConfiguration().requirementsMetOrThrow("access_scanner_enabled");
498             getConfiguration().setAccessScannerEnabled(cb_stob(valz));
499         } else if (strcmp(keyz, "alog_sleep_time") == 0) {
500             getConfiguration().requirementsMetOrThrow("alog_sleep_time");
501             getConfiguration().setAlogSleepTime(std::stoull(valz));
502         } else if (strcmp(keyz, "alog_task_time") == 0) {
503             getConfiguration().requirementsMetOrThrow("alog_task_time");
504             getConfiguration().setAlogTaskTime(std::stoull(valz));
505         } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
506             getConfiguration().setPagerActiveVbPcnt(std::stoull(valz));
507         } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
508             getConfiguration().setWarmupMinMemoryThreshold(std::stoull(valz));
509         } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
510             getConfiguration().setWarmupMinItemsThreshold(std::stoull(valz));
511         } else if (strcmp(keyz, "max_num_readers") == 0 ||
512                    strcmp(keyz, "num_reader_threads") == 0) {
513             size_t value = std::stoull(valz);
514             getConfiguration().setNumReaderThreads(value);
515             ExecutorPool::get()->setNumReaders(value);
516         } else if (strcmp(keyz, "max_num_writers") == 0 ||
517                    strcmp(keyz, "num_writer_threads") == 0) {
518             size_t value = std::stoull(valz);
519             getConfiguration().setNumWriterThreads(value);
520             ExecutorPool::get()->setNumWriters(value);
521         } else if (strcmp(keyz, "max_num_auxio") == 0 ||
522                    strcmp(keyz, "num_auxio_threads") == 0) {
523             size_t value = std::stoull(valz);
524             getConfiguration().setNumAuxioThreads(value);
525             ExecutorPool::get()->setNumAuxIO(value);
526         } else if (strcmp(keyz, "max_num_nonio") == 0 ||
527                    strcmp(keyz, "num_nonio_threads") == 0) {
528             size_t value = std::stoull(valz);
529             getConfiguration().setNumNonioThreads(value);
530             ExecutorPool::get()->setNumNonIO(value);
531         } else if (strcmp(keyz, "bfilter_enabled") == 0) {
532             getConfiguration().setBfilterEnabled(cb_stob(valz));
533         } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
534             getConfiguration().setBfilterResidencyThreshold(std::stof(valz));
535         } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
536             getConfiguration().setDefragmenterEnabled(cb_stob(valz));
537         } else if (strcmp(keyz, "defragmenter_interval") == 0) {
538             size_t v = std::stoull(valz);
539             // Adding separate validation as external limit is minimum 1
540             // to prevent setting defragmenter to constantly run
541             validate(v, size_t(1), std::numeric_limits<size_t>::max());
542             getConfiguration().setDefragmenterInterval(v);
543         } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
544             getConfiguration().setDefragmenterAgeThreshold(std::stoull(valz));
545         } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
546             getConfiguration().setDefragmenterChunkDuration(std::stoull(valz));
547         } else if (strcmp(keyz, "defragmenter_run") == 0) {
548             runDefragmenterTask();
549         } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
550             getConfiguration().setCompactionWriteQueueCap(std::stoull(valz));
551         } else if (strcmp(keyz, "dcp_min_compression_ratio") == 0) {
552             getConfiguration().setDcpMinCompressionRatio(std::stof(valz));
553         } else if (strcmp(keyz, "access_scanner_run") == 0) {
554             if (!(runAccessScannerTask())) {
555                 rv = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
556             }
557         } else if (strcmp(keyz, "vb_state_persist_run") == 0) {
558             runVbStatePersistTask(std::stoi(valz));
559         } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
560             getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
561             getConfiguration().setEphemeralFullPolicy(valz);
562         } else if (strcmp(keyz, "ephemeral_metadata_purge_age") == 0) {
563             getConfiguration().requirementsMetOrThrow(
564                     "ephemeral_metadata_purge_age");
565             getConfiguration().setEphemeralMetadataPurgeAge(std::stoull(valz));
566         } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
567             getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
568             getConfiguration().setEphemeralMetadataPurgeInterval(
569                     std::stoull(valz));
570         } else if (strcmp(keyz, "mem_merge_count_threshold") == 0) {
571             getConfiguration().setMemMergeCountThreshold(std::stoul(valz));
572         } else if (strcmp(keyz, "mem_merge_bytes_threshold") == 0) {
573             getConfiguration().setMemMergeBytesThreshold(std::stoul(valz));
574         } else {
575             msg = "Unknown config param";
576             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
577         }
578         // Handles exceptions thrown by the cb_stob function
579     } catch (invalid_argument_bool& error) {
580         msg = error.what();
581         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
582
583         // Handles exceptions thrown by the standard
584         // library stoi/stoul style functions when not numeric
585     } catch (std::invalid_argument&) {
586         msg = "Argument was not numeric";
587         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
588
589         // Handles exceptions thrown by the standard library stoi/stoul
590         // style functions when the conversion does not fit in the datatype
591     } catch (std::out_of_range&) {
592         msg = "Argument was out of range";
593         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
594
595         // Handles any miscellaneous exceptions in addition to the range_error
596         // exceptions thrown by the configuration::set<param>() methods
597     } catch (std::exception& error) {
598         msg = error.what();
599         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
600     }
601
602     return rv;
603 }
604
605 protocol_binary_response_status EventuallyPersistentEngine::setDcpParam(
606         const char* keyz, const char* valz, std::string& msg) {
607     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
608     try {
609
610         if (strcmp(keyz,
611                    "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
612             size_t v = atoi(valz);
613             checkNumeric(valz);
614             validate(v, size_t(1), std::numeric_limits<size_t>::max());
615             getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(
616                     v);
617         } else if (
618             strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") ==
619             0) {
620             size_t v = atoi(valz);
621             checkNumeric(valz);
622             validate(v, size_t(1), std::numeric_limits<size_t>::max());
623             getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(
624                     v);
625         } else {
626             msg = "Unknown config param";
627             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
628         }
629     } catch (std::runtime_error& ex) {
630         msg = "Value out of range.";
631         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
632     }
633
634     return rv;
635 }
636
637 protocol_binary_response_status EventuallyPersistentEngine::setVbucketParam(
638         uint16_t vbucket,
639         const char* keyz,
640         const char* valz,
641         std::string& msg) {
642     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
643     try {
644         if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
645             uint64_t v = std::strtoull(valz, nullptr, 10);
646             checkNumeric(valz);
647             getConfiguration().setHlcDriftAheadThresholdUs(v);
648         } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
649             uint64_t v = std::strtoull(valz, nullptr, 10);
650             checkNumeric(valz);
651             getConfiguration().setHlcDriftBehindThresholdUs(v);
652         } else if (strcmp(keyz, "max_cas") == 0) {
653             uint64_t v = std::strtoull(valz, nullptr, 10);
654             checkNumeric(valz);
655             LOG(EXTENSION_LOG_WARNING, "setVbucketParam: max_cas:%" PRIu64 " "
656                 "vb:%" PRIu16 "\n", v, vbucket);
657             if (getKVBucket()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
658                 rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
659                 msg = "Not my vbucket";
660             }
661         } else {
662             msg = "Unknown config param";
663             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
664         }
665     } catch (std::runtime_error& ex) {
666         msg = "Value out of range.";
667         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
668     }
669     return rv;
670 }
671
672 static protocol_binary_response_status evictKey(
673     EventuallyPersistentEngine* e,
674     protocol_binary_request_header
675     * request,
676     const char** msg,
677     size_t* msg_size,
678     DocNamespace docNamespace) {
679     protocol_binary_request_no_extras* req =
680         (protocol_binary_request_no_extras*)request;
681
682     const uint8_t* keyPtr = reinterpret_cast<const uint8_t*>(request) +
683                             sizeof(*request);
684     size_t keylen = ntohs(req->message.header.request.keylen);
685     uint16_t vbucket = ntohs(request->request.vbucket);
686
687     LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key{%.*s}\n",
688         int(keylen), keyPtr);
689     msg_size = 0;
690     auto rv = e->evictKey(DocKey(keyPtr, keylen, docNamespace), vbucket, msg);
691     if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
692         rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
693         if (e->isDegradedMode()) {
694             return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
695         }
696     }
697     return rv;
698 }
699
700 protocol_binary_response_status EventuallyPersistentEngine::setParam(
701         protocol_binary_request_set_param* req, std::string& msg) {
702     size_t keylen = ntohs(req->message.header.request.keylen);
703     uint8_t extlen = req->message.header.request.extlen;
704     size_t vallen = ntohl(req->message.header.request.bodylen);
705     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
706     protocol_binary_engine_param_t paramtype =
707         static_cast<protocol_binary_engine_param_t>(ntohl(
708             req->message.body.param_type));
709
710     if (keylen == 0 || (vallen - keylen - extlen) == 0) {
711         return PROTOCOL_BINARY_RESPONSE_EINVAL;
712     }
713
714     const char* keyp = reinterpret_cast<const char*>(req->bytes)
715                        + sizeof(req->bytes);
716     const char* valuep = keyp + keylen;
717     vallen -= (keylen + extlen);
718
719     char keyz[128];
720     char valz[512];
721
722     // Read the key.
723     if (keylen >= sizeof(keyz)) {
724         msg = "Key is too large.";
725         return PROTOCOL_BINARY_RESPONSE_EINVAL;
726     }
727     memcpy(keyz, keyp, keylen);
728     keyz[keylen] = 0x00;
729
730     // Read the value.
731     if (vallen >= sizeof(valz)) {
732         msg = "Value is too large.";
733         return PROTOCOL_BINARY_RESPONSE_EINVAL;
734     }
735     memcpy(valz, valuep, vallen);
736     valz[vallen] = 0x00;
737
738     protocol_binary_response_status rv;
739
740     switch (paramtype) {
741     case protocol_binary_engine_param_flush:
742         rv = setFlushParam(keyz, valz, msg);
743         break;
744     case protocol_binary_engine_param_tap:
745         rv = setTapParam(keyz, valz, msg);
746         break;
747     case protocol_binary_engine_param_checkpoint:
748         rv = setCheckpointParam(keyz, valz, msg);
749         break;
750     case protocol_binary_engine_param_dcp:
751         rv = setDcpParam(keyz, valz, msg);
752         break;
753     case protocol_binary_engine_param_vbucket:
754         rv = setVbucketParam(vbucket, keyz, valz, msg);
755         break;
756     default:
757         rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
758     }
759
760     return rv;
761 }
762
763 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine* e,
764                                     const void* cookie,
765                                     protocol_binary_request_header* request,
766                                     ADD_RESPONSE response) {
767     protocol_binary_request_get_vbucket* req =
768         reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
769     if (req == nullptr) {
770         throw std::invalid_argument("getVBucket: Unable to convert req"
771                                         " to protocol_binary_request_get_vbucket");
772     }
773
774     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
775     VBucketPtr vb = e->getVBucket(vbucket);
776     if (!vb) {
777         return e->sendNotMyVBucketResponse(response, cookie, 0);
778     } else {
779         vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
780         return sendResponse(response, NULL, 0, NULL, 0, &state,
781                             sizeof(state),
782                             PROTOCOL_BINARY_RAW_BYTES,
783                             PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
784     }
785 }
786
787 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine* e,
788                                     const void* cookie,
789                                     protocol_binary_request_header* request,
790                                     ADD_RESPONSE response) {
791
792     protocol_binary_request_set_vbucket* req =
793         reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
794
795     uint64_t cas = ntohll(req->message.header.request.cas);
796
797     size_t bodylen = ntohl(req->message.header.request.bodylen)
798                      - ntohs(req->message.header.request.keylen);
799     if (bodylen != sizeof(vbucket_state_t)) {
800         const std::string msg("Incorrect packet format");
801         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
802                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
803                             PROTOCOL_BINARY_RESPONSE_EINVAL,
804                             cas, cookie);
805     }
806
807     vbucket_state_t state;
808     memcpy(&state, &req->message.body.state, sizeof(state));
809     state = static_cast<vbucket_state_t>(ntohl(state));
810
811     if (!is_valid_vbucket_state_t(state)) {
812         const std::string msg("Invalid vbucket state");
813         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
814                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
815                             PROTOCOL_BINARY_RESPONSE_EINVAL,
816                             cas, cookie);
817     }
818
819     uint16_t vb = ntohs(req->message.header.request.vbucket);
820     if (e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
821         const std::string msg("VBucket number too big");
822         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
823                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
824                             PROTOCOL_BINARY_RESPONSE_ERANGE,
825                             cas, cookie);
826     }
827     return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
828                         PROTOCOL_BINARY_RAW_BYTES,
829                         PROTOCOL_BINARY_RESPONSE_SUCCESS,
830                         cas, cookie);
831 }
832
/*
 * Handle PROTOCOL_BINARY_CMD_DEL_VBUCKET.
 *
 * The request must carry neither a key nor extras. An optional body of
 * exactly "async=0" selects synchronous deletion:
 *  - first invocation (no engine-specific data on the cookie): call
 *    deleteVBucket(vbucket, cookie) and mark the cookie with the engine
 *    pointer;
 *  - later invocation (engine-specific data present): clear it and report
 *    success. Presumably this is the re-drive after the engine notifies the
 *    cookie that deletion finished; confirm against deleteVBucket.
 * Otherwise deleteVBucket(vbucket) is called without a cookie.
 *
 * On ENGINE_EWOULDBLOCK the request pointer is stored as engine-specific
 * data and no response is sent. NOT_MY_VBUCKET is answered with
 * sendNotMyVBucketResponse(); every other outcome gets a raw-bytes reply
 * echoing the request CAS.
 */
static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine* e,
                                    const void* cookie,
                                    protocol_binary_request_header* req,
                                    ADD_RESPONSE response) {

    uint64_t cas = ntohll(req->request.cas);

    protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
    uint16_t vbucket = ntohs(req->request.vbucket);

    std::string msg = "";
    // Key and extras are not allowed for this command.
    if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
        msg = "Incorrect packet format.";
        return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
                            msg.length(),
                            PROTOCOL_BINARY_RAW_BYTES,
                            PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
    }

    // Only a body of exactly "async=0" switches to synchronous deletion;
    // any other body is ignored.
    bool sync = false;
    uint32_t bodylen = ntohl(req->request.bodylen);
    if (bodylen > 0) {
        const char* ptr = reinterpret_cast<const char*>(req->bytes) +
                          sizeof(req->bytes);
        if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
            sync = true;
        }
    }

    ENGINE_ERROR_CODE err;
    void* es = e->getEngineSpecific(cookie);
    if (sync) {
        if (es == NULL) {
            // First pass: start the deletion and mark the cookie.
            err = e->deleteVBucket(vbucket, cookie);
            e->storeEngineSpecific(cookie, e);
        } else {
            // Re-invocation of an earlier blocked request: clear the marker
            // and report success.
            e->storeEngineSpecific(cookie, NULL);
            LOG(EXTENSION_LOG_INFO,
                "Completed sync deletion of vbucket %u",
                (unsigned)vbucket);
            err = ENGINE_SUCCESS;
        }
    } else {
        err = e->deleteVBucket(vbucket);
    }
    switch (err) {
    case ENGINE_SUCCESS:
        LOG(EXTENSION_LOG_NOTICE,
            "Deletion of vbucket %d was completed.", vbucket);
        break;
    case ENGINE_NOT_MY_VBUCKET:
        LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
            "because the vbucket doesn't exist!!!", vbucket);
        res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
        break;
    case ENGINE_EINVAL:
        LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
            "because the vbucket is not in a dead state\n", vbucket);
        msg = "Failed to delete vbucket.  Must be in the dead state.";
        res = PROTOCOL_BINARY_RESPONSE_EINVAL;
        break;
    case ENGINE_EWOULDBLOCK:
        LOG(EXTENSION_LOG_NOTICE, "Request for vbucket %d deletion is in"
                " EWOULDBLOCK until the database file is removed from disk",
            vbucket);
        // Keep the cookie marked (overwriting the value stored above) and
        // return without responding.
        e->storeEngineSpecific(cookie, req);
        return ENGINE_EWOULDBLOCK;
    default:
        LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
            "because of unknown reasons\n", vbucket);
        msg = "Failed to delete vbucket.  Unknown reason.";
        res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
    }

    if (err != ENGINE_NOT_MY_VBUCKET) {
        return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
                            msg.length(), PROTOCOL_BINARY_RAW_BYTES,
                            res, cas, cookie);
    } else {
        return e->sendNotMyVBucketResponse(response, cookie, cas);
    }
}
915
916 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine* e,
917                                        protocol_binary_request_header* request,
918                                        const void* cookie,
919                                        Item** it,
920                                        const char** msg,
921                                        protocol_binary_response_status* res,
922                                        DocNamespace docNamespace) {
923     KVBucketIface* kvb = e->getKVBucket();
924     protocol_binary_request_no_extras* req =
925         (protocol_binary_request_no_extras*)request;
926     int keylen = ntohs(req->message.header.request.keylen);
927     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
928     ENGINE_ERROR_CODE error_code;
929     DocKey key(reinterpret_cast<const uint8_t*>(request) + sizeof(*request),
930                keylen, docNamespace);
931
932     GetValue rv(kvb->getReplica(key, vbucket, cookie));
933
934     if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
935         if (error_code == ENGINE_NOT_MY_VBUCKET) {
936             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
937             return error_code;
938         } else if (error_code == ENGINE_TMPFAIL) {
939             *msg = "NOT_FOUND";
940             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
941         } else {
942             return error_code;
943         }
944     } else {
945         *it = rv.getValue();
946         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
947     }
948     ++(e->getEpStats().numOpsGet);
949     return ENGINE_SUCCESS;
950 }
951
/*
 * Handle PROTOCOL_BINARY_CMD_COMPACT_DB.
 *
 * The request must have no key and exactly 24 bytes of extras, from which
 * purge_before_ts, purge_before_seq and drop_deletes are decoded into a
 * compaction_ctx. The DB file id comes from KVBucketIface::getDBFileId().
 *
 * First invocation (no engine-specific data on the cookie): increment
 * stats.pendingCompactions, mark the cookie and call e->compactDB(). On
 * ENGINE_EWOULDBLOCK the request pointer is stored as engine-specific data
 * and no response is sent yet. A later invocation with engine-specific data
 * present clears it and reports success.
 *
 * NOT_MY_VBUCKET is answered with sendNotMyVBucketResponse(); every other
 * outcome gets a raw-bytes reply echoing the request CAS.
 */
static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine* e,
                                   const void* cookie,
                                   protocol_binary_request_compact_db* req,
                                   ADD_RESPONSE response) {

    std::string msg = "";
    protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
    compaction_ctx compactreq;
    uint64_t cas = ntohll(req->message.header.request.cas);

    // No key allowed; extras must be exactly 24 bytes.
    if (ntohs(req->message.header.request.keylen) > 0 ||
        req->message.header.request.extlen != 24) {
        LOG(EXTENSION_LOG_WARNING,
            "Compaction received bad ext/key len %d/%d.",
            req->message.header.request.extlen,
            ntohs(req->message.header.request.keylen));
        msg = "Incorrect packet format.";
        return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
                            msg.length(),
                            PROTOCOL_BINARY_RAW_BYTES,
                            PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
    }
    EPStats& stats = e->getEpStats();
    compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
    compactreq.purge_before_seq =
        ntohll(req->message.body.purge_before_seq);
    compactreq.drop_deletes = req->message.body.drop_deletes;
    compactreq.db_file_id = e->getKVBucket()->getDBFileId(*req);
    uint16_t vbid = ntohs(req->message.header.request.vbucket);

    ENGINE_ERROR_CODE err;
    void* es = e->getEngineSpecific(cookie);
    if (es == NULL) {
        // First pass: count the compaction as pending before scheduling.
        // The failure branches below undo this increment.
        ++stats.pendingCompactions;
        e->storeEngineSpecific(cookie, e);
        err = e->compactDB(vbid, compactreq, cookie);
    } else {
        // Re-invocation of an earlier blocked request.
        e->storeEngineSpecific(cookie, NULL);
        err = ENGINE_SUCCESS;
    }

    switch (err) {
    case ENGINE_SUCCESS:
        LOG(EXTENSION_LOG_NOTICE,
            "Compaction of db file id: %d completed.", compactreq.db_file_id);
        break;
    case ENGINE_NOT_MY_VBUCKET:
        --stats.pendingCompactions;
        LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
            "because the db file doesn't exist!!!", compactreq.db_file_id);
        res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
        break;
    case ENGINE_EINVAL:
        --stats.pendingCompactions;
        LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
            "because of an invalid argument", compactreq.db_file_id);
        res = PROTOCOL_BINARY_RESPONSE_EINVAL;
        break;
    case ENGINE_EWOULDBLOCK:
        LOG(EXTENSION_LOG_NOTICE,
            "Compaction of db file id: %d scheduled "
                "(awaiting completion).", compactreq.db_file_id);
        // Keep the cookie marked and return without responding.
        e->storeEngineSpecific(cookie, req);
        return ENGINE_EWOULDBLOCK;
    case ENGINE_TMPFAIL:
        // NOTE(review): unlike the other failure branches this one does not
        // decrement pendingCompactions; confirm whether that is intended.
        LOG(EXTENSION_LOG_WARNING, "Request to compact db file id: %d hit"
                " a temporary failure and may need to be retried",
            compactreq.db_file_id);
        msg = "Temporary failure in compacting db file.";
        res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
        break;
    default:
        --stats.pendingCompactions;
        LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
            "because of unknown reasons\n", compactreq.db_file_id);
        msg = "Failed to compact db file.  Unknown reason.";
        res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
        break;
    }

    if (err != ENGINE_NOT_MY_VBUCKET) {
        return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
                            msg.length(), PROTOCOL_BINARY_RAW_BYTES,
                            res, cas, cookie);
    } else {
        return e->sendNotMyVBucketResponse(response, cookie, cas);
    }
}
1040
/*
 * Dispatch a binary-protocol command that ep-engine handles itself.
 *
 * For the ns_server commands listed in the first switch, the session CAS is
 * validated first. This only happens when the cookie has no engine-specific
 * data yet, i.e. not when a previously blocked request is re-driven.
 *
 * Most opcodes are handled by a helper that sends its own response, and the
 * function returns that helper's status directly. The remaining opcodes
 * (STOP/START_PERSISTENCE, SET_PARAM, EVICT_KEY, GET_REPLICA and any
 * unrecognised opcode) only fill in `res`, `msg`/`msg_size` and, for
 * GET_REPLICA, `itm`. The tail of the function then sends one of three
 * replies: the item, a NOT_MY_VBUCKET response, or a raw-bytes status
 * message. An unrecognised opcode therefore gets
 * PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND.
 *
 * @param h engine instance
 * @param cookie connection cookie
 * @param request raw request packet
 * @param response callback used to emit the reply
 * @param docNamespace namespace applied to key-based commands
 * @return the helper's status or the status of sending the reply
 */
static ENGINE_ERROR_CODE processUnknownCommand(
    EventuallyPersistentEngine* h,
    const void* cookie,
    protocol_binary_request_header* request,
    ADD_RESPONSE response,
    DocNamespace docNamespace) {
    protocol_binary_response_status res =
        PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
    std::string dynamic_msg;
    const char* msg = NULL;
    size_t msg_size = 0;
    Item* itm = NULL;

    EPStats& stats = h->getEpStats();
    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;

    /**
     * Session validation
     * (For ns_server commands only)
     */
    switch (request->request.opcode) {
    case PROTOCOL_BINARY_CMD_SET_PARAM:
    case PROTOCOL_BINARY_CMD_SET_VBUCKET:
    case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
    case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
    case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
    case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
    case PROTOCOL_BINARY_CMD_COMPACT_DB: {
        if (h->getEngineSpecific(cookie) == NULL) {
            uint64_t cas = ntohll(request->request.cas);
            if (!h->validateSessionCas(cas)) {
                const std::string message("Invalid session token");
                return sendResponse(response, NULL, 0, NULL, 0,
                                    message.c_str(), message.length(),
                                    PROTOCOL_BINARY_RAW_BYTES,
                                    PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
                                    cas, cookie);
            }
        }
        break;
    }
    default:
        break;
    }

    switch (request->request.opcode) {
    case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
        return h->getAllVBucketSequenceNumbers(cookie, request, response);

    case PROTOCOL_BINARY_CMD_GET_VBUCKET: {
        BlockTimer timer(&stats.getVbucketCmdHisto);
        rv = getVBucket(h, cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_DEL_VBUCKET: {
        BlockTimer timer(&stats.delVbucketCmdHisto);
        rv = delVBucket(h, cookie, request, response);
        // A blocked request keeps its engine-specific data and session
        // counter until it is re-driven and finishes.
        if (rv != ENGINE_EWOULDBLOCK) {
            h->decrementSessionCtr();
            h->storeEngineSpecific(cookie, NULL);
        }
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SET_VBUCKET: {
        BlockTimer timer(&stats.setVbucketCmdHisto);
        rv = setVBucket(h, cookie, request, response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
        res = h->stopFlusher(&msg, &msg_size);
        break;
    case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
        res = h->startFlusher(&msg, &msg_size);
        break;
    case PROTOCOL_BINARY_CMD_SET_PARAM:
        res = h->setParam(
                reinterpret_cast<protocol_binary_request_set_param*>(request),
                dynamic_msg);
        // dynamic_msg outlives the reply sent at the end of this function.
        msg = dynamic_msg.c_str();
        msg_size = dynamic_msg.length();
        h->decrementSessionCtr();
        break;
    case PROTOCOL_BINARY_CMD_EVICT_KEY:
        res = evictKey(h, request, &msg, &msg_size, docNamespace);
        break;
    case PROTOCOL_BINARY_CMD_OBSERVE:
        return h->observe(cookie, request, response, docNamespace);
    case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
        return h->observe_seqno(cookie, request, response);
    case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT: {
        rv = h->deregisterTapClient(cookie, request, response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN: {
        rv = h->resetReplicationChain(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER: {
        rv = h->changeTapVBFilter(cookie, request, response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
    case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
    case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE: {
        rv = h->handleCheckpointCmds(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE: {
        rv = h->handleSeqnoCmds(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_GET_META:
    case PROTOCOL_BINARY_CMD_GETQ_META: {
        rv = h->getMeta(cookie,
                        reinterpret_cast<protocol_binary_request_get_meta*>
                        (request), response,
                        docNamespace);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SET_WITH_META:
    case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
    case PROTOCOL_BINARY_CMD_ADD_WITH_META:
    case PROTOCOL_BINARY_CMD_ADDQ_WITH_META: {
        rv = h->setWithMeta(cookie,
                            reinterpret_cast<protocol_binary_request_set_with_meta*>
                            (request), response,
                            docNamespace);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_DEL_WITH_META:
    case PROTOCOL_BINARY_CMD_DELQ_WITH_META: {
        rv = h->deleteWithMeta(cookie,
                               reinterpret_cast<protocol_binary_request_delete_with_meta*>
                               (request), response,
                               docNamespace);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_RETURN_META: {
        return h->returnMeta(cookie,
                             reinterpret_cast<protocol_binary_request_return_meta*>
                             (request), response,
                             docNamespace);
    }
    case PROTOCOL_BINARY_CMD_GET_REPLICA:
        // On success itm/res are filled in; NOT_MY_VBUCKET falls through to
        // the tail below, which sends the NOT_MY_VBUCKET reply.
        rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res, docNamespace);
        if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
            return rv;
        }
        break;
    case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
    case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC: {
        rv = h->handleTrafficControlCmd(cookie, request, response);
        return rv;
    }
    case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG: {
        rv = h->setClusterConfig(cookie,
                                 reinterpret_cast<protocol_binary_request_set_cluster_config*>
                                 (request), response);
        h->decrementSessionCtr();
        return rv;
    }
    case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
        return h->getClusterConfig(cookie,
                                   reinterpret_cast<protocol_binary_request_get_cluster_config*>
                                   (request), response);
    case PROTOCOL_BINARY_CMD_COMPACT_DB: {
        rv = compactDB(h, cookie,
                       (protocol_binary_request_compact_db*)(request),
                       response);
        // As for DEL_VBUCKET: only release session/engine-specific state
        // once the request is no longer blocked.
        if (rv != ENGINE_EWOULDBLOCK) {
            h->decrementSessionCtr();
            h->storeEngineSpecific(cookie, NULL);
        }
        return rv;
    }
    case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY: {
        // This command takes no extras, key or body; a malformed request is
        // returned as ENGINE_EINVAL without sending a reply from here.
        if (request->request.extlen != 0 ||
            request->request.keylen != 0 ||
            request->request.bodylen != 0) {
            return ENGINE_EINVAL;
        }
        return h->getRandomKey(cookie, response);
    }
    case PROTOCOL_BINARY_CMD_GET_KEYS: {
        return h->getAllKeys(cookie,
                             reinterpret_cast<protocol_binary_request_get_keys*>
                             (request), response,
                             docNamespace);
    }
        // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
    case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
    case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE: {
        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
                            PROTOCOL_BINARY_RAW_BYTES,
                            PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
                            cookie);
    }
    }

    // Common reply for the opcodes that did not respond themselves.
    if (itm) {
        // GET_REPLICA hit: reply with key, flags (as extras) and value.
        uint32_t flags = itm->getFlags();
        rv = sendResponse(response,
                          static_cast<const void*>(itm->getKey().data()),
                          itm->getKey().size(),
                          (const void*)&flags, sizeof(uint32_t),
                          static_cast<const void*>(itm->getData()),
                          itm->getNBytes(), itm->getDataType(),
                          static_cast<uint16_t>(res), itm->getCas(),
                          cookie);
        // The item handed out by getReplicaCmd is owned and freed here.
        delete itm;
    } else if (rv == ENGINE_NOT_MY_VBUCKET) {
        return h->sendNotMyVBucketResponse(response, cookie, 0);
    } else {
        // Handlers may leave msg_size at 0 for a NUL-terminated msg.
        msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
        rv = sendResponse(response, NULL, 0, NULL, 0,
                          msg, static_cast<uint16_t>(msg_size),
                          PROTOCOL_BINARY_RAW_BYTES,
                          static_cast<uint16_t>(res), 0, cookie);

    }
    return rv;
}
1266
1267 static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1268                                            const void* cookie,
1269                                            protocol_binary_request_header
1270                                            * request,
1271                                            ADD_RESPONSE response,
1272                                            DocNamespace doc_namespace) {
1273     auto engine = acquireEngine(handle);
1274     auto ret = processUnknownCommand(
1275             engine.get(), cookie, request, response, doc_namespace);
1276     return ret;
1277 }
1278
1279 static void EvpItemSetCas(ENGINE_HANDLE*, const void*,
1280                           item* itm, uint64_t cas) {
1281     static_cast<Item*>(itm)->setCas(cas);
1282 }
1283
1284 static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1285                                       const void* cookie,
1286                                       void* engine_specific,
1287                                       uint16_t nengine,
1288                                       uint8_t ttl,
1289                                       uint16_t tap_flags,
1290                                       tap_event_t tap_event,
1291                                       uint32_t tap_seqno,
1292                                       const void* key,
1293                                       size_t nkey,
1294                                       uint32_t flags,
1295                                       uint32_t exptime,
1296                                       uint64_t cas,
1297                                       uint8_t datatype,
1298                                       const void* data,
1299                                       size_t ndata,
1300                                       uint16_t vbucket) {
1301     if (!mcbp::datatype::is_valid(datatype)) {
1302         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1303             " (TapNotify)");
1304         return ENGINE_EINVAL;
1305     }
1306
1307     return acquireEngine(handle)->tapNotify(cookie,
1308                                             engine_specific,
1309                                             nengine,
1310                                             ttl,
1311                                             tap_flags,
1312                                             (uint16_t)tap_event,
1313                                             tap_seqno,
1314                                             key,
1315                                             nkey,
1316                                             flags,
1317                                             exptime,
1318                                             cas,
1319                                             datatype,
1320                                             data,
1321                                             ndata,
1322                                             vbucket);
1323 }
1324
1325 static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1326                                   const void* cookie, item** itm,
1327                                   void** es, uint16_t* nes, uint8_t* ttl,
1328                                   uint16_t* flags, uint32_t* seqno,
1329                                   uint16_t* vbucket) {
1330     uint16_t tap_event = acquireEngine(handle)->walkTapQueue(
1331             cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1332     return static_cast<tap_event_t>(tap_event);
1333 }
1334
1335 static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1336                                       const void* cookie,
1337                                       const void* client,
1338                                       size_t nclient,
1339                                       uint32_t flags,
1340                                       const void* userdata,
1341                                       size_t nuserdata) {
1342     auto engine = acquireEngine(handle);
1343     TAP_ITERATOR iterator = NULL;
1344     {
1345         std::string c(static_cast<const char*>(client), nclient);
1346         // Figure out what we want from the userdata before adding it to
1347         // the API to the handle
1348         if (engine->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1349             iterator = EvpTapIterator;
1350         }
1351     }
1352     return iterator;
1353 }
1354
1355
1356 static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1357                                     const void* cookie,
1358                                     struct dcp_message_producers* producers) {
1359     auto engine = acquireEngine(handle);
1360     ConnHandler* conn = engine->getConnHandler(cookie);
1361     if (conn) {
1362         return conn->step(producers);
1363     }
1364     return ENGINE_DISCONNECT;
1365 }
1366
1367
1368 static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1369                                     const void* cookie,
1370                                     uint32_t opaque,
1371                                     uint32_t seqno,
1372                                     uint32_t flags,
1373                                     void* name,
1374                                     uint16_t nname) {
1375     return acquireEngine(handle)->dcpOpen(
1376             cookie, opaque, seqno, flags, name, nname);
1377 }
1378
1379 static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1380                                          const void* cookie,
1381                                          uint32_t opaque,
1382                                          uint16_t vbucket,
1383                                          uint32_t flags) {
1384     return acquireEngine(handle)->dcpAddStream(cookie, opaque, vbucket, flags);
1385 }
1386
1387 static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1388                                            const void* cookie,
1389                                            uint32_t opaque,
1390                                            uint16_t vbucket) {
1391     auto engine = acquireEngine(handle);
1392     ConnHandler* conn = engine->getConnHandler(cookie);
1393     if (conn) {
1394         return conn->closeStream(opaque, vbucket);
1395     }
1396     return ENGINE_DISCONNECT;
1397 }
1398
1399
1400 static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1401                                          const void* cookie,
1402                                          uint32_t flags,
1403                                          uint32_t opaque,
1404                                          uint16_t vbucket,
1405                                          uint64_t startSeqno,
1406                                          uint64_t endSeqno,
1407                                          uint64_t vbucketUuid,
1408                                          uint64_t snapStartSeqno,
1409                                          uint64_t snapEndSeqno,
1410                                          uint64_t* rollbackSeqno,
1411                                          dcp_add_failover_log callback) {
1412     auto engine = acquireEngine(handle);
1413     ConnHandler* conn = engine->getConnHandler(cookie);
1414     if (conn) {
1415         return conn->streamRequest(flags,
1416                                    opaque,
1417                                    vbucket,
1418                                    startSeqno,
1419                                    endSeqno,
1420                                    vbucketUuid,
1421                                    snapStartSeqno,
1422                                    snapEndSeqno,
1423                                    rollbackSeqno,
1424                                    callback);
1425     }
1426     return ENGINE_DISCONNECT;
1427 }
1428
1429 static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1430                                               const void* cookie,
1431                                               uint32_t opaque,
1432                                               uint16_t vbucket,
1433                                               dcp_add_failover_log callback) {
1434     auto engine = acquireEngine(handle);
1435     ConnHandler* conn = engine->getConnHandler(cookie);
1436     if (conn) {
1437         return conn->getFailoverLog(opaque, vbucket, callback);
1438     }
1439     return ENGINE_DISCONNECT;
1440 }
1441
1442
1443 static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1444                                          const void* cookie,
1445                                          uint32_t opaque,
1446                                          uint16_t vbucket,
1447                                          uint32_t flags) {
1448     auto engine = acquireEngine(handle);
1449     ConnHandler* conn = engine->getConnHandler(cookie);
1450     if (conn) {
1451         return conn->streamEnd(opaque, vbucket, flags);
1452     }
1453     return ENGINE_DISCONNECT;
1454 }
1455
1456
1457 static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1458                                               const void* cookie,
1459                                               uint32_t opaque,
1460                                               uint16_t vbucket,
1461                                               uint64_t start_seqno,
1462                                               uint64_t end_seqno,
1463                                               uint32_t flags) {
1464     auto engine = acquireEngine(handle);
1465     ConnHandler* conn = engine->getConnHandler(cookie);
1466     if (conn) {
1467         return conn->snapshotMarker(
1468                 opaque, vbucket, start_seqno, end_seqno, flags);
1469     }
1470     return ENGINE_DISCONNECT;
1471 }
1472
1473 static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1474                                         const void* cookie,
1475                                         uint32_t opaque,
1476                                         const DocKey& key,
1477                                         cb::const_byte_buffer value,
1478                                         size_t priv_bytes,
1479                                         uint8_t datatype,
1480                                         uint64_t cas,
1481                                         uint16_t vbucket,
1482                                         uint32_t flags,
1483                                         uint64_t by_seqno,
1484                                         uint64_t rev_seqno,
1485                                         uint32_t expiration,
1486                                         uint32_t lock_time,
1487                                         cb::const_byte_buffer meta,
1488                                         uint8_t nru) {
1489     if (!mcbp::datatype::is_valid(datatype)) {
1490         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1491             " (DCPMutation)");
1492         return ENGINE_EINVAL;
1493     }
1494     auto engine = acquireEngine(handle);
1495     ConnHandler* conn = engine->getConnHandler(cookie);
1496     if (conn) {
1497         return conn->mutation(opaque, key, value, priv_bytes, datatype, cas,
1498                               vbucket, flags, by_seqno, rev_seqno, expiration,
1499                               lock_time, meta, nru);
1500     }
1501     return ENGINE_DISCONNECT;
1502 }
1503
1504 static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1505                                         const void* cookie,
1506                                         uint32_t opaque,
1507                                         const DocKey& key,
1508                                         cb::const_byte_buffer value,
1509                                         size_t priv_bytes,
1510                                         uint8_t datatype,
1511                                         uint64_t cas,
1512                                         uint16_t vbucket,
1513                                         uint64_t by_seqno,
1514                                         uint64_t rev_seqno,
1515                                         cb::const_byte_buffer meta) {
1516     auto engine = acquireEngine(handle);
1517     ConnHandler* conn = engine->getConnHandler(cookie);
1518     if (conn) {
1519         return conn->deletion(opaque, key, value, priv_bytes, datatype, cas,
1520                               vbucket, by_seqno, rev_seqno, meta);
1521     }
1522     return ENGINE_DISCONNECT;
1523 }
1524
1525 static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1526                                           const void* cookie,
1527                                           uint32_t opaque,
1528                                           const DocKey& key,
1529                                           cb::const_byte_buffer value,
1530                                           size_t priv_bytes,
1531                                           uint8_t datatype,
1532                                           uint64_t cas,
1533                                           uint16_t vbucket,
1534                                           uint64_t by_seqno,
1535                                           uint64_t rev_seqno,
1536                                           cb::const_byte_buffer meta) {
1537     auto engine = acquireEngine(handle);
1538     ConnHandler* conn = engine->getConnHandler(cookie);
1539     if (conn) {
1540         return conn->expiration(opaque, key, value, priv_bytes, datatype, cas,
1541                                 vbucket, by_seqno, rev_seqno, meta);
1542     }
1543     return ENGINE_DISCONNECT;
1544 }
1545
1546 static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1547                                      const void* cookie,
1548                                      uint32_t opaque,
1549                                      uint16_t vbucket) {
1550     auto engine = acquireEngine(handle);
1551     ConnHandler* conn = engine->getConnHandler(cookie);
1552     if (conn) {
1553         return conn->flushall(opaque, vbucket);
1554     }
1555     return ENGINE_DISCONNECT;
1556 }
1557
1558 static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1559                                                const void* cookie,
1560                                                uint32_t opaque,
1561                                                uint16_t vbucket,
1562                                                vbucket_state_t state) {
1563     auto engine = acquireEngine(handle);
1564     ConnHandler* conn = engine->getConnHandler(cookie);
1565     if (conn) {
1566         return conn->setVBucketState(opaque, vbucket, state);
1567     }
1568     return ENGINE_DISCONNECT;
1569 }
1570
1571 static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1572                                     const void* cookie,
1573                                     uint32_t opaque) {
1574     auto engine = acquireEngine(handle);
1575     ConnHandler* conn = engine->getConnHandler(cookie);
1576     if (conn) {
1577         return conn->noop(opaque);
1578     }
1579     return ENGINE_DISCONNECT;
1580 }
1581
1582 static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1583                                                      const void* cookie,
1584                                                      uint32_t opaque,
1585                                                      uint16_t vbucket,
1586                                                      uint32_t buffer_bytes) {
1587     auto engine = acquireEngine(handle);
1588     ConnHandler* conn = engine->getConnHandler(cookie);
1589     if (conn) {
1590         return conn->bufferAcknowledgement(opaque, vbucket, buffer_bytes);
1591     }
1592     return ENGINE_DISCONNECT;
1593 }
1594
1595 static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1596                                        const void* cookie,
1597                                        uint32_t opaque,
1598                                        const void* key,
1599                                        uint16_t nkey,
1600                                        const void* value,
1601                                        uint32_t nvalue) {
1602     auto engine = acquireEngine(handle);
1603     ConnHandler* conn = engine->getConnHandler(cookie);
1604     if (conn) {
1605         return conn->control(opaque, key, nkey, value, nvalue);
1606     }
1607     return ENGINE_DISCONNECT;
1608 }
1609
1610 static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1611                                                const void* cookie,
1612                                                protocol_binary_response_header* response) {
1613     auto engine = acquireEngine(handle);
1614     ConnHandler* conn = engine->getConnHandler(cookie);
1615     if (conn) {
1616         if (conn->handleResponse(response)) {
1617             return ENGINE_SUCCESS;
1618         }
1619     }
1620     return ENGINE_DISCONNECT;
1621 }
1622
1623 static void EvpHandleDisconnect(const void* cookie,
1624                                 ENGINE_EVENT_TYPE type,
1625                                 const void* event_data,
1626                                 const void* cb_data) {
1627     if (type != ON_DISCONNECT) {
1628         throw std::invalid_argument("EvpHandleDisconnect: type "
1629                                         "(which is" + std::to_string(type) +
1630                                     ") is not ON_DISCONNECT");
1631     }
1632     if (event_data != nullptr) {
1633         throw std::invalid_argument("EvpHandleDisconnect: event_data "
1634                                         "is not NULL");
1635     }
1636     void* c = const_cast<void*>(cb_data);
1637     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1638 }
1639
1640 static void EvpHandleDeleteBucket(const void* cookie,
1641                                   ENGINE_EVENT_TYPE type,
1642                                   const void* event_data,
1643                                   const void* cb_data) {
1644     if (type != ON_DELETE_BUCKET) {
1645         throw std::invalid_argument("EvpHandleDeleteBucket: type "
1646                                         "(which is" + std::to_string(type) +
1647                                     ") is not ON_DELETE_BUCKET");
1648     }
1649     if (event_data != nullptr) {
1650         throw std::invalid_argument("EvpHandleDeleteBucket: event_data "
1651                                         "is not NULL");
1652     }
1653     void* c = const_cast<void*>(cb_data);
1654     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDeleteBucket(cookie);
1655 }
1656
1657 void EvpSetLogLevel(ENGINE_HANDLE* handle, EXTENSION_LOG_LEVEL level) {
1658     Logger::setGlobalLogLevel(level);
1659 }
1660
1661 /**
1662  * The only public interface to the eventually persistent engine.
1663  * Allocate a new instance and initialize it
1664  * @param interface the highest interface the server supports (we only
1665  *                  support interface 1)
1666  * @param get_server_api callback function to get the server exported API
1667  *                  functions
1668  * @param handle Where to return the new instance
1669  * @return ENGINE_SUCCESS on success
1670  */
1671 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1672                                   GET_SERVER_API get_server_api,
1673                                   ENGINE_HANDLE** handle) {
1674     SERVER_HANDLE_V1* api = get_server_api();
1675     if (interface != 1 || api == NULL) {
1676         return ENGINE_ENOTSUP;
1677     }
1678
1679     Logger::setLoggerAPI(api->log);
1680
1681     MemoryTracker::getInstance(*api->alloc_hooks);
1682     ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1683
1684     std::atomic<size_t>* inital_tracking = new std::atomic<size_t>();
1685
1686     ObjectRegistry::setStats(inital_tracking);
1687     EventuallyPersistentEngine* engine;
1688     engine = new EventuallyPersistentEngine(get_server_api);
1689     ObjectRegistry::setStats(NULL);
1690
1691     if (engine == NULL) {
1692         return ENGINE_ENOMEM;
1693     }
1694
1695     if (MemoryTracker::trackingMemoryAllocations()) {
1696         engine->getEpStats().memoryTrackerEnabled.store(true);
1697         engine->getEpStats().totalMemory->store(inital_tracking->load());
1698     }
1699     delete inital_tracking;
1700
1701     initialize_time_functions(api->core);
1702
1703     *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1704
1705     return ENGINE_SUCCESS;
1706 }
1707
1708 /*
1709     This method is called prior to unloading of the shared-object.
1710     Global clean-up should be performed from this method.
1711 */
1712 void destroy_engine() {
1713     ExecutorPool::shutdown();
1714     // A single MemoryTracker exists for *all* buckets
1715     // and must be destroyed before unloading the shared object.
1716     MemoryTracker::destroyInstance();
1717     ObjectRegistry::reset();
1718 }
1719
1720 static bool EvpGetItemInfo(ENGINE_HANDLE* handle, const void*,
1721                            const item* itm, item_info* itm_info) {
1722     const Item* it = reinterpret_cast<const Item*>(itm);
1723     auto engine = acquireEngine(handle);
1724     VBucketPtr vb = engine->getKVBucket()->getVBucket(it->getVBucketId());
1725     uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
1726     *itm_info = it->toItemInfo(vb_uuid);
1727     return true;
1728 }
1729
1730 static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1731                            item* itm, const item_info* itm_info) {
1732     Item* it = reinterpret_cast<Item*>(itm);
1733     if (!it) {
1734         return false;
1735     }
1736     it->setDataType(itm_info->datatype);
1737     return true;
1738 }
1739
/**
 * C-API shim: hand the engine's cached cluster configuration to the server
 * via @p callback.
 *
 * The config buffer pointer and length are read while holding
 * clusterConfig.lock, and `lh` keeps that lock held until after the
 * callback returns (presumably so the buffer cannot change underneath the
 * callback). The engine holder is released before the callback is invoked.
 * NOTE(review): config.size() is narrowed to uint32_t.
 */
static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
                                             const void* cookie,
                                             engine_get_vb_map_cb callback) {
    auto engine = acquireEngine(handle);
    LockHolder lh(engine->clusterConfig.lock);
    const char* config = engine->clusterConfig.config.data();
    uint32_t len = engine->clusterConfig.config.size();
    engine.reset(); // Want to release the engine before the callback
    return callback(cookie, config, len);
}
1748     return callback(cookie, config, len);
1749 }
1750
1751 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1752     va_list va;
1753     va_start(va, fmt);
1754     global_logger.vlog(severity, fmt, va);
1755     va_end(va);
1756 }
1757
/**
 * Construct the engine object.
 *
 * Member pointers are null-initialised here; the heavy lifting happens in
 * initialize(). The constructor wires every ENGINE_HANDLE_V1 entry point
 * (including the DCP sub-table) to the static Evp* shims, fetches the
 * server API via @p get_server_api, and populates the engine info block
 * (description and the advertised feature list).
 */
EventuallyPersistentEngine::EventuallyPersistentEngine(
        GET_SERVER_API get_server_api)
    : clusterConfig(),
      kvBucket(nullptr),
      workload(NULL),
      workloadPriority(NO_BUCKET_PRIORITY),
      replicationThrottle(NULL),
      getServerApiFunc(get_server_api),
      dcpConnMap_(NULL),
      dcpFlowControlManager_(NULL),
      tapConnMap(NULL),
      tapConfig(NULL),
      checkpointConfig(NULL),
      trafficEnabled(false),
      deleteAllEnabled(false),
      startupTime(0),
      taskable(this) {
    // Engine interface version 1 (the only one create_instance accepts).
    interface.interface = 1;
    ENGINE_HANDLE_V1::get_info = EvpGetInfo;
    ENGINE_HANDLE_V1::initialize = EvpInitialize;
    ENGINE_HANDLE_V1::destroy = EvpDestroy;
    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
    ENGINE_HANDLE_V1::allocate_ex = EvpItemAllocateEx;
    ENGINE_HANDLE_V1::remove = EvpItemDelete;
    ENGINE_HANDLE_V1::release = EvpItemRelease;
    ENGINE_HANDLE_V1::get = EvpGet;
    ENGINE_HANDLE_V1::get_if = EvpGetIf;
    ENGINE_HANDLE_V1::get_and_touch = EvpGetAndTouch;
    ENGINE_HANDLE_V1::get_locked = EvpGetLocked;
    ENGINE_HANDLE_V1::unlock = EvpUnlock;
    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
    ENGINE_HANDLE_V1::store = EvpStore;
    ENGINE_HANDLE_V1::flush = EvpFlush;
    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
    ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
    ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
    ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;

    // DCP entry points.
    ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
    ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
    ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
    ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
    ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
    ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
    ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
    ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
    ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
    ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
    ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
    ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
    ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
    ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
    ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
    ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
    ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;

    serverApi = getServerApiFunc();
    // Advertise the engine description and supported features.
    // NOTE(review): assumes info.info.features has room for all four entries.
    memset(&info, 0, sizeof(info));
    info.info.description = "EP engine v" VERSION;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
    info.info.features[info.info.num_features++].feature =
                                             ENGINE_FEATURE_PERSISTENT_STORAGE;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
}
1826     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
1827 }
1828
1829 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1830 {
1831     EventuallyPersistentEngine *epe =
1832                                     ObjectRegistry::onSwitchThread(NULL, true);
1833     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1834     ObjectRegistry::onSwitchThread(epe);
1835     return rv;
1836 }
1837
1838 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1839 {
1840     EventuallyPersistentEngine *epe =
1841                                     ObjectRegistry::onSwitchThread(NULL, true);
1842     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1843     ObjectRegistry::onSwitchThread(epe);
1844     return rv;
1845 }
1846
1847 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1848                                                         EVENT_CALLBACK cb,
1849                                                         const void *cb_data) {
1850     EventuallyPersistentEngine *epe =
1851                                     ObjectRegistry::onSwitchThread(NULL, true);
1852     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1853     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1854                             type, cb, cb_data);
1855     ObjectRegistry::onSwitchThread(epe);
1856 }
1857
1858 /**
1859  * A configuration value changed listener that responds to ep-engine
1860  * parameter changes by invoking engine-specific methods on
1861  * configuration change events.
1862  */
1863 class EpEngineValueChangeListener : public ValueChangedListener {
1864 public:
1865     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
1866         // EMPTY
1867     }
1868
1869     virtual void sizeValueChanged(const std::string &key, size_t value) {
1870         if (key.compare("getl_max_timeout") == 0) {
1871             engine.setGetlMaxTimeout(value);
1872         } else if (key.compare("getl_default_timeout") == 0) {
1873             engine.setGetlDefaultTimeout(value);
1874         } else if (key.compare("max_item_size") == 0) {
1875             engine.setMaxItemSize(value);
1876         } else if (key.compare("max_item_privileged_bytes") == 0) {
1877             engine.setMaxItemPrivilegedBytes(value);
1878         } else if (key.compare("mem_merge_count_threshold") == 0) {
1879             engine.stats.mem_merge_count_threshold = value;
1880         } else if (key.compare("mem_merge_bytes_threshold") == 0) {
1881             engine.stats.mem_merge_bytes_threshold = value;
1882         }
1883     }
1884
1885     virtual void booleanValueChanged(const std::string &key, bool value) {
1886         if (key.compare("flushall_enabled") == 0) {
1887             engine.setDeleteAll(value);
1888         }
1889     }
1890 private:
1891     EventuallyPersistentEngine &engine;
1892 };
1893
1894
1895
/**
 * Initialise the engine from the server-supplied configuration string.
 *
 * Parses @p config (if non-null), applies configuration-derived defaults,
 * registers value-change listeners for the runtime-tunable keys, creates
 * the DCP/TAP connection maps, flow-control manager, replication throttle,
 * checkpoint config and the KV bucket, then initialises the bucket.
 *
 * @param config configuration string, or NULL to use defaults
 * @return ENGINE_SUCCESS, or ENGINE_FAILED if the configuration cannot be
 *         parsed, the shard count exceeds max_vbuckets, or the bucket fails
 *         to initialise.
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
    resetStats();
    if (config != NULL) {
        LOG(EXTENSION_LOG_NOTICE, "EPEngine::initialize: parsing config:\"%s\"",
            config);
        if (!configuration.parseConfiguration(config, serverApi)) {
            LOG(EXTENSION_LOG_WARNING, "Failed to parse the configuration config "
                "during bucket initialization.  config=%s", config);
            return ENGINE_FAILED;
        }
    }

    name = configuration.getCouchBucket();
    maxFailoverEntries = configuration.getMaxFailoverEntries();

    // Start updating the variables from the config!
    HashTable::setDefaultNumBuckets(configuration.getHtSize());
    HashTable::setDefaultNumLocks(configuration.getHtLocks());
    StoredValue::setMutationMemoryThreshold(
                                      configuration.getMutationMemThreshold());

    // A max_size of 0 is treated as "unlimited".
    if (configuration.getMaxSize() == 0) {
        configuration.setMaxSize(std::numeric_limits<size_t>::max());
    }

    // Watermarks left at the size_t max sentinel are derived from max_size:
    // 75% for the low watermark, 85% for the high watermark.
    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
        stats.mem_low_wat_percent.store(0.75);
        configuration.setMemLowWat(percentOf(
                configuration.getMaxSize(), stats.mem_low_wat_percent.load()));
    }

    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
        stats.mem_high_wat_percent.store(0.85);
        configuration.setMemHighWat(percentOf(
                configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
    }

    // For each runtime-tunable key: read the current value, then register a
    // listener so later changes are applied to the engine. The listener
    // objects are handed to the configuration (ownership presumably taken
    // by it — not visible here).
    stats.mem_merge_count_threshold = configuration.getMemMergeCountThreshold();
    configuration.addValueChangedListener(
            "mem_merge_count_threshold",
            new EpEngineValueChangeListener(*this));

    stats.mem_merge_bytes_threshold = configuration.getMemMergeBytesThreshold();
    configuration.addValueChangedListener(
            "mem_merge_bytes_threshold",
            new EpEngineValueChangeListener(*this));

    maxItemSize = configuration.getMaxItemSize();
    configuration.addValueChangedListener("max_item_size",
                                       new EpEngineValueChangeListener(*this));

    maxItemPrivilegedBytes = configuration.getMaxItemPrivilegedBytes();
    configuration.addValueChangedListener(
            "max_item_privileged_bytes",
            new EpEngineValueChangeListener(*this));

    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
    configuration.addValueChangedListener("getl_default_timeout",
                                       new EpEngineValueChangeListener(*this));
    getlMaxTimeout = configuration.getGetlMaxTimeout();
    configuration.addValueChangedListener("getl_max_timeout",
                                       new EpEngineValueChangeListener(*this));

    deleteAllEnabled = configuration.isFlushallEnabled();
    configuration.addValueChangedListener("flushall_enabled",
                                       new EpEngineValueChangeListener(*this));

    // The shard count may not exceed the number of vbuckets.
    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
                                  configuration.getMaxNumShards());
    if ((unsigned int)workload->getNumShards() >
                                              configuration.getMaxVbuckets()) {
        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
            "equal or less than max number of vbuckets");
        return ENGINE_FAILED;
    }

    dcpConnMap_ = new DcpConnMap(*this);

    /* Get the flow control policy */
    std::string flowCtlPolicy = configuration.getDcpFlowControlPolicy();

    // Any policy name other than static/dynamic/aggressive falls back to the
    // base DcpFlowControlManager (flow control disabled).
    if (!flowCtlPolicy.compare("static")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerStatic(*this);
    } else if (!flowCtlPolicy.compare("dynamic")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerDynamic(*this);
    } else if (!flowCtlPolicy.compare("aggressive")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerAggressive(*this);
    } else {
        /* Flow control is not enabled */
        dcpFlowControlManager_ = new DcpFlowControlManager(*this);
    }

    tapConnMap = new TapConnMap(*this);
    tapConfig = new TapConfig(*this);
    replicationThrottle = new ReplicationThrottle(configuration, stats);
    TapConfig::addConfigChangeListener(*this);

    checkpointConfig = new CheckpointConfig(*this);
    CheckpointConfig::addConfigChangeListener(*this);

    kvBucket = makeBucket(configuration);

    initializeEngineCallbacks();

    // Complete the initialization of the ep-store
    if (!kvBucket->initialize()) {
        return ENGINE_FAILED;
    }

    if(configuration.isDataTrafficEnabled()) {
        enableTraffic(true);
    }

    tapConnMap->initialize(TAP_CONN_NOTIFIER);
    dcpConnMap_->initialize(DCP_CONN_NOTIFIER);

    // record engine initialization time
    startupTime.store(ep_real_time());

    LOG(EXTENSION_LOG_NOTICE,
        "EP Engine: Initialization of %s bucket complete",
        configuration.getBucketType().c_str());

    return ENGINE_SUCCESS;
}
2019     return ENGINE_SUCCESS;
2020 }
2021
2022 void EventuallyPersistentEngine::destroy(bool force) {
2023     stats.forceShutdown = force;
2024     stats.isShutdown = true;
2025
2026     // Perform a snapshot of the stats before shutting down so we can persist
2027     // the type of shutdown (stats.forceShutdown), and consequently on the
2028     // next warmup can determine is there was a clean shutdown - see
2029     // Warmup::cleanShutdown
2030     if (kvBucket) {
2031         kvBucket->snapshotStats();
2032     }
2033     if (tapConnMap) {
2034         tapConnMap->shutdownAllConnections();
2035     }
2036     if (dcpConnMap_) {
2037         dcpConnMap_->shutdownAllConnections();
2038     }
2039 }
2040
2041 ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
2042         item** itm,
2043         const DocKey& key,
2044         const size_t nbytes,
2045         const size_t priv_nbytes,
2046         const int flags,
2047         const rel_time_t exptime,
2048         uint8_t datatype,
2049         uint16_t vbucket) {
2050     if (priv_nbytes > maxItemPrivilegedBytes) {
2051         return ENGINE_E2BIG;
2052     }
2053
2054     if ((nbytes - priv_nbytes) > maxItemSize) {
2055         return ENGINE_E2BIG;
2056     }
2057
2058     if (!hasAvailableSpace(sizeof(Item) + sizeof(Blob) + key.size() + nbytes)) {
2059         return memoryCondition();
2060     }
2061
2062     time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
2063
2064     uint8_t ext_meta[1];
2065     uint8_t ext_len = EXT_META_LEN;
2066     *(ext_meta) = datatype;
2067     *itm = new Item(key,
2068                     flags,
2069                     expiretime,
2070                     nullptr,
2071                     nbytes,
2072                     ext_meta,
2073                     ext_len,
2074                     0 /*cas*/,
2075                     -1 /*seq*/,
2076                     vbucket);
2077     if (*itm == NULL) {
2078         return memoryCondition();
2079     } else {
2080         stats.itemAllocSizeHisto.add(nbytes);
2081         return ENGINE_SUCCESS;
2082     }
2083 }
2084
2085 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
2086     if (!deleteAllEnabled) {
2087         return ENGINE_ENOTSUP;
2088     }
2089
2090     if (!isDegradedMode()) {
2091         return ENGINE_TMPFAIL;
2092     }
2093
2094     /*
2095      * Supporting only a SYNC operation for bucket flush
2096      */
2097
2098     void* es = getEngineSpecific(cookie);
2099     if (es == NULL) {
2100         // Check if diskDeleteAll was false and set it to true
2101         // if yes, if the atomic variable weren't false, then
2102         // we will assume that a deleteAll has been scheduled
2103         // already and return TMPFAIL.
2104         if (kvBucket->scheduleDeleteAllTask(cookie)) {
2105             storeEngineSpecific(cookie, this);
2106             return ENGINE_EWOULDBLOCK;
2107         } else {
2108             LOG(EXTENSION_LOG_INFO,
2109                 "Tried to trigger a bucket deleteAll, but"
2110                 "there seems to be a task running already!");
2111             return ENGINE_TMPFAIL;
2112         }
2113
2114     } else {
2115         storeEngineSpecific(cookie, NULL);
2116         LOG(EXTENSION_LOG_NOTICE, "Completed bucket deleteAll operation");
2117         return ENGINE_SUCCESS;
2118     }
2119 }
2120
2121 cb::EngineErrorItemPair EventuallyPersistentEngine::get_and_touch(const void* cookie,
2122                                                            const DocKey& key,
2123                                                            uint16_t vbucket,
2124                                                            uint32_t exptime) {
2125     auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2126
2127     time_t expiry_time = exptime;
2128     if (exptime != 0) {
2129         auto* core = serverApi->core;
2130         expiry_time = core->abstime(core->realtime(exptime));
2131     }
2132     GetValue gv(kvBucket->getAndUpdateTtl(key, vbucket, cookie, expiry_time));
2133
2134     auto rv = gv.getStatus();
2135     if (rv == ENGINE_SUCCESS) {
2136         ++stats.numOpsGet;
2137         ++stats.numOpsStore;
2138         return std::make_pair(cb::engine_errc::success,
2139                               cb::unique_item_ptr{gv.getValue(),
2140                                                   cb::ItemDeleter{handle}});
2141     }
2142
2143     if (isDegradedMode()) {
2144         // Remap all some of the error codes
2145         switch (rv) {
2146         case ENGINE_KEY_EEXISTS:
2147         case ENGINE_KEY_ENOENT:
2148         case ENGINE_NOT_MY_VBUCKET:
2149             rv = ENGINE_TMPFAIL;
2150             break;
2151         default:
2152             break;
2153         }
2154     }
2155
2156     if (rv == ENGINE_KEY_EEXISTS) {
2157         rv = ENGINE_LOCKED;
2158     }
2159
2160     return std::make_pair(cb::engine_errc(rv),
2161                           cb::unique_item_ptr{nullptr,
2162                                               cb::ItemDeleter{handle}});
2163 }
2164
2165 cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
2166                                                        const DocKey& key,
2167                                                        uint16_t vbucket,
2168                                                        std::function<bool(const item_info&)>filter) {
2169
2170     auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2171
2172     // Fetch an item from the hashtable (without trying to schedule a bg-fetch
2173     // and pass it through the filter. If the filter accepts the document
2174     // based on the metadata, return the document. If the document's data
2175     // isn't resident we run another iteration in the loop and retries the
2176     // action but this time we _do_ schedule a bg-fetch.
2177     for (int ii = 0; ii < 2; ++ii) {
2178         auto options = static_cast<get_options_t>(HONOR_STATES |
2179                                                   TRACK_REFERENCE |
2180                                                   DELETE_TEMP |
2181                                                   HIDE_LOCKED_CAS |
2182                                                   ALLOW_META_ONLY);
2183         if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
2184             options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
2185         }
2186
2187         BlockTimer timer(&stats.getCmdHisto);
2188         GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2189         ENGINE_ERROR_CODE status = gv.getStatus();
2190
2191         switch (status) {
2192         case ENGINE_SUCCESS:
2193             break;
2194
2195         case ENGINE_KEY_ENOENT: // FALLTHROUGH
2196         case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
2197             if (isDegradedMode()) {
2198                 status = ENGINE_TMPFAIL;
2199             }
2200             // FALLTHROUGH
2201         default:
2202             return std::make_pair(cb::engine_errc(status),
2203                                   cb::unique_item_ptr{nullptr,
2204                                                       cb::ItemDeleter{handle}});
2205         }
2206
2207         auto* item = gv.getValue();
2208         cb::unique_item_ptr ret{item, cb::ItemDeleter{handle}};
2209
2210         const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
2211         const uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
2212
2213         // Currently
2214         if (filter(item->toItemInfo(vb_uuid))) {
2215             if (!gv.isPartial()) {
2216                 return std::make_pair(cb::engine_errc::success,
2217                                cb::unique_item_ptr{ret.release(),
2218                                                    cb::ItemDeleter{handle}});
2219             }
2220             // We want this item, but we need to fetch it off disk
2221         } else {
2222             // the client don't care about this thing..
2223             ret.reset(nullptr);
2224             return std::make_pair(cb::engine_errc::success,
2225                                   cb::unique_item_ptr{ret.release(),
2226                                                       cb::ItemDeleter{handle}});
2227         }
2228     }
2229
2230     // It should not be possible to get as the second iteration in the loop
2231     // SHOULD handle backround fetches an the item should NOT be partial!
2232     throw std::logic_error("EventuallyPersistentEngine::get_if: loop terminated");
2233 }
2234
2235 ENGINE_ERROR_CODE EventuallyPersistentEngine::get_locked(const void* cookie,
2236                                                          item** itm,
2237                                                          const DocKey& key,
2238                                                          uint16_t vbucket,
2239                                                          uint32_t lock_timeout) {
2240
2241     auto default_timeout = static_cast<uint32_t>(getGetlDefaultTimeout());
2242
2243     if (lock_timeout == 0) {
2244         lock_timeout = default_timeout;
2245     } else if (lock_timeout > static_cast<uint32_t>(getGetlMaxTimeout())) {
2246         LOG(EXTENSION_LOG_WARNING,
2247             "EventuallyPersistentEngine::get_locked: "
2248             "Illegal value for lock timeout specified %u. "
2249             "Using default value: %u", lock_timeout, default_timeout);
2250         lock_timeout = default_timeout;
2251     }
2252
2253     auto result = kvBucket->getLocked(key, vbucket, ep_current_time(),
2254                                       lock_timeout, cookie);
2255
2256     if (result.getStatus() == ENGINE_SUCCESS) {
2257         ++stats.numOpsGet;
2258         *itm = result.getValue();
2259     }
2260
2261     return result.getStatus();
2262 }
2263
2264 ENGINE_ERROR_CODE EventuallyPersistentEngine::unlock(const void* cookie,
2265                                                      const DocKey& key,
2266                                                      uint16_t vbucket,
2267                                                      uint64_t cas) {
2268     return kvBucket->unlockKey(key, vbucket, cas, ep_current_time());
2269 }
2270
2271
2272 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2273                                                     item* itm,
2274                                                     uint64_t *cas,
2275                                                     ENGINE_STORE_OPERATION
2276                                                                      operation) {
2277     BlockTimer timer(&stats.storeCmdHisto);
2278     ENGINE_ERROR_CODE ret;
2279     Item *it = static_cast<Item*>(itm);
2280
2281     switch (operation) {
2282     case OPERATION_CAS:
2283         if (it->getCas() == 0) {
2284             // Using a cas command with a cas wildcard doesn't make sense
2285             ret = ENGINE_NOT_STORED;
2286             break;
2287         }
2288         // FALLTHROUGH
2289     case OPERATION_SET:
2290         if (isDegradedMode()) {
2291             return ENGINE_TMPFAIL;
2292         }
2293         ret = kvBucket->set(*it, cookie);
2294         if (ret == ENGINE_SUCCESS) {
2295             *cas = it->getCas();
2296         }
2297
2298         break;
2299
2300     case OPERATION_ADD:
2301         if (isDegradedMode()) {
2302             return ENGINE_TMPFAIL;
2303         }
2304
2305         if (it->getCas() != 0) {
2306             // Adding an item with a cas value doesn't really make sense...
2307             return ENGINE_KEY_EEXISTS;
2308         }
2309
2310         ret = kvBucket->add(*it, cookie);
2311         if (ret == ENGINE_SUCCESS) {
2312             *cas = it->getCas();
2313         }
2314         break;
2315
2316     case OPERATION_REPLACE:
2317         ret = kvBucket->replace(*it, cookie);
2318         if (ret == ENGINE_SUCCESS) {
2319             *cas = it->getCas();
2320         }
2321         break;
2322     default:
2323         ret = ENGINE_ENOTSUP;
2324     }
2325
2326     switch (ret) {
2327     case ENGINE_SUCCESS:
2328         ++stats.numOpsStore;
2329         break;
2330     case ENGINE_ENOMEM:
2331         ret = memoryCondition();
2332         break;
2333     case ENGINE_NOT_STORED:
2334     case ENGINE_NOT_MY_VBUCKET:
2335         if (isDegradedMode()) {
2336             return ENGINE_TMPFAIL;
2337         }
2338         break;
2339     default:
2340         break;
2341     }
2342
2343     return ret;
2344 }
2345
/**
 * Produce the next TAP message for a producer connection (one step of
 * walkTapQueue()).
 *
 * All output parameters are reset first (es/nes/seqno/flags/vbucket to
 * 0/NULL, ttl to (uint8_t)-1 i.e. 255), then the checks below are made in
 * order:
 *   1. pending flush            -> TAP_FLUSH
 *   2. noop due                 -> TAP_NOOP
 *   3. suspended or ack window full -> TAP_PAUSE
 *   4. a high-priority vbucket event (TAP_VBUCKET_SET / TAP_OPAQUE) is
 *      encoded into es/nes/vbucket and its event code returned
 *   5. waiting for an opaque-message ack -> TAP_PAUSE
 *   6. otherwise queue any required backfill and fetch the next item.
 *
 * @param retry out: set to true when the caller should call again
 *              immediately (a TAP_NOOP from getNextItem, or a checkpoint
 *              start for a vbucket that no longer exists).
 * @return the TAP opcode to send.
 * @throws std::logic_error on an unknown high-priority VBucketEvent type.
 */
inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
                                                           item **itm,
                                                           void **es,
                                                           uint16_t *nes,
                                                           uint8_t *ttl,
                                                           uint16_t *flags,
                                                           uint32_t *seqno,
                                                           uint16_t *vbucket,
                                                           TapProducer
                                                                   *connection,
                                                           bool &retry) {
    // Reset every output parameter before deciding what to send.
    *es = NULL;
    *nes = 0;
    *ttl = (uint8_t)-1;
    *seqno = 0;
    *flags = 0;
    *vbucket = 0;

    retry = false;

    if (connection->shouldFlush()) {
        return TAP_FLUSH;
    }

    if (connection->isTimeForNoop()) {
        LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
            connection->logHeader());
        return TAP_NOOP;
    }

    if (connection->isSuspended() || connection->windowIsFull()) {
        LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
            " suspended state or its ack windows is full.\n",
            connection->logHeader());
        return TAP_PAUSE;
    }

    uint16_t ret = TAP_PAUSE;
    // High-priority vbucket events take precedence over regular items.
    VBucketEvent ev = connection->nextVBucketHighPriority();
    if (ev.event != TAP_PAUSE) {
        switch (ev.event) {
        case TAP_VBUCKET_SET:
            LOG(EXTENSION_LOG_NOTICE,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), ev.vbucket,
                VBucket::toString(ev.state));
            connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
            break;
        case TAP_OPAQUE:
            LOG(EXTENSION_LOG_NOTICE,
                "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
                connection->logHeader(),
                TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
                ev.vbucket);
            // The opaque command code is stored on the connection so that
            // *es can point at it after this function returns.
            connection->opaqueCommandCode = (uint32_t) ev.state;
            *vbucket = ev.vbucket;
            *es = &connection->opaqueCommandCode;
            *nes = sizeof(connection->opaqueCommandCode);
            break;

        default:
            throw std::logic_error("EventuallyPersistentEngine::doWalkTapQueue:"
                    " Unknown VBucketEvent message type:" +
                    std::to_string(ev.event) + " for connection:" +
                    connection->logHeader());
        }
        return ev.event;
    }

    if (connection->waitForOpaqueMsgAck()) {
        return TAP_PAUSE;
    }

    VBucketFilter backFillVBFilter;
    if (connection->runBackfill(backFillVBFilter)) {
        queueBackfill(backFillVBFilter, connection);
    }

    uint8_t nru = INITIAL_NRU_VALUE;
    // NOTE(review): getNextItem presumably updates `ret` (and `nru`) by
    // reference, since `ret` is switched on immediately afterwards — confirm.
    Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
    switch (ret) {
    case TAP_CHECKPOINT_START:
    case TAP_CHECKPOINT_END:
    case TAP_MUTATION:
    case TAP_DELETION:
        *itm = it;
        if (ret == TAP_MUTATION) {
            // Mutations carry rev seqno + NRU in the engine-specific section.
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno(), nru);
            *es = connection->specificData;
        } else if (ret == TAP_DELETION) {
            // Deletions carry only the rev seqno.
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno());
            *es = connection->specificData;
        } else if (ret == TAP_CHECKPOINT_START) {
            // Send the current value of the max deleted seqno
            VBucketPtr vb = getVBucket(*vbucket);
            if (!vb) {
                // NOTE(review): *itm was already set to `it` above; the
                // caller retries and may overwrite it — confirm the item's
                // ownership is not lost on this path.
                retry = true;
                return TAP_NOOP;
            }
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                               vb->ht.getMaxDeletedRevSeqno());
            *es = connection->specificData;
        }
        break;
    case TAP_NOOP:
        retry = true;
        break;
    default:
        break;
    }

    // Nothing to send: for dump / takeover connections, check whether the
    // stream is complete and, if so, emit the resulting vbucket event.
    if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
        VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
        if (vbev.event == TAP_VBUCKET_SET) {
            LOG(EXTENSION_LOG_NOTICE,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), vbev.vbucket,
                VBucket::toString(vbev.state));
            connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
        }
        ret = vbev.event;
    }

    return ret;
}
2473
2474 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2475                                                   item **itm,
2476                                                   void **es,
2477                                                   uint16_t *nes,
2478                                                   uint8_t *ttl,
2479                                                   uint16_t *flags,
2480                                                   uint32_t *seqno,
2481                                                   uint16_t *vbucket) {
2482     TapProducer *connection = getTapProducer(cookie);
2483     if (!connection) {
2484         LOG(EXTENSION_LOG_WARNING,
2485             "Failed to lookup TAP connection.. Disconnecting\n");
2486         return TAP_DISCONNECT;
2487     }
2488
2489     connection->setPaused(false);
2490
2491     bool retry = false;
2492     uint16_t ret;
2493
2494     connection->setLastWalkTime();
2495     do {
2496         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2497                              seqno, vbucket, connection, retry);
2498     } while (retry);
2499
2500     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2501         connection->lastMsgTime = ep_current_time();
2502         if (ret == TAP_NOOP) {
2503             *seqno = 0;
2504         } else {
2505             ++stats.numTapFetched;
2506             *seqno = connection->getSeqno();
2507             if (connection->requestAck(ret, *vbucket)) {
2508                 *flags = TAP_FLAG_ACK;
2509                 connection->seqnoAckRequested = *seqno;
2510             }
2511
2512             if (ret == TAP_MUTATION) {
2513                 if (connection->haveFlagByteorderSupport()) {
2514                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2515                 }
2516             }
2517         }
2518     } else {
2519         connection->setPaused(true);
2520         connection->setNotifySent(false);
2521     }
2522
2523     return ret;
2524 }
2525
2526 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2527                                                 std::string &client,
2528                                                 uint32_t flags,
2529                                                 const void *userdata,
2530                                                 size_t nuserdata) {
2531     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2532         return false;
2533     }
2534
2535     std::string tapName = "eq_tapq:";
2536     if (client.length() == 0) {
2537         tapName.assign(ConnHandler::getAnonName());
2538     } else {
2539         tapName.append(client);
2540     }
2541
2542     // Decoding the userdata section of the packet and update the filters
2543     const char *ptr = static_cast<const char*>(userdata);
2544     uint64_t backfillAge = 0;
2545     std::vector<uint16_t> vbuckets;
2546     std::map<uint16_t, uint64_t> lastCheckpointIds;
2547
2548     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2549         if (nuserdata < sizeof(backfillAge)) {
2550             LOG(EXTENSION_LOG_WARNING,
2551                 "Backfill age is missing. Reject connection request from %s\n",
2552                 tapName.c_str());
2553             return false;
2554         }
2555         // use memcpy to avoid alignemt issues
2556         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2557         backfillAge = ntohll(backfillAge);
2558         nuserdata -= sizeof(backfillAge);
2559         ptr += sizeof(backfillAge);
2560     }
2561
2562     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2563         uint16_t nvbuckets;
2564         if (nuserdata < sizeof(nvbuckets)) {
2565             LOG(EXTENSION_LOG_WARNING,
2566             "Number of vbuckets is missing. Reject connection request from %s"
2567             "\n", tapName.c_str());
2568             return false;
2569         }
2570         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2571         nuserdata -= sizeof(nvbuckets);
2572         ptr += sizeof(nvbuckets);
2573         nvbuckets = ntohs(nvbuckets);
2574         if (nvbuckets > 0) {
2575             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2576                 LOG(EXTENSION_LOG_WARNING,
2577                 "# of vbuckets not matched. Reject connection request from %s"
2578                 "\n", tapName.c_str());
2579                 return false;
2580             }
2581             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2582                 uint16_t val;
2583                 memcpy(&val, ptr, sizeof(nvbuckets));
2584                 ptr += sizeof(uint16_t);
2585                 vbuckets.push_back(ntohs(val));
2586             }
2587             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2588         }
2589     }
2590
2591     if (flags & TAP_CONNECT_CHECKPOINT) {
2592         uint16_t nCheckpoints = 0;
2593         if (nuserdata >= sizeof(nCheckpoints)) {
2594             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2595             nuserdata -= sizeof(nCheckpoints);
2596             ptr += sizeof(nCheckpoints);
2597             nCheckpoints = ntohs(nCheckpoints);
2598         }
2599         if (nCheckpoints > 0) {
2600             if (nuserdata <
2601                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2602                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2603                     "Reject connection request from %s\n", tapName.c_str());
2604                 return false;
2605             }
2606             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2607                 uint16_t vbid;
2608                 uint64_t checkpointId;
2609                 memcpy(&vbid, ptr, sizeof(vbid));
2610                 ptr += sizeof(uint16_t);
2611                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2612                 ptr += sizeof(uint64_t);
2613                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2614             }
2615         }
2616     }
2617
2618     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2619                                  backfillAge,
2620                                  static_cast<int>(
2621                                  configuration.getTapKeepalive()),
2622                                  vbuckets,
2623                                  lastCheckpointIds);
2624
2625     tapConnMap->notifyPausedConnection(tp, true);
2626     return true;
2627 }
2628
2629 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2630                                                         void *engine_specific,
2631                                                         uint16_t nengine,
2632                                                         uint8_t ttl,
2633                                                         uint16_t tap_flags,
2634                                                         uint16_t tap_event,
2635                                                         uint32_t tap_seqno,
2636                                                         const void *key,
2637                                                         size_t nkey,
2638                                                         uint32_t flags,
2639                                                         uint32_t exptime,
2640                                                         uint64_t cas,
2641                                                         uint8_t datatype,
2642                                                         const void *data,
2643                                                         size_t ndata,
2644                                                         uint16_t vbucket)
2645 {
2646     (void) ttl;
2647     void *specific = getEngineSpecific(cookie);
2648     ConnHandler *connection = NULL;
2649     if (specific == NULL) {
2650         if (tap_event == TAP_ACK) {
2651             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2652                 "exist. Force disconnect...\n", (char *) cookie);
2653             // tap producer is no longer connected..
2654             return ENGINE_DISCONNECT;
2655         } else {
2656             connection = tapConnMap->newConsumer(cookie);
2657             if (connection == NULL) {
2658                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2659                     " Force disconnect\n");
2660                 return ENGINE_DISCONNECT;
2661             }
2662             storeEngineSpecific(cookie, connection);
2663         }
2664     } else {
2665         connection = reinterpret_cast<ConnHandler *>(specific);
2666     }
2667
2668
2669     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2670
2671     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2672         if (!replicationThrottle->shouldProcess()) {
2673             ++stats.replicationThrottled;
2674             if (connection->supportsAck()) {
2675                 ret = ENGINE_TMPFAIL;
2676             } else {
2677                 ret = ENGINE_DISCONNECT;
2678                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2679                     "ack support. Force disconnect...\n",
2680                     connection->logHeader());
2681             }
2682             return ret;
2683         }
2684     }
2685
2686     switch (tap_event) {
2687     case TAP_ACK:
2688         // TAP works only with the DefaultCollection
2689         ret = processTapAck(cookie, tap_seqno, tap_flags,
2690                             DocKey(static_cast<const uint8_t*>(key), nkey,
2691                                    DocNamespace::DefaultCollection));
2692         break;
2693     case TAP_FLUSH:
2694         ret = flush(cookie);
2695         LOG(EXTENSION_LOG_NOTICE, "%s Received flush.\n",
2696             connection->logHeader());
2697         break;
2698     case TAP_DELETION:
2699         {
2700             uint64_t revSeqno;
2701             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2702                                                 nengine, &revSeqno);
2703
2704             // Create key in DefaultCollection since TAP won't support collections
2705             const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2706                                 DocNamespace::DefaultCollection};
2707             ret = connection->deletion(0, docKey, {}, 0,
2708                                        PROTOCOL_BINARY_RAW_BYTES, cas, vbucket,
2709                                        0, revSeqno, {});
2710         }
2711         break;
2712
2713     case TAP_CHECKPOINT_START:
2714     case TAP_CHECKPOINT_END:
2715         {
2716             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2717             if (tc) {
2718                 if (tap_event == TAP_CHECKPOINT_START &&
2719                     nengine == TapEngineSpecific::sizeRevSeqno) {
2720                     // Set the current value for the max deleted seqno
2721                     VBucketPtr vb = getVBucket(vbucket);
2722                     if (!vb) {
2723                         return ENGINE_TMPFAIL;
2724                     }
2725                     uint64_t seqnum;
2726                     TapEngineSpecific::readSpecificData(tap_event,
2727                                                         engine_specific,
2728                                                         nengine,
2729                                                         &seqnum);
2730                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2731                 }
2732
2733                 if (data) {
2734                     uint64_t checkpointId;
2735                     memcpy(&checkpointId, data, sizeof(checkpointId));
2736                     checkpointId = ntohll(checkpointId);
2737                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2738                                           checkpointId);
2739                 }
2740                 else {
2741                     ret = ENGINE_DISCONNECT;
2742                     LOG(EXTENSION_LOG_WARNING,
2743                         "%s Checkpoint Id is missing in "
2744                         "CHECKPOINT messages. Force disconnect...\n",
2745                         connection->logHeader());
2746                 }
2747             }
2748             else {
2749                 ret = ENGINE_DISCONNECT;
2750                 LOG(EXTENSION_LOG_WARNING,
2751                     "%s not a consumer! Force disconnect\n",
2752                     connection->logHeader());
2753             }
2754         }
2755
2756         break;
2757
2758     case TAP_MUTATION:
2759         {
2760             uint8_t nru = INITIAL_NRU_VALUE;
2761             uint64_t revSeqno = 0;
2762             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2763                                                 nengine, &revSeqno, &nru);
2764
2765             if (!isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_JSON)) {
2766                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2767                 const unsigned char *dat = (const unsigned char*)data;
2768                 const int datlen = ndata;
2769                 if (checkUTF8JSON(dat, datlen)) {
2770                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2771                 }
2772             }
2773             // Create key in DefaultCollection since TAP won't support collections
2774             const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2775                                 DocNamespace::DefaultCollection};
2776             ret = connection->mutation(0, docKey,
2777                                        {static_cast<const uint8_t*>(data), ndata},
2778                                        0, datatype, cas,
2779                                        vbucket, flags, 0, revSeqno, exptime, 0,
2780                                        {}, nru);
2781         }
2782
2783         break;
2784
2785     case TAP_OPAQUE:
2786         if (nengine == sizeof(uint32_t)) {
2787             uint32_t cc;
2788             memcpy(&cc, engine_specific, sizeof(cc));
2789             cc = ntohl(cc);
2790
2791             switch (cc) {
2792             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2793                 // @todo: the memcached core will _ALWAYS_ send nack
2794                 //        if it encounter an error. This should be
2795                 // set as the default when we move to .next after 2.0
2796                 // (currently we need to allow the message for
2797                 // backwards compatibility)
2798                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2799                     connection->logHeader());
2800                 break;
2801             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2802                 connection->setSupportCheckpointSync(true);
2803                 LOG(EXTENSION_LOG_INFO,
2804                     "%s Enable checkpoint synchronization\n",
2805                     connection->logHeader());
2806                 break;
2807             case TAP_OPAQUE_OPEN_CHECKPOINT:
2808                 /**
2809                  * This event is only received by the TAP client that wants to
2810                  * get mutations from closed checkpoints only. At this time,
2811                  * only incremental backup client receives this event so that
2812                  * it can close the connection and reconnect later.
2813                  */
2814                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2815                     connection->logHeader());
2816                 break;
2817             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2818                 {
2819                     LOG(EXTENSION_LOG_INFO,
2820                         "%s Backfill started for vbucket %d.\n",
2821                         connection->logHeader(), vbucket);
2822                     BlockTimer timer(&stats.tapVbucketResetHisto);
2823                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2824                                                   ENGINE_DISCONNECT;
2825                     if (ret == ENGINE_DISCONNECT) {
2826                         LOG(EXTENSION_LOG_WARNING,
2827                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2828                             connection->logHeader(), vbucket);
2829                     } else {
2830                         LOG(EXTENSION_LOG_NOTICE,
2831                          "%s Reset vbucket %d was completed succecssfully.\n",
2832                             connection->logHeader(), vbucket);
2833                     }
2834
2835                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2836                     if (tc) {
2837                         tc->setBackfillPhase(true, vbucket);
2838                     } else {
2839                         ret = ENGINE_DISCONNECT;
2840                         LOG(EXTENSION_LOG_WARNING,
2841                             "TAP consumer doesn't exists. Force disconnect\n");
2842                     }
2843                 }
2844                 break;
2845             case TAP_OPAQUE_CLOSE_BACKFILL:
2846                 {
2847                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2848                         connection->logHeader());
2849                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2850                     if (tc) {
2851                         tc->setBackfillPhase(false, vbucket);
2852                     } else {
2853                         ret = ENGINE_DISCONNECT;
2854                         LOG(EXTENSION_LOG_WARNING,
2855                             "%s not a consumer! Force disconnect\n",
2856                             connection->logHeader());
2857                     }
2858                 }
2859                 break;
2860             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2861                 /**
2862                  * This event is sent by the eVBucketMigrator to notify that
2863                  * the source node closes the tap replication stream and
2864                  * switches to TAKEOVER_VBUCKETS phase.
2865                  * This is just an informative message and doesn't require any
2866                  * action.
2867                  */
2868                 LOG(EXTENSION_LOG_INFO,
2869                 "%s Received close tap stream. Switching to takeover phase.\n",
2870                     connection->logHeader());
2871                 break;
2872             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2873                 /**
2874                  * This opaque message is just for notifying that the source
2875                  * node receives change_vbucket_filter request and processes
2876                  * it successfully.
2877                  */
2878                 LOG(EXTENSION_LOG_INFO,
2879                 "%s Notified that the source node changed a vbucket filter.\n",
2880                     connection->logHeader());
2881                 break;
2882             default:
2883                 LOG(EXTENSION_LOG_WARNING,
2884                     "%s Received an unknown opaque command\n",
2885                     connection->logHeader());
2886             }
2887         } else {
2888             LOG(EXTENSION_LOG_WARNING,
2889                 "%s Received tap opaque with unknown size %d\n",
2890                 connection->logHeader(), nengine);
2891         }
2892         break;
2893
2894     case TAP_VBUCKET_SET:
2895         {
2896             BlockTimer timer(&stats.tapVbucketSetHisto);
2897
2898             if (nengine != sizeof(vbucket_state_t)) {
2899                 // illegal datasize
2900                 LOG(EXTENSION_LOG_WARNING,
2901                     "%s Received TAP_VBUCKET_SET with illegal size."
2902                     " Force disconnect\n", connection->logHeader());
2903                 ret = ENGINE_DISCONNECT;
2904                 break;
2905             }
2906
2907             vbucket_state_t state;
2908             memcpy(&state, engine_specific, nengine);
2909             state = (vbucket_state_t)ntohl(state);
2910
2911             ret = connection->setVBucketState(0, vbucket, state);
2912         }
2913         break;
2914
2915     default:
2916         // Unknown command
2917         LOG(EXTENSION_LOG_WARNING,
2918             "%s Recieved bad opcode, ignoring message\n",
2919             connection->logHeader());
2920     }
2921
2922     connection->processedEvent(tap_event, ret);
2923     return ret;
2924 }
2925
2926 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2927                                                       TapConsumer *consumer,
2928                                                       uint8_t event,
2929                                                       uint16_t vbucket,
2930                                                       uint64_t checkpointId) {
2931     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2932
2933     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2934         getKVBucket()->wakeUpFlusher();
2935         ret = ENGINE_SUCCESS;
2936     }
2937     else {
2938         ret = ENGINE_DISCONNECT;
2939         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2940             "checkpoint %" PRIu64 ". Force disconnect\n",
2941             consumer->logHeader(), checkpointId);
2942     }
2943
2944     return ret;
2945 }
2946
2947 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2948     TapProducer *rv =
2949         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2950     if (!(rv && rv->isConnected())) {
2951         LOG(EXTENSION_LOG_WARNING,
2952             "Walking a non-existent tap queue, disconnecting\n");
2953         return NULL;
2954     }
2955
2956     if (rv->doDisconnect()) {
2957         LOG(EXTENSION_LOG_WARNING,
2958             "%s Disconnecting pending connection\n", rv->logHeader());
2959         return NULL;
2960     }
2961     return rv;
2962 }
2963
2964 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2965     // Register the ON_DISCONNECT callback
2966     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2967     // Register the ON_DELETE_BUCKET callback
2968     registerEngineCallback(ON_DELETE_BUCKET, EvpHandleDeleteBucket, this);
2969 }
2970
2971 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2972                                                             uint32_t seqno,
2973                                                             uint16_t status,
2974                                                             const DocKey& key)
2975 {
2976     TapProducer *connection = getTapProducer(cookie);
2977     if (!connection) {
2978         LOG(EXTENSION_LOG_WARNING,
2979             "Unable to process tap ack. No producer found\n");
2980         return ENGINE_DISCONNECT;
2981     }
2982
2983     return connection->processAck(seqno, status, key);
2984 }
2985
2986 ENGINE_ERROR_CODE EventuallyPersistentEngine::memoryCondition() {
2987     // Do we think it's possible we could free something?
2988     bool haveEvidenceWeCanFreeMemory =
2989         (stats.getMaxDataSize() > stats.memOverhead->load());
2990     if (haveEvidenceWeCanFreeMemory) {
2991         // Look for more evidence by seeing if we have resident items.
2992         VBucketCountVisitor countVisitor(vbucket_state_active);
2993         kvBucket->visit(countVisitor);
2994
2995         haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
2996             countVisitor.getNumItems();
2997     }
2998     if (haveEvidenceWeCanFreeMemory) {
2999         ++stats.tmp_oom_errors;
3000         // Wake up the item pager task as memory usage
3001         // seems to have exceeded high water mark
3002         getKVBucket()->wakeUpItemPager();
3003         return ENGINE_TMPFAIL;
3004     } else {
3005         ++stats.oom_errors;
3006         return ENGINE_ENOMEM;
3007     }
3008 }
3009
3010 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
3011                                                              &backfillVBFilter,
3012                                                Producer *tc)
3013 {
3014     auto bfv = std::make_unique<BackFillVisitor>(
3015             this, *tapConnMap, tc, backfillVBFilter);
3016     getKVBucket()->visit(std::move(bfv),
3017                          "Backfill task",
3018                          TaskId::BackfillVisitorTask,
3019                          1);
3020 }
3021
3022 void VBucketCountAggregator::visitBucket(VBucketPtr &vb) {
3023     std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3024     it = visitorMap.find(vb->getState());
3025     if ( it != visitorMap.end() ) {
3026         it->second->visitBucket(vb);
3027     }
3028 }
3029
3030 void VBucketCountAggregator::addVisitor(VBucketCountVisitor* visitor) {
3031     visitorMap[visitor->getVBucketState()] = visitor;
3032 }
3033
3034 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3035                                                            ADD_STAT add_stat) {
3036
3037     configuration.addStats(add_stat, cookie);
3038
3039     EPStats &epstats = getEpStats();
3040     add_casted_stat("ep_version", VERSION, add_stat, cookie);
3041     add_casted_stat("ep_storage_age",
3042                     epstats.dirtyAge, add_stat, cookie);
3043     add_casted_stat("ep_storage_age_highwat",
3044                     epstats.dirtyAgeHighWat, add_stat, cookie);
3045     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3046                     add_stat, cookie);
3047
3048     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3049         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3050     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3051         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3052     }
3053
3054     add_casted_stat("ep_total_enqueued",
3055                     epstats.totalEnqueued, add_stat, cookie);
3056     add_casted_stat("ep_expired_access", epstats.expired_access,
3057                     add_stat, cookie);
3058     add_casted_stat("ep_expired_compactor", epstats.expired_compactor,
3059                     add_stat, cookie);
3060     add_casted_stat("ep_expired_pager", epstats.expired_pager,
3061                     add_stat, cookie);
3062     add_casted_stat("ep_queue_size",
3063                     epstats.diskQueueSize, add_stat, cookie);
3064     add_casted_stat("ep_diskqueue_items",
3065                     epstats.diskQueueSize, add_stat, cookie);
3066     add_casted_stat("ep_vb_backfill_queue_size",
3067                     epstats.vbBackfillQueueSize,
3068                     add_stat,
3069                     cookie);
3070     auto* flusher = kvBucket->getFlusher(EP_PRIMARY_SHARD);
3071     if (flusher) {
3072         add_casted_stat("ep_commit_num", epstats.flusherCommits,
3073                         add_stat, cookie);
3074         add_casted_stat("ep_commit_time",
3075                         epstats.commit_time, add_stat, cookie);
3076         add_casted_stat("ep_commit_time_total",
3077                         epstats.cumulativeCommitTime, add_stat, cookie);
3078         add_casted_stat("ep_item_begin_failed",
3079                         epstats.beginFailed, add_stat, cookie);
3080         add_casted_stat("ep_item_commit_failed",
3081                         epstats.commitFailed, add_stat, cookie);
3082         add_casted_stat("ep_item_flush_expired",
3083                         epstats.flushExpired, add_stat, cookie);
3084         add_casted_stat("ep_item_flush_failed",
3085                         epstats.flushFailed, add_stat, cookie);
3086         add_casted_stat("ep_flusher_state",
3087                         flusher->stateName(), add_stat, cookie);
3088         add_casted_stat("ep_flusher_todo",
3089                         epstats.flusher_todo, add_stat, cookie);
3090         add_casted_stat("ep_total_persisted",
3091                         epstats.totalPersisted, add_stat, cookie);
3092         add_casted_stat("ep_uncommitted_items",
3093                         epstats.flusher_todo, add_stat, cookie);
3094         add_casted_stat("ep_chk_persistence_timeout",
3095                         VBucket::getCheckpointFlushTimeout(),
3096                         add_stat,
3097                         cookie);
3098     }
3099     add_casted_stat("ep_vbucket_del",
3100                     epstats.vbucketDeletions, add_stat, cookie);
3101     add_casted_stat("ep_vbucket_del_fail",
3102                     epstats.vbucketDeletionFail, add_stat, cookie);
3103     add_casted_stat("ep_flush_duration_total",
3104                     epstats.cumulativeFlushTime, add_stat, cookie);
3105
3106     kvBucket->getAggregatedVBucketStats(cookie, add_stat);
3107
3108     kvBucket->getFileStats(cookie, add_stat);
3109
3110     add_casted_stat("ep_persist_vbstate_total",
3111                     epstats.totalPersistVBState, add_stat, cookie);
3112
3113     size_t memUsed =  stats.getTotalMemoryUsed();
3114     add_casted_stat("mem_used", memUsed, add_stat, cookie);
3115     add_casted_stat("ep_mem_low_wat_percent", stats.mem_low_wat_percent,
3116                     add_stat, cookie);
3117     add_casted_stat("ep_mem_high_wat_percent", stats.mem_high_wat_percent,
3118                     add_stat, cookie);
3119     add_casted_stat("bytes", memUsed, add_stat, cookie);
3120     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3121     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3122 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3123     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3124 #else
3125     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3126 #endif
3127     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3128     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3129                     add_stat, cookie);
3130 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3131     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3132 #else
3133     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3134 #endif
3135     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3136     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3137     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3138
3139     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3140     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3141                     add_stat, cookie);
3142     add_casted_stat("ep_mem_tracker_enabled", stats.memoryTrackerEnabled,
3143                     add_stat, cookie);
3144     add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3145                     add_stat, cookie);
3146     add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3147                     add_stat, cookie);
3148     add_casted_stat("ep_bg_remaining_items", epstats.numRemainingBgItems,
3149                     add_stat, cookie);
3150     add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3151                     add_stat, cookie);
3152     add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3153                     add_stat, cookie);
3154     add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3155                     add_stat, cookie);
3156     add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3157                     add_stat, cookie);
3158     add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3159                     add_stat, cookie);
3160     add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3161                     add_stat, cookie);
3162     add_casted_stat("ep_items_rm_from_checkpoints",
3163                     epstats.itemsRemovedFromCheckpoints,
3164                     add_stat, cookie);
3165     add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3166                     add_stat, cookie);
3167     add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3168                     add_stat, cookie);
3169     add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3170                     add_stat, cookie);
3171
3172     add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3173     add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3174                     add_stat, cookie);
3175     add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3176                     add_stat, cookie);
3177     add_casted_stat("ep_pending_ops_max_duration",
3178                     epstats.pendingOpsMaxDuration,
3179                     add_stat, cookie);
3180
3181     add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3182                     add_stat, cookie);
3183     add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3184                     add_stat, cookie);
3185
3186     size_t vbDeletions = epstats.vbucketDeletions.load();
3187     if (vbDeletions > 0) {
3188         add_casted_stat("ep_vbucket_del_max_walltime",
3189                         epstats.vbucketDelMaxWalltime,
3190                         add_stat, cookie);
3191         add_casted_stat("ep_vbucket_del_avg_walltime",
3192                         epstats.vbucketDelTotWalltime / vbDeletions,
3193                         add_stat, cookie);
3194     }
3195
3196     size_t numBgOps = epstats.bgNumOperations.load();
3197     if (numBgOps > 0) {
3198         add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3199                         add_stat, cookie);
3200         add_casted_stat("ep_bg_min_wait",
3201                         epstats.bgMinWait,
3202                         add_stat, cookie);
3203         add_casted_stat("ep_bg_max_wait",
3204                         epstats.bgMaxWait,
3205                         add_stat, cookie);
3206         add_casted_stat("ep_bg_wait_avg",
3207                         epstats.bgWait / numBgOps,
3208                         add_stat, cookie);
3209         add_casted_stat("ep_bg_min_load",
3210                         epstats.bgMinLoad,
3211                         add_stat, cookie);
3212         add_casted_stat("ep_bg_max_load",
3213                         epstats.bgMaxLoad,
3214                         add_stat, cookie);
3215         add_casted_stat("ep_bg_load_avg",
3216                         epstats.bgLoad / numBgOps,
3217                         add_stat, cookie);
3218         add_casted_stat("ep_bg_wait",
3219                         epstats.bgWait,
3220                         add_stat, cookie);
3221         add_casted_stat("ep_bg_load",
3222                         epstats.bgLoad,
3223                         add_stat, cookie);
3224     }
3225
3226     add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3227
3228     add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3229                     add_stat, cookie);
3230     add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3231                     add_stat, cookie);
3232     add_casted_stat("ep_num_access_scanner_skips",
3233                     epstats.accessScannerSkips, add_stat, cookie);
3234     add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3235                     add_stat, cookie);
3236     add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3237                     add_stat, cookie);
3238
3239     if (kvBucket->isAccessScannerEnabled() && epstats.alogTime.load() != 0)
3240     {
3241         char timestr[20];
3242         struct tm alogTim;
3243         hrtime_t alogTime = epstats.alogTime.load();
3244         if (cb_gmtime_r((time_t *)&alogTime, &alogTim) == -1) {
3245             add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3246                             cookie);
3247         } else {
3248             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3249             add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3250                             cookie);
3251         }
3252     } else {
3253         add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3254                         add_stat, cookie);
3255     }
3256
3257     if (kvBucket->isExpPagerEnabled()) {
3258         char timestr[20];
3259         struct tm expPagerTim;
3260         hrtime_t expPagerTime = epstats.expPagerTime.load();
3261         if (cb_gmtime_r((time_t *)&expPagerTime, &expPagerTim) == -1) {
3262             add_casted_stat("ep_expiry_pager_task_time", "UNKNOWN", add_stat,
3263                             cookie);
3264         } else {
3265             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &expPagerTim);
3266             add_casted_stat("ep_expiry_pager_task_time", timestr, add_stat,
<