a1fa67f2d0684739f2e553413dd813c30c2446de
[ep-engine.git] / src / ep_engine.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_EP_ENGINE_H_
19 #define SRC_EP_ENGINE_H_ 1
20
21 #include "config.h"
22
23 #include <errno.h>
24
25 #include <algorithm>
26 #include <cstdio>
27 #include <limits>
28 #include <list>
29 #include <map>
30 #include <sstream>
31 #include <string>
32
33 #include "configuration.h"
34 #include "ep.h"
35 #include "ep-engine/command_ids.h"
36 #include "item_pager.h"
37 #include "kvstore.h"
38 #include "locks.h"
39 #include "tapconnection.h"
40 #include "workload.h"
41
42
43 class DcpConnMap;
44 class TapConnMap;
45 class TapThrottle;
46
47 extern "C" {
48     EXPORT_FUNCTION
49     ENGINE_ERROR_CODE create_instance(uint64_t interface,
50                                       GET_SERVER_API get_server_api,
51                                       ENGINE_HANDLE **handle);
52
53     EXPORT_FUNCTION
54     void destroy_engine(void);
55
56     void EvpNotifyPendingConns(void*arg);
57 }
58
/* notify_io_complete is passed to std::ptr_fun, but the function has
 * "C" linkage, which ptr_fun does not accept. The typedef below is used
 * to cast that linkage away.
 */
