8dc4f82174a5d29cc679ca14b24669458e2d2253
[ep-engine.git] / src / ep_engine.cc
1
2 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 /*
4  *     Copyright 2010 Couchbase, Inc
5  *
6  *   Licensed under the Apache License, Version 2.0 (the "License");
7  *   you may not use this file except in compliance with the License.
8  *   You may obtain a copy of the License at
9  *
10  *       http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *   Unless required by applicable law or agreed to in writing, software
13  *   distributed under the License is distributed on an "AS IS" BASIS,
14  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *   See the License for the specific language governing permissions and
16  *   limitations under the License.
17  */
18
19 #include "config.h"
20
21 #include <fcntl.h>
22 #include <memcached/engine.h>
23 #include <memcached/protocol_binary.h>
24 #include <memcached/util.h>
25 #include <platform/platform.h>
26 #include <stdarg.h>
27
28 #include <cstdio>
29 #include <cstring>
30 #include <fstream>
31 #include <iostream>
32 #include <limits>
33 #include <string>
34 #include <vector>
35
36 #include "backfill.h"
37 #include "ep_engine.h"
38 #include "failover-table.h"
39 #include "flusher.h"
40 #include "connmap.h"
41 #include "htresizer.h"
42 #include "memory_tracker.h"
43 #include "stats-info.h"
44 #define STATWRITER_NAMESPACE core_engine
45 #include "statwriter.h"
46 #undef STATWRITER_NAMESPACE
47 #include "tapthrottle.h"
48 #include "dcp-consumer.h"
49 #include "dcp-producer.h"
50 #include "warmup.h"
51
52 #include <JSON_checker.h>
53
54 static ALLOCATOR_HOOKS_API *hooksApi;
55 SERVER_LOG_API* EventuallyPersistentEngine::loggerApi;
56
57
/**
 * Scale a value by a fractional ratio, truncating toward zero.
 *
 * @param val     the value to scale
 * @param percent the ratio expressed as a fraction (0.75 means 75%)
 * @return val * percent converted back to size_t; any fractional part is
 *         discarded by the cast
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = percent * static_cast<double>(val);
    return static_cast<size_t>(scaled);
}
61
62 /**
63  * Helper function to avoid typing in the long cast all over the place
64  * @param handle pointer to the engine
65  * @return the engine as a class
66  */
67 static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
68 {
69     EventuallyPersistentEngine* ret;
70     ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
71     ObjectRegistry::onSwitchThread(ret);
72     return ret;
73 }
74
75 static inline void releaseHandle(ENGINE_HANDLE* handle) {
76     (void) handle;
77     ObjectRegistry::onSwitchThread(NULL);
78 }
79
80
81 /**
82  * Call the response callback and return the appropriate value so that
83  * the core knows what to do..
84  */
85 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
86                                       uint16_t keylen,
87                                       const void *ext, uint8_t extlen,
88                                       const void *body, uint32_t bodylen,
89                                       uint8_t datatype, uint16_t status,
90                                       uint64_t cas, const void *cookie)
91 {
92     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
93     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
94     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
95                  status, cas, cookie)) {
96         rv = ENGINE_SUCCESS;
97     }
98     ObjectRegistry::onSwitchThread(e);
99     return rv;
100 }
101
/**
 * Check that v lies in the inclusive range [l, h].
 *
 * @throws std::runtime_error("Value out of range.") when v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool belowMin = v < l;
    const bool aboveMax = v > h;
    if (belowMin || aboveMax) {
        throw std::runtime_error("Value out of range.");
    }
}
108
109
/**
 * Verify that a configuration value is a base-10 integer literal.
 *
 * Accepts an optional leading '-' followed by at least one decimal digit and
 * nothing else. The empty string and a lone "-" are rejected: atoi() would
 * otherwise silently turn them into 0.
 *
 * @param str NUL-terminated string to check (must not be NULL)
 * @throws std::runtime_error("Value is not numeric") if str is not numeric
 */
