d82beda844e693bfc4b0771db62c29009a507dd1
[ep-engine.git] / src / ep_engine.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21
22 #include "backfill.h"
23 #include "common.h"
24 #include "connmap.h"
25 #include "dcp/consumer.h"
26 #include "dcp/dcpconnmap.h"
27 #include "dcp/flow-control-manager.h"
28 #include "dcp/producer.h"
29 #include "ep_bucket.h"
30 #include "ep_vb.h"
31 #include "ephemeral_bucket.h"
32 #include "failover-table.h"
33 #include "flusher.h"
34 #include "htresizer.h"
35 #include "logger.h"
36 #include "memory_tracker.h"
37 #include "replicationthrottle.h"
38 #include "stats-info.h"
39 #define STATWRITER_NAMESPACE core_engine
40 #include "statwriter.h"
41 #undef STATWRITER_NAMESPACE
42 #include "string_utils.h"
43 #include "tapconnmap.h"
44 #include "vb_count_visitor.h"
45 #include "warmup.h"
46
47 #include <JSON_checker.h>
48 #include <cJSON_utils.h>
49 #include <memcached/engine.h>
50 #include <memcached/extension.h>
51 #include <memcached/protocol_binary.h>
52 #include <memcached/server_api.h>
53 #include <memcached/util.h>
54 #include <platform/cb_malloc.h>
55 #include <platform/checked_snprintf.h>
56 #include <platform/make_unique.h>
57 #include <platform/platform.h>
58 #include <platform/processclock.h>
59 #include <xattr/utils.h>
60
61 #include <cstdio>
62 #include <cstring>
63 #include <fcntl.h>
64 #include <fstream>
65 #include <iostream>
66 #include <limits>
67 #include <mutex>
68 #include <stdarg.h>
69 #include <string>
70 #include <vector>
71
/**
 * Scale a size by a fractional multiplier.
 *
 * @param val the base quantity
 * @param percent multiplier applied to val (e.g. 0.5 yields half of val)
 * @return val * percent, truncated towards zero and converted to size_t
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
75
76 struct EPHandleReleaser {
77     void operator()(EventuallyPersistentEngine*) {
78         ObjectRegistry::onSwitchThread(nullptr);
79     }
80 };
81
82 using EPHandle = std::unique_ptr<EventuallyPersistentEngine, EPHandleReleaser>;
83
84 /**
85  * Helper function to acquire a handle to the engine which allows access to
86  * the engine while the handle is in scope.
87  * @param handle pointer to the engine
88  * @return EPHandle which is a unique_ptr to an EventuallyPersistentEngine
89  * with a custom deleter (EPHandleReleaser) which performs the required
90  * ObjectRegistry release.
91  */
92
93 static inline EPHandle acquireEngine(ENGINE_HANDLE* handle) {
94     auto ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
95     ObjectRegistry::onSwitchThread(ret);
96
97     return EPHandle(ret);
98 }
99
100 /**
101  * Call the response callback and return the appropriate value so that
102  * the core knows what to do..
103  */
104 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
105                                       uint16_t keylen,
106                                       const void *ext, uint8_t extlen,
107                                       const void *body, uint32_t bodylen,
108                                       uint8_t datatype, uint16_t status,
109                                       uint64_t cas, const void *cookie)
110 {
111     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
112     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
113     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
114                  status, cas, cookie)) {
115         rv = ENGINE_SUCCESS;
116     }
117     ObjectRegistry::onSwitchThread(e);
118     return rv;
119 }
120
/**
 * Check that a value lies within an inclusive range.
 *
 * @param v value to check
 * @param l lowest permitted value
 * @param h highest permitted value
 * @throws std::runtime_error("Value out of range.") if v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool tooLow = v < l;
    const bool tooHigh = v > h;
    if (tooLow || tooHigh) {
        throw std::runtime_error("Value out of range.");
    }
}
127
128
/**
 * Verify that a string is a decimal integer: an optional leading '-'
 * followed by one or more ASCII digits and nothing else.
 *
 * Digits are tested with a plain range comparison rather than isdigit(),
 * because passing a negative char (any byte >= 0x80 where char is signed)
 * to isdigit() is undefined behaviour. Strings containing no digits at all
 * ("" and "-") are rejected as well.
 *
 * @param str NUL-terminated string to check (must not be null)
 * @throws std::runtime_error("Value is not numeric") if the check fails
 */
static void checkNumeric(const char* str) {
    size_t pos = (str[0] == '-') ? 1 : 0;
    if (str[pos] == '\0') {
        // Nothing after the optional sign: not a number.
        throw std::runtime_error("Value is not numeric");
    }
    for (; str[pos] != '\0'; ++pos) {
        if (str[pos] < '0' || str[pos] > '9') {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
141
142 static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle) {
143     return acquireEngine(handle)->getInfo();
144 }
145
146 static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
147                                        const char* config_str) {
148     return acquireEngine(handle)->initialize(config_str);
149 }
150
151 static void EvpDestroy(ENGINE_HANDLE* handle, const bool force) {
152     auto eng = acquireEngine(handle);
153     eng->destroy(force);
154     delete eng.get();
155 }
156
157 static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
158                                          const void* cookie,
159                                          item** itm,
160                                          const DocKey& key,
161                                          const size_t nbytes,
162                                          const int flags,
163                                          const rel_time_t exptime,
164                                          uint8_t datatype,
165                                          uint16_t vbucket) {
166     if (!mcbp::datatype::is_valid(datatype)) {
167         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
168             " (ItemAllocate)");
169         return ENGINE_EINVAL;
170     }
171
172     return acquireEngine(handle)->itemAllocate(itm,
173                                                key,
174                                                nbytes,
175                                                0, // No privileged bytes
176                                                flags,
177                                                exptime,
178                                                datatype,
179                                                vbucket);
180 }
181
182 static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
183                            const item* itm, item_info *itm_info);
184 static void EvpItemRelease(ENGINE_HANDLE* handle, const void *cookie,
185                            item* itm);
186
187
188 static std::pair<cb::unique_item_ptr, item_info> EvpItemAllocateEx(ENGINE_HANDLE* handle,
189                                                                    const void* cookie,
190                                                                    const DocKey& key,
191                                                                    const size_t nbytes,
192                                                                    const size_t priv_nbytes,
193                                                                    const int flags,
194                                                                    const rel_time_t exptime,
195                                                                    uint8_t datatype,
196                                                                    uint16_t vbucket) {
197
198     item* it = nullptr;
199     auto err = acquireEngine(handle)->itemAllocate(
200             &it, key, nbytes, priv_nbytes, flags, exptime, datatype, vbucket);
201
202     if (err != ENGINE_SUCCESS) {
203         throw cb::engine_error(cb::engine_errc(err),
204                                "EvpItemAllocateEx: failed to allocate memory");
205     }
206
207     item_info info;
208     if (!EvpGetItemInfo(handle, cookie, it, &info)) {
209         EvpItemRelease(handle, cookie, it);
210         throw cb::engine_error(cb::engine_errc::failed,
211                                "EvpItemAllocateEx: EvpGetItemInfo failed");
212     }
213
214     return std::make_pair(cb::unique_item_ptr{it, cb::ItemDeleter{handle}},
215                           info);
216 }
217
218 static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
219                                        const void* cookie,
220                                        const DocKey& key,
221                                        uint64_t* cas,
222                                        uint16_t vbucket,
223                                        mutation_descr_t* mut_info) {
224     if (!cas) {
225         LOG(EXTENSION_LOG_WARNING,
226             "EvpItemDelete(): cas ptr passed is null for vb: %" PRIu16,
227             vbucket);
228         return ENGINE_EINVAL;
229     }
230     return acquireEngine(handle)->itemDelete(
231             cookie, key, *cas, vbucket, nullptr, nullptr, mut_info);
232 }
233
234 static void EvpItemRelease(ENGINE_HANDLE* handle,
235                            const void* cookie,
236                            item* itm) {
237     acquireEngine(handle)->itemRelease(cookie, itm);
238 }
239
240 static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
241                                 const void* cookie,
242                                 item** itm,
243                                 const DocKey& key,
244                                 uint16_t vbucket,
245                                 DocStateFilter documentStateFilter) {
246     get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
247                                                        HONOR_STATES |
248                                                        TRACK_REFERENCE |
249                                                        DELETE_TEMP |
250                                                        HIDE_LOCKED_CAS |
251                                                        TRACK_STATISTICS);
252
253     switch (documentStateFilter) {
254     case DocStateFilter::Alive:
255         break;
256     case DocStateFilter::Deleted:
257         // MB-23640 was caused by this bug as the frontend asked for
258         // Alive and Deleted documents. The internals don't have a
259         // way of requesting just deleted documents, and luckily for
260         // us no part of our code is using this yet. Return an error
261         // if anyone start using it
262         return ENGINE_ENOTSUP;
263     case DocStateFilter::AliveOrDeleted:
264         options = static_cast<get_options_t>(options | GET_DELETED_VALUE);
265         break;
266     }
267
268     return acquireEngine(handle)->get(cookie, itm, key, vbucket, options);
269 }
270
271 static cb::EngineErrorItemPair EvpGetIf(ENGINE_HANDLE* handle,
272                                         const void* cookie,
273                                         const DocKey& key,
274                                         uint16_t vbucket,
275                                         std::function<bool(
276                                             const item_info&)> filter) {
277     return acquireEngine(handle)->get_if(cookie, key, vbucket, filter);
278 }
279
280 static ENGINE_ERROR_CODE EvpGetLocked(ENGINE_HANDLE* handle,
281                                       const void* cookie,
282                                       item** itm,
283                                       const DocKey& key,
284                                       uint16_t vbucket,
285                                       uint32_t lock_timeout) {
286     return acquireEngine(handle)->get_locked(
287             cookie, itm, key, vbucket, lock_timeout);
288 }
289
290 static ENGINE_ERROR_CODE EvpUnlock(ENGINE_HANDLE* handle,
291                                    const void* cookie,
292                                    const DocKey& key,
293                                    uint16_t vbucket,
294                                    uint64_t cas) {
295     return acquireEngine(handle)->unlock(cookie, key, vbucket, cas);
296 }
297
298 static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
299                                      const void* cookie,
300                                      const char* stat_key,
301                                      int nkey,
302                                      ADD_STAT add_stat) {
303     return acquireEngine(handle)->getStats(cookie, stat_key, nkey, add_stat);
304 }
305
306 static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
307                                   const void* cookie,
308                                   item* itm,
309                                   uint64_t* cas,
310                                   ENGINE_STORE_OPERATION operation,
311                                   DocumentState document_state) {
312     ENGINE_ERROR_CODE err_code = ENGINE_SUCCESS;
313     auto engine = acquireEngine(handle);
314     switch (document_state) {
315     case DocumentState::Alive: {
316         err_code = engine->store(cookie, itm, cas, operation);
317         break;
318     }
319     case DocumentState::Deleted: {
320         Item* item = static_cast<Item*>(itm);
321         ItemMetaData itm_meta;
322         mutation_descr_t mut_info;
323
324         if (!cas) {
325             LOG(EXTENSION_LOG_WARNING,
326                 "EvpStore(): cas ptr passed is null for vb: %" PRIu16,
327                 item->getVBucketId());
328             return ENGINE_EINVAL;
329         }
330
331         /* Set the item as deleted */
332         item->setDeleted();
333         *cas = item->getCas();
334         err_code = engine->itemDelete(cookie,
335                                       item->getKey(),
336                                       *cas,
337                                       item->getVBucketId(),
338                                       item,
339                                       &itm_meta,
340                                       &mut_info);
341         if (err_code == ENGINE_SUCCESS) {
342             item->setBySeqno(mut_info.seqno);
343             item->setCas(itm_meta.cas);
344             item->setFlags(itm_meta.flags);
345             item->setExpTime(itm_meta.exptime);
346         }
347         break;
348     }
349     default:
350         return ENGINE_ENOTSUP;
351     }
352     return err_code;
353 }
354
355 static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
356                                   const void* cookie) {
357     return acquireEngine(handle)->flush(cookie);
358 }
359
360 static void EvpResetStats(ENGINE_HANDLE* handle, const void*) {
361     acquireEngine(handle)->resetStats();
362 }
363
364 protocol_binary_response_status EventuallyPersistentEngine::setTapParam(
365         const char* keyz, const char* valz, std::string& msg) {
366     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
367
368     try {
369         if (strcmp(keyz, "tap_keepalive") == 0) {
370             int v = std::stoi(valz);
371             validate(v, 0, MAX_TAP_KEEP_ALIVE);
372             getConfiguration().requirementsMetOrThrow("tap_keepalive");
373             setTapKeepAlive(static_cast<uint32_t>(v));
374         } else if (strcmp(keyz, "replication_throttle_threshold") == 0) {
375             getConfiguration().setReplicationThrottleThreshold(
376                     std::stoull(valz));
377         } else if (strcmp(keyz, "replication_throttle_queue_cap") == 0) {
378             getConfiguration().setReplicationThrottleQueueCap(std::stoll(valz));
379         } else if (strcmp(keyz, "replication_throttle_cap_pcnt") == 0) {
380             getConfiguration().setReplicationThrottleCapPcnt(std::stoull(valz));
381         } else {
382             msg = "Unknown config param";
383             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
384         }
385         // Handles exceptions thrown by the standard
386         // library stoi/stoul style functions when not numeric
387     } catch (std::invalid_argument&) {
388         msg = "Argument was not numeric";
389         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
390
391         // Handles exceptions thrown by the standard library stoi/stoul
392         // style functions when the conversion does not fit in the datatype
393     } catch (std::out_of_range&) {
394         msg = "Argument was out of range";
395         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
396
397         // Handles any miscellaenous exceptions in addition to the range_error
398         // exceptions thrown by the configuration::set<param>() methods
399     } catch (std::exception& error) {
400         msg = error.what();
401         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
402     }
403
404     return rv;
405 }
406
407 protocol_binary_response_status EventuallyPersistentEngine::setCheckpointParam(
408         const char* keyz, const char* valz, std::string& msg) {
409     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
410
411     try {
412         if (strcmp(keyz, "chk_max_items") == 0) {
413             size_t v = std::stoull(valz);
414             validate(v, size_t(MIN_CHECKPOINT_ITEMS),
415                      size_t(MAX_CHECKPOINT_ITEMS));
416             getConfiguration().setChkMaxItems(v);
417         } else if (strcmp(keyz, "chk_period") == 0) {
418             size_t v = std::stoull(valz);
419             validate(v, size_t(MIN_CHECKPOINT_PERIOD),
420                      size_t(MAX_CHECKPOINT_PERIOD));
421             getConfiguration().setChkPeriod(v);
422         } else if (strcmp(keyz, "max_checkpoints") == 0) {
423             size_t v = std::stoull(valz);
424             validate(v, size_t(DEFAULT_MAX_CHECKPOINTS),
425                      size_t(MAX_CHECKPOINTS_UPPER_BOUND));
426             getConfiguration().setMaxCheckpoints(v);
427         } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
428             getConfiguration().setItemNumBasedNewChk(cb_stob(valz));
429         } else if (strcmp(keyz, "keep_closed_chks") == 0) {
430             getConfiguration().setKeepClosedChks(cb_stob(valz));
431         } else if (strcmp(keyz, "enable_chk_merge") == 0) {
432             getConfiguration().setEnableChkMerge(cb_stob(valz));
433         } else {
434             msg = "Unknown config param";
435             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
436         }
437
438         // Handles exceptions thrown by the cb_stob function
439     } catch (invalid_argument_bool& error) {
440         msg = error.what();
441         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
442
443         // Handles exceptions thrown by the standard
444         // library stoi/stoul style functions when not numeric
445     } catch (std::invalid_argument&) {
446         msg = "Argument was not numeric";
447         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
448
449         // Handles exceptions thrown by the standard library stoi/stoul
450         // style functions when the conversion does not fit in the datatype
451     } catch (std::out_of_range&) {
452         msg = "Argument was out of range";
453         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
454
455         // Handles any miscellaenous exceptions in addition to the range_error
456         // exceptions thrown by the configuration::set<param>() methods
457     } catch (std::exception& error) {
458         msg = error.what();
459         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
460     }
461
462     return rv;
463 }
464
/**
 * Apply a single "flush" category engine parameter.
 *
 * keyz is compared against the known parameter names in turn. Most branches
 * parse valz and pass it to the matching Configuration setter; the "*_run"
 * keys start a task instead, and "timing_log" swaps the stats timing log.
 *
 * @param keyz NUL-terminated parameter name
 * @param valz NUL-terminated textual value
 * @param msg receives a description when the parameter is unknown or an
 *        exception is caught
 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS on success,
 *         PROTOCOL_BINARY_RESPONSE_KEY_ENOENT for an unknown parameter,
 *         PROTOCOL_BINARY_RESPONSE_ETMPFAIL if runAccessScannerTask()
 *         returns false, PROTOCOL_BINARY_RESPONSE_EINVAL if any exception
 *         is caught
 */