63 typedef void (*NOTIFY_IO_COMPLETE_T)(const void *cookie,
64                                      ENGINE_ERROR_CODE status);
65
66
67 // Forward decl
68 class EventuallyPersistentEngine;
69 class TapConnMap;
70
71 /**
72  * Vbucket visitor that counts active vbuckets.
73  */
74 class VBucketCountVisitor : public VBucketVisitor {
75 public:
76     VBucketCountVisitor(EventuallyPersistentEngine &e,
77                         vbucket_state_t state) :
78         engine(e),
79         desired_state(state), numItems(0),
80         numTempItems(0),nonResident(0),
81         numVbucket(0), htMemory(0),
82         htItemMemory(0), htCacheSize(0),
83         numEjects(0), numExpiredItems(0),
84         metaDataMemory(0), metaDataDisk(0),
85         opsCreate(0),
86         opsUpdate(0), opsDelete(0),
87         opsReject(0), queueSize(0),
88         queueMemory(0), queueAge(0),
89         queueFill(0), queueDrain(0),
90         pendingWrites(0), chkPersistRemaining(0),
91         fileSpaceUsed(0), fileSize(0)
92     { }
93
94     bool visitBucket(RCPtr<VBucket> &vb);
95
96     void visit(StoredValue* v) {
97         (void)v;
98         cb_assert(false); // this does not happen
99     }
100
101     vbucket_state_t getVBucketState() { return desired_state; }
102
103     size_t getNumItems() { return numItems; }
104
105     size_t getNumTempItems() { return numTempItems; }
106
107     size_t getNonResident() { return nonResident; }
108
109     size_t getVBucketNumber() { return numVbucket; }
110
111     size_t getMemResidentPer() {
112         size_t numResident = numItems - nonResident;
113         return (numItems != 0) ? (size_t) (numResident *100.0) / (numItems) : 100;
114     }
115
116     size_t getEjects() { return numEjects; }
117
118     size_t getExpired() { return numExpiredItems; }
119
120     size_t getMetaDataMemory() { return metaDataMemory; }
121
122     size_t getMetaDataDisk() { return metaDataDisk; }
123
124     size_t getHashtableMemory() { return htMemory; }
125
126     size_t getItemMemory() { return htItemMemory; }
127     size_t getCacheSize() { return htCacheSize; }
128
129     size_t getOpsCreate() { return opsCreate; }
130     size_t getOpsUpdate() { return opsUpdate; }
131     size_t getOpsDelete() { return opsDelete; }
132     size_t getOpsReject() { return opsReject; }
133
134     size_t getQueueSize() { return queueSize; }
135     size_t getQueueMemory() { return queueMemory; }
136     size_t getQueueFill() { return queueFill; }
137     size_t getQueueDrain() { return queueDrain; }
138     uint64_t getAge() { return queueAge; }
139     size_t getPendingWrites() { return pendingWrites; }
140     size_t getChkPersistRemaining() { return chkPersistRemaining; }
141
142     size_t getFileSpaceUsed() { return fileSpaceUsed; }
143     size_t getFileSize() { return fileSize; }
144
145 private:
146     EventuallyPersistentEngine &engine;
147     vbucket_state_t desired_state;
148
149     size_t numItems;
150     size_t numTempItems;
151     size_t nonResident;
152     size_t numVbucket;
153     size_t htMemory;
154     size_t htItemMemory;
155     size_t htCacheSize;
156     size_t numEjects;
157     size_t numExpiredItems;
158     size_t metaDataMemory;
159     size_t metaDataDisk;
160
161     size_t opsCreate;
162     size_t opsUpdate;
163     size_t opsDelete;
164     size_t opsReject;
165
166     size_t queueSize;
167     size_t queueMemory;
168     uint64_t queueAge;
169     size_t queueFill;
170     size_t queueDrain;
171     size_t pendingWrites;
172     size_t chkPersistRemaining;
173
174     size_t fileSpaceUsed;
175     size_t fileSize;
176 };
177
178 /**
179  * memcached engine interface to the EventuallyPersistentStore.
180  */
181 class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
182     friend class LookupCallback;
183 public:
184     ENGINE_ERROR_CODE initialize(const char* config);
185     void destroy(bool force);
186
187     ENGINE_ERROR_CODE itemAllocate(const void* cookie,
188                                    item** itm,
189                                    const void* key,
190                                    const size_t nkey,
191                                    const size_t nbytes,
192                                    const int flags,
193                                    const rel_time_t exptime,
194                                    uint8_t datatype)
195     {
196         (void)cookie;
197         if (nbytes > maxItemSize) {
198             return ENGINE_E2BIG;
199         }
200
201         time_t expiretime = (exptime == 0) ? 0 : ep_abs_time(ep_reltime(exptime));
202
203         uint8_t ext_meta[1];
204         uint8_t ext_len = EXT_META_LEN;
205         *(ext_meta) = datatype;
206         *itm = new Item(key, nkey, nbytes, flags, expiretime, ext_meta,
207                         ext_len);
208         if (*itm == NULL) {
209             return memoryCondition();
210         } else {
211             stats.itemAllocSizeHisto.add(nbytes);
212             return ENGINE_SUCCESS;
213         }
214     }
215
216     ENGINE_ERROR_CODE itemDelete(const void* cookie,
217                                  const void* key,
218                                  const size_t nkey,
219                                  uint64_t* cas,
220                                  uint16_t vbucket)
221     {
222         std::string k(static_cast<const char*>(key), nkey);
223         return itemDelete(cookie, k, cas, vbucket);
224     }
225
226     ENGINE_ERROR_CODE itemDelete(const void* cookie,
227                                  const std::string &key,
228                                  uint64_t* cas,
229                                  uint16_t vbucket)
230     {
231         ENGINE_ERROR_CODE ret = epstore->deleteItem(key, cas,
232                                                     vbucket, cookie,
233                                                     false, // not force
234                                                     NULL);
235
236         if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
237             if (isDegradedMode()) {
238                 return ENGINE_TMPFAIL;
239             }
240         } else if (ret == ENGINE_SUCCESS) {
241             ++stats.numOpsDelete;
242         }
243         return ret;
244     }
245
246
247     void itemRelease(const void* cookie, item *itm)
248     {
249         (void)cookie;
250         delete (Item*)itm;
251     }
252
253     ENGINE_ERROR_CODE get(const void* cookie,
254                           item** itm,
255                           const void* key,
256                           const int nkey,
257                           uint16_t vbucket,
258                           bool track_stat = false)
259     {
260         BlockTimer timer(&stats.getCmdHisto);
261         std::string k(static_cast<const char*>(key), nkey);
262
263         GetValue gv(epstore->get(k, vbucket, cookie, serverApi->core));
264         ENGINE_ERROR_CODE ret = gv.getStatus();
265
266         if (ret == ENGINE_SUCCESS) {
267             *itm = gv.getValue();
268             if (track_stat) {
269                 ++stats.numOpsGet;
270             }
271         } else if (ret == ENGINE_KEY_ENOENT || ret == ENGINE_NOT_MY_VBUCKET) {
272             if (isDegradedMode()) {
273                 return ENGINE_TMPFAIL;
274             }
275         }
276
277         return ret;
278     }
279
280     const char* getName() {
281         return name.c_str();
282     }
283
284     ENGINE_ERROR_CODE getStats(const void* cookie,
285                                const char* stat_key,
286                                int nkey,
287                                ADD_STAT add_stat);
288
289     void resetStats() {
290         stats.reset();
291         if (epstore) {
292             epstore->resetUnderlyingStats();
293         }
294     }
295
296     ENGINE_ERROR_CODE store(const void *cookie,
297                             item* itm,
298                             uint64_t *cas,
299                             ENGINE_STORE_OPERATION operation,
300                             uint16_t vbucket);
301
302     ENGINE_ERROR_CODE arithmetic(const void* cookie,
303                                  const void* key,
304                                  const int nkey,
305                                  const bool increment,
306                                  const bool create,
307                                  const uint64_t delta,
308                                  const uint64_t initial,
309                                  const rel_time_t exptime,
310                                  uint64_t *cas,
311                                  uint8_t datatype,
312                                  uint64_t *result,
313                                  uint16_t vbucket)
314     {
315         BlockTimer timer(&stats.arithCmdHisto);
316         item *it = NULL;
317         uint8_t ext_meta[1];
318         uint8_t ext_len = EXT_META_LEN;
319         *(ext_meta) = datatype;
320
321         rel_time_t expiretime = (exptime == 0 ||
322                                  exptime == 0xffffffff) ?
323             0 : ep_abs_time(ep_reltime(exptime));
324
325         ENGINE_ERROR_CODE ret = get(cookie, &it, key, nkey, vbucket);
326         if (ret == ENGINE_SUCCESS) {
327             Item *itm = static_cast<Item*>(it);
328             char *endptr = NULL;
329             char data[24];
330             size_t len = std::min(static_cast<uint32_t>(sizeof(data) - 1),
331                                   itm->getNBytes());
332             data[len] = 0;
333             memcpy(data, itm->getData(), len);
334             uint64_t val = strtoull(data, &endptr, 10);
335             if (itm->getCas() == (uint64_t) -1) {
336                 // item is locked, can't perform arithmetic operation
337                 delete itm;
338                 return ENGINE_TMPFAIL;
339             }
340             if ((errno != ERANGE) && (isspace(*endptr)
341                                       || (*endptr == '\0' && endptr != data))) {
342                 if (increment) {
343                     val += delta;
344                 } else {
345                     if (delta > val) {
346                         val = 0;
347                     } else {
348                         val -= delta;
349                     }
350                 }
351
352                 std::stringstream vals;
353                 vals << val;
354                 size_t nb = vals.str().length();
355                 *result = val;
356                 Item *nit = new Item(key, (uint16_t)nkey, itm->getFlags(),
357                                      itm->getExptime(), vals.str().c_str(), nb,
358                                      ext_meta, ext_len);
359                 nit->setCas(itm->getCas());
360                 ret = store(cookie, nit, cas, OPERATION_CAS, vbucket);
361                 delete nit;
362             } else {
363                 ret = ENGINE_EINVAL;
364             }
365
366             delete itm;
367         } else if (ret == ENGINE_NOT_MY_VBUCKET) {
368             return isDegradedMode() ? ENGINE_TMPFAIL: ret;
369         } else if (ret == ENGINE_KEY_ENOENT) {
370             if (isDegradedMode()) {
371                 return ENGINE_TMPFAIL;
372             }
373             if (create) {
374                 std::stringstream vals;
375                 vals << initial;
376                 size_t nb = vals.str().length();
377                 *result = initial;
378                 Item *itm = new Item(key, (uint16_t)nkey, 0, expiretime,
379                                      vals.str().c_str(), nb, ext_meta, ext_len);
380                 ret = store(cookie, itm, cas, OPERATION_ADD, vbucket);
381                 delete itm;
382             }
383         }
384
385         /* We had a race condition.. just call ourself recursively to retry */
386         if (ret == ENGINE_KEY_EEXISTS) {
387             return arithmetic(cookie, key, nkey, increment, create, delta,
388                               initial, expiretime, cas, datatype, result,
389                               vbucket);
390         } else if (ret == ENGINE_SUCCESS) {
391             ++stats.numOpsStore;
392         }
393
394         return ret;
395     }
396
397
398
399     ENGINE_ERROR_CODE flush(const void *cookie, time_t when);
400
401     uint16_t walkTapQueue(const void *cookie, item **itm, void **es,
402                           uint16_t *nes, uint8_t *ttl, uint16_t *flags,
403                           uint32_t *seqno, uint16_t *vbucket);
404
405     bool createTapQueue(const void *cookie,
406                         std::string &client,
407                         uint32_t flags,
408                         const void *userdata,
409                         size_t nuserdata);
410
411     ENGINE_ERROR_CODE tapNotify(const void *cookie,
412                                 void *engine_specific,
413                                 uint16_t nengine,
414                                 uint8_t ttl,
415                                 uint16_t tap_flags,
416                                 uint16_t tap_event,
417                                 uint32_t tap_seqno,
418                                 const void *key,
419                                 size_t nkey,
420                                 uint32_t flags,
421                                 uint32_t exptime,
422                                 uint64_t cas,
423                                 uint8_t datatype,
424                                 const void *data,
425                                 size_t ndata,
426                                 uint16_t vbucket);
427
428     ENGINE_ERROR_CODE dcpOpen(const void* cookie,
429                               uint32_t opaque,
430                               uint32_t seqno,
431                               uint32_t flags,
432                               void *stream_name,
433                               uint16_t nname);
434
435     ENGINE_ERROR_CODE dcpAddStream(const void* cookie,
436                                    uint32_t opaque,
437                                    uint16_t vbucket,
438                                    uint32_t flags);
439
440     ENGINE_ERROR_CODE ConnHandlerCheckPoint(TapConsumer *consumer,
441                                             uint8_t event,
442                                             uint16_t vbucket,
443                                             uint64_t checkpointId);
444
445     ENGINE_ERROR_CODE touch(const void* cookie,
446                             protocol_binary_request_header *request,
447                             ADD_RESPONSE response);
448
449     ENGINE_ERROR_CODE getMeta(const void* cookie,
450                               protocol_binary_request_get_meta *request,
451                               ADD_RESPONSE response);
452     ENGINE_ERROR_CODE setWithMeta(const void* cookie,
453                                   protocol_binary_request_set_with_meta *request,
454                                   ADD_RESPONSE response);
455     ENGINE_ERROR_CODE deleteWithMeta(const void* cookie,
456                                      protocol_binary_request_delete_with_meta *request,
457                                      ADD_RESPONSE response);
458
459     ENGINE_ERROR_CODE returnMeta(const void* cookie,
460                                  protocol_binary_request_return_meta *request,
461                                  ADD_RESPONSE response);
462
463     ENGINE_ERROR_CODE setClusterConfig(const void* cookie,
464                                 protocol_binary_request_set_cluster_config *request,
465                                 ADD_RESPONSE response);
466
467     ENGINE_ERROR_CODE getClusterConfig(const void* cookie,
468                                 protocol_binary_request_get_cluster_config *request,
469                                 ADD_RESPONSE response);
470
471     ENGINE_ERROR_CODE getAllKeys(const void* cookie,
472                                 protocol_binary_request_get_keys *request,
473                                 ADD_RESPONSE response);
474
475     /**
     * Visit the objects and add them to the tap/dcp connections queue.
477      * @todo this code should honor the backfill time!
478      */
479     void queueBackfill(const VBucketFilter &backfillVBFilter, Producer *tc);
480
481     void setDCPPriority(const void* cookie, CONN_PRIORITY priority) {
482         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
483         serverApi->cookie->set_priority(cookie, priority);
484         ObjectRegistry::onSwitchThread(epe);
485     }
486
487     void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
488         if (cookie == NULL) {
489             LOG(EXTENSION_LOG_WARNING, "Tried to signal a NULL cookie!");
490         } else {
491             BlockTimer bt(&stats.notifyIOHisto);
492             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
493             serverApi->cookie->notify_io_complete(cookie, status);
494             ObjectRegistry::onSwitchThread(epe);
495         }
496     }
497
498     ENGINE_ERROR_CODE reserveCookie(const void *cookie);
499     ENGINE_ERROR_CODE releaseCookie(const void *cookie);
500
501     void storeEngineSpecific(const void *cookie, void *engine_data) {
502         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
503         serverApi->cookie->store_engine_specific(cookie, engine_data);
504         ObjectRegistry::onSwitchThread(epe);
505     }
506
507     void *getEngineSpecific(const void *cookie) {
508         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
509         void *engine_data = serverApi->cookie->get_engine_specific(cookie);
510         ObjectRegistry::onSwitchThread(epe);
511         return engine_data;
512     }
513
514     bool isDatatypeSupported(const void *cookie) {
515         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
516         bool isSupported = serverApi->cookie->is_datatype_supported(cookie);
517         ObjectRegistry::onSwitchThread(epe);
518         return isSupported;
519     }
520
521     uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
522         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
523         uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
524         ObjectRegistry::onSwitchThread(epe);
525         return opcode;
526     }
527
528     bool validateSessionCas(const uint64_t cas) {
529         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
530         bool ret = serverApi->cookie->validate_session_cas(cas);
531         ObjectRegistry::onSwitchThread(epe);
532         return ret;
533     }
534
535     void decrementSessionCtr(void) {
536         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
537         serverApi->cookie->decrement_session_ctr();
538         ObjectRegistry::onSwitchThread(epe);
539     }
540
541     void registerEngineCallback(ENGINE_EVENT_TYPE type,
542                                 EVENT_CALLBACK cb, const void *cb_data);
543
544     template <typename T>
545     void notifyIOComplete(T cookies, ENGINE_ERROR_CODE status) {
546         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
547         std::for_each(cookies.begin(), cookies.end(),
548                       std::bind2nd(std::ptr_fun((NOTIFY_IO_COMPLETE_T)serverApi->cookie->notify_io_complete),
549                                    status));
550         ObjectRegistry::onSwitchThread(epe);
551     }
552
553     void handleDisconnect(const void *cookie);
554
555     protocol_binary_response_status stopFlusher(const char **msg, size_t *msg_size) {
556         (void) msg_size;
557         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
558         *msg = NULL;
559         if (!epstore->pauseFlusher()) {
560             LOG(EXTENSION_LOG_INFO, "Unable to stop flusher");
561             *msg = "Flusher not running.";
562             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
563         }
564         return rv;
565     }
566
567     protocol_binary_response_status startFlusher(const char **msg, size_t *msg_size) {
568         (void) msg_size;
569         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
570         *msg = NULL;
571         if (!epstore->resumeFlusher()) {
572             LOG(EXTENSION_LOG_INFO, "Unable to start flusher");
573             *msg = "Flusher not shut down.";
574             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
575         }
576         return rv;
577     }
578
579     ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL) {
580         return epstore->deleteVBucket(vbid, c);
581     }
582
583     ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
584                                 const void *cookie = NULL) {
585         return epstore->compactDB(vbid, c, cookie);
586     }
587
588     bool resetVBucket(uint16_t vbid) {
589         return epstore->resetVBucket(vbid);
590     }
591
592     void setTapKeepAlive(uint32_t to) {
593         configuration.setTapKeepalive((size_t)to);
594     }
595
596     void setFlushAll(bool enabled) {
597         flushAllEnabled = enabled;
598     }
599
600     protocol_binary_response_status evictKey(const std::string &key,
601                                              uint16_t vbucket,
602                                              const char **msg,
603                                              size_t *msg_size) {
604         return epstore->evictKey(key, vbucket, msg, msg_size);
605     }
606
607     bool getLocked(const std::string &key,
608                    uint16_t vbucket,
609                    Callback<GetValue> &cb,
610                    rel_time_t currentTime,
611                    uint32_t lockTimeout,
612                    const void *cookie) {
613         return epstore->getLocked(key, vbucket, cb, currentTime, lockTimeout, cookie);
614     }
615
616     ENGINE_ERROR_CODE unlockKey(const std::string &key,
617                                 uint16_t vbucket,
618                                 uint64_t cas,
619                                 rel_time_t currentTime) {
620         return epstore->unlockKey(key, vbucket, cas, currentTime);
621     }
622
623     ENGINE_ERROR_CODE observe(const void* cookie,
624                               protocol_binary_request_header *request,
625                               ADD_RESPONSE response);
626
627     RCPtr<VBucket> getVBucket(uint16_t vbucket) {
628         return epstore->getVBucket(vbucket);
629     }
630
631     ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t to, bool transfer) {
632         return epstore->setVBucketState(vbid, to, transfer);
633     }
634
635     ~EventuallyPersistentEngine();
636
637     engine_info *getInfo() {
638         return &info.info;
639     }
640
641     EPStats &getEpStats() {
642         return stats;
643     }
644
645     EventuallyPersistentStore* getEpStore() { return epstore; }
646
647     TapConnMap &getTapConnMap() { return *tapConnMap; }
648
649     DcpConnMap &getDcpConnMap() { return *dcpConnMap_; }
650
651     TapConfig &getTapConfig() { return *tapConfig; }
652
653     TapThrottle &getTapThrottle() { return *tapThrottle; }
654
655     CheckpointConfig &getCheckpointConfig() { return *checkpointConfig; }
656
657     SERVER_HANDLE_V1* getServerApi() { return serverApi; }
658
659     Configuration &getConfiguration() {
660         return configuration;
661     }
662
663     ENGINE_ERROR_CODE deregisterTapClient(const void* cookie,
664                                           protocol_binary_request_header *request,
665                                           ADD_RESPONSE response);
666
667     ENGINE_ERROR_CODE handleCheckpointCmds(const void* cookie,
668                                            protocol_binary_request_header *request,
669                                            ADD_RESPONSE response);
670
671     ENGINE_ERROR_CODE handleSeqnoCmds(const void* cookie,
672                                       protocol_binary_request_header *request,
673                                       ADD_RESPONSE response);
674
675     ENGINE_ERROR_CODE resetReplicationChain(const void* cookie,
676                                             protocol_binary_request_header *request,
677                                             ADD_RESPONSE response);
678
679     ENGINE_ERROR_CODE changeTapVBFilter(const void* cookie,
680                                         protocol_binary_request_header *request,
681                                         ADD_RESPONSE response);
682
683     ENGINE_ERROR_CODE handleTrafficControlCmd(const void* cookie,
684                                               protocol_binary_request_header *request,
685                                               ADD_RESPONSE response);
686
687     size_t getGetlDefaultTimeout() const {
688         return getlDefaultTimeout;
689     }
690
691     size_t getGetlMaxTimeout() const {
692         return getlMaxTimeout;
693     }
694
695     size_t getMaxFailoverEntries() const {
696         return maxFailoverEntries;
697     }
698
699     bool isDegradedMode() const {
700         return epstore->isWarmingUp() || !trafficEnabled.load();
701     }
702
703     WorkLoadPolicy &getWorkLoadPolicy(void) {
704         return *workload;
705     }
706
707     bucket_priority_t getWorkloadPriority(void) {return workloadPriority; }
708     void setWorkloadPriority(bucket_priority_t p) { workloadPriority = p; }
709
710     struct clusterConfig {
711         clusterConfig() : len(0), config(NULL) {}
712         uint32_t len;
713         uint8_t *config;
714         Mutex lock;
715     } clusterConfig;
716
717     ENGINE_ERROR_CODE getRandomKey(const void *cookie,
718                                    ADD_RESPONSE response);
719
720     ConnHandler* getConnHandler(const void *cookie);
721
722     void addLookupAllKeys(const void *cookie, ENGINE_ERROR_CODE err);
723
724     /**
725      * Get a (sloppy) list of the sequence numbers for all of the vbuckets
     * on this server. It is not to be treated as a consistent set of sequences,
727      * but rather a list of "at least" numbers. The way the list is generated
728      * is that we're starting for vbucket 0 and record the current number,
729      * then look at the next vbucket and record its number. That means that
730      * at the time we get the number for vbucket X all of the previous
731      * numbers could have been incremented. If the client just needs a list
732      * of where we are for each vbucket this method may be more optimal than
733      * requesting one by one.
734      *
735      * @param cookie The cookie representing the connection to requesting
736      *               list
737      * @param add_response The method used to format the output buffer
738      * @return ENGINE_SUCCESS upon success
739      */
740     ENGINE_ERROR_CODE getAllVBucketSequenceNumbers(const void *cookie,
741                                                    ADD_RESPONSE response);
742
743     static SERVER_LOG_API *loggerApi;
744
745 protected:
746     friend class EpEngineValueChangeListener;
747
748     void setMaxItemSize(size_t value) {
749         maxItemSize = value;
750     }
751
752     void setGetlDefaultTimeout(size_t value) {
753         getlDefaultTimeout = value;
754     }
755
756     void setGetlMaxTimeout(size_t value) {
757         getlMaxTimeout = value;
758     }
759
760     EventuallyPersistentEngine(GET_SERVER_API get_server_api);
761     friend ENGINE_ERROR_CODE create_instance(uint64_t interface,
762                                              GET_SERVER_API get_server_api,
763                                              ENGINE_HANDLE **handle);
764     uint16_t doWalkTapQueue(const void *cookie, item **itm, void **es,
765                             uint16_t *nes, uint8_t *ttl, uint16_t *flags,
766                             uint32_t *seqno, uint16_t *vbucket,
767                             TapProducer *c, bool &retry);
768
769
770     ENGINE_ERROR_CODE processTapAck(const void *cookie,
771                                     uint32_t seqno,
772                                     uint16_t status,
773                                     const std::string &msg);
774
775     /**
776      * Report the state of a memory condition when out of memory.
777      *
778      * @return ETMPFAIL if we think we can recover without interaction,
779      *         else ENOMEM
780      */
781     ENGINE_ERROR_CODE memoryCondition() {
782         // Do we think it's possible we could free something?
783         bool haveEvidenceWeCanFreeMemory(stats.getMaxDataSize() > stats.memOverhead);
784         if (haveEvidenceWeCanFreeMemory) {
785             // Look for more evidence by seeing if we have resident items.
786             VBucketCountVisitor countVisitor(*this, vbucket_state_active);
787             epstore->visit(countVisitor);
788
789             haveEvidenceWeCanFreeMemory = countVisitor.getNonResident() <
790                 countVisitor.getNumItems();
791         }
792         if (haveEvidenceWeCanFreeMemory) {
793             ++stats.tmp_oom_errors;
794             // Wake up the item pager task as memory usage
795             // seems to have exceeded high water mark
796             if ((getEpStore()->fetchItemPagerTask())->getState() ==
797                                                                 TASK_SNOOZED) {
798                 ExecutorPool::get()->wake(
799                         (getEpStore()->fetchItemPagerTask())->getId());
800             }
801             return ENGINE_TMPFAIL;
802         } else {
803             ++stats.oom_errors;
804             return ENGINE_ENOMEM;
805         }
806     }
807
808     friend class BGFetchCallback;
809     friend class EventuallyPersistentStore;
810
811     bool enableTraffic(bool enable) {
            // Atomically switch trafficEnabled to `enable`. The exchange only
            // succeeds (and true is returned) when the flag currently holds
            // the opposite value; otherwise nothing changes and the result
            // is false. (Assumes AtomicValue mirrors std::atomic here.)
812         bool expected = !enable;
813         return trafficEnabled.compare_exchange_strong(expected, enable);
814     }
815
816     ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat);
        // NOTE(review): doEngineStats and the other do*Stats members in this
        // run are only declared here; their definitions are outside this
        // header. Every one takes the requesting connection's cookie and an
        // ADD_STAT callback and returns an ENGINE_ERROR_CODE. Presumably each
        // emits, through add_stat, the stat group its name suggests --
        // confirm against the definitions.
