1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2017 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 #include "ep_engine.h"
25 #include "dcp/consumer.h"
26 #include "dcp/dcpconnmap.h"
27 #include "dcp/flow-control-manager.h"
28 #include "dcp/producer.h"
29 #include "ep_bucket.h"
31 #include "ephemeral_bucket.h"
32 #include "failover-table.h"
34 #include "htresizer.h"
36 #include "memory_tracker.h"
37 #include "replicationthrottle.h"
38 #include "stats-info.h"
39 #define STATWRITER_NAMESPACE core_engine
40 #include "statwriter.h"
41 #undef STATWRITER_NAMESPACE
42 #include "string_utils.h"
43 #include "tapconnmap.h"
44 #include "vb_count_visitor.h"
47 #include <JSON_checker.h>
48 #include <cJSON_utils.h>
49 #include <memcached/engine.h>
50 #include <memcached/extension.h>
51 #include <memcached/protocol_binary.h>
52 #include <memcached/server_api.h>
53 #include <memcached/util.h>
54 #include <platform/cb_malloc.h>
55 #include <platform/checked_snprintf.h>
56 #include <platform/make_unique.h>
57 #include <platform/platform.h>
58 #include <platform/processclock.h>
59 #include <xattr/utils.h>
/**
 * Scale a size by a fractional multiplier.
 *
 * @param val size to scale
 * @param percent multiplier applied to val
 * @return val * percent, converted back to size_t (fraction discarded)
 */
