Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / ep_engine.cc
1
2 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 /*
4  *     Copyright 2010 Couchbase, Inc
5  *
6  *   Licensed under the Apache License, Version 2.0 (the "License");
7  *   you may not use this file except in compliance with the License.
8  *   You may obtain a copy of the License at
9  *
10  *       http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *   Unless required by applicable law or agreed to in writing, software
13  *   distributed under the License is distributed on an "AS IS" BASIS,
14  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *   See the License for the specific language governing permissions and
16  *   limitations under the License.
17  */
18
19 #include "config.h"
20
21 #include <fcntl.h>
22 #include <memcached/engine.h>
23 #include <memcached/protocol_binary.h>
24 #include <memcached/util.h>
25 #include <platform/platform.h>
26 #include <stdarg.h>
27
28 #include <cstdio>
29 #include <cstring>
30 #include <fstream>
31 #include <iostream>
32 #include <limits>
33 #include <string>
34 #include <vector>
35
36 #include "backfill.h"
37 #include "ep_engine.h"
38 #include "failover-table.h"
39 #include "flusher.h"
40 #include "connmap.h"
41 #include "htresizer.h"
42 #include "memory_tracker.h"
43 #include "stats-info.h"
44 #define STATWRITER_NAMESPACE core_engine
45 #include "statwriter.h"
46 #undef STATWRITER_NAMESPACE
47 #include "tapthrottle.h"
48 #include "dcp-consumer.h"
49 #include "dcp-producer.h"
50 #include "warmup.h"
51
// File-scope pointer to an ALLOCATOR_HOOKS_API table.
// NOTE(review): presumably filled in when the engine instance is created;
// the assignment is not in this part of the file - confirm.
static ALLOCATOR_HOOKS_API *hooksApi;
// File-scope pointer to a SERVER_LOG_API table.
// NOTE(review): presumably filled in alongside hooksApi - confirm.
static SERVER_LOG_API *loggerApi;
54
/**
 * Scale a size by a floating-point factor.
 *
 * @param val     base size
 * @param percent multiplier applied to val (callers in this file pass
 *                fractions such as 0.75 and 0.85)
 * @return val * percent, computed as a double and converted back to size_t
 */
static size_t percentOf(size_t val, double percent) {
    const double scaled = static_cast<double>(val) * percent;
    return static_cast<size_t>(scaled);
}
58
59 /**
60  * Helper function to avoid typing in the long cast all over the place
61  * @param handle pointer to the engine
62  * @return the engine as a class
63  */
64 static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
65 {
66     EventuallyPersistentEngine* ret;
67     ret = reinterpret_cast<EventuallyPersistentEngine*>(handle);
68     ObjectRegistry::onSwitchThread(ret);
69     return ret;
70 }
71
72 static inline void releaseHandle(ENGINE_HANDLE* handle) {
73     (void) handle;
74     ObjectRegistry::onSwitchThread(NULL);
75 }
76
77
78 /**
79  * Call the response callback and return the appropriate value so that
80  * the core knows what to do..
81  */
82 static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
83                                       uint16_t keylen,
84                                       const void *ext, uint8_t extlen,
85                                       const void *body, uint32_t bodylen,
86                                       uint8_t datatype, uint16_t status,
87                                       uint64_t cas, const void *cookie)
88 {
89     ENGINE_ERROR_CODE rv = ENGINE_FAILED;
90     EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
91     if (response(key, keylen, ext, extlen, body, bodylen, datatype,
92                  status, cas, cookie)) {
93         rv = ENGINE_SUCCESS;
94     }
95     ObjectRegistry::onSwitchThread(e);
96     return rv;
97 }
98
/**
 * Check that a value lies inside the closed interval [l, h].
 *
 * @param v value to check
 * @param l lowest accepted value
 * @param h highest accepted value
 * @throws std::runtime_error when v is below l or above h
 */
template <typename T>
static void validate(T v, T l, T h) {
    const bool tooLow = v < l;
    const bool tooHigh = v > h;
    if (tooLow || tooHigh) {
        throw std::runtime_error("Value out of range.");
    }
}
105
106
/**
 * Verify that a string is a base-10 integer literal: an optional leading
 * '-' followed by one or more decimal digits and nothing else.
 *
 * @param str NUL-terminated string to inspect (must not be NULL)
 * @throws std::runtime_error if str is empty, consists only of "-", or
 *         contains any character that is not a decimal digit after the
 *         optional sign
 */
