a26236d5b1e4f55dd48b2944d27b2cd3d4592e0e
[ep-engine.git] / src / ep_engine.cc
1
2 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 /*
4  *     Copyright 2010 Couchbase, Inc
5  *
6  *   Licensed under the Apache License, Version 2.0 (the "License");
7  *   you may not use this file except in compliance with the License.
8  *   You may obtain a copy of the License at
9  *
10  *       http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *   Unless required by applicable law or agreed to in writing, software
13  *   distributed under the License is distributed on an "AS IS" BASIS,
14  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *   See the License for the specific language governing permissions and
16  *   limitations under the License.
17  */
18
19 #include "config.h"
20
21 #include <fcntl.h>
22 #include <memcached/engine.h>
23 #include <memcached/protocol_binary.h>
24 #include <memcached/util.h>
25 #include <platform/platform.h>
26 #include <stdarg.h>
27
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <iostream>
#include <limits>
#include <string>
#include <vector>
35
36 #include "backfill.h"
37 #include "ep_engine.h"
38 #include "failover-table.h"
39 #include "flusher.h"
40 #include "connmap.h"
41 #include "htresizer.h"
42 #include "memory_tracker.h"
43 #include "stats-info.h"
44 #define STATWRITER_NAMESPACE core_engine
45 #include "statwriter.h"
46 #undef STATWRITER_NAMESPACE
47 #include "tapthrottle.h"
48 #include "dcp-consumer.h"
49 #include "dcp-producer.h"
50 #include "warmup.h"
51
52 #include <JSON_checker.h>
53
54 static ALLOCATOR_HOOKS_API *hooksApi;
55 static SERVER_LOG_API *loggerApi;
56
/**
 * Scale a size by a fraction (e.g. 0.75 for 75%).
 *
 * The multiplication is done in double precision and the result is
 * converted back to size_t, dropping any fractional part.
 *
 * @param val the value to scale
 * @param percent the scale factor, expressed as a fraction of 1
 * @return val * percent, truncated to size_t
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
60
61 /**
62  * Helper function to avoid typing in the long cast all over the place
63  * @param handle pointer to the engine
64  * @return the engine as a class
65  */
66 static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
67 {
68     EventuallyPersistentEngine* ret;
69     ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
70     ObjectRegistry::onSwitchThread(ret);
71     return ret;
72 }
73
74 static inline void releaseHandle(ENGINE_HANDLE* handle) {
75     (void) handle;
76     ObjectRegistry::onSwitchThread(NULL);
77 }
78
79
80 /**
81  * Call the response callback and return the appropriate value so that
82  * the core knows what to do..
83  */
84 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
85                                       uint16_t keylen,
86                                       const void *ext, uint8_t extlen,
87                                       const void *body, uint32_t bodylen,
88                                       uint8_t datatype, uint16_t status,
89                                       uint64_t cas, const void *cookie)
90 {
91     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
92     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
93     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
94                  status, cas, cookie)) {
95         rv = ENGINE_SUCCESS;
96     }
97     ObjectRegistry::onSwitchThread(e);
98     return rv;
99 }
100
/**
 * Inclusive range check used by the parameter setters.
 *
 * @param v value to check
 * @param l lowest accepted value
 * @param h highest accepted value
 * @throws std::runtime_error("Value out of range.") if v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    if (!(v < l) && !(v > h)) {
        return;
    }
    throw std::runtime_error("Value out of range.");
}
107
108
/**
 * Verify that a parameter value is a decimal integer: an optional leading
 * '-' followed by one or more ASCII digits.
 *
 * @param str NUL-terminated value string to check
 * @throws std::runtime_error("Value is not numeric") if the string is empty,
 *         is a lone "-", or contains any non-digit character
 */
