MB-23875: Remove old (now unused) gat/touch impl
[ep-engine.git] / src / ep_engine.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
20
21 #include "config.h"
22
23 #include "kv_bucket.h"
24 #include "storeddockey.h"
25 #include "tapconnection.h"
26 #include "taskable.h"
27 #include "vbucket.h"
28
29 #include <memcached/engine.h>
30 #include <platform/processclock.h>
31
32 #include <string>
33
34 class StoredValue;
35 class DcpConnMap;
36 class DcpFlowControlManager;
37 class Producer;
38 class TapConnMap;
39 class ReplicationThrottle;
40 class VBucketCountVisitor;
41
42 extern "C" {
43     EXPORT_FUNCTION
44     ENGINE_ERROR_CODE create_instance(uint64_t interface,
45                                       GET_SERVER_API get_server_api,
46                                       ENGINE_HANDLE **handle);
47
48     EXPORT_FUNCTION
49     void destroy_engine(void);
50
51     void EvpNotifyPendingConns(void*arg);
52 }
53
54 /* We're using notify_io_complete from ptr_fun, but that func
55  * got a "C" linkage that ptr_fun doesn't like... just
56  * cast it away with this typedef ;)
57  */
58 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
59                                      ENGINE_ERROR_CODE status);
60
61 // Forward decl
62 class EventuallyPersistentEngine;
63 class TapConnMap;
64
65 /**
66     To allow Engines to run tasks.
67 **/
68 class EpEngineTaskable : public Taskable {
69 public:
70     EpEngineTaskable(EventuallyPersistentEngine* e) : myEngine(e) {
71
72     }
73
74     const std::string& getName() const;
75
76     task_gid_t getGID() const;
77
78     bucket_priority_t getWorkloadPriority() const;
79
80     void setWorkloadPriority(bucket_priority_t prio);
81
82     WorkLoadPolicy& getWorkLoadPolicy(void);
83
84     void logQTime(TaskId id, const ProcessClock::duration enqTime);
85
86     void logRunTime(TaskId id, const ProcessClock::duration runTime);
87
88 private:
89     EventuallyPersistentEngine* myEngine;
90 };
91
92 /**
93  * A container class holding VBucketCountVisitors to aggregate stats for
94  * different vbucket states.
95  */
96 class VBucketCountAggregator : public VBucketVisitor  {
97 public:
98     void visitBucket(VBucketPtr &vb) override;
99
100     void addVisitor(VBucketCountVisitor* visitor);
101 private:
102     std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
103 };
104
105 /**
106  * memcached engine interface to the KVBucket.
107  */
108 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
109     friend class LookupCallback;
110 public:
111     ENGINE_ERROR_CODE initialize(const char* config);
112     void destroy(bool force);
113
114     ENGINE_ERROR_CODE itemAllocate(item** itm,
115                                    const DocKey& key,
116                                    const size_t nbytes,
117                                    const size_t priv_nbytes,
118                                    const int flags,
119                                    const rel_time_t exptime,
120                                    uint8_t datatype,
121                                    uint16_t vbucket);
122     /**
123      * Delete a given key and value from the engine.
124      *
125      * @param cookie The cookie representing the connection
126      * @param key The key that needs to be deleted from the engine
127      * @param cas CAS value of the mutation that needs to be returned
128      *            back to the client
129      * @param vbucket vbucket id to which the deleted key corresponds to
130      * @param item_meta pointer to item meta data that needs to be
131      *                  as a result the delete. A NULL pointer indicates
132      *                  that no meta data needs to be returned.
133      * @param mut_info pointer to the mutation info that resulted from
134      *                 the delete.
135      *
136      * @returns ENGINE_SUCCESS if the delete was successful or
137      *          an error code indicating the error
138      */
139     ENGINE_ERROR_CODE itemDelete(const void* cookie,
140                                  const DocKey& key,
141                                  uint64_t& cas,
142                                  uint16_t vbucket,
143                                  ItemMetaData* item_meta,
144                                  mutation_descr_t* mut_info) {
145         ENGINE_ERROR_CODE ret = kvBucket->deleteItem(key,
146                                                      cas,
147                                                      vbucket,
148                                                      cookie,
149                                                      item_meta,
150                                                      mut_info);
151
152         if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
153             if (isDegradedMode()) {
154                 return ENGINE_TMPFAIL;
155             }
156         } else if (ret == ENGINE_SUCCESS) {
157             ++stats.numOpsDelete;
158         }
159         return ret;
160     }
161
162
163     void itemRelease(const void* cookie, item *itm)
164     {
165         (void)cookie;
166         delete (Item*)itm;
167     }
168
169     ENGINE_ERROR_CODE get(const void* cookie,
170                           item** itm,
171                           const DocKey& key,
172                           uint16_t vbucket,
173                           get_options_t options)
174     {
175         BlockTimer timer(&stats.getCmdHisto);
176         GetValue gv(kvBucket->get(key, vbucket, cookie, options));
177         ENGINE_ERROR_CODE ret = gv.getStatus();
178
179         if (ret == ENGINE_SUCCESS) {
180             *itm = gv.getValue();
181             if (options & TRACK_STATISTICS) {
182                 ++stats.numOpsGet;
183             }
184         } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
185             if (isDegradedMode()) {
186                 return ENGINE_TMPFAIL;
187             }
188         }
189
190         return ret;
191     }
192
193     cb::EngineErrorItemPair get_if(const void* cookie,
194                                    const DocKey& key,
195                                    uint16_t vbucket,
196                                    std::function<bool(
197                                        const item_info&)> filter);
198
199     cb::EngineErrorItemPair get_and_touch(const void* cookie,
200                                           const DocKey& key,
201                                           uint16_t vbucket,
202                                           uint32_t expiry_time);
203
204     ENGINE_ERROR_CODE get_locked(const void* cookie,
205                                  item** itm,
206                                  const DocKey& key,
207                                  uint16_t vbucket,
208                                  uint32_t lock_timeout);
209
210
211     ENGINE_ERROR_CODE unlock(const void* cookie,
212                              const DocKey& key,
213                              uint16_t vbucket,
214                              uint64_t cas);
215
216
217     const std::string& getName() const {
218         return name;
219     }
220
221     ENGINE_ERROR_CODE getStats(const void* cookie,
222                                const char* stat_key,
223                                int nkey,
224                                ADD_STAT add_stat);
225
226     void resetStats() {
227         stats.reset();
228         if (kvBucket) {
229             kvBucket->resetUnderlyingStats();
230         }
231     }
232
233     ENGINE_ERROR_CODE store(const void *cookie,
234                             item* itm,
235                             uint64_t *cas,
236                             ENGINE_STORE_OPERATION operation);
237
238     ENGINE_ERROR_CODE flush(const void *cookie);
239
240     uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
241                           uint16_t *nes, uint8_t *ttl, uint16_t *flags,
242                           uint32_t *seqno, uint16_t *vbucket);
243
244     bool createTapQueue(const void *cookie,
245                         std::string &client,
246                         uint32_t flags,
247                         const void *userdata,
248                         size_t nuserdata);
249
250     ENGINE_ERROR_CODE tapNotify(const void *cookie,
251                                 void *engine_specific,
252                                 uint16_t nengine,
253                                 uint8_t ttl,
254                                 uint16_t tap_flags,
255                                 uint16_t tap_event,
256                                 uint32_t tap_seqno,
257                                 const void *key,
258                                 size_t nkey,
259                                 uint32_t flags,
260                                 uint32_t exptime,
261                                 uint64_t cas,
262                                 uint8_t datatype,
263                                 const void *data,
264                                 size_t ndata,
265                                 uint16_t vbucket);
266
267     ENGINE_ERROR_CODE dcpOpen(const void* cookie,
268                               uint32_t opaque,
269                               uint32_t seqno,
270                               uint32_t flags,
271                               const void *stream_name,
272                               uint16_t nname);
273
274     ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
275                                    uint32_t opaque,
276                                    uint16_t vbucket,
277                                    uint32_t flags);
278
279     ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
280                                             uint8_t event,
281                                             uint16_t vbucket,
282                                             uint64_t checkpointId);
283
284     ENGINE_ERROR_CODE getMeta(const void* cookie,
285                               protocol_binary_request_get_meta *request,
286                               ADD_RESPONSE response,
287                               DocNamespace docNamespace);
288
289     ENGINE_ERROR_CODE setWithMeta(const void* cookie,
290                                  protocol_binary_request_set_with_meta *request,
291                                  ADD_RESPONSE response,
292                                  DocNamespace docNamespace);
293
294     ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
295                               protocol_binary_request_delete_with_meta *request,
296                               ADD_RESPONSE response,
297                               DocNamespace docNamespace);
298
299     ENGINE_ERROR_CODE returnMeta(const void* cookie,
300                                  protocol_binary_request_return_meta *request,
301                                  ADD_RESPONSE response,
302                                  DocNamespace docNamespace);
303
304     ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
305                             protocol_binary_request_set_cluster_config *request,
306                             ADD_RESPONSE response);
307
308     ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
309                             protocol_binary_request_get_cluster_config *request,
310                             ADD_RESPONSE response);
311
312     ENGINE_ERROR_CODE getAllKeys(const void* cookie,
313                                 protocol_binary_request_get_keys *request,
314                                 ADD_RESPONSE response,
315                                 DocNamespace docNamespace);
316
317     /**
318      * Visit the objects and add them to the tap/dcp connecitons queue.
319      * @todo this code should honor the backfill time!
320      */
321     void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
322
323     void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
324         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
325         serverApi->cookie->set_priority(cookie, priority);
326         ObjectRegistry::onSwitchThread(epe);
327     }
328
329     void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
330         if (cookie == NULL) {
331             LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
332         } else {
333             BlockTimer bt(&stats.notifyIOHisto);
334             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
335             serverApi->cookie->notify_io_complete(cookie, status);
336             ObjectRegistry::onSwitchThread(epe);
337         }
338     }
339
340     ENGINE_ERROR_CODE reserveCookie(const void *cookie);
341     ENGINE_ERROR_CODE releaseCookie(const void *cookie);
342
343     void storeEngineSpecific(const void *cookie, void *engine_data) {
344         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
345         serverApi->cookie->store_engine_specific(cookie, engine_data);
346         ObjectRegistry::onSwitchThread(epe);
347     }
348
349     void *getEngineSpecific(const void *cookie) {
350         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
351         void *engine_data = serverApi->cookie->get_engine_specific(cookie);
352         ObjectRegistry::onSwitchThread(epe);
353         return engine_data;
354     }
355
356     bool isDatatypeSupported(const void* cookie,
357                              protocol_binary_datatype_t datatype) {
358         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
359         bool isSupported =
360                 serverApi->cookie->is_datatype_supported(cookie, datatype);
361         ObjectRegistry::onSwitchThread(epe);
362         return isSupported;
363     }
364
365     bool isMutationExtrasSupported(const void *cookie) {
366         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
367         bool isSupported = serverApi->cookie->is_mutation_extras_supported(cookie);
368         ObjectRegistry::onSwitchThread(epe);
369         return isSupported;
370     }
371
372     bool isXattrSupported(const void* cookie) {
373         return isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_XATTR);
374     }
375
376     uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
377         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
378         uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
379         ObjectRegistry::onSwitchThread(epe);
380         return opcode;
381     }
382
383     bool validateSessionCas(const uint64_t cas) {
384         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
385         bool ret = serverApi->cookie->validate_session_cas(cas);
386         ObjectRegistry::onSwitchThread(epe);
387         return ret;
388     }
389
390     void decrementSessionCtr(void) {
391         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
392         serverApi->cookie->decrement_session_ctr();
393         ObjectRegistry::onSwitchThread(epe);
394     }
395
396     void registerEngineCallback(ENGINE_EVENT_TYPE type,
397                                 EVENT_CALLBACK cb, const void *cb_data);
398
399     template <typename T>
400     void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
401         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
402         std::for_each(cookies.begin(), cookies.end(),
403                       std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
404                                    status));
405         ObjectRegistry::onSwitchThread(epe);
406     }
407
408     void handleDisconnect(const void *cookie);
409     void handleDeleteBucket(const void *cookie);
410
411     protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
412         (void) msg_size;
413         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
414         *msg = NULL;
415         if (!kvBucket->pauseFlusher()) {
416             LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
417             *msg = "Flusher not running.";
418             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
419         }
420         return rv;
421     }
422
423     protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
424         (void) msg_size;
425         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
426         *msg = NULL;
427         if (!kvBucket->resumeFlusher()) {
428             LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
429             *msg = "Flusher not shut down.";
430             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
431         }
432         return rv;
433     }
434
435     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
436         return kvBucket->deleteVBucket(vbid, c);
437     }
438
439     ENGINE_ERROR_CODE compactDB(uint16_t vbid,
440                                 compaction_ctx c,
441                                 const void *cookie = NULL) {
442         return kvBucket->scheduleCompaction(vbid, c, cookie);
443     }
444
445     bool resetVBucket(uint16_t vbid) {
446         return kvBucket->resetVBucket(vbid);
447     }
448
449     void setTapKeepAlive(uint32_t to) {
450         configuration.setTapKeepalive((size_t)to);
451     }
452
453     void setDeleteAll(bool enabled) {
454         deleteAllEnabled = enabled;
455     }
456
457     protocol_binary_response_status evictKey(const DocKey& key,
458                                              uint16_t vbucket,
459                                              const char** msg) {
460         return kvBucket->evictKey(key, vbucket, msg);
461     }
462
463     ENGINE_ERROR_CODE observe(const void* cookie,
464                               protocol_binary_request_header *request,
465                               ADD_RESPONSE response,
466                               DocNamespace docNamespace);
467
468     ENGINE_ERROR_CODE observe_seqno(const void* cookie,
469                                     protocol_binary_request_header *request,
470                                     ADD_RESPONSE response);
471
472     VBucketPtr getVBucket(uint16_t vbucket) {
473         return kvBucket->getVBucket(vbucket);
474     }
475
476     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to,
477                                       bool transfer) {
478         return kvBucket->setVBucketState(vbid, to, transfer);
479     }
480
481     protocol_binary_response_status setParam(
482             protocol_binary_request_set_param* req, std::string& msg);
483
484     protocol_binary_response_status setFlushParam(const char* keyz,
485                                                   const char* valz,
486                                                   std::string& msg);
487
488     protocol_binary_response_status setTapParam(const char* keyz,
489                                                 const char* valz,
490                                                 std::string& msg);
491
492     protocol_binary_response_status setCheckpointParam(const char* keyz,
493                                                        const char* valz,
494                                                        std::string& msg);
495
496     protocol_binary_response_status setDcpParam(const char* keyz,
497                                                 const char* valz,
498                                                 std::string& msg);
499
500     protocol_binary_response_status setVbucketParam(uint16_t vbucket,
501                                                     const char* keyz,
502                                                     const char* valz,
503                                                     std::string& msg);
504
505     ~EventuallyPersistentEngine();
506
507     engine_info *getInfo() {
508         return &info.info;
509     }
510
511     item_info getItemInfo(const Item& itm);
512
513     EPStats &getEpStats() {
514         return stats;
515     }
516
517     KVBucket* getKVBucket() { return kvBucket.get(); }
518
519     TapConnMap &getTapConnMap() { return *tapConnMap; }
520
521     DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
522
523     DcpFlowControlManager &getDcpFlowControlManager() {
524         return *dcpFlowControlManager_;
525     }
526
527     TapConfig &getTapConfig() { return *tapConfig; }
528
529     ReplicationThrottle &getReplicationThrottle() { return *replicationThrottle; }
530
531     CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
532
533     SERVER_HANDLE_V1* getServerApi() { return serverApi; }
534
535     Configuration &getConfiguration() {
536         return configuration;
537     }
538
539     ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
540                                           protocol_binary_request_header *request,
541                                           ADD_RESPONSE response);
542
543     ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
544                                            protocol_binary_request_header *request,
545                                            ADD_RESPONSE response);
546
547     ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
548                                       protocol_binary_request_header *request,
549                                       ADD_RESPONSE response);
550
551     ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
552                                             protocol_binary_request_header *request,
553                                             ADD_RESPONSE response);
554
555     ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
556                                         protocol_binary_request_header *request,
557                                         ADD_RESPONSE response);
558
559     ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
560                                               protocol_binary_request_header *request,
561                                               ADD_RESPONSE response);
562
563     size_t getGetlDefaultTimeout() const {
564         return getlDefaultTimeout;
565     }
566
567     size_t getGetlMaxTimeout() const {
568         return getlMaxTimeout;
569     }
570
571     size_t getMaxFailoverEntries() const {
572         return maxFailoverEntries;
573     }
574
575     bool isDegradedMode() const {
576         return kvBucket->isWarmingUp() || !trafficEnabled.load();
577     }
578
579     WorkLoadPolicy &getWorkLoadPolicy(void) {
580         return *workload;
581     }
582
583     bucket_priority_t getWorkloadPriority(void) const {return workloadPriority; }
584     void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
585
586     struct clusterConfig {
587         std::string config;
588         std::mutex lock;
589     } clusterConfig;
590
591     ENGINE_ERROR_CODE getRandomKey(const void *cookie,
592                                    ADD_RESPONSE response);
593
594     ConnHandler* getConnHandler(const void *cookie);
595
596     void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
597
598     /*
599      * Explicitly trigger the defragmenter task. Provided to facilitate
600      * testing.
601      */
602     void runDefragmenterTask(void);
603
604     /*
605      * Explicitly trigger the AccessScanner task. Provided to facilitate
606      * testing.
607      */
608     bool runAccessScannerTask(void);
609
610     /*
611      * Explicitly trigger the VbStatePersist task. Provided to facilitate
612      * testing.
613      */
614     void runVbStatePersistTask(int vbid);
615
616     /**
617      * Get a (sloppy) list of the sequence numbers for all of the vbuckets
618      * on this server. It is not to be treated as a consistent set of seqence,
619      * but rather a list of "at least" numbers. The way the list is generated
620      * is that we're starting for vbucket 0 and record the current number,
621      * then look at the next vbucket and record its number. That means that
622      * at the time we get the number for vbucket X all of the previous
623      * numbers could have been incremented. If the client just needs a list
624      * of where we are for each vbucket this method may be more optimal than
625      * requesting one by one.
626      *
627      * @param cookie The cookie representing the connection to requesting
628      *               list
629      * @param add_response The method used to format the output buffer
630      * @return ENGINE_SUCCESS upon success
631      */
632     ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(
633                                         const void *cookie,
634                                         protocol_binary_request_header *request,
635                                         ADD_RESPONSE response);
636
637     void updateDcpMinCompressionRatio(float value);
638
639     /**
640      * Sends a not-my-vbucket response, using the specified response callback.
641      * to the specified connection via it's cookie.
642      */
643     ENGINE_ERROR_CODE sendNotMyVBucketResponse(ADD_RESPONSE response,
644                                                const void* cookie,
645                                                uint64_t cas);
646
647     EpEngineTaskable& getTaskable() {
648         return taskable;
649     }
650
651 protected:
652     friend class EpEngineValueChangeListener;
653
654     void setMaxItemSize(size_t value) {
655         maxItemSize = value;
656     }
657
658     void setMaxItemPrivilegedBytes(size_t value) {
659         maxItemPrivilegedBytes = value;
660     }
661
662     void setGetlDefaultTimeout(size_t value) {
663         getlDefaultTimeout = value;
664     }
665
666     void setGetlMaxTimeout(size_t value) {
667         getlMaxTimeout = value;
668     }
669
670     EventuallyPersistentEngine(GET_SERVER_API get_server_api);
671     friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
672                                              GET_SERVER_API get_server_api,
673                                              ENGINE_HANDLE **handle);
674     uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
675                             uint16_t *nes, uint8_t *ttl, uint16_t *flags,
676                             uint32_t *seqno, uint16_t *vbucket,
677                             TapProducer *c, bool &retry);
678
679
680     ENGINE_ERROR_CODE processTapAck(const void *cookie,
681                                     uint32_t seqno,
682                                     uint16_t status,
683                                     const DocKey& key);
684
685     /**
686      * Report the state of a memory condition when out of memory.
687      *
688      * @return ETMPFAIL if we think we can recover without interaction,
689      *         else ENOMEM
690      */
691     ENGINE_ERROR_CODE memoryCondition();
692
693     /**
694      * Check if there is any available memory space to allocate an Item
695      * instance with a given size.
696      */
697     bool hasAvailableSpace(uint32_t nBytes) {
698         return (stats.getTotalMemoryUsed() + nBytes) <= stats.getMaxDataSize();
699     }
700
701     friend class BGFetchCallback;
702     friend class KVBucket;
703     friend class EPBucket;
704
705     bool enableTraffic(bool enable) {
706         bool inverse = !enable;
707         return trafficEnabled.compare_exchange_strong(inverse, enable);
708     }
709
710     ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
711     ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
712     ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
713     ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
714                                      const char* stat_key,
715                                      int nkey,
716                                      bool prevStateRequested,
717                                      bool details);
718     ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
719     ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
720                                         const char* stat_key, int nkey);
721     ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
722     ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
723     ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
724                                      const char *sep, size_t nsep,
725                                      conn_type_t connType);
726     ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
727     ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
728     ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
729     ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
730     ENGINE_ERROR_CODE doTasksStats(const void* cookie, ADD_STAT add_stat);
731     ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
732                                  uint16_t vbid, const DocKey& key, bool validate=false);
733     ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
734                                            ADD_STAT add_stat,
735                                            std::string& key,
736                                            uint16_t vbid);
737
738     ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
739                                            ADD_STAT add_stat,
740                                            std::string &key,
741                                            uint16_t vbid);
742     ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
743                                              ADD_STAT add_stat,
744                                              uint16_t vbid);
745     ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
746     ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
747     ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
748                                    const char* stat_key, int nkey);
749     void addSeqnoVbStats(const void *cookie, ADD_STAT add_stat,
750                                   const VBucketPtr &vb);
751
752     void addLookupResult(const void *cookie, Item *result) {
753         LockHolder lh(lookupMutex);
754         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
755         if (it != lookups.end()) {
756             if (it->second != NULL) {
757                 LOG(EXTENSION_LOG_DEBUG,
758                     "Cleaning up old lookup result for '%s'",
759                     it->second->getKey().data());
760                 delete it->second;
761             } else {
762                 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
763             }
764             lookups.erase(it);
765         }
766         lookups[cookie] = result;
767     }
768
769     bool fetchLookupResult(const void *cookie, Item **itm) {
770         // This will return *and erase* the lookup result for a connection.
771         // You look it up, you own it.
772         LockHolder lh(lookupMutex);
773         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
774         if (it != lookups.end()) {
775             *itm = it->second;
776             lookups.erase(it);
777             return true;
778         } else {
779             return false;
780         }
781     }
782
783     // Get the current tap connection for this cookie.
784     // If this method returns NULL, you should return TAP_DISCONNECT
785     TapProducer* getTapProducer(const void *cookie);
786
787     // Initialize all required callbacks of this engine with the underlying
788     // server.
789     void initializeEngineCallbacks();
790
791     /*
792      * Private helper method for decoding the options on set/del_with_meta.
793      * Tighly coupled to the logic of both those functions, it will
794      * take a request pointer and locate and validate any options within.
795      * @param request pointer to the set/del_with_meta request packet
796      * @param generateCas set to Yes if CAS regeneration is enabled.
797      * @param skipConflictResolution set to true if conflict resolution should
798      *        not be performed.
799      * @param keyOffset set to the number of bytes which are to be skipped to
800      *        locate the key.
801      */
802     protocol_binary_response_status decodeWithMetaOptions(
803                               protocol_binary_request_delete_with_meta* request,
804                               GenerateCas& generateCas,
805                               bool& skipConflictResolution,
806                               int& keyOffset);
807
808     /**
809      * Sends NOT_SUPPORTED response, using the specified response callback
810      * to the specified connection via it's cookie.
811      *
812      * @param response callback func to send the response
813      * @param cookie conn cookie
814      *
815      * @return status of sending response
816      */
817     ENGINE_ERROR_CODE sendNotSupportedResponse(ADD_RESPONSE response,
818                                                const void* cookie);
819
820     /**
821      * Sends error response, using the specified error and response callback
822      * to the specified connection via it's cookie.
823      *
824      * @param response callback func to send the response
825      * @param status error status to send
826      * @param cas a cas value to send
827      * @param cookie conn cookie
828      *
829      * @return status of sending response
830      */
831     ENGINE_ERROR_CODE sendErrorResponse(ADD_RESPONSE response,
832                                         protocol_binary_response_status status,
833                                         uint64_t cas,
834                                         const void* cookie);
835
836     /**
837      * Sends a response that includes the mutation extras, the VB uuid and
838      * seqno of the mutation.
839      *
840      * @param response callback func to send the response
841      * @param vbucket vbucket that was mutated
842      * @param bySeqno the seqno to send
843      * @param status a mcbp status code
844      * @param cas cas assigned to the mutation
845      * @param cookie conn cookie
846      * @returns NMVB if VB can't be located, or the ADD_RESPONSE return code.
847      */
848     ENGINE_ERROR_CODE sendMutationExtras(ADD_RESPONSE response,
849                                          uint16_t vbucket,
850                                          uint64_t bySeqno,
851                                          protocol_binary_response_status status,
852                                          uint64_t cas,
853                                          const void* cookie);
854
855     /**
856      * Factory method for constructing the correct bucket type given the
857      * configuration.
858      * @param config Configuration to create bucket based on. Note this
859      *               object may be modified to ensure the config is valid
860      *               for the selected bucket type.
861      */
862     std::unique_ptr<KVBucket> makeBucket(Configuration& config);
863
864     /**
865      * helper method so that some commands can set the datatype of the document.
866      *
867      * @param cookie connection cookie
868      * @param datatype the current document datatype
869      * @param body a buffer containing the document body
870      * @returns a datatype which will now include JSON if the document is JSON
871      *          and the connection does not support datatype JSON.
872      */
873     protocol_binary_datatype_t checkForDatatypeJson(
874             const void* cookie,
875             protocol_binary_datatype_t datatype,
876             cb::const_char_buffer body);
877
878     /**
879      * Process the set_with_meta with the given buffers/values.
880      *
881      * @param vbucket VB to mutate
882      * @param key DocKey initialised with key data
883      * @param value buffer for the mutation's value
884      * @param itemMeta mutation's cas/revseq/flags/expiration
885      * @param isDeleted the Item is deleted (with value)
886      * @param datatype datatype of the mutation
887      * @param cas [in,out] CAS for the command (updated with new CAS)
888      * @param seqno [out] optional - returns the seqno allocated to the mutation
889      * @param cookie connection's cookie
890      * @param force Should the set skip conflict resolution?
891      * @param allowExisting true if the set can overwrite existing key
892      * @param genBySeqno generate a new seqno? (yes/no)
893      * @param genCas generate a new CAS? (yes/no)
894      * @param emd buffer referencing ExtendedMetaData
895      * @returns state of the operation as an ENGINE_ERROR_CODE
896      */
897     ENGINE_ERROR_CODE setWithMeta(uint16_t vbucket,
898                                   DocKey key,
899                                   cb::const_byte_buffer value,
900                                   ItemMetaData itemMeta,
901                                   bool isDeleted,
902                                   protocol_binary_datatype_t datatype,
903                                   uint64_t& cas,
904                                   uint64_t* seqno,
905                                   const void* cookie,
906                                   bool force,
907                                   bool allowExisting,
908                                   GenerateBySeqno genBySeqno,
909                                   GenerateCas genCas,
910                                   cb::const_byte_buffer emd);
911
912     /**
913      * Process the del_with_meta with the given buffers/values.
914      *
915      * @param vbucket VB to mutate
916      * @param key DocKey initialised with key data
917      * @param itemMeta mutation's cas/revseq/flags/expiration
918      * @param cas [in,out] CAS for the command (updated with new CAS)
919      * @param seqno [out] optional - returns the seqno allocated to the mutation
920      * @param cookie connection's cookie
921      * @param force Should the set skip conflict resolution?
922      * @param genBySeqno generate a new seqno? (yes/no)
923      * @param genCas generate a new CAS? (yes/no)
924      * @param emd buffer referencing ExtendedMetaData
925      * @returns state of the operation as an ENGINE_ERROR_CODE
926      */
927     ENGINE_ERROR_CODE deleteWithMeta(uint16_t vbucket,
928                                      DocKey key,
929                                      ItemMetaData itemMeta,
930                                      uint64_t& cas,
931                                      uint64_t* seqno,
932                                      const void* cookie,
933                                      bool force,
934                                      GenerateBySeqno genBySeqno,
935                                      GenerateCas genCas,
936                                      cb::const_byte_buffer emd);
937
938     SERVER_HANDLE_V1 *serverApi;
939     std::unique_ptr<KVBucket> kvBucket;
940     WorkLoadPolicy *workload;
941     bucket_priority_t workloadPriority;
942
943     ReplicationThrottle *replicationThrottle;
944     std::map<const void*, Item*> lookups;
945     std::unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
946     std::mutex lookupMutex;
947     GET_SERVER_API getServerApiFunc;
948     union {
949         engine_info info;
950         char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
951     } info;
952
953     DcpConnMap *dcpConnMap_;
954     DcpFlowControlManager *dcpFlowControlManager_;
955     TapConnMap *tapConnMap;
956     TapConfig *tapConfig;
957     CheckpointConfig *checkpointConfig;
958     std::string name;
959     size_t maxItemSize;
960     size_t maxItemPrivilegedBytes;
961     size_t getlDefaultTimeout;
962     size_t getlMaxTimeout;
963     size_t maxFailoverEntries;
964     EPStats stats;
965     Configuration configuration;
966     std::atomic<bool> trafficEnabled;
967
968     bool deleteAllEnabled;
969     // a unique system generated token initialized at each time
970     // ep_engine starts up.
971     std::atomic<time_t> startupTime;
972     EpEngineTaskable taskable;
973 };
974
975 #endif  // SRC_EP_ENGINE_H_