MB-23906: Implement delete-with-value with store() instead of delete()
[ep-engine.git] / src / ep_engine.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21
22 #include "backfill.h"
23 #include "common.h"
24 #include "connmap.h"
25 #include "dcp/consumer.h"
26 #include "dcp/dcpconnmap.h"
27 #include "dcp/flow-control-manager.h"
28 #include "dcp/producer.h"
29 #include "ep_bucket.h"
30 #include "ep_vb.h"
31 #include "ephemeral_bucket.h"
32 #include "failover-table.h"
33 #include "flusher.h"
34 #include "htresizer.h"
35 #include "logger.h"
36 #include "memory_tracker.h"
37 #include "replicationthrottle.h"
38 #include "stats-info.h"
39 #define STATWRITER_NAMESPACE core_engine
40 #include "statwriter.h"
41 #undef STATWRITER_NAMESPACE
42 #include "string_utils.h"
43 #include "tapconnmap.h"
44 #include "vb_count_visitor.h"
45 #include "warmup.h"
46
47 #include <JSON_checker.h>
48 #include <cJSON_utils.h>
49 #include <memcached/engine.h>
50 #include <memcached/extension.h>
51 #include <memcached/protocol_binary.h>
52 #include <memcached/server_api.h>
53 #include <memcached/util.h>
54 #include <platform/cb_malloc.h>
55 #include <platform/checked_snprintf.h>
56 #include <platform/make_unique.h>
57 #include <platform/platform.h>
58 #include <platform/processclock.h>
59 #include <xattr/utils.h>
60
61 #include <cstdio>
62 #include <cstring>
63 #include <fcntl.h>
64 #include <fstream>
65 #include <iostream>
66 #include <limits>
67 #include <mutex>
68 #include <stdarg.h>
69 #include <string>
70 #include <vector>
71
/**
 * Scale a size by a fractional factor.
 * @param val the size to scale
 * @param percent the factor to multiply by (e.g. 0.75 for 75%)
 * @return val * percent, truncated towards zero
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
75
76 struct EPHandleReleaser {
77     void operator()(EventuallyPersistentEngine*) {
78         ObjectRegistry::onSwitchThread(nullptr);
79     }
80 };
81
82 using EPHandle = std::unique_ptr<EventuallyPersistentEngine, EPHandleReleaser>;
83
84 /**
85  * Helper function to acquire a handle to the engine which allows access to
86  * the engine while the handle is in scope.
87  * @param handle pointer to the engine
88  * @return EPHandle which is a unique_ptr to an EventuallyPersistentEngine
89  * with a custom deleter (EPHandleReleaser) which performs the required
90  * ObjectRegistry release.
91  */
92
93 static inline EPHandle acquireEngine(ENGINE_HANDLE* handle) {
94     auto ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
95     ObjectRegistry::onSwitchThread(ret);
96
97     return EPHandle(ret);
98 }
99
100 /**
101  * Call the response callback and return the appropriate value so that
102  * the core knows what to do..
103  */
104 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
105                                       uint16_t keylen,
106                                       const void *ext, uint8_t extlen,
107                                       const void *body, uint32_t bodylen,
108                                       uint8_t datatype, uint16_t status,
109                                       uint64_t cas, const void *cookie)
110 {
111     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
112     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
113     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
114                  status, cas, cookie)) {
115         rv = ENGINE_SUCCESS;
116     }
117     ObjectRegistry::onSwitchThread(e);
118     return rv;
119 }
120
/**
 * Check that a value lies within an inclusive range.
 * @param v the value to check
 * @param l the lowest acceptable value
 * @param h the highest acceptable value
 * @throws std::runtime_error ("Value out of range.") if v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool belowRange = v < l;
    const bool aboveRange = v > h;
    if (belowRange || aboveRange) {
        throw std::runtime_error("Value out of range.");
    }
}
127
128
/**
 * Verify that a NUL-terminated string is a decimal integer: an optional
 * leading '-' followed by one or more ASCII digits and nothing else.
 *
 * The digit test is done by direct comparison rather than isdigit(), because
 * passing a plain (possibly negative) char to isdigit() is undefined
 * behaviour. A string with no digits at all ("" or "-") is rejected.
 *
 * @param str the NUL-terminated text to check
 * @throws std::runtime_error ("Value is not numeric") if the text is not a
 *         decimal integer
 */
