1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2017 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 #include "ep_engine.h"
25 #include "dcp/consumer.h"
26 #include "dcp/dcpconnmap.h"
27 #include "dcp/flow-control-manager.h"
28 #include "dcp/producer.h"
29 #include "ep_bucket.h"
31 #include "ephemeral_bucket.h"
32 #include "failover-table.h"
34 #include "htresizer.h"
36 #include "memory_tracker.h"
37 #include "replicationthrottle.h"
38 #include "stats-info.h"
39 #define STATWRITER_NAMESPACE core_engine
40 #include "statwriter.h"
41 #undef STATWRITER_NAMESPACE
42 #include "string_utils.h"
43 #include "tapconnmap.h"
44 #include "vb_count_visitor.h"
47 #include <JSON_checker.h>
48 #include <cJSON_utils.h>
49 #include <memcached/engine.h>
50 #include <memcached/extension.h>
51 #include <memcached/protocol_binary.h>
52 #include <memcached/server_api.h>
53 #include <memcached/util.h>
54 #include <platform/cb_malloc.h>
55 #include <platform/checked_snprintf.h>
56 #include <platform/make_unique.h>
57 #include <platform/platform.h>
58 #include <platform/processclock.h>
59 #include <xattr/utils.h>
/**
 * Scale a value by a multiplier.
 *
 * No division by 100 happens here, so `percent` is a fraction
 * (0.75 means 75%), not a number in the range 0-100.
 *
 * @param val     the base value
 * @param percent multiplier applied to val
 * @return val * percent, converted back to size_t (truncating toward zero)
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
76 struct EPHandleReleaser {
77 void operator()(EventuallyPersistentEngine*) {
78 ObjectRegistry::onSwitchThread(nullptr);
82 using EPHandle = std::unique_ptr<EventuallyPersistentEngine, EPHandleReleaser>;
85 * Helper function to acquire a handle to the engine which allows access to
86 * the engine while the handle is in scope.
87 * @param handle pointer to the engine
88 * @return EPHandle which is a unique_ptr to an EventuallyPersistentEngine
89 * with a custom deleter (EPHandleReleaser) which performs the required
90 * ObjectRegistry release.
93 static inline EPHandle acquireEngine(ENGINE_HANDLE* handle) {
94 auto ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
95 ObjectRegistry::onSwitchThread(ret);
101 * Call the response callback and return the appropriate value so that
102 * the core knows what to do..
104 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
106 const void *ext, uint8_t extlen,
107 const void *body, uint32_t bodylen,
108 uint8_t datatype, uint16_t status,
109 uint64_t cas, const void *cookie)
111 ENGINE_ERROR_CODE rv = ENGINE_FAILED;
112 EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
113 if (response(key, keylen, ext, extlen, body, bodylen, datatype,
114 status, cas, cookie)) {
117 ObjectRegistry::onSwitchThread(e);
/**
 * Check that a configuration value lies inside an inclusive range.
 *
 * @param v value to check
 * @param l inclusive lower bound
 * @param h inclusive upper bound
 * @throws std::runtime_error ("Value out of range.") when v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool belowLower = v < l;
    const bool aboveUpper = v > h;
    if (belowLower || aboveUpper) {
        throw std::runtime_error("Value out of range.");
    }
}
// Throws std::runtime_error("Value is not numeric") if any character of str,
// from index i up to the terminating NUL, is not a decimal digit.
// NOTE(review): `i` is declared and initialised on lines not visible in this
// view (presumably 0, possibly advanced past a leading sign) - confirm.
// NOTE(review): if the scanned suffix is empty the loop body never runs, so
// such input is accepted as "numeric".
129 static void checkNumeric(const char* str) {
134 for (; str[i]; i++) {
// NOTE(review): isdigit() has undefined behaviour for negative values other
// than EOF. A plain char holding a non-ASCII byte can be negative, so
// isdigit(static_cast<unsigned char>(str[i])) would be the safe form.
136 if (!isdigit(str[i])) {
137 throw std::runtime_error("Value is not numeric");
142 static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle) {
143 return acquireEngine(handle)->getInfo();
146 static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
147 const char* config_str) {
148 return acquireEngine(handle)->initialize(config_str);
151 static void EvpDestroy(ENGINE_HANDLE* handle, const bool force) {
152 auto eng = acquireEngine(handle);
157 static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
163 const rel_time_t exptime,
166 if (!mcbp::datatype::is_valid(datatype)) {
167 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
169 return ENGINE_EINVAL;
172 return acquireEngine(handle)->itemAllocate(itm,
175 0, // No privileged bytes
182 static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
183 const item* itm, item_info *itm_info);
184 static void EvpItemRelease(ENGINE_HANDLE* handle, const void *cookie,
188 static std::pair<cb::unique_item_ptr, item_info> EvpItemAllocateEx(ENGINE_HANDLE* handle,
192 const size_t priv_nbytes,
194 const rel_time_t exptime,
199 auto err = acquireEngine(handle)->itemAllocate(
200 &it, key, nbytes, priv_nbytes, flags, exptime, datatype, vbucket);
202 if (err != ENGINE_SUCCESS) {
203 throw cb::engine_error(cb::engine_errc(err),
204 "EvpItemAllocateEx: failed to allocate memory");
208 if (!EvpGetItemInfo(handle, cookie, it, &info)) {
209 EvpItemRelease(handle, cookie, it);
210 throw cb::engine_error(cb::engine_errc::failed,
211 "EvpItemAllocateEx: EvpGetItemInfo failed");
214 return std::make_pair(cb::unique_item_ptr{it, cb::ItemDeleter{handle}},
218 static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
223 mutation_descr_t* mut_info) {
225 LOG(EXTENSION_LOG_WARNING,
226 "EvpItemDelete(): cas ptr passed is null for vb: %" PRIu16,
228 return ENGINE_EINVAL;
230 return acquireEngine(handle)->itemDelete(
231 cookie, key, *cas, vbucket, nullptr, mut_info);
234 static void EvpItemRelease(ENGINE_HANDLE* handle,
237 acquireEngine(handle)->itemRelease(cookie, itm);
240 static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
245 DocStateFilter documentStateFilter) {
246 get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
253 switch (documentStateFilter) {
254 case DocStateFilter::Alive:
256 case DocStateFilter::Deleted:
257 // MB-23640 was caused by this bug as the frontend asked for
258 // Alive and Deleted documents. The internals don't have a
259 // way of requesting just deleted documents, and luckily for
260 // us no part of our code is using this yet. Return an error
261 // if anyone start using it
262 return ENGINE_ENOTSUP;
263 case DocStateFilter::AliveOrDeleted:
264 options = static_cast<get_options_t>(options | GET_DELETED_VALUE);
268 return acquireEngine(handle)->get(cookie, itm, key, vbucket, options);
271 static cb::EngineErrorItemPair EvpGetIf(ENGINE_HANDLE* handle,
276 const item_info&)> filter) {
277 return acquireEngine(handle)->get_if(cookie, key, vbucket, filter);
280 static cb::EngineErrorItemPair EvpGetAndTouch(ENGINE_HANDLE* handle,
284 uint32_t expiry_time) {
285 return acquireEngine(handle)->get_and_touch(cookie, key, vbucket,
289 static ENGINE_ERROR_CODE EvpGetLocked(ENGINE_HANDLE* handle,
294 uint32_t lock_timeout) {
295 return acquireEngine(handle)->get_locked(
296 cookie, itm, key, vbucket, lock_timeout);
299 static ENGINE_ERROR_CODE EvpUnlock(ENGINE_HANDLE* handle,
304 return acquireEngine(handle)->unlock(cookie, key, vbucket, cas);
307 static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
309 const char* stat_key,
312 return acquireEngine(handle)->getStats(cookie, stat_key, nkey, add_stat);
315 static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
319 ENGINE_STORE_OPERATION operation,
320 DocumentState document_state) {
321 auto engine = acquireEngine(handle);
323 if (document_state == DocumentState::Deleted) {
324 Item* item = static_cast<Item*>(itm);
328 return engine->store(cookie, itm, cas, operation);
331 static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
332 const void* cookie) {
333 return acquireEngine(handle)->flush(cookie);
336 static void EvpResetStats(ENGINE_HANDLE* handle, const void*) {
337 acquireEngine(handle)->resetStats();
340 protocol_binary_response_status EventuallyPersistentEngine::setTapParam(
341 const char* keyz, const char* valz, std::string& msg) {
342 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
345 if (strcmp(keyz, "tap_keepalive") == 0) {
346 int v = std::stoi(valz);
347 validate(v, 0, MAX_TAP_KEEP_ALIVE);
348 getConfiguration().requirementsMetOrThrow("tap_keepalive");
349 setTapKeepAlive(static_cast<uint32_t>(v));
350 } else if (strcmp(keyz, "replication_throttle_threshold") == 0) {
351 getConfiguration().setReplicationThrottleThreshold(
353 } else if (strcmp(keyz, "replication_throttle_queue_cap") == 0) {
354 getConfiguration().setReplicationThrottleQueueCap(std::stoll(valz));
355 } else if (strcmp(keyz, "replication_throttle_cap_pcnt") == 0) {
356 getConfiguration().setReplicationThrottleCapPcnt(std::stoull(valz));
358 msg = "Unknown config param";
359 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
361 // Handles exceptions thrown by the standard
362 // library stoi/stoul style functions when not numeric
363 } catch (std::invalid_argument&) {
364 msg = "Argument was not numeric";
365 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
367 // Handles exceptions thrown by the standard library stoi/stoul
368 // style functions when the conversion does not fit in the datatype
369 } catch (std::out_of_range&) {
370 msg = "Argument was out of range";
371 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
373 // Handles any miscellaenous exceptions in addition to the range_error
374 // exceptions thrown by the configuration::set<param>() methods
375 } catch (std::exception& error) {
377 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
383 protocol_binary_response_status EventuallyPersistentEngine::setCheckpointParam(
384 const char* keyz, const char* valz, std::string& msg) {
385 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
388 if (strcmp(keyz, "chk_max_items") == 0) {
389 size_t v = std::stoull(valz);
390 validate(v, size_t(MIN_CHECKPOINT_ITEMS),
391 size_t(MAX_CHECKPOINT_ITEMS));
392 getConfiguration().setChkMaxItems(v);
393 } else if (strcmp(keyz, "chk_period") == 0) {
394 size_t v = std::stoull(valz);
395 validate(v, size_t(MIN_CHECKPOINT_PERIOD),
396 size_t(MAX_CHECKPOINT_PERIOD));
397 getConfiguration().setChkPeriod(v);
398 } else if (strcmp(keyz, "max_checkpoints") == 0) {
399 size_t v = std::stoull(valz);
400 validate(v, size_t(DEFAULT_MAX_CHECKPOINTS),
401 size_t(MAX_CHECKPOINTS_UPPER_BOUND));
402 getConfiguration().setMaxCheckpoints(v);
403 } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
404 getConfiguration().setItemNumBasedNewChk(cb_stob(valz));
405 } else if (strcmp(keyz, "keep_closed_chks") == 0) {
406 getConfiguration().setKeepClosedChks(cb_stob(valz));
407 } else if (strcmp(keyz, "enable_chk_merge") == 0) {
408 getConfiguration().setEnableChkMerge(cb_stob(valz));
410 msg = "Unknown config param";
411 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
414 // Handles exceptions thrown by the cb_stob function
415 } catch (invalid_argument_bool& error) {
417 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
419 // Handles exceptions thrown by the standard
420 // library stoi/stoul style functions when not numeric
421 } catch (std::invalid_argument&) {
422 msg = "Argument was not numeric";
423 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
425 // Handles exceptions thrown by the standard library stoi/stoul
426 // style functions when the conversion does not fit in the datatype
427 } catch (std::out_of_range&) {
428 msg = "Argument was out of range";
429 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
431 // Handles any miscellaenous exceptions in addition to the range_error
432 // exceptions thrown by the configuration::set<param>() methods
433 } catch (std::exception& error) {
435 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
441 protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
442 const char* keyz, const char* valz, std::string& msg) {
443 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
445 // Handle the actual mutation.
447 if (strcmp(keyz, "bg_fetch_delay") == 0) {
448 getConfiguration().setBgFetchDelay(std::stoull(valz));
449 } else if (strcmp(keyz, "flushall_enabled") == 0) {
450 getConfiguration().setFlushallEnabled(cb_stob(valz));
451 } else if (strcmp(keyz, "max_size") == 0) {
452 size_t vsize = std::stoull(valz);
454 getConfiguration().setMaxSize(vsize);
455 EPStats& st = getEpStats();
456 getConfiguration().setMemLowWat(
457 percentOf(vsize, st.mem_low_wat_percent));
458 getConfiguration().setMemHighWat(
459 percentOf(vsize, st.mem_high_wat_percent));
460 } else if (strcmp(keyz, "mem_low_wat") == 0) {
461 getConfiguration().setMemLowWat(std::stoull(valz));
462 } else if (strcmp(keyz, "mem_high_wat") == 0) {
463 getConfiguration().setMemHighWat(std::stoull(valz));
464 } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
465 getConfiguration().setBackfillMemThreshold(std::stoull(valz));
466 } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
467 getConfiguration().setCompactionExpMemThreshold(std::stoull(valz));
468 } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
469 getConfiguration().setMutationMemThreshold(std::stoull(valz));
470 } else if (strcmp(keyz, "timing_log") == 0) {
471 EPStats& stats = getEpStats();
472 std::ostream* old = stats.timingLog;
473 stats.timingLog = NULL;
475 if (strcmp(valz, "off") == 0) {
476 LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
478 std::ofstream* tmp(new std::ofstream(valz));
480 LOG(EXTENSION_LOG_INFO,
481 "Logging detailed timings to ``%s''.", valz);
482 stats.timingLog = tmp;
484 LOG(EXTENSION_LOG_WARNING,
485 "Error setting detailed timing log to ``%s'': %s",
486 valz, strerror(errno));
490 } else if (strcmp(keyz, "exp_pager_enabled") == 0) {
491 getConfiguration().setExpPagerEnabled(cb_stob(valz));
492 } else if (strcmp(keyz, "exp_pager_stime") == 0) {
493 getConfiguration().setExpPagerStime(std::stoull(valz));
494 } else if (strcmp(keyz, "exp_pager_initial_run_time") == 0) {
495 getConfiguration().setExpPagerInitialRunTime(std::stoll(valz));
496 } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
497 getConfiguration().requirementsMetOrThrow("access_scanner_enabled");
498 getConfiguration().setAccessScannerEnabled(cb_stob(valz));
499 } else if (strcmp(keyz, "alog_sleep_time") == 0) {
500 getConfiguration().requirementsMetOrThrow("alog_sleep_time");
501 getConfiguration().setAlogSleepTime(std::stoull(valz));
502 } else if (strcmp(keyz, "alog_task_time") == 0) {
503 getConfiguration().requirementsMetOrThrow("alog_task_time");
504 getConfiguration().setAlogTaskTime(std::stoull(valz));
505 } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
506 getConfiguration().setPagerActiveVbPcnt(std::stoull(valz));
507 } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
508 getConfiguration().setWarmupMinMemoryThreshold(std::stoull(valz));
509 } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
510 getConfiguration().setWarmupMinItemsThreshold(std::stoull(valz));
511 } else if (strcmp(keyz, "max_num_readers") == 0 ||
512 strcmp(keyz, "num_reader_threads") == 0) {
513 size_t value = std::stoull(valz);
514 getConfiguration().setNumReaderThreads(value);
515 ExecutorPool::get()->setNumReaders(value);
516 } else if (strcmp(keyz, "max_num_writers") == 0 ||
517 strcmp(keyz, "num_writer_threads") == 0) {
518 size_t value = std::stoull(valz);
519 getConfiguration().setNumWriterThreads(value);
520 ExecutorPool::get()->setNumWriters(value);
521 } else if (strcmp(keyz, "max_num_auxio") == 0 ||
522 strcmp(keyz, "num_auxio_threads") == 0) {
523 size_t value = std::stoull(valz);
524 getConfiguration().setNumAuxioThreads(value);
525 ExecutorPool::get()->setNumAuxIO(value);
526 } else if (strcmp(keyz, "max_num_nonio") == 0 ||
527 strcmp(keyz, "num_nonio_threads") == 0) {
528 size_t value = std::stoull(valz);
529 getConfiguration().setNumNonioThreads(value);
530 ExecutorPool::get()->setNumNonIO(value);
531 } else if (strcmp(keyz, "bfilter_enabled") == 0) {
532 getConfiguration().setBfilterEnabled(cb_stob(valz));
533 } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
534 getConfiguration().setBfilterResidencyThreshold(std::stof(valz));
535 } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
536 getConfiguration().setDefragmenterEnabled(cb_stob(valz));
537 } else if (strcmp(keyz, "defragmenter_interval") == 0) {
538 size_t v = std::stoull(valz);
539 // Adding separate validation as external limit is minimum 1
540 // to prevent setting defragmenter to constantly run
541 validate(v, size_t(1), std::numeric_limits<size_t>::max());
542 getConfiguration().setDefragmenterInterval(v);
543 } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
544 getConfiguration().setDefragmenterAgeThreshold(std::stoull(valz));
545 } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
546 getConfiguration().setDefragmenterChunkDuration(std::stoull(valz));
547 } else if (strcmp(keyz, "defragmenter_run") == 0) {
548 runDefragmenterTask();
549 } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
550 getConfiguration().setCompactionWriteQueueCap(std::stoull(valz));
551 } else if (strcmp(keyz, "dcp_min_compression_ratio") == 0) {
552 getConfiguration().setDcpMinCompressionRatio(std::stof(valz));
553 } else if (strcmp(keyz, "access_scanner_run") == 0) {
554 if (!(runAccessScannerTask())) {
555 rv = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
557 } else if (strcmp(keyz, "vb_state_persist_run") == 0) {
558 runVbStatePersistTask(std::stoi(valz));
559 } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
560 getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
561 getConfiguration().setEphemeralFullPolicy(valz);
562 } else if (strcmp(keyz, "ephemeral_metadata_purge_age") == 0) {
563 getConfiguration().requirementsMetOrThrow(
564 "ephemeral_metadata_purge_age");
565 getConfiguration().setEphemeralMetadataPurgeAge(std::stoull(valz));
566 } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
567 getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
568 getConfiguration().setEphemeralMetadataPurgeInterval(
571 msg = "Unknown config param";
572 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
574 // Handles exceptions thrown by the cb_stob function
575 } catch (invalid_argument_bool& error) {
577 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
579 // Handles exceptions thrown by the standard
580 // library stoi/stoul style functions when not numeric
581 } catch (std::invalid_argument&) {
582 msg = "Argument was not numeric";
583 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
585 // Handles exceptions thrown by the standard library stoi/stoul
586 // style functions when the conversion does not fit in the datatype
587 } catch (std::out_of_range&) {
588 msg = "Argument was out of range";
589 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
591 // Handles any miscellaneous exceptions in addition to the range_error
592 // exceptions thrown by the configuration::set<param>() methods
593 } catch (std::exception& error) {
595 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Apply a parameter from the DCP category of SET_PARAM.
//
// keyz: parameter name (compared with strcmp, so expected NUL-terminated).
// valz: parameter value as text.
// msg:  receives a human-readable description when the request is rejected.
//
// Result codes assigned below:
// - SUCCESS;
// - KEY_ENOENT for an unrecognised key;
// - EINVAL when any std::runtime_error is thrown.
// The return statement is on a line not visible here.
601 protocol_binary_response_status EventuallyPersistentEngine::setDcpParam(
602 const char* keyz, const char* valz, std::string& msg) {
603 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
607 "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
// NOTE(review): atoi() returns int and cannot report errors.
// - Non-numeric text yields 0, which the lower bound of 1 below rejects.
// - A negative value such as "-1" converts to a huge size_t and passes
//   validate(), because the upper bound is SIZE_MAX. This holds unless a
//   check on a line not visible here rejects a leading '-'.
// - An out-of-int-range value is undefined behaviour for atoi().
// The same applies to the batch-size branch below.
608 size_t v = atoi(valz);
// The limit must be at least 1; otherwise validate() throws
// std::runtime_error.
610 validate(v, size_t(1), std::numeric_limits<size_t>::max());
611 getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(
614 strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") ==
616 size_t v = atoi(valz);
// The batch size must be at least 1; otherwise validate() throws.
618 validate(v, size_t(1), std::numeric_limits<size_t>::max());
619 getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(
622 msg = "Unknown config param";
623 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
// Every std::runtime_error, whatever its cause, is reported to the client
// as "Value out of range.".
625 } catch (std::runtime_error& ex) {
626 msg = "Value out of range.";
627 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Apply a parameter from the "vbucket" category of SET_PARAM.
//
// The two HLC drift thresholds are written to the engine-wide configuration.
// Only "max_cas" uses the `vbucket` argument: it calls
// KVBucket::forceMaxCas(), and any non-success result is reported as
// NOT_MY_VBUCKET. Unrecognised keys yield KEY_ENOENT.
633 protocol_binary_response_status EventuallyPersistentEngine::setVbucketParam(
638 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
640 if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
// NOTE(review): std::strtoull() with a null end pointer cannot signal bad
// input:
// - non-numeric text parses as 0;
// - overflow yields ULLONG_MAX (errno is set but never checked);
// - a leading '-' is accepted and the result negated.
// Unless valz is validated on a line not visible here, such values are
// applied silently. The same applies to the two strtoull() calls below.
641 uint64_t v = std::strtoull(valz, nullptr, 10);
643 getConfiguration().setHlcDriftAheadThresholdUs(v);
644 } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
645 uint64_t v = std::strtoull(valz, nullptr, 10);
647 getConfiguration().setHlcDriftBehindThresholdUs(v);
648 } else if (strcmp(keyz, "max_cas") == 0) {
649 uint64_t v = std::strtoull(valz, nullptr, 10);
// Forcing max_cas is logged at WARNING level so the change stays visible.
651 LOG(EXTENSION_LOG_WARNING, "setVbucketParam: max_cas:%" PRIu64 " "
652 "vb:%" PRIu16 "\n", v, vbucket);
653 if (getKVBucket()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
654 rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
655 msg = "Not my vbucket";
658 msg = "Unknown config param";
659 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
// std::strtoull() never throws. This handler can only be reached through
// the configuration setters or code on lines not visible here.
661 } catch (std::runtime_error& ex) {
662 msg = "Value out of range.";
663 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
668 static protocol_binary_response_status evictKey(
669 EventuallyPersistentEngine* e,
670 protocol_binary_request_header
674 DocNamespace docNamespace) {
675 protocol_binary_request_no_extras* req =
676 (protocol_binary_request_no_extras*)request;
678 const uint8_t* keyPtr = reinterpret_cast<const uint8_t*>(request) +
680 size_t keylen = ntohs(req->message.header.request.keylen);
681 uint16_t vbucket = ntohs(request->request.vbucket);
683 LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key{%.*s}\n",
684 int(keylen), keyPtr);
686 auto rv = e->evictKey(DocKey(keyPtr, keylen, docNamespace), vbucket, msg);
687 if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
688 rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
689 if (e->isDegradedMode()) {
690 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
// Decode a SET_PARAM request and dispatch it by param_type to the matching
// set*Param() helper.
//
// Key and value are copied into local buffers (keyz, valz), which are
// declared on lines not visible here. An empty key or value, or an
// oversized key or value, is rejected with EINVAL. An unhandled param_type
// yields UNKNOWN_COMMAND.
696 protocol_binary_response_status EventuallyPersistentEngine::setParam(
697 protocol_binary_request_set_param* req, std::string& msg) {
698 size_t keylen = ntohs(req->message.header.request.keylen);
699 uint8_t extlen = req->message.header.request.extlen;
// bodylen covers extras + key + value; the value length is derived below.
700 size_t vallen = ntohl(req->message.header.request.bodylen);
701 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
702 protocol_binary_engine_param_t paramtype =
703 static_cast<protocol_binary_engine_param_t>(ntohl(
704 req->message.body.param_type));
// NOTE(review): the lengths are unsigned. If keylen + extlen exceeds
// bodylen, the subtraction wraps instead of going negative, so this check
// does not fire. The wrapped vallen is only rejected later by the
// `vallen >= sizeof(valz)` test, and the memcpy into keyz (keylen bytes)
// happens before that test. Presumably an upstream packet validator
// rejects such frames - confirm.
706 if (keylen == 0 || (vallen - keylen - extlen) == 0) {
707 return PROTOCOL_BINARY_RESPONSE_EINVAL;
// The key starts immediately after the fixed-size request struct.
// NOTE(review): this assumes extlen equals the size of the extras included
// in sizeof(req->bytes) (the param_type field) - confirm.
710 const char* keyp = reinterpret_cast<const char*>(req->bytes)
711 + sizeof(req->bytes);
712 const char* valuep = keyp + keylen;
713 vallen -= (keylen + extlen);
// `>=` leaves room for a terminating NUL in keyz.
719 if (keylen >= sizeof(keyz)) {
720 msg = "Key is too large.";
721 return PROTOCOL_BINARY_RESPONSE_EINVAL;
723 memcpy(keyz, keyp, keylen);
// `>=` leaves room for a terminating NUL in valz.
727 if (vallen >= sizeof(valz)) {
728 msg = "Value is too large.";
729 return PROTOCOL_BINARY_RESPONSE_EINVAL;
731 memcpy(valz, valuep, vallen);
734 protocol_binary_response_status rv;
737 case protocol_binary_engine_param_flush:
738 rv = setFlushParam(keyz, valz, msg);
740 case protocol_binary_engine_param_tap:
741 rv = setTapParam(keyz, valz, msg);
743 case protocol_binary_engine_param_checkpoint:
744 rv = setCheckpointParam(keyz, valz, msg);
746 case protocol_binary_engine_param_dcp:
747 rv = setDcpParam(keyz, valz, msg);
// Only the vbucket category receives the request's vbucket id.
749 case protocol_binary_engine_param_vbucket:
750 rv = setVbucketParam(vbucket, keyz, valz, msg);
// Any other param_type (its `default:` label is on a line not visible
// here) is reported as UNKNOWN_COMMAND.
753 rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
// Handle GET_VBUCKET: reply with the requested vBucket's state as a raw
// response body.
// When the vBucket is not available, reply with a not-my-vbucket response
// instead (the guarding condition is on a line not visible here).
759 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine* e,
761 protocol_binary_request_header* request,
762 ADD_RESPONSE response) {
763 protocol_binary_request_get_vbucket* req =
764 reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
// NOTE(review): reinterpret_cast cannot fail. Despite the "Unable to
// convert" wording, this check only fires when `request` itself is null.
765 if (req == nullptr) {
766 throw std::invalid_argument("getVBucket: Unable to convert req"
767 " to protocol_binary_request_get_vbucket");
770 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
771 VBucketPtr vb = e->getVBucket(vbucket);
773 return e->sendNotMyVBucketResponse(response, cookie, 0);
// Convert the host-order state to network byte order for the wire.
// ntohl and htonl perform the same swap; htonl would state the intent more
// clearly.
775 vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
776 return sendResponse(response, NULL, 0, NULL, 0, &state,
778 PROTOCOL_BINARY_RAW_BYTES,
779 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
783 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine* e,
785 protocol_binary_request_header* request,
786 ADD_RESPONSE response) {
788 protocol_binary_request_set_vbucket* req =
789 reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
791 uint64_t cas = ntohll(req->message.header.request.cas);
793 size_t bodylen = ntohl(req->message.header.request.bodylen)
794 - ntohs(req->message.header.request.keylen);
795 if (bodylen != sizeof(vbucket_state_t)) {
796 const std::string msg("Incorrect packet format");
797 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
798 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
799 PROTOCOL_BINARY_RESPONSE_EINVAL,
803 vbucket_state_t state;
804 memcpy(&state, &req->message.body.state, sizeof(state));
805 state = static_cast<vbucket_state_t>(ntohl(state));
807 if (!is_valid_vbucket_state_t(state)) {
808 const std::string msg("Invalid vbucket state");
809 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
810 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
811 PROTOCOL_BINARY_RESPONSE_EINVAL,
815 uint16_t vb = ntohs(req->message.header.request.vbucket);
816 if (e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
817 const std::string msg("VBucket number too big");
818 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
819 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
820 PROTOCOL_BINARY_RESPONSE_ERANGE,
823 return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
824 PROTOCOL_BINARY_RAW_BYTES,
825 PROTOCOL_BINARY_RESPONSE_SUCCESS,
829 static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine* e,
831 protocol_binary_request_header* req,
832 ADD_RESPONSE response) {
834 uint64_t cas = ntohll(req->request.cas);
836 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
837 uint16_t vbucket = ntohs(req->request.vbucket);
839 std::string msg = "";
840 if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
841 msg = "Incorrect packet format.";
842 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
844 PROTOCOL_BINARY_RAW_BYTES,
845 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
849 uint32_t bodylen = ntohl(req->request.bodylen);
851 const char* ptr = reinterpret_cast<const char*>(req->bytes) +
853 if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
858 ENGINE_ERROR_CODE err;
859 void* es = e->getEngineSpecific(cookie);
862 err = e->deleteVBucket(vbucket, cookie);
863 e->storeEngineSpecific(cookie, e);
865 e->storeEngineSpecific(cookie, NULL);
866 LOG(EXTENSION_LOG_INFO,
867 "Completed sync deletion of vbucket %u",
869 err = ENGINE_SUCCESS;
872 err = e->deleteVBucket(vbucket);
876 LOG(EXTENSION_LOG_NOTICE,
877 "Deletion of vbucket %d was completed.", vbucket);
879 case ENGINE_NOT_MY_VBUCKET:
880 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
881 "because the vbucket doesn't exist!!!", vbucket);
882 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
885 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
886 "because the vbucket is not in a dead state\n", vbucket);
887 msg = "Failed to delete vbucket. Must be in the dead state.";
888 res = PROTOCOL_BINARY_RESPONSE_EINVAL;
890 case ENGINE_EWOULDBLOCK:
891 LOG(EXTENSION_LOG_NOTICE, "Request for vbucket %d deletion is in"
892 " EWOULDBLOCK until the database file is removed from disk",
894 e->storeEngineSpecific(cookie, req);
895 return ENGINE_EWOULDBLOCK;
897 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
898 "because of unknown reasons\n", vbucket);
899 msg = "Failed to delete vbucket. Unknown reason.";
900 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
903 if (err != ENGINE_NOT_MY_VBUCKET) {
904 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
905 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
908 return e->sendNotMyVBucketResponse(response, cookie, cas);
912 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine* e,
913 protocol_binary_request_header* request,
917 protocol_binary_response_status* res,
918 DocNamespace docNamespace) {
919 KVBucketIface* kvb = e->getKVBucket();
920 protocol_binary_request_no_extras* req =
921 (protocol_binary_request_no_extras*)request;
922 int keylen = ntohs(req->message.header.request.keylen);
923 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
924 ENGINE_ERROR_CODE error_code;
925 DocKey key(reinterpret_cast<const uint8_t*>(request) + sizeof(*request),
926 keylen, docNamespace);
928 GetValue rv(kvb->getReplica(key, vbucket, cookie));
930 if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
931 if (error_code == ENGINE_NOT_MY_VBUCKET) {
932 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
934 } else if (error_code == ENGINE_TMPFAIL) {
936 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
942 *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
944 ++(e->getEpStats().numOpsGet);
945 return ENGINE_SUCCESS;
948 static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine* e,
950 protocol_binary_request_compact_db* req,
951 ADD_RESPONSE response) {
953 std::string msg = "";
954 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
955 compaction_ctx compactreq;
956 uint64_t cas = ntohll(req->message.header.request.cas);
958 if (ntohs(req->message.header.request.keylen) > 0 ||
959 req->message.header.request.extlen != 24) {
960 LOG(EXTENSION_LOG_WARNING,
961 "Compaction received bad ext/key len %d/%d.",
962 req->message.header.request.extlen,
963 ntohs(req->message.header.request.keylen));
964 msg = "Incorrect packet format.";
965 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
967 PROTOCOL_BINARY_RAW_BYTES,
968 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
970 EPStats& stats = e->getEpStats();
971 compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
972 compactreq.purge_before_seq =
973 ntohll(req->message.body.purge_before_seq);
974 compactreq.drop_deletes = req->message.body.drop_deletes;
975 compactreq.db_file_id = e->getKVBucket()->getDBFileId(*req);
976 uint16_t vbid = ntohs(req->message.header.request.vbucket);
978 ENGINE_ERROR_CODE err;
979 void* es = e->getEngineSpecific(cookie);
981 ++stats.pendingCompactions;
982 e->storeEngineSpecific(cookie, e);
983 err = e->compactDB(vbid, compactreq, cookie);
985 e->storeEngineSpecific(cookie, NULL);
986 err = ENGINE_SUCCESS;
991 LOG(EXTENSION_LOG_NOTICE,
992 "Compaction of db file id: %d completed.", compactreq.db_file_id);
994 case ENGINE_NOT_MY_VBUCKET:
995 --stats.pendingCompactions;
996 LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
997 "because the db file doesn't exist!!!", compactreq.db_file_id);
998 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1001 --stats.pendingCompactions;
1002 LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1003 "because of an invalid argument", compactreq.db_file_id);
1004 res = PROTOCOL_BINARY_RESPONSE_EINVAL;
1006 case ENGINE_EWOULDBLOCK:
1007 LOG(EXTENSION_LOG_NOTICE,
1008 "Compaction of db file id: %d scheduled "
1009 "(awaiting completion).", compactreq.db_file_id);
1010 e->storeEngineSpecific(cookie, req);
1011 return ENGINE_EWOULDBLOCK;
1012 case ENGINE_TMPFAIL:
1013 LOG(EXTENSION_LOG_WARNING, "Request to compact db file id: %d hit"
1014 " a temporary failure and may need to be retried",
1015 compactreq.db_file_id);
1016 msg = "Temporary failure in compacting db file.";
1017 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1020 --stats.pendingCompactions;
1021 LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1022 "because of unknown reasons\n", compactreq.db_file_id);
1023 msg = "Failed to compact db file. Unknown reason.";
1024 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1028 if (err != ENGINE_NOT_MY_VBUCKET) {
1029 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1030 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1033 return e->sendNotMyVBucketResponse(response, cookie, cas);
/**
 * Handle an ep-engine specific binary command (one the core does not know).
 *
 * The first switch performs session validation for ns_server commands
 * (SET_PARAM, SET_VBUCKET, DEL_VBUCKET, DEREGISTER_TAP_CLIENT,
 * CHANGE_VB_FILTER, SET_CLUSTER_CONFIG, COMPACT_DB). When the cookie has no
 * engine-specific data yet, the request CAS must pass validateSessionCas();
 * otherwise a KEY_EEXISTS response carrying "Invalid session token" is sent.
 *
 * The second switch dispatches on the opcode. Some handlers build and send
 * their own response and return immediately. Others leave their result in
 * `rv` / `res` / `msg` (GET_REPLICA also fills `itm`) for the common
 * response code at the end of the function.
 *
 * NOTE(review): parts of this function (e.g. `break`s, closing braces, the
 * start of the SET_PARAM call, the default case and the trailing code) are
 * not visible in this view. The comments below describe visible lines only.
 */