817     ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat);
818     ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat);
        // stat_key/nkey: presumably the raw stats key and its length (TODO
        // confirm). prevStateRequested/details are boolean options whose
        // effect is determined by the out-of-line implementation.
819     ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat,
820                                      const char* stat_key,
821                                      int nkey,
822                                      bool prevStateRequested,
823                                      bool details);
824     ENGINE_ERROR_CODE doHashStats(const void *cookie, ADD_STAT add_stat);
825     ENGINE_ERROR_CODE doCheckpointStats(const void *cookie, ADD_STAT add_stat,
826                                         const char* stat_key, int nkey);
827     ENGINE_ERROR_CODE doTapStats(const void *cookie, ADD_STAT add_stat);
828     ENGINE_ERROR_CODE doDcpStats(const void *cookie, ADD_STAT add_stat);
        // sep/nsep: presumably a separator string and its length; connType
        // carries a conn_type_t value -- confirm usage in the definition.
829     ENGINE_ERROR_CODE doConnAggStats(const void *cookie, ADD_STAT add_stat,
830                                      const char *sep, size_t nsep,
831                                      conn_type_t connType);
832     ENGINE_ERROR_CODE doTimingStats(const void *cookie, ADD_STAT add_stat);
833     ENGINE_ERROR_CODE doSchedulerStats(const void *cookie, ADD_STAT add_stat);
834     ENGINE_ERROR_CODE doRunTimeStats(const void *cookie, ADD_STAT add_stat);
835     ENGINE_ERROR_CODE doDispatcherStats(const void *cookie, ADD_STAT add_stat);
        // validate is optional and defaults to false.
