0eb5aa1e0b827fe7fa4baacf6ba068888cb3fa5a
[ep-engine.git] / src / ep_engine.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
20
21 #include "config.h"
22
#include <errno.h>

#include <algorithm>
#include <cstdio>
#include <limits>
#include <list>
#include <map>
#include <new>
#include <sstream>
#include <string>
32
33 #include "configuration.h"
34 #include "ep.h"
35 #include "ep-engine/command_ids.h"
36 #include "item_pager.h"
37 #include "kvstore.h"
38 #include "locks.h"
39 #include "tapconnection.h"
40 #include "workload.h"
41
42
43 class DcpConnMap;
44 class TapConnMap;
45 class TapThrottle;
46
47 extern "C" {
48     EXPORT_FUNCTION
49     ENGINE_ERROR_CODE create_instance(uint64_t interface,
50                                       GET_SERVER_API get_server_api,
51                                       ENGINE_HANDLE **handle);
52     void EvpNotifyPendingConns(void*arg);
53 }
54
55 /* We're using notify_io_complete from ptr_fun, but that func
56  * got a "C" linkage that ptr_fun doesn't like... just
57  * cast it away with this typedef ;)
58  */
59 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
60                                      ENGINE_ERROR_CODE status);
61
62
63 // Forward decl
64 class EventuallyPersistentEngine;
65 class TapConnMap;
66
67 /**
68  * Vbucket visitor that counts active vbuckets.
69  */
70 class VBucketCountVisitor : public VBucketVisitor {
71 public:
72     VBucketCountVisitor(EventuallyPersistentEngine &e,
73                         vbucket_state_t state) :
74         engine(e),
75         desired_state(state), numItems(0),
76         numTempItems(0),nonResident(0),
77         numVbucket(0), htMemory(0),
78         htItemMemory(0), htCacheSize(0),
79         numEjects(0), numExpiredItems(0),
80         metaDataMemory(0), metaDataDisk(0),
81         opsCreate(0),
82         opsUpdate(0), opsDelete(0),
83         opsReject(0), queueSize(0),
84         queueMemory(0), queueAge(0),
85         queueFill(0), queueDrain(0),
86         pendingWrites(0), chkPersistRemaining(0),
87         fileSpaceUsed(0), fileSize(0)
88     { }
89
90     bool visitBucket(RCPtr<VBucket> &vb);
91
92     void visit(StoredValue* v) {
93         (void)v;
94         cb_assert(false); // this does not happen
95     }
96
97     vbucket_state_t getVBucketState() { return desired_state; }
98
99     size_t getNumItems() { return numItems; }
100
101     size_t getNumTempItems() { return numTempItems; }
102
103     size_t getNonResident() { return nonResident; }
104
105     size_t getVBucketNumber() { return numVbucket; }
106
107     size_t getMemResidentPer() {
108         size_t numResident = numItems - nonResident;
109         return (numItems != 0) ? (size_t) (numResident *100.0) / (numItems) : 100;
110     }
111
112     size_t getEjects() { return numEjects; }
113
114     size_t getExpired() { return numExpiredItems; }
115
116     size_t getMetaDataMemory() { return metaDataMemory; }
117
118     size_t getMetaDataDisk() { return metaDataDisk; }
119
120     size_t getHashtableMemory() { return htMemory; }
121
122     size_t getItemMemory() { return htItemMemory; }
123     size_t getCacheSize() { return htCacheSize; }
124
125     size_t getOpsCreate() { return opsCreate; }
126     size_t getOpsUpdate() { return opsUpdate; }
127     size_t getOpsDelete() { return opsDelete; }
128     size_t getOpsReject() { return opsReject; }
129
130     size_t getQueueSize() { return queueSize; }
131     size_t getQueueMemory() { return queueMemory; }
132     size_t getQueueFill() { return queueFill; }
133     size_t getQueueDrain() { return queueDrain; }
134     uint64_t getAge() { return queueAge; }
135     size_t getPendingWrites() { return pendingWrites; }
136     size_t getChkPersistRemaining() { return chkPersistRemaining; }
137
138     size_t getFileSpaceUsed() { return fileSpaceUsed; }
139     size_t getFileSize() { return fileSize; }
140
141 private:
142     EventuallyPersistentEngine &engine;
143     vbucket_state_t desired_state;
144
145     size_t numItems;
146     size_t numTempItems;
147     size_t nonResident;
148     size_t numVbucket;
149     size_t htMemory;
150     size_t htItemMemory;
151     size_t htCacheSize;
152     size_t numEjects;
153     size_t numExpiredItems;
154     size_t metaDataMemory;
155     size_t metaDataDisk;
156
157     size_t opsCreate;
158     size_t opsUpdate;
159     size_t opsDelete;
160     size_t opsReject;
161
162     size_t queueSize;
163     size_t queueMemory;
164     uint64_t queueAge;
165     size_t queueFill;
166     size_t queueDrain;
167     size_t pendingWrites;
168     size_t chkPersistRemaining;
169
170     size_t fileSpaceUsed;
171     size_t fileSize;
172 };
173
174 /**
175  * memcached engine interface to the EventuallyPersistentStore.
176  */
177 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
178     friend class LookupCallback;
179 public:
180     ENGINE_ERROR_CODE initialize(const char* config);
181     void destroy(bool force);
182
183     ENGINE_ERROR_CODE itemAllocate(const void* cookie,
184                                    item** itm,
185                                    const void* key,
186                                    const size_t nkey,
187                                    const size_t nbytes,
188                                    const int flags,
189                                    const rel_time_t exptime,
190                                    uint8_t datatype)
191     {
192         (void)cookie;
193         if (nbytes > maxItemSize) {
194             return ENGINE_E2BIG;
195         }
196
197         time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
198
199         uint8_t ext_meta[1];
200         uint8_t ext_len = EXT_META_LEN;
201         *(ext_meta) = datatype;
202         *itm = new Item(key, nkey, nbytes, flags, expiretime, ext_meta,
203                         ext_len);
204         if (*itm == NULL) {
205             return memoryCondition();
206         } else {
207             stats.itemAllocSizeHisto.add(nbytes);
208             return ENGINE_SUCCESS;
209         }
210     }
211
212     ENGINE_ERROR_CODE itemDelete(const void* cookie,
213                                  const void* key,
214                                  const size_t nkey,
215                                  uint64_t* cas,
216                                  uint16_t vbucket)
217     {
218         std::string k(static_cast<const char*>(key), nkey);
219         return itemDelete(cookie, k, cas, vbucket);
220     }
221
222     ENGINE_ERROR_CODE itemDelete(const void* cookie,
223                                  const std::string &key,
224                                  uint64_t* cas,
225                                  uint16_t vbucket)
226     {
227         ENGINE_ERROR_CODE ret = epstore->deleteItem(key, cas,
228                                                     vbucket, cookie,
229                                                     false, // not force
230                                                     NULL);
231
232         if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
233             if (isDegradedMode()) {
234                 return ENGINE_TMPFAIL;
235             }
236         } else if (ret == ENGINE_SUCCESS) {
237             ++stats.numOpsDelete;
238         }
239         return ret;
240     }
241
242
243     void itemRelease(const void* cookie, item *itm)
244     {
245         (void)cookie;
246         delete (Item*)itm;
247     }
248
249     ENGINE_ERROR_CODE get(const void* cookie,
250                           item** itm,
251                           const void* key,
252                           const int nkey,
253                           uint16_t vbucket,
254                           bool track_stat = false)
255     {
256         BlockTimer timer(&stats.getCmdHisto);
257         std::string k(static_cast<const char*>(key), nkey);
258
259         GetValue gv(epstore->get(k, vbucket, cookie, serverApi->core));
260         ENGINE_ERROR_CODE ret = gv.getStatus();
261
262         if (ret == ENGINE_SUCCESS) {
263             *itm = gv.getValue();
264             if (track_stat) {
265                 ++stats.numOpsGet;
266             }
267         } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
268             if (isDegradedMode()) {
269                 return ENGINE_TMPFAIL;
270             }
271         }
272
273         return ret;
274     }
275
276     const char* getName() {
277         return name.c_str();
278     }
279
280     ENGINE_ERROR_CODE getStats(const void* cookie,
281                                const char* stat_key,
282                                int nkey,
283                                ADD_STAT add_stat);
284
285     void resetStats() {
286         stats.reset();
287         if (epstore) {
288             epstore->resetUnderlyingStats();
289         }
290     }
291
292     ENGINE_ERROR_CODE store(const void *cookie,
293                             item* itm,
294                             uint64_t *cas,
295                             ENGINE_STORE_OPERATION operation,
296                             uint16_t vbucket);
297
298     ENGINE_ERROR_CODE arithmetic(const void* cookie,
299                                  const void* key,
300                                  const int nkey,
301                                  const bool increment,
302                                  const bool create,
303                                  const uint64_t delta,
304                                  const uint64_t initial,
305                                  const rel_time_t exptime,
306                                  uint64_t *cas,
307                                  uint8_t datatype,
308                                  uint64_t *result,
309                                  uint16_t vbucket)
310     {
311         BlockTimer timer(&stats.arithCmdHisto);
312         item *it = NULL;
313         uint8_t ext_meta[1];
314         uint8_t ext_len = EXT_META_LEN;
315         *(ext_meta) = datatype;
316
317         rel_time_t expiretime = (exptime == 0 ||
318                                  exptime == 0xffffffff) ?
319             0 : ep_abs_time(ep_reltime(exptime));
320
321         ENGINE_ERROR_CODE ret = get(cookie, &it, key, nkey, vbucket);
322         if (ret == ENGINE_SUCCESS) {
323             Item *itm = static_cast<Item*>(it);
324             char *endptr = NULL;
325             char data[24];
326             size_t len = std::min(static_cast<uint32_t>(sizeof(data) - 1),
327                                   itm->getNBytes());
328             data[len] = 0;
329             memcpy(data, itm->getData(), len);
330             uint64_t val = strtoull(data, &endptr, 10);
331             if (itm->getCas() == (uint64_t) -1) {
332                 // item is locked, can't perform arithmetic operation
333                 delete itm;
334                 return ENGINE_TMPFAIL;
335             }
336             if ((errno != ERANGE) && (isspace(*endptr)
337                                       || (*endptr == '\0' && endptr != data))) {
338                 if (increment) {
339                     val += delta;
340                 } else {
341                     if (delta > val) {
342                         val = 0;
343                     } else {
344                         val -= delta;
345                     }
346                 }
347
348                 std::stringstream vals;
349                 vals << val;
350                 size_t nb = vals.str().length();
351                 *result = val;
352                 Item *nit = new Item(key, (uint16_t)nkey, itm->getFlags(),
353                                      itm->getExptime(), vals.str().c_str(), nb,
354                                      ext_meta, ext_len);
355                 nit->setCas(itm->getCas());
356                 ret = store(cookie, nit, cas, OPERATION_CAS, vbucket);
357                 delete nit;
358             } else {
359                 ret = ENGINE_EINVAL;
360             }
361
362             delete itm;
363         } else if (ret == ENGINE_NOT_MY_VBUCKET) {
364             return isDegradedMode() ? ENGINE_TMPFAIL: ret;
365         } else if (ret == ENGINE_KEY_ENOENT) {
366             if (isDegradedMode()) {
367                 return ENGINE_TMPFAIL;
368             }
369             if (create) {
370                 std::stringstream vals;
371                 vals << initial;
372                 size_t nb = vals.str().length();
373                 *result = initial;
374                 Item *itm = new Item(key, (uint16_t)nkey, 0, expiretime,
375                                      vals.str().c_str(), nb, ext_meta, ext_len);
376                 ret = store(cookie, itm, cas, OPERATION_ADD, vbucket);
377                 delete itm;
378             }
379         }
380
381         /* We had a race condition.. just call ourself recursively to retry */
382         if (ret == ENGINE_KEY_EEXISTS) {
383             return arithmetic(cookie, key, nkey, increment, create, delta,
384                               initial, expiretime, cas, datatype, result,
385                               vbucket);
386         } else if (ret == ENGINE_SUCCESS) {
387             ++stats.numOpsStore;
388         }
389
390         return ret;
391     }
392
393
394
395     ENGINE_ERROR_CODE flush(const void *cookie, time_t when);
396
397     uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
398                           uint16_t *nes, uint8_t *ttl, uint16_t *flags,
399                           uint32_t *seqno, uint16_t *vbucket);
400
401     bool createTapQueue(const void *cookie,
402                         std::string &client,
403                         uint32_t flags,
404                         const void *userdata,
405                         size_t nuserdata);
406
407     ENGINE_ERROR_CODE tapNotify(const void *cookie,
408                                 void *engine_specific,
409                                 uint16_t nengine,
410                                 uint8_t ttl,
411                                 uint16_t tap_flags,
412                                 uint16_t tap_event,
413                                 uint32_t tap_seqno,
414                                 const void *key,
415                                 size_t nkey,
416                                 uint32_t flags,
417                                 uint32_t exptime,
418                                 uint64_t cas,
419                                 uint8_t datatype,
420                                 const void *data,
421                                 size_t ndata,
422                                 uint16_t vbucket);
423
424     ENGINE_ERROR_CODE dcpOpen(const void* cookie,
425                               uint32_t opaque,
426                               uint32_t seqno,
427                               uint32_t flags,
428                               void *stream_name,
429                               uint16_t nname);
430
431     ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
432                                    uint32_t opaque,
433                                    uint16_t vbucket,
434                                    uint32_t flags);
435
436     ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
437                                             uint8_t event,
438                                             uint16_t vbucket,
439                                             uint64_t checkpointId);
440
441     ENGINE_ERROR_CODE touch(const void* cookie,
442                             protocol_binary_request_header *request,
443                             ADD_RESPONSE response);
444
445     ENGINE_ERROR_CODE getMeta(const void* cookie,
446                               protocol_binary_request_get_meta *request,
447                               ADD_RESPONSE response);
448     ENGINE_ERROR_CODE setWithMeta(const void* cookie,
449                                   protocol_binary_request_set_with_meta *request,
450                                   ADD_RESPONSE response);
451     ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
452                                      protocol_binary_request_delete_with_meta *request,
453                                      ADD_RESPONSE response);
454
455     ENGINE_ERROR_CODE returnMeta(const void* cookie,
456                                  protocol_binary_request_return_meta *request,
457                                  ADD_RESPONSE response);
458
459     ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
460                                 protocol_binary_request_set_cluster_config *request,
461                                 ADD_RESPONSE response);
462
463     ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
464                                 protocol_binary_request_get_cluster_config *request,
465                                 ADD_RESPONSE response);
466
467     ENGINE_ERROR_CODE getAllKeys(const void* cookie,
468                                 protocol_binary_request_get_keys *request,
469                                 ADD_RESPONSE response);
470
471     /**
     * Visit the objects and add them to the tap/dcp connections queue.
473      * @todo this code should honor the backfill time!
474      */
475     void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
476
477     void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
478         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
479         serverApi->cookie->set_priority(cookie, priority);
480         ObjectRegistry::onSwitchThread(epe);
481     }
482
483     void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
484         if (cookie == NULL) {
485             LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
486         } else {
487             BlockTimer bt(&stats.notifyIOHisto);
488             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
489             serverApi->cookie->notify_io_complete(cookie, status);
490             ObjectRegistry::onSwitchThread(epe);
491         }
492     }
493
494     ENGINE_ERROR_CODE reserveCookie(const void *cookie);
495     ENGINE_ERROR_CODE releaseCookie(const void *cookie);
496
497     void storeEngineSpecific(const void *cookie, void *engine_data) {
498         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
499         serverApi->cookie->store_engine_specific(cookie, engine_data);
500         ObjectRegistry::onSwitchThread(epe);
501     }
502
503     void *getEngineSpecific(const void *cookie) {
504         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
505         void *engine_data = serverApi->cookie->get_engine_specific(cookie);
506         ObjectRegistry::onSwitchThread(epe);
507         return engine_data;
508     }
509
510     bool isDatatypeSupported(const void *cookie) {
511         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
512         bool isSupported = serverApi->cookie->is_datatype_supported(cookie);
513         ObjectRegistry::onSwitchThread(epe);
514         return isSupported;
515     }
516
517     uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
518         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
519         uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
520         ObjectRegistry::onSwitchThread(epe);
521         return opcode;
522     }
523
524     bool validateSessionCas(const uint64_t cas) {
525         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
526         bool ret = serverApi->cookie->validate_session_cas(cas);
527         ObjectRegistry::onSwitchThread(epe);
528         return ret;
529     }
530
531     void decrementSessionCtr(void) {
532         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
533         serverApi->cookie->decrement_session_ctr();
534         ObjectRegistry::onSwitchThread(epe);
535     }
536
537     void registerEngineCallback(ENGINE_EVENT_TYPE type,
538                                 EVENT_CALLBACK cb, const void *cb_data);
539
540     template <typename T>
541     void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
542         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
543         std::for_each(cookies.begin(), cookies.end(),
544                       std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
545                                    status));
546         ObjectRegistry::onSwitchThread(epe);
547     }
548
549     void handleDisconnect(const void *cookie);
550
551     protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
552         (void) msg_size;
553         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
554         *msg = NULL;
555         if (!epstore->pauseFlusher()) {
556             LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
557             *msg = "Flusher not running.";
558             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
559         }
560         return rv;
561     }
562
563     protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
564         (void) msg_size;
565         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
566         *msg = NULL;
567         if (!epstore->resumeFlusher()) {
568             LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
569             *msg = "Flusher not shut down.";
570             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
571         }
572         return rv;
573     }
574
575     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
576         return epstore->deleteVBucket(vbid, c);
577     }
578
579     ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
580                                 const void *cookie = NULL) {
581         return epstore->compactDB(vbid, c, cookie);
582     }
583
584     bool resetVBucket(uint16_t vbid) {
585         return epstore->resetVBucket(vbid);
586     }
587
588     void setTapKeepAlive(uint32_t to) {
589         configuration.setTapKeepalive((size_t)to);
590     }
591
592     void setFlushAll(bool enabled) {
593         flushAllEnabled = enabled;
594     }
595
596     protocol_binary_response_status evictKey(const std::string &key,
597                                              uint16_t vbucket,
598                                              const char **msg,
599                                              size_t *msg_size) {
600         return epstore->evictKey(key, vbucket, msg, msg_size);
601     }
602
603     bool getLocked(const std::string &key,
604                    uint16_t vbucket,
605                    Callback<GetValue> &cb,
606                    rel_time_t currentTime,
607                    uint32_t lockTimeout,
608                    const void *cookie) {
609         return epstore->getLocked(key, vbucket, cb, currentTime, lockTimeout, cookie);
610     }
611
612     ENGINE_ERROR_CODE unlockKey(const std::string &key,
613                                 uint16_t vbucket,
614                                 uint64_t cas,
615                                 rel_time_t currentTime) {
616         return epstore->unlockKey(key, vbucket, cas, currentTime);
617     }
618
619     ENGINE_ERROR_CODE observe(const void* cookie,
620                               protocol_binary_request_header *request,
621                               ADD_RESPONSE response);
622
623     RCPtr<VBucket> getVBucket(uint16_t vbucket) {
624         return epstore->getVBucket(vbucket);
625     }
626
627     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer) {
628         return epstore->setVBucketState(vbid, to, transfer);
629     }
630
631     ~EventuallyPersistentEngine();
632
633     engine_info *getInfo() {
634         return &info.info;
635     }
636
637     EPStats &getEpStats() {
638         return stats;
639     }
640
641     EventuallyPersistentStore* getEpStore() { return epstore; }
642
643     TapConnMap &getTapConnMap() { return *tapConnMap; }
644
645     DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
646
647     TapConfig &getTapConfig() { return *tapConfig; }
648
649     TapThrottle &getTapThrottle() { return *tapThrottle; }
650
651     CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
652
653     SERVER_HANDLE_V1* getServerApi() { return serverApi; }
654
655     Configuration &getConfiguration() {
656         return configuration;
657     }
658
659     ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
660                                           protocol_binary_request_header *request,
661                                           ADD_RESPONSE response);
662
663     ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
664                                            protocol_binary_request_header *request,
665                                            ADD_RESPONSE response);
666
667     ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
668                                       protocol_binary_request_header *request,
669                                       ADD_RESPONSE response);
670
671     ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
672                                             protocol_binary_request_header *request,
673                                             ADD_RESPONSE response);
674
675     ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
676                                         protocol_binary_request_header *request,
677                                         ADD_RESPONSE response);
678
679     ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
680                                               protocol_binary_request_header *request,
681                                               ADD_RESPONSE response);
682
683     size_t getGetlDefaultTimeout() const {
684         return getlDefaultTimeout;
685     }
686
687     size_t getGetlMaxTimeout() const {
688         return getlMaxTimeout;
689     }
690
691     size_t getMaxFailoverEntries() const {
692         return maxFailoverEntries;
693     }
694
695     bool isDegradedMode() const {
696         return epstore->isWarmingUp() || !trafficEnabled.load();
697     }
698
699     WorkLoadPolicy &getWorkLoadPolicy(void) {
700         return *workload;
701     }
702
703     bucket_priority_t getWorkloadPriority(void) {return workloadPriority; }
704     void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
705
    /**
     * Cluster configuration blob held by the engine, as a length plus a raw
     * buffer pointer, together with a mutex.
     * NOTE(review): presumably written by setClusterConfig() and read by
     * getClusterConfig(), with `lock` guarding len/config; confirm in the
     * implementation file.
     */
    struct clusterConfig {
        clusterConfig() : len(0), config(NULL) {}
        uint32_t len;     // presumably the byte length of *config; 0 initially
        uint8_t *config;  // raw config buffer; NULL until one is stored
        Mutex lock;
    } clusterConfig;
