2 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
4 * Copyright 2010 Couchbase, Inc
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
22 #include <memcached/engine.h>
23 #include <memcached/protocol_binary.h>
24 #include <memcached/util.h>
25 #include <platform/platform.h>
37 #include "ep_engine.h"
38 #include "failover-table.h"
41 #include "htresizer.h"
42 #include "memory_tracker.h"
43 #include "stats-info.h"
44 #define STATWRITER_NAMESPACE core_engine
45 #include "statwriter.h"
46 #undef STATWRITER_NAMESPACE
47 #include "tapthrottle.h"
48 #include "dcp-consumer.h"
49 #include "dcp-producer.h"
52 static ALLOCATOR_HOOKS_API *hooksApi;
53 SERVER_LOG_API* EventuallyPersistentEngine::loggerApi;
56 static size_t percentOf(size_t val, double percent) {
57 return static_cast<size_t>(static_cast<double>(val) * percent);
61 * Helper function to avoid typing in the long cast all over the place
62 * @param handle pointer to the engine
63 * @return the engine as a class
65 static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
67 EventuallyPersistentEngine* ret;
68 ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
69 ObjectRegistry::onSwitchThread(ret);
73 static inline void releaseHandle(ENGINE_HANDLE* handle) {
75 ObjectRegistry::onSwitchThread(NULL);
80 * Call the response callback and return the appropriate value so that
81 * the core knows what to do..
83 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
85 const void *ext, uint8_t extlen,
86 const void *body, uint32_t bodylen,
87 uint8_t datatype, uint16_t status,
88 uint64_t cas, const void *cookie)
90 ENGINE_ERROR_CODE rv = ENGINE_FAILED;
91 EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
92 if (response(key, keylen, ext, extlen, body, bodylen, datatype,
93 status, cas, cookie)) {
96 ObjectRegistry::onSwitchThread(e);
100 template <typename T>
101 static void validate(T v, T l, T h) {
102 if (v < l || v > h) {
103 throw std::runtime_error("Value out of range.");
108 static void checkNumeric(const char* str) {
113 for (; str[i]; i++) {
115 if (!isdigit(str[i])) {
116 throw std::runtime_error("Value is not numeric");
121 // The Engine API specifies C linkage for the functions..
124 static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
126 engine_info* info = getHandle(handle)->getInfo();
127 releaseHandle(handle);
131 static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
132 const char* config_str)
134 ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
135 releaseHandle(handle);
139 static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
141 getHandle(handle)->destroy(force);
142 delete getHandle(handle);
146 static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
153 const rel_time_t exptime,
156 if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
157 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
159 return ENGINE_EINVAL;
161 ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
168 releaseHandle(handle);
172 static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
178 mutation_descr_t *mut_info)
180 ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
183 releaseHandle(handle);
187 static void EvpItemRelease(ENGINE_HANDLE* handle,
191 getHandle(handle)->itemRelease(cookie, itm);
192 releaseHandle(handle);
195 static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
202 ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
203 nkey, vbucket, true);
204 releaseHandle(handle);
208 static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
210 const char* stat_key,
214 ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
218 releaseHandle(handle);
222 static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
226 ENGINE_STORE_OPERATION operation,
229 ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
232 releaseHandle(handle);
236 static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
240 const bool increment,
242 const uint64_t delta,
243 const uint64_t initial,
244 const rel_time_t exptime,
250 if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
251 if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
252 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
255 LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
256 "operations on compressed data!");
258 return ENGINE_EINVAL;
260 ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
269 releaseHandle(handle);
273 static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
274 const void* cookie, time_t when)
276 ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
277 releaseHandle(handle);
281 static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
283 getHandle(handle)->resetStats();
284 releaseHandle(handle);
287 static protocol_binary_response_status stopFlusher(
288 EventuallyPersistentEngine *e,
291 return e->stopFlusher(msg, msg_size);
294 static protocol_binary_response_status startFlusher(
295 EventuallyPersistentEngine *e,
298 return e->startFlusher(msg, msg_size);
301 static protocol_binary_response_status setTapParam(
302 EventuallyPersistentEngine *e,
305 const char **msg, size_t *) {
306 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
310 if (strcmp(keyz, "tap_keepalive") == 0) {
312 validate(v, 0, MAX_TAP_KEEP_ALIVE);
313 e->setTapKeepAlive(static_cast<uint32_t>(v));
314 } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
316 e->getConfiguration().setTapThrottleThreshold(v);
317 } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
319 e->getConfiguration().setTapThrottleQueueCap(v);
320 } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
322 e->getConfiguration().setTapThrottleCapPcnt(v);
324 *msg = "Unknown config param";
325 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
327 } catch(std::runtime_error &) {
328 *msg = "Value out of range.";
329 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
335 static protocol_binary_response_status setCheckpointParam(
336 EventuallyPersistentEngine *e,
341 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
345 if (strcmp(keyz, "chk_max_items") == 0) {
347 validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
348 e->getConfiguration().setChkMaxItems(v);
349 } else if (strcmp(keyz, "chk_period") == 0) {
351 validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
352 e->getConfiguration().setChkPeriod(v);
353 } else if (strcmp(keyz, "max_checkpoints") == 0) {
355 validate(v, DEFAULT_MAX_CHECKPOINTS,
356 MAX_CHECKPOINTS_UPPER_BOUND);
357 e->getConfiguration().setMaxCheckpoints(v);
358 } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
359 if (strcmp(valz, "true") == 0) {
360 e->getConfiguration().setItemNumBasedNewChk(true);
362 e->getConfiguration().setItemNumBasedNewChk(false);
364 } else if (strcmp(keyz, "keep_closed_chks") == 0) {
365 if (strcmp(valz, "true") == 0) {
366 e->getConfiguration().setKeepClosedChks(true);
368 e->getConfiguration().setKeepClosedChks(false);
370 } else if (strcmp(keyz, "enable_chk_merge") == 0) {
371 if (strcmp(valz, "true") == 0) {
372 e->getConfiguration().setEnableChkMerge(true);
374 e->getConfiguration().setEnableChkMerge(false);
377 *msg = "Unknown config param";
378 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
380 } catch(std::runtime_error &) {
381 *msg = "Value out of range.";
382 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
388 static protocol_binary_response_status setFlushParam(
389 EventuallyPersistentEngine *e,
394 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
396 // Handle the actual mutation.
399 if (strcmp(keyz, "bg_fetch_delay") == 0) {
401 e->getConfiguration().setBgFetchDelay(v);
402 } else if (strcmp(keyz, "flushall_enabled") == 0) {
403 if (strcmp(valz, "true") == 0) {
404 e->getConfiguration().setFlushallEnabled(true);
405 } else if(strcmp(valz, "false") == 0) {
406 e->getConfiguration().setFlushallEnabled(false);
408 throw std::runtime_error("value out of range.");
410 } else if (strcmp(keyz, "max_size") == 0) {
413 uint64_t vsize = strtoull(valz, &ptr, 10);
414 validate(vsize, static_cast<uint64_t>(0),
415 std::numeric_limits<uint64_t>::max());
416 e->getConfiguration().setMaxSize(vsize);
417 e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
418 e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
419 } else if (strcmp(keyz, "mem_low_wat") == 0) {
422 uint64_t vsize = strtoull(valz, &ptr, 10);
423 validate(vsize, static_cast<uint64_t>(0),
424 std::numeric_limits<uint64_t>::max());
425 e->getConfiguration().setMemLowWat(vsize);
426 } else if (strcmp(keyz, "mem_high_wat") == 0) {
429 uint64_t vsize = strtoull(valz, &ptr, 10);
430 validate(vsize, static_cast<uint64_t>(0),
431 std::numeric_limits<uint64_t>::max());
432 e->getConfiguration().setMemHighWat(vsize);
433 } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
436 e->getConfiguration().setBackfillMemThreshold(v);
437 } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
440 e->getConfiguration().setCompactionExpMemThreshold(v);
441 } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
444 e->getConfiguration().setMutationMemThreshold(v);
445 } else if (strcmp(keyz, "timing_log") == 0) {
446 EPStats &stats = e->getEpStats();
447 std::ostream *old = stats.timingLog;
448 stats.timingLog = NULL;
450 if (strcmp(valz, "off") == 0) {
451 LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
453 std::ofstream *tmp(new std::ofstream(valz));
455 LOG(EXTENSION_LOG_INFO,
456 "Logging detailed timings to ``%s''.", valz);
457 stats.timingLog = tmp;
459 LOG(EXTENSION_LOG_WARNING,
460 "Error setting detailed timing log to ``%s'': %s",
461 valz, strerror(errno));
465 } else if (strcmp(keyz, "exp_pager_stime") == 0) {
468 uint64_t vsize = strtoull(valz, &ptr, 10);
469 validate(vsize, static_cast<uint64_t>(0),
470 std::numeric_limits<uint64_t>::max());
471 e->getConfiguration().setExpPagerStime((size_t)vsize);
472 } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
473 if (strcmp(valz, "true") == 0) {
474 e->getConfiguration().setAccessScannerEnabled(true);
475 } else if (strcmp(valz, "false") == 0) {
476 e->getConfiguration().setAccessScannerEnabled(false);
478 throw std::runtime_error("Value expected: true/false.");
480 } else if (strcmp(keyz, "alog_sleep_time") == 0) {
482 e->getConfiguration().setAlogSleepTime(v);
483 } else if (strcmp(keyz, "alog_task_time") == 0) {
485 e->getConfiguration().setAlogTaskTime(v);
486 } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
488 e->getConfiguration().setPagerActiveVbPcnt(v);
489 } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
491 validate(v, 0, std::numeric_limits<int>::max());
492 e->getConfiguration().setWarmupMinMemoryThreshold(v);
493 } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
495 validate(v, 0, std::numeric_limits<int>::max());
496 e->getConfiguration().setWarmupMinItemsThreshold(v);
497 } else if (strcmp(keyz, "max_num_readers") == 0) {
499 validate(v, 0, std::numeric_limits<int>::max());
500 e->getConfiguration().setMaxNumReaders(v);
501 ExecutorPool::get()->setMaxReaders(v);
502 } else if (strcmp(keyz, "max_num_writers") == 0) {
504 validate(v, 0, std::numeric_limits<int>::max());
505 e->getConfiguration().setMaxNumWriters(v);
506 ExecutorPool::get()->setMaxWriters(v);
507 } else if (strcmp(keyz, "max_num_auxio") == 0) {
509 validate(v, 0, std::numeric_limits<int>::max());
510 e->getConfiguration().setMaxNumAuxio(v);
511 ExecutorPool::get()->setMaxAuxIO(v);
512 } else if (strcmp(keyz, "max_num_nonio") == 0) {
514 validate(v, 0, std::numeric_limits<int>::max());
515 e->getConfiguration().setMaxNumNonio(v);
516 ExecutorPool::get()->setMaxNonIO(v);
517 } else if (strcmp(keyz, "bfilter_enabled") == 0) {
518 if (strcmp(valz, "true") == 0) {
519 e->getConfiguration().setBfilterEnabled(true);
520 } else if (strcmp(valz, "false") == 0) {
521 e->getConfiguration().setBfilterEnabled(false);
523 throw std::runtime_error("Value expected: true/false.");
525 } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
526 float val = atof(valz);
527 if (val >= 0.0 && val <= 1.0) {
528 e->getConfiguration().setBfilterResidencyThreshold(val);
530 throw std::runtime_error("Value out of range [0.0-1.0].");
532 } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
533 if (strcmp(valz, "true") == 0) {
534 e->getConfiguration().setDefragmenterEnabled(true);
536 e->getConfiguration().setDefragmenterEnabled(false);
538 } else if (strcmp(keyz, "defragmenter_interval") == 0) {
540 validate(v, 1, std::numeric_limits<int>::max());
541 e->getConfiguration().setDefragmenterInterval(v);
542 } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
544 validate(v, 0, std::numeric_limits<int>::max());
545 e->getConfiguration().setDefragmenterAgeThreshold(v);
546 } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
548 validate(v, 1, std::numeric_limits<int>::max());
549 e->getConfiguration().setDefragmenterChunkDuration(v);
550 } else if (strcmp(keyz, "defragmenter_run") == 0) {
551 e->runDefragmenterTask();
552 } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
554 validate(v, 1, std::numeric_limits<int>::max());
555 e->getConfiguration().setCompactionWriteQueueCap(v);
557 *msg = "Unknown config param";
558 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
560 } catch(std::runtime_error& ex) {
561 *msg = "Value out of range.";
562 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
568 static protocol_binary_response_status setDcpParam(
569 EventuallyPersistentEngine *e,
574 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
577 if (strcmp(keyz, "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
578 size_t v = atoi(valz);
580 validate(v, size_t(1), std::numeric_limits<size_t>::max());
581 e->getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(v);
582 } else if (strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") == 0) {
583 size_t v = atoi(valz);
585 validate(v, size_t(1), std::numeric_limits<size_t>::max());
586 e->getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(v);
588 *msg = "Unknown config param";
589 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
591 } catch (std::runtime_error& ex) {
592 *msg = "Value out of range.";
593 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
599 static protocol_binary_response_status evictKey(
600 EventuallyPersistentEngine *e,
601 protocol_binary_request_header
605 protocol_binary_request_no_extras *req =
606 (protocol_binary_request_no_extras*)request;
611 int keylen = ntohs(req->message.header.request.keylen);
612 if (keylen >= (int)sizeof(keyz)) {
613 *msg = "Key is too large.";
614 return PROTOCOL_BINARY_RESPONSE_EINVAL;
616 memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
619 uint16_t vbucket = ntohs(request->request.vbucket);
621 std::string key(keyz, keylen);
623 LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
626 protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
628 if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
629 rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
630 if (e->isDegradedMode()) {
631 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
637 static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
638 protocol_binary_request_header *req,
643 protocol_binary_response_status *res) {
645 uint8_t extlen = req->request.extlen;
646 if (extlen != 0 && extlen != 4) {
647 *msg = "Invalid packet format (extlen may be 0 or 4)";
648 *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
649 return ENGINE_EINVAL;
652 protocol_binary_request_getl *grequest =
653 (protocol_binary_request_getl*)req;
654 *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
656 const char *keyp = reinterpret_cast<const char*>(req->bytes);
657 keyp += sizeof(req->bytes) + extlen;
658 std::string key(keyp, ntohs(req->request.keylen));
659 uint16_t vbucket = ntohs(req->request.vbucket);
661 RememberingCallback<GetValue> getCb;
662 uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
663 uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
664 uint32_t lockTimeout = default_timeout;
666 lockTimeout = ntohl(grequest->message.body.expiration);
669 if (lockTimeout > max_timeout || lockTimeout < 1) {
670 LOG(EXTENSION_LOG_WARNING,
671 "Illegal value for lock timeout specified"
672 " %u. Using default value: %u", lockTimeout, default_timeout);
673 lockTimeout = default_timeout;
676 bool gotLock = e->getLocked(key, vbucket, getCb,
678 lockTimeout, cookie);
680 getCb.waitForValue();
681 ENGINE_ERROR_CODE rv = getCb.val.getStatus();
683 if (rv == ENGINE_SUCCESS) {
684 *itm = getCb.val.getValue();
685 ++(e->getEpStats().numOpsGet);
686 } else if (rv == ENGINE_EWOULDBLOCK) {
688 // need to wait for value
690 } else if (rv == ENGINE_NOT_MY_VBUCKET) {
691 *msg = "That's not my bucket.";
692 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
693 return ENGINE_NOT_MY_VBUCKET;
694 } else if (!gotLock){
696 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
697 return ENGINE_TMPFAIL;
699 if (e->isDegradedMode()) {
700 *msg = "LOCK_TMP_ERROR";
701 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
702 return ENGINE_TMPFAIL;
706 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
707 return ENGINE_KEY_ENOENT;
713 static protocol_binary_response_status unlockKey(
714 EventuallyPersistentEngine *e,
715 protocol_binary_request_header
720 protocol_binary_request_no_extras *req =
721 (protocol_binary_request_no_extras*)request;
723 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
727 int keylen = ntohs(req->message.header.request.keylen);
728 if (keylen >= (int)sizeof(keyz)) {
729 *msg = "Key is too large.";
730 return PROTOCOL_BINARY_RESPONSE_EINVAL;
733 memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
736 uint16_t vbucket = ntohs(request->request.vbucket);
737 std::string key(keyz, keylen);
739 LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
741 RememberingCallback<GetValue> getCb;
742 uint64_t cas = ntohll(request->request.cas);
744 ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
747 if (rv == ENGINE_SUCCESS) {
749 } else if (rv == ENGINE_TMPFAIL){
750 *msg = "UNLOCK_ERROR";
751 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
753 if (e->isDegradedMode()) {
754 *msg = "LOCK_TMP_ERROR";
755 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
758 RCPtr<VBucket> vb = e->getVBucket(vbucket);
760 *msg = "That's not my bucket.";
761 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
764 res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
770 static protocol_binary_response_status setParam(
771 EventuallyPersistentEngine *e,
772 protocol_binary_request_set_param
777 size_t keylen = ntohs(req->message.header.request.keylen);
778 uint8_t extlen = req->message.header.request.extlen;
779 size_t vallen = ntohl(req->message.header.request.bodylen);
780 protocol_binary_engine_param_t paramtype =
781 static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
783 if (keylen == 0 || (vallen - keylen - extlen) == 0) {
784 return PROTOCOL_BINARY_RESPONSE_EINVAL;
787 const char *keyp = reinterpret_cast<const char*>(req->bytes)
788 + sizeof(req->bytes);
789 const char *valuep = keyp + keylen;
790 vallen -= (keylen + extlen);
796 if (keylen >= sizeof(keyz)) {
797 *msg = "Key is too large.";
798 return PROTOCOL_BINARY_RESPONSE_EINVAL;
800 memcpy(keyz, keyp, keylen);
804 if (vallen >= sizeof(valz)) {
805 *msg = "Value is too large.";
806 return PROTOCOL_BINARY_RESPONSE_EINVAL;
808 memcpy(valz, valuep, vallen);
811 protocol_binary_response_status rv;
814 case protocol_binary_engine_param_flush:
815 rv = setFlushParam(e, keyz, valz, msg, msg_size);
817 case protocol_binary_engine_param_tap:
818 rv = setTapParam(e, keyz, valz, msg, msg_size);
820 case protocol_binary_engine_param_checkpoint:
821 rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
823 case protocol_binary_engine_param_dcp:
824 rv = setDcpParam(e, keyz, valz, msg, msg_size);
827 rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
833 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
835 protocol_binary_request_header *request,
836 ADD_RESPONSE response) {
837 protocol_binary_request_get_vbucket *req =
838 reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
841 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
842 RCPtr<VBucket> vb = e->getVBucket(vbucket);
844 LockHolder lh(e->clusterConfig.lock);
845 return sendResponse(response, NULL, 0, NULL, 0,
846 e->clusterConfig.config,
847 e->clusterConfig.len,
848 PROTOCOL_BINARY_RAW_BYTES,
849 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
852 vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
853 return sendResponse(response, NULL, 0, NULL, 0, &state,
855 PROTOCOL_BINARY_RAW_BYTES,
856 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
860 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
862 protocol_binary_request_header *request,
863 ADD_RESPONSE response) {
865 protocol_binary_request_set_vbucket *req =
866 reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
868 uint64_t cas = ntohll(req->message.header.request.cas);
870 size_t bodylen = ntohl(req->message.header.request.bodylen)
871 - ntohs(req->message.header.request.keylen);
872 if (bodylen != sizeof(vbucket_state_t)) {
873 const std::string msg("Incorrect packet format");
874 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
875 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
876 PROTOCOL_BINARY_RESPONSE_EINVAL,
880 vbucket_state_t state;
881 memcpy(&state, &req->message.body.state, sizeof(state));
882 state = static_cast<vbucket_state_t>(ntohl(state));
884 if (!is_valid_vbucket_state_t(state)) {
885 const std::string msg("Invalid vbucket state");
886 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
887 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
888 PROTOCOL_BINARY_RESPONSE_EINVAL,
892 uint16_t vb = ntohs(req->message.header.request.vbucket);
893 if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
894 const std::string msg("VBucket number too big");
895 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
896 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
897 PROTOCOL_BINARY_RESPONSE_ERANGE,
900 return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
901 PROTOCOL_BINARY_RAW_BYTES,
902 PROTOCOL_BINARY_RESPONSE_SUCCESS,
906 static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
908 protocol_binary_request_header *req,
909 ADD_RESPONSE response) {
911 uint64_t cas = ntohll(req->request.cas);
913 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
914 uint16_t vbucket = ntohs(req->request.vbucket);
916 std::string msg = "";
917 if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
918 msg = "Incorrect packet format.";
919 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
921 PROTOCOL_BINARY_RAW_BYTES,
922 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
926 uint32_t bodylen = ntohl(req->request.bodylen);
928 const char* ptr = reinterpret_cast<const char*>(req->bytes) +
930 if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
935 ENGINE_ERROR_CODE err;
936 void* es = e->getEngineSpecific(cookie);
939 err = e->deleteVBucket(vbucket, cookie);
940 e->storeEngineSpecific(cookie, e);
942 e->storeEngineSpecific(cookie, NULL);
943 LOG(EXTENSION_LOG_INFO,
944 "Completed sync deletion of vbucket %u",
946 err = ENGINE_SUCCESS;
949 err = e->deleteVBucket(vbucket);
953 LOG(EXTENSION_LOG_WARNING,
954 "Deletion of vbucket %d was completed.", vbucket);
956 case ENGINE_NOT_MY_VBUCKET:
957 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
958 "because the vbucket doesn't exist!!!", vbucket);
959 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
962 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
963 "because the vbucket is not in a dead state\n", vbucket);
964 msg = "Failed to delete vbucket. Must be in the dead state.";
965 res = PROTOCOL_BINARY_RESPONSE_EINVAL;
967 case ENGINE_EWOULDBLOCK:
968 LOG(EXTENSION_LOG_WARNING, "Request for vbucket %d deletion is in"
969 " EWOULDBLOCK until the database file is removed from disk",
971 e->storeEngineSpecific(cookie, req);
972 return ENGINE_EWOULDBLOCK;
974 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
975 "because of unknown reasons\n", vbucket);
976 msg = "Failed to delete vbucket. Unknown reason.";
977 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
980 if (err != ENGINE_NOT_MY_VBUCKET) {
981 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
982 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
985 LockHolder lh(e->clusterConfig.lock);
986 return sendResponse(response, NULL, 0, NULL, 0,
987 e->clusterConfig.config,
988 e->clusterConfig.len,
989 PROTOCOL_BINARY_RAW_BYTES,
995 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
996 protocol_binary_request_header *request,
1000 protocol_binary_response_status *res) {
1001 EventuallyPersistentStore *eps = e->getEpStore();
1002 protocol_binary_request_no_extras *req =
1003 (protocol_binary_request_no_extras*)request;
1004 int keylen = ntohs(req->message.header.request.keylen);
1005 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1006 ENGINE_ERROR_CODE error_code;
1007 std::string keystr(((char *)request) + sizeof(req->message.header),
1010 GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
1012 if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
1013 if (error_code == ENGINE_NOT_MY_VBUCKET) {
1014 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1016 } else if (error_code == ENGINE_TMPFAIL) {
1018 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1023 *it = rv.getValue();
1024 *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1026 ++(e->getEpStats().numOpsGet);
1027 return ENGINE_SUCCESS;
1030 static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
1032 protocol_binary_request_compact_db *req,
1033 ADD_RESPONSE response) {
1035 std::string msg = "";
1036 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1037 compaction_ctx compactreq;
1038 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1039 uint64_t cas = ntohll(req->message.header.request.cas);
1041 if (ntohs(req->message.header.request.keylen) > 0 ||
1042 req->message.header.request.extlen != 24) {
1043 LOG(EXTENSION_LOG_WARNING,
1044 "Compaction of vbucket %d received bad ext/key len %d/%d.",
1045 vbucket, req->message.header.request.extlen,
1046 ntohs(req->message.header.request.keylen));
1047 msg = "Incorrect packet format.";
1048 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1050 PROTOCOL_BINARY_RAW_BYTES,
1051 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
1053 EPStats &stats = e->getEpStats();
1054 compactreq.max_purged_seq = 0;
1055 compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
1056 compactreq.purge_before_seq =
1057 ntohll(req->message.body.purge_before_seq);
1058 compactreq.drop_deletes = req->message.body.drop_deletes;
1059 compactreq.bfcb = NULL;
1061 ENGINE_ERROR_CODE err;
1062 void* es = e->getEngineSpecific(cookie);
1064 ++stats.pendingCompactions;
1065 e->storeEngineSpecific(cookie, e);
1066 err = e->compactDB(vbucket, compactreq, cookie);
1068 e->storeEngineSpecific(cookie, NULL);
1069 err = ENGINE_SUCCESS;
1073 case ENGINE_SUCCESS:
1074 LOG(EXTENSION_LOG_INFO,
1075 "Compaction of vbucket %d completed.", vbucket);
1077 case ENGINE_NOT_MY_VBUCKET:
1078 --stats.pendingCompactions;
1079 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1080 "because the vbucket doesn't exist!!!", vbucket);
1081 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1083 case ENGINE_EWOULDBLOCK:
1084 LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
1085 "in EWOULDBLOCK state until the database file is "
1086 "compacted on disk",
1088 e->storeEngineSpecific(cookie, req);
1089 return ENGINE_EWOULDBLOCK;
1090 case ENGINE_TMPFAIL:
1091 LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
1092 " a temporary failure and may need to be retried",
1094 msg = "Temporary failure in compacting vbucket.";
1095 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1098 --stats.pendingCompactions;
1099 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1100 "because of unknown reasons\n", vbucket);
1101 msg = "Failed to compact vbucket. Unknown reason.";
1102 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1106 if (err != ENGINE_NOT_MY_VBUCKET) {
1107 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1108 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1111 LockHolder lh(e->clusterConfig.lock);
1112 return sendResponse(response, NULL, 0, NULL, 0,
1113 e->clusterConfig.config,
1114 e->clusterConfig.len,
1115 PROTOCOL_BINARY_RAW_BYTES,
1120 static ENGINE_ERROR_CODE processUnknownCommand(
1121 EventuallyPersistentEngine *h,
1123 protocol_binary_request_header *request,
1124 ADD_RESPONSE response)
1126 protocol_binary_response_status res =
1127 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1128 const char *msg = NULL;
1129 size_t msg_size = 0;
1132 EPStats &stats = h->getEpStats();
1133 ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1136 * Session validation
1137 * (For ns_server commands only)
1139 switch (request->request.opcode) {
1140 case PROTOCOL_BINARY_CMD_SET_PARAM:
1141 case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1142 case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1143 case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1144 case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1145 case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1146 case PROTOCOL_BINARY_CMD_COMPACT_DB:
1148 if (h->getEngineSpecific(cookie) == NULL) {
1149 uint64_t cas = ntohll(request->request.cas);
1150 if (!h->validateSessionCas(cas)) {
1151 const std::string message("Invalid session token");
1152 return sendResponse(response, NULL, 0, NULL, 0,
1153 message.c_str(), message.length(),
1154 PROTOCOL_BINARY_RAW_BYTES,
1155 PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1165 switch (request->request.opcode) {
1166 case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
1167 return h->getAllVBucketSequenceNumbers(cookie, request, response);
1169 case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1171 BlockTimer timer(&stats.getVbucketCmdHisto);
1172 rv = getVBucket(h, cookie, request, response);
1175 case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1177 BlockTimer timer(&stats.delVbucketCmdHisto);
1178 rv = delVBucket(h, cookie, request, response);
1179 if (rv != ENGINE_EWOULDBLOCK) {
1180 h->decrementSessionCtr();
1181 h->storeEngineSpecific(cookie, NULL);
1185 case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1187 BlockTimer timer(&stats.setVbucketCmdHisto);
1188 rv = setVBucket(h, cookie, request, response);
1189 h->decrementSessionCtr();
1192 case PROTOCOL_BINARY_CMD_TOUCH:
1193 case PROTOCOL_BINARY_CMD_GAT:
1194 case PROTOCOL_BINARY_CMD_GATQ:
1196 rv = h->touch(cookie, request, response);
1199 case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1200 res = stopFlusher(h, &msg, &msg_size);
1202 case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1203 res = startFlusher(h, &msg, &msg_size);
1205 case PROTOCOL_BINARY_CMD_SET_PARAM:
1207 reinterpret_cast<protocol_binary_request_set_param*>(request),
1209 h->decrementSessionCtr();
1211 case PROTOCOL_BINARY_CMD_EVICT_KEY:
1212 res = evictKey(h, request, &msg, &msg_size);
1214 case PROTOCOL_BINARY_CMD_GET_LOCKED:
1215 rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
1216 if (rv == ENGINE_EWOULDBLOCK) {
1217 // we dont have the value for the item yet
1221 case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
1222 res = unlockKey(h, request, &msg, &msg_size);
1224 case PROTOCOL_BINARY_CMD_OBSERVE:
1225 return h->observe(cookie, request, response);
1226 case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
1227 return h->observe_seqno(cookie, request, response);
1228 case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1230 rv = h->deregisterTapClient(cookie, request, response);
1231 h->decrementSessionCtr();
1234 case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN:
1236 rv = h->resetReplicationChain(cookie, request, response);
1239 case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1241 rv = h->changeTapVBFilter(cookie, request, response);
1242 h->decrementSessionCtr();
1245 case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1246 case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1247 case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE:
1249 rv = h->handleCheckpointCmds(cookie, request, response);
1252 case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE:
1254 rv = h->handleSeqnoCmds(cookie, request, response);
1257 case PROTOCOL_BINARY_CMD_GET_META:
1258 case PROTOCOL_BINARY_CMD_GETQ_META:
1260 rv = h->getMeta(cookie,
1261 reinterpret_cast<protocol_binary_request_get_meta*>
1262 (request), response);
1265 case PROTOCOL_BINARY_CMD_SET_WITH_META:
1266 case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1267 case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1268 case PROTOCOL_BINARY_CMD_ADDQ_WITH_META:
1270 rv = h->setWithMeta(cookie,
1271 reinterpret_cast<protocol_binary_request_set_with_meta*>
1272 (request), response);
1275 case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1276 case PROTOCOL_BINARY_CMD_DELQ_WITH_META:
1278 rv = h->deleteWithMeta(cookie,
1279 reinterpret_cast<protocol_binary_request_delete_with_meta*>
1280 (request), response);
1283 case PROTOCOL_BINARY_CMD_RETURN_META:
1285 return h->returnMeta(cookie,
1286 reinterpret_cast<protocol_binary_request_return_meta*>
1287 (request), response);
1289 case PROTOCOL_BINARY_CMD_GET_REPLICA:
1290 rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
1291 if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1295 case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1296 case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC:
1298 rv = h->handleTrafficControlCmd(cookie, request, response);
1301 case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1303 rv = h->setClusterConfig(cookie,
1304 reinterpret_cast<protocol_binary_request_set_cluster_config*>
1305 (request), response);
1306 h->decrementSessionCtr();
1309 case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
1310 return h->getClusterConfig(cookie,
1311 reinterpret_cast<protocol_binary_request_get_cluster_config*>
1312 (request), response);
1313 case PROTOCOL_BINARY_CMD_COMPACT_DB:
1315 rv = compactDB(h, cookie,
1316 (protocol_binary_request_compact_db*)(request),
1318 if (rv != ENGINE_EWOULDBLOCK) {
1319 h->decrementSessionCtr();
1320 h->storeEngineSpecific(cookie, NULL);
1324 case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
1326 if (request->request.extlen != 0 ||
1327 request->request.keylen != 0 ||
1328 request->request.bodylen != 0) {
1329 return ENGINE_EINVAL;
1331 return h->getRandomKey(cookie, response);
1335 return h->getAllKeys(cookie,
1336 reinterpret_cast<protocol_binary_request_get_keys*>
1337 (request), response);
1339 case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
1341 return h->getAdjustedTime(cookie,
1342 reinterpret_cast<protocol_binary_request_get_adjusted_time*>
1343 (request), response);
1345 case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE:
1347 return h->setDriftCounterState(cookie,
1348 reinterpret_cast<protocol_binary_request_set_drift_counter_state*>
1349 (request), response);
1353 // Send a special response for getl since we don't want to send the key
1354 if (itm && request->request.opcode == PROTOCOL_BINARY_CMD_GET_LOCKED) {
1355 uint32_t flags = itm->getFlags();
1356 rv = sendResponse(response, NULL, 0, (const void *)&flags,
1358 static_cast<const void *>(itm->getData()),
1359 itm->getNBytes(), itm->getDataType(),
1360 static_cast<uint16_t>(res), itm->getCas(),
1364 const std::string &key = itm->getKey();
1365 uint32_t flags = itm->getFlags();
1366 rv = sendResponse(response, static_cast<const void *>(key.data()),
1368 (const void *)&flags, sizeof(uint32_t),
1369 static_cast<const void *>(itm->getData()),
1370 itm->getNBytes(), itm->getDataType(),
1371 static_cast<uint16_t>(res), itm->getCas(),
1374 } else if (rv == ENGINE_NOT_MY_VBUCKET) {
1375 LockHolder lh(h->clusterConfig.lock);
1376 return sendResponse(response, NULL, 0, NULL, 0,
1377 h->clusterConfig.config,
1378 h->clusterConfig.len,
1379 PROTOCOL_BINARY_RAW_BYTES,
1380 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
1383 msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1384 rv = sendResponse(response, NULL, 0, NULL, 0,
1385 msg, static_cast<uint16_t>(msg_size),
1386 PROTOCOL_BINARY_RAW_BYTES,
1387 static_cast<uint16_t>(res), 0, cookie);
1393 static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1395 protocol_binary_request_header
1397 ADD_RESPONSE response)
1399 ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1402 releaseHandle(handle);
1406 static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1407 item *itm, uint64_t cas) {
1408 static_cast<Item*>(itm)->setCas(cas);
1411 static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1413 void *engine_specific,
1417 tap_event_t tap_event,
1429 if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1430 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1432 return ENGINE_EINVAL;
1434 ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1438 (uint16_t)tap_event,
1444 releaseHandle(handle);
1448 static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1449 const void *cookie, item **itm,
1450 void **es, uint16_t *nes, uint8_t *ttl,
1451 uint16_t *flags, uint32_t *seqno,
1452 uint16_t *vbucket) {
1453 uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1457 releaseHandle(handle);
1458 return static_cast<tap_event_t>(tap_event);
1461 static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1466 const void* userdata,
1469 EventuallyPersistentEngine *h = getHandle(handle);
1470 TAP_ITERATOR iterator = NULL;
1472 std::string c(static_cast<const char*>(client), nclient);
1473 // Figure out what we want from the userdata before adding it to
1474 // the API to the handle
1475 if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1476 iterator = EvpTapIterator;
1479 releaseHandle(handle);
1484 static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1486 struct dcp_message_producers *producers)
1488 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1489 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1491 errCode = conn->step(producers);
1493 releaseHandle(handle);
1498 static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1506 ENGINE_ERROR_CODE errCode;
1507 errCode = getHandle(handle)->dcpOpen(cookie, opaque, seqno, flags,
1509 releaseHandle(handle);
1513 static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1519 ENGINE_ERROR_CODE errCode = getHandle(handle)->dcpAddStream(cookie,
1523 releaseHandle(handle);
1527 static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1532 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1533 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1535 errCode = conn->closeStream(opaque, vbucket);
1537 releaseHandle(handle);
1542 static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1547 uint64_t startSeqno,
1549 uint64_t vbucketUuid,
1550 uint64_t snapStartSeqno,
1551 uint64_t snapEndSeqno,
1552 uint64_t *rollbackSeqno,
1553 dcp_add_failover_log callback)
1555 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1556 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1558 errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1559 endSeqno, vbucketUuid, snapStartSeqno,
1560 snapEndSeqno, rollbackSeqno, callback);
1562 releaseHandle(handle);
1566 static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1570 dcp_add_failover_log callback)
1572 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1573 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1575 errCode = conn->getFailoverLog(opaque, vbucket, callback);
1577 releaseHandle(handle);
1582 static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1588 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1589 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1591 errCode = conn->streamEnd(opaque, vbucket, flags);
1593 releaseHandle(handle);
1598 static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1602 uint64_t start_seqno,
1606 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1607 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1609 errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1612 releaseHandle(handle);
1616 static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1629 uint32_t expiration,
1635 if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1636 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1638 return ENGINE_EINVAL;
1640 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1641 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1643 errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1644 vbucket, flags, datatype, lockTime,
1645 bySeqno, revSeqno, expiration,
1648 releaseHandle(handle);
1652 static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1664 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1665 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1667 errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1668 revSeqno, meta, nmeta);
1670 releaseHandle(handle);
1674 static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1686 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1687 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1689 errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1690 revSeqno, meta, nmeta);
1692 releaseHandle(handle);
1696 static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1701 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1702 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1704 errCode = conn->flushall(opaque, vbucket);
1706 releaseHandle(handle);
1710 static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1714 vbucket_state_t state)
1716 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1717 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1719 errCode = conn->setVBucketState(opaque, vbucket, state);
1721 releaseHandle(handle);
1725 static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1728 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1729 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1731 errCode = conn->noop(opaque);
1733 releaseHandle(handle);
1737 static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1741 uint32_t buffer_bytes) {
1742 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1743 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1745 errCode = conn->bufferAcknowledgement(opaque, vbucket,
1748 releaseHandle(handle);
1752 static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1759 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1760 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1762 errCode = conn->control(opaque, key, nkey, value, nvalue);
1764 releaseHandle(handle);
1768 static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1770 protocol_binary_response_header *response)
1772 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1773 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1775 errCode = conn->handleResponse(response);
1777 releaseHandle(handle);
1781 static void EvpHandleDisconnect(const void *cookie,
1782 ENGINE_EVENT_TYPE type,
1783 const void *event_data,
1784 const void *cb_data)
1786 cb_assert(type == ON_DISCONNECT);
1787 cb_assert(event_data == NULL);
1788 void *c = const_cast<void*>(cb_data);
1789 getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1790 releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1795 * The only public interface to the eventually persistance engine.
1796 * Allocate a new instance and initialize it
1797 * @param interface the highest interface the server supports (we only
1798 * support interface 1)
1799 * @param get_server_api callback function to get the server exported API
1801 * @param handle Where to return the new instance
1802 * @return ENGINE_SUCCESS on success
1804 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1805 GET_SERVER_API get_server_api,
1806 ENGINE_HANDLE **handle)
1808 SERVER_HANDLE_V1 *api = get_server_api();
1809 if (interface != 1 || api == NULL) {
1810 return ENGINE_ENOTSUP;
1813 hooksApi = api->alloc_hooks;
1814 EventuallyPersistentEngine::loggerApi = api->log;
1815 MemoryTracker::getInstance();
1816 ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1818 AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1820 ObjectRegistry::setStats(inital_tracking);
1821 EventuallyPersistentEngine *engine;
1822 engine = new EventuallyPersistentEngine(get_server_api);
1823 ObjectRegistry::setStats(NULL);
1825 if (engine == NULL) {
1826 return ENGINE_ENOMEM;
1829 if (MemoryTracker::trackingMemoryAllocations()) {
1830 engine->getEpStats().memoryTrackerEnabled.store(true);
1831 engine->getEpStats().totalMemory.store(inital_tracking->load());
1833 delete inital_tracking;
1835 initialize_time_functions(api->core);
1837 *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1839 return ENGINE_SUCCESS;
1843 This method is called prior to unloading of the shared-object.
1844 Global clean-up should be performed from this method.
1846 void destroy_engine() {
1847 ExecutorPool::shutdown();
1850 static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
1851 const item* itm, item_info *itm_info)
1853 const Item *it = reinterpret_cast<const Item*>(itm);
1854 EventuallyPersistentEngine *engine = getHandle(handle);
1855 if (itm_info->nvalue < 1) {
1858 itm_info->cas = it->getCas();
1861 RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(it->getVBucketId());
1864 itm_info->vbucket_uuid = vb->failovers->getLatestUUID();
1866 itm_info->vbucket_uuid = 0;
1869 releaseHandle(handle);
1871 itm_info->vbucket_uuid = 0;
1874 itm_info->seqno = it->getBySeqno();
1875 itm_info->exptime = it->getExptime();
1876 itm_info->nbytes = it->getNBytes();
1877 itm_info->datatype = it->getDataType();
1878 itm_info->flags = it->getFlags();
1879 itm_info->clsid = 0;
1880 itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1881 itm_info->nvalue = 1;
1882 itm_info->key = it->getKey().c_str();
1883 itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1884 itm_info->value[0].iov_len = it->getNBytes();
1888 static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1889 item* itm, const item_info *itm_info)
1891 Item *it = reinterpret_cast<Item*>(itm);
1895 it->setDataType(itm_info->datatype);
1899 static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1901 engine_get_vb_map_cb callback)
1903 EventuallyPersistentEngine *h = getHandle(handle);
1904 LockHolder lh(h->clusterConfig.lock);
1905 uint8_t *config = h->clusterConfig.config;
1906 uint32_t len = h->clusterConfig.len;
1907 releaseHandle(handle);
1908 return callback(cookie, config, len);
1913 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1916 if (EventuallyPersistentEngine::loggerApi != NULL) {
1917 EXTENSION_LOGGER_DESCRIPTOR* logger =
1918 (EXTENSION_LOGGER_DESCRIPTOR*)EventuallyPersistentEngine::loggerApi->get_logger();
1920 if (EventuallyPersistentEngine::loggerApi->get_level() <= severity) {
1921 EventuallyPersistentEngine *engine = ObjectRegistry::onSwitchThread(NULL, true);
1924 vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1926 logger->log(severity, NULL, "(%s) %s", engine->getName(),
1929 logger->log(severity, NULL, "(No Engine) %s", buffer);
1932 ObjectRegistry::onSwitchThread(engine);
1937 ALLOCATOR_HOOKS_API *getHooksApi(void) {
1941 EventuallyPersistentEngine::EventuallyPersistentEngine(
1942 GET_SERVER_API get_server_api) :
1943 clusterConfig(), epstore(NULL), workload(NULL),
1944 workloadPriority(NO_BUCKET_PRIORITY),
1945 tapThrottle(NULL), getServerApiFunc(get_server_api),
1946 dcpConnMap_(NULL), tapConnMap(NULL) ,tapConfig(NULL), checkpointConfig(NULL),
1947 trafficEnabled(false), flushAllEnabled(false), startupTime(0)
1949 interface.interface = 1;
1950 ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1951 ENGINE_HANDLE_V1::initialize = EvpInitialize;
1952 ENGINE_HANDLE_V1::destroy = EvpDestroy;
1953 ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1954 ENGINE_HANDLE_V1::remove = EvpItemDelete;
1955 ENGINE_HANDLE_V1::release = EvpItemRelease;
1956 ENGINE_HANDLE_V1::get = EvpGet;
1957 ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1958 ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1959 ENGINE_HANDLE_V1::store = EvpStore;
1960 ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
1961 ENGINE_HANDLE_V1::flush = EvpFlush;
1962 ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1963 ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1964 ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1965 ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1966 ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1967 ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1968 ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
1969 ENGINE_HANDLE_V1::get_stats_struct = NULL;
1970 ENGINE_HANDLE_V1::aggregate_stats = NULL;
1973 ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1974 ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1975 ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1976 ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1977 ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1978 ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1979 ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1980 ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1981 ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1982 ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1983 ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1984 ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1985 ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1986 ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1987 ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1988 ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1989 ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1991 serverApi = getServerApiFunc();
1992 memset(&info, 0, sizeof(info));
1993 info.info.description = "EP engine v" VERSION;
1994 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1995 info.info.features[info.info.num_features++].feature =
1996 ENGINE_FEATURE_PERSISTENT_STORAGE;
1997 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1998 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
2001 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
2003 EventuallyPersistentEngine *epe =
2004 ObjectRegistry::onSwitchThread(NULL, true);
2005 ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
2006 ObjectRegistry::onSwitchThread(epe);
2010 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
2012 EventuallyPersistentEngine *epe =
2013 ObjectRegistry::onSwitchThread(NULL, true);
2014 ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
2015 ObjectRegistry::onSwitchThread(epe);
2019 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
2021 const void *cb_data) {
2022 EventuallyPersistentEngine *epe =
2023 ObjectRegistry::onSwitchThread(NULL, true);
2024 SERVER_CALLBACK_API *sapi = getServerApi()->callback;
2025 sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
2027 ObjectRegistry::onSwitchThread(epe);
2031 * A configuration value changed listener that responds to ep-engine
2032 * parameter changes by invoking engine-specific methods on
2033 * configuration change events.
2035 class EpEngineValueChangeListener : public ValueChangedListener {
2037 EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
2041 virtual void sizeValueChanged(const std::string &key, size_t value) {
2042 if (key.compare("getl_max_timeout") == 0) {
2043 engine.setGetlMaxTimeout(value);
2044 } else if (key.compare("getl_default_timeout") == 0) {
2045 engine.setGetlDefaultTimeout(value);
2046 } else if (key.compare("max_item_size") == 0) {
2047 engine.setMaxItemSize(value);
2051 virtual void booleanValueChanged(const std::string &key, bool value) {
2052 if (key.compare("flushall_enabled") == 0) {
2053 engine.setFlushAll(value);
2057 EventuallyPersistentEngine &engine;
2062 ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
2064 if (config != NULL) {
2065 if (!configuration.parseConfiguration(config, serverApi)) {
2066 return ENGINE_FAILED;
2070 name = configuration.getCouchBucket();
2071 maxFailoverEntries = configuration.getMaxFailoverEntries();
2073 // Start updating the variables from the config!
2074 HashTable::setDefaultNumBuckets(configuration.getHtSize());
2075 HashTable::setDefaultNumLocks(configuration.getHtLocks());
2076 StoredValue::setMutationMemoryThreshold(
2077 configuration.getMutationMemThreshold());
2079 if (configuration.getMaxSize() == 0) {
2080 configuration.setMaxSize(std::numeric_limits<size_t>::max());
2083 if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
2084 configuration.setMemLowWat(percentOf(
2085 configuration.getMaxSize(), 0.75));
2088 if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
2089 configuration.setMemHighWat(percentOf(
2090 configuration.getMaxSize(), 0.85));
2093 maxItemSize = configuration.getMaxItemSize();
2094 configuration.addValueChangedListener("max_item_size",
2095 new EpEngineValueChangeListener(*this));
2097 getlDefaultTimeout = configuration.getGetlDefaultTimeout();
2098 configuration.addValueChangedListener("getl_default_timeout",
2099 new EpEngineValueChangeListener(*this));
2100 getlMaxTimeout = configuration.getGetlMaxTimeout();
2101 configuration.addValueChangedListener("getl_max_timeout",
2102 new EpEngineValueChangeListener(*this));
2104 flushAllEnabled = configuration.isFlushallEnabled();
2105 configuration.addValueChangedListener("flushall_enabled",
2106 new EpEngineValueChangeListener(*this));
2108 workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
2109 configuration.getMaxNumShards());
2110 if ((unsigned int)workload->getNumShards() >
2111 configuration.getMaxVbuckets()) {
2112 LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
2113 "equal or less than max number of vbuckets");
2114 return ENGINE_FAILED;
2117 dcpConnMap_ = new DcpConnMap(*this);
2118 tapConnMap = new TapConnMap(*this);
2119 tapConfig = new TapConfig(*this);
2120 tapThrottle = new TapThrottle(configuration, stats);
2121 TapConfig::addConfigChangeListener(*this);
2123 checkpointConfig = new CheckpointConfig(*this);
2124 CheckpointConfig::addConfigChangeListener(*this);
2126 epstore = new EventuallyPersistentStore(*this);
2127 if (epstore == NULL) {
2128 return ENGINE_ENOMEM;
2131 initializeEngineCallbacks();
2133 // Complete the initialization of the ep-store
2134 if (!epstore->initialize()) {
2135 return ENGINE_FAILED;
2138 if(configuration.isDataTrafficEnabled()) {
2139 enableTraffic(true);
2142 tapConnMap->initialize(TAP_CONN_NOTIFIER);
2143 dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
2145 // record engine initialization time
2146 startupTime.store(ep_real_time());
2148 LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");
2150 return ENGINE_SUCCESS;
2153 void EventuallyPersistentEngine::destroy(bool force) {
2154 stats.forceShutdown = force;
2155 stats.isShutdown = true;
2158 epstore->snapshotStats();
2161 tapConnMap->shutdownAllConnections();
2164 dcpConnMap_->shutdownAllConnections();
2168 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie,
2170 if (!flushAllEnabled) {
2171 return ENGINE_ENOTSUP;
2174 if (!isDegradedMode()) {
2175 return ENGINE_TMPFAIL;
2179 * Supporting only a SYNC operation for bucket flush
2182 void* es = getEngineSpecific(cookie);
2185 // Check if diskFlushAll was false and set it to true
2186 // if yes, if the atomic variable weren't false, then
2187 // we will assume that a flushAll has been scheduled
2188 // already and return TMPFAIL.
2189 if (epstore->scheduleFlushAllTask(cookie, when)) {
2190 storeEngineSpecific(cookie, this);
2191 return ENGINE_EWOULDBLOCK;
2193 LOG(EXTENSION_LOG_INFO, "Tried to trigger a bucket flush, but"
2194 "there seems to be a task running already!");
2195 return ENGINE_TMPFAIL;
2199 storeEngineSpecific(cookie, NULL);
2200 LOG(EXTENSION_LOG_WARNING, "Completed bucket flush operation");
2201 return ENGINE_SUCCESS;
2205 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2208 ENGINE_STORE_OPERATION
2211 BlockTimer timer(&stats.storeCmdHisto);
2212 ENGINE_ERROR_CODE ret;
2213 Item *it = static_cast<Item*>(itm);
2216 it->setVBucketId(vbucket);
2218 switch (operation) {
2220 if (it->getCas() == 0) {
2221 // Using a cas command with a cas wildcard doesn't make sense
2222 ret = ENGINE_NOT_STORED;
2227 if (isDegradedMode()) {
2228 return ENGINE_TMPFAIL;
2230 ret = epstore->set(*it, cookie);
2231 if (ret == ENGINE_SUCCESS) {
2232 *cas = it->getCas();
2238 if (isDegradedMode()) {
2239 return ENGINE_TMPFAIL;
2242 if (it->getCas() != 0) {
2243 // Adding an item with a cas value doesn't really make sense...
2244 return ENGINE_KEY_EEXISTS;
2247 ret = epstore->add(*it, cookie);
2248 if (ret == ENGINE_SUCCESS) {
2249 *cas = it->getCas();
2253 case OPERATION_REPLACE:
2254 ret = epstore->replace(*it, cookie);
2255 if (ret == ENGINE_SUCCESS) {
2256 *cas = it->getCas();
2260 case OPERATION_APPEND:
2261 case OPERATION_PREPEND:
2263 if ((ret = get(cookie, &i, it->getKey().c_str(),
2264 it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
2265 Item *old = reinterpret_cast<Item*>(i);
2267 if (old->getCas() == (uint64_t) -1) {
2268 // item is locked against updates
2269 itemRelease(cookie, i);
2270 return ENGINE_TMPFAIL;
2273 if (it->getCas() != 0 && old->getCas() != it->getCas()) {
2274 itemRelease(cookie, i);
2275 return ENGINE_KEY_EEXISTS;
2278 if (operation == OPERATION_APPEND) {
2279 ret = old->append(*it, maxItemSize);
2281 ret = old->prepend(*it, maxItemSize);
2284 if (ret != ENGINE_SUCCESS) {
2285 itemRelease(cookie, i);
2286 if (ret == ENGINE_E2BIG) {
2289 return memoryCondition();
2292 if (old->getDataType() == PROTOCOL_BINARY_DATATYPE_JSON) {
2293 // Set the datatype of the new document to BINARY (0),
2294 // as appending/prepending anything to JSON breaks the
2295 // json data structure.
2296 old->setDataType(PROTOCOL_BINARY_RAW_BYTES);
2297 } else if (old->getDataType() ==
2298 PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
2299 // Set the datatype of the new document to
2300 // COMPRESSED_BINARY, as appending/prepending anything
2301 // to JSON breaks the json data structure.
2302 old->setDataType(PROTOCOL_BINARY_DATATYPE_COMPRESSED);
2306 ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
2308 it->setBySeqno(old->getBySeqno());
2309 itemRelease(cookie, i);
2311 } while (ret == ENGINE_KEY_EEXISTS);
2313 // Map the error code back to what memcapable expects
2314 if (ret == ENGINE_KEY_ENOENT) {
2315 ret = ENGINE_NOT_STORED;
2321 ret = ENGINE_ENOTSUP;
2325 case ENGINE_SUCCESS:
2326 ++stats.numOpsStore;
2329 ret = memoryCondition();
2331 case ENGINE_NOT_STORED:
2332 case ENGINE_NOT_MY_VBUCKET:
2333 if (isDegradedMode()) {
2334 return ENGINE_TMPFAIL;
2344 inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2364 if (connection->shouldFlush()) {
2368 if (connection->isTimeForNoop()) {
2369 LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2370 connection->logHeader());
2374 if (connection->isSuspended() || connection->windowIsFull()) {
2375 LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2376 " suspended state or its ack windows is full.\n",
2377 connection->logHeader());
2381 uint16_t ret = TAP_PAUSE;
2382 VBucketEvent ev = connection->nextVBucketHighPriority();
2383 if (ev.event != TAP_PAUSE) {
2385 case TAP_VBUCKET_SET:
2386 LOG(EXTENSION_LOG_WARNING,
2387 "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2388 connection->logHeader(), ev.vbucket,
2389 VBucket::toString(ev.state));
2390 connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2393 LOG(EXTENSION_LOG_WARNING,
2394 "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2395 connection->logHeader(),
2396 TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
2398 connection->opaqueCommandCode = (uint32_t) ev.state;
2399 *vbucket = ev.vbucket;
2400 *es = &connection->opaqueCommandCode;
2401 *nes = sizeof(connection->opaqueCommandCode);
2404 LOG(EXTENSION_LOG_WARNING,
2405 "%s Unknown VBucketEvent message type %d\n",
2406 connection->logHeader(), ev.event);
2412 if (connection->waitForOpaqueMsgAck()) {
2416 VBucketFilter backFillVBFilter;
2417 if (connection->runBackfill(backFillVBFilter)) {
2418 queueBackfill(backFillVBFilter, connection);
2421 uint8_t nru = INITIAL_NRU_VALUE;
2422 Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2424 case TAP_CHECKPOINT_START:
2425 case TAP_CHECKPOINT_END:
2429 if (ret == TAP_MUTATION) {
2430 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2431 it->getRevSeqno(), nru);
2432 *es = connection->specificData;
2433 } else if (ret == TAP_DELETION) {
2434 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2436 *es = connection->specificData;
2437 } else if (ret == TAP_CHECKPOINT_START) {
2438 // Send the current value of the max deleted seqno
2439 RCPtr<VBucket> vb = getVBucket(*vbucket);
2444 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2445 vb->ht.getMaxDeletedRevSeqno());
2446 *es = connection->specificData;
2456 if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2457 VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2458 if (vbev.event == TAP_VBUCKET_SET) {
2459 LOG(EXTENSION_LOG_WARNING,
2460 "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2461 connection->logHeader(), vbev.vbucket,
2462 VBucket::toString(vbev.state));
2463 connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
2471 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2478 uint16_t *vbucket) {
2479 TapProducer *connection = getTapProducer(cookie);
2481 LOG(EXTENSION_LOG_WARNING,
2482 "Failed to lookup TAP connection.. Disconnecting\n");
2483 return TAP_DISCONNECT;
2486 connection->setPaused(false);
2491 connection->setLastWalkTime();
2493 ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2494 seqno, vbucket, connection, retry);
2497 if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2498 connection->lastMsgTime = ep_current_time();
2499 if (ret == TAP_NOOP) {
2502 ++stats.numTapFetched;
2503 *seqno = connection->getSeqno();
2504 if (connection->requestAck(ret, *vbucket)) {
2505 *flags = TAP_FLAG_ACK;
2506 connection->seqnoAckRequested = *seqno;
2509 if (ret == TAP_MUTATION) {
2510 if (connection->haveFlagByteorderSupport()) {
2511 *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2516 connection->setPaused(true);
2517 connection->setNotifySent(false);
2523 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2524 std::string &client,
2526 const void *userdata,
2528 if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2532 std::string tapName = "eq_tapq:";
2533 if (client.length() == 0) {
2534 tapName.assign(ConnHandler::getAnonName());
2536 tapName.append(client);
2539 // Decoding the userdata section of the packet and update the filters
2540 const char *ptr = static_cast<const char*>(userdata);
2541 uint64_t backfillAge = 0;
2542 std::vector<uint16_t> vbuckets;
2543 std::map<uint16_t, uint64_t> lastCheckpointIds;
2545 if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2546 if (nuserdata < sizeof(backfillAge)) {
2547 LOG(EXTENSION_LOG_WARNING,
2548 "Backfill age is missing. Reject connection request from %s\n",
2552 // use memcpy to avoid alignemt issues
2553 memcpy(&backfillAge, ptr, sizeof(backfillAge));
2554 backfillAge = ntohll(backfillAge);
2555 nuserdata -= sizeof(backfillAge);
2556 ptr += sizeof(backfillAge);
2559 if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2561 if (nuserdata < sizeof(nvbuckets)) {
2562 LOG(EXTENSION_LOG_WARNING,
2563 "Number of vbuckets is missing. Reject connection request from %s"
2564 "\n", tapName.c_str());
2567 memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2568 nuserdata -= sizeof(nvbuckets);
2569 ptr += sizeof(nvbuckets);
2570 nvbuckets = ntohs(nvbuckets);
2571 if (nvbuckets > 0) {
2572 if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2573 LOG(EXTENSION_LOG_WARNING,
2574 "# of vbuckets not matched. Reject connection request from %s"
2575 "\n", tapName.c_str());
2578 for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2580 memcpy(&val, ptr, sizeof(nvbuckets));
2581 ptr += sizeof(uint16_t);
2582 vbuckets.push_back(ntohs(val));
2584 nuserdata -= (sizeof(uint16_t) * nvbuckets);
2588 if (flags & TAP_CONNECT_CHECKPOINT) {
2589 uint16_t nCheckpoints = 0;
2590 if (nuserdata >= sizeof(nCheckpoints)) {
2591 memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2592 nuserdata -= sizeof(nCheckpoints);
2593 ptr += sizeof(nCheckpoints);
2594 nCheckpoints = ntohs(nCheckpoints);
2596 if (nCheckpoints > 0) {
2598 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2599 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2600 "Reject connection request from %s\n", tapName.c_str());
2603 for (uint16_t j = 0; j < nCheckpoints; ++j) {
2605 uint64_t checkpointId;
2606 memcpy(&vbid, ptr, sizeof(vbid));
2607 ptr += sizeof(uint16_t);
2608 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2609 ptr += sizeof(uint64_t);
2610 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2613 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2617 TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2620 configuration.getTapKeepalive()),
2624 tapConnMap->notifyPausedConnection(tp, true);
2628 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2629 void *engine_specific,
2646 void *specific = getEngineSpecific(cookie);
2647 ConnHandler *connection = NULL;
2648 if (specific == NULL) {
2649 if (tap_event == TAP_ACK) {
2650 LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2651 "exist. Force disconnect...\n", (char *) cookie);
2652 // tap producer is no longer connected..
2653 return ENGINE_DISCONNECT;
2655 connection = tapConnMap->newConsumer(cookie);
2656 if (connection == NULL) {
2657 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2658 " Force disconnect\n");
2659 return ENGINE_DISCONNECT;
2661 storeEngineSpecific(cookie, connection);
2664 connection = reinterpret_cast<ConnHandler *>(specific);
2667 std::string k(static_cast<const char*>(key), nkey);
2668 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2670 if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2671 if (!tapThrottle->shouldProcess()) {
2672 ++stats.tapThrottled;
2673 if (connection->supportsAck()) {
2674 ret = ENGINE_TMPFAIL;
2676 ret = ENGINE_DISCONNECT;
2677 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2678 "ack support. Force disconnect...\n",
2679 connection->logHeader());
2685 switch (tap_event) {
2687 ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2690 ret = flush(cookie, 0);
2691 LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2692 connection->logHeader());
2697 TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2698 nengine, &revSeqno);
2700 ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2705 case TAP_CHECKPOINT_START:
2706 case TAP_CHECKPOINT_END:
2708 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2710 if (tap_event == TAP_CHECKPOINT_START &&
2711 nengine == TapEngineSpecific::sizeRevSeqno) {
2712 // Set the current value for the max deleted seqno
2713 RCPtr<VBucket> vb = getVBucket(vbucket);
2715 return ENGINE_TMPFAIL;
2718 TapEngineSpecific::readSpecificData(tap_event,
2722 vb->ht.setMaxDeletedRevSeqno(seqnum);
2726 uint64_t checkpointId;
2727 memcpy(&checkpointId, data, sizeof(checkpointId));
2728 checkpointId = ntohll(checkpointId);
2729 ConnHandlerCheckPoint(tc, tap_event, vbucket,
2733 ret = ENGINE_DISCONNECT;
2734 LOG(EXTENSION_LOG_WARNING,
2735 "%s Checkpoint Id is missing in "
2736 "CHECKPOINT messages. Force disconnect...\n",
2737 connection->logHeader());
2741 ret = ENGINE_DISCONNECT;
2742 LOG(EXTENSION_LOG_WARNING,
2743 "%s not a consumer! Force disconnect\n",
2744 connection->logHeader());
2752 uint8_t nru = INITIAL_NRU_VALUE;
2753 uint64_t revSeqno = 0;
2754 TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2755 nengine, &revSeqno, &nru);
2757 if (!isDatatypeSupported(cookie)) {
2758 datatype = PROTOCOL_BINARY_RAW_BYTES;
2759 const unsigned char *dat = (const unsigned char*)data;
2760 const int datlen = ndata;
2761 if (checkUTF8JSON(dat, datlen)) {
2762 datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2765 ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2766 flags, datatype, 0, 0, revSeqno, exptime,
2773 if (nengine == sizeof(uint32_t)) {
2775 memcpy(&cc, engine_specific, sizeof(cc));
2779 case TAP_OPAQUE_ENABLE_AUTO_NACK:
2780 // @todo: the memcached core will _ALWAYS_ send nack
2781 // if it encounter an error. This should be
2782 // set as the default when we move to .next after 2.0
2783 // (currently we need to allow the message for
2784 // backwards compatibility)
2785 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2786 connection->logHeader());
2788 case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2789 connection->setSupportCheckpointSync(true);
2790 LOG(EXTENSION_LOG_INFO,
2791 "%s Enable checkpoint synchronization\n",
2792 connection->logHeader());
2794 case TAP_OPAQUE_OPEN_CHECKPOINT:
2796 * This event is only received by the TAP client that wants to
2797 * get mutations from closed checkpoints only. At this time,
2798 * only incremental backup client receives this event so that
2799 * it can close the connection and reconnect later.
2801 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2802 connection->logHeader());
2804 case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2806 LOG(EXTENSION_LOG_INFO,
2807 "%s Backfill started for vbucket %d.\n",
2808 connection->logHeader(), vbucket);
2809 BlockTimer timer(&stats.tapVbucketResetHisto);
2810 ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2812 if (ret == ENGINE_DISCONNECT) {
2813 LOG(EXTENSION_LOG_WARNING,
2814 "%s Failed to reset a vbucket %d. Force disconnect\n",
2815 connection->logHeader(), vbucket);
2817 LOG(EXTENSION_LOG_WARNING,
2818 "%s Reset vbucket %d was completed succecssfully.\n",
2819 connection->logHeader(), vbucket);
2822 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2824 tc->setBackfillPhase(true, vbucket);
2826 ret = ENGINE_DISCONNECT;
2827 LOG(EXTENSION_LOG_WARNING,
2828 "TAP consumer doesn't exists. Force disconnect\n");
2832 case TAP_OPAQUE_CLOSE_BACKFILL:
2834 LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2835 connection->logHeader());
2836 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2838 tc->setBackfillPhase(false, vbucket);
2840 ret = ENGINE_DISCONNECT;
2841 LOG(EXTENSION_LOG_WARNING,
2842 "%s not a consumer! Force disconnect\n",
2843 connection->logHeader());
2847 case TAP_OPAQUE_CLOSE_TAP_STREAM:
2849 * This event is sent by the eVBucketMigrator to notify that
2850 * the source node closes the tap replication stream and
2851 * switches to TAKEOVER_VBUCKETS phase.
2852 * This is just an informative message and doesn't require any
2855 LOG(EXTENSION_LOG_INFO,
2856 "%s Received close tap stream. Switching to takeover phase.\n",
2857 connection->logHeader());
2859 case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2861 * This opaque message is just for notifying that the source
2862 * node receives change_vbucket_filter request and processes
2865 LOG(EXTENSION_LOG_INFO,
2866 "%s Notified that the source node changed a vbucket filter.\n",
2867 connection->logHeader());
2870 LOG(EXTENSION_LOG_WARNING,
2871 "%s Received an unknown opaque command\n",
2872 connection->logHeader());
2875 LOG(EXTENSION_LOG_WARNING,
2876 "%s Received tap opaque with unknown size %d\n",
2877 connection->logHeader(), nengine);
2881 case TAP_VBUCKET_SET:
2883 BlockTimer timer(&stats.tapVbucketSetHisto);
2885 if (nengine != sizeof(vbucket_state_t)) {
2887 LOG(EXTENSION_LOG_WARNING,
2888 "%s Received TAP_VBUCKET_SET with illegal size."
2889 " Force disconnect\n", connection->logHeader());
2890 ret = ENGINE_DISCONNECT;
2894 vbucket_state_t state;
2895 memcpy(&state, engine_specific, nengine);
2896 state = (vbucket_state_t)ntohl(state);
2898 ret = connection->setVBucketState(0, vbucket, state);
2904 LOG(EXTENSION_LOG_WARNING,
2905 "%s Recieved bad opcode, ignoring message\n",
2906 connection->logHeader());
2909 connection->processedEvent(tap_event, ret);
2913 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2914 TapConsumer *consumer,
2917 uint64_t checkpointId) {
2918 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2920 if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2921 getEpStore()->wakeUpFlusher();
2922 ret = ENGINE_SUCCESS;
2925 ret = ENGINE_DISCONNECT;
2926 LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2927 "checkpoint %llu. Force disconnect\n",
2928 consumer->logHeader(), checkpointId);
2934 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2936 reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2937 if (!(rv && rv->isConnected())) {
2938 LOG(EXTENSION_LOG_WARNING,
2939 "Walking a non-existent tap queue, disconnecting\n");
2943 if (rv->doDisconnect()) {
2944 LOG(EXTENSION_LOG_WARNING,
2945 "%s Disconnecting pending connection\n", rv->logHeader());
2951 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2952 // Register the callback
2953 registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2956 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2962 TapProducer *connection = getTapProducer(cookie);
2964 LOG(EXTENSION_LOG_WARNING,
2965 "Unable to process tap ack. No producer found\n");
2966 return ENGINE_DISCONNECT;
2969 return connection->processAck(seqno, status, msg);
2972 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2976 ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2978 ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
2981 bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2983 item_eviction_policy_t policy = engine.getEpStore()->
2984 getItemEvictionPolicy();
2985 numItems += vb->getNumItems(policy);
2986 numTempItems += vb->getNumTempItems();
2987 nonResident += vb->getNumNonResidentItems(policy);
2989 if (vb->getHighPriorityChkSize() > 0) {
2990 chkPersistRemaining++;
2993 fileSpaceUsed += vb->fileSpaceUsed;
2994 fileSize += vb->fileSize;
2996 if (desired_state != vbucket_state_dead) {
2997 htMemory += vb->ht.memorySize();
2998 htItemMemory += vb->ht.getItemMemory();
2999 htCacheSize += vb->ht.cacheSize;
3000 numEjects += vb->ht.getNumEjects();
3001 numExpiredItems += vb->numExpiredItems;
3002 metaDataMemory += vb->ht.metaDataMemory;
3003 metaDataDisk += vb->metaDataDisk;
3004 opsCreate += vb->opsCreate;
3005 opsUpdate += vb->opsUpdate;
3006 opsDelete += vb->opsDelete;
3007 opsReject += vb->opsReject;
3009 queueSize += vb->dirtyQueueSize;
3010 queueMemory += vb->dirtyQueueMem;
3011 queueFill += vb->dirtyQueueFill;
3012 queueDrain += vb->dirtyQueueDrain;
3013 queueAge += vb->getQueueAge();
3014 pendingWrites += vb->dirtyQueuePendingWrites;
3021 * A container class holding VBucketCountVisitors to aggregate stats for
3022 * different vbucket states.
3024 class VBucketCountAggregator : public VBucketVisitor {
3026 bool visitBucket(RCPtr<VBucket> &vb) {
3027 std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3028 it = visitorMap.find(vb->getState());
3029 if ( it != visitorMap.end() ) {
3030 it->second->visitBucket(vb);
3036 void addVisitor(VBucketCountVisitor* visitor) {
3037 visitorMap[visitor->getVBucketState()] = visitor;
3040 std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
3043 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3044 ADD_STAT add_stat) {
3045 VBucketCountAggregator aggregator;
3047 VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
3048 aggregator.addVisitor(&activeCountVisitor);
3050 VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
3051 aggregator.addVisitor(&replicaCountVisitor);
3053 VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
3054 aggregator.addVisitor(&pendingCountVisitor);
3056 VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
3057 aggregator.addVisitor(&deadCountVisitor);
3059 epstore->visit(aggregator);
3061 epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
3062 replicaCountVisitor.getMemResidentPer());
3063 tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
3064 replicaCountVisitor.getNumItems() +
3065 pendingCountVisitor.getNumItems());
3067 configuration.addStats(add_stat, cookie);
3069 EPStats &epstats = getEpStats();
3070 add_casted_stat("ep_version", VERSION, add_stat, cookie);
3071 add_casted_stat("ep_storage_age",
3072 epstats.dirtyAge, add_stat, cookie);
3073 add_casted_stat("ep_storage_age_highwat",
3074 epstats.dirtyAgeHighWat, add_stat, cookie);
3075 add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3078 if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3079 add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3080 } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3081 add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3084 add_casted_stat("ep_total_enqueued",
3085 epstats.totalEnqueued, add_stat, cookie);
3086 add_casted_stat("ep_total_persisted",
3087 epstats.totalPersisted, add_stat, cookie);
3088 add_casted_stat("ep_item_flush_failed",
3089 epstats.flushFailed, add_stat, cookie);
3090 add_casted_stat("ep_item_commit_failed",
3091 epstats.commitFailed, add_stat, cookie);
3092 add_casted_stat("ep_item_begin_failed",
3093 epstats.beginFailed, add_stat, cookie);
3094 add_casted_stat("ep_expired_access", epstats.expired_access,
3096 add_casted_stat("ep_expired_pager", epstats.expired_pager,
3098 add_casted_stat("ep_item_flush_expired",
3099 epstats.flushExpired, add_stat, cookie);
3100 add_casted_stat("ep_queue_size",
3101 epstats.diskQueueSize, add_stat, cookie);
3102 add_casted_stat("ep_flusher_todo",
3103 epstats.flusher_todo, add_stat, cookie);
3104 add_casted_stat("ep_uncommitted_items",
3105 epstats.flusher_todo, add_stat, cookie);
3106 add_casted_stat("ep_diskqueue_items",
3107 epstats.diskQueueSize, add_stat, cookie);
3108 add_casted_stat("ep_flusher_state",
3109 epstore->getFlusher(0)->stateName(),
3111 add_casted_stat("ep_commit_num", epstats.flusherCommits,
3113 add_casted_stat("ep_commit_time",
3114 epstats.commit_time, add_stat, cookie);
3115 add_casted_stat("ep_commit_time_total",
3116 epstats.cumulativeCommitTime, add_stat, cookie);
3117 add_casted_stat("ep_vbucket_del",
3118 epstats.vbucketDeletions, add_stat, cookie);
3119 add_casted_stat("ep_vbucket_del_fail",
3120 epstats.vbucketDeletionFail, add_stat, cookie);
3121 add_casted_stat("ep_flush_duration_total",
3122 epstats.cumulativeFlushTime, add_stat, cookie);
3123 add_casted_stat("ep_flush_all",
3124 epstore->isFlushAllScheduled() ? "true" : "false",
3126 add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
3128 add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
3130 add_casted_stat("curr_items_tot",
3131 activeCountVisitor.getNumItems() +
3132 replicaCountVisitor.getNumItems() +
3133 pendingCountVisitor.getNumItems(),
3135 add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
3137 add_casted_stat("vb_active_curr_items", activeCountVisitor.getNumItems(),
3139 add_casted_stat("vb_active_num_non_resident",
3140 activeCountVisitor.getNonResident(),
3142 add_casted_stat("vb_active_perc_mem_resident",
3143 activeCountVisitor.getMemResidentPer(),
3145 add_casted_stat("vb_active_eject", activeCountVisitor.getEjects(),
3147 add_casted_stat("vb_active_expired", activeCountVisitor.getExpired(),
3149 add_casted_stat("vb_active_meta_data_memory",
3150 activeCountVisitor.getMetaDataMemory(),
3152 add_casted_stat("vb_active_meta_data_disk",
3153 activeCountVisitor.getMetaDataDisk(),
3155 add_casted_stat("vb_active_ht_memory",
3156 activeCountVisitor.getHashtableMemory(),
3158 add_casted_stat("vb_active_itm_memory", activeCountVisitor.getItemMemory(),
3160 add_casted_stat("vb_active_ops_create", activeCountVisitor.getOpsCreate(),
3162 add_casted_stat("vb_active_ops_update", activeCountVisitor.getOpsUpdate(),
3164 add_casted_stat("vb_active_ops_delete", activeCountVisitor.getOpsDelete(),
3166 add_casted_stat("vb_active_ops_reject", activeCountVisitor.getOpsReject(),
3168 add_casted_stat("vb_active_queue_size", activeCountVisitor.getQueueSize(),
3170 add_casted_stat("vb_active_queue_memory",
3171 activeCountVisitor.getQueueMemory(), add_stat, cookie);
3172 add_casted_stat("vb_active_queue_age", activeCountVisitor.getAge(),
3174 add_casted_stat("vb_active_queue_pending",
3175 activeCountVisitor.getPendingWrites(), add_stat, cookie);
3176 add_casted_stat("vb_active_queue_fill", activeCountVisitor.getQueueFill(),
3178 add_casted_stat("vb_active_queue_drain",
3179 activeCountVisitor.getQueueDrain(), add_stat, cookie);
3181 add_casted_stat("vb_replica_num", replicaCountVisitor.getVBucketNumber(),
3183 add_casted_stat("vb_replica_curr_items", replicaCountVisitor.getNumItems(),
3185 add_casted_stat("vb_replica_num_non_resident",
3186 replicaCountVisitor.getNonResident(), add_stat, cookie);
3187 add_casted_stat("vb_replica_perc_mem_resident",
3188 replicaCountVisitor.getMemResidentPer(),
3190 add_casted_stat("vb_replica_eject", replicaCountVisitor.getEjects(),
3192 add_casted_stat("vb_replica_expired", replicaCountVisitor.getExpired(),
3194 add_casted_stat("vb_replica_meta_data_memory",
3195 replicaCountVisitor.getMetaDataMemory(), add_stat, cookie);
3196 add_casted_stat("vb_replica_meta_data_disk",
3197 replicaCountVisitor.getMetaDataDisk(), add_stat, cookie);
3198 add_casted_stat("vb_replica_ht_memory",
3199 replicaCountVisitor.getHashtableMemory(),
3201 add_casted_stat("vb_replica_itm_memory",
3202 replicaCountVisitor.getItemMemory(), add_stat, cookie);
3203 add_casted_stat("vb_replica_ops_create",
3204 replicaCountVisitor.getOpsCreate(), add_stat, cookie);
3205 add_casted_stat("vb_replica_ops_update",
3206 replicaCountVisitor.getOpsUpdate(), add_stat, cookie);
3207 add_casted_stat("vb_replica_ops_delete",
3208 replicaCountVisitor.getOpsDelete(), add_stat, cookie);
3209 add_casted_stat("vb_replica_ops_reject",
3210 replicaCountVisitor.getOpsReject(), add_stat, cookie);
3211 add_casted_stat("vb_replica_queue_size",
3212 replicaCountVisitor.getQueueSize(), add_stat, cookie);
3213 add_casted_stat("vb_replica_queue_memory",
3214 replicaCountVisitor.getQueueMemory(),
3216 add_casted_stat("vb_replica_queue_age",
3217 replicaCountVisitor.getAge(), add_stat, cookie);
3218 add_casted_stat("vb_replica_queue_pending",
3219 replicaCountVisitor.getPendingWrites(),
3221 add_casted_stat("vb_replica_queue_fill",
3222 replicaCountVisitor.getQueueFill(), add_stat, cookie);
3223 add_casted_stat("vb_replica_queue_drain",
3224 replicaCountVisitor.getQueueDrain(), add_stat, cookie);
3226 add_casted_stat("vb_pending_num",
3227 pendingCountVisitor.getVBucketNumber(), add_stat, cookie);
3228 add_casted_stat("vb_pending_curr_items",
3229 pendingCountVisitor.getNumItems(), add_stat, cookie);
3230 add_casted_stat("vb_pending_num_non_resident",
3231 pendingCountVisitor.getNonResident(),
3233 add_casted_stat("vb_pending_perc_mem_resident",
3234 pendingCountVisitor.getMemResidentPer(), add_stat, cookie);
3235 add_casted_stat("vb_pending_eject", pendingCountVisitor.getEjects(),
3237 add_casted_stat("vb_pending_expired", pendingCountVisitor.getExpired(),
3239 add_casted_stat("vb_pending_meta_data_memory",
3240 pendingCountVisitor.getMetaDataMemory(),
3242 add_casted_stat("vb_pending_meta_data_disk",
3243 pendingCountVisitor.getMetaDataDisk(),
3245 add_casted_stat("vb_pending_ht_memory",
3246 pendingCountVisitor.getHashtableMemory(),
3248 add_casted_stat("vb_pending_itm_memory",
3249 pendingCountVisitor.getItemMemory(), add_stat, cookie);
3250 add_casted_stat("vb_pending_ops_create",
3251 pendingCountVisitor.getOpsCreate(), add_stat, cookie);
3252 add_casted_stat("vb_pending_ops_update",
3253 pendingCountVisitor.getOpsUpdate(), add_stat, cookie);
3254 add_casted_stat("vb_pending_ops_delete",
3255 pendingCountVisitor.getOpsDelete(), add_stat, cookie);
3256 add_casted_stat("vb_pending_ops_reject",
3257 pendingCountVisitor.getOpsReject(), add_stat, cookie);
3258 add_casted_stat("vb_pending_queue_size",
3259 pendingCountVisitor.getQueueSize(), add_stat, cookie);
3260 add_casted_stat("vb_pending_queue_memory",
3261 pendingCountVisitor.getQueueMemory(),
3263 add_casted_stat("vb_pending_queue_age", pendingCountVisitor.getAge(),
3265 add_casted_stat("vb_pending_queue_pending",
3266 pendingCountVisitor.getPendingWrites(),
3268 add_casted_stat("vb_pending_queue_fill",
3269 pendingCountVisitor.getQueueFill(), add_stat, cookie);
3270 add_casted_stat("vb_pending_queue_drain",
3271 pendingCountVisitor.getQueueDrain(), add_stat, cookie);
3273 add_casted_stat("vb_dead_num", deadCountVisitor.getVBucketNumber(),
3276 add_casted_stat("ep_db_data_size",
3277 activeCountVisitor.getFileSpaceUsed() +
3278 replicaCountVisitor.getFileSpaceUsed() +
3279 pendingCountVisitor.getFileSpaceUsed() +
3280 deadCountVisitor.getFileSpaceUsed(),
3282 add_casted_stat("ep_db_file_size",
3283 activeCountVisitor.getFileSize() +
3284 replicaCountVisitor.getFileSize() +
3285 pendingCountVisitor.getFileSize() +
3286 deadCountVisitor.getFileSize(),
3289 add_casted_stat("ep_vb_snapshot_total",
3290 epstats.snapshotVbucketHisto.total(), add_stat, cookie);
3292 add_casted_stat("ep_persist_vbstate_total",
3293 epstats.persistVBStateHisto.total(), add_stat, cookie);
3295 add_casted_stat("ep_vb_total",
3296 activeCountVisitor.getVBucketNumber() +
3297 replicaCountVisitor.getVBucketNumber() +
3298 pendingCountVisitor.getVBucketNumber() +
3299 deadCountVisitor.getVBucketNumber(),
3302 add_casted_stat("ep_total_new_items",
3303 activeCountVisitor.getOpsCreate() +
3304 replicaCountVisitor.getOpsCreate() +
3305 pendingCountVisitor.getOpsCreate(),
3307 add_casted_stat("ep_total_del_items",
3308 activeCountVisitor.getOpsDelete() +
3309 replicaCountVisitor.getOpsDelete() +
3310 pendingCountVisitor.getOpsDelete(),
3312 add_casted_stat("ep_diskqueue_memory",
3313 activeCountVisitor.getQueueMemory() +
3314 replicaCountVisitor.getQueueMemory() +
3315 pendingCountVisitor.getQueueMemory(),
3317 add_casted_stat("ep_diskqueue_fill",
3318 activeCountVisitor.getQueueFill() +
3319 replicaCountVisitor.getQueueFill() +
3320 pendingCountVisitor.getQueueFill(),
3322 add_casted_stat("ep_diskqueue_drain",
3323 activeCountVisitor.getQueueDrain() +
3324 replicaCountVisitor.getQueueDrain() +
3325 pendingCountVisitor.getQueueDrain(),
3327 add_casted_stat("ep_diskqueue_pending",
3328 activeCountVisitor.getPendingWrites() +
3329 replicaCountVisitor.getPendingWrites() +
3330 pendingCountVisitor.getPendingWrites(),
3332 add_casted_stat("ep_meta_data_memory",
3333 activeCountVisitor.getMetaDataMemory() +
3334 replicaCountVisitor.getMetaDataMemory() +
3335 pendingCountVisitor.getMetaDataMemory(),
3337 add_casted_stat("ep_meta_data_disk",
3338 activeCountVisitor.getMetaDataDisk() +
3339 replicaCountVisitor.getMetaDataDisk() +
3340 pendingCountVisitor.getMetaDataDisk(),
3343 size_t memUsed = stats.getTotalMemoryUsed();
3344 add_casted_stat("mem_used", memUsed, add_stat, cookie);
3345 add_casted_stat("bytes", memUsed, add_stat, cookie);
3346 add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3347 add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3348 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3349 add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3351 add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3353 add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3354 add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3356 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3357 add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3359 add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3361 add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3362 add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3363 add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3364 add_casted_stat("ep_total_cache_size",
3365 activeCountVisitor.getCacheSize() +
3366 replicaCountVisitor.getCacheSize() +
3367 pendingCountVisitor.getCacheSize(),
3369 add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3370 add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3372 add_casted_stat("ep_mem_tracker_enabled",
3373 stats.memoryTrackerEnabled ? "true" : "false",
3375 add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3377 add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3379 add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3381 add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3383 add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3385 add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3387 add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3389 add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3391 add_casted_stat("ep_items_rm_from_checkpoints",
3392 epstats.itemsRemovedFromCheckpoints,
3394 add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3396 add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3398 add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3401 add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3402 add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3404 add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3406 add_casted_stat("ep_pending_ops_max_duration",
3407 epstats.pendingOpsMaxDuration,
3410 add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3412 add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3415 size_t vbDeletions = epstats.vbucketDeletions.load();
3416 if (vbDeletions > 0) {
3417 add_casted_stat("ep_vbucket_del_max_walltime",
3418 epstats.vbucketDelMaxWalltime,
3420 add_casted_stat("ep_vbucket_del_avg_walltime",
3421 epstats.vbucketDelTotWalltime / vbDeletions,
3425 size_t numBgOps = epstats.bgNumOperations.load();
3427 add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3429 add_casted_stat("ep_bg_min_wait",
3432 add_casted_stat("ep_bg_max_wait",
3435 add_casted_stat("ep_bg_wait_avg",
3436 epstats.bgWait / numBgOps,
3438 add_casted_stat("ep_bg_min_load",
3441 add_casted_stat("ep_bg_max_load",
3444 add_casted_stat("ep_bg_load_avg",
3445 epstats.bgLoad / numBgOps,
3447 add_casted_stat("ep_bg_wait",
3450 add_casted_stat("ep_bg_load",
3455 add_casted_stat("ep_num_non_resident",
3456 activeCountVisitor.getNonResident() +
3457 pendingCountVisitor.getNonResident() +
3458 replicaCountVisitor.getNonResident(),
3461 add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3462 add_casted_stat("ep_exp_pager_stime", epstore->getExpiryPagerSleeptime(),
3465 add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3467 add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3469 add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3471 add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3474 if (getConfiguration().isAccessScannerEnabled()) {
3477 if (cb_gmtime_r((time_t *)&epstats.alogTime, &alogTim) == -1) {
3478 add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3481 strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3482 add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3486 add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3490 add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
3492 if (getConfiguration().isWarmup()) {
3493 Warmup *wp = epstore->getWarmup();
3495 if (!epstore->isWarmingUp()) {
3496 add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
3498 add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);
3500 if (wp->getTime() > 0) {
3501 add_casted_stat("ep_warmup_time", wp->getTime() / 1000,
3504 add_casted_stat("ep_warmup_oom", epstats.warmOOM, add_stat, cookie);
3505 add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
3508 add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
3510 add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
3512 add_casted_stat("ep_num_ops_del_meta", epstats.numOpsDelMeta,
3514 add_casted_stat("ep_num_ops_set_meta_res_fail",
3515 epstats.numOpsSetMetaResolutionFailed, add_stat, cookie);
3516 add_casted_stat("ep_num_ops_del_meta_res_fail",
3517 epstats.numOpsDelMetaResolutionFailed, add_stat, cookie);
3518 add_casted_stat("ep_num_ops_set_ret_meta", epstats.numOpsSetRetMeta,
3520 add_casted_stat("ep_num_ops_del_ret_meta", epstats.numOpsDelRetMeta,
3522 add_casted_stat("ep_num_ops_get_meta_on_set_meta",
3523 epstats.numOpsGetMetaOnSetWithMeta, add_stat, cookie);
3524 add_casted_stat("ep_chk_persistence_timeout",
3525 VBucket::getCheckpointFlushTimeout(),
3527 add_casted_stat("ep_chk_persistence_remains",
3528 activeCountVisitor.getChkPersistRemaining() +
3529 pendingCountVisitor.getChkPersistRemaining() +
3530 replicaCountVisitor.getChkPersistRemaining(),
3532 add_casted_stat("ep_workload_pattern",
3533 workload->stringOfWorkLoadPattern(),
3536 add_casted_stat("ep_defragmenter_num_visited", epstats.defragNumVisited,
3538 add_casted_stat("ep_defragmenter_num_moved", epstats.defragNumMoved,
3541 return ENGINE_SUCCESS;
3544 ENGINE_ERROR_CODE EventuallyPersistentEngine::doMemoryStats(const void *cookie,
3545 ADD_STAT add_stat) {
3546 add_casted_stat("bytes", stats.getTotalMemoryUsed(), add_stat, cookie);
3547 add_casted_stat("mem_used", stats.getTotalMemoryUsed(), add_stat, cookie);
3548 add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3549 add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3550 add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3551 add_casted_stat("ep_max_size", stats.getMaxDataSize(), add_stat, cookie);
3552 add_casted_stat("ep_mem_low_wat", stats.mem_low_wat, add_stat, cookie);
3553 add_casted_stat("ep_mem_high_wat", stats.mem_high_wat, add_stat, cookie);
3554 add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3555 add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3558 add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3559 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3560 add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3562 add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3564 add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3566 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3567 add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3569 add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3571 add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3572 add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3575 add_casted_stat("ep_mem_tracker_enabled",
3576 stats.memoryTrackerEnabled ? "true" : "false",
3579 std::map<std::string, size_t> alloc_stats;
3580 MemoryTracker::getInstance()->getAllocatorStats(alloc_stats);
3581 std::map<std::string, size_t>::iterator it = alloc_stats.begin();
3582 for (; it != alloc_stats.end(); ++it) {
3583 add_casted_stat(it->first.c_str(), it->second, add_stat, cookie);
3586 return ENGINE_SUCCESS;
3589 ENGINE_ERROR_CODE EventuallyPersistentEngine::doVBucketStats(
3592 const char* stat_key,
3594 bool prevStateRequested,
3596 class StatVBucketVisitor : public VBucketVisitor {
3598 StatVBucketVisitor(EventuallyPersistentStore *store,
3599 const void *c, ADD_STAT a,
3600 bool isPrevStateRequested, bool detailsRequested) :
3601 eps(store), cookie(c), add_stat(a),
3602 isPrevState(isPrevStateRequested),
3603 isDetailsRequested(detailsRequested) {}
3605 bool visitBucket(RCPtr<VBucket> &vb) {
3606 addVBStats(cookie, add_stat, vb, eps, isPrevState,
3607 isDetailsRequested);
3611 static void addVBStats(const void *cookie, ADD_STAT add_stat,
3613 EventuallyPersistentStore *store,
3614 bool isPrevStateRequested,
3615 bool detailsRequested) {