99aa4ac62df096983ec7f1715bfbe5c0208c4bbe
[ep-engine.git] / src / ep_engine.cc
1
2 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 /*
4  *     Copyright 2010 Couchbase, Inc
5  *
6  *   Licensed under the Apache License, Version 2.0 (the "License");
7  *   you may not use this file except in compliance with the License.
8  *   You may obtain a copy of the License at
9  *
10  *       http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *   Unless required by applicable law or agreed to in writing, software
13  *   distributed under the License is distributed on an "AS IS" BASIS,
14  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *   See the License for the specific language governing permissions and
16  *   limitations under the License.
17  */
18
19 #include "config.h"
20
21 #include <fcntl.h>
22 #include <memcached/engine.h>
23 #include <memcached/protocol_binary.h>
24 #include <memcached/util.h>
25 #include <platform/platform.h>
26 #include <stdarg.h>
27
28 #include <cstdio>
29 #include <cstring>
30 #include <fstream>
31 #include <iostream>
32 #include <limits>
33 #include <string>
34 #include <vector>
35
36 #include "backfill.h"
37 #include "ep_engine.h"
38 #include "failover-table.h"
39 #include "flusher.h"
40 #include "connmap.h"
41 #include "htresizer.h"
42 #include "memory_tracker.h"
43 #include "stats-info.h"
44 #define STATWRITER_NAMESPACE core_engine
45 #include "statwriter.h"
46 #undef STATWRITER_NAMESPACE
47 #include "tapthrottle.h"
48 #include "dcp-consumer.h"
49 #include "dcp-producer.h"
50 #include "warmup.h"
51
52 static ALLOCATOR_HOOKS_API *hooksApi;
53 SERVER_LOG_API* EventuallyPersistentEngine::loggerApi;
54
55
/**
 * Scale a size by a fractional factor.
 *
 * @param val     the quantity to scale
 * @param percent the factor expressed as a fraction (0.75 means 75%),
 *                not as a 0-100 percentage
 * @return val * percent, truncated toward zero
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
59
60 /**
61  * Helper function to avoid typing in the long cast all over the place
62  * @param handle pointer to the engine
63  * @return the engine as a class
64  */
65 static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
66 {
67     EventuallyPersistentEngine* ret;
68     ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
69     ObjectRegistry::onSwitchThread(ret);
70     return ret;
71 }
72
73 static inline void releaseHandle(ENGINE_HANDLE* handle) {
74     (void) handle;
75     ObjectRegistry::onSwitchThread(NULL);
76 }
77
78
79 /**
80  * Call the response callback and return the appropriate value so that
81  * the core knows what to do..
82  */
83 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
84                                       uint16_t keylen,
85                                       const void *ext, uint8_t extlen,
86                                       const void *body, uint32_t bodylen,
87                                       uint8_t datatype, uint16_t status,
88                                       uint64_t cas, const void *cookie)
89 {
90     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
91     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
92     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
93                  status, cas, cookie)) {
94         rv = ENGINE_SUCCESS;
95     }
96     ObjectRegistry::onSwitchThread(e);
97     return rv;
98 }
99
/**
 * Ensure a value lies within the inclusive range [l, h].
 *
 * @param v candidate value
 * @param l lowest accepted value
 * @param h highest accepted value
 * @throws std::runtime_error("Value out of range.") if v < l or v > h
 */
template <typename T>
static void validate(T v, T l, T h) {
    if (v < l) {
        throw std::runtime_error("Value out of range.");
    }
    if (v > h) {
        throw std::runtime_error("Value out of range.");
    }
}
106
107
/**
 * Verify that a string is a base-10 integer: an optional leading '-'
 * followed by at least one decimal digit, and nothing else.
 *
 * @param str NUL-terminated string to check
 * @throws std::runtime_error("Value is not numeric") if str is NULL, empty,
 *         a lone "-", or contains any character other than a digit
 */