836     ENGINE_ERROR_CODE doKeyStats(const void *cookie, ADD_STAT add_stat,
837                                  uint16_t vbid, std::string &key, bool validate=false);
        // The takeover stats come in a TAP and a DCP flavour that share the
        // same parameter list (cookie, add_stat, key, vbid).
838     ENGINE_ERROR_CODE doTapVbTakeoverStats(const void *cookie,
839                                            ADD_STAT add_stat,
840                                            std::string& key,
841                                            uint16_t vbid);
842
843     ENGINE_ERROR_CODE doDcpVbTakeoverStats(const void *cookie,
844                                            ADD_STAT add_stat,
845                                            std::string &key,
846                                            uint16_t vbid);
        // Failover-log stats: one variant takes a single vbid, the other
        // takes none.
847     ENGINE_ERROR_CODE doVbIdFailoverLogStats(const void *cookie,
848                                              ADD_STAT add_stat,
849                                              uint16_t vbid);
850     ENGINE_ERROR_CODE doAllFailoverLogStats(const void *cookie, ADD_STAT add_stat);
851     ENGINE_ERROR_CODE doWorkloadStats(const void *cookie, ADD_STAT add_stat);
852     ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
853                                    const char* stat_key, int nkey);
854     ENGINE_ERROR_CODE doDiskStats(const void *cookie, ADD_STAT add_stat,
855                                   const char* stat_key, int nkey);
856
857     void addLookupResult(const void *cookie, Item *result) {
858         LockHolder lh(lookupMutex);
859         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
860         if (it != lookups.end()) {
861             if (it->second != NULL) {
862                 LOG(EXTENSION_LOG_DEBUG,
863                     "Cleaning up old lookup result for '%s'",
864                     it->second->getKey().c_str());
865                 delete it->second;
866             } else {
867                 LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
868             }
869             lookups.erase(it);
870         }
871         lookups[cookie] = result;
872     }
873
874     bool fetchLookupResult(const void *cookie, Item **itm) {
875         // This will return *and erase* the lookup result for a connection.
876         // You look it up, you own it.
877         LockHolder lh(lookupMutex);
878         std::map<const void*, Item*>::iterator it = lookups.find(cookie);
879         if (it != lookups.end()) {
880             *itm = it->second;
881             lookups.erase(it);
882             return true;
883         } else {
884             return false;
885         }
886     }
887
888     // Look up the current tap connection (TapProducer) for this cookie.
889     // If this method returns NULL, the caller should return TAP_DISCONNECT.
        // Declared here only; the definition is outside this header.