static void checkNumeric(const char* str) {
    using namespace std;

    const char* cursor = str;
    if (*cursor == '-') {
        ++cursor;
    }

    // A string without any digit is not a number, even though atoi() and
    // strtoull() would quietly turn it into 0.
    if (*cursor == '\0') {
        throw std::runtime_error("Value is not numeric");
    }

    for (; *cursor != '\0'; ++cursor) {
        // The <cctype> classifiers require a value representable as
        // unsigned char; passing a negative plain char is undefined.
        if (!isdigit(static_cast<unsigned char>(*cursor))) {
            throw std::runtime_error("Value is not numeric");
        }
    }
}
119
// The Engine API specifies C linkage for these functions.
121 extern "C" {
122
123     static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
124     {
125         engine_info* info = getHandle(handle)->getInfo();
126         releaseHandle(handle);
127         return info;
128     }
129
130     static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
131                                            const char* config_str)
132     {
133         ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
134         releaseHandle(handle);
135         return err_code;
136     }
137
138     static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
139     {
140         getHandle(handle)->destroy(force);
141         delete getHandle(handle);
142         releaseHandle(NULL);
143     }
144
145     static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
146                                              const void* cookie,
147                                              item **itm,
148                                              const void* key,
149                                              const size_t nkey,
150                                              const size_t nbytes,
151                                              const int flags,
152                                              const rel_time_t exptime,
153                                              uint8_t datatype)
154     {
155         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
156             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
157                     " (ItemAllocate)");
158             return ENGINE_EINVAL;
159         }
160         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie,
161                                                                      itm, key,
162                                                                      nkey,
163                                                                      nbytes,
164                                                                      flags,
165                                                                      exptime,
166                                                                      datatype);
167         releaseHandle(handle);
168         return err_code;
169     }
170
171     static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
172                                            const void* cookie,
173                                            const void* key,
174                                            const size_t nkey,
175                                            uint64_t* cas,
176                                            uint16_t vbucket,
177                                            mutation_descr_t *mut_info)
178     {
179         ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key,
180                                                                    nkey, cas,
181                                                                    vbucket, mut_info);
182         releaseHandle(handle);
183         return err_code;
184     }
185
186     static void EvpItemRelease(ENGINE_HANDLE* handle,
187                                const void *cookie,
188                                item* itm)
189     {
190         getHandle(handle)->itemRelease(cookie, itm);
191         releaseHandle(handle);
192     }
193
194     static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
195                                     const void* cookie,
196                                     item** itm,
197                                     const void* key,
198                                     const int nkey,
199                                     uint16_t vbucket)
200     {
201         ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key,
202                                                             nkey, vbucket, true);
203         releaseHandle(handle);
204         return err_code;
205     }
206
207     static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
208                                          const void* cookie,
209                                          const char* stat_key,
210                                          int nkey,
211                                          ADD_STAT add_stat)
212     {
213         ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie,
214                                                                  stat_key,
215                                                                  nkey,
216                                                                  add_stat);
217         releaseHandle(handle);
218         return err_code;
219     }
220
221     static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
222                                       const void *cookie,
223                                       item* itm,
224                                       uint64_t *cas,
225                                       ENGINE_STORE_OPERATION operation,
226                                       uint16_t vbucket)
227     {
228         ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas,
229                                                               operation,
230                                                               vbucket);
231         releaseHandle(handle);
232         return err_code;
233     }
234
235     static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
236                                            const void* cookie,
237                                            const void* key,
238                                            const int nkey,
239                                            const bool increment,
240                                            const bool create,
241                                            const uint64_t delta,
242                                            const uint64_t initial,
243                                            const rel_time_t exptime,
244                                            item **itm,
245                                            uint8_t datatype,
246                                            uint64_t *result,
247                                            uint16_t vbucket)
248     {
249         if (datatype > PROTOCOL_BINARY_DATATYPE_JSON) {
250             if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
251                 LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
252                         " (Arithmetic)");
253             } else {
254                 LOG(EXTENSION_LOG_WARNING, "Cannnot perform arithmetic "
255                     "operations on compressed data!");
256             }
257             return ENGINE_EINVAL;
258         }
259         ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key,
260                                                                 nkey,
261                                                                 increment,
262                                                                 create, delta,
263                                                                 initial,
264                                                                 exptime, itm,
265                                                                 datatype,
266                                                                 result,
267                                                                 vbucket);
268         releaseHandle(handle);
269         return ecode;
270     }
271
272     static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
273                                       const void* cookie, time_t when)
274     {
275         ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
276         releaseHandle(handle);
277         return err_code;
278     }
279
280     static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
281     {
282         getHandle(handle)->resetStats();
283         releaseHandle(handle);
284     }
285
286     static protocol_binary_response_status stopFlusher(
287                                                  EventuallyPersistentEngine *e,
288                                                  const char **msg,
289                                                  size_t *msg_size) {
290         return e->stopFlusher(msg, msg_size);
291     }
292
293     static protocol_binary_response_status startFlusher(
294                                                  EventuallyPersistentEngine *e,
295                                                  const char **msg,
296                                                  size_t *msg_size) {
297         return e->startFlusher(msg, msg_size);
298     }
299
300     static protocol_binary_response_status setTapParam(
301                                                  EventuallyPersistentEngine *e,
302                                                  const char *keyz,
303                                                  const char *valz,
304                                                  const char **msg, size_t *) {
305         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
306
307         try {
308             int v = atoi(valz);
309             if (strcmp(keyz, "tap_keepalive") == 0) {
310                 checkNumeric(valz);
311                 validate(v, 0, MAX_TAP_KEEP_ALIVE);
312                 e->setTapKeepAlive(static_cast<uint32_t>(v));
313             } else if (strcmp(keyz, "tap_throttle_threshold") == 0) {
314                 checkNumeric(valz);
315                 e->getConfiguration().setTapThrottleThreshold(v);
316             } else if (strcmp(keyz, "tap_throttle_queue_cap") == 0) {
317                 checkNumeric(valz);
318                 e->getConfiguration().setTapThrottleQueueCap(v);
319             } else if (strcmp(keyz, "tap_throttle_cap_pcnt") == 0) {
320                 checkNumeric(valz);
321                 e->getConfiguration().setTapThrottleCapPcnt(v);
322             } else {
323                 *msg = "Unknown config param";
324                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
325             }
326         } catch(std::runtime_error &) {
327             *msg = "Value out of range.";
328             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
329         }
330
331         return rv;
332     }
333
334     static protocol_binary_response_status setCheckpointParam(
335                                                  EventuallyPersistentEngine *e,
336                                                               const char *keyz,
337                                                               const char *valz,
338                                                               const char **msg,
339                                                               size_t *) {
340         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
341
342         try {
343             int v = atoi(valz);
344             if (strcmp(keyz, "chk_max_items") == 0) {
345                 checkNumeric(valz);
346                 validate(v, MIN_CHECKPOINT_ITEMS, MAX_CHECKPOINT_ITEMS);
347                 e->getConfiguration().setChkMaxItems(v);
348             } else if (strcmp(keyz, "chk_period") == 0) {
349                 checkNumeric(valz);
350                 validate(v, MIN_CHECKPOINT_PERIOD, MAX_CHECKPOINT_PERIOD);
351                 e->getConfiguration().setChkPeriod(v);
352             } else if (strcmp(keyz, "max_checkpoints") == 0) {
353                 checkNumeric(valz);
354                 validate(v, DEFAULT_MAX_CHECKPOINTS,
355                          MAX_CHECKPOINTS_UPPER_BOUND);
356                 e->getConfiguration().setMaxCheckpoints(v);
357             } else if (strcmp(keyz, "item_num_based_new_chk") == 0) {
358                 if (strcmp(valz, "true") == 0) {
359                     e->getConfiguration().setItemNumBasedNewChk(true);
360                 } else {
361                     e->getConfiguration().setItemNumBasedNewChk(false);
362                 }
363             } else if (strcmp(keyz, "keep_closed_chks") == 0) {
364                 if (strcmp(valz, "true") == 0) {
365                     e->getConfiguration().setKeepClosedChks(true);
366                 } else {
367                     e->getConfiguration().setKeepClosedChks(false);
368                 }
369             } else if (strcmp(keyz, "enable_chk_merge") == 0) {
370                 if (strcmp(valz, "true") == 0) {
371                     e->getConfiguration().setEnableChkMerge(true);
372                 } else {
373                     e->getConfiguration().setEnableChkMerge(false);
374                 }
375             } else {
376                 *msg = "Unknown config param";
377                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
378             }
379         } catch(std::runtime_error &) {
380             *msg = "Value out of range.";
381             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
382         }
383
384         return rv;
385     }
386
387     static protocol_binary_response_status setFlushParam(
388                                                  EventuallyPersistentEngine *e,
389                                                  const char *keyz,
390                                                  const char *valz,
391                                                  const char **msg,
392                                                  size_t *) {
393         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
394
395         // Handle the actual mutation.
396         try {
397             int v = atoi(valz);
398             if (strcmp(keyz, "bg_fetch_delay") == 0) {
399                 checkNumeric(valz);
400                 e->getConfiguration().setBgFetchDelay(v);
401             } else if (strcmp(keyz, "flushall_enabled") == 0) {
402                 if (strcmp(valz, "true") == 0) {
403                     e->getConfiguration().setFlushallEnabled(true);
404                 } else if(strcmp(valz, "false") == 0) {
405                     e->getConfiguration().setFlushallEnabled(false);
406                 } else {
407                     throw std::runtime_error("value out of range.");
408                 }
409             } else if (strcmp(keyz, "max_size") == 0) {
410                 char *ptr = NULL;
411                 checkNumeric(valz);
412                 uint64_t vsize = strtoull(valz, &ptr, 10);
413                 validate(vsize, static_cast<uint64_t>(0),
414                          std::numeric_limits<uint64_t>::max());
415                 e->getConfiguration().setMaxSize(vsize);
416                 e->getConfiguration().setMemLowWat(percentOf(vsize, 0.75));
417                 e->getConfiguration().setMemHighWat(percentOf(vsize, 0.85));
418             } else if (strcmp(keyz, "mem_low_wat") == 0) {
419                 char *ptr = NULL;
420                 checkNumeric(valz);
421                 uint64_t vsize = strtoull(valz, &ptr, 10);
422                 validate(vsize, static_cast<uint64_t>(0),
423                          std::numeric_limits<uint64_t>::max());
424                 e->getConfiguration().setMemLowWat(vsize);
425             } else if (strcmp(keyz, "mem_high_wat") == 0) {
426                 char *ptr = NULL;
427                 checkNumeric(valz);
428                 uint64_t vsize = strtoull(valz, &ptr, 10);
429                 validate(vsize, static_cast<uint64_t>(0),
430                          std::numeric_limits<uint64_t>::max());
431                 e->getConfiguration().setMemHighWat(vsize);
432             } else if (strcmp(keyz, "backfill_mem_threshold") == 0) {
433                 checkNumeric(valz);
434                 validate(v, 0, 100);
435                 e->getConfiguration().setBackfillMemThreshold(v);
436             } else if (strcmp(keyz, "compaction_exp_mem_threshold") == 0) {
437                 checkNumeric(valz);
438                 validate(v, 0, 100);
439                 e->getConfiguration().setCompactionExpMemThreshold(v);
440             } else if (strcmp(keyz, "mutation_mem_threshold") == 0) {
441                 checkNumeric(valz);
442                 validate(v, 0, 100);
443                 e->getConfiguration().setMutationMemThreshold(v);
444             } else if (strcmp(keyz, "timing_log") == 0) {
445                 EPStats &stats = e->getEpStats();
446                 std::ostream *old = stats.timingLog;
447                 stats.timingLog = NULL;
448                 delete old;
449                 if (strcmp(valz, "off") == 0) {
450                     LOG(EXTENSION_LOG_INFO, "Disabled timing log.");
451                 } else {
452                     std::ofstream *tmp(new std::ofstream(valz));
453                     if (tmp->good()) {
454                         LOG(EXTENSION_LOG_INFO,
455                             "Logging detailed timings to ``%s''.", valz);
456                         stats.timingLog = tmp;
457                     } else {
458                         LOG(EXTENSION_LOG_WARNING,
459                             "Error setting detailed timing log to ``%s'':  %s",
460                             valz, strerror(errno));
461                         delete tmp;
462                     }
463                 }
464             } else if (strcmp(keyz, "exp_pager_stime") == 0) {
465                 char *ptr = NULL;
466                 checkNumeric(valz);
467                 uint64_t vsize = strtoull(valz, &ptr, 10);
468                 validate(vsize, static_cast<uint64_t>(0),
469                          std::numeric_limits<uint64_t>::max());
470                 e->getConfiguration().setExpPagerStime((size_t)vsize);
471             } else if (strcmp(keyz, "access_scanner_enabled") == 0) {
472                 if (strcmp(valz, "true") == 0) {
473                     e->getConfiguration().setAccessScannerEnabled(true);
474                 } else if (strcmp(valz, "false") == 0) {
475                     e->getConfiguration().setAccessScannerEnabled(false);
476                 } else {
477                     throw std::runtime_error("Value expected: true/false.");
478                 }
479             } else if (strcmp(keyz, "alog_sleep_time") == 0) {
480                 checkNumeric(valz);
481                 e->getConfiguration().setAlogSleepTime(v);
482             } else if (strcmp(keyz, "alog_task_time") == 0) {
483                 checkNumeric(valz);
484                 e->getConfiguration().setAlogTaskTime(v);
485             } else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
486                 checkNumeric(valz);
487                 e->getConfiguration().setPagerActiveVbPcnt(v);
488             } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
489                 checkNumeric(valz);
490                 validate(v, 0, std::numeric_limits<int>::max());
491                 e->getConfiguration().setWarmupMinMemoryThreshold(v);
492             } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
493                 checkNumeric(valz);
494                 validate(v, 0, std::numeric_limits<int>::max());
495                 e->getConfiguration().setWarmupMinItemsThreshold(v);
496             } else if (strcmp(keyz, "max_num_readers") == 0) {
497                 checkNumeric(valz);
498                 validate(v, 0, std::numeric_limits<int>::max());
499                 e->getConfiguration().setMaxNumReaders(v);
500                 ExecutorPool::get()->setMaxReaders(v);
501             } else if (strcmp(keyz, "max_num_writers") == 0) {
502                 checkNumeric(valz);
503                 validate(v, 0, std::numeric_limits<int>::max());
504                 e->getConfiguration().setMaxNumWriters(v);
505                 ExecutorPool::get()->setMaxWriters(v);
506             } else if (strcmp(keyz, "max_num_auxio") == 0) {
507                 checkNumeric(valz);
508                 validate(v, 0, std::numeric_limits<int>::max());
509                 e->getConfiguration().setMaxNumAuxio(v);
510                 ExecutorPool::get()->setMaxAuxIO(v);
511             } else if (strcmp(keyz, "max_num_nonio") == 0) {
512                 checkNumeric(valz);
513                 validate(v, 0, std::numeric_limits<int>::max());
514                 e->getConfiguration().setMaxNumNonio(v);
515                 ExecutorPool::get()->setMaxNonIO(v);
516             } else if (strcmp(keyz, "bfilter_enabled") == 0) {
517                 if (strcmp(valz, "true") == 0) {
518                     e->getConfiguration().setBfilterEnabled(true);
519                 } else if (strcmp(valz, "false") == 0) {
520                     e->getConfiguration().setBfilterEnabled(false);
521                 } else {
522                     throw std::runtime_error("Value expected: true/false.");
523                 }
524             } else if (strcmp(keyz, "bfilter_residency_threshold") == 0) {
525                 float val = atof(valz);
526                 if (val >= 0.0 && val <= 1.0) {
527                     e->getConfiguration().setBfilterResidencyThreshold(val);
528                 } else {
529                     throw std::runtime_error("Value out of range [0.0-1.0].");
530                 }
531             } else if (strcmp(keyz, "defragmenter_enabled") == 0) {
532                 if (strcmp(valz, "true") == 0) {
533                     e->getConfiguration().setDefragmenterEnabled(true);
534                 } else {
535                     e->getConfiguration().setDefragmenterEnabled(false);
536                 }
537             } else if (strcmp(keyz, "defragmenter_interval") == 0) {
538                 checkNumeric(valz);
539                 validate(v, 1, std::numeric_limits<int>::max());
540                 e->getConfiguration().setDefragmenterInterval(v);
541             } else if (strcmp(keyz, "defragmenter_age_threshold") == 0) {
542                 checkNumeric(valz);
543                 validate(v, 0, std::numeric_limits<int>::max());
544                 e->getConfiguration().setDefragmenterAgeThreshold(v);
545             } else if (strcmp(keyz, "defragmenter_chunk_duration") == 0) {
546                 checkNumeric(valz);
547                 validate(v, 1, std::numeric_limits<int>::max());
548                 e->getConfiguration().setDefragmenterChunkDuration(v);
549             } else if (strcmp(keyz, "defragmenter_run") == 0) {
550                 e->runDefragmenterTask();
551             } else if (strcmp(keyz, "compaction_write_queue_cap") == 0) {
552                 checkNumeric(valz);
553                 validate(v, 1, std::numeric_limits<int>::max());
554                 e->getConfiguration().setCompactionWriteQueueCap(v);
555             } else {
556                 *msg = "Unknown config param";
557                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
558             }
559         } catch(std::runtime_error& ex) {
560             *msg = "Value out of range.";
561             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
562         }
563
564         return rv;
565     }
566
567     static protocol_binary_response_status setDcpParam(
568                                                     EventuallyPersistentEngine *e,
569                                                     const char *keyz,
570                                                     const char *valz,
571                                                     const char **msg,
572                                                     size_t *) {
573         protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
574         try {
575
576             if (strcmp(keyz, "dcp_consumer_process_buffered_messages_yield_limit") == 0) {
577                 size_t v = atoi(valz);
578                 checkNumeric(valz);
579                 validate(v, size_t(1), std::numeric_limits<size_t>::max());
580                 e->getConfiguration().setDcpConsumerProcessBufferedMessagesYieldLimit(v);
581             } else if (strcmp(keyz, "dcp_consumer_process_buffered_messages_batch_size") == 0) {
582                 size_t v = atoi(valz);
583                 checkNumeric(valz);
584                 validate(v, size_t(1), std::numeric_limits<size_t>::max());
585                 e->getConfiguration().setDcpConsumerProcessBufferedMessagesBatchSize(v);
586             } else {
587                 *msg = "Unknown config param";
588                 rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
589             }
590         } catch (std::runtime_error& ex) {
591             *msg = "Value out of range.";
592             rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
593         }
594
595         return rv;
596     }
597
598     static protocol_binary_response_status evictKey(
599                                                  EventuallyPersistentEngine *e,
600                                                  protocol_binary_request_header
601                                                                       *request,
602                                                  const char **msg,
603                                                  size_t *msg_size) {
604         protocol_binary_request_no_extras *req =
605             (protocol_binary_request_no_extras*)request;
606
607         char keyz[256];
608
609         // Read the key.
610         int keylen = ntohs(req->message.header.request.keylen);
611         if (keylen >= (int)sizeof(keyz)) {
612             *msg = "Key is too large.";
613             return PROTOCOL_BINARY_RESPONSE_EINVAL;
614         }
615         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
616         keyz[keylen] = 0x00;
617
618         uint16_t vbucket = ntohs(request->request.vbucket);
619
620         std::string key(keyz, keylen);
621
622         LOG(EXTENSION_LOG_DEBUG, "Manually evicting object with key %s\n",
623                 keyz);
624
625         protocol_binary_response_status rv = e->evictKey(key, vbucket, msg,
626                                                          msg_size);
627         if (rv == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET ||
628             rv == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT) {
629             if (e->isDegradedMode()) {
630                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
631             }
632         }
633         return rv;
634     }
635
636     static ENGINE_ERROR_CODE getLocked(EventuallyPersistentEngine *e,
637                                        protocol_binary_request_header *req,
638                                        const void *cookie,
639                                        Item **itm,
640                                        const char **msg,
641                                        size_t *,
642                                        protocol_binary_response_status *res) {
643
644         uint8_t extlen = req->request.extlen;
645         if (extlen != 0 && extlen != 4) {
646             *msg = "Invalid packet format (extlen may be 0 or 4)";
647             *res = PROTOCOL_BINARY_RESPONSE_EINVAL;
648             return ENGINE_EINVAL;
649         }
650
651         protocol_binary_request_getl *grequest =
652             (protocol_binary_request_getl*)req;
653         *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
654
655         const char *keyp = reinterpret_cast<const char*>(req->bytes);
656         keyp += sizeof(req->bytes) + extlen;
657         std::string key(keyp, ntohs(req->request.keylen));
658         uint16_t vbucket = ntohs(req->request.vbucket);
659
660         RememberingCallback<GetValue> getCb;
661         uint32_t max_timeout = (unsigned int)e->getGetlMaxTimeout();
662         uint32_t default_timeout = (unsigned int)e->getGetlDefaultTimeout();
663         uint32_t lockTimeout = default_timeout;
664         if (extlen == 4) {
665             lockTimeout = ntohl(grequest->message.body.expiration);
666         }
667
668         if (lockTimeout >  max_timeout || lockTimeout < 1) {
669             LOG(EXTENSION_LOG_WARNING,
670                 "Illegal value for lock timeout specified"
671                 " %u. Using default value: %u", lockTimeout, default_timeout);
672             lockTimeout = default_timeout;
673         }
674
675         bool gotLock = e->getLocked(key, vbucket, getCb,
676                                     ep_current_time(),
677                                     lockTimeout, cookie);
678
679         getCb.waitForValue();
680         ENGINE_ERROR_CODE rv = getCb.val.getStatus();
681
682         if (rv == ENGINE_SUCCESS) {
683             *itm = getCb.val.getValue();
684             ++(e->getEpStats().numOpsGet);
685         } else if (rv == ENGINE_EWOULDBLOCK) {
686
687             // need to wait for value
688             return rv;
689         } else if (rv == ENGINE_NOT_MY_VBUCKET) {
690             *msg = "That's not my bucket.";
691             *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
692             return ENGINE_NOT_MY_VBUCKET;
693         } else if (!gotLock){
694             *msg =  "LOCK_ERROR";
695             *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
696             return ENGINE_TMPFAIL;
697         } else {
698             if (e->isDegradedMode()) {
699                 *msg = "LOCK_TMP_ERROR";
700                 *res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
701                 return ENGINE_TMPFAIL;
702             }
703
704             *msg = "NOT_FOUND";
705             *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
706             return ENGINE_KEY_ENOENT;
707         }
708
709         return rv;
710     }
711
712     static protocol_binary_response_status unlockKey(
713                                                  EventuallyPersistentEngine *e,
714                                                  protocol_binary_request_header
715                                                                       *request,
716                                                  const char **msg,
717                                                  size_t *)
718     {
719         protocol_binary_request_no_extras *req =
720             (protocol_binary_request_no_extras*)request;
721
722         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
723         char keyz[256];
724
725         // Read the key.
726         int keylen = ntohs(req->message.header.request.keylen);
727         if (keylen >= (int)sizeof(keyz)) {
728             *msg = "Key is too large.";
729             return PROTOCOL_BINARY_RESPONSE_EINVAL;
730         }
731
732         memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
733         keyz[keylen] = 0x00;
734
735         uint16_t vbucket = ntohs(request->request.vbucket);
736         std::string key(keyz, keylen);
737
738         LOG(EXTENSION_LOG_DEBUG, "Executing unl for key %s\n", keyz);
739
740         RememberingCallback<GetValue> getCb;
741         uint64_t cas = ntohll(request->request.cas);
742
743         ENGINE_ERROR_CODE rv = e->unlockKey(key, vbucket, cas,
744                                             ep_current_time());
745
746         if (rv == ENGINE_SUCCESS) {
747             *msg = "UNLOCKED";
748         } else if (rv == ENGINE_TMPFAIL){
749             *msg =  "UNLOCK_ERROR";
750             res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
751         } else {
752             if (e->isDegradedMode()) {
753                 *msg = "LOCK_TMP_ERROR";
754                 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
755             }
756
757             RCPtr<VBucket> vb = e->getVBucket(vbucket);
758             if (!vb) {
759                 *msg = "That's not my bucket.";
760                 res =  PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
761             }
762             *msg = "NOT_FOUND";
763             res =  PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
764         }
765
766         return res;
767     }
768
769     static protocol_binary_response_status setParam(
770                                             EventuallyPersistentEngine *e,
771                                             protocol_binary_request_set_param
772                                                                      *req,
773                                             const char **msg,
774                                             size_t *msg_size) {
775
776         size_t keylen = ntohs(req->message.header.request.keylen);
777         uint8_t extlen = req->message.header.request.extlen;
778         size_t vallen = ntohl(req->message.header.request.bodylen);
779         protocol_binary_engine_param_t paramtype =
780             static_cast<protocol_binary_engine_param_t>(ntohl(req->message.body.param_type));
781
782         if (keylen == 0 || (vallen - keylen - extlen) == 0) {
783             return PROTOCOL_BINARY_RESPONSE_EINVAL;
784         }
785
786         const char *keyp = reinterpret_cast<const char*>(req->bytes)
787                            + sizeof(req->bytes);
788         const char *valuep = keyp + keylen;
789         vallen -= (keylen + extlen);
790
791         char keyz[128];
792         char valz[512];
793
794         // Read the key.
795         if (keylen >= sizeof(keyz)) {
796             *msg = "Key is too large.";
797             return PROTOCOL_BINARY_RESPONSE_EINVAL;
798         }
799         memcpy(keyz, keyp, keylen);
800         keyz[keylen] = 0x00;
801
802         // Read the value.
803         if (vallen >= sizeof(valz)) {
804             *msg = "Value is too large.";
805             return PROTOCOL_BINARY_RESPONSE_EINVAL;
806         }
807         memcpy(valz, valuep, vallen);
808         valz[vallen] = 0x00;
809
810         protocol_binary_response_status rv;
811
812         switch (paramtype) {
813         case protocol_binary_engine_param_flush:
814             rv = setFlushParam(e, keyz, valz, msg, msg_size);
815             break;
816         case protocol_binary_engine_param_tap:
817             rv = setTapParam(e, keyz, valz, msg, msg_size);
818             break;
819         case protocol_binary_engine_param_checkpoint:
820             rv = setCheckpointParam(e, keyz, valz, msg, msg_size);
821             break;
822         case protocol_binary_engine_param_dcp:
823             rv = setDcpParam(e, keyz, valz, msg, msg_size);
824             break;
825         default:
826             rv = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
827         }
828
829         return rv;
830     }
831
832     static ENGINE_ERROR_CODE getVBucket(EventuallyPersistentEngine *e,
833                                        const void *cookie,
834                                        protocol_binary_request_header *request,
835                                        ADD_RESPONSE response) {
836         protocol_binary_request_get_vbucket *req =
837             reinterpret_cast<protocol_binary_request_get_vbucket*>(request);
838         cb_assert(req);
839
840         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
841         RCPtr<VBucket> vb = e->getVBucket(vbucket);
842         if (!vb) {
843             LockHolder lh(e->clusterConfig.lock);
844             return sendResponse(response, NULL, 0, NULL, 0,
845                                 e->clusterConfig.config,
846                                 e->clusterConfig.len,
847                                 PROTOCOL_BINARY_RAW_BYTES,
848                                 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
849                                 cookie);
850         } else {
851             vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
852             return sendResponse(response, NULL, 0, NULL, 0, &state,
853                                 sizeof(state),
854                                 PROTOCOL_BINARY_RAW_BYTES,
855                                 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
856         }
857     }
858
859     static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
860                                        const void *cookie,
861                                        protocol_binary_request_header *request,
862                                        ADD_RESPONSE response) {
863
864         protocol_binary_request_set_vbucket *req =
865             reinterpret_cast<protocol_binary_request_set_vbucket*>(request);
866
867         uint64_t cas = ntohll(req->message.header.request.cas);
868
869         size_t bodylen = ntohl(req->message.header.request.bodylen)
870             - ntohs(req->message.header.request.keylen);
871         if (bodylen != sizeof(vbucket_state_t)) {
872             const std::string msg("Incorrect packet format");
873             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
874                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
875                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
876                                 cas, cookie);
877         }
878
879         vbucket_state_t state;
880         memcpy(&state, &req->message.body.state, sizeof(state));
881         state = static_cast<vbucket_state_t>(ntohl(state));
882
883         if (!is_valid_vbucket_state_t(state)) {
884             const std::string msg("Invalid vbucket state");
885             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
886                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
887                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
888                                 cas, cookie);
889         }
890
891         uint16_t vb = ntohs(req->message.header.request.vbucket);
892         if(e->setVBucketState(vb, state, false) == ENGINE_ERANGE) {
893             const std::string msg("VBucket number too big");
894             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
895                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
896                                 PROTOCOL_BINARY_RESPONSE_ERANGE,
897                                 cas, cookie);
898         }
899         return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
900                             PROTOCOL_BINARY_RAW_BYTES,
901                             PROTOCOL_BINARY_RESPONSE_SUCCESS,
902                             cas, cookie);
903     }
904
905     static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
906                                         const void *cookie,
907                                         protocol_binary_request_header *req,
908                                         ADD_RESPONSE response) {
909
910         uint64_t cas = ntohll(req->request.cas);
911
912         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
913         uint16_t vbucket = ntohs(req->request.vbucket);
914
915         std::string msg = "";
916         if (ntohs(req->request.keylen) > 0 || req->request.extlen > 0) {
917             msg = "Incorrect packet format.";
918             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
919                                 msg.length(),
920                                 PROTOCOL_BINARY_RAW_BYTES,
921                                 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
922         }
923
924         bool sync = false;
925         uint32_t bodylen = ntohl(req->request.bodylen);
926         if (bodylen > 0) {
927             const char* ptr = reinterpret_cast<const char*>(req->bytes) +
928                 sizeof(req->bytes);
929             if (bodylen == 7 && strncmp(ptr, "async=0", bodylen) == 0) {
930                 sync = true;
931             }
932         }
933
934         ENGINE_ERROR_CODE err;
935         void* es = e->getEngineSpecific(cookie);
936         if (sync) {
937             if (es == NULL) {
938                 err = e->deleteVBucket(vbucket, cookie);
939                 e->storeEngineSpecific(cookie, e);
940             } else {
941                 e->storeEngineSpecific(cookie, NULL);
942                 LOG(EXTENSION_LOG_INFO,
943                     "Completed sync deletion of vbucket %u",
944                     (unsigned)vbucket);
945                 err = ENGINE_SUCCESS;
946             }
947         } else {
948             err = e->deleteVBucket(vbucket);
949         }
950         switch (err) {
951         case ENGINE_SUCCESS:
952             LOG(EXTENSION_LOG_WARNING,
953                 "Deletion of vbucket %d was completed.", vbucket);
954             break;
955         case ENGINE_NOT_MY_VBUCKET:
956             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
957                 "because the vbucket doesn't exist!!!", vbucket);
958             res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
959             break;
960         case ENGINE_EINVAL:
961             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
962                 "because the vbucket is not in a dead state\n", vbucket);
963             msg = "Failed to delete vbucket.  Must be in the dead state.";
964             res = PROTOCOL_BINARY_RESPONSE_EINVAL;
965             break;
966         case ENGINE_EWOULDBLOCK:
967             LOG(EXTENSION_LOG_WARNING, "Request for vbucket %d deletion is in"
968                 " EWOULDBLOCK until the database file is removed from disk",
969                 vbucket);
970             e->storeEngineSpecific(cookie, req);
971             return ENGINE_EWOULDBLOCK;
972         default:
973             LOG(EXTENSION_LOG_WARNING, "Deletion of vbucket %d failed "
974                 "because of unknown reasons\n", vbucket);
975             msg = "Failed to delete vbucket.  Unknown reason.";
976             res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
977         }
978
979         if (err != ENGINE_NOT_MY_VBUCKET) {
980             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
981                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
982                                 res, cas, cookie);
983         } else {
984             LockHolder lh(e->clusterConfig.lock);
985             return sendResponse(response, NULL, 0, NULL, 0,
986                                 e->clusterConfig.config,
987                                 e->clusterConfig.len,
988                                 PROTOCOL_BINARY_RAW_BYTES,
989                                 res, cas, cookie);
990         }
991
992     }
993
994     static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
995                                        protocol_binary_request_header *request,
996                                        const void *cookie,
997                                        Item **it,
998                                        const char **msg,
999                                        protocol_binary_response_status *res) {
1000         EventuallyPersistentStore *eps = e->getEpStore();
1001         protocol_binary_request_no_extras *req =
1002             (protocol_binary_request_no_extras*)request;
1003         int keylen = ntohs(req->message.header.request.keylen);
1004         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1005         ENGINE_ERROR_CODE error_code;
1006         std::string keystr(((char *)request) + sizeof(req->message.header),
1007                             keylen);
1008
1009         GetValue rv(eps->getReplica(keystr, vbucket, cookie, true));
1010
1011         if ((error_code = rv.getStatus()) != ENGINE_SUCCESS) {
1012             if (error_code == ENGINE_NOT_MY_VBUCKET) {
1013                 *res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1014                 return error_code;
1015             } else if (error_code == ENGINE_TMPFAIL) {
1016                 *msg = "NOT_FOUND";
1017                 *res = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1018             } else {
1019                 return error_code;
1020             }
1021         } else {
1022             *it = rv.getValue();
1023             *res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1024         }
1025         ++(e->getEpStats().numOpsGet);
1026         return ENGINE_SUCCESS;
1027     }
1028
1029     static ENGINE_ERROR_CODE compactDB(EventuallyPersistentEngine *e,
1030                                        const void *cookie,
1031                                        protocol_binary_request_compact_db *req,
1032                                        ADD_RESPONSE response) {
1033
1034         std::string msg = "";
1035         protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
1036         compaction_ctx compactreq;
1037         uint16_t vbucket = ntohs(req->message.header.request.vbucket);
1038         uint64_t cas = ntohll(req->message.header.request.cas);
1039
1040         if (ntohs(req->message.header.request.keylen) > 0 ||
1041              req->message.header.request.extlen != 24) {
1042             LOG(EXTENSION_LOG_WARNING,
1043                     "Compaction of vbucket %d received bad ext/key len %d/%d.",
1044                     vbucket, req->message.header.request.extlen,
1045                     ntohs(req->message.header.request.keylen));
1046             msg = "Incorrect packet format.";
1047             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1048                                 msg.length(),
1049                                 PROTOCOL_BINARY_RAW_BYTES,
1050                                 PROTOCOL_BINARY_RESPONSE_EINVAL, cas, cookie);
1051         }
1052         EPStats &stats = e->getEpStats();
1053         compactreq.max_purged_seq = 0;
1054         compactreq.purge_before_ts = ntohll(req->message.body.purge_before_ts);
1055         compactreq.purge_before_seq =
1056                                     ntohll(req->message.body.purge_before_seq);
1057         compactreq.drop_deletes     = req->message.body.drop_deletes;
1058         compactreq.bfcb             = NULL;
1059
1060         ENGINE_ERROR_CODE err;
1061         void* es = e->getEngineSpecific(cookie);
1062         if (es == NULL) {
1063             ++stats.pendingCompactions;
1064             e->storeEngineSpecific(cookie, e);
1065             err = e->compactDB(vbucket, compactreq, cookie);
1066         } else {
1067             e->storeEngineSpecific(cookie, NULL);
1068             err = ENGINE_SUCCESS;
1069         }
1070
1071         switch (err) {
1072             case ENGINE_SUCCESS:
1073                 LOG(EXTENSION_LOG_INFO,
1074                     "Compaction of vbucket %d completed.", vbucket);
1075                 break;
1076             case ENGINE_NOT_MY_VBUCKET:
1077                 --stats.pendingCompactions;
1078                 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1079                     "because the vbucket doesn't exist!!!", vbucket);
1080                 res = PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1081                 break;
1082             case ENGINE_EWOULDBLOCK:
1083                 LOG(EXTENSION_LOG_INFO, "Request to compact vbucket %d is "
1084                         "in EWOULDBLOCK state until the database file is "
1085                         "compacted on disk",
1086                         vbucket);
1087                 e->storeEngineSpecific(cookie, req);
1088                 return ENGINE_EWOULDBLOCK;
1089             case ENGINE_TMPFAIL:
1090                 LOG(EXTENSION_LOG_WARNING, "Request to compact vbucket %d hit"
1091                         " a temporary failure and may need to be retried",
1092                         vbucket);
1093                 msg = "Temporary failure in compacting vbucket.";
1094                 res = PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1095                 break;
1096             default:
1097                 --stats.pendingCompactions;
1098                 LOG(EXTENSION_LOG_WARNING, "Compaction of vbucket %d failed "
1099                     "because of unknown reasons\n", vbucket);
1100                 msg = "Failed to compact vbucket.  Unknown reason.";
1101                 res = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1102                 break;
1103         }
1104
1105         if (err != ENGINE_NOT_MY_VBUCKET) {
1106             return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(),
1107                                 msg.length(), PROTOCOL_BINARY_RAW_BYTES,
1108                                 res, cas, cookie);
1109         } else {
1110             LockHolder lh(e->clusterConfig.lock);
1111             return sendResponse(response, NULL, 0, NULL, 0,
1112                                 e->clusterConfig.config,
1113                                 e->clusterConfig.len,
1114                                 PROTOCOL_BINARY_RAW_BYTES,
1115                                 res, cas, cookie);
1116         }
1117     }
1118
1119     static ENGINE_ERROR_CODE processUnknownCommand(
1120                                        EventuallyPersistentEngine *h,
1121                                        const void* cookie,
1122                                        protocol_binary_request_header *request,
1123                                        ADD_RESPONSE response)
1124     {
1125         protocol_binary_response_status res =
1126                                       PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
1127         const char *msg = NULL;
1128         size_t msg_size = 0;
1129         Item *itm = NULL;
1130
1131         EPStats &stats = h->getEpStats();
1132         ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1133
1134         /**
1135          * Session validation
1136          * (For ns_server commands only)
1137          */
1138         switch (request->request.opcode) {
1139             case PROTOCOL_BINARY_CMD_SET_PARAM:
1140             case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1141             case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1142             case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1143             case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1144             case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1145             case PROTOCOL_BINARY_CMD_COMPACT_DB:
1146             {
1147                 if (h->getEngineSpecific(cookie) == NULL) {
1148                     uint64_t cas = ntohll(request->request.cas);
1149                     if (!h->validateSessionCas(cas)) {
1150                         const std::string message("Invalid session token");
1151                         return sendResponse(response, NULL, 0, NULL, 0,
1152                                             message.c_str(), message.length(),
1153                                             PROTOCOL_BINARY_RAW_BYTES,
1154                                             PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS,
1155                                             cas, cookie);
1156                     }
1157                 }
1158                 break;
1159             }
1160             default:
1161                 break;
1162         }
1163
1164         switch (request->request.opcode) {
1165         case PROTOCOL_BINARY_CMD_GET_ALL_VB_SEQNOS:
1166             return h->getAllVBucketSequenceNumbers(cookie, request, response);
1167
1168         case PROTOCOL_BINARY_CMD_GET_VBUCKET:
1169             {
1170                 BlockTimer timer(&stats.getVbucketCmdHisto);
1171                 rv = getVBucket(h, cookie, request, response);
1172                 return rv;
1173             }
1174         case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
1175             {
1176                 BlockTimer timer(&stats.delVbucketCmdHisto);
1177                 rv = delVBucket(h, cookie, request, response);
1178                 if (rv != ENGINE_EWOULDBLOCK) {
1179                     h->decrementSessionCtr();
1180                     h->storeEngineSpecific(cookie, NULL);
1181                 }
1182                 return rv;
1183             }
1184         case PROTOCOL_BINARY_CMD_SET_VBUCKET:
1185             {
1186                 BlockTimer timer(&stats.setVbucketCmdHisto);
1187                 rv = setVBucket(h, cookie, request, response);
1188                 h->decrementSessionCtr();
1189                 return rv;
1190             }
1191         case PROTOCOL_BINARY_CMD_TOUCH:
1192         case PROTOCOL_BINARY_CMD_GAT:
1193         case PROTOCOL_BINARY_CMD_GATQ:
1194             {
1195                 rv = h->touch(cookie, request, response);
1196                 return rv;
1197             }
1198         case PROTOCOL_BINARY_CMD_STOP_PERSISTENCE:
1199             res = stopFlusher(h, &msg, &msg_size);
1200             break;
1201         case PROTOCOL_BINARY_CMD_START_PERSISTENCE:
1202             res = startFlusher(h, &msg, &msg_size);
1203             break;
1204         case PROTOCOL_BINARY_CMD_SET_PARAM:
1205             res = setParam(h,
1206                   reinterpret_cast<protocol_binary_request_set_param*>(request),
1207                             &msg, &msg_size);
1208             h->decrementSessionCtr();
1209             break;
1210         case PROTOCOL_BINARY_CMD_EVICT_KEY:
1211             res = evictKey(h, request, &msg, &msg_size);
1212             break;
1213         case PROTOCOL_BINARY_CMD_GET_LOCKED:
1214             rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
1215             if (rv == ENGINE_EWOULDBLOCK) {
1216                 // we dont have the value for the item yet
1217                 return rv;
1218             }
1219             break;
1220         case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
1221             res = unlockKey(h, request, &msg, &msg_size);
1222             break;
1223         case PROTOCOL_BINARY_CMD_OBSERVE:
1224             return h->observe(cookie, request, response);
1225         case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
1226             return h->observe_seqno(cookie, request, response);
1227         case PROTOCOL_BINARY_CMD_DEREGISTER_TAP_CLIENT:
1228             {
1229                 rv = h->deregisterTapClient(cookie, request, response);
1230                 h->decrementSessionCtr();
1231                 return rv;
1232             }
1233         case PROTOCOL_BINARY_CMD_RESET_REPLICATION_CHAIN:
1234             {
1235                 rv = h->resetReplicationChain(cookie, request, response);
1236                 return rv;
1237             }
1238         case PROTOCOL_BINARY_CMD_CHANGE_VB_FILTER:
1239             {
1240                 rv = h->changeTapVBFilter(cookie, request, response);
1241                 h->decrementSessionCtr();
1242                 return rv;
1243             }
1244         case PROTOCOL_BINARY_CMD_LAST_CLOSED_CHECKPOINT:
1245         case PROTOCOL_BINARY_CMD_CREATE_CHECKPOINT:
1246         case PROTOCOL_BINARY_CMD_CHECKPOINT_PERSISTENCE:
1247             {
1248                 rv = h->handleCheckpointCmds(cookie, request, response);
1249                 return rv;
1250             }
1251         case PROTOCOL_BINARY_CMD_SEQNO_PERSISTENCE:
1252             {
1253                 rv = h->handleSeqnoCmds(cookie, request, response);
1254                 return rv;
1255             }
1256         case PROTOCOL_BINARY_CMD_GET_META:
1257         case PROTOCOL_BINARY_CMD_GETQ_META:
1258             {
1259                 rv = h->getMeta(cookie,
1260                         reinterpret_cast<protocol_binary_request_get_meta*>
1261                                                           (request), response);
1262                 return rv;
1263             }
1264         case PROTOCOL_BINARY_CMD_SET_WITH_META:
1265         case PROTOCOL_BINARY_CMD_SETQ_WITH_META:
1266         case PROTOCOL_BINARY_CMD_ADD_WITH_META:
1267         case PROTOCOL_BINARY_CMD_ADDQ_WITH_META:
1268             {
1269                 rv = h->setWithMeta(cookie,
1270                      reinterpret_cast<protocol_binary_request_set_with_meta*>
1271                                                           (request), response);
1272                 return rv;
1273             }
1274         case PROTOCOL_BINARY_CMD_DEL_WITH_META:
1275         case PROTOCOL_BINARY_CMD_DELQ_WITH_META:
1276             {
1277                 rv = h->deleteWithMeta(cookie,
1278                     reinterpret_cast<protocol_binary_request_delete_with_meta*>
1279                                                           (request), response);
1280                 return rv;
1281             }
1282         case PROTOCOL_BINARY_CMD_RETURN_META:
1283             {
1284                 return h->returnMeta(cookie,
1285                 reinterpret_cast<protocol_binary_request_return_meta*>
1286                                                           (request), response);
1287             }
1288         case PROTOCOL_BINARY_CMD_GET_REPLICA:
1289             rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
1290             if (rv != ENGINE_SUCCESS && rv != ENGINE_NOT_MY_VBUCKET) {
1291                 return rv;
1292             }
1293             break;
1294         case PROTOCOL_BINARY_CMD_ENABLE_TRAFFIC:
1295         case PROTOCOL_BINARY_CMD_DISABLE_TRAFFIC:
1296             {
1297                 rv = h->handleTrafficControlCmd(cookie, request, response);
1298                 return rv;
1299             }
1300         case PROTOCOL_BINARY_CMD_SET_CLUSTER_CONFIG:
1301             {
1302                 rv = h->setClusterConfig(cookie,
1303                  reinterpret_cast<protocol_binary_request_set_cluster_config*>
1304                                                           (request), response);
1305                 h->decrementSessionCtr();
1306                 return rv;
1307             }
1308         case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
1309             return h->getClusterConfig(cookie,
1310                reinterpret_cast<protocol_binary_request_get_cluster_config*>
1311                                                           (request), response);
1312         case PROTOCOL_BINARY_CMD_COMPACT_DB:
1313             {
1314                 rv = compactDB(h, cookie,
1315                                (protocol_binary_request_compact_db*)(request),
1316                                response);
1317                 if (rv != ENGINE_EWOULDBLOCK) {
1318                     h->decrementSessionCtr();
1319                     h->storeEngineSpecific(cookie, NULL);
1320                 }
1321                 return rv;
1322             }
1323         case PROTOCOL_BINARY_CMD_GET_RANDOM_KEY:
1324             {
1325                 if (request->request.extlen != 0 ||
1326                     request->request.keylen != 0 ||
1327                     request->request.bodylen != 0) {
1328                     return ENGINE_EINVAL;
1329                 }
1330                 return h->getRandomKey(cookie, response);
1331             }
1332         case CMD_GET_KEYS:
1333             {
1334                 return h->getAllKeys(cookie,
1335                    reinterpret_cast<protocol_binary_request_get_keys*>
1336                                                            (request), response);
1337             }
1338         case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
1339             {
1340                 return h->getAdjustedTime(cookie,
1341                    reinterpret_cast<protocol_binary_request_get_adjusted_time*>
1342                                                            (request), response);
1343             }
1344         case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE:
1345             {
1346                 return h->setDriftCounterState(cookie,
1347                 reinterpret_cast<protocol_binary_request_set_drift_counter_state*>
1348                                                            (request), response);
1349             }
1350         }
1351
1352         // Send a special response for getl since we don't want to send the key
1353         if (itm && request->request.opcode == PROTOCOL_BINARY_CMD_GET_LOCKED) {
1354             uint32_t flags = itm->getFlags();
1355             rv = sendResponse(response, NULL, 0, (const void *)&flags,
1356                               sizeof(uint32_t),
1357                               static_cast<const void *>(itm->getData()),
1358                               itm->getNBytes(), itm->getDataType(),
1359                               static_cast<uint16_t>(res), itm->getCas(),
1360                               cookie);
1361             delete itm;
1362         } else if (itm) {
1363             const std::string &key  = itm->getKey();
1364             uint32_t flags = itm->getFlags();
1365             rv = sendResponse(response, static_cast<const void *>(key.data()),
1366                               itm->getNKey(),
1367                               (const void *)&flags, sizeof(uint32_t),
1368                               static_cast<const void *>(itm->getData()),
1369                               itm->getNBytes(), itm->getDataType(),
1370                               static_cast<uint16_t>(res), itm->getCas(),
1371                               cookie);
1372             delete itm;
1373         } else  if (rv == ENGINE_NOT_MY_VBUCKET) {
1374             LockHolder lh(h->clusterConfig.lock);
1375             return sendResponse(response, NULL, 0, NULL, 0,
1376                                 h->clusterConfig.config,
1377                                 h->clusterConfig.len,
1378                                 PROTOCOL_BINARY_RAW_BYTES,
1379                                 PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0,
1380                                 cookie);
1381         } else {
1382             msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
1383             rv = sendResponse(response, NULL, 0, NULL, 0,
1384                               msg, static_cast<uint16_t>(msg_size),
1385                               PROTOCOL_BINARY_RAW_BYTES,
1386                               static_cast<uint16_t>(res), 0, cookie);
1387
1388         }
1389         return rv;
1390     }
1391
1392     static ENGINE_ERROR_CODE EvpUnknownCommand(ENGINE_HANDLE* handle,
1393                                                const void* cookie,
1394                                                protocol_binary_request_header
1395                                                                       *request,
1396                                                ADD_RESPONSE response)
1397     {
1398         ENGINE_ERROR_CODE err_code = processUnknownCommand(getHandle(handle),
1399                                                            cookie,
1400                                                            request, response);
1401         releaseHandle(handle);
1402         return err_code;
1403     }
1404
1405     static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
1406                               item *itm, uint64_t cas) {
1407         static_cast<Item*>(itm)->setCas(cas);
1408     }
1409
1410     static ENGINE_ERROR_CODE EvpTapNotify(ENGINE_HANDLE* handle,
1411                                           const void *cookie,
1412                                           void *engine_specific,
1413                                           uint16_t nengine,
1414                                           uint8_t ttl,
1415                                           uint16_t tap_flags,
1416                                           tap_event_t tap_event,
1417                                           uint32_t tap_seqno,
1418                                           const void *key,
1419                                           size_t nkey,
1420                                           uint32_t flags,
1421                                           uint32_t exptime,
1422                                           uint64_t cas,
1423                                           uint8_t datatype,
1424                                           const void *data,
1425                                           size_t ndata,
1426                                           uint16_t vbucket)
1427     {
1428         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1429             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1430                     " (TapNotify)");
1431             return ENGINE_EINVAL;
1432         }
1433         ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie,
1434                                                         engine_specific,
1435                                                         nengine, ttl,
1436                                                         tap_flags,
1437                                                         (uint16_t)tap_event,
1438                                                         tap_seqno,
1439                                                         key, nkey, flags,
1440                                                         exptime, cas,
1441                                                         datatype, data,
1442                                                         ndata, vbucket);
1443         releaseHandle(handle);
1444         return err_code;
1445     }
1446
1447     static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
1448                                       const void *cookie, item **itm,
1449                                       void **es, uint16_t *nes, uint8_t *ttl,
1450                                       uint16_t *flags, uint32_t *seqno,
1451                                       uint16_t *vbucket) {
1452         uint16_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es,
1453                                                              nes, ttl,
1454                                                              flags, seqno,
1455                                                              vbucket);
1456         releaseHandle(handle);
1457         return static_cast<tap_event_t>(tap_event);
1458     }
1459
1460     static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
1461                                           const void* cookie,
1462                                           const void* client,
1463                                           size_t nclient,
1464                                           uint32_t flags,
1465                                           const void* userdata,
1466                                           size_t nuserdata)
1467     {
1468         EventuallyPersistentEngine *h = getHandle(handle);
1469         TAP_ITERATOR iterator = NULL;
1470         {
1471             std::string c(static_cast<const char*>(client), nclient);
1472             // Figure out what we want from the userdata before adding it to
1473             // the API to the handle
1474             if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
1475                 iterator = EvpTapIterator;
1476             }
1477         }
1478         releaseHandle(handle);
1479         return iterator;
1480     }
1481
1482
1483     static ENGINE_ERROR_CODE EvpDcpStep(ENGINE_HANDLE* handle,
1484                                        const void* cookie,
1485                                        struct dcp_message_producers *producers)
1486     {
1487         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1488         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1489         if (conn) {
1490             errCode = conn->step(producers);
1491         }
1492         releaseHandle(handle);
1493         return errCode;
1494     }
1495
1496
1497     static ENGINE_ERROR_CODE EvpDcpOpen(ENGINE_HANDLE* handle,
1498                                         const void* cookie,
1499                                         uint32_t opaque,
1500                                         uint32_t seqno,
1501                                         uint32_t flags,
1502                                         void *name,
1503                                         uint16_t nname)
1504     {
1505         ENGINE_ERROR_CODE errCode;
1506         errCode = getHandle(handle)->dcpOpen(cookie, opaque, seqno, flags,
1507                                              name, nname);
1508         releaseHandle(handle);
1509         return errCode;
1510     }
1511
1512     static ENGINE_ERROR_CODE EvpDcpAddStream(ENGINE_HANDLE* handle,
1513                                              const void* cookie,
1514                                              uint32_t opaque,
1515                                              uint16_t vbucket,
1516                                              uint32_t flags)
1517     {
1518         ENGINE_ERROR_CODE errCode = getHandle(handle)->dcpAddStream(cookie,
1519                                                                     opaque,
1520                                                                     vbucket,
1521                                                                     flags);
1522         releaseHandle(handle);
1523         return errCode;
1524     }
1525
1526     static ENGINE_ERROR_CODE EvpDcpCloseStream(ENGINE_HANDLE* handle,
1527                                                const void* cookie,
1528                                                uint32_t opaque,
1529                                                uint16_t vbucket)
1530     {
1531         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1532         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1533         if (conn) {
1534             errCode = conn->closeStream(opaque, vbucket);
1535         }
1536         releaseHandle(handle);
1537         return errCode;
1538     }
1539
1540
1541     static ENGINE_ERROR_CODE EvpDcpStreamReq(ENGINE_HANDLE* handle,
1542                                              const void* cookie,
1543                                              uint32_t flags,
1544                                              uint32_t opaque,
1545                                              uint16_t vbucket,
1546                                              uint64_t startSeqno,
1547                                              uint64_t endSeqno,
1548                                              uint64_t vbucketUuid,
1549                                              uint64_t snapStartSeqno,
1550                                              uint64_t snapEndSeqno,
1551                                              uint64_t *rollbackSeqno,
1552                                              dcp_add_failover_log callback)
1553     {
1554         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1555         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1556         if (conn) {
1557             errCode = conn->streamRequest(flags, opaque, vbucket, startSeqno,
1558                                           endSeqno, vbucketUuid, snapStartSeqno,
1559                                           snapEndSeqno, rollbackSeqno, callback);
1560         }
1561         releaseHandle(handle);
1562         return errCode;
1563     }
1564
1565     static ENGINE_ERROR_CODE EvpDcpGetFailoverLog(ENGINE_HANDLE* handle,
1566                                                  const void* cookie,
1567                                                  uint32_t opaque,
1568                                                  uint16_t vbucket,
1569                                                  dcp_add_failover_log callback)
1570     {
1571         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1572         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1573         if (conn) {
1574             errCode = conn->getFailoverLog(opaque, vbucket, callback);
1575         }
1576         releaseHandle(handle);
1577         return errCode;
1578     }
1579
1580
1581     static ENGINE_ERROR_CODE EvpDcpStreamEnd(ENGINE_HANDLE* handle,
1582                                              const void* cookie,
1583                                              uint32_t opaque,
1584                                              uint16_t vbucket,
1585                                              uint32_t flags)
1586     {
1587         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1588         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1589         if (conn) {
1590             errCode = conn->streamEnd(opaque, vbucket, flags);
1591         }
1592         releaseHandle(handle);
1593         return errCode;
1594     }
1595
1596
1597     static ENGINE_ERROR_CODE EvpDcpSnapshotMarker(ENGINE_HANDLE* handle,
1598                                                   const void* cookie,
1599                                                   uint32_t opaque,
1600                                                   uint16_t vbucket,
1601                                                   uint64_t start_seqno,
1602                                                   uint64_t end_seqno,
1603                                                   uint32_t flags)
1604     {
1605         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1606         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1607         if (conn) {
1608             errCode = conn->snapshotMarker(opaque, vbucket, start_seqno,
1609                                            end_seqno, flags);
1610         }
1611         releaseHandle(handle);
1612         return errCode;
1613     }
1614
1615     static ENGINE_ERROR_CODE EvpDcpMutation(ENGINE_HANDLE* handle,
1616                                             const void* cookie,
1617                                             uint32_t opaque,
1618                                             const void *key,
1619                                             uint16_t nkey,
1620                                             const void *value,
1621                                             uint32_t nvalue,
1622                                             uint64_t cas,
1623                                             uint16_t vbucket,
1624                                             uint32_t flags,
1625                                             uint8_t datatype,
1626                                             uint64_t bySeqno,
1627                                             uint64_t revSeqno,
1628                                             uint32_t expiration,
1629                                             uint32_t lockTime,
1630                                             const void *meta,
1631                                             uint16_t nmeta,
1632                                             uint8_t nru)
1633     {
1634         if (datatype > PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
1635             LOG(EXTENSION_LOG_WARNING, "Invalid value for datatype "
1636                     " (DCPMutation)");
1637             return ENGINE_EINVAL;
1638         }
1639         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1640         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1641         if (conn) {
1642             errCode = conn->mutation(opaque, key, nkey, value, nvalue, cas,
1643                                      vbucket, flags, datatype, lockTime,
1644                                      bySeqno, revSeqno, expiration,
1645                                      nru, meta, nmeta);
1646         }
1647         releaseHandle(handle);
1648         return errCode;
1649     }
1650
1651     static ENGINE_ERROR_CODE EvpDcpDeletion(ENGINE_HANDLE* handle,
1652                                             const void* cookie,
1653                                             uint32_t opaque,
1654                                             const void *key,
1655                                             uint16_t nkey,
1656                                             uint64_t cas,
1657                                             uint16_t vbucket,
1658                                             uint64_t bySeqno,
1659                                             uint64_t revSeqno,
1660                                             const void *meta,
1661                                             uint16_t nmeta)
1662     {
1663         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1664         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1665         if (conn) {
1666             errCode = conn->deletion(opaque, key, nkey, cas, vbucket, bySeqno,
1667                                      revSeqno, meta, nmeta);
1668         }
1669         releaseHandle(handle);
1670         return errCode;
1671     }
1672
1673     static ENGINE_ERROR_CODE EvpDcpExpiration(ENGINE_HANDLE* handle,
1674                                               const void* cookie,
1675                                               uint32_t opaque,
1676                                               const void *key,
1677                                               uint16_t nkey,
1678                                               uint64_t cas,
1679                                               uint16_t vbucket,
1680                                               uint64_t bySeqno,
1681                                               uint64_t revSeqno,
1682                                               const void *meta,
1683                                               uint16_t nmeta)
1684     {
1685         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1686         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1687         if (conn) {
1688             errCode = conn->expiration(opaque, key, nkey, cas, vbucket, bySeqno,
1689                                        revSeqno, meta, nmeta);
1690         }
1691         releaseHandle(handle);
1692         return errCode;
1693     }
1694
1695     static ENGINE_ERROR_CODE EvpDcpFlush(ENGINE_HANDLE* handle,
1696                                          const void* cookie,
1697                                          uint32_t opaque,
1698                                          uint16_t vbucket)
1699     {
1700         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1701         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1702         if (conn) {
1703             errCode = conn->flushall(opaque, vbucket);
1704         }
1705         releaseHandle(handle);
1706         return errCode;
1707     }
1708
1709     static ENGINE_ERROR_CODE EvpDcpSetVbucketState(ENGINE_HANDLE* handle,
1710                                                    const void* cookie,
1711                                                    uint32_t opaque,
1712                                                    uint16_t vbucket,
1713                                                    vbucket_state_t state)
1714     {
1715         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1716         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1717         if (conn) {
1718             errCode = conn->setVBucketState(opaque, vbucket, state);
1719         }
1720         releaseHandle(handle);
1721         return errCode;
1722     }
1723
1724     static ENGINE_ERROR_CODE EvpDcpNoop(ENGINE_HANDLE* handle,
1725                                         const void* cookie,
1726                                         uint32_t opaque) {
1727         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1728         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1729         if (conn) {
1730             errCode = conn->noop(opaque);
1731         }
1732         releaseHandle(handle);
1733         return errCode;
1734     }
1735
1736     static ENGINE_ERROR_CODE EvpDcpBufferAcknowledgement(ENGINE_HANDLE* handle,
1737                                                          const void* cookie,
1738                                                          uint32_t opaque,
1739                                                          uint16_t vbucket,
1740                                                          uint32_t buffer_bytes) {
1741         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1742         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1743         if (conn) {
1744             errCode = conn->bufferAcknowledgement(opaque, vbucket,
1745                                                   buffer_bytes);
1746         }
1747         releaseHandle(handle);
1748         return errCode;
1749     }
1750
1751     static ENGINE_ERROR_CODE EvpDcpControl(ENGINE_HANDLE* handle,
1752                                            const void* cookie,
1753                                            uint32_t opaque,
1754                                            const void *key,
1755                                            uint16_t nkey,
1756                                            const void *value,
1757                                            uint32_t nvalue) {
1758         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1759         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1760         if (conn) {
1761             errCode = conn->control(opaque, key, nkey, value, nvalue);
1762         }
1763         releaseHandle(handle);
1764         return errCode;
1765     }
1766
1767     static ENGINE_ERROR_CODE EvpDcpResponseHandler(ENGINE_HANDLE* handle,
1768                                      const void* cookie,
1769                                      protocol_binary_response_header *response)
1770     {
1771         ENGINE_ERROR_CODE errCode = ENGINE_DISCONNECT;
1772         ConnHandler* conn = getHandle(handle)->getConnHandler(cookie);
1773         if (conn) {
1774             errCode = conn->handleResponse(response);
1775         }
1776         releaseHandle(handle);
1777         return errCode;
1778     }
1779
1780     static void EvpHandleDisconnect(const void *cookie,
1781                                     ENGINE_EVENT_TYPE type,
1782                                     const void *event_data,
1783                                     const void *cb_data)
1784     {
1785         cb_assert(type == ON_DISCONNECT);
1786         cb_assert(event_data == NULL);
1787         void *c = const_cast<void*>(cb_data);
1788         getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
1789         releaseHandle(static_cast<ENGINE_HANDLE*>(c));
1790     }
1791
1792
1793     /**
1794      * The only public interface to the eventually persistance engine.
1795      * Allocate a new instance and initialize it
1796      * @param interface the highest interface the server supports (we only
1797      *                  support interface 1)
1798      * @param get_server_api callback function to get the server exported API
1799      *                  functions
1800      * @param handle Where to return the new instance
1801      * @return ENGINE_SUCCESS on success
1802      */
1803     ENGINE_ERROR_CODE create_instance(uint64_t interface,
1804                                       GET_SERVER_API get_server_api,
1805                                       ENGINE_HANDLE **handle)
1806     {
1807         SERVER_HANDLE_V1 *api = get_server_api();
1808         if (interface != 1 || api == NULL) {
1809             return ENGINE_ENOTSUP;
1810         }
1811
1812         hooksApi = api->alloc_hooks;
1813         loggerApi = api->log;
1814         MemoryTracker::getInstance();
1815         ObjectRegistry::initialize(api->alloc_hooks->get_allocation_size);
1816
1817         AtomicValue<size_t>* inital_tracking = new AtomicValue<size_t>();
1818
1819         ObjectRegistry::setStats(inital_tracking);
1820         EventuallyPersistentEngine *engine;
1821         engine = new EventuallyPersistentEngine(get_server_api);
1822         ObjectRegistry::setStats(NULL);
1823
1824         if (engine == NULL) {
1825             return ENGINE_ENOMEM;
1826         }
1827
1828         if (MemoryTracker::trackingMemoryAllocations()) {
1829             engine->getEpStats().memoryTrackerEnabled.store(true);
1830             engine->getEpStats().totalMemory.store(inital_tracking->load());
1831         }
1832         delete inital_tracking;
1833
1834         ep_current_time = api->core->get_current_time;
1835         ep_abs_time = api->core->abstime;
1836         ep_reltime = api->core->realtime;
1837
1838         *handle = reinterpret_cast<ENGINE_HANDLE*> (engine);
1839
1840         return ENGINE_SUCCESS;
1841     }
1842
1843     static bool EvpGetItemInfo(ENGINE_HANDLE *handle, const void *,
1844                                const item* itm, item_info *itm_info)
1845     {
1846         const Item *it = reinterpret_cast<const Item*>(itm);
1847         EventuallyPersistentEngine *engine = getHandle(handle);
1848         if (itm_info->nvalue < 1) {
1849             return false;
1850         }
1851         itm_info->cas = it->getCas();
1852
1853         if (engine) {
1854             RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(it->getVBucketId());
1855
1856             if (vb) {
1857                 itm_info->vbucket_uuid = vb->failovers->getLatestUUID();
1858             } else {
1859                 itm_info->vbucket_uuid = 0;
1860             }
1861
1862             releaseHandle(handle);
1863         } else{
1864             itm_info->vbucket_uuid = 0;
1865         }
1866
1867         itm_info->seqno = it->getBySeqno();
1868         itm_info->exptime = it->getExptime();
1869         itm_info->nbytes = it->getNBytes();
1870         itm_info->datatype = it->getDataType();
1871         itm_info->flags = it->getFlags();
1872         itm_info->clsid = 0;
1873         itm_info->nkey = static_cast<uint16_t>(it->getNKey());
1874         itm_info->nvalue = 1;
1875         itm_info->key = it->getKey().c_str();
1876         itm_info->value[0].iov_base = const_cast<char*>(it->getData());
1877         itm_info->value[0].iov_len = it->getNBytes();
1878         return true;
1879     }
1880
1881     static bool EvpSetItemInfo(ENGINE_HANDLE* handle, const void* cookie,
1882                                item* itm, const item_info *itm_info)
1883     {
1884         Item *it = reinterpret_cast<Item*>(itm);
1885         if (!it) {
1886             return false;
1887         }
1888         it->setDataType(itm_info->datatype);
1889         return true;
1890     }
1891
1892     static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
1893                                                  const void* cookie,
1894                                                  engine_get_vb_map_cb callback)
1895     {
1896         EventuallyPersistentEngine *h = getHandle(handle);
1897         LockHolder lh(h->clusterConfig.lock);
1898         uint8_t *config = h->clusterConfig.config;
1899         uint32_t len = h->clusterConfig.len;
1900         releaseHandle(handle);
1901         return callback(cookie, config, len);
1902     }
1903
1904 } // C linkage
1905
1906 void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
1907     char buffer[2048];
1908
1909     if (loggerApi != NULL) {
1910         EXTENSION_LOGGER_DESCRIPTOR* logger =
1911             (EXTENSION_LOGGER_DESCRIPTOR*)loggerApi->get_logger();
1912
1913         if (loggerApi->get_level() <= severity) {
1914             EventuallyPersistentEngine *engine = ObjectRegistry::onSwitchThread(NULL, true);
1915             va_list va;
1916             va_start(va, fmt);
1917             vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
1918             if (engine) {
1919                 logger->log(severity, NULL, "(%s) %s", engine->getName(),
1920                             buffer);
1921             } else {
1922                 logger->log(severity, NULL, "(No Engine) %s", buffer);
1923             }
1924             va_end(va);
1925             ObjectRegistry::onSwitchThread(engine);
1926         }
1927     }
1928 }
1929
1930 ALLOCATOR_HOOKS_API *getHooksApi(void) {
1931     return hooksApi;
1932 }
1933
1934 EventuallyPersistentEngine::EventuallyPersistentEngine(
1935                                     GET_SERVER_API get_server_api) :
1936     clusterConfig(), epstore(NULL), workload(NULL),
1937     workloadPriority(NO_BUCKET_PRIORITY),
1938     tapThrottle(NULL), getServerApiFunc(get_server_api),
1939     dcpConnMap_(NULL), tapConnMap(NULL) ,tapConfig(NULL), checkpointConfig(NULL),
1940     trafficEnabled(false), flushAllEnabled(false), startupTime(0)
1941 {
1942     interface.interface = 1;
1943     ENGINE_HANDLE_V1::get_info = EvpGetInfo;
1944     ENGINE_HANDLE_V1::initialize = EvpInitialize;
1945     ENGINE_HANDLE_V1::destroy = EvpDestroy;
1946     ENGINE_HANDLE_V1::allocate = EvpItemAllocate;
1947     ENGINE_HANDLE_V1::remove = EvpItemDelete;
1948     ENGINE_HANDLE_V1::release = EvpItemRelease;
1949     ENGINE_HANDLE_V1::get = EvpGet;
1950     ENGINE_HANDLE_V1::get_stats = EvpGetStats;
1951     ENGINE_HANDLE_V1::reset_stats = EvpResetStats;
1952     ENGINE_HANDLE_V1::store = EvpStore;
1953     ENGINE_HANDLE_V1::arithmetic = EvpArithmetic;
1954     ENGINE_HANDLE_V1::flush = EvpFlush;
1955     ENGINE_HANDLE_V1::unknown_command = EvpUnknownCommand;
1956     ENGINE_HANDLE_V1::get_tap_iterator = EvpGetTapIterator;
1957     ENGINE_HANDLE_V1::tap_notify = EvpTapNotify;
1958     ENGINE_HANDLE_V1::item_set_cas = EvpItemSetCas;
1959     ENGINE_HANDLE_V1::get_item_info = EvpGetItemInfo;
1960     ENGINE_HANDLE_V1::set_item_info = EvpSetItemInfo;
1961     ENGINE_HANDLE_V1::get_engine_vb_map = EvpGetClusterConfig;
1962     ENGINE_HANDLE_V1::get_stats_struct = NULL;
1963     ENGINE_HANDLE_V1::aggregate_stats = NULL;
1964
1965
1966     ENGINE_HANDLE_V1::dcp.step = EvpDcpStep;
1967     ENGINE_HANDLE_V1::dcp.open = EvpDcpOpen;
1968     ENGINE_HANDLE_V1::dcp.add_stream = EvpDcpAddStream;
1969     ENGINE_HANDLE_V1::dcp.close_stream = EvpDcpCloseStream;
1970     ENGINE_HANDLE_V1::dcp.get_failover_log = EvpDcpGetFailoverLog;
1971     ENGINE_HANDLE_V1::dcp.stream_req = EvpDcpStreamReq;
1972     ENGINE_HANDLE_V1::dcp.stream_end = EvpDcpStreamEnd;
1973     ENGINE_HANDLE_V1::dcp.snapshot_marker = EvpDcpSnapshotMarker;
1974     ENGINE_HANDLE_V1::dcp.mutation = EvpDcpMutation;
1975     ENGINE_HANDLE_V1::dcp.deletion = EvpDcpDeletion;
1976     ENGINE_HANDLE_V1::dcp.expiration = EvpDcpExpiration;
1977     ENGINE_HANDLE_V1::dcp.flush = EvpDcpFlush;
1978     ENGINE_HANDLE_V1::dcp.set_vbucket_state = EvpDcpSetVbucketState;
1979     ENGINE_HANDLE_V1::dcp.noop = EvpDcpNoop;
1980     ENGINE_HANDLE_V1::dcp.buffer_acknowledgement = EvpDcpBufferAcknowledgement;
1981     ENGINE_HANDLE_V1::dcp.control = EvpDcpControl;
1982     ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
1983
1984     serverApi = getServerApiFunc();
1985     memset(&info, 0, sizeof(info));
1986     info.info.description = "EP engine v" VERSION;
1987     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_CAS;
1988     info.info.features[info.info.num_features++].feature =
1989                                              ENGINE_FEATURE_PERSISTENT_STORAGE;
1990     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_LRU;
1991     info.info.features[info.info.num_features++].feature = ENGINE_FEATURE_DATATYPE;
1992 }
1993
1994 ENGINE_ERROR_CODE EventuallyPersistentEngine::reserveCookie(const void *cookie)
1995 {
1996     EventuallyPersistentEngine *epe =
1997                                     ObjectRegistry::onSwitchThread(NULL, true);
1998     ENGINE_ERROR_CODE rv = serverApi->cookie->reserve(cookie);
1999     ObjectRegistry::onSwitchThread(epe);
2000     return rv;
2001 }
2002
2003 ENGINE_ERROR_CODE EventuallyPersistentEngine::releaseCookie(const void *cookie)
2004 {
2005     EventuallyPersistentEngine *epe =
2006                                     ObjectRegistry::onSwitchThread(NULL, true);
2007     ENGINE_ERROR_CODE rv = serverApi->cookie->release(cookie);
2008     ObjectRegistry::onSwitchThread(epe);
2009     return rv;
2010 }
2011
2012 void EventuallyPersistentEngine::registerEngineCallback(ENGINE_EVENT_TYPE type,
2013                                                         EVENT_CALLBACK cb,
2014                                                         const void *cb_data) {
2015     EventuallyPersistentEngine *epe =
2016                                     ObjectRegistry::onSwitchThread(NULL, true);
2017     SERVER_CALLBACK_API *sapi = getServerApi()->callback;
2018     sapi->register_callback(reinterpret_cast<ENGINE_HANDLE*>(this),
2019                             type, cb, cb_data);
2020     ObjectRegistry::onSwitchThread(epe);
2021 }
2022
2023 /**
2024  * A configuration value changed listener that responds to ep-engine
2025  * parameter changes by invoking engine-specific methods on
2026  * configuration change events.
2027  */
2028 class EpEngineValueChangeListener : public ValueChangedListener {
2029 public:
2030     EpEngineValueChangeListener(EventuallyPersistentEngine &e) : engine(e) {
2031         // EMPTY
2032     }
2033
2034     virtual void sizeValueChanged(const std::string &key, size_t value) {
2035         if (key.compare("getl_max_timeout") == 0) {
2036             engine.setGetlMaxTimeout(value);
2037         } else if (key.compare("getl_default_timeout") == 0) {
2038             engine.setGetlDefaultTimeout(value);
2039         } else if (key.compare("max_item_size") == 0) {
2040             engine.setMaxItemSize(value);
2041         }
2042     }
2043
2044     virtual void booleanValueChanged(const std::string &key, bool value) {
2045         if (key.compare("flushall_enabled") == 0) {
2046             engine.setFlushAll(value);
2047         }
2048     }
2049 private:
2050     EventuallyPersistentEngine &engine;
2051 };
2052
2053
2054
2055 ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
2056     resetStats();
2057     if (config != NULL) {
2058         if (!configuration.parseConfiguration(config, serverApi)) {
2059             return ENGINE_FAILED;
2060         }
2061     }
2062
2063     name = configuration.getCouchBucket();
2064     maxFailoverEntries = configuration.getMaxFailoverEntries();
2065
2066     // Start updating the variables from the config!
2067     HashTable::setDefaultNumBuckets(configuration.getHtSize());
2068     HashTable::setDefaultNumLocks(configuration.getHtLocks());
2069     StoredValue::setMutationMemoryThreshold(
2070                                       configuration.getMutationMemThreshold());
2071
2072     if (configuration.getMaxSize() == 0) {
2073         configuration.setMaxSize(std::numeric_limits<size_t>::max());
2074     }
2075
2076     if (configuration.getMemLowWat() == std::numeric_limits<size_t>::max()) {
2077         configuration.setMemLowWat(percentOf(
2078                                             configuration.getMaxSize(), 0.75));
2079     }
2080
2081     if (configuration.getMemHighWat() == std::numeric_limits<size_t>::max()) {
2082         configuration.setMemHighWat(percentOf(
2083                                             configuration.getMaxSize(), 0.85));
2084     }
2085
2086     maxItemSize = configuration.getMaxItemSize();
2087     configuration.addValueChangedListener("max_item_size",
2088                                        new EpEngineValueChangeListener(*this));
2089
2090     getlDefaultTimeout = configuration.getGetlDefaultTimeout();
2091     configuration.addValueChangedListener("getl_default_timeout",
2092                                        new EpEngineValueChangeListener(*this));
2093     getlMaxTimeout = configuration.getGetlMaxTimeout();
2094     configuration.addValueChangedListener("getl_max_timeout",
2095                                        new EpEngineValueChangeListener(*this));
2096
2097     flushAllEnabled = configuration.isFlushallEnabled();
2098     configuration.addValueChangedListener("flushall_enabled",
2099                                        new EpEngineValueChangeListener(*this));
2100
2101     workload = new WorkLoadPolicy(configuration.getMaxNumWorkers(),
2102                                   configuration.getMaxNumShards());
2103     if ((unsigned int)workload->getNumShards() >
2104                                               configuration.getMaxVbuckets()) {
2105         LOG(EXTENSION_LOG_WARNING, "Invalid configuration: Shards must be "
2106             "equal or less than max number of vbuckets");
2107         return ENGINE_FAILED;
2108     }
2109
2110     dcpConnMap_ = new DcpConnMap(*this);
2111     tapConnMap = new TapConnMap(*this);
2112     tapConfig = new TapConfig(*this);
2113     tapThrottle = new TapThrottle(configuration, stats);
2114     TapConfig::addConfigChangeListener(*this);
2115
2116     checkpointConfig = new CheckpointConfig(*this);
2117     CheckpointConfig::addConfigChangeListener(*this);
2118
2119     epstore = new EventuallyPersistentStore(*this);
2120     if (epstore == NULL) {
2121         return ENGINE_ENOMEM;
2122     }
2123
2124     // Register the callback
2125     registerEngineCallback(ON_DISCONNECT, EvpHandleDisconnect, this);
2126
2127     // Complete the initialization of the ep-store
2128     if (!epstore->initialize()) {
2129         return ENGINE_FAILED;
2130     }
2131
2132     if(configuration.isDataTrafficEnabled()) {
2133         enableTraffic(true);
2134     }
2135
2136     tapConnMap->initialize(TAP_CONN_NOTIFIER);
2137     dcpConnMap_->initialize(DCP_CONN_NOTIFIER);
2138
2139     // record engine initialization time
2140     startupTime.store(ep_real_time());
2141
2142     LOG(EXTENSION_LOG_DEBUG, "Engine init complete.\n");
2143
2144     return ENGINE_SUCCESS;
2145 }
2146
2147 void EventuallyPersistentEngine::destroy(bool force) {
2148     stats.forceShutdown = force;
2149     stats.isShutdown = true;
2150
2151     if (epstore) {
2152         epstore->snapshotStats();
2153     }
2154     if (tapConnMap) {
2155         tapConnMap->shutdownAllConnections();
2156     }
2157     if (dcpConnMap_) {
2158         dcpConnMap_->shutdownAllConnections();
2159     }
2160 }
2161
2162 ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie,
2163                                                     time_t when){
2164     if (!flushAllEnabled) {
2165         return ENGINE_ENOTSUP;
2166     }
2167
2168     if (!isDegradedMode()) {
2169         return ENGINE_TMPFAIL;
2170     }
2171
2172     /*
2173      * Supporting only a SYNC operation for bucket flush
2174      */
2175
2176     void* es = getEngineSpecific(cookie);
2177     if (es == NULL) {
2178
2179         // Check if diskFlushAll was false and set it to true
2180         // if yes, if the atomic variable weren't false, then
2181         // we will assume that a flushAll has been scheduled
2182         // already and return TMPFAIL.
2183         if (epstore->scheduleFlushAllTask(cookie, when)) {
2184             storeEngineSpecific(cookie, this);
2185             return ENGINE_EWOULDBLOCK;
2186         } else {
2187             LOG(EXTENSION_LOG_INFO, "Tried to trigger a bucket flush, but"
2188                     "there seems to be a task running already!");
2189             return ENGINE_TMPFAIL;
2190         }
2191
2192     } else {
2193         storeEngineSpecific(cookie, NULL);
2194         LOG(EXTENSION_LOG_WARNING, "Completed bucket flush operation");
2195         return ENGINE_SUCCESS;
2196     }
2197 }
2198
2199 ENGINE_ERROR_CODE EventuallyPersistentEngine::store(const void *cookie,
2200                                                     item* itm,
2201                                                     uint64_t *cas,
2202                                                     ENGINE_STORE_OPERATION
2203                                                                      operation,
2204                                                     uint16_t vbucket) {
2205     BlockTimer timer(&stats.storeCmdHisto);
2206     ENGINE_ERROR_CODE ret;
2207     Item *it = static_cast<Item*>(itm);
2208     item *i = NULL;
2209
2210     it->setVBucketId(vbucket);
2211
2212     switch (operation) {
2213     case OPERATION_CAS:
2214         if (it->getCas() == 0) {
2215             // Using a cas command with a cas wildcard doesn't make sense
2216             ret = ENGINE_NOT_STORED;
2217             break;
2218         }
2219         // FALLTHROUGH
2220     case OPERATION_SET:
2221         if (isDegradedMode()) {
2222             return ENGINE_TMPFAIL;
2223         }
2224         ret = epstore->set(*it, cookie);
2225         if (ret == ENGINE_SUCCESS) {
2226             *cas = it->getCas();
2227         }
2228
2229         break;
2230
2231     case OPERATION_ADD:
2232         if (isDegradedMode()) {
2233             return ENGINE_TMPFAIL;
2234         }
2235
2236         if (it->getCas() != 0) {
2237             // Adding an item with a cas value doesn't really make sense...
2238             return ENGINE_KEY_EEXISTS;
2239         }
2240
2241         ret = epstore->add(*it, cookie);
2242         if (ret == ENGINE_SUCCESS) {
2243             *cas = it->getCas();
2244         }
2245         break;
2246
2247     case OPERATION_REPLACE:
2248         ret = epstore->replace(*it, cookie);
2249         if (ret == ENGINE_SUCCESS) {
2250             *cas = it->getCas();
2251         }
2252         break;
2253
2254     case OPERATION_APPEND:
2255     case OPERATION_PREPEND:
2256         do {
2257             if ((ret = get(cookie, &i, it->getKey().c_str(),
2258                            it->getNKey(), vbucket)) == ENGINE_SUCCESS) {
2259                 Item *old = reinterpret_cast<Item*>(i);
2260
2261                 if (old->getCas() == (uint64_t) -1) {
2262                     // item is locked against updates
2263                     itemRelease(cookie, i);
2264                     return ENGINE_TMPFAIL;
2265                 }
2266
2267                 if (it->getCas() != 0 && old->getCas() != it->getCas()) {
2268                     itemRelease(cookie, i);
2269                     return ENGINE_KEY_EEXISTS;
2270                 }
2271
2272                 if (operation == OPERATION_APPEND) {
2273                     ret = old->append(*it, maxItemSize);
2274                 } else {
2275                     ret = old->prepend(*it, maxItemSize);
2276                 }
2277
2278                 if (ret != ENGINE_SUCCESS) {
2279                     itemRelease(cookie, i);
2280                     if (ret == ENGINE_E2BIG) {
2281                         return ret;
2282                     } else {
2283                         return memoryCondition();
2284                     }
2285                 } else {
2286                     if (old->getDataType() == PROTOCOL_BINARY_DATATYPE_JSON) {
2287                         // Set the datatype of the new document to BINARY (0),
2288                         // as appending/prepending anything to JSON breaks the
2289                         // json data structure.
2290                         old->setDataType(PROTOCOL_BINARY_RAW_BYTES);
2291                     } else if (old->getDataType() ==
2292                                     PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON) {
2293                         // Set the datatype of the new document to
2294                         // COMPRESSED_BINARY, as appending/prepending anything
2295                         // to JSON breaks the json data structure.
2296                         old->setDataType(PROTOCOL_BINARY_DATATYPE_COMPRESSED);
2297                     }
2298                 }
2299
2300                 ret = store(cookie, old, cas, OPERATION_CAS, vbucket);
2301
2302                 it->setBySeqno(old->getBySeqno());
2303                 itemRelease(cookie, i);
2304             }
2305         } while (ret == ENGINE_KEY_EEXISTS);
2306
2307         // Map the error code back to what memcapable expects
2308         if (ret == ENGINE_KEY_ENOENT) {
2309             ret = ENGINE_NOT_STORED;
2310         }
2311
2312         break;
2313
2314     default:
2315         ret = ENGINE_ENOTSUP;
2316     }
2317
2318     switch (ret) {
2319     case ENGINE_SUCCESS:
2320         ++stats.numOpsStore;
2321         break;
2322     case ENGINE_ENOMEM:
2323         ret = memoryCondition();
2324         break;
2325     case ENGINE_NOT_STORED:
2326     case ENGINE_NOT_MY_VBUCKET:
2327         if (isDegradedMode()) {
2328             return ENGINE_TMPFAIL;
2329         }
2330         break;
2331     default:
2332         break;
2333     }
2334
2335     return ret;
2336 }
2337
2338 inline uint16_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie,
2339                                                            item **itm,
2340                                                            void **es,
2341                                                            uint16_t *nes,
2342                                                            uint8_t *ttl,
2343                                                            uint16_t *flags,
2344                                                            uint32_t *seqno,
2345                                                            uint16_t *vbucket,
2346                                                            TapProducer
2347                                                                    *connection,
2348                                                            bool &retry) {
2349     *es = NULL;
2350     *nes = 0;
2351     *ttl = (uint8_t)-1;
2352     *seqno = 0;
2353     *flags = 0;
2354     *vbucket = 0;
2355
2356     retry = false;
2357
2358     if (connection->shouldFlush()) {
2359         return TAP_FLUSH;
2360     }
2361
2362     if (connection->isTimeForNoop()) {
2363         LOG(EXTENSION_LOG_INFO, "%s Sending a NOOP message.\n",
2364             connection->logHeader());
2365         return TAP_NOOP;
2366     }
2367
2368     if (connection->isSuspended() || connection->windowIsFull()) {
2369         LOG(EXTENSION_LOG_INFO, "%s Connection in pause state because it is in"
2370             " suspended state or its ack windows is full.\n",
2371             connection->logHeader());
2372         return TAP_PAUSE;
2373     }
2374
2375     uint16_t ret = TAP_PAUSE;
2376     VBucketEvent ev = connection->nextVBucketHighPriority();
2377     if (ev.event != TAP_PAUSE) {
2378         switch (ev.event) {
2379         case TAP_VBUCKET_SET:
2380             LOG(EXTENSION_LOG_WARNING,
2381                "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2382                 connection->logHeader(), ev.vbucket,
2383                 VBucket::toString(ev.state));
2384             connection->encodeVBucketStateTransition(ev, es, nes, vbucket);
2385             break;
2386         case TAP_OPAQUE:
2387             LOG(EXTENSION_LOG_WARNING,
2388                 "%s Sending TAP_OPAQUE with command \"%s\" and vbucket %d\n",
2389                 connection->logHeader(),
2390                 TapProducer::opaqueCmdToString(ntohl((uint32_t) ev.state)),
2391                 ev.vbucket);
2392             connection->opaqueCommandCode = (uint32_t) ev.state;
2393             *vbucket = ev.vbucket;
2394             *es = &connection->opaqueCommandCode;
2395             *nes = sizeof(connection->opaqueCommandCode);
2396             break;
2397         default:
2398             LOG(EXTENSION_LOG_WARNING,
2399                 "%s Unknown VBucketEvent message type %d\n",
2400                 connection->logHeader(), ev.event);
2401             abort();
2402         }
2403         return ev.event;
2404     }
2405
2406     if (connection->waitForOpaqueMsgAck()) {
2407         return TAP_PAUSE;
2408     }
2409
2410     VBucketFilter backFillVBFilter;
2411     if (connection->runBackfill(backFillVBFilter)) {
2412         queueBackfill(backFillVBFilter, connection);
2413     }
2414
2415     uint8_t nru = INITIAL_NRU_VALUE;
2416     Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
2417     switch (ret) {
2418     case TAP_CHECKPOINT_START:
2419     case TAP_CHECKPOINT_END:
2420     case TAP_MUTATION:
2421     case TAP_DELETION:
2422         *itm = it;
2423         if (ret == TAP_MUTATION) {
2424             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2425                                                        it->getRevSeqno(), nru);
2426             *es = connection->specificData;
2427         } else if (ret == TAP_DELETION) {
2428             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2429                                                        it->getRevSeqno());
2430             *es = connection->specificData;
2431         } else if (ret == TAP_CHECKPOINT_START) {
2432             // Send the current value of the max deleted seqno
2433             RCPtr<VBucket> vb = getVBucket(*vbucket);
2434             if (!vb) {
2435                 retry = true;
2436                 return TAP_NOOP;
2437             }
2438             *nes = TapEngineSpecific::packSpecificData(ret, connection,
2439                                                vb->ht.getMaxDeletedRevSeqno());
2440             *es = connection->specificData;
2441         }
2442         break;
2443     case TAP_NOOP:
2444         retry = true;
2445         break;
2446     default:
2447         break;
2448     }
2449
2450     if (ret == TAP_PAUSE && (connection->dumpQueue || connection->doTakeOver)){
2451         VBucketEvent vbev = connection->checkDumpOrTakeOverCompletion();
2452         if (vbev.event == TAP_VBUCKET_SET) {
2453             LOG(EXTENSION_LOG_WARNING,
2454                "%s Sending TAP_VBUCKET_SET with vbucket %d and state \"%s\"\n",
2455                 connection->logHeader(), vbev.vbucket,
2456                 VBucket::toString(vbev.state));
2457             connection->encodeVBucketStateTransition(vbev, es, nes, vbucket);
2458         }
2459         ret = vbev.event;
2460     }
2461
2462     return ret;
2463 }
2464
2465 uint16_t EventuallyPersistentEngine::walkTapQueue(const void *cookie,
2466                                                   item **itm,
2467                                                   void **es,
2468                                                   uint16_t *nes,
2469                                                   uint8_t *ttl,
2470                                                   uint16_t *flags,
2471                                                   uint32_t *seqno,
2472                                                   uint16_t *vbucket) {
2473     TapProducer *connection = getTapProducer(cookie);
2474     if (!connection) {
2475         LOG(EXTENSION_LOG_WARNING,
2476             "Failed to lookup TAP connection.. Disconnecting\n");
2477         return TAP_DISCONNECT;
2478     }
2479
2480     connection->setPaused(false);
2481
2482     bool retry = false;
2483     uint16_t ret;
2484
2485     connection->setLastWalkTime();
2486     do {
2487         ret = doWalkTapQueue(cookie, itm, es, nes, ttl, flags,
2488                              seqno, vbucket, connection, retry);
2489     } while (retry);
2490
2491     if (ret != TAP_PAUSE && ret != TAP_DISCONNECT) {
2492         connection->lastMsgTime = ep_current_time();
2493         if (ret == TAP_NOOP) {
2494             *seqno = 0;
2495         } else {
2496             ++stats.numTapFetched;
2497             *seqno = connection->getSeqno();
2498             if (connection->requestAck(ret, *vbucket)) {
2499                 *flags = TAP_FLAG_ACK;
2500                 connection->seqnoAckRequested = *seqno;
2501             }
2502
2503             if (ret == TAP_MUTATION) {
2504                 if (connection->haveFlagByteorderSupport()) {
2505                     *flags |= TAP_FLAG_NETWORK_BYTE_ORDER;
2506                 }
2507             }
2508         }
2509     } else {
2510         connection->setPaused(true);
2511         connection->setNotifySent(false);
2512     }
2513
2514     return ret;
2515 }
2516
2517 bool EventuallyPersistentEngine::createTapQueue(const void *cookie,
2518                                                 std::string &client,
2519                                                 uint32_t flags,
2520                                                 const void *userdata,
2521                                                 size_t nuserdata) {
2522     if (reserveCookie(cookie) != ENGINE_SUCCESS) {
2523         return false;
2524     }
2525
2526     std::string tapName = "eq_tapq:";
2527     if (client.length() == 0) {
2528         tapName.assign(ConnHandler::getAnonName());
2529     } else {
2530         tapName.append(client);
2531     }
2532
2533     // Decoding the userdata section of the packet and update the filters
2534     const char *ptr = static_cast<const char*>(userdata);
2535     uint64_t backfillAge = 0;
2536     std::vector<uint16_t> vbuckets;
2537     std::map<uint16_t, uint64_t> lastCheckpointIds;
2538
2539     if (flags & TAP_CONNECT_FLAG_BACKFILL) { /* */
2540         if (nuserdata < sizeof(backfillAge)) {
2541             LOG(EXTENSION_LOG_WARNING,
2542                 "Backfill age is missing. Reject connection request from %s\n",
2543                 tapName.c_str());
2544             return false;
2545         }
2546         // use memcpy to avoid alignemt issues
2547         memcpy(&backfillAge, ptr, sizeof(backfillAge));
2548         backfillAge = ntohll(backfillAge);
2549         nuserdata -= sizeof(backfillAge);
2550         ptr += sizeof(backfillAge);
2551     }
2552
2553     if (flags & TAP_CONNECT_FLAG_LIST_VBUCKETS) {
2554         uint16_t nvbuckets;
2555         if (nuserdata < sizeof(nvbuckets)) {
2556             LOG(EXTENSION_LOG_WARNING,
2557             "Number of vbuckets is missing. Reject connection request from %s"
2558             "\n", tapName.c_str());
2559             return false;
2560         }
2561         memcpy(&nvbuckets, ptr, sizeof(nvbuckets));
2562         nuserdata -= sizeof(nvbuckets);
2563         ptr += sizeof(nvbuckets);
2564         nvbuckets = ntohs(nvbuckets);
2565         if (nvbuckets > 0) {
2566             if (nuserdata < (sizeof(uint16_t) * nvbuckets)) {
2567                 LOG(EXTENSION_LOG_WARNING,
2568                 "# of vbuckets not matched. Reject connection request from %s"
2569                 "\n", tapName.c_str());
2570                 return false;
2571             }
2572             for (uint16_t ii = 0; ii < nvbuckets; ++ii) {
2573                 uint16_t val;
2574                 memcpy(&val, ptr, sizeof(nvbuckets));
2575                 ptr += sizeof(uint16_t);
2576                 vbuckets.push_back(ntohs(val));
2577             }
2578             nuserdata -= (sizeof(uint16_t) * nvbuckets);
2579         }
2580     }
2581
2582     if (flags & TAP_CONNECT_CHECKPOINT) {
2583         uint16_t nCheckpoints = 0;
2584         if (nuserdata >= sizeof(nCheckpoints)) {
2585             memcpy(&nCheckpoints, ptr, sizeof(nCheckpoints));
2586             nuserdata -= sizeof(nCheckpoints);
2587             ptr += sizeof(nCheckpoints);
2588             nCheckpoints = ntohs(nCheckpoints);
2589         }
2590         if (nCheckpoints > 0) {
2591             if (nuserdata <
2592                 ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints)) {
2593                 LOG(EXTENSION_LOG_WARNING, "# of checkpoint Ids not matched. "
2594                     "Reject connection request from %s\n", tapName.c_str());
2595                 return false;
2596             }
2597             for (uint16_t j = 0; j < nCheckpoints; ++j) {
2598                 uint16_t vbid;
2599                 uint64_t checkpointId;
2600                 memcpy(&vbid, ptr, sizeof(vbid));
2601                 ptr += sizeof(uint16_t);
2602                 memcpy(&checkpointId, ptr, sizeof(checkpointId));
2603                 ptr += sizeof(uint64_t);
2604                 lastCheckpointIds[ntohs(vbid)] = ntohll(checkpointId);
2605             }
2606             nuserdata -=
2607                         ((sizeof(uint16_t) + sizeof(uint64_t)) * nCheckpoints);
2608         }
2609     }
2610
2611     TapProducer *tp = tapConnMap->newProducer(cookie, tapName, flags,
2612                                  backfillAge,
2613                                  static_cast<int>(
2614                                  configuration.getTapKeepalive()),
2615                                  vbuckets,
2616                                  lastCheckpointIds);
2617
2618     tapConnMap->notifyPausedConnection(tp, true);
2619     return true;
2620 }
2621
2622 ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
2623                                                         void *engine_specific,
2624                                                         uint16_t nengine,
2625                                                         uint8_t ttl,
2626                                                         uint16_t tap_flags,
2627                                                         uint16_t tap_event,
2628                                                         uint32_t tap_seqno,
2629                                                         const void *key,
2630                                                         size_t nkey,
2631                                                         uint32_t flags,
2632                                                         uint32_t exptime,
2633                                                         uint64_t cas,
2634                                                         uint8_t datatype,
2635                                                         const void *data,
2636                                                         size_t ndata,
2637                                                         uint16_t vbucket)
2638 {
2639     (void) ttl;
2640     void *specific = getEngineSpecific(cookie);
2641     ConnHandler *connection = NULL;
2642     if (specific == NULL) {
2643         if (tap_event == TAP_ACK) {
2644             LOG(EXTENSION_LOG_WARNING, "Tap producer with cookie %s does not "
2645                 "exist. Force disconnect...\n", (char *) cookie);
2646             // tap producer is no longer connected..
2647             return ENGINE_DISCONNECT;
2648         } else {
2649             connection = tapConnMap->newConsumer(cookie);
2650             if (connection == NULL) {
2651                 LOG(EXTENSION_LOG_WARNING, "Failed to create new tap consumer."
2652                     " Force disconnect\n");
2653                 return ENGINE_DISCONNECT;
2654             }
2655             storeEngineSpecific(cookie, connection);
2656         }
2657     } else {
2658         connection = reinterpret_cast<ConnHandler *>(specific);
2659     }
2660
2661     std::string k(static_cast<const char*>(key), nkey);
2662     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2663
2664     if (tap_event == TAP_MUTATION || tap_event == TAP_DELETION) {
2665         if (!tapThrottle->shouldProcess()) {
2666             ++stats.tapThrottled;
2667             if (connection->supportsAck()) {
2668                 ret = ENGINE_TMPFAIL;
2669             } else {
2670                 ret = ENGINE_DISCONNECT;
2671                 LOG(EXTENSION_LOG_WARNING, "%s Can't throttle streams without "
2672                     "ack support. Force disconnect...\n",
2673                     connection->logHeader());
2674             }
2675             return ret;
2676         }
2677     }
2678
2679     switch (tap_event) {
2680     case TAP_ACK:
2681         ret = processTapAck(cookie, tap_seqno, tap_flags, k);
2682         break;
2683     case TAP_FLUSH:
2684         ret = flush(cookie, 0);
2685         LOG(EXTENSION_LOG_WARNING, "%s Received flush.\n",
2686             connection->logHeader());
2687         break;
2688     case TAP_DELETION:
2689         {
2690             uint64_t revSeqno;
2691             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2692                                                 nengine, &revSeqno);
2693
2694             ret = connection->deletion(0, key, nkey, cas, vbucket, 0, revSeqno,
2695                                        NULL, 0);
2696         }
2697         break;
2698
2699     case TAP_CHECKPOINT_START:
2700     case TAP_CHECKPOINT_END:
2701         {
2702             TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2703             if (tc) {
2704                 if (tap_event == TAP_CHECKPOINT_START &&
2705                     nengine == TapEngineSpecific::sizeRevSeqno) {
2706                     // Set the current value for the max deleted seqno
2707                     RCPtr<VBucket> vb = getVBucket(vbucket);
2708                     if (!vb) {
2709                         return ENGINE_TMPFAIL;
2710                     }
2711                     uint64_t seqnum;
2712                     TapEngineSpecific::readSpecificData(tap_event,
2713                                                         engine_specific,
2714                                                         nengine,
2715                                                         &seqnum);
2716                     vb->ht.setMaxDeletedRevSeqno(seqnum);
2717                 }
2718
2719                 if (data) {
2720                     uint64_t checkpointId;
2721                     memcpy(&checkpointId, data, sizeof(checkpointId));
2722                     checkpointId = ntohll(checkpointId);
2723                     ConnHandlerCheckPoint(tc, tap_event, vbucket,
2724                                           checkpointId);
2725                 }
2726                 else {
2727                     ret = ENGINE_DISCONNECT;
2728                     LOG(EXTENSION_LOG_WARNING,
2729                         "%s Checkpoint Id is missing in "
2730                         "CHECKPOINT messages. Force disconnect...\n",
2731                         connection->logHeader());
2732                 }
2733             }
2734             else {
2735                 ret = ENGINE_DISCONNECT;
2736                 LOG(EXTENSION_LOG_WARNING,
2737                     "%s not a consumer! Force disconnect\n",
2738                     connection->logHeader());
2739             }
2740         }
2741
2742         break;
2743
2744     case TAP_MUTATION:
2745         {
2746             uint8_t nru = INITIAL_NRU_VALUE;
2747             uint64_t revSeqno = 0;
2748             TapEngineSpecific::readSpecificData(tap_event, engine_specific,
2749                                                 nengine, &revSeqno, &nru);
2750
2751             if (!isDatatypeSupported(cookie)) {
2752                 datatype = PROTOCOL_BINARY_RAW_BYTES;
2753                 const unsigned char *dat = (const unsigned char*)data;
2754                 const int datlen = ndata;
2755                 if (checkUTF8JSON(dat, datlen)) {
2756                     datatype = PROTOCOL_BINARY_DATATYPE_JSON;
2757                 }
2758             }
2759             ret = connection->mutation(0, key, nkey, data, ndata, cas, vbucket,
2760                                        flags, datatype, 0, 0, revSeqno, exptime,
2761                                        nru, NULL, 0);
2762         }
2763
2764         break;
2765
2766     case TAP_OPAQUE:
2767         if (nengine == sizeof(uint32_t)) {
2768             uint32_t cc;
2769             memcpy(&cc, engine_specific, sizeof(cc));
2770             cc = ntohl(cc);
2771
2772             switch (cc) {
2773             case TAP_OPAQUE_ENABLE_AUTO_NACK:
2774                 // @todo: the memcached core will _ALWAYS_ send nack
2775                 //        if it encounter an error. This should be
2776                 // set as the default when we move to .next after 2.0
2777                 // (currently we need to allow the message for
2778                 // backwards compatibility)
2779                 LOG(EXTENSION_LOG_INFO, "%s Enable auto nack mode\n",
2780                     connection->logHeader());
2781                 break;
2782             case TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
2783                 connection->setSupportCheckpointSync(true);
2784                 LOG(EXTENSION_LOG_INFO,
2785                     "%s Enable checkpoint synchronization\n",
2786                     connection->logHeader());
2787                 break;
2788             case TAP_OPAQUE_OPEN_CHECKPOINT:
2789                 /**
2790                  * This event is only received by the TAP client that wants to
2791                  * get mutations from closed checkpoints only. At this time,
2792                  * only incremental backup client receives this event so that
2793                  * it can close the connection and reconnect later.
2794                  */
2795                 LOG(EXTENSION_LOG_INFO, "%s Beginning of checkpoint.\n",
2796                     connection->logHeader());
2797                 break;
2798             case TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
2799                 {
2800                     LOG(EXTENSION_LOG_INFO,
2801                         "%s Backfill started for vbucket %d.\n",
2802                         connection->logHeader(), vbucket);
2803                     BlockTimer timer(&stats.tapVbucketResetHisto);
2804                     ret = resetVBucket(vbucket) ? ENGINE_SUCCESS :
2805                                                   ENGINE_DISCONNECT;
2806                     if (ret == ENGINE_DISCONNECT) {
2807                         LOG(EXTENSION_LOG_WARNING,
2808                          "%s Failed to reset a vbucket %d. Force disconnect\n",
2809                             connection->logHeader(), vbucket);
2810                     } else {
2811                         LOG(EXTENSION_LOG_WARNING,
2812                          "%s Reset vbucket %d was completed succecssfully.\n",
2813                             connection->logHeader(), vbucket);
2814                     }
2815
2816                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2817                     if (tc) {
2818                         tc->setBackfillPhase(true, vbucket);
2819                     } else {
2820                         ret = ENGINE_DISCONNECT;
2821                         LOG(EXTENSION_LOG_WARNING,
2822                             "TAP consumer doesn't exists. Force disconnect\n");
2823                     }
2824                 }
2825                 break;
2826             case TAP_OPAQUE_CLOSE_BACKFILL:
2827                 {
2828                     LOG(EXTENSION_LOG_INFO, "%s Backfill finished.\n",
2829                         connection->logHeader());
2830                     TapConsumer *tc = dynamic_cast<TapConsumer*>(connection);
2831                     if (tc) {
2832                         tc->setBackfillPhase(false, vbucket);
2833                     } else {
2834                         ret = ENGINE_DISCONNECT;
2835                         LOG(EXTENSION_LOG_WARNING,
2836                             "%s not a consumer! Force disconnect\n",
2837                             connection->logHeader());
2838                     }
2839                 }
2840                 break;
2841             case TAP_OPAQUE_CLOSE_TAP_STREAM:
2842                 /**
2843                  * This event is sent by the eVBucketMigrator to notify that
2844                  * the source node closes the tap replication stream and
2845                  * switches to TAKEOVER_VBUCKETS phase.
2846                  * This is just an informative message and doesn't require any
2847                  * action.
2848                  */
2849                 LOG(EXTENSION_LOG_INFO,
2850                 "%s Received close tap stream. Switching to takeover phase.\n",
2851                     connection->logHeader());
2852                 break;
2853             case TAP_OPAQUE_COMPLETE_VB_FILTER_CHANGE:
2854                 /**
2855                  * This opaque message is just for notifying that the source
2856                  * node receives change_vbucket_filter request and processes
2857                  * it successfully.
2858                  */
2859                 LOG(EXTENSION_LOG_INFO,
2860                 "%s Notified that the source node changed a vbucket filter.\n",
2861                     connection->logHeader());
2862                 break;
2863             default:
2864                 LOG(EXTENSION_LOG_WARNING,
2865                     "%s Received an unknown opaque command\n",
2866                     connection->logHeader());
2867             }
2868         } else {
2869             LOG(EXTENSION_LOG_WARNING,
2870                 "%s Received tap opaque with unknown size %d\n",
2871                 connection->logHeader(), nengine);
2872         }
2873         break;
2874
2875     case TAP_VBUCKET_SET:
2876         {
2877             BlockTimer timer(&stats.tapVbucketSetHisto);
2878
2879             if (nengine != sizeof(vbucket_state_t)) {
2880                 // illegal datasize
2881                 LOG(EXTENSION_LOG_WARNING,
2882                     "%s Received TAP_VBUCKET_SET with illegal size."
2883                     " Force disconnect\n", connection->logHeader());
2884                 ret = ENGINE_DISCONNECT;
2885                 break;
2886             }
2887
2888             vbucket_state_t state;
2889             memcpy(&state, engine_specific, nengine);
2890             state = (vbucket_state_t)ntohl(state);
2891
2892             ret = connection->setVBucketState(0, vbucket, state);
2893         }
2894         break;
2895
2896     default:
2897         // Unknown command
2898         LOG(EXTENSION_LOG_WARNING,
2899             "%s Recieved bad opcode, ignoring message\n",
2900             connection->logHeader());
2901     }
2902
2903     connection->processedEvent(tap_event, ret);
2904     return ret;
2905 }
2906
2907 ENGINE_ERROR_CODE EventuallyPersistentEngine::ConnHandlerCheckPoint(
2908                                                       TapConsumer *consumer,
2909                                                       uint8_t event,
2910                                                       uint16_t vbucket,
2911                                                       uint64_t checkpointId) {
2912     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2913
2914     if (consumer->processCheckpointCommand(event, vbucket, checkpointId)) {
2915         getEpStore()->wakeUpFlusher();
2916         ret = ENGINE_SUCCESS;
2917     }
2918     else {
2919         ret = ENGINE_DISCONNECT;
2920         LOG(EXTENSION_LOG_WARNING, "%s Error processing "
2921             "checkpoint %llu. Force disconnect\n",
2922             consumer->logHeader(), checkpointId);
2923     }
2924
2925     return ret;
2926 }
2927
2928 TapProducer* EventuallyPersistentEngine::getTapProducer(const void *cookie) {
2929     TapProducer *rv =
2930         reinterpret_cast<TapProducer*>(getEngineSpecific(cookie));
2931     if (!(rv && rv->isConnected())) {
2932         LOG(EXTENSION_LOG_WARNING,
2933             "Walking a non-existent tap queue, disconnecting\n");
2934         return NULL;
2935     }
2936
2937     if (rv->doDisconnect()) {
2938         LOG(EXTENSION_LOG_WARNING,
2939             "%s Disconnecting pending connection\n", rv->logHeader());
2940         return NULL;
2941     }
2942     return rv;
2943 }
2944
2945 ENGINE_ERROR_CODE EventuallyPersistentEngine::processTapAck(const void *cookie,
2946                                                             uint32_t seqno,
2947                                                             uint16_t status,
2948                                                             const std::string
2949                                                             &msg)
2950 {
2951     TapProducer *connection = getTapProducer(cookie);
2952     if (!connection) {
2953         LOG(EXTENSION_LOG_WARNING,
2954             "Unable to process tap ack. No producer found\n");
2955         return ENGINE_DISCONNECT;
2956     }
2957
2958     return connection->processAck(seqno, status, msg);
2959 }
2960
2961 void EventuallyPersistentEngine::queueBackfill(const VBucketFilter
2962                                                              &backfillVBFilter,
2963                                                Producer *tc)
2964 {
2965     ExTask backfillTask = new BackfillTask(this, *tapConnMap, tc,
2966                                            backfillVBFilter);
2967     ExecutorPool::get()->schedule(backfillTask, NONIO_TASK_IDX);
2968 }
2969
2970 bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
2971     ++numVbucket;
2972     item_eviction_policy_t policy = engine.getEpStore()->
2973                                                        getItemEvictionPolicy();
2974     numItems += vb->getNumItems(policy);
2975     numTempItems += vb->getNumTempItems();
2976     nonResident += vb->getNumNonResidentItems(policy);
2977
2978     if (vb->getHighPriorityChkSize() > 0) {
2979         chkPersistRemaining++;
2980     }
2981
2982     fileSpaceUsed += vb->fileSpaceUsed;
2983     fileSize += vb->fileSize;
2984
2985     if (desired_state != vbucket_state_dead) {
2986         htMemory += vb->ht.memorySize();
2987         htItemMemory += vb->ht.getItemMemory();
2988         htCacheSize += vb->ht.cacheSize;
2989         numEjects += vb->ht.getNumEjects();
2990         numExpiredItems += vb->numExpiredItems;
2991         metaDataMemory += vb->ht.metaDataMemory;
2992         metaDataDisk += vb->metaDataDisk;
2993         opsCreate += vb->opsCreate;
2994         opsUpdate += vb->opsUpdate;
2995         opsDelete += vb->opsDelete;
2996         opsReject += vb->opsReject;
2997
2998         queueSize += vb->dirtyQueueSize;
2999         queueMemory += vb->dirtyQueueMem;
3000         queueFill += vb->dirtyQueueFill;
3001         queueDrain += vb->dirtyQueueDrain;
3002         queueAge += vb->getQueueAge();
3003         pendingWrites += vb->dirtyQueuePendingWrites;
3004     }
3005
3006     return false;
3007 }
3008
3009 /**
3010  * A container class holding VBucketCountVisitors to aggregate stats for
3011  * different vbucket states.
3012  */
3013 class VBucketCountAggregator : public VBucketVisitor  {
3014 public:
3015     bool visitBucket(RCPtr<VBucket> &vb)  {
3016         std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
3017         it = visitorMap.find(vb->getState());
3018         if ( it != visitorMap.end() ) {
3019             it->second->visitBucket(vb);
3020         }
3021
3022         return false;
3023     }
3024
3025     void addVisitor(VBucketCountVisitor* visitor)  {
3026         visitorMap[visitor->getVBucketState()] = visitor;
3027     }
3028 private:
3029     std::map<vbucket_state_t, VBucketCountVisitor*> visitorMap;
3030 };
3031
3032 ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
3033                                                            ADD_STAT add_stat) {
3034     VBucketCountAggregator aggregator;
3035
3036     VBucketCountVisitor activeCountVisitor(*this, vbucket_state_active);
3037     aggregator.addVisitor(&activeCountVisitor);
3038
3039     VBucketCountVisitor replicaCountVisitor(*this, vbucket_state_replica);
3040     aggregator.addVisitor(&replicaCountVisitor);
3041
3042     VBucketCountVisitor pendingCountVisitor(*this, vbucket_state_pending);
3043     aggregator.addVisitor(&pendingCountVisitor);
3044
3045     VBucketCountVisitor deadCountVisitor(*this, vbucket_state_dead);
3046     aggregator.addVisitor(&deadCountVisitor);
3047
3048     epstore->visit(aggregator);
3049
3050     epstore->updateCachedResidentRatio(activeCountVisitor.getMemResidentPer(),
3051                                       replicaCountVisitor.getMemResidentPer());
3052     tapThrottle->adjustWriteQueueCap(activeCountVisitor.getNumItems() +
3053                                      replicaCountVisitor.getNumItems() +
3054                                      pendingCountVisitor.getNumItems());
3055
3056     configuration.addStats(add_stat, cookie);
3057
3058     EPStats &epstats = getEpStats();
3059     add_casted_stat("ep_version", VERSION, add_stat, cookie);
3060     add_casted_stat("ep_storage_age",
3061                     epstats.dirtyAge, add_stat, cookie);
3062     add_casted_stat("ep_storage_age_highwat",
3063                     epstats.dirtyAgeHighWat, add_stat, cookie);
3064     add_casted_stat("ep_num_workers", ExecutorPool::get()->getNumWorkersStat(),
3065                     add_stat, cookie);
3066
3067     if (getWorkloadPriority() == HIGH_BUCKET_PRIORITY) {
3068         add_casted_stat("ep_bucket_priority", "HIGH", add_stat, cookie);
3069     } else if (getWorkloadPriority() == LOW_BUCKET_PRIORITY) {
3070         add_casted_stat("ep_bucket_priority", "LOW", add_stat, cookie);
3071     }
3072
3073     add_casted_stat("ep_total_enqueued",
3074                     epstats.totalEnqueued, add_stat, cookie);
3075     add_casted_stat("ep_total_persisted",
3076                     epstats.totalPersisted, add_stat, cookie);
3077     add_casted_stat("ep_item_flush_failed",
3078                     epstats.flushFailed, add_stat, cookie);
3079     add_casted_stat("ep_item_commit_failed",
3080                     epstats.commitFailed, add_stat, cookie);
3081     add_casted_stat("ep_item_begin_failed",
3082                     epstats.beginFailed, add_stat, cookie);
3083     add_casted_stat("ep_expired_access", epstats.expired_access,
3084                     add_stat, cookie);
3085     add_casted_stat("ep_expired_pager", epstats.expired_pager,
3086                     add_stat, cookie);
3087     add_casted_stat("ep_item_flush_expired",
3088                     epstats.flushExpired, add_stat, cookie);
3089     add_casted_stat("ep_queue_size",
3090                     epstats.diskQueueSize, add_stat, cookie);
3091     add_casted_stat("ep_flusher_todo",
3092                     epstats.flusher_todo, add_stat, cookie);
3093     add_casted_stat("ep_uncommitted_items",
3094                     epstats.flusher_todo, add_stat, cookie);
3095     add_casted_stat("ep_diskqueue_items",
3096                     epstats.diskQueueSize, add_stat, cookie);
3097     add_casted_stat("ep_flusher_state",
3098                     epstore->getFlusher(0)->stateName(),
3099                     add_stat, cookie);
3100     add_casted_stat("ep_commit_num", epstats.flusherCommits,
3101                     add_stat, cookie);
3102     add_casted_stat("ep_commit_time",
3103                     epstats.commit_time, add_stat, cookie);
3104     add_casted_stat("ep_commit_time_total",
3105                     epstats.cumulativeCommitTime, add_stat, cookie);
3106     add_casted_stat("ep_vbucket_del",
3107                     epstats.vbucketDeletions, add_stat, cookie);
3108     add_casted_stat("ep_vbucket_del_fail",
3109                     epstats.vbucketDeletionFail, add_stat, cookie);
3110     add_casted_stat("ep_flush_duration_total",
3111                     epstats.cumulativeFlushTime, add_stat, cookie);
3112     add_casted_stat("ep_flush_all",
3113                     epstore->isFlushAllScheduled() ? "true" : "false",
3114                     add_stat, cookie);
3115     add_casted_stat("curr_items", activeCountVisitor.getNumItems(), add_stat,
3116                     cookie);
3117     add_casted_stat("curr_temp_items", activeCountVisitor.getNumTempItems(),
3118                     add_stat, cookie);
3119     add_casted_stat("curr_items_tot",
3120                     activeCountVisitor.getNumItems() +
3121                     replicaCountVisitor.getNumItems() +
3122                     pendingCountVisitor.getNumItems(),
3123                     add_stat, cookie);
3124     add_casted_stat("vb_active_num", activeCountVisitor.getVBucketNumber(),
3125                     add_stat, cookie);
3126     add_casted_stat("vb_active_curr_items", activeCountVisitor.getNumItems(),
3127                     add_stat, cookie);
3128     add_casted_stat("vb_active_num_non_resident",
3129                     activeCountVisitor.getNonResident(),
3130                     add_stat, cookie);
3131     add_casted_stat("vb_active_perc_mem_resident",