a2cb9d1bfb2524a9167425ef46bd083fe293f0af
[ep-engine.git] / src / ep_engine.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
20
21 #include "config.h"
22
23 #include "kv_bucket.h"
24 #include "storeddockey.h"
25 #include "tapconnection.h"
26 #include "taskable.h"
27 #include "vbucket.h"
28
29 #include <memcached/engine.h>
30 #include <platform/processclock.h>
31
32 #include <string>
33
// Forward declarations of types that are defined in other headers.
class DcpConnMap;
class DcpFlowControlManager;
class Producer;
class ReplicationThrottle;
class StoredValue;
class TapConnMap;
class VBucketCountVisitor;
41
42 extern "C" {
43     EXPORT_FUNCTION
44     ENGINE_ERROR_CODE create_instance(uint64_t interface,
45                                       GET_SERVER_API get_server_api,
46                                       ENGINE_HANDLE **handle);
47
48     EXPORT_FUNCTION
49     void destroy_engine(void);
50
51     void EvpNotifyPendingConns(void*arg);
52 }
53
54 /* We're using notify_io_complete from ptr_fun, but that func
55  * got a "C" linkage that ptr_fun doesn't like... just
56  * cast it away with this typedef ;)
57  */
58 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
59                                      ENGINE_ERROR_CODE status);
60
// Forward declaration: EpEngineTaskable (below) stores a pointer to the
// engine before the engine class itself is defined.
class EventuallyPersistentEngine;
64
65 /**
66     To allow Engines to run tasks.
67 **/
68 class EpEngineTaskable : public Taskable {
69 public:
70     EpEngineTaskable(EventuallyPersistentEngine* e) : myEngine(e) {
71
72     }
73
74     const std::string& getName() const;
75
76     task_gid_t getGID() const;
77
78     bucket_priority_t getWorkloadPriority() const;
79
80     void setWorkloadPriority(bucket_priority_t prio);
81
82     WorkLoadPolicy& getWorkLoadPolicy(void);
83
84     void logQTime(TaskId id, const ProcessClock::duration enqTime);
85
86     void logRunTime(TaskId id, const ProcessClock::duration runTime);
87
88 private:
89     EventuallyPersistentEngine* myEngine;
90 };
91
92 /**
93  * A container class holding VBucketCountVisitors to aggregate stats for
94  * different vbucket states.
95  */
96 class VBucketCountAggregator : public VBucketVisitor  {
97 public:
98     void visitBucket(VBucketPtr &vb) override;
99
100     void addVisitor(VBucketCountVisitor* visitor);
101 private:
102     std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
103 };
104
105 /**
106  * memcached engine interface to the KVBucket.
107  */
108 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
109     friend class LookupCallback;
110 public:
111     ENGINE_ERROR_CODE initialize(const char* config);
112     void destroy(bool force);
113
114     ENGINE_ERROR_CODE itemAllocate(item** itm,
115                                    const DocKey& key,
116                                    const size_t nbytes,
117                                    const size_t priv_nbytes,
118                                    const int flags,
119                                    const rel_time_t exptime,
120                                    uint8_t datatype,
121                                    uint16_t vbucket);
122     /**
123      * Delete a given key and value from the engine.
124      *
125      * @param cookie The cookie representing the connection
126      * @param key The key that needs to be deleted from the engine
127      * @param cas CAS value of the mutation that needs to be returned
128      *            back to the client
129      * @param vbucket vbucket id to which the deleted key corresponds to
130      * @param itm item pointer that contains a value that needs to be
131      *            stored along with a delete. A NULL pointer indicates
132      *            that no value needs to be stored with the delete.
133      * @param item_meta pointer to item meta data that needs to be
134      *                  as a result the delete. A NULL pointer indicates
135      *                  that no meta data needs to be returned.
136      * @param mut_info pointer to the mutation info that resulted from
137      *                 the delete.
138      *
139      * @returns ENGINE_SUCCESS if the delete was successful or
140      *          an error code indicating the error
141      */
142     ENGINE_ERROR_CODE itemDelete(const void* cookie,
143                                  const DocKey& key,
144                                  uint64_t& cas,
145                                  uint16_t vbucket,
146                                  Item* itm,
147                                  ItemMetaData* item_meta,
148                                  mutation_descr_t* mut_info) {
149         ENGINE_ERROR_CODE ret = kvBucket->deleteItem(key,
150                                                      cas,
151                                                      vbucket,
152                                                      cookie,
153                                                      itm,
154                                                      item_meta,
155                                                      mut_info);
156
157         if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
158             if (isDegradedMode()) {
159                 return ENGINE_TMPFAIL;
160             }
161         } else if (ret == ENGINE_SUCCESS) {
162             ++stats.numOpsDelete;
163         }
164         return ret;
165     }
166
167
168     void itemRelease(const void* cookie, item *itm)
169     {
170         (void)cookie;
171         delete (Item*)itm;
172     }
173
174     ENGINE_ERROR_CODE get(const void* cookie,
175                           item** itm,
176                           const DocKey& key,
177                           uint16_t vbucket,
178                           get_options_t options)
179     {
180         BlockTimer timer(&stats.getCmdHisto);
181         GetValue gv(kvBucket->get(key, vbucket, cookie, options));
182         ENGINE_ERROR_CODE ret = gv.getStatus();
183
184         if (ret == ENGINE_SUCCESS) {
185             *itm = gv.getValue();
186             if (options & TRACK_STATISTICS) {
187                 ++stats.numOpsGet;
188             }
189         } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
190             if (isDegradedMode()) {
191                 return ENGINE_TMPFAIL;
192             }
193         }
194
195         return ret;
196     }
197
198     cb::EngineErrorItemPair get_if(const void* cookie,
199                                    const DocKey& key,
200                                    uint16_t vbucket,
201                                    std::function<bool(
202                                        const item_info&)> filter);
203
204     ENGINE_ERROR_CODE get_locked(const void* cookie,
205                                  item** itm,
206                                  const DocKey& key,
207                                  uint16_t vbucket,
208                                  uint32_t lock_timeout);
209
210
211     ENGINE_ERROR_CODE unlock(const void* cookie,
212                              const DocKey& key,
213                              uint16_t vbucket,
214                              uint64_t cas);
215
216
217     const std::string& getName() const {
218         return name;
219     }
220
221     ENGINE_ERROR_CODE getStats(const void* cookie,
222                                const char* stat_key,
223                                int nkey,
224                                ADD_STAT add_stat);
225
226     void resetStats() {
227         stats.reset();
228         if (kvBucket) {
229             kvBucket->resetUnderlyingStats();
230         }
231     }
232
233     ENGINE_ERROR_CODE store(const void *cookie,
234                             item* itm,
235                             uint64_t *cas,
236                             ENGINE_STORE_OPERATION operation);
237
238     ENGINE_ERROR_CODE flush(const void *cookie);
239
240     uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
241                           uint16_t *nes, uint8_t *ttl, uint16_t *flags,
242                           uint32_t *seqno, uint16_t *vbucket);
243
244     bool createTapQueue(const void *cookie,
245                         std::string &client,
246                         uint32_t flags,
247                         const void *userdata,
248                         size_t nuserdata);
249
250     ENGINE_ERROR_CODE tapNotify(const void *cookie,
251                                 void *engine_specific,
252                                 uint16_t nengine,
253                                 uint8_t ttl,
254                                 uint16_t tap_flags,
255                                 uint16_t tap_event,
256                                 uint32_t tap_seqno,
257                                 const void *key,
258                                 size_t nkey,
259                                 uint32_t flags,
260                                 uint32_t exptime,
261                                 uint64_t cas,
262                                 uint8_t datatype,
263                                 const void *data,
264                                 size_t ndata,
265                                 uint16_t vbucket);
266
267     ENGINE_ERROR_CODE dcpOpen(const void* cookie,
268                               uint32_t opaque,
269                               uint32_t seqno,
270                               uint32_t flags,
271                               const void *stream_name,
272                               uint16_t nname);
273
274     ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
275                                    uint32_t opaque,
276                                    uint16_t vbucket,
277                                    uint32_t flags);
278
279     ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
280                                             uint8_t event,
281                                             uint16_t vbucket,
282                                             uint64_t checkpointId);
283
284     ENGINE_ERROR_CODE touch(const void* cookie,
285                             protocol_binary_request_header *request,
286                             ADD_RESPONSE response,
287                             DocNamespace docNamespace);
288
289     ENGINE_ERROR_CODE getMeta(const void* cookie,
290                               protocol_binary_request_get_meta *request,
291                               ADD_RESPONSE response,
292                               DocNamespace docNamespace);
293
294     ENGINE_ERROR_CODE setWithMeta(const void* cookie,
295                                  protocol_binary_request_set_with_meta *request,
296                                  ADD_RESPONSE response,
297                                  DocNamespace docNamespace);
298
299     ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
300                               protocol_binary_request_delete_with_meta *request,
301                               ADD_RESPONSE response,
302                               DocNamespace docNamespace);
303
304     ENGINE_ERROR_CODE returnMeta(const void* cookie,
305                                  protocol_binary_request_return_meta *request,
306                                  ADD_RESPONSE response,
307                                  DocNamespace docNamespace);
308
309     ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
310                             protocol_binary_request_set_cluster_config *request,
311                             ADD_RESPONSE response);
312
313     ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
314                             protocol_binary_request_get_cluster_config *request,
315                             ADD_RESPONSE response);
316
317     ENGINE_ERROR_CODE getAllKeys(const void* cookie,
318                                 protocol_binary_request_get_keys *request,
319                                 ADD_RESPONSE response,
320                                 DocNamespace docNamespace);
321
322     /**
323      * Visit the objects and add them to the tap/dcp connecitons queue.
324      * @todo this code should honor the backfill time!
325      */
326     void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
327
328     void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
329         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
330         serverApi->cookie->set_priority(cookie, priority);
331         ObjectRegistry::onSwitchThread(epe);
332     }
333
334     void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
335         if (cookie == NULL) {
336             LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
337         } else {
338             BlockTimer bt(&stats.notifyIOHisto);
339             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
340             serverApi->cookie->notify_io_complete(cookie, status);
341             ObjectRegistry::onSwitchThread(epe);
342         }
343     }
344
345     ENGINE_ERROR_CODE reserveCookie(const void *cookie);
346     ENGINE_ERROR_CODE releaseCookie(const void *cookie);
347
348     void storeEngineSpecific(const void *cookie, void *engine_data) {
349         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
350         serverApi->cookie->store_engine_specific(cookie, engine_data);
351         ObjectRegistry::onSwitchThread(epe);
352     }
353
354     void *getEngineSpecific(const void *cookie) {
355         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
356         void *engine_data = serverApi->cookie->get_engine_specific(cookie);
357         ObjectRegistry::onSwitchThread(epe);
358         return engine_data;
359     }
360
361     bool isDatatypeSupported(const void* cookie,
362                              protocol_binary_datatype_t datatype) {
363         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
364         bool isSupported =
365                 serverApi->cookie->is_datatype_supported(cookie, datatype);
366         ObjectRegistry::onSwitchThread(epe);
367         return isSupported;
368     }
369
370     bool isMutationExtrasSupported(const void *cookie) {
371         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
372         bool isSupported = serverApi->cookie->is_mutation_extras_supported(cookie);
373         ObjectRegistry::onSwitchThread(epe);
374         return isSupported;
375     }
376
377     bool isXattrSupported(const void* cookie) {
378         return isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_XATTR);
379     }
380
381     uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
382         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
383         uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
384         ObjectRegistry::onSwitchThread(epe);
385         return opcode;
386     }
387
388     bool validateSessionCas(const uint64_t cas) {
389         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
390         bool ret = serverApi->cookie->validate_session_cas(cas);
391         ObjectRegistry::onSwitchThread(epe);
392         return ret;
393     }
394
395     void decrementSessionCtr(void) {
396         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
397         serverApi->cookie->decrement_session_ctr();
398         ObjectRegistry::onSwitchThread(epe);
399     }
400
401     void registerEngineCallback(ENGINE_EVENT_TYPE type,
402                                 EVENT_CALLBACK cb, const void *cb_data);
403
404     template <typename T>
405     void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
406         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
407         std::for_each(cookies.begin(), cookies.end(),
408                       std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
409                                    status));
410         ObjectRegistry::onSwitchThread(epe);
411     }
412
413     void handleDisconnect(const void *cookie);
414     void handleDeleteBucket(const void *cookie);
415
416     protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
417         (void) msg_size;
418         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
419         *msg = NULL;
420         if (!kvBucket->pauseFlusher()) {
421             LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
422             *msg = "Flusher not running.";
423             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
424         }
425         return rv;
426     }
427
428     protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
429         (void) msg_size;
430         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
431         *msg = NULL;
432         if (!kvBucket->resumeFlusher()) {
433             LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
434             *msg = "Flusher not shut down.";
435             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
436         }
437         return rv;
438     }
439
440     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
441         return kvBucket->deleteVBucket(vbid, c);
442     }
443
444     ENGINE_ERROR_CODE compactDB(uint16_t vbid,
445                                 compaction_ctx c,
446                                 const void *cookie = NULL) {
447         return kvBucket->scheduleCompaction(vbid, c, cookie);
448     }
449
450     bool resetVBucket(uint16_t vbid) {
451         return kvBucket->resetVBucket(vbid);
452     }
453
454     void setTapKeepAlive(uint32_t to) {
455         configuration.setTapKeepalive((size_t)to);
456     }
457
458     void setDeleteAll(bool enabled) {
459         deleteAllEnabled = enabled;
460     }
461
462     protocol_binary_response_status evictKey(const DocKey& key,
463                                              uint16_t vbucket,
464                                              const char** msg) {
465         return kvBucket->evictKey(key, vbucket, msg);
466     }
467
468     ENGINE_ERROR_CODE observe(const void* cookie,
469                               protocol_binary_request_header *request,
470                               ADD_RESPONSE response,
471                               DocNamespace docNamespace);
472
473     ENGINE_ERROR_CODE observe_seqno(const void* cookie,
474                                     protocol_binary_request_header *request,
475                                     ADD_RESPONSE response);
476
477     VBucketPtr getVBucket(uint16_t vbucket) {
478         return kvBucket->getVBucket(vbucket);
479     }
480
481     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to,
482                                       bool transfer) {
483         return kvBucket->setVBucketState(vbid, to, transfer);
484     }
485
486     protocol_binary_response_status setParam(
487             protocol_binary_request_set_param* req, std::string& msg);
488
489     protocol_binary_response_status setFlushParam(const char* keyz,
490                                                   const char* valz,
491                                                   std::string& msg);
492
493     protocol_binary_response_status setTapParam(const char* keyz,
494                                                 const char* valz,
495                                                 std::string& msg);
496
497     protocol_binary_response_status setCheckpointParam(const char* keyz,
498                                                        const char* valz,
499                                                        std::string& msg);
500
501     protocol_binary_response_status setDcpParam(const char* keyz,
502                                                 const char* valz,
503                                                 std::string& msg);
504
505     protocol_binary_response_status setVbucketParam(uint16_t vbucket,
506                                                     const char* keyz,
507                                                     const char* valz,
508                                                     std::string& msg);
509
510     ~EventuallyPersistentEngine();
511
512     engine_info *getInfo() {
513         return &info.info;
514     }
515
516     item_info getItemInfo(const Item& itm);
517
518     EPStats &getEpStats() {
519         return stats;
520     }
521
522     KVBucket* getKVBucket() { return kvBucket.get(); }
523
524     TapConnMap &getTapConnMap() { return *tapConnMap; }
525
526     DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
527
528     DcpFlowControlManager &getDcpFlowControlManager() {
529         return *dcpFlowControlManager_;
530     }
531
532     TapConfig &getTapConfig() { return *tapConfig; }
533
534     ReplicationThrottle &getReplicationThrottle() { return *replicationThrottle; }
535
536     CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
537
538     SERVER_HANDLE_V1* getServerApi() { return serverApi; }
539
540     Configuration &getConfiguration() {
541         return configuration;
542     }
543
544     ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
545                                           protocol_binary_request_header *request,
546                                           ADD_RESPONSE response);
547
548     ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
549                                            protocol_binary_request_header *request,
550                                            ADD_RESPONSE response);
551
552     ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
553                                       protocol_binary_request_header *request,
554                                       ADD_RESPONSE response);
555
556     ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
557                                             protocol_binary_request_header *request,
558                                             ADD_RESPONSE response);
559
560     ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
561                                         protocol_binary_request_header *request,
562                                         ADD_RESPONSE response);
563
564     ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
565                                               protocol_binary_request_header *request,
566                                               ADD_RESPONSE response);
567
568     size_t getGetlDefaultTimeout() const {
569         return getlDefaultTimeout;
570     }
571
572     size_t getGetlMaxTimeout() const {
573         return getlMaxTimeout;
574     }
575
576     size_t getMaxFailoverEntries() const {
577         return maxFailoverEntries;
578     }
579
580     bool isDegradedMode() const {
581         return kvBucket->isWarmingUp() || !trafficEnabled.load();
582     }
583
584     WorkLoadPolicy &getWorkLoadPolicy(void) {
585         return *workload;
586     }
587
588     bucket_priority_t getWorkloadPriority(void) const {return workloadPriority; }
589     void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
590
591     struct clusterConfig {
592         std::string config;
593         std::mutex lock;
594     } clusterConfig;
595
596     ENGINE_ERROR_CODE getRandomKey(const void *cookie,
597                                    ADD_RESPONSE response);
598
599     ConnHandler* getConnHandler(const void *cookie);
600
601     void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
602
603     /*
604      * Explicitly trigger the defragmenter task. Provided to facilitate
605      * testing.
606      */
607     void runDefragmenterTask(void);
608
609     /*
610      * Explicitly trigger the AccessScanner task. Provided to facilitate
611      * testing.
612      */
613     bool runAccessScannerTask(void);
614
615     /*
616      * Explicitly trigger the VbStatePersist task. Provided to facilitate
617      * testing.
618      */
619     void runVbStatePersistTask(int vbid);
620
621     /**
622      * Get a (sloppy) list of the sequence numbers for all of the vbuckets
623      * on this server. It is not to be treated as a consistent set of seqence,
624      * but rather a list of "at least" numbers. The way the list is generated
625      * is that we're starting for vbucket 0 and record the current number,
626      * then look at the next vbucket and record its number. That means that
627      * at the time we get the number for vbucket X all of the previous
628      * numbers could have been incremented. If the client just needs a list
629      * of where we are for each vbucket this method may be more optimal than
630      * requesting one by one.
631      *
632      * @param cookie The cookie representing the connection to requesting
633      *               list
634      * @param add_response The method used to format the output buffer
635      * @return ENGINE_SUCCESS upon success
636      */
637     ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(
638                                         const void *cookie,
639                                         protocol_binary_request_header *request,
640                                         ADD_RESPONSE response);
641
642     void updateDcpMinCompressionRatio(float value);
643
644     /**
645      * Sends a not-my-vbucket response, using the specified response callback.
646      * to the specified connection via it's cookie.
647      */
648     ENGINE_ERROR_CODE sendNotMyVBucketResponse(ADD_RESPONSE response,
649                                                const void* cookie,
650                                                uint64_t cas);
651
652     EpEngineTaskable& getTaskable() {
653         return taskable;
654     }
655
656 protected:
657     friend class EpEngineValueChangeListener;
658
659     void setMaxItemSize(size_t value) {
660         maxItemSize = value;
661     }
662
663     void setMaxItemPrivilegedBytes(size_t value) {
664         maxItemPrivilegedBytes = value;
665     }
666
667     void setGetlDefaultTimeout(size_t value) {
668         getlDefaultTimeout = value;
669     }
670
671     void setGetlMaxTimeout(size_t value) {
672         getlMaxTimeout = value;
673     }
674
675     EventuallyPersistentEngine(GET_SERVER_API get_server_api);
676     friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
677                                              GET_SERVER_API get_server_api,
678                                              ENGINE_HANDLE **handle);
679     uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
680                             uint16_t *nes, uint8_t *ttl, uint16_t *flags,
681                             uint32_t *seqno, uint16_t *vbucket,
682                             TapProducer *c, bool &retry);
683
684
685     ENGINE_ERROR_CODE processTapAck(const void *cookie,
686                                     uint32_t seqno,
687                                     uint16_t status,
688                                     const DocKey& key);
689
690     /**
691      * Report the state of a memory condition when out of memory.
692      *
693      * @return ETMPFAIL if we think we can recover without interaction,
694      *         else ENOMEM
695      */
696     ENGINE_ERROR_CODE memoryCondition();
697
698     /**
699      * Check if there is any available memory space to allocate an Item
700      * instance with a given size.
701      */
702     bool hasAvailableSpace(uint32_t nBytes) {
703         return (stats.getTotalMemoryUsed() + nBytes) <= stats.getMaxDataSize();
704     }
705
706     friend class BGFetchCallback;
707     friend class KVBucket;
708     friend class EPBucket;
709
710     bool enableTraffic(bool enable) {
711         bool inverse = !enable;
712         return trafficEnabled.compare_exchange_strong(inverse, enable);
713     }
714
715     ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
716     ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
717     ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
718     ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
719                                      const char* stat_key,
720                                      int nkey,
721                                      bool prevStateRequested,
722                                      bool details);
723     ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
724     ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
725                                         const char* stat_key, int nkey);
726     ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
727     ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
728     ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
729                                      const char *sep, size_t nsep,
730                                      conn_type_t connType);
731     ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
732     ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
733     ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
734     ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
735     ENGINE_ERROR_CODE doTasksStats(const void* cookie, ADD_STAT add_stat);
736     ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
737                                  uint16_t vbid, const DocKey& key, bool validate=false);
738     ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
739                                            ADD_STAT add_stat,
740                                            std::string& key,
741                                            uint16_t vbid);
742
743     ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
744                                            ADD_STAT add_stat,
745                                            std::string &key,
746                                            uint16_t vbid);
747     ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
748                                              ADD_STAT add_stat,
749                                              uint16_t vbid);
750     ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
751     ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
752     ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
753                                    const char* stat_key, int nkey);
754     void addSeqnoVbStats(const void *cookie, ADD_STAT add_stat,
755                                   const VBucketPtr &vb);
756
757     void addLookupResult(const void *cookie, Item *result) {
758         LockHolder lh(lookupMutex);
759         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
760         if (it != lookups.end()) {
761             if (it->second != NULL) {
762                 LOG(EXTENSION_LOG_DEBUG,
763                     "Cleaning up old lookup result for '%s'",
764                     it->second->getKey().data());
765                 delete it->second;
766             } else {
767                 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
768             }
769             lookups.erase(it);
770         }
771         lookups[cookie] = result;
772     }
773
774     bool fetchLookupResult(const void *cookie, Item **itm) {
775         // This will return *and erase* the lookup result for a connection.
776         // You look it up, you own it.
777         LockHolder lh(lookupMutex);
778         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
779         if (it != lookups.end()) {
780             *itm = it->second;
781             lookups.erase(it);
782             return true;
783         } else {
784             return false;
785         }
786     }
787
788     // Returns the current tap connection (a TapProducer) for this cookie.
789     // If the result is NULL, the caller should return TAP_DISCONNECT.
790     TapProducer* getTapProducer(const void *cookie);
791
792     // Initializes every callback this engine requires with the underlying
793     // server. Takes no arguments and returns nothing.
794     void initializeEngineCallbacks();
795
796     /*
797      * Private helper method for decoding the options on set/del_with_meta.
798      * Tightly coupled to the logic of both those functions, it will
799      * take a request pointer and locate and validate any options within.
800      * @param request pointer to the set/del_with_meta request packet
         *        (typed as the delete_with_meta request for both commands;
         *        NOTE(review): presumably the two layouts are compatible -
         *        confirm).
801      * @param generateCas set to Yes if CAS regeneration is enabled.
802      * @param skipConflictResolution set to true if conflict resolution should
803      *        not be performed.
804      * @param keyOffset set to the number of bytes which are to be skipped to
805      *        locate the key.
         * @return a protocol_binary_response_status. NOTE(review): presumably
         *         the success status when the options validate - confirm.
806      */
807     protocol_binary_response_status decodeWithMetaOptions(
808                               protocol_binary_request_delete_with_meta* request,
809                               GenerateCas& generateCas,
810                               bool& skipConflictResolution,
811                               int& keyOffset);
812
813     /**
814      * Sends a NOT_SUPPORTED response, using the specified response callback,
815      * to the specified connection via its cookie.
816      *
817      * @param response callback func to send the response
818      * @param cookie conn cookie
819      *
820      * @return status of sending response
821      */
822     ENGINE_ERROR_CODE sendNotSupportedResponse(ADD_RESPONSE response,
823                                                const void* cookie);
824
825     /**
826      * Sends an error response, using the specified error and response
827      * callback, to the specified connection via its cookie.
828      *
829      * @param response callback func to send the response
830      * @param status error status to send
831      * @param cas a cas value to send
832      * @param cookie conn cookie
833      *
834      * @return status of sending response
835      */
836     ENGINE_ERROR_CODE sendErrorResponse(ADD_RESPONSE response,
837                                         protocol_binary_response_status status,
838                                         uint64_t cas,
839                                         const void* cookie);
840
841     /**
842      * Sends a response that includes the mutation extras, the VB uuid and
843      * seqno of the mutation.
844      *
845      * @param response callback func to send the response
846      * @param vbucket vbucket that was mutated
847      * @param bySeqno the seqno to send
848      * @param status a mcbp status code
849      * @param cas cas assigned to the mutation
850      * @param cookie conn cookie
851      * @returns NMVB if VB can't be located, or the ADD_RESPONSE return code.
         *          NOTE(review): "NMVB" is presumably the not-my-vbucket
         *          ENGINE_ERROR_CODE value - confirm.
852      */
853     ENGINE_ERROR_CODE sendMutationExtras(ADD_RESPONSE response,
854                                          uint16_t vbucket,
855                                          uint64_t bySeqno,
856                                          protocol_binary_response_status status,
857                                          uint64_t cas,
858                                          const void* cookie);
859
860     /**
861      * Factory method for constructing the correct bucket type given the
862      * configuration.
863      * @param config Configuration to create bucket based on. Note this
864      *               object may be modified to ensure the config is valid
865      *               for the selected bucket type.
         * @return the constructed bucket, handed to the caller as a
         *         std::unique_ptr<KVBucket>.
866      */
867     std::unique_ptr<KVBucket> makeBucket(Configuration& config);
868
869     /**
870      * Helper method so that some commands can set the datatype of the
         * document.
871      *
872      * @param cookie connection cookie
873      * @param datatype the current document datatype
874      * @param body a buffer containing the document body
875      * @returns a datatype which will now include JSON if the document is JSON
876      *          and the connection does not support datatype JSON.
         *          NOTE(review): what is returned in the other cases is not
         *          stated here; presumably `datatype` unchanged - confirm.
877      */
878     protocol_binary_datatype_t checkForDatatypeJson(
879             const void* cookie,
880             protocol_binary_datatype_t datatype,
881             cb::const_char_buffer body);
882
883     /**
884      * Process the set_with_meta with the given buffers/values.
885      *
886      * @param vbucket VB to mutate
887      * @param key DocKey initialised with key data
888      * @param value buffer for the mutation's value
889      * @param itemMeta mutation's cas/revseq/flags/expiration
890      * @param isDeleted the Item is deleted (with value)
891      * @param datatype datatype of the mutation
892      * @param cas [in,out] CAS for the command (updated with new CAS)
893      * @param seqno [out] optional - returns the seqno allocated to the mutation
         *        NOTE(review): "optional" presumably means a null pointer is
         *        accepted - confirm in the definition.
894      * @param cookie connection's cookie
895      * @param force Should the set skip conflict resolution?
896      * @param allowExisting true if the set can overwrite existing key
897      * @param genBySeqno generate a new seqno? (yes/no)
898      * @param genCas generate a new CAS? (yes/no)
899      * @param emd buffer referencing ExtendedMetaData
900      * @returns state of the operation as an ENGINE_ERROR_CODE
901      */
902     ENGINE_ERROR_CODE setWithMeta(uint16_t vbucket,
903                                   DocKey key,
904                                   cb::const_byte_buffer value,
905                                   ItemMetaData itemMeta,
906                                   bool isDeleted,
907                                   protocol_binary_datatype_t datatype,
908                                   uint64_t& cas,
909                                   uint64_t* seqno,
910                                   const void* cookie,
911                                   bool force,
912                                   bool allowExisting,
913                                   GenerateBySeqno genBySeqno,
914                                   GenerateCas genCas,
915                                   cb::const_byte_buffer emd);
916
917     /**
918      * Process the del_with_meta with the given buffers/values.
919      *
920      * @param vbucket VB to mutate
921      * @param key DocKey initialised with key data
922      * @param itemMeta mutation's cas/revseq/flags/expiration
923      * @param cas [in,out] CAS for the command (updated with new CAS)
924      * @param seqno [out] optional - returns the seqno allocated to the mutation
         *        NOTE(review): "optional" presumably means a null pointer is
         *        accepted - confirm in the definition.
925      * @param cookie connection's cookie
926      * @param force Should the delete skip conflict resolution?
927      * @param genBySeqno generate a new seqno? (yes/no)
928      * @param genCas generate a new CAS? (yes/no)
929      * @param emd buffer referencing ExtendedMetaData
930      * @returns state of the operation as an ENGINE_ERROR_CODE
931      */
932     ENGINE_ERROR_CODE deleteWithMeta(uint16_t vbucket,
933                                      DocKey key,
934                                      ItemMetaData itemMeta,
935                                      uint64_t& cas,
936                                      uint64_t* seqno,
937                                      const void* cookie,
938                                      bool force,
939                                      GenerateBySeqno genBySeqno,
940                                      GenerateCas genCas,
941                                      cb::const_byte_buffer emd);
942
943     SERVER_HANDLE_V1 *serverApi;
        // NOTE(review): who owns the raw pointers in this section (serverApi,
        // workload, replicationThrottle, the conn maps, the configs) is not
        // visible from these declarations - check the constructor/destructor.

        // Held through std::unique_ptr, so this object owns the bucket.
