1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2013 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
23 #include "kv_bucket.h"
24 #include "storeddockey.h"
25 #include "tapconnection.h"
29 #include <memcached/engine.h>
30 #include <platform/processclock.h>
36 class DcpFlowControlManager;
39 class ReplicationThrottle;
40 class VBucketCountVisitor;
// Engine creation entry point (prototype only in this view).
// Takes a 64-bit `interface` value, the server-API accessor and an
// out-parameter that receives the engine handle. The same prototype is
// repeated as a friend declaration inside EventuallyPersistentEngine.
44 ENGINE_ERROR_CODE create_instance(uint64_t interface,
45 GET_SERVER_API get_server_api,
46 ENGINE_HANDLE **handle);
49 void destroy_engine(void);
51 void EvpNotifyPendingConns(void*arg);
54 /* We're using notify_io_complete from ptr_fun, but that func
55 * got a "C" linkage that ptr_fun doesn't like... just
56 * cast it away with this typedef ;)
// Pointer-to-function type: void (const void* cookie, ENGINE_ERROR_CODE status).
// The templated EventuallyPersistentEngine::notifyIOComplete casts the server
// notify_io_complete callback to this type before handing it to std::ptr_fun.
58 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
59 ENGINE_ERROR_CODE status);
62 class EventuallyPersistentEngine;
66 To allow Engines to run tasks.
// Taskable implementation that wraps a pointer to an EventuallyPersistentEngine.
// Access specifiers and closing braces are missing from this view.
68 class EpEngineTaskable : public Taskable {
// Stores the engine pointer in myEngine; the pointer is not validated here.
// NOTE(review): single-argument constructor is not marked explicit - confirm
// that implicit conversion from EventuallyPersistentEngine* is intended.
70 EpEngineTaskable(EventuallyPersistentEngine* e) : myEngine(e) {
// The member functions below are declared only; their bodies are not in this view.
74 const std::string& getName() const;
76 task_gid_t getGID() const;
78 bucket_priority_t getWorkloadPriority() const;
80 void setWorkloadPriority(bucket_priority_t prio);
82 WorkLoadPolicy& getWorkLoadPolicy(void);
// Both logging hooks take a TaskId and a ProcessClock::duration.
84 void logQTime(TaskId id, const ProcessClock::duration enqTime);
86 void logRunTime(TaskId id, const ProcessClock::duration runTime);
// Raw pointer to the wrapped engine; set once by the constructor.
89 EventuallyPersistentEngine* myEngine;
93 * A container class holding VBucketCountVisitors to aggregate stats for
94 * different vbucket states.
// VBucketVisitor that keeps VBucketCountVisitor pointers in a map keyed by
// vbucket_state_t. Only declarations are visible in this view.
96 class VBucketCountAggregator : public VBucketVisitor {
// Overrides the VBucketVisitor hook; body not in this view.
98 void visitBucket(VBucketPtr &vb) override;
// Takes a raw VBucketCountVisitor pointer; body not in this view.
100 void addVisitor(VBucketCountVisitor* visitor);
// NOTE(review): values are raw pointers, so ownership of the visitors is not
// expressed by the type - confirm who owns them.
102 std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
106 * memcached engine interface to the KVBucket.
108 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
109 friend class LookupCallback;
111 ENGINE_ERROR_CODE initialize(const char* config);
112 void destroy(bool force);
114 ENGINE_ERROR_CODE itemAllocate(item** itm,
117 const size_t priv_nbytes,
119 const rel_time_t exptime,
123 * Delete a given key and value from the engine.
125 * @param cookie The cookie representing the connection
126 * @param key The key that needs to be deleted from the engine
127 * @param cas CAS value of the mutation that needs to be returned
129 * @param vbucket vbucket id to which the deleted key corresponds to
130 * @param itm item pointer that contains a value that needs to be
131 * stored along with a delete. A NULL pointer indicates
132 * that no value needs to be stored with the delete.
133 * @param item_meta pointer to item meta data that needs to be
134 * returned as a result of the delete. A NULL pointer indicates
135 * that no meta data needs to be returned.
136 * @param mut_info pointer to the mutation info that resulted from
139 * @returns ENGINE_SUCCESS if the delete was successful or
140 * an error code indicating the error
// Forwards the delete to kvBucket->deleteItem() and post-processes the result.
// Several parameter and argument lines are missing from this view.
142 ENGINE_ERROR_CODE itemDelete(const void* cookie,
147 ItemMetaData* item_meta,
148 mutation_descr_t* mut_info) {
149 ENGINE_ERROR_CODE ret = kvBucket->deleteItem(key,
// Not-found / wrong-vbucket results are reported as ENGINE_TMPFAIL while
// isDegradedMode() is true (warming up, or traffic not enabled).
157 if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
158 if (isDegradedMode()) {
159 return ENGINE_TMPFAIL;
// A successful delete bumps the delete-operation counter.
161 } else if (ret == ENGINE_SUCCESS) {
162 ++stats.numOpsDelete;
168 void itemRelease(const void* cookie, item *itm)
// Fetches an item through kvBucket->get() and translates the outcome.
// Several parameter lines and closing braces are missing from this view.
174 ENGINE_ERROR_CODE get(const void* cookie,
178 get_options_t options)
// Timer bound to the get-command histogram.
// NOTE(review): presumably records elapsed time at scope exit - confirm against BlockTimer.
180 BlockTimer timer(&stats.getCmdHisto);
181 GetValue gv(kvBucket->get(key, vbucket, cookie, options));
182 ENGINE_ERROR_CODE ret = gv.getStatus();
// On success the fetched value is handed to the caller through *itm.
184 if (ret == ENGINE_SUCCESS) {
185 *itm = gv.getValue();
// Statistics are only updated when the caller set TRACK_STATISTICS
// (the update itself is on lines missing from this view).
186 if (options & TRACK_STATISTICS) {
// Not-found / wrong-vbucket results are reported as ENGINE_TMPFAIL while
// isDegradedMode() is true.
189 } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
190 if (isDegradedMode()) {
191 return ENGINE_TMPFAIL;
198 cb::EngineErrorItemPair get_if(const void* cookie,
202 const item_info&)> filter);
204 ENGINE_ERROR_CODE get_locked(const void* cookie,
208 uint32_t lock_timeout);
211 ENGINE_ERROR_CODE unlock(const void* cookie,
217 const std::string& getName() const {
221 ENGINE_ERROR_CODE getStats(const void* cookie,
222 const char* stat_key,
229 kvBucket->resetUnderlyingStats();
233 ENGINE_ERROR_CODE store(const void *cookie,
236 ENGINE_STORE_OPERATION operation);
238 ENGINE_ERROR_CODE flush(const void *cookie);
240 uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
241 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
242 uint32_t *seqno, uint16_t *vbucket);
244 bool createTapQueue(const void *cookie,
247 const void *userdata,
250 ENGINE_ERROR_CODE tapNotify(const void *cookie,
251 void *engine_specific,
267 ENGINE_ERROR_CODE dcpOpen(const void* cookie,
271 const void *stream_name,
274 ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
279 ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
282 uint64_t checkpointId);
// Command handler prototypes (bodies not in this view). Each receives the
// connection cookie, a pointer to the request packet and the ADD_RESPONSE
// callback. touch, getMeta, setWithMeta, deleteWithMeta, returnMeta and
// getAllKeys additionally take a DocNamespace; the two cluster-config
// handlers do not.
284 ENGINE_ERROR_CODE touch(const void* cookie,
285 protocol_binary_request_header *request,
286 ADD_RESPONSE response,
287 DocNamespace docNamespace);
289 ENGINE_ERROR_CODE getMeta(const void* cookie,
290 protocol_binary_request_get_meta *request,
291 ADD_RESPONSE response,
292 DocNamespace docNamespace);
294 ENGINE_ERROR_CODE setWithMeta(const void* cookie,
295 protocol_binary_request_set_with_meta *request,
296 ADD_RESPONSE response,
297 DocNamespace docNamespace);
299 ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
300 protocol_binary_request_delete_with_meta *request,
301 ADD_RESPONSE response,
302 DocNamespace docNamespace);
304 ENGINE_ERROR_CODE returnMeta(const void* cookie,
305 protocol_binary_request_return_meta *request,
306 ADD_RESPONSE response,
307 DocNamespace docNamespace);
309 ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
310 protocol_binary_request_set_cluster_config *request,
311 ADD_RESPONSE response);
313 ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
314 protocol_binary_request_get_cluster_config *request,
315 ADD_RESPONSE response);
317 ENGINE_ERROR_CODE getAllKeys(const void* cookie,
318 protocol_binary_request_get_keys *request,
319 ADD_RESPONSE response,
320 DocNamespace docNamespace);
323 * Visit the objects and add them to the tap/dcp connections queue.
324 * @todo this code should honor the backfill time!
326 void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
// Sets the priority of a connection through the server cookie API.
// The server call is bracketed by ObjectRegistry::onSwitchThread(NULL, true)
// and onSwitchThread(epe): the value returned by the first call is handed
// back to the second one once the server call has returned.
// NOTE(review): presumably this detaches the calling thread from the engine
// while server code runs - confirm against ObjectRegistry.
328 void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
329 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
330 serverApi->cookie->set_priority(cookie, priority);
331 ObjectRegistry::onSwitchThread(epe);
// Signals completion of a pending operation for one cookie via the server
// notify_io_complete callback, timing the call against stats.notifyIOHisto.
// NOTE(review): lines between the NULL check and the timer are missing from
// this view, so whether the notification is skipped for a NULL cookie cannot
// be confirmed from here.
334 void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
// A NULL cookie is reported at WARNING level.
335 if (cookie == NULL) {
336 LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
338 BlockTimer bt(&stats.notifyIOHisto);
// Server call bracketed by the onSwitchThread(NULL, true) / onSwitchThread(epe) pair.
339 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
340 serverApi->cookie->notify_io_complete(cookie, status);
341 ObjectRegistry::onSwitchThread(epe);
345 ENGINE_ERROR_CODE reserveCookie(const void *cookie);
346 ENGINE_ERROR_CODE releaseCookie(const void *cookie);
// Passes engine_data for the given cookie to the server API
// store_engine_specific call, bracketed by the
// onSwitchThread(NULL, true) / onSwitchThread(epe) pair.
348 void storeEngineSpecific(const void *cookie, void *engine_data) {
349 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
350 serverApi->cookie->store_engine_specific(cookie, engine_data);
351 ObjectRegistry::onSwitchThread(epe);
// Reads the pointer the server API get_engine_specific call yields for the
// cookie into engine_data, bracketed by the onSwitchThread pair.
// The return statement is on a line missing from this view.
354 void *getEngineSpecific(const void *cookie) {
355 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
356 void *engine_data = serverApi->cookie->get_engine_specific(cookie);
357 ObjectRegistry::onSwitchThread(epe);
// Asks the server cookie API is_datatype_supported about the given datatype
// for this connection, bracketed by the onSwitchThread pair. The line that
// captures the result and the return statement are missing from this view.
361 bool isDatatypeSupported(const void* cookie,
362 protocol_binary_datatype_t datatype) {
363 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
365 serverApi->cookie->is_datatype_supported(cookie, datatype);
366 ObjectRegistry::onSwitchThread(epe);
// Stores the result of the server cookie API is_mutation_extras_supported
// call in isSupported, bracketed by the onSwitchThread pair.
// The return statement is on a line missing from this view.
370 bool isMutationExtrasSupported(const void *cookie) {
371 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
372 bool isSupported = serverApi->cookie->is_mutation_extras_supported(cookie);
373 ObjectRegistry::onSwitchThread(epe);
// Convenience wrapper: isDatatypeSupported() with PROTOCOL_BINARY_DATATYPE_XATTR.
377 bool isXattrSupported(const void* cookie) {
378 return isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_XATTR);
// Stores the result of the server cookie API get_opcode_if_ewouldblock_set
// call in opcode, bracketed by the onSwitchThread pair.
// The return statement is on a line missing from this view.
381 uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
382 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
383 uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
384 ObjectRegistry::onSwitchThread(epe);
// Passes `cas` to the server cookie API validate_session_cas call and stores
// the boolean result in ret, bracketed by the onSwitchThread pair.
// The return statement is on a line missing from this view.
388 bool validateSessionCas(const uint64_t cas) {
389 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
390 bool ret = serverApi->cookie->validate_session_cas(cas);
391 ObjectRegistry::onSwitchThread(epe);
// Calls the server cookie API decrement_session_ctr, bracketed by the
// onSwitchThread pair. Takes no arguments and returns nothing.
395 void decrementSessionCtr(void) {
396 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
397 serverApi->cookie->decrement_session_ctr();
398 ObjectRegistry::onSwitchThread(epe);
401 void registerEngineCallback(ENGINE_EVENT_TYPE type,
402 EVENT_CALLBACK cb, const void *cb_data);
// Bulk variant: applies the server notify_io_complete callback to every
// element of `cookies` using std::for_each. The second argument bound by
// std::bind2nd is on a line missing from this view.
// NOTE(review): `cookies` is taken by value, so the container is copied on
// each call unless the caller moves it in.
// NOTE(review): std::bind2nd and std::ptr_fun were deprecated in C++11 and
// removed in C++17; a lambda or range-for would be needed on newer standards.
404 template <typename T>
405 void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
406 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
407 std::for_each(cookies.begin(), cookies.end(),
// The cast to NOTIFY_IO_COMPLETE_T is what lets std::ptr_fun accept the callback.
408 std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
410 ObjectRegistry::onSwitchThread(epe);
413 void handleDisconnect(const void *cookie);
414 void handleDeleteBucket(const void *cookie);
// Asks kvBucket to pause the flusher. rv starts as
// PROTOCOL_BINARY_RESPONSE_SUCCESS; when pauseFlusher() returns false an INFO
// message is logged, *msg is pointed at a string literal and rv becomes
// PROTOCOL_BINARY_RESPONSE_EINVAL. msg_size is not touched in the visible
// lines; the return statement is missing from this view.
416 protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
418 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
420 if (!kvBucket->pauseFlusher()) {
421 LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
422 *msg = "Flusher not running.";
423 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Asks kvBucket to resume the flusher. rv starts as
// PROTOCOL_BINARY_RESPONSE_SUCCESS; when resumeFlusher() returns false an INFO
// message is logged, *msg is pointed at a string literal and rv becomes
// PROTOCOL_BINARY_RESPONSE_EINVAL. msg_size is not touched in the visible
// lines; the return statement is missing from this view.
428 protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
430 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
432 if (!kvBucket->resumeFlusher()) {
433 LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
434 *msg = "Flusher not shut down.";
435 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
// Thin forwarder to kvBucket->deleteVBucket(); the cookie `c` defaults to NULL.
440 ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
441 return kvBucket->deleteVBucket(vbid, c);
// Thin forwarder to kvBucket->scheduleCompaction(); `cookie` defaults to NULL.
// The declaration of parameter `c` is on a line missing from this view.
444 ENGINE_ERROR_CODE compactDB(uint16_t vbid,
446 const void *cookie = NULL) {
447 return kvBucket->scheduleCompaction(vbid, c, cookie);
// Thin forwarder to kvBucket->resetVBucket(); returns its boolean result.
450 bool resetVBucket(uint16_t vbid) {
451 return kvBucket->resetVBucket(vbid);
// Writes the value into the configuration via setTapKeepalive(), widening
// the uint32_t argument to size_t with a C-style cast.
454 void setTapKeepAlive(uint32_t to) {
455 configuration.setTapKeepalive((size_t)to);
// Plain setter for the deleteAllEnabled flag (a non-atomic bool member).
458 void setDeleteAll(bool enabled) {
459 deleteAllEnabled = enabled;
// Thin forwarder to kvBucket->evictKey(key, vbucket, msg). The declarations
// of `vbucket` and `msg` are on lines missing from this view.
462 protocol_binary_response_status evictKey(const DocKey& key,
465 return kvBucket->evictKey(key, vbucket, msg);
468 ENGINE_ERROR_CODE observe(const void* cookie,
469 protocol_binary_request_header *request,
470 ADD_RESPONSE response,
471 DocNamespace docNamespace);
473 ENGINE_ERROR_CODE observe_seqno(const void* cookie,
474 protocol_binary_request_header *request,
475 ADD_RESPONSE response);
// Thin forwarder to kvBucket->getVBucket(); returns the VBucketPtr by value.
477 VBucketPtr getVBucket(uint16_t vbucket) {
478 return kvBucket->getVBucket(vbucket);
// Thin forwarder to kvBucket->setVBucketState(vbid, to, transfer). The
// declaration of `transfer` is on a line missing from this view.
481 ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to,
483 return kvBucket->setVBucketState(vbid, to, transfer);
486 protocol_binary_response_status setParam(
487 protocol_binary_request_set_param* req, std::string& msg);
489 protocol_binary_response_status setFlushParam(const char* keyz,
493 protocol_binary_response_status setTapParam(const char* keyz,
497 protocol_binary_response_status setCheckpointParam(const char* keyz,
501 protocol_binary_response_status setDcpParam(const char* keyz,
505 protocol_binary_response_status setVbucketParam(uint16_t vbucket,
510 ~EventuallyPersistentEngine();
512 engine_info *getInfo() {
516 item_info getItemInfo(const Item& itm);
518 EPStats &getEpStats() {
// Non-owning raw pointer to the bucket held by the kvBucket unique_ptr.
522 KVBucket* getKVBucket() { return kvBucket.get(); }
// The reference accessors below dereference raw pointer members without a
// null check, so each assumes its member has already been set.
524 TapConnMap &getTapConnMap() { return *tapConnMap; }
526 DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
528 DcpFlowControlManager &getDcpFlowControlManager() {
529 return *dcpFlowControlManager_;
532 TapConfig &getTapConfig() { return *tapConfig; }
534 ReplicationThrottle &getReplicationThrottle() { return *replicationThrottle; }
536 CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
// Returns the stored server API pointer as-is.
538 SERVER_HANDLE_V1* getServerApi() { return serverApi; }
// Mutable reference to the engine-owned Configuration member.
540 Configuration &getConfiguration() {
541 return configuration;
// Command handler prototypes (bodies not in this view). All six share one
// signature: connection cookie, generic request header pointer and the
// ADD_RESPONSE callback.
544 ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
545 protocol_binary_request_header *request,
546 ADD_RESPONSE response);
548 ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
549 protocol_binary_request_header *request,
550 ADD_RESPONSE response);
552 ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
553 protocol_binary_request_header *request,
554 ADD_RESPONSE response);
556 ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
557 protocol_binary_request_header *request,
558 ADD_RESPONSE response);
560 ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
561 protocol_binary_request_header *request,
562 ADD_RESPONSE response);
564 ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
565 protocol_binary_request_header *request,
566 ADD_RESPONSE response);
// Const getters that return the corresponding size_t members unchanged.
568 size_t getGetlDefaultTimeout() const {
569 return getlDefaultTimeout;
572 size_t getGetlMaxTimeout() const {
573 return getlMaxTimeout;
576 size_t getMaxFailoverEntries() const {
577 return maxFailoverEntries;
// True while the bucket reports isWarmingUp() or while the atomic
// trafficEnabled flag is false. itemDelete() and get() use this to turn
// not-found / not-my-vbucket results into ENGINE_TMPFAIL.
580 bool isDegradedMode() const {
581 return kvBucket->isWarmingUp() || !trafficEnabled.load();
584 WorkLoadPolicy &getWorkLoadPolicy(void) {
// Plain getter/setter pair for the workloadPriority member.
588 bucket_priority_t getWorkloadPriority(void) const {return workloadPriority; }
589 void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
591 struct clusterConfig {
596 ENGINE_ERROR_CODE getRandomKey(const void *cookie,
597 ADD_RESPONSE response);
599 ConnHandler* getConnHandler(const void *cookie);
601 void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
604 * Explicitly trigger the defragmenter task. Provided to facilitate
607 void runDefragmenterTask(void);
610 * Explicitly trigger the AccessScanner task. Provided to facilitate
613 bool runAccessScannerTask(void);
616 * Explicitly trigger the VbStatePersist task. Provided to facilitate
619 void runVbStatePersistTask(int vbid);
622 * Get a (sloppy) list of the sequence numbers for all of the vbuckets
623 * on this server. It is not to be treated as a consistent set of sequence numbers,
624 * but rather a list of "at least" numbers. The way the list is generated
625 * is that we're starting from vbucket 0 and record the current number,
626 * then look at the next vbucket and record its number. That means that
627 * at the time we get the number for vbucket X all of the previous
628 * numbers could have been incremented. If the client just needs a list
629 * of where we are for each vbucket this method may be more optimal than
630 * requesting one by one.
632 * @param cookie The cookie representing the connection to requesting
634 * @param add_response The method used to format the output buffer
635 * @return ENGINE_SUCCESS upon success
637 ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(
639 protocol_binary_request_header *request,
640 ADD_RESPONSE response);
642 void updateDcpMinCompressionRatio(float value);
645 * Sends a not-my-vbucket response, using the specified response callback.
646 * to the specified connection via its cookie.
648 ENGINE_ERROR_CODE sendNotMyVBucketResponse(ADD_RESPONSE response,
652 EpEngineTaskable& getTaskable() {
657 friend class EpEngineValueChangeListener;
659 void setMaxItemSize(size_t value) {
// Plain setters: each assigns `value` to the like-named size_t member with
// no validation in the visible lines.
663 void setMaxItemPrivilegedBytes(size_t value) {
664 maxItemPrivilegedBytes = value;
667 void setGetlDefaultTimeout(size_t value) {
668 getlDefaultTimeout = value;
671 void setGetlMaxTimeout(size_t value) {
672 getlMaxTimeout = value;
675 EventuallyPersistentEngine(GET_SERVER_API get_server_api);
676 friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
677 GET_SERVER_API get_server_api,
678 ENGINE_HANDLE **handle);
679 uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
680 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
681 uint32_t *seqno, uint16_t *vbucket,
682 TapProducer *c, bool &retry);
685 ENGINE_ERROR_CODE processTapAck(const void *cookie,
691 * Report the state of a memory condition when out of memory.
693 * @return ETMPFAIL if we think we can recover without interaction,
696 ENGINE_ERROR_CODE memoryCondition();
699 * Check if there is any available memory space to allocate an Item
700 * instance with a given size.
// True when the current total memory used plus nBytes does not exceed
// stats.getMaxDataSize(); the comparison is inclusive (<=).
702 bool hasAvailableSpace(uint32_t nBytes) {
703 return (stats.getTotalMemoryUsed() + nBytes) <= stats.getMaxDataSize();
706 friend class BGFetchCallback;
707 friend class KVBucket;
708 friend class EPBucket;
// Atomically sets trafficEnabled to `enable`, but only if it currently holds
// the opposite value. Returns true when the flag was changed and false when
// it already equalled `enable`.
710 bool enableTraffic(bool enable) {
// `inverse` is the expected old value for the compare-exchange.
711 bool inverse = !enable;
712 return trafficEnabled.compare_exchange_strong(inverse, enable);
// Stats helper prototypes (bodies not in this view). Every one takes the
// connection cookie, and where the full parameter list is visible it also
// takes an ADD_STAT callback. Some parameter lines are missing from this
// view, so a few prototypes below are incomplete.
715 ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
716 ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
717 ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
718 ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
719 const char* stat_key,
721 bool prevStateRequested,
723 ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
724 ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
725 const char* stat_key, int nkey);
726 ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
727 ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
728 ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
729 const char *sep, size_t nsep,
730 conn_type_t connType);
731 ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
732 ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
733 ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
734 ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
735 ENGINE_ERROR_CODE doTasksStats(const void* cookie, ADD_STAT add_stat);
// `validate` defaults to false.
736 ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
737 uint16_t vbid, const DocKey& key, bool validate=false);
738 ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
743 ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
747 ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
750 ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
751 ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
752 ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
753 const char* stat_key, int nkey);
// Returns void, unlike the do*Stats helpers above; takes the vbucket by const reference.
754 void addSeqnoVbStats(const void *cookie, ADD_STAT add_stat,
755 const VBucketPtr &vb);
// Records `result` as the lookup result for `cookie` in the lookups map,
// holding lookupMutex for the duration of the call.
// NOTE(review): lines inside the branches are missing from this view, so
// whether a previous Item is released before being overwritten cannot be
// confirmed from here.
757 void addLookupResult(const void *cookie, Item *result) {
758 LockHolder lh(lookupMutex);
759 std::map<const void*, Item*>::iterator it = lookups.find(cookie);
// An existing entry for this cookie is logged at DEBUG level: with its key
// when the stored Item pointer is non-null, with a generic message otherwise.
760 if (it != lookups.end()) {
761 if (it->second != NULL) {
762 LOG(EXTENSION_LOG_DEBUG,
763 "Cleaning up old lookup result for '%s'",
764 it->second->getKey().data());
767 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
// Insert or overwrite the entry for this cookie.
771 lookups[cookie] = result;
// Looks up the stored result for `cookie` under lookupMutex. The code that
// runs after the find (hand-over through *itm, erase, return value) is on
// lines missing from this view.
774 bool fetchLookupResult(const void *cookie, Item **itm) {
775 // This will return *and erase* the lookup result for a connection.
776 // You look it up, you own it.
777 LockHolder lh(lookupMutex);
778 std::map<const void*, Item*>::iterator it = lookups.find(cookie);
779 if (it != lookups.end()) {
788 // Get the current tap connection for this cookie.
789 // If this method returns NULL, you should return TAP_DISCONNECT
790 TapProducer* getTapProducer(const void *cookie);
792 // Initialize all required callbacks of this engine with the underlying
794 void initializeEngineCallbacks();
797 * Private helper method for decoding the options on set/del_with_meta.
798 * Tightly coupled to the logic of both those functions, it will
799 * take a request pointer and locate and validate any options within.
800 * @param request pointer to the set/del_with_meta request packet
801 * @param generateCas set to Yes if CAS regeneration is enabled.
802 * @param skipConflictResolution set to true if conflict resolution should
804 * @param keyOffset set to the number of bytes which are to be skipped to
807 protocol_binary_response_status decodeWithMetaOptions(
808 protocol_binary_request_delete_with_meta* request,
809 GenerateCas& generateCas,
810 bool& skipConflictResolution,
814 * Sends NOT_SUPPORTED response, using the specified response callback
815 * to the specified connection via its cookie.
817 * @param response callback func to send the response
818 * @param cookie conn cookie
820 * @return status of sending response
822 ENGINE_ERROR_CODE sendNotSupportedResponse(ADD_RESPONSE response,
826 * Sends error response, using the specified error and response callback
827 * to the specified connection via its cookie.
829 * @param response callback func to send the response
830 * @param status error status to send
831 * @param cas a cas value to send
832 * @param cookie conn cookie
834 * @return status of sending response
836 ENGINE_ERROR_CODE sendErrorResponse(ADD_RESPONSE response,
837 protocol_binary_response_status status,
842 * Sends a response that includes the mutation extras, the VB uuid and
843 * seqno of the mutation.
845 * @param response callback func to send the response
846 * @param vbucket vbucket that was mutated
847 * @param bySeqno the seqno to send
848 * @param status a mcbp status code
849 * @param cas cas assigned to the mutation
850 * @param cookie conn cookie
851 * @returns NMVB if VB can't be located, or the ADD_RESPONSE return code.
853 ENGINE_ERROR_CODE sendMutationExtras(ADD_RESPONSE response,
856 protocol_binary_response_status status,
861 * Factory method for constructing the correct bucket type given the
863 * @param config Configuration to create bucket based on. Note this
864 * object may be modified to ensure the config is valid
865 * for the selected bucket type.
867 std::unique_ptr<KVBucket> makeBucket(Configuration& config);
870 * helper method so that some commands can set the datatype of the document.
872 * @param cookie connection cookie
873 * @param datatype the current document datatype
874 * @param body a buffer containing the document body
875 * @returns a datatype which will now include JSON if the document is JSON
876 * and the connection does not support datatype JSON.
878 protocol_binary_datatype_t checkForDatatypeJson(
880 protocol_binary_datatype_t datatype,
881 cb::const_char_buffer body);
884 * Process the set_with_meta with the given buffers/values.
886 * @param vbucket VB to mutate
887 * @param key DocKey initialised with key data
888 * @param value buffer for the mutation's value
889 * @param itemMeta mutation's cas/revseq/flags/expiration
890 * @param isDeleted the Item is deleted (with value)
891 * @param datatype datatype of the mutation
892 * @param cas [in,out] CAS for the command (updated with new CAS)
893 * @param seqno [out] optional - returns the seqno allocated to the mutation
894 * @param cookie connection's cookie
895 * @param force Should the set skip conflict resolution?
896 * @param allowExisting true if the set can overwrite existing key
897 * @param genBySeqno generate a new seqno? (yes/no)
898 * @param genCas generate a new CAS? (yes/no)
899 * @param emd buffer referencing ExtendedMetaData
900 * @returns state of the operation as an ENGINE_ERROR_CODE
902 ENGINE_ERROR_CODE setWithMeta(uint16_t vbucket,
904 cb::const_byte_buffer value,
905 ItemMetaData itemMeta,
907 protocol_binary_datatype_t datatype,
913 GenerateBySeqno genBySeqno,
915 cb::const_byte_buffer emd);
918 * Process the del_with_meta with the given buffers/values.
920 * @param vbucket VB to mutate
921 * @param key DocKey initialised with key data
922 * @param itemMeta mutation's cas/revseq/flags/expiration
923 * @param cas [in,out] CAS for the command (updated with new CAS)
924 * @param seqno [out] optional - returns the seqno allocated to the mutation
925 * @param cookie connection's cookie
926 * @param force Should the set skip conflict resolution?
927 * @param genBySeqno generate a new seqno? (yes/no)
928 * @param genCas generate a new CAS? (yes/no)
929 * @param emd buffer referencing ExtendedMetaData
930 * @returns state of the operation as an ENGINE_ERROR_CODE
932 ENGINE_ERROR_CODE deleteWithMeta(uint16_t vbucket,
934 ItemMetaData itemMeta,
939 GenerateBySeqno genBySeqno,
941 cb::const_byte_buffer emd);
// Data members of EventuallyPersistentEngine. Some member lines are missing
// from this view.
// Server API handle used for all serverApi->cookie->... calls.
943 SERVER_HANDLE_V1 *serverApi;
// The bucket is the only member here held through an owning smart pointer.
944 std::unique_ptr<KVBucket> kvBucket;
945 WorkLoadPolicy *workload;
946 bucket_priority_t workloadPriority;
948 ReplicationThrottle *replicationThrottle;
// Per-cookie lookup results; addLookupResult() and fetchLookupResult() take
// lookupMutex before touching `lookups`.
949 std::map<const void*, Item*> lookups;
950 std::unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
951 std::mutex lookupMutex;
952 GET_SERVER_API getServerApiFunc;
// Raw storage sized for one engine_info plus 10 feature_info entries.
// NOTE(review): the enclosing declaration is missing from this view.
955 char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
// NOTE(review): raw pointers - ownership and lifetime are not visible here;
// the reference accessors dereference them without a null check.
958 DcpConnMap *dcpConnMap_;
959 DcpFlowControlManager *dcpFlowControlManager_;
960 TapConnMap *tapConnMap;
961 TapConfig *tapConfig;
962 CheckpointConfig *checkpointConfig;
965 size_t maxItemPrivilegedBytes;
966 size_t getlDefaultTimeout;
967 size_t getlMaxTimeout;
968 size_t maxFailoverEntries;
970 Configuration configuration;
// Read by isDegradedMode(), flipped by enableTraffic().
971 std::atomic<bool> trafficEnabled;
// Written by setDeleteAll(); not atomic.
973 bool deleteAllEnabled;
974 // a unique system generated token initialized at each time
975 // ep_engine starts up.
976 std::atomic<time_t> startupTime;
977 EpEngineTaskable taskable;
980 #endif // SRC_EP_ENGINE_H_