1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2013 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
23 #include "kv_bucket.h"
24 #include "storeddockey.h"
25 #include "tapconnection.h"
29 #include <memcached/engine.h>
30 #include <platform/processclock.h>
36 class DcpFlowControlManager;
39 class ReplicationThrottle;
40 class VBucketCountVisitor;
44 ENGINE_ERROR_CODE create_instance(uint64_t interface,
45 GET_SERVER_API get_server_api,
46 ENGINE_HANDLE **handle);
49 void destroy_engine(void);
51 void EvpNotifyPendingConns(void*arg);
54 /* We're using notify_io_complete from ptr_fun, but that func
55 * got a "C" linkage that ptr_fun doesn't like... just
56 * cast it away with this typedef ;)
58 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
59 ENGINE_ERROR_CODE status);
62 class EventuallyPersistentEngine;
66 To allow Engines to run tasks.
68 class EpEngineTaskable : public Taskable {
70 EpEngineTaskable(EventuallyPersistentEngine* e) : myEngine(e) {
74 const std::string& getName() const;
76 task_gid_t getGID() const;
78 bucket_priority_t getWorkloadPriority() const;
80 void setWorkloadPriority(bucket_priority_t prio);
82 WorkLoadPolicy& getWorkLoadPolicy(void);
84 void logQTime(TaskId id, const ProcessClock::duration enqTime);
86 void logRunTime(TaskId id, const ProcessClock::duration runTime);
89 EventuallyPersistentEngine* myEngine;
93 * A container class holding VBucketCountVisitors to aggregate stats for
94 * different vbucket states.
96 class VBucketCountAggregator : public VBucketVisitor {
98 void visitBucket(VBucketPtr &vb) override;
100 void addVisitor(VBucketCountVisitor* visitor);
102 std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
106 * memcached engine interface to the KVBucket.
108 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
109 friend class LookupCallback;
111 ENGINE_ERROR_CODE initialize(const char* config);
112 void destroy(bool force);
114 ENGINE_ERROR_CODE itemAllocate(item** itm,
117 const size_t priv_nbytes,
119 const rel_time_t exptime,
123 * Delete a given key and value from the engine.
125 * @param cookie The cookie representing the connection
126 * @param key The key that needs to be deleted from the engine
127 * @param cas CAS value of the mutation that needs to be returned
129 * @param vbucket vbucket id to which the deleted key corresponds to
130 * @param item_meta pointer to item meta data that needs to be
131 * as a result the delete. A NULL pointer indicates
132 * that no meta data needs to be returned.
133 * @param mut_info pointer to the mutation info that resulted from
136 * @returns ENGINE_SUCCESS if the delete was successful or
137 * an error code indicating the error
139 ENGINE_ERROR_CODE itemDelete(const void* cookie,
143 ItemMetaData* item_meta,
144 mutation_descr_t* mut_info) {
145 ENGINE_ERROR_CODE ret = kvBucket->deleteItem(key,
152 if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
153 if (isDegradedMode()) {
154 return ENGINE_TMPFAIL;
156 } else if (ret == ENGINE_SUCCESS) {
157 ++stats.numOpsDelete;
163 void itemRelease(const void* cookie, item *itm)
169 ENGINE_ERROR_CODE get(const void* cookie,
173 get_options_t options)
175 BlockTimer timer(&stats.getCmdHisto);
176 GetValue gv(kvBucket->get(key, vbucket, cookie, options));
177 ENGINE_ERROR_CODE ret = gv.getStatus();
179 if (ret == ENGINE_SUCCESS) {
180 *itm = gv.getValue();
181 if (options & TRACK_STATISTICS) {
184 } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
185 if (isDegradedMode()) {
186 return ENGINE_TMPFAIL;
193 cb::EngineErrorItemPair get_if(const void* cookie,
197 const item_info&)> filter);
199 ENGINE_ERROR_CODE get_locked(const void* cookie,
203 uint32_t lock_timeout);
206 ENGINE_ERROR_CODE unlock(const void* cookie,
212 const std::string& getName() const {
216 ENGINE_ERROR_CODE getStats(const void* cookie,
217 const char* stat_key,
224 kvBucket->resetUnderlyingStats();
228 ENGINE_ERROR_CODE store(const void *cookie,
231 ENGINE_STORE_OPERATION operation);
233 ENGINE_ERROR_CODE flush(const void *cookie);
235 uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
236 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
237 uint32_t *seqno, uint16_t *vbucket);
239 bool createTapQueue(const void *cookie,
242 const void *userdata,
245 ENGINE_ERROR_CODE tapNotify(const void *cookie,
246 void *engine_specific,
262 ENGINE_ERROR_CODE dcpOpen(const void* cookie,
266 const void *stream_name,
269 ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
274 ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
277 uint64_t checkpointId);
279 ENGINE_ERROR_CODE touch(const void* cookie,
280 protocol_binary_request_header *request,
281 ADD_RESPONSE response,
282 DocNamespace docNamespace);
284 ENGINE_ERROR_CODE getMeta(const void* cookie,
285 protocol_binary_request_get_meta *request,
286 ADD_RESPONSE response,
287 DocNamespace docNamespace);
289 ENGINE_ERROR_CODE setWithMeta(const void* cookie,
290 protocol_binary_request_set_with_meta *request,
291 ADD_RESPONSE response,
292 DocNamespace docNamespace);
294 ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
295 protocol_binary_request_delete_with_meta *request,
296 ADD_RESPONSE response,
297 DocNamespace docNamespace);
299 ENGINE_ERROR_CODE returnMeta(const void* cookie,
300 protocol_binary_request_return_meta *request,
301 ADD_RESPONSE response,
302 DocNamespace docNamespace);
304 ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
305 protocol_binary_request_set_cluster_config *request,
306 ADD_RESPONSE response);
308 ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
309 protocol_binary_request_get_cluster_config *request,
310 ADD_RESPONSE response);
312 ENGINE_ERROR_CODE getAllKeys(const void* cookie,
313 protocol_binary_request_get_keys *request,
314 ADD_RESPONSE response,
315 DocNamespace docNamespace);
318 * Visit the objects and add them to the tap/dcp connecitons queue.
319 * @todo this code should honor the backfill time!
321 void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
323 void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
324 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
325 serverApi->cookie->set_priority(cookie, priority);
326 ObjectRegistry::onSwitchThread(epe);
329 void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
330 if (cookie == NULL) {
331 LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
333 BlockTimer bt(&stats.notifyIOHisto);
334 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
335 serverApi->cookie->notify_io_complete(cookie, status);
336 ObjectRegistry::onSwitchThread(epe);
340 ENGINE_ERROR_CODE reserveCookie(const void *cookie);
341 ENGINE_ERROR_CODE releaseCookie(const void *cookie);
343 void storeEngineSpecific(const void *cookie, void *engine_data) {
344 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
345 serverApi->cookie->store_engine_specific(cookie, engine_data);
346 ObjectRegistry::onSwitchThread(epe);
349 void *getEngineSpecific(const void *cookie) {
350 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
351 void *engine_data = serverApi->cookie->get_engine_specific(cookie);
352 ObjectRegistry::onSwitchThread(epe);
356 bool isDatatypeSupported(const void* cookie,
357 protocol_binary_datatype_t datatype) {
358 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
360 serverApi->cookie->is_datatype_supported(cookie, datatype);
361 ObjectRegistry::onSwitchThread(epe);
365 bool isMutationExtrasSupported(const void *cookie) {
366 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
367 bool isSupported = serverApi->cookie->is_mutation_extras_supported(cookie);
368 ObjectRegistry::onSwitchThread(epe);
372 bool isXattrSupported(const void* cookie) {
373 return isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_XATTR);
376 uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
377 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
378 uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
379 ObjectRegistry::onSwitchThread(epe);
383 bool validateSessionCas(const uint64_t cas) {
384 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
385 bool ret = serverApi->cookie->validate_session_cas(cas);
386 ObjectRegistry::onSwitchThread(epe);
390 void decrementSessionCtr(void) {
391 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
392 serverApi->cookie->decrement_session_ctr();
393 ObjectRegistry::onSwitchThread(epe);
396 void registerEngineCallback(ENGINE_EVENT_TYPE type,
397 EVENT_CALLBACK cb, const void *cb_data);
399 template <typename T>
400 void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
401 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
402 std::for_each(cookies.begin(), cookies.end(),
403 std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
405 ObjectRegistry::onSwitchThread(epe);
408 void handleDisconnect(const void *cookie);
409 void handleDeleteBucket(const void *cookie);
411 protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
413 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
415 if (!kvBucket->pauseFlusher()) {
416 LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
417 *msg = "Flusher not running.";
418 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
423 protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
425 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
427 if (!kvBucket->resumeFlusher()) {
428 LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
429 *msg = "Flusher not shut down.";
430 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
435 ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
436 return kvBucket->deleteVBucket(vbid, c);
439 ENGINE_ERROR_CODE compactDB(uint16_t vbid,
441 const void *cookie = NULL) {
442 return kvBucket->scheduleCompaction(vbid, c, cookie);
445 bool resetVBucket(uint16_t vbid) {
446 return kvBucket->resetVBucket(vbid);
449 void setTapKeepAlive(uint32_t to) {
450 configuration.setTapKeepalive((size_t)to);
453 void setDeleteAll(bool enabled) {
454 deleteAllEnabled = enabled;
457 protocol_binary_response_status evictKey(const DocKey& key,
460 return kvBucket->evictKey(key, vbucket, msg);
463 ENGINE_ERROR_CODE observe(const void* cookie,
464 protocol_binary_request_header *request,
465 ADD_RESPONSE response,
466 DocNamespace docNamespace);
468 ENGINE_ERROR_CODE observe_seqno(const void* cookie,
469 protocol_binary_request_header *request,
470 ADD_RESPONSE response);
472 VBucketPtr getVBucket(uint16_t vbucket) {
473 return kvBucket->getVBucket(vbucket);
476 ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to,
478 return kvBucket->setVBucketState(vbid, to, transfer);
481 protocol_binary_response_status setParam(
482 protocol_binary_request_set_param* req, std::string& msg);
484 protocol_binary_response_status setFlushParam(const char* keyz,
488 protocol_binary_response_status setTapParam(const char* keyz,
492 protocol_binary_response_status setCheckpointParam(const char* keyz,
496 protocol_binary_response_status setDcpParam(const char* keyz,
500 protocol_binary_response_status setVbucketParam(uint16_t vbucket,
505 ~EventuallyPersistentEngine();
507 engine_info *getInfo() {
511 item_info getItemInfo(const Item& itm);
513 EPStats &getEpStats() {
517 KVBucket* getKVBucket() { return kvBucket.get(); }
519 TapConnMap &getTapConnMap() { return *tapConnMap; }
521 DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
523 DcpFlowControlManager &getDcpFlowControlManager() {
524 return *dcpFlowControlManager_;
527 TapConfig &getTapConfig() { return *tapConfig; }
529 ReplicationThrottle &getReplicationThrottle() { return *replicationThrottle; }
531 CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
533 SERVER_HANDLE_V1* getServerApi() { return serverApi; }
535 Configuration &getConfiguration() {
536 return configuration;
539 ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
540 protocol_binary_request_header *request,
541 ADD_RESPONSE response);
543 ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
544 protocol_binary_request_header *request,
545 ADD_RESPONSE response);
547 ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
548 protocol_binary_request_header *request,
549 ADD_RESPONSE response);
551 ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
552 protocol_binary_request_header *request,
553 ADD_RESPONSE response);
555 ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
556 protocol_binary_request_header *request,
557 ADD_RESPONSE response);
559 ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
560 protocol_binary_request_header *request,
561 ADD_RESPONSE response);
563 size_t getGetlDefaultTimeout() const {
564 return getlDefaultTimeout;
567 size_t getGetlMaxTimeout() const {
568 return getlMaxTimeout;
571 size_t getMaxFailoverEntries() const {
572 return maxFailoverEntries;
575 bool isDegradedMode() const {
576 return kvBucket->isWarmingUp() || !trafficEnabled.load();
579 WorkLoadPolicy &getWorkLoadPolicy(void) {
583 bucket_priority_t getWorkloadPriority(void) const {return workloadPriority; }
584 void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
586 struct clusterConfig {
591 ENGINE_ERROR_CODE getRandomKey(const void *cookie,
592 ADD_RESPONSE response);
594 ConnHandler* getConnHandler(const void *cookie);
596 void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
599 * Explicitly trigger the defragmenter task. Provided to facilitate
602 void runDefragmenterTask(void);
605 * Explicitly trigger the AccessScanner task. Provided to facilitate
608 bool runAccessScannerTask(void);
611 * Explicitly trigger the VbStatePersist task. Provided to facilitate
614 void runVbStatePersistTask(int vbid);
617 * Get a (sloppy) list of the sequence numbers for all of the vbuckets
618 * on this server. It is not to be treated as a consistent set of seqence,
619 * but rather a list of "at least" numbers. The way the list is generated
620 * is that we're starting for vbucket 0 and record the current number,
621 * then look at the next vbucket and record its number. That means that
622 * at the time we get the number for vbucket X all of the previous
623 * numbers could have been incremented. If the client just needs a list
624 * of where we are for each vbucket this method may be more optimal than
625 * requesting one by one.
627 * @param cookie The cookie representing the connection to requesting
629 * @param add_response The method used to format the output buffer
630 * @return ENGINE_SUCCESS upon success
632 ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(
634 protocol_binary_request_header *request,
635 ADD_RESPONSE response);
637 void updateDcpMinCompressionRatio(float value);
640 * Sends a not-my-vbucket response, using the specified response callback.
641 * to the specified connection via it's cookie.
643 ENGINE_ERROR_CODE sendNotMyVBucketResponse(ADD_RESPONSE response,
647 EpEngineTaskable& getTaskable() {
652 friend class EpEngineValueChangeListener;
654 void setMaxItemSize(size_t value) {
658 void setMaxItemPrivilegedBytes(size_t value) {
659 maxItemPrivilegedBytes = value;
662 void setGetlDefaultTimeout(size_t value) {
663 getlDefaultTimeout = value;
666 void setGetlMaxTimeout(size_t value) {
667 getlMaxTimeout = value;
670 EventuallyPersistentEngine(GET_SERVER_API get_server_api);
671 friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
672 GET_SERVER_API get_server_api,
673 ENGINE_HANDLE **handle);
674 uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
675 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
676 uint32_t *seqno, uint16_t *vbucket,
677 TapProducer *c, bool &retry);
680 ENGINE_ERROR_CODE processTapAck(const void *cookie,
686 * Report the state of a memory condition when out of memory.
688 * @return ETMPFAIL if we think we can recover without interaction,
691 ENGINE_ERROR_CODE memoryCondition();
694 * Check if there is any available memory space to allocate an Item
695 * instance with a given size.
697 bool hasAvailableSpace(uint32_t nBytes) {
698 return (stats.getTotalMemoryUsed() + nBytes) <= stats.getMaxDataSize();
701 friend class BGFetchCallback;
702 friend class KVBucket;
703 friend class EPBucket;
705 bool enableTraffic(bool enable) {
706 bool inverse = !enable;
707 return trafficEnabled.compare_exchange_strong(inverse, enable);
710 ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
711 ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
712 ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
713 ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
714 const char* stat_key,
716 bool prevStateRequested,
718 ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
719 ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
720 const char* stat_key, int nkey);
721 ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
722 ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
723 ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
724 const char *sep, size_t nsep,
725 conn_type_t connType);
726 ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
727 ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
728 ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
729 ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
730 ENGINE_ERROR_CODE doTasksStats(const void* cookie, ADD_STAT add_stat);
731 ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
732 uint16_t vbid, const DocKey& key, bool validate=false);
733 ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
738 ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
742 ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
745 ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
746 ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
747 ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
748 const char* stat_key, int nkey);
749 void addSeqnoVbStats(const void *cookie, ADD_STAT add_stat,
750 const VBucketPtr &vb);
752 void addLookupResult(const void *cookie, Item *result) {
753 LockHolder lh(lookupMutex);
754 std::map<const void*, Item*>::iterator it = lookups.find(cookie);
755 if (it != lookups.end()) {
756 if (it->second != NULL) {
757 LOG(EXTENSION_LOG_DEBUG,
758 "Cleaning up old lookup result for '%s'",
759 it->second->getKey().data());
762 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
766 lookups[cookie] = result;
769 bool fetchLookupResult(const void *cookie, Item **itm) {
770 // This will return *and erase* the lookup result for a connection.
771 // You look it up, you own it.
772 LockHolder lh(lookupMutex);
773 std::map<const void*, Item*>::iterator it = lookups.find(cookie);
774 if (it != lookups.end()) {
783 // Get the current tap connection for this cookie.
784 // If this method returns NULL, you should return TAP_DISCONNECT
785 TapProducer* getTapProducer(const void *cookie);
787 // Initialize all required callbacks of this engine with the underlying
789 void initializeEngineCallbacks();
792 * Private helper method for decoding the options on set/del_with_meta.
793 * Tighly coupled to the logic of both those functions, it will
794 * take a request pointer and locate and validate any options within.
795 * @param request pointer to the set/del_with_meta request packet
796 * @param generateCas set to Yes if CAS regeneration is enabled.
797 * @param skipConflictResolution set to true if conflict resolution should
799 * @param keyOffset set to the number of bytes which are to be skipped to
802 protocol_binary_response_status decodeWithMetaOptions(
803 protocol_binary_request_delete_with_meta* request,
804 GenerateCas& generateCas,
805 bool& skipConflictResolution,
809 * Sends NOT_SUPPORTED response, using the specified response callback
810 * to the specified connection via it's cookie.
812 * @param response callback func to send the response
813 * @param cookie conn cookie
815 * @return status of sending response
817 ENGINE_ERROR_CODE sendNotSupportedResponse(ADD_RESPONSE response,
821 * Sends error response, using the specified error and response callback
822 * to the specified connection via it's cookie.
824 * @param response callback func to send the response
825 * @param status error status to send
826 * @param cas a cas value to send
827 * @param cookie conn cookie
829 * @return status of sending response
831 ENGINE_ERROR_CODE sendErrorResponse(ADD_RESPONSE response,
832 protocol_binary_response_status status,
837 * Sends a response that includes the mutation extras, the VB uuid and
838 * seqno of the mutation.
840 * @param response callback func to send the response
841 * @param vbucket vbucket that was mutated
842 * @param bySeqno the seqno to send
843 * @param status a mcbp status code
844 * @param cas cas assigned to the mutation
845 * @param cookie conn cookie
846 * @returns NMVB if VB can't be located, or the ADD_RESPONSE return code.
848 ENGINE_ERROR_CODE sendMutationExtras(ADD_RESPONSE response,
851 protocol_binary_response_status status,
856 * Factory method for constructing the correct bucket type given the
858 * @param config Configuration to create bucket based on. Note this
859 * object may be modified to ensure the config is valid
860 * for the selected bucket type.
862 std::unique_ptr<KVBucket> makeBucket(Configuration& config);
865 * helper method so that some commands can set the datatype of the document.
867 * @param cookie connection cookie
868 * @param datatype the current document datatype
869 * @param body a buffer containing the document body
870 * @returns a datatype which will now include JSON if the document is JSON
871 * and the connection does not support datatype JSON.
873 protocol_binary_datatype_t checkForDatatypeJson(
875 protocol_binary_datatype_t datatype,
876 cb::const_char_buffer body);
879 * Process the set_with_meta with the given buffers/values.
881 * @param vbucket VB to mutate
882 * @param key DocKey initialised with key data
883 * @param value buffer for the mutation's value
884 * @param itemMeta mutation's cas/revseq/flags/expiration
885 * @param isDeleted the Item is deleted (with value)
886 * @param datatype datatype of the mutation
887 * @param cas [in,out] CAS for the command (updated with new CAS)
888 * @param seqno [out] optional - returns the seqno allocated to the mutation
889 * @param cookie connection's cookie
890 * @param force Should the set skip conflict resolution?
891 * @param allowExisting true if the set can overwrite existing key
892 * @param genBySeqno generate a new seqno? (yes/no)
893 * @param genCas generate a new CAS? (yes/no)
894 * @param emd buffer referencing ExtendedMetaData
895 * @returns state of the operation as an ENGINE_ERROR_CODE
897 ENGINE_ERROR_CODE setWithMeta(uint16_t vbucket,
899 cb::const_byte_buffer value,
900 ItemMetaData itemMeta,
902 protocol_binary_datatype_t datatype,
908 GenerateBySeqno genBySeqno,
910 cb::const_byte_buffer emd);
913 * Process the del_with_meta with the given buffers/values.
915 * @param vbucket VB to mutate
916 * @param key DocKey initialised with key data
917 * @param itemMeta mutation's cas/revseq/flags/expiration
918 * @param cas [in,out] CAS for the command (updated with new CAS)
919 * @param seqno [out] optional - returns the seqno allocated to the mutation
920 * @param cookie connection's cookie
921 * @param force Should the set skip conflict resolution?
922 * @param genBySeqno generate a new seqno? (yes/no)
923 * @param genCas generate a new CAS? (yes/no)
924 * @param emd buffer referencing ExtendedMetaData
925 * @returns state of the operation as an ENGINE_ERROR_CODE
927 ENGINE_ERROR_CODE deleteWithMeta(uint16_t vbucket,
929 ItemMetaData itemMeta,
934 GenerateBySeqno genBySeqno,
936 cb::const_byte_buffer emd);
938 SERVER_HANDLE_V1 *serverApi;
939 std::unique_ptr<KVBucket> kvBucket;
940 WorkLoadPolicy *workload;
941 bucket_priority_t workloadPriority;
943 ReplicationThrottle *replicationThrottle;
944 std::map<const void*, Item*> lookups;
945 std::unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
946 std::mutex lookupMutex;
947 GET_SERVER_API getServerApiFunc;
950 char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
953 DcpConnMap *dcpConnMap_;
954 DcpFlowControlManager *dcpFlowControlManager_;
955 TapConnMap *tapConnMap;
956 TapConfig *tapConfig;
957 CheckpointConfig *checkpointConfig;
960 size_t maxItemPrivilegedBytes;
961 size_t getlDefaultTimeout;
962 size_t getlMaxTimeout;
963 size_t maxFailoverEntries;
965 Configuration configuration;
966 std::atomic<bool> trafficEnabled;
968 bool deleteAllEnabled;
969 // a unique system generated token initialized at each time
970 // ep_engine starts up.
971 std::atomic<time_t> startupTime;
972 EpEngineTaskable taskable;
975 #endif // SRC_EP_ENGINE_H_