[BP] MB-18452: Single threaded test harness improvements
[ep-engine.git] / src / ep_engine.cc
1
2 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 /*
4  *     Copyright 2010 Couchbase, Inc
5  *
6  *   Licensed under the Apache License, Version 2.0 (the "License");
7  *   you may not use this file except in compliance with the License.
8  *   You may obtain a copy of the License at
9  *
10  *       http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *   Unless required by applicable law or agreed to in writing, software
13  *   distributed under the License is distributed on an "AS IS" BASIS,
14  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *   See the License for the specific language governing permissions and
16  *   limitations under the License.
17  */
18
19 #include "config.h"
20
21 #include <fcntl.h>
22 #include <memcached/engine.h>
23 #include <memcached/protocol_binary.h>
24 #include <memcached/util.h>
25 #include <platform/platform.h>
26 #include <stdarg.h>
27
28 #include <cstdio>
29 #include <cstring>
30 #include <fstream>
31 #include <iostream>
32 #include <limits>
33 #include <string>
34 #include <vector>
35
36 #include "backfill.h"
37 #include "ep_engine.h"
38 #include "failover-table.h"
39 #include "flusher.h"
40 #include "connmap.h"
41 #include "htresizer.h"
42 #include "memory_tracker.h"
43 #include "stats-info.h"
44 #define STATWRITER_NAMESPACE core_engine
45 #include "statwriter.h"
46 #undef STATWRITER_NAMESPACE
47 #include "tapthrottle.h"
48 #include "dcp-consumer.h"
49 #include "dcp-producer.h"
50 #include "warmup.h"
51
// File-local pointer to the allocator hooks API. It is only declared here;
// nothing in this part of the file assigns it.
static ALLOCATOR_HOOKS_API *hooksApi;
// Out-of-class definition of the static loggerApi member of
// EventuallyPersistentEngine.
SERVER_LOG_API* EventuallyPersistentEngine::loggerApi;
54
55
/**
 * Scale a size by a fractional factor.
 * @param val the base value
 * @param percent the factor to multiply with (e.g. 0.75)
 * @return val multiplied by percent, converted back to size_t
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
59
60 /**
61  * Helper function to avoid typing in the long cast all over the place
62  * @param handle pointer to the engine
63  * @return the engine as a class
64  */
65 static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
66 {
67     EventuallyPersistentEngine* ret;
68     ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
69     ObjectRegistry::onSwitchThread(ret);
70     return ret;
71 }
72
73 static inline void releaseHandle(ENGINE_HANDLE* handle) {
74     (void) handle;
75     ObjectRegistry::onSwitchThread(NULL);
76 }
77
78
79 /**
80  * Call the response callback and return the appropriate value so that
81  * the core knows what to do..
82  */
83 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
84                                       uint16_t keylen,
85                                       const void *ext, uint8_t extlen,
86                                       const void *body, uint32_t bodylen,
87                                       uint8_t datatype, uint16_t status,
88                                       uint64_t cas, const void *cookie)
89 {
90     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
91     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
92     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
93                  status, cas, cookie)) {
94         rv = ENGINE_SUCCESS;
95     }
96     ObjectRegistry::onSwitchThread(e);
97     return rv;
98 }
99
/**
 * Range check helper.
 * @param v value to check
 * @param l lowest accepted value (inclusive)
 * @param h highest accepted value (inclusive)
 * @throws std::runtime_error if v is below l or above h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool withinBounds = !(v < l) && !(v > h);
    if (withinBounds) {
        return;
    }
    throw std::runtime_error("Value out of range.");
}
106
107
/**
 * Verify that a string is a run of decimal digits, optionally preceded by
 * a single '-'.
 *
 * @param str NUL-terminated string to check
 * @throws std::runtime_error if str holds no digit at all (e.g. "" or "-")
 *         or contains any non-digit character after the optional '-'
 */