static void checkNumeric(const char* str) {
    if (str == NULL) {
        throw std::runtime_error("Value is not numeric");
    }
    const char* p = str;
    if (*p == '-') {
        ++p;
    }
    // Require at least one digit; otherwise "" and "-" would pass and the
    // callers would silently apply atoi()'s result of 0.
    if (*p == '\0') {
        throw std::runtime_error("Value is not numeric");
    }
    for (; *p != '\0'; ++p) {
        // Compare against the digit range directly: isdigit() on a plain
        // char is undefined for negative values (bytes >= 0x80 where char
        // is signed), and it matches only '0'-'9' anyway.
        if (*p < '0' || *p > '9') {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
120
121 // The Engine API specifies C linkage for the functions..
122 extern "C" {
123
124     static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
125     {
126         engine_info* info = getHandle(handle)->getInfo();
127         releaseHandle(handle);
128         return info;
129     }
130
131     static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
132                                            const char* config_str)
133     {
134         ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
135         releaseHandle(handle);
136         return err_code;
137     }
138
139     static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
140     {
141         getHandle(handle)->destroy(force);
142         delete getHandle(handle);
143         releaseHandle(NULL);
144     }
145
146     static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
147                                              const void* cookie,
148                                              item **itm,
149                                              const void* key,
150                                              const size_t nkey,
151                                              const size_t nbytes,
152                                              const int flags,
153                                              const rel_time_t exptime,
154                                              uint8_t datatype)
155     {
156         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
157             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
158                     " (ItemAllocate)");
159             return ENGINE_EINVAL;
160         }
161         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
162                                                                      itm, key,
163                                                                      nkey,
164                                                                      nbytes,
165                                                                      flags,
166                                                                      exptime,
167                                                                      datatype);
168         releaseHandle(handle);
169         return err_code;
170     }
171
172     static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
173                                            const void* cookie,
174                                            const void* key,
175                                            const size_t nkey,
176                                            uint64_t* cas,
177                                            uint16_t vbucket,
178                                            mutation_descr_t *mut_info)
179     {
180         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
181                                                                    nkey, cas,
182                                                                    vbucket, mut_info);
183         releaseHandle(handle);
184         return err_code;
185     }
186
187     static void EvpItemRelease(ENGINE_HANDLE* handle,
188                                const void *cookie,
189                                item* itm)
190     {
191         getHandle(handle)->itemRelease(cookie, itm);
192         releaseHandle(handle);
193     }
194
195     static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
196                                     const void* cookie,
197                                     item** itm,
198                                     const void* key,
199                                     const int nkey,
200                                     uint16_t vbucket)
201     {
202         ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
203                                                             nkey, vbucket, true);
204         releaseHandle(handle);
205         return err_code;
206     }
207
208     static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
209                                          const void* cookie,
210                                          const char* stat_key,
211                                          int nkey,
212                                          ADD_STAT add_stat)
213     {
214         ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
215                                                                  stat_key,
216                                                                  nkey,
217                                                                  add_stat);
218         releaseHandle(handle);
219         return err_code;
220     }
221
222     static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
223                                       const void *cookie,
224                                       item* itm,
225                                       uint64_t *cas,
226                                       ENGINE_STORE_OPERATION operation,
227                                       uint16_t vbucket)
228     {
229         ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
230                                                               operation,
231                                                               vbucket);
232         releaseHandle(handle);
233         return err_code;
234     }
235
236     static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
237                                            const void* cookie,
238                                            const void* key,
239                                            const int nkey,
240                                            const bool increment,
241                                            const bool create,
242                                            const uint64_t delta,
243                                            const uint64_t initial,
244                                            const rel_time_t exptime,
245                                            item **itm,
246                                            uint8_t datatype,
247                                            uint64_t *result,
248                                            uint16_t vbucket)
249     {
250         if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
251             if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
252                 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
253                         " (Arithmetic)");
254             } else {
255                 LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
256                     "operations on compressed data!");
257             }
258             return ENGINE_EINVAL;
259         }
260         ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
261                                                                 nkey,
262                                                                 increment,
263                                                                 create, delta,
264                                                                 initial,
265                                                                 exptime, itm,
266                                                                 datatype,
267                                                                 result,
268                                                                 vbucket);
269         releaseHandle(handle);
270         return ecode;
271     }
272
273     static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
274                                       const void* cookie, time_t when)
275     {
276         ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
277         releaseHandle(handle);
278         return err_code;
279     }
280
281     static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
282     {
283         getHandle(handle)->resetStats();
284         releaseHandle(handle);
285     }
286
287     static protocol_binary_response_status stopFlusher(
288                                                  EventuallyPersistentEngine *e,
289                                                  const char **msg,
290                                                  size_t *msg_size) {
291         return e->stopFlusher(msg, msg_size);
292     }
293
294     static protocol_binary_response_status startFlusher(
295                                                  EventuallyPersistentEngine *e,
296                                                  const char **msg,
297                                                  size_t *msg_size) {
298         return e->startFlusher(msg, msg_size);
299     }
300
301     static protocol_binary_response_status setTapParam(
302                                                  EventuallyPersistentEngine *e,
303                                                  const char *keyz,
304                                                  const char *valz,
305                                                  const char **msg, size_t *) {
306         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
307
308         try {
309             int v = atoi(valz);
310             if (strcmp(keyz, "tap_keepalive") == 0) {
311                 checkNumeric(valz);
312                 validate(v, 0, MAX_TAP_KEEP_ALIVE);
313                 e->setTapKeepAlive(static_cast<uint32_t>(v));
314             } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
315                 checkNumeric(valz);
316                 e->getConfiguration().setTapThrottleThreshold(v);
317             } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
318                 checkNumeric(valz);
319                 e->getConfiguration().setTapThrottleQueueCap(v);
320             } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
321                 checkNumeric(valz);
322                 e->getConfiguration().setTapThrottleCapPcnt(v);
323             } else {
324                 *msg = "Unknown config param";
325                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
326             }
327         } catch(std::runtime_error &) {
328             *msg = "Value out of range.";
329             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
330         }
331
332         return rv;
333     }
334
335     static protocol_binary_response_status setCheckpointParam(
336                                                  EventuallyPersistentEngine *e,
337                                                               const char *keyz,
338                                                               const char *valz,
339                                                               const char **msg,
340                                                               size_t *) {
341         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
342
343         try {
344             int v = atoi(valz);
345             if (strcmp(keyz, "chk_max_items") == 0) {
346                 checkNumeric(valz);
347                 validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
348                 e->getConfiguration().setChkMaxItems(v);
349             } else if (strcmp(keyz, "chk_period") == 0) {
350                 checkNumeric(valz);
351                 validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
352                 e->getConfiguration().setChkPeriod(v);
353             } else if (strcmp(keyz, "max_checkpoints") == 0) {
354                 checkNumeric(valz);
355                 validate(v, DEFAULT_MAX_CHECKPOINTS,
356                          MAX_CHECKPOINTS_UPPER_BOUND);
357                 e->getConfiguration().setMaxCheckpoints(v);
358             } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
359                 if (strcmp(valz, "true") == 0) {
360                     e->getConfiguration().setItemNumBasedNewChk(true);
361                 } else {
362                     e->getConfiguration().setItemNumBasedNewChk(false);
363                 }
364             } else if (strcmp(keyz, "keep_closed_chks") == 0) {
365                 if (strcmp(valz, "true") == 0) {
366                     e->getConfiguration().setKeepClosedChks(true);
367                 } else {
368                     e->getConfiguration().setKeepClosedChks(false);
369                 }
370             } else if (strcmp(keyz, "enable_chk_merge") == 0) {
371                 if (strcmp(valz, "true") == 0) {
372                     e->getConfiguration().setEnableChkMerge(true);
373                 } else {
374                     e->getConfiguration().setEnableChkMerge(false);
375                 }
376             } else {
377                 *msg = "Unknown config param";
378                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
379             }
380         } catch(std::runtime_error &) {
381             *msg = "Value out of range.";
382             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
383         }
384
385         return rv;
386     }
387
388     static protocol_binary_response_status setFlushParam(
389                                                  EventuallyPersistentEngine *e,
390                                                  const char *keyz,
391                                                  const char *valz,
392                                                  const char **msg,
393                                                  size_t *) {
394         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
395
396         // Handle the actual mutation.
397         try {
398             int v = atoi(valz);
399             if (strcmp(keyz, "bg_fetch_delay") == 0) {
400                 checkNumeric(valz);
401                 e->getConfiguration().setBgFetchDelay(v);
402             } else if (strcmp(keyz, "flushall_enabled") == 0) {
403                 if (strcmp(valz, "true") == 0) {
404                     e->getConfiguration().setFlushallEnabled(true);
405                 } else if(strcmp(valz, "false") == 0) {
406                     e->getConfiguration().setFlushallEnabled(false);
407                 } else {
408                     throw std::runtime_error("value out of range.");
409                 }
410             } else if (strcmp(keyz, "max_size") == 0) {
411                 char *ptr = NULL;
412                 checkNumeric(valz);
413                 uint64_t vsize = strtoull(valz, &ptr, 10);
414                 validate(vsize, static_cast<uint64_t>(0),
415                          std::numeric_limits<uint64_t>::max());
416                 e->getConfiguration().setMaxSize(vsize);
417                 e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
418                 e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
419             } else if (strcmp(keyz, "mem_low_wat") == 0) {
420                 char *ptr = NULL;
421                 checkNumeric(valz);
422                 uint64_t vsize = strtoull(valz, &ptr, 10);
423                 validate(vsize, static_cast<uint64_t>(0),
424                          std::numeric_limits<uint64_t>::max());
425                 e->getConfiguration().setMemLowWat(vsize);
426             } else if (strcmp(keyz, "mem_high_wat") == 0) {
427                 char *ptr = NULL;
428                 checkNumeric(valz);
429                 uint64_t vsize = strtoull(valz, &ptr, 10);
430                 validate(vsize, static_cast<uint64_t>(0),
431                          std::numeric_limits<uint64_t>::max());
432                 e->getConfiguration().setMemHighWat(vsize);
433             } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
434                 checkNumeric(valz);
435                 validate(v, 0, 100);
436                 e->getConfiguration().setBackfillMemThreshold(v);
437             } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
438                 checkNumeric(valz);
439                 validate(v, 0, 100);
440                 e->getConfiguration().setCompactionExpMemThreshold(v);
441             } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
442                 checkNumeric(valz);
443                 validate(v, 0, 100);
444                 e->getConfiguration().setMutationMemThreshold(v);
445             } else if (strcmp(keyz, "timing_log") == 0) {
446                 EPStats &stats = e->getEpStats();
447                 std::ostream *old = stats.timingLog;
448                 stats.timingLog = NULL;
449                 delete old;
450                 if (strcmp(valz, "off") == 0) {
451                     LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
452                 } else {
453                     std::ofstream *tmp(new std::ofstream(valz));
454                     if (tmp->good()) {
455                         LOG(EXTENSION_LOG_INFO,
456                             "Logging detailed timings to ``%s''.", valz);
457                         stats.timingLog = tmp;
458                     } else {
459                         LOG(EXTENSION_LOG_WARNING,
460                             "Error setting detailed timing log to ``%s'':  %s",
461                             valz, strerror(errno));
462                         delete tmp;
463                     }
464                 }
465             } else if (strcmp(keyz, "exp_pager_stime") == 0) {
466                 char *ptr = NULL;
467                 checkNumeric(valz);
468                 uint64_t vsize = strtoull(valz, &ptr, 10);
469                 validate(vsize, static_cast<uint64_t>(0),
470                          std::numeric_limits<uint64_t>::max());
471                 e->getConfiguration().setExpPagerStime((size_t)vsize);
472             } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
473                 if (strcmp(valz, "true") == 0) {
474                     e->getConfiguration().setAccessScannerEnabled(true);
475                 } else if (strcmp(valz, "false") == 0) {
476                     e->getConfiguration().setAccessScannerEnabled(false);
477                 } else {
478                     throw std::runtime_error("Value expected: true/false.");
479                 }
480             } else if (strcmp(keyz, "alog_sleep_time") == 0) {
481                 checkNumeric(valz);
482                 e->getConfiguration().setAlogSleepTime(v);
483             } else if (strcmp(keyz, "alog_task_time") == 0) {
484                 checkNumeric(valz);
485                 e->getConfiguration().setAlogTaskTime(v);
486             } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
487                 checkNumeric(valz);
488                 e->getConfiguration().setPagerActiveVbPcnt(v);
489             } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
490                 checkNumeric(valz);
491                 validate(v, 0, std::numeric_limits<int>::max());
492                 e->getConfiguration().setWarmupMinMemoryThreshold(v);
493             } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
494                 checkNumeric(valz);
495                 validate(v, 0, std::numeric_limits<int>::max());
496                 e->getConfiguration().setWarmupMinItemsThreshold(v);
497             } else if (strcmp(keyz, "max_num_readers") == 0) {
498                 checkNumeric(valz);
499                 validate(v, 0, std::numeric_limits<int>::max());
500                 e->getConfiguration().setMaxNumReaders(v);
501                 ExecutorPool::get()->setMaxReaders(v);
502             } else if (strcmp(keyz, "max_num_writers") == 0) {
503                 checkNumeric(valz);
504                 validate(v, 0, std::numeric_limits<int>::max());
505                 e->getConfiguration().setMaxNumWriters(v);
506                 ExecutorPool::get()->setMaxWriters(v);
507             } else if (strcmp(keyz, "max_num_auxio") == 0) {
508                 checkNumeric(valz);
509                 validate(v, 0, std::numeric_limits<int>::max());
510                 e->getConfiguration().setMaxNumAuxio(v);
511                 ExecutorPool::get()->setMaxAuxIO(v);
512             } else if (strcmp(keyz, "max_num_nonio") == 0) {
513                 checkNumeric(valz);
514                 validate(v, 0, std::numeric_limits<int>::max());
515                 e->getConfiguration().setMaxNumNonio(v);
516                 ExecutorPool::get()->setMaxNonIO(v);
517             } else if (strcmp(keyz, "bfilter_enabled") == 0) {
518                 if (strcmp(valz, "true") == 0) {
519                     e->getConfiguration().setBfilterEnabled(true);
520                 } else if (strcmp(valz, "false") == 0) {
521                     e->getConfiguration().setBfilterEnabled(false);
522                 } else {
523                     throw std::runtime_error("Value expected: true/false.");
524                 }
525             } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
526                 float val = atof(valz);
527                 if (val >= 0.0 && val <= 1.0) {
528                     e->getConfiguration().setBfilterResidencyThreshold(val);
529                 } else {
530                     throw std::runtime_error("Value out of range [0.0-1.0].");
531                 }
532             } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
533                 if (strcmp(valz, "true") == 0) {
534                     e->getConfiguration().setDefragmenterEnabled(true);
535                 } else {
536                     e->getConfiguration().setDefragmenterEnabled(false);
537                 }
538             } else if (strcmp(keyz, "defragmenter_interval") == 0) {
539                 checkNumeric(valz);
540                 validate(v, 1, std::numeric_limits<int>::max());
541                 e->getConfiguration().setDefragmenterInterval(v);
542             } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
543                 checkNumeric(valz);
544                 validate(v, 0, std::numeric_limits<int>::max());
545                 e->getConfiguration().setDefragmenterAgeThreshold(v);
546             } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
547                 checkNumeric(valz);
548                 validate(v, 1, std::numeric_limits<int>::max());
549                 e->getConfiguration().setDefragmenterChunkDuration(v);
550             } else if (strcmp(keyz, "defragmenter_run") == 0) {
551                 e->runDefragmenterTask();
552             } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
553                 checkNumeric(valz);
554                 validate(v, 1, std::numeric_limits<int>::max());
555                 e->getConfiguration().setCompactionWriteQueueCap(v);
556             } else {
557                 *msg = "Unknown config param";
558                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
559             }
560         } catch(std::runtime_error& ex) {
561             *msg = "Value out of range.";
562             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
563         }
564
565         return rv;
566     }
567
568     static protocol_binary_response_status setDcpParam(
569                                                     EventuallyPersistentEngine *e,
570                                                     const char *keyz,
571                                                     const char *valz,
572                                                     const char **msg,
573                                                     size_t *) {
574         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
575         try {
576
577             if (strcmp(keyz, "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
578                 size_t v = atoi(valz);
579                 checkNumeric(valz);
580                 validate(v, size_t(1), std::numeric_limits<size_t>::max());
581                 e->getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(v);
582             } else if (strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") == 0) {
583                 size_t v = atoi(valz);
584                 checkNumeric(valz);
585                 validate(v, size_t(1), std::numeric_limits<size_t>::max());
586                 e->getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(v);
587             } else {
588                 *msg = "Unknown config param";
589                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
590             }
591         } catch (std::runtime_error& ex) {
592             *msg = "Value out of range.";
593             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
594         }
595
596         return rv;
597     }
598
599     static protocol_binary_response_status evictKey(
600                                                  EventuallyPersistentEngine *e,
601                                                  protocol_binary_request_header
602                                                                       *request,
603                                                  const char **msg,
604                                                  size_t *msg_size) {
605         protocol_binary_request_no_extras *req =
606             (protocol_binary_request_no_extras*)request;
607
608         char keyz[256];
609
610         // Read the key.
611         int keylen = ntohs(req->message.header.request.keylen);
612         if (keylen >= (int)sizeof(keyz)) {
613             *msg = "Key is too large.";
614             return PROTOCOL_BINARY_RESPONSE_EINVAL;
615         }
616         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
617         keyz[keylen] = 0x00;
618
619         uint16_t vbucket = ntohs(request->request.vbucket);
620
621         std::string key(keyz, keylen);
622
623         LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
624                 keyz);
625
626         protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
627                                                          msg_size);
628         if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
629             rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
630             if (e->isDegradedMode()) {
631                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
632             }
633         }
634         return rv;
635     }
636
637     static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
638                                        protocol_binary_request_header *req,
639                                        const void *cookie,
640                                        Item **itm,
641                                        const char **msg,
642                                        size_t *,
643                                        protocol_binary_response_status *res) {
644
645         uint8_t extlen = req->request.extlen;
646         if (extlen != 0 && extlen != 4) {
647             *msg = "Invalid packet format (extlen may be 0 or 4)";
648             *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
649             return ENGINE_EINVAL;
650         }
651
652         protocol_binary_request_getl *grequest =
653             (protocol_binary_request_getl*)req;
654         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
655
656         const char *keyp = reinterpret_cast<const char*>(req->bytes);
657         keyp += sizeof(req->bytes) + extlen;
658         std::string key(keyp, ntohs(req->request.keylen));
659         uint16_t vbucket = ntohs(req->request.vbucket);
660
661         RememberingCallback<GetValue> getCb;
662         uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
663         uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
664         uint32_t lockTimeout = default_timeout;
665         if (extlen == 4) {
666             lockTimeout = ntohl(grequest->message.body.expiration);
667         }
668
669         if (lockTimeout >  max_timeout || lockTimeout < 1) {
670             LOG(EXTENSION_LOG_WARNING,
671                 "Illegal value for lock timeout specified"
672                 " %u. Using default value: %u", lockTimeout, default_timeout);
673             lockTimeout = default_timeout;
674         }
675
676         bool gotLock = e->getLocked(key, vbucket, getCb,
677                                     ep_current_time(),
678                                     lockTimeout, cookie);
679
680         getCb.waitForValue();
681         ENGINE_ERROR_CODE rv = getCb.val.getStatus();
682
683         if (rv == ENGINE_SUCCESS) {
684             *itm = getCb.val.getValue();
685             ++(e->getEpStats().numOpsGet);
686         } else if (rv == ENGINE_EWOULDBLOCK) {
687
688             // need to wait for value
689             return rv;
690         } else if (rv == ENGINE_NOT_MY_VBUCKET) {
691             *msg = "That's not my bucket.";
692             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
693             return ENGINE_NOT_MY_VBUCKET;
694         } else if (!gotLock){
695             *msg =  "LOCK_ERROR";
696             *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
697             return ENGINE_TMPFAIL;
698         } else {
699             if (e->isDegradedMode()) {
700                 *msg = "LOCK_TMP_ERROR";
701                 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
702                 return ENGINE_TMPFAIL;
703             }
704
705             *msg = "NOT_FOUND";
706             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
707             return ENGINE_KEY_ENOENT;
708         }
709
710         return rv;
711     }
712
713     static protocol_binary_response_status unlockKey(
714                                                  EventuallyPersistentEngine *e,
715                                                  protocol_binary_request_header
716                                                                       *request,
717                                                  const char **msg,
718                                                  size_t *)
719     {
720         protocol_binary_request_no_extras *req =
721             (protocol_binary_request_no_extras*)request;
722
723         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
724         char keyz[256];
725
726         // Read the key.
727         int keylen = ntohs(req->message.header.request.keylen);
728         if (keylen >= (int)sizeof(keyz)) {
729             *msg = "Key is too large.";
730             return PROTOCOL_BINARY_RESPONSE_EINVAL;
731         }
732
733         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
734         keyz[keylen] = 0x00;
735
736         uint16_t vbucket = ntohs(request->request.vbucket);
737         std::string key(keyz, keylen);
738
739         LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
740
741         RememberingCallback<GetValue> getCb;
742         uint64_t cas = ntohll(request->request.cas);
743
744         ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
745                                             ep_current_time());
746
747         if (rv == ENGINE_SUCCESS) {
748             *msg = "UNLOCKED";
749         } else if (rv == ENGINE_TMPFAIL){
750             *msg =  "UNLOCK_ERROR";
751             res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
752         } else {
753             if (e->isDegradedMode()) {
754                 *msg = "LOCK_TMP_ERROR";
755                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
756             }
757
758             RCPtr<VBucket> vb = e->getVBucket(vbucket);
759             if (!vb) {
760                 *msg = "That's not my bucket.";
761                 res =  PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
762             }
763             *msg = "NOT_FOUND";
764             res =  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
765         }
766
767         return res;
768     }
769
770     static protocol_binary_response_status setParam(
771                                             EventuallyPersistentEngine *e,
772                                             protocol_binary_request_set_param
773                                                                      *req,
774                                             const char **msg,
775                                             size_t *msg_size) {
776
777         size_t keylen = ntohs(req->message.header.request.keylen);
778         uint8_t extlen = req->message.header.request.extlen;
779         size_t vallen = ntohl(req->message.header.request.bodylen);
780         protocol_binary_engine_param_t paramtype =
781             static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
782
783         if (keylen == 0 || (vallen - keylen - extlen) == 0) {
784             return PROTOCOL_BINARY_RESPONSE_EINVAL;
785         }
786
787         const char *keyp = reinterpret_cast<const char*>(req->bytes)
788                            + sizeof(req->bytes);
789         const char *valuep = keyp + keylen;
790         vallen -= (keylen + extlen);
791
792         char keyz[128];
793         char valz[512];
794
795         // Read the key.
796         if (keylen >= sizeof(keyz)) {
797             *msg = "Key is too large.";
798             return PROTOCOL_BINARY_RESPONSE_EINVAL;
799         }
800         memcpy(keyz, keyp, keylen);
801         keyz[keylen] = 0x00;
802
803         // Read the value.
804         if (vallen >= sizeof(valz)) {
805             *msg = "Value is too large.";
806             return PROTOCOL_BINARY_RESPONSE_EINVAL;
807         }
808         memcpy(valz, valuep, vallen);
809         valz[vallen] = 0x00;
810
811         protocol_binary_response_status rv;
812
813         switch (paramtype) {
814         case protocol_binary_engine_param_flush:
815             rv = setFlushParam(e, keyz, valz, msg, msg_size);
816             break;
817         case protocol_binary_engine_param_tap:
818             rv = setTapParam(e, keyz, valz, msg, msg_size);
819             break;
820         case protocol_binary_engine_param_checkpoint:
821             rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
822             break;
823         case protocol_binary_engine_param_dcp:
824             rv = setDcpParam(e, keyz, valz, msg, msg_size);
825             break;
826         default:
827             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
828         }
829
830         return rv;
831     }
832
833     static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
834                                        const void *cookie,
835                                        protocol_binary_request_header *request,
836                                        ADD_RESPONSE response) {
837         protocol_binary_request_get_vbucket *req =
838             reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
839         cb_assert(req);
840
841         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
842         RCPtr<VBucket> vb = e->getVBucket(vbucket);
843         if (!vb) {
844             LockHolder lh(e->clusterConfig.lock);
845             return sendResponse(response, NULL, 0, NULL, 0,
846                                 e->clusterConfig.config,
847                                 e->clusterConfig.len,
848                                 PROTOCOL_BINARY_RAW_BYTES,
849                                 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
850                                 cookie);
851         } else {
852             vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
853             return sendResponse(response, NULL, 0, NULL, 0, &state,
854                                 sizeof(state),
855                                 PROTOCOL_BINARY_RAW_BYTES,
856                                 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
857         }
858     }
859
860     static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
861                                        const void *cookie,
862                                        protocol_binary_request_header *request,
863                                        ADD_RESPONSE response) {
864
865         protocol_binary_request_set_vbucket *req =
866             reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
867
868         uint64_t cas = ntohll(req->message.header.request.cas);
869
870         size_t bodylen = ntohl(req->message.header.request.bodylen)
871             - ntohs(req->message.header.request.keylen);
872         if (bodylen != sizeof(vbucket_state_t)) {
873             const std::string msg("Incorrect packet format");
874             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
875                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
876                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
877                                 cas, cookie);
878         }
879
880         vbucket_state_t state;
881         memcpy(&state, &req->message.body.state, sizeof(state));
882         state = static_cast<vbucket_state_t>(ntohl(state));
883
884         if (!is_valid_vbucket_state_t(state)) {
885             const std::string msg("Invalid vbucket state");
886             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
887                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
888                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
889                                 cas, cookie);
890         }
891
892         uint16_t vb = ntohs(req->message.header.request.vbucket);
893         if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
894             const std::string msg("VBucket number too big");
895             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
896                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
897                                 PROTOCOL_BINARY_RESPONSE_ERANGE,
898                                 cas, cookie);
899         }
900         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
901                             PROTOCOL_BINARY_RAW_BYTES,
902                             PROTOCOL_BINARY_RESPONSE_SUCCESS,
903                             cas, cookie);
904     }
905
    /**
     * Handle DEL_VBUCKET: delete a vbucket.
     *
     * The request must carry no key and no extras. A body of exactly
     * "async=0" selects synchronous deletion; any other body (or none)
     * deletes asynchronously through deleteVBucket(vbucket).
     *
     * Synchronous deletion spans two invocations for the same cookie:
     * - On the first, the engine-specific slot is empty. The deletion is
     *   started with the cookie and the slot is marked. If that returns
     *   ENGINE_EWOULDBLOCK, the command is parked.
     * - On a later invocation, the slot is found set. It is cleared and
     *   success is reported.
     * NOTE(review): presumably the core re-dispatches the command once the
     * deletion signals the cookie; confirm against the caller.
     *
     * NOT_MY_VBUCKET replies carry the current cluster configuration as the
     * body; all other replies carry `msg`. Every reply echoes the request CAS.
     *
     * @return ENGINE_EWOULDBLOCK while a sync deletion is pending, otherwise
     *         the result of sendResponse().
     */
    static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
                                        const void *cookie,
                                        protocol_binary_request_header *req,
                                        ADD_RESPONSE response) {

        uint64_t cas = ntohll(req->request.cas);

        protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
        uint16_t vbucket = ntohs(req->request.vbucket);

        std::string msg = "";
        if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
            msg = "Incorrect packet format.";
            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
                                msg.length(),
                                PROTOCOL_BINARY_RAW_BYTES,
                                PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
        }

        // Only the literal 7-byte body "async=0" requests a synchronous delete.
        bool sync = false;
        uint32_t bodylen = ntohl(req->request.bodylen);
        if (bodylen > 0) {
            const char* ptr = reinterpret_cast<const char*>(req->bytes) +
                sizeof(req->bytes);
            if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
                sync = true;
            }
        }

        // The cookie's engine-specific slot acts as a "sync delete already
        // started" marker: NULL on the first call, non-NULL on re-dispatch.
        ENGINE_ERROR_CODE err;
        void* es = e->getEngineSpecific(cookie);
        if (sync) {
            if (es == NULL) {
                err = e->deleteVBucket(vbucket, cookie);
                e->storeEngineSpecific(cookie, e);
            } else {
                e->storeEngineSpecific(cookie, NULL);
                LOG(EXTENSION_LOG_INFO,
                    "Completed sync deletion of vbucket %u",
                    (unsigned)vbucket);
                err = ENGINE_SUCCESS;
            }
        } else {
            err = e->deleteVBucket(vbucket);
        }
        switch (err) {
        case ENGINE_SUCCESS:
            LOG(EXTENSION_LOG_WARNING,
                "Deletion of vbucket %d was completed.", vbucket);
            break;
        case ENGINE_NOT_MY_VBUCKET:
            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
                "because the vbucket doesn't exist!!!", vbucket);
            res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
            break;
        case ENGINE_EINVAL:
            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
                "because the vbucket is not in a dead state\n", vbucket);
            msg = "Failed to delete vbucket.  Must be in the dead state.";
            res = PROTOCOL_BINARY_RESPONSE_EINVAL;
            break;
        case ENGINE_EWOULDBLOCK:
            LOG(EXTENSION_LOG_WARNING, "Request for vbucket %d deletion is in"
                " EWOULDBLOCK until the database file is removed from disk",
                vbucket);
            // Keep the slot non-NULL (the request pointer serves as the
            // marker) so the next invocation takes the completion path.
            e->storeEngineSpecific(cookie, req);
            return ENGINE_EWOULDBLOCK;
        default:
            LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
                "because of unknown reasons\n", vbucket);
            msg = "Failed to delete vbucket.  Unknown reason.";
            res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
        }

        // NOT_MY_VBUCKET replies carry the cluster map instead of `msg`.
        if (err != ENGINE_NOT_MY_VBUCKET) {
            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
                                res, cas, cookie);
        } else {
            LockHolder lh(e->clusterConfig.lock);
            return sendResponse(response, NULL, 0, NULL, 0,
                                e->clusterConfig.config,
                                e->clusterConfig.len,
                                PROTOCOL_BINARY_RAW_BYTES,
                                res, cas, cookie);
        }

    }