890     TapProducer* getTapProducer(const void *cookie);
891
892     // Initialize all required callbacks of this engine with the underlying
893     // server.
        // NOTE(review): declaration only -- which callbacks are registered is
        // determined by the out-of-line definition.
894     void initializeEngineCallbacks();
895
896     SERVER_HANDLE_V1 *serverApi;
        // NOTE(review): ownership and lifetime of the raw pointer members in
        // this section are not visible from this part of the header --
        // confirm in the constructor/destructor before changing them.
897     EventuallyPersistentStore *epstore;  // store that memoryCondition() visits to count items
898     WorkLoadPolicy *workload;
899     bucket_priority_t workloadPriority;
900
901     TapThrottle *tapThrottle;
        // Pending lookup results keyed by connection cookie: stored by
        // addLookupResult() and handed out (and erased) by fetchLookupResult().
902     std::map<const void*, Item*> lookups;
        // NOTE(review): presumably per-cookie status for all-keys requests;
        // its users are not in this part of the header -- confirm.
903     unordered_map<const void*, ENGINE_ERROR_CODE> allKeysLookups;
904     Mutex lookupMutex;  // taken by addLookupResult()/fetchLookupResult() around access to lookups
905     GET_SERVER_API getServerApiFunc;
        // The buffer arm reserves sizeof(engine_info) plus room for ten
        // feature_info entries.
906     union {
907         engine_info info;
908         char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
909     } info;
910
911     DcpConnMap *dcpConnMap_;
912     TapConnMap *tapConnMap;
913     TapConfig *tapConfig;
914     CheckpointConfig *checkpointConfig;
915     std::string name;
916     size_t maxItemSize;
917     size_t getlDefaultTimeout;
918     size_t getlMaxTimeout;
919     size_t maxFailoverEntries;
920     EPStats stats;  // holds, among others, the oom_errors/tmp_oom_errors counters bumped by memoryCondition()
921     Configuration configuration;
922     AtomicValue<bool> trafficEnabled;  // switched atomically by enableTraffic()
923
924     bool flushAllEnabled;
925     // a unique system generated token initialized at each time
926     // ep_engine starts up.
927     AtomicValue<time_t> startupTime;
928
929 };
930
931 #endif  // SRC_EP_ENGINE_H_