Merge remote-tracking branch 'couchbase/3.0.x' into sherlock 89/65189/2
author Dave Rigby <daver@couchbase.com>
Tue, 28 Jun 2016 15:13:14 +0000 (16:13 +0100)
committer Dave Rigby <daver@couchbase.com>
Tue, 28 Jun 2016 15:14:34 +0000 (16:14 +0100)
* couchbase/3.0.x:
  MB-19343: Use cb_gmtime_r instead of gmtime_r
  [BP] MB-16366: Obtain vbstate readlock in numerous operations
  MB-19280: Fix data race in CouchKVStore stats access
  MB-19279: Fix race in use of gmtime()
  MB-19113: Suppress test_mb16357 when on thread sanitizer

Change-Id: Id289ae95e6fc5e03a64957cc41f53af9ee262a2a

99 files changed:
CMakeLists.txt
configuration.json
docs/engine-params.org
docs/protocol/del_with_meta.md [new file with mode: 0644]
docs/protocol/get_meta.md [new file with mode: 0644]
docs/protocol/set_with_meta.md [new file with mode: 0644]
docs/stats.org
management/cbadm-tap-registration [deleted file]
management/cbcompact [changed mode: 0644->0755]
management/cbepctl
management/cbstats
management/clitool.py
management/mc_bin_client.py
management/memcacheConstants.py
src/atomic.cc
src/atomic.h
src/atomic/gcc_atomics.h [deleted file]
src/atomic/libatomic.h [deleted file]
src/atomicqueue.h
src/backfill.cc
src/bgfetcher.cc
src/bgfetcher.h
src/bloomfilter.cc [new file with mode: 0644]
src/bloomfilter.h [new file with mode: 0644]
src/callbacks.h
src/checkpoint.cc
src/checkpoint.h
src/checkpoint_remover.cc
src/common.h
src/config_static.h
src/conflict_resolution.cc
src/conflict_resolution.h
src/connmap.cc
src/connmap.h
src/couch-kvstore/couch-fs-stats.cc
src/couch-kvstore/couch-kvstore.cc
src/couch-kvstore/couch-kvstore.h
src/dcp-backfill-manager.cc [new file with mode: 0644]
src/dcp-backfill-manager.h [new file with mode: 0644]
src/dcp-backfill.cc [new file with mode: 0644]
src/dcp-backfill.h [new file with mode: 0644]
src/dcp-consumer.cc
src/dcp-consumer.h
src/dcp-producer.cc
src/dcp-producer.h
src/dcp-response.h
src/dcp-stream.cc
src/dcp-stream.h
src/defragmenter.cc [new file with mode: 0644]
src/defragmenter.h [new file with mode: 0644]
src/defragmenter_visitor.cc [new file with mode: 0644]
src/defragmenter_visitor.h [new file with mode: 0644]
src/ep.cc
src/ep.h
src/ep_engine.cc
src/ep_engine.h
src/executorpool.cc
src/executorpool.h
src/executorthread.cc
src/executorthread.h
src/ext_meta_parser.cc [new file with mode: 0644]
src/ext_meta_parser.h [new file with mode: 0644]
src/failover-table.cc
src/failover-table.h
src/flusher.cc
src/item.h
src/item_pager.cc
src/kvshard.cc
src/kvshard.h
src/kvstore.cc
src/kvstore.h
src/memory_tracker.cc
src/murmurhash3.cc [new file with mode: 0644]
src/murmurhash3.h [new file with mode: 0644]
src/mutation_log.cc
src/mutation_log.h
src/priority.cc
src/priority.h
src/relaxed_atomic.h
src/stats.h
src/stored-value.cc
src/stored-value.h
src/tapconnection.cc
src/tapconnection.h
src/tasks.cc
src/tasks.h
src/vbucket.cc
src/vbucket.h
src/vbucketmap.cc
src/warmup.cc
tests/ep_test_apis.cc
tests/ep_test_apis.h
tests/ep_testsuite.cc
tests/mock/mock_dcp.cc
tests/module_tests/checkpoint_test.cc
tests/module_tests/defragmenter_test.cc [new file with mode: 0644]
tests/module_tests/hash_table_test.cc
tests/module_tests/kvstore_test.cc [new file with mode: 0644]
tests/module_tests/stream_test.cc

index 82c4504..3bdf830 100644 (file)
@@ -1,6 +1,10 @@
 PROJECT(EventuallyPersistentEngine)
 CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
 
+IF (${CMAKE_MAJOR_VERSION} GREATER 2)
+    CMAKE_POLICY(SET CMP0042 NEW)
+ENDIF (${CMAKE_MAJOR_VERSION} GREATER 2)
+
 INCLUDE(CheckFunctionExists)
 INCLUDE(CheckIncludeFileCXX)
 INCLUDE(CheckIncludeFiles)
@@ -15,6 +19,7 @@ INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include
                            ${CMAKE_CURRENT_BINARY_DIR}/src
                            ${SNAPPY_INCLUDE_DIR}
                            ${Platform_SOURCE_DIR}/include
+                           ${Memcached_SOURCE_DIR}
                            ${Memcached_SOURCE_DIR}/include
                            ${Couchstore_SOURCE_DIR}/include
                            ${CMAKE_CURRENT_BINARY_DIR})
@@ -24,7 +29,6 @@ CHECK_INCLUDE_FILES("unistd.h" HAVE_UNISTD_H)
 CHECK_INCLUDE_FILES("netdb.h" HAVE_NETDB_H)
 CHECK_INCLUDE_FILES("mach/mach_time.h" HAVE_MACH_MACH_TIME_H)
 CHECK_INCLUDE_FILES("poll.h" HAVE_POLL_H)
-CHECK_INCLUDE_FILES("atomic.h" HAVE_ATOMIC_H)
 CHECK_INCLUDE_FILES("sysexits.h" HAVE_SYSEXITS_H)
 CHECK_INCLUDE_FILES("unistd.h" HAVE_UNISTD_H)
 CHECK_INCLUDE_FILES("sched.h" HAVE_SCHED_H)
@@ -51,11 +55,13 @@ CHECK_FUNCTION_EXISTS(getopt_long HAVE_GETOPT_LONG)
 #endif()
 
 IF (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/.git)