994
995     static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
996                                        protocol_binary_request_header *request,
997                                        const void *cookie,
998                                        Item **it,
999                                        const char **msg,
1000                                        protocol_binary_response_status *res) {
1001         EventuallyPersistentStore *eps = e->getEpStore();
1002         protocol_binary_request_no_extras *req =
1003             (protocol_binary_request_no_extras*)request;
1004         int keylen = ntohs(req->message.header.request.keylen);
1005         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1006         ENGINE_ERROR_CODE error_code;
1007         std::string keystr(((char *)request) + sizeof(req->message.header),
1008                             keylen);
1009
1010         GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
1011
1012         if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
1013             if (error_code == ENGINE_NOT_MY_VBUCKET) {
1014                 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1015                 return error_code;
1016             } else if (error_code == ENGINE_TMPFAIL) {
1017                 *msg = "NOT_FOUND";
1018                 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1019             } else {
1020                 return error_code;
1021             }
1022         } else {
1023             *it = rv.getValue();
1024             *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1025         }
1026         ++(e->getEpStats().numOpsGet);
1027         return ENGINE_SUCCESS;
1028     }
1029
    /**
     * Handle COMPACT_DB: compact the on-disk file of one vbucket.
     *
     * The request must carry no key and exactly 24 bytes of extras. The
     * compaction settings (purge_before_ts, purge_before_seq, drop_deletes)
     * are read from the request body struct.
     *
     * The command spans two invocations for the same cookie:
     * - On the first, the engine-specific slot is empty. This call bumps
     *   stats.pendingCompactions, marks the slot and calls
     *   EventuallyPersistentEngine::compactDB(). If that returns
     *   ENGINE_EWOULDBLOCK, the command is parked.
     * - On a later invocation, the slot is found set. It is cleared and
     *   success is reported.
     *
     * NOT_MY_VBUCKET replies carry the cluster configuration as the body;
     * all other replies carry `msg`. Every reply echoes the request CAS.
     *
     * @return ENGINE_EWOULDBLOCK while compaction is pending, otherwise the
     *         result of sendResponse().
     */
    static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
                                       const void *cookie,
                                       protocol_binary_request_compact_db *req,
                                       ADD_RESPONSE response) {

        std::string msg = "";
        protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
        compaction_ctx compactreq;
        uint16_t vbucket = ntohs(req->message.header.request.vbucket);
        uint64_t cas = ntohll(req->message.header.request.cas);

        if (ntohs(req->message.header.request.keylen) > 0 ||
             req->message.header.request.extlen != 24) {
            LOG(EXTENSION_LOG_WARNING,
                    "Compaction of vbucket %d received bad ext/key len %d/%d.",
                    vbucket, req->message.header.request.extlen,
                    ntohs(req->message.header.request.keylen));
            msg = "Incorrect packet format.";
            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
                                msg.length(),
                                PROTOCOL_BINARY_RAW_BYTES,
                                PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
        }
        EPStats &stats = e->getEpStats();
        compactreq.max_purged_seq = 0;
        compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
        compactreq.purge_before_seq =
                                    ntohll(req->message.body.purge_before_seq);
        compactreq.drop_deletes     = req->message.body.drop_deletes;
        compactreq.bfcb             = NULL;

        // Engine-specific slot: NULL on first call, non-NULL on re-dispatch.
        ENGINE_ERROR_CODE err;
        void* es = e->getEngineSpecific(cookie);
        if (es == NULL) {
            ++stats.pendingCompactions;
            e->storeEngineSpecific(cookie, e);
            err = e->compactDB(vbucket, compactreq, cookie);
        } else {
            e->storeEngineSpecific(cookie, NULL);
            err = ENGINE_SUCCESS;
        }

        switch (err) {
            case ENGINE_SUCCESS:
                LOG(EXTENSION_LOG_INFO,
                    "Compaction of vbucket %d completed.", vbucket);
                break;
            case ENGINE_NOT_MY_VBUCKET:
                // Nothing was scheduled, so undo the increment above.
                --stats.pendingCompactions;
                LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
                    "because the vbucket doesn't exist!!!", vbucket);
                res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
                break;
            case ENGINE_EWOULDBLOCK:
                LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
                        "in EWOULDBLOCK state until the database file is "
                        "compacted on disk",
                        vbucket);
                // Keep the slot non-NULL so the next call reports completion.
                e->storeEngineSpecific(cookie, req);
                return ENGINE_EWOULDBLOCK;
            case ENGINE_TMPFAIL:
                // NOTE(review): unlike NOT_MY_VBUCKET and default, this branch
                // does not decrement stats.pendingCompactions; confirm that
                // the counter is balanced elsewhere for TMPFAIL.
                LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
                        " a temporary failure and may need to be retried",
                        vbucket);
                msg = "Temporary failure in compacting vbucket.";
                res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
                break;
            default:
                --stats.pendingCompactions;
                LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
                    "because of unknown reasons\n", vbucket);
                msg = "Failed to compact vbucket.  Unknown reason.";
                res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
                break;
        }

        // NOT_MY_VBUCKET replies carry the cluster map instead of `msg`.
        if (err != ENGINE_NOT_MY_VBUCKET) {
            return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
                                msg.length(), PROTOCOL_BINARY_RAW_BYTES,
                                res, cas, cookie);
        } else {
            LockHolder lh(e->clusterConfig.lock);
            return sendResponse(response, NULL, 0, NULL, 0,
                                e->clusterConfig.config,
                                e->clusterConfig.len,
                                PROTOCOL_BINARY_RAW_BYTES,
                                res, cas, cookie);
        }
    }