465 protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
466         const char* keyz, const char* valz, std::string& msg) {
467     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
468
469     // Handle the actual mutation.
470     try {
471         if (strcmp(keyz, "bg_fetch_delay") == 0) {
472             getConfiguration().setBgFetchDelay(std::stoull(valz));
473         } else if (strcmp(keyz, "flushall_enabled") == 0) {
474             getConfiguration().setFlushallEnabled(cb_stob(valz));
475         } else if (strcmp(keyz, "max_size") == 0) {
            // Changing max_size also recomputes both watermarks from the
            // percentages held in EPStats.
476             size_t vsize = std::stoull(valz);
477
478             getConfiguration().setMaxSize(vsize);
479             EPStats& st = getEpStats();
480             getConfiguration().setMemLowWat(
481                     percentOf(vsize, st.mem_low_wat_percent));
482             getConfiguration().setMemHighWat(
483                     percentOf(vsize, st.mem_high_wat_percent));
484         } else if (strcmp(keyz, "mem_low_wat") == 0) {
485             getConfiguration().setMemLowWat(std::stoull(valz));
486         } else if (strcmp(keyz, "mem_high_wat") == 0) {
487             getConfiguration().setMemHighWat(std::stoull(valz));
488         } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
489             getConfiguration().setBackfillMemThreshold(std::stoull(valz));
490         } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
491             getConfiguration().setCompactionExpMemThreshold(std::stoull(valz));
492         } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
493             getConfiguration().setMutationMemThreshold(std::stoull(valz));
494         } else if (strcmp(keyz, "timing_log") == 0) {
            // Detach and destroy the current timing log first; a new one
            // is installed only if valz is not "off" and the file opens.
495             EPStats& stats = getEpStats();
496             std::ostream* old = stats.timingLog;
497             stats.timingLog = NULL;
498             delete old;
499             if (strcmp(valz, "off") == 0) {
500                 LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
501             } else {
502                 std::ofstream* tmp(new std::ofstream(valz));
503                 if (tmp->good()) {
504                     LOG(EXTENSION_LOG_INFO,
505                         "Logging detailed timings to ``%s''.", valz);
506                     stats.timingLog = tmp;
507                 } else {
508                     LOG(EXTENSION_LOG_WARNING,
509                         "Error setting detailed timing log to ``%s'':  %s",
510                         valz, strerror(errno));
511                     delete tmp;
512                 }
513             }
514         } else if (strcmp(keyz, "exp_pager_enabled") == 0) {
515             getConfiguration().setExpPagerEnabled(cb_stob(valz));
516         } else if (strcmp(keyz, "exp_pager_stime") == 0) {
517             getConfiguration().setExpPagerStime(std::stoull(valz));
518         } else if (strcmp(keyz, "exp_pager_initial_run_time") == 0) {
519             getConfiguration().setExpPagerInitialRunTime(std::stoll(valz));
520         } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
521             getConfiguration().requirementsMetOrThrow("access_scanner_enabled");
522             getConfiguration().setAccessScannerEnabled(cb_stob(valz));
523         } else if (strcmp(keyz, "alog_sleep_time") == 0) {
524             getConfiguration().requirementsMetOrThrow("alog_sleep_time");
525             getConfiguration().setAlogSleepTime(std::stoull(valz));
526         } else if (strcmp(keyz, "alog_task_time") == 0) {
527             getConfiguration().requirementsMetOrThrow("alog_task_time");
528             getConfiguration().setAlogTaskTime(std::stoull(valz));
529         } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
530             getConfiguration().setPagerActiveVbPcnt(std::stoull(valz));
531         } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
532             getConfiguration().setWarmupMinMemoryThreshold(std::stoull(valz));
533         } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
534             getConfiguration().setWarmupMinItemsThreshold(std::stoull(valz));
            // The four thread-count parameters below each accept two key
            // names and update both the configuration and the ExecutorPool.