-   EXECUTE_PROCESS(COMMAND git describe
+   EXECUTE_PROCESS(COMMAND git rev-parse HEAD
                    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
                    OUTPUT_VARIABLE EP_ENGINE_VERSION
-                   ERROR_QUIET
                    OUTPUT_STRIP_TRAILING_WHITESPACE)
+   IF (EP_ENGINE_VERSION STREQUAL "" OR EP_ENGINE_VERSION STREQUAL "HEAD")
+     MESSAGE (FATAL_ERROR "Failed to determine commit SHA!")
+   ENDIF (EP_ENGINE_VERSION STREQUAL "" OR EP_ENGINE_VERSION STREQUAL "HEAD")
 ELSE (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/.git)
    SET(EP_ENGINE_VERSION "unknown")
 ENDIF (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/.git)
@@ -65,8 +71,6 @@ CONFIGURE_FILE (${CMAKE_CURRENT_SOURCE_DIR}/src/config.cmake.h
 
 # Generate the python wrappers
 CONFIGURE_FILE (${CMAKE_CURRENT_SOURCE_DIR}/wrapper/wrapper
-                ${CMAKE_CURRENT_BINARY_DIR}/wrapper/cbadm-tap-registration)
-CONFIGURE_FILE (${CMAKE_CURRENT_SOURCE_DIR}/wrapper/wrapper
                 ${CMAKE_CURRENT_BINARY_DIR}/wrapper/cbepctl)
 CONFIGURE_FILE (${CMAKE_CURRENT_SOURCE_DIR}/wrapper/wrapper
                 ${CMAKE_CURRENT_BINARY_DIR}/wrapper/cbstats)
@@ -115,21 +119,25 @@ SET(CONFIG_SOURCE src/configuration.cc
 
 ADD_LIBRARY(ep SHARED
             src/access_scanner.cc src/atomic.cc src/backfill.cc
-            src/bgfetcher.cc src/checkpoint.cc
+            src/bgfetcher.cc src/bloomfilter.cc src/checkpoint.cc
             src/checkpoint_remover.cc src/conflict_resolution.cc
+            src/dcp-backfill-manager.cc src/dcp-backfill.cc
+            src/dcp-consumer.cc src/dcp-producer.cc
+            src/dcp-stream.cc src/dcp-response.cc
+            src/defragmenter.cc
+            src/defragmenter_visitor.cc
             src/ep.cc src/ep_engine.cc src/ep_time.c
-            src/executorpool.cc src/failover-table.cc
-            src/flusher.cc src/htresizer.cc
+            src/executorpool.cc src/ext_meta_parser.cc
+            src/failover-table.cc src/flusher.cc src/htresizer.cc
             src/item.cc src/item_pager.cc src/kvshard.cc
-            src/memory_tracker.cc src/mutex.cc src/priority.cc
+            src/memory_tracker.cc src/murmurhash3.cc
+            src/mutex.cc src/priority.cc
             src/executorthread.cc
             src/sizes.cc
             ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
             src/stored-value.cc src/tapconnection.cc src/connmap.cc
             src/tapthrottle.cc src/tasks.cc
-            src/taskqueue.cc
-            src/dcp-response.cc src/dcp-consumer.cc
-            src/dcp-producer.cc src/dcp-stream.cc src/vbucket.cc
+            src/taskqueue.cc src/vbucket.cc
             src/vbucketmap.cc src/warmup.cc
             ${KVSTORE_SOURCE} ${COUCH_KVSTORE_SOURCE}
             ${OBJECTREGISTRY_SOURCE} ${CONFIG_SOURCE})
@@ -137,25 +145,44 @@ ADD_LIBRARY(ep SHARED
 SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "")
 TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore dirutils platform ${LIBEVENT_LIBRARIES})
 
+IF (APPLE)
+    SET(MEMORY_TRACKING_SRCS ${Memcached_SOURCE_DIR}/daemon/alloc_hooks.c
+                             ${Memcached_SOURCE_DIR}/daemon/darwin_zone.c)
+ELSE (APPLE)
+    SET(MEMORY_TRACKING_SRCS ${Memcached_SOURCE_DIR}/daemon/alloc_hooks.c)
+ENDIF (APPLE)
+
+IF (MEMORY_ALLOCATOR)
+    INCLUDE_DIRECTORIES(AFTER ${MALLOC_INCLUDE_DIR})
+ELSE (MEMORY_ALLOCATOR)
+    SET(MALLOC_LIBRARIES "")
+ENDIF (MEMORY_ALLOCATOR)
+
 ADD_EXECUTABLE(ep-engine_stream_test
   tests/module_tests/stream_test.cc
   src/access_scanner.cc
   src/atomic.cc
   src/backfill.cc
   src/bgfetcher.cc
+  src/bloomfilter.cc
   src/checkpoint.cc
   src/checkpoint_remover.cc
   src/conflict_resolution.cc
   src/connmap.cc
+  src/dcp-backfill.cc
+  src/dcp-backfill-manager.cc
   src/dcp-consumer.cc
   src/dcp-producer.cc
   src/dcp-response.cc
   src/dcp-stream.cc
+  src/defragmenter.cc
+  src/defragmenter_visitor.cc
   src/ep.cc
   src/ep_engine.cc
   src/ep_time.c
   src/executorpool.cc
   src/executorthread.cc
+  src/ext_meta_parser.cc
   src/failover-table.cc
   src/flusher.cc
   src/htresizer.cc
@@ -163,6 +190,7 @@ ADD_EXECUTABLE(ep-engine_stream_test
   src/item_pager.cc
   src/kvshard.cc
   src/memory_tracker.cc
+  src/murmurhash3.cc
   src/mutation_log.cc
   src/mutex.cc
   src/objectregistry.cc
@@ -179,8 +207,9 @@ ADD_EXECUTABLE(ep-engine_stream_test
   ${CONFIG_SOURCE}
   ${KVSTORE_SOURCE}
   ${COUCH_KVSTORE_SOURCE}
+  ${MEMORY_TRACKING_SRCS}
   ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.c)
-TARGET_LINK_LIBRARIES(ep-engine_stream_test couchstore cJSON dirutils JSON_checker mcd_util platform)
+TARGET_LINK_LIBRARIES(ep-engine_stream_test couchstore cJSON dirutils JSON_checker mcd_util platform ${MALLOC_LIBRARIES})
 
 ADD_EXECUTABLE(ep-engine_atomic_ptr_test
   tests/module_tests/atomic_ptr_test.cc
@@ -197,6 +226,7 @@ TARGET_LINK_LIBRARIES(ep-engine_atomic_test platform)
 
 ADD_EXECUTABLE(ep-engine_checkpoint_test
   tests/module_tests/checkpoint_test.cc
+  src/bloomfilter.cc src/murmurhash3.cc
   src/checkpoint.cc src/failover-table.cc
   src/testlogger.cc src/stored-value.cc
   src/atomic.cc src/mutex.cc
@@ -207,6 +237,7 @@ TARGET_LINK_LIBRARIES(ep-engine_checkpoint_test ${SNAPPY_LIBRARIES} cJSON platfo
 
 ADD_EXECUTABLE(ep-engine_chunk_creation_test
   tests/module_tests/chunk_creation_test.cc)
+TARGET_LINK_LIBRARIES(ep-engine_chunk_creation_test platform)
 
 ADD_EXECUTABLE(ep-engine_hash_table_test
   tests/module_tests/hash_table_test.cc src/item.cc
@@ -217,17 +248,21 @@ ADD_EXECUTABLE(ep-engine_hash_table_test
 TARGET_LINK_LIBRARIES(ep-engine_hash_table_test ${SNAPPY_LIBRARIES} platform)
 
 ADD_EXECUTABLE(ep-engine_histo_test tests/module_tests/histo_test.cc)
+TARGET_LINK_LIBRARIES(ep-engine_histo_test platform)
 ADD_EXECUTABLE(ep-engine_hrtime_test tests/module_tests/hrtime_test.cc)
 TARGET_LINK_LIBRARIES(ep-engine_hrtime_test platform)
 
 ADD_EXECUTABLE(ep-engine_misc_test tests/module_tests/misc_test.cc)
+TARGET_LINK_LIBRARIES(ep-engine_misc_test platform)
 ADD_EXECUTABLE(ep-engine_mutex_test
   tests/module_tests/mutex_test.cc src/testlogger.cc src/mutex.cc)
 TARGET_LINK_LIBRARIES(ep-engine_mutex_test platform)
 
-ADD_EXECUTABLE(ep-engine_priority_test  tests/module_tests/priority_test.cc
-                        src/priority.cc)
+ADD_EXECUTABLE(ep-engine_priority_test  tests/module_tests/priority_test.cc src/priority.cc)
+TARGET_LINK_LIBRARIES(ep-engine_priority_test platform)
+
 ADD_EXECUTABLE(ep-engine_ringbuffer_test tests/module_tests/ringbuffer_test.cc)
+TARGET_LINK_LIBRARIES(ep-engine_ringbuffer_test platform)
 
 ADD_EXECUTABLE(ep-engine_failover_table_test tests/module_tests/failover_table_test.cc
                         src/failover-table.cc src/mutex.cc src/testlogger.cc
@@ -235,6 +270,60 @@ ADD_EXECUTABLE(ep-engine_failover_table_test tests/module_tests/failover_table_t
                         ${OBJECTREGISTRY_SOURCE} ${CONFIG_SOURCE})
 TARGET_LINK_LIBRARIES(ep-engine_failover_table_test cJSON platform)
 
+ADD_EXECUTABLE(ep-engine_kvstore_test
+  tests/module_tests/kvstore_test.cc
+  src/access_scanner.cc
+  src/atomic.cc
+  src/backfill.cc
+  src/bgfetcher.cc
+  src/bloomfilter.cc
+  src/checkpoint.cc
+  src/checkpoint_remover.cc
+  src/conflict_resolution.cc
+  src/connmap.cc
+  src/dcp-backfill.cc
+  src/dcp-backfill-manager.cc
+  src/dcp-consumer.cc
+  src/dcp-producer.cc
+  src/dcp-response.cc
+  src/dcp-stream.cc
+  src/defragmenter.cc
+  src/defragmenter_visitor.cc
+  src/ep.cc
+  src/ep_engine.cc
+  src/ep_time.c
+  src/executorpool.cc
+  src/executorthread.cc
+  src/ext_meta_parser.cc
+  src/failover-table.cc
+  src/flusher.cc
+  src/htresizer.cc
+  src/item.cc
+  src/item_pager.cc
+  src/kvshard.cc
+  src/memory_tracker.cc
+  src/murmurhash3.cc
+  src/mutation_log.cc
+  src/mutex.cc
+  src/objectregistry.cc
+  src/priority.cc
+  src/tapconnection.cc
+  src/stored-value.cc
+  src/tapthrottle.cc
+  src/tasks.cc
+  src/taskqueue.cc
+  src/vbucket.cc
+  src/vbucketmap.cc
+  src/warmup.cc
+  ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
+  ${OBJECTREGISTRY_SOURCE}
+  ${KVSTORE_SOURCE}
+  ${COUCH_KVSTORE_SOURCE}
+  ${CONFIG_SOURCE})
+TARGET_LINK_LIBRARIES(ep-engine_kvstore_test
+                      gmock gtest cJSON JSON_checker
+                      couchstore dirutils platform)
+
 ADD_TEST(ep-engine_atomic_ptr_test ep-engine_atomic_ptr_test)
 ADD_TEST(ep-engine_atomic_test ep-engine_atomic_test)
 ADD_TEST(ep-engine_checkpoint_test ep-engine_checkpoint_test)
@@ -248,6 +337,7 @@ ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
 ADD_TEST(ep-engine_priority_test ep-engine_priority_test)
 ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
 ADD_TEST(ep-engine_stream_test ep-engine_stream_test)
+ADD_TEST(ep-engine_kvstore_test ep-engine_kvstore_test)
 
 ADD_LIBRARY(timing_tests SHARED tests/module_tests/timing_tests.cc)
 SET_TARGET_PROPERTIES(timing_tests PROPERTIES PREFIX "")
@@ -257,15 +347,34 @@ ADD_EXECUTABLE(ep-engine_sizes src/sizes.cc src/mutex.h src/mutex.cc src/testlog
               ${OBJECTREGISTRY_SOURCE} ${CONFIG_SOURCE})
 TARGET_LINK_LIBRARIES(ep-engine_sizes platform)
 
+ADD_EXECUTABLE(ep-engine_defragmenter_test
+               tests/module_tests/defragmenter_test.cc
+               src/bloomfilter.cc
+               src/checkpoint.cc
+               src/configuration.cc
+               src/defragmenter_visitor.cc
+               src/ep_time.c
+               src/generated_configuration.cc
+               src/failover-table.cc
+               src/item.cc
+               src/murmurhash3.cc
+               src/mutex.cc
+               src/stored-value.cc
+               src/testlogger.cc
+               src/vbucket.cc
+               ${OBJECTREGISTRY_SOURCE})
+TARGET_LINK_LIBRARIES(ep-engine_defragmenter_test cJSON platform ${SNAPPY_LIBRARIES})
+
 ADD_LIBRARY(ep_testsuite SHARED
    tests/ep_testsuite.cc
    src/atomic.cc src/mutex.cc
    src/item.cc src/testlogger.cc
-   src/ep_time.c
+   src/ep_time.c src/ext_meta_parser.cc
    tests/mock/mock_dcp.cc
    tests/ep_test_apis.cc ${OBJECTREGISTRY_SOURCE} ${CONFIG_SOURCE})
 SET_TARGET_PROPERTIES(ep_testsuite PROPERTIES PREFIX "")
-TARGET_LINK_LIBRARIES(ep_testsuite JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+TARGET_LINK_LIBRARIES(ep_testsuite couchstore dirutils JSON_checker platform
+                      ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
 
 
 #ADD_CUSTOM_COMMAND(OUTPUT
@@ -302,7 +411,6 @@ TARGET_LINK_LIBRARIES(ep_testsuite JSON_checker dirutils platform ${LIBEVENT_LIB
 #SET_TARGET_PROPERTIES(generated_testsuite PROPERTIES PREFIX "")
 
 INSTALL(PROGRAMS
-        ${CMAKE_CURRENT_BINARY_DIR}/wrapper/cbadm-tap-registration
         ${CMAKE_CURRENT_BINARY_DIR}/wrapper/cbepctl
         ${CMAKE_CURRENT_BINARY_DIR}/wrapper/cbstats
         ${CMAKE_CURRENT_BINARY_DIR}/wrapper/cbcompact
@@ -312,7 +420,6 @@ INSTALL(PROGRAMS
         DESTINATION bin)
 
 INSTALL(PROGRAMS
-        management/cbadm-tap-registration
         management/cbepctl
         management/cbstats
         management/cbcompact
index 5bd8e04..5607590 100644 (file)
@@ -34,7 +34,7 @@
             }
         },
         "alog_task_time": {
-            "default": "10",
+            "default": "2",
             "descr": "Hour in GMT time when access scanner task is scheduled to run",
             "type": "size_t",
             "validator": {
                 }
             }
         },
+        "bfilter_enabled": {
+            "default": "true",
+            "desr": "Enable or disable the bloom filter",
+            "type": "bool"
+        },
+        "bfilter_key_count": {
+            "default": "10000",
+            "desr": "Bloomfilter: Estimated key count per vbucket",
+            "type": "size_t"
+        },
+        "bfilter_fp_prob": {
+            "default": "0.01",
+            "desr": "Bloomfilter: Allowed probability for false positives",
+            "type": "float"
+        },
+        "bfilter_residency_threshold": {
+            "default": "0.1",
+            "desr" : "If resident ratio (during full eviction) were found less than this threshold, compaction will include all items into bloomfilter",
+            "type" : "float"
+        },
         "compaction_exp_mem_threshold": {
             "default": "85",
             "desr": "Memory usage threshold after which compaction will not queue expired items for deletion",
             "dynamic": false,
             "type": "std::string"
         },
+        "defragmenter_enabled": {
+            "default": "true",
+            "descr": "True if defragmenter task is enabled",
+            "type": "bool"
+        },
+        "defragmenter_interval": {
+            "default": "10",
+            "descr": "How often defragmenter task should be run (in seconds).",
+            "type": "size_t"
+        },
+        "defragmenter_age_threshold": {
+            "default": "10",
+            "descr": "How old (measured in number of defragmenter passes) must a document be to be considered for defragmentation.",
+            "type": "size_t"
+        },
+        "defragmenter_chunk_duration": {
+            "default": "20",
+            "descr": "Maximum time (in ms) defragmentation task will run for before being paused (and resumed at the next defragmenter_interval).",
+            "type": "size_t"
+        },
         "enable_chk_merge": {
             "default": "false",
             "descr": "True if merging closed checkpoints is enabled",
             "type": "bool"
         },
         "flushall_enabled": {
-            "default": "false",
+            "default": "true",
             "descr": "True if memcached flush API is enabled",
             "type": "bool"
         },
             "dynamic" : false,
             "type": "std::string"
         },
+        "dcp_backfill_byte_limit": {
+            "default": "20971832",
+            "descr": "Max bytes a connection can backfill into memory",
+            "dynamic": false,
+            "type": "size_t"
+        },
         "dcp_conn_buffer_size": {
             "default": "10485760",
             "descr": "Size in bytes of an dcp consumer connection buffer",
             "dynamic": false,
             "type": "size_t"
         },
+        "dcp_enable_dynamic_conn_buffer_size": {
+            "default": "true",
+            "descr": "Whether or not the buffer size in dcp flow control be decided dynamically",
+            "dynamic": false,
+            "type": "bool"
+        },
+        "dcp_conn_buffer_size_max": {
+            "default": "52428800",
+            "descr": "Max size in bytes of an dcp consumer connection buffer",
+            "dynamic": false,
+            "type": "size_t"
+        },
+        "dcp_conn_buffer_size_perc": {
+            "default": "1",
+            "descr": "Percentage of memQuota for a dcp consumer connection buffer",
+            "type": "size_t",
+            "dynamic": false,
+            "validator": {
+                "range": {
+                    "max": 10,
+                    "min": 1
+                }
+            }
+        },
+        "dcp_conn_buffer_size_aggr_mem_threshold": {
+            "default": "10",
+            "descr": "Aggr mem usage by all dcp conns (as percentage of memQuota) after which only dcp_conn_buffer_size is allocated",
+            "type": "size_t",
+            "dynamic": false,
+            "validator": {
+                "range": {
+                    "max": 20,
+                    "min": 1
+                }
+            }
+        },
         "dcp_enable_flow_control": {
             "default": "true",
             "descr": "Whether or not dcp connections should use flow control",
             "dynamic": false,
             "type": "size_t"
         },
+        "dcp_scan_byte_limit": {
+            "default": "4194304",
+            "descr": "Max bytes that can be read in a single disk scan",
+            "dynamic": false,
+            "type": "size_t"
+        },
+        "dcp_scan_item_limit": {
+            "default": "4096",
+            "descr": "Max items that can be read in a single disk scan",
+            "dynamic": false,
+            "type": "size_t"
+        },
         "dcp_producer_snapshot_marker_yield_limit": {
             "default": "10",
             "descr": "The number of snapshots before ActiveStreamCheckpointProcessorTask::run yields.",
-            "type": "size_t"
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 100000000,
+                    "min": 1
+                }
+            }
+        },
+        "dcp_consumer_process_buffered_messages_yield_limit" : {
+            "default": "10",
+            "descr": "The number of processBufferedMessages iterations before forcing the task to yield.",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 100000000,
+                    "min": 1
+                }
+            }
+        },
+        "dcp_consumer_process_buffered_messages_batch_size" : {
+            "default": "10",
+            "descr": "The maximum number of items stream->processBufferedMessages will consume.",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 100000000,
+                    "min": 1
+                }
+            }
         },
         "vb0": {
             "default": "false",
index 5d77422..ae16a67 100644 (file)
@@ -80,6 +80,11 @@ memcached like this:
 |                             |        | below high water mark                      |
 | bf_resident_threshold       | float  | Resident item threshold for only memory    |
 |                             |        | backfill to be kicked off                  |
+| bfilter_enabled             | bool   | Bloom filter enabled or disabled           |
+| bfilter_residency_threshold | float  | Resident ratio threshold for full eviction |
+|                             |        | policy after which bloom filter switches   |
+|                             |        | mode from accounting just deletes and non  |
+|                             |        | resident items to all items                |
 | getl_default_timeout        | int    | The default timeout for a getl lock in (s) |
 | getl_max_timeout            | int    | The maximum timeout for a getl lock in (s) |
 | backfill_mem_threshold      | float  | Memory threshold on the current bucket     |
diff --git a/docs/protocol/del_with_meta.md b/docs/protocol/del_with_meta.md
new file mode 100644 (file)
index 0000000..010bdf6
--- /dev/null
@@ -0,0 +1,142 @@
+
+##Delete With Meta (delwithmeta) v4.0
+
+The delete with meta command is used to delete data with metadata for a key. Meta data passed is cas, sequence number, flags and expiration along with an extended meta data section.
+
+The request:
+
+* Must have extras
+* Must have key
+
+####Binary Implementation
+
+    Delete With Meta Binary Request
+
+    Byte/     0       |       1       |       2       |       3       |
+       /              |               |               |               |
+      |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+      +---------------+---------------+---------------+---------------+
+     0|       80      |       A8      |       00      |       05      |
+      +---------------+---------------+---------------+---------------+
+     4|       1A      |       00      |       00      |       03      |
+      +---------------+---------------+---------------+---------------+
+     8|       00      |       00      |       00      |       1F      |
+      +---------------+---------------+---------------+---------------+
+    12|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    16|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    20|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    24|       00      |       00      |       00      |       07      |
+      +---------------+---------------+---------------+---------------+
+    28|       00      |       00      |       00      |       0A      |
+      +---------------+---------------+---------------+---------------+
+    32|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    36|       00      |       00      |       00      |       14      |
+      +---------------+---------------+---------------+---------------+
+    40|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    44|       00      |       00      |       00      |       1E      |
+      +---------------+---------------+---------------+---------------+
+    48|       00      |       00      |       6D      |       79      |
+      +---------------+---------------+---------------+---------------+
+    52|       6B      |       65      |       79      |
+      +---------------+---------------+---------------+
+
+    DEL_WITH_META command
+    Field        (offset) (value)
+    Magic        (0)    : 0x80 (Request)
+    Opcode       (1)    : 0xA8 (Delete With Meta)
+    Key length   (2,3)  : 0x0005 (5)
+    Extra length (4)    : 0x1A (26)
+    Data type    (5)    : 0x00
+    VBucket      (6,7)  : 0x0003 (3)
+    Total body   (8-11) : 0x0000001F (31)
+    Opaque       (12-15): 0x00000000
+    CAS          (16-23): 0x0000000000000000
+    Extras              :
+      Flags      (24-27): 0x00000007 (7)
+      Expiration (28-31): 0x0000000A (10)
+      Seqno      (32-39): 0x0000000000000014 (20)
+      Cas        (40-47): 0x000000000000001E (30)
+      Meta Len   (48-49): 0x0000 (0)
+    Key          (50-54): mykey
+
+
+    Delete With Meta Binary Response
+
+    Byte/     0       |       1       |       2       |       3       |
+       /              |               |               |               |
+      |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+      +---------------+---------------+---------------+---------------+
+     0|       81      |       A8      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+     4|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+     8|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    12|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    16|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    20|       00      |       00      |       00      |       01      |
+      +---------------+---------------+---------------+---------------+
+
+    DEL_WITH_META command
+    Field        (offset) (value)
+    Magic        (0)    : 0x81 (Response)
+    Opcode       (1)    : 0xA8 (Delete With Meta)
+    Key length   (2,3)  : 0x0000
+    Extra length (4)    : 0x00
+    Data type    (5)    : 0x00
+    Status       (6,7)  : 0x0000 (0)
+    Total body   (8-11) : 0x00000000
+    Opaque       (12-15): 0x00000000
+    CAS          (16-23): 0x0000000000000001 (1)
+
+
+###Extended Meta Data Section
+
+The extended meta data section is used to send extra meta data for a particular mutation. This section should come at the very end, after the key. Its length should be set in the nmeta field. A length of 0 means that there is no extended meta data section.
+
+####Version 1 (0x01)
+
+In this version the extended meta data section has the following format:
+
+    | version | id_1 | len_1 | field_1 | ... | id_n | len_n | field_n |
+
+Here,
+* version: 1B
+* id_n: 1B
+* len_n: 2B
+* field_n: "len_n"B
+
+**Meta Data IDs:**
+
+* 0x01 - adjusted time
+* 0x02 - conflict resolution mode
+
+
+###Errors
+
+**PROTOCOL_BINARY_RESPONSE_KEY_ENOENT (0x01)**
+
+If the key does not exist.
+
+**PROTOCOL_BINARY_RESPONSE_EINVAL (0x04)**
+
+If data in this packet is malformed or incomplete then this error is returned.
+
+**PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET (0x07)**
+
+If the vbucket does not exist.
+
+**PROTOCOL_BINARY_RESPONSE_ENOMEM (0x82)**
+
+If the server is permanently out of memory
+
+**PROTOCOL_BINARY_RESPONSE_ETMPFAIL (0x86)**
+
+If the server is currently warming up or we are temporarily out of memory.
diff --git a/docs/protocol/get_meta.md b/docs/protocol/get_meta.md
new file mode 100644 (file)
index 0000000..3443ab6
--- /dev/null
@@ -0,0 +1,120 @@
+
+##Get Meta (getmeta) v4.0
+
+The get meta command is used to fetch the meta data for a key. Extras will contain 1 byte set to 0x01, indicating that extended metadata for the key will need to be sent in the response.
+
+The request:
+
+* Must have key
+* Can have extras (1B)
+
+####Binary Implementation
+
+    Get Meta Binary Request
+
+    Byte/     0       |       1       |       2       |       3       |
+       /              |               |               |               |
+      |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+      +---------------+---------------+---------------+---------------+
+     0|       80      |       A0      |       00      |       05      |
+      +---------------+---------------+---------------+---------------+
+     4|       01      |       00      |       00      |       03      |
+      +---------------+---------------+---------------+---------------+
+     8|       00      |       00      |       00      |       06      |
+      +---------------+---------------+---------------+---------------+
+    12|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    16|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    20|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    24|       01      |       6D      |       79      |       6B      |
+      +---------------+---------------+---------------+---------------+
+    28|       65      |       79      |
+      +---------------+---------------+
+
+    GET_META command
+    Field        (offset) (value)
+    Magic        (0)    : 0x80 (Request)
+    Opcode       (1)    : 0xA0 (Get Meta)
+    Key length   (2,3)  : 0x0005 (5)
+    Extra length (4)    : 0x01 (1)
+    Data type    (5)    : 0x00
+    VBucket      (6,7)  : 0x0003 (3)
+    Total body   (8-11) : 0x00000006 (6)
+    Opaque       (12-15): 0x00000000
+    CAS          (16-23): 0x0000000000000000
+    Extras              :
+      ReqExtMeta (24)   : 0x01 (1)
+    Key          (25-29): mykey
+
+    Get Meta Binary Response
+
+    Byte/     0       |       1       |       2       |       3       |
+       /              |               |               |               |
+      |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+      +---------------+---------------+---------------+---------------+
+     0|       81      |       A0      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+     4|       15      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+     8|       00      |       00      |       00      |       15      |
+      +---------------+---------------+---------------+---------------+
+    12|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    16|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    20|       00      |       00      |       00      |       01      |
+      +---------------+---------------+---------------+---------------+
+    24|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    28|       00      |       00      |       00      |       01      |
+      +---------------+---------------+---------------+---------------+
+    32|       00      |       00      |       00      |       07      |
+      +---------------+---------------+---------------+---------------+
+    36|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    40|       00      |       00      |       00      |       09      |
+      +---------------+---------------+---------------+---------------+
+    44|       01      |
+      +---------------+
+
+    GET_META command
+    Field        (offset) (value)
+    Magic        (0)    : 0x81 (Response)
+    Opcode       (1)    : 0xA0 (Get Meta)
+    Key length   (2,3)  : 0x0000
+    Extra length (4)    : 0x15 (21)
+    Data type    (5)    : 0x00
+    Status       (6,7)  : 0x0000 (0)
+    Total body   (8-11) : 0x00000015 (21)
+    Opaque       (12-15): 0x00000000
+    CAS          (16-23): 0x0000000000000001 (1)
+    Extras              :
+      Deleted    (24-27): 0x00000000 (0)
+      Flags      (28-31): 0x00000001 (1)
+      Exptime    (32-35): 0x00000007 (7)
+      Seqno      (36-43): 0x0000000000000009 (9)
+      ConfRes    (44)   : 0x01 (1)
+
+###Extended Meta Data Section
+
+The extras section in the response packet will contain 1 extra byte indicating the conflict resolution mode that the item is eligible for. This 1 byte of extra meta information will be sent as part of the response only if the ReqExtMeta flag (set to 0x01) is sent in the request as part of the extras section.
+
+####ReqExtMeta (0x01)
+
+ReqExtMeta: 0x01 in the request's extras' section, will ensure that an extra byte containing the conflict resolution mode will be sent along with the rest of the metadata in the extras section of the response packet.
+
+###Errors
+
+**PROTOCOL_BINARY_RESPONSE_KEY_ENOENT (0x01)**
+
+If the key does not exist.
+
+**PROTOCOL_BINARY_RESPONSE_EINVAL (0x04)**
+
+If data in this packet is malformed or incomplete then this error is returned.
+
+**PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET (0x07)**
+
+If the vbucket does not exist.
diff --git a/docs/protocol/set_with_meta.md b/docs/protocol/set_with_meta.md
new file mode 100644 (file)
index 0000000..68cafc4
--- /dev/null
@@ -0,0 +1,147 @@
+
+##Set With Meta (setwithmeta) v4.0
+
+The set with meta command is used to set data and metadata for a key. Meta data passed is cas, sequence number, flags and expiration along with an extended meta data section.
+
+The request:
+
+* Must have extras
+* Must have key
+* Must have value
+
+####Binary Implementation
+
+    Set With Meta Binary Request
+
+    Byte/     0       |       1       |       2       |       3       |
+       /              |               |               |               |
+      |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+      +---------------+---------------+---------------+---------------+
+     0|       80      |       A2      |       00      |       05      |
+      +---------------+---------------+---------------+---------------+
+     4|       1A      |       00      |       00      |       03      |
+      +---------------+---------------+---------------+---------------+
+     8|       00      |       00      |       00      |       26      |
+      +---------------+---------------+---------------+---------------+
+    12|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    16|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    20|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    24|       00      |       00      |       00      |       07      |
+      +---------------+---------------+---------------+---------------+
+    28|       00      |       00      |       00      |       0A      |
+      +---------------+---------------+---------------+---------------+
+    32|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    36|       00      |       00      |       00      |       14      |
+      +---------------+---------------+---------------+---------------+
+    40|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    44|       00      |       00      |       00      |       1E      |
+      +---------------+---------------+---------------+---------------+
+    48|       00      |       00      |       6D      |       79      |
+      +---------------+---------------+---------------+---------------+
+    52|       6B      |       65      |       79      |       6D      |
+      +---------------+---------------+---------------+---------------+
+    56|       79      |       76      |       61      |       6C      |
+      +---------------+---------------+---------------+---------------+
+    60|       75      |       65      |
+      +---------------+---------------+
+
+    SET_WITH_META command
+    Field        (offset) (value)
+    Magic        (0)    : 0x80 (Request)
+    Opcode       (1)    : 0xA2 (setwithmeta)
+    Key length   (2,3)  : 0x0005 (5)
+    Extra length (4)    : 0x1A (26)
+    Data type    (5)    : 0x00
+    VBucket      (6,7)  : 0x0003 (3)
+    Total body   (8-11) : 0x00000026 (38)
+    Opaque       (12-15): 0x00000000
+    CAS          (16-23): 0x0000000000000000
+    Extras              :
+      Flags      (24-27): 0x00000007 (7)
+      Expiration (28-31): 0x0000000A (10)
+      Seqno      (32-39): 0x0000000000000014 (20)
+      Cas        (40-47): 0x000000000000001E (30)
+      Meta Len   (48-49): 0x0000 (0)
+    Key          (50-54): mykey
+    Value        (55-61): myvalue
+
+
+    Set With Meta Binary Response
+
+    Byte/     0       |       1       |       2       |       3       |
+       /              |               |               |               |
+      |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+      +---------------+---------------+---------------+---------------+
+     0|       81      |       A2      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+     4|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+     8|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    12|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    16|       00      |       00      |       00      |       00      |
+      +---------------+---------------+---------------+---------------+
+    20|       00      |       00      |       00      |       01      |
+      +---------------+---------------+---------------+---------------+
+
+    SET_WITH_META command
+    Field        (offset) (value)
+    Magic        (0)    : 0x81 (Response)
+    Opcode       (1)    : 0xA2 (setwithmeta)
+    Key length   (2,3)  : 0x0000
+    Extra length (4)    : 0x00
+    Data type    (5)    : 0x00
+    Status       (6,7)  : 0x0000 (0)
+    Total body   (8-11) : 0x00000000
+    Opaque       (12-15): 0x00000000
+    CAS          (16-23): 0x0000000000000001 (1)
+
+###Extended Meta Data Section
+
+The extended meta data section is used to send extra meta data for a particular mutation. This section should come at the very end, after the value. Its length should be set in the nmeta field. A length of 0 means that there is no extended meta data section.
+
+####Version 1 (0x01)
+
+In this version the extended meta data section has the following format:
+
+    | version | id_1 | len_1 | field_1 | ... | id_n | len_n | field_n |
+
+Here,
+* version: 1B
+* id_n: 1B
+* len_n: 2B
+* field_n: "len_n"B
+
+**Meta Data IDs:**
+
+* 0x01 - adjusted time
+* 0x02 - conflict resolution mode
+
+
+###Errors
+
+**PROTOCOL_BINARY_RESPONSE_KEY_ENOENT (0x01)**
+
+If the key does not exist.
+
+**PROTOCOL_BINARY_RESPONSE_EINVAL (0x04)**
+
+If data in this packet is malformed or incomplete then this error is returned.
+
+**PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET (0x07)**
+
+If the vbucket does not exist.
+
+**PROTOCOL_BINARY_RESPONSE_ENOMEM (0x82)**
+
+If the server is permanently out of memory
+
+**PROTOCOL_BINARY_RESPONSE_ETMPFAIL (0x86)**
+
+If the server is currently warming up or we are temporarily out of memory.
index 2aa7f09..56ef5c6 100644 (file)
@@ -16,8 +16,8 @@
 
 * Getting Started
 
-For introductory information on stats within membase, start with the
-[[http://wiki.membase.org/display/membase/Membase%2BStatistics][membase wiki stats page]].
+For introductory information on stats within Couchbase, start with the
+[[http://docs.couchbase.com/][Couchbase server documentation]].
 
 * Stats Definitions
 
@@ -159,10 +159,6 @@ For introductory information on stats within membase, start with the
 |                                    | exception happened during runtime      |
 | ep_tap_keepalive                   | Tap keepalive time                     |
 | ep_dbname                          | DB path                                |
-| ep_io_num_read                     | Number of io read operations           |
-| ep_io_num_write                    | Number of io write operations          |
-| ep_io_read_bytes                   | Number of bytes read (key + values)    |
-| ep_io_write_bytes                  | Number of bytes written (key + values) |
 | ep_pending_ops                     | Number of ops awaiting pending         |
 |                                    | vbuckets                               |
 | ep_pending_ops_total               | Total blocked pending ops since reset  |
@@ -171,7 +167,7 @@ For introductory information on stats within membase, start with the
 | ep_pending_ops_max_duration        | Max time (µs) used waiting on pending  |
 |                                    | vbuckets                               |
 | ep_bg_num_samples                  | The number of samples included in the  |
-|                                    | avgerage                               |
+|                                    | average                                |
 | ep_bg_min_wait                     | The shortest time (µs) in the wait     |
 |                                    | queue                                  |
 | ep_bg_max_wait                     | The longest time (µs) in the wait      |
@@ -203,6 +199,16 @@ For introductory information on stats within membase, start with the
 |                                    | it is made to back off.                |
 | ep_bg_fetch_delay                  | The amount of time to wait before      |
 |                                    | doing a background fetch               |
+| ep_bfilter_enabled                 | Bloom filter use: enabled or disabled  |
+| ep_bfilter_key_count               | Minimum key count that bloom filter    |
+|                                    | will accommodate                       |
+| ep_bfilter_fp_prob                 | Bloom filter's allowed false positive  |
+|                                    | probability                            |
+| ep_bfilter_residency_threshold     | Resident ratio threshold for full      |
+|                                    | eviction policy, after which bloom     |
+|                                    | switches modes from accounting just    |
+|                                    | non resident items and deletes to      |
+|                                    | accounting all items                   |
 | ep_chk_max_items                   | The number of items allowed in a       |
 |                                    | checkpoint before a new one is created |
 | ep_chk_period                      | The maximum lifetime of a checkpoint   |
@@ -309,6 +315,13 @@ For introductory information on stats within membase, start with the
 | ep_warmup_time                     | The amount of time warmup took         |
 | ep_workload_pattern                | Workload pattern (mixed, read_heavy,   |
 |                                    | write_heavy) monitored at runtime      |
+| ep_defragmenter_interval           | How often defragmenter task should be  |
+|                                    | run (in seconds).                      |
+| ep_defragmenter_num_moved          | Number of items moved by the           |
+|                                    | defragmenter task.                     |
+| ep_defragmenter_num_visited        | Number of items visited (considered    |
+|                                    | for defragmentation) by the            |
+|                                    | defragmenter task.                     |
 
 
 ** vBucket total stats
@@ -610,6 +623,7 @@ another colon.  For example, if your client is named, =slave1=, the
 | supports_ack       | True if the connection use flow control               |
 | total_acked_bytes  | The amount of bytes that the consumer has acked       |
 | type               | The connection type (producer, consumer, or notifier) |
+| max_buffer_bytes   | Size of flow control buffer                           |
 
 ****Per Stream Stats
 
@@ -628,49 +642,59 @@ another colon.  For example, if your client is named, =slave1=, the
 
 ***Producer/Notifier Connections
 
-| bytes_sent         | The amount of unacked bytes sent to the consumer       |
-| connected          | True if this client is connected                       |
-| created            | Creation time for the tap connection                   |
-| flow_control       | True if the connection use flow control                |
-| items_remaining    | The amount of items remaining to be sent               |
-| items_sent         | The amount of items already sent to the consumer       |
-| last_sent_time     | The last time this connection sent a message           |
-| max_buffer_bytes   | The maximum amount of bytes that can be sent without   |
-|                    | receiving an ack from the consumer                     |
-| noop_enabled       | Whether or not this connection sends noops             |
-| noop_wait          | Whether or not this connection is waiting for a        |
-|                    | noop response from the consumer                        |
-| pending_disconnect | True if we're hanging up on this client                |
-| reserved           | True if the dcp stream is reserved                     |
-| supports_ack       | True if the connection use flow control                |
-| total_acked_bytes  | The amount of bytes that have been acked by the        |
-|                    | consumer when flow control is enabled                  |
-| total_bytes_sent   | The amount of bytes already sent to the consumer       |
-| type               | The connection type (producer, consumer, or notifier)  |
-| unacked_bytes      | The amount of bytes the consumer has no acked          |
+| buf_backfill_bytes    | The amount of bytes backfilled but not sent            |
+| buf_backfill_items    | The amount of items backfilled but not sent            |
+| bytes_sent            | The amount of unacked bytes sent to the consumer       |
+| connected             | True if this client is connected                       |
+| created               | Creation time for the tap connection                   |
+| flow_control          | True if the connection use flow control                |
+| items_remaining       | The amount of items remaining to be sent               |
+| items_sent            | The amount of items already sent to the consumer       |
+| last_sent_time        | The last time this connection sent a message           |
+| max_buffer_bytes      | The maximum amount of bytes that can be sent without   |
+|                       | receiving an ack from the consumer                     |
+| noop_enabled          | Whether or not this connection sends noops             |
+| noop_wait             | Whether or not this connection is waiting for a        |
+|                       | noop response from the consumer                        |
+| pending_disconnect    | True if we're hanging up on this client                |
+| priority              | The connection priority for streaming data             |
+| reserved              | True if the dcp stream is reserved                     |
+| supports_ack          | True if the connection use flow control                |
+| total_acked_bytes     | The amount of bytes that have been acked by the        |
+|                       | consumer when flow control is enabled                  |
+| total_bytes_sent      | The amount of bytes already sent to the consumer       |
+| type                  | The connection type (producer, consumer, or notifier)  |
+| unacked_bytes         | The amount of bytes the consumer has not acked         |
+| backfill_num_active   | Number of active (running) backfills                   |
+| backfill_num_snoozing | Number of snoozing (running) backfills                 |
+| backfill_num_pending  | Number of pending (not running) backfills              |
 
 ****Per Stream Stats
 
-| backfilled         | The amount of items sent from disk                     |
-| end_seqno          | The seqno send mutations up to                         |
-| flags              | The flags supplied in the stream request               |
-| items_ready        | Whether the stream has items ready to send             |
-| last_sent_seqno    | The last seqno sent by this stream                     |
-| last_read_seqno    | The last seqno read by this stream from disk or memory |
-| ready_queue_memory | Memory occupied by elements in the DCP readyQ          |
-| memory             | The amount of items sent from memory                   |
-| opaque             | The unique stream identifier                           |
-| snap_end_seqno     | The last snapshot end seqno (Used if a consumer is     |
-|                    | resuming a stream)                                     |
-| snap_start_seqno   | The last snapshot start seqno (Used if a consumer is   |
-|                    | resuming a stream)                                     |
-| start_seqno        | The seqno to start sending mutations from              |
-| state              | The stream state (pending, backfilling, in-memory,     |
-|                    | takeover-send, takeover-wait, or dead)                 |
-| vb_uuid            | The vb uuid used in the stream request                 |
-| cur_snapshot_type  | The type of the current snapshot being received        |
-| cur_snapshot_start | The start seqno of the current snapshot being received |
-| cur_snapshot_end   | The end seqno of the current snapshot being received   |
+| backfill_disk_items      | The amount of items read during backfill from disk    |
+| backfill_mem_items       | The amount of items read during backfill from memory  |
+| backfill_sent            | The amount of items sent to the consumer during the   |
+| end_seqno                | The seqno send mutations up to                        |
+| flags                    | The flags supplied in the stream request              |
+| items_ready              | Whether the stream has items ready to send            |
+| last_sent_seqno          | The last seqno sent by this stream                    |
+| last_sent_snap_end_seqno | The last snapshot end seqno sent by active stream     |
+| last_read_seqno          | The last seqno read by this stream from disk or memory|
+| ready_queue_memory       | Memory occupied by elements in the DCP readyQ         |
+| memory_phase             | The amount of items sent during the memory phase      |
+| opaque                   | The unique stream identifier                          |
+| snap_end_seqno           | The last snapshot end seqno (Used if a consumer is    |
+|                          | resuming a stream)                                    |
+| snap_start_seqno         | The last snapshot start seqno (Used if a consumer is  |
+|                          | resuming a stream)                                    |
+| start_seqno              | The seqno to start sending mutations from             |
+| state                    | The stream state (pending, backfilling, in-memory,    |
+|                          | takeover-send, takeover-wait, or dead)                |
+| vb_uuid                  | The vb uuid used in the stream request                |
+| cur_snapshot_type        | The type of the current snapshot being received       |
+| cur_snapshot_start       | The start seqno of the current snapshot being         |
+|                          | received                                              |
+| cur_snapshot_end         | The end seqno of the current snapshot being received  |
 
 ** Dcp Aggregated Stats
 
@@ -691,7 +715,11 @@ set for =replication=.
 |                             | prefix                                       |
 | [prefix]:total_bytes        | Total number of bytes sent with this prefix  |
 | [prefix]:total_backlog_size | Total backfill items remaining to be sent    |
-|                             | with this prefix
+|                             | with this prefix                             |
+| ep_dcp_num_running_backfills| Total number of running backfills across all |
+|                             | dcp connections                              |
+| ep_dcp_max_running_backfills| Max running backfills we can have across all |
+|                             | dcp connections                              |
 
 ** Timing Stats
 
@@ -1032,6 +1060,10 @@ The following stats are available for the CouchStore database engine:
 | failure_get       | Number of failed get operation                     |
 | failure_vbset     | Number of failed vbucket set operation             |
 | save_documents    | Time spent in CouchStore save documents operation  |
+| io_num_read       | Number of io read operations                       |
+| io_num_write      | Number of io write operations                      |
+| io_read_bytes     | Number of bytes read (key + values)                |
+| io_write_bytes    | Number of bytes written (key + values)             |
 
 ** KV Store Timing Stats
 
diff --git a/management/cbadm-tap-registration b/management/cbadm-tap-registration
deleted file mode 100755 (executable)
index b666e53..0000000
+++ /dev/null
@@ -1,126 +0,0 @@
-#!/usr/bin/env python
-import os
-import sys
-import getopt
-import exceptions
-import socket
-import struct
-import mc_bin_client
-import memcacheConstants
-
-DEFAULT_PORT = "11210"
-DEFAULT_HOST_PORT = ["127.0.0.1", DEFAULT_PORT]
-
-def usage(err=0):
-    print >> sys.stderr, """
-
-Usage: %s [-h %s[:%s]] [-r|d tap_name] [-l last_closed_checkpoint_id] [-c][-b]
-
-""" % (os.path.basename(sys.argv[0]),
-       DEFAULT_HOST_PORT[0], DEFAULT_HOST_PORT[1])
-    sys.exit(err)
-
-def parse_args(args):
-    host_port = DEFAULT_HOST_PORT
-    tap_name = ''
-    is_registration = True
-    # By default, the TAP client receives mutations from the open checkpoint as well.
-    closed_checkpoint_only = 0x00
-    last_closed_checkpoint_id = -1
-    # By default, we disable backfill for the registered TAP client.
-    enable_backfill = False
-
-    try:
-        opts, args = getopt.getopt(args, 'h:r:d:l:cb', ['help'])
-    except getopt.GetoptError, e:
-        usage(e.msg)
-
-    for (o, a) in opts:
-        if o == '--help':
-            usage()
-        elif o == '-h':
-            host_port = a.split(':')
-            if len(host_port) < 2:
-                host_port = [a, DEFAULT_PORT]
-        elif o == '-r':
-            tap_name = a
-        elif o == '-d':
-            tap_name = a
-            is_registration = False
-        elif o == '-c':
-            closed_checkpoint_only = 0x01
-        elif o == '-b':
-            enable_backfill = True # Do backfill if required.
-        elif o == '-l':
-            last_closed_checkpoint_id = a
-        else:
-            usage("unknown option - " + o)
-
-    if len(tap_name) == 0:
-        usage("missing name argument, which is the registered client name")
-    return host_port, tap_name, is_registration, closed_checkpoint_only, \
-           last_closed_checkpoint_id, enable_backfill
-
-def readTap(mc):
-    ext = ''
-    key = ''
-    val = ''
-    cmd, vbucketId, opaque, cas, keylen, extlen, data = mc._recvMsg()
-    if data:
-        ext = data[0:extlen]
-        key = data[extlen:extlen+keylen]
-        val = data[extlen+keylen:]
-    return cmd, opaque, cas, vbucketId, key, ext, val
-
-def encodeTAPConnectOpts(opts):
-    header = 0
-    val = []
-    for op in sorted(opts.keys()):
-        header |= op
-        if op in memcacheConstants.TAP_FLAG_TYPES:
-            val.append(struct.pack(memcacheConstants.TAP_FLAG_TYPES[op],
-                                   opts[op]))
-        elif op == memcacheConstants.TAP_FLAG_CHECKPOINT:
-            if opts[op][2] >= 0:
-                val.append(struct.pack(">HHQ", opts[op][0], opts[op][1], opts[op][2]))
-        else:
-            val.append(opts[op])
-    return struct.pack(">I", header), ''.join(val)
-
-if __name__ == '__main__':
-    global mc
-    host_port, tap_name, is_registration, \
-        closed_checkpoint_only, last_closed_checkpoint_id, enable_backfill = parse_args(sys.argv[1:])
-
-    try:
-        mc = mc_bin_client.MemcachedClient(host_port[0], int(host_port[1]))
-
-        if is_registration:
-            backfill_age = 0xffffffff # Disable backfill by default.
-            if enable_backfill == True:
-                backfill_age = 0x00000000
-            ext, val = encodeTAPConnectOpts({
-            ## The three args for TAP_FLAG_CHECKPOINT represents the number of vbuckets,
-            ## the list of vbucket ids, and their last closed checkpoint ids. At this time,
-            ## we only support a single vbucket 0.
-            memcacheConstants.TAP_FLAG_CHECKPOINT: (1, 0, int(last_closed_checkpoint_id)),
-            memcacheConstants.TAP_FLAG_SUPPORT_ACK: '',
-            memcacheConstants.TAP_FLAG_REGISTERED_CLIENT: closed_checkpoint_only,
-            memcacheConstants.TAP_FLAG_BACKFILL: backfill_age
-            })
-            mc._sendCmd(memcacheConstants.CMD_TAP_CONNECT, tap_name, val, 0, ext)
-            cmd, opaque, cas, vbucketId, key, ext, val = readTap(mc)
-            if cmd == memcacheConstants.CMD_TAP_OPAQUE:
-                sys.exit(0);
-            sys.exit("ERROR: could not register name: " + tap_name)
-        else:
-            mc.deregister_tap_client(tap_name)
-    except mc_bin_client.MemcachedError as ne:
-        sys.exit("ERROR: " + str(ne))
-    except socket.error:
-        sys.exit("ERROR: Connection to server on %s:%s failed" %(host_port[0],
-            host_port[1]))
-
-    finally:
-        if mc:
-           mc.close()
old mode 100644 (file)
new mode 100755 (executable)
index 7f55310..21207e3
@@ -15,7 +15,13 @@ def cmd(f):
 
     def g(*args, **kwargs):
         mc = args[0]
-        vbucket = int(args[1])
+
+        try:
+            vbucket = int(args[1])
+        except IndexError:
+            print "Must specify a vbucket id after the compact argument"
+            sys.exit(1)
+
         n = f.func_code.co_argcount
         if len(args) > n:
             print "Too many args, given %s, but expected a maximum of %s"\
@@ -44,7 +50,9 @@ def compact(mc, vbucket, purgeBeforeTs, purgeBeforeSeq, dropDeletes):
         return mc.compact_db(vbucket, purgeBeforeTs, purgeBeforeSeq,
                              dropDeletes)
      except:
-         print "Unable to compact '%d %d %d %d' in requested engine."\
+         print "Unable to compact vbucket %d with the following parameters "\
+               "(purge before time: %d, purge before seqno: %d, drop deletes: "\
+               "%d) in requested engine."\
              % (vbucket, purgeBeforeTs, purgeBeforeSeq, dropDeletes)
 
 def main():
@@ -52,7 +60,7 @@ def main():
 
     c.addCommand('compact', compact, 'compact vbucketid')
     c.addOption('-b', 'bucketName',
-                'the bucket to get stats from (Default: default)')
+                'the bucket to perform compaction on (Default: default)')
     c.addOption('-p', 'password', 'the password for the bucket if one exists')
     c.addOption('--purge-before', 'purgeBeforeTs',
                 'purge documents before this timestamp')
index 7c615b1..2e1a53f 100755 (executable)
@@ -4,6 +4,7 @@ import time
 import sys
 
 import clitool
+import exceptions
 import mc_bin_client
 import memcacheConstants
 import sys
@@ -48,6 +49,8 @@ def set_param(mc, type, key, val):
         engine_param = memcacheConstants.ENGINE_PARAM_FLUSH
     elif type == 'tap_param':
         engine_param = memcacheConstants.ENGINE_PARAM_TAP
+    elif type == 'dcp_param':
+        engine_param = memcacheConstants.ENGINE_PARAM_DCP
     else:
         print 'Error: Bad parameter %s' % type
 
@@ -71,6 +74,8 @@ def set_param(mc, type, key, val):
         print 'set %s to %s' %(key, val)
     except mc_bin_client.MemcachedError, error:
         print 'Error: %s' % error.msg
+    except mc_bin_client.TimeoutError, error:
+        print error
     except Exception, e:
         print 'Generic error (%s)' % e
 
@@ -96,6 +101,8 @@ def stop(mc):
         print 'Persistence stopped'
     except mc_bin_client.MemcachedError, error:
         print 'Error: %s' % error.msg
+    except mc_bin_client.TimeoutError, error:
+        print error
     except Exception, e:
         print 'Generic error (%s)' % e
 
@@ -106,6 +113,8 @@ def start(mc):
         print 'Persistence started'
     except mc_bin_client.MemcachedError, error:
         print 'Error: %s' % error.msg
+    except mc_bin_client.TimeoutError, error:
+        print error
     except Exception, e:
         print 'Generic error (%s)' % e
 
@@ -123,6 +132,8 @@ def drain(mc):
         print 'Write queues drained'
     except mc_bin_client.MemcachedError, error:
         print 'Error: %s' % error.msg
+    except mc_bin_client.TimeoutError, error:
+        print error
     except Exception, e:
         print 'Generic error (%s)' % e
 
@@ -159,19 +170,36 @@ Available params for "set":
                                    before backfill task is made to back off.
     bg_fetch_delay               - Delay before executing a bg fetch (test
                                    feature).
+    bfilter_enabled              - Enable or disable bloom filters (true/false)
+    bfilter_residency_threshold  - Resident ratio threshold below which all items
+                                   will be considered in the bloom filters in full
+                                   eviction policy (0.0 - 1.0)
     compaction_exp_mem_threshold - Memory threshold (%) on the current bucket quota
                                    after which compaction will not queue expired
                                    items for deletion.
     compaction_write_queue_cap   - Disk write queue threshold after which compaction
                                    tasks will be made to snooze, if there are already
                                    pending compaction tasks.
-    exp_pager_stime              - Expiry Pager Sleeptime.
+    defragmenter_enabled         - Enable or disable the defragmenter
+                                   (true/false).
+    defragmenter_interval        - How often defragmenter task should be run
+                                   (in seconds).
+    defragmenter_age_threshold   - How old (measured in number of defragmenter
+                                   passes) must a document be to be considered
+                                   for degragmentation.
+    defragmenter_chunk_duration  - Maximum time (in ms) defragmentation task
+                                   will run for before being paused (and
+                                   resumed at the next defragmenter_interval).
+    exp_pager_stime              - Expiry Pager Sleeptime (if value is 0, expiry_pager
+                                   will be disabled)
     flushall_enabled             - Enable flush operation.
     pager_active_vb_pcnt         - Percentage of active vbuckets items among
                                    all ejected items by item pager.
     max_size                     - Max memory used by the server.
-    mem_high_wat                 - High water mark.
-    mem_low_wat                  - Low water mark.
+    mem_high_wat                 - High water mark (suffix with '%' to make it a
+                                   percentage of the RAM quota)
+    mem_low_wat                  - Low water mark. (suffix with '%' to make it a
+                                   percentage of the RAM quota)
     mutation_mem_threshold       - Memory threshold (%) on the current bucket quota
                                    for accepting a new mutation.
     timing_log                   - path to log detailed timing stats.
@@ -196,6 +224,15 @@ Available params for "set":
                                    which we throttle tap input
     tap_throttle_threshold       - Percentage of memory in use to throttle tap
                                    streams.
+  Available params for "set dcp_param":
+    dcp_consumer_process_buffered_messages_yield_limit - The threshold at which
+                                                         the Consumer will yield
+                                                         when processing items.
+
+    dcp_consumer_process_buffered_messages_batch_size - The number of items the
+                                                        DCP processor will consume
+                                                        in a single batch.
+
     """)
 
     c.addCommand('drain', drain, "drain")
index 2ce5aab..e5accee 100755 (executable)
@@ -6,7 +6,14 @@ import math
 import itertools
 import mc_bin_client
 import re
-import simplejson as json
+
+try:
+    import simplejson as json
+except ImportError:
+    try:
+        import json
+    except ImportError:
+        sys.exit("Error: could not import json module")
 
 MAGIC_CONVERT_RE=re.compile("(\d+)")
 
index feda7c7..e006bd6 100644 (file)
@@ -74,6 +74,7 @@ class CliTool(object):
     def usage(self, skipOptions=False):
         program=os.path.basename(sys.argv[0])
         print "Usage: %s host:dataport command [options]" % program
+        print "       Note that the default dataport is 11210"
         print "\nOptions:"
         for o in self.flags.keys():
             print >>sys.stderr," %s\t%s"%(o, self.flags[o])
index 62fb07e..3081009 100644 (file)
@@ -21,6 +21,20 @@ from memcacheConstants import TOUCH_PKT_FMT, GAT_PKT_FMT, GETL_PKT_FMT
 from memcacheConstants import COMPACT_DB_PKT_FMT
 import memcacheConstants
 
+class TimeoutError(exceptions.Exception):
+    def __init__(self, time):
+        exceptions.Exception.__init__(self, "Operation timed out")
+        self.time = time
+
+    def __str__(self):
+        return self.__repr__()
+
+    def __repr__(self):
+        str = 'Error: Operation timed out (%d seconds)\n' % self.time
+        str += 'Please check list of arguments (e.g., IP address, port number) '
+        str += 'passed or the connectivity to a server to be connected'
+        return str
+
 class MemcachedError(exceptions.Exception):
     """Error raised when a command fails."""
 
@@ -73,7 +87,7 @@ class MemcachedClient(object):
         ready = select.select([self.s], [], [], 30)
         if ready[0]:
             return self.s.recv(amount)
-        raise exceptions.EOFError("Operation timed out")
+        raise TimeoutError(30)
 
     def _recvMsg(self):
         response = ""
index 7364f2b..527e464 100644 (file)
@@ -105,6 +105,7 @@ VB_STATE_NAMES={'active': VB_STATE_ACTIVE,
 ENGINE_PARAM_FLUSH      = 1
 ENGINE_PARAM_TAP        = 2
 ENGINE_PARAM_CHECKPOINT = 3
+ENGINE_PARAM_DCP        = 4
 
 
 COMMAND_NAMES = dict(((globals()[k], k) for k in globals() if k.startswith("CMD_")))
index ffcc882..6b910b1 100644 (file)
 #include "atomic.h"
 
 SpinLock::SpinLock()
-#ifndef USE_CXX11_ATOMICS
-    : lock(0)
-#endif
 {
-#ifdef USE_CXX11_ATOMICS
     lock.clear();
-#endif
 }
 
-SpinLock::~SpinLock() {
-#ifndef USE_CXX11_ATOMICS
-    cb_assert(lock == 0);
-#endif
-}
+SpinLock::~SpinLock() {}
 
 bool SpinLock::tryAcquire(void) {
-#ifdef USE_CXX11_ATOMICS
     return !lock.test_and_set(std::memory_order_acquire);
-#else
-    return ep_sync_lock_test_and_set(&lock, 1) == 0;
-#endif
 }
 
 
@@ -55,9 +42,5 @@ void SpinLock::acquire(void) {
 }
 
 void SpinLock::release(void) {
-#ifdef USE_CXX11_ATOMICS
     lock.clear(std::memory_order_release);
-#else
-    ep_sync_lock_release(&lock);
-#endif
 }
index 104846d..e309bc2 100644 (file)
 
 #include "config.h"
 
-#include <algorithm>
-#include <queue>
-#include <vector>
-
-#ifdef USE_CXX11_ATOMICS
 #include <atomic>
-#include <thread>
+
 #define AtomicValue std::atomic
-using std::memory_order;
-using std::memory_order_relaxed;
-using std::memory_order_consume;
-using std::memory_order_acquire;
-using std::memory_order_release;
-using std::memory_order_acq_rel;
-using std::memory_order_seq_cst;
-#else
-#define AtomicValue CouchbaseAtomic
-enum memory_order {
-    memory_order_relaxed,
-    memory_order_consume,
-    memory_order_acquire,
-    memory_order_release,
-    memory_order_acq_rel,
-    memory_order_seq_cst
-};
-#endif
-
-#if defined(HAVE_GCC_ATOMICS)
-#include "atomic/gcc_atomics.h"
-#elif defined(HAVE_ATOMIC_H)
-#include "atomic/libatomic.h"
-#elif _MSC_VER
-#define ep_sync_synchronize() MemoryBarrier()
-#else
-#error "Don't know how to use atomics on your target system!"
-#endif
 
 #include "callbacks.h"
 #include "locks.h"
 
-#ifndef _MSC_VER
-/**
- * Holder of atomic values.
- */
-template <typename T>
-class CouchbaseAtomic {
-public:
-
-    CouchbaseAtomic(const T &initial = (T)0) {
-        store(initial);
-    }
-
-    ~CouchbaseAtomic() {}
-
-    T load(memory_order sync = memory_order_acq_rel) const {
-        (void) sync;
-        return ep_sync_fetch_and_add(const_cast<T*>(&value), T(0));
-    }
-
-    void store(const T &newValue, memory_order sync = memory_order_acq_rel) {
-        (void) sync;
-        ep_sync_lock_test_and_set(&value, newValue);
-    }
-
-
-    bool compare_exchange_strong(T& expected, T val,
-                                 memory_order sync = memory_order_acq_rel)  {
-        (void) sync;
-        if (ep_sync_bool_compare_and_swap(&value, expected, val)) {
-            return true;
-        } else {
-            expected = load();
-            return false;
-        }
-    }
-
-    operator T() const {
-        return load();
-    }
-
-    void operator =(const T &newValue) {
-        store(newValue);
-    }
-
-    T operator ++() { // prefix
-        return ep_sync_add_and_fetch(&value, 1);
-    }
-
-    T operator ++(int) { // postfix
-        return ep_sync_fetch_and_add(&value, 1);
-    }
-
-    T operator --() { // prefix
-        return ep_sync_add_and_fetch(&value, -1);
-    }
-
-    T operator --(int) { // postfix
-        return ep_sync_fetch_and_add(&value, -1);
-    }
-
-    T fetch_add(const T &increment, memory_order sync = memory_order_acq_rel) {
-        // Returns the old value
-        (void) sync;
-        return ep_sync_fetch_and_add(&value, increment);
-    }
-
-    T fetch_sub(const T &decrement, memory_order sync = memory_order_acq_rel) {
-        (void) sync;
-        return ep_sync_add_and_fetch(&value, -(signed)decrement);
-    }
-
-    T exchange(const T &newValue) {
-        T rv;
-        while (true) {
-            rv = load();
-            if (compare_exchange_strong(rv, newValue)) {
-                break;
-            }
-        }
-        return rv;
-    }
-
-    T swapIfNot(const T &badValue, const T &newValue) {
-        T oldValue;
-        while (true) {
-            oldValue = load();
-            if (oldValue != badValue) {
-                if (compare_exchange_strong(oldValue, newValue)) {
-                    break;
-                }
-            } else {
-                break;
-            }
-        }
-        return oldValue;
-    }
-
-private:
-
-    volatile T value;
-};
-
-template <>
-class CouchbaseAtomic<double> {
-    typedef union doubleIntStore {
-        double d;
-        uint64_t i;
-    } DoubleIntStore;
-
-public:
-    CouchbaseAtomic(const double& initial = 0) {
-        store(initial);
-    }
-
-    double load(memory_order sync = memory_order_acq_rel) const {
-        (void) sync;
-        DoubleIntStore rv;
-        rv.i = ep_sync_fetch_and_add((uint64_t*)&value.i, 0);
-        return rv.d;
-    }
-
-    void store(const double& newValue, memory_order sync = memory_order_acq_rel) {
-        (void) sync;
-        DoubleIntStore input;
-        input.d = newValue;
-        (void)ep_sync_lock_test_and_set(&value.i, input.i);
-    }
-
-    operator double() const {
-        return load();
-    }
-
-private:
-    volatile DoubleIntStore value;
-};
-
-#endif
-
 template <typename T>
 void atomic_setIfBigger(AtomicValue<T> &obj, const T &newValue) {
     T oldValue = obj.load();
@@ -220,6 +49,22 @@ void atomic_setIfLess(AtomicValue<T> &obj, const T &newValue) {
     }
 }
 
+template <typename T>
+T atomic_swapIfNot(AtomicValue<T> &obj, const T &badValue, const T &newValue) {
+    T oldValue;
+    while (true) {
+        oldValue = obj.load();
+        if (oldValue != badValue) {
+            if (obj.compare_exchange_strong(oldValue, newValue)) {
+                break;
+            }
+        } else {
+            break;
+        }
+    }
+    return oldValue;
+}
+
 /**
  * Atomic pointer.
  *
@@ -267,11 +112,7 @@ public:
 private:
     bool tryAcquire(void);
 
-#ifdef USE_CXX11_ATOMICS
     std::atomic_flag lock;
-#else
-    volatile int lock;
-#endif
     DISALLOW_COPY_AND_ASSIGN(SpinLock);
 };
 
diff --git a/src/atomic/gcc_atomics.h b/src/atomic/gcc_atomics.h
deleted file mode 100644 (file)
index 46f5280..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- *     Copyright 2010 Couchbase, Inc
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-#ifndef SRC_ATOMIC_GCC_ATOMICS_H_
-#define SRC_ATOMIC_GCC_ATOMICS_H_ 1
-
-#include "config.h"
-
-#define ep_sync_add_and_fetch(a, b) __sync_add_and_fetch(a, b);
-#define ep_sync_bool_compare_and_swap(a, b, c) __sync_bool_compare_and_swap(a, b, c)
-#define ep_sync_fetch_and_add(a, b) __sync_fetch_and_add(a, b);
-#define ep_sync_lock_release(a) __sync_lock_release(a)
-#define ep_sync_lock_test_and_set(a, b) __sync_lock_test_and_set(a, b)
-#define ep_sync_synchronize() __sync_synchronize()
-
-#endif  // SRC_ATOMIC_GCC_ATOMICS_H_
diff --git a/src/atomic/libatomic.h b/src/atomic/libatomic.h
deleted file mode 100644 (file)
index 2443d1e..0000000
+++ /dev/null
@@ -1,231 +0,0 @@
-/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- *     Copyright 2010 Couchbase, Inc
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-#ifndef SRC_ATOMIC_LIBATOMIC_H_
-#define SRC_ATOMIC_LIBATOMIC_H_ 1
-
-/**
- * atomic.h provides a function interface to the various atomic functions,
- * but it's a C API and not C++ so I can't use the preprocessor to map
- * each function. Let's use function overloading instead and let the compiler
- * pick the one it wants...
- */
-
-#include "config.h"
-
-#include <atomic.h>
-#include <queue>
-
-inline int ep_sync_lock_test_and_set(volatile int *dest, int value) {
-    return atomic_swap_uint((volatile uint*)dest, value);
-}
-
-inline void ep_sync_lock_release(volatile int *dest) {
-    atomic_swap_uint((volatile uint*)dest, 0);
-}
-
-inline void ep_sync_synchronize(void) {
-    // I don't know how to add this yet...
-
-}
-
-inline rel_time_t ep_sync_add_and_fetch(volatile uint64_t *dest, uint64_t value) {
-     if (value == 1) {
-         return atomic_inc_64_nv(dest);
-     } else {
-         return atomic_add_64_nv(dest, value);
-     }
-}
-
-inline rel_time_t ep_sync_add_and_fetch(volatile uint32_t *dest, uint32_t value) {
-     if (value == 1) {
-         return atomic_inc_32_nv(dest);
-     } else {
-         return atomic_add_32_nv(dest, value);
-     }
-}
-
-inline int ep_sync_add_and_fetch(volatile int *dest, int value) {
-    if (value == 1) {
-        return atomic_inc_uint_nv((volatile uint_t*)dest);
-    } else {
-        return atomic_add_int_nv((volatile uint_t*)dest, value);
-    }
-}
-
-inline uint8_t ep_sync_add_and_fetch(volatile uint8_t *dest, uint8_t value) {
-    if (value == 1) {
-        return atomic_inc_8_nv(dest);
-    } else {
-        return atomic_add_8_nv(dest, value);
-    }
-}
-
-inline hrtime_t ep_sync_add_and_fetch(volatile hrtime_t *dest, hrtime_t value) {
-    if (value == 1) {
-        return atomic_inc_64_nv((volatile uint64_t*)dest);
-    } else {
-        return atomic_add_64_nv((volatile uint64_t*)dest, value);
-    }
-}
-
-inline int ep_sync_fetch_and_add(volatile int *dest, int value) {
-    size_t original = *dest;
-    if (value == 1) {
-        atomic_inc_uint((volatile uint_t*)dest);
-    } else {
-        atomic_add_int((volatile uint_t*)dest, value);
-    }
-    return original;
-}
-
-inline uint64_t ep_sync_fetch_and_add(volatile uint64_t *dest, uint64_t value) {
-    uint64_t original = *dest;
-    if (value == 1) {
-        atomic_inc_64(dest);
-    } else {
-        atomic_add_64(dest, value);
-    }
-
-    return original;
-}
-
-inline uint32_t ep_sync_fetch_and_add(volatile uint32_t *dest, uint32_t value) {
-    uint32_t original = *dest;
-    if (value == 1) {
-        atomic_inc_32(dest);
-    } else {
-        atomic_add_32(dest, value);
-    }
-
-    return original;
-}
-
-inline hrtime_t ep_sync_fetch_and_add(volatile hrtime_t *dest, hrtime_t value) {
-    size_t original = *dest;
-    if (value == 1) {
-        atomic_inc_64((volatile uint64_t*)dest);
-    } else {
-        atomic_add_64((volatile uint64_t*)dest, value);
-    }
-
-    return original;
-}
-
-inline bool ep_sync_bool_compare_and_swap(volatile uint8_t *dest, uint8_t prev, uint8_t next) {
-    uint8_t original = *dest;
-    if (original == atomic_cas_8((volatile uint8_t*)dest, (uint8_t)prev, (uint8_t)next)) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-inline bool ep_sync_bool_compare_and_swap(volatile bool *dest, bool prev, bool next) {
-    return ep_sync_bool_compare_and_swap((volatile uint8_t*)dest,
-                                         (uint8_t)prev,
-                                         (uint8_t)next);
-}
-
-inline bool ep_sync_bool_compare_and_swap(volatile int *dest, int prev, int next) {
-    uint_t original = *dest;
-    if (original == atomic_cas_uint((volatile uint_t*)dest, (uint_t)prev, (uint_t)next)) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-inline bool ep_sync_bool_compare_and_swap(volatile unsigned int *dest,
-                                          unsigned int prev,
-                                          unsigned int next) {
-    uint_t original = *dest;
-    if (original == atomic_cas_uint((volatile uint_t*)dest, (uint_t)prev, (uint_t)next)) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-inline bool ep_sync_bool_compare_and_swap(volatile uint64_t *dest,
-                                          uint64_t prev,
-                                          uint64_t next) {
-    uint64_t original = *dest;
-    if (original == atomic_cas_64(dest, prev, next)) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-inline bool ep_sync_bool_compare_and_swap(volatile int64_t *dest,
-                                          int64_t prev,
-                                          int64_t next) {
-    uint64_t original = (uint64_t)*dest;
-    if (original == atomic_cas_64((volatile uint64_t*)dest,
-                                  (uint64_t)prev,
-                                  (uint64_t)next)) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-#ifdef _LP64
-inline bool ep_sync_bool_compare_and_swap(volatile longlong_t *dest,
-                                          longlong_t prev,
-                                          longlong_t next) {
-    return ep_sync_bool_compare_and_swap((volatile uint64_t *)dest,
-                                         (uint64_t)prev,
-                                         (uint64_t)next);
-}
-#endif
-
-/*
- * Unfortunately C++ isn't all that happy about assinging everything to/from a
- * void pointer without a cast, so we need to add extra functions.
- * Luckily we know that the size_t is big enough to keep a pointer, so
- * we can reuse the size_t function for we already defined
- */
-typedef std::queue<int> IntQueue;
-class VBucket;
-class VBucketHolder;
-class Doodad;
-class Blob;
-
-inline bool ep_sync_bool_compare_and_swap(VBucket* volatile* dest, VBucket* prev, VBucket* next) {
-    return ep_sync_bool_compare_and_swap((size_t*)dest, (size_t)prev, (size_t)next);
-}
-
-inline bool ep_sync_bool_compare_and_swap(Blob* volatile* dest, Blob* prev, Blob* next) {
-    return ep_sync_bool_compare_and_swap((size_t*)dest, (size_t)prev, (size_t)next);
-}
-
-inline bool ep_sync_bool_compare_and_swap(VBucketHolder* volatile* dest, VBucketHolder* prev, VBucketHolder* next) {
-    return ep_sync_bool_compare_and_swap((size_t*)dest, (size_t)prev, (size_t)next);
-}
-
-inline bool ep_sync_bool_compare_and_swap(Doodad* volatile* dest, Doodad* prev, Doodad* next) {
-    return ep_sync_bool_compare_and_swap((size_t*)dest, (size_t)prev, (size_t)next);
-}
-
-inline bool ep_sync_bool_compare_and_swap(IntQueue * volatile *dest, IntQueue *prev, IntQueue *next) {
-    return ep_sync_bool_compare_and_swap((size_t*)dest, (size_t)prev, (size_t)next);
-}
-
-
-#endif  // SRC_ATOMIC_LIBATOMIC_H_
index a8644dc..6e46d61 100644 (file)
@@ -56,34 +56,9 @@ private:
 
 #else
 
+#include "atomic.h"
 #include "threadlocal.h"
 
-template <typename T>
-class CouchbaseAtomicPtr : public CouchbaseAtomic<T*> {
-public:
-    CouchbaseAtomicPtr(T *initial = NULL) : CouchbaseAtomic<T*>(initial) {}
-
-    ~CouchbaseAtomicPtr() {}
-
-    T *operator ->() {
-        return CouchbaseAtomic<T*>::load();
-    }
-
-    T &operator *() {
-        return *CouchbaseAtomic<T*>::load();
-    }
-
-    operator bool() const {
-        return CouchbaseAtomic<T*>::load() != NULL;
-    }
-
-    bool operator !() const {
-        return CouchbaseAtomic<T*>::load() == NULL;
-    }
-};
-
-
-
 /**
  * Efficient approximate-FIFO queue optimize for concurrent writers.
  */
@@ -95,7 +70,7 @@ public:
     ~AtomicQueue() {
         size_t i;
         for (i = 0; i < counter; ++i) {
-            delete queues[i];
+            delete queues[i].load();
         }
     }
 
@@ -130,7 +105,8 @@ public:
         size_t c(counter);
         for (size_t i = 0; i < c; ++i) {
             // Swap with another thread
-            newQueue = queues[i].swapIfNot(NULL, q);
+            std::queue<T> *nullQueue(NULL);
+            newQueue = atomic_swapIfNot(queues[i], nullQueue, q);
             // Empty the queue
             if (newQueue != NULL) {
                 q = newQueue;
@@ -160,25 +136,25 @@ public:
         return numItems;
     }
 private:
-    CouchbaseAtomicPtr<std::queue<T> > *initialize() {
+    AtomicPtr<std::queue<T> > *initialize() {
         std::queue<T> *q = new std::queue<T>;
         size_t i(counter++);
         cb_assert(counter <= MAX_THREADS);
-        queues[i] = q;
+        queues[i].store(q);
         threadQueue = &queues[i];
         return &queues[i];
     }
 
     std::queue<T> *swapQueue(std::queue<T> *newQueue = NULL) {
-        CouchbaseAtomicPtr<std::queue<T> > *qPtr(threadQueue);
+        AtomicPtr<std::queue<T> > *qPtr(threadQueue);
         if (qPtr == NULL) {
             qPtr = initialize();
         }
         return qPtr->exchange(newQueue);
     }
 
-    ThreadLocalPtr<CouchbaseAtomicPtr<std::queue<T> > > threadQueue;
-    CouchbaseAtomicPtr<std::queue<T> > queues[MAX_THREADS];
+    ThreadLocalPtr<AtomicPtr<std::queue<T> > > threadQueue;
+    AtomicPtr<std::queue<T> > queues[MAX_THREADS];
     AtomicValue<size_t> counter;
     AtomicValue<size_t> numItems;
     DISALLOW_COPY_AND_ASSIGN(AtomicQueue);
index 3e970e9..e1f3631 100644 (file)
 #include "ep.h"
 #include "vbucket.h"
 
-class RangeCallback : public Callback<SeqnoRange> {
-public:
-    RangeCallback() {}
-    ~RangeCallback() {}
-    void callback(SeqnoRange&) {}
-};
-
 class ItemResidentCallback : public Callback<CacheLookup> {
 public:
     ItemResidentCallback(hrtime_t token, const std::string &n,
@@ -112,7 +105,8 @@ bool BackfillDiskLoad::run() {
 
     if (connMap.checkConnectivity(name) &&
                                !engine->getEpStore()->isFlushAllScheduled()) {
-        size_t num_items = store->getNumItems(vbucket);
+        DBFileInfo info = store->getDbFileInfo(vbucket);
+        size_t num_items = info.itemCount;
         size_t num_deleted = store->getNumPersistedDeletes(vbucket);
         connMap.incrBackfillRemaining(name, num_items + num_deleted);
 
@@ -120,9 +114,13 @@ bool BackfillDiskLoad::run() {
             cb(new BackfillDiskCallback(connToken, name, connMap));
         shared_ptr<Callback<CacheLookup> >
             cl(new ItemResidentCallback(connToken, name, connMap, engine));
-        shared_ptr<Callback<SeqnoRange> >
-            sr(new RangeCallback());
-        store->dump(vbucket, startSeqno, cb, cl, sr);
+
+        ScanContext* ctx = store->initScanContext(cb, cl, vbucket, startSeqno,
+                                                  false, false, false);
+        if (ctx) {
+            store->scan(ctx);
+            store->destroyScanContext(ctx);
+        }
     }
 
     LOG(EXTENSION_LOG_INFO,"VBucket %d backfill task from disk is completed",
index 1c57eba..3ad0e38 100644 (file)
@@ -66,21 +66,10 @@ size_t BgFetcher::doFetch(uint16_t vbId) {
     std::vector<bgfetched_item_t> fetchedItems;
     vb_bgfetch_queue_t::iterator itr = items2fetch.begin();
     for (; itr != items2fetch.end(); ++itr) {
-        const std::string &key = (*itr).first;
         std::list<VBucketBGFetchItem *> &requestedItems = (*itr).second;
         std::list<VBucketBGFetchItem *>::iterator itm = requestedItems.begin();
         for(; itm != requestedItems.end(); ++itm) {
-            if ((*itm)->value.getStatus() == ENGINE_TMPFAIL &&
-                (*itm)->canRetry()) {
-                // underlying kvstore failed to fetch requested data
-                // don't return the failed request yet. Will requeue
-                // it for retry later
-                LOG(EXTENSION_LOG_WARNING,
-                        "Warning: bgfetcher failed to fetch data for vb = %d "
-                        "key = %s retry = %d\n", vbId, key.c_str(),
-                        (*itm)->getRetryCount());
-                continue;
-            }
+            const std::string &key = (*itr).first;
             fetchedItems.push_back(std::make_pair(key, *itm));
             ++totalfetches;
         }
@@ -108,18 +97,7 @@ void BgFetcher::clearItems(uint16_t vbId) {
 
         std::list<VBucketBGFetchItem *>::iterator dItr = doneItems.begin();
         for (; dItr != doneItems.end(); ++dItr) {
-            if ((*dItr)->value.getStatus() != ENGINE_TMPFAIL ||
-                !(*dItr)->canRetry()) {
-                delete *dItr;
-            } else {
-                RCPtr<VBucket> vb = store->getVBuckets().getBucket(vbId);
-                cb_assert(vb);
-                (*dItr)->incrRetryCount();
-                LOG(EXTENSION_LOG_DEBUG, "BgFetcher is re-queueing failed "
-                    "request for vb = %d key = %s retry = %d\n",
-                    vbId, (*itr).first.c_str(), (*dItr)->getRetryCount());
-                vb->queueBGFetchItem((*itr).first, *dItr, this);
-            }
+            delete *dItr;
         }
     }
 }
index c895a44..a11e245 100644 (file)
 #include "item.h"
 #include "stats.h"
 
-const uint16_t MAX_BGFETCH_RETRY=5;
-
 class VBucketBGFetchItem {
 public:
     VBucketBGFetchItem(const void *c, bool meta_only) :
-        cookie(c), initTime(gethrtime()), retryCount(0), metaDataOnly(meta_only)
+        cookie(c), initTime(gethrtime()), metaDataOnly(meta_only)
     { }
     ~VBucketBGFetchItem() {}
 
@@ -41,20 +39,10 @@ public:
         delete value.getValue();
         value.setValue(NULL);
     }
-    bool canRetry() {
-        return retryCount < MAX_BGFETCH_RETRY;
-    }
-    void incrRetryCount() {
-        ++retryCount;
-    }
-    uint16_t getRetryCount() {
-        return retryCount;
-    }
 
     GetValue value;
     const void * cookie;
     hrtime_t initTime;
-    uint16_t retryCount;
     bool metaDataOnly;
 };
 
diff --git a/src/bloomfilter.cc b/src/bloomfilter.cc
new file mode 100644 (file)
index 0000000..861f2ba
--- /dev/null
@@ -0,0 +1,132 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2014 Couchbase, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "bloomfilter.h"
+#include "murmurhash3.h"
+
+#if __x86_64__ || __ppc64__
+#define MURMURHASH_3 MurmurHash3_x64_128
+#else
+#define MURMURHASH_3 MurmurHash3_x86_128
+#endif
+
+BloomFilter::BloomFilter(size_t key_count, double false_positive_prob,
+                         bfilter_status_t new_status) {
+
+    status = new_status;
+    filterSize = estimateFilterSize(key_count, false_positive_prob);
+    noOfHashes = estimateNoOfHashes(key_count);
+    bitArray.assign(filterSize, false);
+}
+
+BloomFilter::~BloomFilter() {
+    status = BFILTER_DISABLED;
+    bitArray.clear();
+}
+
+size_t BloomFilter::estimateFilterSize(size_t key_count,
+                                       double false_positive_prob) {
+    return round(-(((double)(key_count) * log(false_positive_prob))
+                                                    / (pow(log(2.0), 2))));
+}
+
+size_t BloomFilter::estimateNoOfHashes(size_t key_count) {
+    return round(((double) filterSize / key_count) * (log(2.0)));
+}
+
+void BloomFilter::setStatus(bfilter_status_t to) {
+    switch (status) {
+        case BFILTER_DISABLED:
+            if (to == BFILTER_ENABLED) {
+                status = BFILTER_PENDING;
+            }
+            break;
+        case BFILTER_PENDING:
+            if (to == BFILTER_DISABLED) {
+                status = to;
+                bitArray.clear();
+            } else if (to == BFILTER_COMPACTING) {
+                status = to;
+            }
+            break;
+        case BFILTER_COMPACTING:
+            if (to == BFILTER_DISABLED) {
+                status = to;
+                bitArray.clear();
+            } else if (to == BFILTER_ENABLED) {
+                status = to;
+            }
+            break;
+        case BFILTER_ENABLED:
+            if (to == BFILTER_DISABLED) {
+                status = to;
+                bitArray.clear();
+            } else if (to == BFILTER_COMPACTING) {
+                status = to;
+            }
+            break;
+    }
+}
+
// Returns the filter's current lifecycle state (see bfilter_status_t).
bfilter_status_t BloomFilter::getStatus() {
    return status;
}
+
+std::string BloomFilter::getStatusString() {
+    switch (status) {
+        case BFILTER_DISABLED:
+            return "DISABLED";
+        case BFILTER_PENDING:
+            return "PENDING (ENABLED)";
+        case BFILTER_COMPACTING:
+            return "COMPACTING";
+        case BFILTER_ENABLED:
+            return "ENABLED";
+        default:
+            // Fix warining:
+            //  control reaches end of non-void function [-Wreturn-type]
+            cb_assert(false);
+            return "UNKNOWN";
+    }
+}
+
+void BloomFilter::addKey(const char *key, size_t keylen) {
+    if (status == BFILTER_COMPACTING || status == BFILTER_ENABLED) {
+        uint32_t i;
+        uint64_t result;
+        for (i = 0; i < noOfHashes; i++) {
+            MURMURHASH_3(key, keylen, i, &result);
+            bitArray[result % filterSize] = 1;
+        }
+    }
+}
+
+bool BloomFilter::maybeKeyExists(const char *key, uint32_t keylen) {
+    if (status == BFILTER_COMPACTING || status == BFILTER_ENABLED) {
+        uint32_t i;
+        uint64_t result;
+        for (i = 0; i < noOfHashes; i++) {
+            MURMURHASH_3(key, keylen, i, &result);
+            if (bitArray[result % filterSize] == 0) {
+                // The key does NOT exist.
+                return false;
+            }
+        }
+    }
+    // The key may exist.
+    return true;
+}
diff --git a/src/bloomfilter.h b/src/bloomfilter.h
new file mode 100644 (file)
index 0000000..9d93e8a
--- /dev/null
@@ -0,0 +1,60 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2014 Couchbase, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#ifndef SRC_BLOOMFILTER_H_
+#define SRC_BLOOMFILTER_H_ 1
+
+#include "config.h"
+#include "common.h"
+
// Lifecycle states of a BloomFilter (see BloomFilter::setStatus for
// the allowed transitions between them).
typedef enum {
    BFILTER_DISABLED,    // off; queries always answer "may exist"
    BFILTER_PENDING,     // enable requested; not yet rebuilt/active
    BFILTER_COMPACTING,  // being (re)populated; accepts adds and queries
    BFILTER_ENABLED      // fully built and in service
} bfilter_status_t;
+
/**
 * A bloom filter instance for a vbucket.
 * We are to maintain the vbucket-number of these instances.
 *
 * Each vbucket will hold one such object.
 */
class BloomFilter {
public:
    /**
     * @param key_count expected number of keys, used to size the filter
     * @param false_positive_prob target false-positive probability
     * @param newStatus initial lifecycle state (defaults to disabled)
     */
    BloomFilter(size_t key_count, double false_positive_prob,
                bfilter_status_t newStatus = BFILTER_DISABLED);
    ~BloomFilter();

    // Request a lifecycle transition; disallowed transitions are
    // silently ignored.
    void setStatus(bfilter_status_t to);
    bfilter_status_t getStatus();
    std::string getStatusString();

    // Set the probe bits for 'key'; no-op unless COMPACTING/ENABLED.
    void addKey(const char *key, size_t keylen);
    // false => definitely absent; true => may exist.
    // NOTE(review): keylen is uint32_t here but size_t in addKey —
    // consider unifying the two signatures.
    bool maybeKeyExists(const char *key, uint32_t keylen);

private:
    // m = -(n * ln p) / (ln 2)^2 bits for the requested accuracy.
    size_t estimateFilterSize(size_t key_count, double false_positive_prob);
    // k = (m / n) * ln 2 hash rounds per key.
    size_t estimateNoOfHashes(size_t key_count);

    size_t filterSize;           // number of bits in bitArray
    size_t noOfHashes;           // hash rounds per key
    bfilter_status_t status;     // current lifecycle state
    std::vector<bool> bitArray;  // the filter bits
};
+
+#endif // SRC_BLOOMFILTER_H_
index 540bc49..9134be4 100644 (file)
 
 class Item;
 
-class SeqnoRange {
-public:
-    SeqnoRange(uint64_t st, uint64_t en)
-        : start(st), end(en) {}
-
-    uint64_t getStartSeqno() {
-        return start;
-    }
-
-    uint64_t getEndSeqno() {
-        return end;
-    }
-
-private:
-    uint64_t start;
-    uint64_t end;
-};
-
 class CacheLookup {
 public:
     CacheLookup(std::string k, int64_t s, uint16_t vb) :
index f7907b8..045deb7 100644 (file)
@@ -28,6 +28,8 @@
 #undef STATWRITER_NAMESPACE
 #include "vbucket.h"
 
+const std::string CheckpointManager::pCursorName("persistence");
+
 /**
  * A listener class to update checkpoint related configs at runtime.
  */
@@ -60,6 +62,22 @@ private:
     CheckpointConfig &config;
 };
 
// Move this cursor's item offset back by 'decr', clamping at zero so
// the unsigned counter never wraps; an attempted underflow is logged
// and the offset reset to 0.
void CheckpointCursor::decrOffset(size_t decr) {
    if (offset >= decr) {
        // NOTE(review): the compare-then-fetch_sub is not atomic as a
        // unit; presumably callers serialize via the checkpoint
        // manager's queueLock — confirm.
        offset.fetch_sub(decr);
    } else {
        offset = 0;
        LOG(EXTENSION_LOG_INFO, "%s cursor offset is negative. Reset it to 0.",
            name.c_str());
    }
}
+
// Step the cursor one item back within its current checkpoint. Never
// moves before the checkpoint's first element, so a cursor already at
// begin() stays put.
void CheckpointCursor::decrPos() {
    if (currentPos != (*currentCheckpoint)->begin()) {
        --currentPos;
    }
}
+
 Checkpoint::~Checkpoint() {
     LOG(EXTENSION_LOG_INFO,
         "Checkpoint %llu for vbucket %d is purged from memory",
@@ -101,29 +119,6 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
             rv = EXISTING_ITEM;
             std::list<queued_item>::iterator currPos = it->second.position;
             uint64_t currMutationId = it->second.mutation_id;
-            CheckpointCursor &pcursor = checkpointManager->persistenceCursor;
-            queued_item &pqi = *(pcursor.currentPos);
-
-            if (*(pcursor.currentCheckpoint) == this) {
-                // If the existing item is in the left-hand side of the item
-                // pointed by the persistence cursor, decrease the persistence
-                // cursor's offset by 1
-                const std::string &key = pqi->getKey();
-                checkpoint_index::iterator ita = keyIndex.find(key);
-                if ((ita != keyIndex.end()) && (!pqi->isCheckPointMetaItem())) {
-                    uint64_t mutationId = ita->second.mutation_id;
-                    if (currMutationId <= mutationId) {
-                        checkpointManager->decrCursorOffset_UNLOCKED(pcursor,
-                                                                     1);
-                        rv = PERSIST_AGAIN;
-                    }
-                }
-                // If the persistence cursor points to the existing item for the
-                // same key, shift the cursor left by 1.
-                if (pcursor.currentPos == currPos) {
-                    checkpointManager->decrCursorPos_UNLOCKED(pcursor);
-                }
-            }
 
             cursor_index::iterator map_it =
                                         checkpointManager->tapCursors.begin();
@@ -137,15 +132,17 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
                     {
                         uint64_t mutationId = ita->second.mutation_id;
                         if (currMutationId <= mutationId) {
-                            checkpointManager->
-                            decrCursorOffset_UNLOCKED(map_it->second, 1);
+                            map_it->second.decrOffset(1);
+                            if (map_it->second.name.compare(CheckpointManager::pCursorName)
+                                == 0) {
+                                rv = PERSIST_AGAIN;
+                            }
                         }
                     }
                     /* If an TAP cursor points to the existing item for the same
                        key, shift it left by 1 */
                     if (map_it->second.currentPos == currPos) {
-                        checkpointManager->decrCursorPos_UNLOCKED(
-                                                                map_it->second);
+                        map_it->second.decrPos();
                     }
                 }
             }
@@ -182,6 +179,12 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
         }
     }
 
+    // Notify flusher if in case queued item is a checkpoint meta item
+    if (qi->getOperation() == queue_op_checkpoint_start ||
+        qi->getOperation() == queue_op_checkpoint_end) {
+        checkpointManager->notifyFlusher();
+    }
+
     return rv;
 }
 
@@ -224,6 +227,15 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
             ++numNewItems;
         }
     }
+
+    /**
+     * Update snapshot start of current checkpoint to the first
+     * item's sequence number, after merge completed, as items
+     * from the previous checkpoint will be inserted into this
+     * checkpoint.
+     */
+    setSnapshotStartSeqno(getLowSeqno());
+
     memOverhead += newEntryMemOverhead;
     stats.memOverhead.fetch_add(newEntryMemOverhead);
     cb_assert(stats.memOverhead.load() < GIGANTOR);
@@ -288,29 +300,37 @@ void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
         (*it)->setRevSeqno(id);
         if (checkpointList.back()->getId() == 0) {
             (*it)->setBySeqno(lastBySeqno + 1);
+            checkpointList.back()->setSnapshotStartSeqno(lastBySeqno);
+            checkpointList.back()->setSnapshotEndSeqno(lastBySeqno);
         }
+
         checkpointList.back()->setId(id);
         LOG(EXTENSION_LOG_INFO, "Set the current open checkpoint id to %llu "
             "for vbucket %d, bySeqno is %llu, max is %llu", id, vbucketId,
             (*it)->getBySeqno(), lastBySeqno);
-
     }
 }
 
 bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
+    return addNewCheckpoint_UNLOCKED(id, lastBySeqno, lastBySeqno);
+}
+
+bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id,
+                                                  uint64_t snapStartSeqno,
+                                                  uint64_t snapEndSeqno) {
     // This is just for making sure that the current checkpoint should be
     // closed.
     if (!checkpointList.empty() &&
         checkpointList.back()->getState() == CHECKPOINT_OPEN) {
-        closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
+        closeOpenCheckpoint_UNLOCKED();
     }
 
     LOG(EXTENSION_LOG_INFO, "Create a new open checkpoint %llu for vbucket %d",
         id, vbucketId);
 
     bool was_empty = checkpointList.empty() ? true : false;
-    Checkpoint *checkpoint = new Checkpoint(stats, id, vbucketId,
-                                            CHECKPOINT_OPEN);
+    Checkpoint *checkpoint = new Checkpoint(stats, id, snapStartSeqno,
+                                            snapEndSeqno, vbucketId);
     // Add a dummy item into the new checkpoint, so that any cursor referring
     // to the actual first
     // item in this new checkpoint can be safely shifted left by 1 if the
@@ -329,36 +349,22 @@ bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
         return true;
     }
 
-    // Move the persistence cursor to the next checkpoint if it already reached to
-    // the end of its current checkpoint.
-    ++(persistenceCursor.currentPos);
-    if (persistenceCursor.currentPos != (*(persistenceCursor.currentCheckpoint))->end()) {
-        if ((*(persistenceCursor.currentPos))->getOperation() == queue_op_checkpoint_end) {
-            // Skip checkpoint_end meta item that is only used by replication cursors.
-            ++(persistenceCursor.offset);
-            ++(persistenceCursor.currentPos); // cursor now reaches to the checkpoint end.
-        }
-    }
-    if (persistenceCursor.currentPos == (*(persistenceCursor.currentCheckpoint))->end()) {
-        if ((*(persistenceCursor.currentCheckpoint))->getState() == CHECKPOINT_CLOSED) {
-            if (!moveCursorToNextCheckpoint(persistenceCursor)) {
-                --(persistenceCursor.currentPos);
-            }
-        } else {
-            // The persistence cursor is already reached to the end of the open checkpoint.
-            --(persistenceCursor.currentPos);
-        }
-    } else {
-        --(persistenceCursor.currentPos);
-    }
-
-    // If any of replication cursors reached to the end of its current checkpoint, move it to
-    // the next checkpoint. Note that the replication cursors cannot skip a checkpoint_end
-    // meta item.
+    // If any of replication cursors reached to the end of its current
+    // checkpoint, move it to the next checkpoint. Note that the replication
+    // cursors cannot skip a checkpoint_end meta item.
     std::map<const std::string, CheckpointCursor>::iterator tap_it = tapCursors.begin();
     for (; tap_it != tapCursors.end(); ++tap_it) {
         CheckpointCursor &cursor = tap_it->second;
-        if (++(cursor.currentPos) == (*(cursor.currentCheckpoint))->end()) {
+        ++(cursor.currentPos);
+        if (cursor.name.compare(pCursorName) == 0 &&
+            cursor.currentPos != (*(cursor.currentCheckpoint))->end() &&
+            (*(cursor.currentPos))->getOperation() == queue_op_checkpoint_end) {
+            // checkpoint_end meta item is only used by replication cursors
+            ++(cursor.offset);
+            ++(cursor.currentPos); // cursor now reaches to the checkpoint end
+        }
+
+        if (cursor.currentPos == (*(cursor.currentCheckpoint))->end()) {
            if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_CLOSED) {
                if (!moveCursorToNextCheckpoint(cursor)) {
                    --(cursor.currentPos);
@@ -376,20 +382,15 @@ bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
     return true;
 }
 
-bool CheckpointManager::addNewCheckpoint(uint64_t id) {
-    LockHolder lh(queueLock);
-    return addNewCheckpoint_UNLOCKED(id);
-}
-
-bool CheckpointManager::closeOpenCheckpoint_UNLOCKED(uint64_t id) {
+bool CheckpointManager::closeOpenCheckpoint_UNLOCKED() {
     if (checkpointList.empty()) {
         return false;
     }
-    if (id != checkpointList.back()->getId() ||
-        checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
+    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
         return true;
     }
 
+    uint64_t id = checkpointList.back()->getId();
     LOG(EXTENSION_LOG_INFO, "Close the open checkpoint %llu for vbucket %d",
         id, vbucketId);
 
@@ -404,35 +405,25 @@ bool CheckpointManager::closeOpenCheckpoint_UNLOCKED(uint64_t id) {
     return true;
 }
 
-bool CheckpointManager::closeOpenCheckpoint(uint64_t id) {
-    LockHolder lh(queueLock);
-    return closeOpenCheckpoint_UNLOCKED(id);
-}
-
-void CheckpointManager::registerPersistenceCursor() {
+bool CheckpointManager::closeOpenCheckpoint() {
     LockHolder lh(queueLock);
-    cb_assert(!checkpointList.empty());
-    persistenceCursor.currentCheckpoint = checkpointList.begin();
-    persistenceCursor.currentPos = checkpointList.front()->begin();
-    checkpointList.front()->registerCursorName(persistenceCursor.name);
+    return closeOpenCheckpoint_UNLOCKED();
 }
 
-bool CheckpointManager::registerTAPCursor(const std::string &name,
+bool CheckpointManager::registerCursor(const std::string &name,
                                           uint64_t checkpointId,
                                           bool alwaysFromBeginning) {
     LockHolder lh(queueLock);
-    return registerTAPCursor_UNLOCKED(name,
-                                      checkpointId,
-                                      alwaysFromBeginning);
+    return registerCursor_UNLOCKED(name, checkpointId, alwaysFromBeginning);
 }
 
-CursorRegResult CheckpointManager::registerTAPCursorBySeqno(const std::string &name,
-                                                            uint64_t startBySeqno) {
+CursorRegResult CheckpointManager::registerCursorBySeqno(const std::string &name,
+                                                         uint64_t startBySeqno) {
     LockHolder lh(queueLock);
     cb_assert(!checkpointList.empty());
     cb_assert(checkpointList.back()->getHighSeqno() >= startBySeqno);
 
-    removeTAPCursor_UNLOCKED(name);
+    removeCursor_UNLOCKED(name);
 
     size_t skipped = 0;
     CursorRegResult result;
@@ -489,11 +480,16 @@ CursorRegResult CheckpointManager::registerTAPCursorBySeqno(const std::string &n
     return result;
 }
 
-bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
-                                                   uint64_t checkpointId,
-                                                   bool alwaysFromBeginning) {
+bool CheckpointManager::registerCursor_UNLOCKED(const std::string &name,
+                                                uint64_t checkpointId,
+                                                bool alwaysFromBeginning) {
     cb_assert(!checkpointList.empty());
 
+    bool resetOnCollapse = true;
+    if (name.compare(pCursorName) == 0) {
+        resetOnCollapse = false;
+    }
+
     bool found = false;
     std::list<Checkpoint*>::iterator it = checkpointList.begin();
     for (; it != checkpointList.end(); ++it) {
@@ -536,7 +532,7 @@ bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
         }
 
         tapCursors[name] = CheckpointCursor(name, it, (*it)->begin(), offset,
-                                            true);
+                                            resetOnCollapse);
         (*it)->registerCursorName(name);
     } else {
         size_t offset = 0;
@@ -566,7 +562,8 @@ bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
             }
         }
 
-        tapCursors[name] = CheckpointCursor(name, it, curr, offset, true);
+        tapCursors[name] = CheckpointCursor(name, it, curr, offset,
+                                            resetOnCollapse);
         // Register the tap cursor's name to the checkpoint.
         (*it)->registerCursorName(name);
     }
@@ -574,12 +571,12 @@ bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
     return found;
 }
 
-bool CheckpointManager::removeTAPCursor(const std::string &name) {
+bool CheckpointManager::removeCursor(const std::string &name) {
     LockHolder lh(queueLock);
-    return removeTAPCursor_UNLOCKED(name);
+    return removeCursor_UNLOCKED(name);
 }
 
-bool CheckpointManager::removeTAPCursor_UNLOCKED(const std::string &name) {
+bool CheckpointManager::removeCursor_UNLOCKED(const std::string &name) {
     cursor_index::iterator it = tapCursors.find(name);
     if (it == tapCursors.end()) {
         return false;
@@ -606,8 +603,7 @@ bool CheckpointManager::removeTAPCursor_UNLOCKED(const std::string &name) {
     return true;
 }
 
-uint64_t CheckpointManager::getCheckpointIdForTAPCursor(
-                                                     const std::string &name) {
+uint64_t CheckpointManager::getCheckpointIdForCursor(const std::string &name) {
     LockHolder lh(queueLock);
     cursor_index::iterator it = tapCursors.find(name);
     if (it == tapCursors.end()) {
@@ -617,7 +613,7 @@ uint64_t CheckpointManager::getCheckpointIdForTAPCursor(
     return (*(it->second.currentCheckpoint))->getId();
 }
 
-size_t CheckpointManager::getNumOfTAPCursors() {
+size_t CheckpointManager::getNumOfCursors() {
     LockHolder lh(queueLock);
     return tapCursors.size();
 }
@@ -627,7 +623,7 @@ size_t CheckpointManager::getNumCheckpoints() {
     return checkpointList.size();
 }
 
-std::list<std::string> CheckpointManager::getTAPCursorNames() {
+std::list<std::string> CheckpointManager::getCursorNames() {
     LockHolder lh(queueLock);
     std::list<std::string> cursor_names;
     cursor_index::iterator tap_it = tapCursors.begin();
@@ -714,10 +710,9 @@ size_t CheckpointManager::removeClosedUnrefCheckpoints(
     size_t total_items = numUnrefItems + numMetaItems;
     numItems.fetch_sub(total_items);
     if (total_items > 0) {
-        decrCursorOffset_UNLOCKED(persistenceCursor, total_items);
         cursor_index::iterator map_it = tapCursors.begin();
         for (; map_it != tapCursors.end(); ++map_it) {
-            decrCursorOffset_UNLOCKED(map_it->second, total_items);
+            map_it->second.decrOffset(total_items);
         }
     }
     unrefCheckpointList.splice(unrefCheckpointList.begin(), checkpointList,
@@ -732,9 +727,9 @@ size_t CheckpointManager::removeClosedUnrefCheckpoints(
         !checkpointConfig.canKeepClosedCheckpoints() &&
         vbucket->getState() == vbucket_state_replica)
     {
-        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
+        size_t curr_remains = getNumItemsForCursor_UNLOCKED(pCursorName);
         collapseClosedCheckpoints(unrefCheckpointList);
-        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
+        size_t new_remains = getNumItemsForCursor_UNLOCKED(pCursorName);
         if (curr_remains > new_remains) {
             size_t diff = curr_remains - new_remains;
             stats.decrDiskQueueSize(diff);
@@ -761,17 +756,10 @@ void CheckpointManager::removeInvalidCursorsOnCheckpoint(
     const std::set<std::string> &cursors = pCheckpoint->getCursorNameList();
     std::set<std::string>::const_iterator cit = cursors.begin();
     for (; cit != cursors.end(); ++cit) {
-        // Check it with persistence cursor
-        if ((*cit).compare(persistenceCursor.name) == 0) {
-            if (pCheckpoint != *(persistenceCursor.currentCheckpoint)) {
-                invalidCursorNames.push_back(*cit);
-            }
-        } else { // Check it with tap cursors
-            cursor_index::iterator mit = tapCursors.find(*cit);
-            if (mit == tapCursors.end() ||
-                pCheckpoint != *(mit->second.currentCheckpoint)) {
-                invalidCursorNames.push_back(*cit);
-            }
+        cursor_index::iterator mit = tapCursors.find(*cit);
+        if (mit == tapCursors.end() ||
+            pCheckpoint != *(mit->second.currentCheckpoint)) {
+            invalidCursorNames.push_back(*cit);
         }
     }
 
@@ -799,20 +787,13 @@ void CheckpointManager::collapseClosedCheckpoints(
         // those cursors move to the first regular item. Otherwise, those cursors will
         // visit old items from collapsed checkpoints again.
         for (; nitr != (*lastClosedChk)->getCursorNameList().end(); ++nitr) {
-            if (nitr->compare(persistenceCursor.name) == 0) {
-                enum queue_operation qop = (*(persistenceCursor.currentPos))->getOperation();
-                if (qop == queue_op_empty || qop == queue_op_checkpoint_start) {
-                    return;
-                }
-            } else {
-                cursor_index::iterator cc = tapCursors.find(*nitr);
-                if (cc == tapCursors.end()) {
-                    continue;
-                }
-                enum queue_operation qop = (*(cc->second.currentPos))->getOperation();
-                if (qop ==  queue_op_empty || qop == queue_op_checkpoint_start) {
-                    return;
-                }
+            cursor_index::iterator cc = tapCursors.find(*nitr);
+            if (cc == tapCursors.end()) {
+                continue;
+            }
+            enum queue_operation qop = (*(cc->second.currentPos))->getOperation();
+            if (qop ==  queue_op_empty || qop == queue_op_checkpoint_start) {
+                return;
             }
         }
 
@@ -829,37 +810,18 @@ void CheckpointManager::collapseClosedCheckpoints(
             std::set<std::string>::iterator nameItr =
                 (*rit)->getCursorNameList().begin();
             for (; nameItr != (*rit)->getCursorNameList().end(); ++nameItr) {
-                if (nameItr->compare(persistenceCursor.name) == 0) {
-                    const std::string& key =
-                                 (*(persistenceCursor.currentPos))->getKey();
-                    bool isMetaItem =
-                      (*(persistenceCursor.currentPos))->isCheckPointMetaItem();
-                    bool cursor_on_chk_start = false;
-                    if ((*(persistenceCursor.currentPos))->getOperation() ==
-                        queue_op_checkpoint_start) {
-                        cursor_on_chk_start = true;
-                    }
-                    slowCursors[*nameItr] =
-                        std::make_pair((*rit)->getMutationIdForKey(key,
-                                                                   isMetaItem),
-                                       cursor_on_chk_start);
-                } else {
-                    cursor_index::iterator cc =
-                        tapCursors.find(*nameItr);
-                    const std::string& key = (*(cc->second.currentPos))->
-                                             getKey();
-                    bool isMetaItem =
-                             (*(cc->second.currentPos))->isCheckPointMetaItem();
-                    bool cursor_on_chk_start = false;
-                    if ((*(cc->second.currentPos))->getOperation() ==
-                        queue_op_checkpoint_start) {
-                        cursor_on_chk_start = true;
-                    }
-                    slowCursors[*nameItr] =
-                        std::make_pair((*rit)->getMutationIdForKey(key,
-                                                                   isMetaItem),
-                                       cursor_on_chk_start);
+                cursor_index::iterator cc = tapCursors.find(*nameItr);
+                const std::string& key = (*(cc->second.currentPos))->getKey();
+                bool isMetaItem =
+                            (*(cc->second.currentPos))->isCheckPointMetaItem();
+                bool cursor_on_chk_start = false;
+                if ((*(cc->second.currentPos))->getOperation() ==
+                    queue_op_checkpoint_start) {
+                    cursor_on_chk_start = true;
                 }
+                slowCursors[*nameItr] =
+                    std::make_pair((*rit)->getMutationIdForKey(key, isMetaItem),
+                                   cursor_on_chk_start);
             }
         }
         putCursorsInCollapsedChk(slowCursors, lastClosedChk);
@@ -874,15 +836,9 @@ void CheckpointManager::collapseClosedCheckpoints(
         std::set<std::string>::const_iterator cit = fastCursors.begin();
         // Update the offset of each fast cursor.
         for (; cit != fastCursors.end(); ++cit) {
-            if ((*cit).compare(persistenceCursor.name) == 0) {
-                decrCursorOffset_UNLOCKED(persistenceCursor,
-                                          total_items);
-            } else {
-                cursor_index::iterator mit = tapCursors.find(*cit);
-                if (mit != tapCursors.end()) {
-                    decrCursorOffset_UNLOCKED(mit->second,
-                                              total_items);
-                }
+            cursor_index::iterator mit = tapCursors.find(*cit);
+            if (mit != tapCursors.end()) {
+                mit->second.decrOffset(total_items);
             }
         }
         collapsedChks.splice(collapsedChks.end(), checkpointList,
@@ -908,16 +864,35 @@ bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb, queued_item& qi,
     }
 
     if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
-        addNewCheckpoint_UNLOCKED(checkpointList.back()->getId() + 1);
+        if (vb->getState() == vbucket_state_active) {
+            addNewCheckpoint_UNLOCKED(checkpointList.back()->getId() + 1);
+        } else {
+            LOG(EXTENSION_LOG_WARNING, "Checkpoint closed in queueDirty()!"
+                "This is not expected. vb %d, vb state %d, lastBySeqno %llu,"
+                "genSeqno: %d", vb->getId(), vb->getState(), lastBySeqno,
+                genSeqno);
+            cb_assert(false);
+        }
     }
 
     cb_assert(checkpointList.back()->getState() == CHECKPOINT_OPEN);
 
     if (genSeqno) {
         qi->setBySeqno(++lastBySeqno);
+        checkpointList.back()->setSnapshotEndSeqno(lastBySeqno);
     } else {
         lastBySeqno = qi->getBySeqno();
     }
+    uint64_t st = checkpointList.back()->getSnapshotStartSeqno();
+    uint64_t en = checkpointList.back()->getSnapshotEndSeqno();
+    if (!(st <= static_cast<uint64_t>(lastBySeqno) &&
+          static_cast<uint64_t>(lastBySeqno) <= en)) {
+        LOG(EXTENSION_LOG_WARNING, "lastBySeqno not in snapshot range "
+            "vb %d, vb state %d, snapshotstart %llu, lastBySeqno %llu, "
+            "snapshotend %llu genSeqno: %d", vb->getId(), vb->getState(),
+            st, lastBySeqno, en, genSeqno);
+        cb_assert(false);
+    }
 
     queue_dirty_t result = checkpointList.back()->queueDirty(qi, this);
     if (result == NEW_ITEM) {
@@ -933,41 +908,43 @@ bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb, queued_item& qi,
     return result != EXISTING_ITEM;
 }
 
-void CheckpointManager::getAllItemsForCursor(const std::string& name,
-                                             std::deque<queued_item> &items) {
+snapshot_range_t CheckpointManager::getAllItemsForCursor(
+                                             const std::string& name,
+                                             std::vector<queued_item> &items) {
     LockHolder lh(queueLock);
+    snapshot_range_t range;
     cursor_index::iterator it = tapCursors.find(name);
     if (it == tapCursors.end()) {
-        return;
+        range.start = 0;
+        range.end = 0;
+        return range;
     }
 
-    while (incrCursor(it->second)) {
+    bool moreItems;
+    range.start = (*it->second.currentCheckpoint)->getSnapshotStartSeqno();
+    range.end = (*it->second.currentCheckpoint)->getSnapshotEndSeqno();
+    while ((moreItems = incrCursor(it->second))) {
         queued_item& qi = *(it->second.currentPos);
         items.push_back(qi);
 
         if (qi->getOperation() == queue_op_checkpoint_end) {
+            range.end = (*it->second.currentCheckpoint)->getSnapshotEndSeqno();
             moveCursorToNextCheckpoint(it->second);
-            break;
+            if (name.compare(pCursorName) != 0) {
+                break;
+            }
         }
     }
-}
 
-void CheckpointManager::getAllItemsForPersistence(
-                                             std::vector<queued_item> &items) {
-    LockHolder lh(queueLock);
-    // Get all the items up to the end of the current open checkpoint.
-    while (incrCursor(persistenceCursor)) {
-        items.push_back(*(persistenceCursor.currentPos));
+    if (!moreItems) {
+        range.end = (*it->second.currentCheckpoint)->getSnapshotEndSeqno();
     }
 
-    LOG(EXTENSION_LOG_DEBUG,
-        "Grab %ld items through the persistence cursor from vbucket %d",
-        items.size(), vbucketId);
+    return range;
 }
 
 queued_item CheckpointManager::nextItem(const std::string &name,
-                                        bool &isLastMutationItem,
-                                        uint64_t &endSeqno) {
+                                        bool &isLastMutationItem) {
     LockHolder lh(queueLock);
     cursor_index::iterator it = tapCursors.find(name);
     if (it == tapCursors.end()) {
@@ -991,19 +968,9 @@ queued_item CheckpointManager::nextItem(const std::string &name,
     CheckpointCursor &cursor = it->second;
     if (incrCursor(cursor)) {
         isLastMutationItem = isLastMutationItemInCheckpoint(cursor);
-        if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_CLOSED) {
-            endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
-        } else {
-            endSeqno = -1;
-        }
         return *(cursor.currentPos);
     } else {
         isLastMutationItem = false;
-        if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_CLOSED) {
-            endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
-        } else {
-            endSeqno = -1;
-        }
         queued_item qi(new Item(std::string(""), 0xffff,
                                 queue_op_empty, 0, 0));
         return qi;
@@ -1021,8 +988,17 @@ bool CheckpointManager::incrCursor(CheckpointCursor &cursor) {
     return incrCursor(cursor);
 }
 
-void CheckpointManager::clear(vbucket_state_t vbState) {
+void CheckpointManager::clear(RCPtr<VBucket> &vb, uint64_t seqno) {
     LockHolder lh(queueLock);
+    clear_UNLOCKED(vb->getState(), seqno);
+
+    // Reset the disk write queue size stat for the vbucket
+    size_t currentDqSize = vb->dirtyQueueSize.load();
+    vb->decrDirtyQueueSize(currentDqSize);
+    stats.decrDiskQueueSize(currentDqSize);
+}
+
+void CheckpointManager::clear_UNLOCKED(vbucket_state_t vbState, uint64_t seqno) {
     std::list<Checkpoint*>::iterator it = checkpointList.begin();
     // Remove all the checkpoints.
     while(it != checkpointList.end()) {
@@ -1031,7 +1007,7 @@ void CheckpointManager::clear(vbucket_state_t vbState) {
     }
     checkpointList.clear();
     numItems = 0;
-    lastBySeqno = 0;
+    lastBySeqno = seqno;
     pCursorPreCheckpointId = 0;
 
     uint64_t checkpointId = vbState == vbucket_state_active ? 1 : 0;
@@ -1041,19 +1017,16 @@ void CheckpointManager::clear(vbucket_state_t vbState) {
 }
 
 void CheckpointManager::resetCursors(bool resetPersistenceCursor) {
-    // Reset the persistence cursor.
-    if (resetPersistenceCursor) {
-        persistenceCursor.currentCheckpoint = checkpointList.begin();
-        persistenceCursor.currentPos = checkpointList.front()->begin();
-        persistenceCursor.offset = 0;
-        checkpointList.front()->registerCursorName(persistenceCursor.name);
-        uint64_t chkid = checkpointList.front()->getId();
-        pCursorPreCheckpointId = chkid ? chkid - 1 : 0;
-    }
-
-    // Reset all the TAP cursors.
     cursor_index::iterator cit = tapCursors.begin();
     for (; cit != tapCursors.end(); ++cit) {
+        if (cit->second.name.compare(pCursorName) == 0) {
+            if (!resetPersistenceCursor) {
+                continue;
+            } else {
+                uint64_t chkid = checkpointList.front()->getId();
+                pCursorPreCheckpointId = chkid ? chkid - 1 : 0;
+            }
+        }
         cit->second.currentCheckpoint = checkpointList.begin();
         cit->second.currentPos = checkpointList.front()->begin();
         cit->second.offset = 0;
@@ -1061,11 +1034,11 @@ void CheckpointManager::resetCursors(bool resetPersistenceCursor) {
     }
 }
 
-void CheckpointManager::resetTAPCursors(const std::list<std::string> &cursors){
+void CheckpointManager::resetCursors(const std::list<std::string> &cursors) {
     LockHolder lh(queueLock);
     std::list<std::string>::const_iterator it = cursors.begin();
     for (; it != cursors.end(); ++it) {
-        registerTAPCursor_UNLOCKED(*it, getOpenCheckpointId_UNLOCKED(), true);
+        registerCursor_UNLOCKED(*it, getOpenCheckpointId_UNLOCKED(), true);
     }
 }
 
@@ -1119,70 +1092,26 @@ uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(bool forceCreation,
         (checkpointList.back()->getNumItems() > 0 && timeBound)) {
 
         checkpoint_id = checkpointList.back()->getId();
-        closeOpenCheckpoint_UNLOCKED(checkpoint_id);
         addNewCheckpoint_UNLOCKED(checkpoint_id + 1);
     }
     return checkpoint_id;
 }
 
-bool CheckpointManager::eligibleForEviction(const std::string &key, bool isMeta)
-{
+size_t CheckpointManager::getNumItemsForCursor(const std::string &name) {
     LockHolder lh(queueLock);
-    uint64_t smallest_mid;
-
-    // Get the mutation id of the item pointed by the slowest cursor.
-    // This won't cause much overhead as the number of cursors per vbucket is
-    // usually bounded to 3 (persistence cursor + 2 replicas).
-    const std::string &pkey = (*(persistenceCursor.currentPos))->getKey();
-    bool isMetaItem = (*(persistenceCursor.currentPos))->isCheckPointMetaItem();
-    smallest_mid = (*(persistenceCursor.currentCheckpoint))->
-                                          getMutationIdForKey(pkey, isMetaItem);
-    cursor_index::iterator mit = tapCursors.begin();
-    for (; mit != tapCursors.end(); ++mit) {
-        const std::string &tkey = (*(mit->second.currentPos))->getKey();
-        bool isMetaItem = (*(mit->second.currentPos))->isCheckPointMetaItem();
-        uint64_t mid = (*(mit->second.currentCheckpoint))->
-                                          getMutationIdForKey(tkey, isMetaItem);
-        if (mid < smallest_mid) {
-            smallest_mid = mid;
-        }
-    }
-
-    bool can_evict = true;
-    std::list<Checkpoint*>::reverse_iterator it = checkpointList.rbegin();
-    for (; it != checkpointList.rend(); ++it) {
-        uint64_t mid = (*it)->getMutationIdForKey(key, isMeta);
-        if (mid == 0) { // key doesn't exist in a checkpoint.
-            continue;
-        }
-        if (smallest_mid < mid) { // The slowest cursor is still
-            can_evict = false;    //sitting behind a given key.
-            break;
-        }
-    }
-
-    return can_evict;
+    return getNumItemsForCursor_UNLOCKED(name);
 }
 
-size_t CheckpointManager::getNumItemsForTAPConnection(const std::string &name) {
-    LockHolder lh(queueLock);
+size_t CheckpointManager::getNumItemsForCursor_UNLOCKED(const std::string &name) {
     size_t remains = 0;
     cursor_index::iterator it = tapCursors.find(name);
     if (it != tapCursors.end()) {
         size_t offset = it->second.offset + getNumOfMetaItemsFromCursor(it->second);
-        remains = (numItems > offset) ?
-                   numItems - offset : 0;
+        remains = (numItems > offset) ? numItems - offset : 0;
     }
     return remains;
 }
 
-size_t CheckpointManager::getNumItemsForPersistence_UNLOCKED() {
-    size_t num_items = numItems;
-    size_t offset = persistenceCursor.offset +
-                    getNumOfMetaItemsFromCursor(persistenceCursor);
-    return num_items > offset ? num_items - offset : 0;
-}
-
 size_t CheckpointManager::getNumOfMetaItemsFromCursor(CheckpointCursor &cursor) {
     // Get the number of meta items that can be skipped by a given cursor.
     size_t meta_items = 0;
@@ -1216,27 +1145,14 @@ size_t CheckpointManager::getNumOfMetaItemsFromCursor(CheckpointCursor &cursor)
     return meta_items;
 }
 
-void CheckpointManager::decrTapCursorFromCheckpointEnd(
-                                                    const std::string &name) {
+void CheckpointManager::decrCursorFromCheckpointEnd(const std::string &name) {
     LockHolder lh(queueLock);
     cursor_index::iterator it = tapCursors.find(name);
     if (it != tapCursors.end() &&
         (*(it->second.currentPos))->getOperation() ==
         queue_op_checkpoint_end) {
-        decrCursorPos_UNLOCKED(it->second);
-    }
-}
-
-uint64_t CheckpointManager::getMutationIdForKey(uint64_t chk_id,
-                                                std::string key,
-                                                bool isMeta) {
-    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
-    for (; itr != checkpointList.end(); ++itr) {
-        if (chk_id == (*itr)->getId()) {
-            return (*itr)->getMutationIdForKey(key, isMeta);
-        }
+        it->second.decrPos();
     }
-    return 0;
 }
 
 bool CheckpointManager::isLastMutationItemInCheckpoint(
@@ -1250,6 +1166,80 @@ bool CheckpointManager::isLastMutationItemInCheckpoint(
     return false;
 }
 
+void CheckpointManager::setBackfillPhase(uint64_t start, uint64_t end) {
+    LockHolder lh(queueLock);
+    setOpenCheckpointId_UNLOCKED(0);
+    checkpointList.back()->setSnapshotStartSeqno(start);
+    checkpointList.back()->setSnapshotEndSeqno(end);
+}
+
+void CheckpointManager::createSnapshot(uint64_t snapStartSeqno,
+                                       uint64_t snapEndSeqno) {
+    LockHolder lh(queueLock);
+    cb_assert(!checkpointList.empty());
+
+    size_t lastChkId = checkpointList.back()->getId();
+
+    if (checkpointList.back()->getState() == CHECKPOINT_OPEN &&
+        checkpointList.back()->getNumItems() == 0) {
+        if (lastChkId == 0) {
+            setOpenCheckpointId_UNLOCKED(lastChkId + 1);
+            resetCursors(false);
+        }
+        checkpointList.back()->setSnapshotStartSeqno(snapStartSeqno);
+        checkpointList.back()->setSnapshotEndSeqno(snapEndSeqno);
+        return;
+    }
+
+    addNewCheckpoint_UNLOCKED(lastChkId + 1, snapStartSeqno, snapEndSeqno);
+}
+
+void CheckpointManager::resetSnapshotRange() {
+    LockHolder lh(queueLock);
+    cb_assert(!checkpointList.empty());
+
+    // Update snapshot_start and snapshot_end only if the open
+    // checkpoint has no items, otherwise just set the
+    // snapshot_end to the high_seqno.
+    if (checkpointList.back()->getState() == CHECKPOINT_OPEN &&
+        checkpointList.back()->getNumItems() == 0) {
+        checkpointList.back()->setSnapshotStartSeqno(
+                                        static_cast<uint64_t>(lastBySeqno + 1));
+        checkpointList.back()->setSnapshotEndSeqno(
+                                        static_cast<uint64_t>(lastBySeqno + 1));
+
+    } else {
+        checkpointList.back()->setSnapshotEndSeqno(
+                                        static_cast<uint64_t>(lastBySeqno));
+    }
+}
+
+snapshot_info_t CheckpointManager::getSnapshotInfo() {
+    LockHolder lh(queueLock);
+    cb_assert(!checkpointList.empty());
+
+    snapshot_info_t info;
+    info.range.start = checkpointList.back()->getSnapshotStartSeqno();
+    info.start = lastBySeqno;
+    info.range.end = checkpointList.back()->getSnapshotEndSeqno();
+
+    // If there are no items in the open checkpoint then we need to resume by
+    // using the sequence numbers of the last closed snapshot. The exception is
+    // if we are in a partial snapshot, which can be detected by checking if the
+    // snapshot start sequence number is greater than the last by sequence
+    // number. Also, since the last closed snapshot may not be in the checkpoint
+    // manager, we should just use the last by sequence number. The open
+    // checkpoint will be overwritten once the next snapshot marker is received,
+    // since there are no items in it.
+    if (checkpointList.back()->getNumItems() == 0 &&
+        static_cast<uint64_t>(lastBySeqno) < info.range.start) {
+        info.range.start = lastBySeqno;
+        info.range.end = lastBySeqno;
+    }
+
+    return info;
+}
+
 void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
                                                const RCPtr<VBucket> &vbucket) {
     LockHolder lh(queueLock);
@@ -1298,7 +1288,7 @@ void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
                                                    getCursorNameList();
             std::set<std::string>::const_iterator cit = cursors.begin();
             for (; cit != cursors.end(); ++cit) {
-                if ((*cit).compare(persistenceCursor.name) == 0) {
+                if ((*cit).compare(pCursorName) == 0) {
                     // Persistence cursor
                     continue;
                 } else { // TAP cursors
@@ -1307,13 +1297,12 @@ void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
                 }
             }
         } else {
-            closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
             addNewCheckpoint_UNLOCKED(id);
         }
     } else {
-        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
+        size_t curr_remains = getNumItemsForCursor_UNLOCKED(pCursorName);
         collapseCheckpoints(id);
-        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
+        size_t new_remains = getNumItemsForCursor_UNLOCKED(pCursorName);
         if (curr_remains > new_remains) {
             size_t diff = curr_remains - new_remains;
             stats.decrDiskQueueSize(diff);
@@ -1344,17 +1333,6 @@ void CheckpointManager::collapseCheckpoints(uint64_t id) {
                            cursor_on_chk_start);
     }
 
-    Checkpoint* chk = *(persistenceCursor.currentCheckpoint);
-    std::string key = (*(persistenceCursor.currentPos))->getKey();
-    bool isMetaItem = (*(persistenceCursor.currentPos))->isCheckPointMetaItem();
-    bool cursor_on_chk_start = false;
-    if ((*(persistenceCursor.currentPos))->getOperation() == queue_op_checkpoint_start) {
-        cursor_on_chk_start = true;
-    }
-    cursorMap[persistenceCursor.name.c_str()] =
-        std::make_pair(chk->getMutationIdForKey(key, isMetaItem),
-                       cursor_on_chk_start);
-
     setOpenCheckpointId_UNLOCKED(id);
 
     std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
@@ -1397,24 +1375,18 @@ putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &curs
         while (mit != cursors.end()) {
             std::pair<uint64_t, bool> val = mit->second;
             if (val.first < id || (val.first == id && val.second &&
-                                   (*last)->getOperation() == queue_op_checkpoint_start)) {
-                if (mit->first.compare(persistenceCursor.name) == 0) {
-                    persistenceCursor.currentCheckpoint = chkItr;
-                    persistenceCursor.currentPos = last;
-                    persistenceCursor.offset = (i > 0) ? i - 1 : 0;
-                    chk->registerCursorName(persistenceCursor.name);
-                } else {
-                    cursor_index::iterator cc = tapCursors.find(mit->first);
-                    if (cc == tapCursors.end() ||
-                        cc->second.fromBeginningOnChkCollapse) {
-                        ++mit;
-                        continue;
-                    }
-                    cc->second.currentCheckpoint = chkItr;
-                    cc->second.currentPos = last;
-                    cc->second.offset = (i > 0) ? i - 1 : 0;
-                    chk->registerCursorName(cc->second.name);
+                (*last)->getOperation() == queue_op_checkpoint_start)) {
+
+                cursor_index::iterator cc = tapCursors.find(mit->first);
+                if (cc == tapCursors.end() ||
+                    cc->second.fromBeginningOnChkCollapse) {
+                    ++mit;
+                    continue;
                 }
+                cc->second.currentCheckpoint = chkItr;
+                cc->second.currentPos = last;
+                cc->second.offset = (i > 0) ? i - 1 : 0;
+                chk->registerCursorName(cc->second.name);
                 cursors.erase(mit++);
             } else {
                 ++mit;
@@ -1429,26 +1401,19 @@ putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &curs
 
     std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
     for (; mit != cursors.end(); ++mit) {
-        if (mit->first.compare(persistenceCursor.name) == 0) {
-            persistenceCursor.currentCheckpoint = chkItr;
-            persistenceCursor.currentPos = last;
-            persistenceCursor.offset = (i > 0) ? i - 1 : 0;
-            chk->registerCursorName(persistenceCursor.name);
+        cursor_index::iterator cc = tapCursors.find(mit->first);
+        if (cc == tapCursors.end()) {
+            continue;
+        }
+        cc->second.currentCheckpoint = chkItr;
+        if (cc->second.fromBeginningOnChkCollapse) {
+            cc->second.currentPos = chk->begin();
+            cc->second.offset = 0;
         } else {
-            cursor_index::iterator cc = tapCursors.find(mit->first);
-            if (cc == tapCursors.end()) {
-                continue;
-            }
-            cc->second.currentCheckpoint = chkItr;
-            if (cc->second.fromBeginningOnChkCollapse) {
-                cc->second.currentPos = chk->begin();
-                cc->second.offset = 0;
-            } else {
-                cc->second.currentPos = last;
-                cc->second.offset = (i > 0) ? i - 1 : 0;
-            }
-            chk->registerCursorName(cc->second.name);
+            cc->second.currentPos = last;
+            cc->second.offset = (i > 0) ? i - 1 : 0;
         }
+        chk->registerCursorName(cc->second.name);
     }
 }
 
@@ -1491,46 +1456,15 @@ queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
     return qi;
 }
 
-bool CheckpointManager::hasNextForPersistence() {
-    LockHolder lh(queueLock);
-    bool hasMore = true;
-    std::list<queued_item>::iterator curr = persistenceCursor.currentPos;
-    ++curr;
-    if (curr == (*(persistenceCursor.currentCheckpoint))->end() &&
-        (*(persistenceCursor.currentCheckpoint)) == checkpointList.back()) {
-        hasMore = false;
-    }
-    return hasMore;
-}
-
 uint64_t CheckpointManager::createNewCheckpoint() {
     LockHolder lh(queueLock);
     if (checkpointList.back()->getNumItems() > 0) {
         uint64_t chk_id = checkpointList.back()->getId();
-        closeOpenCheckpoint_UNLOCKED(chk_id);
         addNewCheckpoint_UNLOCKED(chk_id + 1);
     }
     return checkpointList.back()->getId();
 }
 
-void CheckpointManager::decrCursorOffset_UNLOCKED(CheckpointCursor &cursor,
-                                                  size_t decr) {
-    if (cursor.offset >= decr) {
-        cursor.offset.fetch_sub(decr);
-    } else {
-        cursor.offset = 0;
-        LOG(EXTENSION_LOG_INFO,
-            "%s cursor offset is negative. Reset it to 0.",
-            cursor.name.c_str());
-    }
-}
-
-void CheckpointManager::decrCursorPos_UNLOCKED(CheckpointCursor &cursor) {
-    if (cursor.currentPos != (*(cursor.currentCheckpoint))->begin()) {
-        --(cursor.currentPos);
-    }
-}
-
 uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
     LockHolder lh(queueLock);
     return pCursorPreCheckpointId;
@@ -1538,6 +1472,7 @@ uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
 
 void CheckpointManager::itemsPersisted() {
     LockHolder lh(queueLock);
+    CheckpointCursor& persistenceCursor = tapCursors[pCursorName];
     std::list<Checkpoint*>::iterator itr = persistenceCursor.currentCheckpoint;
     pCursorPreCheckpointId = ((*itr)->getId() > 0) ? (*itr)->getId() - 1 : 0;
 }
@@ -1654,7 +1589,7 @@ void CheckpointManager::addStats(ADD_STAT add_stat, const void *cookie) {
     snprintf(buf, sizeof(buf), "vb_%d:num_checkpoints", vbucketId);
     add_casted_stat(buf, checkpointList.size(), add_stat, cookie);
     snprintf(buf, sizeof(buf), "vb_%d:num_items_for_persistence", vbucketId);
-    add_casted_stat(buf, getNumItemsForPersistence_UNLOCKED(),
+    add_casted_stat(buf, getNumItemsForCursor_UNLOCKED(pCursorName),
                     add_stat, cookie);
 
     cursor_index::iterator tap_it = tapCursors.begin();
index e7e9092..7fb2f00 100644 (file)
@@ -59,6 +59,16 @@ struct index_entry {
     int64_t mutation_id;
 };
 
+typedef struct {
+    uint64_t start;
+    uint64_t end;
+} snapshot_range_t;
+
+typedef struct {
+    uint64_t start;
+    snapshot_range_t range;
+} snapshot_info_t;
+
 /**
  * The checkpoint index maps a key to a checkpoint index_entry.
  */
@@ -110,6 +120,10 @@ public:
         return *this;
     }
 
+    void decrOffset(size_t decr);
+
+    void decrPos();
+
 private:
     std::string                      name;
     std::list<Checkpoint*>::iterator currentCheckpoint;
@@ -148,14 +162,16 @@ typedef enum {
 } queue_dirty_t;
 
 /**
- * Representation of a checkpoint used in the unified queue for persistence and tap.
+ * Representation of a checkpoint used in the unified queue for persistence and
+ * replication.
  */
 class Checkpoint {
 public:
-    Checkpoint(EPStats &st, uint64_t id, uint16_t vbid,
-               checkpoint_state state = CHECKPOINT_OPEN) :
-        stats(st), checkpointId(id), vbucketId(vbid), creationTime(ep_real_time()),
-        checkpointState(state), numItems(0), memOverhead(0) {
+    Checkpoint(EPStats &st, uint64_t id, uint64_t snapStart, uint64_t snapEnd,
+               uint16_t vbid) :
+        stats(st), checkpointId(id), snapStartSeqno(snapStart),
+        snapEndSeqno(snapEnd), vbucketId(vbid), creationTime(ep_real_time()),
+        checkpointState(CHECKPOINT_OPEN), numItems(0), memOverhead(0) {
         stats.memOverhead.fetch_add(memorySize());
         cb_assert(stats.memOverhead.load() < GIGANTOR);
     }
@@ -262,6 +278,22 @@ public:
         return (*pos)->getBySeqno();
     }
 
+    uint64_t getSnapshotStartSeqno() {
+        return snapStartSeqno;
+    }
+
+    void setSnapshotStartSeqno(uint64_t seqno) {
+        snapStartSeqno = seqno;
+    }
+
+    uint64_t getSnapshotEndSeqno() {
+        return snapEndSeqno;
+    }
+
+    void setSnapshotEndSeqno(uint64_t seqno) {
+        snapEndSeqno = seqno;
+    }
+
     std::list<queued_item>::iterator begin() {
         return toWrite.begin();
     }
@@ -309,6 +341,8 @@ public:
 private:
     EPStats                       &stats;
     uint64_t                       checkpointId;
+    uint64_t                       snapStartSeqno;
+    uint64_t                       snapEndSeqno;
     uint16_t                       vbucketId;
     rel_time_t                     creationTime;
     checkpoint_state               checkpointState;
@@ -336,13 +370,18 @@ class CheckpointManager {
 public:
 
     CheckpointManager(EPStats &st, uint16_t vbucket, CheckpointConfig &config,
-                      int64_t lastSeqno, uint64_t checkpointId = 1) :
+                      int64_t lastSeqno, uint64_t lastSnapStart,
+                      uint64_t lastSnapEnd,
+                      shared_ptr<Callback<uint16_t> > cb,
+                      uint64_t checkpointId = 1) :
         stats(st), checkpointConfig(config), vbucketId(vbucket), numItems(0),
         lastBySeqno(lastSeqno), lastClosedChkBySeqno(lastSeqno),
-        persistenceCursor("persistence"), isCollapsedCheckpoint(false),
-        pCursorPreCheckpointId(0) {
-        addNewCheckpoint(checkpointId);
-        registerPersistenceCursor();
+        isCollapsedCheckpoint(false),
+        pCursorPreCheckpointId(0),
+        flusherCB(cb) {
+        LockHolder lh(queueLock);
+        addNewCheckpoint_UNLOCKED(checkpointId, lastSnapStart, lastSnapEnd);
+        registerCursor_UNLOCKED("persistence", checkpointId);
     }
 
     ~CheckpointManager();
@@ -379,38 +418,38 @@ public:
      * which the cursor can start and (2) flag indicating if the cursor starts
      * with the first item on a checkpoint.
      */
-    CursorRegResult registerTAPCursorBySeqno(const std::string &name,
-                                             uint64_t startBySeqno);
+    CursorRegResult registerCursorBySeqno(const std::string &name,
+                                          uint64_t startBySeqno);
 
     /**
-     * Register the new cursor for a given TAP connection
-     * @param name the name of a given TAP connection
+     * Register the new cursor for a given connection
+     * @param name the name of a given connection
      * @param checkpointId the checkpoint Id to start with.
      * @param alwaysFromBeginning the flag indicating if a cursor should be set to the beginning of
      * checkpoint to start with, even if the cursor is currently in that checkpoint.
      * @return true if the checkpoint to start with exists in the queue.
      */
-    bool registerTAPCursor(const std::string &name, uint64_t checkpointId = 1,
-                           bool alwaysFromBeginning = false);
+    bool registerCursor(const std::string &name, uint64_t checkpointId = 1,
+                        bool alwaysFromBeginning = false);
 
     /**
-     * Remove the cursor for a given TAP connection.
-     * @param name the name of a given TAP connection
-     * @return true if the TAP cursor is removed successfully.
+     * Remove the cursor for a given connection.
+     * @param name the name of a given connection
+     * @return true if the cursor is removed successfully.
      */
-    bool removeTAPCursor(const std::string &name);
+    bool removeCursor(const std::string &name);
 
     /**
-     * Get the Id of the checkpoint where the given TAP connection's cursor is currently located.
+     * Get the Id of the checkpoint where the given connection's cursor is currently located.
      * If the cursor is not found, return 0 as a checkpoint Id.
-     * @param name the name of a given TAP connection
-     * @return the checkpoint Id for a given TAP connection's cursor.
+     * @param name the name of a given connection
+     * @return the checkpoint Id for a given connection's cursor.
      */
-    uint64_t getCheckpointIdForTAPCursor(const std::string &name);
+    uint64_t getCheckpointIdForCursor(const std::string &name);
 
-    size_t getNumOfTAPCursors();
+    size_t getNumOfCursors();
 
-    std::list<std::string> getTAPCursorNames();
+    std::list<std::string> getCursorNames();
 
     /**
      * Queue an item to be written to persistent layer.
@@ -422,24 +461,16 @@ public:
     bool queueDirty(const RCPtr<VBucket> &vb, queued_item& qi, bool genSeqno);
 
     /**
-     * Return the next item to be sent to a given TAP connection
-     * @param name the name of a given TAP connection
-     * @param isLastMutationItem flag indicating if the item to be returned is the last mutation one
-     * in the closed checkpoint.
-     * @return the next item to be sent to a given TAP connection.
-     */
-    queued_item nextItem(const std::string &name, bool &isLastMutationItem,
-                         uint64_t &highSeqno);
-
-    /**
-     * Return the list of items, which needs to be persisted, to the flusher.
-     * @param items the array that will contain the list of items to be persisted and
-     * be pushed into the flusher's outgoing queue where the further IO optimization is performed.
+     * Return the next item to be sent to a given connection
+     * @param name the name of a given connection
+     * @param isLastMutationItem flag indicating if the item to be returned is
+     * the last mutation one in the closed checkpoint.
+     * @return the next item to be sent to a given connection.
      */
-    void getAllItemsForPersistence(std::vector<queued_item> &items);
+    queued_item nextItem(const std::string &name, bool &isLastMutationItem);
 
-    void getAllItemsForCursor(const std::string& name,
-                              std::deque<queued_item> &items);
+    snapshot_range_t getAllItemsForCursor(const std::string& name,
+                                          std::vector<queued_item> &items);
 
     /**
      * Return the total number of items that belong to this checkpoint manager.
@@ -452,41 +483,28 @@ public:
 
     size_t getNumCheckpoints();
 
-    /**
-     * Return the total number of remaining items that should be visited by the persistence cursor.
-     */
-    size_t getNumItemsForPersistence_UNLOCKED();
+    size_t getNumItemsForCursor(const std::string &name);
 
-    size_t getNumItemsForPersistence() {
+    void clear(vbucket_state_t vbState) {
         LockHolder lh(queueLock);
-        return getNumItemsForPersistence_UNLOCKED();
+        clear_UNLOCKED(vbState, lastBySeqno);
     }
 
-    size_t getNumItemsForTAPConnection(const std::string &name);
-
-    /**
-     * Return true if a given key was already visited by all the cursors
-     * and is eligible for eviction.
-     */
-    bool eligibleForEviction(const std::string &key, bool isMeta);
-
     /**
      * Clear all the checkpoints managed by this checkpoint manager.
      */
-    void clear(vbucket_state_t vbState);
+    void clear(RCPtr<VBucket> &vb, uint64_t seqno);
 
     /**
-     * If a given TAP cursor currently points to the checkpoint_end dummy item,
-     * decrease its current position by 1. This function is mainly used for checkpoint
-     * synchronization between the master and slave nodes.
-     * @param name the name of a given TAP connection
+     * If a given cursor currently points to the checkpoint_end dummy item,
+     * decrease its current position by 1. This function is mainly used for
+     * checkpoint synchronization between the master and slave nodes.
+     * @param name the name of a given connection
      */
-    void decrTapCursorFromCheckpointEnd(const std::string &name);
+    void decrCursorFromCheckpointEnd(const std::string &name);
 
     bool hasNext(const std::string &name);
 
-    bool hasNextForPersistence();
-
     const CheckpointConfig &getCheckpointConfig() const {
         return checkpointConfig;
     }
@@ -499,7 +517,7 @@ public:
      */
     uint64_t createNewCheckpoint();
 
-    void resetTAPCursors(const std::list<std::string> &cursors);
+    void resetCursors(const std::list<std::string> &cursors);
 
     /**
      * Get id of the previous checkpoint that is followed by the checkpoint
@@ -517,23 +535,35 @@ public:
      * 1) Check if the checkpoint manager contains any checkpoints with IDs >= i1.
      * 2) If exists, collapse all checkpoints and set the open checkpoint id to a given ID.
      * 3) Otherwise, simply create a new open checkpoint with a given ID.
-     * This method is mainly for dealing with rollback events from a TAP producer.
+     * This method is mainly for dealing with rollback events from a producer.
      * @param id the id of a checkpoint to be created.
      * @param vbucket vbucket of the checkpoint.
      */
     void checkAndAddNewCheckpoint(uint64_t id, const RCPtr<VBucket> &vbucket);
 
-    /**
-     * Gets the mutation id for a given checkpoint item.
-     * @param The checkpoint to look for the key in
-     * @param The key to get the mutation id for
-     * @param isMeta indicates if the key is a checkpoint meta item
-     * @return The mutation id or 0 if not found
-     */
-    uint64_t getMutationIdForKey(uint64_t chk_id, std::string key, bool isMeta);
+    bool closeOpenCheckpoint();
+
+    void setBackfillPhase(uint64_t start, uint64_t end);
+
+    void createSnapshot(uint64_t snapStartSeqno, uint64_t snapEndSeqno);
+
+    void resetSnapshotRange();
+
+    void updateCurrentSnapshotEnd(uint64_t snapEnd) {
+        LockHolder lh(queueLock);
+        checkpointList.back()->setSnapshotEndSeqno(snapEnd);
+    }
+
+    snapshot_info_t getSnapshotInfo();
 
     bool incrCursor(CheckpointCursor &cursor);
 
+    void notifyFlusher() {
+        if (flusherCB) {
+            flusherCB->callback(vbucketId);
+        }
+    }
+
     void setBySeqno(int64_t seqno) {
         LockHolder lh(queueLock);
         lastBySeqno = seqno;
@@ -554,15 +584,19 @@ public:
         return ++lastBySeqno;
     }
 
+    static const std::string pCursorName;
+
 private:
 
-    bool removeTAPCursor_UNLOCKED(const std::string &name);
+    bool removeCursor_UNLOCKED(const std::string &name);
 
-    bool registerTAPCursor_UNLOCKED(const std::string &name,
+    bool registerCursor_UNLOCKED(const std::string &name,
                                     uint64_t checkpointId = 1,
                                     bool alwaysFromBeginning = false);
 
-    void registerPersistenceCursor();
+    size_t getNumItemsForCursor_UNLOCKED(const std::string &name);
+
+    void clear_UNLOCKED(vbucket_state_t vbState, uint64_t seqno);
 
     /**
      * Create a new open checkpoint and add it to the checkpoint list.
@@ -571,13 +605,11 @@ private:
      */
     bool addNewCheckpoint_UNLOCKED(uint64_t id);
 
-    void removeInvalidCursorsOnCheckpoint(Checkpoint *pCheckpoint);
+    bool addNewCheckpoint_UNLOCKED(uint64_t id,
+                                   uint64_t snapStartSeqno,
+                                   uint64_t snapEndSeqno);
 
-    /**
-     * Create a new open checkpoint and add it to the checkpoint list.
-     * @param id the id of a checkpoint to be created.
-     */
-    bool addNewCheckpoint(uint64_t id);
+    void removeInvalidCursorsOnCheckpoint(Checkpoint *pCheckpoint);
 
     bool moveCursorToNextCheckpoint(CheckpointCursor &cursor);
 
@@ -597,12 +629,7 @@ private:
         return checkOpenCheckpoint_UNLOCKED(forceCreation, timeBound);
     }
 
-    bool closeOpenCheckpoint_UNLOCKED(uint64_t id);
-    bool closeOpenCheckpoint(uint64_t id);
-
-    void decrCursorOffset_UNLOCKED(CheckpointCursor &cursor, size_t decr);
-
-    void decrCursorPos_UNLOCKED(CheckpointCursor &cursor);
+    bool closeOpenCheckpoint_UNLOCKED();
 
     bool isLastMutationItemInCheckpoint(CheckpointCursor &cursor);
 
@@ -630,11 +657,12 @@ private:
     int64_t                  lastBySeqno;
     int64_t                  lastClosedChkBySeqno;
     std::list<Checkpoint*>   checkpointList;
-    CheckpointCursor         persistenceCursor;
     bool                     isCollapsedCheckpoint;
     uint64_t                 lastClosedCheckpointId;
     uint64_t                 pCursorPreCheckpointId;
     cursor_index             tapCursors;
+
+    shared_ptr<Callback<uint16_t> > flusherCB;
 };
 
 /**
index c5d0c5b..937569a 100644 (file)
@@ -35,7 +35,7 @@ public:
     CheckpointVisitor(EventuallyPersistentStore *s, EPStats &st,
                       AtomicValue<bool> &sfin)
         : store(s), stats(st), removed(0),
-          stateFinalizer(sfin) {}
+          wasHighMemoryUsage(s->isMemoryUsageTooHigh()), stateFinalizer(sfin) {}
 
     bool visitBucket(RCPtr<VBucket> &vb) {
         currentBucket = vb;
@@ -68,12 +68,19 @@ public:
     void complete() {
         bool inverse = false;
         stateFinalizer.compare_exchange_strong(inverse, true);
+
+        // Wake up any sleeping backfill tasks if the memory usage is lowered
+        // below the high watermark as a result of checkpoint removal.
+        if (wasHighMemoryUsage && !store->isMemoryUsageTooHigh()) {
+            store->getEPEngine().getDcpConnMap().notifyBackfillManagerTasks();
+        }
     }
 
 private:
     EventuallyPersistentStore *store;
     EPStats                   &stats;
     size_t                     removed;
+    bool                       wasHighMemoryUsage;
     AtomicValue<bool>         &stateFinalizer;
 };
 
index c27c5b0..7a0da41 100644 (file)
@@ -76,6 +76,9 @@ using std::tr1::unordered_map;
     TypeName(const TypeName&);                  \
     void operator=(const TypeName&)
 
+#define DISALLOW_ASSIGN(TypeName)               \
+    void operator=(const TypeName&)
+
 // Utility functions implemented in various modules.
 
 extern void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...);
@@ -165,6 +168,25 @@ inline bool parseUint32(const char *str, uint32_t *out) {
     return false;
 }
 
+inline bool parseInt64(const char *str, int64_t *out) {
+    cb_assert(out != NULL);
+    errno = 0;
+    *out = 0;
+    char *endptr;
+
+    int64_t ll = strtoll(str, &endptr, 10);
+    if (errno == ERANGE) {
+        return false;
+    }
+
+    if (isspace(*endptr) || (*endptr == '\0' && endptr != str)) {
+        *out = static_cast<int64_t>(ll);
+        return true;
+    }
+
+    return false;
+}
+
 #define xisspace(c) isspace((unsigned char)c)
 inline bool parseUint64(const char *str, uint64_t *out) {
     cb_assert(out != NULL);
@@ -277,5 +299,29 @@ bool sorted(ForwardIterator first, ForwardIterator last, Compare compare) {
     return is_sorted;
 }
 
+/**
+ * Returns the last system call error as a string
+ */
+inline std::string getStringErrno(void) {
+    std::stringstream ss;
+#ifdef WIN32
+    char* win_msg = NULL;
+    DWORD err = GetLastError();
+    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
+                   FORMAT_MESSAGE_FROM_SYSTEM |
+                   FORMAT_MESSAGE_IGNORE_INSERTS,
+                   NULL, err,
+                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+                   (LPTSTR) &win_msg,
+                   0, NULL);
+    ss << "errno = " << err << ": '" << win_msg << "'";
+    LocalFree(win_msg);
+#else
+    ss << "errno = " << errno << ": '" << strerror(errno) << "'";
+#endif
+
+    return ss.str();
+}
+
 #define GIGANTOR ((size_t)1<<(sizeof(size_t)*8-1))
 #endif  // SRC_COMMON_H_
index 82611c5..145cd47 100644 (file)
@@ -125,11 +125,6 @@ typedef unsigned int useconds_t;
 #undef HAVE_CXX11_SUPPORT
 #endif
 
-#ifdef HAVE_CXX11_SUPPORT
-/* For now we'll allow to enable/disable separate features of the language */
-#define USE_CXX11_ATOMICS 1
-#endif
-
 #ifdef HAVE_SCHED_H
 #include <sched.h>
 #endif
index 8503e61..d595d17 100644 (file)
  */
 
 #include "config.h"
-
 #include "conflict_resolution.h"
-#include "item.h"
 #include "stored-value.h"
 
-bool SeqBasedResolution::resolve(StoredValue *v, const ItemMetaData &meta,
-                                 bool deletion) {
+static const char * getConflictResModeStr(enum conflict_resolution_mode confResMode) {
+    switch (confResMode) {
+        case revision_seqno:
+            return "revision_seqno";
+        case last_write_wins:
+            return "last_write_wins";
+        default:
+            return "unknown";
+    }
+}
+
+/**
+ * A conflict resolution strategy that compares the meta data for a document
+ * from a remote node and this node. This conflict resolution works by picking
+ * a winning document based on comparing meta data fields and finding a field
+ * that has a larger value than the other document's fields. The fields are
+ * compared in the following order: cas, rev seqno, expiration, flags. The cas
+ * is chosen as the first field of comparison if the document's conflict
+ * resolution is set last_write_wins. The last_write_wins indicates that the cas
+ * generated for the document was using a Hybrid Logical Clock (HLC). If all
+ * fields are equal than the local document is chosen as the winner.
+ */
+bool ConflictResolution::resolve_lww(StoredValue *v,
+                                     const ItemMetaData &meta,
+                                     bool deletion) {
+    if (!v->isTempNonExistentItem()) {
+        if (v->getCas() > meta.cas) {
+            return false;
+        } else if (v->getCas() == meta.cas) {
+            if (v->getRevSeqno() > meta.revSeqno) {
+                return false;
+            } else if (v->getRevSeqno() == meta.revSeqno) {
+                if (deletion || v->getExptime() > meta.exptime) {
+                    return false;
+                } else if (v->getExptime() == meta.exptime) {
+                    if (v->getFlags() >= meta.flags) {
+                        return false;
+                    }
+                }
+            }
+        }
+    }
+    return true;
+}
+
+/**
+ * A conflict resolution strategy that compares the meta data for a document
+ * from a remote node and this node. The conflict strategy works by picking
+ * a winning document based on comparing meta data fields and finding a field
+ * that has a larger value than the other documents field. The fields are
+ * compared in the following order: rev seqno, cas, expiration, flags. If all
+ * fields are equal than the local document is chosen as the winner.
+ */
+bool ConflictResolution::resolve_rev_seqno(StoredValue *v,
+                                           const ItemMetaData &meta,
+                                           bool deletion) {
     if (!v->isTempNonExistentItem()) {
         if (v->getRevSeqno() > meta.revSeqno) {
             return false;
@@ -42,3 +94,29 @@ bool SeqBasedResolution::resolve(StoredValue *v, const ItemMetaData &meta,
     }
     return true;
 }
+
+bool ConflictResolution::resolve(RCPtr<VBucket> &vb , StoredValue *v,
+                                 const ItemMetaData &meta,
+                                 bool deletion, enum conflict_resolution_mode
+                                 itmConfResMode) {
+    if (vb->isTimeSyncEnabled()) {
+        if (v->getConflictResMode() == last_write_wins &&
+                itmConfResMode == last_write_wins) {
+            return resolve_lww(v, meta, deletion);
+        } else if ((v->getConflictResMode() != last_write_wins &&
+                       itmConfResMode == last_write_wins) ||
+                   (v->getConflictResMode() == last_write_wins &&
+                       itmConfResMode != last_write_wins)) {
+                   //log the event when the time sync is enabled and
+                   //one of the mutations is not eligible for last_write_wins
+                   LOG(EXTENSION_LOG_WARNING,
+                       "Resolving conflict by comparing rev seqno: key: %s,"
+                       "source conflict resolution mode: %s, target conflict resolution"
+                       "mode: %s", v->getKey().c_str(),
+                       getConflictResModeStr(itmConfResMode),
+                       getConflictResModeStr(v->getConflictResMode()));
+        }
+    }
+
+    return resolve_rev_seqno(v, meta, deletion);
+}
index 5831396..e6d1c72 100644 (file)
@@ -19,6 +19,8 @@
 #define SRC_CONFLICT_RESOLUTION_H_ 1
 
 #include "config.h"
+#include "item.h"
+#include <vbucket.h>
 
 class ItemMetaData;
 class StoredValue;
@@ -31,7 +33,7 @@ class ConflictResolution {
 public:
     ConflictResolution() {}
 
-    virtual ~ConflictResolution() {}
+    ~ConflictResolution() {}
 
     /**
      * Resolves a conflict between two documents.
@@ -40,28 +42,20 @@ public:
      * @param meta the remote document's meta data
      * @param isDelete the flag indicating if conflict resolution is
      * for delete operations
+     * @param itmConfResMode conflict resolution mode of the
+     * remote document
      * @return true is the remote document is the winner, false otherwise
      */
-    virtual bool resolve(StoredValue *v, const ItemMetaData &meta,
-                         bool isDelete = false) = 0;
-};
-
-/**
- * A conflict resolution strategy that compares the meta data for a document
- * from a remote node and this node. The conflict strategy works by picking
- * a winning document based on comparing meta data fields and finding a field
- * that has a larger value than the other documents field. The fields are
- * compared in the following order: seqno, cas, expiration, flags. If all fields
- * are equal than the local document is chosen as the winner.
- */
-class SeqBasedResolution : public ConflictResolution {
-public:
-    SeqBasedResolution() {}
+    bool resolve(RCPtr<VBucket> &vb, StoredValue *v, const ItemMetaData &meta,
+                 bool isDelete = false,
+                 enum conflict_resolution_mode itmConfResMode = revision_seqno);
 
-    ~SeqBasedResolution() {}
+private:
+    bool resolve_rev_seqno(StoredValue *v, const ItemMetaData &meta,
+                           bool isDelete = false);
 
-    bool resolve(StoredValue *v, const ItemMetaData &meta,
-                 bool isDelete = false);
+    bool resolve_lww(StoredValue *v, const ItemMetaData &meta,
+                     bool isDelete = false);
 };
 
 #endif  // SRC_CONFLICT_RESOLUTION_H_
index 7eb96e0..6fd4c05 100644 (file)
 #include "executorthread.h"
 #include "tapconnection.h"
 #include "connmap.h"
+#include "dcp-backfill-manager.h"
 #include "dcp-consumer.h"
 #include "dcp-producer.h"
 
 size_t ConnMap::vbConnLockNum = 32;
 const double ConnNotifier::DEFAULT_MIN_STIME = 1.0;
+const uint32_t DcpConnMap::dbFileMem = 10 * 1024;
+const uint16_t DcpConnMap::numBackfillsThreshold = 4096;
+const uint8_t DcpConnMap::numBackfillsMemThreshold = 1;
 
 /**
  * NonIO task to free the resource of a tap connection.
@@ -249,11 +253,13 @@ void ConnMap::notifyAllPausedConnections() {
     while (!queue.empty()) {
         connection_t &conn = queue.front();
         Notifiable *tp = dynamic_cast<Notifiable*>(conn.get());
-        if (tp && tp->isPaused() && conn->isReserved()) {
-            engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
-            tp->setNotifySent(true);
+        if (tp) {
+            tp->setNotificationScheduled(false);
+            if (tp->isPaused() && conn->isReserved()) {
+                engine.notifyIOComplete(conn->getCookie(), ENGINE_SUCCESS);
+                tp->setNotifySent(true);
+            }
         }
-        tp->setNotificationScheduled(false);
         queue.pop();
     }
 }
@@ -875,7 +881,7 @@ void TapConnMap::removeTapCursors_UNLOCKED(TapProducer *tp) {
                 LOG(EXTENSION_LOG_INFO,
                     "%s Remove the TAP cursor from vbucket %d",
                     tp->logHeader(), vbid);
-                vb->checkpointManager.removeTAPCursor(tp->getName());
+                vb->checkpointManager.removeCursor(tp->getName());
             }
         }
     }
@@ -926,8 +932,18 @@ void TAPSessionStats::clearStats(const std::string &name) {
 }
 
 DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
-    : ConnMap(e) {
-
+    : ConnMap(e),
+      aggrDcpConsumerBufferSize(0) {
+    numActiveSnoozingBackfills = 0;
+    updateMaxActiveSnoozingBackfills(engine.getEpStats().getMaxDataSize());
+
+    // Note: these allocations are deleted by ~Configuration
+    engine.getConfiguration().
+        addValueChangedListener("dcp_consumer_process_buffered_messages_yield_limit",
+                                new DcpConfigChangeListener(*this));
+    engine.getConfiguration().
+        addValueChangedListener("dcp_consumer_process_buffered_messages_batch_size",
+                                new DcpConfigChangeListener(*this));
 }
 
 DcpConsumer *DcpConnMap::newConsumer(const void* cookie,
@@ -989,6 +1005,45 @@ ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler* conn,
     return conn->addStream(opaque, vbucket, flags);
 }
 
+
+DcpConnMap::DcpConfigChangeListener::DcpConfigChangeListener(DcpConnMap& connMap)
+    : myConnMap(connMap){}
+
+void DcpConnMap::DcpConfigChangeListener::sizeValueChanged(const std::string &key,
+                                                           size_t value) {
+    if (key == "dcp_consumer_process_buffered_messages_yield_limit") {
+        myConnMap.consumerYieldConfigChanged(value);
+    } else if (key == "dcp_consumer_process_buffered_messages_batch_size") {
+        myConnMap.consumerBatchSizeConfigChanged(value);
+    }
+}
+
+/*
+ * Find all DcpConsumers and set the yield threshold
+ */
+void DcpConnMap::consumerYieldConfigChanged(size_t newValue) {
+    LockHolder lh(connsLock);
+    for (auto it : all) {
+        DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it.get());
+        if (dcpConsumer) {
+            dcpConsumer->setProcessorYieldThreshold(newValue);
+        }
+    }
+}
+
+/*
+ * Find all DcpConsumers and set the processor batchsize
+ */
+void DcpConnMap::consumerBatchSizeConfigChanged(size_t newValue) {
+    LockHolder lh(connsLock);
+    for (auto it : all) {
+        DcpConsumer* dcpConsumer = dynamic_cast<DcpConsumer*>(it.get());
+        if (dcpConsumer) {
+            dcpConsumer->setProcessBufferedMessagesBatchSize(newValue);
+        }
+    }
+}
+
 DcpProducer *DcpConnMap::newProducer(const void* cookie,
                                      const std::string &name,
                                      bool notifyOnly)
@@ -1055,7 +1110,7 @@ void DcpConnMap::closeAllStreams_UNLOCKED() {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
         if (producer) {
             producer->closeAllStreams();
-            producer->cancelCheckpointProcessorTask();
+            producer->clearCheckpointProcessorTaskQueues();
         } else {
             static_cast<DcpConsumer*>(itr->second.get())->closeAllStreams();
         }
@@ -1089,7 +1144,7 @@ void DcpConnMap::disconnect_UNLOCKED(const void *cookie) {
         DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
         if (producer) {
             producer->closeAllStreams();
-            producer->cancelCheckpointProcessorTask();
+            producer->clearCheckpointProcessorTaskQueues();
         } else {
             static_cast<DcpConsumer*>(conn.get())->closeAllStreams();
         }
@@ -1171,8 +1226,7 @@ void DcpConnMap::removeVBConnections(connection_t &conn) {
     }
 }
 
-void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
-{
+void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno) {
     size_t lock_num = vbid % vbConnLockNum;
     SpinLockHolder lh(&vbConnLocks[lock_num]);
 
@@ -1184,8 +1238,47 @@ void DcpConnMap::notifyVBConnections(uint16_t vbid, uint64_t bySeqno)
     }
 }
 
-void DcpConnMap::addStats(ADD_STAT add_stat, const void *c) {
+void DcpConnMap::notifyBackfillManagerTasks() {
     LockHolder lh(connsLock);
-    add_casted_stat("ep_dcp_dead_conn_count", deadConnections.size(),
-                    add_stat, c);
+    std::map<const void*, connection_t>::iterator itr = map_.begin();
+    for (; itr != map_.end(); ++itr) {
+        DcpProducer* producer = dynamic_cast<DcpProducer*> (itr->second.get());
+        if (producer) {
+            producer->getBackfillManager()->wakeUpTask();
+        }
+    }
+}
+
+bool DcpConnMap::canAddBackfillToActiveQ()
+{
+    SpinLockHolder lh(&numBackfillsLock);
+    if (numActiveSnoozingBackfills < maxActiveSnoozingBackfills) {
+        ++numActiveSnoozingBackfills;
+        return true;
+    }
+    return false;
+}
+
+void DcpConnMap::decrNumActiveSnoozingBackfills()
+{
+    SpinLockHolder lh(&numBackfillsLock);
+    if (numActiveSnoozingBackfills > 0) {
+        --numActiveSnoozingBackfills;
+    } else {
+        LOG(EXTENSION_LOG_WARNING, "ActiveSnoozingBackfills already zero!!!");
+    }
+}
+
+void DcpConnMap::updateMaxActiveSnoozingBackfills(size_t maxDataSize)
+{
+    double numBackfillsMemThresholdPercent =
+                         static_cast<double>(numBackfillsMemThreshold)/100;
+    size_t max = maxDataSize * numBackfillsMemThresholdPercent / dbFileMem;
+    /* We must have atleast one active/snoozing backfill */
+    SpinLockHolder lh(&numBackfillsLock);
+    maxActiveSnoozingBackfills =
+        std::max(static_cast<size_t>(1),
+                 std::min(max, static_cast<size_t>(numBackfillsThreshold)));
+    LOG(EXTENSION_LOG_DEBUG, "Max active snoozing backfills set to %d",
+        maxActiveSnoozingBackfills);
 }
index 067348a..f6ba910 100644 (file)
@@ -239,7 +239,8 @@ protected:
 class ConnNotifier {
 public:
     ConnNotifier(conn_notifier_type ntype, ConnMap &cm)
-        : notifier_type(ntype), connMap(cm), pendingNotification(false)  { }
+        : notifier_type(ntype), connMap(cm), task(0),
+          pendingNotification(false)  { }
 
     void start();
 
@@ -429,9 +430,7 @@ private:
 
 };
 
-
 class DcpConnMap : public ConnMap {
-
 public:
 
     DcpConnMap(EventuallyPersistentEngine &engine);
@@ -454,6 +453,8 @@ public:
 
     void notifyVBConnections(uint16_t vbid, uint64_t bySeqno);
 
+    void notifyBackfillManagerTasks();
+
     void removeVBConnections(connection_t &conn);
 
     void vbucketStateChanged(uint16_t vbucket, vbucket_state_t state);
@@ -464,10 +465,44 @@ public:
 
     void manageConnections();
 
+    bool canAddBackfillToActiveQ();
+
+    void decrNumActiveSnoozingBackfills();
+
+    void updateMaxActiveSnoozingBackfills(size_t maxDataSize);
+
+    uint16_t getNumActiveSnoozingBackfills () const {
+        return numActiveSnoozingBackfills;
+    }
+
+    uint16_t getMaxActiveSnoozingBackfills () const {
+        return maxActiveSnoozingBackfills;
+    }
+
+    size_t getAggrDcpConsumerBufferSize () const {
+        return aggrDcpConsumerBufferSize.load();
+    }
+
+    void incAggrDcpConsumerBufferSize (size_t bufSize) {
+        aggrDcpConsumerBufferSize.fetch_add(bufSize);
+    }
+
+    void decAggrDcpConsumerBufferSize (size_t bufSize) {
+        aggrDcpConsumerBufferSize.fetch_sub(bufSize);
+    }
+
     ENGINE_ERROR_CODE addPassiveStream(ConnHandler* conn, uint32_t opaque,
                                        uint16_t vbucket, uint32_t flags);
 
-    void addStats(ADD_STAT add_stat, const void *c);
+    /*
+     * Change the value at which a DcpConsumer::Processor task will yield
+     */
+    void consumerYieldConfigChanged(size_t newValue);
+
+    /*
+     * Change the batchsize that the DcpConsumer::Processor operates with
+     */
+    void consumerBatchSizeConfigChanged(size_t newValue);
 
 private:
 
@@ -478,6 +513,27 @@ private:
     void closeAllStreams_UNLOCKED();
 
     std::list<connection_t> deadConnections;
+
+    SpinLock numBackfillsLock;
+    /* Db file memory */
+    static const uint32_t dbFileMem;
+    uint16_t numActiveSnoozingBackfills;
+    uint16_t maxActiveSnoozingBackfills;
+    /* Max num of backfills we want to have irrespective of memory */
+    static const uint16_t numBackfillsThreshold;
+    /* Max percentage of memory we want backfills to occupy */
+    static const uint8_t numBackfillsMemThreshold;
+    /* Total memory used by all DCP consumer buffers */
+    AtomicValue<size_t> aggrDcpConsumerBufferSize;
+
+    class DcpConfigChangeListener : public ValueChangedListener {
+    public:
+        DcpConfigChangeListener(DcpConnMap& connMap);
+        virtual ~DcpConfigChangeListener() { }
+        virtual void sizeValueChanged(const std::string &key, size_t value);
+    private:
+        DcpConnMap& myConnMap;
+    };
 };
 
 
index ed026e2..a5767b7 100644 (file)
@@ -97,7 +97,7 @@ extern "C" {
         StatFile* sf = reinterpret_cast<StatFile*>(h);
         sf->stats->readSizeHisto.add(sz);
         if(sf->last_offs) {
-            sf->stats->readSeekHisto.add(abs(off - sf->last_offs));
+            sf->stats->readSeekHisto.add(std::abs(off - sf->last_offs));
         }
         sf->last_offs = off;
         BlockTimer bt(&sf->stats->readTimeHisto);
index c90b150..26d5cec 100644 (file)
@@ -60,6 +60,9 @@ static const int MUTATION_SUCCESS = 1;
 static const int MAX_OPEN_DB_RETRY = 10;
 
 static const uint32_t DEFAULT_META_LEN = 16;
+static const uint32_t V1_META_LEN = 18;
+static const uint32_t V2_META_LEN = 19;
+
 
 class NoLookupCallback : public Callback<CacheLookup> {
 public:
@@ -68,13 +71,6 @@ public:
     void callback(CacheLookup&) {}
 };
 
-class NoRangeCallback : public Callback<SeqnoRange> {
-public:
-    NoRangeCallback() {}
-    ~NoRangeCallback() {}
-    void callback(SeqnoRange&) {}
-};
-
 extern "C" {
     static int recordDbDumpC(Db *db, DocInfo *docinfo, void *ctx)
     {
@@ -97,27 +93,6 @@ static std::string getStrError(Db *db) {
     return errorStr;
 }
 
-static std::string getSystemStrerror(void) {
-    std::stringstream ss;
-#ifdef WIN32
-    char* win_msg = NULL;
-    DWORD err = GetLastError();
-    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
-                   FORMAT_MESSAGE_FROM_SYSTEM |
-                   FORMAT_MESSAGE_IGNORE_INSERTS,
-                   NULL, err,
-                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
-                   (LPTSTR) &win_msg,
-                   0, NULL);
-    ss << "errno = " << err << ": '" << win_msg << "'";
-    LocalFree(win_msg);
-#else
-    ss << "errno = " << errno << ": '" << strerror(errno) << "'";
-#endif
-
-    return ss.str();
-}
-
 static uint8_t determine_datatype(const unsigned char* value,
                                   size_t length) {
     if (checkUTF8JSON(value, length)) {
@@ -223,14 +198,6 @@ public:
     uint16_t vbId;
 };
 
-struct LoadResponseCtx {
-    shared_ptr<Callback<GetValue> > callback;
-    shared_ptr<Callback<CacheLookup> > lookup;
-    uint16_t vbucketId;
-    bool keysonly;
-    EPStats *stats;
-};
-
 struct AllKeysCtx {
     AllKeysCtx(AllKeysCB *callback, uint32_t cnt) :
         cb(callback), count(cnt) { }
@@ -248,6 +215,7 @@ CouchRequest::CouchRequest(const Item &it, uint64_t rev,
     uint32_t flags = it.getFlags();
     uint32_t vlen = it.getNBytes();
     uint32_t exptime = it.getExptime();
+    uint8_t confresmode = static_cast<uint8_t>(it.getConflictResMode());
 
     // Datatype used to determine whether document requires compression or not
     uint8_t datatype;
@@ -275,8 +243,20 @@ CouchRequest::CouchRequest(const Item &it, uint64_t rev,
     memcpy(meta + 8, &exptime, 4);
     memcpy(meta + 12, &flags, 4);
     *(meta + DEFAULT_META_LEN) = FLEX_META_CODE;
-    memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET, it.getExtMeta(),
-           it.getExtMetaLen());
+
+    //For a deleted item, there is no extended meta data available
+    //as part of the item object, hence by default populate the
+    //data type to PROTOCOL_BINARY_RAW_BYTES
+    if (del) {
+        uint8_t del_datatype = PROTOCOL_BINARY_RAW_BYTES;
+        memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
+               &del_datatype, sizeof(uint8_t));
+    } else {
+        memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET, it.getExtMeta(),
+               it.getExtMetaLen());
+    }
+    memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET + EXT_META_LEN,
+           &confresmode, CONFLICT_RES_META_LEN);
 
     dbDocInfo.db_seq = it.getBySeqno();
     dbDocInfo.rev_meta.buf = reinterpret_cast<char *>(meta);
@@ -304,9 +284,10 @@ CouchRequest::CouchRequest(const Item &it, uint64_t rev,
     start = gethrtime();
 }
 
-CouchKVStore::CouchKVStore(EPStats &stats, Configuration &config, bool read_only) :
-    KVStore(read_only), epStats(stats), configuration(config),
-    dbname(configuration.getDbname()), intransaction(false)
+CouchKVStore::CouchKVStore(Configuration &config, bool read_only) :
+    KVStore(read_only), configuration(config),
+    dbname(configuration.getDbname()), intransaction(false),
+    backfillCounter(0)
 {
     open();
     statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
@@ -326,8 +307,7 @@ CouchKVStore::CouchKVStore(EPStats &stats, Configuration &config, bool read_only
 }
 
 CouchKVStore::CouchKVStore(const CouchKVStore &copyFrom) :
-    KVStore(copyFrom), epStats(copyFrom.epStats),
-    configuration(copyFrom.configuration),
+    KVStore(copyFrom), configuration(copyFrom.configuration),
     dbname(copyFrom.dbname), dbFileRevMap(copyFrom.dbFileRevMap),
     numDbFiles(copyFrom.numDbFiles), intransaction(false)
 {
@@ -363,7 +343,9 @@ void CouchKVStore::initialize() {
         }
 
         db = NULL;
-        removeCompactFile(dbname, id, rev);
+        if (!isReadOnly()) {
+            removeCompactFile(dbname, id, rev);
+        }
     }
 }
 
@@ -408,7 +390,6 @@ void CouchKVStore::set(const Item &itm, Callback<mutation_result> &cb) {
     cb_assert(intransaction);
     bool deleteItem = false;
     CouchRequestCallback requestcb;
-    std::string dbFile;
     uint64_t fileRev = dbFileRevMap[itm.getVBucketId()];
 
     // each req will be de-allocated after commit
@@ -417,10 +398,9 @@ void CouchKVStore::set(const Item &itm, Callback<mutation_result> &cb) {
     pendingReqsQ.push_back(req);
 }
 
-void CouchKVStore::get(const std::string &key, uint64_t, uint16_t vb,
+void CouchKVStore::get(const std::string &key, uint16_t vb,
                        Callback<GetValue> &cb, bool fetchDelete) {
     Db *db = NULL;
-    std::string dbFile;
     GetValue rv;
     uint64_t fileRev = dbFileRevMap[vb];
 
@@ -430,8 +410,8 @@ void CouchKVStore::get(const std::string &key, uint64_t, uint16_t vb,
         ++st.numGetFailure;
         LOG(EXTENSION_LOG_WARNING,
             "Warning: failed to open database to retrieve data "
-            "from vBucketId = %d, key = %s, file = %s\n",
-            vb, key.c_str(), dbFile.c_str());
+            "from vBucketId = %d, key = %s\n",
+            vb, key.c_str());
         rv.setStatus(couchErr2EngineErr(errCode));
         cb.callback(rv);
         return;
@@ -452,10 +432,10 @@ void CouchKVStore::getWithHeader(void *dbHandle, const std::string &key,
     DocInfo *docInfo = NULL;
     sized_buf id;
     GetValue rv;
-    std::string dbFile;
 
     id.size = key.size();
     id.buf = const_cast<char *>(key.c_str());
+
     couchstore_error_t errCode = couchstore_docinfo_by_id(db, (uint8_t *)id.buf,
                                                           id.size, &docInfo);
     if (errCode != COUCHSTORE_SUCCESS) {
@@ -463,8 +443,8 @@ void CouchKVStore::getWithHeader(void *dbHandle, const std::string &key,
             // log error only if this is non-xdcr case
             LOG(EXTENSION_LOG_WARNING,
                 "Warning: failed to retrieve doc info from "
-                "database, name=%s key=%s error=%s [%s]\n",
-                dbFile.c_str(), id.buf, couchstore_strerror(errCode),
+                "database, vbucketId=%d, key=%s error=%s [%s]\n",
+                vb, id.buf, couchstore_strerror(errCode),
                 couchkvstore_strerrno(db, errCode).c_str());
         }
     } else {
@@ -473,8 +453,8 @@ void CouchKVStore::getWithHeader(void *dbHandle, const std::string &key,
         if (errCode != COUCHSTORE_SUCCESS) {
             LOG(EXTENSION_LOG_WARNING,
                 "Warning: failed to retrieve key value from "
-                "database, name=%s key=%s error=%s [%s] "
-                "deleted=%s", dbFile.c_str(), id.buf,
+                "database, vbucketId=%d key=%s error=%s [%s] "
+                "deleted=%s", vb, id.buf,
                 couchstore_strerror(errCode),
                 couchkvstore_strerrno(db, errCode).c_str(),
                 docInfo->deleted ? "yes" : "no");
@@ -530,6 +510,7 @@ void CouchKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t &itms) {
     }
 
     GetMultiCbCtx ctx(*this, vb, itms);
+
     errCode = couchstore_docinfos_by_id(db, ids, itms.size(),
                                         0, getMultiCbC, &ctx);
     if (errCode != COUCHSTORE_SUCCESS) {
@@ -573,7 +554,8 @@ void CouchKVStore::delVBucket(uint16_t vbucket) {
 
     std::string failovers("[{\"id\":0, \"seq\":0}]");
     cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
-                                                0, 0, failovers);
+                                                0, 0, 0, INITIAL_DRIFT,
+                                                failovers);
     updateDbFileMap(vbucket, 1);
 }
 
@@ -647,6 +629,9 @@ static std::string getDBFileName(const std::string &dbname,
 
 static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
     if ((*info)->rev_meta.size == DEFAULT_META_LEN) {
+        // Metadata doesn't have flex_meta_code, datatype and
+        // conflict_resolution_mode, provision space for
+        // these parameters.
         const unsigned char* data;
         bool ret;
         if (((*info)->content_meta | COUCH_DOC_IS_COMPRESSED) ==
@@ -673,7 +658,8 @@ static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
         DocInfo *docinfo = (DocInfo *) calloc (sizeof(DocInfo) +
                                                (*info)->id.size +
                                                (*info)->rev_meta.size +
-                                               FLEX_DATA_OFFSET + EXT_META_LEN,
+                                               FLEX_DATA_OFFSET + EXT_META_LEN +
+                                               sizeof(uint8_t),
                                                sizeof(uint8_t));
         if (!docinfo) {
             LOG(EXTENSION_LOG_WARNING, "Failed to allocate docInfo, "
@@ -692,9 +678,51 @@ static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
                &flex_code, FLEX_DATA_OFFSET);
         memcpy(extra + (*info)->rev_meta.size + FLEX_DATA_OFFSET,
                &datatype, sizeof(uint8_t));
+        uint8_t conflict_resolution_mode = revision_seqno;
+        memcpy(extra + (*info)->rev_meta.size + FLEX_DATA_OFFSET + EXT_META_LEN,
+               &conflict_resolution_mode, sizeof(uint8_t));
         docinfo->rev_meta.buf = extra;
         docinfo->rev_meta.size = (*info)->rev_meta.size +
-                                 FLEX_DATA_OFFSET + EXT_META_LEN;
+                                 FLEX_DATA_OFFSET + EXT_META_LEN +
+                                 sizeof(uint8_t);
+
+        docinfo->db_seq = (*info)->db_seq;
+        docinfo->rev_seq = (*info)->rev_seq;
+        docinfo->deleted = (*info)->deleted;
+        docinfo->content_meta = (*info)->content_meta;
+        docinfo->bp = (*info)->bp;
+        docinfo->size = (*info)->size;
+
+        couchstore_free_docinfo(*info);
+        *info = docinfo;
+        return 1;
+    } else if ((*info)->rev_meta.size == V1_META_LEN) {
+        // Metadata doesn't have conflict_resolution_mode,
+        // provision space for this flag.
+        DocInfo *docinfo = (DocInfo *) calloc (sizeof(DocInfo) +
+                                               (*info)->id.size +
+                                               (*info)->rev_meta.size +
+                                               sizeof(uint8_t),
+                                               sizeof(uint8_t));
+        if (!docinfo) {
+            LOG(EXTENSION_LOG_WARNING, "Failed to allocate docInfo, "
+                    "while editing docinfo in the compaction's docinfo_hook");
+            return 0;
+        }
+
+        char *extra = (char *)docinfo + sizeof(DocInfo);
+        memcpy(extra, (*info)->id.buf, (*info)->id.size);
+        docinfo->id.buf = extra;
+        docinfo->id.size = (*info)->id.size;
+
+        extra += (*info)->id.size;
+        memcpy(extra, (*info)->rev_meta.buf, (*info)->rev_meta.size);
+        uint8_t conflict_resolution_mode = revision_seqno;
+        memcpy(extra + (*info)->rev_meta.size,
+               &conflict_resolution_mode, sizeof(uint8_t));
+        docinfo->rev_meta.buf = extra;
+        docinfo->rev_meta.size = (*info)->rev_meta.size +
+                                 sizeof(uint8_t);
 
         docinfo->db_seq = (*info)->db_seq;
         docinfo->rev_seq = (*info)->rev_seq;
@@ -748,10 +776,16 @@ static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
         }
     }
 
+    if (ctx->bfcb) {
+        (ctx->bfcb)->addKeyToFilter((const char *)info->id.buf,
+                                    info->id.size,
+                                    info->deleted);
+    }
+
     return COUCHSTORE_COMPACT_KEEP_ITEM;
 }
 
-void CouchKVStore::compactVBucket(const uint16_t vbid,
+bool CouchKVStore::compactVBucket(const uint16_t vbid,
                                   compaction_ctx *hook_ctx,
                                   Callback<compaction_ctx> &cb,
                                   Callback<kvstats_ctx> &kvcb) {
@@ -766,7 +800,6 @@ void CouchKVStore::compactVBucket(const uint16_t vbid,
     uint64_t                   new_rev = fileRev + 1;
     couchstore_error_t         errCode = COUCHSTORE_SUCCESS;
     hrtime_t                     start = gethrtime();
-    uint64_t              newHeaderPos = 0;
     std::string                 dbfile;
     std::string           compact_file;
     std::string               new_file;
@@ -780,7 +813,7 @@ void CouchKVStore::compactVBucket(const uint16_t vbid,
         LOG(EXTENSION_LOG_WARNING,
                 "Warning: failed to open database, vbucketId = %d "
                 "fileRev = %llu", vbid, fileRev);
-        return;
+        return false;
     }
 
     // Build the temporary vbucket.compact file name
@@ -798,7 +831,7 @@ void CouchKVStore::compactVBucket(const uint16_t vbid,
             couchstore_strerror(errCode),
             couchkvstore_strerrno(compactdb, errCode).c_str());
         closeDatabaseHandle(compactdb);
-        return;
+        return false;
     }
 
     // Close the source Database File once compaction is done
@@ -810,10 +843,10 @@ void CouchKVStore::compactVBucket(const uint16_t vbid,
         LOG(EXTENSION_LOG_WARNING,
             "Warning: failed to rename '%s' to '%s': %s",
             compact_file.c_str(), new_file.c_str(),
-            getSystemStrerror().c_str());
+            getStringErrno().c_str());
 
         removeCompactFile(compact_file);
-        return;
+        return false;
     }
 
     // Open the newly compacted VBucket database file ...
@@ -824,11 +857,11 @@ void CouchKVStore::compactVBucket(const uint16_t vbid,
                 "Warning: failed to open compacted database file %s "
                 "fileRev = %llu", new_file.c_str(), new_rev);
         if (remove(new_file.c_str()) != 0) {
-            LOG(EXTENSION_LOG_WARNING, NULL,
+            LOG(EXTENSION_LOG_WARNING,
                 "Warning: Failed to remove '%s': %s",
-                new_file.c_str(), getSystemStrerror().c_str());
+                new_file.c_str(), getStringErrno().c_str());
         }
-        return;
+        return false;
     }
 
     // Update the global VBucket file map so all operations use the new file
@@ -855,7 +888,6 @@ void CouchKVStore::compactVBucket(const uint16_t vbid,
     }
 
     // Notify MCCouch that compaction is Done...
-    newHeaderPos = couchstore_get_header_position(targetDb);
     closeDatabaseHandle(targetDb);
 
     if (hook_ctx->expiredItems.size()) {
@@ -866,6 +898,8 @@ void CouchKVStore::compactVBucket(const uint16_t vbid,
     unlinkCouchFile(vbid, fileRev);
 
     st.compactHisto.add((gethrtime() - start) / 1000);
+
+    return true;
 }
 
 bool CouchKVStore::snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
@@ -964,22 +998,23 @@ bool CouchKVStore::snapshotStats(const std::map<std::string,
 }
 
 bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
-                                   Callback<kvstats_ctx> *kvcb) {
+                                   Callback<kvstats_ctx> *kvcb, bool reset) {
     Db *db = NULL;
     uint64_t fileRev, newFileRev;
-    std::stringstream id;
+    std::stringstream id, rev;
     std::string dbFileName;
     std::map<uint16_t, uint64_t>::iterator mapItr;
     kvstats_ctx kvctx;
     kvctx.vbucket = vbucketId;
 
     id << vbucketId;
-    dbFileName = dbname + "/" + id.str() + ".couch." + id.str();
     fileRev = dbFileRevMap[vbucketId];
+    rev << fileRev;
+    dbFileName = dbname + "/" + id.str() + ".couch." + rev.str();
 
     couchstore_error_t errorCode;
     errorCode = openDB(vbucketId, fileRev, &db,
-            (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE, &newFileRev);
+            (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE, &newFileRev, reset);
     if (errorCode != COUCHSTORE_SUCCESS) {
         ++st.numVbSetFailure;
         LOG(EXTENSION_LOG_WARNING,
@@ -989,12 +1024,16 @@ bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
     }
 
     fileRev = newFileRev;
+    rev << fileRev;
+    dbFileName = dbname + "/" + id.str() + ".couch." + rev.str();
 
     vbucket_state *state = cachedVBStates[vbucketId];
     vbstate.highSeqno = state->highSeqno;
     vbstate.lastSnapStart = state->lastSnapStart;
     vbstate.lastSnapEnd = state->lastSnapEnd;
     vbstate.maxDeletedSeqno = state->maxDeletedSeqno;
+    vbstate.maxCas = state->maxCas;
+    vbstate.driftCounter = state->driftCounter;
 
     errorCode = saveVBState(db, vbstate);
     if (errorCode != COUCHSTORE_SUCCESS) {
@@ -1028,56 +1067,18 @@ bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
     return true;
 }
 
-void CouchKVStore::dump(std::vector<uint16_t> &vbids,
-                        shared_ptr<Callback<GetValue> > cb,
-                        shared_ptr<Callback<CacheLookup> > cl) {
-
-    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
-    std::vector<uint16_t>::iterator itr = vbids.begin();
-    for (; itr != vbids.end(); ++itr) {
-        loadDB(cb, cl, sr, false, *itr, 0, COUCHSTORE_NO_DELETES);
-    }
-}
-
-void CouchKVStore::dump(uint16_t vb, uint64_t stSeqno,
-                        shared_ptr<Callback<GetValue> > cb,
-                        shared_ptr<Callback<CacheLookup> > cl,
-                        shared_ptr<Callback<SeqnoRange> > sr) {
-
-    loadDB(cb, cl, sr, false, vb, stSeqno);
-}
-
-void CouchKVStore::dumpKeys(std::vector<uint16_t> &vbids,
-                            shared_ptr<Callback<GetValue> > cb) {
-
-    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
-    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
-    std::vector<uint16_t>::iterator itr = vbids.begin();
-    for (; itr != vbids.end(); ++itr) {
-        loadDB(cb, cl, sr, true, *itr, 0, COUCHSTORE_NO_DELETES);
-    }
-}
-
-void CouchKVStore::dumpDeleted(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno,
-                               shared_ptr<Callback<GetValue> > cb) {
-
-    std::vector<uint16_t> vbids;
-    vbids.push_back(vb);
-    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
-    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
-    loadDB(cb, cl, sr, true, vb, stSeqno, COUCHSTORE_DELETES_ONLY);
-}
-
 StorageProperties CouchKVStore::getStorageProperties() {
     StorageProperties rv(true, true, true, true);
     return rv;
 }
 
 bool CouchKVStore::commit(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno,
-                          uint64_t snapEndSeqno) {
+                          uint64_t snapEndSeqno, uint64_t maxCas,
+                          uint64_t driftCounter) {
     cb_assert(!isReadOnly());
     if (intransaction) {
-        if (commit2couchstore(cb, snapStartSeqno, snapEndSeqno)) {
+        if (commit2couchstore(cb, snapStartSeqno, snapEndSeqno,
+                              maxCas, driftCounter)) {
             intransaction = false;
         }
     }
@@ -1116,6 +1117,12 @@ void CouchKVStore::addStats(const std::string &prefix,
         addStat(prefix_str, "failure_vbset", st.numVbSetFailure, add_stat, c);
         addStat(prefix_str, "lastCommDocs",  st.docsCommitted,   add_stat, c);
     }
+
+    addStat(prefix_str, "io_num_read", st.io_num_read, add_stat, c);
+    addStat(prefix_str, "io_num_write", st.io_num_write, add_stat, c);
+    addStat(prefix_str, "io_read_bytes", st.io_read_bytes, add_stat, c);
+    addStat(prefix_str, "io_write_bytes", st.io_write_bytes, add_stat, c);
+
 }
 
 void CouchKVStore::addTimingStats(const std::string &prefix,
@@ -1185,71 +1192,110 @@ void CouchKVStore::pendingTasks() {
     }
 }
 
-void CouchKVStore::loadDB(shared_ptr<Callback<GetValue> > cb,
-                          shared_ptr<Callback<CacheLookup> > cl,
-                          shared_ptr<Callback<SeqnoRange> > sr,
-                          bool keysOnly, uint16_t vbid,
-                          uint64_t startSeqno,
-                          couchstore_docinfos_options options) {
+ScanContext* CouchKVStore::initScanContext(shared_ptr<Callback<GetValue> > cb,
+                                           shared_ptr<Callback<CacheLookup> > cl,
+                                           uint16_t vbid, uint64_t startSeqno,
+                                           bool keysOnly, bool noDeletes,
+                                           bool deletesOnly) {
     Db *db = NULL;
     uint64_t rev = dbFileRevMap[vbid];
     couchstore_error_t errorCode = openDB(vbid, rev, &db,
                                           COUCHSTORE_OPEN_FLAG_RDONLY);
     if (errorCode != COUCHSTORE_SUCCESS) {
-        LOG(EXTENSION_LOG_WARNING,
-            "Failed to open database, name=%s/%d.couch.%lu",
-            dbname.c_str(), vbid, rev);
+        LOG(EXTENSION_LOG_WARNING, "Failed to open database, "
+            "name=%s/%d.couch.%lu", dbname.c_str(), vbid, rev);
         remVBucketFromDbFileMap(vbid);
+        return NULL;
+    }
+
+    DbInfo info;
+    errorCode = couchstore_db_info(db, &info);
+    if (errorCode != COUCHSTORE_SUCCESS) {
+        LOG(EXTENSION_LOG_WARNING, "Failed to read DB info for backfill");
+        closeDatabaseHandle(db);
+        abort();
+    }
+
+    size_t backfillId = backfillCounter++;
+
+    LockHolder lh(backfillLock);
+    backfills[backfillId] = db;
+
+    return new ScanContext(cb, cl, vbid, backfillId, startSeqno,
+                           info.last_sequence, keysOnly, noDeletes,
+                           deletesOnly);
+}
+
+scan_error_t CouchKVStore::scan(ScanContext* ctx) {
+    if (!ctx) {
+        return scan_failed;
+    }
+
+    if (ctx->lastReadSeqno == ctx->maxSeqno) {
+        return scan_success;
+    }
+
+    LockHolder lh(backfillLock);
+    std::map<size_t, Db*>::iterator itr = backfills.find(ctx->scanId);
+    if (itr == backfills.end()) {
+        return scan_failed;
+    }
+
+    Db* db = itr->second;
+    lh.unlock();
+
+    couchstore_docinfos_options options;
+    if (ctx->noDeletes) {
+        options = COUCHSTORE_NO_DELETES;
+    } else if (ctx->onlyDeletes) {
+        options = COUCHSTORE_DELETES_ONLY;
     } else {
-        DbInfo info;
-        errorCode = couchstore_db_info(db, &info);
-        if (errorCode != COUCHSTORE_SUCCESS) {
-            LOG(EXTENSION_LOG_WARNING, "Failed to read DB info for backfill");
-            closeDatabaseHandle(db);
-            abort();
-        }
-        SeqnoRange range(startSeqno, info.last_sequence);
-        sr->callback(range);
-
-        LoadResponseCtx ctx;
-        ctx.vbucketId = vbid;
-        ctx.keysonly = keysOnly;
-        ctx.callback = cb;
-        ctx.lookup = cl;
-        ctx.stats = &epStats;
-        errorCode = couchstore_changes_since(db, startSeqno, options,
-                                             recordDbDumpC,
-                                             static_cast<void *>(&ctx));
-        if (errorCode != COUCHSTORE_SUCCESS) {
-            if (errorCode == COUCHSTORE_ERROR_CANCEL) {
-                LOG(EXTENSION_LOG_WARNING,
-                    "Canceling loading database, warmup has completed\n");
-            } else {
-                LOG(EXTENSION_LOG_WARNING,
-                    "couchstore_changes_since failed, error=%s [%s]",
-                    couchstore_strerror(errorCode),
-                    couchkvstore_strerrno(db, errorCode).c_str());
-                remVBucketFromDbFileMap(vbid);
-            }
+        options = COUCHSTORE_NO_OPTIONS;
+    }
+
+    uint64_t start = ctx->startSeqno;
+    if (ctx->lastReadSeqno != 0) {
+        start = ctx->lastReadSeqno + 1;
+    }
+
+    couchstore_error_t errorCode;
+    errorCode = couchstore_changes_since(db, start, options, recordDbDumpC,
+                                         static_cast<void*>(ctx));
+    if (errorCode != COUCHSTORE_SUCCESS) {
+        if (errorCode == COUCHSTORE_ERROR_CANCEL) {
+            return scan_again;
+        } else {
+            LOG(EXTENSION_LOG_WARNING,
+                "couchstore_changes_since failed, error=%s [%s]",
+                couchstore_strerror(errorCode),
+                couchkvstore_strerrno(db, errorCode).c_str());
+            remVBucketFromDbFileMap(ctx->vbid);
+            return scan_failed;
         }
-        closeDatabaseHandle(db);
     }
+    return scan_success;
+}
+
+void CouchKVStore::destroyScanContext(ScanContext* ctx) {
+    if (!ctx) {
+        return;
+    }
+
+    LockHolder lh(backfillLock);
+    std::map<size_t, Db*>::iterator itr = backfills.find(ctx->scanId);
+    if (itr != backfills.end()) {
+        closeDatabaseHandle(itr->second);
+        backfills.erase(itr);
+    }
+    delete ctx;
 }
 
 void CouchKVStore::open() {
     // TODO intransaction, is it needed?
     intransaction = false;
 
-    struct stat dbstat;
-    bool havedir = false;
-
-    if (stat(dbname.c_str(), &dbstat) == 0 &&
-                            (dbstat.st_mode & S_IFDIR) == S_IFDIR) {
-        havedir = true;
-    }
-
-    if (!havedir) {
-        if (mkdir(dbname.c_str(), S_IRWXU) == -1) {
+    if (mkdir(dbname.c_str(), S_IRWXU) == -1) {
+        if (errno != EEXIST) {
             std::stringstream ss;
             ss << "Warning: Failed to create data directory ["
                << dbname << "]: " << strerror(errno);
@@ -1297,7 +1343,7 @@ uint64_t CouchKVStore::checkNewRevNum(std::string &dbFileName, bool newFile) {
 
 void CouchKVStore::updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev) {
     if (vbucketId >= numDbFiles) {
-        LOG(EXTENSION_LOG_WARNING, NULL,
+        LOG(EXTENSION_LOG_WARNING,
             "Warning: cannot update db file map for an invalid vbucket, "
             "vbucket id = %d, rev = %lld\n", vbucketId, newFileRev);
         return;
@@ -1310,38 +1356,58 @@ couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
                                         uint64_t fileRev,
                                         Db **db,
                                         uint64_t options,
-                                        uint64_t *newFileRev) {
+                                        uint64_t *newFileRev,
+                                        bool reset) {
     std::string dbFileName = getDBFileName(dbname, vbucketId, fileRev);
     couch_file_ops* ops = &statCollectingFileOps;
 
     uint64_t newRevNum = fileRev;
     couchstore_error_t errorCode = COUCHSTORE_SUCCESS;
 
-    if (options == COUCHSTORE_OPEN_FLAG_CREATE) {
-        // first try to open the requested file without the create option
-        // in case it does already exist
-        errorCode = couchstore_open_db_ex(dbFileName.c_str(), 0, ops, db);
-        if (errorCode != COUCHSTORE_SUCCESS) {
-            // open_db failed but still check if the file exists
-            newRevNum = checkNewRevNum(dbFileName);
-            bool fileExists = (newRevNum) ? true : false;
-            if (fileExists) {
-                errorCode = openDB_retry(dbFileName, 0, ops, db, &newRevNum);
-            } else {
-                // requested file doesn't seem to exist, just create one
-                errorCode = couchstore_open_db_ex(dbFileName.c_str(), options,
-                                                  ops, db);
-                if (errorCode == COUCHSTORE_SUCCESS) {
-                    newRevNum = 1;
-                    updateDbFileMap(vbucketId, fileRev);
-                    LOG(EXTENSION_LOG_INFO,
-                        "INFO: created new couch db file, name=%s rev=%llu",
-                        dbFileName.c_str(), fileRev);
+    if (reset) {
+        errorCode = couchstore_open_db_ex(dbFileName.c_str(), options,
+                                          ops, db);
+        if (errorCode == COUCHSTORE_SUCCESS) {
+            newRevNum = 1;
+            updateDbFileMap(vbucketId, fileRev);
+            LOG(EXTENSION_LOG_INFO,
+                "reset: created new couchstore file, name=%s rev=%llu",
+                dbFileName.c_str(), fileRev);
+        } else {
+            LOG(EXTENSION_LOG_WARNING,
+                "reset: creating a new couchstore file,"
+                "name=%s rev=%llu failed with error=%s", dbFileName.c_str(),
+                fileRev, couchstore_strerror(errorCode));
+        }
+    } else {
+        if (options == COUCHSTORE_OPEN_FLAG_CREATE) {
+            // first try to open the requested file without the
+            // create option in case it does already exist
+            errorCode = couchstore_open_db_ex(dbFileName.c_str(), 0, ops, db);
+            if (errorCode != COUCHSTORE_SUCCESS) {
+                // open_db failed but still check if the file exists
+                newRevNum = checkNewRevNum(dbFileName);
+                bool fileExists = (newRevNum) ? true : false;
+                if (fileExists) {
+                    errorCode = openDB_retry(dbFileName, 0, ops, db,
+                                             &newRevNum);
+                } else {
+                    // requested file doesn't seem to exist, just create one
+                    errorCode = couchstore_open_db_ex(dbFileName.c_str(),
+                                                      options, ops, db);
+                    if (errorCode == COUCHSTORE_SUCCESS) {
+                        newRevNum = 1;
+                        updateDbFileMap(vbucketId, fileRev);
+                        LOG(EXTENSION_LOG_INFO,
+                            "INFO: created new couch db file, name=%s rev=%llu",
+                            dbFileName.c_str(), fileRev);
+                    }
                 }
             }
+        } else {
+            errorCode = openDB_retry(dbFileName, options, ops, db,
+                                     &newRevNum);
         }
-    } else {
-        errorCode = openDB_retry(dbFileName, options, ops, db, &newRevNum);
     }
 
     /* update command statistics */
@@ -1349,10 +1415,11 @@ couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
     if (errorCode) {
         st.numOpenFailure++;
         LOG(EXTENSION_LOG_WARNING, "Warning: couchstore_open_db failed, name=%s"
-            " option=%X rev=%llu error=%s [%s]\n", dbFileName.c_str(), options,
+            " option=%" PRIX64 " rev=%llu error=%s [%s]\n",
+            dbFileName.c_str(), options,
             ((newRevNum > fileRev) ? newRevNum : fileRev),
             couchstore_strerror(errorCode),
-            getSystemStrerror().c_str());
+            getStringErrno().c_str());
     } else {
         if (newRevNum > fileRev) {
             // new revision number found, update it
@@ -1379,9 +1446,9 @@ couchstore_error_t CouchKVStore::openDB_retry(std::string &dbfile,
             return errCode;
         }
         LOG(EXTENSION_LOG_INFO, "INFO: couchstore_open_db failed, name=%s "
-            "options=%X error=%s [%s], try it again!",
+            "options=%" PRIX64 " error=%s [%s], try it again!",
             dbfile.c_str(), options, couchstore_strerror(errCode),
-            getSystemStrerror().c_str());
+            getStringErrno().c_str());
         *newFileRev = checkNewRevNum(dbfile);
         ++retry;
         if (retry == MAX_OPEN_DB_RETRY - 1 && options == 0 &&
@@ -1436,7 +1503,7 @@ void CouchKVStore::populateFileNameMap(std::vector<std::string> &filenames,
                     } else {
                         LOG(EXTENSION_LOG_WARNING,
                             "Warning: Failed to remove the stale file '%s': %s",
-                            old_file.str().c_str(), getSystemStrerror().c_str());
+                            old_file.str().c_str(), getStringErrno().c_str());
                     }
                 } else {
                     LOG(EXTENSION_LOG_WARNING,
@@ -1459,42 +1526,49 @@ couchstore_error_t CouchKVStore::fetchDoc(Db *db, DocInfo *docinfo,
                                           bool metaOnly, bool fetchDelete) {
     couchstore_error_t errCode = COUCHSTORE_SUCCESS;
     sized_buf metadata = docinfo->rev_meta;
-    uint32_t itemFlags;
-    uint64_t cas;
-    time_t exptime;
+    uint32_t itemFlags = 0;
+    uint64_t cas = 0;
+    time_t exptime = 0;
     uint8_t ext_meta[EXT_META_LEN];
-    uint8_t ext_len;
+    uint8_t ext_len = 0;
+    uint8_t conf_res_mode = 0;
 
     cb_assert(metadata.size >= DEFAULT_META_LEN);
-    if (metadata.size == DEFAULT_META_LEN) {
+    if (metadata.size >= DEFAULT_META_LEN) {
         memcpy(&cas, (metadata.buf), 8);
         memcpy(&exptime, (metadata.buf) + 8, 4);
         memcpy(&itemFlags, (metadata.buf) + 12, 4);
         ext_len = 0;
-    } else {
-        //metadata.size => 18, FLEX_META_CODE at offset 16
-        memcpy(&cas, (metadata.buf), 8);
-        memcpy(&exptime, (metadata.buf) + 8, 4);
-        memcpy(&itemFlags, (metadata.buf) + 12, 4);
-        ext_len = metadata.size - DEFAULT_META_LEN - FLEX_DATA_OFFSET;
+    }
+
+    if (metadata.size >= V1_META_LEN) {
         memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
-               ext_len);
+               EXT_META_LEN);
+        ext_len = EXT_META_LEN;
     }
+
+    if (metadata.size == V2_META_LEN) {
+        memcpy(&conf_res_mode, metadata.buf + V1_META_LEN, CONFLICT_RES_META_LEN);
+    }
+
     cas = ntohll(cas);
     exptime = ntohl(exptime);
 
     if (metaOnly || (fetchDelete && docinfo->deleted)) {
         Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
-                            docinfo->size, itemFlags, (time_t)exptime,
+                            itemFlags, (time_t)exptime, NULL, docinfo->size,
                             ext_meta, ext_len, cas, docinfo->db_seq, vbId);
         if (docinfo->deleted) {
             it->setDeleted();
         }
+
+        it->setConflictResMode(
+                static_cast<enum conflict_resolution_mode>(conf_res_mode));
         it->setRevSeqno(docinfo->rev_seq);
         docValue = GetValue(it);
         // update ep-engine IO stats
-        ++epStats.io_num_read;
-        epStats.io_read_bytes.fetch_add(docinfo->id.size);
+        ++st.io_num_read;
+        st.io_read_bytes.fetch_add(docinfo->id.size);
     } else {
         Doc *doc = NULL;
         errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
@@ -1524,11 +1598,15 @@ couchstore_error_t CouchKVStore::fetchDoc(Db *db, DocInfo *docinfo,
                                     itemFlags, (time_t)exptime, valuePtr, valuelen,
                                     ext_meta, ext_len, cas, docinfo->db_seq, vbId,
                                     docinfo->rev_seq);
+
+                it->setConflictResMode(
+                           static_cast<enum conflict_resolution_mode>(conf_res_mode));
+
                 docValue = GetValue(it);
 
                 // update ep-engine IO stats
-                ++epStats.io_num_read;
-                epStats.io_read_bytes.fetch_add(docinfo->id.size + valuelen);
+                ++st.io_num_read;
+                st.io_read_bytes.fetch_add(docinfo->id.size + valuelen);
             }
             couchstore_free_document(doc);
         }
@@ -1538,22 +1616,23 @@ couchstore_error_t CouchKVStore::fetchDoc(Db *db, DocInfo *docinfo,
 
 int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
 
-    LoadResponseCtx *loadCtx = (LoadResponseCtx *)ctx;
-    shared_ptr<Callback<GetValue> > cb = loadCtx->callback;
-    shared_ptr<Callback<CacheLookup> > cl = loadCtx->lookup;
+    ScanContext* sctx = static_cast<ScanContext*>(ctx);
+    shared_ptr<Callback<GetValue> > cb = sctx->callback;
+    shared_ptr<Callback<CacheLookup> > cl = sctx->lookup;
 
     Doc *doc = NULL;
     void *valuePtr = NULL;
     size_t valuelen = 0;
     uint64_t byseqno = docinfo->db_seq;
     sized_buf  metadata = docinfo->rev_meta;
-    uint16_t vbucketId = loadCtx->vbucketId;
+    uint16_t vbucketId = sctx->vbid;
     sized_buf key = docinfo->id;
-    uint32_t itemflags;
-    uint64_t cas;
-    uint32_t exptime;
+    uint32_t itemflags = 0;
+    uint64_t cas = 0;
+    uint32_t exptime = 0;
     uint8_t ext_meta[EXT_META_LEN];
-    uint8_t ext_len;
+    uint8_t ext_len = 0;
+    uint8_t conf_res_mode = 0;
 
     cb_assert(key.size <= UINT16_MAX);
     cb_assert(metadata.size >= DEFAULT_META_LEN);
@@ -1562,27 +1641,33 @@ int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
     CacheLookup lookup(docKey, byseqno, vbucketId);
     cl->callback(lookup);
     if (cl->getStatus() == ENGINE_KEY_EEXISTS) {
+        sctx->lastReadSeqno = byseqno;
         return COUCHSTORE_SUCCESS;
+    } else if (cl->getStatus() == ENGINE_ENOMEM) {
+        return COUCHSTORE_ERROR_CANCEL;
     }
 
-    if (metadata.size == DEFAULT_META_LEN) {
+    if (metadata.size >= DEFAULT_META_LEN) {
         memcpy(&cas, (metadata.buf), 8);
         memcpy(&exptime, (metadata.buf) + 8, 4);
         memcpy(&itemflags, (metadata.buf) + 12, 4);
         ext_len = 0;
-    } else {
-        //metadata.size > 16, FLEX_META_CODE at offset 16
-        memcpy(&cas, (metadata.buf), 8);
-        memcpy(&exptime, (metadata.buf) + 8, 4);
-        memcpy(&itemflags, (metadata.buf) + 12, 4);
-        ext_len = metadata.size - DEFAULT_META_LEN - FLEX_DATA_OFFSET;
+    }
+
+    if (metadata.size >= V1_META_LEN) {
         memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
-               ext_len);
+               EXT_META_LEN);
+        ext_len = EXT_META_LEN;
+    }
+
+    if (metadata.size == V2_META_LEN) {
+        memcpy(&conf_res_mode, metadata.buf + V1_META_LEN, CONFLICT_RES_META_LEN);
     }
+
     exptime = ntohl(exptime);
     cas = ntohll(cas);
 
-    if (!loadCtx->keysonly && !docinfo->deleted) {
+    if (!sctx->onlyKeys && !docinfo->deleted) {
         couchstore_error_t errCode ;
         errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
                                                    DECOMPRESS_DOC_BODIES);
@@ -1627,8 +1712,10 @@ int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
         it->setDeleted();
     }
 
+    it->setConflictResMode(
+                 static_cast<enum conflict_resolution_mode>(conf_res_mode));
 
-    GetValue rv(it, ENGINE_SUCCESS, -1, loadCtx->keysonly);
+    GetValue rv(it, ENGINE_SUCCESS, -1, sctx->onlyKeys);
     cb->callback(rv);
 
     couchstore_free_document(doc);
@@ -1636,12 +1723,15 @@ int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
     if (cb->getStatus() == ENGINE_ENOMEM) {
         return COUCHSTORE_ERROR_CANCEL;
     }
+
+    sctx->lastReadSeqno = byseqno;
     return COUCHSTORE_SUCCESS;
 }
 
 bool CouchKVStore::commit2couchstore(Callback<kvstats_ctx> *cb,
                                      uint64_t snapStartSeqno,
-                                     uint64_t snapEndSeqno) {
+                                     uint64_t snapEndSeqno,
+                                     uint64_t maxCas, uint64_t driftCounter) {
     bool success = true;
 
     size_t pendingCommitCnt = pendingReqsQ.size();
@@ -1668,12 +1758,12 @@ bool CouchKVStore::commit2couchstore(Callback<kvstats_ctx> *cb,
     // flush all
     couchstore_error_t errCode = saveDocs(vbucket2flush, fileRev, docs,
                                           docinfos, pendingCommitCnt, kvctx,
-                                          snapStartSeqno, snapEndSeqno);
+                                          snapStartSeqno, snapEndSeqno, maxCas,
+                                          driftCounter);
     if (errCode) {
         LOG(EXTENSION_LOG_WARNING,
             "Warning: commit failed, cannot save CouchDB docs "
             "for vbucket = %d rev = %llu\n", vbucket2flush, fileRev);
-        ++epStats.commitFailed;
     }
     if (cb) {
         cb->callback(kvctx);
@@ -1711,7 +1801,8 @@ couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev,
                                           Doc **docs, DocInfo **docinfos,
                                           size_t docCount, kvstats_ctx &kvctx,
                                           uint64_t snapStartSeqno,
-                                          uint64_t snapEndSeqno) {
+                                          uint64_t snapEndSeqno,
+                                          uint64_t maxCas, uint64_t driftCounter) {
     couchstore_error_t errCode;
     uint64_t fileRev = rev;
     DbInfo info;
@@ -1766,6 +1857,11 @@ couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev,
 
         state->lastSnapStart = snapStartSeqno;
         state->lastSnapEnd = snapEndSeqno;
+
+        if (maxCas > state->maxCas) {
+            state->maxCas = maxCas;
+        }
+        state->driftCounter = driftCounter;
         errCode = saveVBState(db, *state);
         if (errCode != COUCHSTORE_SUCCESS) {
             LOG(EXTENSION_LOG_WARNING, "Warning: failed to save local docs to "
@@ -1816,7 +1912,7 @@ couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev,
 
 void CouchKVStore::remVBucketFromDbFileMap(uint16_t vbucketId) {
     if (vbucketId >= numDbFiles) {
-        LOG(EXTENSION_LOG_WARNING, NULL,
+        LOG(EXTENSION_LOG_WARNING,
             "Warning: cannot remove db file map entry for an invalid vbucket, "
             "vbucket id = %d\n", vbucketId);
         return;
@@ -1835,8 +1931,8 @@ void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
         size_t dataSize = committedReqs[index]->getNBytes();
         size_t keySize = committedReqs[index]->getKey().length();
         /* update ep stats */
-        ++epStats.io_num_write;
-        epStats.io_write_bytes.fetch_add(keySize + dataSize);
+        ++st.io_num_write;
+        st.io_write_bytes.fetch_add(keySize + dataSize);
 
         if (committedReqs[index]->isDelete()) {
             int rv = getMutationStatus(errCode);
@@ -1879,10 +1975,12 @@ void CouchKVStore::readVBState(Db *db, uint16_t vbId) {
     uint64_t checkpointId = 0;
     uint64_t maxDeletedSeqno = 0;
     int64_t highSeqno = 0;
-    std::string failovers("[{\"id\":0,\"seq\":0}]");
+    std::string failovers;
     uint64_t purgeSeqno = 0;
     uint64_t lastSnapStart = 0;
     uint64_t lastSnapEnd = 0;
+    uint64_t maxCas = 0;
+    int64_t driftCounter = INITIAL_DRIFT;
 
     DbInfo info;
     errCode = couchstore_db_info(db, &info);
@@ -1925,6 +2023,10 @@ void CouchKVStore::readVBState(Db *db, uint16_t vbId) {
                                 cJSON_GetObjectItem(jsonObj, "snap_start"));
         const std::string snapEnd = getJSONObjString(
                                 cJSON_GetObjectItem(jsonObj, "snap_end"));
+        const std::string maxCasValue = getJSONObjString(
+                                cJSON_GetObjectItem(jsonObj, "max_cas"));
+        const std::string driftCount = getJSONObjString(
+                                cJSON_GetObjectItem(jsonObj, "drift_counter"));
         cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
         if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
                 || max_deleted_seqno.compare("") == 0) {
@@ -1948,6 +2050,25 @@ void CouchKVStore::readVBState(Db *db, uint16_t vbId) {
                 parseUint64(snapEnd.c_str(), &lastSnapEnd);
             }
 
+            if (maxCasValue.compare("") != 0) {
+                parseUint64(maxCasValue.c_str(), &maxCas);
+
+                // MB-17517: If the maxCas on disk was invalid then don't use it -
+                // instead rebuild from the items we load from disk (i.e. as per
+                // an upgrade from an earlier version).
+                if (maxCas == static_cast<uint64_t>(-1)) {
+                    LOG(EXTENSION_LOG_WARNING,
+                        "Invalid max_cas (0x%" PRIx64 ") read from '%s' for "
+                        "vbucket %" PRIu16 ". Resetting max_cas to zero.",
+                        maxCas, id.buf, vbId);
+                    maxCas = 0;
+                }
+            }
+
+            if (driftCount.compare("") != 0) {
+                parseInt64(driftCount.c_str(), &driftCounter);
+            }
+
             if (failover_json) {
                 char* json = cJSON_PrintUnformatted(failover_json);
                 failovers.assign(json);
@@ -1960,9 +2081,10 @@ void CouchKVStore::readVBState(Db *db, uint16_t vbId) {
 
     delete cachedVBStates[vbId];
     cachedVBStates[vbId] = new vbucket_state(state, checkpointId,
-                                               maxDeletedSeqno, highSeqno,
-                                               purgeSeqno, lastSnapStart,
-                                               lastSnapEnd, failovers);
+                                             maxDeletedSeqno, highSeqno,
+                                             purgeSeqno, lastSnapStart,
+                                             lastSnapEnd, maxCas, driftCounter,
+                                             failovers);
 }
 
 couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState) {
@@ -1974,6 +2096,8 @@ couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState) {
               << ",\"failover_table\": " << vbState.failovers
               << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
               << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
+              << ",\"max_cas\": \"" << vbState.maxCas << "\""
+              << ",\"drift_counter\": \"" << vbState.driftCounter << "\""
               << "}";
 
     LocalDoc lDoc;
@@ -2024,6 +2148,7 @@ int CouchKVStore::getMultiCb(Db *db, DocInfo *docinfo, void *ctx) {
     }
 
     GetValue returnVal;
+
     couchstore_error_t errCode = cbCtx->cks.fetchDoc(db, docinfo, returnVal,
                                                      cbCtx->vbId, meta_only);
     if (errCode != COUCHSTORE_SUCCESS && !meta_only) {
@@ -2071,20 +2196,11 @@ ENGINE_ERROR_CODE CouchKVStore::couchErr2EngineErr(couchstore_error_t errCode) {
     case COUCHSTORE_ERROR_NO_HEADER:
     default:
         // same as the general error return code of
-        // EvetuallyPersistentStore::getInternal
+        // EventuallyPersistentStore::getInternal
         return ENGINE_TMPFAIL;
     }
 }
 
-size_t CouchKVStore::getEstimatedItemCount(std::vector<uint16_t> &vbs) {
-    size_t items = 0;
-    std::vector<uint16_t>::iterator it;
-    for (it = vbs.begin(); it != vbs.end(); ++it) {
-        items += getNumItems(*it);
-    }
-    return items;
-}
-
 size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
     size_t delCount = cachedDeleteCount[vbid];
     if (delCount != (size_t) -1) {
@@ -2117,14 +2233,13 @@ size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
 
 }
 
-size_t CouchKVStore::getNumItems(uint16_t vbid) {
-    size_t docCount = cachedDocCount[vbid];
-    if ( docCount != (size_t) -1) {
-        return docCount;
-    }
+DBFileInfo CouchKVStore::getDbFileInfo(uint16_t vbid) {
 
     Db *db = NULL;
     uint64_t rev = dbFileRevMap[vbid];
+
+    DBFileInfo vbinfo;
+
     couchstore_error_t errCode = openDB(vbid, rev, &db,
                                         COUCHSTORE_OPEN_FLAG_RDONLY);
     if (errCode == COUCHSTORE_SUCCESS) {
@@ -2132,8 +2247,9 @@ size_t CouchKVStore::getNumItems(uint16_t vbid) {
         errCode = couchstore_db_info(db, &info);
         if (errCode == COUCHSTORE_SUCCESS) {
             cachedDocCount[vbid] = info.doc_count;
-            closeDatabaseHandle(db);
-            return info.doc_count;
+            vbinfo.itemCount = info.doc_count;
+            vbinfo.fileSize = info.file_size;
+            vbinfo.spaceUsed = info.space_used;
         } else {
             LOG(EXTENSION_LOG_WARNING,
                 "Warning: failed to read database info for "
@@ -2145,7 +2261,7 @@ size_t CouchKVStore::getNumItems(uint16_t vbid) {
             "Warning: failed to open database file for "
             "vBucket = %d rev = %llu\n", vbid, rev);
     }
-    return 0;
+    return vbinfo;
 }
 
 size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
@@ -2265,27 +2381,14 @@ RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
     }
 
     cb->setDbHeader(newdb);
+
     shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
-    LoadResponseCtx ctx;
-    ctx.vbucketId = vbid;
-    ctx.keysonly = true;
-    ctx.lookup = cl;
-    ctx.callback = cb;
-    ctx.stats = &epStats;
-    errCode = couchstore_changes_since(db, info.last_sequence + 1,
-                                       COUCHSTORE_NO_OPTIONS,
-                                       recordDbDumpC,
-                                       static_cast<void *>(&ctx));
-    if (errCode != COUCHSTORE_SUCCESS) {
-        if (errCode == COUCHSTORE_ERROR_CANCEL) {
-            LOG(EXTENSION_LOG_WARNING,
-                "Canceling loading database\n");
-        } else {
-            LOG(EXTENSION_LOG_WARNING,
-                "Couchstore_changes_since failed, error=%s [%s]",
-                couchstore_strerror(errCode),
-                couchkvstore_strerrno(db, errCode).c_str());
-        }
+    ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence + 1,
+                                       true, false, false);
+    scan_error_t error = scan(ctx);
+    destroyScanContext(ctx);
+
+    if (error != scan_success) {
         closeDatabaseHandle(db);
         closeDatabaseHandle(newdb);
         return RollbackResult(false, 0, 0, 0);
@@ -2361,7 +2464,7 @@ void CouchKVStore::unlinkCouchFile(uint16_t vbucket,
 
     int errCode;
     char fname[PATH_MAX];
-    snprintf(fname, sizeof(fname), "%s/%d.couch.%llu",
+    snprintf(fname, sizeof(fname), "%s/%d.couch.%" PRIu64,
              dbname.c_str(), vbucket, fRev);
 #ifdef _MSC_VER
     errCode = _unlink(fname);
@@ -2406,7 +2509,7 @@ void CouchKVStore::removeCompactFile(const std::string &filename) {
         else {
             LOG(EXTENSION_LOG_WARNING,
                 "Warning: Failed to remove compact file '%s': %s",
-                filename.c_str(), getSystemStrerror().c_str());
+                filename.c_str(), getStringErrno().c_str());
 
             if (errno != ENOENT) {
                 pendingFileDeletions.push(const_cast<std::string &>(filename));
index 349d839..14bbc87 100644 (file)
@@ -31,7 +31,6 @@
 #include "histo.h"
 #include "item.h"
 #include "kvstore.h"
-#include "stats.h"
 #include "tasks.h"
 #include "atomicqueue.h"
 
@@ -50,6 +49,7 @@ public:
       docsCommitted(0), numOpen(0), numClose(0),
       numLoadedVb(0), numGetFailure(0), numSetFailure(0),
       numDelFailure(0), numOpenFailure(0), numVbSetFailure(0),
+      io_num_read(0), io_num_write(0), io_read_bytes(0), io_write_bytes(0),
       readSizeHisto(ExponentialGenerator<size_t>(1, 2), 25),
       writeSizeHisto(ExponentialGenerator<size_t>(1, 2), 25) {
     }
@@ -93,6 +93,15 @@ public:
     AtomicValue<size_t> numOpenFailure;
     AtomicValue<size_t> numVbSetFailure;
 
+    //! Number of read related io operations
+    AtomicValue<size_t> io_num_read;
+    //! Number of write related io operations
+    AtomicValue<size_t> io_num_write;
+    //! Number of bytes read
+    AtomicValue<size_t> io_read_bytes;
+    //! Number of bytes written
+    AtomicValue<size_t> io_write_bytes;
+
     /* for flush and vb delete, no error handling in CouchKVStore, such
      * failure should be tracked in MC-engine  */
 
@@ -120,16 +129,18 @@ public:
 };
 
 class EventuallyPersistentEngine;
-class EPStats;
 
 typedef union {
     Callback <mutation_result> *setCb;
     Callback <int> *delCb;
 } CouchRequestCallback;
 
-// Additional 2 Bytes included: 1 for flex_meta_code and the other for datatype field
+const size_t CONFLICT_RES_META_LEN = 1;
+
+// Additional 3 Bytes for flex meta, datatype and conflict resolution mode
 const size_t COUCHSTORE_METADATA_SIZE(2 * sizeof(uint32_t) + sizeof(uint64_t) +
-                                      FLEX_DATA_OFFSET + EXT_META_LEN);
+                                      FLEX_DATA_OFFSET + EXT_META_LEN +
+                                      CONFLICT_RES_META_LEN);
 
 /**
  * Class representing a document to be persisted in couchstore.
@@ -147,6 +158,9 @@ public:
      */
     CouchRequest(const Item &it, uint64_t rev, CouchRequestCallback &cb, bool del);
 
+
+    virtual ~CouchRequest() {}
+
     /**
      * Get the vbucket id of a document to be persisted
      *
@@ -242,7 +256,7 @@ public:
         return key;
     }
 
-private :
+protected:
     value_t value;
     uint8_t meta[COUCHSTORE_METADATA_SIZE];
     uint16_t vbucketId;
@@ -269,7 +283,7 @@ public:
      * @param config    Configuration information
      * @param read_only flag indicating if this kvstore instance is for read-only operations
      */
-    CouchKVStore(EPStats &stats, Configuration &config, bool read_only = false);
+    CouchKVStore(Configuration &config, bool read_only = false);
 
     /**
      * Copy constructor
@@ -307,7 +321,8 @@ public:
      * @return true if the commit is completed successfully.
      */
     bool commit(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno,
-                uint64_t snapEndSeqno);
+                uint64_t snapEndSeqno, uint64_t maxCas,
+                uint64_t driftCounter);
 
     /**
      * Rollback a transaction (unless not currently in one).
@@ -338,14 +353,13 @@ public:
      * Retrieve the document with a given key from the underlying storage system.
      *
      * @param key the key of a document to be retrieved
-     * @param rowid the sequence number of a document
      * @param vb vbucket id of a document
      * @param cb callback instance for GET
      * @param fetchDelete True if we want to retrieve a deleted item if it not
      *        purged yet.
      */
-    void get(const std::string &key, uint64_t rowid,
-             uint16_t vb, Callback<GetValue> &cb, bool fetchDelete = false);
+    void get(const std::string &key, uint16_t vb, Callback<GetValue> &cb,
+             bool fetchDelete = false);
 
     void getWithHeader(void *dbHandle, const std::string &key,
                        uint16_t vb, Callback<GetValue> &cb,
@@ -416,52 +430,13 @@ public:
      * @param hook_ctx - details of vbucket which needs to be compacted
      * @param cb - callback to help process newly expired items
      * @param kvcb - callback to update kvstore stats
+     * @return true if successful
      */
-    void compactVBucket(const uint16_t vbid, compaction_ctx *cookie,
+    bool compactVBucket(const uint16_t vbid, compaction_ctx *cookie,
                         Callback<compaction_ctx> &cb,
                         Callback<kvstats_ctx> &kvcb);
 
     /**
-     * Retrieve selected documents from the underlying storage system.
-     *
-     * @param vbids list of vbucket ids whose document keys are going to be retrieved
-     * @param cb callback instance to process each document retrieved
-     * @param cl callback to see if we need to read the value from disk
-     */
-    void dump(std::vector<uint16_t> &vbids, shared_ptr<Callback<GetValue> > cb,
-              shared_ptr<Callback<CacheLookup> > cl);
-
-    /**
-     * Retrieve all the documents for a given vbucket from the storage system.
-     *
-     * @param vb vbucket id
-     * @param cb callback instance to process each document retrieved
-     * @param cl callback to see if we need to read the value from disk
-     * @param sr callback to notify the caller what the range of the backfill is
-     */
-    void dump(uint16_t vb, uint64_t stSeqno,
-              shared_ptr<Callback<GetValue> > cb,
-              shared_ptr<Callback<CacheLookup> > cl,
-              shared_ptr<Callback<SeqnoRange> > sr);
-
-    /**
-     * Retrieve all the keys from the underlying storage system.
-     *
-     * @param vbids list of vbucket ids whose document keys are going to be retrieved
-     * @param cb callback instance to process each key retrieved
-     */
-    void dumpKeys(std::vector<uint16_t> &vbids,  shared_ptr<Callback<GetValue> > cb);
-
-    /**
-     * Retrieve the list of keys and their meta data for a given
-     * vbucket, which were deleted.
-     * @param vb vbucket id
-     * @param cb callback instance to process each key and its meta data
-     */
-    void dumpDeleted(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno,
-                     shared_ptr<Callback<GetValue> > cb);
-
-    /**
      * Does the underlying storage system support key-only retrieval operations?
      *
      * @return true if key-only retrieval is supported
@@ -471,13 +446,6 @@ public:
     }
 
     /**
-     * Get the estimated number of items that are going to be loaded during warmup.
-     *
-     * @return the number of estimated items to be loaded during warmup
-     */
-    size_t getEstimatedItemCount(std::vector<uint16_t> &vbs);
-
-    /**
      * Get the number of deleted items that are persisted to a vbucket file
      *
      * @param vbid The vbucket if of the file to get the number of deletes for
@@ -485,11 +453,11 @@ public:
     size_t getNumPersistedDeletes(uint16_t vbid);
 
     /**
-     * Get the number of non-deleted items from a vbucket database file
+     * Get the vbucket pertaining stats from a vbucket database file
      *
      * @param vbid The vbucket of the file to get the number of docs for
      */
-    size_t getNumItems(uint16_t vbid);
+    DBFileInfo getDbFileInfo(uint16_t vbid);
 
     /**
      * Get the number of non-deleted items from a vbucket database file
@@ -569,32 +537,34 @@ public:
     ENGINE_ERROR_CODE getAllKeys(uint16_t vbid, std::string &start_key,
                                  uint32_t count, AllKeysCB *cb);
 
-protected:
-    void loadDB(shared_ptr<Callback<GetValue> > cb,
-                shared_ptr<Callback<CacheLookup> > cl,
-                shared_ptr<Callback<SeqnoRange> > sr,
-                bool keysOnly, uint16_t vbid,
-                uint64_t startSeqno,
-                couchstore_docinfos_options options=COUCHSTORE_NO_OPTIONS);
+    ScanContext* initScanContext(shared_ptr<Callback<GetValue> > cb,
+                                 shared_ptr<Callback<CacheLookup> > cl,
+                                 uint16_t vbid, uint64_t startSeqno,
+                                 bool keysOnly, bool noDeletes,
+                                 bool deletesOnly);
+
+    scan_error_t scan(ScanContext* sctx);
+
+    void destroyScanContext(ScanContext* ctx);
+
     bool setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
-                         Callback<kvstats_ctx> *cb);
+                         Callback<kvstats_ctx> *cb, bool reset=false);
     bool resetVBucket(uint16_t vbucketId, vbucket_state &vbstate) {
         cachedDocCount[vbucketId] = 0;
-        return setVBucketState(vbucketId, vbstate, NULL);
+        return setVBucketState(vbucketId, vbstate, NULL, true);
     }
 
     template <typename T>
     void addStat(const std::string &prefix, const char *nm, T &val,
                  ADD_STAT add_stat, const void *c);
 
-private:
-
     void operator=(const CouchKVStore &from);
 
     void open();
     void close();
     bool commit2couchstore(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno,
-                           uint64_t snapEndSeqno);
+                           uint64_t snapEndSeqno, uint64_t maxCas,
+                           uint64_t driftCounter);
 
     uint64_t checkNewRevNum(std::string &dbname, bool newFile = false);
     void populateFileNameMap(std::vector<std::string> &filenames,
@@ -602,7 +572,8 @@ private:
     void remVBucketFromDbFileMap(uint16_t vbucketId);
     void updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev);
     couchstore_error_t openDB(uint16_t vbucketId, uint64_t fileRev, Db **db,
-                              uint64_t options, uint64_t *newFileRev = NULL);
+                              uint64_t options, uint64_t *newFileRev = NULL,
+                              bool reset=false);
     couchstore_error_t openDB_retry(std::string &dbfile, uint64_t options,
                                     const couch_file_ops *ops,
                                     Db **db, uint64_t *newFileRev);
@@ -610,7 +581,9 @@ private:
                                 DocInfo **docinfos, size_t docCount,
                                 kvstats_ctx &kvctx,
                                 uint64_t snapStartSeqno,
-                                uint64_t snapEndSeqno);
+                                uint64_t snapEndSeqno,
+                                uint64_t maxCas,
+                                uint64_t driftCounter);
     void commitCallback(std::vector<CouchRequest *> &committedReqs,
                         kvstats_ctx &kvctx,
                         couchstore_error_t errCode);
@@ -636,7 +609,6 @@ private:
 
     void removeCompactFile(const std::string &filename);
 
-    EPStats &epStats;
     Configuration &configuration;
     const std::string dbname;
 
@@ -660,6 +632,10 @@ private:
     unordered_map<uint16_t, size_t> cachedDocCount;
     /* pending file deletions */
     AtomicQueue<std::string> pendingFileDeletions;
+
+    AtomicValue<size_t> backfillCounter;
+    std::map<size_t, Db*> backfills;
+    Mutex backfillLock;
 };
 
 #endif  // SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_
diff --git a/src/dcp-backfill-manager.cc b/src/dcp-backfill-manager.cc
new file mode 100644 (file)
index 0000000..6877082
--- /dev/null
@@ -0,0 +1,325 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2014 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "config.h"
+#include "ep_engine.h"
+#include "connmap.h"
+#include "dcp-backfill-manager.h"
+#include "dcp-backfill.h"
+#include "dcp-producer.h"
+
+static const size_t sleepTime = 1;
+
+class BackfillManagerTask : public GlobalTask {
+public:
+    BackfillManagerTask(EventuallyPersistentEngine* e, BackfillManager* mgr,
+                        const Priority &p, double sleeptime = 0,
+                        bool shutdown = false)
+        : GlobalTask(e, p, sleeptime, shutdown), manager(mgr) {}
+
+    bool run();
+
+    std::string getDescription();
+
+private:
+    BackfillManager* manager;
+};
+
+bool BackfillManagerTask::run() {
+    backfill_status_t status = manager->backfill();
+    if (status == backfill_finished) {
+        return false;
+    } else if (status == backfill_snooze) {
+        snooze(sleepTime);
+    }
+
+    if (engine->getEpStats().isShutdown) {
+        return false;
+    }
+
+    return true;
+}
+
+std::string BackfillManagerTask::getDescription() {
+    std::stringstream ss;
+    ss << "Backfilling items for a DCP Connection";
+    return ss.str();
+}
+
+BackfillManager::BackfillManager(EventuallyPersistentEngine* e)
+    : engine(e), managerTask(NULL) {
+
+    Configuration& config = e->getConfiguration();
+
+    scanBuffer.bytesRead = 0;
+    scanBuffer.itemsRead = 0;
+    scanBuffer.maxBytes = config.getDcpScanByteLimit();
+    scanBuffer.maxItems = config.getDcpScanItemLimit();
+
+    buffer.bytesRead = 0;
+    buffer.maxBytes = config.getDcpBackfillByteLimit();
+    buffer.nextReadSize = 0;
+    buffer.full = false;
+}
+
+void BackfillManager::addStats(connection_t conn, ADD_STAT add_stat,
+                               const void *c) {
+    LockHolder lh(lock);
+    conn->addStat("backfill_buffer_bytes_read", buffer.bytesRead, add_stat, c);
+    conn->addStat("backfill_buffer_max_bytes", buffer.maxBytes, add_stat, c);
+    conn->addStat("backfill_buffer_full", buffer.full, add_stat, c);
+    conn->addStat("backfill_num_active", activeBackfills.size(), add_stat, c);
+    conn->addStat("backfill_num_snoozing", snoozingBackfills.size(), add_stat, c);
+    conn->addStat("backfill_num_pending", pendingBackfills.size(), add_stat, c);
+}
+
+BackfillManager::~BackfillManager() {
+    if (managerTask) {
+        managerTask->cancel();
+    }
+
+    while (!activeBackfills.empty()) {
+        DCPBackfill* backfill = activeBackfills.front();
+        activeBackfills.pop_front();
+        backfill->cancel();
+        delete backfill;
+        engine->getDcpConnMap().decrNumActiveSnoozingBackfills();
+    }
+
+    while (!snoozingBackfills.empty()) {
+        DCPBackfill* backfill = (snoozingBackfills.front()).second;
+        snoozingBackfills.pop_front();
+        backfill->cancel();
+        delete backfill;
+        engine->getDcpConnMap().decrNumActiveSnoozingBackfills();
+    }
+
+    while (!pendingBackfills.empty()) {
+        DCPBackfill* backfill = pendingBackfills.front();
+        pendingBackfills.pop_front();
+        backfill->cancel();
+        delete backfill;
+    }
+}
+
+void BackfillManager::schedule(stream_t stream, uint64_t start, uint64_t end) {
+    LockHolder lh(lock);
+    if (engine->getDcpConnMap().canAddBackfillToActiveQ()) {
+        activeBackfills.push_back(new DCPBackfill(engine, stream, start, end));
+    } else {
+        pendingBackfills.push_back(new DCPBackfill(engine, stream, start, end));
+    }
+
+    if (managerTask && !managerTask->isdead()) {
+        ExecutorPool::get()->wake(managerTask->getId());
+        return;
+    }
+
+    managerTask.reset(new BackfillManagerTask(engine, this,
+                                              Priority::BackfillTaskPriority));
+    ExecutorPool::get()->schedule(managerTask, AUXIO_TASK_IDX);
+}
+
+bool BackfillManager::bytesRead(uint32_t bytes) {
+    LockHolder lh(lock);
+    if (scanBuffer.itemsRead >= scanBuffer.maxItems) {
+        return false;
+    }
+
+    // Always allow an item to be backfilled if the scan buffer is empty,
+    // otherwise check to see if there is room for the item.
+    if (scanBuffer.bytesRead + bytes <= scanBuffer.maxBytes ||
+        scanBuffer.bytesRead == 0) {
+        scanBuffer.bytesRead += bytes;
+    }
+
+    if (buffer.bytesRead == 0 || buffer.bytesRead + bytes <= buffer.maxBytes) {
+        buffer.bytesRead += bytes;
+    } else {
+        scanBuffer.bytesRead -= bytes;
+        buffer.full = true;
+        buffer.nextReadSize = bytes;
+        return false;
+    }
+
+    scanBuffer.itemsRead++;
+
+    return true;
+}
+
+void BackfillManager::bytesSent(uint32_t bytes) {
+    LockHolder lh(lock);
+    cb_assert(buffer.bytesRead >= bytes);
+    buffer.bytesRead -= bytes;
+
+    if (buffer.full) {
+        uint32_t bufferSize = buffer.bytesRead;
+        bool canFitNext = buffer.maxBytes - bufferSize >= buffer.nextReadSize;
+        bool enoughCleared = bufferSize < (buffer.maxBytes * 3 / 4);
+        if (canFitNext && enoughCleared) {
+            buffer.nextReadSize = 0;
+            buffer.full = false;
+            if (managerTask) {
+                ExecutorPool::get()->wake(managerTask->getId());
+            }
+        }
+    }
+}
+
+backfill_status_t BackfillManager::backfill() {
+    LockHolder lh(lock);
+
+    if (activeBackfills.empty() && snoozingBackfills.empty()
+        && pendingBackfills.empty()) {
+        managerTask.reset();
+        return backfill_finished;
+    }
+
+    if (engine->getEpStore()->isMemoryUsageTooHigh()) {
+        LOG(EXTENSION_LOG_WARNING, "DCP backfilling task temporarily suspended "
+            "because the current memory usage is too high");
+        return backfill_snooze;
+    }
+
+    moveToActiveQueue();
+
+    if (activeBackfills.empty()) {
+        return backfill_snooze;
+    }
+
+    if (buffer.full) {
+        // If the buffer is full check to make sure we don't have any backfills
+        // that no longer have active streams and remove them. This prevents an
+        // issue where we have dead backfills taking up buffer space.
+        std::list<DCPBackfill*> toDelete;
+        std::list<DCPBackfill*>::iterator a_itr = activeBackfills.begin();
+        while (a_itr != activeBackfills.end()) {
+            if ((*a_itr)->isDead()) {
+                (*a_itr)->cancel();
+                toDelete.push_back(*a_itr);
+                a_itr = activeBackfills.erase(a_itr);
+                engine->getDcpConnMap().decrNumActiveSnoozingBackfills();
+            } else {
+                ++a_itr;
+            }
+        }
+
+        lh.unlock();
+        bool reschedule = !toDelete.empty();
+        while (!toDelete.empty()) {
+            DCPBackfill* backfill = toDelete.front();
+            toDelete.pop_front();
+            delete backfill;
+        }
+        return reschedule ? backfill_success : backfill_snooze;
+    }
+
+    DCPBackfill* backfill = activeBackfills.front();
+    activeBackfills.pop_front();
+
+    lh.unlock();
+    backfill_status_t status = backfill->run();
+    lh.lock();
+
+    scanBuffer.bytesRead = 0;
+    scanBuffer.itemsRead = 0;
+
+    if (status == backfill_success) {
+        activeBackfills.push_back(backfill);
+    } else if (status == backfill_finished) {
+        lh.unlock();
+        delete backfill;
+        engine->getDcpConnMap().decrNumActiveSnoozingBackfills();
+    } else if (status == backfill_snooze) {
+        uint16_t vbid = backfill->getVBucketId();
+        RCPtr<VBucket> vb = engine->getVBucket(vbid);
+        if (vb) {
+            snoozingBackfills.push_back(
+                                std::make_pair(ep_current_time(), backfill));
+        } else {
+            lh.unlock();
+            LOG(EXTENSION_LOG_WARNING, "Deleting the backfill, as vbucket %d "
+                    "seems to have been deleted!", vbid);
+            backfill->cancel();
+            delete backfill;
+            engine->getDcpConnMap().decrNumActiveSnoozingBackfills();
+        }
+    } else {
+        abort();
+    }
+
+    return backfill_success;
+}
+
+void BackfillManager::moveToActiveQueue() {
+    // Order in below AND is important
+    while (!pendingBackfills.empty()
+           && engine->getDcpConnMap().canAddBackfillToActiveQ()) {
+        activeBackfills.push_back(pendingBackfills.front());
+        pendingBackfills.pop_front();
+    }
+
+    while (!snoozingBackfills.empty()) {
+        std::pair<rel_time_t, DCPBackfill*> snoozer = snoozingBackfills.front();
+        // If a snoozing task has been sleeping for longer than the
+        // allowed snooze time, push it into the active queue
+        if (snoozer.first + sleepTime <= ep_current_time()) {
+            DCPBackfill* bfill = snoozer.second;
+            activeBackfills.push_back(bfill);
+            snoozingBackfills.pop_front();
+        } else {
+            break;
+        }
+    }
+}
+
+void BackfillManager::wakeUpTask() {
+    LockHolder lh(lock);
+    if (managerTask) {
+        ExecutorPool::get()->wake(managerTask->getId());
+    }
+}
+
+void BackfillManager::wakeUpSnoozingBackfills(uint16_t vbid) {
+    LockHolder lh(lock);
+    std::list<std::pair<rel_time_t, DCPBackfill*> >::iterator it;
+    for (it = snoozingBackfills.begin(); it != snoozingBackfills.end(); ++it) {
+        DCPBackfill *bfill = (*it).second;
+        if (vbid == bfill->getVBucketId()) {
+            activeBackfills.push_back(bfill);
+            snoozingBackfills.erase(it);
+            ExecutorPool::get()->wake(managerTask->getId());
+            return;
+        }
+    }
+}
+
+bool BackfillManager::addIfLessThanMax(AtomicValue<uint32_t>& val,
+                                       uint32_t incr, uint32_t max) {
+    do {
+        uint32_t oldVal = val.load();
+        uint32_t newVal = oldVal + incr;
+
+        if (newVal > max) {
+            return false;
+        }
+
+        if (val.compare_exchange_strong(oldVal, newVal)) {
+            return true;
+        }
+    } while (true);
+}
diff --git a/src/dcp-backfill-manager.h b/src/dcp-backfill-manager.h
new file mode 100644 (file)
index 0000000..1ae8328
--- /dev/null
@@ -0,0 +1,83 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2013 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#ifndef SRC_DCP_BACKFILL_MANAGER_H_
+#define SRC_DCP_BACKFILL_MANAGER_H_ 1
+
+#include "config.h"
+#include "connmap.h"
+#include "dcp-backfill.h"
+#include "dcp-producer.h"
+#include "dcp-stream.h"
+#include "mutex.h"
+
+class EventuallyPersistentEngine;
+
+class BackfillManager {
+public:
+    BackfillManager(EventuallyPersistentEngine* e);
+
+    ~BackfillManager();
+
+    void addStats(connection_t conn, ADD_STAT add_stat, const void *c);
+
+    void schedule(stream_t stream, uint64_t start, uint64_t end);
+
+    bool bytesRead(uint32_t bytes);
+
+    void bytesSent(uint32_t bytes);
+
+    backfill_status_t backfill();
+
+    void wakeUpTask();
+
+    void wakeUpSnoozingBackfills(uint16_t vbid);
+
+private:
+
+    bool addIfLessThanMax(AtomicValue<uint32_t>& val, uint32_t incr,
+                          uint32_t max);
+
+    void moveToActiveQueue();
+
+    Mutex lock;
+    std::list<DCPBackfill*> activeBackfills;
+    std::list<std::pair<rel_time_t, DCPBackfill*> > snoozingBackfills;
+    //! When the number of (activeBackfills + snoozingBackfills) crosses a
+    //!   threshold we use waitingBackfills
+    std::list<DCPBackfill*> pendingBackfills;
+    EventuallyPersistentEngine* engine;
+    ExTask managerTask;
+
+    //! The scan buffer is for the current stream being backfilled
+    struct {
+        uint32_t bytesRead;
+        uint32_t itemsRead;
+        uint32_t maxBytes;
+        uint32_t maxItems;
+    } scanBuffer;
+
+    //! The buffer is the total bytes used by all backfills for this connection
+    struct {
+        uint32_t bytesRead;
+        uint32_t maxBytes;
+        uint32_t nextReadSize;
+        bool full;
+    } buffer;
+};
+
+#endif  // SRC_DCP_BACKFILL_MANAGER_H_
diff --git a/src/dcp-backfill.cc b/src/dcp-backfill.cc
new file mode 100644 (file)
index 0000000..eba1c98
--- /dev/null
@@ -0,0 +1,220 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2013 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "config.h"
+
+#include "ep_engine.h"
+#include "dcp-stream.h"
+#include "dcp-backfill.h"
+
+static const char* backfillStateToString(backfill_state_t state) {
+    switch (state) {
+        case backfill_state_init:
+            return "initalizing";
+        case backfill_state_scanning:
+            return "scanning";
+        case backfill_state_completing:
+            return "completing";
+        case backfill_state_done:
+            return "done";
+        default:
+            abort();
+    }
+}
+
+CacheCallback::CacheCallback(EventuallyPersistentEngine* e, stream_t &s)
+    : engine_(e), stream_(s) {
+    cb_assert(stream_.get() && stream_.get()->getType() == STREAM_ACTIVE);
+}
+
+void CacheCallback::callback(CacheLookup &lookup) {
+    RCPtr<VBucket> vb = engine_->getEpStore()->getVBucket(lookup.getVBucketId());
+    if (!vb) {
+        setStatus(ENGINE_SUCCESS);
+        return;
+    }
+
+    int bucket_num(0);
+    LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
+    StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num, false, false);
+    if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
+        Item* it = v->toItem(false, lookup.getVBucketId());
+        lh.unlock();
+        ActiveStream* as = static_cast<ActiveStream*>(stream_.get());
+        if (!as->backfillReceived(it, BACKFILL_FROM_MEMORY)) {
+            setStatus(ENGINE_ENOMEM); // Pause the backfill
+        } else {
+            setStatus(ENGINE_KEY_EEXISTS);
+        }
+    } else {
+        setStatus(ENGINE_SUCCESS);
+    }
+}
+
+DiskCallback::DiskCallback(stream_t &s)
+    : stream_(s) {
+    cb_assert(stream_.get() && stream_.get()->getType() == STREAM_ACTIVE);
+}
+
+void DiskCallback::callback(GetValue &val) {
+    cb_assert(val.getValue());
+
+    ActiveStream* as = static_cast<ActiveStream*>(stream_.get());
+    if (!as->backfillReceived(val.getValue(), BACKFILL_FROM_DISK)) {
+        setStatus(ENGINE_ENOMEM); // Pause the backfill
+    } else {
+        setStatus(ENGINE_SUCCESS);
+    }
+}
+
+DCPBackfill::DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
+                         uint64_t start_seqno, uint64_t end_seqno)
+    : engine(e), stream(s),startSeqno(start_seqno), endSeqno(end_seqno),
+      scanCtx(NULL), state(backfill_state_init) {
+    cb_assert(stream->getType() == STREAM_ACTIVE);
+}
+
+backfill_status_t DCPBackfill::run() {
+    LockHolder lh(lock);
+    switch (state) {
+        case backfill_state_init:
+            return create();
+        case backfill_state_scanning:
+            return scan();
+        case backfill_state_completing:
+            return complete(false);
+        case backfill_state_done:
+            return backfill_finished;
+        default:
+            LOG(EXTENSION_LOG_WARNING, "Invalid backfill state");
+            abort();
+    }
+}
+
+uint16_t DCPBackfill::getVBucketId() {
+    return stream->getVBucket();
+}
+
+uint64_t DCPBackfill::getEndSeqno() {
+    return endSeqno;
+}
+
+void DCPBackfill::cancel() {
+    LockHolder lh(lock);
+    if (state != backfill_state_done) {
+        complete(true);
+    }
+}
+
+backfill_status_t DCPBackfill::create() {
+    uint16_t vbid = stream->getVBucket();
+
+    uint64_t lastPersistedSeqno =
+        engine->getEpStore()->getLastPersistedSeqno(vbid);
+
+    ActiveStream* as = static_cast<ActiveStream*>(stream.get());
+
+    if (lastPersistedSeqno < endSeqno) {
+        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Rescheduling backfill"
+            "because backfill up to seqno %llu is needed but only up to "
+            "%llu is persisted", as->logHeader(), vbid, endSeqno,
+            lastPersistedSeqno);
+        return backfill_snooze;
+    }
+
+    KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
+    size_t numItems = kvstore->getNumItems(vbid, startSeqno,
+                                           std::numeric_limits<uint64_t>::max());
+
+    as->incrBackfillRemaining(numItems);
+
+    shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
+    shared_ptr<Callback<CacheLookup> > cl(new CacheCallback(engine, stream));
+    scanCtx = kvstore->initScanContext(cb, cl, vbid, startSeqno, false, false,
+                                       false);
+    if (scanCtx) {
+        as->markDiskSnapshot(startSeqno, scanCtx->maxSeqno);
+        transitionState(backfill_state_scanning);
+    } else {
+        transitionState(backfill_state_done);
+    }
+
+    return backfill_success;
+}
+
+backfill_status_t DCPBackfill::scan() {
+    uint16_t vbid = stream->getVBucket();
+
+    if (!(stream->isActive())) {
+        return complete(true);
+    }
+
+    KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
+    scan_error_t error = kvstore->scan(scanCtx);
+
+    if (error == scan_again) {
+        return backfill_success;
+    }
+
+    transitionState(backfill_state_completing);
+
+    return backfill_success;
+}
+
+backfill_status_t DCPBackfill::complete(bool cancelled) {
+    uint16_t vbid = stream->getVBucket();
+    KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
+    kvstore->destroyScanContext(scanCtx);
+
+    ActiveStream* as = static_cast<ActiveStream*>(stream.get());
+    as->completeBackfill();
+
+    LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill task (%llu to %llu) %s",
+        as->logHeader(), vbid, startSeqno, endSeqno,
+        cancelled ? "cancelled" : "finished", stream->getVBucket());
+
+    transitionState(backfill_state_done);
+
+    return backfill_success;
+}
+
+void DCPBackfill::transitionState(backfill_state_t newState) {
+    if (state == newState) {
+        return;
+    }
+
+    switch (newState) {
+        case backfill_state_scanning:
+            cb_assert(state == backfill_state_init);
+            break;
+        case backfill_state_completing:
+            cb_assert(state == backfill_state_scanning);
+            break;
+        case backfill_state_done:
+            cb_assert(state == backfill_state_init ||
+                      state == backfill_state_scanning ||
+                      state == backfill_state_completing);
+            break;
+        default:
+            LOG(EXTENSION_LOG_WARNING, "Invalid backfill state transition from"
+                " %s to %s", backfillStateToString(state),
+                backfillStateToString(newState));
+            abort();
+    }
+    state = newState;
+}
+
diff --git a/src/dcp-backfill.h b/src/dcp-backfill.h
new file mode 100644 (file)
index 0000000..be26689
--- /dev/null
@@ -0,0 +1,99 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2013 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#ifndef SRC_DCP_BACKFILL_H_
+#define SRC_DCP_BACKFILL_H_ 1
+
+#include "config.h"
+
+#include "callbacks.h"
+#include "dcp-stream.h"
+
+class EventuallyPersistentEngine;
+class ScanContext;
+
+typedef enum {
+    backfill_state_init,
+    backfill_state_scanning,
+    backfill_state_completing,
+    backfill_state_done
+} backfill_state_t;
+
+typedef enum {
+    backfill_success,
+    backfill_finished,
+    backfill_snooze
+} backfill_status_t;
+
+class CacheCallback : public Callback<CacheLookup> {
+public:
+    CacheCallback(EventuallyPersistentEngine* e, stream_t &s);
+
+    void callback(CacheLookup &lookup);
+
+private:
+    EventuallyPersistentEngine* engine_;
+    stream_t stream_;
+};
+
+class DiskCallback : public Callback<GetValue> {
+public:
+    DiskCallback(stream_t &s);
+
+    void callback(GetValue &val);
+
+private:
+    stream_t stream_;
+};
+
+class DCPBackfill {
+public:
+    DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
+                uint64_t start_seqno, uint64_t end_seqno);
+
+    backfill_status_t run();
+
+    uint16_t getVBucketId();
+
+    uint64_t getEndSeqno();
+
+    bool isDead() {
+        return !stream->isActive();
+    }
+
+    void cancel();
+
+private:
+
+    backfill_status_t create();
+
+    backfill_status_t scan();
+
+    backfill_status_t complete(bool cancelled);
+
+    void transitionState(backfill_state_t newState);
+
+    EventuallyPersistentEngine *engine;
+    stream_t                    stream;
+    uint64_t                    startSeqno;
+    uint64_t                    endSeqno;
+    ScanContext*                scanCtx;
+    backfill_state_t            state;
+    Mutex                       lock;
+};
+
+#endif  // SRC_DCP_BACKFILL_H_
index e8d7282..b009f36 100644 (file)
@@ -70,32 +70,92 @@ std::string Processer::getDescription() {
 
 DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
                          const std::string &name)
-    : Consumer(engine, cookie, name), opaqueCounter(0), processTaskId(0),
-          itemsToProcess(false), lastNoopTime(ep_current_time()), backoffs(0) {
+    : Consumer(engine, cookie, name),
+      opaqueCounter(0),
+      processTaskId(0),
+      itemsToProcess(false),
+      lastMessageTime(ep_current_time()),
+      backoffs(0),
+      processBufferedMessagesYieldThreshold(engine.getConfiguration().
+                                                getDcpConsumerProcessBufferedMessagesYieldLimit()),
+      processBufferedMessagesBatchSize(engine.getConfiguration().
+                                            getDcpConsumerProcessBufferedMessagesBatchSize()) {
     Configuration& config = engine.getConfiguration();
-    streams = new passive_stream_t[config.getMaxVbuckets()];
+    streams.resize(config.getMaxVbuckets());
     setSupportAck(false);
     setLogHeader("DCP (Consumer) " + getName() + " -");
     setReserved(true);
 
     flowControl.enabled = config.isDcpEnableFlowControl();
-    flowControl.bufferSize = config.getDcpConnBufferSize();
+
+    if (flowControl.enabled && config.isDcpEnableDynamicConnBufferSize()) {
+        double dcpConnBufferSizePerc = static_cast<double>
+                                       (config.getDcpConnBufferSizePerc())/100;
+        size_t bufferSize = dcpConnBufferSizePerc *
+                            engine.getEpStats().getMaxDataSize();
+
+        /* Make sure that the flow control buffer size is within a max and min
+           range */
+        if (bufferSize < config.getDcpConnBufferSize()) {
+            bufferSize = config.getDcpConnBufferSize();
+            LOG(EXTENSION_LOG_WARNING, "%s Conn flow control buffer is set to"
+                "minimum, as calculated sz is (%f) * (%zu)", logHeader(),
+                dcpConnBufferSizePerc, engine.getEpStats().getMaxDataSize());
+        } else if (bufferSize > config.getDcpConnBufferSizeMax()) {
+            bufferSize = config.getDcpConnBufferSizeMax();
+            LOG(EXTENSION_LOG_WARNING, "%s Conn flow control buffer is set to"
+                "maximum, as calculated sz is (%f) * (%zu)", logHeader(),
+                dcpConnBufferSizePerc, engine.getEpStats().getMaxDataSize());
+        }
+
+        /* If aggr memory used for flow control buffers across all consumers
+           exceeds the threshold, then we limit it to min size */
+        double dcpConnBufferSizeThreshold = static_cast<double>
+                            (config.getDcpConnBufferSizeAggrMemThreshold())/100;
+        if ((engine.getDcpConnMap().getAggrDcpConsumerBufferSize() + bufferSize)
+            > dcpConnBufferSizeThreshold * engine.getEpStats().getMaxDataSize())
+        {
+            /* Setting to default minimum size */
+            bufferSize = config.getDcpConnBufferSize();
+            LOG(EXTENSION_LOG_WARNING, "%s Conn flow control buffer is set to"
+                "minimum, as aggr memory used for flow control buffers across"
+                "all consumers is %zu and is above the threshold (%f) * (%zu)",
+                logHeader(),
+                engine.getDcpConnMap().getAggrDcpConsumerBufferSize(),
+                dcpConnBufferSizeThreshold,
+                engine.getEpStats().getMaxDataSize());
+        }
+        flowControl.bufferSize = bufferSize;
+    } else {
+        LOG(EXTENSION_LOG_WARNING, "%s Not using dynamic flow control",
+            logHeader());
+        flowControl.bufferSize = config.getDcpConnBufferSize();
+    }
+    engine.getDcpConnMap().incAggrDcpConsumerBufferSize(flowControl.bufferSize);
+    LOG(EXTENSION_LOG_WARNING, "%s Conn flow control buffer is %u", logHeader(),
+      flowControl.bufferSize);
+
     flowControl.maxUnackedBytes = config.getDcpMaxUnackedBytes();
 
     noopInterval = config.getDcpNoopInterval();
     enableNoop = config.isDcpEnableNoop();
     sendNoopInterval = config.isDcpEnableNoop();
 
+    setPriority = true;
+    enableExtMetaData = true;
+
     ExTask task = new Processer(&engine, this, Priority::PendingOpsPriority, 1);
     processTaskId = ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
 }
 
 DcpConsumer::~DcpConsumer() {
-    delete[] streams;
+    engine_.getDcpConnMap().decAggrDcpConsumerBufferSize
+                                (flowControl.bufferSize);
 }
 
 ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
                                          uint32_t flags) {
+    lastMessageTime = ep_current_time();
     LockHolder lh(readyMutex);
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
@@ -114,14 +174,18 @@ ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
         return ENGINE_NOT_MY_VBUCKET;
     }
 
+    snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
+    if (info.range.end == info.start) {
+        info.range.start = info.start;
+    }
+
     uint32_t new_opaque = ++opaqueCounter;
     failover_entry_t entry = vb->failovers->getLatestEntry();
-    uint64_t start_seqno = vb->getHighSeqno();
+    uint64_t start_seqno = info.start;
     uint64_t end_seqno = std::numeric_limits<uint64_t>::max();
     uint64_t vbucket_uuid = entry.vb_uuid;
-    uint64_t snap_start_seqno;
-    uint64_t snap_end_seqno;
-    vb->getCurrentSnapshot(snap_start_seqno, snap_end_seqno);
+    uint64_t snap_start_seqno = info.range.start;
+    uint64_t snap_end_seqno = info.range.end;
     uint64_t high_seqno = vb->getHighSeqno();
 
     passive_stream_t stream = streams[vbucket];
@@ -143,6 +207,7 @@ ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
 }
 
 ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         streams[vbucket].reset();
         return ENGINE_DISCONNECT;
@@ -168,6 +233,7 @@ ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
 
 ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque, uint16_t vbucket,
                                          uint32_t flags) {
+    lastMessageTime = ep_current_time();
     if (doDisconnect()) {
         return ENGINE_DISCONNECT;
     }
@@ -177,8 +243,14 @@ ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque, uint16_t vbucket,
     if (stream && stream->getOpaque() == opaque && stream->isActive()) {
         LOG(EXTENSION_LOG_INFO, "%s (vb %d) End stream received with reason %d",
             logHeader(), vbucket, flags);
-        StreamEndResponse* response = new StreamEndResponse(opaque, flags,
-                                                            vbucket);
+
+        StreamEndResponse* response;
+        try {
+            response = new StreamEndResponse(opaque, flags, vbucket);
+        } catch (const std::bad_alloc&) {
+            return ENGINE_ENOMEM;
+        }
+
         err = stream->messageReceived(response);
 
         bool disable = false;