1119
    /**
     * Dispatch an engine-specific (non-core) binary protocol command.
     *
     * Session validation: the admin commands listed in the first switch are
     * checked against the session token carried in the request CAS. The
     * check is skipped when the cookie already has engine-specific data
     * (presumably a re-dispatch of a parked command).
     *
     * Most commands build and send their own reply and return immediately.
     * The rest set `res`/`msg`/`itm` and fall through to the shared reply
     * code at the end:
     * - STOP_PERSISTENCE and START_PERSISTENCE
     * - SET_PARAM
     * - EVICT_KEY
     * - GET_LOCKED and UNLOCK_KEY
     * - GET_REPLICA
     * - any unrecognised opcode
     *
     * @param h        engine instance
     * @param cookie   connection cookie
     * @param request  raw request packet
     * @param response callback used to emit the reply
     * @return the handler's status or the result of sending the reply;
     *         ENGINE_EWOULDBLOCK when the command has been parked.
     */
    static ENGINE_ERROR_CODE processUnknownCommand(
                                       EventuallyPersistentEngine *h,
                                       const void* cookie,
                                       protocol_binary_request_header *request,
                                       ADD_RESPONSE response)
    {
        // Opcodes not handled below reach the reply code with this status.
        protocol_binary_response_status res =
                                      PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
        const char *msg = NULL;
        size_t msg_size = 0;
        Item *itm = NULL;

        EPStats &stats = h->getEpStats();
        ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;

        /**
         * Session validation
         * (For ns_server commands only)
         */
        switch (request->request.opcode) {
            case PROTOCOL_BINARY_CMD_SET_PARAM:
            case PROTOCOL_BINARY_CMD_SET_VBUCKET:
            case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
            case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
            case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
            case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
            case PROTOCOL_BINARY_CMD_COMPACT_DB:
            {
                if (h->getEngineSpecific(cookie) == NULL) {
                    uint64_t cas = ntohll(request->request.cas);
                    if (!h->validateSessionCas(cas)) {
                        const std::string message("Invalid session token");
                        return sendResponse(response, NULL, 0, NULL, 0,
                                            message.c_str(), message.length(),
                                            PROTOCOL_BINARY_RAW_BYTES,
                                            PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
                                            cas, cookie);
                    }
                }
                break;
            }
            default:
                break;
        }

        // Commands that validated a session above call decrementSessionCtr()
        // once they finish (for DEL_VBUCKET/COMPACT_DB: only when not parked).
        // NOTE(review): presumably this balances a counter taken by
        // validateSessionCas(); confirm in the engine implementation.
        switch (request->request.opcode) {
        case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
            return h->getAllVBucketSequenceNumbers(cookie, request, response);

        case PROTOCOL_BINARY_CMD_GET_VBUCKET:
            {
                BlockTimer timer(&stats.getVbucketCmdHisto);
                rv = getVBucket(h, cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
            {
                BlockTimer timer(&stats.delVbucketCmdHisto);
                rv = delVBucket(h, cookie, request, response);
                if (rv != ENGINE_EWOULDBLOCK) {
                    h->decrementSessionCtr();
                    h->storeEngineSpecific(cookie, NULL);
                }
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_VBUCKET:
            {
                BlockTimer timer(&stats.setVbucketCmdHisto);
                rv = setVBucket(h, cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_TOUCH:
        case PROTOCOL_BINARY_CMD_GAT:
        case PROTOCOL_BINARY_CMD_GATQ:
            {
                rv = h->touch(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
            res = stopFlusher(h, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
            res = startFlusher(h, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_SET_PARAM:
            res = setParam(h,
                  reinterpret_cast<protocol_binary_request_set_param*>(request),
                            &msg, &msg_size);
            h->decrementSessionCtr();
            break;
        case PROTOCOL_BINARY_CMD_EVICT_KEY:
            res = evictKey(h, request, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_GET_LOCKED:
            rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
            if (rv == ENGINE_EWOULDBLOCK) {
                // we dont have the value for the item yet
                return rv;
            }
            break;
        case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
            res = unlockKey(h, request, &msg, &msg_size);
            break;
        case PROTOCOL_BINARY_CMD_OBSERVE:
            return h->observe(cookie, request, response);
        case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
            return h->observe_seqno(cookie, request, response);
        case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
            {
                rv = h->deregisterTapClient(cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN:
            {
                rv = h->resetReplicationChain(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
            {
                rv = h->changeTapVBFilter(cookie, request, response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
        case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
        case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE:
            {
                rv = h->handleCheckpointCmds(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE:
            {
                rv = h->handleSeqnoCmds(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_META:
        case PROTOCOL_BINARY_CMD_GETQ_META:
            {
                rv = h->getMeta(cookie,
                        reinterpret_cast<protocol_binary_request_get_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_WITH_META:
        case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
        case PROTOCOL_BINARY_CMD_ADD_WITH_META:
        case PROTOCOL_BINARY_CMD_ADDQ_WITH_META:
            {
                rv = h->setWithMeta(cookie,
                     reinterpret_cast<protocol_binary_request_set_with_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_DEL_WITH_META:
        case PROTOCOL_BINARY_CMD_DELQ_WITH_META:
            {
                rv = h->deleteWithMeta(cookie,
                    reinterpret_cast<protocol_binary_request_delete_with_meta*>
                                                          (request), response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_RETURN_META:
            {
                return h->returnMeta(cookie,
                reinterpret_cast<protocol_binary_request_return_meta*>
                                                          (request), response);
            }
        case PROTOCOL_BINARY_CMD_GET_REPLICA:
            rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
            // NOT_MY_VBUCKET falls through so the cluster map is sent below;
            // other errors are left to the caller.
            if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
                return rv;
            }
            break;
        case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
        case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC:
            {
                rv = h->handleTrafficControlCmd(cookie, request, response);
                return rv;
            }
        case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
            {
                rv = h->setClusterConfig(cookie,
                 reinterpret_cast<protocol_binary_request_set_cluster_config*>
                                                          (request), response);
                h->decrementSessionCtr();
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
            return h->getClusterConfig(cookie,
               reinterpret_cast<protocol_binary_request_get_cluster_config*>
                                                          (request), response);
        case PROTOCOL_BINARY_CMD_COMPACT_DB:
            {
                rv = compactDB(h, cookie,
                               (protocol_binary_request_compact_db*)(request),
                               response);
                if (rv != ENGINE_EWOULDBLOCK) {
                    h->decrementSessionCtr();
                    h->storeEngineSpecific(cookie, NULL);
                }
                return rv;
            }
        case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
            {
                // Comparing against 0 is byte-order independent, so the raw
                // (network-order) header fields can be tested directly.
                if (request->request.extlen != 0 ||
                    request->request.keylen != 0 ||
                    request->request.bodylen != 0) {
                    return ENGINE_EINVAL;
                }
                return h->getRandomKey(cookie, response);
            }
        case CMD_GET_KEYS:
            {
                return h->getAllKeys(cookie,
                   reinterpret_cast<protocol_binary_request_get_keys*>
                                                           (request), response);
            }
        case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
            {
                return h->getAdjustedTime(cookie,
                   reinterpret_cast<protocol_binary_request_get_adjusted_time*>
                                                           (request), response);
            }
        case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE:
            {
                return h->setDriftCounterState(cookie,
                reinterpret_cast<protocol_binary_request_set_drift_counter_state*>
                                                           (request), response);
            }
        }

        // Shared reply path for the commands that did not respond themselves.
        // Any item returned by a handler is owned here and deleted after use.
        // Send a special response for getl since we don't want to send the key
        if (itm && request->request.opcode == PROTOCOL_BINARY_CMD_GET_LOCKED) {
            uint32_t flags = itm->getFlags();
            rv = sendResponse(response, NULL, 0, (const void *)&flags,
                              sizeof(uint32_t),
                              static_cast<const void *>(itm->getData()),
                              itm->getNBytes(), itm->getDataType(),
                              static_cast<uint16_t>(res), itm->getCas(),
                              cookie);
            delete itm;
        } else if (itm) {
            const std::string &key  = itm->getKey();
            uint32_t flags = itm->getFlags();
            rv = sendResponse(response, static_cast<const void *>(key.data()),
                              itm->getNKey(),
                              (const void *)&flags, sizeof(uint32_t),
                              static_cast<const void *>(itm->getData()),
                              itm->getNBytes(), itm->getDataType(),
                              static_cast<uint16_t>(res), itm->getCas(),
                              cookie);
            delete itm;
        } else  if (rv == ENGINE_NOT_MY_VBUCKET) {
            LockHolder lh(h->clusterConfig.lock);
            return sendResponse(response, NULL, 0, NULL, 0,
                                h->clusterConfig.config,
                                h->clusterConfig.len,
                                PROTOCOL_BINARY_RAW_BYTES,
                                PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
                                cookie);
        } else {
            // Handlers may leave msg_size at 0 and rely on strlen(msg).
            msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
            rv = sendResponse(response, NULL, 0, NULL, 0,
                              msg, static_cast<uint16_t>(msg_size),
                              PROTOCOL_BINARY_RAW_BYTES,
                              static_cast<uint16_t>(res), 0, cookie);

        }
        return rv;
    }
1392
1393     static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1394                                                const void* cookie,
1395                                                protocol_binary_request_header
1396                                                                       *request,
1397                                                ADD_RESPONSE response)
1398     {
1399         ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1400                                                            cookie,
1401                                                            request, response);
1402         releaseHandle(handle);
1403         return err_code;
1404     }
1405
1406     static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1407                               item *itm, uint64_t cas) {
1408         static_cast<Item*>(itm)->setCas(cas);
1409     }
1410
1411     static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1412                                           const void *cookie,
1413                                           void *engine_specific,
1414                                           uint16_t nengine,
1415                                           uint8_t ttl,
1416                                           uint16_t tap_flags,
1417                                           tap_event_t tap_event,
1418                                           uint32_t tap_seqno,
1419                                           const void *key,
1420                                           size_t nkey,
1421                                           uint32_t flags,
1422                                           uint32_t exptime,
1423                                           uint64_t cas,
1424                                           uint8_t datatype,
1425                                           const void *data,
1426                                           size_t ndata,
1427                                           uint16_t vbucket)
1428     {
1429         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1430             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1431                     " (TapNotify)");
1432             return ENGINE_EINVAL;
1433         }
1434         ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1435                                                         engine_specific,
1436                                                         nengine, ttl,
1437                                                         tap_flags,
1438                                                         (uint16_t)tap_event,
1439                                                         tap_seqno,
1440                                                         key, nkey, flags,
1441                                                         exptime, cas,
1442                                                         datatype, data,
1443                                                         ndata, vbucket);
1444         releaseHandle(handle);
1445         return err_code;
1446     }
1447
1448     static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1449                                       const void *cookie, item **itm,
1450                                       void **es, uint16_t *nes, uint8_t *ttl,
1451                                       uint16_t *flags, uint32_t *seqno,
1452                                       uint16_t *vbucket) {
1453         uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1454                                                              nes, ttl,
1455                                                              flags, seqno,
1456                                                              vbucket);
1457         releaseHandle(handle);
1458         return static_cast<tap_event_t>(tap_event);
1459     }
1460
1461     static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1462                                           const void* cookie,
1463                                           const void* client,
1464                                           size_t nclient,
1465                                           uint32_t flags,
1466                                           const void* userdata,
1467                                           size_t nuserdata)
1468     {
1469         EventuallyPersistentEngine *h = getHandle(handle);
1470         TAP_ITERATOR iterator = NULL;
1471         {
1472             std::string c(static_cast<const char*>(client), nclient);
1473             // Figure out what we want from the userdata before adding it to
1474             // the API to the handle
1475             if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1476                 iterator = EvpTapIterator;
1477             }
1478         }
1479         releaseHandle(handle);
1480         return iterator;
1481     }
1482
1483
1484     static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1485                                        const void* cookie,
1486                                        struct dcp_message_producers *producers)
1487     {
1488         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1489         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1490         if (conn) {
1491             errCode = conn->step(producers);
1492         }
1493         releaseHandle(handle);
1494         return errCode;
1495     }
1496
1497
1498     static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1499                                         const void* cookie,
1500                                         uint32_t opaque,
1501                                         uint32_t seqno,
1502                                         uint32_t flags,
1503                                         void *name,
1504                                         uint16_t nname)
1505     {
1506         ENGINE_ERROR_CODE errCode;
1507         errCode = getHandle(handle)->dcpOpen(cookie, opaque, seqno, flags,
1508                                              name, nname);
1509         releaseHandle(handle);
1510         return errCode;
1511     }
1512
1513     static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1514                                              const void* cookie,
1515                                              uint32_t opaque,
1516                                              uint16_t vbucket,
1517                                              uint32_t flags)
1518     {
1519         ENGINE_ERROR_CODE errCode = getHandle(handle)->dcpAddStream(cookie,
1520                                                                     opaque,
1521                                                                     vbucket,
1522                                                                     flags);
1523         releaseHandle(handle);
1524         return errCode;
1525     }
1526
1527     static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1528                                                const void* cookie,
1529                                                uint32_t opaque,
1530                                                uint16_t vbucket)
1531     {
1532         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1533         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1534         if (conn) {
1535             errCode = conn->closeStream(opaque, vbucket);
1536         }
1537         releaseHandle(handle);
1538         return errCode;
1539     }
1540
1541
1542     static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1543                                              const void* cookie,
1544                                              uint32_t flags,
1545                                              uint32_t opaque,
1546                                              uint16_t vbucket,
1547                                              uint64_t startSeqno,
1548                                              uint64_t endSeqno,
1549                                              uint64_t vbucketUuid,
1550                                              uint64_t snapStartSeqno,
1551                                              uint64_t snapEndSeqno,
1552                                              uint64_t *rollbackSeqno,
1553                                              dcp_add_failover_log callback)
1554     {
1555         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1556         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1557         if (conn) {
1558             errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1559                                           endSeqno, vbucketUuid, snapStartSeqno,
1560                                           snapEndSeqno, rollbackSeqno, callback);
1561         }
1562         releaseHandle(handle);
1563         return errCode;
1564     }
1565
1566     static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1567                                                  const void* cookie,
1568                                                  uint32_t opaque,
1569                                                  uint16_t vbucket,
1570                                                  dcp_add_failover_log callback)
1571     {
1572         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1573         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1574         if (conn) {
1575             errCode = conn->getFailoverLog(opaque, vbucket, callback);
1576         }
1577         releaseHandle(handle);
1578         return errCode;
1579     }
1580
1581
1582     static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1583                                              const void* cookie,
1584                                              uint32_t opaque,
1585                                              uint16_t vbucket,
1586                                              uint32_t flags)
1587     {
1588         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1589         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1590         if (conn) {
1591             errCode = conn->streamEnd(opaque, vbucket, flags);
1592         }
1593         releaseHandle(handle);
1594         return errCode;
1595     }
1596
1597
1598     static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1599                                                   const void* cookie,
1600                                                   uint32_t opaque,
1601                                                   uint16_t vbucket,
1602                                                   uint64_t start_seqno,
1603                                                   uint64_t end_seqno,
1604                                                   uint32_t flags)
1605     {
1606         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1607         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1608         if (conn) {
1609             errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1610                                            end_seqno, flags);
1611         }
1612         releaseHandle(handle);
1613         return errCode;
1614     }
1615
1616     static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1617                                             const void* cookie,
1618                                             uint32_t opaque,
1619                                             const void *key,
1620                                             uint16_t nkey,
1621                                             const void *value,
1622                                             uint32_t nvalue,
1623                                             uint64_t cas,
1624                                             uint16_t vbucket,
1625                                             uint32_t flags,
1626                                             uint8_t datatype,
1627                                             uint64_t bySeqno,
1628                                             uint64_t revSeqno,
1629                                             uint32_t expiration,
1630                                             uint32_t lockTime,
1631                                             const void *meta,
1632                                             uint16_t nmeta,
1633                                             uint8_t nru)
1634     {
1635         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1636             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1637                     " (DCPMutation)");
1638             return ENGINE_EINVAL;
1639         }
1640         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1641         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1642         if (conn) {
1643             errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1644                                      vbucket, flags, datatype, lockTime,
1645                                      bySeqno, revSeqno, expiration,
1646                                      nru, meta, nmeta);
1647         }
1648         releaseHandle(handle);
1649         return errCode;
1650     }
1651
1652     static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1653                                             const void* cookie,
1654                                             uint32_t opaque,
1655                                             const void *key,
1656                                             uint16_t nkey,
1657                                             uint64_t cas,
1658                                             uint16_t vbucket,
1659                                             uint64_t bySeqno,
1660                                             uint64_t revSeqno,
1661                                             const void *meta,
1662                                             uint16_t nmeta)
1663     {
1664         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1665         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1666         if (conn) {
1667             errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1668                                      revSeqno, meta, nmeta);
1669         }
1670         releaseHandle(handle);
1671         return errCode;
1672     }
1673
1674     static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1675                                               const void* cookie,
1676                                               uint32_t opaque,
1677                                               const void *key,
1678                                               uint16_t nkey,
1679                                               uint64_t cas,
1680                                               uint16_t vbucket,
1681                                               uint64_t bySeqno,
1682                                               uint64_t revSeqno,
1683                                               const void *meta,
1684                                               uint16_t nmeta)
1685     {
1686         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1687         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1688         if (conn) {
1689             errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1690                                        revSeqno, meta, nmeta);
1691         }
1692         releaseHandle(handle);
1693         return errCode;
1694     }
1695
1696     static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1697                                          const void* cookie,
1698                                          uint32_t opaque,
1699                                          uint16_t vbucket)
1700     {
1701         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1702         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1703         if (conn) {
1704             errCode = conn->flushall(opaque, vbucket);
1705         }
1706         releaseHandle(handle);
1707         return errCode;
1708     }
1709
1710     static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1711                                                    const void* cookie,
1712                                                    uint32_t opaque,
1713                                                    uint16_t vbucket,
1714                                                    vbucket_state_t state)
1715     {
1716         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1717         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1718         if (conn) {
1719             errCode = conn->setVBucketState(opaque, vbucket, state);
1720         }
1721         releaseHandle(handle);
1722         return errCode;
1723     }
1724
1725     static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1726                                         const void* cookie,
1727                                         uint32_t opaque) {
1728         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1729         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1730         if (conn) {
1731             errCode = conn->noop(opaque);
1732         }
1733         releaseHandle(handle);
1734         return errCode;
1735     }
1736
1737     static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1738                                                          const void* cookie,
1739                                                          uint32_t opaque,
1740                                                          uint16_t vbucket,
1741                                                          uint32_t buffer_bytes) {
1742         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1743         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1744         if (conn) {
1745             errCode = conn->bufferAcknowledgement(opaque, vbucket,
1746                                                   buffer_bytes);
1747         }
1748         releaseHandle(handle);
1749         return errCode;
1750     }
1751
1752     static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1753                                            const void* cookie,
1754                                            uint32_t opaque,
1755                                            const void *key,
1756                                            uint16_t nkey,
1757                                            const void *value,
1758                                            uint32_t nvalue) {
1759         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1760         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1761         if (conn) {
1762             errCode = conn->control(opaque, key, nkey, value, nvalue);
1763         }
1764         releaseHandle(handle);
1765         return errCode;
1766     }
1767
1768     static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1769                                      const void* cookie,
1770                                      protocol_binary_response_header *response)
1771     {
1772         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1773         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1774         if (conn) {
1775             errCode = conn->handleResponse(response);
1776         }
1777         releaseHandle(handle);
1778         return errCode;
1779     }
1780
1781     static void EvpHandleDisconnect(const void *cookie,
1782                                     ENGINE_EVENT_TYPE type,
1783                                     const void *event_data,
1784                                     const void *cb_data)
1785     {
1786         cb_assert(type == ON_DISCONNECT);
1787         cb_assert(event_data == NULL);
1788         void *c = const_cast<void*>(cb_data);
1789         getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1790         releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1791     }
1792
1793
1794     /**
1795      * The only public interface to the eventually persistance engine.
1796      * Allocate a new instance and initialize it
1797      * @param interface the highest interface the server supports (we only
1798      *                  support interface 1)
1799      * @param get_server_api callback function to get the server exported API
1800      *                  functions
1801      * @param handle Where to return the new instance
1802      * @return ENGINE_SUCCESS on success
1803      */
1804     ENGINE_ERROR_CODE create_instance(uint64_t interface,
1805                                       GET_SERVER_API get_server_api,
1806                                       ENGINE_HANDLE **handle)
1807     {
1808         SERVER_HANDLE_V1 *api = get_server_api();
1809         if (interface != 1 || api == NULL) {
1810             return ENGINE_ENOTSUP;
1811         }
1812
1813         hooksApi = api->alloc_hooks;
1814         EventuallyPersistentEngine::loggerApi = api->log;
1815         MemoryTracker::getInstance();
1816         ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1817
1818         AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1819
1820         ObjectRegistry::setStats(inital_tracking);
1821         EventuallyPersistentEngine *engine;
1822         engine = new EventuallyPersistentEngine(get_server_api);
1823         ObjectRegistry::setStats(NULL);
1824
1825         if (engine == NULL) {
1826             return ENGINE_ENOMEM;
1827         }
1828
1829         if (MemoryTracker::trackingMemoryAllocations()) {
1830             engine->getEpStats().memoryTrackerEnabled.store(true);
1831             engine->getEpStats().totalMemory.store(inital_tracking->load());
1832         }
1833         delete inital_tracking;
1834
1835         initialize_time_functions(api->core);
1836
1837         *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1838
1839         return ENGINE_SUCCESS;
1840     }
1841
1842     /*
1843         This method is called prior to unloading of the shared-object.
1844         Global clean-up should be performed from this method.
1845      */
1846     void destroy_engine() {
1847         ExecutorPool::shutdown();
1848      }
1849
1850     static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
1851                                const item* itm, item_info *itm_info)
1852     {
1853         const Item *it = reinterpret_cast<const Item*>(itm);
1854         EventuallyPersistentEngine *engine = getHandle(handle);
1855         if (itm_info->nvalue < 1) {
1856             return false;
1857         }
1858         itm_info->cas = it->getCas();
1859
1860         if (engine) {
1861             RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(it->getVBucketId());
1862
1863             if (vb) {
1864                 itm_info->vbucket_uuid = vb->failovers->getLatestUUID();
1865             } else {
1866                 itm_info->vbucket_uuid = 0;
1867             }
1868
1869             releaseHandle(handle);
1870         } else{
1871             itm_info->vbucket_uuid = 0;
1872         }
1873
1874         itm_info->seqno = it->getBySeqno();
1875         itm_info->exptime = it->getExptime();
1876         itm_info->nbytes = it->getNBytes();
1877         itm_info->datatype = it->getDataType();
1878         itm_info->flags = it->getFlags();
1879         itm_info->clsid = 0;
1880         itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1881         itm_info->nvalue = 1;
1882         itm_info->key = it->getKey().c_str();
1883         itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1884         itm_info->value[0].iov_len = it->getNBytes();
1885         return true;
1886     }
1887
1888     static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1889                                item* itm, const item_info *itm_info)
1890     {
1891         Item *it = reinterpret_cast<Item*>(itm);
1892         if (!it) {
1893             return false;
1894         }
1895         it->setDataType(itm_info->datatype);
1896         return true;
1897     }
1898
1899     static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1900                                                  const void* cookie,
1901                                                  engine_get_vb_map_cb callback)
1902     {
1903         EventuallyPersistentEngine *h = getHandle(handle);
1904         LockHolder lh(h->clusterConfig.lock);
1905         uint8_t *config = h->clusterConfig.config;
1906         uint32_t len = h->clusterConfig.len;
1907         releaseHandle(handle);
1908         return callback(cookie, config, len);
1909     }
1910
1911 } // C linkage
1912
1913 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1914     char buffer[2048];
1915
1916     if (EventuallyPersistentEngine::loggerApi != NULL) {
1917         EXTENSION_LOGGER_DESCRIPTOR* logger =
1918             (EXTENSION_LOGGER_DESCRIPTOR*)EventuallyPersistentEngine::loggerApi->get_logger();
1919
1920         if (EventuallyPersistentEngine::loggerApi->get_level() <= severity) {
1921             EventuallyPersistentEngine *engine = ObjectRegistry::onSwitchThread(NULL, true);
1922             va_list va;
1923             va_start(va, fmt);
1924             vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1925             if (engine) {
1926                 logger->log(severity, NULL, "(%s) %s", engine->getName(),
1927                             buffer);
1928             } else {
1929                 logger->log(severity, NULL, "(No Engine) %s", buffer);
1930             }
1931             va_end(va);
1932             ObjectRegistry::onSwitchThread(engine);
1933         }
1934     }
1935 }
1936
1937 ALLOCATOR_HOOKS_API *getHooksApi(void) {
1938     return hooksApi;
1939 }
1940
/**
 * Construct the engine.
 *
 * The constructor:
 *  - points every ENGINE_HANDLE_V1 entry point (including the DCP table) at
 *    the corresponding C-linkage wrapper defined in this file;
 *  - caches the server API;
 *  - fills in the engine_info advertised to the server.
 *
 * Configuration parsing happens separately, in initialize().
 *
 * @param get_server_api callback used to obtain the server API. It is
 *                       stored in getServerApiFunc and invoked once here to
 *                       populate serverApi.
 */