static size_t percentOf(size_t val, double percent) {
    const auto base = static_cast<double>(val);
    const double scaled = base * percent;
    return static_cast<size_t>(scaled);
}
// Deleter used by EPHandle. It does not free the engine object; it only
// calls ObjectRegistry::onSwitchThread(nullptr) when the handle goes away.
76 struct EPHandleReleaser {
77 void operator()(EventuallyPersistentEngine*) {
78 ObjectRegistry::onSwitchThread(nullptr);
// unique_ptr alias that pairs an engine pointer with EPHandleReleaser.
82 using EPHandle = std::unique_ptr<EventuallyPersistentEngine, EPHandleReleaser>;
85 * Helper function to acquire a handle to the engine which allows access to
86 * the engine while the handle is in scope.
87 * @param handle pointer to the engine
88 * @return EPHandle which is a unique_ptr to an EventuallyPersistentEngine
89 * with a custom deleter (EPHandleReleaser) which performs the required
90 * ObjectRegistry release.
// Reinterprets the opaque ENGINE_HANDLE as an EventuallyPersistentEngine and
// passes that pointer to ObjectRegistry::onSwitchThread(). The declared
// return type is EPHandle.
93 static inline EPHandle acquireEngine(ENGINE_HANDLE* handle) {
94 auto ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
95 ObjectRegistry::onSwitchThread(ret);
101 * Call the response callback and return the appropriate value so that
102 * the core knows what to do.
// Invokes the ADD_RESPONSE callback with the supplied key/ext/body/datatype/
// status/cas/cookie. rv starts out as ENGINE_FAILED.
// ObjectRegistry::onSwitchThread(NULL, true) is called before the callback and
// the engine pointer it returned is handed back to onSwitchThread() afterwards.
// NOTE(review): presumably this detaches the calling thread from the engine
// for the duration of the callback - confirm against ObjectRegistry.
104 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
106 const void *ext, uint8_t extlen,
107 const void *body, uint32_t bodylen,
108 uint8_t datatype, uint16_t status,
109 uint64_t cas, const void *cookie)
111 ENGINE_ERROR_CODE rv = ENGINE_FAILED;
112 EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
113 if (response(key, keylen, ext, extlen, body, bodylen, datatype,
114 status, cas, cookie)) {
117 ObjectRegistry::onSwitchThread(e);
/**
 * Range check helper: accepts v only when it lies within [l, h] (inclusive).
 *
 * @param v value to check
 * @param l lowest permitted value
 * @param h highest permitted value
 * @throws std::runtime_error ("Value out of range.") when v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool withinBounds = !(v < l) && !(v > h);
    if (withinBounds) {
        return;
    }
    throw std::runtime_error("Value out of range.");
}
/**
 * Verify that a C string consists solely of decimal digits, optionally
 * preceded by a single leading '-'.
 *
 * @param str NUL-terminated string to inspect
 * @throws std::runtime_error ("Value is not numeric") on the first
 *         character that is not a decimal digit
 */
static void checkNumeric(const char* str) {
    int i = 0;
    if (str[0] == '-') {
        i++;
    }
    for (; str[i]; i++) {
        using namespace std;
        // isdigit() has undefined behaviour when given a negative value
        // other than EOF, which a plain (signed) char >= 0x80 would be.
        // Convert through unsigned char before classifying.
        if (!isdigit(static_cast<unsigned char>(str[i]))) {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
142 static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle) {
143 return acquireEngine(handle)->getInfo();
146 static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
147 const char* config_str) {
148 return acquireEngine(handle)->initialize(config_str);
// Engine API entry point for destroying the engine. The engine handle is
// acquired into a named local (eng) rather than used as a temporary.
151 static void EvpDestroy(ENGINE_HANDLE* handle, const bool force) {
152 auto eng = acquireEngine(handle);
// Engine API entry point for allocating an item.
// An invalid datatype is logged and rejected with ENGINE_EINVAL before the
// engine is acquired; otherwise the request is forwarded to the engine's
// itemAllocate() with zero privileged bytes.
157 static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
163 const rel_time_t exptime,
166 if (!mcbp::datatype::is_valid(datatype)) {
167 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
169 return ENGINE_EINVAL;
172 return acquireEngine(handle)->itemAllocate(itm,
175 0, // No privileged bytes
// Forward declarations: EvpItemAllocateEx() calls both of these before
// their definitions appear in the file.
182 static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
183 const item* itm, item_info *itm_info);
184 static void EvpItemRelease(ENGINE_HANDLE* handle, const void *cookie,
// Engine API entry point: allocates an item and returns it together with
// its item_info.
// Throws cb::engine_error carrying the engine's error code when
// itemAllocate() does not return ENGINE_SUCCESS, and cb::engine_errc::failed
// when EvpGetItemInfo() fails (the item is released first in that case).
// On success the item is wrapped in a cb::unique_item_ptr whose deleter is
// bound to this engine handle.
188 static std::pair<cb::unique_item_ptr, item_info> EvpItemAllocateEx(ENGINE_HANDLE* handle,
192 const size_t priv_nbytes,
194 const rel_time_t exptime,
199 auto err = acquireEngine(handle)->itemAllocate(
200 &it, key, nbytes, priv_nbytes, flags, exptime, datatype, vbucket);
202 if (err != ENGINE_SUCCESS) {
203 throw cb::engine_error(cb::engine_errc(err),
204 "EvpItemAllocateEx: failed to allocate memory");
// Release the freshly allocated item before reporting the failure so it
// is not leaked.
208 if (!EvpGetItemInfo(handle, cookie, it, &info)) {
209 EvpItemRelease(handle, cookie, it);
210 throw cb::engine_error(cb::engine_errc::failed,
211 "EvpItemAllocateEx: EvpGetItemInfo failed");
214 return std::make_pair(cb::unique_item_ptr{it, cb::ItemDeleter{handle}},
// Engine API entry point for deleting an item.
// Contains a guard that logs "cas ptr passed is null" and returns
// ENGINE_EINVAL; otherwise *cas is dereferenced and the request is
// forwarded to the engine's itemDelete() with a null fifth argument.
218 static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
223 mutation_descr_t* mut_info) {
225 LOG(EXTENSION_LOG_WARNING,
226 "EvpItemDelete(): cas ptr passed is null for vb: %" PRIu16,
228 return ENGINE_EINVAL;
230 return acquireEngine(handle)->itemDelete(
231 cookie, key, *cas, vbucket, nullptr, mut_info);
// Engine API entry point: hands the item back to the engine's itemRelease().
234 static void EvpItemRelease(ENGINE_HANDLE* handle,
237 acquireEngine(handle)->itemRelease(cookie, itm);
// Engine API entry point for get.
// Builds a get_options_t value starting from QUEUE_BG_FETCH (combined with
// further flags) and then adjusts it according to documentStateFilter:
//   - Deleted        -> not supported, returns ENGINE_ENOTSUP
//   - AliveOrDeleted -> GET_DELETED_VALUE is OR-ed into the options
// The resulting options are passed to the engine's get().
240 static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
245 DocStateFilter documentStateFilter) {
246 get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
253 switch (documentStateFilter) {
254 case DocStateFilter::Alive:
256 case DocStateFilter::Deleted:
257 // MB-23640 was caused by this bug as the frontend asked for
258 // Alive and Deleted documents. The internals don't have a
259 // way of requesting just deleted documents, and luckily for
260 // us no part of our code is using this yet. Return an error
261 // if anyone start using it
262 return ENGINE_ENOTSUP;
263 case DocStateFilter::AliveOrDeleted:
264 options = static_cast<get_options_t>(options | GET_DELETED_VALUE);
268 return acquireEngine(handle)->get(cookie, itm, key, vbucket, options);
// Engine API entry point: forwards cookie, key, vbucket and the caller's
// filter callable (which takes a const item_info&) to the engine's get_if().
271 static cb::EngineErrorItemPair EvpGetIf(ENGINE_HANDLE* handle,
276 const item_info&)> filter) {
277 return acquireEngine(handle)->get_if(cookie, key, vbucket, filter);
// Engine API entry point: forwards to the engine's get_and_touch().
280 static cb::EngineErrorItemPair EvpGetAndTouch(ENGINE_HANDLE* handle,
284 uint32_t expiry_time) {
285 return acquireEngine(handle)->get_and_touch(cookie, key, vbucket,
// Engine API entry point: forwards to the engine's get_locked(), passing
// lock_timeout through unchanged.
289 static ENGINE_ERROR_CODE EvpGetLocked(ENGINE_HANDLE* handle,
294 uint32_t lock_timeout) {
295 return acquireEngine(handle)->get_locked(
296 cookie, itm, key, vbucket, lock_timeout);
// Engine API entry point: forwards cookie, key, vbucket and cas to the
// engine's unlock().
299 static ENGINE_ERROR_CODE EvpUnlock(ENGINE_HANDLE* handle,
304 return acquireEngine(handle)->unlock(cookie, key, vbucket, cas);
// Engine API entry point: forwards the stat key (stat_key / nkey) and the
// add_stat callback to the engine's getStats().
307 static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
309 const char* stat_key,
312 return acquireEngine(handle)->getStats(cookie, stat_key, nkey, add_stat);
// Engine API entry point for store.
// The engine handle is held in a local for the whole call. When
// document_state is DocumentState::Deleted the opaque item is first cast to
// the engine's Item type; the request is then passed to the engine's store().
315 static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
319 ENGINE_STORE_OPERATION operation,
320 DocumentState document_state) {
321 auto engine = acquireEngine(handle);
323 if (document_state == DocumentState::Deleted) {
324 Item* item = static_cast<Item*>(itm);
328 return engine->store(cookie, itm, cas, operation);
331 static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
332 const void* cookie) {
333 return acquireEngine(handle)->flush(cookie);
336 static void EvpResetStats(ENGINE_HANDLE* handle, const void*) {
337 acquireEngine(handle)->resetStats();
// Applies one TAP/replication-throttle parameter.
//   keyz - NUL-terminated parameter name
//   valz - NUL-terminated textual value, converted with std::stoi/stoll/stoull
//   msg  - receives a human-readable reason on failure
// Returns PROTOCOL_BINARY_RESPONSE_SUCCESS by default, KEY_ENOENT
// ("Unknown config param") for an unrecognised key, and EINVAL when a
// conversion or validation exception is caught.
340 protocol_binary_response_status EventuallyPersistentEngine::setTapParam(
341 const char* keyz, const char* valz, std::string& msg) {
342 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
345 if (strcmp(keyz, "tap_keepalive") == 0) {
346 int v = std::stoi(valz);
347 validate(v, 0, MAX_TAP_KEEP_ALIVE);
348 getConfiguration().requirementsMetOrThrow("tap_keepalive");
349 setTapKeepAlive(static_cast<uint32_t>(v));
350 } else if (strcmp(keyz, "replication_throttle_threshold") == 0) {
351 getConfiguration().setReplicationThrottleThreshold(
353 } else if (strcmp(keyz, "replication_throttle_queue_cap") == 0) {
354 getConfiguration().setReplicationThrottleQueueCap(std::stoll(valz));
355 } else if (strcmp(keyz, "replication_throttle_cap_pcnt") == 0) {
356 getConfiguration().setReplicationThrottleCapPcnt(std::stoull(valz));
358 msg = "Unknown config param";
359 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
361 // Handles exceptions thrown by the standard
362 // library stoi/stoul style functions when not numeric
363 } catch (std::invalid_argument&) {
364 msg = "Argument was not numeric";
365 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
367 // Handles exceptions thrown by the standard library stoi/stoul
368 // style functions when the conversion does not fit in the datatype
369 } catch (std::out_of_range&) {
370 msg = "Argument was out of range";
371 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
373 // Handles any miscellaneous exceptions in addition to the range_error
374 // exceptions thrown by the configuration::set<param>() methods
375 } catch (std::exception& error) {
377 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Applies one checkpoint parameter.
//   keyz - NUL-terminated parameter name
//   valz - NUL-terminated textual value (numeric via std::stoull, boolean
//          via cb_stob)
//   msg  - receives a human-readable reason on failure
// Numeric values are range-checked with validate() against the
// MIN_/MAX_ checkpoint constants before being stored in the configuration.
// Returns SUCCESS by default, KEY_ENOENT ("Unknown config param") for an
// unrecognised key, and EINVAL when any exception is caught.
383 protocol_binary_response_status EventuallyPersistentEngine::setCheckpointParam(
384 const char* keyz, const char* valz, std::string& msg) {
385 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
388 if (strcmp(keyz, "chk_max_items") == 0) {
389 size_t v = std::stoull(valz);
390 validate(v, size_t(MIN_CHECKPOINT_ITEMS),
391 size_t(MAX_CHECKPOINT_ITEMS));
392 getConfiguration().setChkMaxItems(v);
393 } else if (strcmp(keyz, "chk_period") == 0) {
394 size_t v = std::stoull(valz);
395 validate(v, size_t(MIN_CHECKPOINT_PERIOD),
396 size_t(MAX_CHECKPOINT_PERIOD));
397 getConfiguration().setChkPeriod(v);
398 } else if (strcmp(keyz, "max_checkpoints") == 0) {
399 size_t v = std::stoull(valz);
// The lower bound used here is DEFAULT_MAX_CHECKPOINTS.
400 validate(v, size_t(DEFAULT_MAX_CHECKPOINTS),
401 size_t(MAX_CHECKPOINTS_UPPER_BOUND));
402 getConfiguration().setMaxCheckpoints(v);
403 } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
404 getConfiguration().setItemNumBasedNewChk(cb_stob(valz));
405 } else if (strcmp(keyz, "keep_closed_chks") == 0) {
406 getConfiguration().setKeepClosedChks(cb_stob(valz));
407 } else if (strcmp(keyz, "enable_chk_merge") == 0) {
408 getConfiguration().setEnableChkMerge(cb_stob(valz));
410 msg = "Unknown config param";
411 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
414 // Handles exceptions thrown by the cb_stob function
415 } catch (invalid_argument_bool& error) {
417 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
419 // Handles exceptions thrown by the standard
420 // library stoi/stoul style functions when not numeric
421 } catch (std::invalid_argument&) {
422 msg = "Argument was not numeric";
423 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
425 // Handles exceptions thrown by the standard library stoi/stoul
426 // style functions when the conversion does not fit in the datatype
427 } catch (std::out_of_range&) {
428 msg = "Argument was out of range";
429 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
431 // Handles any miscellaneous exceptions in addition to the range_error
432 // exceptions thrown by the configuration::set<param>() methods
433 } catch (std::exception& error) {
435 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Applies one "flush" group parameter (memory quota and watermarks, pagers,
// access scanner, thread counts, bloom filter, defragmenter, ephemeral
// settings, ...).
//   keyz - NUL-terminated parameter name
//   valz - NUL-terminated textual value; parsed with std::stoull/stoll/
//          stoul/stoi/stof or cb_stob depending on the parameter
//   msg  - receives a human-readable reason on failure
// Returns SUCCESS by default, KEY_ENOENT ("Unknown config param") for an
// unrecognised key, ETMPFAIL when access_scanner_run cannot be started, and
// EINVAL when any exception is caught.
441 protocol_binary_response_status EventuallyPersistentEngine::setFlushParam(
442 const char* keyz, const char* valz, std::string& msg) {
443 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
445 // Handle the actual mutation.
447 if (strcmp(keyz, "bg_fetch_delay") == 0) {
448 getConfiguration().setBgFetchDelay(std::stoull(valz));
449 } else if (strcmp(keyz, "flushall_enabled") == 0) {
450 getConfiguration().setFlushallEnabled(cb_stob(valz));
451 } else if (strcmp(keyz, "max_size") == 0) {
452 size_t vsize = std::stoull(valz);
// Changing max_size also re-derives both watermarks from the new size
// using the percentages held in EPStats.
454 getConfiguration().setMaxSize(vsize);
455 EPStats& st = getEpStats();
456 getConfiguration().setMemLowWat(
457 percentOf(vsize, st.mem_low_wat_percent));
458 getConfiguration().setMemHighWat(
459 percentOf(vsize, st.mem_high_wat_percent));
460 } else if (strcmp(keyz, "mem_low_wat") == 0) {
461 getConfiguration().setMemLowWat(std::stoull(valz));
462 } else if (strcmp(keyz, "mem_high_wat") == 0) {
463 getConfiguration().setMemHighWat(std::stoull(valz));
464 } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
465 getConfiguration().setBackfillMemThreshold(std::stoull(valz));
466 } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
467 getConfiguration().setCompactionExpMemThreshold(std::stoull(valz));
468 } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
469 getConfiguration().setMutationMemThreshold(std::stoull(valz));
470 } else if (strcmp(keyz, "timing_log") == 0) {
// The current timing log stream is detached from the stats object first;
// "off" disables logging, any other value is treated as a file path.
471 EPStats& stats = getEpStats();
472 std::ostream* old = stats.timingLog;
473 stats.timingLog = NULL;
475 if (strcmp(valz, "off") == 0) {
476 LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
478 std::ofstream* tmp(new std::ofstream(valz));
480 LOG(EXTENSION_LOG_INFO,
481 "Logging detailed timings to ``%s''.", valz);
482 stats.timingLog = tmp;
484 LOG(EXTENSION_LOG_WARNING,
485 "Error setting detailed timing log to ``%s'': %s",
486 valz, strerror(errno));
490 } else if (strcmp(keyz, "exp_pager_enabled") == 0) {
491 getConfiguration().setExpPagerEnabled(cb_stob(valz));
492 } else if (strcmp(keyz, "exp_pager_stime") == 0) {
493 getConfiguration().setExpPagerStime(std::stoull(valz));
494 } else if (strcmp(keyz, "exp_pager_initial_run_time") == 0) {
495 getConfiguration().setExpPagerInitialRunTime(std::stoll(valz));
496 } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
497 getConfiguration().requirementsMetOrThrow("access_scanner_enabled");
498 getConfiguration().setAccessScannerEnabled(cb_stob(valz));
499 } else if (strcmp(keyz, "alog_sleep_time") == 0) {
500 getConfiguration().requirementsMetOrThrow("alog_sleep_time");
501 getConfiguration().setAlogSleepTime(std::stoull(valz));
502 } else if (strcmp(keyz, "alog_task_time") == 0) {
503 getConfiguration().requirementsMetOrThrow("alog_task_time");
504 getConfiguration().setAlogTaskTime(std::stoull(valz));
505 } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
506 getConfiguration().setPagerActiveVbPcnt(std::stoull(valz));
507 } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
508 getConfiguration().setWarmupMinMemoryThreshold(std::stoull(valz));
509 } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
510 getConfiguration().setWarmupMinItemsThreshold(std::stoull(valz));
// Thread-count parameters accept two spellings each and update both the
// configuration and the live ExecutorPool.
511 } else if (strcmp(keyz, "max_num_readers") == 0 ||
512 strcmp(keyz, "num_reader_threads") == 0) {
513 size_t value = std::stoull(valz);
514 getConfiguration().setNumReaderThreads(value);
515 ExecutorPool::get()->setNumReaders(value);
516 } else if (strcmp(keyz, "max_num_writers") == 0 ||
517 strcmp(keyz, "num_writer_threads") == 0) {
518 size_t value = std::stoull(valz);
519 getConfiguration().setNumWriterThreads(value);
520 ExecutorPool::get()->setNumWriters(value);
521 } else if (strcmp(keyz, "max_num_auxio") == 0 ||
522 strcmp(keyz, "num_auxio_threads") == 0) {
523 size_t value = std::stoull(valz);
524 getConfiguration().setNumAuxioThreads(value);
525 ExecutorPool::get()->setNumAuxIO(value);
526 } else if (strcmp(keyz, "max_num_nonio") == 0 ||
527 strcmp(keyz, "num_nonio_threads") == 0) {
528 size_t value = std::stoull(valz);
529 getConfiguration().setNumNonioThreads(value);
530 ExecutorPool::get()->setNumNonIO(value);
531 } else if (strcmp(keyz, "bfilter_enabled") == 0) {
532 getConfiguration().setBfilterEnabled(cb_stob(valz));
533 } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
534 getConfiguration().setBfilterResidencyThreshold(std::stof(valz));
535 } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
536 getConfiguration().setDefragmenterEnabled(cb_stob(valz));
537 } else if (strcmp(keyz, "defragmenter_interval") == 0) {
538 size_t v = std::stoull(valz);
539 // Adding separate validation as external limit is minimum 1
540 // to prevent setting defragmenter to constantly run
541 validate(v, size_t(1), std::numeric_limits<size_t>::max());
542 getConfiguration().setDefragmenterInterval(v);
543 } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
544 getConfiguration().setDefragmenterAgeThreshold(std::stoull(valz));
545 } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
546 getConfiguration().setDefragmenterChunkDuration(std::stoull(valz));
// The "*_run" keys trigger a task; defragmenter_run and access_scanner_run
// do not read valz at all.
547 } else if (strcmp(keyz, "defragmenter_run") == 0) {
548 runDefragmenterTask();
549 } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
550 getConfiguration().setCompactionWriteQueueCap(std::stoull(valz));
551 } else if (strcmp(keyz, "dcp_min_compression_ratio") == 0) {
552 getConfiguration().setDcpMinCompressionRatio(std::stof(valz));
553 } else if (strcmp(keyz, "access_scanner_run") == 0) {
554 if (!(runAccessScannerTask())) {
555 rv = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
557 } else if (strcmp(keyz, "vb_state_persist_run") == 0) {
558 runVbStatePersistTask(std::stoi(valz));
559 } else if (strcmp(keyz, "ephemeral_full_policy") == 0) {
560 getConfiguration().requirementsMetOrThrow("ephemeral_full_policy");
561 getConfiguration().setEphemeralFullPolicy(valz);
562 } else if (strcmp(keyz, "ephemeral_metadata_purge_age") == 0) {
563 getConfiguration().requirementsMetOrThrow(
564 "ephemeral_metadata_purge_age");
565 getConfiguration().setEphemeralMetadataPurgeAge(std::stoull(valz));
566 } else if (strcmp(keyz, "ephemeral_metadata_purge_interval") == 0) {
567 getConfiguration().requirementsMetOrThrow("ephemeral_metadata_purge_interval");
568 getConfiguration().setEphemeralMetadataPurgeInterval(
570 } else if (strcmp(keyz, "mem_merge_count_threshold") == 0) {
571 getConfiguration().setMemMergeCountThreshold(std::stoul(valz));
572 } else if (strcmp(keyz, "mem_merge_bytes_threshold") == 0) {
573 getConfiguration().setMemMergeBytesThreshold(std::stoul(valz));
575 msg = "Unknown config param";
576 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
578 // Handles exceptions thrown by the cb_stob function
579 } catch (invalid_argument_bool& error) {
581 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
583 // Handles exceptions thrown by the standard
584 // library stoi/stoul style functions when not numeric
585 } catch (std::invalid_argument&) {
586 msg = "Argument was not numeric";
587 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
589 // Handles exceptions thrown by the standard library stoi/stoul
590 // style functions when the conversion does not fit in the datatype
591 } catch (std::out_of_range&) {
592 msg = "Argument was out of range";
593 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
595 // Handles any miscellaneous exceptions in addition to the range_error
596 // exceptions thrown by the configuration::set<param>() methods
597 } catch (std::exception& error) {
599 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Applies one DCP consumer parameter (buffered-message yield limit or
// batch size). Each value must be at least 1.
// Returns SUCCESS by default, KEY_ENOENT ("Unknown config param") for an
// unrecognised key, and EINVAL ("Value out of range.") when a
// std::runtime_error is caught.
// NOTE(review): the value is parsed with atoi() and stored in a size_t, so
// a negative input converts to a very large unsigned value and would satisfy
// the [1, max] check - confirm whether that is intended. Unlike the other
// set*Param functions this one uses atoi rather than std::stoull.
605 protocol_binary_response_status EventuallyPersistentEngine::setDcpParam(
606 const char* keyz, const char* valz, std::string& msg) {
607 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
611 "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
612 size_t v = atoi(valz);
614 validate(v, size_t(1), std::numeric_limits<size_t>::max());
615 getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(
618 strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") ==
620 size_t v = atoi(valz);
622 validate(v, size_t(1), std::numeric_limits<size_t>::max());
623 getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(
626 msg = "Unknown config param";
627 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
// Every runtime_error is reported with the same fixed message; ex.what()
// is not used.
629 } catch (std::runtime_error& ex) {
630 msg = "Value out of range.";
631 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Applies one vbucket-level parameter: the two HLC drift thresholds
// (stored in the configuration) or max_cas (forced onto the given vbucket
// via KVBucket::forceMaxCas).
// Values are parsed as base-10 with std::strtoull and a null end pointer,
// so strtoull itself reports no parse position.
// Returns SUCCESS by default, NOT_MY_VBUCKET when forceMaxCas() does not
// succeed, KEY_ENOENT for an unrecognised key, and EINVAL
// ("Value out of range.") when a std::runtime_error is caught.
637 protocol_binary_response_status EventuallyPersistentEngine::setVbucketParam(
642 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
644 if (strcmp(keyz, "hlc_drift_ahead_threshold_us") == 0) {
645 uint64_t v = std::strtoull(valz, nullptr, 10);
647 getConfiguration().setHlcDriftAheadThresholdUs(v);
648 } else if (strcmp(keyz, "hlc_drift_behind_threshold_us") == 0) {
649 uint64_t v = std::strtoull(valz, nullptr, 10);
651 getConfiguration().setHlcDriftBehindThresholdUs(v);
652 } else if (strcmp(keyz, "max_cas") == 0) {
653 uint64_t v = std::strtoull(valz, nullptr, 10);
// Forcing max_cas is always logged at WARNING level.
655 LOG(EXTENSION_LOG_WARNING, "setVbucketParam: max_cas:%" PRIu64 " "
656 "vb:%" PRIu16 "\n", v, vbucket);
657 if (getKVBucket()->forceMaxCas(vbucket, v) != ENGINE_SUCCESS) {
658 rv = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
659 msg = "Not my vbucket";
662 msg = "Unknown config param";
663 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
665 } catch (std::runtime_error& ex) {
666 msg = "Value out of range.";
667 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Handles a manual key-eviction request.
// The key length and vbucket id are read from the request header in network
// byte order; the key bytes are located relative to the start of the
// request. The key is wrapped in a DocKey and passed to the engine's
// evictKey().
// A NOT_MY_VBUCKET or KEY_ENOENT result is turned into ETMPFAIL while the
// engine reports isDegradedMode().
672 static protocol_binary_response_status evictKey(
673 EventuallyPersistentEngine* e,
674 protocol_binary_request_header
678 DocNamespace docNamespace) {
679 protocol_binary_request_no_extras* req =
680 (protocol_binary_request_no_extras*)request;
682 const uint8_t* keyPtr = reinterpret_cast<const uint8_t*>(request) +
684 size_t keylen = ntohs(req->message.header.request.keylen);
685 uint16_t vbucket = ntohs(request->request.vbucket);
// The key is not NUL-terminated, hence the explicit %.*s precision.
687 LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key{%.*s}\n",
688 int(keylen), keyPtr);
690 auto rv = e->evictKey(DocKey(keyPtr, keylen, docNamespace), vbucket, msg);
691 if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
692 rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
693 if (e->isDegradedMode()) {
694 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
// Decodes a SET_PARAM request and dispatches it to the matching
// set*Param() handler based on the param_type field in the request body.
// Header fields are converted from network byte order. The key and value
// are copied into the local buffers keyz / valz after their lengths have
// been checked against the buffer sizes.
// Returns EINVAL for an empty key, an empty value, or a key/value that does
// not fit its buffer; UNKNOWN_COMMAND for an unhandled param type; otherwise
// whatever the selected handler returns.
// NOTE(review): vallen, keylen and extlen are unsigned, so
// (vallen - keylen - extlen) wraps if bodylen is smaller than
// keylen + extlen. The later "vallen >= sizeof(valz)" check appears to be
// what rejects that case - confirm.
700 protocol_binary_response_status EventuallyPersistentEngine::setParam(
701 protocol_binary_request_set_param* req, std::string& msg) {
702 size_t keylen = ntohs(req->message.header.request.keylen);
703 uint8_t extlen = req->message.header.request.extlen;
704 size_t vallen = ntohl(req->message.header.request.bodylen);
705 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
706 protocol_binary_engine_param_t paramtype =
707 static_cast<protocol_binary_engine_param_t>(ntohl(
708 req->message.body.param_type));
710 if (keylen == 0 || (vallen - keylen - extlen) == 0) {
711 return PROTOCOL_BINARY_RESPONSE_EINVAL;
// The key starts immediately after the fixed-size request structure and
// the value immediately after the key.
714 const char* keyp = reinterpret_cast<const char*>(req->bytes)
715 + sizeof(req->bytes);
716 const char* valuep = keyp + keylen;
717 vallen -= (keylen + extlen);
// ">=" rather than ">" in both size checks leaves one spare byte in the
// buffer beyond the copied data.
723 if (keylen >= sizeof(keyz)) {
724 msg = "Key is too large.";
725 return PROTOCOL_BINARY_RESPONSE_EINVAL;
727 memcpy(keyz, keyp, keylen);
731 if (vallen >= sizeof(valz)) {
732 msg = "Value is too large.";
733 return PROTOCOL_BINARY_RESPONSE_EINVAL;
735 memcpy(valz, valuep, vallen);
738 protocol_binary_response_status rv;
741 case protocol_binary_engine_param_flush:
742 rv = setFlushParam(keyz, valz, msg);
744 case protocol_binary_engine_param_tap:
745 rv = setTapParam(keyz, valz, msg);
747 case protocol_binary_engine_param_checkpoint:
748 rv = setCheckpointParam(keyz, valz, msg);
750 case protocol_binary_engine_param_dcp:
751 rv = setDcpParam(keyz, valz, msg);
// Only the vbucket handler receives the vbucket id from the header.
753 case protocol_binary_engine_param_vbucket:
754 rv = setVbucketParam(vbucket, keyz, valz, msg);
757 rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
// Handles GET_VBUCKET: looks up the vbucket named in the request header and
// sends its state back as the response body with status SUCCESS. When the
// lookup fails, a "not my vbucket" response is sent through the engine.
// NOTE(review): req is the result of a reinterpret_cast of request, so the
// nullptr check can only fire if request itself is null - confirm intent.
// NOTE(review): ntohl() is applied to a host-order value that is about to
// be sent; htonl() looks like the intended direction - confirm.
763 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine* e,
765 protocol_binary_request_header* request,
766 ADD_RESPONSE response) {
767 protocol_binary_request_get_vbucket* req =
768 reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
769 if (req == nullptr) {
770 throw std::invalid_argument("getVBucket: Unable to convert req"
771 " to protocol_binary_request_get_vbucket");
774 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
775 VBucketPtr vb = e->getVBucket(vbucket);
777 return e->sendNotMyVBucketResponse(response, cookie, 0);
779 vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
780 return sendResponse(response, NULL, 0, NULL, 0, &state,
782 PROTOCOL_BINARY_RAW_BYTES,
783 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
// Handles SET_VBUCKET: validates the packet, decodes the requested state and
// applies it with the engine's setVBucketState().
// Responses sent:
//   - EINVAL "Incorrect packet format" when bodylen minus keylen is not
//     exactly sizeof(vbucket_state_t)
//   - EINVAL "Invalid vbucket state" when the decoded state fails
//     is_valid_vbucket_state_t()
//   - ERANGE "VBucket number too big" when setVBucketState() returns
//     ENGINE_ERANGE
//   - SUCCESS with an empty body otherwise
787 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine* e,
789 protocol_binary_request_header* request,
790 ADD_RESPONSE response) {
792 protocol_binary_request_set_vbucket* req =
793 reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
795 uint64_t cas = ntohll(req->message.header.request.cas);
797 size_t bodylen = ntohl(req->message.header.request.bodylen)
798 - ntohs(req->message.header.request.keylen);
799 if (bodylen != sizeof(vbucket_state_t)) {
800 const std::string msg("Incorrect packet format");
801 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
802 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
803 PROTOCOL_BINARY_RESPONSE_EINVAL,
// The state is copied out with memcpy and then byte-swapped from network
// order.
807 vbucket_state_t state;
808 memcpy(&state, &req->message.body.state, sizeof(state));
809 state = static_cast<vbucket_state_t>(ntohl(state));
811 if (!is_valid_vbucket_state_t(state)) {
812 const std::string msg("Invalid vbucket state");
813 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
814 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
815 PROTOCOL_BINARY_RESPONSE_EINVAL,
819 uint16_t vb = ntohs(req->message.header.request.vbucket);
820 if (e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
821 const std::string msg("VBucket number too big");
822 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
823 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
824 PROTOCOL_BINARY_RESPONSE_ERANGE,
827 return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
828 PROTOCOL_BINARY_RAW_BYTES,
829 PROTOCOL_BINARY_RESPONSE_SUCCESS,
// Handles DEL_VBUCKET.
// A request carrying a key or extras is rejected with EINVAL
// ("Incorrect packet format."). A 7-byte body equal to "async=0" is
// recognised as an option. The engine-specific slot on the cookie is read
// and written around the deleteVBucket() calls, and is set to the request
// pointer before returning ENGINE_EWOULDBLOCK.
// NOTE(review): the engine-specific slot presumably distinguishes the first
// invocation from the re-entry after a blocking delete - confirm.
// The engine result is mapped to a protocol status:
//   ENGINE_NOT_MY_VBUCKET -> NOT_MY_VBUCKET (sent via sendNotMyVBucketResponse)
//   "not in a dead state" -> EINVAL
//   ENGINE_EWOULDBLOCK    -> returned to the caller, no response sent yet
//   anything else unknown -> EINTERNAL
833 static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine* e,
835 protocol_binary_request_header* req,
836 ADD_RESPONSE response) {
838 uint64_t cas = ntohll(req->request.cas);
840 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
841 uint16_t vbucket = ntohs(req->request.vbucket);
843 std::string msg = "";
844 if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
845 msg = "Incorrect packet format.";
846 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
848 PROTOCOL_BINARY_RAW_BYTES,
849 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
853 uint32_t bodylen = ntohl(req->request.bodylen);
855 const char* ptr = reinterpret_cast<const char*>(req->bytes) +
// The length is checked first so strncmp never reads past the body.
857 if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
862 ENGINE_ERROR_CODE err;
863 void* es = e->getEngineSpecific(cookie);
866 err = e->deleteVBucket(vbucket, cookie);
867 e->storeEngineSpecific(cookie, e);
869 e->storeEngineSpecific(cookie, NULL);
870 LOG(EXTENSION_LOG_INFO,
871 "Completed sync deletion of vbucket %u",
873 err = ENGINE_SUCCESS;
876 err = e->deleteVBucket(vbucket);
880 LOG(EXTENSION_LOG_NOTICE,
881 "Deletion of vbucket %d was completed.", vbucket);
883 case ENGINE_NOT_MY_VBUCKET:
884 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
885 "because the vbucket doesn't exist!!!", vbucket);
886 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
889 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
890 "because the vbucket is not in a dead state\n", vbucket);
891 msg = "Failed to delete vbucket. Must be in the dead state.";
892 res = PROTOCOL_BINARY_RESPONSE_EINVAL;
894 case ENGINE_EWOULDBLOCK:
895 LOG(EXTENSION_LOG_NOTICE, "Request for vbucket %d deletion is in"
896 " EWOULDBLOCK until the database file is removed from disk",
898 e->storeEngineSpecific(cookie, req);
899 return ENGINE_EWOULDBLOCK;
901 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
902 "because of unknown reasons\n", vbucket);
903 msg = "Failed to delete vbucket. Unknown reason.";
904 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
// NOT_MY_VBUCKET goes through the engine's dedicated response helper;
// every other outcome uses the generic sendResponse().
907 if (err != ENGINE_NOT_MY_VBUCKET) {
908 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
909 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
912 return e->sendNotMyVBucketResponse(response, cookie, cas);
// Handles a get-replica request.
// The key (located directly after the request header) and vbucket id are
// decoded from the request and passed to KVBucketIface::getReplica().
// *res is set to the protocol status matching the outcome:
// NOT_MY_VBUCKET, KEY_ENOENT, or SUCCESS. On success the numOpsGet
// statistic is incremented and ENGINE_SUCCESS is returned.
916 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine* e,
917 protocol_binary_request_header* request,
921 protocol_binary_response_status* res,
922 DocNamespace docNamespace) {
923 KVBucketIface* kvb = e->getKVBucket();
924 protocol_binary_request_no_extras* req =
925 (protocol_binary_request_no_extras*)request;
926 int keylen = ntohs(req->message.header.request.keylen);
927 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
928 ENGINE_ERROR_CODE error_code;
929 DocKey key(reinterpret_cast<const uint8_t*>(request) + sizeof(*request),
930 keylen, docNamespace);
932 GetValue rv(kvb->getReplica(key, vbucket, cookie));
934 if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
935 if (error_code == ENGINE_NOT_MY_VBUCKET) {
936 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
938 } else if (error_code == ENGINE_TMPFAIL) {
940 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
946 *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
948 ++(e->getEpStats().numOpsGet);
949 return ENGINE_SUCCESS;
// Handles COMPACT_DB.
// The packet must carry no key and exactly 24 bytes of extras; otherwise it
// is logged and answered with EINVAL ("Incorrect packet format.").
// The purge timestamp/seqno, drop_deletes flag and db file id are copied
// into a compaction_ctx and passed to the engine's compactDB().
// The engine-specific slot on the cookie is read and written around that
// call, and is set to the request pointer before returning
// ENGINE_EWOULDBLOCK.
// NOTE(review): the engine-specific slot presumably marks the re-entry
// after a scheduled compaction completes - confirm.
// stats.pendingCompactions is incremented when the compaction is requested
// and decremented again on the NOT_MY_VBUCKET, invalid-argument and
// unknown-failure paths.
952 static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine* e,
954 protocol_binary_request_compact_db* req,
955 ADD_RESPONSE response) {
957 std::string msg = "";
958 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
959 compaction_ctx compactreq;
960 uint64_t cas = ntohll(req->message.header.request.cas);
962 if (ntohs(req->message.header.request.keylen) > 0 ||
963 req->message.header.request.extlen != 24) {
964 LOG(EXTENSION_LOG_WARNING,
965 "Compaction received bad ext/key len %d/%d.",
966 req->message.header.request.extlen,
967 ntohs(req->message.header.request.keylen));
968 msg = "Incorrect packet format.";
969 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
971 PROTOCOL_BINARY_RAW_BYTES,
972 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
974 EPStats& stats = e->getEpStats();
// 64-bit body fields arrive in network byte order.
975 compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
976 compactreq.purge_before_seq =
977 ntohll(req->message.body.purge_before_seq);
978 compactreq.drop_deletes = req->message.body.drop_deletes;
979 compactreq.db_file_id = e->getKVBucket()->getDBFileId(*req);
980 uint16_t vbid = ntohs(req->message.header.request.vbucket);
982 ENGINE_ERROR_CODE err;
983 void* es = e->getEngineSpecific(cookie);
985 ++stats.pendingCompactions;
986 e->storeEngineSpecific(cookie, e);
987 err = e->compactDB(vbid, compactreq, cookie);
989 e->storeEngineSpecific(cookie, NULL);
990 err = ENGINE_SUCCESS;
995 LOG(EXTENSION_LOG_NOTICE,
996 "Compaction of db file id: %d completed.", compactreq.db_file_id);
998 case ENGINE_NOT_MY_VBUCKET:
999 --stats.pendingCompactions;
1000 LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1001 "because the db file doesn't exist!!!", compactreq.db_file_id);
1002 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1005 --stats.pendingCompactions;
1006 LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1007 "because of an invalid argument", compactreq.db_file_id);
1008 res = PROTOCOL_BINARY_RESPONSE_EINVAL;
1010 case ENGINE_EWOULDBLOCK:
1011 LOG(EXTENSION_LOG_NOTICE,
1012 "Compaction of db file id: %d scheduled "
1013 "(awaiting completion).", compactreq.db_file_id);
1014 e->storeEngineSpecific(cookie, req);
1015 return ENGINE_EWOULDBLOCK;
1016 case ENGINE_TMPFAIL:
1017 LOG(EXTENSION_LOG_WARNING, "Request to compact db file id: %d hit"
1018 " a temporary failure and may need to be retried",
1019 compactreq.db_file_id);
1020 msg = "Temporary failure in compacting db file.";
1021 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1024 --stats.pendingCompactions;
1025 LOG(EXTENSION_LOG_WARNING, "Compaction of db file id: %d failed "
1026 "because of unknown reasons\n", compactreq.db_file_id);
1027 msg = "Failed to compact db file. Unknown reason.";
1028 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
// NOT_MY_VBUCKET goes through the engine's dedicated response helper;
// every other outcome uses the generic sendResponse().
1032 if (err != ENGINE_NOT_MY_VBUCKET) {
1033 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1034 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1037 return e->sendNotMyVBucketResponse(response, cookie, cas);
// Dispatches a binary-protocol request to the matching engine routine, keyed
// on request->request.opcode.
//
// The visible code works in two stages:
//  1. Session validation: for the opcodes listed in the first switch, and only
//     when no engine-specific data is stored for the cookie, the request CAS
//     is checked with validateSessionCas(); on failure an
//     "Invalid session token" response with status KEY_EEXISTS is sent.
//  2. Dispatch: the second switch calls the per-opcode handler. Some handlers
//     return their result directly; others set rv / res / msg / msg_size, and
//     the tail of the function sends the response.
//
// h            engine that services the request
// request      raw request header; reinterpret_cast to the opcode-specific
//              request struct by several branches
// response     callback passed to sendResponse() and to the handlers
// docNamespace forwarded to the handlers that take a document namespace
//
// NOTE(review): several lines of this function are absent from this extract
// (the original line numbers embedded in each line skip), so branch
// terminators, some call arguments and the `cookie` parameter declaration
// cannot be seen here.
1041 static ENGINE_ERROR_CODE processUnknownCommand(
1042 EventuallyPersistentEngine* h,
1044 protocol_binary_request_header* request,
1045 ADD_RESPONSE response,
1046 DocNamespace docNamespace) {
// Default status when no branch overrides it.
1047 protocol_binary_response_status res =
1048 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
// Backing storage for a message built at run time (used by SET_PARAM below).
1049 std::string dynamic_msg;
1050 const char* msg = NULL;
1051 size_t msg_size = 0;
1054 EPStats& stats = h->getEpStats();
1055 ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1058 * Session validation
1059 * (For ns_server commands only)
1061 switch (request->request.opcode) {
1062 case PROTOCOL_BINARY_CMD_SET_PARAM:
1063 case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1064 case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1065 case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1066 case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1067 case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1068 case PROTOCOL_BINARY_CMD_COMPACT_DB: {
// Only validate when nothing is stored for this cookie yet.
1069 if (h->getEngineSpecific(cookie) == NULL) {
1070 uint64_t cas = ntohll(request->request.cas);
1071 if (!h->validateSessionCas(cas)) {
1072 const std::string message("Invalid session token");
1073 return sendResponse(response, NULL, 0, NULL, 0,
1074 message.c_str(), message.length(),
1075 PROTOCOL_BINARY_RAW_BYTES,
1076 PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
// Second stage: per-opcode dispatch.
1086 switch (request->request.opcode) {
1087 case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
1088 return h->getAllVBucketSequenceNumbers(cookie, request, response);
1090 case PROTOCOL_BINARY_CMD_GET_VBUCKET: {
1091 BlockTimer timer(&stats.getVbucketCmdHisto);
1092 rv = getVBucket(h, cookie, request, response);
1095 case PROTOCOL_BINARY_CMD_DEL_VBUCKET: {
1096 BlockTimer timer(&stats.delVbucketCmdHisto);
1097 rv = delVBucket(h, cookie, request, response);
// The session counter and the cookie's engine-specific slot are only released
// once the delete is no longer pending (rv != ENGINE_EWOULDBLOCK).
1098 if (rv != ENGINE_EWOULDBLOCK) {
1099 h->decrementSessionCtr();
1100 h->storeEngineSpecific(cookie, NULL);
1104 case PROTOCOL_BINARY_CMD_SET_VBUCKET: {
1105 BlockTimer timer(&stats.setVbucketCmdHisto);
1106 rv = setVBucket(h, cookie, request, response);
1107 h->decrementSessionCtr();
1110 case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1111 res = h->stopFlusher(&msg, &msg_size);
1113 case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1114 res = h->startFlusher(&msg, &msg_size);
1116 case PROTOCOL_BINARY_CMD_SET_PARAM:
1118 reinterpret_cast<protocol_binary_request_set_param*>(request),
// msg points into dynamic_msg, which lives until the end of this function.
1120 msg = dynamic_msg.c_str();
1121 msg_size = dynamic_msg.length();
1122 h->decrementSessionCtr();
1124 case PROTOCOL_BINARY_CMD_EVICT_KEY:
1125 res = evictKey(h, request, &msg, &msg_size, docNamespace);
1127 case PROTOCOL_BINARY_CMD_OBSERVE:
1128 return h->observe(cookie, request, response, docNamespace);
1129 case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
1130 return h->observe_seqno(cookie, request, response);
1131 case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT: {
1132 rv = h->deregisterTapClient(cookie, request, response);
1133 h->decrementSessionCtr();
1136 case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN: {
1137 rv = h->resetReplicationChain(cookie, request, response);
1140 case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER: {
1141 rv = h->changeTapVBFilter(cookie, request, response);
1142 h->decrementSessionCtr();
1145 case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1146 case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1147 case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE: {
1148 rv = h->handleCheckpointCmds(cookie, request, response);
1151 case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE: {
1152 rv = h->handleSeqnoCmds(cookie, request, response);
1155 case PROTOCOL_BINARY_CMD_GET_META:
1156 case PROTOCOL_BINARY_CMD_GETQ_META: {
1157 rv = h->getMeta(cookie,
1158 reinterpret_cast<protocol_binary_request_get_meta*>
1159 (request), response,
1163 case PROTOCOL_BINARY_CMD_SET_WITH_META:
1164 case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1165 case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1166 case PROTOCOL_BINARY_CMD_ADDQ_WITH_META: {
1167 rv = h->setWithMeta(cookie,
1168 reinterpret_cast<protocol_binary_request_set_with_meta*>
1169 (request), response,
1173 case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1174 case PROTOCOL_BINARY_CMD_DELQ_WITH_META: {
1175 rv = h->deleteWithMeta(cookie,
1176 reinterpret_cast<protocol_binary_request_delete_with_meta*>
1177 (request), response,
1181 case PROTOCOL_BINARY_CMD_RETURN_META: {
1182 return h->returnMeta(cookie,
1183 reinterpret_cast<protocol_binary_request_return_meta*>
1184 (request), response,
1187 case PROTOCOL_BINARY_CMD_GET_REPLICA:
1188 rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res, docNamespace);
1189 if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1193 case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1194 case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC: {
1195 rv = h->handleTrafficControlCmd(cookie, request, response);
1198 case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG: {
1199 rv = h->setClusterConfig(cookie,
1200 reinterpret_cast<protocol_binary_request_set_cluster_config*>
1201 (request), response);
1202 h->decrementSessionCtr();
1205 case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
1206 return h->getClusterConfig(cookie,
1207 reinterpret_cast<protocol_binary_request_get_cluster_config*>
1208 (request), response);
1209 case PROTOCOL_BINARY_CMD_COMPACT_DB: {
1210 rv = compactDB(h, cookie,
1211 (protocol_binary_request_compact_db*)(request),
// Same release rule as DEL_VBUCKET: only once compaction is not pending.
1213 if (rv != ENGINE_EWOULDBLOCK) {
1214 h->decrementSessionCtr();
1215 h->storeEngineSpecific(cookie, NULL);
1219 case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY: {
// GET_RANDOM_KEY carries no extras, key or body; anything else is EINVAL.
1220 if (request->request.extlen != 0 ||
1221 request->request.keylen != 0 ||
1222 request->request.bodylen != 0) {
1223 return ENGINE_EINVAL;
1225 return h->getRandomKey(cookie, response);
1227 case PROTOCOL_BINARY_CMD_GET_KEYS: {
1228 return h->getAllKeys(cookie,
1229 reinterpret_cast<protocol_binary_request_get_keys*>
1230 (request), response,
1233 // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
1234 case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
1235 case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE: {
1236 return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
1237 PROTOCOL_BINARY_RAW_BYTES,
1238 PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
// Response tail. This first form answers with the item's key, flags, value,
// datatype and CAS (the condition selecting it is not visible in this
// extract).
1244 uint32_t flags = itm->getFlags();
1245 rv = sendResponse(response,
1246 static_cast<const void*>(itm->getKey().data()),
1247 itm->getKey().size(),
1248 (const void*)&flags, sizeof(uint32_t),
1249 static_cast<const void*>(itm->getData()),
1250 itm->getNBytes(), itm->getDataType(),
1251 static_cast<uint16_t>(res), itm->getCas(),
1254 } else if (rv == ENGINE_NOT_MY_VBUCKET) {
1255 return h->sendNotMyVBucketResponse(response, cookie, 0);
// If a message was set without an explicit size, fall back to strlen(msg).
1257 msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1258 rv = sendResponse(response, NULL, 0, NULL, 0,
1259 msg, static_cast<uint16_t>(msg_size),
1260 PROTOCOL_BINARY_RAW_BYTES,
1261 static_cast<uint16_t>(res), 0, cookie);
// Engine-API entry point for commands the server core does not handle itself:
// acquires the engine for this handle and delegates to
// processUnknownCommand() with the raw engine pointer.
// NOTE(review): the cookie/request parameter lines and the return statement
// are not visible in this extract.
1267 static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1269 protocol_binary_request_header
1271 ADD_RESPONSE response,
1272 DocNamespace doc_namespace) {
1273 auto engine = acquireEngine(handle);
1274 auto ret = processUnknownCommand(
1275 engine.get(), cookie, request, response, doc_namespace);
1279 static void EvpItemSetCas(ENGINE_HANDLE*, const void*,
1280 item* itm, uint64_t cas) {
1281 static_cast<Item*>(itm)->setCas(cas);
// Engine-API entry point for an incoming TAP message.
// Rejects the message with ENGINE_EINVAL (after logging a warning) when
// mcbp::datatype::is_valid() refuses the datatype; otherwise forwards to
// EventuallyPersistentEngine::tapNotify(), passing tap_event as a uint16_t.
// NOTE(review): most of the parameter list and of the forwarded argument list
// is not visible in this extract.
1284 static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1286 void* engine_specific,
1290 tap_event_t tap_event,
1301 if (!mcbp::datatype::is_valid(datatype)) {
1302 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1304 return ENGINE_EINVAL;
1307 return acquireEngine(handle)->tapNotify(cookie,
1312 (uint16_t)tap_event,
1325 static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1326 const void* cookie, item** itm,
1327 void** es, uint16_t* nes, uint8_t* ttl,
1328 uint16_t* flags, uint32_t* seqno,
1329 uint16_t* vbucket) {
1330 uint16_t tap_event = acquireEngine(handle)->walkTapQueue(
1331 cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1332 return static_cast<tap_event_t>(tap_event);
// Engine-API entry point that sets up a TAP queue for a client.
// Builds the client name from (client, nclient) and asks the engine to create
// the TAP queue; when createTapQueue() succeeds the iterator is set to
// EvpTapIterator, otherwise it keeps its initial NULL value.
// NOTE(review): several parameter lines and the return statement are not
// visible in this extract.
1335 static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1340 const void* userdata,
1342 auto engine = acquireEngine(handle);
1343 TAP_ITERATOR iterator = NULL;
1345 std::string c(static_cast<const char*>(client), nclient);
1346 // Figure out what we want from the userdata before adding it to
1347 // the API to the handle
1348 if (engine->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1349 iterator = EvpTapIterator;
// DCP trampoline: looks up the ConnHandler registered for the cookie and
// forwards to ConnHandler::step(). The fall-through path returns
// ENGINE_DISCONNECT (presumably when no handler exists for the cookie; the
// guarding condition is not visible in this extract).
1356 static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1358 struct dcp_message_producers* producers) {
1359 auto engine = acquireEngine(handle);
1360 ConnHandler* conn = engine->getConnHandler(cookie);
1362 return conn->step(producers);
1364 return ENGINE_DISCONNECT;
// DCP trampoline: forwards the open request straight to
// EventuallyPersistentEngine::dcpOpen() (no ConnHandler lookup is involved).
// NOTE(review): the parameter list is not visible in this extract.
1368 static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1375 return acquireEngine(handle)->dcpOpen(
1376 cookie, opaque, seqno, flags, name, nname);
// DCP trampoline: forwards to EventuallyPersistentEngine::dcpAddStream().
// NOTE(review): the parameter list is not visible in this extract.
1379 static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1384 return acquireEngine(handle)->dcpAddStream(cookie, opaque, vbucket, flags);
// DCP trampoline: forwards to ConnHandler::closeStream() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
1387 static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1391 auto engine = acquireEngine(handle);
1392 ConnHandler* conn = engine->getConnHandler(cookie);
1394 return conn->closeStream(opaque, vbucket);
1396 return ENGINE_DISCONNECT;
// DCP trampoline: forwards a stream request to ConnHandler::streamRequest()
// for the cookie's connection. The fall-through path returns
// ENGINE_DISCONNECT (presumably when no handler exists; the guard is not
// visible in this extract).
// NOTE(review): part of the parameter list and most of the forwarded
// arguments are not visible in this extract.
1400 static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1405 uint64_t startSeqno,
1407 uint64_t vbucketUuid,
1408 uint64_t snapStartSeqno,
1409 uint64_t snapEndSeqno,
1410 uint64_t* rollbackSeqno,
1411 dcp_add_failover_log callback) {
1412 auto engine = acquireEngine(handle);
1413 ConnHandler* conn = engine->getConnHandler(cookie);
1415 return conn->streamRequest(flags,
1426 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::getFailoverLog() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
1429 static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1433 dcp_add_failover_log callback) {
1434 auto engine = acquireEngine(handle);
1435 ConnHandler* conn = engine->getConnHandler(cookie);
1437 return conn->getFailoverLog(opaque, vbucket, callback);
1439 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::streamEnd() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
1443 static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1448 auto engine = acquireEngine(handle);
1449 ConnHandler* conn = engine->getConnHandler(cookie);
1451 return conn->streamEnd(opaque, vbucket, flags);
1453 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::snapshotMarker() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
1457 static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1461 uint64_t start_seqno,
1464 auto engine = acquireEngine(handle);
1465 ConnHandler* conn = engine->getConnHandler(cookie);
1467 return conn->snapshotMarker(
1468 opaque, vbucket, start_seqno, end_seqno, flags);
1470 return ENGINE_DISCONNECT;
// DCP trampoline for a mutation message.
// Rejects the message with ENGINE_EINVAL (after logging a warning) when
// mcbp::datatype::is_valid() refuses the datatype; this check runs before the
// engine is acquired. Otherwise forwards to ConnHandler::mutation() for the
// cookie's connection. The fall-through path returns ENGINE_DISCONNECT
// (presumably when no handler exists; the guard is not visible in this
// extract).
1473 static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1477 cb::const_byte_buffer value,
1485 uint32_t expiration,
1487 cb::const_byte_buffer meta,
1489 if (!mcbp::datatype::is_valid(datatype)) {
1490 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1492 return ENGINE_EINVAL;
1494 auto engine = acquireEngine(handle);
1495 ConnHandler* conn = engine->getConnHandler(cookie);
1497 return conn->mutation(opaque, key, value, priv_bytes, datatype, cas,
1498 vbucket, flags, by_seqno, rev_seqno, expiration,
1499 lock_time, meta, nru);
1501 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::deletion() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
// NOTE(review): unlike EvpDcpMutation, no datatype validity check is visible
// here. Confirm whether that is intentional.
1504 static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1508 cb::const_byte_buffer value,
1515 cb::const_byte_buffer meta) {
1516 auto engine = acquireEngine(handle);
1517 ConnHandler* conn = engine->getConnHandler(cookie);
1519 return conn->deletion(opaque, key, value, priv_bytes, datatype, cas,
1520 vbucket, by_seqno, rev_seqno, meta);
1522 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::expiration() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
1525 static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1529 cb::const_byte_buffer value,
1536 cb::const_byte_buffer meta) {
1537 auto engine = acquireEngine(handle);
1538 ConnHandler* conn = engine->getConnHandler(cookie);
1540 return conn->expiration(opaque, key, value, priv_bytes, datatype, cas,
1541 vbucket, by_seqno, rev_seqno, meta);
1543 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::flushall() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
1546 static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1550 auto engine = acquireEngine(handle);
1551 ConnHandler* conn = engine->getConnHandler(cookie);
1553 return conn->flushall(opaque, vbucket);
1555 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::setVBucketState() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
1558 static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1562 vbucket_state_t state) {
1563 auto engine = acquireEngine(handle);
1564 ConnHandler* conn = engine->getConnHandler(cookie);
1566 return conn->setVBucketState(opaque, vbucket, state);
1568 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::noop() for the cookie's
// connection. The fall-through path returns ENGINE_DISCONNECT (presumably
// when no handler exists; the guard is not visible in this extract).
1571 static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1574 auto engine = acquireEngine(handle);
1575 ConnHandler* conn = engine->getConnHandler(cookie);
1577 return conn->noop(opaque);
1579 return ENGINE_DISCONNECT;
// DCP trampoline: forwards to ConnHandler::bufferAcknowledgement() for the
// cookie's connection. The fall-through path returns ENGINE_DISCONNECT
// (presumably when no handler exists; the guard is not visible in this
// extract).
1582 static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1586 uint32_t buffer_bytes) {
1587 auto engine = acquireEngine(handle);
1588 ConnHandler* conn = engine->getConnHandler(cookie);
1590 return conn->bufferAcknowledgement(opaque, vbucket, buffer_bytes);
1592 return ENGINE_DISCONNECT;
// DCP trampoline: forwards a control key/value pair to ConnHandler::control()
// for the cookie's connection. The fall-through path returns
// ENGINE_DISCONNECT (presumably when no handler exists; the guard is not
// visible in this extract).
1595 static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1602 auto engine = acquireEngine(handle);
1603 ConnHandler* conn = engine->getConnHandler(cookie);
1605 return conn->control(opaque, key, nkey, value, nvalue);
1607 return ENGINE_DISCONNECT;
// DCP trampoline for responses received from the peer.
// Returns ENGINE_SUCCESS when ConnHandler::handleResponse() reports true and
// ENGINE_DISCONNECT on the fall-through path. NOTE(review): the surrounding
// guards are not visible in this extract, so the exact conditions leading to
// ENGINE_DISCONNECT should be confirmed against the full source.
1610 static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1612 protocol_binary_response_header* response) {
1613 auto engine = acquireEngine(handle);
1614 ConnHandler* conn = engine->getConnHandler(cookie);
1616 if (conn->handleResponse(response)) {
1617 return ENGINE_SUCCESS;
1620 return ENGINE_DISCONNECT;
// Server callback for a client disconnect.
// Throws std::invalid_argument when invoked with an event type other than
// ON_DISCONNECT or with non-null event_data; otherwise treats cb_data as the
// ENGINE_HANDLE and calls handleDisconnect(cookie) on the acquired engine.
// NOTE(review): the literal "(which is" has no trailing space, so the numeric
// type is appended directly after "is" in the exception text.
1623 static void EvpHandleDisconnect(const void* cookie,
1624 ENGINE_EVENT_TYPE type,
1625 const void* event_data,
1626 const void* cb_data) {
1627 if (type != ON_DISCONNECT) {
1628 throw std::invalid_argument("EvpHandleDisconnect: type "
1629 "(which is" + std::to_string(type) +
1630 ") is not ON_DISCONNECT");
1632 if (event_data != nullptr) {
1633 throw std::invalid_argument("EvpHandleDisconnect: event_data "
// cb_data arrives const-qualified; the cast recovers the engine handle.
1636 void* c = const_cast<void*>(cb_data);
1637 acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
// Server callback for a bucket deletion.
// Throws std::invalid_argument when invoked with an event type other than
// ON_DELETE_BUCKET or with non-null event_data; otherwise treats cb_data as
// the ENGINE_HANDLE and calls handleDeleteBucket(cookie) on the acquired
// engine.
// NOTE(review): the literal "(which is" has no trailing space, so the numeric
// type is appended directly after "is" in the exception text.
1640 static void EvpHandleDeleteBucket(const void* cookie,
1641 ENGINE_EVENT_TYPE type,
1642 const void* event_data,
1643 const void* cb_data) {
1644 if (type != ON_DELETE_BUCKET) {
1645 throw std::invalid_argument("EvpHandleDeleteBucket: type "
1646 "(which is" + std::to_string(type) +
1647 ") is not ON_DELETE_BUCKET");
1649 if (event_data != nullptr) {
1650 throw std::invalid_argument("EvpHandleDeleteBucket: event_data "
// cb_data arrives const-qualified; the cast recovers the engine handle.
1653 void* c = const_cast<void*>(cb_data);
1654 acquireEngine(static_cast<ENGINE_HANDLE*>(c))->handleDeleteBucket(cookie);
1657 void EvpSetLogLevel(ENGINE_HANDLE* handle, EXTENSION_LOG_LEVEL level) {
1658 Logger::setGlobalLogLevel(level);
1662 * The only public interface to the eventually persistent engine.
1663 * Allocate a new instance and initialize it
1664 * @param interface the highest interface the server supports (we only
1665 * support interface 1)
1666 * @param get_server_api callback function to get the server exported API
1668 * @param handle Where to return the new instance
1669 * @return ENGINE_SUCCESS on success
1671 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1672 GET_SERVER_API get_server_api,
1673 ENGINE_HANDLE** handle) {
1674 SERVER_HANDLE_V1* api = get_server_api();
1675 if (interface != 1 || api == NULL) {
1676 return ENGINE_ENOTSUP;
1679 Logger::setLoggerAPI(api->log);
1681 MemoryTracker::getInstance(*api->alloc_hooks);
1682 ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1684 std::atomic<size_t>* inital_tracking = new std::atomic<size_t>();
1686 ObjectRegistry::setStats(inital_tracking);
1687 EventuallyPersistentEngine* engine;
1688 engine = new EventuallyPersistentEngine(get_server_api);
1689 ObjectRegistry::setStats(NULL);
1691 if (engine == NULL) {
1692 return ENGINE_ENOMEM;
1695 if (MemoryTracker::trackingMemoryAllocations()) {
1696 engine->getEpStats().memoryTrackerEnabled.store(true);
1697 engine->getEpStats().totalMemory->store(inital_tracking->load());
1699 delete inital_tracking;
1701 initialize_time_functions(api->core);
1703 *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1705 return ENGINE_SUCCESS;
1709 This method is called prior to unloading of the shared-object.
1710 Global clean-up should be performed from this method.
1712 void destroy_engine() {
1713 ExecutorPool::shutdown();
1714 // A single MemoryTracker exists for *all* buckets
1715 // and must be destroyed before unloading the shared object.
1716 MemoryTracker::destroyInstance();
1717 ObjectRegistry::reset();
// Engine-API hook that fills *itm_info from an item.
// Looks up the item's vbucket to obtain the latest failover UUID (0 when the
// vbucket is not found) and stores it->toItemInfo(vb_uuid) in *itm_info.
// The cookie argument is unused.
// NOTE(review): the return statement is not visible in this extract.
1720 static bool EvpGetItemInfo(ENGINE_HANDLE* handle, const void*,
1721 const item* itm, item_info* itm_info) {
1722 const Item* it = reinterpret_cast<const Item*>(itm);
1723 auto engine = acquireEngine(handle);
1724 VBucketPtr vb = engine->getKVBucket()->getVBucket(it->getVBucketId());
1725 uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
1726 *itm_info = it->toItemInfo(vb_uuid);
// Engine-API hook that updates an item from an item_info; the only visible
// update copies itm_info->datatype into the item.
// NOTE(review): lines between the cast and setDataType(), and the return
// statement, are not visible in this extract.
1730 static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1731 item* itm, const item_info* itm_info) {
1732 Item* it = reinterpret_cast<Item*>(itm);
1736 it->setDataType(itm_info->datatype);
// Engine-API hook that passes the stored cluster configuration to a callback.
// Takes clusterConfig.lock, captures a pointer to and the length of the
// stored config, releases the engine handle, then invokes
// callback(cookie, config, len). The LockHolder stays in scope across the
// callback.
1740 static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1742 engine_get_vb_map_cb callback) {
1743 auto engine = acquireEngine(handle);
1744 LockHolder lh(engine->clusterConfig.lock);
1745 const char* config = engine->clusterConfig.config.data();
1746 uint32_t len = engine->clusterConfig.config.size();
1747 engine.reset(); // Want to release the engine before the callback
1748 return callback(cookie, config, len);
// printf-style logging helper: forwards the severity, format string and
// variadic arguments to global_logger.vlog().
// NOTE(review): the va_list set-up and tear-down lines are not visible in
// this extract.
1751 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1754 global_logger.vlog(severity, fmt, va);
// Constructor.
// Initialises members (pointer members shown here start as NULL, traffic and
// deleteAll start disabled), installs the Evp* trampolines into the
// ENGINE_HANDLE_V1 function table (including the dcp sub-table), caches the
// server API and fills in the engine info record: description plus the CAS,
// PERSISTENT_STORAGE, LRU and DATATYPE feature flags.
// NOTE(review): part of the member-initialiser list is not visible in this
// extract.
1758 EventuallyPersistentEngine::EventuallyPersistentEngine(
1759 GET_SERVER_API get_server_api)
1763 workloadPriority(NO_BUCKET_PRIORITY),
1764 replicationThrottle(NULL),
1765 getServerApiFunc(get_server_api),
1767 dcpFlowControlManager_(NULL),
1770 checkpointConfig(NULL),
1771 trafficEnabled(false),
1772 deleteAllEnabled(false),
1775 interface.interface = 1;
// Core engine entry points.
1776 ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1777 ENGINE_HANDLE_V1::initialize = EvpInitialize;
1778 ENGINE_HANDLE_V1::destroy = EvpDestroy;
1779 ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1780 ENGINE_HANDLE_V1::allocate_ex = EvpItemAllocateEx;
1781 ENGINE_HANDLE_V1::remove = EvpItemDelete;
1782 ENGINE_HANDLE_V1::release = EvpItemRelease;
1783 ENGINE_HANDLE_V1::get = EvpGet;
1784 ENGINE_HANDLE_V1::get_if = EvpGetIf;
1785 ENGINE_HANDLE_V1::get_and_touch = EvpGetAndTouch;
1786 ENGINE_HANDLE_V1::get_locked = EvpGetLocked;
1787 ENGINE_HANDLE_V1::unlock = EvpUnlock;
1788 ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1789 ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1790 ENGINE_HANDLE_V1::store = EvpStore;
1791 ENGINE_HANDLE_V1::flush = EvpFlush;
1792 ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1793 ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1794 ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1795 ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1796 ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1797 ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1798 ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
// DCP entry points.
1800 ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1801 ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1802 ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1803 ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1804 ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1805 ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1806 ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1807 ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1808 ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1809 ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1810 ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1811 ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1812 ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1813 ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1814 ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1815 ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1816 ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1817 ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
// Cache the server API and describe this engine. num_features starts at 0
// after the memset and is advanced by each feature appended below.
1819 serverApi = getServerApiFunc();
1820 memset(&info, 0, sizeof(info));
1821 info.info.description = "EP engine v" VERSION;
1822 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1823 info.info.features[info.info.num_features++].feature =
1824 ENGINE_FEATURE_PERSISTENT_STORAGE;
1825 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1826 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
// Reserves the cookie through the server cookie API.
// The call is made with the ObjectRegistry's current engine switched to NULL
// and restored afterwards (presumably so that work done inside the server
// call is not attributed to this engine; TODO confirm).
// NOTE(review): the return statement is not visible in this extract.
1829 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1831 EventuallyPersistentEngine *epe =
1832 ObjectRegistry::onSwitchThread(NULL, true);
1833 ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1834 ObjectRegistry::onSwitchThread(epe);
// Releases the cookie through the server cookie API.
// The call is made with the ObjectRegistry's current engine switched to NULL
// and restored afterwards (presumably so that work done inside the server
// call is not attributed to this engine; TODO confirm).
// NOTE(review): the return statement is not visible in this extract.
1838 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1840 EventuallyPersistentEngine *epe =
1841 ObjectRegistry::onSwitchThread(NULL, true);
1842 ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1843 ObjectRegistry::onSwitchThread(epe);
// Registers a server callback for the given event type, passing this engine
// (as an ENGINE_HANDLE) to register_callback().
// As in reserveCookie()/releaseCookie(), the server call is made with the
// ObjectRegistry's current engine switched to NULL and restored afterwards.
// NOTE(review): the callback parameter line and the remaining
// register_callback() arguments are not visible in this extract.
1847 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1849 const void *cb_data) {
1850 EventuallyPersistentEngine *epe =
1851 ObjectRegistry::onSwitchThread(NULL, true);
1852 SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1853 sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1855 ObjectRegistry::onSwitchThread(epe);
1859 * A configuration value changed listener that responds to ep-engine
1860 * parameter changes by invoking engine-specific methods on
1861 * configuration change events.
// Keys handled by sizeValueChanged(): getl_max_timeout, getl_default_timeout,
// max_item_size, max_item_privileged_bytes, mem_merge_count_threshold and
// mem_merge_bytes_threshold. booleanValueChanged() handles flushall_enabled.
// No handling of any other key is visible in this extract.
// NOTE(review): access specifiers are not visible in this extract.
1863 class EpEngineValueChangeListener : public ValueChangedListener {
// Keeps a reference to the engine, which must therefore outlive the listener.
1865 EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
// Applies a changed size_t configuration value to the engine or its stats.
1869 virtual void sizeValueChanged(const std::string &key, size_t value) {
1870 if (key.compare("getl_max_timeout") == 0) {
1871 engine.setGetlMaxTimeout(value);
1872 } else if (key.compare("getl_default_timeout") == 0) {
1873 engine.setGetlDefaultTimeout(value);
1874 } else if (key.compare("max_item_size") == 0) {
1875 engine.setMaxItemSize(value);
1876 } else if (key.compare("max_item_privileged_bytes") == 0) {
1877 engine.setMaxItemPrivilegedBytes(value);
1878 } else if (key.compare("mem_merge_count_threshold") == 0) {
1879 engine.stats.mem_merge_count_threshold = value;
1880 } else if (key.compare("mem_merge_bytes_threshold") == 0) {
1881 engine.stats.mem_merge_bytes_threshold = value;
// Applies a changed boolean configuration value to the engine.
1885 virtual void booleanValueChanged(const std::string &key, bool value) {
1886 if (key.compare("flushall_enabled") == 0) {
1887 engine.setDeleteAll(value);
1891 EventuallyPersistentEngine &engine;
// Initialises the engine from a configuration string.
//
// Visible steps, in order:
//  - parse the configuration (when config is non-NULL); ENGINE_FAILED if
//    parsing fails
//  - copy configuration values into HashTable / StoredValue defaults, stats
//    and engine members, registering an EpEngineValueChangeListener for each
//    value that can change at run time
//  - create the workload policy, DCP/TAP connection maps, flow-control
//    manager, replication throttle and checkpoint config
//  - create and initialise the KV bucket; ENGINE_FAILED if that fails
//  - optionally enable traffic, start the connection notifiers and record the
//    start-up time
//
// Returns ENGINE_SUCCESS once all steps complete.
// NOTE(review): a few lines are not visible in this extract. Objects created
// before an ENGINE_FAILED return are not released here; presumably the
// destructor handles that (TODO confirm).
1896 ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
1898 if (config != NULL) {
1899 LOG(EXTENSION_LOG_NOTICE, "EPEngine::initialize: parsing config:\"%s\"",
1901 if (!configuration.parseConfiguration(config, serverApi)) {
1902 LOG(EXTENSION_LOG_WARNING, "Failed to parse the configuration config "
1903 "during bucket initialization. config=%s", config);
1904 return ENGINE_FAILED;
1908 name = configuration.getCouchBucket();
1909 maxFailoverEntries = configuration.getMaxFailoverEntries();
1911 // Start updating the variables from the config!
1912 HashTable::setDefaultNumBuckets(configuration.getHtSize());
1913 HashTable::setDefaultNumLocks(configuration.getHtLocks());
1914 StoredValue::setMutationMemoryThreshold(
1915 configuration.getMutationMemThreshold());
// A configured max size of 0 is replaced by the largest size_t value.
1917 if (configuration.getMaxSize() == 0) {
1918 configuration.setMaxSize(std::numeric_limits<size_t>::max());
// A watermark equal to size_t max is replaced by a percentage of max size:
// 75% for the low watermark, 85% for the high one.
1921 if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
1922 stats.mem_low_wat_percent.store(0.75);
1923 configuration.setMemLowWat(percentOf(
1924 configuration.getMaxSize(), stats.mem_low_wat_percent.load()));
1927 if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
1928 stats.mem_high_wat_percent.store(0.85);
1929 configuration.setMemHighWat(percentOf(
1930 configuration.getMaxSize(), stats.mem_high_wat_percent.load()));
// Each listener below is allocated with `new` and handed to the configuration
// object (presumably the configuration takes ownership; TODO confirm).
1933 stats.mem_merge_count_threshold = configuration.getMemMergeCountThreshold();
1934 configuration.addValueChangedListener(
1935 "mem_merge_count_threshold",
1936 new EpEngineValueChangeListener(*this));
1938 stats.mem_merge_bytes_threshold = configuration.getMemMergeBytesThreshold();
1939 configuration.addValueChangedListener(
1940 "mem_merge_bytes_threshold",
1941 new EpEngineValueChangeListener(*this));
1943 maxItemSize = configuration.getMaxItemSize();
1944 configuration.addValueChangedListener("max_item_size",
1945 new EpEngineValueChangeListener(*this));
1947 maxItemPrivilegedBytes = configuration.getMaxItemPrivilegedBytes();
1948 configuration.addValueChangedListener(
1949 "max_item_privileged_bytes",
1950 new EpEngineValueChangeListener(*this));
1952 getlDefaultTimeout = configuration.getGetlDefaultTimeout();
1953 configuration.addValueChangedListener("getl_default_timeout",
1954 new EpEngineValueChangeListener(*this));
1955 getlMaxTimeout = configuration.getGetlMaxTimeout();
1956 configuration.addValueChangedListener("getl_max_timeout",
1957 new EpEngineValueChangeListener(*this));
1959 deleteAllEnabled = configuration.isFlushallEnabled();
1960 configuration.addValueChangedListener("flushall_enabled",
1961 new EpEngineValueChangeListener(*this));
// The shard count may not exceed the configured number of vbuckets.
1963 workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
1964 configuration.getMaxNumShards());
1965 if ((unsigned int)workload->getNumShards() >
1966 configuration.getMaxVbuckets()) {
1967 LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
1968 "equal or less than max number of vbuckets");
1969 return ENGINE_FAILED;
1972 dcpConnMap_ = new DcpConnMap(*this);
1974 /* Get the flow control policy */
1975 std::string flowCtlPolicy = configuration.getDcpFlowControlPolicy();
// std::string::compare() returns 0 on equality, hence the negation.
1977 if (!flowCtlPolicy.compare("static")) {
1978 dcpFlowControlManager_ = new DcpFlowControlManagerStatic(*this);
1979 } else if (!flowCtlPolicy.compare("dynamic")) {
1980 dcpFlowControlManager_ = new DcpFlowControlManagerDynamic(*this);
1981 } else if (!flowCtlPolicy.compare("aggressive")) {
1982 dcpFlowControlManager_ = new DcpFlowControlManagerAggressive(*this);
1984 /* Flow control is not enabled */
1985 dcpFlowControlManager_ = new DcpFlowControlManager(*this);
1988 tapConnMap = new TapConnMap(*this);
1989 tapConfig = new TapConfig(*this);
1990 replicationThrottle = new ReplicationThrottle(configuration, stats);
1991 TapConfig::addConfigChangeListener(*this);
1993 checkpointConfig = new CheckpointConfig(*this);
1994 CheckpointConfig::addConfigChangeListener(*this);
1996 kvBucket = makeBucket(configuration);
1998 initializeEngineCallbacks();
2000 // Complete the initialization of the ep-store
2001 if (!kvBucket->initialize()) {
2002 return ENGINE_FAILED;
2005 if(configuration.isDataTrafficEnabled()) {
2006 enableTraffic(true);
2009 tapConnMap->initialize(TAP_CONN_NOTIFIER);
2010 dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
2012 // record engine initialization time
2013 startupTime.store(ep_real_time());
2015 LOG(EXTENSION_LOG_NOTICE,
2016 "EP Engine: Initialization of %s bucket complete",
2017 configuration.getBucketType().c_str());
2019 return ENGINE_SUCCESS;
// Begins engine shutdown: records the shutdown type in stats, snapshots the
// stats through the KV bucket, then shuts down all TAP and DCP connections.
// `force` is stored in stats.forceShutdown.
// NOTE(review): any guards around the three calls below are not visible in
// this extract.
2022 void EventuallyPersistentEngine::destroy(bool force) {
2023 stats.forceShutdown = force;
2024 stats.isShutdown = true;
2026 // Perform a snapshot of the stats before shutting down so we can persist
2027 // the type of shutdown (stats.forceShutdown), and consequently on the
2028 // next warmup can determine is there was a clean shutdown - see
2029 // Warmup::cleanShutdown
2031 kvBucket->snapshotStats();
2034 tapConnMap->shutdownAllConnections();
2037 dcpConnMap_->shutdownAllConnections();
// Allocates a new Item and returns it through *itm.
//
// Returns ENGINE_E2BIG when priv_nbytes exceeds maxItemPrivilegedBytes or
// when (nbytes - priv_nbytes) exceeds maxItemSize; returns memoryCondition()
// when hasAvailableSpace() refuses the estimated size; returns ENGINE_SUCCESS
// after recording nbytes in stats.itemAllocSizeHisto.
//
// NOTE(review): nbytes and priv_nbytes are both size_t, so
// (nbytes - priv_nbytes) wraps to a very large value if priv_nbytes > nbytes;
// the result is then rejected as E2BIG. Confirm that is the intended outcome.
// NOTE(review): some parameter lines and the Item constructor arguments are
// not visible in this extract.
2041 ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
2044 const size_t nbytes,
2045 const size_t priv_nbytes,
2047 const rel_time_t exptime,
2050 if (priv_nbytes > maxItemPrivilegedBytes) {
2051 return ENGINE_E2BIG;
2054 if ((nbytes - priv_nbytes) > maxItemSize) {
2055 return ENGINE_E2BIG;
// Estimated footprint: Item and Blob headers plus key and value bytes.
2058 if (!hasAvailableSpace(sizeof(Item) + sizeof(Blob) + key.size() + nbytes)) {
2059 return memoryCondition();
// An exptime of 0 is kept as 0; any other value is converted to an absolute
// time.
2062 time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
// The datatype is carried in a one-byte extended-metadata buffer.
2064 uint8_t ext_meta[1];
2065 uint8_t ext_len = EXT_META_LEN;
2066 *(ext_meta) = datatype;
2067 *itm = new Item(key,
2078 return memoryCondition();
2080 stats.itemAllocSizeHisto.add(nbytes);
2081 return ENGINE_SUCCESS;
// Deletes all data in the bucket (bucket flush / deleteAll).
//
// Returns ENGINE_ENOTSUP when deleteAll is disabled and ENGINE_TMPFAIL when
// the engine is not in degraded mode. Otherwise the operation is driven by
// the cookie's engine-specific slot: scheduling the delete-all task stores
// `this` in the slot and returns ENGINE_EWOULDBLOCK; ENGINE_TMPFAIL is
// returned if the task could not be scheduled; the completion path clears the
// slot and returns ENGINE_SUCCESS.
//
// NOTE(review): the test on `es` that separates the scheduling path from the
// completion path is not visible in this extract.
// NOTE(review): the adjacent literals "...deleteAll, but" and "there seems..."
// concatenate without a space, so the log reads "butthere".
2085 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
2086 if (!deleteAllEnabled) {
2087 return ENGINE_ENOTSUP;
2090 if (!isDegradedMode()) {
2091 return ENGINE_TMPFAIL;
2095 * Supporting only a SYNC operation for bucket flush
2098 void* es = getEngineSpecific(cookie);
2100 // Check if diskDeleteAll was false and set it to true
2101 // if yes, if the atomic variable weren't false, then
2102 // we will assume that a deleteAll has been scheduled
2103 // already and return TMPFAIL.
2104 if (kvBucket->scheduleDeleteAllTask(cookie)) {
2105 storeEngineSpecific(cookie, this);
2106 return ENGINE_EWOULDBLOCK;
2108 LOG(EXTENSION_LOG_INFO,
2109 "Tried to trigger a bucket deleteAll, but"
2110 "there seems to be a task running already!");
2111 return ENGINE_TMPFAIL;
2115 storeEngineSpecific(cookie, NULL);
2116 LOG(EXTENSION_LOG_NOTICE, "Completed bucket deleteAll operation");
2117 return ENGINE_SUCCESS;
// Fetches an item and updates its expiry time in one operation.
//
// On success, increments stats.numOpsStore and returns the item wrapped in a
// cb::unique_item_ptr. On failure, returns the status with a null item; in
// degraded mode KEY_EEXISTS, KEY_ENOENT and NOT_MY_VBUCKET are remapped to
// ENGINE_TMPFAIL first.
//
// NOTE(review): the remaining parameters, the condition guarding the expiry
// conversion and the handling of KEY_EEXISTS near the end are not visible in
// this extract.
2121 cb::EngineErrorItemPair EventuallyPersistentEngine::get_and_touch(const void* cookie,
2125 auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2127 time_t expiry_time = exptime;
// Convert the supplied expiry to an absolute time via the server core API.
2129 auto* core = serverApi->core;
2130 expiry_time = core->abstime(core->realtime(exptime));
2132 GetValue gv(kvBucket->getAndUpdateTtl(key, vbucket, cookie, expiry_time));
2134 auto rv = gv.getStatus();
2135 if (rv == ENGINE_SUCCESS) {
2137 ++stats.numOpsStore;
2138 return std::make_pair(cb::engine_errc::success,
2139 cb::unique_item_ptr{gv.getValue(),
2140 cb::ItemDeleter{handle}});
2143 if (isDegradedMode()) {
2144 // Remap all some of the error codes
2146 case ENGINE_KEY_EEXISTS:
2147 case ENGINE_KEY_ENOENT:
2148 case ENGINE_NOT_MY_VBUCKET:
2149 rv = ENGINE_TMPFAIL;
2156 if (rv == ENGINE_KEY_EEXISTS) {
2160 return std::make_pair(cb::engine_errc(rv),
2161 cb::unique_item_ptr{nullptr,
2162 cb::ItemDeleter{handle}});
// Fetches an item only if the caller-supplied filter accepts its metadata.
//
// The lookup runs at most twice. The first pass does not queue a background
// fetch unless the bucket uses FULL_EVICTION; the second pass always does.
// KEY_ENOENT and NOT_MY_VBUCKET become ENGINE_TMPFAIL in degraded mode.
// Failures are returned with a null item. Throws std::logic_error if the loop
// finishes without returning.
//
// filter  predicate evaluated on the item's item_info (built with the
//         vbucket's latest failover UUID, or 0 if the vbucket is not found)
//
// NOTE(review): some parameters, the switch header, some option flags and the
// bodies of several branches are not visible in this extract.
2165 cb::EngineErrorItemPair EventuallyPersistentEngine::get_if(const void* cookie,
2168 std::function<bool(const item_info&)>filter) {
2170 auto* handle = reinterpret_cast<ENGINE_HANDLE*>(this);
2172 // Fetch an item from the hashtable (without trying to schedule a bg-fetch
2173 // and pass it through the filter. If the filter accepts the document
2174 // based on the metadata, return the document. If the document's data
2175 // isn't resident we run another iteration in the loop and retries the
2176 // action but this time we _do_ schedule a bg-fetch.
2177 for (int ii = 0; ii < 2; ++ii) {
2178 auto options = static_cast<get_options_t>(HONOR_STATES |
2183 if (ii == 1 || kvBucket->getItemEvictionPolicy() == FULL_EVICTION) {
2184 options = static_cast<get_options_t>(int(options) | QUEUE_BG_FETCH);
2187 BlockTimer timer(&stats.getCmdHisto);
2188 GetValue gv(kvBucket->get(key, vbucket, cookie, options));
2189 ENGINE_ERROR_CODE status = gv.getStatus();
2192 case ENGINE_SUCCESS:
2195 case ENGINE_KEY_ENOENT: // FALLTHROUGH
2196 case ENGINE_NOT_MY_VBUCKET: // FALLTHROUGH
2197 if (isDegradedMode()) {
2198 status = ENGINE_TMPFAIL;
2202 return std::make_pair(cb::engine_errc(status),
2203 cb::unique_item_ptr{nullptr,
2204 cb::ItemDeleter{handle}});
// `ret` owns the fetched item from here on.
2207 auto* item = gv.getValue();
2208 cb::unique_item_ptr ret{item, cb::ItemDeleter{handle}};
2210 const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
2211 const uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
2214 if (filter(item->toItemInfo(vb_uuid))) {
2215 if (!gv.isPartial()) {
2216 return std::make_pair(cb::engine_errc::success,
2217 cb::unique_item_ptr{ret.release(),
2218 cb::ItemDeleter{handle}});
2220 // We want this item, but we need to fetch it off disk
2222 // the client don't care about this thing..
2224 return std::make_pair(cb::engine_errc::success,
2225 cb::unique_item_ptr{ret.release(),
2226 cb::ItemDeleter{handle}});
2230 // It should not be possible to get as the second iteration in the loop
2231 // SHOULD handle backround fetches an the item should NOT be partial!
2232 throw std::logic_error("EventuallyPersistentEngine::get_if: loop terminated");
// Fetches a document and locks it for lock_timeout.
//
// cookie        - connection cookie, forwarded to kvBucket->getLocked().
// lock_timeout  - requested lock duration; 0 selects the configured default,
//                 and a value above the configured maximum is replaced by
//                 the default after logging a warning.
//
// Returns the status reported by kvBucket->getLocked(); on ENGINE_SUCCESS
// the fetched item is stored through itm.
2235 ENGINE_ERROR_CODE EventuallyPersistentEngine::get_locked(const void* cookie,
2239 uint32_t lock_timeout) {
2241 auto default_timeout = static_cast<uint32_t>(getGetlDefaultTimeout());
2243 if (lock_timeout == 0) {
2244 lock_timeout = default_timeout;
// Out-of-range request: warn and fall back to the default rather than
// clamping to the maximum.
2245 } else if (lock_timeout > static_cast<uint32_t>(getGetlMaxTimeout())) {
2246 LOG(EXTENSION_LOG_WARNING,
2247 "EventuallyPersistentEngine::get_locked: "
2248 "Illegal value for lock timeout specified %u. "
2249 "Using default value: %u", lock_timeout, default_timeout);
2250 lock_timeout = default_timeout;
2253 auto result = kvBucket->getLocked(key, vbucket, ep_current_time(),
2254 lock_timeout, cookie);
// Only publish the item to the caller when the lock was obtained.
2256 if (result.getStatus() == ENGINE_SUCCESS) {
2258 *itm = result.getValue();
2261 return result.getStatus();
// Unlock entry point: forwards key, vbucket and cas, together with the
// current engine time, to kvBucket->unlockKey() and returns its status
// unchanged.
2264 ENGINE_ERROR_CODE EventuallyPersistentEngine::unlock(const void* cookie,
2268 return kvBucket->unlockKey(key, vbucket, cas, ep_current_time());
// Store entry point: dispatches on the requested store operation to the
// matching kvBucket call (set / add / replace) and post-processes the
// resulting status.
//
// cookie  - connection cookie, forwarded to the kvBucket call.
// itm     - item to store; treated as an Item.
// cas     - out-parameter, written with the CAS of the item when the
//           underlying call returns ENGINE_SUCCESS.
//
// The whole call is timed into the store-command histogram.
2272 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2275 ENGINE_STORE_OPERATION
2277 BlockTimer timer(&stats.storeCmdHisto);
2278 ENGINE_ERROR_CODE ret;
2279 Item *it = static_cast<Item*>(itm);
2281 switch (operation) {
// NOTE(review): presumably the CAS operation branch - confirm against the
// case label.
2283 if (it->getCas() == 0) {
2284 // Using a cas command with a cas wildcard doesn't make sense
2285 ret = ENGINE_NOT_STORED;
// Set path: refused with a temporary failure while in degraded mode.
2290 if (isDegradedMode()) {
2291 return ENGINE_TMPFAIL;
2293 ret = kvBucket->set(*it, cookie);
2294 if (ret == ENGINE_SUCCESS) {
2295 *cas = it->getCas();
// Add path: also refused while in degraded mode.
2301 if (isDegradedMode()) {
2302 return ENGINE_TMPFAIL;
2305 if (it->getCas() != 0) {
2306 // Adding an item with a cas value doesn't really make sense...
2307 return ENGINE_KEY_EEXISTS;
2310 ret = kvBucket->add(*it, cookie);
2311 if (ret == ENGINE_SUCCESS) {
2312 *cas = it->getCas();
2316 case OPERATION_REPLACE:
2317 ret = kvBucket->replace(*it, cookie);
2318 if (ret == ENGINE_SUCCESS) {
2319 *cas = it->getCas();
// Any other operation is reported as unsupported.
2323 ret = ENGINE_ENOTSUP;
// Post-processing of the status produced above.
2327 case ENGINE_SUCCESS:
2328 ++stats.numOpsStore;
// NOTE(review): presumably the out-of-memory status is refined here via
// memoryCondition() - confirm against the case label.
2331 ret = memoryCondition();
2333 case ENGINE_NOT_STORED:
2334 case ENGINE_NOT_MY_VBUCKET:
2335 if (isDegradedMode()) {
2336 return ENGINE_TMPFAIL;
// Produces the next TAP event for a producer connection.
//
// Order of work visible below: flush / noop / pause checks, high-priority
// vbucket events, backfill scheduling, the next queued item, and finally a
// dump / takeover completion check when nothing else was produced.
//
// Engine-specific payload is handed back through es / nes, and the vbucket
// of the event through vbucket. Throws std::logic_error for an unknown
// high-priority VBucketEvent type.
2346 inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2366 if (connection->shouldFlush()) {
2370 if (connection->isTimeForNoop()) {
2371 LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2372 connection->logHeader());
2376 if (connection->isSuspended() || connection->windowIsFull()) {
2377 LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2378 " suspended state or its ack windows is full.\n",
2379 connection->logHeader());
// Default result when nothing is available to send.
2383 uint16_t ret = TAP_PAUSE;
// High-priority vbucket events are considered before regular items.
2384 VBucketEvent ev = connection->nextVBucketHighPriority();
2385 if (ev.event != TAP_PAUSE) {
2387 case TAP_VBUCKET_SET:
2388 LOG(EXTENSION_LOG_NOTICE,
2389 "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2390 connection->logHeader(), ev.vbucket,
2391 VBucket::toString(ev.state));
2392 connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2395 LOG(EXTENSION_LOG_NOTICE,
2396 "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2397 connection->logHeader(),
2398 TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
// The opaque command code is stored on the connection and that storage is
// exposed to the caller as the engine-specific payload.
2400 connection->opaqueCommandCode = (uint32_t) ev.state;
2401 *vbucket = ev.vbucket;
2402 *es = &connection->opaqueCommandCode;
2403 *nes = sizeof(connection->opaqueCommandCode);
2407 throw std::logic_error("EventuallyPersistentEngine::doWalkTapQueue:"
2408 " Unknown VBucketEvent message type:" +
2409 std::to_string(ev.event) + " for connection:" +
2410 connection->logHeader());
2415 if (connection->waitForOpaqueMsgAck()) {
// Schedule a backfill when the connection asks for one; the connection
// fills in the vbucket filter.
2419 VBucketFilter backFillVBFilter;
2420 if (connection->runBackfill(backFillVBFilter)) {
2421 queueBackfill(backFillVBFilter, connection);
// Pull the next item; getNextItem also receives ret and nru.
2424 uint8_t nru = INITIAL_NRU_VALUE;
2425 Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2427 case TAP_CHECKPOINT_START:
2428 case TAP_CHECKPOINT_END:
// Pack the per-event engine-specific data into connection->specificData.
2432 if (ret == TAP_MUTATION) {
2433 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2434 it->getRevSeqno(), nru);
2435 *es = connection->specificData;
2436 } else if (ret == TAP_DELETION) {
2437 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2439 *es = connection->specificData;
2440 } else if (ret == TAP_CHECKPOINT_START) {
2441 // Send the current value of the max deleted seqno
2442 VBucketPtr vb = getVBucket(*vbucket);
2447 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2448 vb->ht.getMaxDeletedRevSeqno());
2449 *es = connection->specificData;
// Nothing produced: for dump or takeover connections, check whether the
// dump / takeover has completed and announce a vbucket state change if so.
2459 if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2460 VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2461 if (vbev.event == TAP_VBUCKET_SET) {
2462 LOG(EXTENSION_LOG_NOTICE,
2463 "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2464 connection->logHeader(), vbev.vbucket,
2465 VBucket::toString(vbev.state));
2466 connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
// Public TAP iterator step: resolves the producer for the cookie, delegates
// to doWalkTapQueue() and then updates the connection bookkeeping
// (last message time, sequence number, ack request, pause state).
//
// Returns the TAP event code to send, or TAP_DISCONNECT when the producer
// lookup fails.
2474 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2481 uint16_t *vbucket) {
2482 TapProducer *connection = getTapProducer(cookie);
2484 LOG(EXTENSION_LOG_WARNING,
2485 "Failed to lookup TAP connection.. Disconnecting\n");
2486 return TAP_DISCONNECT;
// Mark the connection as active for the duration of the walk.
2489 connection->setPaused(false);
2494 connection->setLastWalkTime();
2496 ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2497 seqno, vbucket, connection, retry);
// A real message is going out: record the time and fill in seqno / flags.
2500 if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2501 connection->lastMsgTime = ep_current_time();
2502 if (ret == TAP_NOOP) {
2505 ++stats.numTapFetched;
2506 *seqno = connection->getSeqno();
// Ask the peer to acknowledge this message and remember which seqno the
// ack was requested for.
2507 if (connection->requestAck(ret, *vbucket)) {
2508 *flags = TAP_FLAG_ACK;
2509 connection->seqnoAckRequested = *seqno;
2512 if (ret == TAP_MUTATION) {
2513 if (connection->haveFlagByteorderSupport()) {
2514 *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
// Mark the connection paused again and clear its notify-sent flag.
2519 connection->setPaused(true);
2520 connection->setNotifySent(false);
// Handles a TAP connect request: reserves the cookie, derives the
// connection name, decodes the optional sections of the userdata payload
// selected by flags (backfill age, vbucket list, checkpoint ids) and
// creates the TAP producer.
//
// cookie    - connection cookie; reserved before anything else is done.
// client    - client-supplied name; an empty name selects an anonymous one.
// userdata  - raw payload, consumed front to back as sections are decoded.
//
// Multi-byte fields in the payload are converted from network byte order.
2526 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2527 std::string &client,
2529 const void *userdata,
2531 if (reserveCookie(cookie) != ENGINE_SUCCESS) {
// Named clients get the eq_tapq: prefix; for an empty client name the
// whole name is replaced by the anonymous connection name.
2535 std::string tapName = "eq_tapq:";
2536 if (client.length() == 0) {
2537 tapName.assign(ConnHandler::getAnonName());
2539 tapName.append(client);
2542 // Decoding the userdata section of the packet and update the filters
2543 const char *ptr = static_cast<const char*>(userdata);
2544 uint64_t backfillAge = 0;
2545 std::vector<uint16_t> vbuckets;
2546 std::map<uint16_t, uint64_t> lastCheckpointIds;
// Section 1: 64-bit backfill age.
2548 if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2549 if (nuserdata < sizeof(backfillAge)) {
2550 LOG(EXTENSION_LOG_WARNING,
2551 "Backfill age is missing. Reject connection request from %s\n",
2555 // use memcpy to avoid alignment issues
2556 memcpy(&backfillAge, ptr, sizeof(backfillAge));
2557 backfillAge = ntohll(backfillAge);
2558 nuserdata -= sizeof(backfillAge);
2559 ptr += sizeof(backfillAge);
// Section 2: 16-bit count followed by that many 16-bit vbucket ids.
2562 if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2564 if (nuserdata < sizeof(nvbuckets)) {
2565 LOG(EXTENSION_LOG_WARNING,
2566 "Number of vbuckets is missing. Reject connection request from %s"
2567 "\n", tapName.c_str());
2570 memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2571 nuserdata -= sizeof(nvbuckets);
2572 ptr += sizeof(nvbuckets);
2573 nvbuckets = ntohs(nvbuckets);
2574 if (nvbuckets > 0) {
// Reject the request when the payload is shorter than the announced list.
2575 if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2576 LOG(EXTENSION_LOG_WARNING,
2577 "# of vbuckets not matched. Reject connection request from %s"
2578 "\n", tapName.c_str());
2581 for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
// NOTE(review): the copy length is sizeof(nvbuckets) although the
// destination is val; presumably both are uint16_t - confirm, and prefer
// sizeof(val) for clarity.
2583 memcpy(&val, ptr, sizeof(nvbuckets));
2584 ptr += sizeof(uint16_t);
2585 vbuckets.push_back(ntohs(val));
2587 nuserdata -= (sizeof(uint16_t) * nvbuckets);
// Section 3: 16-bit count followed by (16-bit vbucket id, 64-bit checkpoint
// id) pairs.
2591 if (flags & TAP_CONNECT_CHECKPOINT) {
2592 uint16_t nCheckpoints = 0;
2593 if (nuserdata >= sizeof(nCheckpoints)) {
2594 memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2595 nuserdata -= sizeof(nCheckpoints);
2596 ptr += sizeof(nCheckpoints);
2597 nCheckpoints = ntohs(nCheckpoints);
2599 if (nCheckpoints > 0) {
2601 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2602 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2603 "Reject connection request from %s\n", tapName.c_str());
2606 for (uint16_t j = 0; j < nCheckpoints; ++j) {
2608 uint64_t checkpointId;
2609 memcpy(&vbid, ptr, sizeof(vbid));
2610 ptr += sizeof(uint16_t);
2611 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2612 ptr += sizeof(uint64_t);
2613 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
// Create the producer and notify it through the connection map.
2618 TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2621 configuration.getTapKeepalive()),
2625 tapConnMap->notifyPausedConnection(tp, true);
// Consumer-side TAP entry point: processes one incoming TAP event for the
// connection identified by cookie.
//
// The connection handler is taken from the engine-specific slot of the
// cookie; when the slot is empty a new TAP consumer is created and stored
// there (except for TAP_ACK, which requires an existing producer).
// The event is then dispatched on tap_event, and the outcome is reported to
// the connection via processedEvent().
//
// Returns the engine status of the handled event; ENGINE_DISCONNECT asks
// the caller to drop the connection.
2629 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2630 void *engine_specific,
2647 void *specific = getEngineSpecific(cookie);
2648 ConnHandler *connection = NULL;
2649 if (specific == NULL) {
2650 if (tap_event == TAP_ACK) {
// NOTE(review): cookie is a const void* that is cast to char* and printed
// with %s here; confirm it really points at a NUL-terminated string,
// otherwise log it with %p.
2651 LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2652 "exist. Force disconnect...\n", (char *) cookie);
2653 // tap producer is no longer connected..
2654 return ENGINE_DISCONNECT;
// First event on this cookie: create a consumer and remember it.
2656 connection = tapConnMap->newConsumer(cookie);
2657 if (connection == NULL) {
2658 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2659 " Force disconnect\n");
2660 return ENGINE_DISCONNECT;
2662 storeEngineSpecific(cookie, connection);
2665 connection = reinterpret_cast<ConnHandler *>(specific);
2669 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
// Replication throttling applies to mutations and deletions only. A peer
// with ack support is asked to retry; one without is disconnected.
2671 if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2672 if (!replicationThrottle->shouldProcess()) {
2673 ++stats.replicationThrottled;
2674 if (connection->supportsAck()) {
2675 ret = ENGINE_TMPFAIL;
2677 ret = ENGINE_DISCONNECT;
2678 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2679 "ack support. Force disconnect...\n",
2680 connection->logHeader());
2686 switch (tap_event) {
2688 // TAP works only with the DefaultCollection
2689 ret = processTapAck(cookie, tap_seqno, tap_flags,
2690 DocKey(static_cast<const uint8_t*>(key), nkey,
2691 DocNamespace::DefaultCollection));
2694 ret = flush(cookie);
2695 LOG(EXTENSION_LOG_NOTICE, "%s Received flush.\n",
2696 connection->logHeader());
// Deletion: the revision seqno is read from the engine-specific section.
2701 TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2702 nengine, &revSeqno);
2704 // Create key in DefaultCollection since TAP won't support collections
2705 const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2706 DocNamespace::DefaultCollection};
2707 ret = connection->deletion(0, docKey, {}, 0,
2708 PROTOCOL_BINARY_RAW_BYTES, cas, vbucket,
2713 case TAP_CHECKPOINT_START:
2714 case TAP_CHECKPOINT_END:
// Checkpoint markers are only meaningful for a TapConsumer.
2716 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2718 if (tap_event == TAP_CHECKPOINT_START &&
2719 nengine == TapEngineSpecific::sizeRevSeqno) {
2720 // Set the current value for the max deleted seqno
2721 VBucketPtr vb = getVBucket(vbucket);
2723 return ENGINE_TMPFAIL;
2726 TapEngineSpecific::readSpecificData(tap_event,
2730 vb->ht.setMaxDeletedRevSeqno(seqnum);
// The checkpoint id is carried in the data section in network byte order.
2734 uint64_t checkpointId;
2735 memcpy(&checkpointId, data, sizeof(checkpointId));
2736 checkpointId = ntohll(checkpointId);
2737 ConnHandlerCheckPoint(tc, tap_event, vbucket,
2741 ret = ENGINE_DISCONNECT;
2742 LOG(EXTENSION_LOG_WARNING,
2743 "%s Checkpoint Id is missing in "
2744 "CHECKPOINT messages. Force disconnect...\n",
2745 connection->logHeader());
2749 ret = ENGINE_DISCONNECT;
2750 LOG(EXTENSION_LOG_WARNING,
2751 "%s not a consumer! Force disconnect\n",
2752 connection->logHeader());
// Mutation: revision seqno and NRU value are read from the engine-specific
// section.
2760 uint8_t nru = INITIAL_NRU_VALUE;
2761 uint64_t revSeqno = 0;
2762 TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2763 nengine, &revSeqno, &nru);
// When the connection has not negotiated the JSON datatype, the datatype
// is derived locally: raw bytes unless the value checks out as JSON.
2765 if (!isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_JSON)) {
2766 datatype = PROTOCOL_BINARY_RAW_BYTES;
2767 const unsigned char *dat = (const unsigned char*)data;
2768 const int datlen = ndata;
2769 if (checkUTF8JSON(dat, datlen)) {
2770 datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2773 // Create key in DefaultCollection since TAP won't support collections
2774 const DocKey docKey{static_cast<const uint8_t*>(key), nkey,
2775 DocNamespace::DefaultCollection};
2776 ret = connection->mutation(0, docKey,
2777 {static_cast<const uint8_t*>(data), ndata},
2779 vbucket, flags, 0, revSeqno, exptime, 0,
// Opaque message: the engine-specific section must be exactly one 32-bit
// command code.
2786 if (nengine == sizeof(uint32_t)) {
2788 memcpy(&cc, engine_specific, sizeof(cc));
2792 case TAP_OPAQUE_ENABLE_AUTO_NACK:
2793 // @todo: the memcached core will _ALWAYS_ send nack
2794 // if it encounter an error. This should be
2795 // set as the default when we move to .next after 2.0
2796 // (currently we need to allow the message for
2797 // backwards compatibility)
2798 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2799 connection->logHeader());
2801 case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2802 connection->setSupportCheckpointSync(true);
2803 LOG(EXTENSION_LOG_INFO,
2804 "%s Enable checkpoint synchronization\n",
2805 connection->logHeader());
2807 case TAP_OPAQUE_OPEN_CHECKPOINT:
2809 * This event is only received by the TAP client that wants to
2810 * get mutations from closed checkpoints only. At this time,
2811 * only incremental backup client receives this event so that
2812 * it can close the connection and reconnect later.
2814 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2815 connection->logHeader());
2817 case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2819 LOG(EXTENSION_LOG_INFO,
2820 "%s Backfill started for vbucket %d.\n",
2821 connection->logHeader(), vbucket);
// Reset the vbucket; the reset is timed into its own histogram and a
// failure disconnects the peer.
2822 BlockTimer timer(&stats.tapVbucketResetHisto);
2823 ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2825 if (ret == ENGINE_DISCONNECT) {
2826 LOG(EXTENSION_LOG_WARNING,
2827 "%s Failed to reset a vbucket %d. Force disconnect\n",
2828 connection->logHeader(), vbucket);
2830 LOG(EXTENSION_LOG_NOTICE,
2831 "%s Reset vbucket %d was completed succecssfully.\n",
2832 connection->logHeader(), vbucket);
2835 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2837 tc->setBackfillPhase(true, vbucket);
2839 ret = ENGINE_DISCONNECT;
2840 LOG(EXTENSION_LOG_WARNING,
2841 "TAP consumer doesn't exists. Force disconnect\n");
2845 case TAP_OPAQUE_CLOSE_BACKFILL:
2847 LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2848 connection->logHeader());
2849 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2851 tc->setBackfillPhase(false, vbucket);
2853 ret = ENGINE_DISCONNECT;
2854 LOG(EXTENSION_LOG_WARNING,
2855 "%s not a consumer! Force disconnect\n",
2856 connection->logHeader());
2860 case TAP_OPAQUE_CLOSE_TAP_STREAM:
2862 * This event is sent by the eVBucketMigrator to notify that
2863 * the source node closes the tap replication stream and
2864 * switches to TAKEOVER_VBUCKETS phase.
2865 * This is just an informative message and doesn't require any
2868 LOG(EXTENSION_LOG_INFO,
2869 "%s Received close tap stream. Switching to takeover phase.\n",
2870 connection->logHeader());
2872 case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2874 * This opaque message is just for notifying that the source
2875 * node receives change_vbucket_filter request and processes
2878 LOG(EXTENSION_LOG_INFO,
2879 "%s Notified that the source node changed a vbucket filter.\n",
2880 connection->logHeader());
2883 LOG(EXTENSION_LOG_WARNING,
2884 "%s Received an unknown opaque command\n",
2885 connection->logHeader());
2888 LOG(EXTENSION_LOG_WARNING,
2889 "%s Received tap opaque with unknown size %d\n",
2890 connection->logHeader(), nengine);
2894 case TAP_VBUCKET_SET:
2896 BlockTimer timer(&stats.tapVbucketSetHisto);
// The engine-specific section must be exactly one vbucket_state_t, sent in
// network byte order.
2898 if (nengine != sizeof(vbucket_state_t)) {
2900 LOG(EXTENSION_LOG_WARNING,
2901 "%s Received TAP_VBUCKET_SET with illegal size."
2902 " Force disconnect\n", connection->logHeader());
2903 ret = ENGINE_DISCONNECT;
2907 vbucket_state_t state;
2908 memcpy(&state, engine_specific, nengine);
2909 state = (vbucket_state_t)ntohl(state);
2911 ret = connection->setVBucketState(0, vbucket, state);
2917 LOG(EXTENSION_LOG_WARNING,
2918 "%s Recieved bad opcode, ignoring message\n",
2919 connection->logHeader());
// Let the connection account for the event and its outcome.
2922 connection->processedEvent(tap_event, ret);
// Applies a checkpoint start/end command to a TAP consumer.
//
// consumer      - consumer that processes the command.
// checkpointId  - id of the checkpoint the command refers to.
//
// On success the flusher is woken up and ENGINE_SUCCESS is produced; a
// failure is logged as a warning and produces ENGINE_DISCONNECT.
2926 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2927 TapConsumer *consumer,
2930 uint64_t checkpointId) {
2931 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2933 if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2934 getKVBucket()->wakeUpFlusher();
2935 ret = ENGINE_SUCCESS;
2938 ret = ENGINE_DISCONNECT;
2939 LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2940 "checkpoint %" PRIu64 ". Force disconnect\n",
2941 consumer->logHeader(), checkpointId);
// Resolves the TAP producer stored in the engine-specific slot of the
// cookie.
//
// A missing or not-connected producer, and a producer that is pending
// disconnect, are each logged as a warning.
// NOTE(review): presumably both of those cases yield a null result -
// confirm against the return statements.
2947 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2949 reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2950 if (!(rv && rv->isConnected())) {
2951 LOG(EXTENSION_LOG_WARNING,
2952 "Walking a non-existent tap queue, disconnecting\n");
2956 if (rv->doDisconnect()) {
2957 LOG(EXTENSION_LOG_WARNING,
2958 "%s Disconnecting pending connection\n", rv->logHeader());
// Registers the engine callbacks for the ON_DISCONNECT and ON_DELETE_BUCKET
// events, passing this engine instance as the callback data.
2964 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2965 // Register the ON_DISCONNECT callback
2966 registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2967 // Register the ON_DELETE_BUCKET callback
2968 registerEngineCallback(ON_DELETE_BUCKET, EvpHandleDeleteBucket, this);
// Forwards a TAP ack to the producer associated with the cookie.
//
// Returns ENGINE_DISCONNECT (after logging a warning) when no producer is
// available, otherwise the result of the producer's processAck() for the
// given seqno, status and key.
2971 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2976 TapProducer *connection = getTapProducer(cookie);
2978 LOG(EXTENSION_LOG_WARNING,
2979 "Unable to process tap ack. No producer found\n");
2980 return ENGINE_DISCONNECT;
2983 return connection->processAck(seqno, status, key);
// Decides how an out-of-memory situation is reported.
//
// Returns ENGINE_TMPFAIL when there is evidence that memory can still be
// freed (the item pager is woken up and the temporary-OOM counter is
// incremented), otherwise ENGINE_ENOMEM.
2986 ENGINE_ERROR_CODE EventuallyPersistentEngine::memoryCondition() {
2987 // Do we think it's possible we could free something?
// First indication: the memory overhead alone is below the maximum data
// size.
2988 bool haveEvidenceWeCanFreeMemory =
2989 (stats.getMaxDataSize() > stats.memOverhead->load());
2990 if (haveEvidenceWeCanFreeMemory) {
2991 // Look for more evidence by seeing if we have resident items.
// Only active vbuckets are counted.
2992 VBucketCountVisitor countVisitor(vbucket_state_active);
2993 kvBucket->visit(countVisitor);
// Fewer non-resident items than items in total means at least one item is
// still resident.
2995 haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
2996 countVisitor.getNumItems();
2998 if (haveEvidenceWeCanFreeMemory) {
2999 ++stats.tmp_oom_errors;
3000 // Wake up the item pager task as memory usage
3001 // seems to have exceeded high water mark
3002 getKVBucket()->wakeUpItemPager();
3003 return ENGINE_TMPFAIL;
3006 return ENGINE_ENOMEM;
// Schedules a TAP backfill: builds a BackFillVisitor for the given vbucket
// filter and connection, and hands ownership of it to the KV bucket's
// visit() call, tagged with the BackfillVisitorTask task id.
3010 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
3014 auto bfv = std::make_unique<BackFillVisitor>(
3015 this, *tapConnMap, tc, backfillVBFilter);
3016 getKVBucket()->visit(std::move(bfv),
3018 TaskId::BackfillVisitorTask,
// Dispatches the vbucket to the count visitor registered for its current
// state; vbuckets in a state without a registered visitor are ignored.
3022 void VBucketCountAggregator::visitBucket(VBucketPtr &vb) {
3023 std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3024 it = visitorMap.find(vb->getState());
3025 if ( it != visitorMap.end() ) {
3026 it->second->visitBucket(vb);
// Registers a count visitor under the vbucket state it reports; a visitor
// already registered for that state is replaced. The map stores the raw
// pointer as passed in.
3030 void VBucketCountAggregator::addVisitor(VBucketCountVisitor* visitor) {
3031 visitorMap[visitor->getVBucketState()] = visitor;
3034 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3035 ADD_STAT add_stat) {
3037 configuration.addStats(add_stat, cookie);
3039 EPStats &epstats = getEpStats();
3040 add_casted_stat("ep_version", VERSION, add_stat, cookie);
3041 add_casted_stat("ep_storage_age",
3042 epstats.dirtyAge, add_stat, cookie);
3043 add_casted_stat("ep_storage_age_highwat",
3044 epstats.dirtyAgeHighWat, add_stat, cookie);
3045 add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3048 if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3049 add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3050 } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3051 add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3054 add_casted_stat("ep_total_enqueued",
3055 epstats.totalEnqueued, add_stat, cookie);
3056 add_casted_stat("ep_expired_access", epstats.expired_access,
3058 add_casted_stat("ep_expired_compactor", epstats.expired_compactor,
3060 add_casted_stat("ep_expired_pager", epstats.expired_pager,
3062 add_casted_stat("ep_queue_size",
3063 epstats.diskQueueSize, add_stat, cookie);
3064 add_casted_stat("ep_diskqueue_items",
3065 epstats.diskQueueSize, add_stat, cookie);
3066 add_casted_stat("ep_vb_backfill_queue_size",
3067 epstats.vbBackfillQueueSize,
3070 auto* flusher = kvBucket->getFlusher(EP_PRIMARY_SHARD);
3072 add_casted_stat("ep_commit_num", epstats.flusherCommits,
3074 add_casted_stat("ep_commit_time",
3075 epstats.commit_time, add_stat, cookie);
3076 add_casted_stat("ep_commit_time_total",
3077 epstats.cumulativeCommitTime, add_stat, cookie);
3078 add_casted_stat("ep_item_begin_failed",
3079 epstats.beginFailed, add_stat, cookie);
3080 add_casted_stat("ep_item_commit_failed",
3081 epstats.commitFailed, add_stat, cookie);
3082 add_casted_stat("ep_item_flush_expired",
3083 epstats.flushExpired, add_stat, cookie);
3084 add_casted_stat("ep_item_flush_failed",
3085 epstats.flushFailed, add_stat, cookie);
3086 add_casted_stat("ep_flusher_state",
3087 flusher->stateName(), add_stat, cookie);
3088 add_casted_stat("ep_flusher_todo",
3089 epstats.flusher_todo, add_stat, cookie);
3090 add_casted_stat("ep_total_persisted",
3091 epstats.totalPersisted, add_stat, cookie);
3092 add_casted_stat("ep_uncommitted_items",
3093 epstats.flusher_todo, add_stat, cookie);
3094 add_casted_stat("ep_chk_persistence_timeout",
3095 VBucket::getCheckpointFlushTimeout(),
3099 add_casted_stat("ep_vbucket_del",
3100 epstats.vbucketDeletions, add_stat, cookie);
3101 add_casted_stat("ep_vbucket_del_fail",
3102 epstats.vbucketDeletionFail, add_stat, cookie);
3103 add_casted_stat("ep_flush_duration_total",
3104 epstats.cumulativeFlushTime, add_stat, cookie);
3106 kvBucket->getAggregatedVBucketStats(cookie, add_stat);
3108 kvBucket->getFileStats(cookie, add_stat);
3110 add_casted_stat("ep_persist_vbstate_total",
3111 epstats.totalPersistVBState, add_stat, cookie);
3113 size_t memUsed = stats.getTotalMemoryUsed();
3114 add_casted_stat("mem_used", memUsed, add_stat, cookie);
3115 add_casted_stat("ep_mem_low_wat_percent", stats.mem_low_wat_percent,
3117 add_casted_stat("ep_mem_high_wat_percent", stats.mem_high_wat_percent,
3119 add_casted_stat("bytes", memUsed, add_stat, cookie);
3120 add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3121 add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3122 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3123 add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3125 add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3127 add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3128 add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3130 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3131 add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3133 add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3135 add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3136 add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3137 add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3139 add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3140 add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3142 add_casted_stat("ep_mem_tracker_enabled", stats.memoryTrackerEnabled,
3144 add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3146 add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3148 add_casted_stat("ep_bg_remaining_items", epstats.numRemainingBgItems,
3150 add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3152 add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3154 add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3156 add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3158 add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3160 add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3162 add_casted_stat("ep_items_rm_from_checkpoints",
3163 epstats.itemsRemovedFromCheckpoints,
3165 add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3167 add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3169 add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3172 add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3173 add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3175 add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3177 add_casted_stat("ep_pending_ops_max_duration",
3178 epstats.pendingOpsMaxDuration,
3181 add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3183 add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3186 size_t vbDeletions = epstats.vbucketDeletions.load();
3187 if (vbDeletions > 0) {
3188 add_casted_stat("ep_vbucket_del_max_walltime",
3189 epstats.vbucketDelMaxWalltime,
3191 add_casted_stat("ep_vbucket_del_avg_walltime",
3192 epstats.vbucketDelTotWalltime / vbDeletions,
3196 size_t numBgOps = epstats.bgNumOperations.load();
3198 add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3200 add_casted_stat("ep_bg_min_wait",
3203 add_casted_stat("ep_bg_max_wait",
3206 add_casted_stat("ep_bg_wait_avg",
3207 epstats.bgWait / numBgOps,
3209 add_casted_stat("ep_bg_min_load",
3212 add_casted_stat("ep_bg_max_load",
3215 add_casted_stat("ep_bg_load_avg",
3216 epstats.bgLoad / numBgOps,
3218 add_casted_stat("ep_bg_wait",
3221 add_casted_stat("ep_bg_load",
3226 add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3228 add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3230 add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3232 add_casted_stat("ep_num_access_scanner_skips",
3233 epstats.accessScannerSkips, add_stat, cookie);
3234 add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3236 add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3239 if (kvBucket->isAccessScannerEnabled() && epstats.alogTime.load() != 0)
3243 hrtime_t alogTime = epstats.alogTime.load();
3244 if (cb_gmtime_r((time_t *)&alogTime, &alogTim) == -1) {
3245 add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3248 strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3249 add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3253 add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3257 if (kvBucket->isExpPagerEnabled()) {
3259 struct tm expPagerTim;
3260 hrtime_t expPagerTime = epstats.expPagerTime.load();
3261 if (cb_gmtime_r((time_t *)&expPagerTime, &expPagerTim) == -1) {
3262 add_casted_stat("ep_expiry_pager_task_time", "UNKNOWN", add_stat,
3265 strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &expPagerTim);
3266 add_casted_stat("ep_expiry_pager_task_time", timestr, add_stat,
<