static void checkNumeric(const char* str) {
    size_t pos = (str[0] == '-') ? 1 : 0;
    if (str[pos] == '\0') {
        // "" and "-" contain no digits; atoi() would silently turn them
        // into 0, so reject them here.
        throw std::runtime_error("Value is not numeric");
    }
    for (; str[pos] != '\0'; ++pos) {
        using namespace std;
        // isdigit() is only defined for EOF and values representable as
        // unsigned char; widen first so non-ASCII bytes are not UB.
        if (!isdigit(static_cast<unsigned char>(str[pos]))) {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
121
// The Engine API specifies C linkage for the functions below.
123 extern "C" {
124
125     static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
126     {
127         engine_info* info = getHandle(handle)->getInfo();
128         releaseHandle(handle);
129         return info;
130     }
131
132     static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
133                                            const char* config_str)
134     {
135         ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
136         releaseHandle(handle);
137         return err_code;
138     }
139
140     static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
141     {
142         getHandle(handle)->destroy(force);
143         delete getHandle(handle);
144         releaseHandle(NULL);
145     }
146
147     static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
148                                              const void* cookie,
149                                              item **itm,
150                                              const void* key,
151                                              const size_t nkey,
152                                              const size_t nbytes,
153                                              const int flags,
154                                              const rel_time_t exptime,
155                                              uint8_t datatype)
156     {
157         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
158             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
159                     " (ItemAllocate)");
160             return ENGINE_EINVAL;
161         }
162         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
163                                                                      itm, key,
164                                                                      nkey,
165                                                                      nbytes,
166                                                                      flags,
167                                                                      exptime,
168                                                                      datatype);
169         releaseHandle(handle);
170         return err_code;
171     }
172
173     static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
174                                            const void* cookie,
175                                            const void* key,
176                                            const size_t nkey,
177                                            uint64_t* cas,
178                                            uint16_t vbucket)
179     {
180         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
181                                                                    nkey, cas,
182                                                                    vbucket);
183         releaseHandle(handle);
184         return err_code;
185     }
186
187     static void EvpItemRelease(ENGINE_HANDLE* handle,
188                                const void *cookie,
189                                item* itm)
190     {
191         getHandle(handle)->itemRelease(cookie, itm);
192         releaseHandle(handle);
193     }
194
195     static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
196                                     const void* cookie,
197                                     item** itm,
198                                     const void* key,
199                                     const int nkey,
200                                     uint16_t vbucket)
201     {
202         ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
203                                                             nkey, vbucket, true);
204         releaseHandle(handle);
205         return err_code;
206     }
207
208     static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
209                                          const void* cookie,
210                                          const char* stat_key,
211                                          int nkey,
212                                          ADD_STAT add_stat)
213     {
214         ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
215                                                                  stat_key,
216                                                                  nkey,
217                                                                  add_stat);
218         releaseHandle(handle);
219         return err_code;
220     }
221
222     static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
223                                       const void *cookie,
224                                       item* itm,
225                                       uint64_t *cas,
226                                       ENGINE_STORE_OPERATION operation,
227                                       uint16_t vbucket)
228     {
229         ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
230                                                               operation,
231                                                               vbucket);
232         releaseHandle(handle);
233         return err_code;
234     }
235
236     static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
237                                            const void* cookie,
238                                            const void* key,
239                                            const int nkey,
240                                            const bool increment,
241                                            const bool create,
242                                            const uint64_t delta,
243                                            const uint64_t initial,
244                                            const rel_time_t exptime,
245                                            uint64_t *cas,
246                                            uint8_t datatype,
247                                            uint64_t *result,
248                                            uint16_t vbucket)
249     {
250         if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
251             if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
252                 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
253                         " (Arithmetic)");
254             } else {
255                 LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
256                     "operations on compressed data!");
257             }
258             return ENGINE_EINVAL;
259         }
260         ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
261                                                                 nkey,
262                                                                 increment,
263                                                                 create, delta,
264                                                                 initial,
265                                                                 exptime, cas,
266                                                                 datatype,
267                                                                 result,
268                                                                 vbucket);
269         releaseHandle(handle);
270         return ecode;
271     }
272
273     static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
274                                       const void* cookie, time_t when)
275     {
276         ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
277         releaseHandle(handle);
278         return err_code;
279     }
280
281     static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
282     {
283         getHandle(handle)->resetStats();
284         releaseHandle(handle);
285     }
286
287     static protocol_binary_response_status stopFlusher(
288                                                  EventuallyPersistentEngine *e,
289                                                  const char **msg,
290                                                  size_t *msg_size) {
291         return e->stopFlusher(msg, msg_size);
292     }
293
294     static protocol_binary_response_status startFlusher(
295                                                  EventuallyPersistentEngine *e,
296                                                  const char **msg,
297                                                  size_t *msg_size) {
298         return e->startFlusher(msg, msg_size);
299     }
300
301     static protocol_binary_response_status setTapParam(
302                                                  EventuallyPersistentEngine *e,
303                                                  const char *keyz,
304                                                  const char *valz,
305                                                  const char **msg, size_t *) {
306         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
307
308         try {
309             int v = atoi(valz);
310             if (strcmp(keyz, "tap_keepalive") == 0) {
311                 checkNumeric(valz);
312                 validate(v, 0, MAX_TAP_KEEP_ALIVE);
313                 e->setTapKeepAlive(static_cast<uint32_t>(v));
314             } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
315                 checkNumeric(valz);
316                 e->getConfiguration().setTapThrottleThreshold(v);
317             } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
318                 checkNumeric(valz);
319                 e->getConfiguration().setTapThrottleQueueCap(v);
320             } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
321                 checkNumeric(valz);
322                 e->getConfiguration().setTapThrottleCapPcnt(v);
323             } else {
324                 *msg = "Unknown config param";
325                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
326             }
327         } catch(std::runtime_error &) {
328             *msg = "Value out of range.";
329             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
330         }
331
332         return rv;
333     }
334
335     static protocol_binary_response_status setCheckpointParam(
336                                                  EventuallyPersistentEngine *e,
337                                                               const char *keyz,
338                                                               const char *valz,
339                                                               const char **msg,
340                                                               size_t *) {
341         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
342
343         try {
344             int v = atoi(valz);
345             if (strcmp(keyz, "chk_max_items") == 0) {
346                 checkNumeric(valz);
347                 validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
348                 e->getConfiguration().setChkMaxItems(v);
349             } else if (strcmp(keyz, "chk_period") == 0) {
350                 checkNumeric(valz);
351                 validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
352                 e->getConfiguration().setChkPeriod(v);
353             } else if (strcmp(keyz, "max_checkpoints") == 0) {
354                 checkNumeric(valz);
355                 validate(v, DEFAULT_MAX_CHECKPOINTS,
356                          MAX_CHECKPOINTS_UPPER_BOUND);
357                 e->getConfiguration().setMaxCheckpoints(v);
358             } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
359                 if (strcmp(valz, "true") == 0) {
360                     e->getConfiguration().setItemNumBasedNewChk(true);
361                 } else {
362                     e->getConfiguration().setItemNumBasedNewChk(false);
363                 }
364             } else if (strcmp(keyz, "keep_closed_chks") == 0) {
365                 if (strcmp(valz, "true") == 0) {
366                     e->getConfiguration().setKeepClosedChks(true);
367                 } else {
368                     e->getConfiguration().setKeepClosedChks(false);
369                 }
370             } else if (strcmp(keyz, "enable_chk_merge") == 0) {
371                 if (strcmp(valz, "true") == 0) {
372                     e->getConfiguration().setEnableChkMerge(true);
373                 } else {
374                     e->getConfiguration().setEnableChkMerge(false);
375                 }
376             } else {
377                 *msg = "Unknown config param";
378                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
379             }
380         } catch(std::runtime_error &) {
381             *msg = "Value out of range.";
382             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
383         }
384
385         return rv;
386     }
387
388     static protocol_binary_response_status setFlushParam(
389                                                  EventuallyPersistentEngine *e,
390                                                  const char *keyz,
391                                                  const char *valz,
392                                                  const char **msg,
393                                                  size_t *) {
394         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
395
396         // Handle the actual mutation.
397         try {
398             int v = atoi(valz);
399             if (strcmp(keyz, "bg_fetch_delay") == 0) {
400                 checkNumeric(valz);
401                 e->getConfiguration().setBgFetchDelay(v);
402             } else if (strcmp(keyz, "flushall_enabled") == 0) {
403                 if (strcmp(valz, "true") == 0) {
404                     e->getConfiguration().setFlushallEnabled(true);
405                 } else if(strcmp(valz, "false") == 0) {
406                     e->getConfiguration().setFlushallEnabled(false);
407                 } else {
408                     throw std::runtime_error("value out of range.");
409                 }
410             } else if (strcmp(keyz, "max_size") == 0) {
411                 char *ptr = NULL;
412                 checkNumeric(valz);
413                 uint64_t vsize = strtoull(valz, &ptr, 10);
414                 validate(vsize, static_cast<uint64_t>(0),
415                          std::numeric_limits<uint64_t>::max());
416                 e->getConfiguration().setMaxSize(vsize);
417                 e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
418                 e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
419             } else if (strcmp(keyz, "mem_low_wat") == 0) {
420                 char *ptr = NULL;
421                 checkNumeric(valz);
422                 uint64_t vsize = strtoull(valz, &ptr, 10);
423                 validate(vsize, static_cast<uint64_t>(0),
424                          std::numeric_limits<uint64_t>::max());
425                 e->getConfiguration().setMemLowWat(vsize);
426             } else if (strcmp(keyz, "mem_high_wat") == 0) {
427                 char *ptr = NULL;
428                 checkNumeric(valz);
429                 uint64_t vsize = strtoull(valz, &ptr, 10);
430                 validate(vsize, static_cast<uint64_t>(0),
431                          std::numeric_limits<uint64_t>::max());
432                 e->getConfiguration().setMemHighWat(vsize);
433             } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
434                 checkNumeric(valz);
435                 validate(v, 0, 100);
436                 e->getConfiguration().setBackfillMemThreshold(v);
437             } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
438                 checkNumeric(valz);
439                 validate(v, 0, 100);
440                 e->getConfiguration().setCompactionExpMemThreshold(v);
441             } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
442                 checkNumeric(valz);
443                 validate(v, 0, 100);
444                 e->getConfiguration().setMutationMemThreshold(v);
445             } else if (strcmp(keyz, "timing_log") == 0) {
446                 EPStats &stats = e->getEpStats();
447                 std::ostream *old = stats.timingLog;
448                 stats.timingLog = NULL;
449                 delete old;
450                 if (strcmp(valz, "off") == 0) {
451                     LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
452                 } else {
453                     std::ofstream *tmp(new std::ofstream(valz));
454                     if (tmp->good()) {
455                         LOG(EXTENSION_LOG_INFO,
456                             "Logging detailed timings to ``%s''.", valz);
457                         stats.timingLog = tmp;
458                     } else {
459                         LOG(EXTENSION_LOG_WARNING,
460                             "Error setting detailed timing log to ``%s'':  %s",
461                             valz, strerror(errno));
462                         delete tmp;
463                     }
464                 }
465             } else if (strcmp(keyz, "exp_pager_stime") == 0) {
466                 char *ptr = NULL;
467                 checkNumeric(valz);
468                 uint64_t vsize = strtoull(valz, &ptr, 10);
469                 validate(vsize, static_cast<uint64_t>(0),
470                          std::numeric_limits<uint64_t>::max());
471                 e->getConfiguration().setExpPagerStime((size_t)vsize);
472             } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
473                 if (strcmp(valz, "true") == 0) {
474                     e->getConfiguration().setAccessScannerEnabled(true);
475                 } else if (strcmp(valz, "false") == 0) {
476                     e->getConfiguration().setAccessScannerEnabled(false);
477                 } else {
478                     throw std::runtime_error("Value expected: true/false.");
479                 }
480             } else if (strcmp(keyz, "alog_sleep_time") == 0) {
481                 checkNumeric(valz);
482                 e->getConfiguration().setAlogSleepTime(v);
483             } else if (strcmp(keyz, "alog_task_time") == 0) {
484                 checkNumeric(valz);
485                 e->getConfiguration().setAlogTaskTime(v);
486             } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
487                 checkNumeric(valz);
488                 e->getConfiguration().setPagerActiveVbPcnt(v);
489             } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
490                 checkNumeric(valz);
491                 validate(v, 0, std::numeric_limits<int>::max());
492                 e->getConfiguration().setWarmupMinMemoryThreshold(v);
493             } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
494                 checkNumeric(valz);
495                 validate(v, 0, std::numeric_limits<int>::max());
496                 e->getConfiguration().setWarmupMinItemsThreshold(v);
497             } else if (strcmp(keyz, "max_num_readers") == 0) {
498                 checkNumeric(valz);
499                 validate(v, 0, std::numeric_limits<int>::max());
500                 e->getConfiguration().setMaxNumReaders(v);
501                 ExecutorPool::get()->setMaxReaders(v);
502             } else if (strcmp(keyz, "max_num_writers") == 0) {
503                 checkNumeric(valz);
504                 validate(v, 0, std::numeric_limits<int>::max());
505                 e->getConfiguration().setMaxNumWriters(v);
506                 ExecutorPool::get()->setMaxWriters(v);
507             } else if (strcmp(keyz, "max_num_auxio") == 0) {
508                 checkNumeric(valz);
509                 validate(v, 0, std::numeric_limits<int>::max());
510                 e->getConfiguration().setMaxNumAuxio(v);
511                 ExecutorPool::get()->setMaxAuxIO(v);
512             } else if (strcmp(keyz, "max_num_nonio") == 0) {
513                 checkNumeric(valz);
514                 validate(v, 0, std::numeric_limits<int>::max());
515                 e->getConfiguration().setMaxNumNonio(v);
516                 ExecutorPool::get()->setMaxNonIO(v);
517             } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
518                 checkNumeric(valz);
519                 validate(v, 1, std::numeric_limits<int>::max());
520                 e->getConfiguration().setCompactionWriteQueueCap(v);
521             } else {
522                 *msg = "Unknown config param";
523                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
524             }
525         } catch(std::runtime_error& ex) {
526             *msg = strdup(ex.what());
527             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
528         }
529
530         return rv;
531     }
532
533     static protocol_binary_response_status evictKey(
534                                                  EventuallyPersistentEngine *e,
535                                                  protocol_binary_request_header
536                                                                       *request,
537                                                  const char **msg,
538                                                  size_t *msg_size) {
539         protocol_binary_request_no_extras *req =
540             (protocol_binary_request_no_extras*)request;
541
542         char keyz[256];
543
544         // Read the key.
545         int keylen = ntohs(req->message.header.request.keylen);
546         if (keylen >= (int)sizeof(keyz)) {
547             *msg = "Key is too large.";
548             return PROTOCOL_BINARY_RESPONSE_EINVAL;
549         }
550         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
551         keyz[keylen] = 0x00;
552
553         uint16_t vbucket = ntohs(request->request.vbucket);
554
555         std::string key(keyz, keylen);
556
557         LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
558                 keyz);
559
560         protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
561                                                          msg_size);
562         if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
563             rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
564             if (e->isDegradedMode()) {
565                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
566             }
567         }
568         return rv;
569     }
570
571     static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
572                                        protocol_binary_request_header *req,
573                                        const void *cookie,
574                                        Item **itm,
575                                        const char **msg,
576                                        size_t *,
577                                        protocol_binary_response_status *res) {
578
579         uint8_t extlen = req->request.extlen;
580         if (extlen != 0 && extlen != 4) {
581             *msg = "Invalid packet format (extlen may be 0 or 4)";
582             *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
583             return ENGINE_EINVAL;
584         }
585
586         protocol_binary_request_getl *grequest =
587             (protocol_binary_request_getl*)req;
588         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
589
590         const char *keyp = reinterpret_cast<const char*>(req->bytes);
591         keyp += sizeof(req->bytes) + extlen;
592         std::string key(keyp, ntohs(req->request.keylen));
593         uint16_t vbucket = ntohs(req->request.vbucket);
594
595         RememberingCallback<GetValue> getCb;
596         uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
597         uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
598         uint32_t lockTimeout = default_timeout;
599         if (extlen == 4) {
600             lockTimeout = ntohl(grequest->message.body.expiration);
601         }
602
603         if (lockTimeout >  max_timeout || lockTimeout < 1) {
604             LOG(EXTENSION_LOG_WARNING,
605                 "Illegal value for lock timeout specified"
606                 " %u. Using default value: %u", lockTimeout, default_timeout);
607             lockTimeout = default_timeout;
608         }
609
610         bool gotLock = e->getLocked(key, vbucket, getCb,
611                                     ep_current_time(),
612                                     lockTimeout, cookie);
613
614         getCb.waitForValue();
615         ENGINE_ERROR_CODE rv = getCb.val.getStatus();
616
617         if (rv == ENGINE_SUCCESS) {
618             *itm = getCb.val.getValue();
619             ++(e->getEpStats().numOpsGet);
620         } else if (rv == ENGINE_EWOULDBLOCK) {
621
622             // need to wait for value
623             return rv;
624         } else if (rv == ENGINE_NOT_MY_VBUCKET) {
625             *msg = "That's not my bucket.";
626             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
627             return ENGINE_NOT_MY_VBUCKET;
628         } else if (!gotLock){
629             *msg =  "LOCK_ERROR";
630             *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
631             return ENGINE_TMPFAIL;
632         } else {
633             if (e->isDegradedMode()) {
634                 *msg = "LOCK_TMP_ERROR";
635                 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
636                 return ENGINE_TMPFAIL;
637             }
638
639             *msg = "NOT_FOUND";
640             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
641             return ENGINE_KEY_ENOENT;
642         }
643
644         return rv;
645     }
646
647     static protocol_binary_response_status unlockKey(
648                                                  EventuallyPersistentEngine *e,
649                                                  protocol_binary_request_header
650                                                                       *request,
651                                                  const char **msg,
652                                                  size_t *)
653     {
654         protocol_binary_request_no_extras *req =
655             (protocol_binary_request_no_extras*)request;
656
657         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
658         char keyz[256];
659
660         // Read the key.
661         int keylen = ntohs(req->message.header.request.keylen);
662         if (keylen >= (int)sizeof(keyz)) {
663             *msg = "Key is too large.";
664             return PROTOCOL_BINARY_RESPONSE_EINVAL;
665         }
666
667         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
668         keyz[keylen] = 0x00;
669
670         uint16_t vbucket = ntohs(request->request.vbucket);
671         std::string key(keyz, keylen);
672
673         LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
674
675         RememberingCallback<GetValue> getCb;
676         uint64_t cas = ntohll(request->request.cas);
677
678         ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
679                                             ep_current_time());
680
681         if (rv == ENGINE_SUCCESS) {
682             *msg = "UNLOCKED";
683         } else if (rv == ENGINE_TMPFAIL){
684             *msg =  "UNLOCK_ERROR";
685             res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
686         } else {
687             if (e->isDegradedMode()) {
688                 *msg = "LOCK_TMP_ERROR";
689                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
690             }
691
692             RCPtr<VBucket> vb = e->getVBucket(vbucket);
693             if (!vb) {
694                 *msg = "That's not my bucket.";
695                 res =  PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
696             }
697             *msg = "NOT_FOUND";
698             res =  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
699         }
700
701         return res;
702     }
703
704     static protocol_binary_response_status setParam(
705                                             EventuallyPersistentEngine *e,
706                                             protocol_binary_request_set_param
707                                                                      *req,
708                                             const char **msg,
709                                             size_t *msg_size) {
710
711         size_t keylen = ntohs(req->message.header.request.keylen);
712         uint8_t extlen = req->message.header.request.extlen;
713         size_t vallen = ntohl(req->message.header.request.bodylen);
714         protocol_binary_engine_param_t paramtype =
715             static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
716
717         if (keylen == 0 || (vallen - keylen - extlen) == 0) {
718             return PROTOCOL_BINARY_RESPONSE_EINVAL;
719         }
720
721         const char *keyp = reinterpret_cast<const char*>(req->bytes)
722                            + sizeof(req->bytes);
723         const char *valuep = keyp + keylen;
724         vallen -= (keylen + extlen);
725
726         char keyz[32];
727         char valz[512];
728
729         // Read the key.
730         if (keylen >= sizeof(keyz)) {
731             *msg = "Key is too large.";
732             return PROTOCOL_BINARY_RESPONSE_EINVAL;
733         }
734         memcpy(keyz, keyp, keylen);
735         keyz[keylen] = 0x00;
736
737         // Read the value.
738         if (vallen >= sizeof(valz)) {
739             *msg = "Value is too large.";
740             return PROTOCOL_BINARY_RESPONSE_EINVAL;
741         }
742         memcpy(valz, valuep, vallen);
743         valz[vallen] = 0x00;
744
745         protocol_binary_response_status rv;
746
747         switch (paramtype) {
748         case protocol_binary_engine_param_flush:
749             rv = setFlushParam(e, keyz, valz, msg, msg_size);
750             break;
751         case protocol_binary_engine_param_tap:
752             rv = setTapParam(e, keyz, valz, msg, msg_size);
753             break;
754         case protocol_binary_engine_param_checkpoint:
755             rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
756             break;
757         default:
758             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
759         }
760
761         return rv;
762     }
763
764     static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
765                                        const void *cookie,
766                                        protocol_binary_request_header *request,
767                                        ADD_RESPONSE response) {
768         protocol_binary_request_get_vbucket *req =
769             reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
770         cb_assert(req);
771
772         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
773         RCPtr<VBucket> vb = e->getVBucket(vbucket);
774         if (!vb) {
775             LockHolder lh(e->clusterConfig.lock);
776             return sendResponse(response, NULL, 0, NULL, 0,
777                                 e->clusterConfig.config,
778                                 e->clusterConfig.len,
779                                 PROTOCOL_BINARY_RAW_BYTES,
780                                 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
781                                 cookie);
782         } else {
783             vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
784             return sendResponse(response, NULL, 0, NULL, 0, &state,
785                                 sizeof(state),
786                                 PROTOCOL_BINARY_RAW_BYTES,
787                                 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
788         }
789     }
790
791     static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
792                                        const void *cookie,
793                                        protocol_binary_request_header *request,
794                                        ADD_RESPONSE response) {
795
796         protocol_binary_request_set_vbucket *req =
797             reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
798
799         uint64_t cas = ntohll(req->message.header.request.cas);
800
801         size_t bodylen = ntohl(req->message.header.request.bodylen)
802             - ntohs(req->message.header.request.keylen);
803         if (bodylen != sizeof(vbucket_state_t)) {
804             const std::string msg("Incorrect packet format");
805             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
806                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
807                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
808                                 cas, cookie);
809         }
810
811         vbucket_state_t state;
812         memcpy(&state, &req->message.body.state, sizeof(state));
813         state = static_cast<vbucket_state_t>(ntohl(state));
814
815         if (!is_valid_vbucket_state_t(state)) {
816             const std::string msg("Invalid vbucket state");
817             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
818                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
819                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
820                                 cas, cookie);
821         }
822
823         uint16_t vb = ntohs(req->message.header.request.vbucket);
824         if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
825             const std::string msg("VBucket number too big");
826             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
827                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
828                                 PROTOCOL_BINARY_RESPONSE_ERANGE,
829                                 cas, cookie);
830         }
831         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
832                             PROTOCOL_BINARY_RAW_BYTES,
833                             PROTOCOL_BINARY_RESPONSE_SUCCESS,
834                             cas, cookie);
835     }
836
837     static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
838                                         const void *cookie,
839                                         protocol_binary_request_header *req,
840                                         ADD_RESPONSE response) {
841
842         uint64_t cas = ntohll(req->request.cas);
843
844         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
845         uint16_t vbucket = ntohs(req->request.vbucket);
846
847         std::string msg = "";
848         if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
849             msg = "Incorrect packet format.";
850             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
851                                 msg.length(),
852                                 PROTOCOL_BINARY_RAW_BYTES,
853                                 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
854         }
855
856         bool sync = false;
857         uint32_t bodylen = ntohl(req->request.bodylen);
858         if (bodylen > 0) {
859             const char* ptr = reinterpret_cast<const char*>(req->bytes) +
860                 sizeof(req->bytes);
861             if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
862                 sync = true;
863             }
864         }
865
866         ENGINE_ERROR_CODE err;
867         void* es = e->getEngineSpecific(cookie);
868         if (sync) {
869             if (es == NULL) {
870                 err = e->deleteVBucket(vbucket, cookie);
871                 e->storeEngineSpecific(cookie, e);
872             } else {
873                 e->storeEngineSpecific(cookie, NULL);
874                 LOG(EXTENSION_LOG_INFO,
875                     "Completed sync deletion of vbucket %u",
876                     (unsigned)vbucket);
877                 err = ENGINE_SUCCESS;
878             }
879         } else {
880             err = e->deleteVBucket(vbucket);
881         }
882         switch (err) {
883         case ENGINE_SUCCESS:
884             LOG(EXTENSION_LOG_WARNING,
885                 "Deletion of vbucket %d was completed.", vbucket);
886             break;
887         case ENGINE_NOT_MY_VBUCKET:
888             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
889                 "because the vbucket doesn't exist!!!", vbucket);
890             res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
891             break;
892         case ENGINE_EINVAL:
893             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
894                 "because the vbucket is not in a dead state\n", vbucket);
895             msg = "Failed to delete vbucket.  Must be in the dead state.";
896             res = PROTOCOL_BINARY_RESPONSE_EINVAL;
897             break;
898         case ENGINE_EWOULDBLOCK:
899             LOG(EXTENSION_LOG_WARNING, "Requst to vbucket %d deletion is in"
900                 " EWOULDBLOCK until the database file is removed from disk",
901                 vbucket);
902             e->storeEngineSpecific(cookie, req);
903             return ENGINE_EWOULDBLOCK;
904         default:
905             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
906                 "because of unknown reasons\n", vbucket);
907             msg = "Failed to delete vbucket.  Unknown reason.";
908             res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
909         }
910
911         if (err != ENGINE_NOT_MY_VBUCKET) {
912             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
913                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
914                                 res, cas, cookie);
915         } else {
916             LockHolder lh(e->clusterConfig.lock);
917             return sendResponse(response, NULL, 0, NULL, 0,
918                                 e->clusterConfig.config,
919                                 e->clusterConfig.len,
920                                 PROTOCOL_BINARY_RAW_BYTES,
921                                 res, cas, cookie);
922         }
923
924     }
925
926     static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
927                                        protocol_binary_request_header *request,
928                                        const void *cookie,
929                                        Item **it,
930                                        const char **msg,
931                                        protocol_binary_response_status *res) {
932         EventuallyPersistentStore *eps = e->getEpStore();
933         protocol_binary_request_no_extras *req =
934             (protocol_binary_request_no_extras*)request;
935         int keylen = ntohs(req->message.header.request.keylen);
936         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
937         ENGINE_ERROR_CODE error_code;
938         std::string keystr(((char *)request) + sizeof(req->message.header),
939                             keylen);
940
941         GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
942
943         if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
944             if (error_code == ENGINE_NOT_MY_VBUCKET) {
945                 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
946                 return error_code;
947             } else if (error_code == ENGINE_TMPFAIL) {
948                 *msg = "NOT_FOUND";
949                 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
950             } else {
951                 return error_code;
952             }
953         } else {
954             *it = rv.getValue();
955             *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
956         }
957         ++(e->getEpStats().numOpsGet);
958         return ENGINE_SUCCESS;
959     }
960
961     static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
962                                        const void *cookie,
963                                        protocol_binary_request_compact_db *req,
964                                        ADD_RESPONSE response) {
965
966         std::string msg = "";
967         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
968         compaction_ctx compactreq;
969         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
970         uint64_t cas = ntohll(req->message.header.request.cas);
971
972         if (ntohs(req->message.header.request.keylen) > 0 ||
973              req->message.header.request.extlen != 24) {
974             LOG(EXTENSION_LOG_WARNING,
975                     "Compaction of vbucket %d received bad ext/key len %d/%d.",
976                     vbucket, req->message.header.request.extlen,
977                     ntohs(req->message.header.request.keylen));
978             msg = "Incorrect packet format.";
979             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
980                                 msg.length(),
981                                 PROTOCOL_BINARY_RAW_BYTES,
982                                 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
983         }
984         EPStats &stats = e->getEpStats();
985         compactreq.max_purged_seq = 0;
986         compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
987         compactreq.purge_before_seq =
988                                     ntohll(req->message.body.purge_before_seq);
989         compactreq.drop_deletes     = req->message.body.drop_deletes;
990
991         ENGINE_ERROR_CODE err;
992         void* es = e->getEngineSpecific(cookie);
993         if (es == NULL) {
994             ++stats.pendingCompactions;
995             e->storeEngineSpecific(cookie, e);
996             err = e->compactDB(vbucket, compactreq, cookie);
997         } else {
998             e->storeEngineSpecific(cookie, NULL);
999             err = ENGINE_SUCCESS;
1000         }
1001
1002         switch (err) {
1003             case ENGINE_SUCCESS:
1004                 LOG(EXTENSION_LOG_INFO,
1005                     "Compaction of vbucket %d completed.", vbucket);
1006                 break;
1007             case ENGINE_NOT_MY_VBUCKET:
1008                 --stats.pendingCompactions;
1009                 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1010                     "because the vbucket doesn't exist!!!", vbucket);
1011                 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1012                 break;
1013             case ENGINE_EWOULDBLOCK:
1014                 LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
1015                         "in EWOULDBLOCK state until the database file is "
1016                         "compacted on disk",
1017                         vbucket);
1018                 e->storeEngineSpecific(cookie, req);
1019                 return ENGINE_EWOULDBLOCK;
1020             case ENGINE_TMPFAIL:
1021                 LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
1022                         " a temporary failure and may need to be retried",
1023                         vbucket);
1024                 msg = "Temporary failure in compacting vbucket.";
1025                 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1026                 break;
1027             default:
1028                 --stats.pendingCompactions;
1029                 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1030                     "because of unknown reasons\n", vbucket);
1031                 msg = "Failed to compact vbucket.  Unknown reason.";
1032                 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1033                 break;
1034         }
1035
1036         if (err != ENGINE_NOT_MY_VBUCKET) {
1037             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1038                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1039                                 res, cas, cookie);
1040         } else {
1041             LockHolder lh(e->clusterConfig.lock);
1042             return sendResponse(response, NULL, 0, NULL, 0,
1043                                 e->clusterConfig.config,
1044                                 e->clusterConfig.len,
1045                                 PROTOCOL_BINARY_RAW_BYTES,
1046                                 res, cas, cookie);
1047         }
1048     }
1049
    /**
     * Dispatch an engine-specific binary protocol command.
     *
     * Commands that build their own reply return directly from the second
     * switch. The following commands fall through to the shared reply
     * section at the bottom:
     *   STOP_PERSISTENCE, START_PERSISTENCE, SET_PARAM, EVICT_KEY,
     *   GET_LOCKED (unless EWOULDBLOCK), UNLOCK_KEY,
     *   GET_REPLICA (on SUCCESS / NOT_MY_VBUCKET), and any unrecognised
     *   opcode.
     * The shared section sends one of:
     *   - the fetched item, if any;
     *   - a NOT_MY_VBUCKET reply carrying the cluster config;
     *   - `res` with the optional `msg` text.
     * An unrecognised opcode therefore gets
     * PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND with an empty body.
     *
     * @param h the engine
     * @param cookie the connection cookie
     * @param request the raw request packet
     * @param response callback used to emit the reply
     * @return status for the memcached core; ENGINE_EWOULDBLOCK means the
     *         command is pending and will be re-invoked
     */
    static ENGINE_ERROR_CODE processUnknownCommand(
                                       EventuallyPersistentEngine *h,
                                       const void* cookie,
                                       protocol_binary_request_header *request,
                                       ADD_RESPONSE response)
    {
        protocol_binary_response_status res =
                                      PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
        const char *msg = NULL;
        size_t msg_size = 0;
        Item *itm = NULL;

        EPStats &stats = h->getEpStats();
        ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;

        /**
         * Session validation
         * (For ns_server commands only)
         *
         * For these commands the request CAS is checked as a session token.
         * The check only happens while the engine specific slot is NULL, so
         * a DEL_VBUCKET / COMPACT_DB resumed after EWOULDBLOCK is not
         * re-validated. Each of these commands calls decrementSessionCtr()
         * in the dispatch below when it finishes. DEL_VBUCKET and
         * COMPACT_DB only do so when the result is not EWOULDBLOCK.
         * NOTE(review): the early return on an invalid token does not
         * decrement; presumably validateSessionCas() only counts successful
         * validations — confirm.
         */
        switch (request->request.opcode) {
            case PROTOCOL_BINARY_CMD_SET_PARAM:
            case PROTOCOL_BINARY_CMD_SET_VBUCKET:
            case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
            case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
            case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
            case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
            case PROTOCOL_BINARY_CMD_COMPACT_DB:
            {
                if (h->getEngineSpecific(cookie) == NULL) {
                    uint64_t cas = ntohll(request->request.cas);
                    if (!h->validateSessionCas(cas)) {
                        const std::string message("Invalid session token");
                        return sendResponse(response, NULL, 0, NULL, 0,
                                            message.c_str(), message.length(),
                                            PROTOCOL_BINARY_RAW_BYTES,
                                            PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
                                            cas, cookie);
                    }
                }
                break;
            }
            default:
                break;
        }

        // Dispatch. Cases that `return` send their own reply; cases that
        // `break` leave res/msg/itm/rv for the shared reply code below.
        switch (request->request.opcode) {
        case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
            return h->getAllVBucketSequenceNumbers(cookie, response);

        case PROTOCOL_BINARY_CMD_GET_VBUCKET:
            {
                BlockTimer timer(&stats.getVbucketCmdHisto);
                rv = getVBucket(h, cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
            {
                BlockTimer timer(&stats.delVbucketCmdHisto);
                rv = delVBucket(h, cookie, request, response);
                // Keep the session and engine specific state while the
                // deletion is still pending.
                if (rv != ENGINE_EWOULDBLOCK) {
                    h->decrementSessionCtr();
                    h->storeEngineSpecific(cookie, NULL);
                }
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_VBUCKET:
            {
                BlockTimer timer(&stats.setVbucketCmdHisto);
                rv = setVBucket(h, cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_TOUCH:
        case PROTOCOL_BINARY_CMD_GAT:
        case PROTOCOL_BINARY_CMD_GATQ:
            {
                rv = h->touch(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
            res = stopFlusher(h, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
            res = startFlusher(h, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_SET_PARAM:
            res = setParam(h,
                  reinterpret_cast<protocol_binary_request_set_param*>(request),
                            &msg, &msg_size);
            h->decrementSessionCtr();
            break;
        case PROTOCOL_BINARY_CMD_EVICT_KEY:
            res = evictKey(h, request, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_GET_LOCKED:
            rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
            if (rv == ENGINE_EWOULDBLOCK) {
                // we dont have the value for the item yet
                return rv;
            }
            break;
        case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
            res = unlockKey(h, request, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_OBSERVE:
            return h->observe(cookie, request, response);
        case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
            {
                rv = h->deregisterTapClient(cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN:
            {
                rv = h->resetReplicationChain(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
            {
                rv = h->changeTapVBFilter(cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
        case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
        case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE:
            {
                rv = h->handleCheckpointCmds(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE:
            {
                rv = h->handleSeqnoCmds(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_META:
        case PROTOCOL_BINARY_CMD_GETQ_META:
            {
                rv = h->getMeta(cookie,
                        reinterpret_cast<protocol_binary_request_get_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_WITH_META:
        case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
        case PROTOCOL_BINARY_CMD_ADD_WITH_META:
        case PROTOCOL_BINARY_CMD_ADDQ_WITH_META:
            {
                rv = h->setWithMeta(cookie,
                     reinterpret_cast<protocol_binary_request_set_with_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_DEL_WITH_META:
        case PROTOCOL_BINARY_CMD_DELQ_WITH_META:
            {
                rv = h->deleteWithMeta(cookie,
                    reinterpret_cast<protocol_binary_request_delete_with_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_RETURN_META:
            {
                return h->returnMeta(cookie,
                reinterpret_cast<protocol_binary_request_return_meta*>
                                                          (request), response);
            }
        case PROTOCOL_BINARY_CMD_GET_REPLICA:
            rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
            // NOT_MY_VBUCKET is answered below with the cluster config;
            // any other failure is handed straight back to the core.
            if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
                return rv;
            }
            break;
        case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
        case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC:
            {
                rv = h->handleTrafficControlCmd(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
            {
                rv = h->setClusterConfig(cookie,
                 reinterpret_cast<protocol_binary_request_set_cluster_config*>
                                                          (request), response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
            return h->getClusterConfig(cookie,
               reinterpret_cast<protocol_binary_request_get_cluster_config*>
                                                          (request), response);
        case PROTOCOL_BINARY_CMD_COMPACT_DB:
            {
                rv = compactDB(h, cookie,
                               (protocol_binary_request_compact_db*)(request),
                               response);
                // Keep the session and engine specific state while the
                // compaction is still pending.
                if (rv != ENGINE_EWOULDBLOCK) {
                    h->decrementSessionCtr();
                    h->storeEngineSpecific(cookie, NULL);
                }
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
            // Header fields are still in network byte order here. Only a
            // comparison against zero is made, so no conversion is needed.
            if (request->request.extlen != 0 ||
                request->request.keylen != 0 ||
                request->request.bodylen != 0) {
                return ENGINE_EINVAL;
            }

            return h->getRandomKey(cookie, response);
        case CMD_GET_KEYS:
            return h->getAllKeys(cookie,
               reinterpret_cast<protocol_binary_request_get_keys*>(request),
                                                                   response);
        }

        // Shared reply for the commands that `break` above. Any fetched
        // item is deleted here after it has been sent.
        // Send a special response for getl since we don't want to send the key
        if (itm && request->request.opcode == PROTOCOL_BINARY_CMD_GET_LOCKED) {
            uint32_t flags = itm->getFlags();
            rv = sendResponse(response, NULL, 0, (const void *)&flags,
                              sizeof(uint32_t),
                              static_cast<const void *>(itm->getData()),
                              itm->getNBytes(), itm->getDataType(),
                              static_cast<uint16_t>(res), itm->getCas(),
                              cookie);
            delete itm;
        } else if (itm) {
            const std::string &key  = itm->getKey();
            uint32_t flags = itm->getFlags();
            rv = sendResponse(response, static_cast<const void *>(key.data()),
                              itm->getNKey(),
                              (const void *)&flags, sizeof(uint32_t),
                              static_cast<const void *>(itm->getData()),
                              itm->getNBytes(), itm->getDataType(),
                              static_cast<uint16_t>(res), itm->getCas(),
                              cookie);
            delete itm;
        } else  if (rv == ENGINE_NOT_MY_VBUCKET) {
            // Only GET_LOCKED / GET_REPLICA can leave rv set to this.
            LockHolder lh(h->clusterConfig.lock);
            return sendResponse(response, NULL, 0, NULL, 0,
                                h->clusterConfig.config,
                                h->clusterConfig.len,
                                PROTOCOL_BINARY_RAW_BYTES,
                                PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
                                cookie);
        } else {
            // Handlers may leave msg_size at 0 and rely on strlen(msg).
            msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
            rv = sendResponse(response, NULL, 0, NULL, 0,
                              msg, static_cast<uint16_t>(msg_size),
                              PROTOCOL_BINARY_RAW_BYTES,
                              static_cast<uint16_t>(res), 0, cookie);

        }
        return rv;
    }
1305
1306     static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1307                                                const void* cookie,
1308                                                protocol_binary_request_header
1309                                                                       *request,
1310                                                ADD_RESPONSE response)
1311     {
1312         ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1313                                                            cookie,
1314                                                            request, response);
1315         releaseHandle(handle);
1316         return err_code;
1317     }
1318
1319     static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1320                               item *itm, uint64_t cas) {
1321         static_cast<Item*>(itm)->setCas(cas);
1322     }
1323
1324     static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1325                                           const void *cookie,
1326                                           void *engine_specific,
1327                                           uint16_t nengine,
1328                                           uint8_t ttl,
1329                                           uint16_t tap_flags,
1330                                           tap_event_t tap_event,
1331                                           uint32_t tap_seqno,
1332                                           const void *key,
1333                                           size_t nkey,
1334                                           uint32_t flags,
1335                                           uint32_t exptime,
1336                                           uint64_t cas,
1337                                           uint8_t datatype,
1338                                           const void *data,
1339                                           size_t ndata,
1340                                           uint16_t vbucket)
1341     {
1342         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1343             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1344                     " (TapNotify)");
1345             return ENGINE_EINVAL;
1346         }
1347         ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1348                                                         engine_specific,
1349                                                         nengine, ttl,
1350                                                         tap_flags,
1351                                                         (uint16_t)tap_event,
1352                                                         tap_seqno,
1353                                                         key, nkey, flags,
1354                                                         exptime, cas,
1355                                                         datatype, data,
1356                                                         ndata, vbucket);
1357         releaseHandle(handle);
1358         return err_code;
1359     }
1360
1361     static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1362                                       const void *cookie, item **itm,
1363                                       void **es, uint16_t *nes, uint8_t *ttl,
1364                                       uint16_t *flags, uint32_t *seqno,
1365                                       uint16_t *vbucket) {
1366         uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1367                                                              nes, ttl,
1368                                                              flags, seqno,
1369                                                              vbucket);
1370         releaseHandle(handle);
1371         return static_cast<tap_event_t>(tap_event);
1372     }
1373
1374     static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1375                                           const void* cookie,
1376                                           const void* client,
1377                                           size_t nclient,
1378                                           uint32_t flags,
1379                                           const void* userdata,
1380                                           size_t nuserdata)
1381     {
1382         EventuallyPersistentEngine *h = getHandle(handle);
1383         TAP_ITERATOR iterator = NULL;
1384         {
1385             std::string c(static_cast<const char*>(client), nclient);
1386             // Figure out what we want from the userdata before adding it to
1387             // the API to the handle
1388             if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1389                 iterator = EvpTapIterator;
1390             }
1391         }
1392         releaseHandle(handle);
1393         return iterator;
1394     }
1395
1396
1397     static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1398                                        const void* cookie,
1399                                        struct dcp_message_producers *producers)
1400     {
1401         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1402         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1403         if (conn) {
1404             errCode = conn->step(producers);
1405         }
1406         releaseHandle(handle);
1407         return errCode;
1408     }
1409
1410
1411     static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1412                                         const void* cookie,
1413                                         uint32_t opaque,
1414                                         uint32_t seqno,
1415                                         uint32_t flags,
1416                                         void *name,
1417                                         uint16_t nname)
1418     {
1419         ENGINE_ERROR_CODE errCode;
1420         errCode = getHandle(handle)->dcpOpen(cookie, opaque, seqno, flags,
1421                                              name, nname);
1422         releaseHandle(handle);
1423         return errCode;
1424     }
1425
1426     static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1427                                              const void* cookie,
1428                                              uint32_t opaque,
1429                                              uint16_t vbucket,
1430                                              uint32_t flags)
1431     {
1432         ENGINE_ERROR_CODE errCode = getHandle(handle)->dcpAddStream(cookie,
1433                                                                     opaque,
1434                                                                     vbucket,
1435                                                                     flags);
1436         releaseHandle(handle);
1437         return errCode;
1438     }
1439
1440     static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1441                                                const void* cookie,
1442                                                uint32_t opaque,
1443                                                uint16_t vbucket)
1444     {
1445         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1446         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1447         if (conn) {
1448             errCode = conn->closeStream(opaque, vbucket);
1449         }
1450         releaseHandle(handle);
1451         return errCode;
1452     }
1453
1454
1455     static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1456                                              const void* cookie,
1457                                              uint32_t flags,
1458                                              uint32_t opaque,
1459                                              uint16_t vbucket,
1460                                              uint64_t startSeqno,
1461                                              uint64_t endSeqno,
1462                                              uint64_t vbucketUuid,
1463                                              uint64_t snapStartSeqno,
1464                                              uint64_t snapEndSeqno,
1465                                              uint64_t *rollbackSeqno,
1466                                              dcp_add_failover_log callback)
1467     {
1468         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1469         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1470         if (conn) {
1471             errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1472                                           endSeqno, vbucketUuid, snapStartSeqno,
1473                                           snapEndSeqno, rollbackSeqno, callback);
1474         }
1475         releaseHandle(handle);
1476         return errCode;
1477     }
1478
1479     static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1480                                                  const void* cookie,
1481                                                  uint32_t opaque,
1482                                                  uint16_t vbucket,
1483                                                  dcp_add_failover_log callback)
1484     {
1485         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1486         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1487         if (conn) {
1488             errCode = conn->getFailoverLog(opaque, vbucket, callback);
1489         }
1490         releaseHandle(handle);
1491         return errCode;
1492     }
1493
1494
1495     static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1496                                              const void* cookie,
1497                                              uint32_t opaque,
1498                                              uint16_t vbucket,
1499                                              uint32_t flags)
1500     {
1501         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1502         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1503         if (conn) {
1504             errCode = conn->streamEnd(opaque, vbucket, flags);
1505         }
1506         releaseHandle(handle);
1507         return errCode;
1508     }
1509
1510
1511     static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1512                                                   const void* cookie,
1513                                                   uint32_t opaque,
1514                                                   uint16_t vbucket,
1515                                                   uint64_t start_seqno,
1516                                                   uint64_t end_seqno,
1517                                                   uint32_t flags)
1518     {
1519         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1520         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1521         if (conn) {
1522             errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1523                                            end_seqno, flags);
1524         }
1525         releaseHandle(handle);
1526         return errCode;
1527     }
1528
1529     static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1530                                             const void* cookie,
1531                                             uint32_t opaque,
1532                                             const void *key,
1533                                             uint16_t nkey,
1534                                             const void *value,
1535                                             uint32_t nvalue,
1536                                             uint64_t cas,
1537                                             uint16_t vbucket,
1538                                             uint32_t flags,
1539                                             uint8_t datatype,
1540                                             uint64_t bySeqno,
1541                                             uint64_t revSeqno,
1542                                             uint32_t expiration,
1543                                             uint32_t lockTime,
1544                                             const void *meta,
1545                                             uint16_t nmeta,
1546                                             uint8_t nru)
1547     {
1548         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1549             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1550                     " (DCPMutation)");
1551             return ENGINE_EINVAL;
1552         }
1553         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1554         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1555         if (conn) {
1556             errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1557                                      vbucket, flags, datatype, lockTime,
1558                                      bySeqno, revSeqno, expiration,
1559                                      nru, meta, nmeta);
1560         }
1561         releaseHandle(handle);
1562         return errCode;
1563     }
1564
1565     static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1566                                             const void* cookie,
1567                                             uint32_t opaque,
1568                                             const void *key,
1569                                             uint16_t nkey,
1570                                             uint64_t cas,
1571                                             uint16_t vbucket,
1572                                             uint64_t bySeqno,
1573                                             uint64_t revSeqno,
1574                                             const void *meta,
1575                                             uint16_t nmeta)
1576     {
1577         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1578         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1579         if (conn) {
1580             errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1581                                      revSeqno, meta, nmeta);
1582         }
1583         releaseHandle(handle);
1584         return errCode;
1585     }
1586
1587     static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1588                                               const void* cookie,
1589                                               uint32_t opaque,
1590                                               const void *key,
1591                                               uint16_t nkey,
1592                                               uint64_t cas,
1593                                               uint16_t vbucket,
1594                                               uint64_t bySeqno,
1595                                               uint64_t revSeqno,
1596                                               const void *meta,
1597                                               uint16_t nmeta)
1598     {
1599         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1600         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1601         if (conn) {
1602             errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1603                                        revSeqno, meta, nmeta);
1604         }
1605         releaseHandle(handle);
1606         return errCode;
1607     }
1608
1609     static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1610                                          const void* cookie,
1611                                          uint32_t opaque,
1612                                          uint16_t vbucket)
1613     {
1614         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1615         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1616         if (conn) {
1617             errCode = conn->flushall(opaque, vbucket);
1618         }
1619         releaseHandle(handle);
1620         return errCode;
1621     }
1622
1623     static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1624                                                    const void* cookie,
1625                                                    uint32_t opaque,
1626                                                    uint16_t vbucket,
1627                                                    vbucket_state_t state)
1628     {
1629         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1630         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1631         if (conn) {
1632             errCode = conn->setVBucketState(opaque, vbucket, state);
1633         }
1634         releaseHandle(handle);
1635         return errCode;
1636     }
1637
1638     static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1639                                         const void* cookie,
1640                                         uint32_t opaque) {
1641         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1642         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1643         if (conn) {
1644             errCode = conn->noop(opaque);
1645         }
1646         releaseHandle(handle);
1647         return errCode;
1648     }
1649
1650     static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1651                                                          const void* cookie,
1652                                                          uint32_t opaque,
1653                                                          uint16_t vbucket,
1654                                                          uint32_t buffer_bytes) {
1655         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1656         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1657         if (conn) {
1658             errCode = conn->bufferAcknowledgement(opaque, vbucket,
1659                                                   buffer_bytes);
1660         }
1661         releaseHandle(handle);
1662         return errCode;
1663     }
1664
1665     static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1666                                            const void* cookie,
1667                                            uint32_t opaque,
1668                                            const void *key,
1669                                            uint16_t nkey,
1670                                            const void *value,
1671                                            uint32_t nvalue) {
1672         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1673         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1674         if (conn) {
1675             errCode = conn->control(opaque, key, nkey, value, nvalue);
1676         }
1677         releaseHandle(handle);
1678         return errCode;
1679     }
1680
1681     static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1682                                      const void* cookie,
1683                                      protocol_binary_response_header *response)
1684     {
1685         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1686         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1687         if (conn) {
1688             errCode = conn->handleResponse(response);
1689         }
1690         releaseHandle(handle);
1691         return errCode;
1692     }
1693
1694     static void EvpHandleDisconnect(const void *cookie,
1695                                     ENGINE_EVENT_TYPE type,
1696                                     const void *event_data,
1697                                     const void *cb_data)
1698     {
1699         cb_assert(type == ON_DISCONNECT);
1700         cb_assert(event_data == NULL);
1701         void *c = const_cast<void*>(cb_data);
1702         getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1703         releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1704     }
1705
1706
1707     /**
1708      * The only public interface to the eventually persistance engine.
1709      * Allocate a new instance and initialize it
1710      * @param interface the highest interface the server supports (we only
1711      *                  support interface 1)
1712      * @param get_server_api callback function to get the server exported API
1713      *                  functions
1714      * @param handle Where to return the new instance
1715      * @return ENGINE_SUCCESS on success
1716      */
1717     ENGINE_ERROR_CODE create_instance(uint64_t interface,
1718                                       GET_SERVER_API get_server_api,
1719                                       ENGINE_HANDLE **handle)
1720     {
1721         SERVER_HANDLE_V1 *api = get_server_api();
1722         if (interface != 1 || api == NULL) {
1723             return ENGINE_ENOTSUP;
1724         }
1725
1726         hooksApi = api->alloc_hooks;
1727         loggerApi = api->log;
1728         MemoryTracker::getInstance();
1729         ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1730
1731         AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1732
1733         ObjectRegistry::setStats(inital_tracking);
1734         EventuallyPersistentEngine *engine;
1735         engine = new EventuallyPersistentEngine(get_server_api);
1736         ObjectRegistry::setStats(NULL);
1737
1738         if (engine == NULL) {
1739             return ENGINE_ENOMEM;
1740         }
1741
1742         if (MemoryTracker::trackingMemoryAllocations()) {
1743             engine->getEpStats().memoryTrackerEnabled.store(true);
1744             engine->getEpStats().totalMemory.store(inital_tracking->load());
1745         }
1746         delete inital_tracking;
1747
1748         ep_current_time = api->core->get_current_time;
1749         ep_abs_time = api->core->abstime;
1750         ep_reltime = api->core->realtime;
1751
1752         *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1753
1754         return ENGINE_SUCCESS;
1755     }
1756
1757     static bool EvpGetItemInfo(ENGINE_HANDLE *, const void *,
1758                                const item* itm, item_info *itm_info)
1759     {
1760         const Item *it = reinterpret_cast<const Item*>(itm);
1761         if (itm_info->nvalue < 1) {
1762             return false;
1763         }
1764         itm_info->cas = it->getCas();
1765         itm_info->exptime = it->getExptime();
1766         itm_info->nbytes = it->getNBytes();
1767         itm_info->datatype = it->getDataType();
1768         itm_info->flags = it->getFlags();
1769         itm_info->clsid = 0;
1770         itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1771         itm_info->nvalue = 1;
1772         itm_info->key = it->getKey().c_str();
1773         itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1774         itm_info->value[0].iov_len = it->getNBytes();
1775         return true;
1776     }
1777
1778     static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1779                                item* itm, const item_info *itm_info)
1780     {
1781         Item *it = reinterpret_cast<Item*>(itm);
1782         if (!it) {
1783             return false;
1784         }
1785         it->setDataType(itm_info->datatype);
1786         return true;
1787     }
1788
1789     static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1790                                                  const void* cookie,
1791                                                  engine_get_vb_map_cb callback)
1792     {
1793         EventuallyPersistentEngine *h = getHandle(handle);
1794         LockHolder lh(h->clusterConfig.lock);
1795         uint8_t *config = h->clusterConfig.config;
1796         uint32_t len = h->clusterConfig.len;
1797         releaseHandle(handle);
1798         return callback(cookie, config, len);
1799     }
1800
1801 } // C linkage
1802
1803 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1804     char buffer[2048];
1805
1806     if (loggerApi != NULL) {
1807         EXTENSION_LOGGER_DESCRIPTOR* logger =
1808             (EXTENSION_LOGGER_DESCRIPTOR*)loggerApi->get_logger();
1809
1810         if (loggerApi->get_level() <= severity) {
1811             EventuallyPersistentEngine *engine = ObjectRegistry::onSwitchThread(NULL, true);
1812             va_list va;
1813             va_start(va, fmt);
1814             vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1815             if (engine) {
1816                 logger->log(severity, NULL, "(%s) %s", engine->getName(),
1817                             buffer);
1818             } else {
1819                 logger->log(severity, NULL, "(No Engine) %s", buffer);
1820             }
1821             va_end(va);
1822             ObjectRegistry::onSwitchThread(engine);
1823         }
1824     }
1825 }
1826
1827 ALLOCATOR_HOOKS_API *getHooksApi(void) {
1828     return hooksApi;
1829 }
1830
/**
 * Construct the engine object. Only in-memory setup happens here:
 *  - pointer members start out NULL and flags false;
 *  - the ENGINE_HANDLE_V1 function table is pointed at this file's static
 *    Evp* entry points;
 *  - the server API is fetched, and the info block (description and
 *    advertised features) is filled in.
 * Configuration parsing and creation of the store and connection maps happen
 * later, in initialize().
 *
 * @param get_server_api callback used to obtain the server-exported API
 */