static void checkNumeric(const char* str) {
    // isdigit may be declared in the global namespace, in std, or both.
    using namespace std;

    int i = 0;
    if (str[0] == '-') {
        i++;
    }
    if (str[i] == '\0') {
        // Nothing but an (optional) sign: not a number.
        throw std::runtime_error("Value is not numeric");
    }
    for (; str[i]; i++) {
        // isdigit() is undefined for negative arguments other than EOF, so
        // a plain (possibly signed) char must go through unsigned char.
        if (!isdigit(static_cast<unsigned char>(str[i]))) {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
120
121 // The Engine API specifies C linkage for the functions..
122 extern "C" {
123
124     static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
125     {
126         engine_info* info = getHandle(handle)->getInfo();
127         releaseHandle(handle);
128         return info;
129     }
130
131     static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
132                                            const char* config_str)
133     {
134         ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
135         releaseHandle(handle);
136         return err_code;
137     }
138
139     static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
140     {
141         getHandle(handle)->destroy(force);
142         delete getHandle(handle);
143         releaseHandle(NULL);
144     }
145
146     static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
147                                              const void* cookie,
148                                              item **itm,
149                                              const void* key,
150                                              const size_t nkey,
151                                              const size_t nbytes,
152                                              const int flags,
153                                              const rel_time_t exptime,
154                                              uint8_t datatype)
155     {
156         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
157             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
158                     " (ItemAllocate)");
159             return ENGINE_EINVAL;
160         }
161         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
162                                                                      itm, key,
163                                                                      nkey,
164                                                                      nbytes,
165                                                                      flags,
166                                                                      exptime,
167                                                                      datatype);
168         releaseHandle(handle);
169         return err_code;
170     }
171
172     static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
173                                            const void* cookie,
174                                            const void* key,
175                                            const size_t nkey,
176                                            uint64_t* cas,
177                                            uint16_t vbucket,
178                                            mutation_descr_t *mut_info)
179     {
180         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
181                                                                    nkey, cas,
182                                                                    vbucket, mut_info);
183         releaseHandle(handle);
184         return err_code;
185     }
186
187     static void EvpItemRelease(ENGINE_HANDLE* handle,
188                                const void *cookie,
189                                item* itm)
190     {
191         getHandle(handle)->itemRelease(cookie, itm);
192         releaseHandle(handle);
193     }
194
195     static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
196                                     const void* cookie,
197                                     item** itm,
198                                     const void* key,
199                                     const int nkey,
200                                     uint16_t vbucket)
201     {
202         ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
203                                                             nkey, vbucket, true);
204         releaseHandle(handle);
205         return err_code;
206     }
207
208     static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
209                                          const void* cookie,
210                                          const char* stat_key,
211                                          int nkey,
212                                          ADD_STAT add_stat)
213     {
214         ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
215                                                                  stat_key,
216                                                                  nkey,
217                                                                  add_stat);
218         releaseHandle(handle);
219         return err_code;
220     }
221
222     static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
223                                       const void *cookie,
224                                       item* itm,
225                                       uint64_t *cas,
226                                       ENGINE_STORE_OPERATION operation,
227                                       uint16_t vbucket)
228     {
229         ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
230                                                               operation,
231                                                               vbucket);
232         releaseHandle(handle);
233         return err_code;
234     }
235
236     static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
237                                            const void* cookie,
238                                            const void* key,
239                                            const int nkey,
240                                            const bool increment,
241                                            const bool create,
242                                            const uint64_t delta,
243                                            const uint64_t initial,
244                                            const rel_time_t exptime,
245                                            item **itm,
246                                            uint8_t datatype,
247                                            uint64_t *result,
248                                            uint16_t vbucket)
249     {
250         if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
251             if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
252                 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
253                         " (Arithmetic)");
254             } else {
255                 LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
256                     "operations on compressed data!");
257             }
258             return ENGINE_EINVAL;
259         }
260         ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
261                                                                 nkey,
262                                                                 increment,
263                                                                 create, delta,
264                                                                 initial,
265                                                                 exptime, itm,
266                                                                 datatype,
267                                                                 result,
268                                                                 vbucket);
269         releaseHandle(handle);
270         return ecode;
271     }
272
273     static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
274                                       const void* cookie, time_t when)
275     {
276         ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
277         releaseHandle(handle);
278         return err_code;
279     }
280
281     static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
282     {
283         getHandle(handle)->resetStats();
284         releaseHandle(handle);
285     }
286
287     static protocol_binary_response_status stopFlusher(
288                                                  EventuallyPersistentEngine *e,
289                                                  const char **msg,
290                                                  size_t *msg_size) {
291         return e->stopFlusher(msg, msg_size);
292     }
293
294     static protocol_binary_response_status startFlusher(
295                                                  EventuallyPersistentEngine *e,
296                                                  const char **msg,
297                                                  size_t *msg_size) {
298         return e->startFlusher(msg, msg_size);
299     }
300
301     static protocol_binary_response_status setTapParam(
302                                                  EventuallyPersistentEngine *e,
303                                                  const char *keyz,
304                                                  const char *valz,
305                                                  const char **msg, size_t *) {
306         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
307
308         try {
309             int v = atoi(valz);
310             if (strcmp(keyz, "tap_keepalive") == 0) {
311                 checkNumeric(valz);
312                 validate(v, 0, MAX_TAP_KEEP_ALIVE);
313                 e->setTapKeepAlive(static_cast<uint32_t>(v));
314             } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
315                 checkNumeric(valz);
316                 e->getConfiguration().setTapThrottleThreshold(v);
317             } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
318                 checkNumeric(valz);
319                 e->getConfiguration().setTapThrottleQueueCap(v);
320             } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
321                 checkNumeric(valz);
322                 e->getConfiguration().setTapThrottleCapPcnt(v);
323             } else {
324                 *msg = "Unknown config param";
325                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
326             }
327         } catch(std::runtime_error &) {
328             *msg = "Value out of range.";
329             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
330         }
331
332         return rv;
333     }
334
335     static protocol_binary_response_status setCheckpointParam(
336                                                  EventuallyPersistentEngine *e,
337                                                               const char *keyz,
338                                                               const char *valz,
339                                                               const char **msg,
340                                                               size_t *) {
341         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
342
343         try {
344             int v = atoi(valz);
345             if (strcmp(keyz, "chk_max_items") == 0) {
346                 checkNumeric(valz);
347                 validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
348                 e->getConfiguration().setChkMaxItems(v);
349             } else if (strcmp(keyz, "chk_period") == 0) {
350                 checkNumeric(valz);
351                 validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
352                 e->getConfiguration().setChkPeriod(v);
353             } else if (strcmp(keyz, "max_checkpoints") == 0) {
354                 checkNumeric(valz);
355                 validate(v, DEFAULT_MAX_CHECKPOINTS,
356                          MAX_CHECKPOINTS_UPPER_BOUND);
357                 e->getConfiguration().setMaxCheckpoints(v);
358             } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
359                 if (strcmp(valz, "true") == 0) {
360                     e->getConfiguration().setItemNumBasedNewChk(true);
361                 } else {
362                     e->getConfiguration().setItemNumBasedNewChk(false);
363                 }
364             } else if (strcmp(keyz, "keep_closed_chks") == 0) {
365                 if (strcmp(valz, "true") == 0) {
366                     e->getConfiguration().setKeepClosedChks(true);
367                 } else {
368                     e->getConfiguration().setKeepClosedChks(false);
369                 }
370             } else if (strcmp(keyz, "enable_chk_merge") == 0) {
371                 if (strcmp(valz, "true") == 0) {
372                     e->getConfiguration().setEnableChkMerge(true);
373                 } else {
374                     e->getConfiguration().setEnableChkMerge(false);
375                 }
376             } else {
377                 *msg = "Unknown config param";
378                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
379             }
380         } catch(std::runtime_error &) {
381             *msg = "Value out of range.";
382             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
383         }
384
385         return rv;
386     }
387
388     static protocol_binary_response_status setFlushParam(
389                                                  EventuallyPersistentEngine *e,
390                                                  const char *keyz,
391                                                  const char *valz,
392                                                  const char **msg,
393                                                  size_t *) {
394         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
395
396         // Handle the actual mutation.
397         try {
398             int v = atoi(valz);
399             if (strcmp(keyz, "bg_fetch_delay") == 0) {
400                 checkNumeric(valz);
401                 e->getConfiguration().setBgFetchDelay(v);
402             } else if (strcmp(keyz, "flushall_enabled") == 0) {
403                 if (strcmp(valz, "true") == 0) {
404                     e->getConfiguration().setFlushallEnabled(true);
405                 } else if(strcmp(valz, "false") == 0) {
406                     e->getConfiguration().setFlushallEnabled(false);
407                 } else {
408                     throw std::runtime_error("value out of range.");
409                 }
410             } else if (strcmp(keyz, "max_size") == 0) {
411                 char *ptr = NULL;
412                 checkNumeric(valz);
413                 uint64_t vsize = strtoull(valz, &ptr, 10);
414                 validate(vsize, static_cast<uint64_t>(0),
415                          std::numeric_limits<uint64_t>::max());
416                 e->getConfiguration().setMaxSize(vsize);
417                 e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
418                 e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
419             } else if (strcmp(keyz, "mem_low_wat") == 0) {
420                 char *ptr = NULL;
421                 checkNumeric(valz);
422                 uint64_t vsize = strtoull(valz, &ptr, 10);
423                 validate(vsize, static_cast<uint64_t>(0),
424                          std::numeric_limits<uint64_t>::max());
425                 e->getConfiguration().setMemLowWat(vsize);
426             } else if (strcmp(keyz, "mem_high_wat") == 0) {
427                 char *ptr = NULL;
428                 checkNumeric(valz);
429                 uint64_t vsize = strtoull(valz, &ptr, 10);
430                 validate(vsize, static_cast<uint64_t>(0),
431                          std::numeric_limits<uint64_t>::max());
432                 e->getConfiguration().setMemHighWat(vsize);
433             } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
434                 checkNumeric(valz);
435                 validate(v, 0, 100);
436                 e->getConfiguration().setBackfillMemThreshold(v);
437             } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
438                 checkNumeric(valz);
439                 validate(v, 0, 100);
440                 e->getConfiguration().setCompactionExpMemThreshold(v);
441             } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
442                 checkNumeric(valz);
443                 validate(v, 0, 100);
444                 e->getConfiguration().setMutationMemThreshold(v);
445             } else if (strcmp(keyz, "timing_log") == 0) {
446                 EPStats &stats = e->getEpStats();
447                 std::ostream *old = stats.timingLog;
448                 stats.timingLog = NULL;
449                 delete old;
450                 if (strcmp(valz, "off") == 0) {
451                     LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
452                 } else {
453                     std::ofstream *tmp(new std::ofstream(valz));
454                     if (tmp->good()) {
455                         LOG(EXTENSION_LOG_INFO,
456                             "Logging detailed timings to ``%s''.", valz);
457                         stats.timingLog = tmp;
458                     } else {
459                         LOG(EXTENSION_LOG_WARNING,
460                             "Error setting detailed timing log to ``%s'':  %s",
461                             valz, strerror(errno));
462                         delete tmp;
463                     }
464                 }
465             } else if (strcmp(keyz, "exp_pager_stime") == 0) {
466                 char *ptr = NULL;
467                 checkNumeric(valz);
468                 uint64_t vsize = strtoull(valz, &ptr, 10);
469                 validate(vsize, static_cast<uint64_t>(0),
470                          std::numeric_limits<uint64_t>::max());
471                 e->getConfiguration().setExpPagerStime((size_t)vsize);
472             } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
473                 if (strcmp(valz, "true") == 0) {
474                     e->getConfiguration().setAccessScannerEnabled(true);
475                 } else if (strcmp(valz, "false") == 0) {
476                     e->getConfiguration().setAccessScannerEnabled(false);
477                 } else {
478                     throw std::runtime_error("Value expected: true/false.");
479                 }
480             } else if (strcmp(keyz, "alog_sleep_time") == 0) {
481                 checkNumeric(valz);
482                 e->getConfiguration().setAlogSleepTime(v);
483             } else if (strcmp(keyz, "alog_task_time") == 0) {
484                 checkNumeric(valz);
485                 e->getConfiguration().setAlogTaskTime(v);
486             } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
487                 checkNumeric(valz);
488                 e->getConfiguration().setPagerActiveVbPcnt(v);
489             } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
490                 checkNumeric(valz);
491                 validate(v, 0, std::numeric_limits<int>::max());
492                 e->getConfiguration().setWarmupMinMemoryThreshold(v);
493             } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
494                 checkNumeric(valz);
495                 validate(v, 0, std::numeric_limits<int>::max());
496                 e->getConfiguration().setWarmupMinItemsThreshold(v);
497             } else if (strcmp(keyz, "max_num_readers") == 0) {
498                 checkNumeric(valz);
499                 validate(v, 0, std::numeric_limits<int>::max());
500                 e->getConfiguration().setMaxNumReaders(v);
501                 ExecutorPool::get()->setMaxReaders(v);
502             } else if (strcmp(keyz, "max_num_writers") == 0) {
503                 checkNumeric(valz);
504                 validate(v, 0, std::numeric_limits<int>::max());
505                 e->getConfiguration().setMaxNumWriters(v);
506                 ExecutorPool::get()->setMaxWriters(v);
507             } else if (strcmp(keyz, "max_num_auxio") == 0) {
508                 checkNumeric(valz);
509                 validate(v, 0, std::numeric_limits<int>::max());
510                 e->getConfiguration().setMaxNumAuxio(v);
511                 ExecutorPool::get()->setMaxAuxIO(v);
512             } else if (strcmp(keyz, "max_num_nonio") == 0) {
513                 checkNumeric(valz);
514                 validate(v, 0, std::numeric_limits<int>::max());
515                 e->getConfiguration().setMaxNumNonio(v);
516                 ExecutorPool::get()->setMaxNonIO(v);
517             } else if (strcmp(keyz, "bfilter_enabled") == 0) {
518                 if (strcmp(valz, "true") == 0) {
519                     e->getConfiguration().setBfilterEnabled(true);
520                 } else if (strcmp(valz, "false") == 0) {
521                     e->getConfiguration().setBfilterEnabled(false);
522                 } else {
523                     throw std::runtime_error("Value expected: true/false.");
524                 }
525             } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
526                 float val = atof(valz);
527                 if (val >= 0.0 && val <= 1.0) {
528                     e->getConfiguration().setBfilterResidencyThreshold(val);
529                 } else {
530                     throw std::runtime_error("Value out of range [0.0-1.0].");
531                 }
532             } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
533                 if (strcmp(valz, "true") == 0) {
534                     e->getConfiguration().setDefragmenterEnabled(true);
535                 } else {
536                     e->getConfiguration().setDefragmenterEnabled(false);
537                 }
538             } else if (strcmp(keyz, "defragmenter_interval") == 0) {
539                 checkNumeric(valz);
540                 validate(v, 1, std::numeric_limits<int>::max());
541                 e->getConfiguration().setDefragmenterInterval(v);
542             } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
543                 checkNumeric(valz);
544                 validate(v, 0, std::numeric_limits<int>::max());
545                 e->getConfiguration().setDefragmenterAgeThreshold(v);
546             } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
547                 checkNumeric(valz);
548                 validate(v, 1, std::numeric_limits<int>::max());
549                 e->getConfiguration().setDefragmenterChunkDuration(v);
550             } else if (strcmp(keyz, "defragmenter_run") == 0) {
551                 e->runDefragmenterTask();
552             } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
553                 checkNumeric(valz);
554                 validate(v, 1, std::numeric_limits<int>::max());
555                 e->getConfiguration().setCompactionWriteQueueCap(v);
556             } else {
557                 *msg = "Unknown config param";
558                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
559             }
560         } catch(std::runtime_error& ex) {
561             *msg = "Value out of range.";
562             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
563         }
564
565         return rv;
566     }
567
568     static protocol_binary_response_status setDcpParam(
569                                                     EventuallyPersistentEngine *e,
570                                                     const char *keyz,
571                                                     const char *valz,
572                                                     const char **msg,
573                                                     size_t *) {
574         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
575         try {
576
577             if (strcmp(keyz, "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
578                 size_t v = atoi(valz);
579                 checkNumeric(valz);
580                 validate(v, size_t(1), std::numeric_limits<size_t>::max());
581                 e->getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(v);
582             } else if (strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") == 0) {
583                 size_t v = atoi(valz);
584                 checkNumeric(valz);
585                 validate(v, size_t(1), std::numeric_limits<size_t>::max());
586                 e->getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(v);
587             } else {
588                 *msg = "Unknown config param";
589                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
590             }
591         } catch (std::runtime_error& ex) {
592             *msg = "Value out of range.";
593             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
594         }
595
596         return rv;
597     }
598
599     static protocol_binary_response_status evictKey(
600                                                  EventuallyPersistentEngine *e,
601                                                  protocol_binary_request_header
602                                                                       *request,
603                                                  const char **msg,
604                                                  size_t *msg_size) {
605         protocol_binary_request_no_extras *req =
606             (protocol_binary_request_no_extras*)request;
607
608         char keyz[256];
609
610         // Read the key.
611         int keylen = ntohs(req->message.header.request.keylen);
612         if (keylen >= (int)sizeof(keyz)) {
613             *msg = "Key is too large.";
614             return PROTOCOL_BINARY_RESPONSE_EINVAL;
615         }
616         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
617         keyz[keylen] = 0x00;
618
619         uint16_t vbucket = ntohs(request->request.vbucket);
620
621         std::string key(keyz, keylen);
622
623         LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
624                 keyz);
625
626         protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
627                                                          msg_size);
628         if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
629             rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
630             if (e->isDegradedMode()) {
631                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
632             }
633         }
634         return rv;
635     }
636
637     static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
638                                        protocol_binary_request_header *req,
639                                        const void *cookie,
640                                        Item **itm,
641                                        const char **msg,
642                                        size_t *,
643                                        protocol_binary_response_status *res) {
644
645         uint8_t extlen = req->request.extlen;
646         if (extlen != 0 && extlen != 4) {
647             *msg = "Invalid packet format (extlen may be 0 or 4)";
648             *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
649             return ENGINE_EINVAL;
650         }
651
652         protocol_binary_request_getl *grequest =
653             (protocol_binary_request_getl*)req;
654         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
655
656         const char *keyp = reinterpret_cast<const char*>(req->bytes);
657         keyp += sizeof(req->bytes) + extlen;
658         std::string key(keyp, ntohs(req->request.keylen));
659         uint16_t vbucket = ntohs(req->request.vbucket);
660
661         RememberingCallback<GetValue> getCb;
662         uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
663         uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
664         uint32_t lockTimeout = default_timeout;
665         if (extlen == 4) {
666             lockTimeout = ntohl(grequest->message.body.expiration);
667         }
668
669         if (lockTimeout >  max_timeout || lockTimeout < 1) {
670             LOG(EXTENSION_LOG_WARNING,
671                 "Illegal value for lock timeout specified"
672                 " %u. Using default value: %u", lockTimeout, default_timeout);
673             lockTimeout = default_timeout;
674         }
675
676         bool gotLock = e->getLocked(key, vbucket, getCb,
677                                     ep_current_time(),
678                                     lockTimeout, cookie);
679
680         getCb.waitForValue();
681         ENGINE_ERROR_CODE rv = getCb.val.getStatus();
682
683         if (rv == ENGINE_SUCCESS) {
684             *itm = getCb.val.getValue();
685             ++(e->getEpStats().numOpsGet);
686         } else if (rv == ENGINE_EWOULDBLOCK) {
687
688             // need to wait for value
689             return rv;
690         } else if (rv == ENGINE_NOT_MY_VBUCKET) {
691             *msg = "That's not my bucket.";
692             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
693             return ENGINE_NOT_MY_VBUCKET;
694         } else if (!gotLock){
695             *msg =  "LOCK_ERROR";
696             *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
697             return ENGINE_TMPFAIL;
698         } else {
699             if (e->isDegradedMode()) {
700                 *msg = "LOCK_TMP_ERROR";
701                 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
702                 return ENGINE_TMPFAIL;
703             }
704
705             *msg = "NOT_FOUND";
706             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
707             return ENGINE_KEY_ENOENT;
708         }
709
710         return rv;
711     }
712
713     static protocol_binary_response_status unlockKey(
714                                                  EventuallyPersistentEngine *e,
715                                                  protocol_binary_request_header
716                                                                       *request,
717                                                  const char **msg,
718                                                  size_t *)
719     {
720         protocol_binary_request_no_extras *req =
721             (protocol_binary_request_no_extras*)request;
722
723         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
724         char keyz[256];
725
726         // Read the key.
727         int keylen = ntohs(req->message.header.request.keylen);
728         if (keylen >= (int)sizeof(keyz)) {
729             *msg = "Key is too large.";
730             return PROTOCOL_BINARY_RESPONSE_EINVAL;
731         }
732
733         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
734         keyz[keylen] = 0x00;
735
736         uint16_t vbucket = ntohs(request->request.vbucket);
737         std::string key(keyz, keylen);
738
739         LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
740
741         RememberingCallback<GetValue> getCb;
742         uint64_t cas = ntohll(request->request.cas);
743
744         ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
745                                             ep_current_time());
746
747         if (rv == ENGINE_SUCCESS) {
748             *msg = "UNLOCKED";
749         } else if (rv == ENGINE_TMPFAIL){
750             *msg =  "UNLOCK_ERROR";
751             res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
752         } else {
753             if (e->isDegradedMode()) {
754                 *msg = "LOCK_TMP_ERROR";
755                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
756             }
757
758             RCPtr<VBucket> vb = e->getVBucket(vbucket);
759             if (!vb) {
760                 *msg = "That's not my bucket.";
761                 res =  PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
762             }
763             *msg = "NOT_FOUND";
764             res =  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
765         }
766
767         return res;
768     }
769
770     static protocol_binary_response_status setParam(
771                                             EventuallyPersistentEngine *e,
772                                             protocol_binary_request_set_param
773                                                                      *req,
774                                             const char **msg,
775                                             size_t *msg_size) {
776
777         size_t keylen = ntohs(req->message.header.request.keylen);
778         uint8_t extlen = req->message.header.request.extlen;
779         size_t vallen = ntohl(req->message.header.request.bodylen);
780         protocol_binary_engine_param_t paramtype =
781             static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
782
783         if (keylen == 0 || (vallen - keylen - extlen) == 0) {
784             return PROTOCOL_BINARY_RESPONSE_EINVAL;
785         }
786
787         const char *keyp = reinterpret_cast<const char*>(req->bytes)
788                            + sizeof(req->bytes);
789         const char *valuep = keyp + keylen;
790         vallen -= (keylen + extlen);
791
792         char keyz[128];
793         char valz[512];
794
795         // Read the key.
796         if (keylen >= sizeof(keyz)) {
797             *msg = "Key is too large.";
798             return PROTOCOL_BINARY_RESPONSE_EINVAL;
799         }
800         memcpy(keyz, keyp, keylen);
801         keyz[keylen] = 0x00;
802
803         // Read the value.
804         if (vallen >= sizeof(valz)) {
805             *msg = "Value is too large.";
806             return PROTOCOL_BINARY_RESPONSE_EINVAL;
807         }
808         memcpy(valz, valuep, vallen);
809         valz[vallen] = 0x00;
810
811         protocol_binary_response_status rv;
812
813         switch (paramtype) {
814         case protocol_binary_engine_param_flush:
815             rv = setFlushParam(e, keyz, valz, msg, msg_size);
816             break;
817         case protocol_binary_engine_param_tap:
818             rv = setTapParam(e, keyz, valz, msg, msg_size);
819             break;
820         case protocol_binary_engine_param_checkpoint:
821             rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
822             break;
823         case protocol_binary_engine_param_dcp:
824             rv = setDcpParam(e, keyz, valz, msg, msg_size);
825             break;
826         default:
827             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
828         }
829
830         return rv;
831     }
832
833     static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
834                                        const void *cookie,
835                                        protocol_binary_request_header *request,
836                                        ADD_RESPONSE response) {
837         protocol_binary_request_get_vbucket *req =
838             reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
839         cb_assert(req);
840
841         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
842         RCPtr<VBucket> vb = e->getVBucket(vbucket);
843         if (!vb) {
844             LockHolder lh(e->clusterConfig.lock);
845             return sendResponse(response, NULL, 0, NULL, 0,
846                                 e->clusterConfig.config,
847                                 e->clusterConfig.len,
848                                 PROTOCOL_BINARY_RAW_BYTES,
849                                 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
850                                 cookie);
851         } else {
852             vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
853             return sendResponse(response, NULL, 0, NULL, 0, &state,
854                                 sizeof(state),
855                                 PROTOCOL_BINARY_RAW_BYTES,
856                                 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
857         }
858     }
859
860     static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
861                                        const void *cookie,
862                                        protocol_binary_request_header *request,
863                                        ADD_RESPONSE response) {
864
865         protocol_binary_request_set_vbucket *req =
866             reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
867
868         uint64_t cas = ntohll(req->message.header.request.cas);
869
870         size_t bodylen = ntohl(req->message.header.request.bodylen)
871             - ntohs(req->message.header.request.keylen);
872         if (bodylen != sizeof(vbucket_state_t)) {
873             const std::string msg("Incorrect packet format");
874             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
875                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
876                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
877                                 cas, cookie);
878         }
879
880         vbucket_state_t state;
881         memcpy(&state, &req->message.body.state, sizeof(state));
882         state = static_cast<vbucket_state_t>(ntohl(state));
883
884         if (!is_valid_vbucket_state_t(state)) {
885             const std::string msg("Invalid vbucket state");
886             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
887                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
888                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
889                                 cas, cookie);
890         }
891
892         uint16_t vb = ntohs(req->message.header.request.vbucket);
893         if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
894             const std::string msg("VBucket number too big");
895             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
896                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
897                                 PROTOCOL_BINARY_RESPONSE_ERANGE,
898                                 cas, cookie);
899         }
900         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
901                             PROTOCOL_BINARY_RAW_BYTES,
902                             PROTOCOL_BINARY_RESPONSE_SUCCESS,
903                             cas, cookie);
904     }
905
906     static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
907                                         const void *cookie,
908                                         protocol_binary_request_header *req,
909                                         ADD_RESPONSE response) {
910
911         uint64_t cas = ntohll(req->request.cas);
912
913         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
914         uint16_t vbucket = ntohs(req->request.vbucket);
915
916         std::string msg = "";
917         if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
918             msg = "Incorrect packet format.";
919             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
920                                 msg.length(),
921                                 PROTOCOL_BINARY_RAW_BYTES,
922                                 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
923         }
924
925         bool sync = false;
926         uint32_t bodylen = ntohl(req->request.bodylen);
927         if (bodylen > 0) {
928             const char* ptr = reinterpret_cast<const char*>(req->bytes) +
929                 sizeof(req->bytes);
930             if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
931                 sync = true;
932             }
933         }
934
935         ENGINE_ERROR_CODE err;
936         void* es = e->getEngineSpecific(cookie);
937         if (sync) {
938             if (es == NULL) {
939                 err = e->deleteVBucket(vbucket, cookie);
940                 e->storeEngineSpecific(cookie, e);
941             } else {
942                 e->storeEngineSpecific(cookie, NULL);
943                 LOG(EXTENSION_LOG_INFO,
944                     "Completed sync deletion of vbucket %u",
945                     (unsigned)vbucket);
946                 err = ENGINE_SUCCESS;
947             }
948         } else {
949             err = e->deleteVBucket(vbucket);
950         }
951         switch (err) {
952         case ENGINE_SUCCESS:
953             LOG(EXTENSION_LOG_WARNING,
954                 "Deletion of vbucket %d was completed.", vbucket);
955             break;
956         case ENGINE_NOT_MY_VBUCKET:
957             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
958                 "because the vbucket doesn't exist!!!", vbucket);
959             res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
960             break;
961         case ENGINE_EINVAL:
962             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
963                 "because the vbucket is not in a dead state\n", vbucket);
964             msg = "Failed to delete vbucket.  Must be in the dead state.";
965             res = PROTOCOL_BINARY_RESPONSE_EINVAL;
966             break;
967         case ENGINE_EWOULDBLOCK:
968             LOG(EXTENSION_LOG_WARNING, "Request for vbucket %d deletion is in"
969                 " EWOULDBLOCK until the database file is removed from disk",
970                 vbucket);
971             e->storeEngineSpecific(cookie, req);
972             return ENGINE_EWOULDBLOCK;
973         default:
974             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
975                 "because of unknown reasons\n", vbucket);
976             msg = "Failed to delete vbucket.  Unknown reason.";
977             res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
978         }
979
980         if (err != ENGINE_NOT_MY_VBUCKET) {
981             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
982                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
983                                 res, cas, cookie);
984         } else {
985             LockHolder lh(e->clusterConfig.lock);
986             return sendResponse(response, NULL, 0, NULL, 0,
987                                 e->clusterConfig.config,
988                                 e->clusterConfig.len,
989                                 PROTOCOL_BINARY_RAW_BYTES,
990                                 res, cas, cookie);
991         }
992
993     }
994
995     static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
996                                        protocol_binary_request_header *request,
997                                        const void *cookie,
998                                        Item **it,
999                                        const char **msg,
1000                                        protocol_binary_response_status *res) {
1001         EventuallyPersistentStore *eps = e->getEpStore();
1002         protocol_binary_request_no_extras *req =
1003             (protocol_binary_request_no_extras*)request;
1004         int keylen = ntohs(req->message.header.request.keylen);
1005         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1006         ENGINE_ERROR_CODE error_code;
1007         std::string keystr(((char *)request) + sizeof(req->message.header),
1008                             keylen);
1009
1010         GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
1011
1012         if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
1013             if (error_code == ENGINE_NOT_MY_VBUCKET) {
1014                 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1015                 return error_code;
1016             } else if (error_code == ENGINE_TMPFAIL) {
1017                 *msg = "NOT_FOUND";
1018                 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1019             } else {
1020                 return error_code;
1021             }
1022         } else {
1023             *it = rv.getValue();
1024             *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1025         }
1026         ++(e->getEpStats().numOpsGet);
1027         return ENGINE_SUCCESS;
1028     }
1029
1030     static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
1031                                        const void *cookie,
1032                                        protocol_binary_request_compact_db *req,
1033                                        ADD_RESPONSE response) {
1034
1035         std::string msg = "";
1036         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1037         compaction_ctx compactreq;
1038         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1039         uint64_t cas = ntohll(req->message.header.request.cas);
1040
1041         if (ntohs(req->message.header.request.keylen) > 0 ||
1042              req->message.header.request.extlen != 24) {
1043             LOG(EXTENSION_LOG_WARNING,
1044                     "Compaction of vbucket %d received bad ext/key len %d/%d.",
1045                     vbucket, req->message.header.request.extlen,
1046                     ntohs(req->message.header.request.keylen));
1047             msg = "Incorrect packet format.";
1048             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1049                                 msg.length(),
1050                                 PROTOCOL_BINARY_RAW_BYTES,
1051                                 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
1052         }
1053         EPStats &stats = e->getEpStats();
1054         compactreq.max_purged_seq = 0;
1055         compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
1056         compactreq.purge_before_seq =
1057                                     ntohll(req->message.body.purge_before_seq);
1058         compactreq.drop_deletes     = req->message.body.drop_deletes;
1059         compactreq.bfcb             = NULL;
1060
1061         ENGINE_ERROR_CODE err;
1062         void* es = e->getEngineSpecific(cookie);
1063         if (es == NULL) {
1064             ++stats.pendingCompactions;
1065             e->storeEngineSpecific(cookie, e);
1066             err = e->compactDB(vbucket, compactreq, cookie);
1067         } else {
1068             e->storeEngineSpecific(cookie, NULL);
1069             err = ENGINE_SUCCESS;
1070         }
1071
1072         switch (err) {
1073             case ENGINE_SUCCESS:
1074                 LOG(EXTENSION_LOG_INFO,
1075                     "Compaction of vbucket %d completed.", vbucket);
1076                 break;
1077             case ENGINE_NOT_MY_VBUCKET:
1078                 --stats.pendingCompactions;
1079                 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1080                     "because the vbucket doesn't exist!!!", vbucket);
1081                 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1082                 break;
1083             case ENGINE_EWOULDBLOCK:
1084                 LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
1085                         "in EWOULDBLOCK state until the database file is "
1086                         "compacted on disk",
1087                         vbucket);
1088                 e->storeEngineSpecific(cookie, req);
1089                 return ENGINE_EWOULDBLOCK;
1090             case ENGINE_TMPFAIL:
1091                 LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
1092                         " a temporary failure and may need to be retried",
1093                         vbucket);
1094                 msg = "Temporary failure in compacting vbucket.";
1095                 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1096                 break;
1097             default:
1098                 --stats.pendingCompactions;
1099                 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1100                     "because of unknown reasons\n", vbucket);
1101                 msg = "Failed to compact vbucket.  Unknown reason.";
1102                 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1103                 break;
1104         }
1105
1106         if (err != ENGINE_NOT_MY_VBUCKET) {
1107             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1108                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1109                                 res, cas, cookie);
1110         } else {
1111             LockHolder lh(e->clusterConfig.lock);
1112             return sendResponse(response, NULL, 0, NULL, 0,
1113                                 e->clusterConfig.config,
1114                                 e->clusterConfig.len,
1115                                 PROTOCOL_BINARY_RAW_BYTES,
1116                                 res, cas, cookie);
1117         }
1118     }
1119
1120     static ENGINE_ERROR_CODE processUnknownCommand(
1121                                        EventuallyPersistentEngine *h,
1122                                        const void* cookie,
1123                                        protocol_binary_request_header *request,
1124                                        ADD_RESPONSE response)
1125     {
1126         protocol_binary_response_status res =
1127                                       PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1128         const char *msg = NULL;
1129         size_t msg_size = 0;
1130         Item *itm = NULL;
1131
1132         EPStats &stats = h->getEpStats();
1133         ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1134
1135         /**
1136          * Session validation
1137          * (For ns_server commands only)
1138          */
1139         switch (request->request.opcode) {
1140             case PROTOCOL_BINARY_CMD_SET_PARAM:
1141             case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1142             case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1143             case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1144             case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1145             case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1146             case PROTOCOL_BINARY_CMD_COMPACT_DB:
1147             {
1148                 if (h->getEngineSpecific(cookie) == NULL) {
1149                     uint64_t cas = ntohll(request->request.cas);
1150                     if (!h->validateSessionCas(cas)) {
1151                         const std::string message("Invalid session token");
1152                         return sendResponse(response, NULL, 0, NULL, 0,
1153                                             message.c_str(), message.length(),
1154                                             PROTOCOL_BINARY_RAW_BYTES,
1155                                             PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1156                                             cas, cookie);
1157                     }
1158                 }
1159                 break;
1160             }
1161             default:
1162                 break;
1163         }
1164
1165         switch (request->request.opcode) {
1166         case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
1167             return h->getAllVBucketSequenceNumbers(cookie, request, response);
1168
1169         case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1170             {
1171                 BlockTimer timer(&stats.getVbucketCmdHisto);
1172                 rv = getVBucket(h, cookie, request, response);
1173                 return rv;
1174             }
1175         case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1176             {
1177                 BlockTimer timer(&stats.delVbucketCmdHisto);
1178                 rv = delVBucket(h, cookie, request, response);
1179                 if (rv != ENGINE_EWOULDBLOCK) {
1180                     h->decrementSessionCtr();
1181                     h->storeEngineSpecific(cookie, NULL);
1182                 }
1183                 return rv;
1184             }
1185         case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1186             {
1187                 BlockTimer timer(&stats.setVbucketCmdHisto);
1188                 rv = setVBucket(h, cookie, request, response);
1189                 h->decrementSessionCtr();
1190                 return rv;
1191             }
1192         case PROTOCOL_BINARY_CMD_TOUCH:
1193         case PROTOCOL_BINARY_CMD_GAT:
1194         case PROTOCOL_BINARY_CMD_GATQ:
1195             {
1196                 rv = h->touch(cookie, request, response);
1197                 return rv;
1198             }
1199         case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1200             res = stopFlusher(h, &msg, &msg_size);
1201             break;
1202         case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1203             res = startFlusher(h, &msg, &msg_size);
1204             break;
1205         case PROTOCOL_BINARY_CMD_SET_PARAM:
1206             res = setParam(h,
1207                   reinterpret_cast<protocol_binary_request_set_param*>(request),
1208                             &msg, &msg_size);
1209             h->decrementSessionCtr();
1210             break;
1211         case PROTOCOL_BINARY_CMD_EVICT_KEY:
1212             res = evictKey(h, request, &msg, &msg_size);
1213             break;
1214         case PROTOCOL_BINARY_CMD_GET_LOCKED:
1215             rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
1216             if (rv == ENGINE_EWOULDBLOCK) {
1217                 // we dont have the value for the item yet
1218                 return rv;
1219             }
1220             break;
1221         case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
1222             res = unlockKey(h, request, &msg, &msg_size);
1223             break;
1224         case PROTOCOL_BINARY_CMD_OBSERVE:
1225             return h->observe(cookie, request, response);
1226         case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
1227             return h->observe_seqno(cookie, request, response);
1228         case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1229             {
1230                 rv = h->deregisterTapClient(cookie, request, response);
1231                 h->decrementSessionCtr();
1232                 return rv;
1233             }
1234         case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN:
1235             {
1236                 rv = h->resetReplicationChain(cookie, request, response);
1237                 return rv;
1238             }
1239         case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1240             {
1241                 rv = h->changeTapVBFilter(cookie, request, response);
1242                 h->decrementSessionCtr();
1243                 return rv;
1244             }
1245         case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1246         case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1247         case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE:
1248             {
1249                 rv = h->handleCheckpointCmds(cookie, request, response);
1250                 return rv;
1251             }
1252         case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE:
1253             {
1254                 rv = h->handleSeqnoCmds(cookie, request, response);
1255                 return rv;
1256             }
1257         case PROTOCOL_BINARY_CMD_GET_META:
1258         case PROTOCOL_BINARY_CMD_GETQ_META:
1259             {
1260                 rv = h->getMeta(cookie,
1261                         reinterpret_cast<protocol_binary_request_get_meta*>
1262                                                           (request), response);
1263                 return rv;
1264             }
1265         case PROTOCOL_BINARY_CMD_SET_WITH_META:
1266         case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1267         case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1268         case PROTOCOL_BINARY_CMD_ADDQ_WITH_META:
1269             {
1270                 rv = h->setWithMeta(cookie,
1271                      reinterpret_cast<protocol_binary_request_set_with_meta*>
1272                                                           (request), response);
1273                 return rv;
1274             }
1275         case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1276         case PROTOCOL_BINARY_CMD_DELQ_WITH_META:
1277             {
1278                 rv = h->deleteWithMeta(cookie,
1279                     reinterpret_cast<protocol_binary_request_delete_with_meta*>
1280                                                           (request), response);
1281                 return rv;
1282             }
1283         case PROTOCOL_BINARY_CMD_RETURN_META:
1284             {
1285                 return h->returnMeta(cookie,
1286                 reinterpret_cast<protocol_binary_request_return_meta*>
1287                                                           (request), response);
1288             }
1289         case PROTOCOL_BINARY_CMD_GET_REPLICA:
1290             rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
1291             if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1292                 return rv;
1293             }
1294             break;
1295         case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1296         case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC:
1297             {
1298                 rv = h->handleTrafficControlCmd(cookie, request, response);
1299                 return rv;
1300             }
1301         case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1302             {
1303                 rv = h->setClusterConfig(cookie,
1304                  reinterpret_cast<protocol_binary_request_set_cluster_config*>
1305                                                           (request), response);
1306                 h->decrementSessionCtr();
1307                 return rv;
1308             }
1309         case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
1310             return h->getClusterConfig(cookie,
1311                reinterpret_cast<protocol_binary_request_get_cluster_config*>
1312                                                           (request), response);
1313         case PROTOCOL_BINARY_CMD_COMPACT_DB:
1314             {
1315                 rv = compactDB(h, cookie,
1316                                (protocol_binary_request_compact_db*)(request),
1317                                response);
1318                 if (rv != ENGINE_EWOULDBLOCK) {
1319                     h->decrementSessionCtr();
1320                     h->storeEngineSpecific(cookie, NULL);
1321                 }
1322                 return rv;
1323             }
1324         case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
1325             {
1326                 if (request->request.extlen != 0 ||
1327                     request->request.keylen != 0 ||
1328                     request->request.bodylen != 0) {
1329                     return ENGINE_EINVAL;
1330                 }
1331                 return h->getRandomKey(cookie, response);
1332             }
1333         case CMD_GET_KEYS:
1334             {
1335                 return h->getAllKeys(cookie,
1336                    reinterpret_cast<protocol_binary_request_get_keys*>
1337                                                            (request), response);
1338             }
1339         case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
1340             {
1341                 return h->getAdjustedTime(cookie,
1342                    reinterpret_cast<protocol_binary_request_get_adjusted_time*>
1343                                                            (request), response);
1344             }
1345         case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE:
1346             {
1347                 return h->setDriftCounterState(cookie,
1348                 reinterpret_cast<protocol_binary_request_set_drift_counter_state*>
1349                                                            (request), response);
1350             }
1351         }
1352
1353         // Send a special response for getl since we don't want to send the key
1354         if (itm && request->request.opcode == PROTOCOL_BINARY_CMD_GET_LOCKED) {
1355             uint32_t flags = itm->getFlags();
1356             rv = sendResponse(response, NULL, 0, (const void *)&flags,
1357                               sizeof(uint32_t),
1358                               static_cast<const void *>(itm->getData()),
1359                               itm->getNBytes(), itm->getDataType(),
1360                               static_cast<uint16_t>(res), itm->getCas(),
1361                               cookie);
1362             delete itm;
1363         } else if (itm) {
1364             const std::string &key  = itm->getKey();
1365             uint32_t flags = itm->getFlags();
1366             rv = sendResponse(response, static_cast<const void *>(key.data()),
1367                               itm->getNKey(),
1368                               (const void *)&flags, sizeof(uint32_t),
1369                               static_cast<const void *>(itm->getData()),
1370                               itm->getNBytes(), itm->getDataType(),
1371                               static_cast<uint16_t>(res), itm->getCas(),
1372                               cookie);
1373             delete itm;
1374         } else  if (rv == ENGINE_NOT_MY_VBUCKET) {
1375             LockHolder lh(h->clusterConfig.lock);
1376             return sendResponse(response, NULL, 0, NULL, 0,
1377                                 h->clusterConfig.config,
1378                                 h->clusterConfig.len,
1379                                 PROTOCOL_BINARY_RAW_BYTES,
1380                                 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
1381                                 cookie);
1382         } else {
1383             msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1384             rv = sendResponse(response, NULL, 0, NULL, 0,
1385                               msg, static_cast<uint16_t>(msg_size),
1386                               PROTOCOL_BINARY_RAW_BYTES,
1387                               static_cast<uint16_t>(res), 0, cookie);
1388
1389         }
1390         return rv;
1391     }
1392
1393     static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1394                                                const void* cookie,
1395                                                protocol_binary_request_header
1396                                                                       *request,
1397                                                ADD_RESPONSE response)
1398     {
1399         ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1400                                                            cookie,
1401                                                            request, response);
1402         releaseHandle(handle);
1403         return err_code;
1404     }
1405
1406     static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1407                               item *itm, uint64_t cas) {
1408         static_cast<Item*>(itm)->setCas(cas);
1409     }
1410
1411     static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1412                                           const void *cookie,
1413                                           void *engine_specific,
1414                                           uint16_t nengine,
1415                                           uint8_t ttl,
1416                                           uint16_t tap_flags,
1417                                           tap_event_t tap_event,
1418                                           uint32_t tap_seqno,
1419                                           const void *key,
1420                                           size_t nkey,
1421                                           uint32_t flags,
1422                                           uint32_t exptime,
1423                                           uint64_t cas,
1424                                           uint8_t datatype,
1425                                           const void *data,
1426                                           size_t ndata,
1427                                           uint16_t vbucket)
1428     {
1429         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1430             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1431                     " (TapNotify)");
1432             return ENGINE_EINVAL;
1433         }
1434         ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1435                                                         engine_specific,
1436                                                         nengine, ttl,
1437                                                         tap_flags,
1438                                                         (uint16_t)tap_event,
1439                                                         tap_seqno,
1440                                                         key, nkey, flags,
1441                                                         exptime, cas,
1442                                                         datatype, data,
1443                                                         ndata, vbucket);
1444         releaseHandle(handle);
1445         return err_code;
1446     }
1447
1448     static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1449                                       const void *cookie, item **itm,
1450                                       void **es, uint16_t *nes, uint8_t *ttl,
1451                                       uint16_t *flags, uint32_t *seqno,
1452                                       uint16_t *vbucket) {
1453         uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1454                                                              nes, ttl,
1455                                                              flags, seqno,
1456                                                              vbucket);
1457         releaseHandle(handle);
1458         return static_cast<tap_event_t>(tap_event);
1459     }
1460
1461     static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1462                                           const void* cookie,
1463                                           const void* client,
1464                                           size_t nclient,
1465                                           uint32_t flags,
1466                                           const void* userdata,
1467                                           size_t nuserdata)
1468     {
1469         EventuallyPersistentEngine *h = getHandle(handle);
1470         TAP_ITERATOR iterator = NULL;
1471         {
1472             std::string c(static_cast<const char*>(client), nclient);
1473             // Figure out what we want from the userdata before adding it to
1474             // the API to the handle
1475             if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1476                 iterator = EvpTapIterator;
1477             }
1478         }
1479         releaseHandle(handle);
1480         return iterator;
1481     }
1482
1483
1484     static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1485                                        const void* cookie,
1486                                        struct dcp_message_producers *producers)
1487     {
1488         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1489         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1490         if (conn) {
1491             errCode = conn->step(producers);
1492         }
1493         releaseHandle(handle);
1494         return errCode;
1495     }
1496
1497
1498     static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1499                                         const void* cookie,
1500                                         uint32_t opaque,
1501                                         uint32_t seqno,
1502                                         uint32_t flags,
1503                                         void *name,
1504                                         uint16_t nname)
1505     {
1506         ENGINE_ERROR_CODE errCode;
1507         errCode = getHandle(handle)->dcpOpen(cookie, opaque, seqno, flags,
1508                                              name, nname);
1509         releaseHandle(handle);
1510         return errCode;
1511     }
1512
1513     static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1514                                              const void* cookie,
1515                                              uint32_t opaque,
1516                                              uint16_t vbucket,
1517                                              uint32_t flags)
1518     {
1519         ENGINE_ERROR_CODE errCode = getHandle(handle)->dcpAddStream(cookie,
1520                                                                     opaque,
1521                                                                     vbucket,
1522                                                                     flags);
1523         releaseHandle(handle);
1524         return errCode;
1525     }
1526
1527     static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1528                                                const void* cookie,
1529                                                uint32_t opaque,
1530                                                uint16_t vbucket)
1531     {
1532         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1533         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1534         if (conn) {
1535             errCode = conn->closeStream(opaque, vbucket);
1536         }
1537         releaseHandle(handle);
1538         return errCode;
1539     }
1540
1541
1542     static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1543                                              const void* cookie,
1544                                              uint32_t flags,
1545                                              uint32_t opaque,
1546                                              uint16_t vbucket,
1547                                              uint64_t startSeqno,
1548                                              uint64_t endSeqno,
1549                                              uint64_t vbucketUuid,
1550                                              uint64_t snapStartSeqno,
1551                                              uint64_t snapEndSeqno,
1552                                              uint64_t *rollbackSeqno,
1553                                              dcp_add_failover_log callback)
1554     {
1555         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1556         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1557         if (conn) {
1558             errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1559                                           endSeqno, vbucketUuid, snapStartSeqno,
1560                                           snapEndSeqno, rollbackSeqno, callback);
1561         }
1562         releaseHandle(handle);
1563         return errCode;
1564     }
1565
1566     static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1567                                                  const void* cookie,
1568                                                  uint32_t opaque,
1569                                                  uint16_t vbucket,
1570                                                  dcp_add_failover_log callback)
1571     {
1572         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1573         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1574         if (conn) {
1575             errCode = conn->getFailoverLog(opaque, vbucket, callback);
1576         }
1577         releaseHandle(handle);
1578         return errCode;
1579     }
1580
1581
1582     static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1583                                              const void* cookie,
1584                                              uint32_t opaque,
1585                                              uint16_t vbucket,
1586                                              uint32_t flags)
1587     {
1588         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1589         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1590         if (conn) {
1591             errCode = conn->streamEnd(opaque, vbucket, flags);
1592         }
1593         releaseHandle(handle);
1594         return errCode;
1595     }
1596
1597
1598     static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1599                                                   const void* cookie,
1600                                                   uint32_t opaque,
1601                                                   uint16_t vbucket,
1602                                                   uint64_t start_seqno,
1603                                                   uint64_t end_seqno,
1604                                                   uint32_t flags)
1605     {
1606         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1607         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1608         if (conn) {
1609             errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1610                                            end_seqno, flags);
1611         }
1612         releaseHandle(handle);
1613         return errCode;
1614     }
1615
1616     static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1617                                             const void* cookie,
1618                                             uint32_t opaque,
1619                                             const void *key,
1620                                             uint16_t nkey,
1621                                             const void *value,
1622                                             uint32_t nvalue,
1623                                             uint64_t cas,
1624                                             uint16_t vbucket,
1625                                             uint32_t flags,
1626                                             uint8_t datatype,
1627                                             uint64_t bySeqno,
1628                                             uint64_t revSeqno,
1629                                             uint32_t expiration,
1630                                             uint32_t lockTime,
1631                                             const void *meta,
1632                                             uint16_t nmeta,
1633                                             uint8_t nru)
1634     {
1635         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1636             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1637                     " (DCPMutation)");
1638             return ENGINE_EINVAL;
1639         }
1640         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1641         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1642         if (conn) {
1643             errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1644                                      vbucket, flags, datatype, lockTime,
1645                                      bySeqno, revSeqno, expiration,
1646                                      nru, meta, nmeta);
1647         }
1648         releaseHandle(handle);
1649         return errCode;
1650     }
1651
1652     static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1653                                             const void* cookie,
1654                                             uint32_t opaque,
1655                                             const void *key,
1656                                             uint16_t nkey,
1657                                             uint64_t cas,
1658                                             uint16_t vbucket,
1659                                             uint64_t bySeqno,
1660                                             uint64_t revSeqno,
1661                                             const void *meta,
1662                                             uint16_t nmeta)
1663     {
1664         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1665         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1666         if (conn) {
1667             errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1668                                      revSeqno, meta, nmeta);
1669         }
1670         releaseHandle(handle);
1671         return errCode;
1672     }
1673
1674     static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1675                                               const void* cookie,
1676                                               uint32_t opaque,
1677                                               const void *key,
1678                                               uint16_t nkey,
1679                                               uint64_t cas,
1680                                               uint16_t vbucket,
1681                                               uint64_t bySeqno,
1682                                               uint64_t revSeqno,
1683                                               const void *meta,
1684                                               uint16_t nmeta)
1685     {
1686         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1687         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1688         if (conn) {
1689             errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1690                                        revSeqno, meta, nmeta);
1691         }
1692         releaseHandle(handle);
1693         return errCode;
1694     }
1695
1696     static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1697                                          const void* cookie,
1698                                          uint32_t opaque,
1699                                          uint16_t vbucket)
1700     {
1701         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1702         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1703         if (conn) {
1704             errCode = conn->flushall(opaque, vbucket);
1705         }
1706         releaseHandle(handle);
1707         return errCode;
1708     }
1709
1710     static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1711                                                    const void* cookie,
1712                                                    uint32_t opaque,
1713                                                    uint16_t vbucket,
1714                                                    vbucket_state_t state)
1715     {
1716         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1717         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1718         if (conn) {
1719             errCode = conn->setVBucketState(opaque, vbucket, state);
1720         }
1721         releaseHandle(handle);
1722         return errCode;
1723     }
1724
1725     static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1726                                         const void* cookie,
1727                                         uint32_t opaque) {
1728         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1729         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1730         if (conn) {
1731             errCode = conn->noop(opaque);
1732         }
1733         releaseHandle(handle);
1734         return errCode;
1735     }
1736
1737     static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1738                                                          const void* cookie,
1739                                                          uint32_t opaque,
1740                                                          uint16_t vbucket,
1741                                                          uint32_t buffer_bytes) {
1742         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1743         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1744         if (conn) {
1745             errCode = conn->bufferAcknowledgement(opaque, vbucket,
1746                                                   buffer_bytes);
1747         }
1748         releaseHandle(handle);
1749         return errCode;
1750     }
1751
1752     static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1753                                            const void* cookie,
1754                                            uint32_t opaque,
1755                                            const void *key,
1756                                            uint16_t nkey,
1757                                            const void *value,
1758                                            uint32_t nvalue) {
1759         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1760         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1761         if (conn) {
1762             errCode = conn->control(opaque, key, nkey, value, nvalue);
1763         }
1764         releaseHandle(handle);
1765         return errCode;
1766     }
1767
1768     static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1769                                      const void* cookie,
1770                                      protocol_binary_response_header *response)
1771     {
1772         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1773         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1774         if (conn) {
1775             errCode = conn->handleResponse(response);
1776         }
1777         releaseHandle(handle);
1778         return errCode;
1779     }
1780
1781     static void EvpHandleDisconnect(const void *cookie,
1782                                     ENGINE_EVENT_TYPE type,
1783                                     const void *event_data,
1784                                     const void *cb_data)
1785     {
1786         cb_assert(type == ON_DISCONNECT);
1787         cb_assert(event_data == NULL);
1788         void *c = const_cast<void*>(cb_data);
1789         getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1790         releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1791     }
1792
1793
1794     /**
1795      * The only public interface to the eventually persistance engine.
1796      * Allocate a new instance and initialize it
1797      * @param interface the highest interface the server supports (we only
1798      *                  support interface 1)
1799      * @param get_server_api callback function to get the server exported API
1800      *                  functions
1801      * @param handle Where to return the new instance
1802      * @return ENGINE_SUCCESS on success
1803      */
1804     ENGINE_ERROR_CODE create_instance(uint64_t interface,
1805                                       GET_SERVER_API get_server_api,
1806                                       ENGINE_HANDLE **handle)
1807     {
1808         SERVER_HANDLE_V1 *api = get_server_api();
1809         if (interface != 1 || api == NULL) {
1810             return ENGINE_ENOTSUP;
1811         }
1812
1813         hooksApi = api->alloc_hooks;
1814         EventuallyPersistentEngine::loggerApi = api->log;
1815         MemoryTracker::getInstance();
1816         ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1817
1818         AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1819
1820         ObjectRegistry::setStats(inital_tracking);
1821         EventuallyPersistentEngine *engine;
1822         engine = new EventuallyPersistentEngine(get_server_api);
1823         ObjectRegistry::setStats(NULL);
1824
1825         if (engine == NULL) {
1826             return ENGINE_ENOMEM;
1827         }
1828
1829         if (MemoryTracker::trackingMemoryAllocations()) {
1830             engine->getEpStats().memoryTrackerEnabled.store(true);
1831             engine->getEpStats().totalMemory.store(inital_tracking->load());
1832         }
1833         delete inital_tracking;
1834
1835         initialize_time_functions(api->core);
1836
1837         *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1838
1839         return ENGINE_SUCCESS;
1840     }
1841
1842     /*
1843         This method is called prior to unloading of the shared-object.
1844         Global clean-up should be performed from this method.
1845      */
1846     void destroy_engine() {
1847         ExecutorPool::shutdown();
1848      }
1849
1850     static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
1851                                const item* itm, item_info *itm_info)
1852     {
1853         const Item *it = reinterpret_cast<const Item*>(itm);
1854         EventuallyPersistentEngine *engine = getHandle(handle);
1855         if (itm_info->nvalue < 1) {
1856             return false;
1857         }
1858         itm_info->cas = it->getCas();
1859
1860         if (engine) {
1861             RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(it->getVBucketId());
1862
1863             if (vb) {
1864                 itm_info->vbucket_uuid = vb->failovers->getLatestUUID();
1865             } else {
1866                 itm_info->vbucket_uuid = 0;
1867             }
1868
1869             releaseHandle(handle);
1870         } else{
1871             itm_info->vbucket_uuid = 0;
1872         }
1873
1874         itm_info->seqno = it->getBySeqno();
1875         itm_info->exptime = it->getExptime();
1876         itm_info->nbytes = it->getNBytes();
1877         itm_info->datatype = it->getDataType();
1878         itm_info->flags = it->getFlags();
1879         itm_info->clsid = 0;
1880         itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1881         itm_info->nvalue = 1;
1882         itm_info->key = it->getKey().c_str();
1883         itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1884         itm_info->value[0].iov_len = it->getNBytes();
1885         return true;
1886     }
1887
1888     static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1889                                item* itm, const item_info *itm_info)
1890     {
1891         Item *it = reinterpret_cast<Item*>(itm);
1892         if (!it) {
1893             return false;
1894         }
1895         it->setDataType(itm_info->datatype);
1896         return true;
1897     }
1898
1899     static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1900                                                  const void* cookie,
1901                                                  engine_get_vb_map_cb callback)
1902     {
1903         EventuallyPersistentEngine *h = getHandle(handle);
1904         LockHolder lh(h->clusterConfig.lock);
1905         uint8_t *config = h->clusterConfig.config;
1906         uint32_t len = h->clusterConfig.len;
1907         releaseHandle(handle);
1908         return callback(cookie, config, len);
1909     }
1910
1911 } // C linkage
1912
1913 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1914     char buffer[2048];
1915
1916     if (EventuallyPersistentEngine::loggerApi != NULL) {
1917         EXTENSION_LOGGER_DESCRIPTOR* logger =
1918             (EXTENSION_LOGGER_DESCRIPTOR*)EventuallyPersistentEngine::loggerApi->get_logger();
1919
1920         if (EventuallyPersistentEngine::loggerApi->get_level() <= severity) {
1921             EventuallyPersistentEngine *engine = ObjectRegistry::onSwitchThread(NULL, true);
1922             va_list va;
1923             va_start(va, fmt);
1924             vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1925             if (engine) {
1926                 logger->log(severity, NULL, "(%s) %s", engine->getName(),
1927                             buffer);
1928             } else {
1929                 logger->log(severity, NULL, "(No Engine) %s", buffer);
1930             }
1931             va_end(va);
1932             ObjectRegistry::onSwitchThread(engine);
1933         }
1934     }
1935 }
1936
1937 ALLOCATOR_HOOKS_API *getHooksApi(void) {
1938     return hooksApi;
1939 }
1940
1941 EventuallyPersistentEngine::EventuallyPersistentEngine(
1942                                     GET_SERVER_API get_server_api) :
1943     clusterConfig(), epstore(NULL), workload(NULL),
1944     workloadPriority(NO_BUCKET_PRIORITY),
1945     tapThrottle(NULL), getServerApiFunc(get_server_api),
1946     dcpConnMap_(NULL), tapConnMap(NULL) ,tapConfig(NULL), checkpointConfig(NULL),
1947     trafficEnabled(false), flushAllEnabled(false), startupTime(0)
1948 {
1949     interface.interface = 1;
1950     ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1951     ENGINE_HANDLE_V1::initialize = EvpInitialize;
1952     ENGINE_HANDLE_V1::destroy = EvpDestroy;
1953     ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1954     ENGINE_HANDLE_V1::remove = EvpItemDelete;
1955     ENGINE_HANDLE_V1::release = EvpItemRelease;
1956     ENGINE_HANDLE_V1::get = EvpGet;
1957     ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1958     ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1959     ENGINE_HANDLE_V1::store = EvpStore;
1960     ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
1961     ENGINE_HANDLE_V1::flush = EvpFlush;
1962     ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1963     ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1964     ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1965     ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1966     ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1967     ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1968     ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
1969     ENGINE_HANDLE_V1::get_stats_struct = NULL;
1970     ENGINE_HANDLE_V1::aggregate_stats = NULL;
1971
1972
1973     ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1974     ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1975     ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1976     ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1977     ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1978     ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1979     ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1980     ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1981     ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1982     ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1983     ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1984     ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1985     ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1986     ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1987     ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1988     ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1989     ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1990
1991     serverApi = getServerApiFunc();
1992     memset(&info, 0, sizeof(info));
1993     info.info.description = "EP engine v" VERSION;
1994     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1995     info.info.features[info.info.num_features++].feature =
1996                                              ENGINE_FEATURE_PERSISTENT_STORAGE;
1997     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1998     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
1999 }
2000
2001 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
2002 {
2003     EventuallyPersistentEngine *epe =
2004                                     ObjectRegistry::onSwitchThread(NULL, true);
2005     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
2006     ObjectRegistry::onSwitchThread(epe);
2007     return rv;
2008 }
2009
2010 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
2011 {
2012     EventuallyPersistentEngine *epe =
2013                                     ObjectRegistry::onSwitchThread(NULL, true);
2014     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
2015     ObjectRegistry::onSwitchThread(epe);
2016     return rv;
2017 }
2018
2019 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
2020                                                         EVENT_CALLBACK cb,
2021                                                         const void *cb_data) {
2022     EventuallyPersistentEngine *epe =
2023                                     ObjectRegistry::onSwitchThread(NULL, true);
2024     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
2025     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
2026                             type, cb, cb_data);
2027     ObjectRegistry::onSwitchThread(epe);
2028 }
2029
2030 /**
2031  * A configuration value changed listener that responds to ep-engine
2032  * parameter changes by invoking engine-specific methods on
2033  * configuration change events.
2034  */
2035 class EpEngineValueChangeListener : public ValueChangedListener {
2036 public:
2037     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
2038         // EMPTY
2039     }
2040
2041     virtual void sizeValueChanged(const std::string &key, size_t value) {
2042         if (key.compare("getl_max_timeout") == 0) {
2043             engine.setGetlMaxTimeout(value);
2044         } else if (key.compare("getl_default_timeout") == 0) {
2045             engine.setGetlDefaultTimeout(value);
2046         } else if (key.compare("max_item_size") == 0) {
2047             engine.setMaxItemSize(value);
2048         }
2049     }
2050
2051     virtual void booleanValueChanged(const std::string &key, bool value) {
2052         if (key.compare("flushall_enabled") == 0) {
2053             engine.setFlushAll(value);
2054         }
2055     }
2056 private:
2057     EventuallyPersistentEngine &engine;
2058 };
2059
2060
2061
2062 ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
2063     resetStats();
2064     if (config != NULL) {
2065         if (!configuration.parseConfiguration(config, serverApi)) {
2066             return ENGINE_FAILED;
2067         }
2068     }
2069
2070     name = configuration.getCouchBucket();
2071     maxFailoverEntries = configuration.getMaxFailoverEntries();
2072
2073     // Start updating the variables from the config!
2074     HashTable::setDefaultNumBuckets(configuration.getHtSize());
2075     HashTable::setDefaultNumLocks(configuration.getHtLocks());
2076     StoredValue::setMutationMemoryThreshold(
2077                                       configuration.getMutationMemThreshold());
2078
2079     if (configuration.getMaxSize() == 0) {
2080         configuration.setMaxSize(std::numeric_limits<size_t>::max());
2081     }
2082
2083     if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
2084         configuration.setMemLowWat(percentOf(
2085                                             configuration.getMaxSize(), 0.75));
2086     }
2087
2088     if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
2089         configuration.setMemHighWat(percentOf(
2090                                             configuration.getMaxSize(), 0.85));
2091     }
2092
2093     maxItemSize = configuration.getMaxItemSize();
2094     configuration.addValueChangedListener("max_item_size",
2095                                        new EpEngineValueChangeListener(*this));
2096
2097     getlDefaultTimeout = configuration.getGetlDefaultTimeout();
2098     configuration.addValueChangedListener("getl_default_timeout",
2099                                        new EpEngineValueChangeListener(*this));
2100     getlMaxTimeout = configuration.getGetlMaxTimeout();
2101     configuration.addValueChangedListener("getl_max_timeout",
2102                                        new EpEngineValueChangeListener(*this));
2103
2104     flushAllEnabled = configuration.isFlushallEnabled();
2105     configuration.addValueChangedListener("flushall_enabled",
2106                                        new EpEngineValueChangeListener(*this));
2107
2108     workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
2109                                   configuration.getMaxNumShards());
2110     if ((unsigned int)workload->getNumShards() >
2111                                               configuration.getMaxVbuckets()) {
2112         LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
2113             "equal or less than max number of vbuckets");
2114         return ENGINE_FAILED;
2115     }
2116
2117     dcpConnMap_ = new DcpConnMap(*this);
2118     tapConnMap = new TapConnMap(*this);
2119     tapConfig = new TapConfig(*this);
2120     tapThrottle = new TapThrottle(configuration, stats);
2121     TapConfig::addConfigChangeListener(*this);
2122
2123     checkpointConfig = new CheckpointConfig(*this);
2124     CheckpointConfig::addConfigChangeListener(*this);
2125
2126     epstore = new EventuallyPersistentStore(*this);
2127
2128     initializeEngineCallbacks();
2129
2130     // Complete the initialization of the ep-store
2131     if (!epstore->initialize()) {
2132         return ENGINE_FAILED;
2133     }
2134
2135     if(configuration.isDataTrafficEnabled()) {
2136         enableTraffic(true);
2137     }
2138
2139     tapConnMap->initialize(TAP_CONN_NOTIFIER);
2140     dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
2141
2142     // record engine initialization time
2143     startupTime.store(ep_real_time());
2144
2145     LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");
2146
2147     return ENGINE_SUCCESS;
2148 }
2149
2150 void EventuallyPersistentEngine::destroy(bool force) {
2151     stats.forceShutdown = force;
2152     stats.isShutdown = true;
2153
2154     if (epstore) {
2155         epstore->snapshotStats();
2156     }
2157     if (tapConnMap) {
2158         tapConnMap->shutdownAllConnections();
2159     }
2160     if (dcpConnMap_) {
2161         dcpConnMap_->shutdownAllConnections();
2162     }
2163 }
2164
2165 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie,
2166                                                     time_t when){
2167     if (!flushAllEnabled) {
2168         return ENGINE_ENOTSUP;
2169     }
2170
2171     if (!isDegradedMode()) {
2172         return ENGINE_TMPFAIL;
2173     }
2174
2175     /*
2176      * Supporting only a SYNC operation for bucket flush
2177      */
2178
2179     void* es = getEngineSpecific(cookie);
2180     if (es == NULL) {
2181
2182         // Check if diskFlushAll was false and set it to true
2183         // if yes, if the atomic variable weren't false, then
2184         // we will assume that a flushAll has been scheduled
2185         // already and return TMPFAIL.
2186         if (epstore->scheduleFlushAllTask(cookie, when)) {
2187             storeEngineSpecific(cookie, this);
2188             return ENGINE_EWOULDBLOCK;
2189         } else {
2190             LOG(EXTENSION_LOG_INFO, "Tried to trigger a bucket flush, but"
2191                     "there seems to be a task running already!");
2192             return ENGINE_TMPFAIL;
2193         }
2194
2195     } else {
2196         storeEngineSpecific(cookie, NULL);
2197         LOG(EXTENSION_LOG_WARNING, "Completed bucket flush operation");
2198         return ENGINE_SUCCESS;
2199     }
2200 }
2201
2202 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2203                                                     item* itm,
2204                                                     uint64_t *cas,
2205                                                     ENGINE_STORE_OPERATION
2206                                                                      operation,
2207                                                     uint16_t vbucket) {
2208     BlockTimer timer(&stats.storeCmdHisto);
2209     ENGINE_ERROR_CODE ret;
2210     Item *it = static_cast<Item*>(itm);
2211     item *i = NULL;
2212
2213     it->setVBucketId(vbucket);
2214
2215     switch (operation) {
2216     case OPERATION_CAS:
2217         if (it->getCas() == 0) {
2218             // Using a cas command with a cas wildcard doesn't make sense
2219             ret = ENGINE_NOT_STORED;
2220             break;
2221         }
2222         // FALLTHROUGH
2223     case OPERATION_SET:
2224         if (isDegradedMode()) {
2225             return ENGINE_TMPFAIL;
2226         }
2227         ret = epstore->set(*it, cookie);
2228         if (ret == ENGINE_SUCCESS) {
2229             *cas = it->getCas();
2230         }
2231
2232         break;
2233
2234     case OPERATION_ADD:
2235         if (isDegradedMode()) {
2236             return ENGINE_TMPFAIL;
2237         }
2238
2239         if (it->getCas() != 0) {
2240             // Adding an item with a cas value doesn't really make sense...
2241             return ENGINE_KEY_EEXISTS;
2242         }
2243
2244         ret = epstore->add(*it, cookie);
2245         if (ret == ENGINE_SUCCESS) {
2246             *cas = it->getCas();
2247         }
2248         break;
2249
2250     case OPERATION_REPLACE:
2251         ret = epstore->replace(*it, cookie);
2252         if (ret == ENGINE_SUCCESS) {
2253             *cas = it->getCas();
2254         }
2255         break;
2256
2257     case OPERATION_APPEND:
2258     case OPERATION_PREPEND:
2259         do {
2260             if ((ret = get(cookie, &i, it->getKey().c_str(),
2261                            it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
2262                 Item *old = reinterpret_cast<Item*>(i);
2263
2264                 if (old->getCas() == (uint64_t) -1) {
2265                     // item is locked against updates
2266                     itemRelease(cookie, i);
2267                     return ENGINE_TMPFAIL;
2268                 }
2269
2270                 if (it->getCas() != 0 && old->getCas() != it->getCas()) {
2271                     itemRelease(cookie, i);
2272                     return ENGINE_KEY_EEXISTS;
2273                 }
2274
2275                 if (operation == OPERATION_APPEND) {
2276                     ret = old->append(*it, maxItemSize);
2277                 } else {
2278                     ret = old->prepend(*it, maxItemSize);
2279                 }
2280
2281                 if (ret != ENGINE_SUCCESS) {
2282                     itemRelease(cookie, i);
2283                     if (ret == ENGINE_E2BIG) {
2284                         return ret;
2285                     } else {
2286                         return memoryCondition();
2287                     }
2288                 } else {
2289                     if (old->getDataType() == PROTOCOL_BINARY_DATATYPE_JSON) {
2290                         // Set the datatype of the new document to BINARY (0),
2291                         // as appending/prepending anything to JSON breaks the
2292                         // json data structure.
2293                         old->setDataType(PROTOCOL_BINARY_RAW_BYTES);
2294                     } else if (old->getDataType() ==
2295                                     PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
2296                         // Set the datatype of the new document to
2297                         // COMPRESSED_BINARY, as appending/prepending anything
2298                         // to JSON breaks the json data structure.
2299                         old->setDataType(PROTOCOL_BINARY_DATATYPE_COMPRESSED);
2300                     }
2301                 }
2302
2303                 ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
2304
2305                 it->setBySeqno(old->getBySeqno());
2306                 itemRelease(cookie, i);
2307             }
2308         } while (ret == ENGINE_KEY_EEXISTS);
2309
2310         // Map the error code back to what memcapable expects
2311         if (ret == ENGINE_KEY_ENOENT) {
2312             ret = ENGINE_NOT_STORED;
2313         }
2314
2315         break;
2316
2317     default:
2318         ret = ENGINE_ENOTSUP;
2319     }
2320
2321     switch (ret) {
2322     case ENGINE_SUCCESS:
2323         ++stats.numOpsStore;
2324         break;
2325     case ENGINE_ENOMEM:
2326         ret = memoryCondition();
2327         break;
2328     case ENGINE_NOT_STORED:
2329     case ENGINE_NOT_MY_VBUCKET:
2330         if (isDegradedMode()) {
2331             return ENGINE_TMPFAIL;
2332         }
2333         break;
2334     default:
2335         break;
2336     }
2337
2338     return ret;
2339 }
2340
2341 inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2342                                                            item **itm,
2343                                                            void **es,
2344                                                            uint16_t *nes,
2345                                                            uint8_t *ttl,
2346                                                            uint16_t *flags,
2347                                                            uint32_t *seqno,
2348                                                            uint16_t *vbucket,
2349                                                            TapProducer
2350                                                                    *connection,
2351                                                            bool &retry) {
2352     *es = NULL;
2353     *nes = 0;
2354     *ttl = (uint8_t)-1;
2355     *seqno = 0;
2356     *flags = 0;
2357     *vbucket = 0;
2358
2359     retry = false;
2360
2361     if (connection->shouldFlush()) {
2362         return TAP_FLUSH;
2363     }
2364
2365     if (connection->isTimeForNoop()) {
2366         LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2367             connection->logHeader());
2368         return TAP_NOOP;
2369     }
2370
2371     if (connection->isSuspended() || connection->windowIsFull()) {
2372         LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2373             " suspended state or its ack windows is full.\n",
2374             connection->logHeader());
2375         return TAP_PAUSE;
2376     }
2377
2378     uint16_t ret = TAP_PAUSE;
2379     VBucketEvent ev = connection->nextVBucketHighPriority();
2380     if (ev.event != TAP_PAUSE) {
2381         switch (ev.event) {
2382         case TAP_VBUCKET_SET:
2383             LOG(EXTENSION_LOG_WARNING,
2384                "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2385                 connection->logHeader(), ev.vbucket,
2386                 VBucket::toString(ev.state));
2387             connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2388             break;
2389         case TAP_OPAQUE:
2390             LOG(EXTENSION_LOG_WARNING,
2391                 "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2392                 connection->logHeader(),
2393                 TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
2394                 ev.vbucket);
2395             connection->opaqueCommandCode = (uint32_t) ev.state;
2396             *vbucket = ev.vbucket;
2397             *es = &connection->opaqueCommandCode;
2398             *nes = sizeof(connection->opaqueCommandCode);
2399             break;
2400         default:
2401             LOG(EXTENSION_LOG_WARNING,
2402                 "%s Unknown VBucketEvent message type %d\n",
2403                 connection->logHeader(), ev.event);
2404             abort();
2405         }
2406         return ev.event;
2407     }
2408
2409     if (connection->waitForOpaqueMsgAck()) {
2410         return TAP_PAUSE;
2411     }
2412
2413     VBucketFilter backFillVBFilter;
2414     if (connection->runBackfill(backFillVBFilter)) {
2415         queueBackfill(backFillVBFilter, connection);
2416     }
2417
2418     uint8_t nru = INITIAL_NRU_VALUE;
2419     Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2420     switch (ret) {
2421     case TAP_CHECKPOINT_START:
2422     case TAP_CHECKPOINT_END:
2423     case TAP_MUTATION:
2424     case TAP_DELETION:
2425         *itm = it;
2426         if (ret == TAP_MUTATION) {
2427             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2428                                                        it->getRevSeqno(), nru);
2429             *es = connection->specificData;
2430         } else if (ret == TAP_DELETION) {
2431             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2432                                                        it->getRevSeqno());
2433             *es = connection->specificData;
2434         } else if (ret == TAP_CHECKPOINT_START) {
2435             // Send the current value of the max deleted seqno
2436             RCPtr<VBucket> vb = getVBucket(*vbucket);
2437             if (!vb) {
2438                 retry = true;
2439                 return TAP_NOOP;
2440             }
2441             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2442                                                vb->ht.getMaxDeletedRevSeqno());
2443             *es = connection->specificData;
2444         }
2445         break;
2446     case TAP_NOOP:
2447         retry = true;
2448         break;
2449     default:
2450         break;
2451     }
2452
2453     if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2454         VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2455         if (vbev.event == TAP_VBUCKET_SET) {
2456             LOG(EXTENSION_LOG_WARNING,
2457                "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2458                 connection->logHeader(), vbev.vbucket,
2459                 VBucket::toString(vbev.state));
2460             connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
2461         }
2462         ret = vbev.event;
2463     }
2464
2465     return ret;
2466 }
2467
2468 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2469                                                   item **itm,
2470                                                   void **es,
2471                                                   uint16_t *nes,
2472                                                   uint8_t *ttl,
2473                                                   uint16_t *flags,
2474                                                   uint32_t *seqno,
2475                                                   uint16_t *vbucket) {
2476     TapProducer *connection = getTapProducer(cookie);
2477     if (!connection) {
2478         LOG(EXTENSION_LOG_WARNING,
2479             "Failed to lookup TAP connection.. Disconnecting\n");
2480         return TAP_DISCONNECT;
2481     }
2482
2483     connection->setPaused(false);
2484
2485     bool retry = false;
2486     uint16_t ret;
2487
2488     connection->setLastWalkTime();
2489     do {
2490         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2491                              seqno, vbucket, connection, retry);
2492     } while (retry);
2493
2494     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2495         connection->lastMsgTime = ep_current_time();
2496         if (ret == TAP_NOOP) {
2497             *seqno = 0;
2498         } else {
2499             ++stats.numTapFetched;
2500             *seqno = connection->getSeqno();
2501             if (connection->requestAck(ret, *vbucket)) {
2502                 *flags = TAP_FLAG_ACK;
2503                 connection->seqnoAckRequested = *seqno;
2504             }
2505
2506             if (ret == TAP_MUTATION) {
2507                 if (connection->haveFlagByteorderSupport()) {
2508                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2509                 }
2510             }
2511         }
2512     } else {
2513         connection->setPaused(true);
2514         connection->setNotifySent(false);
2515     }
2516
2517     return ret;
2518 }
2519
2520 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2521                                                 std::string &client,
2522                                                 uint32_t flags,
2523                                                 const void *userdata,
2524                                                 size_t nuserdata) {
2525     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2526         return false;
2527     }
2528
2529     std::string tapName = "eq_tapq:";
2530     if (client.length() == 0) {
2531         tapName.assign(ConnHandler::getAnonName());
2532     } else {
2533         tapName.append(client);
2534     }
2535
2536     // Decoding the userdata section of the packet and update the filters
2537     const char *ptr = static_cast<const char*>(userdata);
2538     uint64_t backfillAge = 0;
2539     std::vector<uint16_t> vbuckets;
2540     std::map<uint16_t, uint64_t> lastCheckpointIds;
2541
2542     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2543         if (nuserdata < sizeof(backfillAge)) {
2544             LOG(EXTENSION_LOG_WARNING,
2545                 "Backfill age is missing. Reject connection request from %s\n",
2546                 tapName.c_str());
2547             return false;
2548         }
2549         // use memcpy to avoid alignemt issues
2550         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2551         backfillAge = ntohll(backfillAge);
2552         nuserdata -= sizeof(backfillAge);
2553         ptr += sizeof(backfillAge);
2554     }
2555
2556     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2557         uint16_t nvbuckets;
2558         if (nuserdata < sizeof(nvbuckets)) {
2559             LOG(EXTENSION_LOG_WARNING,
2560             "Number of vbuckets is missing. Reject connection request from %s"
2561             "\n", tapName.c_str());
2562             return false;
2563         }
2564         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2565         nuserdata -= sizeof(nvbuckets);
2566         ptr += sizeof(nvbuckets);
2567         nvbuckets = ntohs(nvbuckets);
2568         if (nvbuckets > 0) {
2569             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2570                 LOG(EXTENSION_LOG_WARNING,
2571                 "# of vbuckets not matched. Reject connection request from %s"
2572                 "\n", tapName.c_str());
2573                 return false;
2574             }
2575             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2576                 uint16_t val;
2577                 memcpy(&val, ptr, sizeof(nvbuckets));
2578                 ptr += sizeof(uint16_t);
2579                 vbuckets.push_back(ntohs(val));
2580             }
2581             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2582         }
2583     }
2584
2585     if (flags & TAP_CONNECT_CHECKPOINT) {
2586         uint16_t nCheckpoints = 0;
2587         if (nuserdata >= sizeof(nCheckpoints)) {
2588             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2589             nuserdata -= sizeof(nCheckpoints);
2590             ptr += sizeof(nCheckpoints);
2591             nCheckpoints = ntohs(nCheckpoints);
2592         }
2593         if (nCheckpoints > 0) {
2594             if (nuserdata <
2595                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2596                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2597                     "Reject connection request from %s\n", tapName.c_str());
2598                 return false;
2599             }
2600             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2601                 uint16_t vbid;
2602                 uint64_t checkpointId;
2603                 memcpy(&vbid, ptr, sizeof(vbid));
2604                 ptr += sizeof(uint16_t);
2605                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2606                 ptr += sizeof(uint64_t);
2607                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2608             }
2609             nuserdata -=
2610                         ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2611         }
2612     }
2613
2614     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2615                                  backfillAge,
2616                                  static_cast<int>(
2617                                  configuration.getTapKeepalive()),
2618                                  vbuckets,
2619                                  lastCheckpointIds);
2620
2621     tapConnMap->notifyPausedConnection(tp, true);
2622     return true;
2623 }
2624
2625 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2626                                                         void *engine_specific,
2627                                                         uint16_t nengine,
2628                                                         uint8_t ttl,
2629                                                         uint16_t tap_flags,
2630                                                         uint16_t tap_event,
2631                                                         uint32_t tap_seqno,
2632                                                         const void *key,
2633                                                         size_t nkey,
2634                                                         uint32_t flags,
2635                                                         uint32_t exptime,
2636                                                         uint64_t cas,
2637                                                         uint8_t datatype,
2638                                                         const void *data,
2639                                                         size_t ndata,
2640                                                         uint16_t vbucket)
2641 {
2642     (void) ttl;
2643     void *specific = getEngineSpecific(cookie);
2644     ConnHandler *connection = NULL;
2645     if (specific == NULL) {
2646         if (tap_event == TAP_ACK) {
2647             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2648                 "exist. Force disconnect...\n", (char *) cookie);
2649             // tap producer is no longer connected..
2650             return ENGINE_DISCONNECT;
2651         } else {
2652             connection = tapConnMap->newConsumer(cookie);
2653             if (connection == NULL) {
2654                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2655                     " Force disconnect\n");
2656                 return ENGINE_DISCONNECT;
2657             }
2658             storeEngineSpecific(cookie, connection);
2659         }
2660     } else {
2661         connection = reinterpret_cast<ConnHandler *>(specific);
2662     }
2663
2664     std::string k(static_cast<const char*>(key), nkey);
2665     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2666
2667     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2668         if (!tapThrottle->shouldProcess()) {
2669             ++stats.tapThrottled;
2670             if (connection->supportsAck()) {
2671                 ret = ENGINE_TMPFAIL;
2672             } else {
2673                 ret = ENGINE_DISCONNECT;
2674                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2675                     "ack support. Force disconnect...\n",
2676                     connection->logHeader());
2677             }
2678             return ret;
2679         }
2680     }
2681
2682     switch (tap_event) {
2683     case TAP_ACK:
2684         ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2685         break;
2686     case TAP_FLUSH:
2687         ret = flush(cookie, 0);
2688         LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2689             connection->logHeader());
2690         break;
2691     case TAP_DELETION:
2692         {
2693             uint64_t revSeqno;
2694             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2695                                                 nengine, &revSeqno);
2696
2697             ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2698                                        NULL, 0);
2699         }
2700         break;
2701
2702     case TAP_CHECKPOINT_START:
2703     case TAP_CHECKPOINT_END:
2704         {
2705             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2706             if (tc) {
2707                 if (tap_event == TAP_CHECKPOINT_START &&
2708                     nengine == TapEngineSpecific::sizeRevSeqno) {
2709                     // Set the current value for the max deleted seqno
2710                     RCPtr<VBucket> vb = getVBucket(vbucket);
2711                     if (!vb) {
2712                         return ENGINE_TMPFAIL;
2713                     }
2714                     uint64_t seqnum;
2715                     TapEngineSpecific::readSpecificData(tap_event,
2716                                                         engine_specific,
2717                                                         nengine,
2718                                                         &seqnum);
2719                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2720                 }
2721
2722                 if (data) {
2723                     uint64_t checkpointId;
2724                     memcpy(&checkpointId, data, sizeof(checkpointId));
2725                     checkpointId = ntohll(checkpointId);
2726                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2727                                           checkpointId);
2728                 }
2729                 else {
2730                     ret = ENGINE_DISCONNECT;
2731                     LOG(EXTENSION_LOG_WARNING,
2732                         "%s Checkpoint Id is missing in "
2733                         "CHECKPOINT messages. Force disconnect...\n",
2734                         connection->logHeader());
2735                 }
2736             }
2737             else {
2738                 ret = ENGINE_DISCONNECT;
2739                 LOG(EXTENSION_LOG_WARNING,
2740                     "%s not a consumer! Force disconnect\n",
2741                     connection->logHeader());
2742             }
2743         }
2744
2745         break;
2746
2747     case TAP_MUTATION:
2748         {
2749             uint8_t nru = INITIAL_NRU_VALUE;
2750             uint64_t revSeqno = 0;
2751             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2752                                                 nengine, &revSeqno, &nru);
2753
2754             if (!isDatatypeSupported(cookie)) {
2755                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2756                 const unsigned char *dat = (const unsigned char*)data;
2757                 const int datlen = ndata;
2758                 if (checkUTF8JSON(dat, datlen)) {
2759                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2760                 }
2761             }
2762             ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2763                                        flags, datatype, 0, 0, revSeqno, exptime,
2764                                        nru, NULL, 0);
2765         }
2766
2767         break;
2768
2769     case TAP_OPAQUE:
2770         if (nengine == sizeof(uint32_t)) {
2771             uint32_t cc;
2772             memcpy(&cc, engine_specific, sizeof(cc));
2773             cc = ntohl(cc);
2774
2775             switch (cc) {
2776             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2777                 // @todo: the memcached core will _ALWAYS_ send nack
2778                 //        if it encounter an error. This should be
2779                 // set as the default when we move to .next after 2.0
2780                 // (currently we need to allow the message for
2781                 // backwards compatibility)
2782                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2783                     connection->logHeader());
2784                 break;
2785             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2786                 connection->setSupportCheckpointSync(true);
2787                 LOG(EXTENSION_LOG_INFO,
2788                     "%s Enable checkpoint synchronization\n",
2789                     connection->logHeader());
2790                 break;
2791             case TAP_OPAQUE_OPEN_CHECKPOINT:
2792                 /**
2793                  * This event is only received by the TAP client that wants to
2794                  * get mutations from closed checkpoints only. At this time,
2795                  * only incremental backup client receives this event so that
2796                  * it can close the connection and reconnect later.
2797                  */
2798                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2799                     connection->logHeader());
2800                 break;
2801             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2802                 {
2803                     LOG(EXTENSION_LOG_INFO,
2804                         "%s Backfill started for vbucket %d.\n",
2805                         connection->logHeader(), vbucket);
2806                     BlockTimer timer(&stats.tapVbucketResetHisto);
2807                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2808                                                   ENGINE_DISCONNECT;
2809                     if (ret == ENGINE_DISCONNECT) {
2810                         LOG(EXTENSION_LOG_WARNING,
2811                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2812                             connection->logHeader(), vbucket);
2813                     } else {
2814                         LOG(EXTENSION_LOG_WARNING,
2815                          "%s Reset vbucket %d was completed succecssfully.\n",
2816                             connection->logHeader(), vbucket);
2817                     }
2818
2819                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2820                     if (tc) {
2821                         tc->setBackfillPhase(true, vbucket);
2822                     } else {
2823                         ret = ENGINE_DISCONNECT;
2824                         LOG(EXTENSION_LOG_WARNING,
2825                             "TAP consumer doesn't exists. Force disconnect\n");
2826                     }
2827                 }
2828                 break;
2829             case TAP_OPAQUE_CLOSE_BACKFILL:
2830                 {
2831                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2832                         connection->logHeader());
2833                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2834                     if (tc) {
2835                         tc->setBackfillPhase(false, vbucket);
2836                     } else {
2837                         ret = ENGINE_DISCONNECT;
2838                         LOG(EXTENSION_LOG_WARNING,
2839                             "%s not a consumer! Force disconnect\n",
2840                             connection->logHeader());
2841                     }
2842                 }
2843                 break;
2844             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2845                 /**
2846                  * This event is sent by the eVBucketMigrator to notify that
2847                  * the source node closes the tap replication stream and
2848                  * switches to TAKEOVER_VBUCKETS phase.
2849                  * This is just an informative message and doesn't require any
2850                  * action.
2851                  */
2852                 LOG(EXTENSION_LOG_INFO,
2853                 "%s Received close tap stream. Switching to takeover phase.\n",
2854                     connection->logHeader());
2855                 break;
2856             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2857                 /**
2858                  * This opaque message is just for notifying that the source
2859                  * node receives change_vbucket_filter request and processes
2860                  * it successfully.
2861                  */
2862                 LOG(EXTENSION_LOG_INFO,
2863                 "%s Notified that the source node changed a vbucket filter.\n",
2864                     connection->logHeader());
2865                 break;
2866             default:
2867                 LOG(EXTENSION_LOG_WARNING,
2868                     "%s Received an unknown opaque command\n",
2869                     connection->logHeader());
2870             }
2871         } else {
2872             LOG(EXTENSION_LOG_WARNING,
2873                 "%s Received tap opaque with unknown size %d\n",
2874                 connection->logHeader(), nengine);
2875         }
2876         break;
2877
2878     case TAP_VBUCKET_SET:
2879         {
2880             BlockTimer timer(&stats.tapVbucketSetHisto);
2881
2882             if (nengine != sizeof(vbucket_state_t)) {
2883                 // illegal datasize
2884                 LOG(EXTENSION_LOG_WARNING,
2885                     "%s Received TAP_VBUCKET_SET with illegal size."
2886                     " Force disconnect\n", connection->logHeader());
2887                 ret = ENGINE_DISCONNECT;
2888                 break;
2889             }
2890
2891             vbucket_state_t state;
2892             memcpy(&state, engine_specific, nengine);
2893             state = (vbucket_state_t)ntohl(state);
2894
2895             ret = connection->setVBucketState(0, vbucket, state);
2896         }
2897         break;
2898
2899     default:
2900         // Unknown command
2901         LOG(EXTENSION_LOG_WARNING,
2902             "%s Recieved bad opcode, ignoring message\n",
2903             connection->logHeader());
2904     }
2905
2906     connection->processedEvent(tap_event, ret);
2907     return ret;
2908 }
2909
2910 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2911                                                       TapConsumer *consumer,
2912                                                       uint8_t event,
2913                                                       uint16_t vbucket,
2914                                                       uint64_t checkpointId) {
2915     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2916
2917     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2918         getEpStore()->wakeUpFlusher();
2919         ret = ENGINE_SUCCESS;
2920     }
2921     else {
2922         ret = ENGINE_DISCONNECT;
2923         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2924             "checkpoint %llu. Force disconnect\n",
2925             consumer->logHeader(), checkpointId);
2926     }
2927
2928     return ret;
2929 }
2930
2931 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2932     TapProducer *rv =
2933         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2934     if (!(rv && rv->isConnected())) {
2935         LOG(EXTENSION_LOG_WARNING,
2936             "Walking a non-existent tap queue, disconnecting\n");
2937         return NULL;
2938     }
2939
2940     if (rv->doDisconnect()) {
2941         LOG(EXTENSION_LOG_WARNING,
2942             "%s Disconnecting pending connection\n", rv->logHeader());
2943         return NULL;
2944     }
2945     return rv;
2946 }
2947
2948 void EventuallyPersistentEngine::initializeEngineCallbacks() {
2949     // Register the callback
2950     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2951 }
2952
2953 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2954                                                             uint32_t seqno,
2955                                                             uint16_t status,
2956                                                             const std::string
2957                                                             &msg)
2958 {
2959     TapProducer *connection = getTapProducer(cookie);
2960     if (!connection) {
2961         LOG(EXTENSION_LOG_WARNING,
2962             "Unable to process tap ack. No producer found\n");
2963         return ENGINE_DISCONNECT;
2964     }
2965
2966     return connection->processAck(seqno, status, msg);
2967 }
2968
2969 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2970                                                              &backfillVBFilter,
2971                                                Producer *tc)
2972 {
2973     ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2974                                            backfillVBFilter);
2975     ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
2976 }
2977
2978 bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2979     ++numVbucket;
2980     item_eviction_policy_t policy = engine.getEpStore()->
2981                                                        getItemEvictionPolicy();
2982     numItems += vb->getNumItems(policy);
2983     numTempItems += vb->getNumTempItems();
2984     nonResident += vb->getNumNonResidentItems(policy);
2985
2986     if (vb->getHighPriorityChkSize() > 0) {
2987         chkPersistRemaining++;
2988     }
2989
2990     fileSpaceUsed += vb->fileSpaceUsed;
2991     fileSize += vb->fileSize;
2992
2993     if (desired_state != vbucket_state_dead) {
2994         htMemory += vb->ht.memorySize();
2995         htItemMemory += vb->ht.getItemMemory();
2996         htCacheSize += vb->ht.cacheSize;
2997         numEjects += vb->ht.getNumEjects();
2998         numExpiredItems += vb->numExpiredItems;
2999         metaDataMemory += vb->ht.metaDataMemory;
3000         metaDataDisk += vb->metaDataDisk;
3001         opsCreate += vb->opsCreate;
3002         opsUpdate += vb->opsUpdate;
3003         opsDelete += vb->opsDelete;
3004         opsReject += vb->opsReject;
3005
3006         queueSize += vb->dirtyQueueSize;
3007         queueMemory += vb->dirtyQueueMem;
3008         queueFill += vb->dirtyQueueFill;
3009         queueDrain += vb->dirtyQueueDrain;
3010         queueAge += vb->getQueueAge();
3011         pendingWrites += vb->dirtyQueuePendingWrites;
3012     }
3013
3014     return false;
3015 }
3016
3017 /**
3018  * A container class holding VBucketCountVisitors to aggregate stats for
3019  * different vbucket states.
3020  */
3021 class VBucketCountAggregator : public VBucketVisitor  {
3022 public:
3023     bool visitBucket(RCPtr<VBucket> &vb)  {
3024         std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3025         it = visitorMap.find(vb->getState());
3026         if ( it != visitorMap.end() ) {
3027             it->second->visitBucket(vb);
3028         }
3029
3030         return false;
3031     }
3032
3033     void addVisitor(VBucketCountVisitor* visitor)  {
3034         visitorMap[visitor->getVBucketState()] = visitor;
3035     }
3036 private:
3037     std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
3038 };
3039
3040 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3041                                                            ADD_STAT add_stat) {
3042     VBucketCountAggregator aggregator;
3043
3044     VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
3045     aggregator.addVisitor(&activeCountVisitor);
3046
3047     VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
3048     aggregator.addVisitor(&replicaCountVisitor);
3049
3050     VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
3051     aggregator.addVisitor(&pendingCountVisitor);
3052
3053     VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
3054     aggregator.addVisitor(&deadCountVisitor);
3055
3056     epstore->visit(aggregator);
3057
3058     epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
3059                                       replicaCountVisitor.getMemResidentPer());
3060     tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
3061                                      replicaCountVisitor.getNumItems() +
3062                                      pendingCountVisitor.getNumItems());
3063
3064     configuration.addStats(add_stat, cookie);
3065
3066     EPStats &epstats = getEpStats();
3067     add_casted_stat("ep_version", VERSION, add_stat, cookie);
3068     add_casted_stat("ep_storage_age",
3069                     epstats.dirtyAge, add_stat, cookie);
3070     add_casted_stat("ep_storage_age_highwat",
3071                     epstats.dirtyAgeHighWat, add_stat, cookie);
3072     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3073                     add_stat, cookie);
3074
3075     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3076         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3077     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3078         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3079     }
3080
3081     add_casted_stat("ep_total_enqueued",
3082                     epstats.totalEnqueued, add_stat, cookie);
3083     add_casted_stat("ep_total_persisted",
3084                     epstats.totalPersisted, add_stat, cookie);
3085     add_casted_stat("ep_item_flush_failed",
3086                     epstats.flushFailed, add_stat, cookie);
3087     add_casted_stat("ep_item_commit_failed",
3088                     epstats.commitFailed, add_stat, cookie);
3089     add_casted_stat("ep_item_begin_failed",
3090                     epstats.beginFailed, add_stat, cookie);
3091     add_casted_stat("ep_expired_access", epstats.expired_access,
3092                     add_stat, cookie);
3093     add_casted_stat("ep_expired_pager", epstats.expired_pager,
3094                     add_stat, cookie);
3095     add_casted_stat("ep_item_flush_expired",
3096                     epstats.flushExpired, add_stat, cookie);
3097     add_casted_stat("ep_queue_size",
3098                     epstats.diskQueueSize, add_stat, cookie);
3099     add_casted_stat("ep_flusher_todo",
3100                     epstats.flusher_todo, add_stat, cookie);
3101     add_casted_stat("ep_uncommitted_items",
3102                     epstats.flusher_todo, add_stat, cookie);
3103     add_casted_stat("ep_diskqueue_items",
3104                     epstats.diskQueueSize, add_stat, cookie);
3105     add_casted_stat("ep_flusher_state",
3106                     epstore->getFlusher(0)->stateName(),
3107                     add_stat, cookie);
3108     add_casted_stat("ep_commit_num", epstats.flusherCommits,
3109                     add_stat, cookie);
3110     add_casted_stat("ep_commit_time",
3111                     epstats.commit_time, add_stat, cookie);
3112     add_casted_stat("ep_commit_time_total",
3113                     epstats.cumulativeCommitTime, add_stat, cookie);
3114     add_casted_stat("ep_vbucket_del",
3115                     epstats.vbucketDeletions, add_stat, cookie);
3116     add_casted_stat("ep_vbucket_del_fail",
3117                     epstats.vbucketDeletionFail, add_stat, cookie);
3118     add_casted_stat("ep_flush_duration_total",
3119                     epstats.cumulativeFlushTime, add_stat, cookie);
3120     add_casted_stat("ep_flush_all",
3121                     epstore->isFlushAllScheduled() ? "true" : "false",
3122                     add_stat, cookie);
3123     add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
3124                     cookie);
3125     add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
3126                     add_stat, cookie);
3127     add_casted_stat("curr_items_tot",
3128                     activeCountVisitor.getNumItems() +
3129                     replicaCountVisitor.getNumItems() +
3130                     pendingCountVisitor.getNumItems(),
3131                     add_stat, cookie);
3132     add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
3133                &n