535         } else if (strcmp(keyz, "max_num_readers") == 0 ||
536                    strcmp(keyz, "num_reader_threads") == 0) {
537             size_t value = std::stoull(valz);
538             getConfiguration().setNumReaderThreads(value);
539             ExecutorPool::get()->setNumReaders(value);
540         } else if (strcmp(keyz, "max_num_writers") == 0 ||
541                    strcmp(keyz, "num_writer_threads") == 0) {
542             size_t value = std::stoull(valz);
543             getConfiguration().setNumWriterThreads(value);
544             ExecutorPool::get()->setNumWriters(value);
545         } else if (strcmp(keyz, "max_num_auxio") == 0 ||
546                    strcmp(keyz, "num_auxio_threads") == 0) {
547             size_t value = std::stoull(valz);
548             getConfiguration().setNumAuxioThreads(value);
549             ExecutorPool::get()->setNumAuxIO(value);
550         } else if (strcmp(keyz, "max_num_nonio") == 0 ||
551                    strcmp(keyz, "num_nonio_threads") == 0) {
552             size_t value = std::stoull(valz);
553             getConfiguration().setNumNonioThreads(value);
554             ExecutorPool::get()->setNumNonIO(value);
555         } else if (strcmp(keyz, "bfilter_enabled") == 0) {
556             getConfiguration().setBfilterEnabled(cb_stob(valz));
557         } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
558             getConfiguration().setBfilterResidencyThreshold(std::stof(valz));
559         } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
560             getConfiguration().setDefragmenterEnabled(cb_stob(valz));
561         } else if (strcmp(keyz, "defragmenter_interval") == 0) {
562             size_t v = std::stoull(valz);
563             // Adding separate validation as external limit is minimum 1
564             // to prevent setting defragmenter to constantly run
565             validate(v, size_t(1), std::numeric_limits<size_t>::max());
566             getConfiguration().setDefragmenterInterval(v);
567         } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
568             getConfiguration().setDefragmenterAgeThreshold(std::stoull(valz));
569         } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
570             getConfiguration().setDefragmenterChunkDuration(std::stoull(valz));
571         } else if (strcmp(keyz, "defragmenter_run") == 0) {
            // valz is not read for this key.
572             runDefragmenterTask();
573         } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
574             getConfiguration().setCompactionWriteQueueCap(std::stoull(valz));
575         } else if (strcmp(keyz, "dcp_min_compression_ratio") == 0) {
576             getConfiguration().setDcpMinCompressionRatio(std::stof(valz));
577         } else if (strcmp(keyz, "access_scanner_run") == 0) {
            // valz is not read; a false return is reported as ETMPFAIL.
578             if (!(runAccessScannerTask())) {
579                 rv = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
580             }
581         } else if (strcmp(keyz, "vb_state_persist_run") == 0) {
582             runVbStatePersistTask(std::stoi(valz));
583         } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
584             getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
585             getConfiguration().setEphemeralFullPolicy(valz);
586         } else if (strcmp(keyz, "ephemeral_metadata_purge_age") == 0) {
587             getConfiguration().requirementsMetOrThrow(
588                     "ephemeral_metadata_purge_age");
589             getConfiguration().setEphemeralMetadataPurgeAge(std::stoull(valz));
590         } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
591             getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
592             getConfiguration().setEphemeralMetadataPurgeInterval(
593                     std::stoull(valz));
594         } else {
595             msg = "Unknown config param";
596             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
597         }
598         // Handles exceptions thrown by the cb_stob function
599     } catch (invalid_argument_bool& error) {
600         msg = error.what();
601         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
602
603         // Handles exceptions thrown by the standard
604         // library stoi/stoul style functions when not numeric
605     } catch (std::invalid_argument&) {
606         msg = "Argument was not numeric";
607         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
608
609         // Handles exceptions thrown by the standard library stoi/stoul
610         // style functions when the conversion does not fit in the datatype
611     } catch (std::out_of_range&) {
612         msg = "Argument was out of range";
613         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
614
615         // Handles any miscellaneous exceptions in addition to the range_error
616         // exceptions thrown by the configuration::set<param>() methods
617     } catch (std::exception& error) {
618         msg = error.what();
619         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
620     }
621
622     return rv;
623 }
624
625 protocol_binary_response_status EventuallyPersistentEngine::setDcpParam(
626         const char* keyz, const char* valz, std::string& msg) {
627     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
628     try {
629
630         if (strcmp(keyz,
631                    "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
632             size_t v = atoi(valz);
633             checkNumeric(valz);
634             validate(v, size_t(1), std::numeric_limits<size_t>::max());
635             getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(
636                     v);
637         } else if (
638             strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") ==
639             0) {
640             size_t v = atoi(valz);
641             checkNumeric(valz);
642             validate(v, size_t(1), std::numeric_limits<size_t>::max());
643             getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(
644                     v);
645         } else {
646             msg = "Unknown config param";
647             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
648         }
649     } catch (std::runtime_error& ex) {
650         msg = "Value out of range.";
651         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
652     }
653
654     return rv;
655 }
656
657 protocol_binary_response_status EventuallyPersistentEngine::setVbucketParam(
658         uint16_t vbucket,
659         const char* keyz,
660         const char* valz,
661         std::string& msg) {
662     protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
663     try {
664         if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
665             uint64_t v = std::strtoull(valz, nullptr, 10);
666             checkNumeric(valz);
667             getConfiguration().setHlcDriftAheadThresholdUs(v);
668         } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
669             uint64_t v = std::strtoull(valz, nullptr, 10);
670             checkNumeric(valz);
671             getConfiguration().setHlcDriftBehindThresholdUs(v);
672         } else if (strcmp(keyz, "max_cas") == 0) {
673             uint64_t v = std::strtoull(valz, nullptr, 10);
674             checkNumeric(valz);
675             LOG(EXTENSION_LOG_WARNING, "setVbucketParam: max_cas:%" PRIu64 " "
676                 "vb:%" PRIu16 "\n", v, vbucket);
677             if (getKVBucket()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
678                 rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
679                 msg = "Not my vbucket";
680             }
681         } else {
682             msg = "Unknown config param";
683             rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
684         }
685     } catch (std::runtime_error& ex) {
686         msg = "Value out of range.";
687         rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
688     }
689     return rv;
690 }
691
692 static protocol_binary_response_status evictKey(
693     EventuallyPersistentEngine* e,
694     protocol_binary_request_header
695     * request,
696     const char** msg,
697     size_t* msg_size,
698     DocNamespace docNamespace) {
699     protocol_binary_request_no_extras* req =
700         (protocol_binary_request_no_extras*)request;
701
702     const uint8_t* keyPtr = reinterpret_cast<const uint8_t*>(request) +
703                             sizeof(*request);
704     size_t keylen = ntohs(req->message.header.request.keylen);
705     uint16_t vbucket = ntohs(request->request.vbucket);
706
707     LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key{%.*s}\n",
708         int(keylen), keyPtr);
709     msg_size = 0;
710     auto rv = e->evictKey(DocKey(keyPtr, keylen, docNamespace), vbucket, msg);
711     if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
712         rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
713         if (e->isDegradedMode()) {
714             return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
715         }
716     }
717     return rv;
718 }
719
720 protocol_binary_response_status EventuallyPersistentEngine::setParam(
721         protocol_binary_request_set_param* req, std::string& msg) {
722     size_t keylen = ntohs(req->message.header.request.keylen);
723     uint8_t extlen = req->message.header.request.extlen;
724     size_t vallen = ntohl(req->message.header.request.bodylen);
725     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
726     protocol_binary_engine_param_t paramtype =
727         static_cast<protocol_binary_engine_param_t>(ntohl(
728             req->message.body.param_type));
729
730     if (keylen == 0 || (vallen - keylen - extlen) == 0) {
731         return PROTOCOL_BINARY_RESPONSE_EINVAL;
732     }
733
734     const char* keyp = reinterpret_cast<const char*>(req->bytes)
735                        + sizeof(req->bytes);
736     const char* valuep = keyp + keylen;
737     vallen -= (keylen + extlen);
738
739     char keyz[128];
740     char valz[512];
741
742     // Read the key.
743     if (keylen >= sizeof(keyz)) {
744         msg = "Key is too large.";
745         return PROTOCOL_BINARY_RESPONSE_EINVAL;
746     }
747     memcpy(keyz, keyp, keylen);
748     keyz[keylen] = 0x00;
749
750     // Read the value.
751     if (vallen >= sizeof(valz)) {
752         msg = "Value is too large.";
753         return PROTOCOL_BINARY_RESPONSE_EINVAL;
754     }
755     memcpy(valz, valuep, vallen);
756     valz[vallen] = 0x00;
757
758     protocol_binary_response_status rv;
759
760     switch (paramtype) {
761     case protocol_binary_engine_param_flush:
762         rv = setFlushParam(keyz, valz, msg);
763         break;
764     case protocol_binary_engine_param_tap:
765         rv = setTapParam(keyz, valz, msg);
766         break;
767     case protocol_binary_engine_param_checkpoint:
768         rv = setCheckpointParam(keyz, valz, msg);
769         break;
770     case protocol_binary_engine_param_dcp:
771         rv = setDcpParam(keyz, valz, msg);
772         break;
773     case protocol_binary_engine_param_vbucket:
774         rv = setVbucketParam(vbucket, keyz, valz, msg);
775         break;
776     default:
777         rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
778     }
779
780     return rv;
781 }
782
783 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine* e,
784                                     const void* cookie,
785                                     protocol_binary_request_header* request,
786                                     ADD_RESPONSE response) {
787     protocol_binary_request_get_vbucket* req =
788         reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
789     if (req == nullptr) {
790         throw std::invalid_argument("getVBucket: Unable to convert req"
791                                         " to protocol_binary_request_get_vbucket");
792     }
793
794     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
795     VBucketPtr vb = e->getVBucket(vbucket);
796     if (!vb) {
797         return e->sendNotMyVBucketResponse(response, cookie, 0);
798     } else {
799         vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
800         return sendResponse(response, NULL, 0, NULL, 0, &state,
801                             sizeof(state),
802                             PROTOCOL_BINARY_RAW_BYTES,
803                             PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
804     }
805 }
806
807 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine* e,
808                                     const void* cookie,
809                                     protocol_binary_request_header* request,
810                                     ADD_RESPONSE response) {
811
812     protocol_binary_request_set_vbucket* req =
813         reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
814
815     uint64_t cas = ntohll(req->message.header.request.cas);
816
817     size_t bodylen = ntohl(req->message.header.request.bodylen)
818                      - ntohs(req->message.header.request.keylen);
819     if (bodylen != sizeof(vbucket_state_t)) {
820         const std::string msg("Incorrect packet format");
821         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
822                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
823                             PROTOCOL_BINARY_RESPONSE_EINVAL,
824                             cas, cookie);
825     }
826
827     vbucket_state_t state;
828     memcpy(&state, &req->message.body.state, sizeof(state));
829     state = static_cast<vbucket_state_t>(ntohl(state));
830
831     if (!is_valid_vbucket_state_t(state)) {
832         const std::string msg("Invalid vbucket state");
833         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
834                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
835                             PROTOCOL_BINARY_RESPONSE_EINVAL,
836                             cas, cookie);
837     }
838
839     uint16_t vb = ntohs(req->message.header.request.vbucket);
840     if (e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
841         const std::string msg("VBucket number too big");
842         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
843                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
844                             PROTOCOL_BINARY_RESPONSE_ERANGE,
845                             cas, cookie);
846     }
847     return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
848                         PROTOCOL_BINARY_RAW_BYTES,
849                         PROTOCOL_BINARY_RESPONSE_SUCCESS,
850                         cas, cookie);
851 }
852
853 static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine* e,
854                                     const void* cookie,
855                                     protocol_binary_request_header* req,
856                                     ADD_RESPONSE response) {
857
858     uint64_t cas = ntohll(req->request.cas);
859
860     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
861     uint16_t vbucket = ntohs(req->request.vbucket);
862
863     std::string msg = "";
864     if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
865         msg = "Incorrect packet format.";
866         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
867                             msg.length(),
868                             PROTOCOL_BINARY_RAW_BYTES,
869                             PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
870     }
871
872     bool sync = false;
873     uint32_t bodylen = ntohl(req->request.bodylen);
874     if (bodylen > 0) {
875         const char* ptr = reinterpret_cast<const char*>(req->bytes) +
876                           sizeof(req->bytes);
877         if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
878             sync = true;
879         }
880     }
881
882     ENGINE_ERROR_CODE err;
883     void* es = e->getEngineSpecific(cookie);
884     if (sync) {
885         if (es == NULL) {
886             err = e->deleteVBucket(vbucket, cookie);
887             e->storeEngineSpecific(cookie, e);
888         } else {
889             e->storeEngineSpecific(cookie, NULL);
890             LOG(EXTENSION_LOG_INFO,
891                 "Completed sync deletion of vbucket %u",
892                 (unsigned)vbucket);
893             err = ENGINE_SUCCESS;
894         }
895     } else {
896         err = e->deleteVBucket(vbucket);
897     }
898     switch (err) {
899     case ENGINE_SUCCESS:
900         LOG(EXTENSION_LOG_NOTICE,
901             "Deletion of vbucket %d was completed.", vbucket);
902         break;
903     case ENGINE_NOT_MY_VBUCKET:
904         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
905             "because the vbucket doesn't exist!!!", vbucket);
906         res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
907         break;
908     case ENGINE_EINVAL:
909         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
910             "because the vbucket is not in a dead state\n", vbucket);
911         msg = "Failed to delete vbucket.  Must be in the dead state.";
912         res = PROTOCOL_BINARY_RESPONSE_EINVAL;
913         break;
914     case ENGINE_EWOULDBLOCK:
915         LOG(EXTENSION_LOG_NOTICE, "Request for vbucket %d deletion is in"
916                 " EWOULDBLOCK until the database file is removed from disk",
917             vbucket);
918         e->storeEngineSpecific(cookie, req);
919         return ENGINE_EWOULDBLOCK;
920     default:
921         LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
922             "because of unknown reasons\n", vbucket);
923         msg = "Failed to delete vbucket.  Unknown reason.";
924         res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
925     }
926
927     if (err != ENGINE_NOT_MY_VBUCKET) {
928         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
929                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
930                             res, cas, cookie);
931     } else {
932         return e->sendNotMyVBucketResponse(response, cookie, cas);
933     }
934 }
935
936 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine* e,
937                                        protocol_binary_request_header* request,
938                                        const void* cookie,
939                                        Item** it,
940                                        const char** msg,
941                                        protocol_binary_response_status* res,
942                                        DocNamespace docNamespace) {
943     KVBucketIface* kvb = e->getKVBucket();
944     protocol_binary_request_no_extras* req =
945         (protocol_binary_request_no_extras*)request;
946     int keylen = ntohs(req->message.header.request.keylen);
947     uint16_t vbucket = ntohs(req->message.header.request.vbucket);
948     ENGINE_ERROR_CODE error_code;
949     DocKey key(reinterpret_cast<const uint8_t*>(request) + sizeof(*request),
950                keylen, docNamespace);
951
952     GetValue rv(kvb->getReplica(key, vbucket, cookie));
953
954     if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
955         if (error_code == ENGINE_NOT_MY_VBUCKET) {
956             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
957             return error_code;
958         } else if (error_code == ENGINE_TMPFAIL) {
959             *msg = "NOT_FOUND";
960             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
961         } else {
962             return error_code;
963         }
964     } else {
965         *it = rv.getValue();
966         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
967     }
968     ++(e->getEpStats().numOpsGet);
969     return ENGINE_SUCCESS;
970 }
971
972 static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine* e,
973                                    const void* cookie,
974                                    protocol_binary_request_compact_db* req,
975                                    ADD_RESPONSE response) {
976
977     std::string msg = "";
978     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
979     compaction_ctx compactreq;
980     uint64_t cas = ntohll(req->message.header.request.cas);
981
982     if (ntohs(req->message.header.request.keylen) > 0 ||
983         req->message.header.request.extlen != 24) {
984         LOG(EXTENSION_LOG_WARNING,
985             "Compaction received bad ext/key len %d/%d.",
986             req->message.header.request.extlen,
987             ntohs(req->message.header.request.keylen));
988         msg = "Incorrect packet format.";
989         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
990                             msg.length(),
991                             PROTOCOL_BINARY_RAW_BYTES,
992                             PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
993     }
994     EPStats& stats = e->getEpStats();
995     compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
996     compactreq.purge_before_seq =
997         ntohll(req->message.body.purge_before_seq);
998     compactreq.drop_deletes = req->message.body.drop_deletes;
999     compactreq.db_file_id = e->getKVBucket()->getDBFileId(*req);
1000     uint16_t vbid = ntohs(req->message.header.request.vbucket);
1001
1002     ENGINE_ERROR_CODE err;
1003     void* es = e->getEngineSpecific(cookie);
1004     if (es == NULL) {
1005         ++stats.pendingCompactions;
1006         e->storeEngineSpecific(cookie, e);
1007         err = e->compactDB(vbid, compactreq, cookie);
1008     } else {
1009         e->storeEngineSpecific(cookie, NULL);
1010         err = ENGINE_SUCCESS;
1011     }
1012
1013     switch (err) {
1014     case ENGINE_SUCCESS:
1015         LOG(EXTENSION_LOG_NOTICE,
1016             "Compaction of db file id: %d completed.", compactreq.db_file_id);
1017         break;
1018     case ENGINE_NOT_MY_VBUCKET:
1019         --stats.pendingCompactions;
1020         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1021             "because the db file doesn't exist!!!", compactreq.db_file_id);
1022         res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1023         break;
1024     case ENGINE_EINVAL:
1025         --stats.pendingCompactions;
1026         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1027             "because of an invalid argument", compactreq.db_file_id);
1028         res = PROTOCOL_BINARY_RESPONSE_EINVAL;
1029         break;
1030     case ENGINE_EWOULDBLOCK:
1031         LOG(EXTENSION_LOG_NOTICE,
1032             "Compaction of db file id: %d scheduled "
1033                 "(awaiting completion).", compactreq.db_file_id);
1034         e->storeEngineSpecific(cookie, req);
1035         return ENGINE_EWOULDBLOCK;
1036     case ENGINE_TMPFAIL:
1037         LOG(EXTENSION_LOG_WARNING, "Request to compact db file id: %d hit"
1038                 " a temporary failure and may need to be retried",
1039             compactreq.db_file_id);
1040         msg = "Temporary failure in compacting db file.";
1041         res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1042         break;
1043     default:
1044         --stats.pendingCompactions;
1045         LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1046             "because of unknown reasons\n", compactreq.db_file_id);
1047         msg = "Failed to compact db file.  Unknown reason.";
1048         res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1049         break;
1050     }
1051
1052     if (err != ENGINE_NOT_MY_VBUCKET) {
1053         return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1054                             msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1055                             res, cas, cookie);
1056     } else {
1057         return e->sendNotMyVBucketResponse(response, cookie, cas);
1058     }
1059 }
1060
1061 static ENGINE_ERROR_CODE processUnknownCommand(
1062     EventuallyPersistentEngine* h,
1063     const void* cookie,
1064     protocol_binary_request_header* request,
1065     ADD_RESPONSE response,
1066     DocNamespace docNamespace) {
1067     protocol_binary_response_status res =
1068         PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1069     std::string dynamic_msg;
1070     const char* msg = NULL;
1071     size_t msg_size = 0;
1072     Item* itm = NULL;
1073
1074     EPStats& stats = h->getEpStats();
1075     ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1076
1077     /**
1078      * Session validation
1079      * (For ns_server commands only)
1080      */
1081     switch (request->request.opcode) {
1082     case PROTOCOL_BINARY_CMD_SET_PARAM:
1083     case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1084     case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1085     case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1086     case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1087     case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1088     case PROTOCOL_BINARY_CMD_COMPACT_DB: {
1089         if (h->getEngineSpecific(cookie) == NULL) {
1090             uint64_t cas = ntohll(request->request.cas);
1091             if (!h->validateSessionCas(cas)) {
1092                 const std::string message("Invalid session token");
1093                 return sendResponse(response, NULL, 0, NULL, 0,
1094                                     message.c_str(), message.length(),
1095                                     PROTOCOL_BINARY_RAW_BYTES,
1096                                     PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1097                                     cas, cookie);
1098             }
1099         }
1100         break;
1101     }
1102     default:
1103         break;
1104     }
1105
1106     switch (request->request.opcode) {
1107     case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
1108         return h->getAllVBucketSequenceNumbers(cookie, request, response);
1109
1110     case PROTOCOL_BINARY_CMD_GET_VBUCKET: {
1111         BlockTimer timer(&stats.getVbucketCmdHisto);
1112         rv = getVBucket(h, cookie, request, response);
1113         return rv;
1114     }
1115     case PROTOCOL_BINARY_CMD_DEL_VBUCKET: {
1116         BlockTimer timer(&stats.delVbucketCmdHisto);
1117         rv = delVBucket(h, cookie, request, response);
1118         if (rv != ENGINE_EWOULDBLOCK) {
1119             h->decrementSessionCtr();
1120             h->storeEngineSpecific(cookie, NULL);
1121         }
1122         return rv;
1123     }
1124     case PROTOCOL_BINARY_CMD_SET_VBUCKET: {
1125         BlockTimer timer(&stats.setVbucketCmdHisto);
1126         rv = setVBucket(h, cookie, request, response);
1127         h->decrementSessionCtr();
1128         return rv;
1129     }
1130     case PROTOCOL_BINARY_CMD_TOUCH:
1131     case PROTOCOL_BINARY_CMD_GAT:
1132     case PROTOCOL_BINARY_CMD_GATQ: {
1133         rv = h->touch(cookie, request, response, docNamespace);
1134         return rv;
1135     }
1136     case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1137         res = h->stopFlusher(&msg, &msg_size);
1138         break;
1139     case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1140         res = h->startFlusher(&msg, &msg_size);
1141         break;
1142     case PROTOCOL_BINARY_CMD_SET_PARAM:
1143         res = h->setParam(
1144                 reinterpret_cast<protocol_binary_request_set_param*>(request),
1145                 dynamic_msg);
1146         msg = dynamic_msg.c_str();
1147         msg_size = dynamic_msg.length();
1148         h->decrementSessionCtr();
1149         break;
1150     case PROTOCOL_BINARY_CMD_EVICT_KEY:
1151         res = evictKey(h, request, &msg, &msg_size, docNamespace);
1152         break;
1153     case PROTOCOL_BINARY_CMD_OBSERVE:
1154         return h->observe(cookie, request, response, docNamespace);
1155     case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
1156         return h->observe_seqno(cookie, request, response);
1157     case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT: {
1158         rv = h->deregisterTapClient(cookie, request, response);
1159         h->decrementSessionCtr();
1160         return rv;
1161     }
1162     case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN: {
1163         rv = h->resetReplicationChain(cookie, request, response);
1164         return rv;
1165     }
1166     case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER: {
1167         rv = h->changeTapVBFilter(cookie, request, response);
1168         h->decrementSessionCtr();
1169         return rv;
1170     }
1171     case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1172     case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1173     case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE: {
1174         rv = h->handleCheckpointCmds(cookie, request, response);
1175         return rv;
1176     }
1177     case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE: {
1178         rv = h->handleSeqnoCmds(cookie, request, response);
1179         return rv;
1180     }
1181     case PROTOCOL_BINARY_CMD_GET_META:
1182     case PROTOCOL_BINARY_CMD_GETQ_META: {
1183         rv = h->getMeta(cookie,
1184                         reinterpret_cast<protocol_binary_request_get_meta*>
1185                         (request), response,
1186                         docNamespace);
1187         return rv;
1188     }
1189     case PROTOCOL_BINARY_CMD_SET_WITH_META:
1190     case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1191     case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1192     case PROTOCOL_BINARY_CMD_ADDQ_WITH_META: {
1193         rv = h->setWithMeta(cookie,
1194                             reinterpret_cast<protocol_binary_request_set_with_meta*>
1195                             (request), response,
1196                             docNamespace);
1197         return rv;
1198     }
1199     case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1200     case PROTOCOL_BINARY_CMD_DELQ_WITH_META: {
1201         rv = h->deleteWithMeta(cookie,
1202                                reinterpret_cast<protocol_binary_request_delete_with_meta*>
1203                                (request), response,
1204                                docNamespace);
1205         return rv;
1206     }
1207     case PROTOCOL_BINARY_CMD_RETURN_META: {
1208         return h->returnMeta(cookie,
1209                              reinterpret_cast<protocol_binary_request_return_meta*>
1210                              (request), response,
1211                              docNamespace);
1212     }
1213     case PROTOCOL_BINARY_CMD_GET_REPLICA:
1214         rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res, docNamespace);
1215         if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1216             return rv;
1217         }
1218         break;
1219     case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1220     case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC: {
1221         rv = h->handleTrafficControlCmd(cookie, request, response);
1222         return rv;
1223     }
1224     case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG: {
1225         rv = h->setClusterConfig(cookie,
1226                                  reinterpret_cast<protocol_binary_request_set_cluster_config*>
1227                                  (request), response);
1228         h->decrementSessionCtr();
1229         return rv;
1230     }
1231     case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
1232         return h->getClusterConfig(cookie,
1233                                    reinterpret_cast<protocol_binary_request_get_cluster_config*>
1234                                    (request), response);
1235     case PROTOCOL_BINARY_CMD_COMPACT_DB: {
1236         rv = compactDB(h, cookie,
1237                        (protocol_binary_request_compact_db*)(request),
1238                        response);
1239         if (rv != ENGINE_EWOULDBLOCK) {
1240             h->decrementSessionCtr();
1241             h->storeEngineSpecific(cookie, NULL);
1242         }
1243         return rv;
1244     }
1245     case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY: {
1246         if (request->request.extlen != 0 ||
1247             request->request.keylen != 0 ||
1248             request->request.bodylen != 0) {
1249             return ENGINE_EINVAL;
1250         }
1251         return h->getRandomKey(cookie, response);
1252     }
1253     case PROTOCOL_BINARY_CMD_GET_KEYS: {
1254         return h->getAllKeys(cookie,
1255                              reinterpret_cast<protocol_binary_request_get_keys*>
1256                              (request), response,
1257                              docNamespace);
1258     }
1259         // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
1260     case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
1261     case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE: {
1262         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
1263                             PROTOCOL_BINARY_RAW_BYTES,
1264                             PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
1265                             cookie);
1266     }
1267     }
1268
1269     if (itm) {
1270         uint32_t flags = itm->getFlags();
1271         rv = sendResponse(response,
1272                           static_cast<const void*>(itm->getKey().data()),
1273                           itm->getKey().size(),
1274                           (const void*)&flags, sizeof(uint32_t),
1275                           static_cast<const void*>(itm->getData()),
1276                           itm->getNBytes(), itm->getDataType(),
1277                           static_cast<uint16_t>(res), itm->getCas(),
1278                           cookie);
1279         delete itm;
1280     } else if (rv == ENGINE_NOT_MY_VBUCKET) {
1281         return h->sendNotMyVBucketResponse(response, cookie, 0);
1282     } else {
1283         msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1284         rv = sendResponse(response, NULL, 0, NULL, 0,
1285                           msg, static_cast<uint16_t>(msg_size),
1286                           PROTOCOL_BINARY_RAW_BYTES,
1287                           static_cast<uint16_t>(res), 0, cookie);
1288
1289     }
1290     return rv;
1291 }
1292
1293 static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1294                                            const void* cookie,
1295                                            protocol_binary_request_header
1296                                            * request,
1297                                            ADD_RESPONSE response,
1298                                            DocNamespace doc_namespace) {
1299     auto engine = acquireEngine(handle);
1300     auto ret = processUnknownCommand(
1301             engine.get(), cookie, request, response, doc_namespace);
1302     return ret;
1303 }
1304
1305 static void EvpItemSetCas(ENGINE_HANDLE*, const void*,
1306                           item* itm, uint64_t cas) {
1307     static_cast<Item*>(itm)->setCas(cas);
1308 }
1309
1310 static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1311                                       const void* cookie,
1312                                       void* engine_specific,
1313                                       uint16_t nengine,
1314                                       uint8_t ttl,
1315                                       uint16_t tap_flags,
1316                                       tap_event_t tap_event,
1317                                       uint32_t tap_seqno,
1318                                       const void* key,
1319                                       size_t nkey,
1320                                       uint32_t flags,
1321                                       uint32_t exptime,
1322                                       uint64_t cas,
1323                                       uint8_t datatype,
1324                                       const void* data,
1325                                       size_t ndata,
1326                                       uint16_t vbucket) {
1327     if (!mcbp::datatype::is_valid(datatype)) {
1328         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1329             " (TapNotify)");
1330         return ENGINE_EINVAL;
1331     }
1332
1333     return acquireEngine(handle)->tapNotify(cookie,
1334                                             engine_specific,
1335                                             nengine,
1336                                             ttl,
1337                                             tap_flags,
1338                                             (uint16_t)tap_event,
1339                                             tap_seqno,
1340                                             key,
1341                                             nkey,
1342                                             flags,
1343                                             exptime,
1344                                             cas,
1345                                             datatype,
1346                                             data,
1347                                             ndata,
1348                                             vbucket);
1349 }
1350
1351 static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1352                                   const void* cookie, item** itm,
1353                                   void** es, uint16_t* nes, uint8_t* ttl,
1354                                   uint16_t* flags, uint32_t* seqno,
1355                                   uint16_t* vbucket) {
1356     uint16_t tap_event = acquireEngine(handle)->walkTapQueue(
1357             cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1358     return static_cast<tap_event_t>(tap_event);
1359 }
1360
1361 static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1362                                       const void* cookie,
1363                                       const void* client,
1364                                       size_t nclient,
1365                                       uint32_t flags,
1366                                       const void* userdata,
1367                                       size_t nuserdata) {
1368     auto engine = acquireEngine(handle);
1369     TAP_ITERATOR iterator = NULL;
1370     {
1371         std::string c(static_cast<const char*>(client), nclient);
1372         // Figure out what we want from the userdata before adding it to
1373         // the API to the handle
1374         if (engine->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1375             iterator = EvpTapIterator;
1376         }
1377     }
1378     return iterator;
1379 }
1380
1381
1382 static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1383                                     const void* cookie,
1384                                     struct dcp_message_producers* producers) {
1385     auto engine = acquireEngine(handle);
1386     ConnHandler* conn = engine->getConnHandler(cookie);
1387     if (conn) {
1388         return conn->step(producers);
1389     }
1390     return ENGINE_DISCONNECT;
1391 }
1392
1393
1394 static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1395                                     const void* cookie,
1396                                     uint32_t opaque,
1397                                     uint32_t seqno,
1398                                     uint32_t flags,
1399                                     void* name,
1400                                     uint16_t nname) {
1401     return acquireEngine(handle)->dcpOpen(
1402             cookie, opaque, seqno, flags, name, nname);
1403 }
1404
1405 static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1406                                          const void* cookie,
1407                                          uint32_t opaque,
1408                                          uint16_t vbucket,
1409                                          uint32_t flags) {
1410     return acquireEngine(handle)->dcpAddStream(cookie, opaque, vbucket, flags);
1411 }
1412
1413 static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1414                                            const void* cookie,
1415                                            uint32_t opaque,
1416                                            uint16_t vbucket) {
1417     auto engine = acquireEngine(handle);
1418     ConnHandler* conn = engine->getConnHandler(cookie);
1419     if (conn) {
1420         return conn->closeStream(opaque, vbucket);
1421     }
1422     return ENGINE_DISCONNECT;
1423 }
1424
1425
1426 static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1427                                          const void* cookie,
1428                                          uint32_t flags,
1429                                          uint32_t opaque,
1430                                          uint16_t vbucket,
1431                                          uint64_t startSeqno,
1432                                          uint64_t endSeqno,
1433                                          uint64_t vbucketUuid,
1434                                          uint64_t snapStartSeqno,
1435                                          uint64_t snapEndSeqno,
1436                                          uint64_t* rollbackSeqno,
1437                                          dcp_add_failover_log callback) {
1438     auto engine = acquireEngine(handle);
1439     ConnHandler* conn = engine->getConnHandler(cookie);
1440     if (conn) {
1441         return conn->streamRequest(flags,
1442                                    opaque,
1443                                    vbucket,
1444                                    startSeqno,
1445                                    endSeqno,
1446                                    vbucketUuid,
1447                                    snapStartSeqno,
1448                                    snapEndSeqno,
1449                                    rollbackSeqno,
1450                                    callback);
1451     }
1452     return ENGINE_DISCONNECT;
1453 }
1454
1455 static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1456                                               const void* cookie,
1457                                               uint32_t opaque,
1458                                               uint16_t vbucket,
1459                                               dcp_add_failover_log callback) {
1460     auto engine = acquireEngine(handle);
1461     ConnHandler* conn = engine->getConnHandler(cookie);
1462     if (conn) {
1463         return conn->getFailoverLog(opaque, vbucket, callback);
1464     }
1465     return ENGINE_DISCONNECT;
1466 }
1467
1468
1469 static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1470                                          const void* cookie,
1471                                          uint32_t opaque,
1472                                          uint16_t vbucket,
1473                                          uint32_t flags) {
1474     auto engine = acquireEngine(handle);
1475     ConnHandler* conn = engine->getConnHandler(cookie);
1476     if (conn) {
1477         return conn->streamEnd(opaque, vbucket, flags);
1478     }
1479     return ENGINE_DISCONNECT;
1480 }
1481
1482
1483 static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1484                                               const void* cookie,
1485                                               uint32_t opaque,
1486                                               uint16_t vbucket,
1487                                               uint64_t start_seqno,
1488                                               uint64_t end_seqno,
1489                                               uint32_t flags) {
1490     auto engine = acquireEngine(handle);
1491     ConnHandler* conn = engine->getConnHandler(cookie);
1492     if (conn) {
1493         return conn->snapshotMarker(
1494                 opaque, vbucket, start_seqno, end_seqno, flags);
1495     }
1496     return ENGINE_DISCONNECT;
1497 }
1498
1499 static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1500                                         const void* cookie,
1501                                         uint32_t opaque,
1502                                         const DocKey& key,
1503                                         cb::const_byte_buffer value,
1504                                         size_t priv_bytes,
1505                                         uint8_t datatype,
1506                                         uint64_t cas,
1507                                         uint16_t vbucket,
1508                                         uint32_t flags,
1509                                         uint64_t by_seqno,
1510                                         uint64_t rev_seqno,
1511                                         uint32_t expiration,
1512                                         uint32_t lock_time,
1513                                         cb::const_byte_buffer meta,
1514                                         uint8_t nru) {
1515     if (!mcbp::datatype::is_valid(datatype)) {
1516         LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1517             " (DCPMutation)");
1518         return ENGINE_EINVAL;
1519     }
1520     auto engine = acquireEngine(handle);
1521     ConnHandler* conn = engine->getConnHandler(cookie);
1522     if (conn) {
1523         return conn->mutation(opaque, key, value, priv_bytes, datatype, cas,
1524                               vbucket, flags, by_seqno, rev_seqno, expiration,
1525                               lock_time, meta, nru);
1526     }
1527     return ENGINE_DISCONNECT;
1528 }
1529
1530 static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1531                                         const void* cookie,
1532                                         uint32_t opaque,
1533                                         const DocKey& key,
1534                                         cb::const_byte_buffer value,
1535                                         size_t priv_bytes,
1536                                         uint8_t datatype,
1537                                         uint64_t cas,
1538                                         uint16_t vbucket,
1539                                         uint64_t by_seqno,
1540                                         uint64_t rev_seqno,
1541                                         cb::const_byte_buffer meta) {
1542     auto engine = acquireEngine(handle);
1543     ConnHandler* conn = engine->getConnHandler(cookie);
1544     if (conn) {
1545         return conn->deletion(opaque, key, value, priv_bytes, datatype, cas,
1546                               vbucket, by_seqno, rev_seqno, meta);
1547     }
1548     return ENGINE_DISCONNECT;
1549 }
1550
1551 static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1552                                           const void* cookie,
1553                                           uint32_t opaque,
1554                                           const DocKey& key,
1555                                           cb::const_byte_buffer value,
1556                                           size_t priv_bytes,
1557                                           uint8_t datatype,
1558                                           uint64_t cas,
1559                                           uint16_t vbucket,
1560                                           uint64_t by_seqno,
1561                                           uint64_t rev_seqno,
1562                                           cb::const_byte_buffer meta) {
1563     auto engine = acquireEngine(handle);
1564     ConnHandler* conn = engine->getConnHandler(cookie);
1565     if (conn) {
1566         return conn->expiration(opaque, key, value, priv_bytes, datatype, cas,
1567                                 vbucket, by_seqno, rev_seqno, meta);
1568     }
1569     return ENGINE_DISCONNECT;
1570 }
1571
1572 static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1573                                      const void* cookie,
1574                                      uint32_t opaque,
1575                                      uint16_t vbucket) {
1576     auto engine = acquireEngine(handle);
1577     ConnHandler* conn = engine->getConnHandler(cookie);
1578     if (conn) {
1579         return conn->flushall(opaque, vbucket);
1580     }
1581     return ENGINE_DISCONNECT;
1582 }
1583
1584 static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1585                                                const void* cookie,
1586                                                uint32_t opaque,
1587                                                uint16_t vbucket,
1588                                                vbucket_state_t state) {
1589     auto engine = acquireEngine(handle);
1590     ConnHandler* conn = engine->getConnHandler(cookie);
1591     if (conn) {
1592         return conn->setVBucketState(opaque, vbucket, state);
1593     }
1594     return ENGINE_DISCONNECT;
1595 }
1596
1597 static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1598                                     const void* cookie,
1599                                     uint32_t opaque) {
1600     auto engine = acquireEngine(handle);
1601     ConnHandler* conn = engine->getConnHandler(cookie);
1602     if (conn) {
1603         return conn->noop(opaque);
1604     }
1605     return ENGINE_DISCONNECT;
1606 }
1607
1608 static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1609                                                      const void* cookie,
1610                                                      uint32_t opaque,
1611                                                      uint16_t vbucket,
1612                                                      uint32_t buffer_bytes) {
1613     auto engine = acquireEngine(handle);
1614     ConnHandler* conn = engine->getConnHandler(cookie);
1615     if (conn) {
1616         return conn->bufferAcknowledgement(opaque, vbucket, buffer_bytes);
1617     }
1618     return ENGINE_DISCONNECT;
1619 }
1620
1621 static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1622                                        const void* cookie,
1623                                        uint32_t opaque,
1624                                        const void* key,
1625                                        uint16_t nkey,
1626                                        const void* value,
1627                                        uint32_t nvalue) {
1628     auto engine = acquireEngine(handle);
1629     ConnHandler* conn = engine->getConnHandler(cookie);
1630     if (conn) {
1631         return conn->control(opaque, key, nkey, value, nvalue);
1632     }
1633     return ENGINE_DISCONNECT;
1634 }
1635
1636 static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1637                                                const void* cookie,
1638                                                protocol_binary_response_header* response) {
1639     auto engine = acquireEngine(handle);
1640     ConnHandler* conn = engine->getConnHandler(cookie);
1641     if (conn) {
1642         if (conn->handleResponse(response)) {
1643             return ENGINE_SUCCESS;
1644         }
1645     }
1646     return ENGINE_DISCONNECT;
1647 }
1648
1649 static void EvpHandleDisconnect(const void* cookie,
1650                                 ENGINE_EVENT_TYPE type,
1651                                 const void* event_data,
1652                                 const void* cb_data) {
1653     if (type != ON_DISCONNECT) {
1654         throw std::invalid_argument("EvpHandleDisconnect: type "
1655                                         "(which is" + std::to_string(type) +
1656                                     ") is not ON_DISCONNECT");
1657     }
1658     if (event_data != nullptr) {
1659         throw std::invalid_argument("EvpHandleDisconnect: event_data "
1660                                         "is not NULL");
1661     }
1662     void* c = const_cast<void*>(cb_data);
1663     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1664 }
1665
1666 static void EvpHandleDeleteBucket(const void* cookie,
1667                                   ENGINE_EVENT_TYPE type,
1668                                   const void* event_data,
1669                                   const void* cb_data) {
1670     if (type != ON_DELETE_BUCKET) {
1671         throw std::invalid_argument("EvpHandleDeleteBucket: type "
1672                                         "(which is" + std::to_string(type) +
1673                                     ") is not ON_DELETE_BUCKET");
1674     }
1675     if (event_data != nullptr) {
1676         throw std::invalid_argument("EvpHandleDeleteBucket: event_data "
1677                                         "is not NULL");
1678     }
1679     void* c = const_cast<void*>(cb_data);
1680     acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDeleteBucket(cookie);
1681 }
1682
1683 void EvpSetLogLevel(ENGINE_HANDLE* handle, EXTENSION_LOG_LEVEL level) {
1684     Logger::setGlobalLogLevel(level);
1685 }
1686
1687 /**
1688  * The only public interface to the eventually persistent engine.
1689  * Allocate a new instance and initialize it
1690  * @param interface the highest interface the server supports (we only
1691  *                  support interface 1)
1692  * @param get_server_api callback function to get the server exported API
1693  *                  functions
1694  * @param handle Where to return the new instance
1695  * @return ENGINE_SUCCESS on success
1696  */
1697 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1698                                   GET_SERVER_API get_server_api,
1699                                   ENGINE_HANDLE** handle) {
1700     SERVER_HANDLE_V1* api = get_server_api();
1701     if (interface != 1 || api == NULL) {
1702         return ENGINE_ENOTSUP;
1703     }
1704
1705     Logger::setLoggerAPI(api->log);
1706
1707     MemoryTracker::getInstance(*api->alloc_hooks);
1708     ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1709
1710     std::atomic<size_t>* inital_tracking = new std::atomic<size_t>();
1711
1712     ObjectRegistry::setStats(inital_tracking);
1713     EventuallyPersistentEngine* engine;
1714     engine = new EventuallyPersistentEngine(get_server_api);
1715     ObjectRegistry::setStats(NULL);
1716
1717     if (engine == NULL) {
1718         return ENGINE_ENOMEM;
1719     }
1720
1721     if (MemoryTracker::trackingMemoryAllocations()) {
1722         engine->getEpStats().memoryTrackerEnabled.store(true);
1723         engine->getEpStats().totalMemory->store(inital_tracking->load());
1724     }
1725     delete inital_tracking;
1726
1727     initialize_time_functions(api->core);
1728
1729     *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1730
1731     return ENGINE_SUCCESS;
1732 }
1733
1734 /*
1735     This method is called prior to unloading of the shared-object.
1736     Global clean-up should be performed from this method.
1737 */
1738 void destroy_engine() {
1739     ExecutorPool::shutdown();
1740     // A single MemoryTracker exists for *all* buckets
1741     // and must be destroyed before unloading the shared object.
1742     MemoryTracker::destroyInstance();
1743     ObjectRegistry::reset();
1744 }
1745
1746 static bool EvpGetItemInfo(ENGINE_HANDLE* handle, const void*,
1747                            const item* itm, item_info* itm_info) {
1748     const Item* it = reinterpret_cast<const Item*>(itm);
1749     auto engine = acquireEngine(handle);
1750     VBucketPtr vb = engine->getKVBucket()->getVBucket(it->getVBucketId());
1751     uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
1752     *itm_info = it->toItemInfo(vb_uuid);
1753     return true;
1754 }
1755
1756 static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1757                            item* itm, const item_info* itm_info) {
1758     Item* it = reinterpret_cast<Item*>(itm);
1759     if (!it) {
1760         return false;
1761     }
1762     it->setDataType(itm_info->datatype);
1763     return true;
1764 }
1765
1766 static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1767                                              const void* cookie,
1768                                              engine_get_vb_map_cb callback) {
1769     auto engine = acquireEngine(handle);
1770     LockHolder lh(engine->clusterConfig.lock);
1771     const char* config = engine->clusterConfig.config.data();
1772     uint32_t len = engine->clusterConfig.config.size();
1773     engine.reset(); // Want to release the engine before the callback
1774     return callback(cookie, config, len);
1775 }
1776
1777 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1778     va_list va;
1779     va_start(va, fmt);
1780     global_logger.vlog(severity, fmt, va);
1781     va_end(va);
1782 }
1783
1784 EventuallyPersistentEngine::EventuallyPersistentEngine(
1785         GET_SERVER_API get_server_api)
1786     : clusterConfig(),
1787       kvBucket(nullptr),
1788       workload(NULL),
1789       workloadPriority(NO_BUCKET_PRIORITY),
1790       replicationThrottle(NULL),
1791       getServerApiFunc(get_server_api),
1792       dcpConnMap_(NULL),
1793       dcpFlowControlManager_(NULL),
1794       tapConnMap(NULL),
1795       tapConfig(NULL),
1796       checkpointConfig(NULL),
1797       trafficEnabled(false),
1798       deleteAllEnabled(false),
1799       startupTime(0),
1800       taskable(this) {
1801     interface.interface = 1;
1802     ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1803     ENGINE_HANDLE_V1::initialize = EvpInitialize;
1804     ENGINE_HANDLE_V1::destroy = EvpDestroy;
1805     ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1806     ENGINE_HANDLE_V1::allocate_ex = EvpItemAllocateEx;
1807     ENGINE_HANDLE_V1::remove = EvpItemDelete;
1808     ENGINE_HANDLE_V1::release = EvpItemRelease;
1809     ENGINE_HANDLE_V1::get = EvpGet;
1810     ENGINE_HANDLE_V1::get_if = EvpGetIf;
1811     ENGINE_HANDLE_V1::get_locked = EvpGetLocked;
1812     ENGINE_HANDLE_V1::unlock = EvpUnlock;
1813     ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1814     ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1815     ENGINE_HANDLE_V1::store = EvpStore;
1816     ENGINE_HANDLE_V1::flush = EvpFlush;
1817     ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1818     ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1819     ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1820     ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1821     ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1822     ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1823     ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
1824
1825     ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1826     ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1827     ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1828     ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1829     ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1830     ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1831     ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1832     ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1833     ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1834     ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1835     ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1836     ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1837     ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1838     ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1839     ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1840     ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1841     ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1842     ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
1843
1844     serverApi = getServerApiFunc();
1845     memset(&info, 0, sizeof(info));
1846     info.info.description = "EP engine v" VERSION;
1847     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1848     info.info.features[info.info.num_features++].feature =
1849                                              ENGINE_FEATURE_PERSISTENT_STORAGE;
1850     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1851     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
1852 }
1853
1854 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1855 {
1856     EventuallyPersistentEngine *epe =
1857                                     ObjectRegistry::onSwitchThread(NULL, true);
1858     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1859     ObjectRegistry::onSwitchThread(epe);
1860     return rv;
1861 }
1862
1863 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1864 {
1865     EventuallyPersistentEngine *epe =
1866                                     ObjectRegistry::onSwitchThread(NULL, true);
1867     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1868     ObjectRegistry::onSwitchThread(epe);
1869     return rv;
1870 }
1871
1872 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1873                                                         EVENT_CALLBACK cb,
1874                                                         const void *cb_data) {
1875     EventuallyPersistentEngine *epe =
1876                                     ObjectRegistry::onSwitchThread(NULL, true);
1877     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1878     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1879                             type, cb, cb_data);
1880     ObjectRegistry::onSwitchThread(epe);
1881 }
1882
1883 /**
1884  * A configuration value changed listener that responds to ep-engine
1885  * parameter changes by invoking engine-specific methods on
1886  * configuration change events.
1887  */
1888 class EpEngineValueChangeListener : public ValueChangedListener {
1889 public:
1890     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
1891         // EMPTY
1892     }
1893
1894     virtual void sizeValueChanged(const std::string &key, size_t value) {
1895         if (key.compare("getl_max_timeout") == 0) {
1896             engine.setGetlMaxTimeout(value);
1897         } else if (key.compare("getl_default_timeout") == 0) {
1898             engine.setGetlDefaultTimeout(value);
1899         } else if (key.compare("max_item_size") == 0) {
1900             engine.setMaxItemSize(value);
1901         } else if (key.compare("max_item_privileged_bytes") == 0) {
1902             engine.setMaxItemPrivilegedBytes(value);
1903         }
1904     }
1905
1906     virtual void booleanValueChanged(const std::string &key, bool value) {
1907         if (key.compare("flushall_enabled") == 0) {
1908             engine.setDeleteAll(value);
1909         }
1910     }
1911 private:
1912     EventuallyPersistentEngine &engine;
1913 };
1914
1915
1916
1917 ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
1918     resetStats();
1919     if (config != NULL) {
1920         if (!configuration.parseConfiguration(config, serverApi)) {
1921             LOG(EXTENSION_LOG_NOTICE, "Failed to parse the configuration config "
1922                 "during bucket initialization.  config=%s", config);
1923             return ENGINE_FAILED;
1924         }
1925     }
1926
1927     name = configuration.getCouchBucket();
1928     maxFailoverEntries = configuration.getMaxFailoverEntries();
1929
1930     // Start updating the variables from the config!
1931     HashTable::setDefaultNumBuckets(configuration.getHtSize());
1932     HashTable::setDefaultNumLocks(configuration.getHtLocks());
1933     StoredValue::setMutationMemoryThreshold(
1934                                       configuration.getMutationMemThreshold());
1935
1936     if (configuration.getMaxSize() == 0) {
1937         configuration.setMaxSize(std::numeric_limits<size_t>::max());
1938     }
1939
1940     if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
1941         stats.mem_low_wat_percent.store(0.75);
1942         configuration.setMemLowWat(percentOf(
1943                 configuration.getMaxSize(), stats.mem_low_wat_percent.load()));
1944     }
1945
1946     if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
1947         stats.mem_high_wat_percent.store(0.85);
1948         configuration.setMemHighWat(percentOf(
1949                 configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
1950     }
1951
1952     maxItemSize = configuration.getMaxItemSize();
1953     configuration.addValueChangedListener("max_item_size",
1954                                        new EpEngineValueChangeListener(*this));
1955
1956     maxItemPrivilegedBytes = configuration.getMaxItemPrivilegedBytes();
1957     configuration.addValueChangedListener(
1958             "max_item_privileged_bytes",
1959             new EpEngineValueChangeListener(*this));
1960
1961     getlDefaultTimeout = configuration.getGetlDefaultTimeout();
1962     configuration.addValueChangedListener("getl_default_timeout",
1963                                        new EpEngineValueChangeListener(*this));
1964     getlMaxTimeout = configuration.getGetlMaxTimeout();
1965     configuration.addValueChangedListener("getl_max_timeout",
1966                                        new EpEngineValueChangeListener(*this));
1967
1968     deleteAllEnabled = configuration.isFlushallEnabled();
1969     configuration.addValueChangedListener("flushall_enabled",
1970                                        new EpEngineValueChangeListener(*this));
1971
1972     workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
1973                                   configuration.getMaxNumShards());
1974     if ((unsigned int)workload->getNumShards() >
1975                                               configuration.getMaxVbuckets()) {
1976         LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
1977             "equal or less than max number of vbuckets");
1978         return ENGINE_FAILED;
1979     }
1980
1981     dcpConnMap_ = new DcpConnMap(*this);
1982
1983     /* Get the flow control policy */
1984     std::string flowCtlPolicy = configuration.getDcpFlowControlPolicy();
1985
1986     if (!flowCtlPolicy.compare("static")) {
1987         dcpFlowControlManager_ = new DcpFlowControlManagerStatic(*this);
1988     } else if (!flowCtlPolicy.compare("dynamic")) {
1989         dcpFlowControlManager_ = new DcpFlowControlManagerDynamic(*this);
1990     } else if (!flowCtlPolicy.compare("aggressive")) {
1991         dcpFlowControlManager_ = new DcpFlowControlManagerAggressive(*this);
1992     } else {
1993         /* Flow control is not enabled */
1994         dcpFlowControlManager_ = new DcpFlowControlManager(*this);
1995     }
1996
1997     tapConnMap = new TapConnMap(*this);
1998     tapConfig = new TapConfig(*this);
1999     replicationThrottle = new ReplicationThrottle(configuration, stats);
2000     TapConfig::addConfigChangeListener(*this);
2001
2002     checkpointConfig = new CheckpointConfig(*this);
2003     CheckpointConfig::addConfigChangeListener(*this);
2004
2005     kvBucket = makeBucket(configuration);
2006
2007     initializeEngineCallbacks();
2008
2009     // Complete the initialization of the ep-store
2010     if (!kvBucket->initialize()) {
2011         return ENGINE_FAILED;
2012     }
2013
2014     if(configuration.isDataTrafficEnabled()) {
2015         enableTraffic(true);
2016     }
2017
2018     tapConnMap->initialize(TAP_CONN_NOTIFIER);
2019     dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
2020
2021     // record engine initialization time
2022     startupTime.store(ep_real_time());
2023
2024     LOG(EXTENSION_LOG_NOTICE,
2025         "EP Engine: Initialization of %s bucket complete",
2026         configuration.getBucketType().c_str());
2027
2028     return ENGINE_SUCCESS;
2029 }
2030
2031 void EventuallyPersistentEngine::destroy(bool force) {
2032     stats.forceShutdown = force;
2033     stats.isShutdown = true;
2034
2035     // Perform a snapshot of the stats before shutting down so we can persist
2036     // the type of shutdown (stats.forceShutdown), and consequently on the
2037     // next warmup can determine is there was a clean shutdown - see
2038     // Warmup::cleanShutdown
2039     if (kvBucket) {
2040         kvBucket->snapshotStats();
2041     }
2042     if (tapConnMap) {
2043         tapConnMap->shutdownAllConnections();
2044     }
2045     if (dcpConnMap_) {
2046         dcpConnMap_->shutdownAllConnections();
2047     }
2048 }
2049
2050 ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
2051         item** itm,
2052         const DocKey& key,
2053         const size_t nbytes,
2054         const size_t priv_nbytes,
2055         const int flags,
2056         const rel_time_t exptime,
2057         uint8_t datatype,
2058         uint16_t vbucket) {
2059     if (priv_nbytes > maxItemPrivilegedBytes) {
2060         return ENGINE_E2BIG;
2061     }
2062
2063     if ((nbytes - priv_nbytes) > maxItemSize) {
2064         return ENGINE_E2BIG;
2065     }
2066
2067     if (!hasAvailableSpace(sizeof(Item) + sizeof(Blob) + key.size() + nbytes)) {
2068         return memoryCondition();
2069     }
2070
2071     time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
2072
2073     uint8_t ext_meta[1];
2074     uint8_t ext_len = EXT_META_LEN;
2075     *(ext_meta) = datatype;
2076     *itm = new Item(key,
2077                     flags,
2078                     expiretime,
2079                     nullptr,
2080                     nbytes,
2081                     ext_meta,
2082                     ext_len,
2083                     0 /*cas*/,
2084                     -1 /*seq*/,
2085                     vbucket);
2086     if (*itm == NULL) {
2087         return memoryCondition();
2088     } else {
2089         stats.itemAllocSizeHisto.add(nbytes);
2090         return ENGINE_SUCCESS;
2091     }
2092 }
2093
2094 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
2095     if (!deleteAllEnabled) {
2096         return ENGINE_ENOTSUP;
2097     }
2098
2099     if (!isDegradedMode()) {
2100         return ENGINE_TMPFAIL;
2101     }
2102
2103     /*
2104      * Supporting only a SYNC operation for bucket flush
2105      */
2106
2107     void* es = getEngineSpecific(cookie);
2108     if (es == NULL) {
2109         // Check if diskDeleteAll was false and set it to true
2110         // if yes, if the atomic variable weren't false, then
2111         // we will assume that a deleteAll has been scheduled
2112         // already and return TMPFAIL.
2113         if (kvBucket->scheduleDeleteAllTask(cookie)) {
2114             storeEngineSpecific(cookie, this);
2115             return ENGINE_EWOULDBLOCK;
2116         } else {
2117             LOG(EXTENSION_LOG_INFO,
2118                 "Tried to trigger a bucket deleteAll, but"
2119                 "there seems to be a task running already!");
2120             return ENGINE_TMPFAIL;
2121         }
2122
2123     } else {
2124         storeEngineSpecific(cookie, NULL);
2125         LOG(EXTENSION_LOG_NOTICE, "Completed bucket deleteAll operation");
2126         return ENGINE_SUCCESS;
2127     }
2128 }
2129
2130 cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
2131                                                        const DocKey& key,
2132                                                        uint16_t vbucket,
2133                                                        std::function<bool(const item_info&)>filter) {
2134
2135     auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2136
2137     // Fetch an item from the hashtable (without trying to schedule a bg-fetch
2138     // and pass it through the filter. If the filter accepts the document
2139     // based on the metadata, return the document. If the document's data
2140     // isn't resident we run another iteration in the loop and retries the
2141     // action but this time we _do_ schedule a bg-fetch.
2142     for (int ii = 0; ii < 2; ++ii) {
2143         auto options = static_cast<get_options_t>(HONOR_STATES |
2144                                                   TRACK_REFERENCE |
2145                                                   DELETE_TEMP |
2146                                                   HIDE_LOCKED_CAS |
2147                                                   ALLOW_META_ONLY);
2148         if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
2149             options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
2150         }
2151
2152         BlockTimer timer(&stats.getCmdHisto);
2153         GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2154         ENGINE_ERROR_CODE status = gv.getStatus();
2155
2156         switch (status) {
2157         case ENGINE_SUCCESS:
2158             break;
2159
2160         case ENGINE_KEY_ENOENT: // FALLTHROUGH
2161         case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
2162             if (isDegradedMode()) {
2163                 status = ENGINE_TMPFAIL;
2164             }
2165             // FALLTHROUGH
2166         default:
2167             return std::make_pair(cb::engine_errc(status),
2168                                   cb::unique_item_ptr{nullptr,
2169                                                       cb::ItemDeleter{handle}});
2170         }
2171
2172         auto* item = gv.getValue();
2173         cb::unique_item_ptr ret{item, cb::ItemDeleter{handle}};
2174
2175         const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
2176         const uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
2177
2178         // Currently
2179         if (filter(item->toItemInfo(vb_uuid))) {
2180             if (!gv.isPartial()) {
2181                 return std::make_pair(cb::engine_errc::success,
2182                                cb::unique_item_ptr{ret.release(),
2183                                                    cb::ItemDeleter{handle}});
2184             }
2185             // We want this item, but we need to fetch it off disk
2186         } else {
2187             // the client don't care about this thing..
2188             ret.reset(nullptr);
2189             return std::make_pair(cb::engine_errc::success,
2190                                   cb::unique_item_ptr{ret.release(),
2191                                                       cb::ItemDeleter{handle}});
2192         }
2193     }
2194
2195     // It should not be possible to get as the second iteration in the loop
2196     // SHOULD handle backround fetches an the item should NOT be partial!
2197     throw std::logic_error("EventuallyPersistentEngine::get_if: loop terminated");
2198 }
2199
2200 ENGINE_ERROR_CODE EventuallyPersistentEngine::get_locked(const void* cookie,
2201                                                          item** itm,
2202                                                          const DocKey& key,
2203                                                          uint16_t vbucket,
2204                                                          uint32_t lock_timeout) {
2205
2206     auto default_timeout = static_cast<uint32_t>(getGetlDefaultTimeout());
2207
2208     if (lock_timeout == 0) {
2209         lock_timeout = default_timeout;
2210     } else if (lock_timeout > static_cast<uint32_t>(getGetlMaxTimeout())) {
2211         LOG(EXTENSION_LOG_WARNING,
2212             "EventuallyPersistentEngine::get_locked: "
2213             "Illegal value for lock timeout specified %u. "
2214             "Using default value: %u", lock_timeout, default_timeout);
2215         lock_timeout = default_timeout;
2216     }
2217
2218     auto result = kvBucket->getLocked(key, vbucket, ep_current_time(),
2219                                       lock_timeout, cookie);
2220
2221     if (result.getStatus() == ENGINE_SUCCESS) {
2222         ++stats.numOpsGet;
2223         *itm = result.getValue();
2224     }
2225
2226     return result.getStatus();
2227 }
2228
2229 ENGINE_ERROR_CODE EventuallyPersistentEngine::unlock(const void* cookie,
2230                                                      const DocKey& key,
2231                                                      uint16_t vbucket,
2232                                                      uint64_t cas) {
2233     return kvBucket->unlockKey(key, vbucket, cas, ep_current_time());
2234 }
2235
2236
2237 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2238                                                     item* itm,
2239                                                     uint64_t *cas,
2240                                                     ENGINE_STORE_OPERATION
2241                                                                      operation) {
2242     BlockTimer timer(&stats.storeCmdHisto);
2243     ENGINE_ERROR_CODE ret;
2244     Item *it = static_cast<Item*>(itm);
2245
2246     switch (operation) {
2247     case OPERATION_CAS:
2248         if (it->getCas() == 0) {
2249             // Using a cas command with a cas wildcard doesn't make sense
2250             ret = ENGINE_NOT_STORED;
2251             break;
2252         }
2253         // FALLTHROUGH
2254     case OPERATION_SET:
2255         if (isDegradedMode()) {
2256             return ENGINE_TMPFAIL;
2257         }
2258         ret = kvBucket->set(*it, cookie);
2259         if (ret == ENGINE_SUCCESS) {
2260             *cas = it->getCas();
2261         }
2262
2263         break;
2264
2265     case OPERATION_ADD:
2266         if (isDegradedMode()) {
2267             return ENGINE_TMPFAIL;
2268         }
2269
2270         if (it->getCas() != 0) {
2271             // Adding an item with a cas value doesn't really make sense...
2272             return ENGINE_KEY_EEXISTS;
2273         }
2274
2275         ret = kvBucket->add(*it, cookie);
2276         if (ret == ENGINE_SUCCESS) {
2277             *cas = it->getCas();
2278         }
2279         break;
2280
2281     case OPERATION_REPLACE:
2282         ret = kvBucket->replace(*it, cookie);
2283         if (ret == ENGINE_SUCCESS) {
2284             *cas = it->getCas();
2285         }
2286         break;
2287     default:
2288         ret = ENGINE_ENOTSUP;
2289     }
2290
2291     switch (ret) {
2292     case ENGINE_SUCCESS:
2293         ++stats.numOpsStore;
2294         break;
2295     case ENGINE_ENOMEM:
2296         ret = memoryCondition();
2297         break;
2298     case ENGINE_NOT_STORED:
2299     case ENGINE_NOT_MY_VBUCKET:
2300         if (isDegradedMode()) {
2301             return ENGINE_TMPFAIL;
2302         }
2303         break;
2304     default:
2305         break;
2306     }
2307
2308     return ret;
2309 }
2310
2311 inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2312                                                            item **itm,
2313                                                            void **es,
2314                                                            uint16_t *nes,
2315                                                            uint8_t *ttl,
2316                                                            uint16_t *flags,
2317                                                            uint32_t *seqno,
2318                                                            uint16_t *vbucket,
2319                                                            TapProducer
2320                                                                    *connection,
2321                                                            bool &retry) {
2322     *es = NULL;
2323     *nes = 0;
2324     *ttl = (uint8_t)-1;
2325     *seqno = 0;
2326     *flags = 0;
2327     *vbucket = 0;
2328
2329     retry = false;
2330
2331     if (connection->shouldFlush()) {
2332         return TAP_FLUSH;
2333     }
2334
2335     if (connection->isTimeForNoop()) {
2336         LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2337             connection->logHeader());
2338         return TAP_NOOP;
2339     }
2340
2341     if (connection->isSuspended() || connection->windowIsFull()) {
2342         LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2343             " suspended state or its ack windows is full.\n",
2344             connection->logHeader());
2345         return TAP_PAUSE;
2346     }
2347
2348     uint16_t ret = TAP_PAUSE;
2349     VBucketEvent ev = connection->nextVBucketHighPriority();
2350     if (ev.event != TAP_PAUSE) {
2351         switch (ev.event) {
2352         case TAP_VBUCKET_SET:
2353             LOG(EXTENSION_LOG_NOTICE,
2354                "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2355                 connection->logHeader(), ev.vbucket,
2356                 VBucket::toString(ev.state));
2357             connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2358             break;
2359         case TAP_OPAQUE:
2360             LOG(EXTENSION_LOG_NOTICE,
2361                 "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2362                 connection->logHeader(),
2363                 TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
2364                 ev.vbucket);
2365             connection->opaqueCommandCode = (uint32_t) ev.state;
2366             *vbucket = ev.vbucket;
2367             *es = &connection->opaqueCommandCode;
2368             *nes = sizeof(connection->opaqueCommandCode);
2369             break;
2370
2371         default:
2372             throw std::logic_error("EventuallyPersistentEngine::doWalkTapQueue:"
2373                     " Unknown VBucketEvent message type:" +
2374                     std::to_string(ev.event) + " for connection:" +
2375                     connection->logHeader());
2376         }
2377         return ev.event;
2378     }
2379
2380     if (connection->waitForOpaqueMsgAck()) {
2381         return TAP_PAUSE;
2382     }
2383
2384     VBucketFilter backFillVBFilter;
2385     if (connection->runBackfill(backFillVBFilter)) {
2386         queueBackfill(backFillVBFilter, connection);
2387     }
2388
2389     uint8_t nru = INITIAL_NRU_VALUE;
2390     Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2391     switch (ret) {
2392     case TAP_CHECKPOINT_START:
2393     case TAP_CHECKPOINT_END:
2394     case TAP_MUTATION:
2395     case TAP_DELETION:
2396         *itm = it;
2397         if (ret == TAP_MUTATION) {
2398             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2399                                                        it->getRevSeqno(), nru);
2400             *es = connection->specificData;
2401         } else if (ret == TAP_DELETION) {
2402             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2403                                                        it->getRevSeqno());
2404             *es = connection->specificData;
2405         } else if (ret == TAP_CHECKPOINT_START) {
2406             // Send the current value of the max deleted seqno
2407             VBucketPtr vb = getVBucket(*vbucket);
2408             if (!vb) {
2409                 retry = true;
2410                 return TAP_NOOP;
2411             }
2412             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2413                                                vb->ht.getMaxDeletedRevSeqno());
2414             *es = connection->specificData;
2415         }
2416         break;
2417     case TAP_NOOP:
2418         retry = true;
2419         break;
2420     default:
2421         break;
2422     }
2423
2424     if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2425         VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2426         if (vbev.event == TAP_VBUCKET_SET) {
2427             LOG(EXTENSION_LOG_NOTICE,
2428                "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2429                 connection->logHeader(), vbev.vbucket,
2430                 VBucket::toString(vbev.state));
2431             connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
2432         }
2433         ret = vbev.event;
2434     }
2435
2436     return ret;
2437 }
2438
2439 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2440                                                   item **itm,
2441                                                   void **es,
2442                                                   uint16_t *nes,
2443                                                   uint8_t *ttl,
2444                                                   uint16_t *flags,
2445                                                   uint32_t *seqno,
2446                                                   uint16_t *vbucket) {
2447     TapProducer *connection = getTapProducer(cookie);
2448     if (!connection) {
2449         LOG(EXTENSION_LOG_WARNING,
2450             "Failed to lookup TAP connection.. Disconnecting\n");
2451         return TAP_DISCONNECT;
2452     }
2453
2454     connection->setPaused(false);
2455
2456     bool retry = false;
2457     uint16_t ret;
2458
2459     connection->setLastWalkTime();
2460     do {
2461         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2462                              seqno, vbucket, connection, retry);
2463     } while (retry);
2464
2465     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2466         connection->lastMsgTime = ep_current_time();
2467         if (ret == TAP_NOOP) {
2468             *seqno = 0;
2469         } else {
2470             ++stats.numTapFetched;
2471             *seqno = connection->getSeqno();
2472             if (connection->requestAck(ret, *vbucket)) {
2473                 *flags = TAP_FLAG_ACK;
2474                 connection->seqnoAckRequested = *seqno;
2475             }
2476
2477             if (ret == TAP_MUTATION) {
2478                 if (connection->haveFlagByteorderSupport()) {
2479                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2480                 }
2481             }
2482         }
2483     } else {
2484         connection->setPaused(true);
2485         connection->setNotifySent(false);
2486     }
2487
2488     return ret;
2489 }
2490
2491 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2492                                                 std::string &client,
2493                                                 uint32_t flags,
2494                                                 const void *userdata,
2495                                                 size_t nuserdata) {
2496     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2497         return false;
2498     }
2499
2500     std::string tapName = "eq_tapq:";
2501     if (client.length() == 0) {
2502         tapName.assign(ConnHandler::getAnonName());
2503     } else {
2504         tapName.append(client);
2505     }
2506
2507     // Decoding the userdata section of the packet and update the filters
2508     const char *ptr = static_cast<const char*>(userdata);
2509     uint64_t backfillAge = 0;
2510     std::vector<uint16_t> vbuckets;
2511     std::map<uint16_t, uint64_t> lastCheckpointIds;
2512
2513     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2514         if (nuserdata < sizeof(backfillAge)) {
2515             LOG(EXTENSION_LOG_WARNING,
2516                 "Backfill age is missing. Reject connection request from %s\n",
2517                 tapName.c_str());
2518             return false;
2519         }
2520         // use memcpy to avoid alignemt issues
2521         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2522         backfillAge = ntohll(backfillAge);
2523         nuserdata -= sizeof(backfillAge);
2524         ptr += sizeof(backfillAge);
2525     }
2526
2527     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2528         uint16_t nvbuckets;
2529         if (nuserdata < sizeof(nvbuckets)) {
2530             LOG(EXTENSION_LOG_WARNING,
2531             "Number of vbuckets is missing. Reject connection request from %s"
2532             "\n", tapName.c_str());
2533             return false;
2534         }
2535         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2536         nuserdata -= sizeof(nvbuckets);
2537         ptr += sizeof(nvbuckets);
2538         nvbuckets = ntohs(nvbuckets);
2539         if (nvbuckets > 0) {
2540             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2541                 LOG(EXTENSION_LOG_WARNING,
2542                 "# of vbuckets not matched. Reject connection request from %s"
2543                 "\n", tapName.c_str());
2544                 return false;
2545             }
2546             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2547                 uint16_t val;
2548                 memcpy(&val, ptr, sizeof(nvbuckets));
2549                 ptr += sizeof(uint16_t);
2550                 vbuckets.push_back(ntohs(val));
2551             }
2552             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2553         }
2554     }
2555
2556     if (flags & TAP_CONNECT_CHECKPOINT) {
2557         uint16_t nCheckpoints = 0;
2558         if (nuserdata >= sizeof(nCheckpoints)) {
2559             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2560             nuserdata -= sizeof(nCheckpoints);
2561             ptr += sizeof(nCheckpoints);
2562             nCheckpoints = ntohs(nCheckpoints);
2563         }
2564         if (nCheckpoints > 0) {
2565             if (nuserdata <
2566                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2567                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2568                     "Reject connection request from %s\n", tapName.c_str());
2569                 return false;
2570             }
2571             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2572                 uint16_t vbid;
2573                 uint64_t checkpointId;
2574                 memcpy(&vbid, ptr, sizeof(vbid));
2575                 ptr += sizeof(uint16_t);
2576                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2577                 ptr += sizeof(uint64_t);
2578                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2579             }
2580         }
2581     }
2582
2583     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2584                                  backfillAge,
2585                                  static_cast<int>(
2586                                  configuration.getTapKeepalive()),
2587                                  vbuckets,
2588                                  lastCheckpointIds);
2589
2590     tapConnMap->notifyPausedConnection(tp, true);
2591     return true;
2592 }
2593
2594 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2595                                                         void *engine_specific,
2596                                                         uint16_t nengine,
2597                                                         uint8_t ttl,
2598                                                         uint16_t tap_flags,
2599                                                         uint16_t tap_event,
2600                                                         uint32_t tap_seqno,
2601                                                         const void *key,
2602                                                         size_t nkey,
2603                                                         uint32_t flags,
2604                                                         uint32_t exptime,
2605                                                         uint64_t cas,
2606                                                         uint8_t datatype,
2607                                                         const void *data,
2608                                                         size_t ndata,
2609                                                         uint16_t vbucket)
2610 {
2611     (void) ttl;
2612     void *specific = getEngineSpecific(cookie);
2613     ConnHandler *connection = NULL;
2614     if (specific == NULL) {
2615         if (tap_event == TAP_ACK) {
2616             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2617                 "exist. Force disconnect...\n", (char *) cookie);
2618             // tap producer is no longer connected..
2619             return ENGINE_DISCONNECT;
2620         } else {
2621             connection = tapConnMap->newConsumer(cookie);
2622             if (connection == NULL) {
2623                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2624                     " Force disconnect\n");
2625                 return ENGINE_DISCONNECT;
2626             }
2627             storeEngineSpecific(cookie, connection);
2628         }
2629     } else {
2630         connection = reinterpret_cast<ConnHandler *>(specific);
2631     }
2632
2633
2634     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2635
2636     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2637         if (!replicationThrottle->shouldProcess()) {
2638             ++stats.replicationThrottled;
2639             if (connection->supportsAck()) {
2640                 ret = ENGINE_TMPFAIL;
2641             } else {
2642                 ret = ENGINE_DISCONNECT;
2643                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2644                     "ack support. Force disconnect...\n",
2645                     connection->logHeader());
2646             }
2647             return ret;
2648         }
2649     }
2650
2651     switch (tap_event) {
2652     case TAP_ACK:
2653         // TAP works only with the DefaultCollection
2654         ret = processTapAck(cookie, tap_seqno, tap_flags,
2655                             DocKey(static_cast<const uint8_t*>(key), nkey,
2656                                    DocNamespace::DefaultCollection));
2657         break;
2658     case TAP_FLUSH:
2659         ret = flush(cookie);
2660         LOG(EXTENSION_LOG_NOTICE, "%s Received flush.\n",
2661             connection->logHeader());
2662         break;
2663     case TAP_DELETION:
2664         {
2665             uint64_t revSeqno;
2666             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2667                                                 nengine, &revSeqno);
2668
2669             // Create key in DefaultCollection since TAP won't support collections
2670             const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2671                                 DocNamespace::DefaultCollection};
2672             ret = connection->deletion(0, docKey, {}, 0,
2673                                        PROTOCOL_BINARY_RAW_BYTES, cas, vbucket,
2674                                        0, revSeqno, {});
2675         }
2676         break;
2677
2678     case TAP_CHECKPOINT_START:
2679     case TAP_CHECKPOINT_END:
2680         {
2681             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2682             if (tc) {
2683                 if (tap_event == TAP_CHECKPOINT_START &&
2684                     nengine == TapEngineSpecific::sizeRevSeqno) {
2685                     // Set the current value for the max deleted seqno
2686                     VBucketPtr vb = getVBucket(vbucket);
2687                     if (!vb) {
2688                         return ENGINE_TMPFAIL;
2689                     }
2690                     uint64_t seqnum;
2691                     TapEngineSpecific::readSpecificData(tap_event,
2692                                                         engine_specific,
2693                                                         nengine,
2694                                                         &seqnum);
2695                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2696                 }
2697
2698                 if (data) {
2699                     uint64_t checkpointId;
2700                     memcpy(&checkpointId, data, sizeof(checkpointId));
2701                     checkpointId = ntohll(checkpointId);
2702                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2703                                           checkpointId);
2704                 }
2705                 else {
2706                     ret = ENGINE_DISCONNECT;
2707                     LOG(EXTENSION_LOG_WARNING,
2708                         "%s Checkpoint Id is missing in "
2709                         "CHECKPOINT messages. Force disconnect...\n",
2710                         connection->logHeader());
2711                 }
2712             }
2713             else {
2714                 ret = ENGINE_DISCONNECT;
2715                 LOG(EXTENSION_LOG_WARNING,
2716                     "%s not a consumer! Force disconnect\n",
2717                     connection->logHeader());
2718             }
2719         }
2720
2721         break;
2722
2723     case TAP_MUTATION:
2724         {
2725             uint8_t nru = INITIAL_NRU_VALUE;
2726             uint64_t revSeqno = 0;
2727             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2728                                                 nengine, &revSeqno, &nru);
2729
2730             if (!isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_JSON)) {
2731                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2732                 const unsigned char *dat = (const unsigned char*)data;
2733                 const int datlen = ndata;
2734                 if (checkUTF8JSON(dat, datlen)) {
2735                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2736                 }
2737             }
2738             // Create key in DefaultCollection since TAP won't support collections
2739             const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2740                                 DocNamespace::DefaultCollection};
2741             ret = connection->mutation(0, docKey,
2742                                        {static_cast<const uint8_t*>(data), ndata},
2743                                        0, datatype, cas,
2744                                        vbucket, flags, 0, revSeqno, exptime, 0,
2745                                        {}, nru);
2746         }
2747
2748         break;
2749
2750     case TAP_OPAQUE:
2751         if (nengine == sizeof(uint32_t)) {
2752             uint32_t cc;
2753             memcpy(&cc, engine_specific, sizeof(cc));
2754             cc = ntohl(cc);
2755
2756             switch (cc) {
2757             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2758                 // @todo: the memcached core will _ALWAYS_ send nack
2759                 //        if it encounter an error. This should be
2760                 // set as the default when we move to .next after 2.0
2761                 // (currently we need to allow the message for
2762                 // backwards compatibility)
2763                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2764                     connection->logHeader());
2765                 break;
2766             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2767                 connection->setSupportCheckpointSync(true);
2768                 LOG(EXTENSION_LOG_INFO,
2769                     "%s Enable checkpoint synchronization\n",
2770                     connection->logHeader());
2771                 break;
2772             case TAP_OPAQUE_OPEN_CHECKPOINT:
2773                 /**
2774                  * This event is only received by the TAP client that wants to
2775                  * get mutations from closed checkpoints only. At this time,
2776                  * only incremental backup client receives this event so that
2777                  * it can close the connection and reconnect later.
2778                  */
2779                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2780                     connection->logHeader());
2781                 break;
2782             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2783                 {
2784                     LOG(EXTENSION_LOG_INFO,
2785                         "%s Backfill started for vbucket %d.\n",
2786                         connection->logHeader(), vbucket);
2787                     BlockTimer timer(&stats.tapVbucketResetHisto);
2788                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2789                                                   ENGINE_DISCONNECT;
2790                     if (ret == ENGINE_DISCONNECT) {
2791                         LOG(EXTENSION_LOG_WARNING,
2792                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2793                             connection->logHeader(), vbucket);
2794                     } else {
2795                         LOG(EXTENSION_LOG_NOTICE,
2796                          "%s Reset vbucket %d was completed succecssfully.\n",
2797                             connection->logHeader(), vbucket);
2798                     }
2799
2800                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2801                     if (tc) {
2802                         tc->setBackfillPhase(true, vbucket);
2803                     } else {
2804                         ret = ENGINE_DISCONNECT;
2805                         LOG(EXTENSION_LOG_WARNING,
2806                             "TAP consumer doesn't exists. Force disconnect\n");
2807                     }
2808                 }
2809                 break;
2810             case TAP_OPAQUE_CLOSE_BACKFILL:
2811                 {
2812                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2813                         connection->logHeader());
2814                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2815                     if (tc) {
2816                         tc->setBackfillPhase(false, vbucket);
2817                     } else {
2818                         ret = ENGINE_DISCONNECT;
2819                         LOG(EXTENSION_LOG_WARNING,
2820                             "%s not a consumer! Force disconnect\n",
2821                             connection->logHeader());
2822                     }
2823                 }
2824                 break;
2825             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2826                 /**
2827                  * This event is sent by the eVBucketMigrator to notify that
2828                  * the source node closes the tap replication stream and
2829                  * switches to TAKEOVER_VBUCKETS phase.
2830                  * This is just an informative message and doesn't require any
2831                  * action.
2832                  */
2833                 LOG(EXTENSION_LOG_INFO,
2834                 "%s Received close tap stream. Switching to takeover phase.\n",
2835                     connection->logHeader());
2836                 break;
2837             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2838                 /**
2839                  * This opaque message is just for notifying that the source
2840                  * node receives change_vbucket_filter request and processes
2841                  * it successfully.
2842                  */
2843                 LOG(EXTENSION_LOG_INFO,
2844                 "%s Notified that the source node changed a vbucket filter.\n",
2845                     connection->logHeader());
2846                 break;
2847             default:
2848                 LOG(EXTENSION_LOG_WARNING,
2849                     "%s Received an unknown opaque command\n",
2850                     connection->logHeader());
2851             }
2852         } else {
2853             LOG(EXTENSION_LOG_WARNING,
2854                 "%s Received tap opaque with unknown size %d\n",
2855                 connection->logHeader(), nengine);
2856         }
2857         break;
2858
2859     case TAP_VBUCKET_SET:
2860         {
2861             BlockTimer timer(&stats.tapVbucketSetHisto);
2862
2863             if (nengine != sizeof(vbucket_state_t)) {
2864                 // illegal datasize
2865                 LOG(EXTENSION_LOG_WARNING,
2866                     "%s Received TAP_VBUCKET_SET with illegal size."
2867                     " Force disconnect\n", connection->logHeader());
2868                 ret = ENGINE_DISCONNECT;
2869                 break;
2870             }
2871
2872             vbucket_state_t state;
2873             memcpy(&state, engine_specific, nengine);
2874             state = (vbucket_state_t)ntohl(state);
2875
2876             ret = connection->setVBucketState(0, vbucket, state);
2877         }
2878         break;
2879
2880     default:
2881         // Unknown command
2882         LOG(EXTENSION_LOG_WARNING,
2883             "%s Recieved bad opcode, ignoring message\n",
2884             connection->logHeader());
2885     }
2886
2887     connection->processedEvent(tap_event, ret);
2888     return ret;
2889 }
2890
2891 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2892                                                       TapConsumer *consumer,
2893                                                       uint8_t event,
2894                                                       uint16_t vbucket,
2895                                                       uint64_t checkpointId) {
2896     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2897
2898     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2899         getKVBucket()->wakeUpFlusher();
2900         ret = ENGINE_SUCCESS;
2901     }
2902     else {
2903         ret = ENGINE_DISCONNECT;
2904         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2905             "checkpoint %" PRIu64 ". Force disconnect\n",
2906             consumer->logHeader(), checkpointId);
2907     }
2908
2909     return ret;
2910 }
2911
2912 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2913     TapProducer *rv =
2914         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2915     if (!(rv && rv->isConnected())) {
2916         LOG(EXTENSION_LOG_WARNING,
2917             "Walking a non-existent tap queue, disconnecting\n");
2918         return NULL;
2919     }
2920
2921     if (rv->doDisconnect()) {
2922         LOG(EXTENSION_LOG_WARNING,
2923             "%s Disconnecting pending connection\n", rv->logHeader());
2924         return NULL;
2925     }
2926     return rv;
2927 }
2928
2929 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2930     // Register the ON_DISCONNECT callback
2931     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2932     // Register the ON_DELETE_BUCKET callback
2933     registerEngineCallback(ON_DELETE_BUCKET, EvpHandleDeleteBucket, this);
2934 }
2935
2936 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2937                                                             uint32_t seqno,
2938                                                             uint16_t status,
2939                                                             const DocKey& key)
2940 {
2941     TapProducer *connection = getTapProducer(cookie);
2942     if (!connection) {
2943         LOG(EXTENSION_LOG_WARNING,
2944             "Unable to process tap ack. No producer found\n");
2945         return ENGINE_DISCONNECT;
2946     }
2947
2948     return connection->processAck(seqno, status, key);
2949 }
2950
2951 ENGINE_ERROR_CODE EventuallyPersistentEngine::memoryCondition() {
2952     // Do we think it's possible we could free something?
2953     bool haveEvidenceWeCanFreeMemory =
2954         (stats.getMaxDataSize() > stats.memOverhead->load());
2955     if (haveEvidenceWeCanFreeMemory) {
2956         // Look for more evidence by seeing if we have resident items.
2957         VBucketCountVisitor countVisitor(vbucket_state_active);
2958         kvBucket->visit(countVisitor);
2959
2960         haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
2961             countVisitor.getNumItems();
2962     }
2963     if (haveEvidenceWeCanFreeMemory) {
2964         ++stats.tmp_oom_errors;
2965         // Wake up the item pager task as memory usage
2966         // seems to have exceeded high water mark
2967         getKVBucket()->wakeUpItemPager();
2968         return ENGINE_TMPFAIL;
2969     } else {
2970         ++stats.oom_errors;
2971         return ENGINE_ENOMEM;
2972     }
2973 }
2974
2975 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2976                                                              &backfillVBFilter,
2977                                                Producer *tc)
2978 {
2979     auto bfv = std::make_unique<BackFillVisitor>(
2980             this, *tapConnMap, tc, backfillVBFilter);
2981     getKVBucket()->visit(std::move(bfv),
2982                          "Backfill task",
2983                          TaskId::BackfillVisitorTask,
2984                          1);
2985 }
2986
2987 void VBucketCountAggregator::visitBucket(VBucketPtr &vb) {
2988     std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
2989     it = visitorMap.find(vb->getState());
2990     if ( it != visitorMap.end() ) {
2991         it->second->visitBucket(vb);
2992     }
2993 }
2994
2995 void VBucketCountAggregator::addVisitor(VBucketCountVisitor* visitor) {
2996     visitorMap[visitor->getVBucketState()] = visitor;
2997 }
2998
2999 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3000                                                            ADD_STAT add_stat) {
3001
3002     configuration.addStats(add_stat, cookie);
3003
3004     EPStats &epstats = getEpStats();
3005     add_casted_stat("ep_version", VERSION, add_stat, cookie);
3006     add_casted_stat("ep_storage_age",
3007                     epstats.dirtyAge, add_stat, cookie);
3008     add_casted_stat("ep_storage_age_highwat",
3009                     epstats.dirtyAgeHighWat, add_stat, cookie);
3010     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3011                     add_stat, cookie);
3012
3013     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3014         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3015     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3016         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3017     }
3018
3019     add_casted_stat("ep_total_enqueued",
3020                     epstats.totalEnqueued, add_stat, cookie);
3021     add_casted_stat("ep_expired_access", epstats.expired_access,
3022                     add_stat, cookie);
3023     add_casted_stat("ep_expired_compactor", epstats.expired_compactor,
3024                     add_stat, cookie);
3025     add_casted_stat("ep_expired_pager", epstats.expired_pager,
3026                     add_stat, cookie);
3027     add_casted_stat("ep_queue_size",
3028                     epstats.diskQueueSize, add_stat, cookie);
3029     add_casted_stat("ep_diskqueue_items",
3030                     epstats.diskQueueSize, add_stat, cookie);
3031     add_casted_stat("ep_vb_backfill_queue_size",
3032                     epstats.vbBackfillQueueSize,
3033                     add_stat,
3034                     cookie);
3035     auto* flusher = kvBucket->getFlusher(EP_PRIMARY_SHARD);
3036     if (flusher) {
3037         add_casted_stat("ep_commit_num", epstats.flusherCommits,
3038                         add_stat, cookie);
3039         add_casted_stat("ep_commit_time",
3040                         epstats.commit_time, add_stat, cookie);
3041         add_casted_stat("ep_commit_time_total",
3042                         epstats.cumulativeCommitTime, add_stat, cookie);
3043         add_casted_stat("ep_item_begin_failed",
3044                         epstats.beginFailed, add_stat, cookie);
3045         add_casted_stat("ep_item_commit_failed",
3046                         epstats.commitFailed, add_stat, cookie);
3047         add_casted_stat("ep_item_flush_expired",
3048                         epstats.flushExpired, add_stat, cookie);
3049         add_casted_stat("ep_item_flush_failed",
3050                         epstats.flushFailed, add_stat, cookie);
3051         add_casted_stat("ep_flusher_state",
3052                         flusher->stateName(), add_stat, cookie);
3053         add_casted_stat("ep_flusher_todo",
3054                         epstats.flusher_todo, add_stat, cookie);
3055         add_casted_stat("ep_total_persisted",
3056                         epstats.totalPersisted, add_stat, cookie);
3057         add_casted_stat("ep_uncommitted_items",
3058                         epstats.flusher_todo, add_stat, cookie);
3059         add_casted_stat("ep_chk_persistence_timeout",
3060                         VBucket::getCheckpointFlushTimeout(),
3061                         add_stat,
3062                         cookie);
3063     }
3064     add_casted_stat("ep_vbucket_del",
3065                     epstats.vbucketDeletions, add_stat, cookie);
3066     add_casted_stat("ep_vbucket_del_fail",
3067                     epstats.vbucketDeletionFail, add_stat, cookie);
3068     add_casted_stat("ep_flush_duration_total",
3069                     epstats.cumulativeFlushTime, add_stat, cookie);
3070
3071     kvBucket->getAggregatedVBucketStats(cookie, add_stat);
3072
3073     kvBucket->getFileStats(cookie, add_stat);
3074
3075     add_casted_stat("ep_persist_vbstate_total",
3076                     epstats.totalPersistVBState, add_stat, cookie);
3077
3078     size_t memUsed =  stats.getTotalMemoryUsed();
3079     add_casted_stat("mem_used", memUsed, add_stat, cookie);
3080     add_casted_stat("ep_mem_low_wat_percent", stats.mem_low_wat_percent,
3081                     add_stat, cookie);
3082     add_casted_stat("ep_mem_high_wat_percent", stats.mem_high_wat_percent,
3083                     add_stat, cookie);
3084     add_casted_stat("bytes", memUsed, add_stat, cookie);
3085     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3086     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3087 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3088     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3089 #else
3090     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3091 #endif
3092     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3093     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3094                     add_stat, cookie);
3095 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3096     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3097 #else
3098     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3099 #endif
3100     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3101     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3102     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3103
3104     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3105     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3106                     add_stat, cookie);
3107     add_casted_stat("ep_mem_tracker_enabled", stats.memoryTrackerEnabled,
3108                     add_stat, cookie);
3109     add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3110                     add_stat, cookie);
3111     add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3112                     add_stat, cookie);
3113     add_casted_stat("ep_bg_remaining_items", epstats.numRemainingBgItems,
3114                     add_stat, cookie);
3115     add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3116                     add_stat, cookie);
3117     add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3118                     add_stat, cookie);
3119     add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3120                     add_stat, cookie);
3121     add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3122                     add_stat, cookie);
3123     add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3124                     add_stat, cookie);
3125     add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3126                     add_stat, cookie);
3127     add_casted_stat("ep_items_rm_from_checkpoints",
3128                     epstats.itemsRemovedFromCheckpoints,
3129                     add_stat, cookie);
3130     add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3131                     add_stat, cookie);
3132     add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3133                     add_stat, cookie);
3134     add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3135                     add_stat, cookie);
3136
3137     add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3138     add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3139                     add_stat, cookie);
3140     add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3141                     add_stat, cookie);
3142     add_casted_stat("ep_pending_ops_max_duration",
3143                     epstats.pendingOpsMaxDuration,
3144                     add_stat, cookie);
3145
3146     add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3147                     add_stat, cookie);
3148     add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3149                     add_stat, cookie);
3150
3151     size_t vbDeletions = epstats.vbucketDeletions.load();
3152     if (vbDeletions > 0) {
3153         add_casted_stat("ep_vbucket_del_max_walltime",
3154                         epstats.vbucketDelMaxWalltime,
3155                         add_stat, cookie);
3156         add_casted_stat("ep_vbucket_del_avg_walltime",
3157                         epstats.vbucketDelTotWalltime / vbDeletions,
3158                         add_stat, cookie);
3159     }
3160
3161     size_t numBgOps = epstats.bgNumOperations.load();
3162     if (numBgOps > 0) {
3163         add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3164                         add_stat, cookie);
3165         add_casted_stat("ep_bg_min_wait",
3166                         epstats.bgMinWait,
3167                         add_stat, cookie);
3168         add_casted_stat("ep_bg_max_wait",
3169                         epstats.bgMaxWait,
3170                         add_stat, cookie);
3171         add_casted_stat("ep_bg_wait_avg",
3172                         epstats.bgWait / numBgOps,
3173                         add_stat, cookie);
3174         add_casted_stat("ep_bg_min_load",
3175                         epstats.bgMinLoad,
3176                         add_stat, cookie);
3177         add_casted_stat("ep_bg_max_load",
3178                         epstats.bgMaxLoad,
3179                         add_stat, cookie);
3180         add_casted_stat("ep_bg_load_avg",
3181                         epstats.bgLoad / numBgOps,
3182                         add_stat, cookie);
3183         add_casted_stat("ep_bg_wait",
3184                         epstats.bgWait,
3185                         add_stat, cookie);
3186         add_casted_stat("ep_bg_load",
3187                         epstats.bgLoad,
3188                         add_stat, cookie);
3189     }
3190
3191     add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3192
3193     add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3194                     add_stat, cookie);
3195     add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3196                     add_stat, cookie);
3197     add_casted_stat("ep_num_access_scanner_skips",
3198                     epstats.accessScannerSkips, add_stat, cookie);
3199     add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3200                     add_stat, cookie);
3201     add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3202                     add_stat, cookie);
3203
3204     if (kvBucket->isAccessScannerEnabled() && epstats.alogTime.load() != 0)
3205     {
3206         char timestr[20];
3207         struct tm alogTim;
3208         hrtime_t alogTime = epstats.alogTime.load();
3209         if (cb_gmtime_r((time_t *)&alogTime, &alogTim) == -1) {
3210             add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3211                             cookie);
3212         } else {
3213             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3214             add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3215                             cookie);
3216         }
3217     } else {
3218         add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3219                         add_stat, cookie);
3220     }
3221
3222     if (kvBucket->isExpPagerEnabled()) {
3223         char timestr[20];
3224         struct tm expPagerTim;
3225         hrtime_t expPagerTime = epstats.expPagerTime.load();
3226         if (cb_gmtime_r((time_t *)&expPagerTime, &expPagerTim) == -1) {
3227             add_casted_stat("ep_expiry_pager_task_time", "UNKNOWN", add_stat,
3228                             cookie);
3229         } else {
3230             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &expPagerTim);
3231             add_casted_stat("ep_expiry_pager_task_time", timestr, add_stat,
3232                             cookie);
3233         }
3234     } else {
3235         add_casted_stat("ep_expiry_pager_task_time", "NOT_SCHEDULED",
3236                         add_stat, cookie);
3237     }
3238
3239     add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
3240
3241     if (getConfiguration().isWarmup()) {
3242         Warmup *wp = kvBucket->getWarmup();
3243         if (wp == nullptr) {
3244             throw std::logic_error("EPEngine::doEngineStats: warmup is NULL");
3245         }
3246         if (!kvBucket->isWarmingUp()) {
3247             add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
3248         } else {
3249             add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);
3250         }
3251         if (wp->getTime() > 0) {
3252             add_casted_stat("ep_warmup_time", wp->getTime() / 1000,
3253                             add_stat, cookie);
3254         }
3255         add_casted_stat("ep_warmup_oom", epstats.warmOOM, add_stat, cookie);
3256         add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
3257     }
3258
3259     add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
3260                     add_stat, cookie);
3261     add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
3262                     add_stat, cookie);
3263     add_casted_stat("ep_num_ops_del_meta", epstats.numOpsDelMeta,
3264                     add_stat, cookie);
3265     add_casted_stat("ep_num_ops_set_meta_res_fail",
3266                     epstats.numOpsSetMetaResolutionFailed, add_stat, cookie);
3267     add_casted_stat("ep_num_ops_del_meta_res_fail",
3268                     epstats.numOpsDelMetaResolutionFailed, add_stat, cookie);
3269     add_casted_stat("ep_num_ops_set_ret_meta", epstats.numOpsSetRetMeta,
3270                     add_stat, cookie);
3271     add_casted_stat("ep_num_ops_del_ret_meta", epstats.numOpsDelRetMeta,
3272                     add_stat, cookie);
3273     add_casted_stat("ep_num_ops_get_meta_on_set_meta",
3274                     epstats.numOpsGetMetaOnSetWithMeta, add_stat, cookie);
3275     add_casted_stat("ep_workload_pattern",
3276                     workload->stringOfWorkLoadPattern(),
3277                     add_stat, cookie);
3278
3279     add_casted_stat("ep_defragmenter_num_visited", epstats.defragNumVisited,
3280                     add_stat, cookie);
3281     add_casted_stat("ep_defragmenter_num_moved", epstats.defragNumMoved,
3282                     add_stat, cookie);
3283
3284     add_casted_stat("ep_cursor_dropping_lower_threshold",
3285                     epstats.cursorDroppingLThreshold, add_stat, cookie);
3286     add_casted_stat("ep_cursor_dropping_upper_threshold",
3287                     epstats.cursorDroppingUThreshold, add_stat, cookie);
3288     add_casted_stat("ep_cursors_dropped",
3289                     epstats.cursorsDropped, add_stat, cookie);
3290
3291
3292     // Note: These are also reported per-shard in 'kvstore' stats, however
3293     // we want to be able to graph these over time, and hence need to expose
3294     // to ns_sever at the top-level.
3295     size_t value = 0;
3296     if (kvBucket->getKVStoreStat("io_total_read_bytes", value,
3297                                  KVBucketIface::KVSOption::BOTH)) {
3298         add_casted_stat("ep_io_total_read_bytes",  value, add_stat, cookie);
3299     }
3300     if (kvBucket->getKVStoreStat("io_total_write_bytes", value,
3301                                  KVBucketIface::KVSOption::BOTH)) {
3302         add_casted_stat("ep_io_total_write_bytes",  value, add_stat, cookie);
3303     }
3304     if (kvBucket->getKVStoreStat("io_compaction_read_bytes", value,
3305                                  KVBucketIface::KVSOption::BOTH)) {
3306         add_casted_stat("ep_io_compaction_read_bytes",  value, add_stat, cookie);
3307     }
3308     if (kvBucket->getKVStoreStat("io_compaction_write_bytes", value,
3309                                  KVBucketIface::KVSOption::BOTH)) {
3310         add_casted_stat("ep_io_compaction_write_bytes",  value, add_stat, cookie);
3311     }
3312     if (kvBucket->getKVStoreStat("Block_cache_hits", value,
3313                                  KVBucketIface::KVSOption::RW)) {
3314         add_casted_stat("ep_block_cache_hits", value, add_stat, cookie);
3315     }
3316     if (kvBucket->getKVStoreStat("Block_cache_misses", value,
3317                                  KVBucketIface::KVSOption::RW)) {
3318         add_casted_stat("ep_block_cache_misses", value, add_stat, cookie);
3319     }
3320
3321     return ENGINE_SUCCESS;
3322 }
3323
3324 ENGINE_ERROR_CODE EventuallyPersistentEngine::doMemoryStats(const void *cookie,
3325                                                            ADD_STAT add_stat) {
3326     add_casted_stat("bytes", stats.getTotalMemoryUsed(), add_stat, cookie);
3327     add_casted_stat("mem_used", stats.getTotalMemoryUsed(), add_stat, cookie);
3328     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3329     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3330     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3331     add_casted_stat("ep_max_size", stats.getMaxDataSize(), add_stat, cookie);
3332     add_casted_stat("ep_mem_low_wat", stats.mem_low_wat, add_stat, cookie);
3333     add_casted_stat("ep_mem_low_wat_percent", stats.mem_low_wat_percent,
3334                     add_stat, cookie);
3335     add_casted_stat("ep_mem_high_wat", stats.mem_high_wat, add_stat, cookie);
3336     add_casted_stat("ep_mem_high_wat_percent", stats.mem_high_wat_percent,
3337                     add_stat, cookie);
3338     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3339     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3340                     add_stat, cookie);
3341
3342     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3343 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3344     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3345 #else
3346     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3347 #endif
3348     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3349                     add_stat, cookie);
3350 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3351     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3352 #else
3353     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3354 #endif
3355     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3356     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3357
3358     std::map<std::string, size_t> alloc_stats;
3359     MemoryTracker::getInstance(*getServerApiFunc()->alloc_hooks)->
3360         getAllocatorStats(alloc_stats);
3361
3362     for (const auto& it : alloc_stats) {
3363         add_casted_stat(it.first.c_str(), it.second, add_stat, cookie);
3364     }
3365
3366     return ENGINE_SUCCESS;
3367 }
3368
3369 ENGINE_ERROR_CODE EventuallyPersistentEngine::doVBucketStats(
3370                                                        const void *cookie,
3371                                                        ADD_STAT add_stat,
3372                                                        const char* stat_key,
3373                                                        int nkey,
3374                                                        bool prevStateRequested,
3375                                                        bool details) {
3376     class StatVBucketVisitor : public VBucketVisitor {
3377     public:
3378         StatVBucketVisitor(KVBucketIface* store,
3379                            const void *c, ADD_STAT a,
3380                            bool isPrevStateRequested, bool detailsRequested) :
3381             eps(store), cookie(c), add_stat(a),
3382             isPrevState(isPrevStateRequested),
3383             isDetailsRequested(detailsRequested) {}
3384
3385         void visitBucket(VBucketPtr &vb) override {
3386             addVBStats(cookie, add_stat, vb, eps, isPrevState,
3387                        isDetailsRequested);
3388         }
3389
3390         static void addVBStats(const void *cookie, ADD_STAT add_stat,
3391                                VBucketPtr &vb,
3392                                KVBucketIface* store,
3393                                bool isPrevStateRequested,
3394                                bool detailsRequested) {
3395             if (!vb) {
3396                 return;
3397             }
3398
3399             if (isPrevStateRequested) {
3400                 try {
3401                     char buf[16];
3402                     checked_snprintf(buf, sizeof(buf), "vb_%d", vb->getId());
3403                     add_casted_stat(buf,
3404                                     VBucket::toString(vb->getInitialState()),
3405                                     add_stat, cookie);
3406                 } catch (std::exception& error) {
3407                     LOG(EXTENSION_LOG_WARNING,
3408                         "addVBStats: Failed building stats: %s", error.what());
3409                 }
3410             } else {
3411                 vb->addStats(detailsRequested, add_stat, cookie);
3412             }
3413         }
3414
3415     private:
3416         KVBucketIface* eps;
3417         const void *cookie;
3418         ADD_STAT add_stat;
3419         bool isPrevState;
3420         bool isDetailsRequested;
3421     };
3422
3423     if (nkey > 16 && strncmp(stat_key, "vbucket-details", 15) == 0) {
3424         std::string vbid(&stat_key[16], nkey - 16);
3425         uint16_t vbucket_id(0);
3426         if (!parseUint16(vbid.c_str(), &vbucket_id)) {
3427             return ENGINE_EINVAL;
3428         }
3429         VBucketPtr vb = getVBucket(vbucket_id);
3430         if (!vb) {
3431             return ENGINE_NOT_MY_VBUCKET;
3432         }
3433
3434         StatVBucketVisitor::addVBStats(cookie, add_stat, vb, kvBucket.get(),
3435                                        prevStateRequested, details);
3436     }
3437     else {
3438         StatVBucketVisitor svbv(kvBucket.get(), cookie, add_stat,
3439                                 prevStateRequested, details);
3440         kvBucket->visit(svbv);
3441     }
3442     return ENGINE_SUCCESS;
3443 }
3444
3445 ENGINE_ERROR_CODE EventuallyPersistentEngine::doHashStats(const void *cookie,
3446                                                           ADD_STAT add_stat) {
3447
3448     class StatVBucketVisitor : public VBucketVisitor {
3449     public:
3450         StatVBucketVisitor(const void *c, ADD_STAT a) : cookie(c),
3451                                                         add_stat(a) {}
3452
3453         void visitBucket(VBucketPtr &vb) override {
3454             uint16_t vbid = vb->getId();
3455             char buf[32];
3456             try {
3457                 checked_snprintf(buf, sizeof(buf), "vb_%d:state", vbid);
3458                 add_casted_stat(buf, VBucket::toString(vb->getState()),
3459                                 add_stat, cookie);
3460             } catch (std::exception& error) {
3461                 LOG(EXTENSION_LOG_WARNING,
3462                     "StatVBucketVisitor::visitBucket: Failed to build stat: %s",
3463                     error.what());
3464             }
3465
3466             HashTableDepthStatVisitor depthVisitor;
3467             vb->ht.visitDepth(depthVisitor);
3468
3469             try {
3470                 checked_snprintf(buf, sizeof(buf), "vb_%d:size", vbid);
3471                 add_casted_stat(buf, vb->ht.getSize(), add_stat, cookie);
3472                 checked_snprintf(buf, sizeof(buf), "vb_%d:locks", vbid);
3473                 add_casted_stat(buf, vb->ht.getNumLocks(), add_stat, cookie);
3474                 checked_snprintf(buf, sizeof(buf), "vb_%d:min_depth", vbid);
3475                 add_casted_stat(buf,
3476                                 depthVisitor.min == -1 ? 0 : depthVisitor.min,
3477                                 add_stat, cookie);
3478                 checked_snprintf(buf, sizeof(buf), "vb_%d:max_depth", vbid);
3479                 add_casted_stat(buf, depthVisitor.max, add_stat, cookie);
3480                 checked_snprintf(buf, sizeof(buf), "vb_%d:histo", vbid);
3481                 add_casted_stat(buf, depthVisitor.depthHisto, add_stat, cookie);
3482                 checked_snprintf(buf, sizeof(buf), "vb_%d:reported", vbid);
3483                 add_casted_stat(buf, vb->ht.getNumInMemoryItems(), add_stat,
3484                                 cookie);
3485                 checked_snprintf(buf, sizeof(buf), "vb_%d:counted", vbid);
3486                 add_casted_stat(buf, depthVisitor.size, add_stat, cookie);
3487                 checked_snprintf(buf, sizeof(buf), "vb_%d:resized", vbid);
3488                 add_casted_stat(buf, vb->ht.getNumResizes(), add_stat, cookie);
3489                 checked_snprintf(buf, sizeof(buf), "vb_%d:mem_size", vbid);
3490                 add_casted_stat(buf, vb->ht.memSize, add_stat, cookie);
3491                 checked_snprintf(buf, sizeof(buf), "vb_%d:mem_size_counted",
3492                                  vbid);
3493                 add_casted_stat(buf, depthVisitor.memUsed, add_stat, cookie);
3494             } catch (std::exception& error) {
3495                 LOG(EXTENSION_LOG_WARNING,
3496                     "StatVBucketVisitor::visitBucket: Failed to build stat: %s",
3497                     error.what());
3498             }
3499         }
3500
3501         const void *cookie;
3502         ADD_STAT add_stat;
3503     };
3504
3505     StatVBucketVisitor svbv(cookie, add_stat);
3506     kvBucket->visit(svbv);
3507
3508     return ENGINE_SUCCESS;
3509 }
3510
3511 class StatCheckpointVisitor : public VBucketVisitor {
3512 public:
3513     StatCheckpointVisitor(KVBucketIface* kvs, const void *c,
3514                           ADD_STAT a) : kvBucket(kvs), cookie(c),
3515                                         add_stat(a) {}
3516
3517     void visitBucket(VBucketPtr &vb) override {
3518         addCheckpointStat(cookie, add_stat, kvBucket, vb);
3519     }
3520
3521     static void addCheckpointStat(const void *cookie, ADD_STAT add_stat,
3522                                   KVBucketIface* eps,
3523                                   VBucketPtr &vb) {
3524         if (!vb) {
3525             return;
3526         }
3527
3528         uint16_t vbid = vb->getId();
3529         char buf[256];
3530         try {
3531             checked_snprintf(buf, sizeof(buf), "vb_%d:state", vbid);
3532             add_casted_stat(buf, VBucket::toString(vb->getState()),
3533                             add_stat, cookie);
3534             vb->checkpointManager.addStats(add_stat, cookie);
3535
3536             auto result = eps->getLastPersistedCheckpointId(vbid);
3537             if (result.second) {
3538                 checked_snprintf(buf,
3539                                  sizeof(buf),
3540                                  "vb_%d:persisted_checkpoint_id",
3541                                  vbid);
3542                 add_casted_stat(buf, result.first, add_stat, cookie);
3543             }
3544         } catch (std::exception& error) {
3545             LOG(EXTENSION_LOG_WARNING,
3546                 "StatCheckpointVisitor::addCheckpointStat: error building stats: %s",
3547                 error.what());
3548         }
3549     }
3550
3551     KVBucketIface* kvBucket;
3552     const void *cookie;
3553     ADD_STAT add_stat;
3554 };
3555
3556
3557 class StatCheckpointTask : public GlobalTask {
3558 public:
3559     StatCheckpointTask(EventuallyPersistentEngine *e, const void *c,
3560             ADD_STAT a) : GlobalTask(e, TaskId::StatCheckpointTask,
3561                                      0, false),
3562                           ep(e), cookie(c), add_stat(a) { }
3563     bool run(void) {
3564         TRACE_EVENT0("ep-engine/task", "StatsCheckpointTask");
3565         StatCheckpointVisitor scv(ep->getKVBucket(), cookie, add_stat);
3566         ep->getKVBucket()->visit(scv);
3567         ep->notifyIOComplete(cookie, ENGINE_SUCCESS);
3568         return false;
3569     }
3570
3571     cb::const_char_buffer getDescription() {
3572         return "checkpoint stats for all vbuckets";
3573     }
3574
3575 private:
3576     EventuallyPersistentEngine *ep;
3577     const void *cookie;
3578     ADD_STAT add_stat;
3579 };
3580 /// @endcond
3581
3582 ENGINE_ERROR_CODE EventuallyPersistentEngine::doCheckpointStats(
3583                                                           const void *cookie,
3584                                                           ADD_STAT add_stat,
3585                                                           const char* stat_key,
3586                                                           int nkey) {
3587
3588     if (nkey == 10) {
3589         void* es = getEngineSpecific(cookie);
3590         if (es == NULL) {
3591             ExTask task = new StatCheckpointTask(this, cookie, add_stat);
3592             ExecutorPool::get()->schedule(task);
3593             storeEngineSpecific(cookie, this);
3594             return ENGINE_EWOULDBLOCK;
3595         } else {
3596             storeEngineSpecific(cookie, NULL);
3597         }
3598     } else if (nkey > 11) {
3599         std::string vbid(&stat_key[11], nkey - 11);
3600         uint16_t vbucket_id(0);
3601         if (!parseUint16(vbid.c_str(), &vbucket_id)) {
3602             return ENGINE_EINVAL;
3603         }
3604         VBucketPtr vb = getVBucket(vbucket_id);
3605
3606         StatCheckpointVisitor::addCheckpointStat(cookie, add_stat,
3607                                                  kvBucket.get(), vb);
3608     }
3609
3610     return ENGINE_SUCCESS;
3611 }
3612
3613 /**
3614  * Function object to send stats for a single tap or dcp connection.
3615  */
3616 struct ConnStatBuilder {
3617     ConnStatBuilder(const void *c, ADD_STAT as, ConnCounter& tc)
3618         : cookie(c), add_stat(as), aggregator(tc) {}
3619
3620     void operator() (connection_t &tc) {
3621         ++aggregator.totalConns;
3622         tc->addStats(add_stat, cookie);
3623
3624         Producer *tp = dynamic_cast<Producer*>(tc.get());
3625         if (tp) {
3626             ++aggregator.totalProducers;
3627             tp->aggregateQueueStats(aggregator);
3628         }
3629     }
3630
3631     const void *cookie;
3632     ADD_STAT    add_stat;
3633     ConnCounter& aggregator;
3634 };
3635
3636 struct ConnAggStatBuilder {
3637     ConnAggStatBuilder(std::map<std::string, ConnCounter*> *m,
3638                       const char *s, size_t sl)
3639         : counters(m), sep(s), sep_len(sl) {}
3640
3641     ConnCounter *getTarget(connection_t& tc) {
3642         ConnCounter *rv = NULL;
3643
3644         if (tc.get()) {
3645             const std::string name(tc->getName());
3646             size_t pos1 = name.find(':');
3647             if (pos1 == name.npos) {
3648                 throw std::invalid_argument("ConnAggStatBuilder::getTarget: "
3649                         "connection tc (which has name '" + tc->getName() +
3650                         "' does not include a colon (:)");
3651             }
3652             size_t pos2 = name.find(sep, pos1+1, sep_len);
3653             if (pos2 != name.npos) {
3654                 std::string prefix(name.substr(pos1+1, pos2 - pos1 - 1));
3655                 rv = (*counters)[prefix];
3656                 if (rv == NULL) {
3657                     rv = new ConnCounter;
3658                     (*counters)[prefix] = rv;
3659                 }
3660             }
3661         }
3662         return rv;
3663     }
3664
3665     void aggregate(connection_t& c, ConnCounter *tc){
3666         ConnCounter counter;
3667
3668         ++counter.totalConns;
3669         if (dynamic_cast<Producer*>(c.get())) {
3670             ++counter.totalProducers;
3671         }
3672
3673         c->aggregateQueueStats(counter);
3674
3675         ConnCounter* total = getTotalCounter();
3676         *total += counter;
3677
3678         if (tc) {
3679             *tc += counter;
3680         }
3681     }
3682
3683     ConnCounter *getTotalCounter() {
3684         ConnCounter *rv = NULL;
3685         std::string sepr(sep);
3686         std::string total(sepr + "total");
3687         rv = (*counters)[total];
3688         if(rv == NULL) {
3689             rv = new ConnCounter;
3690             (*counters)[total] = rv;
3691         }
3692         return rv;
3693     }
3694
3695     void operator() (connection_t& tc) {
3696         if (tc.get() && tc->isConnected()) {
3697             ConnCounter *aggregator = getTarget(tc);
3698             aggregate(tc, aggregator);
3699         }
3700     }
3701
3702     std::map<std::string, ConnCounter*> *counters;
3703     const char *sep;
3704     size_t sep_len;
3705 };
3706
3707 /// @endcond
3708
3709 static void showConnAggStat(const std::string &prefix,
3710                             ConnCounter *counter,
3711                             const void *cookie,
3712                             ADD_STAT add_stat,
3713                             conn_type_t conn_type) {
3714
3715     try {
3716         char statname[80] = {0};
3717         const size_t sl(sizeof(statname));
3718         checked_snprintf(statname, sl, "%s:count", prefix.c_str());
3719         add_casted_stat(statname, counter->totalConns, add_stat, cookie);
3720
3721         checked_snprintf(statname, sl, "%s:total_backlog_size", prefix.c_str());
3722         add_casted_stat(statname, counter->conn_totalBacklogSize,
3723                         add_stat, cookie);
3724
3725         checked_snprintf(statname, sl, "%s:backoff", prefix.c_str());
3726         add_casted_stat(statname, counter->conn_queueBackoff,
3727                         add_stat, cookie);
3728
3729         if (conn_type == TAP_CONN) {
3730             checked_snprintf(statname, sl, "%s:qlen", prefix.c_str());
3731             add_casted_stat(statname, counter->conn_queue, add_stat, cookie);
3732
3733             checked_snprintf(statname, sl, "%s:fill", prefix.c_str());
3734             add_casted_stat(statname, counter->conn_queueFill,
3735                             add_stat, cookie);
3736
3737             checked_snprintf(statname, sl, "%s:drain", prefix.c_str());
3738             add_casted_stat(statname, counter->conn_queueDrain,
3739                             add_stat, cookie);
3740
3741             checked_snprintf(statname, sl, "%s:backfill_remaining",
3742                              prefix.c_str());
3743             add_casted_stat(statname, counter->conn_queueBackfillRemaining,
3744                             add_stat, cookie);
3745
3746             checked_snprintf(statname, sl, "%s:itemondisk", prefix.c_str());
3747             add_casted_stat(statname, counter->conn_queueItemOnDisk,
3748                             add_stat, cookie);
3749         }
3750
3751         if (conn_type == DCP_CONN) {
3752             checked_snprintf(statname, sl, "%s:producer_count", prefix.c_str());
3753             add_casted_stat(statname, counter->totalProducers, add_stat,
3754                             cookie);
3755
3756             checked_snprintf(statname, sl, "%s:items_sent", prefix.c_str());
3757             add_casted_stat(statname, counter->conn_queueDrain,
3758                             add_stat, cookie);
3759
3760             checked_snprintf(statname, sl, "%s:items_remaining",
3761                              prefix.c_str());
3762             add_casted_stat(statname, counter->conn_queueRemaining,
3763                             add_stat, cookie);
3764
3765             checked_snprintf(statname, sl, "%s:total_bytes", prefix.c_str());
3766             add_casted_stat(statname, counter->conn_totalBytes,
3767                             add_stat, cookie);
3768         }
3769     } catch (std::exception& error) {
3770         LOG(EXTENSION_LOG_WARNING,
3771             "showConnAggStat: Failed to build stats: %s", error.what());
3772     }
3773 }
3774
3775 ENGINE_ERROR_CODE EventuallyPersistentEngine::doConnAggStats(
3776                                                         const void *cookie,
3777                                                         ADD_STAT add_stat,
3778                                                         const char *sepPtr,
3779                                                         size_t sep_len,
3780                                                         conn_type_t connType) {
3781     // In practice, this will be 1, but C++ doesn't let me use dynamic
3782     // array sizes.
3783     const size_t max_sep_len(8);
3784     sep_len = std::min(sep_len, max_sep_len);
3785
3786     char sep[max_sep_len + 1];
3787     memcpy(sep, sepPtr, sep_len);
3788     sep[sep_len] = 0x00;
3789
3790     std::map<std::string, ConnCounter*> counters;
3791     ConnAggStatBuilder visitor(&counters, sep, sep_len);
3792     if (connType == TAP_CONN) {
3793         tapConnMap->each(visitor);
3794     } else {
3795         dcpConnMap_->each(visitor);
3796     }
3797
3798     std::map<std::string, ConnCounter*>::iterator it;
3799     for (it = counters.begin(); it != counters.end(); ++it) {
3800         showConnAggStat(it->first, it->second, cookie, add_stat, connType);
3801         delete it->second;