944     std::unique_ptr<KVBucket> kvBucket;
945     WorkLoadPolicy *workload;
946     bucket_priority_t workloadPriority;
947
948     ReplicationThrottle *replicationThrottle;
        // Lookup results keyed by connection cookie. addLookupResult() and
        // fetchLookupResult() take lookupMutex around every access to this map;
        // a stored Item is either deleted when replaced or handed to whoever
        // fetches it.
949     std::map<const void*, Item*> lookups;
        // NOTE(review): the users of allKeysLookups, and whether lookupMutex
        // also guards it, are not visible in this part of the file.
950     std::unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
951     std::mutex lookupMutex;
952     GET_SERVER_API getServerApiFunc;
        // The buffer arm reserves sizeof(engine_info) plus ten feature_info
        // sized slots, overlapping the engine_info arm.
        // NOTE(review): presumably so the engine_info can be followed by extra
        // feature entries - confirm where the union is filled in.
953     union {
954         engine_info info;
955         char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
956     } info;
957
958     DcpConnMap *dcpConnMap_;
959     DcpFlowControlManager *dcpFlowControlManager_;
960     TapConnMap *tapConnMap;
961     TapConfig *tapConfig;
962     CheckpointConfig *checkpointConfig;
963     std::string name;
964     size_t maxItemSize;
965     size_t maxItemPrivilegedBytes;
966     size_t getlDefaultTimeout;
967     size_t getlMaxTimeout;
968     size_t maxFailoverEntries;
969     EPStats stats;
970     Configuration configuration;
        // Atomic flag; deleteAllEnabled below is a plain (non-atomic) bool.
971     std::atomic<bool> trafficEnabled;
972
973     bool deleteAllEnabled;
974     // A unique, system-generated token that is initialized each time
975     // ep_engine starts up.
976     std::atomic<time_t> startupTime;
977     EpEngineTaskable taskable;
978 };
979
980 #endif  // SRC_EP_ENGINE_H_