static void checkNumeric(const char* str) {
    const char* cursor = str;
    if (*cursor == '-') {
        ++cursor;
    }

    // At least one digit is required after the optional sign.
    if (*cursor == '\0') {
        throw std::runtime_error("Value is not numeric");
    }

    for (; *cursor != '\0'; ++cursor) {
        if (*cursor < '0' || *cursor > '9') {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
141
142 static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle) {
143     return acquireEngine(handle)->getInfo();
144 }
145
146 static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
147                                        const char* config_str) {
148     return acquireEngine(handle)->initialize(config_str);
149 }
150
151 static void EvpDestroy(ENGINE_HANDLE* handle, const bool force) {
152     auto eng = acquireEngine(handle);
153     eng->destroy(force);
154     delete eng.get();
155 }
156
157 static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
158                                          const void* cookie,
159                                          item** itm,
160                                          const DocKey& key,
161                                          const size_t nbytes,
162                                          const int flags,
163                                          const rel_time_t exptime,
164                                          uint8_t datatype,
165                                          uint16_t vbucket) {
166     if (!mcbp::datatype::is_valid(datatype)) {
167         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
168             " (ItemAllocate)");
169         return ENGINE_EINVAL;
170     }
171
172     return acquireEngine(handle)->itemAllocate(itm,
173                                                key,
174                                                nbytes,
175                                                0, // No privileged bytes
176                                                flags,
177                                                exptime,
178                                                datatype,
179                                                vbucket);
180 }
181
182 static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
183                            const item* itm, item_info *itm_info);
184 static void EvpItemRelease(ENGINE_HANDLE* handle, const void *cookie,
185                            item* itm);
186
187
188 static std::pair<cb::unique_item_ptr, item_info> EvpItemAllocateEx(ENGINE_HANDLE* handle,
189                                                                    const void* cookie,
190                                                                    const DocKey& key,
191                                                                    const size_t nbytes,
192                                                                    const size_t priv_nbytes,
193                                                                    const int flags,
194                                                                    const rel_time_t exptime,
195                                                                    uint8_t datatype,
196                                                                    uint16_t vbucket) {
197
198     item* it = nullptr;
199     auto err = acquireEngine(handle)->itemAllocate(
200             &it, key, nbytes, priv_nbytes, flags, exptime, datatype, vbucket);
201
202     if (err != ENGINE_SUCCESS) {
203         throw cb::engine_error(cb::engine_errc(err),
204                                "EvpItemAllocateEx: failed to allocate memory");
205     }
206
207     item_info info;
208     if (!EvpGetItemInfo(handle, cookie, it, &info)) {
209         EvpItemRelease(handle, cookie, it);
210         throw cb::engine_error(cb::engine_errc::failed,
211                                "EvpItemAllocateEx: EvpGetItemInfo failed");
212     }
213
214     return std::make_pair(cb::unique_item_ptr{it, cb::ItemDeleter{handle}},
215                           info);
216 }
217
218 static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
219                                        const void* cookie,
220                                        const DocKey& key,
221                                        uint64_t* cas,
222                                        uint16_t vbucket,
223                                        mutation_descr_t* mut_info) {
224     if (!cas) {
225         LOG(EXTENSION_LOG_WARNING,
226             "EvpItemDelete(): cas ptr passed is null for vb: %" PRIu16,
227             vbucket);
228         return ENGINE_EINVAL;
229     }
230     return acquireEngine(handle)->itemDelete(
231             cookie, key, *cas, vbucket, nullptr, mut_info);
232 }
233
234 static void EvpItemRelease(ENGINE_HANDLE* handle,
235                            const void* cookie,
236                            item* itm) {
237     acquireEngine(handle)->itemRelease(cookie, itm);
238 }
239
240 static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
241                                 const void* cookie,
242                                 item** itm,
243                                 const DocKey& key,
244                                 uint16_t vbucket,
245                                 DocStateFilter documentStateFilter) {
246     get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
247                                                        HONOR_STATES |
248                                                        TRACK_REFERENCE |
249                                                        DELETE_TEMP |
250                                                        HIDE_LOCKED_CAS |
251                                                        TRACK_STATISTICS);
252
253     switch (documentStateFilter) {
254     case DocStateFilter::Alive:
255         break;
256     case DocStateFilter::Deleted:
257         // MB-23640 was caused by this bug as the frontend asked for
258         // Alive and Deleted documents. The internals don't have a
259         // way of requesting just deleted documents, and luckily for
260         // us no part of our code is using this yet. Return an error
261         // if anyone start using it
262         return ENGINE_ENOTSUP;
263     case DocStateFilter::AliveOrDeleted:
264         options = static_cast<get_options_t>(options | GET_DELETED_VALUE);
265         break;
266     }
267
268     return acquireEngine(handle)->get(cookie, itm, key, vbucket, options);
269 }
270
271 static cb::EngineErrorItemPair EvpGetIf(ENGINE_HANDLE* handle,
272                                         const void* cookie,
273                                         const DocKey& key,
274                                         uint16_t vbucket,
275                                         std::function<bool(
276                                             const item_info&)> filter) {
277     return acquireEngine(handle)->get_if(cookie, key, vbucket, filter);
278 }
279
280 static ENGINE_ERROR_CODE EvpGetLocked(ENGINE_HANDLE* handle,
281                                       const void* cookie,
282                                       item** itm,
283                                       const DocKey& key,
284                                       uint16_t vbucket,
285                                       uint32_t lock_timeout) {
286     return acquireEngine(handle)->get_locked(
287             cookie, itm, key, vbucket, lock_timeout);
288 }
289
290 static ENGINE_ERROR_CODE EvpUnlock(ENGINE_HANDLE* handle,
291                                    const void* cookie,
292                                    const DocKey& key,
293                                    uint16_t vbucket,
294                                    uint64_t cas) {
295     return acquireEngine(handle)->unlock(cookie, key, vbucket, cas);
296 }
297
298 static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
299                                      const void* cookie,
300                                      const char* stat_key,
301                                      int nkey,
302                                      ADD_STAT add_stat) {
303     return acquireEngine(handle)->getStats(cookie, stat_key, nkey, add_stat);
304 }
305
306 static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
307                                   const void* cookie,
308                                   item* itm,
309                                   uint64_t* cas,
310                                   ENGINE_STORE_OPERATION operation,
311                                   DocumentState document_state) {
312     auto engine = acquireEngine(handle);
313
314     if (document_state == DocumentState::Deleted) {
315         Item* item = static_cast<Item*>(itm);
316         item->setDeleted();
317     }
318
319     return engine->store(cookie, itm, cas, operation);
320 }
321
322 static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
323                                   const void* cookie) {
324     return acquireEngine(handle)->flush(cookie);
325 }
326
327 static void EvpResetStats(ENGINE_HANDLE* handle, const void*) {
328     acquireEngine(handle)->resetStats();
329 }
330
331 protocol_binary_response_status EventuallyPersistentEngine::setTapParam(
332         const char* keyz, const char* valz, std::string& msg) {
333     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
334
335     try {
336         if (strcmp(keyz, "tap_keepalive") == 0) {
337             int v = std::stoi(valz);
338             validate(v, 0, MAX_TAP_KEEP_ALIVE);
339             getConfiguration().requirementsMetOrThrow("tap_keepalive");
340             setTapKeepAlive(static_cast<uint32_t>(v));
341         } else if (strcmp(keyz, "replication_throttle_threshold") == 0) {
342             getConfiguration().setReplicationThrottleThreshold(
343                     std::stoull(valz));
344         } else if (strcmp(keyz, "replication_throttle_queue_cap") == 0) {
345             getConfiguration().setReplicationThrottleQueueCap(std::stoll(valz));
346         } else if (strcmp(keyz, "replication_throttle_cap_pcnt") == 0) {
347             getConfiguration().setReplicationThrottleCapPcnt(std::stoull(valz));
348         } else {
349             msg = "Unknown config param";
350             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
351         }
352         // Handles exceptions thrown by the standard
353         // library stoi/stoul style functions when not numeric
354     } catch (std::invalid_argument&) {
355         msg = "Argument was not numeric";
356         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
357
358         // Handles exceptions thrown by the standard library stoi/stoul
359         // style functions when the conversion does not fit in the datatype
360     } catch (std::out_of_range&) {
361         msg = "Argument was out of range";
362         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
363
364         // Handles any miscellaenous exceptions in addition to the range_error
365         // exceptions thrown by the configuration::set<param>() methods
366     } catch (std::exception& error) {
367         msg = error.what();
368         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
369     }
370
371     return rv;
372 }
373
374 protocol_binary_response_status EventuallyPersistentEngine::setCheckpointParam(
375         const char* keyz, const char* valz, std::string& msg) {
376     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
377
378     try {
379         if (strcmp(keyz, "chk_max_items") == 0) {
380             size_t v = std::stoull(valz);
381             validate(v, size_t(MIN_CHECKPOINT_ITEMS),
382                      size_t(MAX_CHECKPOINT_ITEMS));
383             getConfiguration().setChkMaxItems(v);
384         } else if (strcmp(keyz, "chk_period") == 0) {
385             size_t v = std::stoull(valz);
386             validate(v, size_t(MIN_CHECKPOINT_PERIOD),
387                      size_t(MAX_CHECKPOINT_PERIOD));
388             getConfiguration().setChkPeriod(v);
389         } else if (strcmp(keyz, "max_checkpoints") == 0) {
390             size_t v = std::stoull(valz);
391             validate(v, size_t(DEFAULT_MAX_CHECKPOINTS),
392                      size_t(MAX_CHECKPOINTS_UPPER_BOUND));
393             getConfiguration().setMaxCheckpoints(v);
394         } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
395             getConfiguration().setItemNumBasedNewChk(cb_stob(valz));
396         } else if (strcmp(keyz, "keep_closed_chks") == 0) {
397             getConfiguration().setKeepClosedChks(cb_stob(valz));
398         } else if (strcmp(keyz, "enable_chk_merge") == 0) {
399             getConfiguration().setEnableChkMerge(cb_stob(valz));
400         } else {
401             msg = "Unknown config param";
402             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
403         }
404
405         // Handles exceptions thrown by the cb_stob function
406     } catch (invalid_argument_bool& error) {
407         msg = error.what();
408         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
409
410         // Handles exceptions thrown by the standard
411         // library stoi/stoul style functions when not numeric
412     } catch (std::invalid_argument&) {
413         msg = "Argument was not numeric";
414         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
415
416         // Handles exceptions thrown by the standard library stoi/stoul
417         // style functions when the conversion does not fit in the datatype
418     } catch (std::out_of_range&) {
419         msg = "Argument was out of range";
420         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
421
422         // Handles any miscellaenous exceptions in addition to the range_error
423         // exceptions thrown by the configuration::set<param>() methods
424     } catch (std::exception& error) {
425         msg = error.what();
426         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
427     }
428
429     return rv;
430 }
431
432 protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
433         const char* keyz, const char* valz, std::string& msg) {
434     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
435
436     // Handle the actual mutation.
437     try {
438         if (strcmp(keyz, "bg_fetch_delay") == 0) {
439             getConfiguration().setBgFetchDelay(std::stoull(valz));
440         } else if (strcmp(keyz, "flushall_enabled") == 0) {
441             getConfiguration().setFlushallEnabled(cb_stob(valz));
442         } else if (strcmp(keyz, "max_size") == 0) {
443             size_t vsize = std::stoull(valz);
444
445             getConfiguration().setMaxSize(vsize);
446             EPStats& st = getEpStats();
447             getConfiguration().setMemLowWat(
448                     percentOf(vsize, st.mem_low_wat_percent));
449             getConfiguration().setMemHighWat(
450                     percentOf(vsize, st.mem_high_wat_percent));
451         } else if (strcmp(keyz, "mem_low_wat") == 0) {
452             getConfiguration().setMemLowWat(std::stoull(valz));
453         } else if (strcmp(keyz, "mem_high_wat") == 0) {
454             getConfiguration().setMemHighWat(std::stoull(valz));
455         } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
456             getConfiguration().setBackfillMemThreshold(std::stoull(valz));
457         } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
458             getConfiguration().setCompactionExpMemThreshold(std::stoull(valz));
459         } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
460             getConfiguration().setMutationMemThreshold(std::stoull(valz));
461         } else if (strcmp(keyz, "timing_log") == 0) {
462             EPStats& stats = getEpStats();
463             std::ostream* old = stats.timingLog;
464             stats.timingLog = NULL;
465             delete old;
466             if (strcmp(valz, "off") == 0) {
467                 LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
468             } else {
469                 std::ofstream* tmp(new std::ofstream(valz));
470                 if (tmp->good()) {
471                     LOG(EXTENSION_LOG_INFO,
472                         "Logging detailed timings to ``%s''.", valz);
473                     stats.timingLog = tmp;
474                 } else {
475                     LOG(EXTENSION_LOG_WARNING,
476                         "Error setting detailed timing log to ``%s'':  %s",
477                         valz, strerror(errno));
478                     delete tmp;
479                 }
480             }
481         } else if (strcmp(keyz, "exp_pager_enabled") == 0) {
482             getConfiguration().setExpPagerEnabled(cb_stob(valz));
483         } else if (strcmp(keyz, "exp_pager_stime") == 0) {
484             getConfiguration().setExpPagerStime(std::stoull(valz));
485         } else if (strcmp(keyz, "exp_pager_initial_run_time") == 0) {
486             getConfiguration().setExpPagerInitialRunTime(std::stoll(valz));
487         } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
488             getConfiguration().requirementsMetOrThrow("access_scanner_enabled");
489             getConfiguration().setAccessScannerEnabled(cb_stob(valz));
490         } else if (strcmp(keyz, "alog_sleep_time") == 0) {
491             getConfiguration().requirementsMetOrThrow("alog_sleep_time");
492             getConfiguration().setAlogSleepTime(std::stoull(valz));
493         } else if (strcmp(keyz, "alog_task_time") == 0) {
494             getConfiguration().requirementsMetOrThrow("alog_task_time");
495             getConfiguration().setAlogTaskTime(std::stoull(valz));
496         } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
497             getConfiguration().setPagerActiveVbPcnt(std::stoull(valz));
498         } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
499             getConfiguration().setWarmupMinMemoryThreshold(std::stoull(valz));
500         } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
501             getConfiguration().setWarmupMinItemsThreshold(std::stoull(valz));
502         } else if (strcmp(keyz, "max_num_readers") == 0 ||
503                    strcmp(keyz, "num_reader_threads") == 0) {
504             size_t value = std::stoull(valz);
505             getConfiguration().setNumReaderThreads(value);
506             ExecutorPool::get()->setNumReaders(value);
507         } else if (strcmp(keyz, "max_num_writers") == 0 ||
508                    strcmp(keyz, "num_writer_threads") == 0) {
509             size_t value = std::stoull(valz);
510             getConfiguration().setNumWriterThreads(value);
511             ExecutorPool::get()->setNumWriters(value);
512         } else if (strcmp(keyz, "max_num_auxio") == 0 ||
513                    strcmp(keyz, "num_auxio_threads") == 0) {
514             size_t value = std::stoull(valz);
515             getConfiguration().setNumAuxioThreads(value);
516             ExecutorPool::get()->setNumAuxIO(value);
517         } else if (strcmp(keyz, "max_num_nonio") == 0 ||
518                    strcmp(keyz, "num_nonio_threads") == 0) {
519             size_t value = std::stoull(valz);
520             getConfiguration().setNumNonioThreads(value);
521             ExecutorPool::get()->setNumNonIO(value);
522         } else if (strcmp(keyz, "bfilter_enabled") == 0) {
523             getConfiguration().setBfilterEnabled(cb_stob(valz));
524         } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
525             getConfiguration().setBfilterResidencyThreshold(std::stof(valz));
526         } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
527             getConfiguration().setDefragmenterEnabled(cb_stob(valz));
528         } else if (strcmp(keyz, "defragmenter_interval") == 0) {
529             size_t v = std::stoull(valz);
530             // Adding separate validation as external limit is minimum 1
531             // to prevent setting defragmenter to constantly run
532             validate(v, size_t(1), std::numeric_limits<size_t>::max());
533             getConfiguration().setDefragmenterInterval(v);
534         } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
535             getConfiguration().setDefragmenterAgeThreshold(std::stoull(valz));
536         } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
537             getConfiguration().setDefragmenterChunkDuration(std::stoull(valz));
538         } else if (strcmp(keyz, "defragmenter_run") == 0) {
539             runDefragmenterTask();
540         } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
541             getConfiguration().setCompactionWriteQueueCap(std::stoull(valz));
542         } else if (strcmp(keyz, "dcp_min_compression_ratio") == 0) {
543             getConfiguration().setDcpMinCompressionRatio(std::stof(valz));
544         } else if (strcmp(keyz, "access_scanner_run") == 0) {
545             if (!(runAccessScannerTask())) {
546                 rv = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
547             }
548         } else if (strcmp(keyz, "vb_state_persist_run") == 0) {
549             runVbStatePersistTask(std::stoi(valz));
550         } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
551             getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
552             getConfiguration().setEphemeralFullPolicy(valz);
553         } else if (strcmp(keyz, "ephemeral_metadata_purge_age") == 0) {
554             getConfiguration().requirementsMetOrThrow(
555                     "ephemeral_metadata_purge_age");
556             getConfiguration().setEphemeralMetadataPurgeAge(std::stoull(valz));
557         } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
558             getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
559             getConfiguration().setEphemeralMetadataPurgeInterval(
560                     std::stoull(valz));
561         } else {
562             msg = "Unknown config param";
563             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
564         }
565         // Handles exceptions thrown by the cb_stob function
566     } catch (invalid_argument_bool& error) {
567         msg = error.what();
568         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
569
570         // Handles exceptions thrown by the standard
571         // library stoi/stoul style functions when not numeric
572     } catch (std::invalid_argument&) {
573         msg = "Argument was not numeric";
574         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
575
576         // Handles exceptions thrown by the standard library stoi/stoul
577         // style functions when the conversion does not fit in the datatype
578     } catch (std::out_of_range&) {
579         msg = "Argument was out of range";
580         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
581
582         // Handles any miscellaneous exceptions in addition to the range_error
583         // exceptions thrown by the configuration::set<param>() methods
584     } catch (std::exception& error) {
585         msg = error.what();
586         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
587     }
588
589     return rv;
590 }
591
592 protocol_binary_response_status EventuallyPersistentEngine::setDcpParam(
593         const char* keyz, const char* valz, std::string& msg) {
594     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
595     try {
596
597         if (strcmp(keyz,
598                    "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
599             size_t v = atoi(valz);
600             checkNumeric(valz);
601             validate(v, size_t(1), std::numeric_limits<size_t>::max());
602             getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(
603                     v);
604         } else if (
605             strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") ==
606             0) {
607             size_t v = atoi(valz);
608             checkNumeric(valz);
609             validate(v, size_t(1), std::numeric_limits<size_t>::max());
610             getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(
611                     v);
612         } else {
613             msg = "Unknown config param";
614             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
615         }
616     } catch (std::runtime_error& ex) {
617         msg = "Value out of range.";
618         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
619     }
620
621     return rv;
622 }
623
624 protocol_binary_response_status EventuallyPersistentEngine::setVbucketParam(
625         uint16_t vbucket,
626         const char* keyz,
627         const char* valz,
628         std::string& msg) {
629     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
630     try {
631         if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
632             uint64_t v = std::strtoull(valz, nullptr, 10);
633             checkNumeric(valz);
634             getConfiguration().setHlcDriftAheadThresholdUs(v);
635         } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
636             uint64_t v = std::strtoull(valz, nullptr, 10);
637             checkNumeric(valz);
638             getConfiguration().setHlcDriftBehindThresholdUs(v);
639         } else if (strcmp(keyz, "max_cas") == 0) {
640             uint64_t v = std::strtoull(valz, nullptr, 10);
641             checkNumeric(valz);
642             LOG(EXTENSION_LOG_WARNING, "setVbucketParam: max_cas:%" PRIu64 " "
643                 "vb:%" PRIu16 "\n", v, vbucket);
644             if (getKVBucket()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
645                 rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
646                 msg = "Not my vbucket";
647             }
648         } else {
649             msg = "Unknown config param";
650             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
651         }
652     } catch (std::runtime_error& ex) {
653         msg = "Value out of range.";
654         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
655     }
656     return rv;
657 }
658
659 static protocol_binary_response_status evictKey(
660     EventuallyPersistentEngine* e,
661     protocol_binary_request_header
662     * request,
663     const char** msg,
664     size_t* msg_size,
665     DocNamespace docNamespace) {
666     protocol_binary_request_no_extras* req =
667         (protocol_binary_request_no_extras*)request;
668
669     const uint8_t* keyPtr = reinterpret_cast<const uint8_t*>(request) +
670                             sizeof(*request);
671     size_t keylen = ntohs(req->message.header.request.keylen);
672     uint16_t vbucket = ntohs(request->request.vbucket);
673
674     LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key{%.*s}\n",
675         int(keylen), keyPtr);
676     msg_size = 0;
677     auto rv = e->evictKey(DocKey(keyPtr, keylen, docNamespace), vbucket, msg);
678     if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
679         rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
680         if (e->isDegradedMode()) {
681             return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
682         }
683     }
684     return rv;
685 }
686
687 protocol_binary_response_status EventuallyPersistentEngine::setParam(
688         protocol_binary_request_set_param* req, std::string& msg) {
689     size_t keylen = ntohs(req->message.header.request.keylen);
690     uint8_t extlen = req->message.header.request.extlen;
691     size_t vallen = ntohl(req->message.header.request.bodylen);
692     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
693     protocol_binary_engine_param_t paramtype =
694         static_cast<protocol_binary_engine_param_t>(ntohl(
695             req->message.body.param_type));
696
697     if (keylen == 0 || (vallen - keylen - extlen) == 0) {
698         return PROTOCOL_BINARY_RESPONSE_EINVAL;
699     }
700
701     const char* keyp = reinterpret_cast<const char*>(req->bytes)
702                        + sizeof(req->bytes);
703     const char* valuep = keyp + keylen;
704     vallen -= (keylen + extlen);
705
706     char keyz[128];
707     char valz[512];
708
709     // Read the key.
710     if (keylen >= sizeof(keyz)) {
711         msg = "Key is too large.";
712         return PROTOCOL_BINARY_RESPONSE_EINVAL;
713     }
714     memcpy(keyz, keyp, keylen);
715     keyz[keylen] = 0x00;
716
717     // Read the value.
718     if (vallen >= sizeof(valz)) {
719         msg = "Value is too large.";
720         return PROTOCOL_BINARY_RESPONSE_EINVAL;
721     }
722     memcpy(valz, valuep, vallen);
723     valz[vallen] = 0x00;
724
725     protocol_binary_response_status rv;
726
727     switch (paramtype) {
728     case protocol_binary_engine_param_flush:
729         rv = setFlushParam(keyz, valz, msg);
730         break;
731     case protocol_binary_engine_param_tap:
732         rv = setTapParam(keyz, valz, msg);
733         break;
734     case protocol_binary_engine_param_checkpoint:
735         rv = setCheckpointParam(keyz, valz, msg);
736         break;
737     case protocol_binary_engine_param_dcp:
738         rv = setDcpParam(keyz, valz, msg);
739         break;
740     case protocol_binary_engine_param_vbucket:
741         rv = setVbucketParam(vbucket, keyz, valz, msg);
742         break;
743     default:
744         rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
745     }
746
747     return rv;
748 }
749
750 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine* e,
751                                     const void* cookie,
752                                     protocol_binary_request_header* request,
753                                     ADD_RESPONSE response) {
754     protocol_binary_request_get_vbucket* req =
755         reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
756     if (req == nullptr) {
757         throw std::invalid_argument("getVBucket: Unable to convert req"
758                                         " to protocol_binary_request_get_vbucket");
759     }
760
761     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
762     VBucketPtr vb = e->getVBucket(vbucket);
763     if (!vb) {
764         return e->sendNotMyVBucketResponse(response, cookie, 0);
765     } else {
766         vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
767         return sendResponse(response, NULL, 0, NULL, 0, &state,
768                             sizeof(state),
769                             PROTOCOL_BINARY_RAW_BYTES,
770                             PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
771     }
772 }
773
774 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine* e,
775                                     const void* cookie,
776                                     protocol_binary_request_header* request,
777                                     ADD_RESPONSE response) {
778
779     protocol_binary_request_set_vbucket* req =
780         reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
781
782     uint64_t cas = ntohll(req->message.header.request.cas);
783
784     size_t bodylen = ntohl(req->message.header.request.bodylen)
785                      - ntohs(req->message.header.request.keylen);
786     if (bodylen != sizeof(vbucket_state_t)) {
787         const std::string msg("Incorrect packet format");
788         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
789                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
790                             PROTOCOL_BINARY_RESPONSE_EINVAL,
791                             cas, cookie);
792     }
793
794     vbucket_state_t state;
795     memcpy(&state, &req->message.body.state, sizeof(state));
796     state = static_cast<vbucket_state_t>(ntohl(state));
797
798     if (!is_valid_vbucket_state_t(state)) {
799         const std::string msg("Invalid vbucket state");
800         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
801                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
802                             PROTOCOL_BINARY_RESPONSE_EINVAL,
803                             cas, cookie);
804     }
805
806     uint16_t vb = ntohs(req->message.header.request.vbucket);
807     if (e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
808         const std::string msg("VBucket number too big");
809         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
810                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
811                             PROTOCOL_BINARY_RESPONSE_ERANGE,
812                             cas, cookie);
813     }
814     return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
815                         PROTOCOL_BINARY_RAW_BYTES,
816                         PROTOCOL_BINARY_RESPONSE_SUCCESS,
817                         cas, cookie);
818 }
819
820 static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine* e,
821                                     const void* cookie,
822                                     protocol_binary_request_header* req,
823                                     ADD_RESPONSE response) {
824
825     uint64_t cas = ntohll(req->request.cas);
826
827     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
828     uint16_t vbucket = ntohs(req->request.vbucket);
829
830     std::string msg = "";
831     if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
832         msg = "Incorrect packet format.";
833         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
834                             msg.length(),
835                             PROTOCOL_BINARY_RAW_BYTES,
836                             PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
837     }
838
839     bool sync = false;
840     uint32_t bodylen = ntohl(req->request.bodylen);
841     if (bodylen > 0) {
842         const char* ptr = reinterpret_cast<const char*>(req->bytes) +
843                           sizeof(req->bytes);
844         if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
845             sync = true;
846         }
847     }
848
849     ENGINE_ERROR_CODE err;
850     void* es = e->getEngineSpecific(cookie);
851     if (sync) {
852         if (es == NULL) {
853             err = e->deleteVBucket(vbucket, cookie);
854             e->storeEngineSpecific(cookie, e);
855         } else {
856             e->storeEngineSpecific(cookie, NULL);
857             LOG(EXTENSION_LOG_INFO,
858                 "Completed sync deletion of vbucket %u",
859                 (unsigned)vbucket);
860             err = ENGINE_SUCCESS;
861         }
862     } else {
863         err = e->deleteVBucket(vbucket);
864     }
865     switch (err) {
866     case ENGINE_SUCCESS:
867         LOG(EXTENSION_LOG_NOTICE,
868             "Deletion of vbucket %d was completed.", vbucket);
869         break;
870     case ENGINE_NOT_MY_VBUCKET:
871         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
872             "because the vbucket doesn't exist!!!", vbucket);
873         res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
874         break;
875     case ENGINE_EINVAL:
876         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
877             "because the vbucket is not in a dead state\n", vbucket);
878         msg = "Failed to delete vbucket.  Must be in the dead state.";
879         res = PROTOCOL_BINARY_RESPONSE_EINVAL;
880         break;
881     case ENGINE_EWOULDBLOCK:
882         LOG(EXTENSION_LOG_NOTICE, "Request for vbucket %d deletion is in"
883                 " EWOULDBLOCK until the database file is removed from disk",
884             vbucket);
885         e->storeEngineSpecific(cookie, req);
886         return ENGINE_EWOULDBLOCK;
887     default:
888         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
889             "because of unknown reasons\n", vbucket);
890         msg = "Failed to delete vbucket.  Unknown reason.";
891         res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
892     }
893
894     if (err != ENGINE_NOT_MY_VBUCKET) {
895         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
896                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
897                             res, cas, cookie);
898     } else {
899         return e->sendNotMyVBucketResponse(response, cookie, cas);
900     }
901 }
902
903 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine* e,
904                                        protocol_binary_request_header* request,
905                                        const void* cookie,
906                                        Item** it,
907                                        const char** msg,
908                                        protocol_binary_response_status* res,
909                                        DocNamespace docNamespace) {
910     KVBucketIface* kvb = e->getKVBucket();
911     protocol_binary_request_no_extras* req =
912         (protocol_binary_request_no_extras*)request;
913     int keylen = ntohs(req->message.header.request.keylen);
914     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
915     ENGINE_ERROR_CODE error_code;
916     DocKey key(reinterpret_cast<const uint8_t*>(request) + sizeof(*request),
917                keylen, docNamespace);
918
919     GetValue rv(kvb->getReplica(key, vbucket, cookie));
920
921     if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
922         if (error_code == ENGINE_NOT_MY_VBUCKET) {
923             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
924             return error_code;
925         } else if (error_code == ENGINE_TMPFAIL) {
926             *msg = "NOT_FOUND";
927             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
928         } else {
929             return error_code;
930         }
931     } else {
932         *it = rv.getValue();
933         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
934     }
935     ++(e->getEpStats().numOpsGet);
936     return ENGINE_SUCCESS;
937 }
938
939 static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine* e,
940                                    const void* cookie,
941                                    protocol_binary_request_compact_db* req,
942                                    ADD_RESPONSE response) {
943
944     std::string msg = "";
945     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
946     compaction_ctx compactreq;
947     uint64_t cas = ntohll(req->message.header.request.cas);
948
949     if (ntohs(req->message.header.request.keylen) > 0 ||
950         req->message.header.request.extlen != 24) {
951         LOG(EXTENSION_LOG_WARNING,
952             "Compaction received bad ext/key len %d/%d.",
953             req->message.header.request.extlen,
954             ntohs(req->message.header.request.keylen));
955         msg = "Incorrect packet format.";
956         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
957                             msg.length(),
958                             PROTOCOL_BINARY_RAW_BYTES,
959                             PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
960     }
961     EPStats& stats = e->getEpStats();
962     compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
963     compactreq.purge_before_seq =
964         ntohll(req->message.body.purge_before_seq);
965     compactreq.drop_deletes = req->message.body.drop_deletes;
966     compactreq.db_file_id = e->getKVBucket()->getDBFileId(*req);
967     uint16_t vbid = ntohs(req->message.header.request.vbucket);
968
969     ENGINE_ERROR_CODE err;
970     void* es = e->getEngineSpecific(cookie);
971     if (es == NULL) {
972         ++stats.pendingCompactions;
973         e->storeEngineSpecific(cookie, e);
974         err = e->compactDB(vbid, compactreq, cookie);
975     } else {
976         e->storeEngineSpecific(cookie, NULL);
977         err = ENGINE_SUCCESS;
978     }
979
980     switch (err) {
981     case ENGINE_SUCCESS:
982         LOG(EXTENSION_LOG_NOTICE,
983             "Compaction of db file id: %d completed.", compactreq.db_file_id);
984         break;
985     case ENGINE_NOT_MY_VBUCKET:
986         --stats.pendingCompactions;
987         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
988             "because the db file doesn't exist!!!", compactreq.db_file_id);
989         res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
990         break;
991     case ENGINE_EINVAL:
992         --stats.pendingCompactions;
993         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
994             "because of an invalid argument", compactreq.db_file_id);
995         res = PROTOCOL_BINARY_RESPONSE_EINVAL;
996         break;
997     case ENGINE_EWOULDBLOCK:
998         LOG(EXTENSION_LOG_NOTICE,
999             "Compaction of db file id: %d scheduled "
1000                 "(awaiting completion).", compactreq.db_file_id);
1001         e->storeEngineSpecific(cookie, req);
1002         return ENGINE_EWOULDBLOCK;
1003     case ENGINE_TMPFAIL:
1004         LOG(EXTENSION_LOG_WARNING, "Request to compact db file id: %d hit"
1005                 " a temporary failure and may need to be retried",
1006             compactreq.db_file_id);
1007         msg = "Temporary failure in compacting db file.";
1008         res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1009         break;
1010     default:
1011         --stats.pendingCompactions;
1012         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1013             "because of unknown reasons\n", compactreq.db_file_id);
1014         msg = "Failed to compact db file.  Unknown reason.";
1015         res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1016         break;
1017     }
1018
1019     if (err != ENGINE_NOT_MY_VBUCKET) {
1020         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1021                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1022                             res, cas, cookie);
1023     } else {
1024         return e->sendNotMyVBucketResponse(response, cookie, cas);
1025     }
1026 }
1027
1028 static ENGINE_ERROR_CODE processUnknownCommand(
1029     EventuallyPersistentEngine* h,
1030     const void* cookie,
1031     protocol_binary_request_header* request,
1032     ADD_RESPONSE response,
1033     DocNamespace docNamespace) {
1034     protocol_binary_response_status res =
1035         PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1036     std::string dynamic_msg;
1037     const char* msg = NULL;
1038     size_t msg_size = 0;
1039     Item* itm = NULL;
1040
1041     EPStats& stats = h->getEpStats();
1042     ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1043
1044     /**
1045      * Session validation
1046      * (For ns_server commands only)
1047      */
1048     switch (request->request.opcode) {
1049     case PROTOCOL_BINARY_CMD_SET_PARAM:
1050     case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1051     case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1052     case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1053     case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1054     case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1055     case PROTOCOL_BINARY_CMD_COMPACT_DB: {
1056         if (h->getEngineSpecific(cookie) == NULL) {
1057             uint64_t cas = ntohll(request->request.cas);
1058             if (!h->validateSessionCas(cas)) {
1059                 const std::string message("Invalid session token");
1060                 return sendResponse(response, NULL, 0, NULL, 0,
1061                                     message.c_str(), message.length(),
1062                                     PROTOCOL_BINARY_RAW_BYTES,
1063                                     PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1064                                     cas, cookie);
1065             }
1066         }
1067         break;
1068     }
1069     default:
1070         break;
1071     }
1072
1073     switch (request->request.opcode) {
1074     case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
1075         return h->getAllVBucketSequenceNumbers(cookie, request, response);
1076
1077     case PROTOCOL_BINARY_CMD_GET_VBUCKET: {
1078         BlockTimer timer(&stats.getVbucketCmdHisto);
1079         rv = getVBucket(h, cookie, request, response);
1080         return rv;
1081     }
1082     case PROTOCOL_BINARY_CMD_DEL_VBUCKET: {
1083         BlockTimer timer(&stats.delVbucketCmdHisto);
1084         rv = delVBucket(h, cookie, request, response);
1085         if (rv != ENGINE_EWOULDBLOCK) {
1086             h->decrementSessionCtr();
1087             h->storeEngineSpecific(cookie, NULL);
1088         }
1089         return rv;
1090     }
1091     case PROTOCOL_BINARY_CMD_SET_VBUCKET: {
1092         BlockTimer timer(&stats.setVbucketCmdHisto);
1093         rv = setVBucket(h, cookie, request, response);
1094         h->decrementSessionCtr();
1095         return rv;
1096     }
1097     case PROTOCOL_BINARY_CMD_TOUCH:
1098     case PROTOCOL_BINARY_CMD_GAT:
1099     case PROTOCOL_BINARY_CMD_GATQ: {
1100         rv = h->touch(cookie, request, response, docNamespace);
1101         return rv;
1102     }
1103     case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1104         res = h->stopFlusher(&msg, &msg_size);
1105         break;
1106     case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1107         res = h->startFlusher(&msg, &msg_size);
1108         break;
1109     case PROTOCOL_BINARY_CMD_SET_PARAM:
1110         res = h->setParam(
1111                 reinterpret_cast<protocol_binary_request_set_param*>(request),
1112                 dynamic_msg);
1113         msg = dynamic_msg.c_str();
1114         msg_size = dynamic_msg.length();
1115         h->decrementSessionCtr();
1116         break;
1117     case PROTOCOL_BINARY_CMD_EVICT_KEY:
1118         res = evictKey(h, request, &msg, &msg_size, docNamespace);
1119         break;
1120     case PROTOCOL_BINARY_CMD_OBSERVE:
1121         return h->observe(cookie, request, response, docNamespace);
1122     case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
1123         return h->observe_seqno(cookie, request, response);
1124     case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT: {
1125         rv = h->deregisterTapClient(cookie, request, response);
1126         h->decrementSessionCtr();
1127         return rv;
1128     }
1129     case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN: {
1130         rv = h->resetReplicationChain(cookie, request, response);
1131         return rv;
1132     }
1133     case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER: {
1134         rv = h->changeTapVBFilter(cookie, request, response);
1135         h->decrementSessionCtr();
1136         return rv;
1137     }
1138     case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1139     case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1140     case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE: {
1141         rv = h->handleCheckpointCmds(cookie, request, response);
1142         return rv;
1143     }
1144     case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE: {
1145         rv = h->handleSeqnoCmds(cookie, request, response);
1146         return rv;
1147     }
1148     case PROTOCOL_BINARY_CMD_GET_META:
1149     case PROTOCOL_BINARY_CMD_GETQ_META: {
1150         rv = h->getMeta(cookie,
1151                         reinterpret_cast<protocol_binary_request_get_meta*>
1152                         (request), response,
1153                         docNamespace);
1154         return rv;
1155     }
1156     case PROTOCOL_BINARY_CMD_SET_WITH_META:
1157     case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1158     case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1159     case PROTOCOL_BINARY_CMD_ADDQ_WITH_META: {
1160         rv = h->setWithMeta(cookie,
1161                             reinterpret_cast<protocol_binary_request_set_with_meta*>
1162                             (request), response,
1163                             docNamespace);
1164         return rv;
1165     }
1166     case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1167     case PROTOCOL_BINARY_CMD_DELQ_WITH_META: {
1168         rv = h->deleteWithMeta(cookie,
1169                                reinterpret_cast<protocol_binary_request_delete_with_meta*>
1170                                (request), response,
1171                                docNamespace);
1172         return rv;
1173     }
1174     case PROTOCOL_BINARY_CMD_RETURN_META: {
1175         return h->returnMeta(cookie,
1176                              reinterpret_cast<protocol_binary_request_return_meta*>
1177                              (request), response,
1178                              docNamespace);
1179     }
1180     case PROTOCOL_BINARY_CMD_GET_REPLICA:
1181         rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res, docNamespace);
1182         if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1183             return rv;
1184         }
1185         break;
1186     case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1187     case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC: {
1188         rv = h->handleTrafficControlCmd(cookie, request, response);
1189         return rv;
1190     }
1191     case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG: {
1192         rv = h->setClusterConfig(cookie,
1193                                  reinterpret_cast<protocol_binary_request_set_cluster_config*>
1194                                  (request), response);
1195         h->decrementSessionCtr();
1196         return rv;
1197     }
1198     case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
1199         return h->getClusterConfig(cookie,
1200                                    reinterpret_cast<protocol_binary_request_get_cluster_config*>
1201                                    (request), response);
1202     case PROTOCOL_BINARY_CMD_COMPACT_DB: {
1203         rv = compactDB(h, cookie,
1204                        (protocol_binary_request_compact_db*)(request),
1205                        response);
1206         if (rv != ENGINE_EWOULDBLOCK) {
1207             h->decrementSessionCtr();
1208             h->storeEngineSpecific(cookie, NULL);
1209         }
1210         return rv;
1211     }
1212     case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY: {
1213         if (request->request.extlen != 0 ||
1214             request->request.keylen != 0 ||
1215             request->request.bodylen != 0) {
1216             return ENGINE_EINVAL;
1217         }
1218         return h->getRandomKey(cookie, response);
1219     }
1220     case PROTOCOL_BINARY_CMD_GET_KEYS: {
1221         return h->getAllKeys(cookie,
1222                              reinterpret_cast<protocol_binary_request_get_keys*>
1223                              (request), response,
1224                              docNamespace);
1225     }
1226         // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
1227     case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
1228     case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE: {
1229         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
1230                             PROTOCOL_BINARY_RAW_BYTES,
1231                             PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
1232                             cookie);
1233     }
1234     }
1235
1236     if (itm) {
1237         uint32_t flags = itm->getFlags();
1238         rv = sendResponse(response,
1239                           static_cast<const void*>(itm->getKey().data()),
1240                           itm->getKey().size(),
1241                           (const void*)&flags, sizeof(uint32_t),
1242                           static_cast<const void*>(itm->getData()),
1243                           itm->getNBytes(), itm->getDataType(),
1244                           static_cast<uint16_t>(res), itm->getCas(),
1245                           cookie);
1246         delete itm;
1247     } else if (rv == ENGINE_NOT_MY_VBUCKET) {
1248         return h->sendNotMyVBucketResponse(response, cookie, 0);
1249     } else {
1250         msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1251         rv = sendResponse(response, NULL, 0, NULL, 0,
1252                           msg, static_cast<uint16_t>(msg_size),
1253                           PROTOCOL_BINARY_RAW_BYTES,
1254                           static_cast<uint16_t>(res), 0, cookie);
1255
1256     }
1257     return rv;
1258 }
1259
1260 static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1261                                            const void* cookie,
1262                                            protocol_binary_request_header
1263                                            * request,
1264                                            ADD_RESPONSE response,
1265                                            DocNamespace doc_namespace) {
1266     auto engine = acquireEngine(handle);
1267     auto ret = processUnknownCommand(
1268             engine.get(), cookie, request, response, doc_namespace);
1269     return ret;
1270 }
1271
1272 static void EvpItemSetCas(ENGINE_HANDLE*, const void*,
1273                           item* itm, uint64_t cas) {
1274     static_cast<Item*>(itm)->setCas(cas);
1275 }
1276
1277 static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1278                                       const void* cookie,
1279                                       void* engine_specific,
1280                                       uint16_t nengine,
1281                                       uint8_t ttl,
1282                                       uint16_t tap_flags,
1283                                       tap_event_t tap_event,
1284                                       uint32_t tap_seqno,
1285                                       const void* key,
1286                                       size_t nkey,
1287                                       uint32_t flags,
1288                                       uint32_t exptime,
1289                                       uint64_t cas,
1290                                       uint8_t datatype,
1291                                       const void* data,
1292                                       size_t ndata,
1293                                       uint16_t vbucket) {
1294     if (!mcbp::datatype::is_valid(datatype)) {
1295         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1296             " (TapNotify)");
1297         return ENGINE_EINVAL;
1298     }
1299
1300     return acquireEngine(handle)->tapNotify(cookie,
1301                                             engine_specific,
1302                                             nengine,
1303                                             ttl,
1304                                             tap_flags,
1305                                             (uint16_t)tap_event,
1306                                             tap_seqno,
1307                                             key,
1308                                             nkey,
1309                                             flags,
1310                                             exptime,
1311                                             cas,
1312                                             datatype,
1313                                             data,
1314                                             ndata,
1315                                             vbucket);
1316 }
1317
1318 static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1319                                   const void* cookie, item** itm,
1320                                   void** es, uint16_t* nes, uint8_t* ttl,
1321                                   uint16_t* flags, uint32_t* seqno,
1322                                   uint16_t* vbucket) {
1323     uint16_t tap_event = acquireEngine(handle)->walkTapQueue(
1324             cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1325     return static_cast<tap_event_t>(tap_event);
1326 }
1327
1328 static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1329                                       const void* cookie,
1330                                       const void* client,
1331                                       size_t nclient,
1332                                       uint32_t flags,
1333                                       const void* userdata,
1334                                       size_t nuserdata) {
1335     auto engine = acquireEngine(handle);
1336     TAP_ITERATOR iterator = NULL;
1337     {
1338         std::string c(static_cast<const char*>(client), nclient);
1339         // Figure out what we want from the userdata before adding it to
1340         // the API to the handle
1341         if (engine->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1342             iterator = EvpTapIterator;
1343         }
1344     }
1345     return iterator;
1346 }
1347
1348
1349 static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1350                                     const void* cookie,
1351                                     struct dcp_message_producers* producers) {
1352     auto engine = acquireEngine(handle);
1353     ConnHandler* conn = engine->getConnHandler(cookie);
1354     if (conn) {
1355         return conn->step(producers);
1356     }
1357     return ENGINE_DISCONNECT;
1358 }
1359
1360
1361 static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1362                                     const void* cookie,
1363                                     uint32_t opaque,
1364                                     uint32_t seqno,
1365                                     uint32_t flags,
1366                                     void* name,
1367                                     uint16_t nname) {
1368     return acquireEngine(handle)->dcpOpen(
1369             cookie, opaque, seqno, flags, name, nname);
1370 }
1371
1372 static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1373                                          const void* cookie,
1374                                          uint32_t opaque,
1375                                          uint16_t vbucket,
1376                                          uint32_t flags) {
1377     return acquireEngine(handle)->dcpAddStream(cookie, opaque, vbucket, flags);
1378 }
1379
1380 static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1381                                            const void* cookie,
1382                                            uint32_t opaque,
1383                                            uint16_t vbucket) {
1384     auto engine = acquireEngine(handle);
1385     ConnHandler* conn = engine->getConnHandler(cookie);
1386     if (conn) {
1387         return conn->closeStream(opaque, vbucket);
1388     }
1389     return ENGINE_DISCONNECT;
1390 }
1391
1392
1393 static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1394                                          const void* cookie,
1395                                          uint32_t flags,
1396                                          uint32_t opaque,
1397                                          uint16_t vbucket,
1398                                          uint64_t startSeqno,
1399                                          uint64_t endSeqno,
1400                                          uint64_t vbucketUuid,
1401                                          uint64_t snapStartSeqno,
1402                                          uint64_t snapEndSeqno,
1403                                          uint64_t* rollbackSeqno,
1404                                          dcp_add_failover_log callback) {
1405     auto engine = acquireEngine(handle);
1406     ConnHandler* conn = engine->getConnHandler(cookie);
1407     if (conn) {
1408         return conn->streamRequest(flags,
1409                                    opaque,
1410                                    vbucket,
1411                                    startSeqno,
1412                                    endSeqno,
1413                                    vbucketUuid,
1414                                    snapStartSeqno,
1415                                    snapEndSeqno,
1416                                    rollbackSeqno,
1417                                    callback);
1418     }
1419     return ENGINE_DISCONNECT;
1420 }
1421
1422 static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1423                                               const void* cookie,
1424                                               uint32_t opaque,
1425                                               uint16_t vbucket,
1426                                               dcp_add_failover_log callback) {
1427     auto engine = acquireEngine(handle);
1428     ConnHandler* conn = engine->getConnHandler(cookie);
1429     if (conn) {
1430         return conn->getFailoverLog(opaque, vbucket, callback);
1431     }
1432     return ENGINE_DISCONNECT;
1433 }
1434
1435
1436 static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1437                                          const void* cookie,
1438                                          uint32_t opaque,
1439                                          uint16_t vbucket,
1440                                          uint32_t flags) {
1441     auto engine = acquireEngine(handle);
1442     ConnHandler* conn = engine->getConnHandler(cookie);
1443     if (conn) {
1444         return conn->streamEnd(opaque, vbucket, flags);
1445     }
1446     return ENGINE_DISCONNECT;
1447 }
1448
1449
1450 static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1451                                               const void* cookie,
1452                                               uint32_t opaque,
1453                                               uint16_t vbucket,
1454                                               uint64_t start_seqno,
1455                                               uint64_t end_seqno,
1456                                               uint32_t flags) {
1457     auto engine = acquireEngine(handle);
1458     ConnHandler* conn = engine->getConnHandler(cookie);
1459     if (conn) {
1460         return conn->snapshotMarker(
1461                 opaque, vbucket, start_seqno, end_seqno, flags);
1462     }
1463     return ENGINE_DISCONNECT;
1464 }
1465
1466 static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1467                                         const void* cookie,
1468                                         uint32_t opaque,
1469                                         const DocKey& key,
1470                                         cb::const_byte_buffer value,
1471                                         size_t priv_bytes,
1472                                         uint8_t datatype,
1473                                         uint64_t cas,
1474                                         uint16_t vbucket,
1475                                         uint32_t flags,
1476                                         uint64_t by_seqno,
1477                                         uint64_t rev_seqno,
1478                                         uint32_t expiration,
1479                                         uint32_t lock_time,
1480                                         cb::const_byte_buffer meta,
1481                                         uint8_t nru) {
1482     if (!mcbp::datatype::is_valid(datatype)) {
1483         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1484             " (DCPMutation)");
1485         return ENGINE_EINVAL;
1486     }
1487     auto engine = acquireEngine(handle);
1488     ConnHandler* conn = engine->getConnHandler(cookie);
1489     if (conn) {
1490         return conn->mutation(opaque, key, value, priv_bytes, datatype, cas,
1491                               vbucket, flags, by_seqno, rev_seqno, expiration,
1492                               lock_time, meta, nru);
1493     }
1494     return ENGINE_DISCONNECT;
1495 }
1496
1497 static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1498                                         const void* cookie,
1499                                         uint32_t opaque,
1500                                         const DocKey& key,
1501                                         cb::const_byte_buffer value,
1502                                         size_t priv_bytes,
1503                                         uint8_t datatype,
1504                                         uint64_t cas,
1505                                         uint16_t vbucket,
1506                                         uint64_t by_seqno,
1507                                         uint64_t rev_seqno,
1508                                         cb::const_byte_buffer meta) {
1509     auto engine = acquireEngine(handle);
1510     ConnHandler* conn = engine->getConnHandler(cookie);
1511     if (conn) {
1512         return conn->deletion(opaque, key, value, priv_bytes, datatype, cas,
1513                               vbucket, by_seqno, rev_seqno, meta);
1514     }
1515     return ENGINE_DISCONNECT;
1516 }
1517
1518 static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1519                                           const void* cookie,
1520                                           uint32_t opaque,
1521                                           const DocKey& key,
1522                                           cb::const_byte_buffer value,
1523                                           size_t priv_bytes,
1524                                           uint8_t datatype,
1525                                           uint64_t cas,
1526                                           uint16_t vbucket,
1527                                           uint64_t by_seqno,
1528                                           uint64_t rev_seqno,
1529                                           cb::const_byte_buffer meta) {
1530     auto engine = acquireEngine(handle);
1531     ConnHandler* conn = engine->getConnHandler(cookie);
1532     if (conn) {
1533         return conn->expiration(opaque, key, value, priv_bytes, datatype, cas,
1534                                 vbucket, by_seqno, rev_seqno, meta);
1535     }
1536     return ENGINE_DISCONNECT;
1537 }
1538
1539 static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1540                                      const void* cookie,
1541                                      uint32_t opaque,
1542                                      uint16_t vbucket) {
1543     auto engine = acquireEngine(handle);
1544     ConnHandler* conn = engine->getConnHandler(cookie);
1545     if (conn) {
1546         return conn->flushall(opaque, vbucket);
1547     }
1548     return ENGINE_DISCONNECT;
1549 }
1550
1551 static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1552                                                const void* cookie,
1553                                                uint32_t opaque,
1554                                                uint16_t vbucket,
1555                                                vbucket_state_t state) {
1556     auto engine = acquireEngine(handle);
1557     ConnHandler* conn = engine->getConnHandler(cookie);
1558     if (conn) {
1559         return conn->setVBucketState(opaque, vbucket, state);
1560     }
1561     return ENGINE_DISCONNECT;
1562 }
1563
1564 static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1565                                     const void* cookie,
1566                                     uint32_t opaque) {
1567     auto engine = acquireEngine(handle);
1568     ConnHandler* conn = engine->getConnHandler(cookie);
1569     if (conn) {
1570         return conn->noop(opaque);
1571     }
1572     return ENGINE_DISCONNECT;
1573 }
1574
1575 static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1576                                                      const void* cookie,
1577                                                      uint32_t opaque,
1578                                                      uint16_t vbucket,
1579                                                      uint32_t buffer_bytes) {
1580     auto engine = acquireEngine(handle);
1581     ConnHandler* conn = engine->getConnHandler(cookie);
1582     if (conn) {
1583         return conn->bufferAcknowledgement(opaque, vbucket, buffer_bytes);
1584     }
1585     return ENGINE_DISCONNECT;
1586 }
1587
1588 static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1589                                        const void* cookie,
1590                                        uint32_t opaque,
1591                                        const void* key,
1592                                        uint16_t nkey,
1593                                        const void* value,
1594                                        uint32_t nvalue) {
1595     auto engine = acquireEngine(handle);
1596     ConnHandler* conn = engine->getConnHandler(cookie);
1597     if (conn) {
1598         return conn->control(opaque, key, nkey, value, nvalue);
1599     }
1600     return ENGINE_DISCONNECT;
1601 }
1602
1603 static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1604                                                const void* cookie,
1605                                                protocol_binary_response_header* response) {
1606     auto engine = acquireEngine(handle);
1607     ConnHandler* conn = engine->getConnHandler(cookie);
1608     if (conn) {
1609         if (conn->handleResponse(response)) {
1610             return ENGINE_SUCCESS;
1611         }
1612     }
1613     return ENGINE_DISCONNECT;
1614 }
1615
1616 static void EvpHandleDisconnect(const void* cookie,
1617                                 ENGINE_EVENT_TYPE type,
1618                                 const void* event_data,
1619                                 const void* cb_data) {
1620     if (type != ON_DISCONNECT) {
1621         throw std::invalid_argument("EvpHandleDisconnect: type "
1622                                         "(which is" + std::to_string(type) +
1623                                     ") is not ON_DISCONNECT");
1624     }
1625     if (event_data != nullptr) {
1626         throw std::invalid_argument("EvpHandleDisconnect: event_data "
1627                                         "is not NULL");
1628     }
1629     void* c = const_cast<void*>(cb_data);
1630     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1631 }
1632
1633 static void EvpHandleDeleteBucket(const void* cookie,
1634                                   ENGINE_EVENT_TYPE type,
1635                                   const void* event_data,
1636                                   const void* cb_data) {
1637     if (type != ON_DELETE_BUCKET) {
1638         throw std::invalid_argument("EvpHandleDeleteBucket: type "
1639                                         "(which is" + std::to_string(type) +
1640                                     ") is not ON_DELETE_BUCKET");
1641     }
1642     if (event_data != nullptr) {
1643         throw std::invalid_argument("EvpHandleDeleteBucket: event_data "
1644                                         "is not NULL");
1645     }
1646     void* c = const_cast<void*>(cb_data);
1647     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDeleteBucket(cookie);
1648 }
1649
1650 void EvpSetLogLevel(ENGINE_HANDLE* handle, EXTENSION_LOG_LEVEL level) {
1651     Logger::setGlobalLogLevel(level);
1652 }
1653
1654 /**
1655  * The only public interface to the eventually persistent engine.
1656  * Allocate a new instance and initialize it
1657  * @param interface the highest interface the server supports (we only
1658  *                  support interface 1)
1659  * @param get_server_api callback function to get the server exported API
1660  *                  functions
1661  * @param handle Where to return the new instance
1662  * @return ENGINE_SUCCESS on success
1663  */
1664 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1665                                   GET_SERVER_API get_server_api,
1666                                   ENGINE_HANDLE** handle) {
1667     SERVER_HANDLE_V1* api = get_server_api();
1668     if (interface != 1 || api == NULL) {
1669         return ENGINE_ENOTSUP;
1670     }
1671
1672     Logger::setLoggerAPI(api->log);
1673
1674     MemoryTracker::getInstance(*api->alloc_hooks);
1675     ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1676
1677     std::atomic<size_t>* inital_tracking = new std::atomic<size_t>();
1678
1679     ObjectRegistry::setStats(inital_tracking);
1680     EventuallyPersistentEngine* engine;
1681     engine = new EventuallyPersistentEngine(get_server_api);
1682     ObjectRegistry::setStats(NULL);
1683
1684     if (engine == NULL) {
1685         return ENGINE_ENOMEM;
1686     }
1687
1688     if (MemoryTracker::trackingMemoryAllocations()) {
1689         engine->getEpStats().memoryTrackerEnabled.store(true);
1690         engine->getEpStats().totalMemory->store(inital_tracking->load());
1691     }
1692     delete inital_tracking;
1693
1694     initialize_time_functions(api->core);
1695
1696     *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1697
1698     return ENGINE_SUCCESS;
1699 }
1700
1701 /*
1702     This method is called prior to unloading of the shared-object.
1703     Global clean-up should be performed from this method.
1704 */
1705 void destroy_engine() {
1706     ExecutorPool::shutdown();
1707     // A single MemoryTracker exists for *all* buckets
1708     // and must be destroyed before unloading the shared object.
1709     MemoryTracker::destroyInstance();
1710     ObjectRegistry::reset();
1711 }
1712
1713 static bool EvpGetItemInfo(ENGINE_HANDLE* handle, const void*,
1714                            const item* itm, item_info* itm_info) {
1715     const Item* it = reinterpret_cast<const Item*>(itm);
1716     auto engine = acquireEngine(handle);
1717     VBucketPtr vb = engine->getKVBucket()->getVBucket(it->getVBucketId());
1718     uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
1719     *itm_info = it->toItemInfo(vb_uuid);
1720     return true;
1721 }
1722
1723 static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1724                            item* itm, const item_info* itm_info) {
1725     Item* it = reinterpret_cast<Item*>(itm);
1726     if (!it) {
1727         return false;
1728     }
1729     it->setDataType(itm_info->datatype);
1730     return true;
1731 }
1732
1733 static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1734                                              const void* cookie,
1735                                              engine_get_vb_map_cb callback) {
1736     auto engine = acquireEngine(handle);
1737     LockHolder lh(engine->clusterConfig.lock);
1738     const char* config = engine->clusterConfig.config.data();
1739     uint32_t len = engine->clusterConfig.config.size();
1740     engine.reset(); // Want to release the engine before the callback
1741     return callback(cookie, config, len);
1742 }
1743
1744 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1745     va_list va;
1746     va_start(va, fmt);
1747     global_logger.vlog(severity, fmt, va);
1748     va_end(va);
1749 }
1750
1751 EventuallyPersistentEngine::EventuallyPersistentEngine(
1752         GET_SERVER_API get_server_api)
1753     : clusterConfig(),
1754       kvBucket(nullptr),
1755       workload(NULL),
1756       workloadPriority(NO_BUCKET_PRIORITY),
1757       replicationThrottle(NULL),
1758       getServerApiFunc(get_server_api),
1759       dcpConnMap_(NULL),
1760       dcpFlowControlManager_(NULL),
1761       tapConnMap(NULL),
1762       tapConfig(NULL),
1763       checkpointConfig(NULL),
1764       trafficEnabled(false),
1765       deleteAllEnabled(false),
1766       startupTime(0),
1767       taskable(this) {
1768     interface.interface = 1;
1769     ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1770     ENGINE_HANDLE_V1::initialize = EvpInitialize;
1771     ENGINE_HANDLE_V1::destroy = EvpDestroy;
1772     ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1773     ENGINE_HANDLE_V1::allocate_ex = EvpItemAllocateEx;
1774     ENGINE_HANDLE_V1::remove = EvpItemDelete;
1775     ENGINE_HANDLE_V1::release = EvpItemRelease;
1776     ENGINE_HANDLE_V1::get = EvpGet;
1777     ENGINE_HANDLE_V1::get_if = EvpGetIf;
1778     ENGINE_HANDLE_V1::get_locked = EvpGetLocked;
1779     ENGINE_HANDLE_V1::unlock = EvpUnlock;
1780     ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1781     ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1782     ENGINE_HANDLE_V1::store = EvpStore;
1783     ENGINE_HANDLE_V1::flush = EvpFlush;
1784     ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1785     ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1786     ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1787     ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1788     ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1789     ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1790     ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
1791
1792     ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1793     ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1794     ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1795     ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1796     ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1797     ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1798     ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1799     ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1800     ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1801     ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1802     ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1803     ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1804     ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1805     ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1806     ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1807     ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1808     ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1809     ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
1810
1811     serverApi = getServerApiFunc();
1812     memset(&info, 0, sizeof(info));
1813     info.info.description = "EP engine v" VERSION;
1814     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1815     info.info.features[info.info.num_features++].feature =
1816                                              ENGINE_FEATURE_PERSISTENT_STORAGE;
1817     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1818     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
1819 }
1820
1821 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1822 {
1823     EventuallyPersistentEngine *epe =
1824                                     ObjectRegistry::onSwitchThread(NULL, true);
1825     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1826     ObjectRegistry::onSwitchThread(epe);
1827     return rv;
1828 }
1829
1830 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1831 {
1832     EventuallyPersistentEngine *epe =
1833                                     ObjectRegistry::onSwitchThread(NULL, true);
1834     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1835     ObjectRegistry::onSwitchThread(epe);
1836     return rv;
1837 }
1838
1839 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1840                                                         EVENT_CALLBACK cb,
1841                                                         const void *cb_data) {
1842     EventuallyPersistentEngine *epe =
1843                                     ObjectRegistry::onSwitchThread(NULL, true);
1844     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1845     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1846                             type, cb, cb_data);
1847     ObjectRegistry::onSwitchThread(epe);
1848 }
1849
1850 /**
1851  * A configuration value changed listener that responds to ep-engine
1852  * parameter changes by invoking engine-specific methods on
1853  * configuration change events.
1854  */
1855 class EpEngineValueChangeListener : public ValueChangedListener {
1856 public:
1857     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
1858         // EMPTY
1859     }
1860
1861     virtual void sizeValueChanged(const std::string &key, size_t value) {
1862         if (key.compare("getl_max_timeout") == 0) {
1863             engine.setGetlMaxTimeout(value);
1864         } else if (key.compare("getl_default_timeout") == 0) {
1865             engine.setGetlDefaultTimeout(value);
1866         } else if (key.compare("max_item_size") == 0) {
1867             engine.setMaxItemSize(value);
1868         } else if (key.compare("max_item_privileged_bytes") == 0) {
1869             engine.setMaxItemPrivilegedBytes(value);
1870         }
1871     }
1872
1873     virtual void booleanValueChanged(const std::string &key, bool value) {
1874         if (key.compare("flushall_enabled") == 0) {
1875             engine.setDeleteAll(value);
1876         }
1877     }
1878 private:
1879     EventuallyPersistentEngine &engine;
1880 };
1881
1882
1883
1884 ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
1885     resetStats();
1886     if (config != NULL) {
1887         if (!configuration.parseConfiguration(config, serverApi)) {
1888             LOG(EXTENSION_LOG_NOTICE, "Failed to parse the configuration config "
1889                 "during bucket initialization.  config=%s", config);
1890             return ENGINE_FAILED;
1891         }
1892     }
1893
1894     name = configuration.getCouchBucket();
1895     maxFailoverEntries = configuration.getMaxFailoverEntries();
1896
1897     // Start updating the variables from the config!
1898     HashTable::setDefaultNumBuckets(configuration.getHtSize());
1899     HashTable::setDefaultNumLocks(configuration.getHtLocks());
1900     StoredValue::setMutationMemoryThreshold(
1901                                       configuration.getMutationMemThreshold());
1902
1903     if (configuration.getMaxSize() == 0) {
1904         configuration.setMaxSize(std::numeric_limits<size_t>::max());
1905     }
1906
1907     if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
1908         stats.mem_low_wat_percent.store(0.75);
1909         configuration.setMemLowWat(percentOf(
1910                 configuration.getMaxSize(), stats.mem_low_wat_percent.load()));
1911     }
1912
1913     if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
1914         stats.mem_high_wat_percent.store(0.85);
1915         configuration.setMemHighWat(percentOf(
1916                 configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
1917     }
1918
1919     maxItemSize = configuration.getMaxItemSize();
1920     configuration.addValueChangedListener("max_item_size",
1921                                        new EpEngineValueChangeListener(*this));
1922
1923     maxItemPrivilegedBytes = configuration.getMaxItemPrivilegedBytes();
1924     configuration.addValueChangedListener(
1925             "max_item_privileged_bytes",
1926             new EpEngineValueChangeListener(*this));
1927
1928     getlDefaultTimeout = configuration.getGetlDefaultTimeout();
1929     configuration.addValueChangedListener("getl_default_timeout",
1930                                        new EpEngineValueChangeListener(*this));
1931     getlMaxTimeout = configuration.getGetlMaxTimeout();
1932     configuration.addValueChangedListener("getl_max_timeout",
1933                                        new EpEngineValueChangeListener(*this));
1934
1935     deleteAllEnabled = configuration.isFlushallEnabled();
1936     configuration.addValueChangedListener("flushall_enabled",
1937                                        new EpEngineValueChangeListener(*this));
1938
1939     workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
1940                                   configuration.getMaxNumShards());
1941     if ((unsigned int)workload->getNumShards() >
1942                                               configuration.getMaxVbuckets()) {
1943         LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
1944             "equal or less than max number of vbuckets");
1945         return ENGINE_FAILED;
1946     }
1947
1948     dcpConnMap_ = new DcpConnMap(*this);
1949
1950     /* Get the flow control policy */
1951     std::string flowCtlPolicy = configuration.getDcpFlowControlPolicy();
1952
1953     if (!flowCtlPolicy.compare("static")) {
1954         dcpFlowControlManager_ = new DcpFlowControlManagerStatic(*this);
1955     } else if (!flowCtlPolicy.compare("dynamic")) {
1956         dcpFlowControlManager_ = new DcpFlowControlManagerDynamic(*this);
1957     } else if (!flowCtlPolicy.compare("aggressive")) {
1958         dcpFlowControlManager_ = new DcpFlowControlManagerAggressive(*this);
1959     } else {
1960         /* Flow control is not enabled */
1961         dcpFlowControlManager_ = new DcpFlowControlManager(*this);
1962     }
1963
1964     tapConnMap = new TapConnMap(*this);
1965     tapConfig = new TapConfig(*this);
1966     replicationThrottle = new ReplicationThrottle(configuration, stats);
1967     TapConfig::addConfigChangeListener(*this);
1968
1969     checkpointConfig = new CheckpointConfig(*this);
1970     CheckpointConfig::addConfigChangeListener(*this);
1971
1972     kvBucket = makeBucket(configuration);
1973
1974     initializeEngineCallbacks();
1975
1976     // Complete the initialization of the ep-store
1977     if (!kvBucket->initialize()) {
1978         return ENGINE_FAILED;
1979     }
1980
1981     if(configuration.isDataTrafficEnabled()) {
1982         enableTraffic(true);
1983     }
1984
1985     tapConnMap->initialize(TAP_CONN_NOTIFIER);
1986     dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
1987
1988     // record engine initialization time
1989     startupTime.store(ep_real_time());
1990
1991     LOG(EXTENSION_LOG_NOTICE,
1992         "EP Engine: Initialization of %s bucket complete",
1993         configuration.getBucketType().c_str());
1994
1995     return ENGINE_SUCCESS;
1996 }
1997
1998 void EventuallyPersistentEngine::destroy(bool force) {
1999     stats.forceShutdown = force;
2000     stats.isShutdown = true;
2001
2002     // Perform a snapshot of the stats before shutting down so we can persist
2003     // the type of shutdown (stats.forceShutdown), and consequently on the
2004     // next warmup can determine is there was a clean shutdown - see
2005     // Warmup::cleanShutdown
2006     if (kvBucket) {
2007         kvBucket->snapshotStats();
2008     }
2009     if (tapConnMap) {
2010         tapConnMap->shutdownAllConnections();
2011     }
2012     if (dcpConnMap_) {
2013         dcpConnMap_->shutdownAllConnections();
2014     }
2015 }
2016
2017 ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
2018         item** itm,
2019         const DocKey& key,
2020         const size_t nbytes,
2021         const size_t priv_nbytes,
2022         const int flags,
2023         const rel_time_t exptime,
2024         uint8_t datatype,
2025         uint16_t vbucket) {
2026     if (priv_nbytes > maxItemPrivilegedBytes) {
2027         return ENGINE_E2BIG;
2028     }
2029
2030     if ((nbytes - priv_nbytes) > maxItemSize) {
2031         return ENGINE_E2BIG;
2032     }
2033
2034     if (!hasAvailableSpace(sizeof(Item) + sizeof(Blob) + key.size() + nbytes)) {
2035         return memoryCondition();
2036     }
2037
2038     time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
2039
2040     uint8_t ext_meta[1];
2041     uint8_t ext_len = EXT_META_LEN;
2042     *(ext_meta) = datatype;
2043     *itm = new Item(key,
2044                     flags,
2045                     expiretime,
2046                     nullptr,
2047                     nbytes,
2048                     ext_meta,
2049                     ext_len,
2050                     0 /*cas*/,
2051                     -1 /*seq*/,
2052                     vbucket);
2053     if (*itm == NULL) {
2054         return memoryCondition();
2055     } else {
2056         stats.itemAllocSizeHisto.add(nbytes);
2057         return ENGINE_SUCCESS;
2058     }
2059 }
2060
2061 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
2062     if (!deleteAllEnabled) {
2063         return ENGINE_ENOTSUP;
2064     }
2065
2066     if (!isDegradedMode()) {
2067         return ENGINE_TMPFAIL;
2068     }
2069
2070     /*
2071      * Supporting only a SYNC operation for bucket flush
2072      */
2073
2074     void* es = getEngineSpecific(cookie);
2075     if (es == NULL) {
2076         // Check if diskDeleteAll was false and set it to true
2077         // if yes, if the atomic variable weren't false, then
2078         // we will assume that a deleteAll has been scheduled
2079         // already and return TMPFAIL.
2080         if (kvBucket->scheduleDeleteAllTask(cookie)) {
2081             storeEngineSpecific(cookie, this);
2082             return ENGINE_EWOULDBLOCK;
2083         } else {
2084             LOG(EXTENSION_LOG_INFO,
2085                 "Tried to trigger a bucket deleteAll, but"
2086                 "there seems to be a task running already!");
2087             return ENGINE_TMPFAIL;
2088         }
2089
2090     } else {
2091         storeEngineSpecific(cookie, NULL);
2092         LOG(EXTENSION_LOG_NOTICE, "Completed bucket deleteAll operation");
2093         return ENGINE_SUCCESS;
2094     }
2095 }
2096
2097 cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
2098                                                        const DocKey& key,
2099                                                        uint16_t vbucket,
2100                                                        std::function<bool(const item_info&)>filter) {
2101
2102     auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2103
2104     // Fetch an item from the hashtable (without trying to schedule a bg-fetch
2105     // and pass it through the filter. If the filter accepts the document
2106     // based on the metadata, return the document. If the document's data
2107     // isn't resident we run another iteration in the loop and retries the
2108     // action but this time we _do_ schedule a bg-fetch.
2109     for (int ii = 0; ii < 2; ++ii) {
2110         auto options = static_cast<get_options_t>(HONOR_STATES |
2111                                                   TRACK_REFERENCE |
2112                                                   DELETE_TEMP |
2113                                                   HIDE_LOCKED_CAS |
2114                                                   ALLOW_META_ONLY);
2115         if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
2116             options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
2117         }
2118
2119         BlockTimer timer(&stats.getCmdHisto);
2120         GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2121         ENGINE_ERROR_CODE status = gv.getStatus();
2122
2123         switch (status) {
2124         case ENGINE_SUCCESS:
2125             break;
2126
2127         case ENGINE_KEY_ENOENT: // FALLTHROUGH
2128         case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
2129             if (isDegradedMode()) {
2130                 status = ENGINE_TMPFAIL;
2131             }
2132             // FALLTHROUGH
2133         default:
2134             return std::make_pair(cb::engine_errc(status),
2135                                   cb::unique_item_ptr{nullptr,
2136                                                       cb::ItemDeleter{handle}});
2137         }
2138
2139         auto* item = gv.getValue();
2140         cb::unique_item_ptr ret{item, cb::ItemDeleter{handle}};
2141
2142         const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
2143         const uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
2144
2145         // Currently
2146         if (filter(item->toItemInfo(vb_uuid))) {
2147             if (!gv.isPartial()) {
2148                 return std::make_pair(cb::engine_errc::success,
2149                                cb::unique_item_ptr{ret.release(),
2150                                                    cb::ItemDeleter{handle}});
2151             }
2152             // We want this item, but we need to fetch it off disk
2153         } else {
2154             // the client don't care about this thing..
2155             ret.reset(nullptr);
2156             return std::make_pair(cb::engine_errc::success,
2157                                   cb::unique_item_ptr{ret.release(),
2158                                                       cb::ItemDeleter{handle}});
2159         }
2160     }
2161
2162     // It should not be possible to get as the second iteration in the loop
2163     // SHOULD handle backround fetches an the item should NOT be partial!
2164     throw std::logic_error("EventuallyPersistentEngine::get_if: loop terminated");
2165 }
2166
2167 ENGINE_ERROR_CODE EventuallyPersistentEngine::get_locked(const void* cookie,
2168                                                          item** itm,
2169                                                          const DocKey& key,
2170                                                          uint16_t vbucket,
2171                                                          uint32_t lock_timeout) {
2172
2173     auto default_timeout = static_cast<uint32_t>(getGetlDefaultTimeout());
2174
2175     if (lock_timeout == 0) {
2176         lock_timeout = default_timeout;
2177     } else if (lock_timeout > static_cast<uint32_t>(getGetlMaxTimeout())) {
2178         LOG(EXTENSION_LOG_WARNING,
2179             "EventuallyPersistentEngine::get_locked: "
2180             "Illegal value for lock timeout specified %u. "
2181             "Using default value: %u", lock_timeout, default_timeout);
2182         lock_timeout = default_timeout;
2183     }
2184
2185     auto result = kvBucket->getLocked(key, vbucket, ep_current_time(),
2186                                       lock_timeout, cookie);
2187
2188     if (result.getStatus() == ENGINE_SUCCESS) {
2189         ++stats.numOpsGet;
2190         *itm = result.getValue();
2191     }
2192
2193     return result.getStatus();
2194 }
2195
2196 ENGINE_ERROR_CODE EventuallyPersistentEngine::unlock(const void* cookie,
2197                                                      const DocKey& key,
2198                                                      uint16_t vbucket,
2199                                                      uint64_t cas) {
2200     return kvBucket->unlockKey(key, vbucket, cas, ep_current_time());
2201 }
2202
2203
2204 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2205                                                     item* itm,
2206                                                     uint64_t *cas,
2207                                                     ENGINE_STORE_OPERATION
2208                                                                      operation) {
2209     BlockTimer timer(&stats.storeCmdHisto);
2210     ENGINE_ERROR_CODE ret;
2211     Item *it = static_cast<Item*>(itm);
2212
2213     switch (operation) {
2214     case OPERATION_CAS:
2215         if (it->getCas() == 0) {
2216             // Using a cas command with a cas wildcard doesn't make sense
2217             ret = ENGINE_NOT_STORED;
2218             break;
2219         }
2220         // FALLTHROUGH
2221     case OPERATION_SET:
2222         if (isDegradedMode()) {
2223             return ENGINE_TMPFAIL;
2224         }
2225         ret = kvBucket->set(*it, cookie);
2226         if (ret == ENGINE_SUCCESS) {
2227             *cas = it->getCas();
2228         }
2229
2230         break;
2231
2232     case OPERATION_ADD:
2233         if (isDegradedMode()) {
2234             return ENGINE_TMPFAIL;
2235         }
2236
2237         if (it->getCas() != 0) {
2238             // Adding an item with a cas value doesn't really make sense...
2239             return ENGINE_KEY_EEXISTS;
2240         }
2241
2242         ret = kvBucket->add(*it, cookie);
2243         if (ret == ENGINE_SUCCESS) {
2244             *cas = it->getCas();
2245         }
2246         break;
2247
2248     case OPERATION_REPLACE:
2249         ret = kvBucket->replace(*it, cookie);
2250         if (ret == ENGINE_SUCCESS) {
2251             *cas = it->getCas();
2252         }
2253         break;
2254     default:
2255         ret = ENGINE_ENOTSUP;
2256     }
2257
2258     switch (ret) {
2259     case ENGINE_SUCCESS:
2260         ++stats.numOpsStore;
2261         break;
2262     case ENGINE_ENOMEM:
2263         ret = memoryCondition();
2264         break;
2265     case ENGINE_NOT_STORED:
2266     case ENGINE_NOT_MY_VBUCKET:
2267         if (isDegradedMode()) {
2268             return ENGINE_TMPFAIL;
2269         }
2270         break;
2271     default:
2272         break;
2273     }
2274
2275     return ret;
2276 }
2277
2278 inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2279                                                            item **itm,
2280                                                            void **es,
2281                                                            uint16_t *nes,
2282                                                            uint8_t *ttl,
2283                                                            uint16_t *flags,
2284                                                            uint32_t *seqno,
2285                                                            uint16_t *vbucket,
2286                                                            TapProducer
2287                                                                    *connection,
2288                                                            bool &retry) {
2289     *es = NULL;
2290     *nes = 0;
2291     *ttl = (uint8_t)-1;
2292     *seqno = 0;
2293     *flags = 0;
2294     *vbucket = 0;
2295
2296     retry = false;
2297
2298     if (connection->shouldFlush()) {
2299         return TAP_FLUSH;
2300     }
2301
2302     if (connection->isTimeForNoop()) {
2303         LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2304             connection->logHeader());
2305         return TAP_NOOP;
2306     }
2307
2308     if (connection->isSuspended() || connection->windowIsFull()) {
2309         LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2310             " suspended state or its ack windows is full.\n",
2311             connection->logHeader());
2312         return TAP_PAUSE;
2313     }
2314
2315     uint16_t ret = TAP_PAUSE;
2316     VBucketEvent ev = connection->nextVBucketHighPriority();
2317     if (ev.event != TAP_PAUSE) {
2318         switch (ev.event) {
2319         case TAP_VBUCKET_SET:
2320             LOG(EXTENSION_LOG_NOTICE,
2321                "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2322                 connection->logHeader(), ev.vbucket,
2323                 VBucket::toString(ev.state));
2324             connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2325             break;
2326         case TAP_OPAQUE:
2327             LOG(EXTENSION_LOG_NOTICE,
2328                 "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2329                 connection->logHeader(),
2330                 TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
2331                 ev.vbucket);
2332             connection->opaqueCommandCode = (uint32_t) ev.state;
2333             *vbucket = ev.vbucket;
2334             *es = &connection->opaqueCommandCode;
2335             *nes = sizeof(connection->opaqueCommandCode);
2336             break;
2337
2338         default:
2339             throw std::logic_error("EventuallyPersistentEngine::doWalkTapQueue:"
2340                     " Unknown VBucketEvent message type:" +
2341                     std::to_string(ev.event) + " for connection:" +
2342                     connection->logHeader());
2343         }
2344         return ev.event;
2345     }
2346
2347     if (connection->waitForOpaqueMsgAck()) {
2348         return TAP_PAUSE;
2349     }
2350
2351     VBucketFilter backFillVBFilter;
2352     if (connection->runBackfill(backFillVBFilter)) {
2353         queueBackfill(backFillVBFilter, connection);
2354     }
2355
2356     uint8_t nru = INITIAL_NRU_VALUE;
2357     Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2358     switch (ret) {
2359     case TAP_CHECKPOINT_START:
2360     case TAP_CHECKPOINT_END:
2361     case TAP_MUTATION:
2362     case TAP_DELETION:
2363         *itm = it;
2364         if (ret == TAP_MUTATION) {
2365             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2366                                                        it->getRevSeqno(), nru);
2367             *es = connection->specificData;
2368         } else if (ret == TAP_DELETION) {
2369             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2370                                                        it->getRevSeqno());
2371             *es = connection->specificData;
2372         } else if (ret == TAP_CHECKPOINT_START) {
2373             // Send the current value of the max deleted seqno
2374             VBucketPtr vb = getVBucket(*vbucket);
2375             if (!vb) {
2376                 retry = true;
2377                 return TAP_NOOP;
2378             }
2379             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2380                                                vb->ht.getMaxDeletedRevSeqno());
2381             *es = connection->specificData;
2382         }
2383         break;
2384     case TAP_NOOP:
2385         retry = true;
2386         break;
2387     default:
2388         break;
2389     }
2390
2391     if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2392         VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2393         if (vbev.event == TAP_VBUCKET_SET) {
2394             LOG(EXTENSION_LOG_NOTICE,
2395                "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2396                 connection->logHeader(), vbev.vbucket,
2397                 VBucket::toString(vbev.state));
2398             connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
2399         }
2400         ret = vbev.event;
2401     }
2402
2403     return ret;
2404 }
2405
2406 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2407                                                   item **itm,
2408                                                   void **es,
2409                                                   uint16_t *nes,
2410                                                   uint8_t *ttl,
2411                                                   uint16_t *flags,
2412                                                   uint32_t *seqno,
2413                                                   uint16_t *vbucket) {
2414     TapProducer *connection = getTapProducer(cookie);
2415     if (!connection) {
2416         LOG(EXTENSION_LOG_WARNING,
2417             "Failed to lookup TAP connection.. Disconnecting\n");
2418         return TAP_DISCONNECT;
2419     }
2420
2421     connection->setPaused(false);
2422
2423     bool retry = false;
2424     uint16_t ret;
2425
2426     connection->setLastWalkTime();
2427     do {
2428         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2429                              seqno, vbucket, connection, retry);
2430     } while (retry);
2431
2432     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2433         connection->lastMsgTime = ep_current_time();
2434         if (ret == TAP_NOOP) {
2435             *seqno = 0;
2436         } else {
2437             ++stats.numTapFetched;
2438             *seqno = connection->getSeqno();
2439             if (connection->requestAck(ret, *vbucket)) {
2440                 *flags = TAP_FLAG_ACK;
2441                 connection->seqnoAckRequested = *seqno;
2442             }
2443
2444             if (ret == TAP_MUTATION) {
2445                 if (connection->haveFlagByteorderSupport()) {
2446                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2447                 }
2448             }
2449         }
2450     } else {
2451         connection->setPaused(true);
2452         connection->setNotifySent(false);
2453     }
2454
2455     return ret;
2456 }
2457
2458 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2459                                                 std::string &client,
2460                                                 uint32_t flags,
2461                                                 const void *userdata,
2462                                                 size_t nuserdata) {
2463     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2464         return false;
2465     }
2466
2467     std::string tapName = "eq_tapq:";
2468     if (client.length() == 0) {
2469         tapName.assign(ConnHandler::getAnonName());
2470     } else {
2471         tapName.append(client);
2472     }
2473
2474     // Decoding the userdata section of the packet and update the filters
2475     const char *ptr = static_cast<const char*>(userdata);
2476     uint64_t backfillAge = 0;
2477     std::vector<uint16_t> vbuckets;
2478     std::map<uint16_t, uint64_t> lastCheckpointIds;
2479
2480     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2481         if (nuserdata < sizeof(backfillAge)) {
2482             LOG(EXTENSION_LOG_WARNING,
2483                 "Backfill age is missing. Reject connection request from %s\n",
2484                 tapName.c_str());
2485             return false;
2486         }
2487         // use memcpy to avoid alignemt issues
2488         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2489         backfillAge = ntohll(backfillAge);
2490         nuserdata -= sizeof(backfillAge);
2491         ptr += sizeof(backfillAge);
2492     }
2493
2494     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2495         uint16_t nvbuckets;
2496         if (nuserdata < sizeof(nvbuckets)) {
2497             LOG(EXTENSION_LOG_WARNING,
2498             "Number of vbuckets is missing. Reject connection request from %s"
2499             "\n", tapName.c_str());
2500             return false;
2501         }
2502         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2503         nuserdata -= sizeof(nvbuckets);
2504         ptr += sizeof(nvbuckets);
2505         nvbuckets = ntohs(nvbuckets);
2506         if (nvbuckets > 0) {
2507             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2508                 LOG(EXTENSION_LOG_WARNING,
2509                 "# of vbuckets not matched. Reject connection request from %s"
2510                 "\n", tapName.c_str());
2511                 return false;
2512             }
2513             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2514                 uint16_t val;
2515                 memcpy(&val, ptr, sizeof(nvbuckets));
2516                 ptr += sizeof(uint16_t);
2517                 vbuckets.push_back(ntohs(val));
2518             }
2519             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2520         }
2521     }
2522
2523     if (flags & TAP_CONNECT_CHECKPOINT) {
2524         uint16_t nCheckpoints = 0;
2525         if (nuserdata >= sizeof(nCheckpoints)) {
2526             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2527             nuserdata -= sizeof(nCheckpoints);
2528             ptr += sizeof(nCheckpoints);
2529             nCheckpoints = ntohs(nCheckpoints);
2530         }
2531         if (nCheckpoints > 0) {
2532             if (nuserdata <
2533                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2534                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2535                     "Reject connection request from %s\n", tapName.c_str());
2536                 return false;
2537             }
2538             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2539                 uint16_t vbid;
2540                 uint64_t checkpointId;
2541                 memcpy(&vbid, ptr, sizeof(vbid));
2542                 ptr += sizeof(uint16_t);
2543                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2544                 ptr += sizeof(uint64_t);
2545                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2546             }
2547         }
2548     }
2549
2550     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2551                                  backfillAge,
2552                                  static_cast<int>(
2553                                  configuration.getTapKeepalive()),
2554                                  vbuckets,
2555                                  lastCheckpointIds);
2556
2557     tapConnMap->notifyPausedConnection(tp, true);
2558     return true;
2559 }
2560
2561 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2562                                                         void *engine_specific,
2563                                                         uint16_t nengine,
2564                                                         uint8_t ttl,
2565                                                         uint16_t tap_flags,
2566                                                         uint16_t tap_event,
2567                                                         uint32_t tap_seqno,
2568                                                         const void *key,
2569                                                         size_t nkey,
2570                                                         uint32_t flags,
2571                                                         uint32_t exptime,
2572                                                         uint64_t cas,
2573                                                         uint8_t datatype,
2574                                                         const void *data,
2575                                                         size_t ndata,
2576                                                         uint16_t vbucket)
2577 {
2578     (void) ttl;
2579     void *specific = getEngineSpecific(cookie);
2580     ConnHandler *connection = NULL;
2581     if (specific == NULL) {
2582         if (tap_event == TAP_ACK) {
2583             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2584                 "exist. Force disconnect...\n", (char *) cookie);
2585             // tap producer is no longer connected..
2586             return ENGINE_DISCONNECT;
2587         } else {
2588             connection = tapConnMap->newConsumer(cookie);
2589             if (connection == NULL) {
2590                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2591                     " Force disconnect\n");
2592                 return ENGINE_DISCONNECT;
2593             }
2594             storeEngineSpecific(cookie, connection);
2595         }
2596     } else {
2597         connection = reinterpret_cast<ConnHandler *>(specific);
2598     }
2599
2600
2601     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2602
2603     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2604         if (!replicationThrottle->shouldProcess()) {
2605             ++stats.replicationThrottled;
2606             if (connection->supportsAck()) {
2607                 ret = ENGINE_TMPFAIL;
2608             } else {
2609                 ret = ENGINE_DISCONNECT;
2610                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2611                     "ack support. Force disconnect...\n",
2612                     connection->logHeader());
2613             }
2614             return ret;
2615         }
2616     }
2617
2618     switch (tap_event) {
2619     case TAP_ACK:
2620         // TAP works only with the DefaultCollection
2621         ret = processTapAck(cookie, tap_seqno, tap_flags,
2622                             DocKey(static_cast<const uint8_t*>(key), nkey,
2623                                    DocNamespace::DefaultCollection));
2624         break;
2625     case TAP_FLUSH:
2626         ret = flush(cookie);
2627         LOG(EXTENSION_LOG_NOTICE, "%s Received flush.\n",
2628             connection->logHeader());
2629         break;
2630     case TAP_DELETION:
2631         {
2632             uint64_t revSeqno;
2633             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2634                                                 nengine, &revSeqno);
2635
2636             // Create key in DefaultCollection since TAP won't support collections
2637             const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2638                                 DocNamespace::DefaultCollection};
2639             ret = connection->deletion(0, docKey, {}, 0,
2640                                        PROTOCOL_BINARY_RAW_BYTES, cas, vbucket,
2641                                        0, revSeqno, {});
2642         }
2643         break;
2644
2645     case TAP_CHECKPOINT_START:
2646     case TAP_CHECKPOINT_END:
2647         {
2648             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2649             if (tc) {
2650                 if (tap_event == TAP_CHECKPOINT_START &&
2651                     nengine == TapEngineSpecific::sizeRevSeqno) {
2652                     // Set the current value for the max deleted seqno
2653                     VBucketPtr vb = getVBucket(vbucket);
2654                     if (!vb) {
2655                         return ENGINE_TMPFAIL;
2656                     }
2657                     uint64_t seqnum;
2658                     TapEngineSpecific::readSpecificData(tap_event,
2659                                                         engine_specific,
2660                                                         nengine,
2661                                                         &seqnum);
2662                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2663                 }
2664
2665                 if (data) {
2666                     uint64_t checkpointId;
2667                     memcpy(&checkpointId, data, sizeof(checkpointId));
2668                     checkpointId = ntohll(checkpointId);
2669                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2670                                           checkpointId);
2671                 }
2672                 else {
2673                     ret = ENGINE_DISCONNECT;
2674                     LOG(EXTENSION_LOG_WARNING,
2675                         "%s Checkpoint Id is missing in "
2676                         "CHECKPOINT messages. Force disconnect...\n",
2677                         connection->logHeader());
2678                 }
2679             }
2680             else {
2681                 ret = ENGINE_DISCONNECT;
2682                 LOG(EXTENSION_LOG_WARNING,
2683                     "%s not a consumer! Force disconnect\n",
2684                     connection->logHeader());
2685             }
2686         }
2687
2688         break;
2689
2690     case TAP_MUTATION:
2691         {
2692             uint8_t nru = INITIAL_NRU_VALUE;
2693             uint64_t revSeqno = 0;
2694             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2695                                                 nengine, &revSeqno, &nru);
2696
2697             if (!isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_JSON)) {
2698                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2699                 const unsigned char *dat = (const unsigned char*)data;
2700                 const int datlen = ndata;
2701                 if (checkUTF8JSON(dat, datlen)) {
2702                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2703                 }
2704             }
2705             // Create key in DefaultCollection since TAP won't support collections
2706             const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2707                                 DocNamespace::DefaultCollection};
2708             ret = connection->mutation(0, docKey,
2709                                        {static_cast<const uint8_t*>(data), ndata},
2710                                        0, datatype, cas,
2711                                        vbucket, flags, 0, revSeqno, exptime, 0,
2712                                        {}, nru);
2713         }
2714
2715         break;
2716
2717     case TAP_OPAQUE:
2718         if (nengine == sizeof(uint32_t)) {
2719             uint32_t cc;
2720             memcpy(&cc, engine_specific, sizeof(cc));
2721             cc = ntohl(cc);
2722
2723             switch (cc) {
2724             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2725                 // @todo: the memcached core will _ALWAYS_ send nack
2726                 //        if it encounter an error. This should be
2727                 // set as the default when we move to .next after 2.0
2728                 // (currently we need to allow the message for
2729                 // backwards compatibility)
2730                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2731                     connection->logHeader());
2732                 break;
2733             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2734                 connection->setSupportCheckpointSync(true);
2735                 LOG(EXTENSION_LOG_INFO,
2736                     "%s Enable checkpoint synchronization\n",
2737                     connection->logHeader());
2738                 break;
2739             case TAP_OPAQUE_OPEN_CHECKPOINT:
2740                 /**
2741                  * This event is only received by the TAP client that wants to
2742                  * get mutations from closed checkpoints only. At this time,
2743                  * only incremental backup client receives this event so that
2744                  * it can close the connection and reconnect later.
2745                  */
2746                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2747                     connection->logHeader());
2748                 break;
2749             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2750                 {
2751                     LOG(EXTENSION_LOG_INFO,
2752                         "%s Backfill started for vbucket %d.\n",
2753                         connection->logHeader(), vbucket);
2754                     BlockTimer timer(&stats.tapVbucketResetHisto);
2755                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2756                                                   ENGINE_DISCONNECT;
2757                     if (ret == ENGINE_DISCONNECT) {
2758                         LOG(EXTENSION_LOG_WARNING,
2759                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2760                             connection->logHeader(), vbucket);
2761                     } else {
2762                         LOG(EXTENSION_LOG_NOTICE,
2763                          "%s Reset vbucket %d was completed succecssfully.\n",
2764                             connection->logHeader(), vbucket);
2765                     }
2766
2767                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2768                     if (tc) {
2769                         tc->setBackfillPhase(true, vbucket);
2770                     } else {
2771                         ret = ENGINE_DISCONNECT;
2772                         LOG(EXTENSION_LOG_WARNING,
2773                             "TAP consumer doesn't exists. Force disconnect\n");
2774                     }
2775                 }
2776                 break;
2777             case TAP_OPAQUE_CLOSE_BACKFILL:
2778                 {
2779                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2780                         connection->logHeader());
2781                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2782                     if (tc) {
2783                         tc->setBackfillPhase(false, vbucket);
2784                     } else {
2785                         ret = ENGINE_DISCONNECT;
2786                         LOG(EXTENSION_LOG_WARNING,
2787                             "%s not a consumer! Force disconnect\n",
2788                             connection->logHeader());
2789                     }
2790                 }
2791                 break;
2792             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2793                 /**
2794                  * This event is sent by the eVBucketMigrator to notify that
2795                  * the source node closes the tap replication stream and
2796                  * switches to TAKEOVER_VBUCKETS phase.
2797                  * This is just an informative message and doesn't require any
2798                  * action.
2799                  */
2800                 LOG(EXTENSION_LOG_INFO,
2801                 "%s Received close tap stream. Switching to takeover phase.\n",
2802                     connection->logHeader());
2803                 break;
2804             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2805                 /**
2806                  * This opaque message is just for notifying that the source
2807                  * node receives change_vbucket_filter request and processes
2808                  * it successfully.
2809                  */
2810                 LOG(EXTENSION_LOG_INFO,
2811                 "%s Notified that the source node changed a vbucket filter.\n",
2812                     connection->logHeader());
2813                 break;
2814             default:
2815                 LOG(EXTENSION_LOG_WARNING,
2816                     "%s Received an unknown opaque command\n",
2817                     connection->logHeader());
2818             }
2819         } else {
2820             LOG(EXTENSION_LOG_WARNING,
2821                 "%s Received tap opaque with unknown size %d\n",
2822                 connection->logHeader(), nengine);
2823         }
2824         break;
2825
2826     case TAP_VBUCKET_SET:
2827         {
2828             BlockTimer timer(&stats.tapVbucketSetHisto);
2829
2830             if (nengine != sizeof(vbucket_state_t)) {
2831                 // illegal datasize
2832                 LOG(EXTENSION_LOG_WARNING,
2833                     "%s Received TAP_VBUCKET_SET with illegal size."
2834                     " Force disconnect\n", connection->logHeader());
2835                 ret = ENGINE_DISCONNECT;
2836                 break;
2837             }
2838
2839             vbucket_state_t state;
2840             memcpy(&state, engine_specific, nengine);
2841             state = (vbucket_state_t)ntohl(state);
2842
2843             ret = connection->setVBucketState(0, vbucket, state);
2844         }
2845         break;
2846
2847     default:
2848         // Unknown command
2849         LOG(EXTENSION_LOG_WARNING,
2850             "%s Recieved bad opcode, ignoring message\n",
2851             connection->logHeader());
2852     }
2853
2854     connection->processedEvent(tap_event, ret);
2855     return ret;
2856 }
2857
2858 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2859                                                       TapConsumer *consumer,
2860                                                       uint8_t event,
2861                                                       uint16_t vbucket,
2862                                                       uint64_t checkpointId) {
2863     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2864
2865     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2866         getKVBucket()->wakeUpFlusher();
2867         ret = ENGINE_SUCCESS;
2868     }
2869     else {
2870         ret = ENGINE_DISCONNECT;
2871         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2872             "checkpoint %" PRIu64 ". Force disconnect\n",
2873             consumer->logHeader(), checkpointId);
2874     }
2875
2876     return ret;
2877 }
2878
2879 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2880     TapProducer *rv =
2881         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2882     if (!(rv && rv->isConnected())) {
2883         LOG(EXTENSION_LOG_WARNING,
2884             "Walking a non-existent tap queue, disconnecting\n");
2885         return NULL;
2886     }
2887
2888     if (rv->doDisconnect()) {
2889         LOG(EXTENSION_LOG_WARNING,
2890             "%s Disconnecting pending connection\n", rv->logHeader());
2891         return NULL;
2892     }
2893     return rv;
2894 }
2895
2896 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2897     // Register the ON_DISCONNECT callback
2898     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2899     // Register the ON_DELETE_BUCKET callback
2900     registerEngineCallback(ON_DELETE_BUCKET, EvpHandleDeleteBucket, this);
2901 }
2902
2903 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2904                                                             uint32_t seqno,
2905                                                             uint16_t status,
2906                                                             const DocKey& key)
2907 {
2908     TapProducer *connection = getTapProducer(cookie);
2909     if (!connection) {
2910         LOG(EXTENSION_LOG_WARNING,
2911             "Unable to process tap ack. No producer found\n");
2912         return ENGINE_DISCONNECT;
2913     }
2914
2915     return connection->processAck(seqno, status, key);
2916 }
2917
2918 ENGINE_ERROR_CODE EventuallyPersistentEngine::memoryCondition() {
2919     // Do we think it's possible we could free something?
2920     bool haveEvidenceWeCanFreeMemory =
2921         (stats.getMaxDataSize() > stats.memOverhead->load());
2922     if (haveEvidenceWeCanFreeMemory) {
2923         // Look for more evidence by seeing if we have resident items.
2924         VBucketCountVisitor countVisitor(vbucket_state_active);
2925         kvBucket->visit(countVisitor);
2926
2927         haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
2928             countVisitor.getNumItems();
2929     }
2930     if (haveEvidenceWeCanFreeMemory) {
2931         ++stats.tmp_oom_errors;
2932         // Wake up the item pager task as memory usage
2933         // seems to have exceeded high water mark
2934         getKVBucket()->wakeUpItemPager();
2935         return ENGINE_TMPFAIL;
2936     } else {
2937         ++stats.oom_errors;
2938         return ENGINE_ENOMEM;
2939     }
2940 }
2941
2942 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2943                                                              &backfillVBFilter,
2944                                                Producer *tc)
2945 {
2946     auto bfv = std::make_unique<BackFillVisitor>(
2947             this, *tapConnMap, tc, backfillVBFilter);
2948     getKVBucket()->visit(std::move(bfv),
2949                          "Backfill task",
2950                          TaskId::BackfillVisitorTask,
2951                          1);
2952 }
2953
2954 void VBucketCountAggregator::visitBucket(VBucketPtr &vb) {
2955     std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
2956     it = visitorMap.find(vb->getState());
2957     if ( it != visitorMap.end() ) {
2958         it->second->visitBucket(vb);
2959     }
2960 }
2961
2962 void VBucketCountAggregator::addVisitor(VBucketCountVisitor* visitor) {
2963     visitorMap[visitor->getVBucketState()] = visitor;
2964 }
2965
2966 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
2967                                                            ADD_STAT add_stat) {
2968
2969     configuration.addStats(add_stat, cookie);
2970
2971     EPStats &epstats = getEpStats();
2972     add_casted_stat("ep_version", VERSION, add_stat, cookie);
2973     add_casted_stat("ep_storage_age",
2974                     epstats.dirtyAge, add_stat, cookie);
2975     add_casted_stat("ep_storage_age_highwat",
2976                     epstats.dirtyAgeHighWat, add_stat, cookie);
2977     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
2978                     add_stat, cookie);
2979
2980     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
2981         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
2982     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
2983         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
2984     }
2985
2986     add_casted_stat("ep_total_enqueued",
2987                     epstats.totalEnqueued, add_stat, cookie);
2988     add_casted_stat("ep_expired_access", epstats.expired_access,
2989                     add_stat, cookie);
2990     add_casted_stat("ep_expired_compactor", epstats.expired_compactor,
2991                     add_stat, cookie);
2992     add_casted_stat("ep_expired_pager", epstats.expired_pager,
2993                     add_stat, cookie);
2994     add_casted_stat("ep_queue_size",
2995                     epstats.diskQueueSize, add_stat, cookie);
2996     add_casted_stat("ep_diskqueue_items",
2997                     epstats.diskQueueSize, add_stat, cookie);
2998     add_casted_stat("ep_vb_backfill_queue_size",
2999                     epstats.vbBackfillQueueSize,
3000                     add_stat,
3001                     cookie);
3002     auto* flusher = kvBucket->getFlusher(EP_PRIMARY_SHARD);
3003     if (flusher) {
3004         add_casted_stat("ep_commit_num", epstats.flusherCommits,
3005                         add_stat, cookie);
3006         add_casted_stat("ep_commit_time",
3007                         epstats.commit_time, add_stat, cookie);
3008         add_casted_stat("ep_commit_time_total",
3009                         epstats.cumulativeCommitTime, add_stat, cookie);
3010         add_casted_stat("ep_item_begin_failed",
3011                         epstats.beginFailed, add_stat, cookie);
3012         add_casted_stat("ep_item_commit_failed",
3013                         epstats.commitFailed, add_stat, cookie);
3014         add_casted_stat("ep_item_flush_expired",
3015                         epstats.flushExpired, add_stat, cookie);
3016         add_casted_stat("ep_item_flush_failed",
3017                         epstats.flushFailed, add_stat, cookie);
3018         add_casted_stat("ep_flusher_state",
3019                         flusher->stateName(), add_stat, cookie);
3020         add_casted_stat("ep_flusher_todo",
3021                         epstats.flusher_todo, add_stat, cookie);
3022         add_casted_stat("ep_total_persisted",
3023                         epstats.totalPersisted, add_stat, cookie);
3024         add_casted_stat("ep_uncommitted_items",
3025                         epstats.flusher_todo, add_stat, cookie);
3026         add_casted_stat("ep_chk_persistence_timeout",
3027                         VBucket::getCheckpointFlushTimeout(),
3028                         add_stat,
3029                         cookie);
3030     }
3031     add_casted_stat("ep_vbucket_del",
3032                     epstats.vbucketDeletions, add_stat, cookie);
3033     add_casted_stat("ep_vbucket_del_fail",
3034                     epstats.vbucketDeletionFail, add_stat, cookie);
3035     add_casted_stat("ep_flush_duration_total",
3036                     epstats.cumulativeFlushTime, add_stat, cookie);
3037
3038     kvBucket->getAggregatedVBucketStats(cookie, add_stat);
3039
3040     kvBucket->getFileStats(cookie, add_stat);
3041
3042     add_casted_stat("ep_persist_vbstate_total",
3043                     epstats.totalPersistVBState, add_stat, cookie);
3044
3045     size_t memUsed =  stats.getTotalMemoryUsed();
3046     add_casted_stat("mem_used", memUsed, add_stat, cookie);
3047     add_casted_stat("ep_mem_low_wat_percent", stats.mem_low_wat_percent,
3048                     add_stat, cookie);
3049     add_casted_stat("ep_mem_high_wat_percent", stats.mem_high_wat_percent,
3050                     add_stat, cookie);
3051     add_casted_stat("bytes", memUsed, add_stat, cookie);
3052     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3053     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3054 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3055     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3056 #else
3057     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3058 #endif
3059     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3060     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3061                     add_stat, cookie);
3062 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3063     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3064 #else
3065     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3066 #endif
3067     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3068     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3069     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3070
3071     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3072     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3073                     add_stat, cookie);
3074     add_casted_stat("ep_mem_tracker_enabled", stats.memoryTrackerEnabled,
3075                     add_stat, cookie);
3076     add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3077                     add_stat, cookie);
3078     add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3079                     add_stat, cookie);
3080     add_casted_stat("ep_bg_remaining_items", epstats.numRemainingBgItems,
3081                     add_stat, cookie);
3082     add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3083                     add_stat, cookie);
3084     add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3085                     add_stat, cookie);
3086     add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3087                     add_stat, cookie);
3088     add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3089                     add_stat, cookie);
3090     add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3091                     add_stat, cookie);
3092     add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3093                     add_stat, cookie);
3094     add_casted_stat("ep_items_rm_from_checkpoints",
3095                     epstats.itemsRemovedFromCheckpoints,
3096                     add_stat, cookie);
3097     add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3098                     add_stat, cookie);
3099     add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3100                     add_stat, cookie);
3101     add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3102                     add_stat, cookie);
3103
3104     add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3105     add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3106                     add_stat, cookie);
3107     add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3108                     add_stat, cookie);
3109     add_casted_stat("ep_pending_ops_max_duration",
3110                     epstats.pendingOpsMaxDuration,
3111                     add_stat, cookie);
3112
3113     add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3114                     add_stat, cookie);
3115     add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3116                     add_stat, cookie);
3117
3118     size_t vbDeletions = epstats.vbucketDeletions.load();
3119     if (vbDeletions > 0) {
3120         add_casted_stat("ep_vbucket_del_max_walltime",
3121                         epstats.vbucketDelMaxWalltime,
3122                         add_stat, cookie);
3123         add_casted_stat("ep_vbucket_del_avg_walltime",
3124                         epstats.vbucketDelTotWalltime / vbDeletions,
3125                         add_stat, cookie);
3126     }
3127
3128     size_t numBgOps = epstats.bgNumOperations.load();
3129     if (numBgOps > 0) {
3130         add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3131                         add_stat, cookie);
3132         add_casted_stat("ep_bg_min_wait",
3133                         epstats.bgMinWait,
3134                         add_stat, cookie);
3135         add_casted_stat("ep_bg_max_wait",
3136                         epstats.bgMaxWait,
3137                         add_stat, cookie);
3138         add_casted_stat("ep_bg_wait_avg",
3139                         epstats.bgWait / numBgOps,
3140                         add_stat, cookie);
3141         add_casted_stat("ep_bg_min_load",
3142                         epstats.bgMinLoad,
3143                         add_stat, cookie);
3144         add_casted_stat("ep_bg_max_load",
3145                         epstats.bgMaxLoad,
3146                         add_stat, cookie);
3147         add_casted_stat("ep_bg_load_avg",
3148                         epstats.bgLoad / numBgOps,
3149                         add_stat, cookie);
3150         add_casted_stat("ep_bg_wait",
3151                         epstats.bgWait,
3152                         add_stat, cookie);
3153         add_casted_stat("ep_bg_load",
3154                         epstats.bgLoad,
3155                         add_stat, cookie);
3156     }
3157
3158     add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3159
3160     add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3161                     add_stat, cookie);
3162     add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3163                     add_stat, cookie);
3164     add_casted_stat("ep_num_access_scanner_skips",
3165                     epstats.accessScannerSkips, add_stat, cookie);
3166     add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3167                     add_stat, cookie);
3168     add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3169                     add_stat, cookie);
3170
3171     if (kvBucket->isAccessScannerEnabled() && epstats.alogTime.load() != 0)
3172     {
3173         char timestr[20];
3174         struct tm alogTim;
3175         hrtime_t alogTime = epstats.alogTime.load();
3176         if (cb_gmtime_r((time_t *)&alogTime, &alogTim) == -1) {
3177             add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3178                             cookie);
3179         } else {
3180             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3181             add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3182                             cookie);
3183         }
3184     } else {
3185         add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3186                         add_stat, cookie);
3187     }
3188
3189     if (kvBucket->isExpPagerEnabled()) {
3190         char timestr[20];
3191         struct tm expPagerTim;
3192         hrtime_t expPagerTime = epstats.expPagerTime.load();
3193         if (cb_gmtime_r((time_t *)&expPagerTime, &expPagerTim) == -1) {
3194             add_casted_stat("ep_expiry_pager_task_time", "UNKNOWN", add_stat,
3195                             cookie);
3196         } else {
3197             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &expPagerTim);
3198             add_casted_stat("ep_expiry_pager_task_time", timestr, add_stat,
3199                             cookie);
3200         }
3201     } else {
3202         add_casted_stat("ep_expiry_pager_task_time", "NOT_SCHEDULED",
3203                         add_stat, cookie);
3204     }
3205
3206     add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
3207
3208     if (getConfiguration().isWarmup()) {
3209         Warmup *wp = kvBucket->getWarmup();
3210         if (wp == nullptr) {
3211             throw std::logic_error("EPEngine::doEngineStats: warmup is NULL");
3212         }
3213         if (!kvBucket->isWarmingUp()) {
3214             add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
3215         } else {
3216             add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);
3217         }
3218         if (wp->getTime() > 0) {
3219             add_casted_stat("ep_warmup_time", wp->getTime() / 1000,
3220                             add_stat, cookie);
3221         }
3222         add_casted_stat("ep_warmup_oom", epstats.warmOOM, add_stat, cookie);
3223         add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
3224     }
3225
3226     add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
3227                     add_stat, cookie);
3228     add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
3229                     add_stat, cookie);
3230     add_casted_stat("ep_num_ops_del_meta", epstats.numOpsDelMeta,
3231                     add_stat, cookie);
3232     add_casted_stat("ep_num_ops_set_meta_res_fail",
3233                     epstats.numOpsSetMetaResolutionFailed, add_stat, cookie);
3234     add_casted_stat("ep_num_ops_del_meta_res_fail",
3235                     epstats.numOpsDelMetaResolutionFailed, add_stat, cookie);
3236     add_casted_stat("ep_num_ops_set_ret_meta", epstats.numOpsSetRetMeta,
3237                     add_stat, cookie);
3238     add_casted_stat("ep_num_ops_del_ret_meta", epstats.numOpsDelRetMeta,
3239                     add_stat, cookie);
3240     add_casted_stat("ep_num_ops_get_meta_on_set_meta",
3241                     epstats.numOpsGetMetaOnSetWithMeta, add_stat, cookie);
3242     add_casted_stat("ep_workload_pattern",
3243                     workload->stringOfWorkLoadPattern(),
3244                     add_stat, cookie);
3245
3246     add_casted_stat("ep_defragmenter_num_visited", epstats.defragNumVisited,
3247                     add_stat, cookie);
3248     add_casted_stat("ep_defragmenter_num_moved", epstats.defragNumMoved,
3249                     add_stat, cookie);
3250
3251     add_casted_stat("ep_cursor_dropping_lower_threshold",
3252                     epstats.cursorDroppingLThreshold, add_stat, cookie);
3253     add_casted_stat("ep_cursor_dropping_upper_threshold",
3254                     epstats.cursorDroppingUThreshold, add_stat, cookie);
3255     add_casted_stat("ep_cursors_dropped",
3256                     epstats.cursorsDropped, add_stat, cookie);
3257
3258
3259     // Note: These are also reported per-shard in 'kvstore' stats, however
3260     // we want to be able to graph these over time, and hence need to expose
3261     // to ns_sever at the top-level.
3262     size_t value = 0;
3263     if (kvBucket->getKVStoreStat("io_total_read_bytes", value,
3264                                  KVBucketIface::KVSOption::BOTH)) {
3265         add_casted_stat("ep_io_total_read_bytes",  value, add_stat, cookie);
3266     }
3267     if (kvBucket->getKVStoreStat("io_total_write_bytes", value,
3268                                  KVB