static ENGINE_ERROR_CODE processUnknownCommand(
        EventuallyPersistentEngine* h,
        protocol_binary_request_header* request,
        ADD_RESPONSE response,
        DocNamespace docNamespace) {
    // Status used by the common response path. It starts as
    // UNKNOWN_COMMAND and is overwritten by the handlers below.
    protocol_binary_response_status res =
            PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
    // Owns the text of messages built at runtime; `msg` may point into it.
    std::string dynamic_msg;
    const char* msg = NULL;
    size_t msg_size = 0;
    EPStats& stats = h->getEpStats();
    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
    /*
     * Session validation
     * (For ns_server commands only)
     */
    switch (request->request.opcode) {
    case PROTOCOL_BINARY_CMD_SET_PARAM:
    case PROTOCOL_BINARY_CMD_SET_VBUCKET:
    case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
    case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
    case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
    case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
    case PROTOCOL_BINARY_CMD_COMPACT_DB: {
        // Skip the check when engine-specific data is already stored for
        // this cookie. Presumably that is a re-entry after a handler
        // returned ENGINE_EWOULDBLOCK.
        if (h->getEngineSpecific(cookie) == NULL) {
            uint64_t cas = ntohll(request->request.cas);
            if (!h->validateSessionCas(cas)) {
                const std::string message("Invalid session token");
                return sendResponse(response, NULL, 0, NULL, 0,
                                    message.c_str(), message.length(),
                                    PROTOCOL_BINARY_RAW_BYTES,
                                    PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
    switch (request->request.opcode) {
    case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
        return h->getAllVBucketSequenceNumbers(cookie, request, response);
    case PROTOCOL_BINARY_CMD_GET_VBUCKET: {
        BlockTimer timer(&stats.getVbucketCmdHisto);
        rv = getVBucket(h, cookie, request, response);
    case PROTOCOL_BINARY_CMD_DEL_VBUCKET: {
        BlockTimer timer(&stats.delVbucketCmdHisto);
        rv = delVBucket(h, cookie, request, response);
        // While the deletion is still pending (EWOULDBLOCK), keep the
        // session counter and the engine-specific marker in place.
        if (rv != ENGINE_EWOULDBLOCK) {
            h->decrementSessionCtr();
            h->storeEngineSpecific(cookie, NULL);
    case PROTOCOL_BINARY_CMD_SET_VBUCKET: {
        BlockTimer timer(&stats.setVbucketCmdHisto);
        rv = setVBucket(h, cookie, request, response);
        h->decrementSessionCtr();
    case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
        res = h->stopFlusher(&msg, &msg_size);
    case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
        res = h->startFlusher(&msg, &msg_size);
    case PROTOCOL_BINARY_CMD_SET_PARAM:
                reinterpret_cast<protocol_binary_request_set_param*>(request),
        // The response message is taken from dynamic_msg.
        msg = dynamic_msg.c_str();
        msg_size = dynamic_msg.length();
        h->decrementSessionCtr();
    case PROTOCOL_BINARY_CMD_EVICT_KEY:
        res = evictKey(h, request, &msg, &msg_size, docNamespace);
    case PROTOCOL_BINARY_CMD_OBSERVE:
        return h->observe(cookie, request, response, docNamespace);
    case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
        return h->observe_seqno(cookie, request, response);
    case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT: {
        rv = h->deregisterTapClient(cookie, request, response);
        h->decrementSessionCtr();
    case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN: {
        rv = h->resetReplicationChain(cookie, request, response);
    case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER: {
        rv = h->changeTapVBFilter(cookie, request, response);
        h->decrementSessionCtr();
    case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
    case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
    case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE: {
        rv = h->handleCheckpointCmds(cookie, request, response);
    case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE: {
        rv = h->handleSeqnoCmds(cookie, request, response);
    case PROTOCOL_BINARY_CMD_GET_META:
    case PROTOCOL_BINARY_CMD_GETQ_META: {
        rv = h->getMeta(cookie,
                        reinterpret_cast<protocol_binary_request_get_meta*>
                        (request), response,
    case PROTOCOL_BINARY_CMD_SET_WITH_META:
    case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
    case PROTOCOL_BINARY_CMD_ADD_WITH_META:
    case PROTOCOL_BINARY_CMD_ADDQ_WITH_META: {
        rv = h->setWithMeta(cookie,
                            reinterpret_cast<protocol_binary_request_set_with_meta*>
                            (request), response,
    case PROTOCOL_BINARY_CMD_DEL_WITH_META:
    case PROTOCOL_BINARY_CMD_DELQ_WITH_META: {
        rv = h->deleteWithMeta(cookie,
                               reinterpret_cast<protocol_binary_request_delete_with_meta*>
                               (request), response,
    case PROTOCOL_BINARY_CMD_RETURN_META: {
        return h->returnMeta(cookie,
                             reinterpret_cast<protocol_binary_request_return_meta*>
                             (request), response,
    case PROTOCOL_BINARY_CMD_GET_REPLICA:
        // SUCCESS (with `itm`) and NOT_MY_VBUCKET are answered by the common
        // response code at the end of this function. Other statuses are
        // handled by the body of the `if` below.
        rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res, docNamespace);
        if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
    case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
    case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC: {
        rv = h->handleTrafficControlCmd(cookie, request, response);
    case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG: {
        rv = h->setClusterConfig(cookie,
                                 reinterpret_cast<protocol_binary_request_set_cluster_config*>
                                 (request), response);
        h->decrementSessionCtr();
    case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
        return h->getClusterConfig(cookie,
                                   reinterpret_cast<protocol_binary_request_get_cluster_config*>
                                   (request), response);
    case PROTOCOL_BINARY_CMD_COMPACT_DB: {
        rv = compactDB(h, cookie,
                       (protocol_binary_request_compact_db*)(request),
        // As for DEL_VBUCKET, only finish the session step once the
        // compaction is no longer pending.
        if (rv != ENGINE_EWOULDBLOCK) {
            h->decrementSessionCtr();
            h->storeEngineSpecific(cookie, NULL);
    case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY: {
        // GET_RANDOM_KEY must not carry extras, a key or a body.
        if (request->request.extlen != 0 ||
            request->request.keylen != 0 ||
            request->request.bodylen != 0) {
            return ENGINE_EINVAL;
        return h->getRandomKey(cookie, response);
    case PROTOCOL_BINARY_CMD_GET_KEYS: {
        return h->getAllKeys(cookie,
                             reinterpret_cast<protocol_binary_request_get_keys*>
                             (request), response,
    // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
    case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
    case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE: {
        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
                            PROTOCOL_BINARY_RAW_BYTES,
                            PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
        // Common response path for an item (set by GET_REPLICA). It sends
        // the key, the flags as extras, the value, the datatype and the CAS,
        // with status `res`.
        uint32_t flags = itm->getFlags();
        rv = sendResponse(response,
                          static_cast<const void*>(itm->getKey().data()),
                          itm->getKey().size(),
                          (const void*)&flags, sizeof(uint32_t),
                          static_cast<const void*>(itm->getData()),
                          itm->getNBytes(), itm->getDataType(),
                          static_cast<uint16_t>(res), itm->getCas(),
    } else if (rv == ENGINE_NOT_MY_VBUCKET) {
        return h->sendNotMyVBucketResponse(response, cookie, 0);
        // Otherwise send `msg` with status `res`. If the handler left
        // msg_size at 0 and msg is set, the length is computed with strlen().
        // NOTE(review): msg_size is narrowed to uint16_t below, so a message
        // longer than 65535 bytes would be reported with a truncated length.
        msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
        rv = sendResponse(response, NULL, 0, NULL, 0,
                          msg, static_cast<uint16_t>(msg_size),
                          PROTOCOL_BINARY_RAW_BYTES,
                          static_cast<uint16_t>(res), 0, cookie);
// ENGINE_HANDLE_V1::unknown_command entry point. It acquires the engine
// for the calling thread and delegates to processUnknownCommand().
static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
                                           protocol_binary_request_header
                                           ADD_RESPONSE response,
                                           DocNamespace doc_namespace) {
    auto engine = acquireEngine(handle);
    auto ret = processUnknownCommand(
            engine.get(), cookie, request, response, doc_namespace);
// ENGINE_HANDLE_V1::item_set_cas entry point. It sets the CAS of the Item
// behind the opaque `item` handle; the engine handle and cookie are unused.
static void EvpItemSetCas(ENGINE_HANDLE*, const void*,
                          item* itm, uint64_t cas) {
    static_cast<Item*>(itm)->setCas(cas);
// ENGINE_HANDLE_V1::tap_notify entry point. It rejects an invalid datatype
// with ENGINE_EINVAL (after logging a warning); otherwise it forwards the
// TAP event to EventuallyPersistentEngine::tapNotify().
static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
                                      void* engine_specific,
                                      tap_event_t tap_event,
    if (!mcbp::datatype::is_valid(datatype)) {
        LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
        return ENGINE_EINVAL;
    return acquireEngine(handle)->tapNotify(cookie,
                                            (uint16_t)tap_event,
// TAP_ITERATOR callback handed out by EvpGetTapIterator(). It pulls the next
// event for this connection via walkTapQueue(), which fills the out
// parameters, and converts the returned uint16_t to a tap_event_t.
static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
                                  const void* cookie, item** itm,
                                  void** es, uint16_t* nes, uint8_t* ttl,
                                  uint16_t* flags, uint32_t* seqno,
                                  uint16_t* vbucket) {
    uint16_t tap_event = acquireEngine(handle)->walkTapQueue(
            cookie, itm, es, nes, ttl, flags, seqno, vbucket);
    return static_cast<tap_event_t>(tap_event);
// ENGINE_HANDLE_V1::get_tap_iterator entry point. It creates a TAP queue for
// the named client and yields EvpTapIterator on success; `iterator` stays
// NULL if createTapQueue() fails.
static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
                                      const void* userdata,
    auto engine = acquireEngine(handle);
    TAP_ITERATOR iterator = NULL;
    // Client name; it is not NUL-terminated, so the length is passed explicitly.
    std::string c(static_cast<const char*>(client), nclient);
    // Figure out what we want from the userdata before adding it to
    // the API to the handle
    if (engine->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
        iterator = EvpTapIterator;
// dcp.step entry point. It lets the connection handler for `cookie` emit its
// next message(s) through `producers`. ENGINE_DISCONNECT is the fallback
// result (presumably when no handler exists; the guard is not visible here).
static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
                                    struct dcp_message_producers* producers) {
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->step(producers);
    return ENGINE_DISCONNECT;
// dcp.open entry point: forwards to EventuallyPersistentEngine::dcpOpen().
static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
    return acquireEngine(handle)->dcpOpen(
            cookie, opaque, seqno, flags, name, nname);
// dcp.add_stream entry point: forwards to
// EventuallyPersistentEngine::dcpAddStream().
static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
    return acquireEngine(handle)->dcpAddStream(cookie, opaque, vbucket, flags);
// dcp.close_stream entry point. It forwards to the connection handler for
// `cookie`; ENGINE_DISCONNECT is the fallback result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->closeStream(opaque, vbucket);
    return ENGINE_DISCONNECT;
// dcp.stream_req entry point. It forwards the stream request (start seqno,
// vbucket UUID, snapshot range, rollback out-param and failover-log
// callback) to the connection handler for `cookie`. ENGINE_DISCONNECT is the
// fallback result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
                                         uint64_t startSeqno,
                                         uint64_t vbucketUuid,
                                         uint64_t snapStartSeqno,
                                         uint64_t snapEndSeqno,
                                         uint64_t* rollbackSeqno,
                                         dcp_add_failover_log callback) {
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->streamRequest(flags,
    return ENGINE_DISCONNECT;
// dcp.get_failover_log entry point. It asks the connection handler for
// `cookie` to report the vbucket's failover log through `callback`.
// ENGINE_DISCONNECT is the fallback result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
                                              dcp_add_failover_log callback) {
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->getFailoverLog(opaque, vbucket, callback);
    return ENGINE_DISCONNECT;
// dcp.stream_end entry point. It forwards to the connection handler for
// `cookie`; ENGINE_DISCONNECT is the fallback result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->streamEnd(opaque, vbucket, flags);
    return ENGINE_DISCONNECT;
// dcp.snapshot_marker entry point. It forwards the [start_seqno, end_seqno]
// snapshot marker to the connection handler for `cookie`. ENGINE_DISCONNECT
// is the fallback result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
                                              uint64_t start_seqno,
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->snapshotMarker(
            opaque, vbucket, start_seqno, end_seqno, flags);
    return ENGINE_DISCONNECT;
// dcp.mutation entry point. It rejects an invalid datatype with ENGINE_EINVAL
// (after logging a warning), then forwards the mutation to the connection
// handler for `cookie`. ENGINE_DISCONNECT is the fallback result (guard not
// visible here).
static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
                                        cb::const_byte_buffer value,
                                        uint32_t expiration,
                                        cb::const_byte_buffer meta,
    if (!mcbp::datatype::is_valid(datatype)) {
        LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
        return ENGINE_EINVAL;
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->mutation(opaque, key, value, priv_bytes, datatype, cas,
                          vbucket, flags, by_seqno, rev_seqno, expiration,
                          lock_time, meta, nru);
    return ENGINE_DISCONNECT;
// dcp.deletion entry point. It forwards the deletion to the connection
// handler for `cookie`; ENGINE_DISCONNECT is the fallback result (guard not
// visible here).
// NOTE(review): unlike EvpDcpMutation, no datatype validation happens here in
// the visible code. Confirm this is intended.
static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
                                        cb::const_byte_buffer value,
                                        cb::const_byte_buffer meta) {
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->deletion(opaque, key, value, priv_bytes, datatype, cas,
                          vbucket, by_seqno, rev_seqno, meta);
    return ENGINE_DISCONNECT;
// dcp.expiration entry point. It forwards the expiration to the connection
// handler for `cookie`; ENGINE_DISCONNECT is the fallback result (guard not
// visible here).
static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
                                          cb::const_byte_buffer value,
                                          cb::const_byte_buffer meta) {
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->expiration(opaque, key, value, priv_bytes, datatype, cas,
                            vbucket, by_seqno, rev_seqno, meta);
    return ENGINE_DISCONNECT;
// dcp.flush entry point. It forwards to ConnHandler::flushall() for `cookie`;
// ENGINE_DISCONNECT is the fallback result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->flushall(opaque, vbucket);
    return ENGINE_DISCONNECT;
// dcp.set_vbucket_state entry point. It forwards the requested vbucket state
// to the connection handler for `cookie`. ENGINE_DISCONNECT is the fallback
// result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
                                               vbucket_state_t state) {
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->setVBucketState(opaque, vbucket, state);
    return ENGINE_DISCONNECT;
// dcp.noop entry point. It forwards to the connection handler for `cookie`;
// ENGINE_DISCONNECT is the fallback result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->noop(opaque);
    return ENGINE_DISCONNECT;
// dcp.buffer_acknowledgement entry point. It forwards the acknowledged byte
// count to the connection handler for `cookie`. ENGINE_DISCONNECT is the
// fallback result (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
                                                     uint32_t buffer_bytes) {
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->bufferAcknowledgement(opaque, vbucket, buffer_bytes);
    return ENGINE_DISCONNECT;
// dcp.control entry point. It forwards the key/value control message to the
// connection handler for `cookie`. ENGINE_DISCONNECT is the fallback result
// (guard not visible here).
static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    return conn->control(opaque, key, nkey, value, nvalue);
    return ENGINE_DISCONNECT;
// dcp.response_handler entry point. It returns ENGINE_SUCCESS when the
// connection handler for `cookie` accepts the response, and
// ENGINE_DISCONNECT otherwise.
static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
        protocol_binary_response_header* response) {
    auto engine = acquireEngine(handle);
    ConnHandler* conn = engine->getConnHandler(cookie);
    if (conn->handleResponse(response)) {
        return ENGINE_SUCCESS;
    return ENGINE_DISCONNECT;
1619 static void EvpHandleDisconnect(const void* cookie,
1620 ENGINE_EVENT_TYPE type,
1621 const void* event_data,
1622 const void* cb_data) {
1623 if (type != ON_DISCONNECT) {
1624 throw std::invalid_argument("EvpHandleDisconnect: type "
1625 "(which is" + std::to_string(type) +
1626 ") is not ON_DISCONNECT");
1628 if (event_data != nullptr) {
1629 throw std::invalid_argument("EvpHandleDisconnect: event_data "
1632 void* c = const_cast<void*>(cb_data);
1633 acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1636 static void EvpHandleDeleteBucket(const void* cookie,
1637 ENGINE_EVENT_TYPE type,
1638 const void* event_data,
1639 const void* cb_data) {
1640 if (type != ON_DELETE_BUCKET) {
1641 throw std::invalid_argument("EvpHandleDeleteBucket: type "
1642 "(which is" + std::to_string(type) +
1643 ") is not ON_DELETE_BUCKET");
1645 if (event_data != nullptr) {
1646 throw std::invalid_argument("EvpHandleDeleteBucket: event_data "
1649 void* c = const_cast<void*>(cb_data);
1650 acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDeleteBucket(cookie);
// ENGINE_HANDLE_V1::set_log_level entry point. It applies `level` through
// Logger::setGlobalLogLevel(), i.e. globally rather than per engine instance.
void EvpSetLogLevel(ENGINE_HANDLE* handle, EXTENSION_LOG_LEVEL level) {
    Logger::setGlobalLogLevel(level);
/**
 * The only public interface to the eventually persistent engine.
 * Allocate a new instance and initialize it
 * @param interface the highest interface the server supports (we only
 *                  support interface 1)
 * @param get_server_api callback function to get the server exported API
 *                  functions
 * @param handle Where to return the new instance
 * @return ENGINE_SUCCESS on success, ENGINE_ENOTSUP if interface != 1 or
 *         the server API is unavailable
 */
ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                  GET_SERVER_API get_server_api,
                                  ENGINE_HANDLE** handle) {
    SERVER_HANDLE_V1* api = get_server_api();
    if (interface != 1 || api == NULL) {
        return ENGINE_ENOTSUP;
    Logger::setLoggerAPI(api->log);
    MemoryTracker::getInstance(*api->alloc_hooks);
    ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
    // Temporary counter installed in ObjectRegistry while the engine object
    // is constructed. Presumably it collects the memory allocated before the
    // engine's own EPStats exist; its value is copied into the engine's
    // totalMemory below.
    std::atomic<size_t>* inital_tracking = new std::atomic<size_t>();
    ObjectRegistry::setStats(inital_tracking);
    EventuallyPersistentEngine* engine;
    engine = new EventuallyPersistentEngine(get_server_api);
    ObjectRegistry::setStats(NULL);
    // NOTE(review): with the standard (throwing) operator new this check can
    // never be true. If allocation or the constructor throws instead, the
    // exception propagates: inital_tracking leaks and ObjectRegistry keeps
    // pointing at it. Consider a unique_ptr plus restoring setStats(NULL)
    // in all paths.
    if (engine == NULL) {
        return ENGINE_ENOMEM;
    if (MemoryTracker::trackingMemoryAllocations()) {
        engine->getEpStats().memoryTrackerEnabled.store(true);
        engine->getEpStats().totalMemory->store(inital_tracking->load());
    delete inital_tracking;
    initialize_time_functions(api->core);
    *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
    return ENGINE_SUCCESS;
/*
    This method is called prior to unloading of the shared-object.
    Global clean-up should be performed from this method.
    Order: shut down the ExecutorPool, then destroy the shared MemoryTracker,
    then reset the ObjectRegistry.
*/
void destroy_engine() {
    ExecutorPool::shutdown();
    // A single MemoryTracker exists for *all* buckets
    // and must be destroyed before unloading the shared object.
    MemoryTracker::destroyInstance();
    ObjectRegistry::reset();
// ENGINE_HANDLE_V1::get_item_info entry point. It fills *itm_info from the
// Item, using the latest failover-table UUID of the item's vbucket, or 0
// when that vbucket is not found.
static bool EvpGetItemInfo(ENGINE_HANDLE* handle, const void*,
                           const item* itm, item_info* itm_info) {
    const Item* it = reinterpret_cast<const Item*>(itm);
    auto engine = acquireEngine(handle);
    VBucketPtr vb = engine->getKVBucket()->getVBucket(it->getVBucketId());
    uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
    *itm_info = it->toItemInfo(vb_uuid);
// ENGINE_HANDLE_V1::set_item_info entry point. In the visible code the only
// field copied from *itm_info onto the Item is the datatype.
static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
                           item* itm, const item_info* itm_info) {
    Item* it = reinterpret_cast<Item*>(itm);
    it->setDataType(itm_info->datatype);
// ENGINE_HANDLE_V1::get_engine_vb_map entry point. It passes the cached
// cluster configuration to `callback`. `lh` stays in scope until the
// function returns, so clusterConfig.lock is held while the callback runs.
static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
                                             engine_get_vb_map_cb callback) {
    auto engine = acquireEngine(handle);
    LockHolder lh(engine->clusterConfig.lock);
    const char* config = engine->clusterConfig.config.data();
    // NOTE(review): the size is narrowed to uint32_t; assumes the config
    // blob is far below 4 GiB.
    uint32_t len = engine->clusterConfig.config.size();
    // Assuming acquireEngine() returns an EPHandle, reset() runs
    // EPHandleReleaser, which switches the thread's ObjectRegistry engine to
    // nullptr. The engine object and `config` remain valid.
    engine.reset(); // Want to release the engine before the callback
    return callback(cookie, config, len);
// printf-style logging entry point. It forwards the format string and its
// arguments (as the va_list `va`) to global_logger.vlog().
void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
    global_logger.vlog(severity, fmt, va);
// Constructs the engine object. The visible body:
//  - points every ENGINE_HANDLE_V1 entry point (including the dcp.* table)
//    at the static Evp* adapter functions;
//  - fetches the server API via get_server_api;
//  - fills in the engine info: description plus the CAS, persistent
//    storage, LRU and datatype features.
// Configuration parsing and bucket creation are done in initialize().
EventuallyPersistentEngine::EventuallyPersistentEngine(
        GET_SERVER_API get_server_api)
      workloadPriority(NO_BUCKET_PRIORITY),
      replicationThrottle(NULL),
      getServerApiFunc(get_server_api),
      dcpFlowControlManager_(NULL),
      checkpointConfig(NULL),
      trafficEnabled(false),
      deleteAllEnabled(false),
    interface.interface = 1;
    ENGINE_HANDLE_V1::get_info = EvpGetInfo;
    ENGINE_HANDLE_V1::initialize = EvpInitialize;
    ENGINE_HANDLE_V1::destroy = EvpDestroy;
    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
    ENGINE_HANDLE_V1::allocate_ex = EvpItemAllocateEx;
    ENGINE_HANDLE_V1::remove = EvpItemDelete;
    ENGINE_HANDLE_V1::release = EvpItemRelease;
    ENGINE_HANDLE_V1::get = EvpGet;
    ENGINE_HANDLE_V1::get_if = EvpGetIf;
    ENGINE_HANDLE_V1::get_and_touch = EvpGetAndTouch;
    ENGINE_HANDLE_V1::get_locked = EvpGetLocked;
    ENGINE_HANDLE_V1::unlock = EvpUnlock;
    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
    ENGINE_HANDLE_V1::store = EvpStore;
    ENGINE_HANDLE_V1::flush = EvpFlush;
    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
    ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
    ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
    ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
    // DCP entry points.
    ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
    ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
    ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
    ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
    ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
    ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
    ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
    ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
    ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
    ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
    ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
    ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
    ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
    ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
    ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
    ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
    ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
    serverApi = getServerApiFunc();
    // Engine info: zeroed, then the description and feature list are
    // filled in (num_features is incremented for each appended feature).
    memset(&info, 0, sizeof(info));
    info.info.description = "EP engine v" VERSION;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
    info.info.features[info.info.num_features++].feature =
            ENGINE_FEATURE_PERSISTENT_STORAGE;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
// Reserves `cookie` with the server. ObjectRegistry's current engine is
// switched to NULL around the server call and the previous value (`epe`) is
// restored afterwards. Presumably this keeps server-side allocations from
// being charged to this bucket.
ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
    EventuallyPersistentEngine *epe =
            ObjectRegistry::onSwitchThread(NULL, true);
    ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
    ObjectRegistry::onSwitchThread(epe);
// Releases a previously reserved `cookie` with the server. As in
// reserveCookie(), ObjectRegistry's current engine is switched to NULL for
// the duration of the server call and then restored.
ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
    EventuallyPersistentEngine *epe =
            ObjectRegistry::onSwitchThread(NULL, true);
    ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
    ObjectRegistry::onSwitchThread(epe);
// Registers a server callback for event `type`, using this engine (cast to
// ENGINE_HANDLE*) as the engine handle. ObjectRegistry's current engine is
// switched to NULL around the server call and then restored.
void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
                                                        const void *cb_data) {
    EventuallyPersistentEngine *epe =
            ObjectRegistry::onSwitchThread(NULL, true);
    SERVER_CALLBACK_API *sapi = getServerApi()->callback;
    sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
    ObjectRegistry::onSwitchThread(epe);
/**
 * A configuration value changed listener that responds to ep-engine
 * parameter changes by invoking engine-specific methods on
 * configuration change events.
 *
 * Size keys handled: getl_max_timeout, getl_default_timeout, max_item_size,
 * max_item_privileged_bytes. Boolean key handled: flushall_enabled.
 * Keys not listed are ignored in the visible code.
 */
class EpEngineValueChangeListener : public ValueChangedListener {
    EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
    virtual void sizeValueChanged(const std::string &key, size_t value) {
        if (key.compare("getl_max_timeout") == 0) {
            engine.setGetlMaxTimeout(value);
        } else if (key.compare("getl_default_timeout") == 0) {
            engine.setGetlDefaultTimeout(value);
        } else if (key.compare("max_item_size") == 0) {
            engine.setMaxItemSize(value);
        } else if (key.compare("max_item_privileged_bytes") == 0) {
            engine.setMaxItemPrivilegedBytes(value);
    virtual void booleanValueChanged(const std::string &key, bool value) {
        if (key.compare("flushall_enabled") == 0) {
            engine.setDeleteAll(value);
    // Engine whose settings are updated; not owned by the listener.
    EventuallyPersistentEngine &engine;
// Brings the engine up from its configuration string. Steps, in order:
//  - parse `config` (if given);
//  - apply hash-table, memory watermark and item-size settings and register
//    change listeners;
//  - create the workload policy, DCP/TAP connection maps, DCP flow-control
//    manager, replication throttle and checkpoint config;
//  - build and initialize the KVBucket;
//  - record the startup time.
// Returns ENGINE_FAILED on a configuration parse error, on more shards than
// vbuckets, or when KVBucket::initialize() fails.
ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
    if (config != NULL) {
        LOG(EXTENSION_LOG_NOTICE, "EPEngine::initialize: parsing config:\"%s\"",
        if (!configuration.parseConfiguration(config, serverApi)) {
            LOG(EXTENSION_LOG_WARNING, "Failed to parse the configuration config "
                "during bucket initialization. config=%s", config);
            return ENGINE_FAILED;
    name = configuration.getCouchBucket();
    maxFailoverEntries = configuration.getMaxFailoverEntries();
    // Start updating the variables from the config!
    HashTable::setDefaultNumBuckets(configuration.getHtSize());
    HashTable::setDefaultNumLocks(configuration.getHtLocks());
    StoredValue::setMutationMemoryThreshold(
            configuration.getMutationMemThreshold());
    // A max_size of 0 is replaced by SIZE_MAX (effectively no limit).
    if (configuration.getMaxSize() == 0) {
        configuration.setMaxSize(std::numeric_limits<size_t>::max());
    // Watermarks still at SIZE_MAX are treated as unset and default to 75%
    // (low) and 85% (high) of max_size.
    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
        stats.mem_low_wat_percent.store(0.75);
        configuration.setMemLowWat(percentOf(
                configuration.getMaxSize(), stats.mem_low_wat_percent.load()));
    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
        stats.mem_high_wat_percent.store(0.85);
        configuration.setMemHighWat(percentOf(
                configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
    // Cache the tunables and register a listener so later config changes
    // reach the engine (see EpEngineValueChangeListener).
    maxItemSize = configuration.getMaxItemSize();
    configuration.addValueChangedListener("max_item_size",
                                          new EpEngineValueChangeListener(*this));
    maxItemPrivilegedBytes = configuration.getMaxItemPrivilegedBytes();
    configuration.addValueChangedListener(
            "max_item_privileged_bytes",
            new EpEngineValueChangeListener(*this));
    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
    configuration.addValueChangedListener("getl_default_timeout",
                                          new EpEngineValueChangeListener(*this));
    getlMaxTimeout = configuration.getGetlMaxTimeout();
    configuration.addValueChangedListener("getl_max_timeout",
                                          new EpEngineValueChangeListener(*this));
    deleteAllEnabled = configuration.isFlushallEnabled();
    configuration.addValueChangedListener("flushall_enabled",
                                          new EpEngineValueChangeListener(*this));
    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
                                  configuration.getMaxNumShards());
    if ((unsigned int)workload->getNumShards() >
                                              configuration.getMaxVbuckets()) {
        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
            "equal or less than max number of vbuckets");
        return ENGINE_FAILED;
    dcpConnMap_ = new DcpConnMap(*this);
    /* Get the flow control policy */
    std::string flowCtlPolicy = configuration.getDcpFlowControlPolicy();
    if (!flowCtlPolicy.compare("static")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerStatic(*this);
    } else if (!flowCtlPolicy.compare("dynamic")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerDynamic(*this);
    } else if (!flowCtlPolicy.compare("aggressive")) {
        dcpFlowControlManager_ = new DcpFlowControlManagerAggressive(*this);
        /* Flow control is not enabled */
        dcpFlowControlManager_ = new DcpFlowControlManager(*this);
    tapConnMap = new TapConnMap(*this);
    tapConfig = new TapConfig(*this);
    replicationThrottle = new ReplicationThrottle(configuration, stats);
    TapConfig::addConfigChangeListener(*this);
    checkpointConfig = new CheckpointConfig(*this);
    CheckpointConfig::addConfigChangeListener(*this);
    kvBucket = makeBucket(configuration);
    initializeEngineCallbacks();
    // Complete the initialization of the ep-store
    if (!kvBucket->initialize()) {
        return ENGINE_FAILED;
    if(configuration.isDataTrafficEnabled()) {
        enableTraffic(true);
    tapConnMap->initialize(TAP_CONN_NOTIFIER);
    dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
    // record engine initialization time
    startupTime.store(ep_real_time());
    LOG(EXTENSION_LOG_NOTICE,
        "EP Engine: Initialization of %s bucket complete",
        configuration.getBucketType().c_str());
    return ENGINE_SUCCESS;
// Begins engine shutdown. It records whether the shutdown is forced, marks
// the engine as shutting down, snapshots stats (so the shutdown type is
// persisted), then shuts down all TAP and DCP connections.
void EventuallyPersistentEngine::destroy(bool force) {
    stats.forceShutdown = force;
    stats.isShutdown = true;
    // Perform a snapshot of the stats before shutting down so we can persist
    // the type of shutdown (stats.forceShutdown), and consequently on the
    // next warmup can determine is there was a clean shutdown - see
    // Warmup::cleanShutdown
    kvBucket->snapshotStats();
    tapConnMap->shutdownAllConnections();
    dcpConnMap_->shutdownAllConnections();
// Allocates a new Item for a store operation. Error results:
//  - ENGINE_E2BIG when priv_nbytes exceeds maxItemPrivilegedBytes, or the
//    non-privileged part (nbytes - priv_nbytes) exceeds maxItemSize;
//  - memoryCondition() when there is not enough space for the Item, Blob,
//    key and value, or on the (not visible) allocation-failure path.
// On success *itm holds the new Item and nbytes is added to
// itemAllocSizeHisto.
ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
        const size_t nbytes,
        const size_t priv_nbytes,
        const rel_time_t exptime,
    if (priv_nbytes > maxItemPrivilegedBytes) {
        return ENGINE_E2BIG;
    // NOTE(review): unsigned subtraction. If priv_nbytes > nbytes this wraps
    // to a huge value and the request is most likely rejected as E2BIG;
    // confirm callers never pass such arguments.
    if ((nbytes - priv_nbytes) > maxItemSize) {
        return ENGINE_E2BIG;
    if (!hasAvailableSpace(sizeof(Item) + sizeof(Blob) + key.size() + nbytes)) {
        return memoryCondition();
    // exptime 0 means "no expiry"; otherwise convert the server-relative time
    // to an absolute time.
    time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
    // Single byte of extended metadata carrying the datatype.
    uint8_t ext_meta[1];
    uint8_t ext_len = EXT_META_LEN;
    *(ext_meta) = datatype;
    *itm = new Item(key,
        return memoryCondition();
    stats.itemAllocSizeHisto.add(nbytes);
    return ENGINE_SUCCESS;
2067 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
2068 if (!deleteAllEnabled) {
2069 return ENGINE_ENOTSUP;
2072 if (!isDegradedMode()) {
2073 return ENGINE_TMPFAIL;
2077 * Supporting only a SYNC operation for bucket flush
2080 void* es = getEngineSpecific(cookie);
2082 // Check if diskDeleteAll was false and set it to true
2083 // if yes, if the atomic variable weren't false, then
2084 // we will assume that a deleteAll has been scheduled
2085 // already and return TMPFAIL.
2086 if (kvBucket->scheduleDeleteAllTask(cookie)) {
2087 storeEngineSpecific(cookie, this);
2088 return ENGINE_EWOULDBLOCK;
2090 LOG(EXTENSION_LOG_INFO,
2091 "Tried to trigger a bucket deleteAll, but"
2092 "there seems to be a task running already!");
2093 return ENGINE_TMPFAIL;
2097 storeEngineSpecific(cookie, NULL);
2098 LOG(EXTENSION_LOG_NOTICE, "Completed bucket deleteAll operation");
2099 return ENGINE_SUCCESS;
// Fetches `key` and updates its expiry in one step (get-and-touch).
// exptime is converted to an absolute time with the server core API before
// calling KVBucket::getAndUpdateTtl(). On success the item is returned
// (owned via cb::ItemDeleter) and numOpsStore is incremented. On failure an
// empty item pointer is returned. In degraded mode KEY_EEXISTS, KEY_ENOENT
// and NOT_MY_VBUCKET are remapped to ENGINE_TMPFAIL, and KEY_EEXISTS gets
// further handling below.
cb::EngineErrorItemPair EventuallyPersistentEngine::get_and_touch(const void* cookie,
    auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
    time_t expiry_time = exptime;
        auto* core = serverApi->core;
        expiry_time = core->abstime(core->realtime(exptime));
    GetValue gv(kvBucket->getAndUpdateTtl(key, vbucket, cookie, expiry_time));
    auto rv = gv.getStatus();
    if (rv == ENGINE_SUCCESS) {
        // The touch modifies the item, so it is counted as a store.
        ++stats.numOpsStore;
        return std::make_pair(cb::engine_errc::success,
                              cb::unique_item_ptr{gv.getValue(),
                                                  cb::ItemDeleter{handle}});
    if (isDegradedMode()) {
        // Remap some of the error codes to TMPFAIL while degraded
        case ENGINE_KEY_EEXISTS:
        case ENGINE_KEY_ENOENT:
        case ENGINE_NOT_MY_VBUCKET:
            rv = ENGINE_TMPFAIL;
    if (rv == ENGINE_KEY_EEXISTS) {
    return std::make_pair(cb::engine_errc(rv),
                          cb::unique_item_ptr{nullptr,
                                              cb::ItemDeleter{handle}});
2147 cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
2150 std::function<bool(const item_info&)>filter) {
2152 auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2154 // Fetch an item from the hashtable (without trying to schedule a bg-fetch
2155 // and pass it through the filter. If the filter accepts the document
2156 // based on the metadata, return the document. If the document's data
2157 // isn't resident we run another iteration in the loop and retries the
2158 // action but this time we _do_ schedule a bg-fetch.
2159 for (int ii = 0; ii < 2; ++ii) {
2160 auto options = static_cast<get_options_t>(HONOR_STATES |
2165 if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
2166 options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
2169 BlockTimer timer(&stats.getCmdHisto);
2170 GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2171 ENGINE_ERROR_CODE status = gv.getStatus();
2174 case ENGINE_SUCCESS:
2177 case ENGINE_KEY_ENOENT: // FALLTHROUGH
2178 case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
2179 if (isDegradedMode()) {
2180 status = ENGINE_TMPFAIL;
2184 return std::make_pair(cb::engine_errc(status),
2185 cb::unique_item_ptr{nullptr,
2186 cb::ItemDeleter{handle}});
2189 auto* item = gv.getValue();
2190 cb::unique_item_ptr ret{item, cb::ItemDeleter{handle}};
2192 const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
2193 const uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
2196 if (filter(item->toItemInfo(vb_uuid))) {
2197 if (!gv.isPartial()) {
2198 return std::make_pair(cb::engine_errc::success,
2199 cb::unique_item_ptr{ret.release(),
2200 cb::ItemDeleter{handle}});
2202 // We want this item, but we need to fetch it off disk
2204 // the client don't care about this thing..
2206 return std::make_pair(cb::engine_errc::success,
2207 cb::unique_item_ptr{ret.release(),
2208 cb::ItemDeleter{handle}});
2212 // It should not be possible to get as the second iteration in the loop
2213 // SHOULD handle backround fetches an the item should NOT be partial!
2214 throw std::logic_error("EventuallyPersistentEngine::get_if: loop terminated");
// Fetches and locks `key` (GETL). A lock_timeout of 0 is replaced by the
// configured getl_default_timeout. A value above getl_max_timeout is also
// replaced by the default, after logging a warning. On ENGINE_SUCCESS the
// locked item is returned through *itm; the status is returned either way.
ENGINE_ERROR_CODE EventuallyPersistentEngine::get_locked(const void* cookie,
                                                         uint32_t lock_timeout) {
    auto default_timeout = static_cast<uint32_t>(getGetlDefaultTimeout());
    if (lock_timeout == 0) {
        lock_timeout = default_timeout;
    } else if (lock_timeout > static_cast<uint32_t>(getGetlMaxTimeout())) {
        LOG(EXTENSION_LOG_WARNING,
            "EventuallyPersistentEngine::get_locked: "
            "Illegal value for lock timeout specified %u. "
            "Using default value: %u", lock_timeout, default_timeout);
        lock_timeout = default_timeout;
    auto result = kvBucket->getLocked(key, vbucket, ep_current_time(),
                                      lock_timeout, cookie);
    if (result.getStatus() == ENGINE_SUCCESS) {
        *itm = result.getValue();
    return result.getStatus();
/**
 * Unlock `key` in `vbucket` by delegating to kvBucket->unlockKey() with the
 * caller's `cas` and the current engine time (ep_current_time()). The
 * bucket's status is returned unchanged.
 */
2246 ENGINE_ERROR_CODE EventuallyPersistentEngine::unlock(const void* cookie,
2250 return kvBucket->unlockKey(key, vbucket, cas, ep_current_time());
/**
 * Store the item `itm` according to `operation`. A BlockTimer on
 * stats.storeCmdHisto covers the call.
 *
 * On success of the set/add/replace paths, the item's resulting CAS is
 * written to *cas. The status is then post-processed: ENGINE_SUCCESS bumps
 * stats.numOpsStore, one error status is re-evaluated by memoryCondition(),
 * and ENGINE_NOT_STORED / ENGINE_NOT_MY_VBUCKET become ENGINE_TMPFAIL in
 * degraded mode.
 *
 * NOTE(review): several case labels, break statements and closing braces of
 * this function are not present in this listing. The per-branch comments
 * below describe only the statements that are visible.
 */
2254 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2257 ENGINE_STORE_OPERATION
2259 BlockTimer timer(&stats.storeCmdHisto);
2260 ENGINE_ERROR_CODE ret;
2261 Item *it = static_cast<Item*>(itm);
2263 switch (operation) {
// A CAS of 0 is a wildcard; this path refuses it with ENGINE_NOT_STORED.
2265 if (it->getCas() == 0) {
2266 // Using a cas command with a cas wildcard doesn't make sense
2267 ret = ENGINE_NOT_STORED;
// Path that writes through kvBucket->set(); refused in degraded mode.
2272 if (isDegradedMode()) {
2273 return ENGINE_TMPFAIL;
2275 ret = kvBucket->set(*it, cookie);
2276 if (ret == ENGINE_SUCCESS) {
2277 *cas = it->getCas();
// Path that writes through kvBucket->add(); refused in degraded mode, and an
// item that already carries a CAS is rejected with ENGINE_KEY_EEXISTS.
2283 if (isDegradedMode()) {
2284 return ENGINE_TMPFAIL;
2287 if (it->getCas() != 0) {
2288 // Adding an item with a cas value doesn't really make sense...
2289 return ENGINE_KEY_EEXISTS;
2292 ret = kvBucket->add(*it, cookie);
2293 if (ret == ENGINE_SUCCESS) {
2294 *cas = it->getCas();
2298 case OPERATION_REPLACE:
2299 ret = kvBucket->replace(*it, cookie);
2300 if (ret == ENGINE_SUCCESS) {
2301 *cas = it->getCas();
// Presumably the default branch: any other operation is unsupported.
2305 ret = ENGINE_ENOTSUP;
// Post-process the status produced above.
2309 case ENGINE_SUCCESS:
2310 ++stats.numOpsStore;
// (Case label not visible; presumably an out-of-memory status.)
// memoryCondition() decides between ENGINE_TMPFAIL and ENGINE_ENOMEM.
2313 ret = memoryCondition();
2315 case ENGINE_NOT_STORED:
2316 case ENGINE_NOT_MY_VBUCKET:
2317 if (isDegradedMode()) {
2318 return ENGINE_TMPFAIL;
/**
 * Produce the next TAP message for `connection` and return its opcode
 * (TAP_PAUSE when there is nothing to send).
 *
 * Sources are consulted in this order:
 *  1. a pending flush;
 *  2. a due NOOP;
 *  3. a suspended connection or full ack window;
 *  4. a high-priority vbucket event (TAP_VBUCKET_SET or an opaque command);
 *  5. connection->waitForOpaqueMsgAck();
 *  6. a backfill to schedule;
 *  7. finally, connection->getNextItem().
 * If the result is still TAP_PAUSE while the connection is dumping its queue
 * or taking over, checkDumpOrTakeOverCompletion() may produce a final
 * TAP_VBUCKET_SET.
 *
 * Engine-specific payloads are returned through *es / *nes. In the branches
 * below they point at members of `connection` (opaqueCommandCode or
 * specificData), or are filled in by encodeVBucketStateTransition().
 *
 * @throws std::logic_error if the high-priority event has an unexpected type.
 *
 * NOTE(review): the bodies of several checks above (and some case labels)
 * are not present in this listing; the order is taken from the visible
 * statements only.
 */
2328 inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2348 if (connection->shouldFlush()) {
2352 if (connection->isTimeForNoop()) {
2353 LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2354 connection->logHeader());
2358 if (connection->isSuspended() || connection->windowIsFull()) {
2359 LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2360 " suspended state or its ack windows is full.\n",
2361 connection->logHeader());
// High-priority vbucket events are examined before any queued item.
2365 uint16_t ret = TAP_PAUSE;
2366 VBucketEvent ev = connection->nextVBucketHighPriority();
2367 if (ev.event != TAP_PAUSE) {
2369 case TAP_VBUCKET_SET:
2370 LOG(EXTENSION_LOG_NOTICE,
2371 "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2372 connection->logHeader(), ev.vbucket,
2373 VBucket::toString(ev.state));
2374 connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
// For opaque events, ev.state carries the opaque command code. The log
// applies ntohl(), so it is presumably held in network byte order — TODO confirm.
2377 LOG(EXTENSION_LOG_NOTICE,
2378 "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2379 connection->logHeader(),
2380 TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
// The code is copied into the connection so that *es points at a member of
// `connection` rather than at this function's locals.
2382 connection->opaqueCommandCode = (uint32_t) ev.state;
2383 *vbucket = ev.vbucket;
2384 *es = &connection->opaqueCommandCode;
2385 *nes = sizeof(connection->opaqueCommandCode);
2389 throw std::logic_error("EventuallyPersistentEngine::doWalkTapQueue:"
2390 " Unknown VBucketEvent message type:" +
2391 std::to_string(ev.event) + " for connection:" +
2392 connection->logHeader());
2397 if (connection->waitForOpaqueMsgAck()) {
// Schedule a backfill if the connection reports one for a set of vbuckets.
2401 VBucketFilter backFillVBFilter;
2402 if (connection->runBackfill(backFillVBFilter)) {
2403 queueBackfill(backFillVBFilter, connection);
// Fetch the next queued item. getNextItem() presumably sets the opcode in
// `ret` and the NRU value in `nru` (both are read below) — TODO confirm.
2406 uint8_t nru = INITIAL_NRU_VALUE;
2407 Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2409 case TAP_CHECKPOINT_START:
2410 case TAP_CHECKPOINT_END:
// Attach engine-specific data packed by TapEngineSpecific::packSpecificData():
// rev seqno + NRU for mutations, deletion data for deletions, and the
// vbucket's max deleted rev seqno for TAP_CHECKPOINT_START.
2414 if (ret == TAP_MUTATION) {
2415 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2416 it->getRevSeqno(), nru);
2417 *es = connection->specificData;
2418 } else if (ret == TAP_DELETION) {
2419 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2421 *es = connection->specificData;
2422 } else if (ret == TAP_CHECKPOINT_START) {
2423 // Send the current value of the max deleted seqno
2424 VBucketPtr vb = getVBucket(*vbucket);
2429 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2430 vb->ht.getMaxDeletedRevSeqno());
2431 *es = connection->specificData;
// Nothing to send while dumping / taking over: check whether that has
// completed, in which case a final vbucket state transition is sent.
2441 if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2442 VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2443 if (vbev.event == TAP_VBUCKET_SET) {
2444 LOG(EXTENSION_LOG_NOTICE,
2445 "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2446 connection->logHeader(), vbev.vbucket,
2447 VBucket::toString(vbev.state));
2448 connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
/**
 * TAP producer entry point: look up the TapProducer for `cookie` and let
 * doWalkTapQueue() produce the next message.
 *
 * Returns TAP_DISCONNECT when no producer can be found. For results other
 * than TAP_PAUSE / TAP_DISCONNECT, lastMsgTime is refreshed. In the visible
 * non-NOOP path:
 *  - stats.numTapFetched is bumped and *seqno is set from the connection;
 *  - TAP_FLAG_ACK is requested when connection->requestAck() says so;
 *  - mutations get TAP_FLAG_NETWORK_BYTE_ORDER when the peer supports it.
 */
2456 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2463 uint16_t *vbucket) {
2464 TapProducer *connection = getTapProducer(cookie);
2466 LOG(EXTENSION_LOG_WARNING,
2467 "Failed to lookup TAP connection.. Disconnecting\n");
2468 return TAP_DISCONNECT;
// Clear the paused flag and record the walk time before producing a message.
2471 connection->setPaused(false);
2476 connection->setLastWalkTime();
2478 ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2479 seqno, vbucket, connection, retry);
// A real message was produced: refresh lastMsgTime and fill in seqno / flags.
2482 if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2483 connection->lastMsgTime = ep_current_time();
2484 if (ret == TAP_NOOP) {
2487 ++stats.numTapFetched;
2488 *seqno = connection->getSeqno();
2489 if (connection->requestAck(ret, *vbucket)) {
2490 *flags = TAP_FLAG_ACK;
2491 connection->seqnoAckRequested = *seqno;
2494 if (ret == TAP_MUTATION) {
2495 if (connection->haveFlagByteorderSupport()) {
2496 *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
// Presumably the TAP_PAUSE / TAP_DISCONNECT branch (its header is not visible
// here): mark the connection paused and clear its notify-sent flag.
2501 connection->setPaused(true);
2502 connection->setNotifySent(false);
/**
 * Register a new TAP producer for `cookie` from a TAP connect request.
 *
 * The producer is named "eq_tapq:<client>", or gets
 * ConnHandler::getAnonName() when `client` is empty.
 *
 * The optional `userdata` section is decoded in the order below; each part
 * is read only when its flag is set in `flags`:
 *  - TAP_CONNECT_FLAG_BACKFILL: uint64 backfill age;
 *  - TAP_CONNECT_FLAG_LIST_VBUCKETS: a count followed by that many uint16
 *    vbucket ids;
 *  - TAP_CONNECT_CHECKPOINT: a uint16 count followed by
 *    (uint16 vbucket id, uint64 checkpoint id) pairs.
 * Every field is converted from network byte order (ntohs / ntohll).
 * Truncated or inconsistent backfill / vbucket / checkpoint data is logged
 * with a "Reject connection request" warning.
 *
 * The producer is created with tapConnMap->newProducer(), and then
 * tapConnMap->notifyPausedConnection() is called for it.
 */
2508 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2509 std::string &client,
2511 const void *userdata,
2513 if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2517 std::string tapName = "eq_tapq:";
// An anonymous client gets a generated name; assign() drops the "eq_tapq:"
// prefix, whereas a named client has its name appended to the prefix.
2518 if (client.length() == 0) {
2519 tapName.assign(ConnHandler::getAnonName());
2521 tapName.append(client);
2524 // Decode the userdata section of the packet and update the filters.
2525 const char *ptr = static_cast<const char*>(userdata);
2526 uint64_t backfillAge = 0;
2527 std::vector<uint16_t> vbuckets;
2528 std::map<uint16_t, uint64_t> lastCheckpointIds;
// As each field is consumed, `ptr` advances past it and `nuserdata` shrinks
// by its size.
2530 if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2531 if (nuserdata < sizeof(backfillAge)) {
2532 LOG(EXTENSION_LOG_WARNING,
2533 "Backfill age is missing. Reject connection request from %s\n",
2537 // use memcpy to avoid alignment issues
2538 memcpy(&backfillAge, ptr, sizeof(backfillAge));
2539 backfillAge = ntohll(backfillAge);
2540 nuserdata -= sizeof(backfillAge);
2541 ptr += sizeof(backfillAge);
2544 if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2546 if (nuserdata < sizeof(nvbuckets)) {
2547 LOG(EXTENSION_LOG_WARNING,
2548 "Number of vbuckets is missing. Reject connection request from %s"
2549 "\n", tapName.c_str());
2552 memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2553 nuserdata -= sizeof(nvbuckets);
2554 ptr += sizeof(nvbuckets);
2555 nvbuckets = ntohs(nvbuckets);
2556 if (nvbuckets > 0) {
2557 if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2558 LOG(EXTENSION_LOG_WARNING,
2559 "# of vbuckets not matched. Reject connection request from %s"
2560 "\n", tapName.c_str());
2563 for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
// NOTE(review): this copies sizeof(nvbuckets) bytes into `val`. That is only
// correct while both have the same width (presumably uint16_t);
// sizeof(val) would state the intent directly.
2565 memcpy(&val, ptr, sizeof(nvbuckets));
2566 ptr += sizeof(uint16_t);
2567 vbuckets.push_back(ntohs(val));
2569 nuserdata -= (sizeof(uint16_t) * nvbuckets);
2573 if (flags & TAP_CONNECT_CHECKPOINT) {
2574 uint16_t nCheckpoints = 0;
// If the count itself is missing, nCheckpoints stays 0 and the checkpoint
// section is skipped rather than rejected.
2575 if (nuserdata >= sizeof(nCheckpoints)) {
2576 memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2577 nuserdata -= sizeof(nCheckpoints);
2578 ptr += sizeof(nCheckpoints);
2579 nCheckpoints = ntohs(nCheckpoints);
2581 if (nCheckpoints > 0) {
2583 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2584 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2585 "Reject connection request from %s\n", tapName.c_str());
2588 for (uint16_t j = 0; j < nCheckpoints; ++j) {
2590 uint64_t checkpointId;
2591 memcpy(&vbid, ptr, sizeof(vbid));
2592 ptr += sizeof(uint16_t);
2593 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2594 ptr += sizeof(uint64_t);
2595 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2600 TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2603 configuration.getTapKeepalive()),
2607 tapConnMap->notifyPausedConnection(tp, true);
/**
 * Handle a TAP message received on `cookie`.
 *
 * If no connection is attached to the cookie yet:
 *  - a TAP_ACK is answered with ENGINE_DISCONNECT (the producer is gone);
 *  - any other event creates a TAP consumer with tapConnMap->newConsumer()
 *    and stores it as the cookie's engine-specific data. If creation fails,
 *    the result is ENGINE_DISCONNECT.
 *
 * TAP_MUTATION and TAP_DELETION are subject to the replication throttle.
 * While throttled, connections that support acks get ENGINE_TMPFAIL and the
 * others get ENGINE_DISCONNECT.
 *
 * The event is dispatched on `tap_event`, and the resulting status is
 * reported to connection->processedEvent(). Keys are built in
 * DocNamespace::DefaultCollection because TAP does not support collections.
 *
 * NOTE(review): several case labels, closing braces and argument lines of
 * this function are not present in this listing. The per-branch comments
 * describe only the visible statements.
 */
2611 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2612 void *engine_specific,
2629 void *specific = getEngineSpecific(cookie);
2630 ConnHandler *connection = NULL;
2631 if (specific == NULL) {
2632 if (tap_event == TAP_ACK) {
// NOTE(review): `cookie` is an opaque `const void*`; formatting it with %s
// reads it as a NUL-terminated string. Unless it really points at text,
// "%p" with `cookie` is the safe form — TODO confirm and fix.
2633 LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2634 "exist. Force disconnect...\n", (char *) cookie);
2635 // tap producer is no longer connected..
2636 return ENGINE_DISCONNECT;
2638 connection = tapConnMap->newConsumer(cookie);
2639 if (connection == NULL) {
2640 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2641 " Force disconnect\n");
2642 return ENGINE_DISCONNECT;
2644 storeEngineSpecific(cookie, connection);
2647 connection = reinterpret_cast<ConnHandler *>(specific);
2651 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
// Replication throttling applies only to mutations and deletions.
2653 if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2654 if (!replicationThrottle->shouldProcess()) {
2655 ++stats.replicationThrottled;
2656 if (connection->supportsAck()) {
2657 ret = ENGINE_TMPFAIL;
2659 ret = ENGINE_DISCONNECT;
2660 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2661 "ack support. Force disconnect...\n",
2662 connection->logHeader());
2668 switch (tap_event) {
2670 // TAP works only with the DefaultCollection
2671 ret = processTapAck(cookie, tap_seqno, tap_flags,
2672 DocKey(static_cast<const uint8_t*>(key), nkey,
2673 DocNamespace::DefaultCollection));
2676 ret = flush(cookie);
2677 LOG(EXTENSION_LOG_NOTICE, "%s Received flush.\n",
2678 connection->logHeader());
// Deletion: decode the rev seqno from engine_specific and apply the deletion
// through the connection.
2683 TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2684 nengine, &revSeqno);
2686 // Create key in DefaultCollection since TAP won't support collections
2687 const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2688 DocNamespace::DefaultCollection};
2689 ret = connection->deletion(0, docKey, {}, 0,
2690 PROTOCOL_BINARY_RAW_BYTES, cas, vbucket,
2695 case TAP_CHECKPOINT_START:
2696 case TAP_CHECKPOINT_END:
// Checkpoint markers are only valid on a TapConsumer; other connections are
// disconnected below.
2698 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2700 if (tap_event == TAP_CHECKPOINT_START &&
2701 nengine == TapEngineSpecific::sizeRevSeqno) {
2702 // Set the current value for the max deleted seqno
2703 VBucketPtr vb = getVBucket(vbucket);
2705 return ENGINE_TMPFAIL;
2708 TapEngineSpecific::readSpecificData(tap_event,
2712 vb->ht.setMaxDeletedRevSeqno(seqnum);
// The checkpoint id arrives in `data` as a network-order uint64.
2716 uint64_t checkpointId;
2717 memcpy(&checkpointId, data, sizeof(checkpointId));
2718 checkpointId = ntohll(checkpointId);
// NOTE(review): the status of ConnHandlerCheckPoint() is discarded. On
// failure it logs "Force disconnect" and returns ENGINE_DISCONNECT, but
// `ret` is not updated, so no disconnect happens. Presumably this should be
// `ret = ConnHandlerCheckPoint(...)` — TODO confirm and fix.
2719 ConnHandlerCheckPoint(tc, tap_event, vbucket,
2723 ret = ENGINE_DISCONNECT;
2724 LOG(EXTENSION_LOG_WARNING,
2725 "%s Checkpoint Id is missing in "
2726 "CHECKPOINT messages. Force disconnect...\n",
2727 connection->logHeader());
2731 ret = ENGINE_DISCONNECT;
2732 LOG(EXTENSION_LOG_WARNING,
2733 "%s not a consumer! Force disconnect\n",
2734 connection->logHeader());
// Mutation: decode the rev seqno and NRU value from engine_specific.
2742 uint8_t nru = INITIAL_NRU_VALUE;
2743 uint64_t revSeqno = 0;
2744 TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2745 nengine, &revSeqno, &nru);
// If the client has not negotiated JSON datatype support, inspect the value
// with checkUTF8JSON(): mark it JSON when it validates, raw bytes otherwise.
2747 if (!isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_JSON)) {
2748 datatype = PROTOCOL_BINARY_RAW_BYTES;
2749 const unsigned char *dat = (const unsigned char*)data;
2750 const int datlen = ndata;
2751 if (checkUTF8JSON(dat, datlen)) {
2752 datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2755 // Create key in DefaultCollection since TAP won't support collections
2756 const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2757 DocNamespace::DefaultCollection};
2758 ret = connection->mutation(0, docKey,
2759 {static_cast<const uint8_t*>(data), ndata},
2761 vbucket, flags, 0, revSeqno, exptime, 0,
// Opaque: a 4-byte (uint32_t) command code carried in engine_specific. Any
// other payload size is logged as unknown.
2768 if (nengine == sizeof(uint32_t)) {
2770 memcpy(&cc, engine_specific, sizeof(cc));
2774 case TAP_OPAQUE_ENABLE_AUTO_NACK:
2775 // @todo: the memcached core will _ALWAYS_ send nack
2776 // if it encounter an error. This should be
2777 // set as the default when we move to .next after 2.0
2778 // (currently we need to allow the message for
2779 // backwards compatibility)
2780 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2781 connection->logHeader());
2783 case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2784 connection->setSupportCheckpointSync(true);
2785 LOG(EXTENSION_LOG_INFO,
2786 "%s Enable checkpoint synchronization\n",
2787 connection->logHeader());
2789 case TAP_OPAQUE_OPEN_CHECKPOINT:
2791 * This event is only received by the TAP client that wants to
2792 * get mutations from closed checkpoints only. At this time,
2793 * only incremental backup client receives this event so that
2794 * it can close the connection and reconnect later.
2796 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2797 connection->logHeader());
2799 case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
// Start of a backfill stream: reset the vbucket (timed in
// stats.tapVbucketResetHisto) and put the consumer into backfill phase.
2801 LOG(EXTENSION_LOG_INFO,
2802 "%s Backfill started for vbucket %d.\n",
2803 connection->logHeader(), vbucket);
2804 BlockTimer timer(&stats.tapVbucketResetHisto);
2805 ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2807 if (ret == ENGINE_DISCONNECT) {
2808 LOG(EXTENSION_LOG_WARNING,
2809 "%s Failed to reset a vbucket %d. Force disconnect\n",
2810 connection->logHeader(), vbucket);
// NOTE(review): "succecssfully" in the message below is misspelled; it is
// left unchanged here because the log text is runtime output.
2812 LOG(EXTENSION_LOG_NOTICE,
2813 "%s Reset vbucket %d was completed succecssfully.\n",
2814 connection->logHeader(), vbucket);
2817 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2819 tc->setBackfillPhase(true, vbucket);
2821 ret = ENGINE_DISCONNECT;
// NOTE(review): "doesn't exists" below is ungrammatical; runtime text, left unchanged.
2822 LOG(EXTENSION_LOG_WARNING,
2823 "TAP consumer doesn't exists. Force disconnect\n");
2827 case TAP_OPAQUE_CLOSE_BACKFILL:
// End of backfill: take the consumer out of backfill phase for this vbucket.
2829 LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2830 connection->logHeader());
2831 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2833 tc->setBackfillPhase(false, vbucket);
2835 ret = ENGINE_DISCONNECT;
2836 LOG(EXTENSION_LOG_WARNING,
2837 "%s not a consumer! Force disconnect\n",
2838 connection->logHeader());
2842 case TAP_OPAQUE_CLOSE_TAP_STREAM:
2844 * This event is sent by the eVBucketMigrator to notify that
2845 * the source node closes the tap replication stream and
2846 * switches to TAKEOVER_VBUCKETS phase.
2847 * This is just an informative message and doesn't require any
2850 LOG(EXTENSION_LOG_INFO,
2851 "%s Received close tap stream. Switching to takeover phase.\n",
2852 connection->logHeader());
2854 case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2856 * This opaque message is just for notifying that the source
2857 * node receives change_vbucket_filter request and processes
2860 LOG(EXTENSION_LOG_INFO,
2861 "%s Notified that the source node changed a vbucket filter.\n",
2862 connection->logHeader());
2865 LOG(EXTENSION_LOG_WARNING,
2866 "%s Received an unknown opaque command\n",
2867 connection->logHeader());
2870 LOG(EXTENSION_LOG_WARNING,
2871 "%s Received tap opaque with unknown size %d\n",
2872 connection->logHeader(), nengine);
2876 case TAP_VBUCKET_SET:
2878 BlockTimer timer(&stats.tapVbucketSetHisto);
// The new state arrives in engine_specific as a network-order
// vbucket_state_t; any other payload size forces a disconnect.
2880 if (nengine != sizeof(vbucket_state_t)) {
2882 LOG(EXTENSION_LOG_WARNING,
2883 "%s Received TAP_VBUCKET_SET with illegal size."
2884 " Force disconnect\n", connection->logHeader());
2885 ret = ENGINE_DISCONNECT;
2889 vbucket_state_t state;
2890 memcpy(&state, engine_specific, nengine);
2891 state = (vbucket_state_t)ntohl(state);
2893 ret = connection->setVBucketState(0, vbucket, state);
// NOTE(review): "Recieved" below is misspelled; runtime text, left unchanged.
2899 LOG(EXTENSION_LOG_WARNING,
2900 "%s Recieved bad opcode, ignoring message\n",
2901 connection->logHeader());
// Report the event and its resulting status back to the connection.
2904 connection->processedEvent(tap_event, ret);
/**
 * Apply a TAP checkpoint start/end `event` for `vbucket` to `consumer`.
 *
 * If consumer->processCheckpointCommand() succeeds, the flusher is woken and
 * the result is ENGINE_SUCCESS. Otherwise a warning naming `checkpointId` is
 * logged and the result is ENGINE_DISCONNECT.
 */
2908 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2909 TapConsumer *consumer,
2912 uint64_t checkpointId) {
2913 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2915 if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2916 getKVBucket()->wakeUpFlusher();
2917 ret = ENGINE_SUCCESS;
2920 ret = ENGINE_DISCONNECT;
2921 LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2922 "checkpoint %" PRIu64 ". Force disconnect\n",
2923 consumer->logHeader(), checkpointId);
/**
 * Return the TapProducer stored as engine-specific data for `cookie`.
 *
 * Logs a warning when there is no producer or it is no longer connected.
 * Logs another warning when the producer reports rv->doDisconnect().
 *
 * NOTE(review): the engine-specific pointer is reinterpreted as a
 * TapProducer without any type check. tapNotify() stores ConnHandler
 * pointers (including newly created consumers) in the same slot, so a
 * checked cast via ConnHandler* may be safer — TODO confirm.
 */
2929 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2931 reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2932 if (!(rv && rv->isConnected())) {
2933 LOG(EXTENSION_LOG_WARNING,
2934 "Walking a non-existent tap queue, disconnecting\n");
2938 if (rv->doDisconnect()) {
2939 LOG(EXTENSION_LOG_WARNING,
2940 "%s Disconnecting pending connection\n", rv->logHeader());
/**
 * Register this engine's server callbacks:
 *  - EvpHandleDisconnect for ON_DISCONNECT;
 *  - EvpHandleDeleteBucket for ON_DELETE_BUCKET.
 * `this` is passed as the callback data for both.
 */
2946 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2947 // Register the ON_DISCONNECT callback
2948 registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2949 // Register the ON_DELETE_BUCKET callback
2950 registerEngineCallback(ON_DELETE_BUCKET, EvpHandleDeleteBucket, this);
/**
 * Forward a TAP ack (`seqno`, `status`, `key`) to the producer attached to
 * `cookie` via TapProducer::processAck() and return its status. A warning is
 * logged and ENGINE_DISCONNECT returned when no producer is found.
 */
2953 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2958 TapProducer *connection = getTapProducer(cookie);
2960 LOG(EXTENSION_LOG_WARNING,
2961 "Unable to process tap ack. No producer found\n");
2962 return ENGINE_DISCONNECT;
2965 return connection->processAck(seqno, status, key);
/**
 * Choose the error to report when a store runs out of memory.
 *
 * The result is ENGINE_TMPFAIL when there is evidence that memory can be
 * freed, meaning both of these hold:
 *  - stats.getMaxDataSize() exceeds stats.memOverhead;
 *  - a VBucketCountVisitor over vbucket_state_active vbuckets reports fewer
 *    non-resident items than items, i.e. some items are still resident.
 * In that case stats.tmp_oom_errors is bumped and the item pager is woken.
 * Otherwise ENGINE_ENOMEM is returned.
 */
2968 ENGINE_ERROR_CODE EventuallyPersistentEngine::memoryCondition() {
2969 // Do we think it's possible we could free something?
2970 bool haveEvidenceWeCanFreeMemory =
2971 (stats.getMaxDataSize() > stats.memOverhead->load());
2972 if (haveEvidenceWeCanFreeMemory) {
2973 // Look for more evidence by seeing if we have resident items.
2974 VBucketCountVisitor countVisitor(vbucket_state_active);
2975 kvBucket->visit(countVisitor);
2977 haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
2978 countVisitor.getNumItems();
2980 if (haveEvidenceWeCanFreeMemory) {
2981 ++stats.tmp_oom_errors;
2982 // Wake up the item pager task as memory usage
2983 // seems to have exceeded high water mark
2984 getKVBucket()->wakeUpItemPager();
2985 return ENGINE_TMPFAIL;
2988 return ENGINE_ENOMEM;
/**
 * Schedule a BackFillVisitor for TAP connection `tc` over the vbuckets
 * selected by `backfillVBFilter`. The visitor is handed to
 * getKVBucket()->visit() with TaskId::BackfillVisitorTask.
 */
2992 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2996 auto bfv = std::make_unique<BackFillVisitor>(
2997 this, *tapConnMap, tc, backfillVBFilter);
2998 getKVBucket()->visit(std::move(bfv),
3000 TaskId::BackfillVisitorTask,
/**
 * Forward `vb` to the VBucketCountVisitor registered for its current state.
 * Vbuckets whose state has no registered visitor are skipped.
 */
3004 void VBucketCountAggregator::visitBucket(VBucketPtr &vb) {
3005 std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3006 it = visitorMap.find(vb->getState());
3007 if ( it != visitorMap.end() ) {
3008 it->second->visitBucket(vb);
/**
 * Register `visitor` under the vbucket state it counts
 * (visitor->getVBucketState()). A visitor already registered for that state
 * is replaced. Only the raw pointer is stored; presumably the caller keeps
 * ownership and must keep it alive while the aggregator is used — TODO confirm.
 */
3012 void VBucketCountAggregator::addVisitor(VBucketCountVisitor* visitor) {
3013 visitorMap[visitor->getVBucketState()] = visitor;
3016 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3017 ADD_STAT add_stat) {
3019 configuration.addStats(add_stat, cookie);
3021 EPStats &epstats = getEpStats();
3022 add_casted_stat("ep_version", VERSION, add_stat, cookie);
3023 add_casted_stat("ep_storage_age",
3024 epstats.dirtyAge, add_stat, cookie);
3025 add_casted_stat("ep_storage_age_highwat",
3026 epstats.dirtyAgeHighWat, add_stat, cookie);
3027 add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3030 if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3031 add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3032 } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3033 add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3036 add_casted_stat("ep_total_enqueued",
3037 epstats.totalEnqueued, add_stat, cookie);
3038 add_casted_stat("ep_expired_access", epstats.expired_access,
3040 add_casted_stat("ep_expired_compactor", epstats.expired_compactor,
3042 add_casted_stat("ep_expired_pager", epstats.expired_pager,
3044 add_casted_stat("ep_queue_size",
3045 epstats.diskQueueSize, add_stat, cookie);
3046 add_casted_stat("ep_diskqueue_items",
3047 epstats.diskQueueSize, add_stat, cookie);
3048 add_casted_stat("ep_vb_backfill_queue_size",
3049 epstats.vbBackfillQueueSize,
3052 auto* flusher = kvBucket->getFlusher(EP_PRIMARY_SHARD);
3054 add_casted_stat("ep_commit_num", epstats.flusherCommits,
3056 add_casted_stat("ep_commit_time",
3057 epstats.commit_time, add_stat, cookie);
3058 add_casted_stat("ep_commit_time_total",
3059 epstats.cumulativeCommitTime, add_stat, cookie);
3060 add_casted_stat("ep_item_begin_failed",
3061 epstats.beginFailed, add_stat, cookie);
3062 add_casted_stat("ep_item_commit_failed",
3063 epstats.commitFailed, add_stat, cookie);
3064 add_casted_stat("ep_item_flush_expired",
3065 epstats.flushExpired, add_stat, cookie);
3066 add_casted_stat("ep_item_flush_failed",
3067 epstats.flushFailed, add_stat, cookie);
3068 add_casted_stat("ep_flusher_state",
3069 flusher->stateName(), add_stat, cookie);
3070 add_casted_stat("ep_flusher_todo",
3071 epstats.flusher_todo, add_stat, cookie);
3072 add_casted_stat("ep_total_persisted",
3073 epstats.totalPersisted, add_stat, cookie);
3074 add_casted_stat("ep_uncommitted_items",
3075 epstats.flusher_todo, add_stat, cookie);
3076 add_casted_stat("ep_chk_persistence_timeout",
3077 VBucket::getCheckpointFlushTimeout(),
3081 add_casted_stat("ep_vbucket_del",
3082 epstats.vbucketDeletions, add_stat, cookie);
3083 add_casted_stat("ep_vbucket_del_fail",
3084 epstats.vbucketDeletionFail, add_stat, cookie);
3085 add_casted_stat("ep_flush_duration_total",
3086 epstats.cumulativeFlushTime, add_stat, cookie);
3088 kvBucket->getAggregatedVBucketStats(cookie, add_stat);
3090 kvBucket->getFileStats(cookie, add_stat);
3092 add_casted_stat("ep_persist_vbstate_total",
3093 epstats.totalPersistVBState, add_stat, cookie);
3095 size_t memUsed = stats.getTotalMemoryUsed();
3096 add_casted_stat("mem_used", memUsed, add_stat, cookie);
3097 add_casted_stat("ep_mem_low_wat_percent", stats.mem_low_wat_percent,
3099 add_casted_stat("ep_mem_high_wat_percent", stats.mem_high_wat_percent,
3101 add_casted_stat("bytes", memUsed, add_stat, cookie);
3102 add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3103 add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3104 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3105 add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3107 add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3109 add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3110 add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3112 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3113 add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3115 add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3117 add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3118 add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3119 add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3121 add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3122 add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3124 add_casted_stat("ep_mem_tracker_enabled", stats.memoryTrackerEnabled,
3126 add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3128 add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3130 add_casted_stat("ep_bg_remaining_items", epstats.numRemainingBgItems,
3132 add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3134 add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3136 add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3138 add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3140 add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3142 add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3144 add_casted_stat("ep_items_rm_from_checkpoints",
3145 epstats.itemsRemovedFromCheckpoints,
3147 add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3149 add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3151 add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3154 add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3155 add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3157 add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3159 add_casted_stat("ep_pending_ops_max_duration",
3160 epstats.pendingOpsMaxDuration,
3163 add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3165 add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3168 size_t vbDeletions = epstats.vbucketDeletions.load();
3169 if (vbDeletions > 0) {
3170 add_casted_stat("ep_vbucket_del_max_walltime",
3171 epstats.vbucketDelMaxWalltime,
3173 add_casted_stat("ep_vbucket_del_avg_walltime",
3174 epstats.vbucketDelTotWalltime / vbDeletions,
3178 size_t numBgOps = epstats.bgNumOperations.load();
3180 add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3182 add_casted_stat("ep_bg_min_wait",
3185 add_casted_stat("ep_bg_max_wait",
3188 add_casted_stat("ep_bg_wait_avg",
3189 epstats.bgWait / numBgOps,
3191 add_casted_stat("ep_bg_min_load",
3194 add_casted_stat("ep_bg_max_load",
3197 add_casted_stat("ep_bg_load_avg",
3198 epstats.bgLoad / numBgOps,
3200 add_casted_stat("ep_bg_wait",
3203 add_casted_stat("ep_bg_load",
3208 add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3210 add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3212 add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3214 add_casted_stat("ep_num_access_scanner_skips",
3215 epstats.accessScannerSkips, add_stat, cookie);
3216 add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3218 add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3221 if (kvBucket->isAccessScannerEnabled() && epstats.alogTime.load() != 0)
3225 hrtime_t alogTime = epstats.alogTime.load();
3226 if (cb_gmtime_r((time_t *)&alogTime, &alogTim) == -1) {
3227 add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3230 strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3231 add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3235 add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3239 if (kvBucket->isExpPagerEnabled()) {
3241 struct tm expPagerTim;
3242 hrtime_t expPagerTime = epstats.expPagerTime.load();
3243 if (cb_gmtime_r((time_t *)&expPagerTime, &expPagerTim) == -1) {
3244 add_casted_stat("ep_expiry_pager_task_time", "UNKNOWN", add_stat,
3247 strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &expPagerTim);
3248 add_casted_stat("ep_expiry_pager_task_time", timestr, add_stat,
3252 add_casted_stat("ep_expiry_pager_task_time", "NOT_SCHEDULED",
3256 add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
3258 if (getConfiguration().isWarmup()) {
3259 Warmup *wp = kvBucket->getWarmup();
3260 if (wp == nullptr) {
3261 throw std::logic_error("EPEngine::doEngineStats: warmup is NULL");
3263 if (!kvBucket->isWarmingUp()) {
3264 add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
3266 add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);