static void checkNumeric(const char* str) {
    using namespace std;
    const char *p = str;
    if (*p == '-') {
        ++p;
    }
    if (*p == '\0') {
        // No digits at all (empty input or a bare sign).
        throw std::runtime_error("Value is not numeric");
    }
    for (; *p != '\0'; ++p) {
        // isdigit() requires a value representable as unsigned char (or EOF);
        // passing a negative plain char (bytes >= 0x80 on signed-char
        // platforms) is undefined behaviour.
        if (!isdigit(static_cast<unsigned char>(*p))) {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
122
123 // The Engine API specifies C linkage for the functions..
124 extern "C" {
125
126     static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
127     {
128         engine_info* info = getHandle(handle)->getInfo();
129         releaseHandle(handle);
130         return info;
131     }
132
133     static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
134                                            const char* config_str)
135     {
136         ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
137         releaseHandle(handle);
138         return err_code;
139     }
140
141     static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
142     {
143         getHandle(handle)->destroy(force);
144         delete getHandle(handle);
145         releaseHandle(NULL);
146     }
147
148     static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
149                                              const void* cookie,
150                                              item **itm,
151                                              const void* key,
152                                              const size_t nkey,
153                                              const size_t nbytes,
154                                              const int flags,
155                                              const rel_time_t exptime,
156                                              uint8_t datatype)
157     {
158         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
159             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
160                     " (ItemAllocate)");
161             return ENGINE_EINVAL;
162         }
163         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
164                                                                      itm, key,
165                                                                      nkey,
166                                                                      nbytes,
167                                                                      flags,
168                                                                      exptime,
169                                                                      datatype);
170         releaseHandle(handle);
171         return err_code;
172     }
173
174     static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
175                                            const void* cookie,
176                                            const void* key,
177                                            const size_t nkey,
178                                            uint64_t* cas,
179                                            uint16_t vbucket)
180     {
181         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
182                                                                    nkey, cas,
183                                                                    vbucket);
184         releaseHandle(handle);
185         return err_code;
186     }
187
188     static void EvpItemRelease(ENGINE_HANDLE* handle,
189                                const void *cookie,
190                                item* itm)
191     {
192         getHandle(handle)->itemRelease(cookie, itm);
193         releaseHandle(handle);
194     }
195
196     static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
197                                     const void* cookie,
198                                     item** itm,
199                                     const void* key,
200                                     const int nkey,
201                                     uint16_t vbucket)
202     {
203         ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
204                                                             nkey, vbucket, true);
205         releaseHandle(handle);
206         return err_code;
207     }
208
209     static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
210                                          const void* cookie,
211                                          const char* stat_key,
212                                          int nkey,
213                                          ADD_STAT add_stat)
214     {
215         ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
216                                                                  stat_key,
217                                                                  nkey,
218                                                                  add_stat);
219         releaseHandle(handle);
220         return err_code;
221     }
222
223     static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
224                                       const void *cookie,
225                                       item* itm,
226                                       uint64_t *cas,
227                                       ENGINE_STORE_OPERATION operation,
228                                       uint16_t vbucket)
229     {
230         ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
231                                                               operation,
232                                                               vbucket);
233         releaseHandle(handle);
234         return err_code;
235     }
236
237     static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
238                                            const void* cookie,
239                                            const void* key,
240                                            const int nkey,
241                                            const bool increment,
242                                            const bool create,
243                                            const uint64_t delta,
244                                            const uint64_t initial,
245                                            const rel_time_t exptime,
246                                            uint64_t *cas,
247                                            uint8_t datatype,
248                                            uint64_t *result,
249                                            uint16_t vbucket)
250     {
251         if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
252             if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
253                 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
254                         " (Arithmetic)");
255             } else {
256                 LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
257                     "operations on compressed data!");
258             }
259             return ENGINE_EINVAL;
260         }
261         ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
262                                                                 nkey,
263                                                                 increment,
264                                                                 create, delta,
265                                                                 initial,
266                                                                 exptime, cas,
267                                                                 datatype,
268                                                                 result,
269                                                                 vbucket);
270         releaseHandle(handle);
271         return ecode;
272     }
273
274     static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
275                                       const void* cookie, time_t when)
276     {
277         ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
278         releaseHandle(handle);
279         return err_code;
280     }
281
282     static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
283     {
284         getHandle(handle)->resetStats();
285         releaseHandle(handle);
286     }
287
288     static protocol_binary_response_status stopFlusher(
289                                                  EventuallyPersistentEngine *e,
290                                                  const char **msg,
291                                                  size_t *msg_size) {
292         return e->stopFlusher(msg, msg_size);
293     }
294
295     static protocol_binary_response_status startFlusher(
296                                                  EventuallyPersistentEngine *e,
297                                                  const char **msg,
298                                                  size_t *msg_size) {
299         return e->startFlusher(msg, msg_size);
300     }
301
302     static protocol_binary_response_status setTapParam(
303                                                  EventuallyPersistentEngine *e,
304                                                  const char *keyz,
305                                                  const char *valz,
306                                                  const char **msg, size_t *) {
307         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
308
309         try {
310             int v = atoi(valz);
311             if (strcmp(keyz, "tap_keepalive") == 0) {
312                 checkNumeric(valz);
313                 validate(v, 0, MAX_TAP_KEEP_ALIVE);
314                 e->setTapKeepAlive(static_cast<uint32_t>(v));
315             } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
316                 checkNumeric(valz);
317                 e->getConfiguration().setTapThrottleThreshold(v);
318             } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
319                 checkNumeric(valz);
320                 e->getConfiguration().setTapThrottleQueueCap(v);
321             } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
322                 checkNumeric(valz);
323                 e->getConfiguration().setTapThrottleCapPcnt(v);
324             } else {
325                 *msg = "Unknown config param";
326                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
327             }
328         } catch(std::runtime_error &) {
329             *msg = "Value out of range.";
330             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
331         }
332
333         return rv;
334     }
335
336     static protocol_binary_response_status setCheckpointParam(
337                                                  EventuallyPersistentEngine *e,
338                                                               const char *keyz,
339                                                               const char *valz,
340                                                               const char **msg,
341                                                               size_t *) {
342         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
343
344         try {
345             int v = atoi(valz);
346             if (strcmp(keyz, "chk_max_items") == 0) {
347                 checkNumeric(valz);
348                 validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
349                 e->getConfiguration().setChkMaxItems(v);
350             } else if (strcmp(keyz, "chk_period") == 0) {
351                 checkNumeric(valz);
352                 validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
353                 e->getConfiguration().setChkPeriod(v);
354             } else if (strcmp(keyz, "max_checkpoints") == 0) {
355                 checkNumeric(valz);
356                 validate(v, DEFAULT_MAX_CHECKPOINTS,
357                          MAX_CHECKPOINTS_UPPER_BOUND);
358                 e->getConfiguration().setMaxCheckpoints(v);
359             } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
360                 if (strcmp(valz, "true") == 0) {
361                     e->getConfiguration().setItemNumBasedNewChk(true);
362                 } else {
363                     e->getConfiguration().setItemNumBasedNewChk(false);
364                 }
365             } else if (strcmp(keyz, "keep_closed_chks") == 0) {
366                 if (strcmp(valz, "true") == 0) {
367                     e->getConfiguration().setKeepClosedChks(true);
368                 } else {
369                     e->getConfiguration().setKeepClosedChks(false);
370                 }
371             } else if (strcmp(keyz, "enable_chk_merge") == 0) {
372                 if (strcmp(valz, "true") == 0) {
373                     e->getConfiguration().setEnableChkMerge(true);
374                 } else {
375                     e->getConfiguration().setEnableChkMerge(false);
376                 }
377             } else {
378                 *msg = "Unknown config param";
379                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
380             }
381         } catch(std::runtime_error &) {
382             *msg = "Value out of range.";
383             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
384         }
385
386         return rv;
387     }
388
389     static protocol_binary_response_status setFlushParam(
390                                                  EventuallyPersistentEngine *e,
391                                                  const char *keyz,
392                                                  const char *valz,
393                                                  const char **msg,
394                                                  size_t *) {
395         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
396
397         // Handle the actual mutation.
398         try {
399             int v = atoi(valz);
400             if (strcmp(keyz, "bg_fetch_delay") == 0) {
401                 checkNumeric(valz);
402                 e->getConfiguration().setBgFetchDelay(v);
403             } else if (strcmp(keyz, "flushall_enabled") == 0) {
404                 if (strcmp(valz, "true") == 0) {
405                     e->getConfiguration().setFlushallEnabled(true);
406                 } else if(strcmp(valz, "false") == 0) {
407                     e->getConfiguration().setFlushallEnabled(false);
408                 } else {
409                     throw std::runtime_error("value out of range.");
410                 }
411             } else if (strcmp(keyz, "max_size") == 0) {
412                 char *ptr = NULL;
413                 checkNumeric(valz);
414                 uint64_t vsize = strtoull(valz, &ptr, 10);
415                 validate(vsize, static_cast<uint64_t>(0),
416                          std::numeric_limits<uint64_t>::max());
417                 e->getConfiguration().setMaxSize(vsize);
418                 e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
419                 e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
420             } else if (strcmp(keyz, "mem_low_wat") == 0) {
421                 char *ptr = NULL;
422                 checkNumeric(valz);
423                 uint64_t vsize = strtoull(valz, &ptr, 10);
424                 validate(vsize, static_cast<uint64_t>(0),
425                          std::numeric_limits<uint64_t>::max());
426                 e->getConfiguration().setMemLowWat(vsize);
427             } else if (strcmp(keyz, "mem_high_wat") == 0) {
428                 char *ptr = NULL;
429                 checkNumeric(valz);
430                 uint64_t vsize = strtoull(valz, &ptr, 10);
431                 validate(vsize, static_cast<uint64_t>(0),
432                          std::numeric_limits<uint64_t>::max());
433                 e->getConfiguration().setMemHighWat(vsize);
434             } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
435                 checkNumeric(valz);
436                 validate(v, 0, 100);
437                 e->getConfiguration().setBackfillMemThreshold(v);
438             } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
439                 checkNumeric(valz);
440                 validate(v, 0, 100);
441                 e->getConfiguration().setCompactionExpMemThreshold(v);
442             } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
443                 checkNumeric(valz);
444                 validate(v, 0, 100);
445                 e->getConfiguration().setMutationMemThreshold(v);
446             } else if (strcmp(keyz, "timing_log") == 0) {
447                 EPStats &stats = e->getEpStats();
448                 std::ostream *old = stats.timingLog;
449                 stats.timingLog = NULL;
450                 delete old;
451                 if (strcmp(valz, "off") == 0) {
452                     LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
453                 } else {
454                     std::ofstream *tmp(new std::ofstream(valz));
455                     if (tmp->good()) {
456                         LOG(EXTENSION_LOG_INFO,
457                             "Logging detailed timings to ``%s''.", valz);
458                         stats.timingLog = tmp;
459                     } else {
460                         LOG(EXTENSION_LOG_WARNING,
461                             "Error setting detailed timing log to ``%s'':  %s",
462                             valz, strerror(errno));
463                         delete tmp;
464                     }
465                 }
466             } else if (strcmp(keyz, "exp_pager_stime") == 0) {
467                 char *ptr = NULL;
468                 checkNumeric(valz);
469                 uint64_t vsize = strtoull(valz, &ptr, 10);
470                 validate(vsize, static_cast<uint64_t>(0),
471                          std::numeric_limits<uint64_t>::max());
472                 e->getConfiguration().setExpPagerStime((size_t)vsize);
473             } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
474                 if (strcmp(valz, "true") == 0) {
475                     e->getConfiguration().setAccessScannerEnabled(true);
476                 } else if (strcmp(valz, "false") == 0) {
477                     e->getConfiguration().setAccessScannerEnabled(false);
478                 } else {
479                     throw std::runtime_error("Value expected: true/false.");
480                 }
481             } else if (strcmp(keyz, "alog_sleep_time") == 0) {
482                 checkNumeric(valz);
483                 e->getConfiguration().setAlogSleepTime(v);
484             } else if (strcmp(keyz, "alog_task_time") == 0) {
485                 checkNumeric(valz);
486                 e->getConfiguration().setAlogTaskTime(v);
487             } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
488                 checkNumeric(valz);
489                 e->getConfiguration().setPagerActiveVbPcnt(v);
490             } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
491                 checkNumeric(valz);
492                 validate(v, 0, std::numeric_limits<int>::max());
493                 e->getConfiguration().setWarmupMinMemoryThreshold(v);
494             } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
495                 checkNumeric(valz);
496                 validate(v, 0, std::numeric_limits<int>::max());
497                 e->getConfiguration().setWarmupMinItemsThreshold(v);
498             } else if (strcmp(keyz, "max_num_readers") == 0) {
499                 checkNumeric(valz);
500                 validate(v, 0, std::numeric_limits<int>::max());
501                 e->getConfiguration().setMaxNumReaders(v);
502                 ExecutorPool::get()->setMaxReaders(v);
503             } else if (strcmp(keyz, "max_num_writers") == 0) {
504                 checkNumeric(valz);
505                 validate(v, 0, std::numeric_limits<int>::max());
506                 e->getConfiguration().setMaxNumWriters(v);
507                 ExecutorPool::get()->setMaxWriters(v);
508             } else if (strcmp(keyz, "max_num_auxio") == 0) {
509                 checkNumeric(valz);
510                 validate(v, 0, std::numeric_limits<int>::max());
511                 e->getConfiguration().setMaxNumAuxio(v);
512                 ExecutorPool::get()->setMaxAuxIO(v);
513             } else if (strcmp(keyz, "max_num_nonio") == 0) {
514                 checkNumeric(valz);
515                 validate(v, 0, std::numeric_limits<int>::max());
516                 e->getConfiguration().setMaxNumNonio(v);
517                 ExecutorPool::get()->setMaxNonIO(v);
518             } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
519                 checkNumeric(valz);
520                 validate(v, 1, std::numeric_limits<int>::max());
521                 e->getConfiguration().setCompactionWriteQueueCap(v);
522             } else {
523                 *msg = "Unknown config param";
524                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
525             }
526         } catch(std::runtime_error& ex) {
527             *msg = strdup(ex.what());
528             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
529         }
530
531         return rv;
532     }
533
534     static protocol_binary_response_status evictKey(
535                                                  EventuallyPersistentEngine *e,
536                                                  protocol_binary_request_header
537                                                                       *request,
538                                                  const char **msg,
539                                                  size_t *msg_size) {
540         protocol_binary_request_no_extras *req =
541             (protocol_binary_request_no_extras*)request;
542
543         char keyz[256];
544
545         // Read the key.
546         int keylen = ntohs(req->message.header.request.keylen);
547         if (keylen >= (int)sizeof(keyz)) {
548             *msg = "Key is too large.";
549             return PROTOCOL_BINARY_RESPONSE_EINVAL;
550         }
551         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
552         keyz[keylen] = 0x00;
553
554         uint16_t vbucket = ntohs(request->request.vbucket);
555
556         std::string key(keyz, keylen);
557
558         LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
559                 keyz);
560
561         protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
562                                                          msg_size);
563         if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
564             rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
565             if (e->isDegradedMode()) {
566                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
567             }
568         }
569         return rv;
570     }
571
572     static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
573                                        protocol_binary_request_header *req,
574                                        const void *cookie,
575                                        Item **itm,
576                                        const char **msg,
577                                        size_t *,
578                                        protocol_binary_response_status *res) {
579
580         uint8_t extlen = req->request.extlen;
581         if (extlen != 0 && extlen != 4) {
582             *msg = "Invalid packet format (extlen may be 0 or 4)";
583             *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
584             return ENGINE_EINVAL;
585         }
586
587         protocol_binary_request_getl *grequest =
588             (protocol_binary_request_getl*)req;
589         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
590
591         const char *keyp = reinterpret_cast<const char*>(req->bytes);
592         keyp += sizeof(req->bytes) + extlen;
593         std::string key(keyp, ntohs(req->request.keylen));
594         uint16_t vbucket = ntohs(req->request.vbucket);
595
596         RememberingCallback<GetValue> getCb;
597         uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
598         uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
599         uint32_t lockTimeout = default_timeout;
600         if (extlen == 4) {
601             lockTimeout = ntohl(grequest->message.body.expiration);
602         }
603
604         if (lockTimeout >  max_timeout || lockTimeout < 1) {
605             LOG(EXTENSION_LOG_WARNING,
606                 "Illegal value for lock timeout specified"
607                 " %u. Using default value: %u", lockTimeout, default_timeout);
608             lockTimeout = default_timeout;
609         }
610
611         bool gotLock = e->getLocked(key, vbucket, getCb,
612                                     ep_current_time(),
613                                     lockTimeout, cookie);
614
615         getCb.waitForValue();
616         ENGINE_ERROR_CODE rv = getCb.val.getStatus();
617
618         if (rv == ENGINE_SUCCESS) {
619             *itm = getCb.val.getValue();
620             ++(e->getEpStats().numOpsGet);
621         } else if (rv == ENGINE_EWOULDBLOCK) {
622
623             // need to wait for value
624             return rv;
625         } else if (rv == ENGINE_NOT_MY_VBUCKET) {
626             *msg = "That's not my bucket.";
627             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
628             return ENGINE_NOT_MY_VBUCKET;
629         } else if (!gotLock){
630             *msg =  "LOCK_ERROR";
631             *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
632             return ENGINE_TMPFAIL;
633         } else {
634             if (e->isDegradedMode()) {
635                 *msg = "LOCK_TMP_ERROR";
636                 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
637                 return ENGINE_TMPFAIL;
638             }
639
640             *msg = "NOT_FOUND";
641             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
642             return ENGINE_KEY_ENOENT;
643         }
644
645         return rv;
646     }
647
648     static protocol_binary_response_status unlockKey(
649                                                  EventuallyPersistentEngine *e,
650                                                  protocol_binary_request_header
651                                                                       *request,
652                                                  const char **msg,
653                                                  size_t *)
654     {
655         protocol_binary_request_no_extras *req =
656             (protocol_binary_request_no_extras*)request;
657
658         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
659         char keyz[256];
660
661         // Read the key.
662         int keylen = ntohs(req->message.header.request.keylen);
663         if (keylen >= (int)sizeof(keyz)) {
664             *msg = "Key is too large.";
665             return PROTOCOL_BINARY_RESPONSE_EINVAL;
666         }
667
668         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
669         keyz[keylen] = 0x00;
670
671         uint16_t vbucket = ntohs(request->request.vbucket);
672         std::string key(keyz, keylen);
673
674         LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
675
676         RememberingCallback<GetValue> getCb;
677         uint64_t cas = ntohll(request->request.cas);
678
679         ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
680                                             ep_current_time());
681
682         if (rv == ENGINE_SUCCESS) {
683             *msg = "UNLOCKED";
684         } else if (rv == ENGINE_TMPFAIL){
685             *msg =  "UNLOCK_ERROR";
686             res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
687         } else {
688             if (e->isDegradedMode()) {
689                 *msg = "LOCK_TMP_ERROR";
690                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
691             }
692
693             RCPtr<VBucket> vb = e->getVBucket(vbucket);
694             if (!vb) {
695                 *msg = "That's not my bucket.";
696                 res =  PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
697             }
698             *msg = "NOT_FOUND";
699             res =  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
700         }
701
702         return res;
703     }
704
705     static protocol_binary_response_status setParam(
706                                             EventuallyPersistentEngine *e,
707                                             protocol_binary_request_set_param
708                                                                      *req,
709                                             const char **msg,
710                                             size_t *msg_size) {
711
712         size_t keylen = ntohs(req->message.header.request.keylen);
713         uint8_t extlen = req->message.header.request.extlen;
714         size_t vallen = ntohl(req->message.header.request.bodylen);
715         protocol_binary_engine_param_t paramtype =
716             static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
717
718         if (keylen == 0 || (vallen - keylen - extlen) == 0) {
719             return PROTOCOL_BINARY_RESPONSE_EINVAL;
720         }
721
722         const char *keyp = reinterpret_cast<const char*>(req->bytes)
723                            + sizeof(req->bytes);
724         const char *valuep = keyp + keylen;
725         vallen -= (keylen + extlen);
726
727         char keyz[32];
728         char valz[512];
729
730         // Read the key.
731         if (keylen >= sizeof(keyz)) {
732             *msg = "Key is too large.";
733             return PROTOCOL_BINARY_RESPONSE_EINVAL;
734         }
735         memcpy(keyz, keyp, keylen);
736         keyz[keylen] = 0x00;
737
738         // Read the value.
739         if (vallen >= sizeof(valz)) {
740             *msg = "Value is too large.";
741             return PROTOCOL_BINARY_RESPONSE_EINVAL;
742         }
743         memcpy(valz, valuep, vallen);
744         valz[vallen] = 0x00;
745
746         protocol_binary_response_status rv;
747
748         switch (paramtype) {
749         case protocol_binary_engine_param_flush:
750             rv = setFlushParam(e, keyz, valz, msg, msg_size);
751             break;
752         case protocol_binary_engine_param_tap:
753             rv = setTapParam(e, keyz, valz, msg, msg_size);
754             break;
755         case protocol_binary_engine_param_checkpoint:
756             rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
757             break;
758         default:
759             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
760         }
761
762         return rv;
763     }
764
765     static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
766                                        const void *cookie,
767                                        protocol_binary_request_header *request,
768                                        ADD_RESPONSE response) {
769         protocol_binary_request_get_vbucket *req =
770             reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
771         cb_assert(req);
772
773         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
774         RCPtr<VBucket> vb = e->getVBucket(vbucket);
775         if (!vb) {
776             LockHolder lh(e->clusterConfig.lock);
777             return sendResponse(response, NULL, 0, NULL, 0,
778                                 e->clusterConfig.config,
779                                 e->clusterConfig.len,
780                                 PROTOCOL_BINARY_RAW_BYTES,
781                                 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
782                                 cookie);
783         } else {
784             vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
785             return sendResponse(response, NULL, 0, NULL, 0, &state,
786                                 sizeof(state),
787                                 PROTOCOL_BINARY_RAW_BYTES,
788                                 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
789         }
790     }
791
792     static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
793                                        const void *cookie,
794                                        protocol_binary_request_header *request,
795                                        ADD_RESPONSE response) {
796
797         protocol_binary_request_set_vbucket *req =
798             reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
799
800         uint64_t cas = ntohll(req->message.header.request.cas);
801
802         size_t bodylen = ntohl(req->message.header.request.bodylen)
803             - ntohs(req->message.header.request.keylen);
804         if (bodylen != sizeof(vbucket_state_t)) {
805             const std::string msg("Incorrect packet format");
806             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
807                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
808                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
809                                 cas, cookie);
810         }
811
812         vbucket_state_t state;
813         memcpy(&state, &req->message.body.state, sizeof(state));
814         state = static_cast<vbucket_state_t>(ntohl(state));
815
816         if (!is_valid_vbucket_state_t(state)) {
817             const std::string msg("Invalid vbucket state");
818             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
819                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
820                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
821                                 cas, cookie);
822         }
823
824         uint16_t vb = ntohs(req->message.header.request.vbucket);
825         if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
826             const std::string msg("VBucket number too big");
827             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
828                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
829                                 PROTOCOL_BINARY_RESPONSE_ERANGE,
830                                 cas, cookie);
831         }
832         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
833                             PROTOCOL_BINARY_RAW_BYTES,
834                             PROTOCOL_BINARY_RESPONSE_SUCCESS,
835                             cas, cookie);
836     }
837
838     static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
839                                         const void *cookie,
840                                         protocol_binary_request_header *req,
841                                         ADD_RESPONSE response) {
842
843         uint64_t cas = ntohll(req->request.cas);
844
845         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
846         uint16_t vbucket = ntohs(req->request.vbucket);
847
848         std::string msg = "";
849         if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
850             msg = "Incorrect packet format.";
851             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
852                                 msg.length(),
853                                 PROTOCOL_BINARY_RAW_BYTES,
854                                 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
855         }
856
857         bool sync = false;
858         uint32_t bodylen = ntohl(req->request.bodylen);
859         if (bodylen > 0) {
860             const char* ptr = reinterpret_cast<const char*>(req->bytes) +
861                 sizeof(req->bytes);
862             if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
863                 sync = true;
864             }
865         }
866
867         ENGINE_ERROR_CODE err;
868         void* es = e->getEngineSpecific(cookie);
869         if (sync) {
870             if (es == NULL) {
871                 err = e->deleteVBucket(vbucket, cookie);
872                 e->storeEngineSpecific(cookie, e);
873             } else {
874                 e->storeEngineSpecific(cookie, NULL);
875                 LOG(EXTENSION_LOG_INFO,
876                     "Completed sync deletion of vbucket %u",
877                     (unsigned)vbucket);
878                 err = ENGINE_SUCCESS;
879             }
880         } else {
881             err = e->deleteVBucket(vbucket);
882         }
883         switch (err) {
884         case ENGINE_SUCCESS:
885             LOG(EXTENSION_LOG_WARNING,
886                 "Deletion of vbucket %d was completed.", vbucket);
887             break;
888         case ENGINE_NOT_MY_VBUCKET:
889             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
890                 "because the vbucket doesn't exist!!!", vbucket);
891             res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
892             break;
893         case ENGINE_EINVAL:
894             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
895                 "because the vbucket is not in a dead state\n", vbucket);
896             msg = "Failed to delete vbucket.  Must be in the dead state.";
897             res = PROTOCOL_BINARY_RESPONSE_EINVAL;
898             break;
899         case ENGINE_EWOULDBLOCK:
900             LOG(EXTENSION_LOG_WARNING, "Requst to vbucket %d deletion is in"
901                 " EWOULDBLOCK until the database file is removed from disk",
902                 vbucket);
903             e->storeEngineSpecific(cookie, req);
904             return ENGINE_EWOULDBLOCK;
905         default:
906             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
907                 "because of unknown reasons\n", vbucket);
908             msg = "Failed to delete vbucket.  Unknown reason.";
909             res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
910         }
911
912         if (err != ENGINE_NOT_MY_VBUCKET) {
913             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
914                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
915                                 res, cas, cookie);
916         } else {
917             LockHolder lh(e->clusterConfig.lock);
918             return sendResponse(response, NULL, 0, NULL, 0,
919                                 e->clusterConfig.config,
920                                 e->clusterConfig.len,
921                                 PROTOCOL_BINARY_RAW_BYTES,
922                                 res, cas, cookie);
923         }
924
925     }
926
927     static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
928                                        protocol_binary_request_header *request,
929                                        const void *cookie,
930                                        Item **it,
931                                        const char **msg,
932                                        protocol_binary_response_status *res) {
933         EventuallyPersistentStore *eps = e->getEpStore();
934         protocol_binary_request_no_extras *req =
935             (protocol_binary_request_no_extras*)request;
936         int keylen = ntohs(req->message.header.request.keylen);
937         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
938         ENGINE_ERROR_CODE error_code;
939         std::string keystr(((char *)request) + sizeof(req->message.header),
940                             keylen);
941
942         GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
943
944         if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
945             if (error_code == ENGINE_NOT_MY_VBUCKET) {
946                 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
947                 return error_code;
948             } else if (error_code == ENGINE_TMPFAIL) {
949                 *msg = "NOT_FOUND";
950                 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
951             } else {
952                 return error_code;
953             }
954         } else {
955             *it = rv.getValue();
956             *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
957         }
958         ++(e->getEpStats().numOpsGet);
959         return ENGINE_SUCCESS;
960     }
961
962     static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
963                                        const void *cookie,
964                                        protocol_binary_request_compact_db *req,
965                                        ADD_RESPONSE response) {
966
967         std::string msg = "";
968         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
969         compaction_ctx compactreq;
970         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
971         uint64_t cas = ntohll(req->message.header.request.cas);
972
973         if (ntohs(req->message.header.request.keylen) > 0 ||
974              req->message.header.request.extlen != 24) {
975             LOG(EXTENSION_LOG_WARNING,
976                     "Compaction of vbucket %d received bad ext/key len %d/%d.",
977                     vbucket, req->message.header.request.extlen,
978                     ntohs(req->message.header.request.keylen));
979             msg = "Incorrect packet format.";
980             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
981                                 msg.length(),
982                                 PROTOCOL_BINARY_RAW_BYTES,
983                                 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
984         }
985         EPStats &stats = e->getEpStats();
986         compactreq.max_purged_seq = 0;
987         compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
988         compactreq.purge_before_seq =
989                                     ntohll(req->message.body.purge_before_seq);
990         compactreq.drop_deletes     = req->message.body.drop_deletes;
991
992         ENGINE_ERROR_CODE err;
993         void* es = e->getEngineSpecific(cookie);
994         if (es == NULL) {
995             ++stats.pendingCompactions;
996             e->storeEngineSpecific(cookie, e);
997             err = e->compactDB(vbucket, compactreq, cookie);
998         } else {
999             e->storeEngineSpecific(cookie, NULL);
1000             err = ENGINE_SUCCESS;
1001         }
1002
1003         switch (err) {
1004             case ENGINE_SUCCESS:
1005                 LOG(EXTENSION_LOG_INFO,
1006                     "Compaction of vbucket %d completed.", vbucket);
1007                 break;
1008             case ENGINE_NOT_MY_VBUCKET:
1009                 --stats.pendingCompactions;
1010                 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1011                     "because the vbucket doesn't exist!!!", vbucket);
1012                 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1013                 break;
1014             case ENGINE_EWOULDBLOCK:
1015                 LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
1016                         "in EWOULDBLOCK state until the database file is "
1017                         "compacted on disk",
1018                         vbucket);
1019                 e->storeEngineSpecific(cookie, req);
1020                 return ENGINE_EWOULDBLOCK;
1021             case ENGINE_TMPFAIL:
1022                 LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
1023                         " a temporary failure and may need to be retried",
1024                         vbucket);
1025                 msg = "Temporary failure in compacting vbucket.";
1026                 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1027                 break;
1028             default:
1029                 --stats.pendingCompactions;
1030                 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1031                     "because of unknown reasons\n", vbucket);
1032                 msg = "Failed to compact vbucket.  Unknown reason.";
1033                 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1034                 break;
1035         }
1036
1037         if (err != ENGINE_NOT_MY_VBUCKET) {
1038             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1039                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1040                                 res, cas, cookie);
1041         } else {
1042             LockHolder lh(e->clusterConfig.lock);
1043             return sendResponse(response, NULL, 0, NULL, 0,
1044                                 e->clusterConfig.config,
1045                                 e->clusterConfig.len,
1046                                 PROTOCOL_BINARY_RAW_BYTES,
1047                                 res, cas, cookie);
1048         }
1049     }
1050
    /**
     * Dispatch an engine-specific binary-protocol command to its handler.
     *
     * Handlers that build and send their own response are returned from
     * directly inside the dispatch switch. The remaining commands
     * (STOP/START_PERSISTENCE, SET_PARAM, EVICT_KEY, GET_LOCKED, UNLOCK_KEY,
     * GET_REPLICA) fall through to the code after the switch, as does any
     * opcode not listed. They leave behind:
     *  - a status in `res`;
     *  - an optional message in `msg`/`msg_size`;
     *  - possibly an Item in `itm`.
     * The code after the switch turns those into exactly one response and
     * deletes `itm`. An unlisted opcode therefore gets
     * PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND with an empty body.
     *
     * @param h        engine instance
     * @param cookie   connection cookie for the request
     * @param request  the raw request packet
     * @param response callback used to emit the response
     * @return the handler's result (ENGINE_EWOULDBLOCK when it parked the
     *         request), ENGINE_EINVAL for a malformed GET_RANDOM_KEY, or the
     *         result of sending the response.
     */
    static ENGINE_ERROR_CODE processUnknownCommand(
                                       EventuallyPersistentEngine *h,
                                       const void* cookie,
                                       protocol_binary_request_header *request,
                                       ADD_RESPONSE response)
    {
        // Defaults used when the opcode matches no case below.
        protocol_binary_response_status res =
                                      PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
        const char *msg = NULL;
        size_t msg_size = 0;
        Item *itm = NULL;

        EPStats &stats = h->getEpStats();
        ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;

        /**
         * Session validation
         * (For ns_server commands only)
         *
         * These commands carry a session token in the CAS field; a mismatch
         * is rejected with KEY_EEXISTS. The check is skipped when
         * engine-specific data is already stored for the cookie: delVBucket()
         * and compactDB() store data there before returning
         * ENGINE_EWOULDBLOCK, so a resumed command is not validated again.
         * Each of these commands is followed by h->decrementSessionCtr() in
         * the dispatch switch below.
         * NOTE(review): presumably validateSessionCas() takes the session
         * reference that decrementSessionCtr() releases — confirm.
         */
        switch (request->request.opcode) {
            case PROTOCOL_BINARY_CMD_SET_PARAM:
            case PROTOCOL_BINARY_CMD_SET_VBUCKET:
            case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
            case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
            case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
            case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
            case PROTOCOL_BINARY_CMD_COMPACT_DB:
            {
                if (h->getEngineSpecific(cookie) == NULL) {
                    uint64_t cas = ntohll(request->request.cas);
                    if (!h->validateSessionCas(cas)) {
                        const std::string message("Invalid session token");
                        return sendResponse(response, NULL, 0, NULL, 0,
                                            message.c_str(), message.length(),
                                            PROTOCOL_BINARY_RAW_BYTES,
                                            PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
                                            cas, cookie);
                    }
                }
                break;
            }
            default:
                break;
        }

        switch (request->request.opcode) {
        case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
            return h->getAllVBucketSequenceNumbers(cookie, response);

        case PROTOCOL_BINARY_CMD_GET_VBUCKET:
            {
                BlockTimer timer(&stats.getVbucketCmdHisto);
                rv = getVBucket(h, cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
            {
                BlockTimer timer(&stats.delVbucketCmdHisto);
                rv = delVBucket(h, cookie, request, response);
                // While the deletion is pending, keep the session reference
                // and the engine-specific marker for the resumed call.
                if (rv != ENGINE_EWOULDBLOCK) {
                    h->decrementSessionCtr();
                    h->storeEngineSpecific(cookie, NULL);
                }
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_VBUCKET:
            {
                BlockTimer timer(&stats.setVbucketCmdHisto);
                rv = setVBucket(h, cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_TOUCH:
        case PROTOCOL_BINARY_CMD_GAT:
        case PROTOCOL_BINARY_CMD_GATQ:
            {
                rv = h->touch(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
            res = stopFlusher(h, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
            res = startFlusher(h, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_SET_PARAM:
            res = setParam(h,
                  reinterpret_cast<protocol_binary_request_set_param*>(request),
                            &msg, &msg_size);
            h->decrementSessionCtr();
            break;
        case PROTOCOL_BINARY_CMD_EVICT_KEY:
            res = evictKey(h, request, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_GET_LOCKED:
            rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
            if (rv == ENGINE_EWOULDBLOCK) {
                // We don't have the value for the item yet.
                return rv;
            }
            break;
        case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
            res = unlockKey(h, request, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_OBSERVE:
            return h->observe(cookie, request, response);
        case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
            {
                rv = h->deregisterTapClient(cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN:
            {
                rv = h->resetReplicationChain(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
            {
                rv = h->changeTapVBFilter(cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
        case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
        case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE:
            {
                rv = h->handleCheckpointCmds(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE:
            {
                rv = h->handleSeqnoCmds(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_META:
        case PROTOCOL_BINARY_CMD_GETQ_META:
            {
                rv = h->getMeta(cookie,
                        reinterpret_cast<protocol_binary_request_get_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_WITH_META:
        case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
        case PROTOCOL_BINARY_CMD_ADD_WITH_META:
        case PROTOCOL_BINARY_CMD_ADDQ_WITH_META:
            {
                rv = h->setWithMeta(cookie,
                     reinterpret_cast<protocol_binary_request_set_with_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_DEL_WITH_META:
        case PROTOCOL_BINARY_CMD_DELQ_WITH_META:
            {
                rv = h->deleteWithMeta(cookie,
                    reinterpret_cast<protocol_binary_request_delete_with_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_RETURN_META:
            {
                return h->returnMeta(cookie,
                reinterpret_cast<protocol_binary_request_return_meta*>
                                                          (request), response);
            }
        case PROTOCOL_BINARY_CMD_GET_REPLICA:
            rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
            // NOT_MY_VBUCKET falls through so the cluster config is sent
            // by the response code below.
            if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
                return rv;
            }
            break;
        case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
        case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC:
            {
                rv = h->handleTrafficControlCmd(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
            {
                rv = h->setClusterConfig(cookie,
                 reinterpret_cast<protocol_binary_request_set_cluster_config*>
                                                          (request), response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
            return h->getClusterConfig(cookie,
               reinterpret_cast<protocol_binary_request_get_cluster_config*>
                                                          (request), response);
        case PROTOCOL_BINARY_CMD_COMPACT_DB:
            {
                rv = compactDB(h, cookie,
                               (protocol_binary_request_compact_db*)(request),
                               response);
                // As for DEL_VBUCKET: keep session/engine-specific state while
                // the compaction is pending.
                if (rv != ENGINE_EWOULDBLOCK) {
                    h->decrementSessionCtr();
                    h->storeEngineSpecific(cookie, NULL);
                }
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
            // The length fields are still in network byte order here;
            // comparing them with 0 is byte-order independent.
            if (request->request.extlen != 0 ||
                request->request.keylen != 0 ||
                request->request.bodylen != 0) {
                return ENGINE_EINVAL;
            }

            return h->getRandomKey(cookie, response);
        case CMD_GET_KEYS:
            return h->getAllKeys(cookie,
               reinterpret_cast<protocol_binary_request_get_keys*>(request),
                                                                   response);
        }

        // Common response path for handlers that only filled in res/msg/itm.
        // Send a special response for getl since we don't want to send the key
        if (itm && request->request.opcode == PROTOCOL_BINARY_CMD_GET_LOCKED) {
            uint32_t flags = itm->getFlags();
            rv = sendResponse(response, NULL, 0, (const void *)&flags,
                              sizeof(uint32_t),
                              static_cast<const void *>(itm->getData()),
                              itm->getNBytes(), itm->getDataType(),
                              static_cast<uint16_t>(res), itm->getCas(),
                              cookie);
            delete itm;
        } else if (itm) {
            // Item reply (e.g. GET_REPLICA): key, flags and value.
            const std::string &key  = itm->getKey();
            uint32_t flags = itm->getFlags();
            rv = sendResponse(response, static_cast<const void *>(key.data()),
                              itm->getNKey(),
                              (const void *)&flags, sizeof(uint32_t),
                              static_cast<const void *>(itm->getData()),
                              itm->getNBytes(), itm->getDataType(),
                              static_cast<uint16_t>(res), itm->getCas(),
                              cookie);
            delete itm;
        } else  if (rv == ENGINE_NOT_MY_VBUCKET) {
            // NOT_MY_VBUCKET replies carry the current cluster config.
            LockHolder lh(h->clusterConfig.lock);
            return sendResponse(response, NULL, 0, NULL, 0,
                                h->clusterConfig.config,
                                h->clusterConfig.len,
                                PROTOCOL_BINARY_RAW_BYTES,
                                PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
                                cookie);
        } else {
            // Status-only reply. Handlers may set msg without a size, so fall
            // back to strlen. NOTE(review): the size is narrowed to uint16_t.
            msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
            rv = sendResponse(response, NULL, 0, NULL, 0,
                              msg, static_cast<uint16_t>(msg_size),
                              PROTOCOL_BINARY_RAW_BYTES,
                              static_cast<uint16_t>(res), 0, cookie);

        }
        return rv;
    }
1306
1307     static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1308                                                const void* cookie,
1309                                                protocol_binary_request_header
1310                                                                       *request,
1311                                                ADD_RESPONSE response)
1312     {
1313         ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1314                                                            cookie,
1315                                                            request, response);
1316         releaseHandle(handle);
1317         return err_code;
1318     }
1319
1320     static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1321                               item *itm, uint64_t cas) {
1322         static_cast<Item*>(itm)->setCas(cas);
1323     }
1324
1325     static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1326                                           const void *cookie,
1327                                           void *engine_specific,
1328                                           uint16_t nengine,
1329                                           uint8_t ttl,
1330                                           uint16_t tap_flags,
1331                                           tap_event_t tap_event,
1332                                           uint32_t tap_seqno,
1333                                           const void *key,
1334                                           size_t nkey,
1335                                           uint32_t flags,
1336                                           uint32_t exptime,
1337                                           uint64_t cas,
1338                                           uint8_t datatype,
1339                                           const void *data,
1340                                           size_t ndata,
1341                                           uint16_t vbucket)
1342     {
1343         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1344             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1345                     " (TapNotify)");
1346             return ENGINE_EINVAL;
1347         }
1348         ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1349                                                         engine_specific,
1350                                                         nengine, ttl,
1351                                                         tap_flags,
1352                                                         (uint16_t)tap_event,
1353                                                         tap_seqno,
1354                                                         key, nkey, flags,
1355                                                         exptime, cas,
1356                                                         datatype, data,
1357                                                         ndata, vbucket);
1358         releaseHandle(handle);
1359         return err_code;
1360     }
1361
1362     static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1363                                       const void *cookie, item **itm,
1364                                       void **es, uint16_t *nes, uint8_t *ttl,
1365                                       uint16_t *flags, uint32_t *seqno,
1366                                       uint16_t *vbucket) {
1367         uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1368                                                              nes, ttl,
1369                                                              flags, seqno,
1370                                                              vbucket);
1371         releaseHandle(handle);
1372         return static_cast<tap_event_t>(tap_event);
1373     }
1374
1375     static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1376                                           const void* cookie,
1377                                           const void* client,
1378                                           size_t nclient,
1379                                           uint32_t flags,
1380                                           const void* userdata,
1381                                           size_t nuserdata)
1382     {
1383         EventuallyPersistentEngine *h = getHandle(handle);
1384         TAP_ITERATOR iterator = NULL;
1385         {
1386             std::string c(static_cast<const char*>(client), nclient);
1387             // Figure out what we want from the userdata before adding it to
1388             // the API to the handle
1389             if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1390                 iterator = EvpTapIterator;
1391             }
1392         }
1393         releaseHandle(handle);
1394         return iterator;
1395     }
1396
1397
1398     static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1399                                        const void* cookie,
1400                                        struct dcp_message_producers *producers)
1401     {
1402         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1403         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1404         if (conn) {
1405             errCode = conn->step(producers);
1406         }
1407         releaseHandle(handle);
1408         return errCode;
1409     }
1410
1411
1412     static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1413                                         const void* cookie,
1414                                         uint32_t opaque,
1415                                         uint32_t seqno,
1416                                         uint32_t flags,
1417                                         void *name,
1418                                         uint16_t nname)
1419     {
1420         ENGINE_ERROR_CODE errCode;
1421         errCode = getHandle(handle)->dcpOpen(cookie, opaque, seqno, flags,
1422                                              name, nname);
1423         releaseHandle(handle);
1424         return errCode;
1425     }
1426
1427     static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1428                                              const void* cookie,
1429                                              uint32_t opaque,
1430                                              uint16_t vbucket,
1431                                              uint32_t flags)
1432     {
1433         ENGINE_ERROR_CODE errCode = getHandle(handle)->dcpAddStream(cookie,
1434                                                                     opaque,
1435                                                                     vbucket,
1436                                                                     flags);
1437         releaseHandle(handle);
1438         return errCode;
1439     }
1440
1441     static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1442                                                const void* cookie,
1443                                                uint32_t opaque,
1444                                                uint16_t vbucket)
1445     {
1446         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1447         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1448         if (conn) {
1449             errCode = conn->closeStream(opaque, vbucket);
1450         }
1451         releaseHandle(handle);
1452         return errCode;
1453     }
1454
1455
1456     static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1457                                              const void* cookie,
1458                                              uint32_t flags,
1459                                              uint32_t opaque,
1460                                              uint16_t vbucket,
1461                                              uint64_t startSeqno,
1462                                              uint64_t endSeqno,
1463                                              uint64_t vbucketUuid,
1464                                              uint64_t snapStartSeqno,
1465                                              uint64_t snapEndSeqno,
1466                                              uint64_t *rollbackSeqno,
1467                                              dcp_add_failover_log callback)
1468     {
1469         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1470         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1471         if (conn) {
1472             errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1473                                           endSeqno, vbucketUuid, snapStartSeqno,
1474                                           snapEndSeqno, rollbackSeqno, callback);
1475         }
1476         releaseHandle(handle);
1477         return errCode;
1478     }
1479
1480     static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1481                                                  const void* cookie,
1482                                                  uint32_t opaque,
1483                                                  uint16_t vbucket,
1484                                                  dcp_add_failover_log callback)
1485     {
1486         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1487         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1488         if (conn) {
1489             errCode = conn->getFailoverLog(opaque, vbucket, callback);
1490         }
1491         releaseHandle(handle);
1492         return errCode;
1493     }
1494
1495
1496     static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1497                                              const void* cookie,
1498                                              uint32_t opaque,
1499                                              uint16_t vbucket,
1500                                              uint32_t flags)
1501     {
1502         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1503         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1504         if (conn) {
1505             errCode = conn->streamEnd(opaque, vbucket, flags);
1506         }
1507         releaseHandle(handle);
1508         return errCode;
1509     }
1510
1511
1512     static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1513                                                   const void* cookie,
1514                                                   uint32_t opaque,
1515                                                   uint16_t vbucket,
1516                                                   uint64_t start_seqno,
1517                                                   uint64_t end_seqno,
1518                                                   uint32_t flags)
1519     {
1520         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1521         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1522         if (conn) {
1523             errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1524                                            end_seqno, flags);
1525         }
1526         releaseHandle(handle);
1527         return errCode;
1528     }
1529
1530     static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1531                                             const void* cookie,
1532                                             uint32_t opaque,
1533                                             const void *key,
1534                                             uint16_t nkey,
1535                                             const void *value,
1536                                             uint32_t nvalue,
1537                                             uint64_t cas,
1538                                             uint16_t vbucket,
1539                                             uint32_t flags,
1540                                             uint8_t datatype,
1541                                             uint64_t bySeqno,
1542                                             uint64_t revSeqno,
1543                                             uint32_t expiration,
1544                                             uint32_t lockTime,
1545                                             const void *meta,
1546                                             uint16_t nmeta,
1547                                             uint8_t nru)
1548     {
1549         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1550             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1551                     " (DCPMutation)");
1552             return ENGINE_EINVAL;
1553         }
1554         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1555         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1556         if (conn) {
1557             errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1558                                      vbucket, flags, datatype, lockTime,
1559                                      bySeqno, revSeqno, expiration,
1560                                      nru, meta, nmeta);
1561         }
1562         releaseHandle(handle);
1563         return errCode;
1564     }
1565
1566     static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1567                                             const void* cookie,
1568                                             uint32_t opaque,
1569                                             const void *key,
1570                                             uint16_t nkey,
1571                                             uint64_t cas,
1572                                             uint16_t vbucket,
1573                                             uint64_t bySeqno,
1574                                             uint64_t revSeqno,
1575                                             const void *meta,
1576                                             uint16_t nmeta)
1577     {
1578         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1579         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1580         if (conn) {
1581             errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1582                                      revSeqno, meta, nmeta);
1583         }
1584         releaseHandle(handle);
1585         return errCode;
1586     }
1587
1588     static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1589                                               const void* cookie,
1590                                               uint32_t opaque,
1591                                               const void *key,
1592                                               uint16_t nkey,
1593                                               uint64_t cas,
1594                                               uint16_t vbucket,
1595                                               uint64_t bySeqno,
1596                                               uint64_t revSeqno,
1597                                               const void *meta,
1598                                               uint16_t nmeta)
1599     {
1600         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1601         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1602         if (conn) {
1603             errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1604                                        revSeqno, meta, nmeta);
1605         }
1606         releaseHandle(handle);
1607         return errCode;
1608     }
1609
1610     static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1611                                          const void* cookie,
1612                                          uint32_t opaque,
1613                                          uint16_t vbucket)
1614     {
1615         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1616         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1617         if (conn) {
1618             errCode = conn->flushall(opaque, vbucket);
1619         }
1620         releaseHandle(handle);
1621         return errCode;
1622     }
1623
1624     static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1625                                                    const void* cookie,
1626                                                    uint32_t opaque,
1627                                                    uint16_t vbucket,
1628                                                    vbucket_state_t state)
1629     {
1630         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1631         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1632         if (conn) {
1633             errCode = conn->setVBucketState(opaque, vbucket, state);
1634         }
1635         releaseHandle(handle);
1636         return errCode;
1637     }
1638
1639     static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1640                                         const void* cookie,
1641                                         uint32_t opaque) {
1642         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1643         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1644         if (conn) {
1645             errCode = conn->noop(opaque);
1646         }
1647         releaseHandle(handle);
1648         return errCode;
1649     }
1650
1651     static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1652                                                          const void* cookie,
1653                                                          uint32_t opaque,
1654                                                          uint16_t vbucket,
1655                                                          uint32_t buffer_bytes) {
1656         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1657         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1658         if (conn) {
1659             errCode = conn->bufferAcknowledgement(opaque, vbucket,
1660                                                   buffer_bytes);
1661         }
1662         releaseHandle(handle);
1663         return errCode;
1664     }
1665
1666     static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1667                                            const void* cookie,
1668                                            uint32_t opaque,
1669                                            const void *key,
1670                                            uint16_t nkey,
1671                                            const void *value,
1672                                            uint32_t nvalue) {
1673         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1674         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1675         if (conn) {
1676             errCode = conn->control(opaque, key, nkey, value, nvalue);
1677         }
1678         releaseHandle(handle);
1679         return errCode;
1680     }
1681
1682     static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1683                                      const void* cookie,
1684                                      protocol_binary_response_header *response)
1685     {
1686         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1687         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1688         if (conn) {
1689             errCode = conn->handleResponse(response);
1690         }
1691         releaseHandle(handle);
1692         return errCode;
1693     }
1694
1695     static void EvpHandleDisconnect(const void *cookie,
1696                                     ENGINE_EVENT_TYPE type,
1697                                     const void *event_data,
1698                                     const void *cb_data)
1699     {
1700         cb_assert(type == ON_DISCONNECT);
1701         cb_assert(event_data == NULL);
1702         void *c = const_cast<void*>(cb_data);
1703         getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1704         releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1705     }
1706
1707
1708     /**
1709      * The only public interface to the eventually persistance engine.
1710      * Allocate a new instance and initialize it
1711      * @param interface the highest interface the server supports (we only
1712      *                  support interface 1)
1713      * @param get_server_api callback function to get the server exported API
1714      *                  functions
1715      * @param handle Where to return the new instance
1716      * @return ENGINE_SUCCESS on success
1717      */
1718     ENGINE_ERROR_CODE create_instance(uint64_t interface,
1719                                       GET_SERVER_API get_server_api,
1720                                       ENGINE_HANDLE **handle)
1721     {
1722         SERVER_HANDLE_V1 *api = get_server_api();
1723         if (interface != 1 || api == NULL) {
1724             return ENGINE_ENOTSUP;
1725         }
1726
1727         hooksApi = api->alloc_hooks;
1728         EventuallyPersistentEngine::loggerApi = api->log;
1729         MemoryTracker::getInstance();
1730         ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1731
1732         AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1733
1734         ObjectRegistry::setStats(inital_tracking);
1735         EventuallyPersistentEngine *engine;
1736         engine = new EventuallyPersistentEngine(get_server_api);
1737         ObjectRegistry::setStats(NULL);
1738
1739         if (engine == NULL) {
1740             return ENGINE_ENOMEM;
1741         }
1742
1743         if (MemoryTracker::trackingMemoryAllocations()) {
1744             engine->getEpStats().memoryTrackerEnabled.store(true);
1745             engine->getEpStats().totalMemory.store(inital_tracking->load());
1746         }
1747         delete inital_tracking;
1748
1749         initialize_time_functions(api->core);
1750
1751         *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1752
1753         return ENGINE_SUCCESS;
1754     }
1755
1756     /*
1757         This method is called prior to unloading of the shared-object.
1758         Global clean-up should be performed from this method.
1759      */
1760     void destroy_engine() {
1761         ExecutorPool::shutdown();
1762     }
1763
1764     static bool EvpGetItemInfo(ENGINE_HANDLE *, const void *,
1765                                const item* itm, item_info *itm_info)
1766     {
1767         const Item *it = reinterpret_cast<const Item*>(itm);
1768         if (itm_info->nvalue < 1) {
1769             return false;
1770         }
1771         itm_info->cas = it->getCas();
1772         itm_info->exptime = it->getExptime();
1773         itm_info->nbytes = it->getNBytes();
1774         itm_info->datatype = it->getDataType();
1775         itm_info->flags = it->getFlags();
1776         itm_info->clsid = 0;
1777         itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1778         itm_info->nvalue = 1;
1779         itm_info->key = it->getKey().c_str();
1780         itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1781         itm_info->value[0].iov_len = it->getNBytes();
1782         return true;
1783     }
1784
1785     static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1786                                item* itm, const item_info *itm_info)
1787     {
1788         Item *it = reinterpret_cast<Item*>(itm);
1789         if (!it) {
1790             return false;
1791         }
1792         it->setDataType(itm_info->datatype);
1793         return true;
1794     }
1795
1796     static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1797                                                  const void* cookie,
1798                                                  engine_get_vb_map_cb callback)
1799     {
1800         EventuallyPersistentEngine *h = getHandle(handle);
1801         LockHolder lh(h->clusterConfig.lock);
1802         uint8_t *config = h->clusterConfig.config;
1803         uint32_t len = h->clusterConfig.len;
1804         releaseHandle(handle);
1805         return callback(cookie, config, len);
1806     }
1807
1808 } // C linkage
1809
1810 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1811     char buffer[2048];
1812
1813     if (EventuallyPersistentEngine::loggerApi != NULL) {
1814         EXTENSION_LOGGER_DESCRIPTOR* logger =
1815             (EXTENSION_LOGGER_DESCRIPTOR*)EventuallyPersistentEngine::loggerApi->get_logger();
1816
1817         if (EventuallyPersistentEngine::loggerApi->get_level() <= severity) {
1818             EventuallyPersistentEngine *engine = ObjectRegistry::onSwitchThread(NULL, true);
1819             va_list va;
1820             va_start(va, fmt);
1821             vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1822             if (engine) {
1823                 logger->log(severity, NULL, "(%s) %s", engine->getName(),
1824                             buffer);
1825             } else {
1826                 logger->log(severity, NULL, "(No Engine) %s", buffer);
1827             }
1828             va_end(va);
1829             ObjectRegistry::onSwitchThread(engine);
1830         }
1831     }
1832 }
1833
1834 ALLOCATOR_HOOKS_API *getHooksApi(void) {
1835     return hooksApi;
1836 }
1837
/**
 * Construct the engine object.
 *
 * This only wires the ENGINE_HANDLE_V1 function table, including the DCP
 * sub-table, to the static Evp* entry points, fetches the server API and
 * fills in the engine info. Heavyweight members (epstore, workload,
 * tapThrottle, the TAP/DCP connection maps, tapConfig, checkpointConfig)
 * start out NULL here and are allocated later by initialize().
 *
 * @param get_server_api server callback used to obtain the server API
 */
