1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2013 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
33 #include "configuration.h"
35 #include "ep-engine/command_ids.h"
36 #include "item_pager.h"
39 #include "tapconnection.h"
/**
 * Engine creation entry point (declaration only; the definition is not in
 * this header). Takes an interface number, the server's GET_SERVER_API
 * function and an out-parameter for an engine handle, and reports the
 * outcome as an ENGINE_ERROR_CODE. EventuallyPersistentEngine declares this
 * function as a friend further down in this file.
 * NOTE(review): presumably stores the new engine in *handle on success -
 * confirm against the definition.
 */
49 ENGINE_ERROR_CODE create_instance(uint64_t interface,
50 GET_SERVER_API get_server_api,
51 ENGINE_HANDLE **handle);
/**
 * Declaration only; takes no arguments and returns nothing.
 * NOTE(review): presumably releases process-wide engine state - confirm
 * against the definition, which is not in this header.
 */
54 void destroy_engine(void);
/**
 * Declaration only; receives a single opaque pointer argument.
 * NOTE(review): what `arg` points to is not visible in this header -
 * confirm against the definition before relying on it.
 */
56 void EvpNotifyPendingConns(void*arg);
59 /* notify_io_complete is passed to std::ptr_fun, but that function
60 * has "C" linkage, which ptr_fun does not accept; the typedef below
61 * is used to cast the linkage away.
63 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
64 ENGINE_ERROR_CODE status);
// Forward declaration: VBucketCountVisitor (below) holds a reference to the
// engine before the engine class itself is defined later in this header.
68 class EventuallyPersistentEngine;
72 * Vbucket visitor that counts active vbuckets.
74 class VBucketCountVisitor : public VBucketVisitor {
76 VBucketCountVisitor(EventuallyPersistentEngine &e,
77 vbucket_state_t state) :
79 desired_state(state), numItems(0),
80 numTempItems(0),nonResident(0),
81 numVbucket(0), htMemory(0),
82 htItemMemory(0), htCacheSize(0),
83 numEjects(0), numExpiredItems(0),
84 metaDataMemory(0), metaDataDisk(0),
86 opsUpdate(0), opsDelete(0),
87 opsReject(0), queueSize(0),
88 queueMemory(0), queueAge(0),
89 queueFill(0), queueDrain(0),
90 pendingWrites(0), chkPersistRemaining(0),
91 fileSpaceUsed(0), fileSize(0)
// Per-vbucket visit hook; declared here, defined elsewhere.
// NOTE(review): presumably accumulates this visitor's counters for vbuckets
// whose state matches desired_state, with the meaning of the bool result
// set by VBucketVisitor - confirm against the definition.
94 bool visitBucket(RCPtr<VBucket> &vb);
96 void visit(StoredValue* v) {
98 cb_assert(false); // this does not happen
// Returns the vbucket state this visitor was constructed with.
101 vbucket_state_t getVBucketState() { return desired_state; }
// Returns the numItems counter (compared against getNonResident() by the
// engine's memoryCondition()).
103 size_t getNumItems() { return numItems; }
// Returns the numTempItems counter.
105 size_t getNumTempItems() { return numTempItems; }
// Returns the nonResident counter.
107 size_t getNonResident() { return nonResident; }
// Returns the numVbucket counter.
// NOTE(review): despite the name this looks like a count of vbuckets, not
// a vbucket id - confirm in visitBucket().
109 size_t getVBucketNumber() { return numVbucket; }
111 size_t getMemResidentPer() {
112 size_t numResident = numItems - nonResident;
113 return (numItems != 0) ? (size_t) (numResident *100.0) / (numItems) : 100;
// Plain read accessors for the visitor's counters. Each one returns the
// member as stored; the visible part of the constructor's initializer list
// starts these counters at 0.

// Eviction / expiry counters.
116 size_t getEjects() { return numEjects; }
118 size_t getExpired() { return numExpiredItems; }
// Metadata size counters (memory and disk are tracked separately).
120 size_t getMetaDataMemory() { return metaDataMemory; }
122 size_t getMetaDataDisk() { return metaDataDisk; }
// Hash table counters; note the accessor names drop the "ht" prefix used by
// the members (htMemory, htItemMemory, htCacheSize).
124 size_t getHashtableMemory() { return htMemory; }
126 size_t getItemMemory() { return htItemMemory; }
127 size_t getCacheSize() { return htCacheSize; }
// Operation counters.
129 size_t getOpsCreate() { return opsCreate; }
130 size_t getOpsUpdate() { return opsUpdate; }
131 size_t getOpsDelete() { return opsDelete; }
132 size_t getOpsReject() { return opsReject; }
// Queue counters.
134 size_t getQueueSize() { return queueSize; }
135 size_t getQueueMemory() { return queueMemory; }
136 size_t getQueueFill() { return queueFill; }
137 size_t getQueueDrain() { return queueDrain; }
// Returns queueAge; this is the only accessor here typed uint64_t rather
// than size_t, and its name omits the "queue" prefix of the member.
138 uint64_t getAge() { return queueAge; }
139 size_t getPendingWrites() { return pendingWrites; }
140 size_t getChkPersistRemaining() { return chkPersistRemaining; }
// File size counters.
142 size_t getFileSpaceUsed() { return fileSpaceUsed; }
143 size_t getFileSize() { return fileSize; }
146 EventuallyPersistentEngine &engine;
147 vbucket_state_t desired_state;
157 size_t numExpiredItems;
158 size_t metaDataMemory;
171 size_t pendingWrites;
172 size_t chkPersistRemaining;
174 size_t fileSpaceUsed;
179 * memcached engine interface to the EventuallyPersistentStore.
181 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
182 friend class LookupCallback;
// Declaration only; takes a configuration string and reports the outcome as
// an ENGINE_ERROR_CODE.
// NOTE(review): the format of `config` is not visible in this header.
184 ENGINE_ERROR_CODE initialize(const char* config);
// Declaration only.
// NOTE(review): the effect of `force` is not visible here - confirm against
// the definition.
185 void destroy(bool force);
187 ENGINE_ERROR_CODE itemAllocate(const void* cookie,
193 const rel_time_t exptime,
197 if (nbytes > maxItemSize) {
201 time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
204 uint8_t ext_len = EXT_META_LEN;
205 *(ext_meta) = datatype;
206 *itm = new Item(key, nkey, nbytes, flags, expiretime, ext_meta,
209 return memoryCondition();
211 stats.itemAllocSizeHisto.add(nbytes);
212 return ENGINE_SUCCESS;
216 ENGINE_ERROR_CODE itemDelete(const void* cookie,
222 std::string k(static_cast<const char*>(key), nkey);
223 return itemDelete(cookie, k, cas, vbucket);
226 ENGINE_ERROR_CODE itemDelete(const void* cookie,
227 const std::string &key,
231 ENGINE_ERROR_CODE ret = epstore->deleteItem(key, cas,
236 if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
237 if (isDegradedMode()) {
238 return ENGINE_TMPFAIL;
240 } else if (ret == ENGINE_SUCCESS) {
241 ++stats.numOpsDelete;
247 void itemRelease(const void* cookie, item *itm)
253 ENGINE_ERROR_CODE get(const void* cookie,
258 bool track_stat = false)
260 BlockTimer timer(&stats.getCmdHisto);
261 std::string k(static_cast<const char*>(key), nkey);
263 GetValue gv(epstore->get(k, vbucket, cookie, serverApi->core));
264 ENGINE_ERROR_CODE ret = gv.getStatus();
266 if (ret == ENGINE_SUCCESS) {
267 *itm = gv.getValue();
271 } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
272 if (isDegradedMode()) {
273 return ENGINE_TMPFAIL;
280 const char* getName() {
284 ENGINE_ERROR_CODE getStats(const void* cookie,
285 const char* stat_key,
292 epstore->resetUnderlyingStats();
296 ENGINE_ERROR_CODE store(const void *cookie,
299 ENGINE_STORE_OPERATION operation,
302 ENGINE_ERROR_CODE arithmetic(const void* cookie,
305 const bool increment,
307 const uint64_t delta,
308 const uint64_t initial,
309 const rel_time_t exptime,
315 BlockTimer timer(&stats.arithCmdHisto);
318 uint8_t ext_len = EXT_META_LEN;
319 *(ext_meta) = datatype;
321 rel_time_t expiretime = (exptime == 0 ||
322 exptime == 0xffffffff) ?
323 0 : ep_abs_time(ep_reltime(exptime));
325 ENGINE_ERROR_CODE ret = get(cookie, &it, key, nkey, vbucket);
326 if (ret == ENGINE_SUCCESS) {
327 Item *itm = static_cast<Item*>(it);
330 size_t len = std::min(static_cast<uint32_t>(sizeof(data) - 1),
333 memcpy(data, itm->getData(), len);
334 uint64_t val = strtoull(data, &endptr, 10);
335 if (itm->getCas() == (uint64_t) -1) {
336 // item is locked, can't perform arithmetic operation
338 return ENGINE_TMPFAIL;
340 if ((errno != ERANGE) && (isspace(*endptr)
341 || (*endptr == '\0' && endptr != data))) {
352 std::stringstream vals;
354 size_t nb = vals.str().length();
356 Item *nit = new Item(key, (uint16_t)nkey, itm->getFlags(),
357 itm->getExptime(), vals.str().c_str(), nb,
359 nit->setCas(itm->getCas());
360 ret = store(cookie, nit, cas, OPERATION_CAS, vbucket);
367 } else if (ret == ENGINE_NOT_MY_VBUCKET) {
368 return isDegradedMode() ? ENGINE_TMPFAIL: ret;
369 } else if (ret == ENGINE_KEY_ENOENT) {
370 if (isDegradedMode()) {
371 return ENGINE_TMPFAIL;
374 std::stringstream vals;
376 size_t nb = vals.str().length();
378 Item *itm = new Item(key, (uint16_t)nkey, 0, expiretime,
379 vals.str().c_str(), nb, ext_meta, ext_len);
380 ret = store(cookie, itm, cas, OPERATION_ADD, vbucket);
385 /* We had a race condition.. just call ourself recursively to retry */
386 if (ret == ENGINE_KEY_EEXISTS) {
387 return arithmetic(cookie, key, nkey, increment, create, delta,
388 initial, expiretime, cas, datatype, result,
390 } else if (ret == ENGINE_SUCCESS) {
// Declaration only; takes the requesting connection's cookie and a time.
// NOTE(review): how `when` is interpreted, and whether the flushAllEnabled
// member gates this call, is not visible here - confirm in the definition.
399 ENGINE_ERROR_CODE flush(const void *cookie, time_t when);
// Declaration only. All arguments after `cookie` are pointers, i.e.
// candidates for out-parameters, and the result is a uint16_t.
// doWalkTapQueue (declared later in this class) has the same leading
// parameters plus a TapProducer* and a `retry` flag.
// NOTE(review): presumably this method resolves the producer for `cookie`
// and delegates to doWalkTapQueue - confirm against the definition.
401 uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
402 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
403 uint32_t *seqno, uint16_t *vbucket);
405 bool createTapQueue(const void *cookie,
408 const void *userdata,
411 ENGINE_ERROR_CODE tapNotify(const void *cookie,
412 void *engine_specific,
428 ENGINE_ERROR_CODE dcpOpen(const void* cookie,
435 ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
440 ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
443 uint64_t checkpointId);
// Binary-protocol command handlers (declarations only; the definitions are
// not in this header). They share one shape: the connection's cookie, a
// pointer to the request packet, and the ADD_RESPONSE callback; the
// outcome is an ENGINE_ERROR_CODE. touch() takes the generic request
// header, the others take a command-specific request type.
// NOTE(review): presumably each handler emits its reply through `response`
// - confirm against the definitions.
445 ENGINE_ERROR_CODE touch(const void* cookie,
446 protocol_binary_request_header *request,
447 ADD_RESPONSE response);
449 ENGINE_ERROR_CODE getMeta(const void* cookie,
450 protocol_binary_request_get_meta *request,
451 ADD_RESPONSE response);
452 ENGINE_ERROR_CODE setWithMeta(const void* cookie,
453 protocol_binary_request_set_with_meta *request,
454 ADD_RESPONSE response);
455 ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
456 protocol_binary_request_delete_with_meta *request,
457 ADD_RESPONSE response);
459 ENGINE_ERROR_CODE returnMeta(const void* cookie,
460 protocol_binary_request_return_meta *request,
461 ADD_RESPONSE response);
463 ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
464 protocol_binary_request_set_cluster_config *request,
465 ADD_RESPONSE response);
467 ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
468 protocol_binary_request_get_cluster_config *request,
469 ADD_RESPONSE response);
471 ENGINE_ERROR_CODE getAllKeys(const void* cookie,
472 protocol_binary_request_get_keys *request,
473 ADD_RESPONSE response);
476 * Visit the objects and add them to the tap/dcp connection's queue.
477 * @todo this code should honor the backfill time!
479 void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
481 void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
482 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
483 serverApi->cookie->set_priority(cookie, priority);
484 ObjectRegistry::onSwitchThread(epe);
487 void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
488 if (cookie == NULL) {
489 LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
491 BlockTimer bt(&stats.notifyIOHisto);
492 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
493 serverApi->cookie->notify_io_complete(cookie, status);
494 ObjectRegistry::onSwitchThread(epe);
// Paired declarations operating on a connection cookie; the definitions
// are not in this header.
// NOTE(review): presumably every successful reserveCookie() must be
// balanced by a releaseCookie() on the same cookie - confirm.
498 ENGINE_ERROR_CODE reserveCookie(const void *cookie);
499 ENGINE_ERROR_CODE releaseCookie(const void *cookie);
501 void storeEngineSpecific(const void *cookie, void *engine_data) {
502 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
503 serverApi->cookie->store_engine_specific(cookie, engine_data);
504 ObjectRegistry::onSwitchThread(epe);
507 void *getEngineSpecific(const void *cookie) {
508 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
509 void *engine_data = serverApi->cookie->get_engine_specific(cookie);
510 ObjectRegistry::onSwitchThread(epe);
514 bool isDatatypeSupported(const void *cookie) {
515 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
516 bool isSupported = serverApi->cookie->is_datatype_supported(cookie);
517 ObjectRegistry::onSwitchThread(epe);
521 uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
522 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
523 uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
524 ObjectRegistry::onSwitchThread(epe);
528 bool validateSessionCas(const uint64_t cas) {
529 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
530 bool ret = serverApi->cookie->validate_session_cas(cas);
531 ObjectRegistry::onSwitchThread(epe);
535 void decrementSessionCtr(void) {
536 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
537 serverApi->cookie->decrement_session_ctr();
538 ObjectRegistry::onSwitchThread(epe);
// Declaration only; takes an event type, a callback and an opaque data
// pointer for it.
// NOTE(review): presumably forwards the registration to the server API -
// confirm against the definition.
541 void registerEngineCallback(ENGINE_EVENT_TYPE type,
542 EVENT_CALLBACK cb, const void *cb_data);
544 template <typename T>
545 void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
546 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
547 std::for_each(cookies.begin(), cookies.end(),
548 std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
550 ObjectRegistry::onSwitchThread(epe);
// Declaration only; takes the cookie of the connection concerned.
// NOTE(review): what per-connection state is cleaned up is not visible in
// this header.
553 void handleDisconnect(const void *cookie);
555 protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
557 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
559 if (!epstore->pauseFlusher()) {
560 LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
561 *msg = "Flusher not running.";
562 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
567 protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
569 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
571 if (!epstore->resumeFlusher()) {
572 LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
573 *msg = "Flusher not shut down.";
574 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
579 ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
580 return epstore->deleteVBucket(vbid, c);
583 ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
584 const void *cookie = NULL) {
585 return epstore->compactDB(vbid, c, cookie);
588 bool resetVBucket(uint16_t vbid) {
589 return epstore->resetVBucket(vbid);
592 void setTapKeepAlive(uint32_t to) {
593 configuration.setTapKeepalive((size_t)to);
596 void setFlushAll(bool enabled) {
597 flushAllEnabled = enabled;
600 protocol_binary_response_status evictKey(const std::string &key,
604 return epstore->evictKey(key, vbucket, msg, msg_size);
607 bool getLocked(const std::string &key,
609 Callback<GetValue> &cb,
610 rel_time_t currentTime,
611 uint32_t lockTimeout,
612 const void *cookie) {
613 return epstore->getLocked(key, vbucket, cb, currentTime, lockTimeout, cookie);
616 ENGINE_ERROR_CODE unlockKey(const std::string &key,
619 rel_time_t currentTime) {
620 return epstore->unlockKey(key, vbucket, cas, currentTime);
// Declaration only; same shape as the other protocol handlers in this
// class (cookie, generic request header, ADD_RESPONSE callback) with an
// ENGINE_ERROR_CODE result. The definition is not in this header.
623 ENGINE_ERROR_CODE observe(const void* cookie,
624 protocol_binary_request_header *request,
625 ADD_RESPONSE response);
627 RCPtr<VBucket> getVBucket(uint16_t vbucket) {
628 return epstore->getVBucket(vbucket);
631 ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer) {
632 return epstore->setVBucketState(vbid, to, transfer);
// Destructor, defined out of line.
// NOTE(review): the class holds several raw pointer members (epstore,
// tapConnMap, ...); which of them are released here is not visible in this
// header.
635 ~EventuallyPersistentEngine();
637 engine_info *getInfo() {
641 EPStats &getEpStats() {
// Accessors for the engine's collaborating objects. The members behind
// them are raw pointers declared further down in this class.

// Returns the store pointer as stored (no NULL check here).
645 EventuallyPersistentStore* getEpStore() { return epstore; }
// The next five accessors dereference a pointer member without checking it
// for NULL.
// NOTE(review): presumably only safe once initialization has set those
// pointers - confirm where they are assigned.
647 TapConnMap &getTapConnMap() { return *tapConnMap; }
649 DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
651 TapConfig &getTapConfig() { return *tapConfig; }
653 TapThrottle &getTapThrottle() { return *tapThrottle; }
655 CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
// Returns the server API pointer as stored.
657 SERVER_HANDLE_V1* getServerApi() { return serverApi; }
659 Configuration &getConfiguration() {
660 return configuration;
// Further protocol command handlers (declarations only). Each takes the
// connection's cookie, the generic request header and the ADD_RESPONSE
// callback, and reports the outcome as an ENGINE_ERROR_CODE.
// NOTE(review): since all of them receive the generic header, presumably
// each decodes its own command-specific payload - confirm against the
// definitions.
663 ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
664 protocol_binary_request_header *request,
665 ADD_RESPONSE response);
667 ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
668 protocol_binary_request_header *request,
669 ADD_RESPONSE response);
671 ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
672 protocol_binary_request_header *request,
673 ADD_RESPONSE response);
675 ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
676 protocol_binary_request_header *request,
677 ADD_RESPONSE response);
679 ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
680 protocol_binary_request_header *request,
681 ADD_RESPONSE response);
683 ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
684 protocol_binary_request_header *request,
685 ADD_RESPONSE response);
687 size_t getGetlDefaultTimeout() const {
688 return getlDefaultTimeout;
691 size_t getGetlMaxTimeout() const {
692 return getlMaxTimeout;
695 size_t getMaxFailoverEntries() const {
696 return maxFailoverEntries;
699 bool isDegradedMode() const {
700 return epstore->isWarmingUp() || !trafficEnabled.load();
703 WorkLoadPolicy &getWorkLoadPolicy(void) {
// Getter and setter for the workloadPriority member. The member is a plain
// bucket_priority_t, and neither accessor takes a lock.
// NOTE(review): confirm whether these can be called from several threads.
707 bucket_priority_t getWorkloadPriority(void) {return workloadPriority; }
708 void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
710 struct clusterConfig {
711 clusterConfig() : len(0), config(NULL) {}
// Declaration only; takes the connection's cookie and the ADD_RESPONSE
// callback, and reports the outcome as an ENGINE_ERROR_CODE.
717 ENGINE_ERROR_CODE getRandomKey(const void *cookie,
718 ADD_RESPONSE response);
// Declaration only; maps a cookie to a ConnHandler pointer.
// NOTE(review): whether this can return NULL for an unknown cookie is not
// visible here - callers should confirm before dereferencing.
720 ConnHandler* getConnHandler(const void *cookie);
// Declaration only; takes a cookie and an ENGINE_ERROR_CODE.
// NOTE(review): presumably records `err` for `cookie` in the
// allKeysLookups map declared in this class - confirm.
722 void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
725 * Get a (sloppy) list of the sequence numbers for all of the vbuckets
726 * on this server. It is not to be treated as a consistent set of sequence
727 * numbers, but rather a list of "at least" numbers. The way the list is
728 * generated is that we start at vbucket 0 and record the current number,
729 * then look at the next vbucket and record its number. That means that
730 * at the time we get the number for vbucket X all of the previous
731 * numbers could have been incremented. If the client just needs a list
732 * of where we are for each vbucket this method may be more optimal than
733 * requesting one by one.
735 * @param cookie The cookie representing the connection to requesting
737 * @param add_response The method used to format the output buffer
738 * @return ENGINE_SUCCESS upon success
740 ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(const void *cookie,
741 ADD_RESPONSE response);
744 friend class EpEngineValueChangeListener;
746 void setMaxItemSize(size_t value) {
750 void setGetlDefaultTimeout(size_t value) {
751 getlDefaultTimeout = value;
754 void setGetlMaxTimeout(size_t value) {
755 getlMaxTimeout = value;
// Constructor (defined out of line); takes the server's GET_SERVER_API
// function. The class has a getServerApiFunc member of the same type.
758 EventuallyPersistentEngine(GET_SERVER_API get_server_api);
// Grants the free function create_instance access to this class's
// non-public members.
// NOTE(review): presumably so it can invoke the constructor above -
// confirm the constructor's access level.
759 friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
760 GET_SERVER_API get_server_api,
761 ENGINE_HANDLE **handle);
// Declaration only. Takes the same leading arguments as walkTapQueue, plus
// the TapProducer to operate on and a `retry` flag passed by non-const
// reference (so the callee can set it for the caller).
// NOTE(review): presumably the caller loops while `retry` comes back true -
// confirm against walkTapQueue's definition.
762 uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
763 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
764 uint32_t *seqno, uint16_t *vbucket,
765 TapProducer *c, bool &retry);
768 ENGINE_ERROR_CODE processTapAck(const void *cookie,
771 const std::string &msg);
774 * Report the state of a memory condition when out of memory.
776 * @return ENGINE_TMPFAIL if we think we can recover without interaction,
779 ENGINE_ERROR_CODE memoryCondition() {
780 // Do we think it's possible we could free something?
781 bool haveEvidenceWeCanFreeMemory(stats.getMaxDataSize() > stats.memOverhead);
782 if (haveEvidenceWeCanFreeMemory) {
783 // Look for more evidence by seeing if we have resident items.
784 VBucketCountVisitor countVisitor(*this, vbucket_state_active);
785 epstore->visit(countVisitor);
787 haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
788 countVisitor.getNumItems();
790 if (haveEvidenceWeCanFreeMemory) {
791 ++stats.tmp_oom_errors;
792 // Wake up the item pager task as memory usage
793 // seems to have exceeded high water mark
794 if ((getEpStore()->fetchItemPagerTask())->getState() ==
796 ExecutorPool::get()->wake(
797 (getEpStore()->fetchItemPagerTask())->getId());
799 return ENGINE_TMPFAIL;
802 return ENGINE_ENOMEM;
806 friend class BGFetchCallback;
807 friend class EventuallyPersistentStore;
809 bool enableTraffic(bool enable) {
810 bool inverse = !enable;
811 return trafficEnabled.compare_exchange_strong(inverse, enable);
// Stats helpers (declarations only). Each takes the requesting
// connection's cookie and the ADD_STAT callback, and reports the outcome
// as an ENGINE_ERROR_CODE.
// NOTE(review): presumably selected by getStats() from the stat key, with
// each stat emitted through add_stat - confirm against the definitions.
814 ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
815 ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
816 ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
817 ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
818 const char* stat_key,
820 bool prevStateRequested,
// More stats helpers (declarations only). All take the connection's cookie
// and the ADD_STAT callback and return an ENGINE_ERROR_CODE; some take
// extra arguments that narrow the request.
822 ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
// Also receives the raw stat key and its length.
823 ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
824 const char* stat_key, int nkey);
825 ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
826 ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
// Also receives a separator string with its length, and the connection
// type to report on.
827 ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
828 const char *sep, size_t nsep,
829 conn_type_t connType);
830 ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
831 ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
832 ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
833 ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
// Stats for a single key in vbucket `vbid`; `validate` defaults to false.
// `key` is taken by non-const reference.
// NOTE(review): whether the callee modifies `key` is not visible here.
834 ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
835 uint16_t vbid, std::string &key, bool validate=false);
836 ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
841 ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
845 ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
// Remaining stats helpers (declarations only); same cookie + ADD_STAT
// shape and ENGINE_ERROR_CODE result as the other do*Stats members.
848 ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
849 ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
// These two also receive the raw stat key and its length.
// NOTE(review): presumably the key carries an optional argument such as a
// vbucket id - confirm against the definitions.
850 ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
851 const char* stat_key, int nkey);
852 ENGINE_ERROR_CODE doDiskStats(const void *cookie, ADD_STAT add_stat,
853 const char* stat_key, int nkey);
855 void addLookupResult(const void *cookie, Item *result) {
856 LockHolder lh(lookupMutex);
857 std::map<const void*, Item*>::iterator it = lookups.find(cookie);
858 if (it != lookups.end()) {
859 if (it->second != NULL) {
860 LOG(EXTENSION_LOG_DEBUG,
861 "Cleaning up old lookup result for '%s'",
862 it->second->getKey().c_str());
865 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
869 lookups[cookie] = result;
872 bool fetchLookupResult(const void *cookie, Item **itm) {
873 // This will return *and erase* the lookup result for a connection.
874 // You look it up, you own it.
875 LockHolder lh(lookupMutex);
876 std::map<const void*, Item*>::iterator it = lookups.find(cookie);
877 if (it != lookups.end()) {
886 // Get the current tap connection for this cookie.
887 // If this method returns NULL, you should return TAP_DISCONNECT
888 TapProducer* getTapProducer(const void *cookie);
890 // Initialize all required callbacks of this engine with the underlying
892 void initializeEngineCallbacks();
894 SERVER_HANDLE_V1 *serverApi;
895 EventuallyPersistentStore *epstore;
896 WorkLoadPolicy *workload;
897 bucket_priority_t workloadPriority;
899 TapThrottle *tapThrottle;
900 std::map<const void*, Item*> lookups;
901 unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
903 GET_SERVER_API getServerApiFunc;
906 char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
909 DcpConnMap *dcpConnMap_;
910 TapConnMap *tapConnMap;
911 TapConfig *tapConfig;
912 CheckpointConfig *checkpointConfig;
915 size_t getlDefaultTimeout;
916 size_t getlMaxTimeout;
917 size_t maxFailoverEntries;
919 Configuration configuration;
920 AtomicValue<bool> trafficEnabled;
922 bool flushAllEnabled;
923 // A unique, system-generated token that is initialized each time
924 // ep_engine starts up.
925 AtomicValue<time_t> startupTime;
929 #endif // SRC_EP_ENGINE_H_