EventuallyPersistentEngine::EventuallyPersistentEngine(
                                    GET_SERVER_API get_server_api) :
    clusterConfig(), epstore(NULL), workload(NULL),
    workloadPriority(NO_BUCKET_PRIORITY),
    tapThrottle(NULL), getServerApiFunc(get_server_api),
    dcpConnMap_(NULL), tapConnMap(NULL), tapConfig(NULL), checkpointConfig(NULL),
    trafficEnabled(false), flushAllEnabled(false), startupTime(0)
{
    // Version 1 of the engine interface; create_instance() only accepts 1.
    interface.interface = 1;
    // Core engine entry points.
    ENGINE_HANDLE_V1::get_info = EvpGetInfo;
    ENGINE_HANDLE_V1::initialize = EvpInitialize;
    ENGINE_HANDLE_V1::destroy = EvpDestroy;
    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
    ENGINE_HANDLE_V1::remove = EvpItemDelete;
    ENGINE_HANDLE_V1::release = EvpItemRelease;
    ENGINE_HANDLE_V1::get = EvpGet;
    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
    ENGINE_HANDLE_V1::store = EvpStore;
    ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
    ENGINE_HANDLE_V1::flush = EvpFlush;
    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
    ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
    ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
    ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
    // Optional entry points this engine does not provide.
    ENGINE_HANDLE_V1::get_stats_struct = NULL;
    ENGINE_HANDLE_V1::errinfo = NULL;
    ENGINE_HANDLE_V1::aggregate_stats = NULL;


    // DCP entry points; all but open/add_stream dispatch to the ConnHandler
    // bound to the request's cookie.
    ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
    ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
    ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
    ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
    ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
    ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
    ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
    ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
    ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
    ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
    ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
    ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
    ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
    ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
    ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
    ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;

    serverApi = getServerApiFunc();
    // Engine info reported to the server: description plus feature list.
    // NOTE(review): four entries are written into info.info.features; this
    // relies on the storage behind `info` having room for them - confirm
    // against its declaration in ep_engine.h.
    memset(&info, 0, sizeof(info));
    info.info.description = "EP engine v" VERSION;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
    info.info.features[info.info.num_features++].feature =
                                             ENGINE_FEATURE_PERSISTENT_STORAGE;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;

}
1892
1893 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1894 {
1895     EventuallyPersistentEngine *epe =
1896                                     ObjectRegistry::onSwitchThread(NULL, true);
1897     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1898     ObjectRegistry::onSwitchThread(epe);
1899     return rv;
1900 }
1901
1902 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1903 {
1904     EventuallyPersistentEngine *epe =
1905                                     ObjectRegistry::onSwitchThread(NULL, true);
1906     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1907     ObjectRegistry::onSwitchThread(epe);
1908     return rv;
1909 }
1910
1911 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1912                                                         EVENT_CALLBACK cb,
1913                                                         const void *cb_data) {
1914     EventuallyPersistentEngine *epe =
1915                                     ObjectRegistry::onSwitchThread(NULL, true);
1916     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1917     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1918                             type, cb, cb_data);
1919     ObjectRegistry::onSwitchThread(epe);
1920 }
1921
1922 /**
1923  * A configuration value changed listener that responds to ep-engine
1924  * parameter changes by invoking engine-specific methods on
1925  * configuration change events.
1926  */
1927 class EpEngineValueChangeListener : public ValueChangedListener {
1928 public:
1929     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
1930         // EMPTY
1931     }
1932
1933     virtual void sizeValueChanged(const std::string &key, size_t value) {
1934         if (key.compare("getl_max_timeout") == 0) {
1935             engine.setGetlMaxTimeout(value);
1936         } else if (key.compare("getl_default_timeout") == 0) {
1937             engine.setGetlDefaultTimeout(value);
1938         } else if (key.compare("max_item_size") == 0) {
1939             engine.setMaxItemSize(value);
1940         }
1941     }
1942
1943     virtual void booleanValueChanged(const std::string &key, bool value) {
1944         if (key.compare("flushall_enabled") == 0) {
1945             engine.setFlushAll(value);
1946         }
1947     }
1948 private:
1949     EventuallyPersistentEngine &engine;
1950 };
1951
1952
1953
/**
 * Second-phase engine setup, run after construction.
 *
 * Steps:
 *  - parse the optional configuration string and apply derived defaults;
 *  - register configuration-change listeners;
 *  - create the workload policy, DCP/TAP connection maps, TAP/checkpoint
 *    configuration and the store;
 *  - register the disconnect callback and initialize the store;
 *  - optionally enable traffic, then record the startup time.
 *
 * @param config configuration string, or NULL to skip parsing and keep the
 *               configuration object's current values
 * @return ENGINE_SUCCESS on success. ENGINE_FAILED if config cannot be
 *         parsed, the shard count exceeds max_vbuckets, or the store fails to
 *         initialize. ENGINE_ENOMEM if the store pointer is NULL (see the
 *         note on that check below).
 *
 * NOTE(review): early returns leave any objects already created (workload,
 * connection maps, configs) in their members; presumably the destructor
 * frees them - confirm.
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
    resetStats();
    if (config != NULL) {
        if (!configuration.parseConfiguration(config, serverApi)) {
            return ENGINE_FAILED;
        }
    }

    name = configuration.getCouchBucket();
    maxFailoverEntries = configuration.getMaxFailoverEntries();

    // Start updating the variables from the config!
    HashTable::setDefaultNumBuckets(configuration.getHtSize());
    HashTable::setDefaultNumLocks(configuration.getHtLocks());
    StoredValue::setMutationMemoryThreshold(
                                      configuration.getMutationMemThreshold());

    // A max_size of 0 is replaced with the largest size_t value.
    if (configuration.getMaxSize() == 0) {
        configuration.setMaxSize(std::numeric_limits<size_t>::max());
    }

    // Watermarks still at the size_t-max sentinel are derived from max_size:
    // low = 75%, high = 85%.
    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
        configuration.setMemLowWat(percentOf(
                                            configuration.getMaxSize(), 0.75));
    }

    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
        configuration.setMemHighWat(percentOf(
                                            configuration.getMaxSize(), 0.85));
    }

    // Cache the current values and listen for runtime changes. Each key gets
    // its own heap-allocated listener; ownership presumably passes to
    // `configuration` - confirm.
    maxItemSize = configuration.getMaxItemSize();
    configuration.addValueChangedListener("max_item_size",
                                       new EpEngineValueChangeListener(*this));

    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
    configuration.addValueChangedListener("getl_default_timeout",
                                       new EpEngineValueChangeListener(*this));
    getlMaxTimeout = configuration.getGetlMaxTimeout();
    configuration.addValueChangedListener("getl_max_timeout",
                                       new EpEngineValueChangeListener(*this));

    flushAllEnabled = configuration.isFlushallEnabled();
    configuration.addValueChangedListener("flushall_enabled",
                                       new EpEngineValueChangeListener(*this));

    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
                                  configuration.getMaxNumShards());
    // The shard count may not exceed the number of vbuckets.
    if ((unsigned int)workload->getNumShards() >
                                              configuration.getMaxVbuckets()) {
        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
            "equal or less than max number of vbuckets");
        return ENGINE_FAILED;
    }

    dcpConnMap_ = new DcpConnMap(*this);
    tapConnMap = new TapConnMap(*this);
    tapConfig = new TapConfig(*this);
    tapThrottle = new TapThrottle(configuration, stats);
    TapConfig::addConfigChangeListener(*this);

    checkpointConfig = new CheckpointConfig(*this);
    CheckpointConfig::addConfigChangeListener(*this);

    epstore = new EventuallyPersistentStore(*this);
    // NOTE(review): a plain new-expression throws on failure instead of
    // returning NULL, so this branch is not reachable as written.
    if (epstore == NULL) {
        return ENGINE_ENOMEM;
    }

    // Register the callback
    // EvpHandleDisconnect receives `this` back as its cb_data.
    registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);

    // Complete the initialization of the ep-store
    if (!epstore->initialize()) {
        return ENGINE_FAILED;
    }

    if(configuration.isDataTrafficEnabled()) {
        enableTraffic(true);
    }

    tapConnMap->initialize(TAP_CONN_NOTIFIER);
    dcpConnMap_->initialize(DCP_CONN_NOTIFIER);

    // record engine initialization time
    startupTime.store(ep_real_time());

    LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");

    return ENGINE_SUCCESS;
}
2045
2046 void EventuallyPersistentEngine::destroy(bool force) {
2047     stats.forceShutdown = force;
2048     stats.isShutdown = true;
2049
2050     if (epstore) {
2051         epstore->snapshotStats();
2052     }
2053     if (tapConnMap) {
2054         tapConnMap->shutdownAllConnections();
2055     }
2056     if (dcpConnMap_) {
2057         dcpConnMap_->shutdownAllConnections();
2058     }
2059 }
2060
2061 class FlushAllTask : public GlobalTask {
2062 public:
2063     FlushAllTask(EventuallyPersistentStore *st, TapConnMap &tcm, double when)
2064         : GlobalTask(&st->getEPEngine(), Priority::FlushAllPriority, when,
2065                      false), epstore(st), tapConnMap(tcm) { }
2066
2067     bool run(void) {
2068         epstore->reset();
2069         tapConnMap.addFlushEvent();
2070         return false;
2071     }
2072
2073     std::string getDescription() {
2074         return std::string("Performing flush.");
2075     }
2076
2077 private:
2078     EventuallyPersistentStore *epstore;
2079     TapConnMap                &tapConnMap;
2080 };
2081
2082 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *, time_t when){
2083     if (!flushAllEnabled) {
2084         return ENGINE_ENOTSUP;
2085     }
2086
2087     if (isDegradedMode()) {
2088         return ENGINE_TMPFAIL;
2089     }
2090
2091     if (when == 0) {
2092         epstore->reset();
2093         tapConnMap->addFlushEvent();
2094     } else {
2095         ExTask flushTask = new FlushAllTask(epstore, *tapConnMap,
2096                 static_cast<double>(when));
2097         ExecutorPool::get()->schedule(flushTask, NONIO_TASK_IDX);
2098     }
2099
2100     return ENGINE_SUCCESS;
2101 }
2102
/**
 * Store an item using one of the memcached store operations.
 *
 * @param cookie    connection cookie, forwarded to the ep-store
 * @param itm       item to store; its vbucket id is overwritten with @p vbucket
 * @param cas       out: set to the stored item's CAS on success
 * @param operation CAS, SET, ADD, REPLACE, APPEND or PREPEND
 * @param vbucket   target vbucket
 * @return ENGINE_SUCCESS on success, otherwise an engine error code
 *         (e.g. ENGINE_NOT_STORED, ENGINE_KEY_EEXISTS, ENGINE_TMPFAIL,
 *         ENGINE_E2BIG, ENGINE_ENOTSUP for unknown operations)
 *
 * APPEND/PREPEND are read-modify-write: the current item is fetched, the
 * new data is merged into it, and it is written back through a recursive
 * OPERATION_CAS store. The loop repeats while the result is
 * ENGINE_KEY_EEXISTS (e.g. the CAS write-back lost a race).
 *
 * NOTE(review): on a successful APPEND/PREPEND the recursive store() call
 * increments stats.numOpsStore and this outer call increments it again,
 * so those operations are counted twice (the storeCmdHisto timer also runs
 * in both frames).
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
                                                    item* itm,
                                                    uint64_t *cas,
                                                    ENGINE_STORE_OPERATION
                                                                     operation,
                                                    uint16_t vbucket) {
    BlockTimer timer(&stats.storeCmdHisto);
    ENGINE_ERROR_CODE ret;
    Item *it = static_cast<Item*>(itm);
    item *i = NULL;

    it->setVBucketId(vbucket);

    switch (operation) {
    case OPERATION_CAS:
        if (it->getCas() == 0) {
            // Using a cas command with a cas wildcard doesn't make sense
            ret = ENGINE_NOT_STORED;
            break;
        }
        // FALLTHROUGH
    case OPERATION_SET:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }
        ret = epstore->set(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }

        break;

    case OPERATION_ADD:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }

        if (it->getCas() != 0) {
            // Adding an item with a cas value doesn't really make sense...
            return ENGINE_KEY_EEXISTS;
        }

        ret = epstore->add(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }
        break;

    case OPERATION_REPLACE:
        ret = epstore->replace(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }
        break;

    case OPERATION_APPEND:
    case OPERATION_PREPEND:
        do {
            // Fetch the current value; any failure (e.g. KEY_ENOENT) ends
            // the loop and is mapped below.
            if ((ret = get(cookie, &i, it->getKey().c_str(),
                           it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
                Item *old = reinterpret_cast<Item*>(i);

                if (old->getCas() == (uint64_t) -1) {
                    // item is locked against updates
                    itemRelease(cookie, i);
                    return ENGINE_TMPFAIL;
                }

                // The caller supplied a CAS and it no longer matches.
                if (it->getCas() != 0 && old->getCas() != it->getCas()) {
                    itemRelease(cookie, i);
                    return ENGINE_KEY_EEXISTS;
                }

                if (operation == OPERATION_APPEND) {
                    ret = old->append(*it, maxItemSize);
                } else {
                    ret = old->prepend(*it, maxItemSize);
                }

                if (ret != ENGINE_SUCCESS) {
                    itemRelease(cookie, i);
                    if (ret == ENGINE_E2BIG) {
                        return ret;
                    } else {
                        return memoryCondition();
                    }
                } else {
                    if (old->getDataType() == PROTOCOL_BINARY_DATATYPE_JSON) {
                        // Set the datatype of the new document to BINARY (0),
                        // as appending/prepending anything to JSON breaks the
                        // json data structure.
                        old->setDataType(PROTOCOL_BINARY_RAW_BYTES);
                    } else if (old->getDataType() ==
                                    PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
                        // Set the datatype of the new document to
                        // COMPRESSED_BINARY, as appending/prepending anything
                        // to JSON breaks the json data structure.
                        old->setDataType(PROTOCOL_BINARY_DATATYPE_COMPRESSED);
                    }
                }

                // Write the merged item back, guarded by the CAS read above.
                ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
                itemRelease(cookie, i);
            }
        } while (ret == ENGINE_KEY_EEXISTS);

        // Map the error code back to what memcapable expects
        if (ret == ENGINE_KEY_ENOENT) {
            ret = ENGINE_NOT_STORED;
        }
        break;

    default:
        ret = ENGINE_ENOTSUP;
    }

    // Common post-processing of the result for every operation.
    switch (ret) {
    case ENGINE_SUCCESS:
        ++stats.numOpsStore;
        break;
    case ENGINE_ENOMEM:
        ret = memoryCondition();
        break;
    case ENGINE_NOT_STORED:
    case ENGINE_NOT_MY_VBUCKET:
        // In degraded mode report these as temporary failures instead.
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }
        break;
    default:
        break;
    }

    return ret;
}
2238
/**
 * Make one attempt at producing the next TAP event for a producer.
 *
 * All output parameters are reset first. The checks then run in this
 * order: pending flush, NOOP keep-alive, pause (suspended or ack window
 * full), high-priority vbucket events (TAP_VBUCKET_SET / TAP_OPAQUE),
 * waiting for an opaque ack, then the next item from the connection's
 * queue (scheduling a backfill first if the connection asks for one).
 * If the queue yields TAP_PAUSE during a dump or takeover, completion is
 * checked and may produce a TAP_VBUCKET_SET instead.
 *
 * @param retry out: true when the caller should call again immediately
 *              (TAP_NOOP from the queue, or a TAP_CHECKPOINT_START whose
 *              vbucket cannot be found)
 * @return the TAP event code to send to the client
 */
inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
                                                           item **itm,
                                                           void **es,
                                                           uint16_t *nes,
                                                           uint8_t *ttl,
                                                           uint16_t *flags,
                                                           uint32_t *seqno,
                                                           uint16_t *vbucket,
                                                           TapProducer
                                                                   *connection,
                                                           bool &retry) {
    // Reset every output so no stale values leak out of early returns.
    *es = NULL;
    *nes = 0;
    *ttl = (uint8_t)-1;
    *seqno = 0;
    *flags = 0;
    *vbucket = 0;

    retry = false;

    if (connection->shouldFlush()) {
        return TAP_FLUSH;
    }

    if (connection->isTimeForNoop()) {
        LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
            connection->logHeader());
        return TAP_NOOP;
    }

    if (connection->isSuspended() || connection->windowIsFull()) {
        LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
            " suspended state or its ack windows is full.\n",
            connection->logHeader());
        return TAP_PAUSE;
    }

    uint16_t ret = TAP_PAUSE;
    // High-priority vbucket events are sent before any queued item.
    VBucketEvent ev = connection->nextVBucketHighPriority();
    if (ev.event != TAP_PAUSE) {
        switch (ev.event) {
        case TAP_VBUCKET_SET:
            LOG(EXTENSION_LOG_WARNING,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), ev.vbucket,
                VBucket::toString(ev.state));
            connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
            break;
        case TAP_OPAQUE:
            LOG(EXTENSION_LOG_WARNING,
                "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
                connection->logHeader(),
                TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
                ev.vbucket);
            // The engine-specific payload points into the connection, so it
            // stays valid after this function returns.
            connection->opaqueCommandCode = (uint32_t) ev.state;
            *vbucket = ev.vbucket;
            *es = &connection->opaqueCommandCode;
            *nes = sizeof(connection->opaqueCommandCode);
            break;
        default:
            LOG(EXTENSION_LOG_WARNING,
                "%s Unknown VBucketEvent message type %d\n",
                connection->logHeader(), ev.event);
            abort();
        }
        return ev.event;
    }

    if (connection->waitForOpaqueMsgAck()) {
        return TAP_PAUSE;
    }

    VBucketFilter backFillVBFilter;
    if (connection->runBackfill(backFillVBFilter)) {
        queueBackfill(backFillVBFilter, connection);
    }

    uint8_t nru = INITIAL_NRU_VALUE;
    Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
    switch (ret) {
    case TAP_CHECKPOINT_START:
    case TAP_CHECKPOINT_END:
    case TAP_MUTATION:
    case TAP_DELETION:
        *itm = it;
        // Attach per-event engine-specific data (rev seqno, NRU, ...).
        if (ret == TAP_MUTATION) {
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno(), nru);
            *es = connection->specificData;
        } else if (ret == TAP_DELETION) {
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno());
            *es = connection->specificData;
        } else if (ret == TAP_CHECKPOINT_START) {
            // Send the current value of the max deleted seqno
            RCPtr<VBucket> vb = getVBucket(*vbucket);
            if (!vb) {
                retry = true;
                return TAP_NOOP;
            }
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                               vb->ht.getMaxDeletedRevSeqno());
            *es = connection->specificData;
        }
        break;
    case TAP_NOOP:
        retry = true;
        break;
    default:
        break;
    }

    // Nothing left to send: check whether a dump/takeover has completed.
    if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
        VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
        if (vbev.event == TAP_VBUCKET_SET) {
            LOG(EXTENSION_LOG_WARNING,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), vbev.vbucket,
                VBucket::toString(vbev.state));
            connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
        }
        ret = vbev.event;
    }

    return ret;
}
2365
2366 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2367                                                   item **itm,
2368                                                   void **es,
2369                                                   uint16_t *nes,
2370                                                   uint8_t *ttl,
2371                                                   uint16_t *flags,
2372                                                   uint32_t *seqno,
2373                                                   uint16_t *vbucket) {
2374     TapProducer *connection = getTapProducer(cookie);
2375     if (!connection) {
2376         LOG(EXTENSION_LOG_WARNING,
2377             "Failed to lookup TAP connection.. Disconnecting\n");
2378         return TAP_DISCONNECT;
2379     }
2380
2381     connection->setPaused(false);
2382
2383     bool retry = false;
2384     uint16_t ret;
2385
2386     connection->setLastWalkTime();
2387     do {
2388         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2389                              seqno, vbucket, connection, retry);
2390     } while (retry);
2391
2392     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2393         connection->lastMsgTime = ep_current_time();
2394         if (ret == TAP_NOOP) {
2395             *seqno = 0;
2396         } else {
2397             ++stats.numTapFetched;
2398             *seqno = connection->getSeqno();
2399             if (connection->requestAck(ret, *vbucket)) {
2400                 *flags = TAP_FLAG_ACK;
2401                 connection->seqnoAckRequested = *seqno;
2402             }
2403
2404             if (ret == TAP_MUTATION) {
2405                 if (connection->haveFlagByteorderSupport()) {
2406                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2407                 }
2408             }
2409         }
2410     } else {
2411         connection->setPaused(true);
2412         connection->setNotifySent(false);
2413     }
2414
2415     return ret;
2416 }
2417
2418 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2419                                                 std::string &client,
2420                                                 uint32_t flags,
2421                                                 const void *userdata,
2422                                                 size_t nuserdata) {
2423     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2424         return false;
2425     }
2426
2427     std::string tapName = "eq_tapq:";
2428     if (client.length() == 0) {
2429         tapName.assign(ConnHandler::getAnonName());
2430     } else {
2431         tapName.append(client);
2432     }
2433
2434     // Decoding the userdata section of the packet and update the filters
2435     const char *ptr = static_cast<const char*>(userdata);
2436     uint64_t backfillAge = 0;
2437     std::vector<uint16_t> vbuckets;
2438     std::map<uint16_t, uint64_t> lastCheckpointIds;
2439
2440     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2441         if (nuserdata < sizeof(backfillAge)) {
2442             LOG(EXTENSION_LOG_WARNING,
2443                 "Backfill age is missing. Reject connection request from %s\n",
2444                 tapName.c_str());
2445             return false;
2446         }
2447         // use memcpy to avoid alignemt issues
2448         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2449         backfillAge = ntohll(backfillAge);
2450         nuserdata -= sizeof(backfillAge);
2451         ptr += sizeof(backfillAge);
2452     }
2453
2454     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2455         uint16_t nvbuckets;
2456         if (nuserdata < sizeof(nvbuckets)) {
2457             LOG(EXTENSION_LOG_WARNING,
2458             "Number of vbuckets is missing. Reject connection request from %s"
2459             "\n", tapName.c_str());
2460             return false;
2461         }
2462         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2463         nuserdata -= sizeof(nvbuckets);
2464         ptr += sizeof(nvbuckets);
2465         nvbuckets = ntohs(nvbuckets);
2466         if (nvbuckets > 0) {
2467             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2468                 LOG(EXTENSION_LOG_WARNING,
2469                 "# of vbuckets not matched. Reject connection request from %s"
2470                 "\n", tapName.c_str());
2471                 return false;
2472             }
2473             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2474                 uint16_t val;
2475                 memcpy(&val, ptr, sizeof(nvbuckets));
2476                 ptr += sizeof(uint16_t);
2477                 vbuckets.push_back(ntohs(val));
2478             }
2479             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2480         }
2481     }
2482
2483     if (flags & TAP_CONNECT_CHECKPOINT) {
2484         uint16_t nCheckpoints = 0;
2485         if (nuserdata >= sizeof(nCheckpoints)) {
2486             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2487             nuserdata -= sizeof(nCheckpoints);
2488             ptr += sizeof(nCheckpoints);
2489             nCheckpoints = ntohs(nCheckpoints);
2490         }
2491         if (nCheckpoints > 0) {
2492             if (nuserdata <
2493                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2494                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2495                     "Reject connection request from %s\n", tapName.c_str());
2496                 return false;
2497             }
2498             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2499                 uint16_t vbid;
2500                 uint64_t checkpointId;
2501                 memcpy(&vbid, ptr, sizeof(vbid));
2502                 ptr += sizeof(uint16_t);
2503                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2504                 ptr += sizeof(uint64_t);
2505                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2506             }
2507             nuserdata -=
2508                         ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2509         }
2510     }
2511
2512     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2513                                  backfillAge,
2514                                  static_cast<int>(
2515                                  configuration.getTapKeepalive()),
2516                                  vbuckets,
2517                                  lastCheckpointIds);
2518
2519     tapConnMap->notifyPausedConnection(tp, true);
2520     return true;
2521 }
2522
2523 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2524                                                         void *engine_specific,
2525                                                         uint16_t nengine,
2526                                                         uint8_t ttl,
2527                                                         uint16_t tap_flags,
2528                                                         uint16_t tap_event,
2529                                                         uint32_t tap_seqno,
2530                                                         const void *key,
2531                                                         size_t nkey,
2532                                                         uint32_t flags,
2533                                                         uint32_t exptime,
2534                                                         uint64_t cas,
2535                                                         uint8_t datatype,
2536                                                         const void *data,
2537                                                         size_t ndata,
2538                                                         uint16_t vbucket)
2539 {
2540     (void) ttl;
2541     void *specific = getEngineSpecific(cookie);
2542     ConnHandler *connection = NULL;
2543     if (specific == NULL) {
2544         if (tap_event == TAP_ACK) {
2545             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2546                 "exist. Force disconnect...\n", (char *) cookie);
2547             // tap producer is no longer connected..
2548             return ENGINE_DISCONNECT;
2549         } else {
2550             connection = tapConnMap->newConsumer(cookie);
2551             if (connection == NULL) {
2552                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2553                     " Force disconnect\n");
2554                 return ENGINE_DISCONNECT;
2555             }
2556             storeEngineSpecific(cookie, connection);
2557         }
2558     } else {
2559         connection = reinterpret_cast<ConnHandler *>(specific);
2560     }
2561
2562     std::string k(static_cast<const char*>(key), nkey);
2563     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2564
2565     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2566         if (!tapThrottle->shouldProcess()) {
2567             ++stats.tapThrottled;
2568             if (connection->supportsAck()) {
2569                 ret = ENGINE_TMPFAIL;
2570             } else {
2571                 ret = ENGINE_DISCONNECT;
2572                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2573                     "ack support. Force disconnect...\n",
2574                     connection->logHeader());
2575             }
2576             return ret;
2577         }
2578     }
2579
2580     switch (tap_event) {
2581     case TAP_ACK:
2582         ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2583         break;
2584     case TAP_FLUSH:
2585         ret = flush(cookie, 0);
2586         LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2587             connection->logHeader());
2588         break;
2589     case TAP_DELETION:
2590         {
2591             uint64_t revSeqno;
2592             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2593                                                 nengine, &revSeqno);
2594
2595             ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2596                                        NULL, 0);
2597         }
2598         break;
2599
2600     case TAP_CHECKPOINT_START:
2601     case TAP_CHECKPOINT_END:
2602         {
2603             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2604             if (tc) {
2605                 if (tap_event == TAP_CHECKPOINT_START &&
2606                     nengine == TapEngineSpecific::sizeRevSeqno) {
2607                     // Set the current value for the max deleted seqno
2608                     RCPtr<VBucket> vb = getVBucket(vbucket);
2609                     if (!vb) {
2610                         return ENGINE_TMPFAIL;
2611                     }
2612                     uint64_t seqnum;
2613                     TapEngineSpecific::readSpecificData(tap_event,
2614                                                         engine_specific,
2615                                                         nengine,
2616                                                         &seqnum);
2617                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2618                 }
2619
2620                 if (data) {
2621                     uint64_t checkpointId;
2622                     memcpy(&checkpointId, data, sizeof(checkpointId));
2623                     checkpointId = ntohll(checkpointId);
2624                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2625                                           checkpointId);
2626                 }
2627                 else {
2628                     ret = ENGINE_DISCONNECT;
2629                     LOG(EXTENSION_LOG_WARNING,
2630                         "%s Checkpoint Id is missing in "
2631                         "CHECKPOINT messages. Force disconnect...\n",
2632                         connection->logHeader());
2633                 }
2634             }
2635             else {
2636                 ret = ENGINE_DISCONNECT;
2637                 LOG(EXTENSION_LOG_WARNING,
2638                     "%s not a consumer! Force disconnect\n",
2639                     connection->logHeader());
2640             }
2641         }
2642
2643         break;
2644
2645     case TAP_MUTATION:
2646         {
2647             uint8_t nru = INITIAL_NRU_VALUE;
2648             uint64_t revSeqno = 0;
2649             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2650                                                 nengine, &revSeqno, &nru);
2651
2652             if (!isDatatypeSupported(cookie)) {
2653                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2654                 const unsigned char *dat = (const unsigned char*)data;
2655                 const int datlen = ndata;
2656                 if (checkUTF8JSON(dat, datlen)) {
2657                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2658                 }
2659             }
2660             ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2661                                        flags, datatype, 0, 0, revSeqno, exptime,
2662                                        nru, NULL, 0);
2663         }
2664
2665         break;
2666
2667     case TAP_OPAQUE:
2668         if (nengine == sizeof(uint32_t)) {
2669             uint32_t cc;
2670             memcpy(&cc, engine_specific, sizeof(cc));
2671             cc = ntohl(cc);
2672
2673             switch (cc) {
2674             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2675                 // @todo: the memcached core will _ALWAYS_ send nack
2676                 //        if it encounter an error. This should be
2677                 // set as the default when we move to .next after 2.0
2678                 // (currently we need to allow the message for
2679                 // backwards compatibility)
2680                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2681                     connection->logHeader());
2682                 break;
2683             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2684                 connection->setSupportCheckpointSync(true);
2685                 LOG(EXTENSION_LOG_INFO,
2686                     "%s Enable checkpoint synchronization\n",
2687                     connection->logHeader());
2688                 break;
2689             case TAP_OPAQUE_OPEN_CHECKPOINT:
2690                 /**
2691                  * This event is only received by the TAP client that wants to
2692                  * get mutations from closed checkpoints only. At this time,
2693                  * only incremental backup client receives this event so that
2694                  * it can close the connection and reconnect later.
2695                  */
2696                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2697                     connection->logHeader());
2698                 break;
2699             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2700                 {
2701                     LOG(EXTENSION_LOG_INFO,
2702                         "%s Backfill started for vbucket %d.\n",
2703                         connection->logHeader(), vbucket);
2704                     BlockTimer timer(&stats.tapVbucketResetHisto);
2705                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2706                                                   ENGINE_DISCONNECT;
2707                     if (ret == ENGINE_DISCONNECT) {
2708                         LOG(EXTENSION_LOG_WARNING,
2709                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2710                             connection->logHeader(), vbucket);
2711                     } else {
2712                         LOG(EXTENSION_LOG_WARNING,
2713                          "%s Reset vbucket %d was completed succecssfully.\n",
2714                             connection->logHeader(), vbucket);
2715                     }
2716
2717                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2718                     if (tc) {
2719                         tc->setBackfillPhase(true, vbucket);
2720                     } else {
2721                         ret = ENGINE_DISCONNECT;
2722                         LOG(EXTENSION_LOG_WARNING,
2723                             "TAP consumer doesn't exists. Force disconnect\n");
2724                     }
2725                 }
2726                 break;
2727             case TAP_OPAQUE_CLOSE_BACKFILL:
2728                 {
2729                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2730                         connection->logHeader());
2731                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2732                     if (tc) {
2733                         tc->setBackfillPhase(false, vbucket);
2734                     } else {
2735                         ret = ENGINE_DISCONNECT;
2736                         LOG(EXTENSION_LOG_WARNING,
2737                             "%s not a consumer! Force disconnect\n",
2738                             connection->logHeader());
2739                     }
2740                 }
2741                 break;
2742             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2743                 /**
2744                  * This event is sent by the eVBucketMigrator to notify that
2745                  * the source node closes the tap replication stream and
2746                  * switches to TAKEOVER_VBUCKETS phase.
2747                  * This is just an informative message and doesn't require any
2748                  * action.
2749                  */
2750                 LOG(EXTENSION_LOG_INFO,
2751                 "%s Received close tap stream. Switching to takeover phase.\n",
2752                     connection->logHeader());
2753                 break;
2754             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2755                 /**
2756                  * This opaque message is just for notifying that the source
2757                  * node receives change_vbucket_filter request and processes
2758                  * it successfully.
2759                  */
2760                 LOG(EXTENSION_LOG_INFO,
2761                 "%s Notified that the source node changed a vbucket filter.\n",
2762                     connection->logHeader());
2763                 break;
2764             default:
2765                 LOG(EXTENSION_LOG_WARNING,
2766                     "%s Received an unknown opaque command\n",
2767                     connection->logHeader());
2768             }
2769         } else {
2770             LOG(EXTENSION_LOG_WARNING,
2771                 "%s Received tap opaque with unknown size %d\n",
2772                 connection->logHeader(), nengine);
2773         }
2774         break;
2775
2776     case TAP_VBUCKET_SET:
2777         {
2778             BlockTimer timer(&stats.tapVbucketSetHisto);
2779
2780             if (nengine != sizeof(vbucket_state_t)) {
2781                 // illegal datasize
2782                 LOG(EXTENSION_LOG_WARNING,
2783                     "%s Received TAP_VBUCKET_SET with illegal size."
2784                     " Force disconnect\n", connection->logHeader());
2785                 ret = ENGINE_DISCONNECT;
2786                 break;
2787             }
2788
2789             vbucket_state_t state;
2790             memcpy(&state, engine_specific, nengine);
2791             state = (vbucket_state_t)ntohl(state);
2792
2793             ret = connection->setVBucketState(0, vbucket, state);
2794         }
2795         break;
2796
2797     default:
2798         // Unknown command
2799         LOG(EXTENSION_LOG_WARNING,
2800             "%s Recieved bad opcode, ignoring message\n",
2801             connection->logHeader());
2802     }
2803
2804     connection->processedEvent(tap_event, ret);
2805     return ret;
2806 }
2807
2808 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2809                                                       TapConsumer *consumer,
2810                                                       uint8_t event,
2811                                                       uint16_t vbucket,
2812                                                       uint64_t checkpointId) {
2813     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2814
2815     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2816         getEpStore()->wakeUpFlusher();
2817         ret = ENGINE_SUCCESS;
2818     }
2819     else {
2820         ret = ENGINE_DISCONNECT;
2821         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2822             "checkpoint %llu. Force disconnect\n",
2823             consumer->logHeader(), checkpointId);
2824     }
2825
2826     return ret;
2827 }
2828
2829 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2830     TapProducer *rv =
2831         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2832     if (!(rv && rv->isConnected())) {
2833         LOG(EXTENSION_LOG_WARNING,
2834             "Walking a non-existent tap queue, disconnecting\n");
2835         return NULL;
2836     }
2837
2838     if (rv->doDisconnect()) {
2839         LOG(EXTENSION_LOG_WARNING,
2840             "%s Disconnecting pending connection\n", rv->logHeader());
2841         return NULL;
2842     }
2843     return rv;
2844 }
2845
2846 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2847                                                             uint32_t seqno,
2848                                                             uint16_t status,
2849                                                             const std::string
2850                                                             &msg)
2851 {
2852     TapProducer *connection = getTapProducer(cookie);
2853     if (!connection) {
2854         LOG(EXTENSION_LOG_WARNING,
2855             "Unable to process tap ack. No producer found\n");
2856         return ENGINE_DISCONNECT;
2857     }
2858
2859     return connection->processAck(seqno, status, msg);
2860 }
2861
2862 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2863                                                              &backfillVBFilter,
2864                                                Producer *tc)
2865 {
2866     ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2867                                            backfillVBFilter);
2868     ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
2869 }
2870
2871 bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2872     ++numVbucket;
2873     item_eviction_policy_t policy = engine.getEpStore()->
2874                                                        getItemEvictionPolicy();
2875     numItems += vb->getNumItems(policy);
2876     numTempItems += vb->getNumTempItems();
2877     nonResident += vb->getNumNonResidentItems(policy);
2878
2879     if (vb->getHighPriorityChkSize() > 0) {
2880         chkPersistRemaining++;
2881     }
2882
2883     fileSpaceUsed += vb->fileSpaceUsed;
2884     fileSize += vb->fileSize;
2885
2886     if (desired_state != vbucket_state_dead) {
2887         htMemory += vb->ht.memorySize();
2888         htItemMemory += vb->ht.getItemMemory();
2889         htCacheSize += vb->ht.cacheSize;
2890         numEjects += vb->ht.getNumEjects();
2891         numExpiredItems += vb->numExpiredItems;
2892         metaDataMemory += vb->ht.metaDataMemory;
2893         metaDataDisk += vb->metaDataDisk;
2894         opsCreate += vb->opsCreate;
2895         opsUpdate += vb->opsUpdate;
2896         opsDelete += vb->opsDelete;
2897         opsReject += vb->opsReject;
2898
2899         queueSize += vb->dirtyQueueSize;
2900         queueMemory += vb->dirtyQueueMem;
2901         queueFill += vb->dirtyQueueFill;
2902         queueDrain += vb->dirtyQueueDrain;
2903         queueAge += vb->getQueueAge();
2904         pendingWrites += vb->dirtyQueuePendingWrites;
2905     }
2906
2907     return false;
2908 }
2909
2910 /**
2911  * A container class holding VBucketCountVisitors to aggregate stats for
2912  * different vbucket states.
2913  */
2914 class VBucketCountAggregator : public VBucketVisitor  {
2915 public:
2916     bool visitBucket(RCPtr<VBucket> &vb)  {
2917         std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
2918         it = visitorMap.find(vb->getState());
2919         if ( it != visitorMap.end() ) {
2920             it->second->visitBucket(vb);
2921         }
2922
2923         return false;
2924     }
2925
2926     void addVisitor(VBucketCountVisitor* visitor)  {
2927         visitorMap[visitor->getVBucketState()] = visitor;
2928     }
2929 private:
2930     std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
2931 };
2932
2933 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
2934                                                            ADD_STAT add_stat) {
2935     VBucketCountAggregator aggregator;
2936
2937     VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
2938     aggregator.addVisitor(&activeCountVisitor);
2939
2940     VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
2941     aggregator.addVisitor(&replicaCountVisitor);
2942
2943     VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
2944     aggregator.addVisitor(&pendingCountVisitor);
2945
2946     VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
2947     aggregator.addVisitor(&deadCountVisitor);
2948
2949     epstore->visit(aggregator);
2950
2951     epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
2952                                       replicaCountVisitor.getMemResidentPer());
2953     tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
2954                                      replicaCountVisitor.getNumItems() +
2955                                      pendingCountVisitor.getNumItems());
2956
2957     configuration.addStats(add_stat, cookie);
2958
2959     EPStats &epstats = getEpStats();
2960     add_casted_stat("ep_version", VERSION, add_stat, cookie);
2961     add_casted_stat("ep_storage_age",
2962                     epstats.dirtyAge, add_stat, cookie);
2963     add_casted_stat("ep_storage_age_highwat",
2964                     epstats.dirtyAgeHighWat, add_stat, cookie);
2965     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
2966                     add_stat, cookie);
2967
2968     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
2969         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
2970     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
2971         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
2972     }
2973
2974     add_casted_stat("ep_total_enqueued",
2975                     epstats.totalEnqueued, add_stat, cookie);
2976     add_casted_stat("ep_total_persisted",
2977                     epstats.totalPersisted, add_stat, cookie);
2978     add_casted_stat("ep_item_flush_failed",
2979                     epstats.flushFailed, add_stat, cookie);
2980     add_casted_stat("ep_item_commit_failed",
2981                     epstats.commitFailed, add_stat, cookie);
2982     add_casted_stat("ep_item_begin_failed",
2983                     epstats.beginFailed, add_stat, cookie);
2984     add_casted_stat("ep_expired_access", epstats.expired_access,
2985                     add_stat, cookie);
2986     add_casted_stat("ep_expired_pager", epstats.expired_pager,
2987                     add_stat, cookie);
2988     add_casted_stat("ep_item_flush_expired",
2989                     epstats.flushExpired, add_stat, cookie);
2990     add_casted_stat("ep_queue_size",
2991                     epstats.diskQueueSize, add_stat, cookie);
2992     add_casted_stat("ep_flusher_todo",
2993                     epstats.flusher_todo, add_stat, cookie);
2994     add_casted_stat("ep_uncommitted_items",
2995                     epstats.flusher_todo, add_stat, cookie);
2996     add_casted_stat("ep_diskqueue_items",
2997                     epstats.diskQueueSize, add_stat, cookie);
2998     add_casted_stat("ep_flusher_state",
2999                     epstore->getFlusher(0)->stateName(),
3000                     add_stat, cookie);
3001     add_casted_stat("ep_commit_num", epstats.flusherCommits,
3002                     add_stat, cookie);
3003     add_casted_stat("ep_commit_time",
3004                     epstats.commit_time, add_stat, cookie);
3005     add_casted_stat("ep_commit_time_total",
3006                     epstats.cumulativeCommitTime, add_stat, cookie);
3007     add_casted_stat("ep_vbucket_del",
3008                     epstats.vbucketDeletions, add_stat, cookie);
3009     add_casted_stat("ep_vbucket_del_fail",
3010                     epstats.vbucketDeletionFail, add_stat, cookie);
3011     add_casted_stat("ep_flush_duration_total",
3012                     epstats.cumulativeFlushTime, add_stat, cookie);
3013     add_casted_stat("ep_flush_all",
3014                     epstore->isFlushAllScheduled() ? "true" : "false",
3015                     add_stat, cookie);
3016     add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
3017                     cookie);
3018     add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
3019                     add_stat, cookie);
3020     add_casted_stat("curr_items_tot",
3021                     activeCountVisitor.getNumItems() +
3022                     replicaCountVisitor.getNumItems() +
3023                     pendingCountVisitor.getNumItems(),
3024                     add_stat, cookie);
3025     add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
3026                     add_stat, cookie);
3027     add_casted_stat("vb_active_curr_items", activeCountVisitor.getNumItems(),
3028                     add_stat, cookie);
3029     add_casted_stat("vb_active_num_non_resident",
3030                     activeCountVisitor.getNonResident(),
3031                     add_stat, cookie);
3032     add_casted_stat("vb_active_perc_mem_resident",
3033                     activeCountVisitor.getMemResidentPer(),
3034                     add_stat, cookie);
3035     add_casted_stat("vb_active_eject", activeCountVisitor.getEjects(),
3036                     add_stat, cookie);
3037     add_casted_stat("vb_active_expired", activeCountVisitor.getExpired(),
3038                     add_stat, cookie);
3039     add_casted_stat("vb_active_meta_data_memory",
3040                     activeCountVisitor.getMetaDataMemory(),
3041                     add_stat, cookie);
3042     add_casted_stat("vb_active_meta_data_disk",
3043                     activeCountVisitor.getMetaDataDisk(),
3044                     add_stat, cookie);
3045     add_casted_stat("vb_active_ht_memory",
3046                     activeCountVisitor.getHashtableMemory(),
3047                     add_stat, cookie);
3048     add_casted_stat("vb_active_itm_memory", activeCountVisitor.getItemMemory(),
3049                     add_stat, cookie);
3050     add_casted_stat("vb_active_ops_create", activeCountVisitor.getOpsCreate(),
3051                     add_stat, cookie);
3052     add_casted_stat("vb_active_ops_update", activeCountVisitor.getOpsUpdate(),
3053                     add_stat, cookie);
3054     add_casted_stat("vb_active_ops_delete", activeCountVisitor.getOpsDelete(),
3055                     add_stat, cookie);
3056     add_casted_stat("vb_active_ops_reject", activeCountVisitor.getOpsReject(),
3057                     add_stat, cookie);
3058     add_casted_stat("vb_active_queue_size", activeCountVisitor.getQueueSize(),
3059                     add_stat, cookie);
3060     add_casted_stat("vb_active_queue_memory",
3061                     activeCountVisitor.getQueueMemory(), add_stat, cookie);
3062     add_casted_stat("vb_active_queue_age", activeCountVisitor.getAge(),
3063                     add_stat, cookie);
3064     add_casted_stat("vb_active_queue_pending",
3065                     activeCountVisitor.getPendingWrites(), add_stat, cookie);
3066     add_casted_stat("vb_active_queue_fill", activeCountVisitor.getQueueFill(),
3067                     add_stat, cookie);
3068     add_casted_stat("vb_active_queue_drain",
3069                     activeCountVisitor.getQueueDrain(), add_stat, cookie);
3070
3071     add_casted_stat("vb_replica_num", replicaCountVisitor.getVBucketNumber(),
3072                     add_stat, cookie);
3073     add_casted_stat("vb_replica_curr_items", replicaCountVisitor.getNumItems(),
3074                     add_stat, cookie);
3075     add_casted_stat("vb_replica_num_non_resident",
3076                     replicaCountVisitor.getNonResident(), add_stat, cookie);
3077     add_casted_stat("vb_replica_perc_mem_resident",
3078                     replicaCountVisitor.getMemResidentPer(),
3079                     add_stat, cookie);
3080     add_casted_stat("vb_replica_eject", replicaCountVisitor.getEjects(),
3081                     add_stat, cookie);
3082     add_casted_stat("vb_replica_expired", replicaCountVisitor.getExpired(),
3083                     add_stat, cookie);
3084     add_casted_stat("vb_replica_meta_data_memory",
3085                     replicaCountVisitor.getMetaDataMemory(), add_stat, cookie);
3086     add_casted_stat("vb_replica_meta_data_disk",
3087                     replicaCountVisitor.getMetaDataDisk(), add_stat, cookie);
3088     add_casted_stat("vb_replica_ht_memory",
3089                     replicaCountVisitor.getHashtableMemory(),
3090                     add_stat, cookie);
3091     add_casted_stat("vb_replica_itm_memory",
3092                     replicaCountVisitor.getItemMemory(), add_stat, cookie);
3093     add_casted_stat("vb_replica_ops_create",
3094                     replicaCountVisitor.getOpsCreate(), add_stat, cookie);
3095     add_casted_stat("vb_replica_ops_update",
3096                     replicaCountVisitor.getOpsUpdate(), add_stat, cookie);
3097     add_casted_stat("vb_replica_ops_delete",
3098                     replicaCountVisitor.getOpsDelete(), add_stat, cookie);
3099     add_casted_stat("vb_replica_ops_reject",
3100                     replicaCountVisitor.getOpsReject(), add_stat, cookie);
3101     add_casted_stat("vb_replica_queue_size",
3102                     replicaCountVisitor.getQueueSize(), add_stat, cookie);
3103     add_casted_stat("vb_replica_queue_memory",
3104                     replicaCountVisitor.getQueueMemory(),
3105                     add_stat, cookie);
3106     add_casted_stat("vb_replica_queue_age",
3107                     replicaCountVisitor.getAge(), add_stat, cookie);
3108     add_casted_stat("vb_replica_queue_pending",
3109                     replicaCountVisitor.getPendingWrites(),
3110                     add_stat, cookie);
3111     add_casted_stat("vb_replica_queue_fill",
3112                     replicaCountVisitor.getQueueFill(), add_stat, cookie);
3113     add_casted_stat("vb_replica_queue_drain",
3114                     replicaCountVisitor.getQueueDrain(), add_stat, cookie);
3115
3116     add_casted_stat("vb_pending_num",
3117                     pendingCountVisitor.getVBucketNumber(), add_stat, cookie);
3118     add_casted_stat("vb_pending_curr_items",
3119                     pendingCountVisitor.getNumItems(), add_stat, cookie);
3120     add_casted_stat("vb_pending_num_non_resident",
3121                     pendingCountVisitor.getNonResident(),
3122                     add_stat, cookie);
3123     add_casted_stat("vb_pending_perc_mem_resident",
3124                     pendingCountVisitor.getMemResidentPer(), add_stat, cookie);
3125     add_casted_stat("vb_pending_eject", pendingCountVisitor.getEjects(),
3126                     add_stat, cookie);
3127     add_casted_stat("vb_pending_expired", pendingCountVisitor.getExpired(),
3128                     add_stat, cookie);
3129     add_casted_stat("vb_pending_meta_data_memory",
3130                     pendingCountVisitor.getMetaDataMemory(),
3131                     add_stat, cookie);
3132     add_casted_stat("vb_pending_meta_data_disk",
3133                     pendingCountVisitor.getMetaDataDisk(),
3134                     add_stat, cookie);
3135     add_casted_stat("vb_pending_ht_memory",
3136                     pendingCountVisitor.getHashtableMemory(),
3137                     add_stat, cookie);
3138     add_casted_stat("vb_pending_itm_memory",
3139                     pendingCountVisitor.getItemMemory(), add_stat, cookie);
3140     add_casted_stat("vb_pending_ops_create",
3141                     pendingCountVisitor.getOpsCreate(), add_stat, cookie);
3142     add_casted_stat("vb_pending_ops_update",
3143                     pendingCountVisitor.getOpsUpdate(), add_stat, cookie);
3144     add_casted_stat("vb_pending_ops_delete",
3145                     pendingCountVisitor.getOpsDelete(), add_stat, cookie);
3146     add_casted_stat("vb_pending_ops_reject",
3147                     pendingCountVisitor.getOpsReject(), add_stat, cookie);
3148     add_casted_stat("vb_pending_queue_size",
3149                     pendingCountVisitor.getQueueSize(), add_stat, cookie);
3150     add_casted_stat("vb_pending_queue_memory",
3151                     pendingCountVisitor.getQueueMemory(),
3152                     add_stat, cookie);
3153     add_casted_stat("vb_pending_queue_age", pendingCountVisitor.getAge(),
3154                     add_stat, cookie);
3155     add_casted_stat("vb_pending_queue_pending",
3156                     pendingCountVisitor.getPendingWrites(),
3157                     add_stat, cookie);
3158     add_casted_stat("vb_pending_queue_fill",
3159                     pendingCountVisitor.getQueueFill(), add_stat, cookie);
3160     add_casted_stat("vb_pending_queue_drain",
3161                     pendingCountVisitor.getQueueDrain(), add_stat, cookie);
3162
3163     add_casted_stat("vb_dead_num", deadCountVisitor.getVBucketNumber(),
3164                     add_stat, cookie);
3165
3166     add_casted_stat("ep_db_data_size",
3167                     activeCountVisitor.getFileSpaceUsed() +
3168                     replicaCountVisitor.getFileSpaceUsed() +
3169                     pendingCountVisitor.getFileSpaceUsed() +
3170                     deadCountVisitor.getFileSpaceUsed(),
3171                     add_stat, cookie);
3172     add_casted_stat("ep_db_file_size",
3173                     activeCountVisitor.getFileSize() +
3174                     replicaCountVisitor.getFileSize() +
3175                     pendingCountVisitor.getFileSize() +
3176                     deadCountVisitor.getFileSize(),
3177                     add_stat, cookie);
3178
3179     add_casted_stat("ep_vb_snapshot_total",
3180                     epstats.snapshotVbucketHisto.total(), add_stat, cookie);
3181
3182     add_casted_stat("ep_vb_total",
3183                     activeCountVisitor.getVBucketNumber() +
3184                     replicaCountVisitor.getVBucketNumber() +
3185                     pendingCountVisitor.getVBucketNumber() +
3186                     deadCountVisitor.getVBucketNumber(),
3187                     add_stat, cookie);
3188
3189     add_casted_stat("ep_total_new_items",
3190                     activeCountVisitor.getOpsCreate() +
3191                     replicaCountVisitor.getOpsCreate() +
3192                     pendingCountVisitor.getOpsCreate(),
3193                     add_stat, cookie);
3194     add_casted_stat("ep_total_del_items",
3195                     activeCountVisitor.getOpsDelete() +
3196                     replicaCountVisitor.getOpsDelete() +
3197                     pendingCountVisitor.getOpsDelete(),
3198                     add_stat, cookie);
3199     add_casted_stat("ep_diskqueue_memory",
3200                     activeCountVisitor.getQueueMemory() +
3201                     replicaCountVisitor.getQueueMemory() +
3202                     pendingCountVisitor.getQueueMemory(),
3203                     add_stat, cookie);
3204     add_casted_stat("ep_diskqueue_fill",
3205                     activeCountVisitor.getQueueFill() +
3206                     replicaCountVisitor.getQueueFill() +
3207                     pendingCountVisitor.getQueueFill(),
3208                     add_stat, cookie);
3209     add_casted_stat("ep_diskqueue_drain",
3210                     activeCountVisitor.getQueueDrain() +
3211                     replicaCountVisitor.getQueueDrain() +
3212                     pendingCountVisitor.getQueueDrain(),
3213                     add_stat, cookie);
3214     add_casted_stat("ep_diskqueue_pending",
3215                     activeCountVisitor.getPendingWrites() +
3216                     replicaCountVisitor.getPendingWrites() +
3217                     pendingCountVisitor.getPendingWrites(),
3218                     add_stat, cookie);
3219     add_casted_stat("ep_meta_data_memory",
3220                     activeCountVisitor.getMetaDataMemory() +
3221                     replicaCountVisitor.getMetaDataMemory() +
3222                     pendingCountVisitor.getMetaDataMemory(),
3223                     add_stat, cookie);
3224     add_casted_stat("ep_meta_data_disk",
3225                     activeCountVisitor.getMetaDataDisk() +
3226                     replicaCountVisitor.getMetaDataDisk() +
3227                     pendingCountVisitor.getMetaDataDisk(),
3228                     add_stat, cookie);
3229
3230     size_t memUsed =  stats.getTotalMemoryUsed();
3231     add_casted_stat("mem_used", memUsed, add_stat, cookie);
3232     add_casted_stat("bytes", memUsed, add_stat, cookie);
3233     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3234     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3235 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3236     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3237 #else
3238     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3239 #endif
3240     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3241     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3242                     add_stat, cookie);
3243 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3244     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3245 #else
3246     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3247 #endif
3248     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3249     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3250     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3251     add_casted_stat("ep_total_cache_size",
3252                     activeCountVisitor.getCacheSize() +
3253                     replicaCountVisitor.getCacheSize() +
3254                     pendingCountVisitor.getCacheSize(),
3255                     add_stat, cookie);
3256     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3257     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3258                     add_stat, cookie);
3259     add_casted_stat("ep_mem_tracker_enabled",
3260                     stats.memoryTrackerEnabled ? "true" : "false",
3261                     add_stat, cookie);
3262     add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3263                     add_stat, cookie);
3264     add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3265                     add_stat, cookie);
3266     add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3267                     add_stat, cookie);
3268     add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3269                     add_stat, cookie);
3270     add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3271                     add_stat, cookie);
3272     add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3273                     add_stat, cookie);
3274     add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3275                     add_stat, cookie);
3276     add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3277                     add_stat, cookie);
3278     add_casted_stat("ep_items_rm_from_checkpoints",
3279                     epstats.itemsRemovedFromCheckpoints,
3280                     add_stat, cookie);
3281     add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3282                     add_stat, cookie);
3283     add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3284                     add_stat, cookie);
3285     add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3286                     add_stat, cookie);
3287
3288     add_casted_stat("ep_io_num_read", epstats.io_num_read,
3289                     add_stat, cookie);
3290     add_casted_stat("ep_io_num_write", epstats.io_num_write, add_stat, cookie);
3291     add_casted_stat("ep_io_read_bytes", epstats.io_read_bytes,
3292                     add_stat, cookie);
3293     add_casted_stat("ep_io_write_bytes", epstats.io_write_bytes,
3294                      add_stat, cookie);
3295
3296     add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3297     add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3298                     add_stat, cookie);
3299     add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3300                     add_stat, cookie);
3301     add_casted_stat("ep_pending_ops_max_duration",
3302                     epstats.pendingOpsMaxDuration,
3303                     add_stat, cookie);
3304
3305     add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3306                     add_stat, cookie);
3307     add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3308                     add_stat, cookie);
3309
3310     size_t vbDeletions = epstats.vbucketDeletions.load();
3311     if (vbDeletions > 0) {
3312         add_casted_stat("ep_vbucket_del_max_walltime",
3313                         epstats.vbucketDelMaxWalltime,
3314                         add_stat, cookie);
3315         add_casted_stat("ep_vbucket_del_avg_walltime",
3316                         epstats.vbucketDelTotWalltime / vbDeletions,
3317                         add_stat, cookie);
3318     }
3319
3320     size_t numBgOps = epstats.bgNumOperations.load();
3321     if (numBgOps > 0) {
3322         add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3323                         add_stat, cookie);
3324         add_casted_stat("ep_bg_min_wait",
3325                         epstats.bgMinWait,
3326                         add_stat, cookie);
3327         add_casted_stat("ep_bg_max_wait",
3328                         epstats.bgMaxWait,
3329                         add_stat, cookie);
3330         add_casted_stat("ep_bg_wait_avg",
3331                         epstats.bgWait / numBgOps,
3332                         add_stat, cookie);
3333         add_casted_stat("ep_bg_min_load",
3334                         epstats.bgMinLoad,
3335                         add_stat, cookie);
3336         add_casted_stat("ep_bg_max_load",
3337                         epstats.bgMaxLoad,
3338                         add_stat, cookie);
3339         add_casted_stat("ep_bg_load_avg",
3340                         epstats.bgLoad / numBgOps,
3341                         add_stat, cookie);
3342         add_casted_stat("ep_bg_wait",
3343                         epstats.bgWait,
3344                         add_stat, cookie);
3345         add_casted_stat("ep_bg_load",
3346                         epstats.bgLoad,
3347                         add_stat, cookie);
3348     }
3349
3350     add_casted_stat("ep_num_non_resident",
3351                     activeCountVisitor.getNonResident() +
3352                     pendingCountVisitor.getNonResident() +
3353                     replicaCountVisitor.getNonResident(),
3354                     add_stat, cookie);
3355
3356     add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3357     add_casted_stat("ep_exp_pager_stime", epstore->getExpiryPagerSleeptime(),
3358                     add_stat, cookie);
3359
3360     add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3361                     add_stat, cookie);
3362     add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3363                     add_stat, cookie);
3364     add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3365                     add_stat, cookie);
3366     add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3367                     add_stat, cookie);
3368
3369     if (getConfiguration().isAccessScannerEnabled()) {
3370         char timestr[20];
3371         struct tm alogTim;
3372         if (cb_gmtime_r((time_t *)&epstats.alogTime, &alogTim) == -1) {
3373             add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3374                             cookie);
3375         } else {
3376             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3377             add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3378                             cookie);
3379         }
3380     } else {
3381         add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3382                         add_stat, cookie);
3383     }
3384
3385     add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
3386
3387     if (getConfiguration().isWarmup()) {
3388         Warmup *wp = epstore->getWarmup();
3389         cb_assert(wp);
3390         if (!epstore->isWarmingUp()) {
3391             add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
3392         } else {
3393             add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);
3394         }
3395         if (wp->getTime() > 0) {
3396             add_casted_stat("ep_warmup_time", wp->getTime() / 1000,
3397                             add_stat, cookie);
3398         }
3399         add_casted_stat("ep_warmup_oom", epstats.warmOOM, add_stat, cookie);
3400         add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
3401     }
3402
3403     add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
3404                     add_stat, cookie);
3405     add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
3406                     add_stat, cookie);
3407     add_casted_stat("ep_num_ops_del_meta", epstats.numOpsDelMeta,
3408                     add_stat, cookie);
3409     add_casted_stat("ep_num_ops_set_meta_res_fail",
3410                     epstats.numOpsSetMetaResolutionFailed, add_stat, cookie);
3411     add_casted_stat("ep_num_ops_del_meta_res_fail",
3412                     epstats.numOpsDelMetaResolutionFailed, add_stat, cookie);
3413     add_casted_stat("ep_num_ops_set_ret_meta", epstats.numOpsSetRetMeta,
3414                     add_stat, cookie);
3415     add_casted_stat("ep_num_ops_del_ret_meta", epstats.numOpsDelRetMeta,
3416                     add_stat, cookie);
3417     add_casted_stat("ep_num_ops_get_meta_on_set_meta",
3418                     epstats.numOpsGetMetaOnSetWithMeta, add_stat, cookie);
3419     add_casted_stat("ep_chk_persistence_timeout",
3420                     VBucket::getCheckpointFlushTimeout(),
3421                     add_stat, cookie);
3422     add_casted_stat("ep_chk_persistence_remains",
3423                     activeCountVisitor.getChkPersistRemaining() +
3424                     pendingCountVisitor.getChkPersistRemaining() +
3425                     replicaCountVisitor.getChkPersistRemaining(),
3426                     add_stat, cookie);
3427     add_casted_stat("ep_workload_pattern",
3428                     workload->stringOfWorkLoadPattern(),
3429                     add_stat, cookie);
3430     return ENGINE_SUCCESS;
3431 }
3432
3433 ENGINE_ERROR_CODE EventuallyPersistentEngine::doMemoryStats(const void *cookie,
3434                                                            ADD_STAT add_stat) {
3435     add_casted_stat("bytes", stats.getTotalMemoryUsed(), add_stat, cookie);
3436     add_casted_stat("mem_used", stats.getTotalMemoryUsed(), add_stat, cookie);
3437     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3438     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3439     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3440     add_casted_stat("ep_max_size", stats.getMaxDataSize(), add_stat, cookie);
3441     add_casted_stat("ep_mem_low_wat", stats.mem_low_wat, add_stat, cookie);
3442     add_casted_stat("ep_mem_high_wat", stats.mem_high_wat, add_stat, cookie);
3443     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3444     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3445                     add_stat, cookie);
3446
3447     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3448 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3449     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3450 #else
3451     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3452 #endif
3453     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3454                     add_stat, cookie);
3455 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3456     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3457 #else
3458     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3459 #endif
3460     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3461     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3462
3463
3464     add_casted_stat("ep_mem_tracker_enabled",
3465                     stats.memoryTrackerEnabled ? "true" : "false",
3466                     add_stat, cookie);
3467
3468     std::map<std::string, size_t> alloc_stats;
3469     MemoryTracker::getInstance()->getAllocatorStats(alloc_stats);
3470     std::map<std::string, size_t>::iterator it = alloc_stats.begin();
3471     for (; it != alloc_stats.end(); ++it) {
3472         add_casted_stat(it->first.c_str(), it->second, add_stat, cookie);
3473     }
3474
3475     return ENGINE_SUCCESS;
3476 }
3477
3478 ENGINE_ERROR_CODE EventuallyPersistentEngine::doVBucketStats(
3479                                                        const void *cookie,
3480                                                        ADD_STAT add_stat,
3481                                                        const char* stat_key,
3482                                                        int nkey,
3483                                                        bool prevStateRequested,
3484                                                        bool details) {
3485     class StatVBucketVisitor : public VBucketVisitor {
3486     public:
3487         StatVBucketVisitor(EventuallyPersistentStore *store,
3488                            const void *c, ADD_STAT a,
3489                            bool isPrevStateRequested, bool detailsRequested) :
3490             eps(store), cookie(c), add_stat(a),
3491             isPrevState(isPrevStateRequested),
3492             isDetailsRequested(detailsRequested) {}
3493
3494         bool visitBucket(RCPtr<VBucket> &vb) {
3495             addVBStats(cookie, add_stat, vb, eps, isPrevState,
3496                        isDetailsRequested);
3497             return false;
3498         }
3499
3500         static void addVBStats(const void *cookie, ADD_STAT add_stat,
3501                                RCPtr<VBucket> &vb,
3502                                EventuallyPersistentStore *store,
3503                                bool isPrevStateRequested,
3504                                bool detailsRequested) {
3505             if (!vb) {
3506                 return;
3507             }
3508
3509             if (isPrevStateRequested) {
3510                 char buf[16];
3511                 snprintf(buf, sizeof(buf), "vb_%d", vb->getId());
3512                 add_casted_stat(buf, VBucket::toString(vb->getInitialState()),
3513                                 add_stat, cookie);
3514             } else {
3515                 vb->addStats(detailsRequested, add_stat, cookie,
3516                              store->getItemEvictionPolicy());
3517             }
3518         }
3519
3520     private:
3521         EventuallyPersistentStore *eps;
3522         const void *cookie;
3523         ADD_STAT add_stat;
3524         bool isPrevState;
3525         bool isDetailsRequested;
3526     };
3527
3528     if (nkey > 16 && strncmp(stat_key, "vbucket-details", 15) == 0) {
3529         std::string vbid(&stat_key[16], nkey - 16);
3530         uint16_t vbucket_id(0);
3531         if (!parseUint16(vbid.c_str(), &vbucket_id)) {
3532             return ENGINE_EINVAL;
3533         }
3534         RCPtr<VBucket> vb = getVBucket(vbucket_id);
3535         if (!vb) {
3536             return ENGINE_NOT_MY_VBUCKET;
3537         }
3538
3539         StatVBucketVisitor::addVBStats(cookie, add_stat, vb, epstore,
3540                                        prevStateRequested, details);
3541     }
3542     else {
3543         StatVBucketVisitor svbv(epstore, cookie, add_stat, prevStateRequested,
3544                                 details);
3545         epstore->visit(svbv);
3546     }
3547     return ENGINE_SUCCESS;
3548 }
3549
3550 ENGINE_ERROR_CODE EventuallyPersistentEngine::doHashStats(const void *cookie,
3551                                                           ADD_STAT add_stat) {
3552
3553     class StatVBucketVisitor : public VBucketVisitor {
3554     public:
3555         StatVBucketVisitor(const void *c, ADD_STAT a) : cookie(c),
3556                                                         add_stat(a) {}
3557
3558         bool visitBucket(RCPtr<VBucket> &vb) {
3559             uint16_t vbid = vb->getId();
3560             char buf[32];
3561             snprintf(buf, sizeof(buf), "vb_%d:state", vbid);
3562             add_casted_stat(buf, VBucket::toString(vb->getState()),
3563                             add_stat, cookie);
3564
3565             HashTableDepthStatVisitor depthVisitor;
3566             vb->ht.visitDepth(depthVisitor);
3567
3568             snprintf(buf, sizeof(buf), "vb_%d:size", vbid);
3569             add_casted_stat(buf, vb->ht.getSize(), add_stat, cookie);
3570             snprintf(buf, sizeof(buf), "vb_%d:locks", vbid);
3571             add_casted_stat(buf, vb->ht.getNumLocks(), add_stat, cookie);
3572             snprintf(buf, sizeof(buf), "vb_%d:min_depth", vbid);
3573             add_casted_stat(buf, depthVisitor.min == -1 ? 0 : depthVisitor.min,
3574                             add_stat, cookie);
3575             snprintf(buf, sizeof(buf), "vb_%d:max_depth", vbid);
3576             add_casted_stat(buf, depthVisitor.max, add_stat, cookie);
3577             snprintf(buf, sizeof(buf), "vb_%d:histo", vbid);
3578             add_casted_stat(buf, depthVisitor.depthHisto, add_stat, cookie);
3579             snprintf(buf, sizeof(buf), "vb_%d:reported", vbid);
3580             add_casted_stat(buf, vb->ht.getNumInMemoryItems(), add_stat, cookie);
3581             snprintf(buf, sizeof(buf), "vb_%d:counted", vbid);
3582             add_casted_stat(buf, depthVisitor.size, add_stat, cookie);
3583             snprintf(buf, sizeof(buf), "vb_%d:resized", vbid);
3584             add_casted_stat(buf, vb->ht.getNumResizes(), add_stat, cookie);
3585             snprintf(buf, sizeof(buf), "vb_%d:mem_size", vbid);
3586             add_casted_stat(buf, vb->ht.memSize, add_stat, cookie);
3587             snprintf(buf, sizeof(buf), "vb_%d:mem_size_counted", vbid);
3588             add_casted_stat(buf, depthVisitor.memUsed, add_stat, cookie);
3589
3590             return false;
3591         }
3592
3593         const void *cookie;
3594         ADD_STAT add_stat;
3595     };
3596
3597     StatVBucketVisitor svbv(cookie, add_stat);
3598     epstore->visit(svbv);
3599
3600     return ENGINE_SUCCESS;
3601 }
3602
3603 class StatCheckpointVisitor : public VBucketVisitor {
3604 public:
3605     StatCheckpointVisitor(EventuallyPersistentStore * eps, const void *c,
3606                           ADD_STAT a) : epstore(eps), cookie(c), add_stat(a) {}
3607
3608     bool visitBucket(RCPtr<VBucket> &vb) {
3609         addCheckpointStat(cookie, add_stat, epstore, vb);
3610         return false;
3611     }
3612
3613     static void addCheckpointStat(const void *cookie, ADD_STAT add_stat,
3614                                   EventuallyPersistentStore *eps,
3615                                   RCPtr<VBucket> &vb) {
3616         if (!vb) {
3617             return;
3618         }
3619
3620         uint16_t vbid = vb->getId();
3621         char buf[256];
3622         snprintf(buf, sizeof(buf), "vb_%d:state", vbid);
3623         add_casted_stat(buf, VBucket::toString(vb->getState()),
3624                         add_stat, cookie);
3625         vb->checkpointManager.addStats(add_stat, cookie);
3626         snprintf(buf, sizeof(buf), "vb_%d:persisted_checkpoint_id", vbid);
3627         add_casted_stat(buf, eps->getLastPersistedCheckpointId(vbid),
3628                         add_stat, cookie);
3629     }
3630
3631     EventuallyPersistentStore *epstore;
3632     const void *cookie;
3633     ADD_STAT add_stat;
3634 };
3635
3636
3637 class StatCheckpointTask : public GlobalTask {