712
713     ENGINE_ERROR_CODE getRandomKey(const void *cookie,
714                                    ADD_RESPONSE response);
715
716     ConnHandler* getConnHandler(const void *cookie);
717
718     void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
719
720     /**
721      * Get a (sloppy) list of the sequence numbers for all of the vbuckets
     * on this server. It is not to be treated as a consistent set of
     * sequence numbers, but rather a list of "at least" numbers. The list is
     * generated by starting at vbucket 0 and recording its current number,
     * then moving to the next vbucket and recording its number. That means that
726      * at the time we get the number for vbucket X all of the previous
727      * numbers could have been incremented. If the client just needs a list
728      * of where we are for each vbucket this method may be more optimal than
729      * requesting one by one.
730      *
731      * @param cookie The cookie representing the connection to requesting
732      *               list
     * @param response The method used to format the output buffer
734      * @return ENGINE_SUCCESS upon success
735      */
736     ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(const void *cookie,
737                                                    ADD_RESPONSE response);
738
739 protected:
740     friend class EpEngineValueChangeListener;
741
742     void setMaxItemSize(size_t value) {
743         maxItemSize = value;
744     }
745
746     void setGetlDefaultTimeout(size_t value) {
747         getlDefaultTimeout = value;
748     }
749
750     void setGetlMaxTimeout(size_t value) {
751         getlMaxTimeout = value;
752     }
753
754 private:
755     EventuallyPersistentEngine(GET_SERVER_API get_server_api);
756     friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
757                                              GET_SERVER_API get_server_api,
758                                              ENGINE_HANDLE **handle);
759     uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
760                             uint16_t *nes, uint8_t *ttl, uint16_t *flags,
761                             uint32_t *seqno, uint16_t *vbucket,
762                             TapProducer *c, bool &retry);
763
764
765     ENGINE_ERROR_CODE processTapAck(const void *cookie,
766                                     uint32_t seqno,
767                                     uint16_t status,
768                                     const std::string &msg);
769
770     /**
771      * Report the state of a memory condition when out of memory.
772      *
773      * @return ETMPFAIL if we think we can recover without interaction,
774      *         else ENOMEM
775      */
776     ENGINE_ERROR_CODE memoryCondition() {
777         // Do we think it's possible we could free something?
778         bool haveEvidenceWeCanFreeMemory(stats.getMaxDataSize() > stats.memOverhead);
779         if (haveEvidenceWeCanFreeMemory) {
780             // Look for more evidence by seeing if we have resident items.
781             VBucketCountVisitor countVisitor(*this, vbucket_state_active);
782             epstore->visit(countVisitor);
783
784             haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
785                 countVisitor.getNumItems();
786         }
787         if (haveEvidenceWeCanFreeMemory) {
788             ++stats.tmp_oom_errors;
789             // Wake up the item pager task as memory usage
790             // seems to have exceeded high water mark
791             if ((getEpStore()->fetchItemPagerTask())->getState() ==
792                                                                 TASK_SNOOZED) {
793                 ExecutorPool::get()->wake(
794                         (getEpStore()->fetchItemPagerTask())->getId());
795             }
796             return ENGINE_TMPFAIL;
797         } else {
798             ++stats.oom_errors;
799             return ENGINE_ENOMEM;
800         }
801     }
802
803     friend class BGFetchCallback;
804     friend class EventuallyPersistentStore;
805
806     bool enableTraffic(bool enable) {
807         bool inverse = !enable;
808         return trafficEnabled.compare_exchange_strong(inverse, enable);
809     }
810
    // Per-group stats handlers. Each receives the requesting connection's
    // cookie and an ADD_STAT callback (presumably used to emit each key/value
    // pair back to that connection — confirm in ep_engine.cc). Handlers that
    // take stat_key/nkey are given the raw stat request key so they can parse
    // arguments such as a vbucket id out of it.
    ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
                                     const char* stat_key,
                                     int nkey,
                                     bool prevStateRequested,
                                     bool details);
    ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
                                        const char* stat_key, int nkey);
    ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
    // Aggregated connection stats; NOTE(review): sep/nsep appear to be the
    // separator used to group connection names — confirm in implementation.
    ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
                                     const char *sep, size_t nsep,
                                     conn_type_t connType);
    ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
                                 uint16_t vbid, std::string &key, bool validate=false);
    ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
                                           ADD_STAT add_stat,
                                           std::string& key,
                                           uint16_t vbid);

    ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
                                           ADD_STAT add_stat,
                                           std::string &key,
                                           uint16_t vbid);
    ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
                                             ADD_STAT add_stat,
                                             uint16_t vbid);
    ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
    ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
                                   const char* stat_key, int nkey);
    ENGINE_ERROR_CODE doDiskStats(const void *cookie, ADD_STAT add_stat,
                                  const char* stat_key, int nkey);