EventuallyPersistentEngine::EventuallyPersistentEngine(
                                    GET_SERVER_API get_server_api) :
    clusterConfig(), epstore(NULL), workload(NULL),
    workloadPriority(NO_BUCKET_PRIORITY),
    tapThrottle(NULL), getServerApiFunc(get_server_api),
    dcpConnMap_(NULL), tapConnMap(NULL), tapConfig(NULL), checkpointConfig(NULL),
    trafficEnabled(false), flushAllEnabled(false), startupTime(0)
{
    // Engine API version 1 entry points.
    interface.interface = 1;
    ENGINE_HANDLE_V1::get_info = EvpGetInfo;
    ENGINE_HANDLE_V1::initialize = EvpInitialize;
    ENGINE_HANDLE_V1::destroy = EvpDestroy;
    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
    ENGINE_HANDLE_V1::remove = EvpItemDelete;
    ENGINE_HANDLE_V1::release = EvpItemRelease;
    ENGINE_HANDLE_V1::get = EvpGet;
    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
    ENGINE_HANDLE_V1::store = EvpStore;
    ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
    ENGINE_HANDLE_V1::flush = EvpFlush;
    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
    ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
    ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
    ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
    // Optional entry points this engine does not provide.
    ENGINE_HANDLE_V1::get_stats_struct = NULL;
    ENGINE_HANDLE_V1::errinfo = NULL;
    ENGINE_HANDLE_V1::aggregate_stats = NULL;


    // DCP entry points; the connection-level ones dispatch to the
    // ConnHandler registered for the request's cookie.
    ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
    ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
    ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
    ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
    ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
    ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
    ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
    ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
    ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
    ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
    ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
    ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
    ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
    ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
    ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
    ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;

    serverApi = getServerApiFunc();
    // Engine info reported to the server: description plus the feature
    // list (CAS, persistent storage, LRU, datatype).
    memset(&info, 0, sizeof(info));
    info.info.description = "EP engine v" VERSION;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
    info.info.features[info.info.num_features++].feature =
                                             ENGINE_FEATURE_PERSISTENT_STORAGE;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;

}
1899
1900 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1901 {
1902     EventuallyPersistentEngine *epe =
1903                                     ObjectRegistry::onSwitchThread(NULL, true);
1904     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1905     ObjectRegistry::onSwitchThread(epe);
1906     return rv;
1907 }
1908
1909 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
1910 {
1911     EventuallyPersistentEngine *epe =
1912                                     ObjectRegistry::onSwitchThread(NULL, true);
1913     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
1914     ObjectRegistry::onSwitchThread(epe);
1915     return rv;
1916 }
1917
1918 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
1919                                                         EVENT_CALLBACK cb,
1920                                                         const void *cb_data) {
1921     EventuallyPersistentEngine *epe =
1922                                     ObjectRegistry::onSwitchThread(NULL, true);
1923     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
1924     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
1925                             type, cb, cb_data);
1926     ObjectRegistry::onSwitchThread(epe);
1927 }
1928
1929 /**
1930  * A configuration value changed listener that responds to ep-engine
1931  * parameter changes by invoking engine-specific methods on
1932  * configuration change events.
1933  */
1934 class EpEngineValueChangeListener : public ValueChangedListener {
1935 public:
1936     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
1937         // EMPTY
1938     }
1939
1940     virtual void sizeValueChanged(const std::string &key, size_t value) {
1941         if (key.compare("getl_max_timeout") == 0) {
1942             engine.setGetlMaxTimeout(value);
1943         } else if (key.compare("getl_default_timeout") == 0) {
1944             engine.setGetlDefaultTimeout(value);
1945         } else if (key.compare("max_item_size") == 0) {
1946             engine.setMaxItemSize(value);
1947         }
1948     }
1949
1950     virtual void booleanValueChanged(const std::string &key, bool value) {
1951         if (key.compare("flushall_enabled") == 0) {
1952             engine.setFlushAll(value);
1953         }
1954     }
1955 private:
1956     EventuallyPersistentEngine &engine;
1957 };
1958
1959
1960
/**
 * Second-phase engine initialisation, driven by the configuration string.
 *
 * Resets stats, parses @p config (when non-NULL), applies derived defaults,
 * registers configuration listeners, and then creates the workload policy,
 * TAP/DCP connection maps, TAP/checkpoint configs and the store.
 *
 * @param config configuration string, or NULL to keep current settings
 * @return ENGINE_SUCCESS on success; ENGINE_FAILED if parsing fails, the
 *         shard count exceeds max_vbuckets, or the store fails to
 *         initialise; ENGINE_ENOMEM in the (unreachable, see below) case of
 *         a NULL store.
 *
 * NOTE(review): the early returns leave members allocated so far (e.g.
 * workload) in place; confirm that destroy()/the destructor frees them.
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
    resetStats();
    if (config != NULL) {
        if (!configuration.parseConfiguration(config, serverApi)) {
            return ENGINE_FAILED;
        }
    }

    name = configuration.getCouchBucket();
    maxFailoverEntries = configuration.getMaxFailoverEntries();

    // Start updating the variables from the config!
    HashTable::setDefaultNumBuckets(configuration.getHtSize());
    HashTable::setDefaultNumLocks(configuration.getHtLocks());
    StoredValue::setMutationMemoryThreshold(
                                      configuration.getMutationMemThreshold());

    // max_size of 0 is replaced by the largest size_t value.
    if (configuration.getMaxSize() == 0) {
        configuration.setMaxSize(std::numeric_limits<size_t>::max());
    }

    // A watermark still equal to the size_t maximum is treated as unset and
    // derived from max_size: low = 75%, high = 85%.
    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
        configuration.setMemLowWat(percentOf(
                                            configuration.getMaxSize(), 0.75));
    }

    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
        configuration.setMemHighWat(percentOf(
                                            configuration.getMaxSize(), 0.85));
    }

    // Cache the current values and register a listener so that later
    // configuration changes reach the engine setters. A separate listener
    // object is created per key; ownership presumably passes to
    // configuration (confirm).
    maxItemSize = configuration.getMaxItemSize();
    configuration.addValueChangedListener("max_item_size",
                                       new EpEngineValueChangeListener(*this));

    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
    configuration.addValueChangedListener("getl_default_timeout",
                                       new EpEngineValueChangeListener(*this));
    getlMaxTimeout = configuration.getGetlMaxTimeout();
    configuration.addValueChangedListener("getl_max_timeout",
                                       new EpEngineValueChangeListener(*this));

    flushAllEnabled = configuration.isFlushallEnabled();
    configuration.addValueChangedListener("flushall_enabled",
                                       new EpEngineValueChangeListener(*this));

    // Reject configurations with more shards than vbuckets.
    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
                                  configuration.getMaxNumShards());
    if ((unsigned int)workload->getNumShards() >
                                              configuration.getMaxVbuckets()) {
        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
            "equal or less than max number of vbuckets");
        return ENGINE_FAILED;
    }

    dcpConnMap_ = new DcpConnMap(*this);
    tapConnMap = new TapConnMap(*this);
    tapConfig = new TapConfig(*this);
    tapThrottle = new TapThrottle(configuration, stats);
    TapConfig::addConfigChangeListener(*this);

    checkpointConfig = new CheckpointConfig(*this);
    CheckpointConfig::addConfigChangeListener(*this);

    epstore = new EventuallyPersistentStore(*this);
    // NOTE(review): a plain new-expression throws std::bad_alloc instead of
    // returning NULL, so this branch is unreachable as written.
    if (epstore == NULL) {
        return ENGINE_ENOMEM;
    }

    initializeEngineCallbacks();

    // Complete the initialization of the ep-store
    if (!epstore->initialize()) {
        return ENGINE_FAILED;
    }

    if(configuration.isDataTrafficEnabled()) {
        enableTraffic(true);
    }

    tapConnMap->initialize(TAP_CONN_NOTIFIER);
    dcpConnMap_->initialize(DCP_CONN_NOTIFIER);

    // record engine initialization time
    startupTime.store(ep_real_time());

    LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");

    return ENGINE_SUCCESS;
}
2051
2052 void EventuallyPersistentEngine::destroy(bool force) {
2053     stats.forceShutdown = force;
2054     stats.isShutdown = true;
2055
2056     if (epstore) {
2057         epstore->snapshotStats();
2058     }
2059     if (tapConnMap) {
2060         tapConnMap->shutdownAllConnections();
2061     }
2062     if (dcpConnMap_) {
2063         dcpConnMap_->shutdownAllConnections();
2064     }
2065 }
2066
2067 class FlushAllTask : public GlobalTask {
2068 public:
2069     FlushAllTask(EventuallyPersistentStore *st, TapConnMap &tcm, double when)
2070         : GlobalTask(&st->getEPEngine(), Priority::FlushAllPriority, when,
2071                      false), epstore(st), tapConnMap(tcm) { }
2072
2073     bool run(void) {
2074         epstore->reset();
2075         tapConnMap.addFlushEvent();
2076         return false;
2077     }
2078
2079     std::string getDescription() {
2080         return std::string("Performing flush.");
2081     }
2082
2083 private:
2084     EventuallyPersistentStore *epstore;
2085     TapConnMap                &tapConnMap;
2086 };
2087
2088 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *, time_t when){
2089     if (!flushAllEnabled) {
2090         return ENGINE_ENOTSUP;
2091     }
2092
2093     if (isDegradedMode()) {
2094         return ENGINE_TMPFAIL;
2095     }
2096
2097     if (when == 0) {
2098         epstore->reset();
2099         tapConnMap->addFlushEvent();
2100     } else {
2101         ExTask flushTask = new FlushAllTask(epstore, *tapConnMap,
2102                 static_cast<double>(when));
2103         ExecutorPool::get()->schedule(flushTask, NONIO_TASK_IDX);
2104     }
2105
2106     return ENGINE_SUCCESS;
2107 }
2108
/**
 * Store an item according to a memcached store operation.
 *
 * @param cookie    connection cookie, forwarded to the ep-store
 * @param itm       item to store (an Item*); its vbucket id is set here
 * @param cas       out: the stored item's CAS on a successful
 *                  SET/CAS/ADD/REPLACE
 * @param operation OPERATION_CAS / SET / ADD / REPLACE / APPEND / PREPEND;
 *                  anything else yields ENGINE_ENOTSUP
 * @param vbucket   target vbucket id
 * @return ENGINE_SUCCESS on success, otherwise an engine error code.
 *         ENGINE_ENOMEM is translated through memoryCondition(), and
 *         ENGINE_NOT_STORED / ENGINE_NOT_MY_VBUCKET become ENGINE_TMPFAIL
 *         while the engine is in degraded mode.
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
                                                    item* itm,
                                                    uint64_t *cas,
                                                    ENGINE_STORE_OPERATION
                                                                     operation,
                                                    uint16_t vbucket) {
    BlockTimer timer(&stats.storeCmdHisto);
    ENGINE_ERROR_CODE ret;
    Item *it = static_cast<Item*>(itm);
    item *i = NULL;

    it->setVBucketId(vbucket);

    switch (operation) {
    case OPERATION_CAS:
        if (it->getCas() == 0) {
            // Using a cas command with a cas wildcard doesn't make sense
            ret = ENGINE_NOT_STORED;
            break;
        }
        // FALLTHROUGH: a CAS store is a SET that the ep-store checks
        // against the item's non-zero CAS.
    case OPERATION_SET:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }
        ret = epstore->set(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }

        break;

    case OPERATION_ADD:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }

        if (it->getCas() != 0) {
            // Adding an item with a cas value doesn't really make sense...
            return ENGINE_KEY_EEXISTS;
        }

        ret = epstore->add(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }
        break;

    case OPERATION_REPLACE:
        ret = epstore->replace(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }
        break;

    case OPERATION_APPEND:
    case OPERATION_PREPEND:
        // Read-modify-write: fetch the current document, splice the new
        // data onto it, then store it back with OPERATION_CAS (the fetched
        // item carries the CAS it was read with). The loop retries while
        // that CAS store reports ENGINE_KEY_EEXISTS, presumably because the
        // document changed between the get and the store.
        do {
            if ((ret = get(cookie, &i, it->getKey().c_str(),
                           it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
                Item *old = reinterpret_cast<Item*>(i);

                if (old->getCas() == (uint64_t) -1) {
                    // item is locked against updates
                    itemRelease(cookie, i);
                    return ENGINE_TMPFAIL;
                }

                // A caller-supplied CAS must match the current document.
                if (it->getCas() != 0 && old->getCas() != it->getCas()) {
                    itemRelease(cookie, i);
                    return ENGINE_KEY_EEXISTS;
                }

                if (operation == OPERATION_APPEND) {
                    ret = old->append(*it, maxItemSize);
                } else {
                    ret = old->prepend(*it, maxItemSize);
                }

                if (ret != ENGINE_SUCCESS) {
                    itemRelease(cookie, i);
                    if (ret == ENGINE_E2BIG) {
                        // Result would exceed maxItemSize.
                        return ret;
                    } else {
                        return memoryCondition();
                    }
                } else {
                    if (old->getDataType() == PROTOCOL_BINARY_DATATYPE_JSON) {
                        // Set the datatype of the new document to BINARY (0),
                        // as appending/prepending anything to JSON breaks the
                        // json data structure.
                        old->setDataType(PROTOCOL_BINARY_RAW_BYTES);
                    } else if (old->getDataType() ==
                                    PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
                        // Set the datatype of the new document to
                        // COMPRESSED_BINARY, as appending/prepending anything
                        // to JSON breaks the json data structure.
                        old->setDataType(PROTOCOL_BINARY_DATATYPE_COMPRESSED);
                    }
                }

                // Recursive call: store the spliced document via the CAS path.
                ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
                itemRelease(cookie, i);
            }
        } while (ret == ENGINE_KEY_EEXISTS);

        // Map the error code back to what memcapable expects
        if (ret == ENGINE_KEY_ENOENT) {
            ret = ENGINE_NOT_STORED;
        }
        break;

    default:
        ret = ENGINE_ENOTSUP;
    }

    // Common post-processing of the operation result.
    switch (ret) {
    case ENGINE_SUCCESS:
        ++stats.numOpsStore;
        break;
    case ENGINE_ENOMEM:
        ret = memoryCondition();
        break;
    case ENGINE_NOT_STORED:
    case ENGINE_NOT_MY_VBUCKET:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }
        break;
    default:
        break;
    }

    return ret;
}
2244
/**
 * Produce the next TAP event for a producer connection (one attempt).
 *
 * Events are considered in a fixed priority order:
 *   1. TAP_FLUSH if the connection has a pending flush;
 *   2. TAP_NOOP if a keep-alive noop is due;
 *   3. TAP_PAUSE if the connection is suspended or its ack window is full;
 *   4. high-priority vbucket events (TAP_VBUCKET_SET, TAP_OPAQUE);
 *   5. TAP_PAUSE while waiting for an ack of an opaque message;
 *   6. the next queued item (mutation, deletion, checkpoint start/end),
 *      after scheduling a backfill if the connection needs one;
 *   7. if nothing was produced and the connection is dumping or taking
 *      over, the dump/takeover completion event.
 *
 * All out-parameters are reset on entry: es=NULL, nes=0, ttl=0xff,
 * seqno=0, flags=0, vbucket=0.
 *
 * @param retry out: set to true when the caller should call again
 *              immediately (the item source returned TAP_NOOP, or the
 *              vbucket of a checkpoint-start item no longer exists)
 * @return the TAP event opcode to send, or TAP_PAUSE if there is nothing
 *         to send right now.
 */
inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
                                                           item **itm,
                                                           void **es,
                                                           uint16_t *nes,
                                                           uint8_t *ttl,
                                                           uint16_t *flags,
                                                           uint32_t *seqno,
                                                           uint16_t *vbucket,
                                                           TapProducer
                                                                   *connection,
                                                           bool &retry) {
    *es = NULL;
    *nes = 0;
    *ttl = (uint8_t)-1;
    *seqno = 0;
    *flags = 0;
    *vbucket = 0;

    retry = false;

    if (connection->shouldFlush()) {
        return TAP_FLUSH;
    }

    if (connection->isTimeForNoop()) {
        LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
            connection->logHeader());
        return TAP_NOOP;
    }

    if (connection->isSuspended() || connection->windowIsFull()) {
        LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
            " suspended state or its ack windows is full.\n",
            connection->logHeader());
        return TAP_PAUSE;
    }

    uint16_t ret = TAP_PAUSE;
    // High-priority vbucket events take precedence over queued items.
    VBucketEvent ev = connection->nextVBucketHighPriority();
    if (ev.event != TAP_PAUSE) {
        switch (ev.event) {
        case TAP_VBUCKET_SET:
            LOG(EXTENSION_LOG_WARNING,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), ev.vbucket,
                VBucket::toString(ev.state));
            connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
            break;
        case TAP_OPAQUE:
            // ev.state carries the opaque command code; it is ntohl'd for
            // logging, so it presumably is already in network byte order.
            LOG(EXTENSION_LOG_WARNING,
                "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
                connection->logHeader(),
                TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
                ev.vbucket);
            // The engine-specific payload points into the connection object
            // so it stays valid after this function returns.
            connection->opaqueCommandCode = (uint32_t) ev.state;
            *vbucket = ev.vbucket;
            *es = &connection->opaqueCommandCode;
            *nes = sizeof(connection->opaqueCommandCode);
            break;
        default:
            LOG(EXTENSION_LOG_WARNING,
                "%s Unknown VBucketEvent message type %d\n",
                connection->logHeader(), ev.event);
            abort();
        }
        return ev.event;
    }

    if (connection->waitForOpaqueMsgAck()) {
        return TAP_PAUSE;
    }

    // Schedule a backfill for the vbuckets the connection reports as
    // needing one.
    VBucketFilter backFillVBFilter;
    if (connection->runBackfill(backFillVBFilter)) {
        queueBackfill(backFillVBFilter, connection);
    }

    uint8_t nru = INITIAL_NRU_VALUE;
    Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
    switch (ret) {
    case TAP_CHECKPOINT_START:
    case TAP_CHECKPOINT_END:
    case TAP_MUTATION:
    case TAP_DELETION:
        *itm = it;
        // Encode the per-event engine-specific payload into the
        // connection's specificData buffer.
        if (ret == TAP_MUTATION) {
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno(), nru);
            *es = connection->specificData;
        } else if (ret == TAP_DELETION) {
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno());
            *es = connection->specificData;
        } else if (ret == TAP_CHECKPOINT_START) {
            // Send the current value of the max deleted seqno
            RCPtr<VBucket> vb = getVBucket(*vbucket);
            if (!vb) {
                // The vbucket is gone; ask the caller to try again.
                retry = true;
                return TAP_NOOP;
            }
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                               vb->ht.getMaxDeletedRevSeqno());
            *es = connection->specificData;
        }
        break;
    case TAP_NOOP:
        retry = true;
        break;
    default:
        break;
    }

    // Nothing to send: check whether a dump or takeover has just completed.
    if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
        VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
        if (vbev.event == TAP_VBUCKET_SET) {
            LOG(EXTENSION_LOG_WARNING,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), vbev.vbucket,
                VBucket::toString(vbev.state));
            connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
        }
        ret = vbev.event;
    }

    return ret;
}
2371
2372 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2373                                                   item **itm,
2374                                                   void **es,
2375                                                   uint16_t *nes,
2376                                                   uint8_t *ttl,
2377                                                   uint16_t *flags,
2378                                                   uint32_t *seqno,
2379                                                   uint16_t *vbucket) {
2380     TapProducer *connection = getTapProducer(cookie);
2381     if (!connection) {
2382         LOG(EXTENSION_LOG_WARNING,
2383             "Failed to lookup TAP connection.. Disconnecting\n");
2384         return TAP_DISCONNECT;
2385     }
2386
2387     connection->setPaused(false);
2388
2389     bool retry = false;
2390     uint16_t ret;
2391
2392     connection->setLastWalkTime();
2393     do {
2394         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2395                              seqno, vbucket, connection, retry);
2396     } while (retry);
2397
2398     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2399         connection->lastMsgTime = ep_current_time();
2400         if (ret == TAP_NOOP) {
2401             *seqno = 0;
2402         } else {
2403             ++stats.numTapFetched;
2404             *seqno = connection->getSeqno();
2405             if (connection->requestAck(ret, *vbucket)) {
2406                 *flags = TAP_FLAG_ACK;
2407                 connection->seqnoAckRequested = *seqno;
2408             }
2409
2410             if (ret == TAP_MUTATION) {
2411                 if (connection->haveFlagByteorderSupport()) {
2412                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2413                 }
2414             }
2415         }
2416     } else {
2417         connection->setPaused(true);
2418         connection->setNotifySent(false);
2419     }
2420
2421     return ret;
2422 }
2423
2424 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2425                                                 std::string &client,
2426                                                 uint32_t flags,
2427                                                 const void *userdata,
2428                                                 size_t nuserdata) {
2429     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2430         return false;
2431     }
2432
2433     std::string tapName = "eq_tapq:";
2434     if (client.length() == 0) {
2435         tapName.assign(ConnHandler::getAnonName());
2436     } else {
2437         tapName.append(client);
2438     }
2439
2440     // Decoding the userdata section of the packet and update the filters
2441     const char *ptr = static_cast<const char*>(userdata);
2442     uint64_t backfillAge = 0;
2443     std::vector<uint16_t> vbuckets;
2444     std::map<uint16_t, uint64_t> lastCheckpointIds;
2445
2446     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2447         if (nuserdata < sizeof(backfillAge)) {
2448             LOG(EXTENSION_LOG_WARNING,
2449                 "Backfill age is missing. Reject connection request from %s\n",
2450                 tapName.c_str());
2451             return false;
2452         }
2453         // use memcpy to avoid alignemt issues
2454         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2455         backfillAge = ntohll(backfillAge);
2456         nuserdata -= sizeof(backfillAge);
2457         ptr += sizeof(backfillAge);
2458     }
2459
2460     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2461         uint16_t nvbuckets;
2462         if (nuserdata < sizeof(nvbuckets)) {
2463             LOG(EXTENSION_LOG_WARNING,
2464             "Number of vbuckets is missing. Reject connection request from %s"
2465             "\n", tapName.c_str());
2466             return false;
2467         }
2468         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2469         nuserdata -= sizeof(nvbuckets);
2470         ptr += sizeof(nvbuckets);
2471         nvbuckets = ntohs(nvbuckets);
2472         if (nvbuckets > 0) {
2473             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2474                 LOG(EXTENSION_LOG_WARNING,
2475                 "# of vbuckets not matched. Reject connection request from %s"
2476                 "\n", tapName.c_str());
2477                 return false;
2478             }
2479             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2480                 uint16_t val;
2481                 memcpy(&val, ptr, sizeof(nvbuckets));
2482                 ptr += sizeof(uint16_t);
2483                 vbuckets.push_back(ntohs(val));
2484             }
2485             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2486         }
2487     }
2488
2489     if (flags & TAP_CONNECT_CHECKPOINT) {
2490         uint16_t nCheckpoints = 0;
2491         if (nuserdata >= sizeof(nCheckpoints)) {
2492             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2493             nuserdata -= sizeof(nCheckpoints);
2494             ptr += sizeof(nCheckpoints);
2495             nCheckpoints = ntohs(nCheckpoints);
2496         }
2497         if (nCheckpoints > 0) {
2498             if (nuserdata <
2499                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2500                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2501                     "Reject connection request from %s\n", tapName.c_str());
2502                 return false;
2503             }
2504             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2505                 uint16_t vbid;
2506                 uint64_t checkpointId;
2507                 memcpy(&vbid, ptr, sizeof(vbid));
2508                 ptr += sizeof(uint16_t);
2509                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2510                 ptr += sizeof(uint64_t);
2511                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2512             }
2513             nuserdata -=
2514                         ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2515         }
2516     }
2517
2518     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2519                                  backfillAge,
2520                                  static_cast<int>(
2521                                  configuration.getTapKeepalive()),
2522                                  vbuckets,
2523                                  lastCheckpointIds);
2524
2525     tapConnMap->notifyPausedConnection(tp, true);
2526     return true;
2527 }
2528
2529 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2530                                                         void *engine_specific,
2531                                                         uint16_t nengine,
2532                                                         uint8_t ttl,
2533                                                         uint16_t tap_flags,
2534                                                         uint16_t tap_event,
2535                                                         uint32_t tap_seqno,
2536                                                         const void *key,
2537                                                         size_t nkey,
2538                                                         uint32_t flags,
2539                                                         uint32_t exptime,
2540                                                         uint64_t cas,
2541                                                         uint8_t datatype,
2542                                                         const void *data,
2543                                                         size_t ndata,
2544                                                         uint16_t vbucket)
2545 {
2546     (void) ttl;
2547     void *specific = getEngineSpecific(cookie);
2548     ConnHandler *connection = NULL;
2549     if (specific == NULL) {
2550         if (tap_event == TAP_ACK) {
2551             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2552                 "exist. Force disconnect...\n", (char *) cookie);
2553             // tap producer is no longer connected..
2554             return ENGINE_DISCONNECT;
2555         } else {
2556             connection = tapConnMap->newConsumer(cookie);
2557             if (connection == NULL) {
2558                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2559                     " Force disconnect\n");
2560                 return ENGINE_DISCONNECT;
2561             }
2562             storeEngineSpecific(cookie, connection);
2563         }
2564     } else {
2565         connection = reinterpret_cast<ConnHandler *>(specific);
2566     }
2567
2568     std::string k(static_cast<const char*>(key), nkey);
2569     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2570
2571     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2572         if (!tapThrottle->shouldProcess()) {
2573             ++stats.tapThrottled;
2574             if (connection->supportsAck()) {
2575                 ret = ENGINE_TMPFAIL;
2576             } else {
2577                 ret = ENGINE_DISCONNECT;
2578                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2579                     "ack support. Force disconnect...\n",
2580                     connection->logHeader());
2581             }
2582             return ret;
2583         }
2584     }
2585
2586     switch (tap_event) {
2587     case TAP_ACK:
2588         ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2589         break;
2590     case TAP_FLUSH:
2591         ret = flush(cookie, 0);
2592         LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2593             connection->logHeader());
2594         break;
2595     case TAP_DELETION:
2596         {
2597             uint64_t revSeqno;
2598             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2599                                                 nengine, &revSeqno);
2600
2601             ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2602                                        NULL, 0);
2603         }
2604         break;
2605
2606     case TAP_CHECKPOINT_START:
2607     case TAP_CHECKPOINT_END:
2608         {
2609             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2610             if (tc) {
2611                 if (tap_event == TAP_CHECKPOINT_START &&
2612                     nengine == TapEngineSpecific::sizeRevSeqno) {
2613                     // Set the current value for the max deleted seqno
2614                     RCPtr<VBucket> vb = getVBucket(vbucket);
2615                     if (!vb) {
2616                         return ENGINE_TMPFAIL;
2617                     }
2618                     uint64_t seqnum;
2619                     TapEngineSpecific::readSpecificData(tap_event,
2620                                                         engine_specific,
2621                                                         nengine,
2622                                                         &seqnum);
2623                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2624                 }
2625
2626                 if (data) {
2627                     uint64_t checkpointId;
2628                     memcpy(&checkpointId, data, sizeof(checkpointId));
2629                     checkpointId = ntohll(checkpointId);
2630                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2631                                           checkpointId);
2632                 }
2633                 else {
2634                     ret = ENGINE_DISCONNECT;
2635                     LOG(EXTENSION_LOG_WARNING,
2636                         "%s Checkpoint Id is missing in "
2637                         "CHECKPOINT messages. Force disconnect...\n",
2638                         connection->logHeader());
2639                 }
2640             }
2641             else {
2642                 ret = ENGINE_DISCONNECT;
2643                 LOG(EXTENSION_LOG_WARNING,
2644                     "%s not a consumer! Force disconnect\n",
2645                     connection->logHeader());
2646             }
2647         }
2648
2649         break;
2650
2651     case TAP_MUTATION:
2652         {
2653             uint8_t nru = INITIAL_NRU_VALUE;
2654             uint64_t revSeqno = 0;
2655             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2656                                                 nengine, &revSeqno, &nru);
2657
2658             if (!isDatatypeSupported(cookie)) {
2659                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2660                 const unsigned char *dat = (const unsigned char*)data;
2661                 const int datlen = ndata;
2662                 if (checkUTF8JSON(dat, datlen)) {
2663                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2664                 }
2665             }
2666             ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2667                                        flags, datatype, 0, 0, revSeqno, exptime,
2668                                        nru, NULL, 0);
2669         }
2670
2671         break;
2672
2673     case TAP_OPAQUE:
2674         if (nengine == sizeof(uint32_t)) {
2675             uint32_t cc;
2676             memcpy(&cc, engine_specific, sizeof(cc));
2677             cc = ntohl(cc);
2678
2679             switch (cc) {
2680             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2681                 // @todo: the memcached core will _ALWAYS_ send nack
2682                 //        if it encounter an error. This should be
2683                 // set as the default when we move to .next after 2.0
2684                 // (currently we need to allow the message for
2685                 // backwards compatibility)
2686                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2687                     connection->logHeader());
2688                 break;
2689             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2690                 connection->setSupportCheckpointSync(true);
2691                 LOG(EXTENSION_LOG_INFO,
2692                     "%s Enable checkpoint synchronization\n",
2693                     connection->logHeader());
2694                 break;
2695             case TAP_OPAQUE_OPEN_CHECKPOINT:
2696                 /**
2697                  * This event is only received by the TAP client that wants to
2698                  * get mutations from closed checkpoints only. At this time,
2699                  * only incremental backup client receives this event so that
2700                  * it can close the connection and reconnect later.
2701                  */
2702                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2703                     connection->logHeader());
2704                 break;
2705             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2706                 {
2707                     LOG(EXTENSION_LOG_INFO,
2708                         "%s Backfill started for vbucket %d.\n",
2709                         connection->logHeader(), vbucket);
2710                     BlockTimer timer(&stats.tapVbucketResetHisto);
2711                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2712                                                   ENGINE_DISCONNECT;
2713                     if (ret == ENGINE_DISCONNECT) {
2714                         LOG(EXTENSION_LOG_WARNING,
2715                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2716                             connection->logHeader(), vbucket);
2717                     } else {
2718                         LOG(EXTENSION_LOG_WARNING,
2719                          "%s Reset vbucket %d was completed succecssfully.\n",
2720                             connection->logHeader(), vbucket);
2721                     }
2722
2723                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2724                     if (tc) {
2725                         tc->setBackfillPhase(true, vbucket);
2726                     } else {
2727                         ret = ENGINE_DISCONNECT;
2728                         LOG(EXTENSION_LOG_WARNING,
2729                             "TAP consumer doesn't exists. Force disconnect\n");
2730                     }
2731                 }
2732                 break;
2733             case TAP_OPAQUE_CLOSE_BACKFILL:
2734                 {
2735                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2736                         connection->logHeader());
2737                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2738                     if (tc) {
2739                         tc->setBackfillPhase(false, vbucket);
2740                     } else {
2741                         ret = ENGINE_DISCONNECT;
2742                         LOG(EXTENSION_LOG_WARNING,
2743                             "%s not a consumer! Force disconnect\n",
2744                             connection->logHeader());
2745                     }
2746                 }
2747                 break;
2748             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2749                 /**
2750                  * This event is sent by the eVBucketMigrator to notify that
2751                  * the source node closes the tap replication stream and
2752                  * switches to TAKEOVER_VBUCKETS phase.
2753                  * This is just an informative message and doesn't require any
2754                  * action.
2755                  */
2756                 LOG(EXTENSION_LOG_INFO,
2757                 "%s Received close tap stream. Switching to takeover phase.\n",
2758                     connection->logHeader());
2759                 break;
2760             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2761                 /**
2762                  * This opaque message is just for notifying that the source
2763                  * node receives change_vbucket_filter request and processes
2764                  * it successfully.
2765                  */
2766                 LOG(EXTENSION_LOG_INFO,
2767                 "%s Notified that the source node changed a vbucket filter.\n",
2768                     connection->logHeader());
2769                 break;
2770             default:
2771                 LOG(EXTENSION_LOG_WARNING,
2772                     "%s Received an unknown opaque command\n",
2773                     connection->logHeader());
2774             }
2775         } else {
2776             LOG(EXTENSION_LOG_WARNING,
2777                 "%s Received tap opaque with unknown size %d\n",
2778                 connection->logHeader(), nengine);
2779         }
2780         break;
2781
2782     case TAP_VBUCKET_SET:
2783         {
2784             BlockTimer timer(&stats.tapVbucketSetHisto);
2785
2786             if (nengine != sizeof(vbucket_state_t)) {
2787                 // illegal datasize
2788                 LOG(EXTENSION_LOG_WARNING,
2789                     "%s Received TAP_VBUCKET_SET with illegal size."
2790                     " Force disconnect\n", connection->logHeader());
2791                 ret = ENGINE_DISCONNECT;
2792                 break;
2793             }
2794
2795             vbucket_state_t state;
2796             memcpy(&state, engine_specific, nengine);
2797             state = (vbucket_state_t)ntohl(state);
2798
2799             ret = connection->setVBucketState(0, vbucket, state);
2800         }
2801         break;
2802
2803     default:
2804         // Unknown command
2805         LOG(EXTENSION_LOG_WARNING,
2806             "%s Recieved bad opcode, ignoring message\n",
2807             connection->logHeader());
2808     }
2809
2810     connection->processedEvent(tap_event, ret);
2811     return ret;
2812 }
2813
2814 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2815                                                       TapConsumer *consumer,
2816                                                       uint8_t event,
2817                                                       uint16_t vbucket,
2818                                                       uint64_t checkpointId) {
2819     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2820
2821     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2822         getEpStore()->wakeUpFlusher();
2823         ret = ENGINE_SUCCESS;
2824     }
2825     else {
2826         ret = ENGINE_DISCONNECT;
2827         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2828             "checkpoint %llu. Force disconnect\n",
2829             consumer->logHeader(), checkpointId);
2830     }
2831
2832     return ret;
2833 }
2834
2835 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2836     TapProducer *rv =
2837         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2838     if (!(rv && rv->isConnected())) {
2839         LOG(EXTENSION_LOG_WARNING,
2840             "Walking a non-existent tap queue, disconnecting\n");
2841         return NULL;
2842     }
2843
2844     if (rv->doDisconnect()) {
2845         LOG(EXTENSION_LOG_WARNING,
2846             "%s Disconnecting pending connection\n", rv->logHeader());
2847         return NULL;
2848     }
2849     return rv;
2850 }
2851
2852 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2853     // Register the callback
2854     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2855 }
2856
2857 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2858                                                             uint32_t seqno,
2859                                                             uint16_t status,
2860                                                             const std::string
2861                                                             &msg)
2862 {
2863     TapProducer *connection = getTapProducer(cookie);
2864     if (!connection) {
2865         LOG(EXTENSION_LOG_WARNING,
2866             "Unable to process tap ack. No producer found\n");
2867         return ENGINE_DISCONNECT;
2868     }
2869
2870     return connection->processAck(seqno, status, msg);
2871 }
2872
2873 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2874                                                              &backfillVBFilter,
2875                                                Producer *tc)
2876 {
2877     ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2878                                            backfillVBFilter);
2879     ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
2880 }
2881
2882 bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2883     ++numVbucket;
2884     item_eviction_policy_t policy = engine.getEpStore()->
2885                                                        getItemEvictionPolicy();
2886     numItems += vb->getNumItems(policy);
2887     numTempItems += vb->getNumTempItems();
2888     nonResident += vb->getNumNonResidentItems(policy);
2889
2890     if (vb->getHighPriorityChkSize() > 0) {
2891         chkPersistRemaining++;
2892     }
2893
2894     fileSpaceUsed += vb->fileSpaceUsed;
2895     fileSize += vb->fileSize;
2896
2897     if (desired_state != vbucket_state_dead) {
2898         htMemory += vb->ht.memorySize();
2899         htItemMemory += vb->ht.getItemMemory();
2900         htCacheSize += vb->ht.cacheSize;
2901         numEjects += vb->ht.getNumEjects();
2902         numExpiredItems += vb->numExpiredItems;
2903         metaDataMemory += vb->ht.metaDataMemory;
2904         metaDataDisk += vb->metaDataDisk;
2905         opsCreate += vb->opsCreate;
2906         opsUpdate += vb->opsUpdate;
2907         opsDelete += vb->opsDelete;
2908         opsReject += vb->opsReject;
2909
2910         queueSize += vb->dirtyQueueSize;
2911         queueMemory += vb->dirtyQueueMem;
2912         queueFill += vb->dirtyQueueFill;
2913         queueDrain += vb->dirtyQueueDrain;
2914         queueAge += vb->getQueueAge();
2915         pendingWrites += vb->dirtyQueuePendingWrites;
2916     }
2917
2918     return false;
2919 }
2920
2921 /**
2922  * A container class holding VBucketCountVisitors to aggregate stats for
2923  * different vbucket states.
2924  */
2925 class VBucketCountAggregator : public VBucketVisitor  {
2926 public:
2927     bool visitBucket(RCPtr<VBucket> &vb)  {
2928         std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
2929         it = visitorMap.find(vb->getState());
2930         if ( it != visitorMap.end() ) {
2931             it->second->visitBucket(vb);
2932         }
2933
2934         return false;
2935     }
2936
2937     void addVisitor(VBucketCountVisitor* visitor)  {
2938         visitorMap[visitor->getVBucketState()] = visitor;
2939     }
2940 private:
2941     std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
2942 };
2943
2944 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
2945                                                            ADD_STAT add_stat) {
2946     VBucketCountAggregator aggregator;
2947
2948     VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
2949     aggregator.addVisitor(&activeCountVisitor);
2950
2951     VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
2952     aggregator.addVisitor(&replicaCountVisitor);
2953
2954     VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
2955     aggregator.addVisitor(&pendingCountVisitor);
2956
2957     VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
2958     aggregator.addVisitor(&deadCountVisitor);
2959
2960     epstore->visit(aggregator);
2961
2962     epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
2963                                       replicaCountVisitor.getMemResidentPer());
2964     tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
2965                                      replicaCountVisitor.getNumItems() +
2966                                      pendingCountVisitor.getNumItems());
2967
2968     configuration.addStats(add_stat, cookie);
2969
2970     EPStats &epstats = getEpStats();
2971     add_casted_stat("ep_version", VERSION, add_stat, cookie);
2972     add_casted_stat("ep_storage_age",
2973                     epstats.dirtyAge, add_stat, cookie);
2974     add_casted_stat("ep_storage_age_highwat",
2975                     epstats.dirtyAgeHighWat, add_stat, cookie);
2976     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
2977                     add_stat, cookie);
2978
2979     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
2980         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
2981     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
2982         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
2983     }
2984
2985     add_casted_stat("ep_total_enqueued",
2986                     epstats.totalEnqueued, add_stat, cookie);
2987     add_casted_stat("ep_total_persisted",
2988                     epstats.totalPersisted, add_stat, cookie);
2989     add_casted_stat("ep_item_flush_failed",
2990                     epstats.flushFailed, add_stat, cookie);
2991     add_casted_stat("ep_item_commit_failed",
2992                     epstats.commitFailed, add_stat, cookie);
2993     add_casted_stat("ep_item_begin_failed",
2994                     epstats.beginFailed, add_stat, cookie);
2995     add_casted_stat("ep_expired_access", epstats.expired_access,
2996                     add_stat, cookie);
2997     add_casted_stat("ep_expired_pager", epstats.expired_pager,
2998                     add_stat, cookie);
2999     add_casted_stat("ep_item_flush_expired",
3000                     epstats.flushExpired, add_stat, cookie);
3001     add_casted_stat("ep_queue_size",
3002                     epstats.diskQueueSize, add_stat, cookie);
3003     add_casted_stat("ep_flusher_todo",
3004                     epstats.flusher_todo, add_stat, cookie);
3005     add_casted_stat("ep_uncommitted_items",
3006                     epstats.flusher_todo, add_stat, cookie);
3007     add_casted_stat("ep_diskqueue_items",
3008                     epstats.diskQueueSize, add_stat, cookie);
3009     add_casted_stat("ep_flusher_state",
3010                     epstore->getFlusher(0)->stateName(),
3011                     add_stat, cookie);
3012     add_casted_stat("ep_commit_num", epstats.flusherCommits,
3013                     add_stat, cookie);
3014     add_casted_stat("ep_commit_time",
3015                     epstats.commit_time, add_stat, cookie);
3016     add_casted_stat("ep_commit_time_total",
3017                     epstats.cumulativeCommitTime, add_stat, cookie);
3018     add_casted_stat("ep_vbucket_del",
3019                     epstats.vbucketDeletions, add_stat, cookie);
3020     add_casted_stat("ep_vbucket_del_fail",
3021                     epstats.vbucketDeletionFail, add_stat, cookie);
3022     add_casted_stat("ep_flush_duration_total",
3023                     epstats.cumulativeFlushTime, add_stat, cookie);
3024     add_casted_stat("ep_flush_all",
3025                     epstore->isFlushAllScheduled() ? "true" : "false",
3026                     add_stat, cookie);
3027     add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
3028                     cookie);
3029     add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
3030                     add_stat, cookie);
3031     add_casted_stat("curr_items_tot",
3032                     activeCountVisitor.getNumItems() +
3033                     replicaCountVisitor.getNumItems() +
3034                     pendingCountVisitor.getNumItems(),
3035                     add_stat, cookie);
3036     add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
3037                     add_stat, cookie);
3038     add_casted_stat("vb_active_curr_items", activeCountVisitor.getNumItems(),
3039                     add_stat, cookie);
3040     add_casted_stat("vb_active_num_non_resident",
3041                     activeCountVisitor.getNonResident(),
3042                     add_stat, cookie);
3043     add_casted_stat("vb_active_perc_mem_resident",
3044                     activeCountVisitor.getMemResidentPer(),
3045                     add_stat, cookie);
3046     add_casted_stat("vb_active_eject", activeCountVisitor.getEjects(),
3047                     add_stat, cookie);
3048     add_casted_stat("vb_active_expired", activeCountVisitor.getExpired(),
3049                     add_stat, cookie);
3050     add_casted_stat("vb_active_meta_data_memory",
3051                     activeCountVisitor.getMetaDataMemory(),
3052                     add_stat, cookie);
3053     add_casted_stat("vb_active_meta_data_disk",
3054                     activeCountVisitor.getMetaDataDisk(),
3055                     add_stat, cookie);
3056     add_casted_stat("vb_active_ht_memory",
3057                     activeCountVisitor.getHashtableMemory(),
3058                     add_stat, cookie);
3059     add_casted_stat("vb_active_itm_memory", activeCountVisitor.getItemMemory(),
3060                     add_stat, cookie);
3061     add_casted_stat("vb_active_ops_create", activeCountVisitor.getOpsCreate(),
3062                     add_stat, cookie);
3063     add_casted_stat("vb_active_ops_update", activeCountVisitor.getOpsUpdate(),
3064                     add_stat, cookie);
3065     add_casted_stat("vb_active_ops_delete", activeCountVisitor.getOpsDelete(),
3066                     add_stat, cookie);
3067     add_casted_stat("vb_active_ops_reject", activeCountVisitor.getOpsReject(),
3068                     add_stat, cookie);
3069     add_casted_stat("vb_active_queue_size", activeCountVisitor.getQueueSize(),
3070                     add_stat, cookie);
3071     add_casted_stat("vb_active_queue_memory",
3072                     activeCountVisitor.getQueueMemory(), add_stat, cookie);
3073     add_casted_stat("vb_active_queue_age", activeCountVisitor.getAge(),
3074                     add_stat, cookie);
3075     add_casted_stat("vb_active_queue_pending",
3076                     activeCountVisitor.getPendingWrites(), add_stat, cookie);
3077     add_casted_stat("vb_active_queue_fill", activeCountVisitor.getQueueFill(),
3078                     add_stat, cookie);
3079     add_casted_stat("vb_active_queue_drain",
3080                     activeCountVisitor.getQueueDrain(), add_stat, cookie);
3081
3082     add_casted_stat("vb_replica_num", replicaCountVisitor.getVBucketNumber(),
3083                     add_stat, cookie);
3084     add_casted_stat("vb_replica_curr_items", replicaCountVisitor.getNumItems(),
3085                     add_stat, cookie);
3086     add_casted_stat("vb_replica_num_non_resident",
3087                     replicaCountVisitor.getNonResident(), add_stat, cookie);
3088     add_casted_stat("vb_replica_perc_mem_resident",
3089                     replicaCountVisitor.getMemResidentPer(),
3090                     add_stat, cookie);
3091     add_casted_stat("vb_replica_eject", replicaCountVisitor.getEjects(),
3092                     add_stat, cookie);
3093     add_casted_stat("vb_replica_expired", replicaCountVisitor.getExpired(),
3094                     add_stat, cookie);
3095     add_casted_stat("vb_replica_meta_data_memory",
3096                     replicaCountVisitor.getMetaDataMemory(), add_stat, cookie);
3097     add_casted_stat("vb_replica_meta_data_disk",
3098                     replicaCountVisitor.getMetaDataDisk(), add_stat, cookie);
3099     add_casted_stat("vb_replica_ht_memory",
3100                     replicaCountVisitor.getHashtableMemory(),
3101                     add_stat, cookie);
3102     add_casted_stat("vb_replica_itm_memory",
3103                     replicaCountVisitor.getItemMemory(), add_stat, cookie);
3104     add_casted_stat("vb_replica_ops_create",
3105                     replicaCountVisitor.getOpsCreate(), add_stat, cookie);
3106     add_casted_stat("vb_replica_ops_update",
3107                     replicaCountVisitor.getOpsUpdate(), add_stat, cookie);
3108     add_casted_stat("vb_replica_ops_delete",
3109                     replicaCountVisitor.getOpsDelete(), add_stat, cookie);
3110     add_casted_stat("vb_replica_ops_reject",
3111                     replicaCountVisitor.getOpsReject(), add_stat, cookie);
3112     add_casted_stat("vb_replica_queue_size",
3113                     replicaCountVisitor.getQueueSize(), add_stat, cookie);
3114     add_casted_stat("vb_replica_queue_memory",
3115                     replicaCountVisitor.getQueueMemory(),
3116                     add_stat, cookie);
3117     add_casted_stat("vb_replica_queue_age",
3118                     replicaCountVisitor.getAge(), add_stat, cookie);
3119     add_casted_stat("vb_replica_queue_pending",
3120                     replicaCountVisitor.getPendingWrites(),
3121                     add_stat, cookie);
3122     add_casted_stat("vb_replica_queue_fill",
3123                     replicaCountVisitor.getQueueFill(), add_stat, cookie);
3124     add_casted_stat("vb_replica_queue_drain",
3125                     replicaCountVisitor.getQueueDrain(), add_stat, cookie);
3126
3127     add_casted_stat("vb_pending_num",
3128                     pendingCountVisitor.getVBucketNumber(), add_stat, cookie);
3129     add_casted_stat("vb_pending_curr_items",
3130                     pendingCountVisitor.getNumItems(), add_stat, cookie);
3131     add_casted_stat("vb_pending_num_non_resident",
3132                     pendingCountVisitor.getNonResident(),
3133                     add_stat, cookie);
3134     add_casted_stat("vb_pending_perc_mem_resident",
3135                     pendingCountVisitor.getMemResidentPer(), add_stat, cookie);
3136     add_casted_stat("vb_pending_eject", pendingCountVisitor.getEjects(),
3137                     add_stat, cookie);
3138     add_casted_stat("vb_pending_expired", pendingCountVisitor.getExpired(),
3139                     add_stat, cookie);
3140     add_casted_stat("vb_pending_meta_data_memory",
3141                     pendingCountVisitor.getMetaDataMemory(),
3142                     add_stat, cookie);
3143     add_casted_stat("vb_pending_meta_data_disk",
3144                     pendingCountVisitor.getMetaDataDisk(),
3145                     add_stat, cookie);
3146     add_casted_stat("vb_pending_ht_memory",
3147                     pendingCountVisitor.getHashtableMemory(),
3148                     add_stat, cookie);
3149     add_casted_stat("vb_pending_itm_memory",
3150                     pendingCountVisitor.getItemMemory(), add_stat, cookie);
3151     add_casted_stat("vb_pending_ops_create",
3152                     pendingCountVisitor.getOpsCreate(), add_stat, cookie);
3153     add_casted_stat("vb_pending_ops_update",
3154                     pendingCountVisitor.getOpsUpdate(), add_stat, cookie);
3155     add_casted_stat("vb_pending_ops_delete",
3156                     pendingCountVisitor.getOpsDelete(), add_stat, cookie);
3157     add_casted_stat("vb_pending_ops_reject",
3158                     pendingCountVisitor.getOpsReject(), add_stat, cookie);
3159     add_casted_stat("vb_pending_queue_size",
3160                     pendingCountVisitor.getQueueSize(), add_stat, cookie);
3161     add_casted_stat("vb_pending_queue_memory",
3162                     pendingCountVisitor.getQueueMemory(),
3163                     add_stat, cookie);
3164     add_casted_stat("vb_pending_queue_age", pendingCountVisitor.getAge(),
3165                     add_stat, cookie);
3166     add_casted_stat("vb_pending_queue_pending",
3167                     pendingCountVisitor.getPendingWrites(),
3168                     add_stat, cookie);
3169     add_casted_stat("vb_pending_queue_fill",
3170                     pendingCountVisitor.getQueueFill(), add_stat, cookie);
3171     add_casted_stat("vb_pending_queue_drain",
3172                     pendingCountVisitor.getQueueDrain(), add_stat, cookie);
3173
3174     add_casted_stat("vb_dead_num", deadCountVisitor.getVBucketNumber(),
3175                     add_stat, cookie);
3176
3177     add_casted_stat("ep_db_data_size",
3178                     activeCountVisitor.getFileSpaceUsed() +
3179                     replicaCountVisitor.getFileSpaceUsed() +
3180                     pendingCountVisitor.getFileSpaceUsed() +
3181                     deadCountVisitor.getFileSpaceUsed(),
3182                     add_stat, cookie);
3183     add_casted_stat("ep_db_file_size",
3184                     activeCountVisitor.getFileSize() +
3185                     replicaCountVisitor.getFileSize() +
3186                     pendingCountVisitor.getFileSize() +
3187                     deadCountVisitor.getFileSize(),
3188                     add_stat, cookie);
3189
3190     add_casted_stat("ep_vb_snapshot_total",
3191                     epstats.snapshotVbucketHisto.total(), add_stat, cookie);
3192
3193     add_casted_stat("ep_vb_total",
3194                     activeCountVisitor.getVBucketNumber() +
3195                     replicaCountVisitor.getVBucketNumber() +
3196                     pendingCountVisitor.getVBucketNumber() +
3197                     deadCountVisitor.getVBucketNumber(),
3198                     add_stat, cookie);
3199
3200     add_casted_stat("ep_total_new_items",
3201                     activeCountVisitor.getOpsCreate() +
3202                     replicaCountVisitor.getOpsCreate() +
3203                     pendingCountVisitor.getOpsCreate(),
3204                     add_stat, cookie);
3205     add_casted_stat("ep_total_del_items",
3206                     activeCountVisitor.getOpsDelete() +
3207                     replicaCountVisitor.getOpsDelete() +
3208                     pendingCountVisitor.getOpsDelete(),
3209                     add_stat, cookie);
3210     add_casted_stat("ep_diskqueue_memory",
3211                     activeCountVisitor.getQueueMemory() +
3212                     replicaCountVisitor.getQueueMemory() +
3213                     pendingCountVisitor.getQueueMemory(),
3214                     add_stat, cookie);
3215     add_casted_stat("ep_diskqueue_fill",
3216                     activeCountVisitor.getQueueFill() +
3217                     replicaCountVisitor.getQueueFill() +
3218                     pendingCountVisitor.getQueueFill(),
3219                     add_stat, cookie);
3220     add_casted_stat("ep_diskqueue_drain",
3221                     activeCountVisitor.getQueueDrain() +
3222                     replicaCountVisitor.getQueueDrain() +
3223                     pendingCountVisitor.getQueueDrain(),
3224                     add_stat, cookie);
3225     add_casted_stat("ep_diskqueue_pending",
3226                     activeCountVisitor.getPendingWrites() +
3227                     replicaCountVisitor.getPendingWrites() +
3228                     pendingCountVisitor.getPendingWrites(),
3229                     add_stat, cookie);
3230     add_casted_stat("ep_meta_data_memory",
3231                     activeCountVisitor.getMetaDataMemory() +
3232                     replicaCountVisitor.getMetaDataMemory() +
3233                     pendingCountVisitor.getMetaDataMemory(),
3234                     add_stat, cookie);
3235     add_casted_stat("ep_meta_data_disk",
3236                     activeCountVisitor.getMetaDataDisk() +
3237                     replicaCountVisitor.getMetaDataDisk() +
3238                     pendingCountVisitor.getMetaDataDisk(),
3239                     add_stat, cookie);
3240
3241     size_t memUsed =  stats.getTotalMemoryUsed();
3242     add_casted_stat("mem_used", memUsed, add_stat, cookie);
3243     add_casted_stat("bytes", memUsed, add_stat, cookie);
3244     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3245     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3246 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3247     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3248 #else
3249     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3250 #endif
3251     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3252     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3253                     add_stat, cookie);
3254 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3255     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3256 #else
3257     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3258 #endif
3259     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3260     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3261     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3262     add_casted_stat("ep_total_cache_size",
3263                     activeCountVisitor.getCacheSize() +
3264                     replicaCountVisitor.getCacheSize() +
3265                     pendingCountVisitor.getCacheSize(),
3266                     add_stat, cookie);
3267     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3268     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3269                     add_stat, cookie);
3270     add_casted_stat("ep_mem_tracker_enabled",
3271                     stats.memoryTrackerEnabled ? "true" : "false",
3272                     add_stat, cookie);
3273     add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3274                     add_stat, cookie);
3275     add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3276                     add_stat, cookie);
3277     add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3278                     add_stat, cookie);
3279     add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3280                     add_stat, cookie);
3281     add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3282                     add_stat, cookie);
3283     add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3284                     add_stat, cookie);
3285     add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3286                     add_stat, cookie);
3287     add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3288                     add_stat, cookie);
3289     add_casted_stat("ep_items_rm_from_checkpoints",
3290                     epstats.itemsRemovedFromCheckpoints,
3291                     add_stat, cookie);
3292     add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3293                     add_stat, cookie);
3294     add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3295                     add_stat, cookie);
3296     add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3297                     add_stat, cookie);
3298
3299     add_casted_stat("ep_io_num_read", epstats.io_num_read,
3300                     add_stat, cookie);
3301     add_casted_stat("ep_io_num_write", epstats.io_num_write, add_stat, cookie);
3302     add_casted_stat("ep_io_read_bytes", epstats.io_read_bytes,
3303                     add_stat, cookie);
3304     add_casted_stat("ep_io_write_bytes", epstats.io_write_bytes,
3305                      add_stat, cookie);
3306
3307     add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3308     add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3309                     add_stat, cookie);
3310     add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3311                     add_stat, cookie);
3312     add_casted_stat("ep_pending_ops_max_duration",
3313                     epstats.pendingOpsMaxDuration,
3314                     add_stat, cookie);
3315
3316     add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3317                     add_stat, cookie);
3318     add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3319                     add_stat, cookie);
3320
3321     size_t vbDeletions = epstats.vbucketDeletions.load();
3322     if (vbDeletions > 0) {
3323         add_casted_stat("ep_vbucket_del_max_walltime",
3324                         epstats.vbucketDelMaxWalltime,
3325                         add_stat, cookie);
3326         add_casted_stat("ep_vbucket_del_avg_walltime",
3327                         epstats.vbucketDelTotWalltime / vbDeletions,
3328                         add_stat, cookie);
3329     }
3330
3331     size_t numBgOps = epstats.bgNumOperations.load();
3332     if (numBgOps > 0) {
3333         add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3334                         add_stat, cookie);
3335         add_casted_stat("ep_bg_min_wait",
3336                         epstats.bgMinWait,
3337                         add_stat, cookie);
3338         add_casted_stat("ep_bg_max_wait",
3339                         epstats.bgMaxWait,
3340                         add_stat, cookie);
3341         add_casted_stat("ep_bg_wait_avg",
3342                         epstats.bgWait / numBgOps,
3343                         add_stat, cookie);
3344         add_casted_stat("ep_bg_min_load",
3345                         epstats.bgMinLoad,
3346                         add_stat, cookie);
3347         add_casted_stat("ep_bg_max_load",
3348                         epstats.bgMaxLoad,
3349                         add_stat, cookie);
3350         add_casted_stat("ep_bg_load_avg",
3351                         epstats.bgLoad / numBgOps,
3352                         add_stat, cookie);
3353         add_casted_stat("ep_bg_wait",
3354                         epstats.bgWait,
3355                         add_stat, cookie);
3356         add_casted_stat("ep_bg_load",
3357                         epstats.bgLoad,
3358                         add_stat, cookie);
3359     }
3360
3361     add_casted_stat("ep_num_non_resident",
3362                     activeCountVisitor.getNonResident() +
3363                     pendingCountVisitor.getNonResident() +
3364                     replicaCountVisitor.getNonResident(),
3365                     add_stat, cookie);
3366
3367     add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3368     add_casted_stat("ep_exp_pager_stime", epstore->getExpiryPagerSleeptime(),
3369                     add_stat, cookie);
3370
3371     add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3372                     add_stat, cookie);
3373     add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3374                     add_stat, cookie);
3375     add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3376                     add_stat, cookie);
3377     add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3378                     add_stat, cookie);
3379
3380     if (getConfiguration().isAccessScannerEnabled()) {
3381         char timestr[20];
3382         struct tm alogTim;
3383         if (cb_gmtime_r((time_t *)&epstats.alogTime, &alogTim) == -1) {
3384             add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3385                             cookie);
3386         } else {
3387             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3388             add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3389                             cookie);
3390         }
3391     } else {
3392         add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3393                         add_stat, cookie);
3394     }
3395
3396     add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
3397
3398     if (getConfiguration().isWarmup()) {
3399         Warmup *wp = epstore->getWarmup();
3400         cb_assert(wp);
3401         if (!epstore->isWarmingUp()) {
3402             add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
3403         } else {
3404             add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);
3405         }
3406         if (wp->getTime() > 0) {
3407             add_casted_stat("ep_warmup_time", wp->getTime() / 1000,
3408                             add_stat, cookie);
3409         }
3410         add_casted_stat("ep_warmup_oom", epstats.warmOOM, add_stat, cookie);
3411         add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
3412     }
3413
3414     add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
3415                     add_stat, cookie);
3416     add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
3417                     add_stat, cookie);
3418     add_casted_stat("ep_num_ops_del_meta", epstats.numOpsDelMeta,
3419                     add_stat, cookie);
3420     add_casted_stat("ep_num_ops_set_meta_res_fail",
3421                     epstats.numOpsSetMetaResolutionFailed, add_stat, cookie);
3422     add_casted_stat("ep_num_ops_del_meta_res_fail",
3423                     epstats.numOpsDelMetaResolutionFailed, add_stat, cookie);
3424     add_casted_stat("ep_num_ops_set_ret_meta", epstats.numOpsSetRetMeta,
3425                     add_stat, cookie);
3426     add_casted_stat("ep_num_ops_del_ret_meta", epstats.numOpsDelRetMeta,
3427                     add_stat, cookie);
3428     add_casted_stat("ep_num_ops_get_meta_on_set_meta",
3429                     epstats.numOpsGetMetaOnSetWithMeta, add_stat, cookie);
3430     add_casted_stat("ep_chk_persistence_timeout",
3431                     VBucket::getCheckpointFlushTimeout(),
3432                     add_stat, cookie);
3433     add_casted_stat("ep_chk_persistence_remains",
3434                     activeCountVisitor.getChkPersistRemaining() +
3435                     pendingCountVisitor.getChkPersistRemaining() +
3436                     replicaCountVisitor.getChkPersistRemaining(),
3437                     add_stat, cookie);
3438     add_casted_stat("ep_workload_pattern",
3439                     workload->stringOfWorkLoadPattern(),
3440                     add_stat, cookie);
3441     return ENGINE_SUCCESS;
3442 }
3443
3444 ENGINE_ERROR_CODE EventuallyPersistentEngine::doMemoryStats(const void *cookie,
3445                                                            ADD_STAT add_stat) {
3446     add_casted_stat("bytes", stats.getTotalMemoryUsed(), add_stat, cookie);
3447     add_casted_stat("mem_used", stats.getTotalMemoryUsed(), add_stat, cookie);
3448     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3449     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3450     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3451     add_casted_stat("ep_max_size", stats.getMaxDataSize(), add_stat, cookie);
3452     add_casted_stat("ep_mem_low_wat", stats.mem_low_wat, add_stat, cookie);
3453     add_casted_stat("ep_mem_high_wat", stats.mem_high_wat, add_stat, cookie);
3454     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3455     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3456                     add_stat, cookie);
3457
3458     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3459 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3460     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3461 #else
3462     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3463 #endif
3464     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3465                     add_stat, cookie);
3466 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3467     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3468 #else
3469     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3470 #endif
3471     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3472     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3473
3474
3475     add_casted_stat("ep_mem_tracker_enabled",
3476                     stats.memoryTrackerEnabled ? "true" : "false",
3477                     add_stat, cookie);
3478
3479     std::map<std::string, size_t> alloc_stats;
3480     MemoryTracker::getInstance()->getAllocatorStats(alloc_stats);
3481     std::map<std::string, size_t>::iterator it = alloc_stats.begin();
3482     for (; it != alloc_stats.end(); ++it) {
3483         add_casted_stat(it->first.c_str(), it->second, add_stat, cookie);
3484     }
3485
3486     return ENGINE_SUCCESS;
3487 }
3488
3489 ENGINE_ERROR_CODE EventuallyPersistentEngine::doVBucketStats(
3490                                                        const void *cookie,
3491                                                        ADD_STAT add_stat,
3492                                                        const char* stat_key,
3493                                                        int nkey,
3494                                                        bool prevStateRequested,
3495                                                        bool details) {
3496     class StatVBucketVisitor : public VBucketVisitor {
3497     public:
3498         StatVBucketVisitor(EventuallyPersistentStore *store,
3499                            const void *c, ADD_STAT a,
3500                            bool isPrevStateRequested, bool detailsRequested) :
3501             eps(store), cookie(c), add_stat(a),
3502             isPrevState(isPrevStateRequested),
3503             isDetailsRequested(detailsRequested) {}
3504
3505         bool visitBucket(RCPtr<VBucket> &vb) {
3506             addVBStats(cookie, add_stat, vb, eps, isPrevState,
3507                        isDetailsRequested);
3508             return false;
3509         }
3510
3511         static void addVBStats(const void *cookie, ADD_STAT add_stat,
3512                                RCPtr<VBucket> &vb,
3513                                EventuallyPersistentStore *store,
3514                                bool isPrevStateRequested,
3515                                bool detailsRequested) {
3516             if (!vb) {
3517                 return;
3518             }
3519
3520             if (isPrevStateRequested) {
3521                 char buf[16];
3522                 snprintf(buf, sizeof(buf), "vb_%d", vb->getId());
3523                 add_casted_stat(buf, VBucket::toString(vb->getInitialState()),
3524                                 add_stat, cookie);
3525             } else {
3526                 vb->addStats(detailsRequested, add_stat, cookie,
3527                              store->getItemEvictionPolicy());
3528             }
3529         }
3530
3531     private:
3532         EventuallyPersistentStore *eps;
3533         const void *cookie;
3534         ADD_STAT add_stat;
3535         bool isPrevState;
3536         bool isDetailsRequested;
3537     };
3538
3539     if (nkey > 16 && strncmp(stat_key, "vbucket-details", 15) == 0) {
3540         std::string vbid(&stat_key[16], nkey - 16);
3541         uint16_t vbucket_id(0);
3542         if (!parseUint16(vbid.c_str(), &vbucket_id)) {
3543             return ENGINE_EINVAL;
3544         }
3545         RCPtr<VBucket> vb = getVBucket(vbucket_id);
3546         if (!vb) {
3547             return ENGINE_NOT_MY_VBUCKET;
3548         }
3549
3550         StatVBucketVisitor::addVBStats(cookie, add_stat, vb, epstore,
3551                                        prevStateRequested, details);
3552     }
3553     else {
3554         StatVBucketVisitor svbv(epstore, cookie, add_stat, prevStateRequested,
3555                                 details);
3556         epstore->visit(svbv);
3557     }
3558     return ENGINE_SUCCESS;
3559 }
3560
3561 ENGINE_ERROR_CODE EventuallyPersistentEngine::doHashStats(const void *cookie,
3562                                                           ADD_STAT add_stat) {
3563
3564     class StatVBucketVisitor : public VBucketVisitor {
3565     public:
3566         StatVBucketVisitor(const void *c, ADD_STAT a) : cookie(c),
3567                                                         add_stat(a) {}
3568
3569         bool visitBucket(RCPtr<VBucket> &vb) {
3570             uint16_t vbid = vb->getId();
3571             char buf[32];
3572             snprintf(buf, sizeof(buf), "vb_%d:state", vbid);
3573             add_casted_stat(buf, VBucket::toString(vb->getState()),
3574                             add_stat, cookie);
3575
3576             HashTableDepthStatVisitor depthVisitor;
3577             vb->ht.visitDepth(depthVisitor);
3578
3579             snprintf(buf, sizeof(buf), "vb_%d:size", vbid);
3580             add_casted_stat(buf, vb->ht.getSize(), add_stat, cookie);
3581             snprintf(buf, sizeof(buf), "vb_%d:locks", vbid);
3582             add_casted_stat(buf, vb->ht.getNumLocks(), add_stat, cookie);
3583             snprintf(buf, sizeof(buf), "vb_%d:min_depth", vbid);
3584             add_casted_stat(buf, depthVisitor.min == -1 ? 0 : depthVisitor.min,
3585                             add_stat, cookie);
3586             snprintf(buf, sizeof(buf), "vb_%d:max_depth", vbid);
3587             add_casted_stat(buf, depthVisitor.max, add_stat, cookie);
3588             snprintf(buf, sizeof(buf), "vb_%d:histo", vbid);
3589             add_casted_stat(buf, depthVisitor.depthHisto, add_stat, cookie);
3590             snprintf(buf, sizeof(buf), "vb_%d:reported", vbid);
3591             add_casted_stat(buf, vb->ht.getNumInMemoryItems(), add_stat, cookie);
3592             snprintf(buf, sizeof(buf), "vb_%d:counted", vbid);
3593             add_casted_stat(buf, depthVisitor.size, add_stat, cookie);
3594             snprintf(buf, sizeof(buf), "vb_%d:resized", vbid);
3595             add_casted_stat(buf, vb->ht.getNumResizes(), add_stat, cookie);
3596             snprintf(buf, sizeof(buf), "vb_%d:mem_size", vbid);
3597             add_casted_stat(buf, vb->ht.memSize, add_stat, cookie);
3598             snprintf(buf, sizeof(buf), "vb_%d:mem_size_counted", vbid);
3599             add_casted_stat(buf, depthVisitor.memUsed, add_stat, cookie);
3600
3601             return false;
3602         }
3603
3604         const void *cookie;
3605         ADD_STAT add_stat;
3606     };
3607
3608     StatVBucketVisitor svbv(cookie, add_stat);
3609     epstore->visit(svbv);
3610
3611     return ENGINE_SUCCESS;
3612 }
3613
3614 class StatCheckpointVisitor : public VBucketVisitor {
3615 public:
3616     StatCheckpointVisitor(EventuallyPersistentStore * eps, const void *c,
3617                           ADD_STAT a) : epstore(eps), cookie(c), add_stat(a) {}
3618
3619     bool visitBucket(RCPtr<VBucket> &vb) {
3620         addCheckpointStat(cookie, add_stat, epstore, vb);
3621         return false;
3622     }
3623
3624     static void addCheckpointStat(const void *cookie, ADD_STAT add_stat,
3625                                   EventuallyPersistentStore *eps,
3626                                   RCPtr<VBucket> &vb) {
3627         if (!vb) {
3628             return;
3629         }
3630
3631         uint16_t vbid = vb->getId();
3632         char buf[256];
3633         snprintf(buf, sizeof(buf), "vb_%d:state", vbid);
3634         add_casted_stat(buf, VBucket::toString(vb->getState()),
3635                         add_stat, cookie);
3636         vb->checkpointManager.addStats(add_stat, cookie);
3637         snprintf(buf, sizeof(buf), "vb_%d:persisted_checkpoint_id", vbid);