2 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
4 * Copyright 2010 Couchbase, Inc
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
22 #include <memcached/engine.h>
23 #include <memcached/protocol_binary.h>
24 #include <memcached/util.h>
25 #include <platform/platform.h>
37 #include "ep_engine.h"
38 #include "failover-table.h"
41 #include "htresizer.h"
42 #include "memory_tracker.h"
43 #include "stats-info.h"
44 #define STATWRITER_NAMESPACE core_engine
45 #include "statwriter.h"
46 #undef STATWRITER_NAMESPACE
47 #include "tapthrottle.h"
48 #include "dcp-consumer.h"
49 #include "dcp-producer.h"
// Server-provided API tables kept at file scope: allocator hooks and the
// logger. Where they are assigned is not visible in this excerpt.
52 static ALLOCATOR_HOOKS_API *hooksApi;
53 static SERVER_LOG_API *loggerApi;
// Returns `percent` of `val`, where `percent` is a fraction (callers in this
// file pass 0.75 and 0.85 when deriving the memory watermarks from max_size).
// The product is truncated toward zero by the conversion back to size_t, and
// values above 2^53 lose precision in the intermediate double.
55 static size_t percentOf(size_t val, double percent) {
56 return static_cast<size_t>(static_cast<double>(val) * percent);
60 * Helper function to avoid typing in the long cast all over the place
61 * @param handle pointer to the engine
62 * @return the engine as a class
// getHandle: reinterprets the opaque ENGINE_HANDLE as the engine object and
// registers that engine as the calling thread's current engine through
// ObjectRegistry::onSwitchThread (presumably for per-engine memory
// accounting - TODO confirm). Most Evp* wrappers below follow each use with
// releaseHandle(). The rest of the body is not visible in this excerpt.
64 static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
66 EventuallyPersistentEngine* ret;
67 ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
68 ObjectRegistry::onSwitchThread(ret);
// releaseHandle: clears the calling thread's current-engine association that
// getHandle() set. The `handle` argument is not used in the visible lines.
72 static inline void releaseHandle(ENGINE_HANDLE* handle) {
74 ObjectRegistry::onSwitchThread(NULL);
79 * Call the response callback and return the appropriate value so that
80 * the core knows what to do..
// sendResponse: forwards all arguments to the server's ADD_RESPONSE callback.
// Before the callback, onSwitchThread(NULL, true) detaches the thread's
// current engine and hands it back in `e`; afterwards onSwitchThread(e)
// restores it. Presumably this keeps work done inside the server from being
// attributed to this engine (TODO confirm).
// rv starts as ENGINE_FAILED. The branch taken when the callback returns true
// is not visible in this excerpt.
82 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
84 const void *ext, uint8_t extlen,
85 const void *body, uint32_t bodylen,
86 uint8_t datatype, uint16_t status,
87 uint64_t cas, const void *cookie)
89 ENGINE_ERROR_CODE rv = ENGINE_FAILED;
90 EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
91 if (response(key, keylen, ext, extlen, body, bodylen, datatype,
92 status, cas, cookie)) {
95 ObjectRegistry::onSwitchThread(e);
// validate: throws std::runtime_error("Value out of range.") unless
// l <= v <= h (both bounds inclusive). The set*Param helpers in this file
// catch std::runtime_error and answer PROTOCOL_BINARY_RESPONSE_EINVAL.
100 static void validate(T v, T l, T h) {
101 if (v < l || v > h) {
102 throw std::runtime_error("Value out of range.");
// checkNumeric: throws std::runtime_error("Value is not numeric") if any
// character of str, from index i onward, is not a decimal digit. How i is
// initialised (for example, whether a leading sign is skipped) is not visible
// in this excerpt.
// NOTE(review): isdigit() on a plain char holding a negative value (bytes
// >= 0x80 where char is signed) is undefined behaviour. Casting to unsigned
// char would avoid it, unless that is already handled elsewhere.
107 static void checkNumeric(const char* str) {
112 for (; str[i]; i++) {
114 if (!isdigit(str[i])) {
115 throw std::runtime_error("Value is not numeric");
120 // The Engine API specifies C linkage for the functions.
// EvpGetInfo: returns the engine's info block obtained from getInfo(),
// bracketed by getHandle()/releaseHandle().
123 static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
125 engine_info* info = getHandle(handle)->getInfo();
126 releaseHandle(handle);
// EvpInitialize: passes the configuration string to the engine's
// initialize() and captures its status in err_code.
130 static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
131 const char* config_str)
133 ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
134 releaseHandle(handle);
// EvpDestroy: calls destroy(force) on the engine, then deletes the engine
// object itself.
// NOTE(review): unlike the other wrappers, no releaseHandle() call is visible
// after the delete. The thread's current-engine association may therefore
// still point at the deleted engine - confirm against the full source.
138 static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
140 getHandle(handle)->destroy(force);
141 delete getHandle(handle);
// EvpItemAllocate: rejects any datatype above
// PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON with ENGINE_EINVAL. Otherwise it
// forwards the allocation request to the engine's itemAllocate().
145 static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
152 const rel_time_t exptime,
// The datatype check runs before getHandle(), so the early return needs no
// matching releaseHandle().
155 if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
156 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
158 return ENGINE_EINVAL;
160 ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
167 releaseHandle(handle);
// EvpItemDelete: forwards a delete request to the engine's itemDelete(),
// passing mut_info through so the engine can fill in mutation details.
171 static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
177 mutation_descr_t *mut_info)
179 ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
182 releaseHandle(handle);
// EvpItemRelease: hands an item back to the engine's itemRelease().
186 static void EvpItemRelease(ENGINE_HANDLE* handle,
190 getHandle(handle)->itemRelease(cookie, itm);
191 releaseHandle(handle);
// EvpGet: forwards a get to the engine. The final `true` argument to get()
// is a flag whose meaning is not visible in this excerpt.
194 static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
201 ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
202 nkey, vbucket, true);
203 releaseHandle(handle);
// EvpGetStats: forwards a stats request, identified by stat_key, to the
// engine's getStats().
207 static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
209 const char* stat_key,
213 ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
217 releaseHandle(handle);
// EvpStore: forwards a store of `operation` kind, with its CAS, to the
// engine's store().
221 static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
225 ENGINE_STORE_OPERATION operation,
228 ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
231 releaseHandle(handle);
// EvpArithmetic: incr/decr entry point. Only datatypes up to
// PROTOCOL_BINARY_DATATYPE_JSON are accepted. Anything larger is rejected
// with ENGINE_EINVAL before the engine is touched, after logging either an
// invalid-datatype warning or a compressed-data warning.
// NOTE(review): the log text below misspells "Cannot" as "Cannnot".
235 static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
239 const bool increment,
241 const uint64_t delta,
242 const uint64_t initial,
243 const rel_time_t exptime,
249 if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
250 if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
251 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
254 LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
255 "operations on compressed data!");
257 return ENGINE_EINVAL;
259 ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
268 releaseHandle(handle);
// EvpFlush: forwards a flush request, with its `when` time, to the engine's
// flush().
272 static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
273 const void* cookie, time_t when)
275 ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
276 releaseHandle(handle);
// EvpResetStats: resets the engine's statistics. The cookie parameter is
// unnamed and ignored.
280 static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
282 getHandle(handle)->resetStats();
283 releaseHandle(handle);
// stopFlusher: adapter for STOP_PERSISTENCE. It forwards to the engine's
// stopFlusher(), which may set msg/msg_size for the reply.
286 static protocol_binary_response_status stopFlusher(
287 EventuallyPersistentEngine *e,
290 return e->stopFlusher(msg, msg_size);
// startFlusher: adapter for START_PERSISTENCE. It forwards to the engine's
// startFlusher(), which may set msg/msg_size for the reply.
293 static protocol_binary_response_status startFlusher(
294 EventuallyPersistentEngine *e,
297 return e->startFlusher(msg, msg_size);
// setTapParam: applies one TAP configuration parameter, named by keyz.
// Recognised keys:
//   tap_keepalive          - validated to [0, MAX_TAP_KEEP_ALIVE], applied
//                            via setTapKeepAlive()
//   tap_throttle_threshold, tap_throttle_queue_cap, tap_throttle_cap_pcnt
//                          - stored in the configuration
// Unknown keys yield KEY_ENOENT with "Unknown config param". Any
// std::runtime_error (e.g. from validate()) yields EINVAL with
// "Value out of range.". How v is parsed from valz is not visible here.
300 static protocol_binary_response_status setTapParam(
301 EventuallyPersistentEngine *e,
304 const char **msg, size_t *) {
305 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
309 if (strcmp(keyz, "tap_keepalive") == 0) {
311 validate(v, 0, MAX_TAP_KEEP_ALIVE);
312 e->setTapKeepAlive(static_cast<uint32_t>(v));
313 } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
315 e->getConfiguration().setTapThrottleThreshold(v);
316 } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
318 e->getConfiguration().setTapThrottleQueueCap(v);
319 } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
321 e->getConfiguration().setTapThrottleCapPcnt(v);
323 *msg = "Unknown config param";
324 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
326 } catch(std::runtime_error &) {
327 *msg = "Value out of range.";
328 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// setCheckpointParam: applies one checkpoint configuration parameter.
// Numeric keys are range-checked with validate():
//   chk_max_items   - [MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS]
//   chk_period      - [MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD]
//   max_checkpoints - [DEFAULT_MAX_CHECKPOINTS, MAX_CHECKPOINTS_UPPER_BOUND]
// Boolean keys (item_num_based_new_chk, keep_closed_chks, enable_chk_merge)
// are set to true only when valz is exactly "true". In the visible code any
// other value sets false rather than being rejected, unlike the strict
// true/false keys in setFlushParam.
// Unknown keys yield KEY_ENOENT. A std::runtime_error yields EINVAL with
// "Value out of range.".
334 static protocol_binary_response_status setCheckpointParam(
335 EventuallyPersistentEngine *e,
340 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
344 if (strcmp(keyz, "chk_max_items") == 0) {
346 validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
347 e->getConfiguration().setChkMaxItems(v);
348 } else if (strcmp(keyz, "chk_period") == 0) {
350 validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
351 e->getConfiguration().setChkPeriod(v);
352 } else if (strcmp(keyz, "max_checkpoints") == 0) {
354 validate(v, DEFAULT_MAX_CHECKPOINTS,
355 MAX_CHECKPOINTS_UPPER_BOUND);
356 e->getConfiguration().setMaxCheckpoints(v);
357 } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
358 if (strcmp(valz, "true") == 0) {
359 e->getConfiguration().setItemNumBasedNewChk(true);
361 e->getConfiguration().setItemNumBasedNewChk(false);
363 } else if (strcmp(keyz, "keep_closed_chks") == 0) {
364 if (strcmp(valz, "true") == 0) {
365 e->getConfiguration().setKeepClosedChks(true);
367 e->getConfiguration().setKeepClosedChks(false);
369 } else if (strcmp(keyz, "enable_chk_merge") == 0) {
370 if (strcmp(valz, "true") == 0) {
371 e->getConfiguration().setEnableChkMerge(true);
373 e->getConfiguration().setEnableChkMerge(false);
376 *msg = "Unknown config param";
377 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
379 } catch(std::runtime_error &) {
380 *msg = "Value out of range.";
381 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// setFlushParam: applies one "flush"-class engine parameter (memory quotas,
// background tasks, executor pool sizes, defragmenter, bloom filter, ...).
// Behaviour visible in this excerpt:
//  - most numeric keys are range-checked with validate() and stored in the
//    configuration;
//  - max_num_readers/writers/auxio/nonio are also applied to ExecutorPool;
//  - flushall_enabled, access_scanner_enabled and bfilter_enabled accept
//    only "true"/"false" and throw otherwise;
//  - defragmenter_enabled sets true only for "true";
//  - defragmenter_run ignores valz and triggers runDefragmenterTask().
// Unknown keys yield KEY_ENOENT. Any std::runtime_error yields EINVAL.
387 static protocol_binary_response_status setFlushParam(
388 EventuallyPersistentEngine *e,
393 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
395 // Handle the actual mutation.
398 if (strcmp(keyz, "bg_fetch_delay") == 0) {
400 e->getConfiguration().setBgFetchDelay(v);
401 } else if (strcmp(keyz, "flushall_enabled") == 0) {
402 if (strcmp(valz, "true") == 0) {
403 e->getConfiguration().setFlushallEnabled(true);
404 } else if(strcmp(valz, "false") == 0) {
405 e->getConfiguration().setFlushallEnabled(false);
407 throw std::runtime_error("value out of range.");
409 } else if (strcmp(keyz, "max_size") == 0) {
// Setting max_size also resets the low/high watermarks to 75% / 85% of it.
// NOTE(review): validate(vsize, 0, UINT64_MAX) can never fail for a uint64_t.
// strtoull's end pointer and errno are not checked in the visible lines, so
// non-numeric or overflowing input is not rejected here. The same applies to
// mem_low_wat, mem_high_wat and exp_pager_stime below.
412 uint64_t vsize = strtoull(valz, &ptr, 10);
413 validate(vsize, static_cast<uint64_t>(0),
414 std::numeric_limits<uint64_t>::max());
415 e->getConfiguration().setMaxSize(vsize);
416 e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
417 e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
418 } else if (strcmp(keyz, "mem_low_wat") == 0) {
421 uint64_t vsize = strtoull(valz, &ptr, 10);
422 validate(vsize, static_cast<uint64_t>(0),
423 std::numeric_limits<uint64_t>::max());
424 e->getConfiguration().setMemLowWat(vsize);
425 } else if (strcmp(keyz, "mem_high_wat") == 0) {
428 uint64_t vsize = strtoull(valz, &ptr, 10);
429 validate(vsize, static_cast<uint64_t>(0),
430 std::numeric_limits<uint64_t>::max());
431 e->getConfiguration().setMemHighWat(vsize);
432 } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
435 e->getConfiguration().setBackfillMemThreshold(v);
436 } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
439 e->getConfiguration().setCompactionExpMemThreshold(v);
440 } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
443 e->getConfiguration().setMutationMemThreshold(v);
444 } else if (strcmp(keyz, "timing_log") == 0) {
// The current timing log is detached before a new one is opened. "off"
// leaves it disabled; any other value is treated as a file path. What
// happens to `old` is not visible in this excerpt.
445 EPStats &stats = e->getEpStats();
446 std::ostream *old = stats.timingLog;
447 stats.timingLog = NULL;
449 if (strcmp(valz, "off") == 0) {
450 LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
452 std::ofstream *tmp(new std::ofstream(valz));
454 LOG(EXTENSION_LOG_INFO,
455 "Logging detailed timings to ``%s''.", valz);
456 stats.timingLog = tmp;
458 LOG(EXTENSION_LOG_WARNING,
459 "Error setting detailed timing log to ``%s'': %s",
460 valz, strerror(errno));
464 } else if (strcmp(keyz, "exp_pager_stime") == 0) {
467 uint64_t vsize = strtoull(valz, &ptr, 10);
468 validate(vsize, static_cast<uint64_t>(0),
469 std::numeric_limits<uint64_t>::max());
470 e->getConfiguration().setExpPagerStime((size_t)vsize);
471 } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
472 if (strcmp(valz, "true") == 0) {
473 e->getConfiguration().setAccessScannerEnabled(true);
474 } else if (strcmp(valz, "false") == 0) {
475 e->getConfiguration().setAccessScannerEnabled(false);
477 throw std::runtime_error("Value expected: true/false.");
479 } else if (strcmp(keyz, "alog_sleep_time") == 0) {
481 e->getConfiguration().setAlogSleepTime(v);
482 } else if (strcmp(keyz, "alog_task_time") == 0) {
484 e->getConfiguration().setAlogTaskTime(v);
485 } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
487 e->getConfiguration().setPagerActiveVbPcnt(v);
488 } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
490 validate(v, 0, std::numeric_limits<int>::max());
491 e->getConfiguration().setWarmupMinMemoryThreshold(v);
492 } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
494 validate(v, 0, std::numeric_limits<int>::max());
495 e->getConfiguration().setWarmupMinItemsThreshold(v);
496 } else if (strcmp(keyz, "max_num_readers") == 0) {
498 validate(v, 0, std::numeric_limits<int>::max());
499 e->getConfiguration().setMaxNumReaders(v);
500 ExecutorPool::get()->setMaxReaders(v);
501 } else if (strcmp(keyz, "max_num_writers") == 0) {
503 validate(v, 0, std::numeric_limits<int>::max());
504 e->getConfiguration().setMaxNumWriters(v);
505 ExecutorPool::get()->setMaxWriters(v);
506 } else if (strcmp(keyz, "max_num_auxio") == 0) {
508 validate(v, 0, std::numeric_limits<int>::max());
509 e->getConfiguration().setMaxNumAuxio(v);
510 ExecutorPool::get()->setMaxAuxIO(v);
511 } else if (strcmp(keyz, "max_num_nonio") == 0) {
513 validate(v, 0, std::numeric_limits<int>::max());
514 e->getConfiguration().setMaxNumNonio(v);
515 ExecutorPool::get()->setMaxNonIO(v);
516 } else if (strcmp(keyz, "bfilter_enabled") == 0) {
517 if (strcmp(valz, "true") == 0) {
518 e->getConfiguration().setBfilterEnabled(true);
519 } else if (strcmp(valz, "false") == 0) {
520 e->getConfiguration().setBfilterEnabled(false);
522 throw std::runtime_error("Value expected: true/false.");
524 } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
// NOTE(review): atof() returns 0.0 for non-numeric text, which passes the
// [0.0, 1.0] check below, so garbage input silently sets the threshold to 0.
525 float val = atof(valz);
526 if (val >= 0.0 && val <= 1.0) {
527 e->getConfiguration().setBfilterResidencyThreshold(val);
529 throw std::runtime_error("Value out of range [0.0-1.0].");
531 } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
532 if (strcmp(valz, "true") == 0) {
533 e->getConfiguration().setDefragmenterEnabled(true);
535 e->getConfiguration().setDefragmenterEnabled(false);
537 } else if (strcmp(keyz, "defragmenter_interval") == 0) {
539 validate(v, 1, std::numeric_limits<int>::max());
540 e->getConfiguration().setDefragmenterInterval(v);
541 } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
543 validate(v, 0, std::numeric_limits<int>::max());
544 e->getConfiguration().setDefragmenterAgeThreshold(v);
545 } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
547 validate(v, 1, std::numeric_limits<int>::max());
548 e->getConfiguration().setDefragmenterChunkDuration(v);
549 } else if (strcmp(keyz, "defragmenter_run") == 0) {
550 e->runDefragmenterTask();
551 } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
553 validate(v, 1, std::numeric_limits<int>::max());
554 e->getConfiguration().setCompactionWriteQueueCap(v);
556 *msg = "Unknown config param";
557 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
// NOTE(review): every std::runtime_error is reported as "Value out of
// range.", which discards the more specific messages thrown above (e.g.
// "Value expected: true/false."). `ex` is otherwise unused.
559 } catch(std::runtime_error& ex) {
560 *msg = "Value out of range.";
561 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// setDcpParam: applies one DCP consumer tuning parameter. Both recognised
// keys require a value >= 1:
//   dcp_consumer_process_buffered_messages_yield_limit
//   dcp_consumer_process_buffered_messages_batch_size
// Unknown keys yield KEY_ENOENT. A std::runtime_error yields EINVAL with
// "Value out of range.".
567 static protocol_binary_response_status setDcpParam(
568 EventuallyPersistentEngine *e,
573 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
576 if (strcmp(keyz, "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
// NOTE(review): atoi() returns int. A negative input converts to a very large
// size_t and passes validate(v, 1, SIZE_MAX), unless valz is checked
// elsewhere (not visible here). The same applies to the batch_size key.
577 size_t v = atoi(valz);
579 validate(v, size_t(1), std::numeric_limits<size_t>::max());
580 e->getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(v);
581 } else if (strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") == 0) {
582 size_t v = atoi(valz);
584 validate(v, size_t(1), std::numeric_limits<size_t>::max());
585 e->getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(v);
587 *msg = "Unknown config param";
588 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
590 } catch (std::runtime_error& ex) {
591 *msg = "Value out of range.";
592 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// evictKey: handles EVICT_KEY. The key that follows the request header is
// copied into the local keyz buffer. Requests with keylen >= sizeof(keyz) are
// rejected with EINVAL, so at most sizeof(keyz) - 1 bytes are copied. The key
// and the request's vbucket are then passed to the engine's evictKey().
// While the engine is in degraded mode, NOT_MY_VBUCKET and KEY_ENOENT results
// are reported as ETMPFAIL instead.
598 static protocol_binary_response_status evictKey(
599 EventuallyPersistentEngine *e,
600 protocol_binary_request_header
604 protocol_binary_request_no_extras *req =
605 (protocol_binary_request_no_extras*)request;
610 int keylen = ntohs(req->message.header.request.keylen);
611 if (keylen >= (int)sizeof(keyz)) {
612 *msg = "Key is too large.";
613 return PROTOCOL_BINARY_RESPONSE_EINVAL;
615 memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
618 uint16_t vbucket = ntohs(request->request.vbucket);
620 std::string key(keyz, keylen);
622 LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
625 protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
627 if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
628 rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
629 if (e->isDegradedMode()) {
630 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
// getLocked: handles GET_LOCKED. extlen must be 0 or 4, otherwise EINVAL.
// The key follows the header and extras.
// Lock timeout: starts at the engine default. It is taken from the request's
// expiration field under a condition not visible here. It falls back to the
// default when it is 0 or exceeds getGetlMaxTimeout().
// Outcomes:
//  - ENGINE_SUCCESS fills *itm and counts a get;
//  - ENGINE_EWOULDBLOCK waits for the value to arrive;
//  - ENGINE_NOT_MY_VBUCKET is reported as such;
//  - the remaining visible paths report ETMPFAIL (lock not obtained, or
//    degraded mode) or KEY_ENOENT.
636 static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
637 protocol_binary_request_header *req,
642 protocol_binary_response_status *res) {
644 uint8_t extlen = req->request.extlen;
645 if (extlen != 0 && extlen != 4) {
646 *msg = "Invalid packet format (extlen may be 0 or 4)";
647 *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
648 return ENGINE_EINVAL;
651 protocol_binary_request_getl *grequest =
652 (protocol_binary_request_getl*)req;
653 *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
655 const char *keyp = reinterpret_cast<const char*>(req->bytes);
656 keyp += sizeof(req->bytes) + extlen;
657 std::string key(keyp, ntohs(req->request.keylen));
658 uint16_t vbucket = ntohs(req->request.vbucket);
660 RememberingCallback<GetValue> getCb;
661 uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
662 uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
663 uint32_t lockTimeout = default_timeout;
665 lockTimeout = ntohl(grequest->message.body.expiration);
// lockTimeout is unsigned, so `< 1` only rejects 0.
668 if (lockTimeout > max_timeout || lockTimeout < 1) {
669 LOG(EXTENSION_LOG_WARNING,
670 "Illegal value for lock timeout specified"
671 " %u. Using default value: %u", lockTimeout, default_timeout);
672 lockTimeout = default_timeout;
675 bool gotLock = e->getLocked(key, vbucket, getCb,
677 lockTimeout, cookie);
679 getCb.waitForValue();
680 ENGINE_ERROR_CODE rv = getCb.val.getStatus();
682 if (rv == ENGINE_SUCCESS) {
683 *itm = getCb.val.getValue();
684 ++(e->getEpStats().numOpsGet);
685 } else if (rv == ENGINE_EWOULDBLOCK) {
687 // need to wait for value
689 } else if (rv == ENGINE_NOT_MY_VBUCKET) {
690 *msg = "That's not my bucket.";
691 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
692 return ENGINE_NOT_MY_VBUCKET;
693 } else if (!gotLock){
695 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
696 return ENGINE_TMPFAIL;
698 if (e->isDegradedMode()) {
699 *msg = "LOCK_TMP_ERROR";
700 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
701 return ENGINE_TMPFAIL;
705 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
706 return ENGINE_KEY_ENOENT;
// unlockKey: handles UNLOCK_KEY. The key is copied into a bounded local
// buffer (EINVAL if keylen >= sizeof(keyz)), and the engine is asked to
// unlock it using the request's CAS.
// Results:
//  - ENGINE_TMPFAIL -> ETMPFAIL with "UNLOCK_ERROR";
//  - otherwise, degraded mode -> ETMPFAIL with "LOCK_TMP_ERROR";
//  - a missing vbucket -> NOT_MY_VBUCKET;
//  - anything else -> KEY_ENOENT.
// The debug log prints keyz with %s, which assumes keyz was NUL-terminated
// after the memcpy (not visible in this excerpt). getCb is not used in the
// visible lines.
712 static protocol_binary_response_status unlockKey(
713 EventuallyPersistentEngine *e,
714 protocol_binary_request_header
719 protocol_binary_request_no_extras *req =
720 (protocol_binary_request_no_extras*)request;
722 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
726 int keylen = ntohs(req->message.header.request.keylen);
727 if (keylen >= (int)sizeof(keyz)) {
728 *msg = "Key is too large.";
729 return PROTOCOL_BINARY_RESPONSE_EINVAL;
732 memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
735 uint16_t vbucket = ntohs(request->request.vbucket);
736 std::string key(keyz, keylen);
738 LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
740 RememberingCallback<GetValue> getCb;
741 uint64_t cas = ntohll(request->request.cas);
743 ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
746 if (rv == ENGINE_SUCCESS) {
748 } else if (rv == ENGINE_TMPFAIL){
749 *msg = "UNLOCK_ERROR";
750 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
752 if (e->isDegradedMode()) {
753 *msg = "LOCK_TMP_ERROR";
754 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
757 RCPtr<VBucket> vb = e->getVBucket(vbucket);
759 *msg = "That's not my bucket.";
760 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
763 res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
// setParam: handles SET_PARAM. It splits the body into key and value, copies
// each into a bounded local buffer (EINVAL when too large), and dispatches on
// param_type to setFlushParam / setTapParam / setCheckpointParam /
// setDcpParam. Unknown param types yield UNKNOWN_COMMAND.
// NOTE(review): vallen is a size_t. If keylen + extlen exceeds bodylen, the
// subtraction in the format check wraps to a huge non-zero value instead of
// failing. Such a packet is only rejected later, by the "Value is too large."
// check, and by then keylen bytes have already been memcpy'd from beyond the
// declared body. That matters unless the server core guarantees
// keylen + extlen <= bodylen (not visible here).
// A check such as `vallen <= keylen + extlen` would reject it up front.
769 static protocol_binary_response_status setParam(
770 EventuallyPersistentEngine *e,
771 protocol_binary_request_set_param
776 size_t keylen = ntohs(req->message.header.request.keylen);
777 uint8_t extlen = req->message.header.request.extlen;
778 size_t vallen = ntohl(req->message.header.request.bodylen);
779 protocol_binary_engine_param_t paramtype =
780 static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
782 if (keylen == 0 || (vallen - keylen - extlen) == 0) {
783 return PROTOCOL_BINARY_RESPONSE_EINVAL;
786 const char *keyp = reinterpret_cast<const char*>(req->bytes)
787 + sizeof(req->bytes);
788 const char *valuep = keyp + keylen;
789 vallen -= (keylen + extlen);
795 if (keylen >= sizeof(keyz)) {
796 *msg = "Key is too large.";
797 return PROTOCOL_BINARY_RESPONSE_EINVAL;
799 memcpy(keyz, keyp, keylen);
803 if (vallen >= sizeof(valz)) {
804 *msg = "Value is too large.";
805 return PROTOCOL_BINARY_RESPONSE_EINVAL;
807 memcpy(valz, valuep, vallen);
810 protocol_binary_response_status rv;
813 case protocol_binary_engine_param_flush:
814 rv = setFlushParam(e, keyz, valz, msg, msg_size);
816 case protocol_binary_engine_param_tap:
817 rv = setTapParam(e, keyz, valz, msg, msg_size);
819 case protocol_binary_engine_param_checkpoint:
820 rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
822 case protocol_binary_engine_param_dcp:
823 rv = setDcpParam(e, keyz, valz, msg, msg_size);
826 rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
// getVBucket: handles GET_VBUCKET.
// When the vbucket is not found (the condition itself is not visible here),
// it replies NOT_MY_VBUCKET with the cached cluster configuration as the
// body; that configuration is read while holding clusterConfig.lock.
// Otherwise it replies SUCCESS with the vbucket state as a raw 4-byte body
// in network byte order. ntohl and htonl perform the same byte swap, so
// ntohl here acts as a host-to-network conversion.
832 static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
834 protocol_binary_request_header *request,
835 ADD_RESPONSE response) {
836 protocol_binary_request_get_vbucket *req =
837 reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
840 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
841 RCPtr<VBucket> vb = e->getVBucket(vbucket);
843 LockHolder lh(e->clusterConfig.lock);
844 return sendResponse(response, NULL, 0, NULL, 0,
845 e->clusterConfig.config,
846 e->clusterConfig.len,
847 PROTOCOL_BINARY_RAW_BYTES,
848 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
851 vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
852 return sendResponse(response, NULL, 0, NULL, 0, &state,
854 PROTOCOL_BINARY_RAW_BYTES,
855 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
// setVBucket: handles SET_VBUCKET. The body (bodylen minus keylen) must be
// exactly sizeof(vbucket_state_t), otherwise EINVAL "Incorrect packet format".
// The state is copied out with memcpy (presumably to avoid an unaligned read)
// and converted from network byte order. Invalid states are answered with
// EINVAL, and vbucket ids rejected with ENGINE_ERANGE with ERANGE; both
// replies carry a text body.
// NOTE(review): extlen is not subtracted from bodylen here, unlike setParam().
// Confirm that SET_VBUCKET requests never carry extras.
// NOTE(review): in the visible lines, any setVBucketState() result other
// than ENGINE_ERANGE is answered with SUCCESS.
859 static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
861 protocol_binary_request_header *request,
862 ADD_RESPONSE response) {
864 protocol_binary_request_set_vbucket *req =
865 reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
867 uint64_t cas = ntohll(req->message.header.request.cas);
869 size_t bodylen = ntohl(req->message.header.request.bodylen)
870 - ntohs(req->message.header.request.keylen);
871 if (bodylen != sizeof(vbucket_state_t)) {
872 const std::string msg("Incorrect packet format");
873 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
874 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
875 PROTOCOL_BINARY_RESPONSE_EINVAL,
879 vbucket_state_t state;
880 memcpy(&state, &req->message.body.state, sizeof(state));
881 state = static_cast<vbucket_state_t>(ntohl(state));
883 if (!is_valid_vbucket_state_t(state)) {
884 const std::string msg("Invalid vbucket state");
885 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
886 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
887 PROTOCOL_BINARY_RESPONSE_EINVAL,
891 uint16_t vb = ntohs(req->message.header.request.vbucket);
892 if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
893 const std::string msg("VBucket number too big");
894 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
895 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
896 PROTOCOL_BINARY_RESPONSE_ERANGE,
899 return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
900 PROTOCOL_BINARY_RAW_BYTES,
901 PROTOCOL_BINARY_RESPONSE_SUCCESS,
// delVBucket: handles DEL_VBUCKET.
// Requests carrying a key or extras are rejected with EINVAL
// "Incorrect packet format.". A body of exactly "async=0" selects the
// synchronous path; the flag it sets is not visible here.
// Synchronous path: the cookie's engine-specific slot appears to distinguish
// the first call from a resumed one (the guarding condition is not visible):
//  - first call: starts deleteVBucket(vbucket, cookie) and stores the engine
//    pointer in the slot;
//  - resumed call: clears the slot and reports ENGINE_SUCCESS.
// Otherwise deleteVBucket(vbucket) is called directly.
// Replies:
//  - NOT_MY_VBUCKET carries the cluster configuration, read under
//    clusterConfig.lock;
//  - a vbucket not in the dead state gets EINVAL;
//  - EWOULDBLOCK stores `req` in the cookie slot and returns without
//    replying;
//  - unknown failures get EINTERNAL.
905 static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
907 protocol_binary_request_header *req,
908 ADD_RESPONSE response) {
910 uint64_t cas = ntohll(req->request.cas);
912 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
913 uint16_t vbucket = ntohs(req->request.vbucket);
915 std::string msg = "";
916 if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
917 msg = "Incorrect packet format.";
918 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
920 PROTOCOL_BINARY_RAW_BYTES,
921 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
925 uint32_t bodylen = ntohl(req->request.bodylen);
927 const char* ptr = reinterpret_cast<const char*>(req->bytes) +
929 if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
934 ENGINE_ERROR_CODE err;
935 void* es = e->getEngineSpecific(cookie);
938 err = e->deleteVBucket(vbucket, cookie);
939 e->storeEngineSpecific(cookie, e);
941 e->storeEngineSpecific(cookie, NULL);
942 LOG(EXTENSION_LOG_INFO,
943 "Completed sync deletion of vbucket %u",
945 err = ENGINE_SUCCESS;
948 err = e->deleteVBucket(vbucket);
952 LOG(EXTENSION_LOG_WARNING,
953 "Deletion of vbucket %d was completed.", vbucket);
955 case ENGINE_NOT_MY_VBUCKET:
956 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
957 "because the vbucket doesn't exist!!!", vbucket);
958 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
961 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
962 "because the vbucket is not in a dead state\n", vbucket);
963 msg = "Failed to delete vbucket. Must be in the dead state.";
964 res = PROTOCOL_BINARY_RESPONSE_EINVAL;
966 case ENGINE_EWOULDBLOCK:
967 LOG(EXTENSION_LOG_WARNING, "Request for vbucket %d deletion is in"
968 " EWOULDBLOCK until the database file is removed from disk",
970 e->storeEngineSpecific(cookie, req);
971 return ENGINE_EWOULDBLOCK;
973 LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
974 "because of unknown reasons\n", vbucket);
975 msg = "Failed to delete vbucket. Unknown reason.";
976 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
979 if (err != ENGINE_NOT_MY_VBUCKET) {
980 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
981 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
984 LockHolder lh(e->clusterConfig.lock);
985 return sendResponse(response, NULL, 0, NULL, 0,
986 e->clusterConfig.config,
987 e->clusterConfig.len,
988 PROTOCOL_BINARY_RAW_BYTES,
// getReplicaCmd: handles GET_REPLICA. It reads the key that follows the
// request header and fetches it from the replica via getReplica().
// On success, *it receives the item, *res is SUCCESS and numOpsGet is
// incremented. On failure, NOT_MY_VBUCKET maps to NOT_MY_VBUCKET. The
// ENGINE_TMPFAIL handling and the remaining failure paths are only partly
// visible; one visible path sets KEY_ENOENT.
994 static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
995 protocol_binary_request_header *request,
999 protocol_binary_response_status *res) {
1000 EventuallyPersistentStore *eps = e->getEpStore();
1001 protocol_binary_request_no_extras *req =
1002 (protocol_binary_request_no_extras*)request;
1003 int keylen = ntohs(req->message.header.request.keylen);
1004 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1005 ENGINE_ERROR_CODE error_code;
1006 std::string keystr(((char *)request) + sizeof(req->message.header),
1009 GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
1011 if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
1012 if (error_code == ENGINE_NOT_MY_VBUCKET) {
1013 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1015 } else if (error_code == ENGINE_TMPFAIL) {
1017 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1022 *it = rv.getValue();
1023 *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1025 ++(e->getEpStats().numOpsGet);
1026 return ENGINE_SUCCESS;
// compactDB: handles COMPACT_DB. Requires keylen == 0 and extlen == 24,
// otherwise EINVAL "Incorrect packet format.".
// A compaction_ctx is built from the request body: purge_before_ts,
// purge_before_seq (both network-order 64-bit) and drop_deletes. Its
// max_purged_seq starts at 0 and it has no bloom-filter callback.
// The cookie's engine-specific slot appears to distinguish the initial call
// from a resumed one (the guarding condition is not visible):
//  - initial call: increments stats.pendingCompactions and starts
//    compactDB();
//  - resumed call: clears the slot and reports success.
// Replies:
//  - EWOULDBLOCK stores `req` in the slot and returns without replying;
//  - NOT_MY_VBUCKET carries the cluster configuration, read under
//    clusterConfig.lock;
//  - TMPFAIL -> ETMPFAIL;
//  - unknown failures -> EINTERNAL.
// NOTE(review): in the visible lines, pendingCompactions is decremented for
// NOT_MY_VBUCKET and the default case but not for ENGINE_TMPFAIL. Confirm
// whether the TMPFAIL path leaves the pending-compaction count too high.
1029 static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
1031 protocol_binary_request_compact_db *req,
1032 ADD_RESPONSE response) {
1034 std::string msg = "";
1035 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1036 compaction_ctx compactreq;
1037 uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1038 uint64_t cas = ntohll(req->message.header.request.cas);
1040 if (ntohs(req->message.header.request.keylen) > 0 ||
1041 req->message.header.request.extlen != 24) {
1042 LOG(EXTENSION_LOG_WARNING,
1043 "Compaction of vbucket %d received bad ext/key len %d/%d.",
1044 vbucket, req->message.header.request.extlen,
1045 ntohs(req->message.header.request.keylen));
1046 msg = "Incorrect packet format.";
1047 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1049 PROTOCOL_BINARY_RAW_BYTES,
1050 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
1052 EPStats &stats = e->getEpStats();
1053 compactreq.max_purged_seq = 0;
1054 compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
1055 compactreq.purge_before_seq =
1056 ntohll(req->message.body.purge_before_seq);
1057 compactreq.drop_deletes = req->message.body.drop_deletes;
1058 compactreq.bfcb = NULL;
1060 ENGINE_ERROR_CODE err;
1061 void* es = e->getEngineSpecific(cookie);
1063 ++stats.pendingCompactions;
1064 e->storeEngineSpecific(cookie, e);
1065 err = e->compactDB(vbucket, compactreq, cookie);
1067 e->storeEngineSpecific(cookie, NULL);
1068 err = ENGINE_SUCCESS;
1072 case ENGINE_SUCCESS:
1073 LOG(EXTENSION_LOG_INFO,
1074 "Compaction of vbucket %d completed.", vbucket);
1076 case ENGINE_NOT_MY_VBUCKET:
1077 --stats.pendingCompactions;
1078 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1079 "because the vbucket doesn't exist!!!", vbucket);
1080 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1082 case ENGINE_EWOULDBLOCK:
1083 LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
1084 "in EWOULDBLOCK state until the database file is "
1085 "compacted on disk",
1087 e->storeEngineSpecific(cookie, req);
1088 return ENGINE_EWOULDBLOCK;
1089 case ENGINE_TMPFAIL:
1090 LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
1091 " a temporary failure and may need to be retried",
1093 msg = "Temporary failure in compacting vbucket.";
1094 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1097 --stats.pendingCompactions;
1098 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1099 "because of unknown reasons\n", vbucket);
1100 msg = "Failed to compact vbucket. Unknown reason.";
1101 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1105 if (err != ENGINE_NOT_MY_VBUCKET) {
1106 return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1107 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1110 LockHolder lh(e->clusterConfig.lock);
1111 return sendResponse(response, NULL, 0, NULL, 0,
1112 e->clusterConfig.config,
1113 e->clusterConfig.len,
1114 PROTOCOL_BINARY_RAW_BYTES,
1119 static ENGINE_ERROR_CODE processUnknownCommand(
1120 EventuallyPersistentEngine *h,
1122 protocol_binary_request_header *request,
1123 ADD_RESPONSE response)
1125 protocol_binary_response_status res =
1126 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1127 const char *msg = NULL;
1128 size_t msg_size = 0;
1131 EPStats &stats = h->getEpStats();
1132 ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1135 * Session validation
1136 * (For ns_server commands only)
1138 switch (request->request.opcode) {
1139 case PROTOCOL_BINARY_CMD_SET_PARAM:
1140 case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1141 case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1142 case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1143 case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1144 case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1145 case PROTOCOL_BINARY_CMD_COMPACT_DB:
1147 if (h->getEngineSpecific(cookie) == NULL) {
1148 uint64_t cas = ntohll(request->request.cas);
1149 if (!h->validateSessionCas(cas)) {
1150 const std::string message("Invalid session token");
1151 return sendResponse(response, NULL, 0, NULL, 0,
1152 message.c_str(), message.length(),
1153 PROTOCOL_BINARY_RAW_BYTES,
1154 PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1164 switch (request->request.opcode) {
1165 case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
1166 return h->getAllVBucketSequenceNumbers(cookie, request, response);
1168 case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1170 BlockTimer timer(&stats.getVbucketCmdHisto);
1171 rv = getVBucket(h, cookie, request, response);
1174 case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1176 BlockTimer timer(&stats.delVbucketCmdHisto);
1177 rv = delVBucket(h, cookie, request, response);
1178 if (rv != ENGINE_EWOULDBLOCK) {
1179 h->decrementSessionCtr();
1180 h->storeEngineSpecific(cookie, NULL);
1184 case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1186 BlockTimer timer(&stats.setVbucketCmdHisto);
1187 rv = setVBucket(h, cookie, request, response);
1188 h->decrementSessionCtr();
1191 case PROTOCOL_BINARY_CMD_TOUCH:
1192 case PROTOCOL_BINARY_CMD_GAT:
1193 case PROTOCOL_BINARY_CMD_GATQ:
1195 rv = h->touch(cookie, request, response);
1198 case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1199 res = stopFlusher(h, &msg, &msg_size);
1201 case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1202 res = startFlusher(h, &msg, &msg_size);
1204 case PROTOCOL_BINARY_CMD_SET_PARAM:
1206 reinterpret_cast<protocol_binary_request_set_param*>(request),
1208 h->decrementSessionCtr();
1210 case PROTOCOL_BINARY_CMD_EVICT_KEY:
1211 res = evictKey(h, request, &msg, &msg_size);
1213 case PROTOCOL_BINARY_CMD_GET_LOCKED:
1214 rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
1215 if (rv == ENGINE_EWOULDBLOCK) {
1216 // we dont have the value for the item yet
1220 case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
1221 res = unlockKey(h, request, &msg, &msg_size);
1223 case PROTOCOL_BINARY_CMD_OBSERVE:
1224 return h->observe(cookie, request, response);
1225 case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
1226 return h->observe_seqno(cookie, request, response);
1227 case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1229 rv = h->deregisterTapClient(cookie, request, response);
1230 h->decrementSessionCtr();
1233 case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN:
1235 rv = h->resetReplicationChain(cookie, request, response);
1238 case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1240 rv = h->changeTapVBFilter(cookie, request, response);
1241 h->decrementSessionCtr();
1244 case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1245 case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1246 case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE:
1248 rv = h->handleCheckpointCmds(cookie, request, response);
1251 case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE:
1253 rv = h->handleSeqnoCmds(cookie, request, response);
1256 case PROTOCOL_BINARY_CMD_GET_META:
1257 case PROTOCOL_BINARY_CMD_GETQ_META:
1259 rv = h->getMeta(cookie,
1260 reinterpret_cast<protocol_binary_request_get_meta*>
1261 (request), response);
1264 case PROTOCOL_BINARY_CMD_SET_WITH_META:
1265 case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1266 case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1267 case PROTOCOL_BINARY_CMD_ADDQ_WITH_META:
1269 rv = h->setWithMeta(cookie,
1270 reinterpret_cast<protocol_binary_request_set_with_meta*>
1271 (request), response);
1274 case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1275 case PROTOCOL_BINARY_CMD_DELQ_WITH_META:
1277 rv = h->deleteWithMeta(cookie,
1278 reinterpret_cast<protocol_binary_request_delete_with_meta*>
1279 (request), response);
1282 case PROTOCOL_BINARY_CMD_RETURN_META:
1284 return h->returnMeta(cookie,
1285 reinterpret_cast<protocol_binary_request_return_meta*>
1286 (request), response);
1288 case PROTOCOL_BINARY_CMD_GET_REPLICA:
1289 rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
1290 if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1294 case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1295 case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC:
1297 rv = h->handleTrafficControlCmd(cookie, request, response);
1300 case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1302 rv = h->setClusterConfig(cookie,
1303 reinterpret_cast<protocol_binary_request_set_cluster_config*>
1304 (request), response);
1305 h->decrementSessionCtr();
1308 case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
1309 return h->getClusterConfig(cookie,
1310 reinterpret_cast<protocol_binary_request_get_cluster_config*>
1311 (request), response);
1312 case PROTOCOL_BINARY_CMD_COMPACT_DB:
1314 rv = compactDB(h, cookie,
1315 (protocol_binary_request_compact_db*)(request),
1317 if (rv != ENGINE_EWOULDBLOCK) {
1318 h->decrementSessionCtr();
1319 h->storeEngineSpecific(cookie, NULL);
1323 case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
1325 if (request->request.extlen != 0 ||
1326 request->request.keylen != 0 ||
1327 request->request.bodylen != 0) {
1328 return ENGINE_EINVAL;
1330 return h->getRandomKey(cookie, response);
1334 return h->getAllKeys(cookie,
1335 reinterpret_cast<protocol_binary_request_get_keys*>
1336 (request), response);
1338 case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
1340 return h->getAdjustedTime(cookie,
1341 reinterpret_cast<protocol_binary_request_get_adjusted_time*>
1342 (request), response);
1344 case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE:
1346 return h->setDriftCounterState(cookie,
1347 reinterpret_cast<protocol_binary_request_set_drift_counter_state*>
1348 (request), response);
1352 // Send a special response for getl since we don't want to send the key
1353 if (itm && request->request.opcode == PROTOCOL_BINARY_CMD_GET_LOCKED) {
1354 uint32_t flags = itm->getFlags();
1355 rv = sendResponse(response, NULL, 0, (const void *)&flags,
1357 static_cast<const void *>(itm->getData()),
1358 itm->getNBytes(), itm->getDataType(),
1359 static_cast<uint16_t>(res), itm->getCas(),
1363 const std::string &key = itm->getKey();
1364 uint32_t flags = itm->getFlags();
1365 rv = sendResponse(response, static_cast<const void *>(key.data()),
1367 (const void *)&flags, sizeof(uint32_t),
1368 static_cast<const void *>(itm->getData()),
1369 itm->getNBytes(), itm->getDataType(),
1370 static_cast<uint16_t>(res), itm->getCas(),
1373 } else if (rv == ENGINE_NOT_MY_VBUCKET) {
1374 LockHolder lh(h->clusterConfig.lock);
1375 return sendResponse(response, NULL, 0, NULL, 0,
1376 h->clusterConfig.config,
1377 h->clusterConfig.len,
1378 PROTOCOL_BINARY_RAW_BYTES,
1379 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
1382 msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1383 rv = sendResponse(response, NULL, 0, NULL, 0,
1384 msg, static_cast<uint16_t>(msg_size),
1385 PROTOCOL_BINARY_RAW_BYTES,
1386 static_cast<uint16_t>(res), 0, cookie);
1392 static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1394 protocol_binary_request_header
1396 ADD_RESPONSE response)
1398 ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1401 releaseHandle(handle);
1405 static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1406 item *itm, uint64_t cas) {
1407 static_cast<Item*>(itm)->setCas(cas);
1410 static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1412 void *engine_specific,
1416 tap_event_t tap_event,
1428 if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1429 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1431 return ENGINE_EINVAL;
1433 ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1437 (uint16_t)tap_event,
1443 releaseHandle(handle);
1447 static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1448 const void *cookie, item **itm,
1449 void **es, uint16_t *nes, uint8_t *ttl,
1450 uint16_t *flags, uint32_t *seqno,
1451 uint16_t *vbucket) {
1452 uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1456 releaseHandle(handle);
1457 return static_cast<tap_event_t>(tap_event);
1460 static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1465 const void* userdata,
1468 EventuallyPersistentEngine *h = getHandle(handle);
1469 TAP_ITERATOR iterator = NULL;
1471 std::string c(static_cast<const char*>(client), nclient);
1472 // Figure out what we want from the userdata before adding it to
1473 // the API to the handle
1474 if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1475 iterator = EvpTapIterator;
1478 releaseHandle(handle);
1483 static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1485 struct dcp_message_producers *producers)
1487 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1488 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1490 errCode = conn->step(producers);
1492 releaseHandle(handle);
1497 static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1505 ENGINE_ERROR_CODE errCode;
1506 errCode = getHandle(handle)->dcpOpen(cookie, opaque, seqno, flags,
1508 releaseHandle(handle);
1512 static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1518 ENGINE_ERROR_CODE errCode = getHandle(handle)->dcpAddStream(cookie,
1522 releaseHandle(handle);
1526 static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1531 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1532 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1534 errCode = conn->closeStream(opaque, vbucket);
1536 releaseHandle(handle);
1541 static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1546 uint64_t startSeqno,
1548 uint64_t vbucketUuid,
1549 uint64_t snapStartSeqno,
1550 uint64_t snapEndSeqno,
1551 uint64_t *rollbackSeqno,
1552 dcp_add_failover_log callback)
1554 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1555 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1557 errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1558 endSeqno, vbucketUuid, snapStartSeqno,
1559 snapEndSeqno, rollbackSeqno, callback);
1561 releaseHandle(handle);
1565 static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1569 dcp_add_failover_log callback)
1571 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1572 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1574 errCode = conn->getFailoverLog(opaque, vbucket, callback);
1576 releaseHandle(handle);
1581 static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1587 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1588 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1590 errCode = conn->streamEnd(opaque, vbucket, flags);
1592 releaseHandle(handle);
1597 static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1601 uint64_t start_seqno,
1605 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1606 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1608 errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1611 releaseHandle(handle);
1615 static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1628 uint32_t expiration,
1634 if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1635 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1637 return ENGINE_EINVAL;
1639 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1640 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1642 errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1643 vbucket, flags, datatype, lockTime,
1644 bySeqno, revSeqno, expiration,
1647 releaseHandle(handle);
1651 static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1663 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1664 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1666 errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1667 revSeqno, meta, nmeta);
1669 releaseHandle(handle);
1673 static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1685 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1686 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1688 errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1689 revSeqno, meta, nmeta);
1691 releaseHandle(handle);
1695 static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1700 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1701 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1703 errCode = conn->flushall(opaque, vbucket);
1705 releaseHandle(handle);
1709 static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1713 vbucket_state_t state)
1715 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1716 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1718 errCode = conn->setVBucketState(opaque, vbucket, state);
1720 releaseHandle(handle);
1724 static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1727 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1728 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1730 errCode = conn->noop(opaque);
1732 releaseHandle(handle);
1736 static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1740 uint32_t buffer_bytes) {
1741 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1742 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1744 errCode = conn->bufferAcknowledgement(opaque, vbucket,
1747 releaseHandle(handle);
1751 static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1758 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1759 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1761 errCode = conn->control(opaque, key, nkey, value, nvalue);
1763 releaseHandle(handle);
1767 static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1769 protocol_binary_response_header *response)
1771 ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1772 ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1774 errCode = conn->handleResponse(response);
1776 releaseHandle(handle);
1780 static void EvpHandleDisconnect(const void *cookie,
1781 ENGINE_EVENT_TYPE type,
1782 const void *event_data,
1783 const void *cb_data)
1785 cb_assert(type == ON_DISCONNECT);
1786 cb_assert(event_data == NULL);
1787 void *c = const_cast<void*>(cb_data);
1788 getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1789 releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1794 * The only public interface to the eventually persistance engine.
1795 * Allocate a new instance and initialize it
1796 * @param interface the highest interface the server supports (we only
1797 * support interface 1)
1798 * @param get_server_api callback function to get the server exported API
1800 * @param handle Where to return the new instance
1801 * @return ENGINE_SUCCESS on success
1803 ENGINE_ERROR_CODE create_instance(uint64_t interface,
1804 GET_SERVER_API get_server_api,
1805 ENGINE_HANDLE **handle)
1807 SERVER_HANDLE_V1 *api = get_server_api();
1808 if (interface != 1 || api == NULL) {
1809 return ENGINE_ENOTSUP;
1812 hooksApi = api->alloc_hooks;
1813 loggerApi = api->log;
1814 MemoryTracker::getInstance();
1815 ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1817 AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1819 ObjectRegistry::setStats(inital_tracking);
1820 EventuallyPersistentEngine *engine;
1821 engine = new EventuallyPersistentEngine(get_server_api);
1822 ObjectRegistry::setStats(NULL);
1824 if (engine == NULL) {
1825 return ENGINE_ENOMEM;
1828 if (MemoryTracker::trackingMemoryAllocations()) {
1829 engine->getEpStats().memoryTrackerEnabled.store(true);
1830 engine->getEpStats().totalMemory.store(inital_tracking->load());
1832 delete inital_tracking;
1834 ep_current_time = api->core->get_current_time;
1835 ep_abs_time = api->core->abstime;
1836 ep_reltime = api->core->realtime;
1838 *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1840 return ENGINE_SUCCESS;
1843 static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
1844 const item* itm, item_info *itm_info)
1846 const Item *it = reinterpret_cast<const Item*>(itm);
1847 EventuallyPersistentEngine *engine = getHandle(handle);
1848 if (itm_info->nvalue < 1) {
1851 itm_info->cas = it->getCas();
1854 RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(it->getVBucketId());
1857 itm_info->vbucket_uuid = vb->failovers->getLatestUUID();
1859 itm_info->vbucket_uuid = 0;
1862 releaseHandle(handle);
1864 itm_info->vbucket_uuid = 0;
1867 itm_info->seqno = it->getBySeqno();
1868 itm_info->exptime = it->getExptime();
1869 itm_info->nbytes = it->getNBytes();
1870 itm_info->datatype = it->getDataType();
1871 itm_info->flags = it->getFlags();
1872 itm_info->clsid = 0;
1873 itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1874 itm_info->nvalue = 1;
1875 itm_info->key = it->getKey().c_str();
1876 itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1877 itm_info->value[0].iov_len = it->getNBytes();
1881 static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1882 item* itm, const item_info *itm_info)
1884 Item *it = reinterpret_cast<Item*>(itm);
1888 it->setDataType(itm_info->datatype);
1892 static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1894 engine_get_vb_map_cb callback)
1896 EventuallyPersistentEngine *h = getHandle(handle);
1897 LockHolder lh(h->clusterConfig.lock);
1898 uint8_t *config = h->clusterConfig.config;
1899 uint32_t len = h->clusterConfig.len;
1900 releaseHandle(handle);
1901 return callback(cookie, config, len);
1906 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1909 if (loggerApi != NULL) {
1910 EXTENSION_LOGGER_DESCRIPTOR* logger =
1911 (EXTENSION_LOGGER_DESCRIPTOR*)loggerApi->get_logger();
1913 if (loggerApi->get_level() <= severity) {
1914 EventuallyPersistentEngine *engine = ObjectRegistry::onSwitchThread(NULL, true);
1917 vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1919 logger->log(severity, NULL, "(%s) %s", engine->getName(),
1922 logger->log(severity, NULL, "(No Engine) %s", buffer);
1925 ObjectRegistry::onSwitchThread(engine);
1930 ALLOCATOR_HOOKS_API *getHooksApi(void) {
1934 EventuallyPersistentEngine::EventuallyPersistentEngine(
1935 GET_SERVER_API get_server_api) :
1936 clusterConfig(), epstore(NULL), workload(NULL),
1937 workloadPriority(NO_BUCKET_PRIORITY),
1938 tapThrottle(NULL), getServerApiFunc(get_server_api),
1939 dcpConnMap_(NULL), tapConnMap(NULL) ,tapConfig(NULL), checkpointConfig(NULL),
1940 trafficEnabled(false), flushAllEnabled(false), startupTime(0)
1942 interface.interface = 1;
1943 ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1944 ENGINE_HANDLE_V1::initialize = EvpInitialize;
1945 ENGINE_HANDLE_V1::destroy = EvpDestroy;
1946 ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1947 ENGINE_HANDLE_V1::remove = EvpItemDelete;
1948 ENGINE_HANDLE_V1::release = EvpItemRelease;
1949 ENGINE_HANDLE_V1::get = EvpGet;
1950 ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1951 ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1952 ENGINE_HANDLE_V1::store = EvpStore;
1953 ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
1954 ENGINE_HANDLE_V1::flush = EvpFlush;
1955 ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1956 ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1957 ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1958 ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1959 ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1960 ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1961 ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
1962 ENGINE_HANDLE_V1::get_stats_struct = NULL;
1963 ENGINE_HANDLE_V1::aggregate_stats = NULL;
1966 ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1967 ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1968 ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1969 ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1970 ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1971 ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1972 ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1973 ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1974 ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1975 ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1976 ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1977 ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1978 ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1979 ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1980 ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1981 ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1982 ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1984 serverApi = getServerApiFunc();
1985 memset(&info, 0, sizeof(info));
1986 info.info.description = "EP engine v" VERSION;
1987 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1988 info.info.features[info.info.num_features++].feature =
1989 ENGINE_FEATURE_PERSISTENT_STORAGE;
1990 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1991 info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
1994 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1996 EventuallyPersistentEngine *epe =
1997 ObjectRegistry::onSwitchThread(NULL, true);
1998 ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1999 ObjectRegistry::onSwitchThread(epe);
2003 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
2005 EventuallyPersistentEngine *epe =
2006 ObjectRegistry::onSwitchThread(NULL, true);
2007 ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
2008 ObjectRegistry::onSwitchThread(epe);
2012 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
2014 const void *cb_data) {
2015 EventuallyPersistentEngine *epe =
2016 ObjectRegistry::onSwitchThread(NULL, true);
2017 SERVER_CALLBACK_API *sapi = getServerApi()->callback;
2018 sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
2020 ObjectRegistry::onSwitchThread(epe);
2024 * A configuration value changed listener that responds to ep-engine
2025 * parameter changes by invoking engine-specific methods on
2026 * configuration change events.
2028 class EpEngineValueChangeListener : public ValueChangedListener {
2030 EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
2034 virtual void sizeValueChanged(const std::string &key, size_t value) {
2035 if (key.compare("getl_max_timeout") == 0) {
2036 engine.setGetlMaxTimeout(value);
2037 } else if (key.compare("getl_default_timeout") == 0) {
2038 engine.setGetlDefaultTimeout(value);
2039 } else if (key.compare("max_item_size") == 0) {
2040 engine.setMaxItemSize(value);
2044 virtual void booleanValueChanged(const std::string &key, bool value) {
2045 if (key.compare("flushall_enabled") == 0) {
2046 engine.setFlushAll(value);
2050 EventuallyPersistentEngine &engine;
2055 ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
2057 if (config != NULL) {
2058 if (!configuration.parseConfiguration(config, serverApi)) {
2059 return ENGINE_FAILED;
2063 name = configuration.getCouchBucket();
2064 maxFailoverEntries = configuration.getMaxFailoverEntries();
2066 // Start updating the variables from the config!
2067 HashTable::setDefaultNumBuckets(configuration.getHtSize());
2068 HashTable::setDefaultNumLocks(configuration.getHtLocks());
2069 StoredValue::setMutationMemoryThreshold(
2070 configuration.getMutationMemThreshold());
2072 if (configuration.getMaxSize() == 0) {
2073 configuration.setMaxSize(std::numeric_limits<size_t>::max());
2076 if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
2077 configuration.setMemLowWat(percentOf(
2078 configuration.getMaxSize(), 0.75));
2081 if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
2082 configuration.setMemHighWat(percentOf(
2083 configuration.getMaxSize(), 0.85));
2086 maxItemSize = configuration.getMaxItemSize();
2087 configuration.addValueChangedListener("max_item_size",
2088 new EpEngineValueChangeListener(*this));
2090 getlDefaultTimeout = configuration.getGetlDefaultTimeout();
2091 configuration.addValueChangedListener("getl_default_timeout",
2092 new EpEngineValueChangeListener(*this));
2093 getlMaxTimeout = configuration.getGetlMaxTimeout();
2094 configuration.addValueChangedListener("getl_max_timeout",
2095 new EpEngineValueChangeListener(*this));
2097 flushAllEnabled = configuration.isFlushallEnabled();
2098 configuration.addValueChangedListener("flushall_enabled",
2099 new EpEngineValueChangeListener(*this));
2101 workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
2102 configuration.getMaxNumShards());
2103 if ((unsigned int)workload->getNumShards() >
2104 configuration.getMaxVbuckets()) {
2105 LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
2106 "equal or less than max number of vbuckets");
2107 return ENGINE_FAILED;
2110 dcpConnMap_ = new DcpConnMap(*this);
2111 tapConnMap = new TapConnMap(*this);
2112 tapConfig = new TapConfig(*this);
2113 tapThrottle = new TapThrottle(configuration, stats);
2114 TapConfig::addConfigChangeListener(*this);
2116 checkpointConfig = new CheckpointConfig(*this);
2117 CheckpointConfig::addConfigChangeListener(*this);
2119 epstore = new EventuallyPersistentStore(*this);
2120 if (epstore == NULL) {
2121 return ENGINE_ENOMEM;
2124 // Register the callback
2125 registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2127 // Complete the initialization of the ep-store
2128 if (!epstore->initialize()) {
2129 return ENGINE_FAILED;
2132 if(configuration.isDataTrafficEnabled()) {
2133 enableTraffic(true);
2136 tapConnMap->initialize(TAP_CONN_NOTIFIER);
2137 dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
2139 // record engine initialization time
2140 startupTime = ep_real_time();
2142 LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");
2144 return ENGINE_SUCCESS;
2147 void EventuallyPersistentEngine::destroy(bool force) {
2148 stats.forceShutdown = force;
2149 stats.isShutdown = true;
2152 epstore->snapshotStats();
2155 tapConnMap->shutdownAllConnections();
2158 dcpConnMap_->shutdownAllConnections();
2162 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie,
2164 if (!flushAllEnabled) {
2165 return ENGINE_ENOTSUP;
2168 if (!isDegradedMode()) {
2169 return ENGINE_TMPFAIL;
2173 * Supporting only a SYNC operation for bucket flush
2176 void* es = getEngineSpecific(cookie);
2179 // Check if diskFlushAll was false and set it to true
2180 // if yes, if the atomic variable weren't false, then
2181 // we will assume that a flushAll has been scheduled
2182 // already and return TMPFAIL.
2183 if (epstore->scheduleFlushAllTask(cookie, when)) {
2184 storeEngineSpecific(cookie, this);
2185 return ENGINE_EWOULDBLOCK;
2187 LOG(EXTENSION_LOG_INFO, "Tried to trigger a bucket flush, but"
2188 "there seems to be a task running already!");
2189 return ENGINE_TMPFAIL;
2193 storeEngineSpecific(cookie, NULL);
2194 LOG(EXTENSION_LOG_WARNING, "Completed bucket flush operation");
2195 return ENGINE_SUCCESS;
2199 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2202 ENGINE_STORE_OPERATION
2205 BlockTimer timer(&stats.storeCmdHisto);
2206 ENGINE_ERROR_CODE ret;
2207 Item *it = static_cast<Item*>(itm);
2210 it->setVBucketId(vbucket);
2212 switch (operation) {
2214 if (it->getCas() == 0) {
2215 // Using a cas command with a cas wildcard doesn't make sense
2216 ret = ENGINE_NOT_STORED;
2221 if (isDegradedMode()) {
2222 return ENGINE_TMPFAIL;
2224 ret = epstore->set(*it, cookie);
2225 if (ret == ENGINE_SUCCESS) {
2226 *cas = it->getCas();
2232 if (isDegradedMode()) {
2233 return ENGINE_TMPFAIL;
2236 if (it->getCas() != 0) {
2237 // Adding an item with a cas value doesn't really make sense...
2238 return ENGINE_KEY_EEXISTS;
2241 ret = epstore->add(*it, cookie);
2242 if (ret == ENGINE_SUCCESS) {
2243 *cas = it->getCas();
2247 case OPERATION_REPLACE:
2248 ret = epstore->replace(*it, cookie);
2249 if (ret == ENGINE_SUCCESS) {
2250 *cas = it->getCas();
2254 case OPERATION_APPEND:
2255 case OPERATION_PREPEND:
2257 if ((ret = get(cookie, &i, it->getKey().c_str(),
2258 it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
2259 Item *old = reinterpret_cast<Item*>(i);
2261 if (old->getCas() == (uint64_t) -1) {
2262 // item is locked against updates
2263 itemRelease(cookie, i);
2264 return ENGINE_TMPFAIL;
2267 if (it->getCas() != 0 && old->getCas() != it->getCas()) {
2268 itemRelease(cookie, i);
2269 return ENGINE_KEY_EEXISTS;
2272 if (operation == OPERATION_APPEND) {
2273 ret = old->append(*it, maxItemSize);
2275 ret = old->prepend(*it, maxItemSize);
2278 if (ret != ENGINE_SUCCESS) {
2279 itemRelease(cookie, i);
2280 if (ret == ENGINE_E2BIG) {
2283 return memoryCondition();
2286 if (old->getDataType() == PROTOCOL_BINARY_DATATYPE_JSON) {
2287 // Set the datatype of the new document to BINARY (0),
2288 // as appending/prepending anything to JSON breaks the
2289 // json data structure.
2290 old->setDataType(PROTOCOL_BINARY_RAW_BYTES);
2291 } else if (old->getDataType() ==
2292 PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
2293 // Set the datatype of the new document to
2294 // COMPRESSED_BINARY, as appending/prepending anything
2295 // to JSON breaks the json data structure.
2296 old->setDataType(PROTOCOL_BINARY_DATATYPE_COMPRESSED);
2300 ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
2302 it->setBySeqno(old->getBySeqno());
2303 itemRelease(cookie, i);
2305 } while (ret == ENGINE_KEY_EEXISTS);
2307 // Map the error code back to what memcapable expects
2308 if (ret == ENGINE_KEY_ENOENT) {
2309 ret = ENGINE_NOT_STORED;
2315 ret = ENGINE_ENOTSUP;
2319 case ENGINE_SUCCESS:
2320 ++stats.numOpsStore;
2323 ret = memoryCondition();
2325 case ENGINE_NOT_STORED:
2326 case ENGINE_NOT_MY_VBUCKET:
2327 if (isDegradedMode()) {
2328 return ENGINE_TMPFAIL;
2338 inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2358 if (connection->shouldFlush()) {
2362 if (connection->isTimeForNoop()) {
2363 LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2364 connection->logHeader());
2368 if (connection->isSuspended() || connection->windowIsFull()) {
2369 LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2370 " suspended state or its ack windows is full.\n",
2371 connection->logHeader());
2375 uint16_t ret = TAP_PAUSE;
2376 VBucketEvent ev = connection->nextVBucketHighPriority();
2377 if (ev.event != TAP_PAUSE) {
2379 case TAP_VBUCKET_SET:
2380 LOG(EXTENSION_LOG_WARNING,
2381 "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2382 connection->logHeader(), ev.vbucket,
2383 VBucket::toString(ev.state));
2384 connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2387 LOG(EXTENSION_LOG_WARNING,
2388 "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2389 connection->logHeader(),
2390 TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
2392 connection->opaqueCommandCode = (uint32_t) ev.state;
2393 *vbucket = ev.vbucket;
2394 *es = &connection->opaqueCommandCode;
2395 *nes = sizeof(connection->opaqueCommandCode);
2398 LOG(EXTENSION_LOG_WARNING,
2399 "%s Unknown VBucketEvent message type %d\n",
2400 connection->logHeader(), ev.event);
2406 if (connection->waitForOpaqueMsgAck()) {
2410 VBucketFilter backFillVBFilter;
2411 if (connection->runBackfill(backFillVBFilter)) {
2412 queueBackfill(backFillVBFilter, connection);
2415 uint8_t nru = INITIAL_NRU_VALUE;
2416 Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2418 case TAP_CHECKPOINT_START:
2419 case TAP_CHECKPOINT_END:
2423 if (ret == TAP_MUTATION) {
2424 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2425 it->getRevSeqno(), nru);
2426 *es = connection->specificData;
2427 } else if (ret == TAP_DELETION) {
2428 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2430 *es = connection->specificData;
2431 } else if (ret == TAP_CHECKPOINT_START) {
2432 // Send the current value of the max deleted seqno
2433 RCPtr<VBucket> vb = getVBucket(*vbucket);
2438 *nes = TapEngineSpecific::packSpecificData(ret, connection,
2439 vb->ht.getMaxDeletedRevSeqno());
2440 *es = connection->specificData;
2450 if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2451 VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2452 if (vbev.event == TAP_VBUCKET_SET) {
2453 LOG(EXTENSION_LOG_WARNING,
2454 "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2455 connection->logHeader(), vbev.vbucket,
2456 VBucket::toString(vbev.state));
2457 connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
2465 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2472 uint16_t *vbucket) {
2473 TapProducer *connection = getTapProducer(cookie);
2475 LOG(EXTENSION_LOG_WARNING,
2476 "Failed to lookup TAP connection.. Disconnecting\n");
2477 return TAP_DISCONNECT;
2480 connection->setPaused(false);
2485 connection->setLastWalkTime();
2487 ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2488 seqno, vbucket, connection, retry);
2491 if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2492 connection->lastMsgTime = ep_current_time();
2493 if (ret == TAP_NOOP) {
2496 ++stats.numTapFetched;
2497 *seqno = connection->getSeqno();
2498 if (connection->requestAck(ret, *vbucket)) {
2499 *flags = TAP_FLAG_ACK;
2500 connection->seqnoAckRequested = *seqno;
2503 if (ret == TAP_MUTATION) {
2504 if (connection->haveFlagByteorderSupport()) {
2505 *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2510 connection->setPaused(true);
2511 connection->setNotifySent(false);
/**
 * Register a new TAP producer for the connection identified by `cookie`.
 *
 * The cookie is reserved first via reserveCookie(). The TAP name is
 * "eq_tapq:" followed by `client`, or ConnHandler::getAnonName() when no
 * client name was supplied. The optional `userdata` payload is decoded
 * according to `flags`, front to back, with all integers in network byte
 * order:
 *   - TAP_CONNECT_FLAG_BACKFILL:      uint64_t backfill age
 *   - TAP_CONNECT_FLAG_LIST_VBUCKETS: uint16_t count, then `count`
 *                                     uint16_t vbucket ids
 *   - TAP_CONNECT_CHECKPOINT:         uint16_t count, then `count` pairs of
 *                                     (uint16_t vbucket id, uint64_t
 *                                     last checkpoint id)
 * `ptr` advances and `nuserdata` shrinks as each section is consumed.
 * Truncated backfill / vbucket / checkpoint sections are logged as
 * "Reject connection request". Finally the producer is created through
 * tapConnMap->newProducer() and tapConnMap->notifyPausedConnection() is
 * called on it.
 *
 * NOTE(review): several lines (remaining parameters, return statements,
 * braces) are elided in this listing; presumably rejection returns false
 * and success returns true — confirm against the full source.
 */
2517 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2518 std::string &client,
2520 const void *userdata,
2522 if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2526 std::string tapName = "eq_tapq:";
2527 if (client.length() == 0) {
2528 tapName.assign(ConnHandler::getAnonName());
2530 tapName.append(client);
2533 // Decoding the userdata section of the packet and update the filters
2534 const char *ptr = static_cast<const char*>(userdata);
2535 uint64_t backfillAge = 0;
2536 std::vector<uint16_t> vbuckets;
2537 std::map<uint16_t, uint64_t> lastCheckpointIds;
2539 if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2540 if (nuserdata < sizeof(backfillAge)) {
2541 LOG(EXTENSION_LOG_WARNING,
2542 "Backfill age is missing. Reject connection request from %s\n",
2546 // use memcpy to avoid alignemt issues
2547 memcpy(&backfillAge, ptr, sizeof(backfillAge));
2548 backfillAge = ntohll(backfillAge);
2549 nuserdata -= sizeof(backfillAge);
2550 ptr += sizeof(backfillAge);
2553 if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2555 if (nuserdata < sizeof(nvbuckets)) {
2556 LOG(EXTENSION_LOG_WARNING,
2557 "Number of vbuckets is missing. Reject connection request from %s"
2558 "\n", tapName.c_str());
2561 memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2562 nuserdata -= sizeof(nvbuckets);
2563 ptr += sizeof(nvbuckets);
2564 nvbuckets = ntohs(nvbuckets);
2565 if (nvbuckets > 0) {
// The whole vbucket-id list is length-checked up front, so the loop below
// can read without per-element bounds checks.
2566 if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2567 LOG(EXTENSION_LOG_WARNING,
2568 "# of vbuckets not matched. Reject connection request from %s"
2569 "\n", tapName.c_str());
2572 for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
// NOTE(review): copies sizeof(nvbuckets) bytes into `val`; sizeof(uint16_t),
// matching the pointer advance below, would state the intent directly.
2574 memcpy(&val, ptr, sizeof(nvbuckets));
2575 ptr += sizeof(uint16_t);
2576 vbuckets.push_back(ntohs(val));
2578 nuserdata -= (sizeof(uint16_t) * nvbuckets);
2582 if (flags & TAP_CONNECT_CHECKPOINT) {
2583 uint16_t nCheckpoints = 0;
// A missing checkpoint count is tolerated: nCheckpoints simply stays 0.
2584 if (nuserdata >= sizeof(nCheckpoints)) {
2585 memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2586 nuserdata -= sizeof(nCheckpoints);
2587 ptr += sizeof(nCheckpoints);
2588 nCheckpoints = ntohs(nCheckpoints);
2590 if (nCheckpoints > 0) {
2592 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2593 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2594 "Reject connection request from %s\n", tapName.c_str());
2597 for (uint16_t j = 0; j < nCheckpoints; ++j) {
2599 uint64_t checkpointId;
2600 memcpy(&vbid, ptr, sizeof(vbid));
2601 ptr += sizeof(uint16_t);
2602 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2603 ptr += sizeof(uint64_t);
2604 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2607 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2611 TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2614 configuration.getTapKeepalive()),
2618 tapConnMap->notifyPausedConnection(tp, true);
/**
 * Handle one TAP message received on the connection identified by `cookie`.
 *
 * If no engine-specific data is stored for the cookie yet, a TAP_ACK is
 * treated as coming from a producer that no longer exists (disconnect).
 * Any other event creates a new TAP consumer via
 * tapConnMap->newConsumer() and stores it for the cookie. TAP_MUTATION
 * and TAP_DELETION are subject to tapThrottle: when throttled, connections
 * that support acks get ENGINE_TMPFAIL and others are disconnected. The
 * event is then dispatched on `tap_event`, and at the end
 * connection->processedEvent() is called with the resulting status.
 *
 * NOTE(review): several lines (parameters, case labels, braces, breaks)
 * are elided in this listing; comments describe visible lines only.
 */
2622 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2623 void *engine_specific,
2640 void *specific = getEngineSpecific(cookie);
2641 ConnHandler *connection = NULL;
2642 if (specific == NULL) {
2643 if (tap_event == TAP_ACK) {
// `cookie` is an opaque connection handle, not a NUL-terminated string:
// formatting it with %s read arbitrary memory. Log its address instead.
2644 LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %p does not "
2645 "exist. Force disconnect...\n", cookie);
2646 // tap producer is no longer connected..
2647 return ENGINE_DISCONNECT;
2649 connection = tapConnMap->newConsumer(cookie);
2650 if (connection == NULL) {
2651 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2652 " Force disconnect\n");
2653 return ENGINE_DISCONNECT;
2655 storeEngineSpecific(cookie, connection);
2658 connection = reinterpret_cast<ConnHandler *>(specific);
2661 std::string k(static_cast<const char*>(key), nkey);
2662 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
// Back-pressure: only data-carrying events are throttled.
2664 if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2665 if (!tapThrottle->shouldProcess()) {
2666 ++stats.tapThrottled;
2667 if (connection->supportsAck()) {
2668 ret = ENGINE_TMPFAIL;
2670 ret = ENGINE_DISCONNECT;
2671 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2672 "ack support. Force disconnect...\n",
2673 connection->logHeader());
2679 switch (tap_event) {
2681 ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2684 ret = flush(cookie, 0);
2685 LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2686 connection->logHeader());
2691 TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2692 nengine, &revSeqno);
2694 ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2699 case TAP_CHECKPOINT_START:
2700 case TAP_CHECKPOINT_END:
2702 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2704 if (tap_event == TAP_CHECKPOINT_START &&
2705 nengine == TapEngineSpecific::sizeRevSeqno) {
2706 // Set the current value for the max deleted seqno
2707 RCPtr<VBucket> vb = getVBucket(vbucket);
2709 return ENGINE_TMPFAIL;
2712 TapEngineSpecific::readSpecificData(tap_event,
2716 vb->ht.setMaxDeletedRevSeqno(seqnum);
// The checkpoint id travels in the value payload in network byte order.
2720 uint64_t checkpointId;
2721 memcpy(&checkpointId, data, sizeof(checkpointId));
2722 checkpointId = ntohll(checkpointId);
2723 ConnHandlerCheckPoint(tc, tap_event, vbucket,
2727 ret = ENGINE_DISCONNECT;
2728 LOG(EXTENSION_LOG_WARNING,
2729 "%s Checkpoint Id is missing in "
2730 "CHECKPOINT messages. Force disconnect...\n",
2731 connection->logHeader());
2735 ret = ENGINE_DISCONNECT;
2736 LOG(EXTENSION_LOG_WARNING,
2737 "%s not a consumer! Force disconnect\n",
2738 connection->logHeader());
2746 uint8_t nru = INITIAL_NRU_VALUE;
2747 uint64_t revSeqno = 0;
2748 TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2749 nengine, &revSeqno, &nru);
// Clients that do not negotiate datatype get it inferred from the payload.
2751 if (!isDatatypeSupported(cookie)) {
2752 datatype = PROTOCOL_BINARY_RAW_BYTES;
2753 const unsigned char *dat = (const unsigned char*)data;
2754 const int datlen = ndata;
2755 if (checkUTF8JSON(dat, datlen)) {
2756 datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2759 ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2760 flags, datatype, 0, 0, revSeqno, exptime,
2767 if (nengine == sizeof(uint32_t)) {
2769 memcpy(&cc, engine_specific, sizeof(cc));
2773 case TAP_OPAQUE_ENABLE_AUTO_NACK:
2774 // @todo: the memcached core will _ALWAYS_ send nack
2775 // if it encounter an error. This should be
2776 // set as the default when we move to .next after 2.0
2777 // (currently we need to allow the message for
2778 // backwards compatibility)
2779 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2780 connection->logHeader());
2782 case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2783 connection->setSupportCheckpointSync(true);
2784 LOG(EXTENSION_LOG_INFO,
2785 "%s Enable checkpoint synchronization\n",
2786 connection->logHeader());
2788 case TAP_OPAQUE_OPEN_CHECKPOINT:
2790 * This event is only received by the TAP client that wants to
2791 * get mutations from closed checkpoints only. At this time,
2792 * only incremental backup client receives this event so that
2793 * it can close the connection and reconnect later.
2795 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2796 connection->logHeader());
2798 case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2800 LOG(EXTENSION_LOG_INFO,
2801 "%s Backfill started for vbucket %d.\n",
2802 connection->logHeader(), vbucket);
2803 BlockTimer timer(&stats.tapVbucketResetHisto);
2804 ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2806 if (ret == ENGINE_DISCONNECT) {
2807 LOG(EXTENSION_LOG_WARNING,
2808 "%s Failed to reset a vbucket %d. Force disconnect\n",
2809 connection->logHeader(), vbucket);
2811 LOG(EXTENSION_LOG_WARNING,
2812 "%s Reset vbucket %d was completed succecssfully.\n",
2813 connection->logHeader(), vbucket);
2816 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2818 tc->setBackfillPhase(true, vbucket);
2820 ret = ENGINE_DISCONNECT;
2821 LOG(EXTENSION_LOG_WARNING,
2822 "TAP consumer doesn't exists. Force disconnect\n");
2826 case TAP_OPAQUE_CLOSE_BACKFILL:
2828 LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2829 connection->logHeader());
2830 TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2832 tc->setBackfillPhase(false, vbucket);
2834 ret = ENGINE_DISCONNECT;
2835 LOG(EXTENSION_LOG_WARNING,
2836 "%s not a consumer! Force disconnect\n",
2837 connection->logHeader());
2841 case TAP_OPAQUE_CLOSE_TAP_STREAM:
2843 * This event is sent by the eVBucketMigrator to notify that
2844 * the source node closes the tap replication stream and
2845 * switches to TAKEOVER_VBUCKETS phase.
2846 * This is just an informative message and doesn't require any
2849 LOG(EXTENSION_LOG_INFO,
2850 "%s Received close tap stream. Switching to takeover phase.\n",
2851 connection->logHeader());
2853 case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2855 * This opaque message is just for notifying that the source
2856 * node receives change_vbucket_filter request and processes
2859 LOG(EXTENSION_LOG_INFO,
2860 "%s Notified that the source node changed a vbucket filter.\n",
2861 connection->logHeader());
2864 LOG(EXTENSION_LOG_WARNING,
2865 "%s Received an unknown opaque command\n",
2866 connection->logHeader());
2869 LOG(EXTENSION_LOG_WARNING,
2870 "%s Received tap opaque with unknown size %d\n",
2871 connection->logHeader(), nengine);
2875 case TAP_VBUCKET_SET:
2877 BlockTimer timer(&stats.tapVbucketSetHisto);
2879 if (nengine != sizeof(vbucket_state_t)) {
2881 LOG(EXTENSION_LOG_WARNING,
2882 "%s Received TAP_VBUCKET_SET with illegal size."
2883 " Force disconnect\n", connection->logHeader());
2884 ret = ENGINE_DISCONNECT;
// The new state is carried in engine_specific in network byte order.
2888 vbucket_state_t state;
2889 memcpy(&state, engine_specific, nengine);
2890 state = (vbucket_state_t)ntohl(state);
2892 ret = connection->setVBucketState(0, vbucket, state);
2898 LOG(EXTENSION_LOG_WARNING,
2899 "%s Recieved bad opcode, ignoring message\n",
2900 connection->logHeader());
2903 connection->processedEvent(tap_event, ret);
/**
 * Apply a checkpoint command received by a TAP consumer (dispatched from
 * tapNotify for TAP_CHECKPOINT_START / TAP_CHECKPOINT_END).
 *
 * Forwards to consumer->processCheckpointCommand(). On success the
 * flusher is woken up and the status is ENGINE_SUCCESS; on failure a
 * warning naming the checkpoint id is logged and the status is
 * ENGINE_DISCONNECT.
 *
 * NOTE(review): the middle parameters and the trailing return are elided
 * in this listing; presumably `ret` is returned — confirm.
 */
2907 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2908 TapConsumer *consumer,
2911 uint64_t checkpointId) {
2912 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2914 if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2915 getEpStore()->wakeUpFlusher();
2916 ret = ENGINE_SUCCESS;
2919 ret = ENGINE_DISCONNECT;
// uint64_t is not necessarily unsigned long long (it is unsigned long on
// LP64 platforms), so cast explicitly to match the %llu conversion.
2920 LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2921 "checkpoint %llu. Force disconnect\n",
2922 consumer->logHeader(), static_cast<unsigned long long>(checkpointId));
/**
 * Look up the TAP producer stored as engine-specific data for `cookie`.
 *
 * Logs a warning when no producer is stored or it is not connected, and
 * when the producer has been flagged for disconnect (doDisconnect()).
 * NOTE(review): the declaration of `rv` and the return statements are
 * elided in this listing; presumably NULL is returned in those cases
 * (callers such as processTapAck treat a missing producer as a
 * disconnect) — confirm against the full source.
 */
2928 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2930 reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2931 if (!(rv && rv->isConnected())) {
2932 LOG(EXTENSION_LOG_WARNING,
2933 "Walking a non-existent tap queue, disconnecting\n");
2937 if (rv->doDisconnect()) {
2938 LOG(EXTENSION_LOG_WARNING,
2939 "%s Disconnecting pending connection\n", rv->logHeader());
/**
 * Deliver an ack received from the TAP client to the producer bound to
 * `cookie`.
 *
 * When getTapProducer() yields no usable producer, a warning is logged
 * and ENGINE_DISCONNECT is returned; otherwise the result of
 * TapProducer::processAck(seqno, status, msg) is returned.
 * NOTE(review): the remaining parameters and the null check are elided
 * in this listing.
 */
2945 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2951 TapProducer *connection = getTapProducer(cookie);
2953 LOG(EXTENSION_LOG_WARNING,
2954 "Unable to process tap ack. No producer found\n");
2955 return ENGINE_DISCONNECT;
2958 return connection->processAck(seqno, status, msg);
/**
 * Create a BackfillTask for the connection `tc` and schedule it on the
 * executor pool's NONIO_TASK_IDX task queue.
 * NOTE(review): the rest of the parameter list and the task's trailing
 * constructor arguments are elided in this listing.
 */
2961 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2965 ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2967 ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
/**
 * Accumulate the statistics of vbucket `vb` into this visitor's running
 * totals.
 *
 * Item and non-resident counts are taken under the store's current item
 * eviction policy. A vbucket with a non-zero high-priority checkpoint
 * size increments chkPersistRemaining. Hash-table memory, ejection,
 * expiry, metadata and op counters are only accumulated when
 * desired_state is not vbucket_state_dead.
 * NOTE(review): some lines are elided in this listing (including the
 * return statement and the closing braces), so whether the dirty-queue
 * totals at the end sit inside the dead-state check is not visible here.
 */
2970 bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2972 item_eviction_policy_t policy = engine.getEpStore()->
2973 getItemEvictionPolicy();
2974 numItems += vb->getNumItems(policy);
2975 numTempItems += vb->getNumTempItems();
2976 nonResident += vb->getNumNonResidentItems(policy);
// Count vbuckets that still have high-priority checkpoint requests queued.
2978 if (vb->getHighPriorityChkSize() > 0) {
2979 chkPersistRemaining++;
2982 fileSpaceUsed += vb->fileSpaceUsed;
2983 fileSize += vb->fileSize;
2985 if (desired_state != vbucket_state_dead) {
2986 htMemory += vb->ht.memorySize();
2987 htItemMemory += vb->ht.getItemMemory();
2988 htCacheSize += vb->ht.cacheSize;
2989 numEjects += vb->ht.getNumEjects();
2990 numExpiredItems += vb->numExpiredItems;
2991 metaDataMemory += vb->ht.metaDataMemory;
2992 metaDataDisk += vb->metaDataDisk;
2993 opsCreate += vb->opsCreate;
2994 opsUpdate += vb->opsUpdate;
2995 opsDelete += vb->opsDelete;
2996 opsReject += vb->opsReject;
2998 queueSize += vb->dirtyQueueSize;
2999 queueMemory += vb->dirtyQueueMem;
3000 queueFill += vb->dirtyQueueFill;
3001 queueDrain += vb->dirtyQueueDrain;
3002 queueAge += vb->getQueueAge();
3003 pendingWrites += vb->dirtyQueuePendingWrites;
3010 * A container class holding VBucketCountVisitors to aggregate stats for
3011 * different vbucket states.
/*
 * Routes each visited vbucket to the VBucketCountVisitor registered for
 * that vbucket's current state; vbuckets whose state has no registered
 * visitor are skipped.
 * Visitors are held as raw pointers keyed by getVBucketState(), so a
 * second registration for the same state replaces the first.
 * NOTE(review): no ownership handling is visible here; registered
 * visitors presumably must outlive the aggregator (doEngineStats uses
 * stack-allocated visitors).
 */
3013 class VBucketCountAggregator : public VBucketVisitor {
3015 bool visitBucket(RCPtr<VBucket> &vb) {
3016 std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3017 it = visitorMap.find(vb->getState());
3018 if ( it != visitorMap.end() ) {
3019 it->second->visitBucket(vb);
3025 void addVisitor(VBucketCountVisitor* visitor) {
// operator[] overwrites any visitor already registered for this state.
3026 visitorMap[visitor->getVBucketState()] = visitor;
3029 std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
3032 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3033 ADD_STAT add_stat) {
3034 VBucketCountAggregator aggregator;
3036 VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
3037 aggregator.addVisitor(&activeCountVisitor);
3039 VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
3040 aggregator.addVisitor(&replicaCountVisitor);
3042 VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
3043 aggregator.addVisitor(&pendingCountVisitor);
3045 VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
3046 aggregator.addVisitor(&deadCountVisitor);
3048 epstore->visit(aggregator);
3050 epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
3051 replicaCountVisitor.getMemResidentPer());
3052 tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
3053 replicaCountVisitor.getNumItems() +
3054 pendingCountVisitor.getNumItems());
3056 configuration.addStats(add_stat, cookie);
3058 EPStats &epstats = getEpStats();
3059 add_casted_stat("ep_version", VERSION, add_stat, cookie);
3060 add_casted_stat("ep_storage_age",
3061 epstats.dirtyAge, add_stat, cookie);
3062 add_casted_stat("ep_storage_age_highwat",
3063 epstats.dirtyAgeHighWat, add_stat, cookie);
3064 add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3067 if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3068 add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3069 } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3070 add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3073 add_casted_stat("ep_total_enqueued",
3074 epstats.totalEnqueued, add_stat, cookie);
3075 add_casted_stat("ep_total_persisted",
3076 epstats.totalPersisted, add_stat, cookie);
3077 add_casted_stat("ep_item_flush_failed",
3078 epstats.flushFailed, add_stat, cookie);
3079 add_casted_stat("ep_item_commit_failed",
3080 epstats.commitFailed, add_stat, cookie);
3081 add_casted_stat("ep_item_begin_failed",
3082 epstats.beginFailed, add_stat, cookie);
3083 add_casted_stat("ep_expired_access", epstats.expired_access,
3085 add_casted_stat("ep_expired_pager", epstats.expired_pager,
3087 add_casted_stat("ep_item_flush_expired",
3088 epstats.flushExpired, add_stat, cookie);
3089 add_casted_stat("ep_queue_size",
3090 epstats.diskQueueSize, add_stat, cookie);
3091 add_casted_stat("ep_flusher_todo",
3092 epstats.flusher_todo, add_stat, cookie);
3093 add_casted_stat("ep_uncommitted_items",
3094 epstats.flusher_todo, add_stat, cookie);
3095 add_casted_stat("ep_diskqueue_items",
3096 epstats.diskQueueSize, add_stat, cookie);
3097 add_casted_stat("ep_flusher_state",
3098 epstore->getFlusher(0)->stateName(),
3100 add_casted_stat("ep_commit_num", epstats.flusherCommits,
3102 add_casted_stat("ep_commit_time",
3103 epstats.commit_time, add_stat, cookie);
3104 add_casted_stat("ep_commit_time_total",
3105 epstats.cumulativeCommitTime, add_stat, cookie);
3106 add_casted_stat("ep_vbucket_del",
3107 epstats.vbucketDeletions, add_stat, cookie);
3108 add_casted_stat("ep_vbucket_del_fail",
3109 epstats.vbucketDeletionFail, add_stat, cookie);
3110 add_casted_stat("ep_flush_duration_total",
3111 epstats.cumulativeFlushTime, add_stat, cookie);
3112 add_casted_stat("ep_flush_all",
3113 epstore->isFlushAllScheduled() ? "true" : "false",
3115 add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
3117 add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
3119 add_casted_stat("curr_items_tot",
3120 activeCountVisitor.getNumItems() +
3121 replicaCountVisitor.getNumItems() +
3122 pendingCountVisitor.getNumItems(),
3124 add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
3126 add_casted_stat("vb_active_curr_items", activeCountVisitor.getNumItems(),
3128 add_casted_stat("vb_active_num_non_resident",
3129 activeCountVisitor.getNonResident(),
3131 add_casted_stat("vb_active_perc_mem_resident",