851
852     void addLookupResult(const void *cookie, Item *result) {
853         LockHolder lh(lookupMutex);
854         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
855         if (it != lookups.end()) {
856             if (it->second != NULL) {
857                 LOG(EXTENSION_LOG_DEBUG,
858                     "Cleaning up old lookup result for '%s'",
859                     it->second->getKey().c_str());
860                 delete it->second;
861             } else {
862                 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
863             }
864             lookups.erase(it);
865         }
866         lookups[cookie] = result;
867     }
868
869     bool fetchLookupResult(const void *cookie, Item **itm) {
870         // This will return *and erase* the lookup result for a connection.
871         // You look it up, you own it.
872         LockHolder lh(lookupMutex);
873         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
874         if (it != lookups.end()) {
875             *itm = it->second;
876             lookups.erase(it);
877             return true;
878         } else {
879             return false;
880         }
881     }
882
    // Get the current TAP producer connection associated with this cookie.
    // If this method returns NULL, the caller should return TAP_DISCONNECT.
    TapProducer* getTapProducer(const void *cookie);
886
    // --- Engine state ---
    SERVER_HANDLE_V1 *serverApi;
    // Underlying store; memoryCondition() visits its vbuckets.
    EventuallyPersistentStore *epstore;
    WorkLoadPolicy *workload;
    bucket_priority_t workloadPriority;

    TapThrottle *tapThrottle;
    // Pending lookup results keyed by connection cookie. Filled by
    // addLookupResult() (which deletes any unfetched predecessor) and drained
    // by fetchLookupResult(), which transfers Item ownership to the caller.
    // Both accessors hold lookupMutex.
    std::map<const void*, Item*> lookups;
    // Per-cookie status of all-keys lookups. NOTE(review): locking discipline
    // is not visible in this chunk — confirm whether lookupMutex covers it.
    unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
    Mutex lookupMutex;
    GET_SERVER_API getServerApiFunc;
    // engine_info overlaid on a buffer large enough for the struct plus up to
    // 10 trailing feature_info entries.
    union {
        engine_info info;
        char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
    } info;

    DcpConnMap *dcpConnMap_;
    TapConnMap *tapConnMap;
    TapConfig *tapConfig;
    CheckpointConfig *checkpointConfig;
    std::string name;
    size_t maxItemSize;
    size_t getlDefaultTimeout;
    size_t getlMaxTimeout;
    size_t maxFailoverEntries;
    // Engine statistics; memoryCondition() increments oom_errors /
    // tmp_oom_errors here.
    EPStats stats;
    Configuration configuration;
    // Whether client traffic is enabled; flipped atomically by enableTraffic().
    AtomicValue<bool> trafficEnabled;

    bool flushAllEnabled;
    // A unique, system-generated token initialized each time
    // ep_engine starts up.
    AtomicValue<time_t> startupTime;
919
920 };
921
922 #endif  // SRC_EP_ENGINE_H_