EventuallyPersistentEngine::EventuallyPersistentEngine(
                                    GET_SERVER_API get_server_api) :
    clusterConfig(), epstore(NULL), workload(NULL),
    workloadPriority(NO_BUCKET_PRIORITY),
    tapThrottle(NULL), getServerApiFunc(get_server_api),
    dcpConnMap_(NULL), tapConnMap(NULL) ,tapConfig(NULL), checkpointConfig(NULL),
    trafficEnabled(false), flushAllEnabled(false), startupTime(0)
{
    // Engine interface version 1: the only version create_instance() accepts.
    interface.interface = 1;
    ENGINE_HANDLE_V1::get_info = EvpGetInfo;
    ENGINE_HANDLE_V1::initialize = EvpInitialize;
    ENGINE_HANDLE_V1::destroy = EvpDestroy;
    ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
    ENGINE_HANDLE_V1::remove = EvpItemDelete;
    ENGINE_HANDLE_V1::release = EvpItemRelease;
    ENGINE_HANDLE_V1::get = EvpGet;
    ENGINE_HANDLE_V1::get_stats = EvpGetStats;
    ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
    ENGINE_HANDLE_V1::store = EvpStore;
    ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
    ENGINE_HANDLE_V1::flush = EvpFlush;
    ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
    ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
    ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
    ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
    ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
    ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
    ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
    // These two entry points are not provided by this engine (left NULL).
    ENGINE_HANDLE_V1::get_stats_struct = NULL;
    ENGINE_HANDLE_V1::aggregate_stats = NULL;


    // DCP entry points: each one forwards to the connection handler
    // registered for the calling cookie.
    ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
    ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
    ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
    ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
    ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
    ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
    ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
    ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
    ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
    ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
    ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
    ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
    ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
    ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
    ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
    ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
    ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;

    serverApi = getServerApiFunc();
    // Zeroing info also resets num_features, which the appends below
    // post-increment.
    memset(&info, 0, sizeof(info));
    info.info.description = "EP engine v" VERSION;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
    info.info.features[info.info.num_features++].feature =
                                             ENGINE_FEATURE_PERSISTENT_STORAGE;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
    info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
}
2000
2001 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
2002 {
2003     EventuallyPersistentEngine *epe =
2004                                     ObjectRegistry::onSwitchThread(NULL, true);
2005     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
2006     ObjectRegistry::onSwitchThread(epe);
2007     return rv;
2008 }
2009
2010 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
2011 {
2012     EventuallyPersistentEngine *epe =
2013                                     ObjectRegistry::onSwitchThread(NULL, true);
2014     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
2015     ObjectRegistry::onSwitchThread(epe);
2016     return rv;
2017 }
2018
2019 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
2020                                                         EVENT_CALLBACK cb,
2021                                                         const void *cb_data) {
2022     EventuallyPersistentEngine *epe =
2023                                     ObjectRegistry::onSwitchThread(NULL, true);
2024     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
2025     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
2026                             type, cb, cb_data);
2027     ObjectRegistry::onSwitchThread(epe);
2028 }
2029
2030 /**
2031  * A configuration value changed listener that responds to ep-engine
2032  * parameter changes by invoking engine-specific methods on
2033  * configuration change events.
2034  */
2035 class EpEngineValueChangeListener : public ValueChangedListener {
2036 public:
2037     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
2038         // EMPTY
2039     }
2040
2041     virtual void sizeValueChanged(const std::string &key, size_t value) {
2042         if (key.compare("getl_max_timeout") == 0) {
2043             engine.setGetlMaxTimeout(value);
2044         } else if (key.compare("getl_default_timeout") == 0) {
2045             engine.setGetlDefaultTimeout(value);
2046         } else if (key.compare("max_item_size") == 0) {
2047             engine.setMaxItemSize(value);
2048         }
2049     }
2050
2051     virtual void booleanValueChanged(const std::string &key, bool value) {
2052         if (key.compare("flushall_enabled") == 0) {
2053             engine.setFlushAll(value);
2054         }
2055     }
2056 private:
2057     EventuallyPersistentEngine &engine;
2058 };
2059
2060
2061
/**
 * Initialize the engine from an optional configuration string.
 *
 * Steps, in order:
 *  - reset stats and parse `config` (if non-NULL);
 *  - apply hash-table and mutation-memory settings;
 *  - derive defaults for max_size and the memory watermarks;
 *  - cache tunables and register change listeners for them;
 *  - create the workload policy, TAP/DCP connection maps, TAP/checkpoint
 *    configs and the ep-store;
 *  - initialize the store, optionally enable traffic, start the connection
 *    notifiers and record the startup time.
 *
 * @param config configuration string, or NULL to use the current
 *               configuration as-is
 * @return ENGINE_SUCCESS on success; ENGINE_FAILED if parsing fails, the
 *         shard count exceeds max_vbuckets, or the store fails to initialize;
 *         ENGINE_ENOMEM if the store could not be allocated
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
    resetStats();
    if (config != NULL) {
        if (!configuration.parseConfiguration(config, serverApi)) {
            return ENGINE_FAILED;
        }
    }

    name = configuration.getCouchBucket();
    maxFailoverEntries = configuration.getMaxFailoverEntries();

    // Start updating the variables from the config!
    HashTable::setDefaultNumBuckets(configuration.getHtSize());
    HashTable::setDefaultNumLocks(configuration.getHtLocks());
    StoredValue::setMutationMemoryThreshold(
                                      configuration.getMutationMemThreshold());

    // A max_size of 0 is treated as "no limit".
    if (configuration.getMaxSize() == 0) {
        configuration.setMaxSize(std::numeric_limits<size_t>::max());
    }

    // A watermark equal to SIZE_MAX is treated as "unset" and defaults to
    // 75% (low) / 85% (high) of max_size.
    if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
        configuration.setMemLowWat(percentOf(
                                            configuration.getMaxSize(), 0.75));
    }

    if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
        configuration.setMemHighWat(percentOf(
                                            configuration.getMaxSize(), 0.85));
    }

    // Cache each tunable locally and register a listener so later runtime
    // changes reach the engine. A fresh listener is allocated per key.
    // NOTE(review): ownership of the listeners presumably passes to
    // `configuration`; confirm before sharing one instance between keys.
    maxItemSize = configuration.getMaxItemSize();
    configuration.addValueChangedListener("max_item_size",
                                       new EpEngineValueChangeListener(*this));

    getlDefaultTimeout = configuration.getGetlDefaultTimeout();
    configuration.addValueChangedListener("getl_default_timeout",
                                       new EpEngineValueChangeListener(*this));
    getlMaxTimeout = configuration.getGetlMaxTimeout();
    configuration.addValueChangedListener("getl_max_timeout",
                                       new EpEngineValueChangeListener(*this));

    flushAllEnabled = configuration.isFlushallEnabled();
    configuration.addValueChangedListener("flushall_enabled",
                                       new EpEngineValueChangeListener(*this));

    // Reject configurations with more shards than vbuckets.
    workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
                                  configuration.getMaxNumShards());
    if ((unsigned int)workload->getNumShards() >
                                              configuration.getMaxVbuckets()) {
        LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
            "equal or less than max number of vbuckets");
        return ENGINE_FAILED;
    }

    dcpConnMap_ = new DcpConnMap(*this);
    tapConnMap = new TapConnMap(*this);
    tapConfig = new TapConfig(*this);
    tapThrottle = new TapThrottle(configuration, stats);
    TapConfig::addConfigChangeListener(*this);

    checkpointConfig = new CheckpointConfig(*this);
    CheckpointConfig::addConfigChangeListener(*this);

    // NOTE(review): a plain `new` throws on allocation failure, so this NULL
    // check is only reachable if operator new is replaced by a non-throwing
    // version. Confirm against the allocator hooks in use.
    epstore = new EventuallyPersistentStore(*this);
    if (epstore == NULL) {
        return ENGINE_ENOMEM;
    }

    initializeEngineCallbacks();

    // Complete the initialization of the ep-store
    if (!epstore->initialize()) {
        return ENGINE_FAILED;
    }

    if(configuration.isDataTrafficEnabled()) {
        enableTraffic(true);
    }

    tapConnMap->initialize(TAP_CONN_NOTIFIER);
    dcpConnMap_->initialize(DCP_CONN_NOTIFIER);

    // record engine initialization time
    startupTime.store(ep_real_time());

    LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");

    return ENGINE_SUCCESS;
}
2152
2153 void EventuallyPersistentEngine::destroy(bool force) {
2154     stats.forceShutdown = force;
2155     stats.isShutdown = true;
2156
2157     if (epstore) {
2158         epstore->snapshotStats();
2159     }
2160     if (tapConnMap) {
2161         tapConnMap->shutdownAllConnections();
2162     }
2163     if (dcpConnMap_) {
2164         dcpConnMap_->shutdownAllConnections();
2165     }
2166 }
2167
2168 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie,
2169                                                     time_t when){
2170     if (!flushAllEnabled) {
2171         return ENGINE_ENOTSUP;
2172     }
2173
2174     if (!isDegradedMode()) {
2175         return ENGINE_TMPFAIL;
2176     }
2177
2178     /*
2179      * Supporting only a SYNC operation for bucket flush
2180      */
2181
2182     void* es = getEngineSpecific(cookie);
2183     if (es == NULL) {
2184
2185         // Check if diskFlushAll was false and set it to true
2186         // if yes, if the atomic variable weren't false, then
2187         // we will assume that a flushAll has been scheduled
2188         // already and return TMPFAIL.
2189         if (epstore->scheduleFlushAllTask(cookie, when)) {
2190             storeEngineSpecific(cookie, this);
2191             return ENGINE_EWOULDBLOCK;
2192         } else {
2193             LOG(EXTENSION_LOG_INFO, "Tried to trigger a bucket flush, but"
2194                     "there seems to be a task running already!");
2195             return ENGINE_TMPFAIL;
2196         }
2197
2198     } else {
2199         storeEngineSpecific(cookie, NULL);
2200         LOG(EXTENSION_LOG_WARNING, "Completed bucket flush operation");
2201         return ENGINE_SUCCESS;
2202     }
2203 }
2204
/**
 * Store an item using the requested store semantics.
 *
 * @param cookie    connection cookie (forwarded to the ep-store and get())
 * @param itm       the item to store; interpreted as an Item and re-tagged
 *                  with `vbucket`
 * @param cas       out: for CAS/SET/ADD/REPLACE, set to the item's CAS on
 *                  ENGINE_SUCCESS; for APPEND/PREPEND, written by the
 *                  recursive OPERATION_CAS store of the merged item
 * @param operation CAS, SET, ADD, REPLACE, APPEND or PREPEND; anything else
 *                  yields ENGINE_ENOTSUP
 * @param vbucket   target vbucket id
 * @return ENGINE_SUCCESS on success (and ++stats.numOpsStore). Otherwise:
 *         ENGINE_TMPFAIL when the engine is degraded (checked up front for
 *         CAS/SET/ADD, and afterwards for NOT_STORED/NOT_MY_VBUCKET results);
 *         ENGINE_NOT_STORED for a CAS of 0 or an append/prepend to a missing
 *         key; ENGINE_KEY_EEXISTS for ADD with a non-zero CAS or a CAS
 *         mismatch; ENGINE_E2BIG if append/prepend exceeds maxItemSize;
 *         or the result of memoryCondition() on ENGINE_ENOMEM.
 */
ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
                                                    item* itm,
                                                    uint64_t *cas,
                                                    ENGINE_STORE_OPERATION
                                                                     operation,
                                                    uint16_t vbucket) {
    BlockTimer timer(&stats.storeCmdHisto);
    ENGINE_ERROR_CODE ret;
    Item *it = static_cast<Item*>(itm);
    item *i = NULL;

    it->setVBucketId(vbucket);

    switch (operation) {
    case OPERATION_CAS:
        if (it->getCas() == 0) {
            // Using a cas command with a cas wildcard doesn't make sense
            ret = ENGINE_NOT_STORED;
            break;
        }
        // FALLTHROUGH
    case OPERATION_SET:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }
        ret = epstore->set(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }

        break;

    case OPERATION_ADD:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }

        if (it->getCas() != 0) {
            // Adding an item with a cas value doesn't really make sense...
            return ENGINE_KEY_EEXISTS;
        }

        ret = epstore->add(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }
        break;

    case OPERATION_REPLACE:
        ret = epstore->replace(*it, cookie);
        if (ret == ENGINE_SUCCESS) {
            *cas = it->getCas();
        }
        break;

    case OPERATION_APPEND:
    case OPERATION_PREPEND:
        // Read-modify-write: fetch the current value, merge the new data
        // into it, then store the merged item with OPERATION_CAS. If that
        // CAS store reports KEY_EEXISTS (the value changed after the get),
        // loop and retry from a fresh get.
        do {
            if ((ret = get(cookie, &i, it->getKey().c_str(),
                           it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
                Item *old = reinterpret_cast<Item*>(i);

                if (old->getCas() == (uint64_t) -1) {
                    // item is locked against updates
                    itemRelease(cookie, i);
                    return ENGINE_TMPFAIL;
                }

                // A caller-supplied CAS must match the stored item's CAS.
                if (it->getCas() != 0 && old->getCas() != it->getCas()) {
                    itemRelease(cookie, i);
                    return ENGINE_KEY_EEXISTS;
                }

                if (operation == OPERATION_APPEND) {
                    ret = old->append(*it, maxItemSize);
                } else {
                    ret = old->prepend(*it, maxItemSize);
                }

                if (ret != ENGINE_SUCCESS) {
                    itemRelease(cookie, i);
                    if (ret == ENGINE_E2BIG) {
                        return ret;
                    } else {
                        return memoryCondition();
                    }
                } else {
                    if (old->getDataType() == PROTOCOL_BINARY_DATATYPE_JSON) {
                        // Set the datatype of the new document to BINARY (0),
                        // as appending/prepending anything to JSON breaks the
                        // json data structure.
                        old->setDataType(PROTOCOL_BINARY_RAW_BYTES);
                    } else if (old->getDataType() ==
                                    PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
                        // Set the datatype of the new document to
                        // COMPRESSED_BINARY, as appending/prepending anything
                        // to JSON breaks the json data structure.
                        old->setDataType(PROTOCOL_BINARY_DATATYPE_COMPRESSED);
                    }
                }

                // Recursive store of the merged item; also writes *cas.
                ret = store(cookie, old, cas, OPERATION_CAS, vbucket);

                // Propagate the merged item's seqno back to the caller's item.
                it->setBySeqno(old->getBySeqno());
                itemRelease(cookie, i);
            }
        } while (ret == ENGINE_KEY_EEXISTS);

        // Map the error code back to what memcapable expects
        if (ret == ENGINE_KEY_ENOENT) {
            ret = ENGINE_NOT_STORED;
        }

        break;

    default:
        ret = ENGINE_ENOTSUP;
    }

    // Common post-processing of the result for every operation that did
    // not return early above.
    switch (ret) {
    case ENGINE_SUCCESS:
        ++stats.numOpsStore;
        break;
    case ENGINE_ENOMEM:
        ret = memoryCondition();
        break;
    case ENGINE_NOT_STORED:
    case ENGINE_NOT_MY_VBUCKET:
        if (isDegradedMode()) {
            return ENGINE_TMPFAIL;
        }
        break;
    default:
        break;
    }

    return ret;
}
2343
/**
 * Produce the next TAP event for a producer connection (one step of
 * walkTapQueue()).
 *
 * All output parameters are reset first. Events are then chosen in this
 * priority order:
 *  1. TAP_FLUSH if the connection should flush;
 *  2. TAP_NOOP if it is time for a noop;
 *  3. TAP_PAUSE if the connection is suspended or its ack window is full;
 *  4. a high-priority vbucket event (TAP_VBUCKET_SET / TAP_OPAQUE);
 *  5. TAP_PAUSE while waiting for an opaque-message ack;
 *  6. the next queued item (mutation / deletion / checkpoint start/end),
 *     after queueing any backfill the connection requests;
 *  7. if still paused during a dump or takeover, the completion event
 *     from checkDumpOrTakeOverCompletion().
 *
 * @param retry out: set to true when the caller should call again
 *              immediately (e.g. the next item resolved to a NOOP)
 * @return the TAP event code to send
 */
inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
                                                           item **itm,
                                                           void **es,
                                                           uint16_t *nes,
                                                           uint8_t *ttl,
                                                           uint16_t *flags,
                                                           uint32_t *seqno,
                                                           uint16_t *vbucket,
                                                           TapProducer
                                                                   *connection,
                                                           bool &retry) {
    *es = NULL;
    *nes = 0;
    *ttl = (uint8_t)-1;
    *seqno = 0;
    *flags = 0;
    *vbucket = 0;

    retry = false;

    if (connection->shouldFlush()) {
        return TAP_FLUSH;
    }

    if (connection->isTimeForNoop()) {
        LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
            connection->logHeader());
        return TAP_NOOP;
    }

    if (connection->isSuspended() || connection->windowIsFull()) {
        LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
            " suspended state or its ack windows is full.\n",
            connection->logHeader());
        return TAP_PAUSE;
    }

    // High-priority vbucket events take precedence over queued items.
    uint16_t ret = TAP_PAUSE;
    VBucketEvent ev = connection->nextVBucketHighPriority();
    if (ev.event != TAP_PAUSE) {
        switch (ev.event) {
        case TAP_VBUCKET_SET:
            LOG(EXTENSION_LOG_WARNING,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), ev.vbucket,
                VBucket::toString(ev.state));
            connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
            break;
        case TAP_OPAQUE:
            LOG(EXTENSION_LOG_WARNING,
                "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
                connection->logHeader(),
                TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
                ev.vbucket);
            // The opaque command code is stored on the connection so that
            // *es can point at storage that outlives this call.
            connection->opaqueCommandCode = (uint32_t) ev.state;
            *vbucket = ev.vbucket;
            *es = &connection->opaqueCommandCode;
            *nes = sizeof(connection->opaqueCommandCode);
            break;
        default:
            LOG(EXTENSION_LOG_WARNING,
                "%s Unknown VBucketEvent message type %d\n",
                connection->logHeader(), ev.event);
            abort();
        }
        return ev.event;
    }

    if (connection->waitForOpaqueMsgAck()) {
        return TAP_PAUSE;
    }

    VBucketFilter backFillVBFilter;
    if (connection->runBackfill(backFillVBFilter)) {
        queueBackfill(backFillVBFilter, connection);
    }

    // getNextItem() sets *vbucket, ret (the event type) and nru.
    uint8_t nru = INITIAL_NRU_VALUE;
    Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
    switch (ret) {
    case TAP_CHECKPOINT_START:
    case TAP_CHECKPOINT_END:
    case TAP_MUTATION:
    case TAP_DELETION:
        *itm = it;
        if (ret == TAP_MUTATION) {
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno(), nru);
            *es = connection->specificData;
        } else if (ret == TAP_DELETION) {
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                                       it->getRevSeqno());
            *es = connection->specificData;
        } else if (ret == TAP_CHECKPOINT_START) {
            // Send the current value of the max deleted seqno
            RCPtr<VBucket> vb = getVBucket(*vbucket);
            if (!vb) {
                // NOTE(review): *itm already points at `it` here and the
                // caller retries, overwriting it; if items returned by
                // getNextItem() are owned by the caller this may leak.
                // Confirm ownership.
                retry = true;
                return TAP_NOOP;
            }
            *nes = TapEngineSpecific::packSpecificData(ret, connection,
                                               vb->ht.getMaxDeletedRevSeqno());
            *es = connection->specificData;
        }
        break;
    case TAP_NOOP:
        retry = true;
        break;
    default:
        break;
    }

    // Nothing else to send: check whether a dump/takeover has completed.
    if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
        VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
        if (vbev.event == TAP_VBUCKET_SET) {
            LOG(EXTENSION_LOG_WARNING,
               "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
                connection->logHeader(), vbev.vbucket,
                VBucket::toString(vbev.state));
            connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
        }
        ret = vbev.event;
    }

    return ret;
}
2470
2471 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2472                                                   item **itm,
2473                                                   void **es,
2474                                                   uint16_t *nes,
2475                                                   uint8_t *ttl,
2476                                                   uint16_t *flags,
2477                                                   uint32_t *seqno,
2478                                                   uint16_t *vbucket) {
2479     TapProducer *connection = getTapProducer(cookie);
2480     if (!connection) {
2481         LOG(EXTENSION_LOG_WARNING,
2482             "Failed to lookup TAP connection.. Disconnecting\n");
2483         return TAP_DISCONNECT;
2484     }
2485
2486     connection->setPaused(false);
2487
2488     bool retry = false;
2489     uint16_t ret;
2490
2491     connection->setLastWalkTime();
2492     do {
2493         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2494                              seqno, vbucket, connection, retry);
2495     } while (retry);
2496
2497     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2498         connection->lastMsgTime = ep_current_time();
2499         if (ret == TAP_NOOP) {
2500             *seqno = 0;
2501         } else {
2502             ++stats.numTapFetched;
2503             *seqno = connection->getSeqno();
2504             if (connection->requestAck(ret, *vbucket)) {
2505                 *flags = TAP_FLAG_ACK;
2506                 connection->seqnoAckRequested = *seqno;
2507             }
2508
2509             if (ret == TAP_MUTATION) {
2510                 if (connection->haveFlagByteorderSupport()) {
2511                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2512                 }
2513             }
2514         }
2515     } else {
2516         connection->setPaused(true);
2517         connection->setNotifySent(false);
2518     }
2519
2520     return ret;
2521 }
2522
2523 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2524                                                 std::string &client,
2525                                                 uint32_t flags,
2526                                                 const void *userdata,
2527                                                 size_t nuserdata) {
2528     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2529         return false;
2530     }
2531
2532     std::string tapName = "eq_tapq:";
2533     if (client.length() == 0) {
2534         tapName.assign(ConnHandler::getAnonName());
2535     } else {
2536         tapName.append(client);
2537     }
2538
2539     // Decoding the userdata section of the packet and update the filters
2540     const char *ptr = static_cast<const char*>(userdata);
2541     uint64_t backfillAge = 0;
2542     std::vector<uint16_t> vbuckets;
2543     std::map<uint16_t, uint64_t> lastCheckpointIds;
2544
2545     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2546         if (nuserdata < sizeof(backfillAge)) {
2547             LOG(EXTENSION_LOG_WARNING,
2548                 "Backfill age is missing. Reject connection request from %s\n",
2549                 tapName.c_str());
2550             return false;
2551         }
2552         // use memcpy to avoid alignemt issues
2553         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2554         backfillAge = ntohll(backfillAge);
2555         nuserdata -= sizeof(backfillAge);
2556         ptr += sizeof(backfillAge);
2557     }
2558
2559     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2560         uint16_t nvbuckets;
2561         if (nuserdata < sizeof(nvbuckets)) {
2562             LOG(EXTENSION_LOG_WARNING,
2563             "Number of vbuckets is missing. Reject connection request from %s"
2564             "\n", tapName.c_str());
2565             return false;
2566         }
2567         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2568         nuserdata -= sizeof(nvbuckets);
2569         ptr += sizeof(nvbuckets);
2570         nvbuckets = ntohs(nvbuckets);
2571         if (nvbuckets > 0) {
2572             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2573                 LOG(EXTENSION_LOG_WARNING,
2574                 "# of vbuckets not matched. Reject connection request from %s"
2575                 "\n", tapName.c_str());
2576                 return false;
2577             }
2578             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2579                 uint16_t val;
2580                 memcpy(&val, ptr, sizeof(nvbuckets));
2581                 ptr += sizeof(uint16_t);
2582                 vbuckets.push_back(ntohs(val));
2583             }
2584             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2585         }
2586     }
2587
2588     if (flags & TAP_CONNECT_CHECKPOINT) {
2589         uint16_t nCheckpoints = 0;
2590         if (nuserdata >= sizeof(nCheckpoints)) {
2591             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2592             nuserdata -= sizeof(nCheckpoints);
2593             ptr += sizeof(nCheckpoints);
2594             nCheckpoints = ntohs(nCheckpoints);
2595         }
2596         if (nCheckpoints > 0) {
2597             if (nuserdata <
2598                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2599                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2600                     "Reject connection request from %s\n", tapName.c_str());
2601                 return false;
2602             }
2603             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2604                 uint16_t vbid;
2605                 uint64_t checkpointId;
2606                 memcpy(&vbid, ptr, sizeof(vbid));
2607                 ptr += sizeof(uint16_t);
2608                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2609                 ptr += sizeof(uint64_t);
2610                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2611             }
2612             nuserdata -=
2613                         ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2614         }
2615     }
2616
2617     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2618                                  backfillAge,
2619                                  static_cast<int>(
2620                                  configuration.getTapKeepalive()),
2621                                  vbuckets,
2622                                  lastCheckpointIds);
2623
2624     tapConnMap->notifyPausedConnection(tp, true);
2625     return true;
2626 }
2627
2628 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2629                                                         void *engine_specific,
2630                                                         uint16_t nengine,
2631                                                         uint8_t ttl,
2632                                                         uint16_t tap_flags,
2633                                                         uint16_t tap_event,
2634                                                         uint32_t tap_seqno,
2635                                                         const void *key,
2636                                                         size_t nkey,
2637                                                         uint32_t flags,
2638                                                         uint32_t exptime,
2639                                                         uint64_t cas,
2640                                                         uint8_t datatype,
2641                                                         const void *data,
2642                                                         size_t ndata,
2643                                                         uint16_t vbucket)
2644 {
2645     (void) ttl;
2646     void *specific = getEngineSpecific(cookie);
2647     ConnHandler *connection = NULL;
2648     if (specific == NULL) {
2649         if (tap_event == TAP_ACK) {
2650             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2651                 "exist. Force disconnect...\n", (char *) cookie);
2652             // tap producer is no longer connected..
2653             return ENGINE_DISCONNECT;
2654         } else {
2655             connection = tapConnMap->newConsumer(cookie);
2656             if (connection == NULL) {
2657                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2658                     " Force disconnect\n");
2659                 return ENGINE_DISCONNECT;
2660             }
2661             storeEngineSpecific(cookie, connection);
2662         }
2663     } else {
2664         connection = reinterpret_cast<ConnHandler *>(specific);
2665     }
2666
2667     std::string k(static_cast<const char*>(key), nkey);
2668     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2669
2670     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2671         if (!tapThrottle->shouldProcess()) {
2672             ++stats.tapThrottled;
2673             if (connection->supportsAck()) {
2674                 ret = ENGINE_TMPFAIL;
2675             } else {
2676                 ret = ENGINE_DISCONNECT;
2677                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2678                     "ack support. Force disconnect...\n",
2679                     connection->logHeader());
2680             }
2681             return ret;
2682         }
2683     }
2684
2685     switch (tap_event) {
2686     case TAP_ACK:
2687         ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2688         break;
2689     case TAP_FLUSH:
2690         ret = flush(cookie, 0);
2691         LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2692             connection->logHeader());
2693         break;
2694     case TAP_DELETION:
2695         {
2696             uint64_t revSeqno;
2697             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2698                                                 nengine, &revSeqno);
2699
2700             ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2701                                        NULL, 0);
2702         }
2703         break;
2704
2705     case TAP_CHECKPOINT_START:
2706     case TAP_CHECKPOINT_END:
2707         {
2708             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2709             if (tc) {
2710                 if (tap_event == TAP_CHECKPOINT_START &&
2711                     nengine == TapEngineSpecific::sizeRevSeqno) {
2712                     // Set the current value for the max deleted seqno
2713                     RCPtr<VBucket> vb = getVBucket(vbucket);
2714                     if (!vb) {
2715                         return ENGINE_TMPFAIL;
2716                     }
2717                     uint64_t seqnum;
2718                     TapEngineSpecific::readSpecificData(tap_event,
2719                                                         engine_specific,
2720                                                         nengine,
2721                                                         &seqnum);
2722                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2723                 }
2724
2725                 if (data) {
2726                     uint64_t checkpointId;
2727                     memcpy(&checkpointId, data, sizeof(checkpointId));
2728                     checkpointId = ntohll(checkpointId);
2729                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2730                                           checkpointId);
2731                 }
2732                 else {
2733                     ret = ENGINE_DISCONNECT;
2734                     LOG(EXTENSION_LOG_WARNING,
2735                         "%s Checkpoint Id is missing in "
2736                         "CHECKPOINT messages. Force disconnect...\n",
2737                         connection->logHeader());
2738                 }
2739             }
2740             else {
2741                 ret = ENGINE_DISCONNECT;
2742                 LOG(EXTENSION_LOG_WARNING,
2743                     "%s not a consumer! Force disconnect\n",
2744                     connection->logHeader());
2745             }
2746         }
2747
2748         break;
2749
2750     case TAP_MUTATION:
2751         {
2752             uint8_t nru = INITIAL_NRU_VALUE;
2753             uint64_t revSeqno = 0;
2754             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2755                                                 nengine, &revSeqno, &nru);
2756
2757             if (!isDatatypeSupported(cookie)) {
2758                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2759                 const unsigned char *dat = (const unsigned char*)data;
2760                 const int datlen = ndata;
2761                 if (checkUTF8JSON(dat, datlen)) {
2762                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2763                 }
2764             }
2765             ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2766                                        flags, datatype, 0, 0, revSeqno, exptime,
2767                                        nru, NULL, 0);
2768         }
2769
2770         break;
2771
2772     case TAP_OPAQUE:
2773         if (nengine == sizeof(uint32_t)) {
2774             uint32_t cc;
2775             memcpy(&cc, engine_specific, sizeof(cc));
2776             cc = ntohl(cc);
2777
2778             switch (cc) {
2779             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2780                 // @todo: the memcached core will _ALWAYS_ send nack
2781                 //        if it encounter an error. This should be
2782                 // set as the default when we move to .next after 2.0
2783                 // (currently we need to allow the message for
2784                 // backwards compatibility)
2785                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2786                     connection->logHeader());
2787                 break;
2788             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2789                 connection->setSupportCheckpointSync(true);
2790                 LOG(EXTENSION_LOG_INFO,
2791                     "%s Enable checkpoint synchronization\n",
2792                     connection->logHeader());
2793                 break;
2794             case TAP_OPAQUE_OPEN_CHECKPOINT:
2795                 /**
2796                  * This event is only received by the TAP client that wants to
2797                  * get mutations from closed checkpoints only. At this time,
2798                  * only incremental backup client receives this event so that
2799                  * it can close the connection and reconnect later.
2800                  */
2801                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2802                     connection->logHeader());
2803                 break;
2804             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2805                 {
2806                     LOG(EXTENSION_LOG_INFO,
2807                         "%s Backfill started for vbucket %d.\n",
2808                         connection->logHeader(), vbucket);
2809                     BlockTimer timer(&stats.tapVbucketResetHisto);
2810                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2811                                                   ENGINE_DISCONNECT;
2812                     if (ret == ENGINE_DISCONNECT) {
2813                         LOG(EXTENSION_LOG_WARNING,
2814                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2815                             connection->logHeader(), vbucket);
2816                     } else {
2817                         LOG(EXTENSION_LOG_WARNING,
2818                          "%s Reset vbucket %d was completed succecssfully.\n",
2819                             connection->logHeader(), vbucket);
2820                     }
2821
2822                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2823                     if (tc) {
2824                         tc->setBackfillPhase(true, vbucket);
2825                     } else {
2826                         ret = ENGINE_DISCONNECT;
2827                         LOG(EXTENSION_LOG_WARNING,
2828                             "TAP consumer doesn't exists. Force disconnect\n");
2829                     }
2830                 }
2831                 break;
2832             case TAP_OPAQUE_CLOSE_BACKFILL:
2833                 {
2834                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2835                         connection->logHeader());
2836                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2837                     if (tc) {
2838                         tc->setBackfillPhase(false, vbucket);
2839                     } else {
2840                         ret = ENGINE_DISCONNECT;
2841                         LOG(EXTENSION_LOG_WARNING,
2842                             "%s not a consumer! Force disconnect\n",
2843                             connection->logHeader());
2844                     }
2845                 }
2846                 break;
2847             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2848                 /**
2849                  * This event is sent by the eVBucketMigrator to notify that
2850                  * the source node closes the tap replication stream and
2851                  * switches to TAKEOVER_VBUCKETS phase.
2852                  * This is just an informative message and doesn't require any
2853                  * action.
2854                  */
2855                 LOG(EXTENSION_LOG_INFO,
2856                 "%s Received close tap stream. Switching to takeover phase.\n",
2857                     connection->logHeader());
2858                 break;
2859             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2860                 /**
2861                  * This opaque message is just for notifying that the source
2862                  * node receives change_vbucket_filter request and processes
2863                  * it successfully.
2864                  */
2865                 LOG(EXTENSION_LOG_INFO,
2866                 "%s Notified that the source node changed a vbucket filter.\n",
2867                     connection->logHeader());
2868                 break;
2869             default:
2870                 LOG(EXTENSION_LOG_WARNING,
2871                     "%s Received an unknown opaque command\n",
2872                     connection->logHeader());
2873             }
2874         } else {
2875             LOG(EXTENSION_LOG_WARNING,
2876                 "%s Received tap opaque with unknown size %d\n",
2877                 connection->logHeader(), nengine);
2878         }
2879         break;
2880
2881     case TAP_VBUCKET_SET:
2882         {
2883             BlockTimer timer(&stats.tapVbucketSetHisto);
2884
2885             if (nengine != sizeof(vbucket_state_t)) {
2886                 // illegal datasize
2887                 LOG(EXTENSION_LOG_WARNING,
2888                     "%s Received TAP_VBUCKET_SET with illegal size."
2889                     " Force disconnect\n", connection->logHeader());
2890                 ret = ENGINE_DISCONNECT;
2891                 break;
2892             }
2893
2894             vbucket_state_t state;
2895             memcpy(&state, engine_specific, nengine);
2896             state = (vbucket_state_t)ntohl(state);
2897
2898             ret = connection->setVBucketState(0, vbucket, state);
2899         }
2900         break;
2901
2902     default:
2903         // Unknown command
2904         LOG(EXTENSION_LOG_WARNING,
2905             "%s Recieved bad opcode, ignoring message\n",
2906             connection->logHeader());
2907     }
2908
2909     connection->processedEvent(tap_event, ret);
2910     return ret;
2911 }
2912
2913 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2914                                                       TapConsumer *consumer,
2915                                                       uint8_t event,
2916                                                       uint16_t vbucket,
2917                                                       uint64_t checkpointId) {
2918     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2919
2920     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2921         getEpStore()->wakeUpFlusher();
2922         ret = ENGINE_SUCCESS;
2923     }
2924     else {
2925         ret = ENGINE_DISCONNECT;
2926         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2927             "checkpoint %llu. Force disconnect\n",
2928             consumer->logHeader(), checkpointId);
2929     }
2930
2931     return ret;
2932 }
2933
2934 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2935     TapProducer *rv =
2936         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2937     if (!(rv && rv->isConnected())) {
2938         LOG(EXTENSION_LOG_WARNING,
2939             "Walking a non-existent tap queue, disconnecting\n");
2940         return NULL;
2941     }
2942
2943     if (rv->doDisconnect()) {
2944         LOG(EXTENSION_LOG_WARNING,
2945             "%s Disconnecting pending connection\n", rv->logHeader());
2946         return NULL;
2947     }
2948     return rv;
2949 }
2950
2951 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2952     // Register the callback
2953     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2954 }
2955
2956 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2957                                                             uint32_t seqno,
2958                                                             uint16_t status,
2959                                                             const std::string
2960                                                             &msg)
2961 {
2962     TapProducer *connection = getTapProducer(cookie);
2963     if (!connection) {
2964         LOG(EXTENSION_LOG_WARNING,
2965             "Unable to process tap ack. No producer found\n");
2966         return ENGINE_DISCONNECT;
2967     }
2968
2969     return connection->processAck(seqno, status, msg);
2970 }
2971
2972 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2973                                                              &backfillVBFilter,
2974                                                Producer *tc)
2975 {
2976     ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2977                                            backfillVBFilter);
2978     ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
2979 }
2980
2981 bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2982     ++numVbucket;
2983     item_eviction_policy_t policy = engine.getEpStore()->
2984                                                        getItemEvictionPolicy();
2985     numItems += vb->getNumItems(policy);
2986     numTempItems += vb->getNumTempItems();
2987     nonResident += vb->getNumNonResidentItems(policy);
2988
2989     if (vb->getHighPriorityChkSize() > 0) {
2990         chkPersistRemaining++;
2991     }
2992
2993     fileSpaceUsed += vb->fileSpaceUsed;
2994     fileSize += vb->fileSize;
2995
2996     if (desired_state != vbucket_state_dead) {
2997         htMemory += vb->ht.memorySize();
2998         htItemMemory += vb->ht.getItemMemory();
2999         htCacheSize += vb->ht.cacheSize;
3000         numEjects += vb->ht.getNumEjects();
3001         numExpiredItems += vb->numExpiredItems;
3002         metaDataMemory += vb->ht.metaDataMemory;
3003         metaDataDisk += vb->metaDataDisk;
3004         opsCreate += vb->opsCreate;
3005         opsUpdate += vb->opsUpdate;
3006         opsDelete += vb->opsDelete;
3007         opsReject += vb->opsReject;
3008
3009         queueSize += vb->dirtyQueueSize;
3010         queueMemory += vb->dirtyQueueMem;
3011         queueFill += vb->dirtyQueueFill;
3012         queueDrain += vb->dirtyQueueDrain;
3013         queueAge += vb->getQueueAge();
3014         pendingWrites += vb->dirtyQueuePendingWrites;
3015     }
3016
3017     return false;
3018 }
3019
3020 /**
3021  * A container class holding VBucketCountVisitors to aggregate stats for
3022  * different vbucket states.
3023  */
3024 class VBucketCountAggregator : public VBucketVisitor  {
3025 public:
3026     bool visitBucket(RCPtr<VBucket> &vb)  {
3027         std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3028         it = visitorMap.find(vb->getState());
3029         if ( it != visitorMap.end() ) {
3030             it->second->visitBucket(vb);
3031         }
3032
3033         return false;
3034     }
3035
3036     void addVisitor(VBucketCountVisitor* visitor)  {
3037         visitorMap[visitor->getVBucketState()] = visitor;
3038     }
3039 private:
3040     std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
3041 };
3042
3043 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3044                                                            ADD_STAT add_stat) {
3045     VBucketCountAggregator aggregator;
3046
3047     VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
3048     aggregator.addVisitor(&activeCountVisitor);
3049
3050     VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
3051     aggregator.addVisitor(&replicaCountVisitor);
3052
3053     VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
3054     aggregator.addVisitor(&pendingCountVisitor);
3055
3056     VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
3057     aggregator.addVisitor(&deadCountVisitor);
3058
3059     epstore->visit(aggregator);
3060
3061     epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
3062                                       replicaCountVisitor.getMemResidentPer());
3063     tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
3064                                      replicaCountVisitor.getNumItems() +
3065                                      pendingCountVisitor.getNumItems());
3066
3067     configuration.addStats(add_stat, cookie);
3068
3069     EPStats &epstats = getEpStats();
3070     add_casted_stat("ep_version", VERSION, add_stat, cookie);
3071     add_casted_stat("ep_storage_age",
3072                     epstats.dirtyAge, add_stat, cookie);
3073     add_casted_stat("ep_storage_age_highwat",
3074                     epstats.dirtyAgeHighWat, add_stat, cookie);
3075     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3076                     add_stat, cookie);
3077
3078     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3079         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3080     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3081         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3082     }
3083
3084     add_casted_stat("ep_total_enqueued",
3085                     epstats.totalEnqueued, add_stat, cookie);
3086     add_casted_stat("ep_total_persisted",
3087                     epstats.totalPersisted, add_stat, cookie);
3088     add_casted_stat("ep_item_flush_failed",
3089                     epstats.flushFailed, add_stat, cookie);
3090     add_casted_stat("ep_item_commit_failed",
3091                     epstats.commitFailed, add_stat, cookie);
3092     add_casted_stat("ep_item_begin_failed",
3093                     epstats.beginFailed, add_stat, cookie);
3094     add_casted_stat("ep_expired_access", epstats.expired_access,
3095                     add_stat, cookie);
3096     add_casted_stat("ep_expired_pager", epstats.expired_pager,
3097                     add_stat, cookie);
3098     add_casted_stat("ep_item_flush_expired",
3099                     epstats.flushExpired, add_stat, cookie);
3100     add_casted_stat("ep_queue_size",
3101                     epstats.diskQueueSize, add_stat, cookie);
3102     add_casted_stat("ep_flusher_todo",
3103                     epstats.flusher_todo, add_stat, cookie);
3104     add_casted_stat("ep_uncommitted_items",
3105                     epstats.flusher_todo, add_stat, cookie);
3106     add_casted_stat("ep_diskqueue_items",
3107                     epstats.diskQueueSize, add_stat, cookie);
3108     add_casted_stat("ep_flusher_state",
3109                     epstore->getFlusher(0)->stateName(),
3110                     add_stat, cookie);
3111     add_casted_stat("ep_commit_num", epstats.flusherCommits,
3112                     add_stat, cookie);
3113     add_casted_stat("ep_commit_time",
3114                     epstats.commit_time, add_stat, cookie);
3115     add_casted_stat("ep_commit_time_total",
3116                     epstats.cumulativeCommitTime, add_stat, cookie);
3117     add_casted_stat("ep_vbucket_del",
3118                     epstats.vbucketDeletions, add_stat, cookie);
3119     add_casted_stat("ep_vbucket_del_fail",
3120                     epstats.vbucketDeletionFail, add_stat, cookie);
3121     add_casted_stat("ep_flush_duration_total",
3122                     epstats.cumulativeFlushTime, add_stat, cookie);
3123     add_casted_stat("ep_flush_all",
3124                     epstore->isFlushAllScheduled() ? "true" : "false",
3125                     add_stat, cookie);
3126     add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
3127                     cookie);
3128     add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
3129                     add_stat, cookie);
3130     add_casted_stat("curr_items_tot",
3131                     activeCountVisitor.getNumItems() +
3132                     replicaCountVisitor.getNumItems() +
3133                     pendingCountVisitor.getNumItems(),
3134                     add_stat, cookie);
3135     add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
3136                     add_stat, cookie);
3137     add_casted_stat("vb_active_curr_items", activeCountVisitor.getNumItems(),
3138                     add_stat, cookie);
3139     add_casted_stat("vb_active_num_non_resident",
3140                     activeCountVisitor.getNonResident(),
3141                     add_stat, cookie);
3142     add_casted_stat("vb_active_perc_mem_resident",
3143                     activeCountVisitor.getMemResidentPer(),
3144                     add_stat, cookie);
3145     add_casted_stat("vb_active_eject", activeCountVisitor.getEjects(),
3146                     add_stat, cookie);
3147     add_casted_stat("vb_active_expired", activeCountVisitor.getExpired(),
3148                     add_stat, cookie);
3149     add_casted_stat("vb_active_meta_data_memory",
3150                     activeCountVisitor.getMetaDataMemory(),
3151                     add_stat, cookie);
3152     add_casted_stat("vb_active_meta_data_disk",
3153                     activeCountVisitor.getMetaDataDisk(),
3154                     add_stat, cookie);
3155     add_casted_stat("vb_active_ht_memory",
3156                     activeCountVisitor.getHashtableMemory(),
3157                     add_stat, cookie);
3158     add_casted_stat("vb_active_itm_memory", activeCountVisitor.getItemMemory(),
3159                     add_stat, cookie);
3160     add_casted_stat("vb_active_ops_create", activeCountVisitor.getOpsCreate(),
3161                     add_stat, cookie);
3162     add_casted_stat("vb_active_ops_update", activeCountVisitor.getOpsUpdate(),
3163                     add_stat, cookie);
3164     add_casted_stat("vb_active_ops_delete", activeCountVisitor.getOpsDelete(),
3165                     add_stat, cookie);
3166     add_casted_stat("vb_active_ops_reject", activeCountVisitor.getOpsReject(),
3167                     add_stat, cookie);
3168     add_casted_stat("vb_active_queue_size", activeCountVisitor.getQueueSize(),
3169                     add_stat, cookie);
3170     add_casted_stat("vb_active_queue_memory",
3171                     activeCountVisitor.getQueueMemory(), add_stat, cookie);
3172     add_casted_stat("vb_active_queue_age", activeCountVisitor.getAge(),
3173                     add_stat, cookie);
3174     add_casted_stat("vb_active_queue_pending",
3175                     activeCountVisitor.getPendingWrites(), add_stat, cookie);
3176     add_casted_stat("vb_active_queue_fill", activeCountVisitor.getQueueFill(),
3177                     add_stat, cookie);
3178     add_casted_stat("vb_active_queue_drain",
3179                     activeCountVisitor.getQueueDrain(), add_stat, cookie);
3180
3181     add_casted_stat("vb_replica_num", replicaCountVisitor.getVBucketNumber(),
3182                     add_stat, cookie);
3183     add_casted_stat("vb_replica_curr_items", replicaCountVisitor.getNumItems(),
3184                     add_stat, cookie);
3185     add_casted_stat("vb_replica_num_non_resident",
3186                     replicaCountVisitor.getNonResident(), add_stat, cookie);
3187     add_casted_stat("vb_replica_perc_mem_resident",
3188                     replicaCountVisitor.getMemResidentPer(),
3189                     add_stat, cookie);
3190     add_casted_stat("vb_replica_eject", replicaCountVisitor.getEjects(),
3191                     add_stat, cookie);
3192     add_casted_stat("vb_replica_expired", replicaCountVisitor.getExpired(),
3193                     add_stat, cookie);
3194     add_casted_stat("vb_replica_meta_data_memory",
3195                     replicaCountVisitor.getMetaDataMemory(), add_stat, cookie);
3196     add_casted_stat("vb_replica_meta_data_disk",
3197                     replicaCountVisitor.getMetaDataDisk(), add_stat, cookie);
3198     add_casted_stat("vb_replica_ht_memory",
3199                     replicaCountVisitor.getHashtableMemory(),
3200                     add_stat, cookie);
3201     add_casted_stat("vb_replica_itm_memory",
3202                     replicaCountVisitor.getItemMemory(), add_stat, cookie);
3203     add_casted_stat("vb_replica_ops_create",
3204                     replicaCountVisitor.getOpsCreate(), add_stat, cookie);
3205     add_casted_stat("vb_replica_ops_update",
3206                     replicaCountVisitor.getOpsUpdate(), add_stat, cookie);
3207     add_casted_stat("vb_replica_ops_delete",
3208                     replicaCountVisitor.getOpsDelete(), add_stat, cookie);
3209     add_casted_stat("vb_replica_ops_reject",
3210                     replicaCountVisitor.getOpsReject(), add_stat, cookie);
3211     add_casted_stat("vb_replica_queue_size",
3212                     replicaCountVisitor.getQueueSize(), add_stat, cookie);
3213     add_casted_stat("vb_replica_queue_memory",
3214                     replicaCountVisitor.getQueueMemory(),
3215                     add_stat, cookie);
3216     add_casted_stat("vb_replica_queue_age",
3217                     replicaCountVisitor.getAge(), add_stat, cookie);
3218     add_casted_stat("vb_replica_queue_pending",
3219                     replicaCountVisitor.getPendingWrites(),
3220                     add_stat, cookie);
3221     add_casted_stat("vb_replica_queue_fill",
3222                     replicaCountVisitor.getQueueFill(), add_stat, cookie);
3223     add_casted_stat("vb_replica_queue_drain",
3224                     replicaCountVisitor.getQueueDrain(), add_stat, cookie);
3225
3226     add_casted_stat("vb_pending_num",
3227                     pendingCountVisitor.getVBucketNumber(), add_stat, cookie);
3228     add_casted_stat("vb_pending_curr_items",
3229                     pendingCountVisitor.getNumItems(), add_stat, cookie);
3230     add_casted_stat("vb_pending_num_non_resident",
3231                     pendingCountVisitor.getNonResident(),
3232                     add_stat, cookie);
3233     add_casted_stat("vb_pending_perc_mem_resident",
3234                     pendingCountVisitor.getMemResidentPer(), add_stat, cookie);
3235     add_casted_stat("vb_pending_eject", pendingCountVisitor.getEjects(),
3236                     add_stat, cookie);
3237     add_casted_stat("vb_pending_expired", pendingCountVisitor.getExpired(),
3238                     add_stat, cookie);
3239     add_casted_stat("vb_pending_meta_data_memory",
3240                     pendingCountVisitor.getMetaDataMemory(),
3241                     add_stat, cookie);
3242     add_casted_stat("vb_pending_meta_data_disk",
3243                     pendingCountVisitor.getMetaDataDisk(),
3244                     add_stat, cookie);
3245     add_casted_stat("vb_pending_ht_memory",
3246                     pendingCountVisitor.getHashtableMemory(),
3247                     add_stat, cookie);
3248     add_casted_stat("vb_pending_itm_memory",
3249                     pendingCountVisitor.getItemMemory(), add_stat, cookie);
3250     add_casted_stat("vb_pending_ops_create",
3251                     pendingCountVisitor.getOpsCreate(), add_stat, cookie);
3252     add_casted_stat("vb_pending_ops_update",
3253                     pendingCountVisitor.getOpsUpdate(), add_stat, cookie);
3254     add_casted_stat("vb_pending_ops_delete",
3255                     pendingCountVisitor.getOpsDelete(), add_stat, cookie);
3256     add_casted_stat("vb_pending_ops_reject",
3257                     pendingCountVisitor.getOpsReject(), add_stat, cookie);
3258     add_casted_stat("vb_pending_queue_size",
3259                     pendingCountVisitor.getQueueSize(), add_stat, cookie);
3260     add_casted_stat("vb_pending_queue_memory",
3261                     pendingCountVisitor.getQueueMemory(),
3262                     add_stat, cookie);
3263     add_casted_stat("vb_pending_queue_age", pendingCountVisitor.getAge(),
3264                     add_stat, cookie);
3265     add_casted_stat("vb_pending_queue_pending",
3266                     pendingCountVisitor.getPendingWrites(),
3267                     add_stat, cookie);
3268     add_casted_stat("vb_pending_queue_fill",
3269                     pendingCountVisitor.getQueueFill(), add_stat, cookie);
3270     add_casted_stat("vb_pending_queue_drain",
3271                     pendingCountVisitor.getQueueDrain(), add_stat, cookie);
3272
3273     add_casted_stat("vb_dead_num", deadCountVisitor.getVBucketNumber(),
3274                     add_stat, cookie);
3275
3276     add_casted_stat("ep_db_data_size",
3277                     activeCountVisitor.getFileSpaceUsed() +
3278                     replicaCountVisitor.getFileSpaceUsed() +
3279                     pendingCountVisitor.getFileSpaceUsed() +
3280                     deadCountVisitor.getFileSpaceUsed(),
3281                     add_stat, cookie);
3282     add_casted_stat("ep_db_file_size",
3283                     activeCountVisitor.getFileSize() +
3284                     replicaCountVisitor.getFileSize() +
3285                     pendingCountVisitor.getFileSize() +
3286                     deadCountVisitor.getFileSize(),
3287                     add_stat, cookie);
3288
3289     add_casted_stat("ep_vb_snapshot_total",
3290                     epstats.snapshotVbucketHisto.total(), add_stat, cookie);
3291
3292     add_casted_stat("ep_persist_vbstate_total",
3293                     epstats.persistVBStateHisto.total(), add_stat, cookie);
3294
3295     add_casted_stat("ep_vb_total",
3296                     activeCountVisitor.getVBucketNumber() +
3297                     replicaCountVisitor.getVBucketNumber() +
3298                     pendingCountVisitor.getVBucketNumber() +
3299                     deadCountVisitor.getVBucketNumber(),
3300                     add_stat, cookie);
3301
3302     add_casted_stat("ep_total_new_items",
3303                     activeCountVisitor.getOpsCreate() +
3304                     replicaCountVisitor.getOpsCreate() +
3305                     pendingCountVisitor.getOpsCreate(),
3306                     add_stat, cookie);
3307     add_casted_stat("ep_total_del_items",
3308                     activeCountVisitor.getOpsDelete() +
3309                     replicaCountVisitor.getOpsDelete() +
3310                     pendingCountVisitor.getOpsDelete(),
3311                     add_stat, cookie);
3312     add_casted_stat("ep_diskqueue_memory",
3313                     activeCountVisitor.getQueueMemory() +
3314                     replicaCountVisitor.getQueueMemory() +
3315                     pendingCountVisitor.getQueueMemory(),
3316                     add_stat, cookie);
3317     add_casted_stat("ep_diskqueue_fill",
3318                     activeCountVisitor.getQueueFill() +
3319                     replicaCountVisitor.getQueueFill() +
3320                     pendingCountVisitor.getQueueFill(),
3321                     add_stat, cookie);
3322     add_casted_stat("ep_diskqueue_drain",
3323                     activeCountVisitor.getQueueDrain() +
3324                     replicaCountVisitor.getQueueDrain() +
3325                     pendingCountVisitor.getQueueDrain(),
3326                     add_stat, cookie);
3327     add_casted_stat("ep_diskqueue_pending",
3328                     activeCountVisitor.getPendingWrites() +
3329                     replicaCountVisitor.getPendingWrites() +
3330                     pendingCountVisitor.getPendingWrites(),
3331                     add_stat, cookie);
3332     add_casted_stat("ep_meta_data_memory",
3333                     activeCountVisitor.getMetaDataMemory() +
3334                     replicaCountVisitor.getMetaDataMemory() +
3335                     pendingCountVisitor.getMetaDataMemory(),
3336                     add_stat, cookie);
3337     add_casted_stat("ep_meta_data_disk",
3338                     activeCountVisitor.getMetaDataDisk() +
3339                     replicaCountVisitor.getMetaDataDisk() +
3340                     pendingCountVisitor.getMetaDataDisk(),
3341                     add_stat, cookie);
3342
3343     size_t memUsed =  stats.getTotalMemoryUsed();
3344     add_casted_stat("mem_used", memUsed, add_stat, cookie);
3345     add_casted_stat("bytes", memUsed, add_stat, cookie);
3346     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3347     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3348 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3349     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3350 #else
3351     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3352 #endif
3353     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3354     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3355                     add_stat, cookie);
3356 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3357     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3358 #else
3359     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3360 #endif
3361     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3362     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3363     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3364     add_casted_stat("ep_total_cache_size",
3365                     activeCountVisitor.getCacheSize() +
3366                     replicaCountVisitor.getCacheSize() +
3367                     pendingCountVisitor.getCacheSize(),
3368                     add_stat, cookie);
3369     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3370     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3371                     add_stat, cookie);
3372     add_casted_stat("ep_mem_tracker_enabled",
3373                     stats.memoryTrackerEnabled ? "true" : "false",
3374                     add_stat, cookie);
3375     add_casted_stat("ep_bg_fetched", epstats.bg_fetched,
3376                     add_stat, cookie);
3377     add_casted_stat("ep_bg_meta_fetched", epstats.bg_meta_fetched,
3378                     add_stat, cookie);
3379     add_casted_stat("ep_bg_remaining_jobs", epstats.numRemainingBgJobs,
3380                     add_stat, cookie);
3381     add_casted_stat("ep_max_bg_remaining_jobs", epstats.maxRemainingBgJobs,
3382                     add_stat, cookie);
3383     add_casted_stat("ep_tap_bg_fetched", stats.numTapBGFetched,
3384                     add_stat, cookie);
3385     add_casted_stat("ep_tap_bg_fetch_requeued", stats.numTapBGFetchRequeued,
3386                     add_stat, cookie);
3387     add_casted_stat("ep_num_pager_runs", epstats.pagerRuns,
3388                     add_stat, cookie);
3389     add_casted_stat("ep_num_expiry_pager_runs", epstats.expiryPagerRuns,
3390                     add_stat, cookie);
3391     add_casted_stat("ep_items_rm_from_checkpoints",
3392                     epstats.itemsRemovedFromCheckpoints,
3393                     add_stat, cookie);
3394     add_casted_stat("ep_num_value_ejects", epstats.numValueEjects,
3395                     add_stat, cookie);
3396     add_casted_stat("ep_num_eject_failures", epstats.numFailedEjects,
3397                     add_stat, cookie);
3398     add_casted_stat("ep_num_not_my_vbuckets", epstats.numNotMyVBuckets,
3399                     add_stat, cookie);
3400
3401     add_casted_stat("ep_pending_ops", epstats.pendingOps, add_stat, cookie);
3402     add_casted_stat("ep_pending_ops_total", epstats.pendingOpsTotal,
3403                     add_stat, cookie);
3404     add_casted_stat("ep_pending_ops_max", epstats.pendingOpsMax,
3405                     add_stat, cookie);
3406     add_casted_stat("ep_pending_ops_max_duration",
3407                     epstats.pendingOpsMaxDuration,
3408                     add_stat, cookie);
3409
3410     add_casted_stat("ep_pending_compactions", epstats.pendingCompactions,
3411                     add_stat, cookie);
3412     add_casted_stat("ep_rollback_count", epstats.rollbackCount,
3413                     add_stat, cookie);
3414
3415     size_t vbDeletions = epstats.vbucketDeletions.load();
3416     if (vbDeletions > 0) {
3417         add_casted_stat("ep_vbucket_del_max_walltime",
3418                         epstats.vbucketDelMaxWalltime,
3419                         add_stat, cookie);
3420         add_casted_stat("ep_vbucket_del_avg_walltime",
3421                         epstats.vbucketDelTotWalltime / vbDeletions,
3422                         add_stat, cookie);
3423     }
3424
3425     size_t numBgOps = epstats.bgNumOperations.load();
3426     if (numBgOps > 0) {
3427         add_casted_stat("ep_bg_num_samples", epstats.bgNumOperations,
3428                         add_stat, cookie);
3429         add_casted_stat("ep_bg_min_wait",
3430                         epstats.bgMinWait,
3431                         add_stat, cookie);
3432         add_casted_stat("ep_bg_max_wait",
3433                         epstats.bgMaxWait,
3434                         add_stat, cookie);
3435         add_casted_stat("ep_bg_wait_avg",
3436                         epstats.bgWait / numBgOps,
3437                         add_stat, cookie);
3438         add_casted_stat("ep_bg_min_load",
3439                         epstats.bgMinLoad,
3440                         add_stat, cookie);
3441         add_casted_stat("ep_bg_max_load",
3442                         epstats.bgMaxLoad,
3443                         add_stat, cookie);
3444         add_casted_stat("ep_bg_load_avg",
3445                         epstats.bgLoad / numBgOps,
3446                         add_stat, cookie);
3447         add_casted_stat("ep_bg_wait",
3448                         epstats.bgWait,
3449                         add_stat, cookie);
3450         add_casted_stat("ep_bg_load",
3451                         epstats.bgLoad,
3452                         add_stat, cookie);
3453     }
3454
3455     add_casted_stat("ep_num_non_resident",
3456                     activeCountVisitor.getNonResident() +
3457                     pendingCountVisitor.getNonResident() +
3458                     replicaCountVisitor.getNonResident(),
3459                     add_stat, cookie);
3460
3461     add_casted_stat("ep_degraded_mode", isDegradedMode(), add_stat, cookie);
3462     add_casted_stat("ep_exp_pager_stime", epstore->getExpiryPagerSleeptime(),
3463                     add_stat, cookie);
3464
3465     add_casted_stat("ep_mlog_compactor_runs", epstats.mlogCompactorRuns,
3466                     add_stat, cookie);
3467     add_casted_stat("ep_num_access_scanner_runs", epstats.alogRuns,
3468                     add_stat, cookie);
3469     add_casted_stat("ep_access_scanner_last_runtime", epstats.alogRuntime,
3470                     add_stat, cookie);
3471     add_casted_stat("ep_access_scanner_num_items", epstats.alogNumItems,
3472                     add_stat, cookie);
3473
3474     if (getConfiguration().isAccessScannerEnabled()) {
3475         char timestr[20];
3476         struct tm alogTim;
3477         if (cb_gmtime_r((time_t *)&epstats.alogTime, &alogTim) == -1) {
3478             add_casted_stat("ep_access_scanner_task_time", "UNKNOWN", add_stat,
3479                             cookie);
3480         } else {
3481             strftime(timestr, 20, "%Y-%m-%d %H:%M:%S", &alogTim);
3482             add_casted_stat("ep_access_scanner_task_time", timestr, add_stat,
3483                             cookie);
3484         }
3485     } else {
3486         add_casted_stat("ep_access_scanner_task_time", "NOT_SCHEDULED",
3487                         add_stat, cookie);
3488     }
3489
3490     add_casted_stat("ep_startup_time", startupTime.load(), add_stat, cookie);
3491
3492     if (getConfiguration().isWarmup()) {
3493         Warmup *wp = epstore->getWarmup();
3494         cb_assert(wp);
3495         if (!epstore->isWarmingUp()) {
3496             add_casted_stat("ep_warmup_thread", "complete", add_stat, cookie);
3497         } else {
3498             add_casted_stat("ep_warmup_thread", "running", add_stat, cookie);
3499         }
3500         if (wp->getTime() > 0) {
3501             add_casted_stat("ep_warmup_time", wp->getTime() / 1000,
3502                             add_stat, cookie);
3503         }
3504         add_casted_stat("ep_warmup_oom", epstats.warmOOM, add_stat, cookie);
3505         add_casted_stat("ep_warmup_dups", epstats.warmDups, add_stat, cookie);
3506     }
3507
3508     add_casted_stat("ep_num_ops_get_meta", epstats.numOpsGetMeta,
3509                     add_stat, cookie);
3510     add_casted_stat("ep_num_ops_set_meta", epstats.numOpsSetMeta,
3511                     add_stat, cookie);
3512     add_casted_stat("ep_num_ops_del_meta", epstats.numOpsDelMeta,
3513                     add_stat, cookie);
3514     add_casted_stat("ep_num_ops_set_meta_res_fail",
3515                     epstats.numOpsSetMetaResolutionFailed, add_stat, cookie);
3516     add_casted_stat("ep_num_ops_del_meta_res_fail",
3517                     epstats.numOpsDelMetaResolutionFailed, add_stat, cookie);
3518     add_casted_stat("ep_num_ops_set_ret_meta", epstats.numOpsSetRetMeta,
3519                     add_stat, cookie);
3520     add_casted_stat("ep_num_ops_del_ret_meta", epstats.numOpsDelRetMeta,
3521                     add_stat, cookie);
3522     add_casted_stat("ep_num_ops_get_meta_on_set_meta",
3523                     epstats.numOpsGetMetaOnSetWithMeta, add_stat, cookie);
3524     add_casted_stat("ep_chk_persistence_timeout",
3525                     VBucket::getCheckpointFlushTimeout(),
3526                     add_stat, cookie);
3527     add_casted_stat("ep_chk_persistence_remains",
3528                     activeCountVisitor.getChkPersistRemaining() +
3529                     pendingCountVisitor.getChkPersistRemaining() +
3530                     replicaCountVisitor.getChkPersistRemaining(),
3531                     add_stat, cookie);
3532     add_casted_stat("ep_workload_pattern",
3533                     workload->stringOfWorkLoadPattern(),
3534                     add_stat, cookie);
3535
3536     add_casted_stat("ep_defragmenter_num_visited", epstats.defragNumVisited,
3537                     add_stat, cookie);
3538     add_casted_stat("ep_defragmenter_num_moved", epstats.defragNumMoved,
3539                     add_stat, cookie);
3540
3541     return ENGINE_SUCCESS;
3542 }
3543
3544 ENGINE_ERROR_CODE EventuallyPersistentEngine::doMemoryStats(const void *cookie,
3545                                                            ADD_STAT add_stat) {
3546     add_casted_stat("bytes", stats.getTotalMemoryUsed(), add_stat, cookie);
3547     add_casted_stat("mem_used", stats.getTotalMemoryUsed(), add_stat, cookie);
3548     add_casted_stat("ep_kv_size", stats.currentSize, add_stat, cookie);
3549     add_casted_stat("ep_value_size", stats.totalValueSize, add_stat, cookie);
3550     add_casted_stat("ep_overhead", stats.memOverhead, add_stat, cookie);
3551     add_casted_stat("ep_max_size", stats.getMaxDataSize(), add_stat, cookie);
3552     add_casted_stat("ep_mem_low_wat", stats.mem_low_wat, add_stat, cookie);
3553     add_casted_stat("ep_mem_high_wat", stats.mem_high_wat, add_stat, cookie);
3554     add_casted_stat("ep_oom_errors", stats.oom_errors, add_stat, cookie);
3555     add_casted_stat("ep_tmp_oom_errors", stats.tmp_oom_errors,
3556                     add_stat, cookie);
3557
3558     add_casted_stat("ep_blob_num", stats.numBlob, add_stat, cookie);
3559 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3560     add_casted_stat("ep_blob_overhead", stats.blobOverhead, add_stat, cookie);
3561 #else
3562     add_casted_stat("ep_blob_overhead", "unknown", add_stat, cookie);
3563 #endif
3564     add_casted_stat("ep_storedval_size", stats.totalStoredValSize,
3565                     add_stat, cookie);
3566 #if defined(HAVE_JEMALLOC) || defined(HAVE_TCMALLOC)
3567     add_casted_stat("ep_storedval_overhead", stats.blobOverhead, add_stat, cookie);
3568 #else
3569     add_casted_stat("ep_storedval_overhead", "unknown", add_stat, cookie);
3570 #endif
3571     add_casted_stat("ep_storedval_num", stats.numStoredVal, add_stat, cookie);
3572     add_casted_stat("ep_item_num", stats.numItem, add_stat, cookie);
3573
3574
3575     add_casted_stat("ep_mem_tracker_enabled",
3576                     stats.memoryTrackerEnabled ? "true" : "false",
3577                     add_stat, cookie);
3578
3579     std::map<std::string, size_t> alloc_stats;
3580     MemoryTracker::getInstance()->getAllocatorStats(alloc_stats);
3581     std::map<std::string, size_t>::iterator it = alloc_stats.begin();
3582     for (; it != alloc_stats.end(); ++it) {
3583         add_casted_stat(it->first.c_str(), it->second, add_stat, cookie);
3584     }
3585
3586     return ENGINE_SUCCESS;
3587 }
3588
3589 ENGINE_ERROR_CODE EventuallyPersistentEngine::doVBucketStats(
3590                                                        const void *cookie,
3591                                                        ADD_STAT add_stat,
3592                                                        const char* stat_key,
3593                                                        int nkey,
3594                                                        bool prevStateRequested,
3595                                                        bool details) {
3596     class StatVBucketVisitor : public VBucketVisitor {
3597     public:
3598         StatVBucketVisitor(EventuallyPersistentStore *store,
3599                            const void *c, ADD_STAT a,
3600                            bool isPrevStateRequested, bool detailsRequested) :
3601             eps(store), cookie(c), add_stat(a),
3602             isPrevState(isPrevStateRequested),
3603             isDetailsRequested(detailsRequested) {}
3604
3605         bool visitBucket(RCPtr<VBucket> &vb) {
3606             addVBStats(cookie, add_stat, vb, eps, isPrevState,
3607                        isDetailsRequested);
3608             return false;
3609         }
3610
3611         static void addVBStats(const void *cookie, ADD_STAT add_stat,
3612                                RCPtr<VBucket> &vb,
3613                                EventuallyPersistentStore *store,
3614                                bool isPrevStateRequested,
3615                                bool detailsRequested) {
3616             if (!vb) {
3617                 return;
3618     &nbs