Merge branch 'watson' 80/77880/1
authorDave Rigby <daver@couchbase.com>
Tue, 9 May 2017 10:42:19 +0000 (11:42 +0100)
committerDave Rigby <daver@couchbase.com>
Tue, 9 May 2017 10:42:31 +0000 (11:42 +0100)
* watson:
  MB-23591: Re-instate isBucketDeletion check for flusher

Change-Id: I03b313a511ca9b8745213e89edb7d2efb0b964bc

273 files changed:
CMakeLists.txt
Doxyfile
README.md
benchmarks/access_scanner_bench.cc [new file with mode: 0644]
benchmarks/benchmark_memory_tracker.cc [new file with mode: 0644]
benchmarks/benchmark_memory_tracker.h [new file with mode: 0644]
benchmarks/defragmenter_bench.cc [new file with mode: 0644]
configuration.json
docs/collections.md [new file with mode: 0644]
docs/protocol/del_with_meta.md
docs/protocol/get_meta.md
docs/protocol/set_with_meta.md
docs/stats.org
management/cbcompact
management/cbepctl
management/cbstats
management/cli_auth_utils.py [new file with mode: 0644]
management/clitool.py
management/mc_bin_client.py
management/memcacheConstants.py
scripts/unmerged-commits.py [new file with mode: 0755]
src/access_scanner.cc
src/access_scanner.h
src/atomic.cc
src/atomic.h
src/atomic_unordered_map.h [new file with mode: 0644]
src/atomicqueue.h
src/backfill.cc
src/backfill.h
src/bgfetcher.cc
src/bgfetcher.h
src/bloomfilter.cc
src/bloomfilter.h
src/callbacks.h
src/checkpoint.cc
src/checkpoint.h
src/checkpoint_remover.cc
src/checkpoint_remover.h
src/collections/collections_dockey.h [new file with mode: 0644]
src/collections/collections_types.h [new file with mode: 0644]
src/collections/manifest.cc [new file with mode: 0644]
src/collections/manifest.h [new file with mode: 0644]
src/collections/vbucket_manifest.cc [new file with mode: 0644]
src/collections/vbucket_manifest.h [new file with mode: 0644]
src/collections/vbucket_manifest_entry.cc [new file with mode: 0644]
src/collections/vbucket_manifest_entry.h [new file with mode: 0644]
src/collections/vbucket_serialised_manifest_entry.h [new file with mode: 0644]
src/common.h
src/compress.cc [deleted file]
src/config_static.h
src/configuration.cc
src/configuration.h
src/configuration_impl.h [new file with mode: 0644]
src/conflict_resolution.cc
src/conflict_resolution.h
src/connmap.cc
src/connmap.h
src/couch-kvstore/couch-fs-stats.cc
src/couch-kvstore/couch-fs-stats.h
src/couch-kvstore/couch-kvstore-metadata.h [new file with mode: 0644]
src/couch-kvstore/couch-kvstore.cc
src/couch-kvstore/couch-kvstore.h
src/dcp/backfill-manager.cc
src/dcp/backfill-manager.h
src/dcp/backfill.cc [deleted file]
src/dcp/backfill.h
src/dcp/backfill_disk.cc [new file with mode: 0644]
src/dcp/backfill_disk.h [new file with mode: 0644]
src/dcp/backfill_memory.cc [new file with mode: 0644]
src/dcp/backfill_memory.h [new file with mode: 0644]
src/dcp/consumer.cc
src/dcp/consumer.h
src/dcp/dcp-types.h
src/dcp/dcpconnmap.cc [new file with mode: 0644]
src/dcp/dcpconnmap.h [new file with mode: 0644]
src/dcp/flow-control.cc
src/dcp/flow-control.h
src/dcp/producer.cc
src/dcp/producer.h
src/dcp/response.cc
src/dcp/response.h
src/dcp/stream.cc
src/dcp/stream.h
src/defragmenter.cc
src/defragmenter.h
src/defragmenter_visitor.cc
src/defragmenter_visitor.h
src/ep.cc [deleted file]
src/ep_bucket.cc [new file with mode: 0644]
src/ep_bucket.h [new file with mode: 0644]
src/ep_engine.cc
src/ep_engine.h
src/ep_types.cc [moved from tests/module_tests/compress_test.cc with 51% similarity]
src/ep_types.h
src/ep_vb.cc [new file with mode: 0644]
src/ep_vb.h [new file with mode: 0644]
src/ephemeral_bucket.cc [new file with mode: 0644]
src/ephemeral_bucket.h [new file with mode: 0644]
src/ephemeral_tombstone_purger.cc [new file with mode: 0644]
src/ephemeral_tombstone_purger.h [new file with mode: 0644]
src/ephemeral_vb.cc [new file with mode: 0644]
src/ephemeral_vb.h [new file with mode: 0644]
src/ephemeral_vb_count_visitor.cc [new file with mode: 0644]
src/ephemeral_vb_count_visitor.h [new file with mode: 0644]
src/executorpool.cc
src/executorpool.h
src/executorthread.cc
src/executorthread.h
src/failover-table.cc
src/failover-table.h
src/fakes/fake_executorpool.h
src/flusher.cc
src/flusher.h
src/forest-kvstore/forest-kvstore.cc
src/forest-kvstore/forest-kvstore.h
src/futurequeue.h [new file with mode: 0644]
src/globaltask.cc [new file with mode: 0644]
src/globaltask.h [new file with mode: 0644]
src/hash_table.cc [new file with mode: 0644]
src/hash_table.h [new file with mode: 0644]
src/hlc.cc [new file with mode: 0644]
src/hlc.h
src/htresizer.cc
src/htresizer.h
src/item.cc
src/item.h
src/item_pager.cc
src/item_pager.h
src/kv_bucket.cc [new file with mode: 0644]
src/kv_bucket.h [moved from src/ep.h with 50% similarity]
src/kv_bucket_iface.h [new file with mode: 0644]
src/kvshard.cc
src/kvshard.h
src/kvstore.cc
src/kvstore.h
src/linked_list.cc [new file with mode: 0644]
src/linked_list.h [new file with mode: 0644]
src/locks.h
src/logger.cc
src/logger.h
src/memory_tracker.cc
src/memory_tracker.h
src/monotonic.h [new file with mode: 0644]
src/murmurhash3.cc
src/murmurhash3.h
src/mutation_log.cc
src/mutation_log.h
src/mutation_log_entry.cc [new file with mode: 0644]
src/mutation_log_entry.h [new file with mode: 0644]
src/objectregistry.cc
src/objectregistry.h
src/pre_link_document_context.cc [moved from src/mutex.h with 61% similarity]
src/pre_link_document_context.h [new file with mode: 0644]
src/ringbuffer.h [deleted file]
src/seqlist.h [new file with mode: 0644]
src/sizes.cc
src/stats.h
src/statwriter.h
src/stored-value.cc
src/stored-value.h
src/stored_value_factories.h [new file with mode: 0644]
src/storeddockey.cc [new file with mode: 0644]
src/storeddockey.h [new file with mode: 0644]
src/string_utils.cc
src/string_utils.h
src/syncobject.h
src/systemevent.cc [new file with mode: 0644]
src/systemevent.h [new file with mode: 0644]
src/tapconnection.cc
src/tapconnection.h
src/tapconnmap.cc [new file with mode: 0644]
src/tapconnmap.h [new file with mode: 0644]
src/task_type.h
src/taskable.h
src/taskqueue.cc
src/taskqueue.h
src/tasks.cc
src/tasks.def.h
src/tasks.h
src/threadlocal.h
src/threadlocal_posix.h
src/threadlocal_win32.h
src/utility.h
src/vb_count_visitor.cc [new file with mode: 0644]
src/vb_count_visitor.h [new file with mode: 0644]
src/vbucket.cc
src/vbucket.h
src/vbucketdeletiontask.cc [new file with mode: 0644]
src/vbucketdeletiontask.h [new file with mode: 0644]
src/vbucketmap.cc
src/vbucketmap.h
src/warmup.cc
src/warmup.h
src/workload.h
tests/ep_perfsuite.cc
tests/ep_test_apis.cc
tests/ep_test_apis.h
tests/ep_testsuite.cc
tests/ep_testsuite_basic.cc
tests/ep_testsuite_checkpoint.cc
tests/ep_testsuite_common.cc
tests/ep_testsuite_common.h
tests/ep_testsuite_dcp.cc
tests/ep_testsuite_tap.cc
tests/ep_testsuite_xdcr.cc
tests/mock/mock_basic_ll.h [new file with mode: 0644]
tests/mock/mock_dcp.cc
tests/mock/mock_dcp_backfill_mgr.h [new file with mode: 0644]
tests/mock/mock_dcp_consumer.h
tests/mock/mock_dcp_producer.h
tests/mock/mock_ephemeral_vb.h [new file with mode: 0644]
tests/mock/mock_global_task.h [moved from src/rwlock.h with 51% similarity]
tests/mock/mock_stream.h
tests/mock/mock_synchronous_ep_engine.cc [new file with mode: 0644]
tests/mock/mock_synchronous_ep_engine.h [new file with mode: 0644]
tests/module_tests/UNUSED_vbucket_test.cc [new file with mode: 0644]
tests/module_tests/atomic_ptr_test.cc
tests/module_tests/atomic_test.cc [deleted file]
tests/module_tests/atomic_unordered_map_test.cc [new file with mode: 0644]
tests/module_tests/basic_ll_test.cc [new file with mode: 0644]
tests/module_tests/bloomfilter_test.cc [new file with mode: 0644]
tests/module_tests/checkpoint_test.cc
tests/module_tests/chunk_creation_test.cc [deleted file]
tests/module_tests/collections/collection_dockey_test.cc [new file with mode: 0644]
tests/module_tests/collections/evp_store_collections_test.cc [new file with mode: 0644]
tests/module_tests/collections/manifest_test.cc [new file with mode: 0644]
tests/module_tests/collections/vbucket_manifest_entry_test.cc [new file with mode: 0644]
tests/module_tests/collections/vbucket_manifest_test.cc [new file with mode: 0644]
tests/module_tests/configuration_test.cc
tests/module_tests/couch-fs-stats_test.cc [new file with mode: 0644]
tests/module_tests/dcp_test.cc
tests/module_tests/defragmenter_test.cc
tests/module_tests/defragmenter_test.h [new file with mode: 0644]
tests/module_tests/ep_unit_tests_main.cc
tests/module_tests/ephemeral_bucket_test.cc [new file with mode: 0644]
tests/module_tests/ephemeral_bucket_test.h [moved from src/compress.h with 55% similarity]
tests/module_tests/ephemeral_vb_test.cc [new file with mode: 0644]
tests/module_tests/evp_engine_test.cc
tests/module_tests/evp_engine_test.h
tests/module_tests/evp_store_rollback_test.cc
tests/module_tests/evp_store_single_threaded_test.cc
tests/module_tests/evp_store_single_threaded_test.h [new file with mode: 0644]
tests/module_tests/evp_store_test.cc
tests/module_tests/evp_store_test.h
tests/module_tests/evp_store_with_meta.cc [new file with mode: 0644]
tests/module_tests/executorpool_test.cc [new file with mode: 0644]
tests/module_tests/executorpool_test.h [new file with mode: 0644]
tests/module_tests/failover_table_test.cc
tests/module_tests/futurequeue_test.cc [new file with mode: 0644]
tests/module_tests/hash_table_test.cc
tests/module_tests/item_pager_test.cc [new file with mode: 0644]
tests/module_tests/kv_bucket_test.cc [new file with mode: 0644]
tests/module_tests/kv_bucket_test.h [new file with mode: 0644]
tests/module_tests/kvstore_test.cc
tests/module_tests/mutation_log_test.cc
tests/module_tests/mutex_test.cc
tests/module_tests/ringbuffer_test.cc [deleted file]
tests/module_tests/stats_test.cc [new file with mode: 0644]
tests/module_tests/stats_test.h [new file with mode: 0644]
tests/module_tests/stored_value_test.cc [new file with mode: 0644]
tests/module_tests/storeddockey_test.cc [new file with mode: 0644]
tests/module_tests/systemevent_test.cc [new file with mode: 0644]
tests/module_tests/test_helpers.cc [new file with mode: 0644]
tests/module_tests/test_helpers.h [new file with mode: 0644]
tests/module_tests/test_task.h [new file with mode: 0644]
tests/module_tests/thread_gate.h [new file with mode: 0644]
tests/module_tests/threadtests.h
tests/module_tests/timing_tests.cc
tests/module_tests/vbucket_test.cc
tests/module_tests/vbucket_test.h [new file with mode: 0644]
tests/module_tests/warmup_test.cc [new file with mode: 0644]
tools/genconfig.cc
wrapper/wrapper

index 7952f66..ffcfe84 100644 (file)
@@ -13,22 +13,37 @@ INCLUDE(CheckSymbolExists)
 INCLUDE(CheckTypeSize)
 INCLUDE(CTest)
 
+OPTION(EP_USE_FORESTDB "Enable support for ForestDB" OFF)
+
 INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include
                            ${CMAKE_CURRENT_SOURCE_DIR}
                            ${CMAKE_CURRENT_SOURCE_DIR}/src
                            ${CMAKE_CURRENT_BINARY_DIR}/src
+                           ${BOOST_INCLUDE_DIR}
                            ${SNAPPY_INCLUDE_DIR}
+                           ${BOOST_INCLUDE_DIR}
                            ${Platform_SOURCE_DIR}/include
                            ${Platform_SOURCE_DIR}/external
                            ${Memcached_SOURCE_DIR}
+                           ${Memcached_SOURCE_DIR}/utilities
                            ${Memcached_SOURCE_DIR}/include
                            ${Couchstore_SOURCE_DIR}/include
-                           ${ForestDB_SOURCE_DIR}/include
+                           ${phosphor_SOURCE_DIR}/include
                            ${CMAKE_CURRENT_BINARY_DIR})
 
+INCLUDE_DIRECTORIES(AFTER ${Memcached_BINARY_DIR}/include)
+
+IF (EP_USE_FORESTDB)
+    INCLUDE_DIRECTORIES(AFTER ${ForestDB_SOURCE_DIR}/include)
+    SET(FOREST_KVSTORE_SOURCE src/forest-kvstore/forest-kvstore.cc)
+    SET(EP_FORESTDB_LIB forestdb)
+    ADD_DEFINITIONS(-DEP_USE_FORESTDB=1)
+    MESSAGE(STATUS "ep-engine: Using ForestDB")
+ENDIF (EP_USE_FORESTDB)
+
 INCLUDE_DIRECTORIES(AFTER
-                    ${gmock_SOURCE_DIR}/include
-                    ${gtest_SOURCE_DIR}/include)
+                    ${gtest_SOURCE_DIR}/include
+                    ${gmock_SOURCE_DIR}/include)
 
 CHECK_INCLUDE_FILES("alloca.h" HAVE_ALLOCA_H)
 CHECK_INCLUDE_FILES("arpa/inet.h" HAVE_ARPA_INET_H)
@@ -52,12 +67,8 @@ CHECK_FUNCTION_EXISTS(mach_absolute_time HAVE_MACH_ABSOLUTE_TIME)
 CHECK_FUNCTION_EXISTS(gettimeofday HAVE_GETTIMEOFDAY)
 CHECK_FUNCTION_EXISTS(getopt_long HAVE_GETOPT_LONG)
 
-# ---- uncomment the lines below ONLY for dev/debugging ---
-#if ("${CMAKE_C_COMPILER_ID}" STREQUAL "Clang")
-#    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0")
-#elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
-#    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0")
-#endif()
+# For debugging without compiler optimizations uncomment line below..
+#SET (CMAKE_BUILD_TYPE DEBUG)
 
 IF (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/.git)
    EXECUTE_PROCESS(COMMAND git rev-parse HEAD
@@ -86,6 +97,18 @@ CONFIGURE_FILE (${CMAKE_CURRENT_SOURCE_DIR}/wrapper/wrapper
 CONFIGURE_FILE (${CMAKE_CURRENT_SOURCE_DIR}/wrapper/wrapper
                 ${CMAKE_CURRENT_BINARY_DIR}/wrapper/cbvbucketctl)
 
+SET(Python_ADDITIONAL_VERSIONS 2.6)
+FIND_PACKAGE(PythonInterp)
+
+# to avoid cluttering source dir with name + "c" files
+SET(ENV{PYTHONDONTWRITEBYTECODE} 1)
+
+# runs cbstats & cbepctl - will fail on basic syntax errors
+ADD_TEST(cbstats_test ${PYTHON_EXECUTABLE}
+        ${CMAKE_CURRENT_SOURCE_DIR}/management/cbstats --help)
+ADD_TEST(cbepctl_test ${PYTHON_EXECUTABLE}
+        ${CMAKE_CURRENT_SOURCE_DIR}/management/cbepctl --help)
+
 IF (WIN32)
    INCLUDE_DIRECTORIES(AFTER ${CMAKE_SOURCE_DIR}/platform/include/win32)
 ENDIF (WIN32)
@@ -115,15 +138,18 @@ ADD_CUSTOM_COMMAND(OUTPUT
                         genconfig
                   COMMENT "Generating code for configuration class")
 
-SET(KVSTORE_SOURCE src/crc32.c src/kvstore.cc)
+SET(KVSTORE_SOURCE src/kvstore.cc)
 SET(COUCH_KVSTORE_SOURCE src/couch-kvstore/couch-kvstore.cc
             src/couch-kvstore/couch-fs-stats.cc)
-SET(FOREST_KVSTORE_SOURCE src/forest-kvstore/forest-kvstore.cc)
 SET(OBJECTREGISTRY_SOURCE src/objectregistry.cc)
 SET(CONFIG_SOURCE src/configuration.cc
   ${CMAKE_CURRENT_BINARY_DIR}/src/generated_configuration.cc)
 
-ADD_LIBRARY(ep SHARED
+SET(COLLECTIONS_SOURCE src/collections/manifest.cc
+                       src/collections/vbucket_manifest.cc
+                       src/collections/vbucket_manifest_entry.cc)
+
+ADD_LIBRARY(ep_objs OBJECT
             src/access_scanner.cc
             src/atomic.cc
             src/backfill.cc
@@ -131,12 +157,14 @@ ADD_LIBRARY(ep SHARED
             src/bloomfilter.cc
             src/checkpoint.cc
             src/checkpoint_remover.cc
-            src/compress.cc
             src/conflict_resolution.cc
             src/connmap.cc
+            src/crc32.c
             src/dcp/backfill-manager.cc
-            src/dcp/backfill.cc
+            src/dcp/backfill_disk.cc
+            src/dcp/backfill_memory.cc
             src/dcp/consumer.cc
+            src/dcp/dcpconnmap.cc
             src/dcp/flow-control.cc
             src/dcp/flow-control-manager.cc
             src/dcp/producer.cc
@@ -144,111 +172,127 @@ ADD_LIBRARY(ep SHARED
             src/dcp/stream.cc
             src/defragmenter.cc
             src/defragmenter_visitor.cc
-            src/ep.cc
+            src/ep_bucket.cc
+            src/ep_vb.cc
             src/ep_engine.cc
             src/ep_time.cc
+            src/ep_types.cc
+            src/ephemeral_bucket.cc
+            src/ephemeral_tombstone_purger.cc
+            src/ephemeral_vb.cc
+            src/ephemeral_vb_count_visitor.cc
             src/executorpool.cc
             src/executorthread.cc
             src/ext_meta_parser.cc
             src/failover-table.cc
             src/flusher.cc
+            src/globaltask.cc
+            src/hash_table.cc
+            src/hlc.cc
             src/htresizer.cc
             src/item.cc
             src/item_pager.cc
             src/logger.cc
+            src/kv_bucket.cc
             src/kvshard.cc
             src/memory_tracker.cc
             src/murmurhash3.cc
             src/mutation_log.cc
+            src/mutation_log_entry.cc
+            src/pre_link_document_context.cc
+            src/pre_link_document_context.h
             src/replicationthrottle.cc
-            src/sizes.cc
-            ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
+            src/linked_list.cc
             src/string_utils.cc
+            src/storeddockey.cc
             src/stored-value.cc
+            src/systemevent.cc
             src/tapconnection.cc
+            src/tapconnmap.cc
             src/tasks.cc
             src/taskqueue.cc
+            src/vb_count_visitor.cc
             src/vbucket.cc
             src/vbucketmap.cc
+            src/vbucketdeletiontask.cc
             src/warmup.cc
-            ${KVSTORE_SOURCE} ${COUCH_KVSTORE_SOURCE}
-            ${FOREST_KVSTORE_SOURCE} ${OBJECTREGISTRY_SOURCE}
-            ${CONFIG_SOURCE})
+            ${OBJECTREGISTRY_SOURCE}
+            ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
+            ${CONFIG_SOURCE}
+            ${KVSTORE_SOURCE}
+            ${COUCH_KVSTORE_SOURCE}
+            ${FOREST_KVSTORE_SOURCE}
+            ${COLLECTIONS_SOURCE})
+SET_PROPERTY(TARGET ep_objs PROPERTY POSITION_INDEPENDENT_CODE 1)
+
+ADD_LIBRARY(ep SHARED $<TARGET_OBJECTS:ep_objs>)
 
 SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "")
-TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore forestdb
-  dirutils platform ${LIBEVENT_LIBRARIES})
+TARGET_LINK_LIBRARIES(ep cJSON JSON_checker couchstore ${EP_FORESTDB_LIB}
+                      engine_utilities dirutils cbcompress
+                      platform phosphor xattr ${LIBEVENT_LIBRARIES})
 
 # Single executable containing all class-level unit tests involving
 # EventuallyPersistentEngine driven by GoogleTest.
 # (We end up compiling most of the src/ files of ep-engine for these unit tests,
 # so simpler / quicker just to link them into a single executable).
 ADD_EXECUTABLE(ep-engine_ep_unit_tests
-  tests/mock/mock_dcp.cc
-  tests/module_tests/checkpoint_test.cc
-  tests/module_tests/defragmenter_test.cc
-  tests/module_tests/ep_unit_tests_main.cc
-  tests/module_tests/dcp_test.cc
-  tests/module_tests/evp_engine_test.cc
-  tests/module_tests/evp_store_rollback_test.cc
-  tests/module_tests/evp_store_test.cc
-  tests/module_tests/evp_store_single_threaded_test.cc
-  tests/module_tests/mutation_log_test.cc
-  src/access_scanner.cc
-  src/atomic.cc
-  src/backfill.cc
-  src/bgfetcher.cc
-  src/bloomfilter.cc
-  src/checkpoint.cc
-  src/checkpoint_remover.cc
-  src/conflict_resolution.cc
-  src/compress.cc
-  src/connmap.cc
-  src/dcp/backfill.cc
-  src/dcp/backfill-manager.cc
-  src/dcp/consumer.cc
-  src/dcp/flow-control.cc
-  src/dcp/flow-control-manager.cc
-  src/dcp/producer.cc
-  src/dcp/response.cc
-  src/dcp/stream.cc
-  src/defragmenter.cc
-  src/defragmenter_visitor.cc
-  src/ep.cc
-  src/ep_engine.cc
-  src/ep_time.cc
-  src/executorpool.cc
-  src/executorthread.cc
-  src/ext_meta_parser.cc
-  src/failover-table.cc
-  src/flusher.cc
-  src/htresizer.cc
-  src/item.cc
-  src/item_pager.cc
-  src/kvshard.cc
-  src/logger.cc
-  src/memory_tracker.cc
-  src/murmurhash3.cc
-  src/mutation_log.cc
-  src/objectregistry.cc
-  src/tapconnection.cc
-  src/replicationthrottle.cc
-  src/stored-value.cc
-  src/string_utils.cc
-  src/tasks.cc
-  src/taskqueue.cc
-  src/vbucket.cc
-  src/vbucketmap.cc
-  src/warmup.cc
-  ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
-  ${CONFIG_SOURCE}
-  ${KVSTORE_SOURCE}
-  ${COUCH_KVSTORE_SOURCE}
-  ${FOREST_KVSTORE_SOURCE}
-  $<TARGET_OBJECTS:memory_tracking>
-  ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.cc)
-TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests couchstore cJSON dirutils forestdb gmock gtest JSON_checker mcd_util platform
-                      ${MALLOC_LIBRARIES})
+               tests/mock/mock_dcp.cc
+               tests/mock/mock_synchronous_ep_engine.cc
+               tests/module_tests/atomic_unordered_map_test.cc
+               tests/module_tests/basic_ll_test.cc
+               tests/module_tests/bloomfilter_test.cc
+               tests/module_tests/checkpoint_test.cc
+               tests/module_tests/collections/collection_dockey_test.cc
+               tests/module_tests/collections/evp_store_collections_test.cc
+               tests/module_tests/collections/manifest_test.cc
+               tests/module_tests/collections/vbucket_manifest_test.cc
+               tests/module_tests/collections/vbucket_manifest_entry_test.cc
+               tests/module_tests/configuration_test.cc
+               tests/module_tests/defragmenter_test.cc
+               tests/module_tests/dcp_test.cc
+               tests/module_tests/ep_unit_tests_main.cc
+               tests/module_tests/ephemeral_bucket_test.cc
+               tests/module_tests/ephemeral_vb_test.cc
+               tests/module_tests/evp_engine_test.cc
+               tests/module_tests/evp_store_rollback_test.cc
+               tests/module_tests/evp_store_test.cc
+               tests/module_tests/evp_store_single_threaded_test.cc
+               tests/module_tests/evp_store_with_meta.cc
+               tests/module_tests/executorpool_test.cc
+               tests/module_tests/failover_table_test.cc
+               tests/module_tests/futurequeue_test.cc
+               tests/module_tests/hash_table_test.cc
+               tests/module_tests/item_pager_test.cc
+               tests/module_tests/kvstore_test.cc
+               tests/module_tests/kv_bucket_test.cc
+               tests/module_tests/memory_tracker_test.cc
+               tests/module_tests/mock_hooks_api.cc
+               tests/module_tests/mutation_log_test.cc
+               tests/module_tests/mutex_test.cc
+               tests/module_tests/stats_test.cc
+               tests/module_tests/storeddockey_test.cc
+               tests/module_tests/stored_value_test.cc
+               tests/module_tests/systemevent_test.cc
+               tests/module_tests/test_helpers.cc
+               tests/module_tests/vbucket_test.cc
+               tests/module_tests/warmup_test.cc
+               $<TARGET_OBJECTS:ep_objs>
+               $<TARGET_OBJECTS:memory_tracking>
+               $<TARGET_OBJECTS:couchstore_test_fileops>
+               ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.cc
+               ${Memcached_SOURCE_DIR}/daemon/doc_pre_expiry.cc
+               ${Memcached_SOURCE_DIR}/daemon/protocol/mcbp/engine_errc_2_mcbp.cc
+               ${Memcached_SOURCE_DIR}/utilities/string_utilities.cc)
+
+TARGET_INCLUDE_DIRECTORIES(ep-engine_ep_unit_tests
+        PUBLIC
+        ${Couchstore_SOURCE_DIR})
+
+TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests couchstore cJSON dirutils
+                      engine_utilities ${EP_FORESTDB_LIB}
+                      gtest gmock JSON_checker mcd_util platform
+                      phosphor xattr cbcompress ${MALLOC_LIBRARIES})
 
 ADD_EXECUTABLE(ep-engine_atomic_ptr_test
   tests/module_tests/atomic_ptr_test.cc
@@ -256,141 +300,103 @@ ADD_EXECUTABLE(ep-engine_atomic_ptr_test
   src/testlogger.cc)
 TARGET_LINK_LIBRARIES(ep-engine_atomic_ptr_test platform)
 
-ADD_EXECUTABLE(ep-engine_atomic_test
-  tests/module_tests/atomic_test.cc
-  src/testlogger.cc)
-TARGET_LINK_LIBRARIES(ep-engine_atomic_test platform)
-
-ADD_EXECUTABLE(ep-engine_chunk_creation_test
-  tests/module_tests/chunk_creation_test.cc)
-TARGET_LINK_LIBRARIES(ep-engine_chunk_creation_test platform)
-
-ADD_EXECUTABLE(ep-engine_compress_test
-  tests/module_tests/compress_test.cc
-  src/compress.cc)
-TARGET_LINK_LIBRARIES(ep-engine_compress_test ${SNAPPY_LIBRARIES} platform)
-
-ADD_EXECUTABLE(ep-engine_configuration_test
-        tests/module_tests/configuration_test.cc
-        src/configuration.cc
+ADD_EXECUTABLE(ep-engine_couch-fs-stats_test
+        src/couch-kvstore/couch-fs-stats.cc
         src/generated_configuration.h
-        src/objectregistry.cc
-        src/testlogger.cc)
-TARGET_LINK_LIBRARIES(ep-engine_configuration_test gtest gtest_main platform)
-
-ADD_EXECUTABLE(ep-engine_hash_table_test
-  tests/module_tests/hash_table_test.cc
-  src/atomic.cc
-  src/compress.cc
-  src/item.cc
-  src/stored-value.cc
-  src/testlogger.cc
-  ${OBJECTREGISTRY_SOURCE} ${CONFIG_SOURCE})
-TARGET_LINK_LIBRARIES(ep-engine_hash_table_test gtest ${SNAPPY_LIBRARIES} platform)
+        tests/module_tests/couch-fs-stats_test.cc
+        $<TARGET_OBJECTS:couchstore_wrapped_fileops_test_framework>)
+TARGET_INCLUDE_DIRECTORIES(ep-engine_couch-fs-stats_test
+        PRIVATE
+        ${Couchstore_SOURCE_DIR}
+        ${Couchstore_SOURCE_DIR}/src)
+TARGET_LINK_LIBRARIES(ep-engine_couch-fs-stats_test gtest gtest_main gmock platform)
 
 ADD_EXECUTABLE(ep-engine_hrtime_test tests/module_tests/hrtime_test.cc)
 TARGET_LINK_LIBRARIES(ep-engine_hrtime_test platform)
 
-ADD_EXECUTABLE(ep-engine_memory_tracker_test
-  tests/module_tests/memory_tracker_test.cc
-  tests/module_tests/mock_hooks_api.cc
-  src/memory_tracker.cc
-  src/testlogger.cc
-  ${OBJECTREGISTRY_SOURCE}
-  ${CMAKE_CURRENT_BINARY_DIR}/src/generated_configuration.h)
-TARGET_LINK_LIBRARIES(ep-engine_memory_tracker_test gtest gtest_main platform)
-
 ADD_EXECUTABLE(ep-engine_misc_test tests/module_tests/misc_test.cc)
 TARGET_LINK_LIBRARIES(ep-engine_misc_test platform)
-ADD_EXECUTABLE(ep-engine_mutex_test
-               tests/module_tests/mutex_test.cc
-               src/testlogger.cc)
-TARGET_LINK_LIBRARIES(ep-engine_mutex_test gtest gtest_main platform)
-
-ADD_EXECUTABLE(ep-engine_ringbuffer_test tests/module_tests/ringbuffer_test.cc)
-TARGET_LINK_LIBRARIES(ep-engine_ringbuffer_test platform)
 
 ADD_EXECUTABLE(ep-engine_string_utils_test
                tests/module_tests/string_utils_test.cc
                src/string_utils.cc)
 TARGET_LINK_LIBRARIES(ep-engine_string_utils_test gtest gtest_main platform)
 
-ADD_EXECUTABLE(ep-engine_failover_table_test tests/module_tests/failover_table_test.cc
-                        src/failover-table.cc src/testlogger.cc
-                        ${OBJECTREGISTRY_SOURCE} ${CONFIG_SOURCE})
-TARGET_LINK_LIBRARIES(ep-engine_failover_table_test cJSON platform)
-
-ADD_EXECUTABLE(ep-engine_kvstore_test
-  tests/module_tests/kvstore_test.cc
-  src/compress.cc
-  src/testlogger.cc
-  ${OBJECTREGISTRY_SOURCE} ${KVSTORE_SOURCE} ${COUCH_KVSTORE_SOURCE}
-  ${FOREST_KVSTORE_SOURCE} ${CONFIG_SOURCE})
-TARGET_LINK_LIBRARIES(ep-engine_kvstore_test
-                      cJSON JSON_checker couchstore dirutils forestdb
-                      gmock gtest platform)
+ADD_EXECUTABLE(ep_engine_benchmarks
+               benchmarks/access_scanner_bench.cc
+               tests/mock/mock_synchronous_ep_engine.cc
+               $<TARGET_OBJECTS:ep_objs>
+               $<TARGET_OBJECTS:memory_tracking>
+               $<TARGET_OBJECTS:couchstore_test_fileops>
+               ${Memcached_SOURCE_DIR}/programs/engine_testapp/mock_server.cc
+               ${Memcached_SOURCE_DIR}/daemon/doc_pre_expiry.cc
+               ${Memcached_SOURCE_DIR}/daemon/protocol/mcbp/engine_errc_2_mcbp.cc
+               ${Memcached_SOURCE_DIR}/utilities/string_utilities.cc
+               benchmarks/benchmark_memory_tracker.cc
+               benchmarks/defragmenter_bench.cc
+               tests/module_tests/vbucket_test.cc)
+
+TARGET_LINK_LIBRARIES(ep_engine_benchmarks benchmark platform xattr couchstore
+        cJSON dirutils engine_utilities gtest gmock JSON_checker mcd_util
+        cbcompress ${MALLOC_LIBRARIES})
+TARGET_INCLUDE_DIRECTORIES(ep_engine_benchmarks PUBLIC
+                           ${benchmark_SOURCE_DIR}/include
+                           tests
+                           benchmarks
+                           ${Couchstore_SOURCE_DIR})
 
 ADD_TEST(ep-engine_atomic_ptr_test ep-engine_atomic_ptr_test)
-ADD_TEST(ep-engine_atomic_test ep-engine_atomic_test)
-ADD_TEST(ep-engine_chunk_creation_test ep-engine_chunk_creation_test)
-ADD_TEST(ep-engine_compress_test ep-engine_compress_test)
-ADD_TEST(ep-engine_configuration_test ep-engine_configuration_test)
+ADD_TEST(ep-engine_couch-fs-stats_test ep-engine_couch-fs-stats_test)
 ADD_TEST(ep-engine_ep_unit_tests ep-engine_ep_unit_tests)
-ADD_TEST(ep-engine_failover_table_test ep-engine_failover_table_test)
-ADD_TEST(ep-engine_hash_table_test ep-engine_hash_table_test)
 ADD_TEST(ep-engine_hrtime_test ep-engine_hrtime_test)
 ADD_TEST(ep-engine_misc_test ep-engine_misc_test)
-ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
-ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
-ADD_TEST(ep-engine_kvstore_test ep-engine_kvstore_test)
-ADD_TEST(ep-engine_memory_tracker_test ep-engine_memory_tracker_test)
 
 ADD_LIBRARY(timing_tests SHARED tests/module_tests/timing_tests.cc)
 SET_TARGET_PROPERTIES(timing_tests PROPERTIES PREFIX "")
 TARGET_LINK_LIBRARIES(timing_tests platform)
 
-ADD_EXECUTABLE(ep-engine_sizes src/sizes.cc src/mutex.h src/testlogger.cc
-              ${OBJECTREGISTRY_SOURCE} ${CONFIG_SOURCE})
-TARGET_LINK_LIBRARIES(ep-engine_sizes platform)
+ADD_EXECUTABLE(ep-engine_sizes src/sizes.cc
+                               $<TARGET_OBJECTS:ep_objs>)
+TARGET_LINK_LIBRARIES(ep-engine_sizes cJSON JSON_checker
+  engine_utilities couchstore
+  ${EP_FORESTDB_LIB} dirutils cbcompress platform phosphor xattr
+  ${LIBEVENT_LIBRARIES})
 
 ADD_LIBRARY(ep_testsuite SHARED
    tests/ep_testsuite.cc
-   src/atomic.cc
-   src/compress.cc
-   src/ep_time.cc
    src/ext_meta_parser.cc
-   src/item.cc
-   src/testlogger.cc
    tests/ep_testsuite_common.cc
    tests/ep_test_apis.cc
    tests/mock/mock_dcp.cc
-   ${OBJECTREGISTRY_SOURCE}
-   ${CONFIG_SOURCE})
+   ${Memcached_SOURCE_DIR}/utilities/string_utilities.cc)
 SET_TARGET_PROPERTIES(ep_testsuite PROPERTIES PREFIX "")
 TARGET_LINK_LIBRARIES(ep_testsuite couchstore dirutils JSON_checker platform
-                      ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+                      xattr
+                      ${LIBEVENT_LIBRARIES})
+ADD_DEPENDENCIES(ep_testsuite engine_testapp)
 
 ADD_LIBRARY(ep_testsuite_basic SHARED
    tests/ep_testsuite_basic.cc
-   src/compress.cc
    src/ext_meta_parser.cc
    tests/ep_testsuite_common.cc
    tests/ep_test_apis.cc
    tests/mock/mock_dcp.cc
 )
 SET_TARGET_PROPERTIES(ep_testsuite_basic PROPERTIES PREFIX "")
-TARGET_LINK_LIBRARIES(ep_testsuite_basic JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+TARGET_LINK_LIBRARIES(ep_testsuite_basic engine_utilities JSON_checker dirutils
+                      platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+ADD_DEPENDENCIES(ep_testsuite engine_testapp)
 
 ADD_LIBRARY(ep_testsuite_dcp SHARED
 tests/ep_testsuite_dcp.cc
-src/compress.cc
 src/ext_meta_parser.cc
 tests/ep_testsuite_common.cc
 tests/ep_test_apis.cc
 tests/mock/mock_dcp.cc
 )
 SET_TARGET_PROPERTIES(ep_testsuite_dcp PROPERTIES PREFIX "")
-TARGET_LINK_LIBRARIES(ep_testsuite_dcp JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+TARGET_LINK_LIBRARIES(ep_testsuite_dcp cbcompress JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+ADD_DEPENDENCIES(ep_testsuite_dcp engine_testapp)
 
 ADD_LIBRARY(ep_testsuite_tap SHARED
    tests/ep_testsuite_common.cc
@@ -399,6 +405,7 @@ ADD_LIBRARY(ep_testsuite_tap SHARED
    src/ext_meta_parser.cc)
 SET_TARGET_PROPERTIES(ep_testsuite_tap PROPERTIES PREFIX "")
 TARGET_LINK_LIBRARIES(ep_testsuite_tap JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+ADD_DEPENDENCIES(ep_testsuite_tap engine_testapp)
 
 ADD_LIBRARY(ep_testsuite_checkpoint SHARED
    tests/ep_testsuite_common.cc
@@ -407,14 +414,18 @@ ADD_LIBRARY(ep_testsuite_checkpoint SHARED
    src/ext_meta_parser.cc)
 SET_TARGET_PROPERTIES(ep_testsuite_checkpoint PROPERTIES PREFIX "")
 TARGET_LINK_LIBRARIES(ep_testsuite_checkpoint JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+ADD_DEPENDENCIES(ep_testsuite_checkpoint engine_testapp)
 
 ADD_LIBRARY(ep_testsuite_xdcr SHARED
    tests/ep_testsuite_common.cc
    tests/ep_testsuite_xdcr.cc
    tests/ep_test_apis.cc
-   src/ext_meta_parser.cc)
+   src/ext_meta_parser.cc
+   ${Memcached_SOURCE_DIR}/utilities/string_utilities.cc)
 SET_TARGET_PROPERTIES(ep_testsuite_xdcr PROPERTIES PREFIX "")
-TARGET_LINK_LIBRARIES(ep_testsuite_xdcr JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+TARGET_LINK_LIBRARIES(ep_testsuite_xdcr JSON_checker dirutils platform xattr
+                      ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
+ADD_DEPENDENCIES(ep_testsuite_xdcr engine_testapp)
 
 ADD_LIBRARY(ep_perfsuite SHARED
    tests/ep_perfsuite.cc
@@ -424,6 +435,7 @@ ADD_LIBRARY(ep_perfsuite SHARED
    tests/mock/mock_dcp.cc)
 SET_TARGET_PROPERTIES(ep_perfsuite PROPERTIES PREFIX "")
 TARGET_LINK_LIBRARIES(ep_perfsuite dirutils platform)
+ADD_DEPENDENCIES(ep_perfsuite engine_testapp)
 
 #ADD_CUSTOM_COMMAND(OUTPUT
 #                     ${CMAKE_CURRENT_BINARY_DIR}/generated_suite_0.c
@@ -477,6 +489,7 @@ INSTALL(PROGRAMS
 
 INSTALL(FILES
         management/clitool.py
+        management/cli_auth_utils.py
         management/mc_bin_client.py
         management/mc_bin_server.py
         management/memcacheConstants.py
@@ -494,41 +507,41 @@ INSTALL(TARGETS ep
         ARCHIVE DESTINATION lib)
 
 
-# Defines a testsuite which runs in full and value eviction variants.
-FUNCTION(ADD_TESTSUITE value_evict_name full_evict_name testsuite_so timeout)
+# Defines a testsuite which runs in full and value eviction
+# (persistent), ephemeral buckets (which has no eviction).
+FUNCTION(ADD_TESTSUITE name timeout)
+  CMAKE_PARSE_ARGUMENTS("arg" "SKIP_EPHEMERAL" "" "" ${ARGN} )
+
   SET(_cmdline
-      ${CMAKE_BINARY_DIR}/memcached/engine_testapp -E ${CMAKE_CURRENT_BINARY_DIR}/ep.so -T ${testsuite_so})
-  ADD_TEST(NAME ${value_evict_name}
-         COMMAND ${_cmdline} -v -e "dbname=./${value_evict_name}")
-  ADD_TEST(NAME ${full_evict_name}
-         COMMAND ${_cmdline} -v -e "item_eviction_policy=full_eviction$<SEMICOLON>dbname=./${full_evict_name}")
-  SET_TESTS_PROPERTIES(${value_evict_name} PROPERTIES TIMEOUT ${timeout})
-  SET_TESTS_PROPERTIES(${full_evict_name} PROPERTIES TIMEOUT ${timeout})
+    ${CMAKE_BINARY_DIR}/memcached/engine_testapp
+        -E ${CMAKE_CURRENT_BINARY_DIR}/ep.so
+        -T ${CMAKE_CURRENT_BINARY_DIR}/${name}.so)
+  ADD_TEST(NAME ${name}.value_eviction
+         COMMAND ${_cmdline} -v -e "dbname=./${name}.value_eviction.db")
+  ADD_TEST(NAME ${name}.full_eviction
+         COMMAND ${_cmdline} -v -e "item_eviction_policy=full_eviction$<SEMICOLON>dbname=./${name}.full_eviction.db")
+  SET_TESTS_PROPERTIES(${name}.value_eviction PROPERTIES TIMEOUT ${timeout})
+  SET_TESTS_PROPERTIES(${name}.full_eviction PROPERTIES TIMEOUT ${timeout})
+
+  IF(NOT arg_SKIP_EPHEMERAL)
+    ADD_TEST(NAME ${name}.ephemeral
+             COMMAND ${_cmdline} -v -e "bucket_type=ephemeral$<SEMICOLON>dbname=./${name}.ephemeral.db")
+    SET_TESTS_PROPERTIES(${name}.ephemeral PROPERTIES TIMEOUT ${timeout})
+  ENDIF()
 ENDFUNCTION()
 
-ADD_TESTSUITE(ep-engine_engine_tests ep-engine_full_eviction_tests
-              ${CMAKE_CURRENT_BINARY_DIR}/ep_testsuite.so
-              1800)
+ADD_TESTSUITE(ep_testsuite 1800)
 
-ADD_TESTSUITE(ep-engine_basic_tests ep-engine_full_eviction_basic
-              ${CMAKE_CURRENT_BINARY_DIR}/ep_testsuite_basic.so
-              600)
+ADD_TESTSUITE(ep_testsuite_basic 600)
 
-ADD_TESTSUITE(ep-engine_dcp_tests ep-engine_full_eviction_dcp
-              ${CMAKE_CURRENT_BINARY_DIR}/ep_testsuite_dcp.so
-              1200)
+ADD_TESTSUITE(ep_testsuite_dcp 1200)
 
-ADD_TESTSUITE(ep-engine_value_eviction_tap ep-engine_full_eviction_tap
-              ${CMAKE_CURRENT_BINARY_DIR}/ep_testsuite_tap.so
-              180)
+# TAP not supported for Ephemeral buckets.
+ADD_TESTSUITE(ep_testsuite_tap 180 SKIP_EPHEMERAL)
 
-ADD_TESTSUITE(ep-engine_value_eviction_checkpoint ep-engine_full_eviction_checkpoint
-              ${CMAKE_CURRENT_BINARY_DIR}/ep_testsuite_checkpoint.so
-              120)
+ADD_TESTSUITE(ep_testsuite_checkpoint 120)
 
-ADD_TESTSUITE(ep-engine_value_eviction_xdcr ep-engine_full_eviction_xdcr
-              ${CMAKE_CURRENT_BINARY_DIR}/ep_testsuite_xdcr.so
-              120)
+ADD_TESTSUITE(ep_testsuite_xdcr 120)
 
 # ================================ PERF_TESTS ================================ #
 SET(_ep_perfsuite_cmdline ${CMAKE_BINARY_DIR}/memcached/engine_testapp -E ${CMAKE_CURRENT_BINARY_DIR}/ep.so -T ${CMAKE_CURRENT_BINARY_DIR}/ep_perfsuite.so -v)
@@ -541,8 +554,11 @@ ADD_CUSTOM_TARGET(test-perfsuite
                           ep
                           ep_perfsuite
                   VERBATIM)
-ADD_TEST(NAME ep-engine_perfsuite
-         COMMAND ${_ep_perfsuite_cmdline} -e "dbname=./value_eviction_perf")
+ADD_TEST(NAME ep_perfsuite.value_eviction
+         COMMAND ${_ep_perfsuite_cmdline} -e "dbname=./ep_perfsuite.value_eviction.db")
+
+ADD_TEST(NAME ep_perfsuite.ephemeral
+         COMMAND ${_ep_perfsuite_cmdline} -e "bucket_type=ephemeral$<SEMICOLON>dbname=./ep_perfsuite.ephemeral.db")
 
 # ============================================================================ #
 
index c9b60a5..2bb1417 100644 (file)
--- a/Doxyfile
+++ b/Doxyfile
@@ -574,7 +574,7 @@ WARN_LOGFILE           =
 # directories like "/usr/src/myproject". Separate the files or directories
 # with spaces.
 
-INPUT                  = src/ src/atomic src/blackhole-kvstore src/couch-kvstore src/iomanager
+INPUT                  = src/
 
 # This tag can be used to specify the character encoding of the source files
 # that doxygen parses. Internally doxygen uses the UTF-8 encoding, which is
@@ -597,7 +597,7 @@ FILE_PATTERNS          =
 # should be searched for input files as well. Possible values are YES and NO.
 # If left blank NO is used.
 
-RECURSIVE              = NO
+RECURSIVE              = YES
 
 # The EXCLUDE tag can be used to specify files and/or directories that should
 # excluded from the INPUT source files. This way you can easily exclude a
index 0b83470..2c3071a 100644 (file)
--- a/README.md
+++ b/README.md
@@ -10,40 +10,38 @@ thread exist.
 
 ## Synchronisation Primitives
 
-There are three mutual-exclusion primitives available in ep-engine.
+There are two mutual-exclusion primitives available in ep-engine (in
+addition to those provided by the C++ standard library):
 
-1. `Mutex` exclusive lock - [mutex.h](./src/mutex.h)
-2. `RWLock` shared, reader/writer lock - [rwlock.h](./src/rwlock.h)
-3. `SpinLock` 1-byte exclusive lock - [atomix.h](./src/atomic.h)
+1. `RWLock` shared, reader/writer lock - [rwlock.h](./src/rwlock.h)
+2. `SpinLock` 1-byte exclusive lock - [atomix.h](./src/atomic.h)
 
-A conditional-variable is also available called `SyncObject`
-[syncobject.h](./src/syncobject.h). `SyncObject` glues a `Mutex` and
-conditional-variable together in one object.
+A condition-variable is also available called `SyncObject`
+[syncobject.h](./src/syncobject.h). `SyncObject` glues a `std::mutex` and
+`std::condition_variable` together in one object.
 
 These primitives are managed via RAII wrappers - [locks.h](./src/locks.h).
 
-1. `LockHolder` - for acquiring a `Mutex` or `SyncObject`.
-2. `MultiLockHolder` - for acquiring an array of `Mutex` or `SyncObject`.
-3. `WriterLockHolder` - for acquiring write access to a `RWLock`.
-4. `ReaderLockHolder` - for acquiring read access to a `RWLock`.
-5. `SpinLockHolder` - for acquiring a `SpinLock`.
+1. `LockHolder` - a deprecated alias for std::lock_guard
+2. `MultiLockHolder` - for acquiring an array of `std::mutex` or `SyncObject`.
 
-## Mutex
-The general style is to create a `LockHolder` when you need to acquire a
-`Mutex`, the constructor will acquire and when the `LockHolder` goes out of
-scope, the destructor will release the `Mutex`. For certain use-cases the
-caller can explicitly lock/unlock a `Mutex` via the `LockHolder` class.
+### Mutex
+The general style is to create a `std::lock_guard` when you need to acquire a
+`std::mutex`, the constructor will acquire and when the `lock_guard` goes out of
+scope, the destructor will release the `std::mutex`. For certain use-cases the
+caller can explicitly lock/unlock a `std::mutex` via the `std::unique_lock`
+class.
 
 ```c++
-Mutex mutex;
+std::mutex mutex;
 void example1() {
-    LockHolder lockHolder(&mutex);
+    std::lock_guard<std::mutex> lockHolder(mutex);
     ...
     return;
 }
 
 void example2() {
-    LockHolder lockHolder(&mutex);
+    std::unique_lock<std::mutex> lockHolder(mutex);
     ...
     lockHolder.unlock();
     ...
@@ -58,7 +56,7 @@ released, and similarly to `LockHolder` the caller can choose to manually
 lock/unlock at any time (with all locks locked/unlocked via one call).
 
 ```c++
-Mutex mutexes[10];
+std::mutex mutexes[10];
 Object objects[10];
 void foo() {
     MultiLockHolder lockHolder(&mutexes, 10);
@@ -69,33 +67,34 @@ void foo() {
 }
 ```
 
-## RWLock
+### RWLock
 
 `RWLock` allows many readers to acquire it and exclusive access for a writer.
-`ReadLockHolder` acquires the lock for a reader and `WriteLockHolder` acquires
-the lock for a writer. Neither classes enable manual lock/unlock, all
-acquisitions and release are performed via the constructor and destructor.
+Like a std::mutex `RWLock` can be used with a std::lock_guard. The RWLock can
+either be explicitly casted to a `ReaderLock` / `WriterLock` through its
+`reader()` and `writer()` member functions or you can rely on the implicit
+conversions used by the `lock_guard` constructor.
 
 ```c++
 RWLock rwLock;
 Object thing;
 
 void foo1() {
-    ReaderLockHolder rlh(&rwLock);
+    std::lock_guard<ReaderLock> rlh(rwLock);
     if (thing.getData()) {
     ...
     }
 }
 
 void foo2() {
-    WriterLockHolder wlh(&rwLock);
+    std::lock_guard<WriterLock> wlh(rwLock);
     thing.setData(...);
 }
 ```
 
-## SyncObject
+### SyncObject
 
-`SyncObject` inherits from `Mutex` and is thus managed via a `LockHolder` or
+`SyncObject` inherits from `std::mutex` and is thus managed via a `LockHolder` or
 `MultiLockHolder`. The `SyncObject` provides the conditional-variable
 synchronisation primitive enabling threads to block and be woken.
 
@@ -123,24 +122,24 @@ void foo2() {
 }
 ```
 
-## SpinLock
+### SpinLock
 
 A `SpinLock` uses a single byte for the lock and our own code to spin until the
 lock is acquired. The intention for this lock is for low contention locks.
 
-The RAII pattern is just like for a Mutex.
+The RAII pattern is just like for a mutex.
 
 
 ```c++
 SpinLock spinLock;
 void example1() {
-    SpinLockHolder lockHolder(&spinLock);
+    std::lock_guard<SpinLock> lockHolder(&spinLock);
     ...
     return;
 }
 ```
 
-## _UNLOCKED convention
+### _UNLOCKED convention
 
 ep-engine has a function naming convention that indicates the function should
 be called with a lock acquired.
@@ -159,6 +158,18 @@ void Object::run() {
     return;
 }
 ```
+
+## Atomic / thread-safe data structures
+
+In addition to the basic synchronization primitives described above,
+there are also the following higher-level data structures which
+support atomic / thread-safe access from multiple threads:
+
+1. `AtomicQueue`: thread-safe, approximate-FIFO queue, optimized for
+   multiple-writers, one reader - [atomicqueue.h](./src/atomicqueue.h)
+2. `AtomicUnorderedMap` : thread-safe unordered map -
+   [atomic_unordered_map.h](./src/atomic_unordered_map.h)
+
 ## Thread Local Storage (ObjectRegistry).
 
 Threads in ep-engine are servicing buckets and when a thread is dispatched to
diff --git a/benchmarks/access_scanner_bench.cc b/benchmarks/access_scanner_bench.cc
new file mode 100644 (file)
index 0000000..ca6cd76
--- /dev/null
@@ -0,0 +1,178 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include <access_scanner.h>
+#include <benchmark/benchmark.h>
+#include <fakes/fake_executorpool.h>
+#include <mock/mock_synchronous_ep_engine.h>
+#include <programs/engine_testapp/mock_server.h>
+#include "benchmark_memory_tracker.h"
+#include "dcp/dcpconnmap.h"
+
+class EngineFixture : public benchmark::Fixture {
+protected:
+    void SetUp(const benchmark::State& state) override {
+        SingleThreadedExecutorPool::replaceExecutorPoolWithFake();
+        executorPool = reinterpret_cast<SingleThreadedExecutorPool*>(
+                ExecutorPool::get());
+        memoryTracker = BenchmarkMemoryTracker::getInstance(
+                *get_mock_server_api()->alloc_hooks);
+        memoryTracker->reset();
+        std::string config = "dbname=benchmarks-test;" + varConfig;
+
+        engine.reset(new SynchronousEPEngine(config));
+        ObjectRegistry::onSwitchThread(engine.get());
+
+        engine->setKVBucket(
+                engine->public_makeBucket(engine->getConfiguration()));
+
+        engine->public_initializeEngineCallbacks();
+        initialize_time_functions(get_mock_server_api()->core);
+        cookie = create_mock_cookie();
+    }
+
+    void TearDown(const benchmark::State& state) override {
+        executorPool->cancelAndClearAll();
+        destroy_mock_cookie(cookie);
+        destroy_mock_event_callbacks();
+        engine->getDcpConnMap().manageConnections();
+        engine.reset();
+        ObjectRegistry::onSwitchThread(nullptr);
+        ExecutorPool::shutdown();
+        memoryTracker->destroyInstance();
+    }
+
+    Item make_item(uint16_t vbid,
+                   const std::string& key,
+                   const std::string& value) {
+        uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_DATATYPE_JSON};
+        Item item({key, DocNamespace::DefaultCollection},
+                  /*flags*/ 0,
+                  /*exp*/ 0,
+                  value.c_str(),
+                  value.size(),
+                  ext_meta,
+                  sizeof(ext_meta));
+        item.setVBucketId(vbid);
+        return item;
+    }
+
+    std::unique_ptr<SynchronousEPEngine> engine;
+    const void* cookie = nullptr;
+    const int vbid = 0;
+
+    // Allows subclasses to add stuff to the config
+    std::string varConfig;
+    BenchmarkMemoryTracker* memoryTracker;
+    SingleThreadedExecutorPool* executorPool;
+};
+
+class AccessLogBenchEngine : public EngineFixture {
+protected:
+    void SetUp(const benchmark::State& state) override {
+        // If the access scanner is running then it will always scan
+        varConfig = "alog_resident_ratio_threshold=100;";
+        varConfig += "alog_max_stored_items=" +
+                     std::to_string(alog_max_stored_items);
+        EngineFixture::SetUp(state);
+    }
+
+    const size_t alog_max_stored_items = 2048;
+};
+
+ProcessClock::time_point runNextTask(SingleThreadedExecutorPool* pool,
+                                     task_type_t taskType,
+                                     std::string expectedTask) {
+    CheckedExecutor executor(pool, *pool->getLpTaskQ()[taskType]);
+    executor.runCurrentTask(expectedTask);
+    return executor.completeCurrentTask();
+}
+
+/*
+ * Varies whether the access scanner is running or not. Also varies the
+ * number of items stored in the vbucket. The purpose of this benchmark is to
+ * measure the maximum memory usage of the access scanner.
+ * Variables:
+ *  - range(0) : Whether to run access scanner constantly (0: no, 1: yes)
+ *  - range(1) : The number of items to fill the vbucket with
+ *
+ */
+BENCHMARK_DEFINE_F(AccessLogBenchEngine, MemoryOverhead)
+(benchmark::State& state) {
+    ExTask task = nullptr;
+    engine->getKVBucket()->setVBucketState(0, vbucket_state_active, false);
+    if (state.range(0) == 1) {
+        state.SetLabel("AccessScanner");
+        task = make_STRCPtr<AccessScanner>(
+                *(engine->getKVBucket()), engine->getEpStats(), 1000);
+        ExecutorPool::get()->schedule(task);
+    } else {
+        state.SetLabel("Control");
+    }
+
+    // The content of each doc, nothing too large
+    std::string value(200, 'x');
+
+    // We have a key prefix so that our keys are more realistic in length
+    std::string keyPrefixPre(20, 'a');
+
+    for (int i = 0; i < state.range(1); ++i) {
+        auto item = make_item(vbid, keyPrefixPre + std::to_string(i), value);
+        engine->getKVBucket()->set(item, cookie);
+    }
+    size_t baseMemory = memoryTracker->getCurrentAlloc();
+    while (state.KeepRunning()) {
+        if (state.range(0) == 1) {
+            executorPool->wake(task->getId());
+            runNextTask(executorPool,
+                        AUXIO_TASK_IDX,
+                        "Generating access "
+                        "log");
+            runNextTask(executorPool,
+                        AUXIO_TASK_IDX,
+                        "Item Access Scanner on"
+                        " vb 0");
+        }
+    }
+    state.counters["MaxBytesAllocatedPerItem"] =
+            (memoryTracker->getMaxAlloc() - baseMemory) / alog_max_stored_items;
+}
+
+static void AccessScannerArguments(benchmark::internal::Benchmark* b) {
+    std::array<int, 2> numItems{{32768, 65536}};
+    for (int j : numItems) {
+        b->ArgPair(0, j);
+        b->ArgPair(1, j);
+    }
+}
+
+BENCHMARK_REGISTER_F(AccessLogBenchEngine, MemoryOverhead)
+        ->Apply(AccessScannerArguments)
+        ->MinTime(0.000001);
+
+static char allow_no_stats_env[] = "ALLOW_NO_STATS_UPDATE=yeah";
+int main(int argc, char** argv) {
+    putenv(allow_no_stats_env);
+    mock_init_alloc_hooks();
+    init_mock_server(true);
+    HashTable::setDefaultNumLocks(47);
+    initialize_time_functions(get_mock_server_api()->core);
+    ::benchmark::Initialize(&argc, argv);
+    ::benchmark::RunSpecifiedBenchmarks();
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/benchmarks/benchmark_memory_tracker.cc b/benchmarks/benchmark_memory_tracker.cc
new file mode 100644 (file)
index 0000000..49401d8
--- /dev/null
@@ -0,0 +1,106 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "benchmark_memory_tracker.h"
+#include <objectregistry.h>
+#include <utility.h>
+
+#include <algorithm>
+
+std::atomic<BenchmarkMemoryTracker*> BenchmarkMemoryTracker::instance;
+std::mutex BenchmarkMemoryTracker::instanceMutex;
+std::atomic<size_t> BenchmarkMemoryTracker::maxTotalAllocation;
+std::atomic<size_t> BenchmarkMemoryTracker::currentAlloc;
+
+BenchmarkMemoryTracker::~BenchmarkMemoryTracker() {
+    hooks_api.remove_new_hook(&NewHook);
+    hooks_api.remove_delete_hook(&DeleteHook);
+}
+
+BenchmarkMemoryTracker* BenchmarkMemoryTracker::getInstance(
+        const ALLOCATOR_HOOKS_API& hooks_api_) {
+    BenchmarkMemoryTracker* tmp = instance.load();
+    if (tmp == nullptr) {
+        std::lock_guard<std::mutex> lock(instanceMutex);
+        tmp = instance.load();
+        if (tmp == nullptr) {
+            tmp = new BenchmarkMemoryTracker(hooks_api_);
+            instance.store(tmp);
+
+            instance.load()->connectHooks();
+        }
+    }
+    return tmp;
+}
+
+void BenchmarkMemoryTracker::destroyInstance() {
+    std::lock_guard<std::mutex> lock(instanceMutex);
+    BenchmarkMemoryTracker* tmp = instance.load();
+    if (tmp != nullptr) {
+        delete tmp;
+        instance = nullptr;
+    }
+}
+
+size_t BenchmarkMemoryTracker::getMaxAlloc() {
+    return maxTotalAllocation;
+}
+
+size_t BenchmarkMemoryTracker::getCurrentAlloc() {
+    return currentAlloc;
+}
+
+BenchmarkMemoryTracker::BenchmarkMemoryTracker(
+        const ALLOCATOR_HOOKS_API& hooks_api)
+    : hooks_api(hooks_api) {
+}
+void BenchmarkMemoryTracker::connectHooks() {
+    if (hooks_api.add_new_hook(&NewHook)) {
+        LOG(EXTENSION_LOG_DEBUG, "Registered add hook");
+        if (hooks_api.add_delete_hook(&DeleteHook)) {
+            LOG(EXTENSION_LOG_DEBUG, "Registered delete hook");
+            return;
+        }
+        hooks_api.remove_new_hook(&NewHook);
+    }
+    LOG(EXTENSION_LOG_WARNING, "Failed to register allocator hooks");
+}
+void BenchmarkMemoryTracker::NewHook(const void* ptr, size_t) {
+    if (ptr != NULL) {
+        const auto* tracker = BenchmarkMemoryTracker::instance.load();
+        void* p = const_cast<void*>(ptr);
+        size_t alloc = tracker->hooks_api.get_allocation_size(p);
+        currentAlloc += alloc;
+        maxTotalAllocation.store(
+                std::max(currentAlloc.load(), maxTotalAllocation.load()));
+        ObjectRegistry::memoryAllocated(alloc);
+    }
+}
+void BenchmarkMemoryTracker::DeleteHook(const void* ptr) {
+    if (ptr != NULL) {
+        const auto* tracker = BenchmarkMemoryTracker::instance.load();
+        void* p = const_cast<void*>(ptr);
+        size_t alloc = tracker->hooks_api.get_allocation_size(p);
+        currentAlloc -= alloc;
+        ObjectRegistry::memoryDeallocated(alloc);
+    }
+}
+
+void BenchmarkMemoryTracker::reset() {
+    currentAlloc.store(0);
+    maxTotalAllocation.store(0);
+}
diff --git a/benchmarks/benchmark_memory_tracker.h b/benchmarks/benchmark_memory_tracker.h
new file mode 100644 (file)
index 0000000..174f210
--- /dev/null
@@ -0,0 +1,58 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include <memcached/allocator_hooks.h>
+#include <atomic>
+#include <mutex>
+
+/*
+ * A singleton which tracks memory usage for use in benchmarks.
+ *
+ * This class provides hooks for new and delete which are registered when the
+ * singleton is created.
+ *
+ * Tracks the current allocation along with the maximum total allocation size
+ * it has seen.
+ */
+class BenchmarkMemoryTracker {
+public:
+    ~BenchmarkMemoryTracker();
+
+    static BenchmarkMemoryTracker* getInstance(
+            const ALLOCATOR_HOOKS_API& hooks_api_);
+
+    static void destroyInstance();
+
+    void reset();
+
+    size_t getMaxAlloc();
+    size_t getCurrentAlloc();
+
+private:
+    BenchmarkMemoryTracker(const ALLOCATOR_HOOKS_API& hooks_api);
+    void connectHooks();
+    static void NewHook(const void* ptr, size_t);
+    static void DeleteHook(const void* ptr);
+
+    static std::atomic<BenchmarkMemoryTracker*> instance;
+    static std::mutex instanceMutex;
+    ALLOCATOR_HOOKS_API hooks_api;
+    static std::atomic<size_t> maxTotalAllocation;
+    static std::atomic<size_t> currentAlloc;
+};
diff --git a/benchmarks/defragmenter_bench.cc b/benchmarks/defragmenter_bench.cc
new file mode 100644 (file)
index 0000000..99c0ab7
--- /dev/null
@@ -0,0 +1,137 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "defragmenter_visitor.h"
+#include "tests/module_tests/defragmenter_test.h"
+#include "tests/module_tests/test_helpers.h"
+
+#include <gtest/gtest.h>
+#include <valgrind/valgrind.h>
+
+
+/* Measure the rate at which the defragmenter can defragment documents, using
+ * the given age threshold.
+ *
+ * Sets up a DefragmentVisitor, then times how long it takes to visit all
+ * documents in the given vbucket, `passes` times.
+ */
+static size_t benchmarkDefragment(VBucket& vbucket, size_t passes,
+                                  uint8_t age_threshold,
+                                  size_t chunk_duration_ms) {
+    // Create and run visitor for the specified number of iterations, with
+    // the given age.
+    DefragmentVisitor visitor(age_threshold);
+    hrtime_t start = gethrtime();
+    for (size_t i = 0; i < passes; i++) {
+        // Loop until we get to the end; this may take multiple chunks depending
+        // on the chunk_duration.
+        HashTable::Position pos;
+        while (pos != vbucket.ht.endPosition()) {
+            visitor.setDeadline(gethrtime() +
+                                (chunk_duration_ms * 1000 * 1000));
+            pos = vbucket.ht.pauseResumeVisit(visitor, pos);
+        }
+    }
+    hrtime_t end = gethrtime();
+    size_t visited = visitor.getVisitedCount();
+
+    double duration_s = (end - start) / double(1000 * 1000 * 1000);
+    return size_t(visited / duration_s);
+}
+
+
+class DefragmenterBenchmarkTest : public DefragmenterTest {
+protected:
+    /* Fill the vbucket with documents (the count is chosen internally, and is
+     * much smaller under Valgrind). Returns the rate (items/sec) at which
+     * items were added.
+     */
+    size_t populateVbucket() {
+        // How many items to create in the VBucket. Use a large number for
+        // normal runs when measuring performance, but a very small number
+        // (enough for functional testing) when running under Valgrind
+        // where there's no sense in measuring performance.
+        const size_t ndocs = RUNNING_ON_VALGRIND ? 10 : 500000;
+
+        /* Set the hashTable to a sensible size */
+        vbucket->ht.resize(ndocs);
+
+        /* Store items */
+        char value[256];
+        hrtime_t start = gethrtime();
+        for (size_t i = 0; i < ndocs; i++) {
+            std::string key = "key" + std::to_string(i);
+            Item item(makeStoredDocKey(key), 0, 0, value, sizeof(value));
+            public_processSet(item, 0);
+        }
+        hrtime_t end = gethrtime();
+
+        // Let hashTable set itself to correct size, post-fill
+        vbucket->ht.resize();
+
+        double duration_s = (end - start) / double(1000 * 1000 * 1000);
+        return size_t(ndocs / duration_s);
+    }
+
+};
+
+TEST_P(DefragmenterBenchmarkTest, Populate) {
+    size_t populateRate = populateVbucket();
+    RecordProperty("items_per_sec", populateRate);
+}
+
+TEST_P(DefragmenterBenchmarkTest, Visit) {
+    populateVbucket();
+    const size_t one_minute = 60 * 1000;
+    size_t visit_rate = benchmarkDefragment(*vbucket, 1,
+                                            std::numeric_limits<uint8_t>::max(),
+                                            one_minute);
+    RecordProperty("items_per_sec", visit_rate);
+}
+
+TEST_P(DefragmenterBenchmarkTest, DefragAlways) {
+    populateVbucket();
+    const size_t one_minute = 60 * 1000;
+    size_t defrag_always_rate = benchmarkDefragment(*vbucket, 1, 0,
+                                                    one_minute);
+    RecordProperty("items_per_sec", defrag_always_rate);
+}
+
+TEST_P(DefragmenterBenchmarkTest, DefragAge10) {
+    populateVbucket();
+    const size_t one_minute = 60 * 1000;
+    size_t defrag_age10_rate = benchmarkDefragment(*vbucket, 1, 10,
+                                                   one_minute);
+    RecordProperty("items_per_sec", defrag_age10_rate);
+}
+
+TEST_P(DefragmenterBenchmarkTest, DefragAge10_20ms) {
+    populateVbucket();
+    size_t defrag_age10_20ms_rate = benchmarkDefragment(*vbucket, 1, 10, 20);
+    RecordProperty("items_per_sec", defrag_age10_20ms_rate);
+}
+
+INSTANTIATE_TEST_CASE_P(
+        FullAndValueEviction,
+        DefragmenterBenchmarkTest,
+        ::testing::Values(VALUE_ONLY, FULL_EVICTION),
+        [](const ::testing::TestParamInfo<item_eviction_policy_t>& info) {
+            if (info.param == VALUE_ONLY) {
+                return "VALUE_ONLY";
+            } else {
+                return "FULL_EVICTION";
+            }
+        });
index 5c4f6ce..f90692d 100644 (file)
@@ -4,18 +4,27 @@
             "default": "4096",
             "descr": "Logging block size.",
             "dynamic": false,
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "bucket_type": "persistent"
+            }
         },
         "alog_path": {
             "default": "",
             "descr": "Path to the access log.",
             "dynamic": false,
-            "type": "std::string"
+            "type": "std::string",
+            "requires": {
+                "bucket_type": "persistent"
+            }
         },
         "access_scanner_enabled": {
             "default": "true",
             "descr": "True if access scanner task is enabled",
-            "type": "bool"
+            "type": "bool",
+            "requires": {
+                "bucket_type": "persistent"
+            }
         },
         "alog_sleep_time": {
             "default": "1440",
@@ -26,6 +35,9 @@
                     "max": 4320,
                     "min": 1
                 }
+            },
+            "requires": {
+                "bucket_type": "persistent"
             }
         },
         "alog_task_time": {
@@ -37,6 +49,9 @@
                     "max": 23,
                     "min": 0
                 }
+            },
+            "requires": {
+                "bucket_type": "persistent"
             }
         },
         "alog_resident_ratio_threshold": {
                     "max": 100,
                     "min": 0
                 }
+            },
+            "requires": {
+                "bucket_type": "persistent"
+            }
+        },
+        "alog_max_stored_items": {
+            "default": "1024",
+            "desr": "The maximum number of items the Access Scanner will hold in memory before commiting them to disk",
+            "type": "size_t",
+            "dynamic": false,
+            "validator": {
+                "range": {
+                    "min": 1
+                }
+            },
+            "requires": {
+                "bucket_type": "persistent"
             }
         },
         "backend": {
         "bfilter_key_count": {
             "default": "10000",
             "desr": "Bloomfilter: Estimated key count per vbucket",
-            "type": "size_t"
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "min": 1
+                }
+            }
         },
         "bfilter_fp_prob": {
             "default": "0.01",
                 }
             }
         },
+        "bucket_type": {
+            "default": "persistent",
+            "descr": "Bucket type in the couchbase server",
+            "dynamic": false,
+            "type": "std::string",
+            "validator": {
+                "enum": [
+                         "ephemeral",
+                         "persistent"
+                        ]
+            }
+        },
         "compaction_exp_mem_threshold": {
             "default": "85",
             "desr": "Memory usage threshold after which compaction will not queue expired items for deletion",
             "default": "5",
             "type": "size_t"
         },
+        "collections_prototype_enabled" : {
+            "default": "false",
+            "descr": "Enable the collections functionality. Warning breaks upgrades and compatibility with legacy clients",
+            "type": "bool"
+        },
         "compaction_write_queue_cap": {
             "default": "10000",
             "desr" : "Disk write queue threshold after which compaction tasks will be made to snooze, if there are already pending compaction tasks",
             "descr": "True if merging closed checkpoints is enabled",
             "type": "bool"
         },
+        "ephemeral_full_policy": {
+            "default": "auto_delete",
+            "descr": "How should an Ephemeral bucket becoming full be handled?",
+            "type": "std::string",
+            "validator": {
+                "enum": [
+                    "auto_delete",
+                    "fail_new_data"
+                ]
+            },
+            "requires": {
+                "bucket_type": "ephemeral"
+            }
+        },
+        "ephemeral_metadata_purge_age": {
+            "default": "60",
+            "descr": "Age in seconds after which Ephemeral metadata is purged entirely from memory. Purging disabled if set to -1.",
+            "type": "ssize_t",
+            "requires": {
+                "bucket_type": "ephemeral"
+            }
+        },
+        "ephemeral_metadata_purge_interval": {
+            "default": "60",
+            "descr": "Time in seconds between automatic, periodic runs of the Ephemeral metadata purge task. Periodic purging disabled if set to 0.",
+            "type": "size_t",
+            "requires": {
+                "bucket_type": "ephemeral"
+            }
+        },
         "exp_pager_enabled": {
             "default": "true",
             "descr": "True if expiry pager task is enabled",
             "default": "47",
             "type": "size_t"
         },
+        "ht_resize_interval": {
+            "default": "60",
+            "descr": "Interval in seconds to wait between HashtableResizerTask executions.",
+            "type": "size_t"
+        },
         "ht_size": {
             "default": "0",
             "type": "size_t"
                     "value_only",
                     "full_eviction"
                 ]
+            },
+            "requires": {
+                "bucket_type": "persistent"
             }
         },
         "item_num_based_new_chk": {
             "type": "bool"
         },
         "connection_manager_interval": {
-            "default": "2",
+            "default": "1",
             "descr": "How often connection manager task should be run (in seconds).",
             "type": "size_t",
             "dynamic": false,
             "validator": {
                 "range": {
-                    "min": 2
+                    "min": 1
                 }
             }
         },
             "descr": "maximum number of failover log entries",
             "type": "size_t"
         },
+        "max_item_privileged_bytes": {
+            "default": "(1024 * 1024)",
+            "descr": "Maximum number of bytes allowed for 'privileged' (system) data for an item in addition to the max_item_size bytes",
+            "type": "size_t"
+        },
         "max_item_size": {
             "default": "(20 * 1024 * 1024)",
             "descr": "Maximum number of bytes allowed for an item",
         },
         "max_size": {
             "default": "0",
-            "type": "size_t"
+            "type": "size_t",
+            "aliases":["cache_size"]
         },
         "max_vbuckets": {
             "default": "1024",
                 }
             }
         },
-        "max_num_readers": {
+       "mem_merge_count_threshold" : {
+            "default": "100",
+            "descr": "No.of mem changes after which the thread-local mem is merged to the bucket counter",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 1000000,
+                    "min": 1
+                }
+            }
+        },
+       "mem_merge_bytes_threshold" : {
+            "default": "102400",
+            "descr": "Amount of mem changes after which the thread-local mem is merged to the bucket counter",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max": 104857600,
+                    "min": 1
+                }
+            }
+        },
+        "num_reader_threads": {
             "default": "0",
             "descr": "Throttle max number of reader threads",
             "dynamic": false,
                     "max": 512,
                     "min": 0
                 }
-            }
+            },
+            "aliases":["max_num_readers"]
         },
-        "max_num_writers": {
+        "num_writer_threads": {
             "default": "0",
             "descr": "Throttle max number of writer threads",
             "dynamic": false,
                     "max": 512,
                     "min": 0
                 }
-            }
+            },
+            "aliases":["max_num_writers"]
         },
-        "max_num_auxio": {
+        "num_auxio_threads": {
             "default": "0",
             "descr": "Throttle max number of aux io threads",
             "dynamic": false,
                     "max": 512,
                     "min": 0
                 }
-            }
+            },
+            "aliases":["max_num_auxio"]
         },
-        "max_num_nonio": {
+        "num_nonio_threads": {
             "default": "0",
             "descr": "Throttle max number of non io threads",
             "dynamic": false,
                     "max": 512,
                     "min": 0
                 }
-            }
+            },
+            "aliases":["max_num_nonio"]
         },
         "mem_high_wat": {
             "default": "max",
             "default": "",
             "type": "std::string"
         },
+        "tap": {
+            "default": "true",
+            "descr":"True if the TAP protocol is supported and enabled.",
+            "type": "bool",
+            "dynamic": false
+        },
         "tap_ack_grace_period": {
             "default": "300",
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_ack_initial_sequence_number": {
             "default": "1",
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_ack_interval": {
             "default": "1000",
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_ack_window_size": {
             "default": "10",
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_backfill_resident": {
             "default": "0.9",
-            "type": "float"
+            "type": "float",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_backlog_limit": {
             "default": "5000",
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_backoff_period": {
             "default": "5.0",
-            "type": "float"
+            "type": "float",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_bg_max_pending": {
             "default": "500",
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_keepalive": {
             "default": "0",
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_noop_interval": {
             "default": "200",
             "descr": "Number of seconds between a noop is sent on an idle connection",
-            "type": "size_t"
+            "type": "size_t",
+            "requires": {
+                "tap": true
+            }
         },
         "tap_requeue_sleep_time": {
             "default": "0.1",
-            "type": "float"
+            "type": "float",
+            "requires": {
+                "tap": true
+            }
         },
         "replication_throttle_cap_pcnt": {
             "default": "10",
                 }
             }
         },
-        "dcp_noop_interval": {
-            "default": "180",
-            "descr": "Number of seconds between a noop",
+        "dcp_idle_timeout": {
+            "default": "360",
+            "descr": "The maximum number of seconds between dcp messages before a connection is disconnected",
             "type": "size_t"
         },
+        "dcp_noop_tx_interval": {
+            "default": "1",
+            "descr": "The time interval in seconds between noop messages being sent to the consumer",
+            "type": "size_t",
+            "validator": {
+                "range": {
+                    "max" : 360,
+                    "min" : 1
+                }
+            }
+        },
         "dcp_max_unacked_bytes": {
             "default": "524288",
             "descr": "Amount of processed bytes before an ack is required",
diff --git a/docs/collections.md b/docs/collections.md
new file mode 100644 (file)
index 0000000..5b99968
--- /dev/null
@@ -0,0 +1,259 @@
+
+# Collections
+
+This document contains some brief diagrams to aid understanding of the
+important state changes within the collections code, and the impact those
+state changes have on persistence and DCP.
+
+## Collection state change diagram
+
+The following diagram shows a collection's lifetime as stimuli occur. The
+stimuli are new JSON Collections::Manifests being applied to the
+Collections::VB::Manifest. For example, a new manifest with a previously
+unknown collection triggers entry into the state diagram, while a manifest that
+now omits a known collection moves the known collection to exclusive deleting.
+
+
+Collections state changes:
+```
+        ●
+        │┌────────────────────────┐
+        ├┤ Collection is created  │
+        │└────────────────────────┘
+        ▼
+   .─────────.
+  /           \
+ :  exclusive  : ◀──────────────────────────────┐
+ :    open     :                                │
+  \           /                                 │
+    ─────────                                   │
+        │                                       │┌─────────────────────────┐
+        │                                       ├┤Collection (old revision)│
+        │                                       ││  is completely deleted  │
+        │┌────────────────────────┐             ││   (all items removed)   │
+        ├┤ Collection is deleted  │             │└─────────────────────────┘
+        │└────────────────────────┘             │
+        ▼                                       │
+  .─────────.                                  .─────────.
+ /           \                                /           \
+:  exclusive  : ────────────┬──────────────▶ :  open and   :
+:  deleting   : ┌───────────┴─────────────┐  :  deleting   :
+ \           /  │ Collection of same name │   \           /
+   ─────────    │(new revision) is created│     ─────────
+        │       └─────────────────────────┘
+        │
+        │┌───────────────────────────┐
+        ├┤ Collection is completely  │
+        ││deleted (all items removed)│
+        │└───────────────────────────┘
+        ●
+```
+
+## State diagram with SystemEvents
+
+The same state diagram is now annotated to show the SystemEvents that are
+generated by each state change.
+
+A SystemEvent is a 'special' Item owned by the server, but queued into the
+user's data stream. The SystemEvent allows the flusher and DCP to trigger
+specific actions that will be ordered with the user's data stream.
+
+```
+                          ●
+                          │
+ ┌───────────────────────┐│
+ │   CreateCollection    ├┤
+ └───────────────────────┘▼
+                     .─────────.
+                    /           \
+                   :  exclusive  : ◀──────────────────────────────┐
+                   :    open     :                                │
+                    \           /                                 │
+                      ─────────                                   │
+                          │                                       │
+                          │                                       │
+                          │                                       │
+                          │                                       │
+┌────────────────────────┐│                                       │┌─────────────────────────┐
+│ BeginDeleteCollection  ├┤                                       ├┤  DeleteCollectionSoft   │
+└────────────────────────┘▼          ┌───────────────────────┐    │└─────────────────────────┘
+                    .─────────.      │   CreateCollection    │   .─────────.
+                   /           \     └───────────┬───────────┘  /           \
+                  :  exclusive  : ───────────────┴───────────▶ :  open and   :
+                  :  deleting   :                              :  deleting   :
+                   \           /                                \           /
+                     ─────────                                    ─────────
+                          │
+                          │┌────────────────────────┐
+                          ├┤  DeleteCollectionHard  │
+                          │└────────────────────────┘
+                          │
+                          │
+                          ●
+```
+
+## Collection States and Sequence Numbers
+
+A collection's state is determined by the sequence numbers assigned to the
+collection.
+
+For example, when a collection is created we record the sequence number at which
+we queue the create SystemEvent. And when a collection is deleted we record the
+sequence number at which we queue the delete SystemEvent. This gives us a
+sequence-number lifetime for each collection.
+
+* Each collection has a `start_seqno` and an `end_seqno`.
+* The `end_seqno` is permitted to have a special value of -6, which means there
+  is no end.
+
+### Determining states from the seqno start/end:
+* Exclusive Open
+  * `end_seqno < start_seqno` (in practice `end_seqno` is the special -6 value)
+* Exclusive Deleting
+  * `end_seqno > start_seqno`
+* Open and Deleting
+  * `end_seqno > 0 && end_seqno < start_seqno`
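+
+As an illustration, the mapping above could be written as the following sketch
+(the enum and function are invented for this example and are not names from
+the code):
+
+```
+enum class CollectionState { ExclusiveOpen, ExclusiveDeleting, OpenAndDeleting };
+
+CollectionState getState(int64_t start_seqno, int64_t end_seqno) {
+    if (end_seqno > start_seqno) {
+        return CollectionState::ExclusiveDeleting;
+    } else if (end_seqno > 0) {
+        // Ended, but re-created with a later start_seqno.
+        return CollectionState::OpenAndDeleting;
+    }
+    // end_seqno is the special -6 value; the collection has no end.
+    return CollectionState::ExclusiveOpen;
+}
+```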
+
+## VBucket JSON manifest
+
+The code refers to a serialised JSON VB manifest. This is a persisted copy of
+the VB::Manifest which can be used to quickly recover a VBucket's collection
+state following a restart.
+
+The VB::Manifest also records the sequence numbers of the SystemEvents and
+uses those values to track a collection's lifetime. A VBucket JSON manifest
+looks like the following (although we don't store a formatted document):
+
+```
+{
+  "separator":"::",
+   "collections":[
+      {"name":"$default","revision":"0","start_seqno":"1","end_seqno":"-6"},
+      {"name":"fruit","revision":"1","start_seqno":"13012","end_seqno":"-6"},
+    ]
+}
+```
+
+### SystemEvent keys
+
+The SystemEvents are Items and thus have keys; they also trigger actions by the
+flusher and DCP, as follows. Consider the life of the fruit collection, which is
+created at revision 1, deleted at revision 2 and created again at revision 3.
+
+* `CreateCollection` key = `$collection::create:fruit1`
+* `BeginDeleteCollection` key = `$collection::delete:fruit1`
+* `DeleteCollectionSoft` key = `$collection::create:fruit1`
+* `DeleteCollectionHard` key = `$collection::create:fruit1`
+* The second create of `fruit` key = `$collection::create:fruit3`
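+
+For illustration, keys of this form could be built as in the following sketch
+(the helper is invented for this example; it assumes `<string>` is included):
+
+```
+// makeSystemEventKey("create", "fruit", 1) == "$collection::create:fruit1"
+std::string makeSystemEventKey(const std::string& event, // "create"/"delete"
+                               const std::string& collection,
+                               uint32_t revision) {
+    return "$collection::" + event + ":" + collection + std::to_string(revision);
+}
+```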
+
+### SystemEvent flushing actions
+
+* `CreateCollection`
+  * Stores a document called `$collection::create:fruit1`
+  * Updates the _local/collections_manifest (A JSON copy of the VB::Manifest)
+* `BeginDeleteCollection`
+  * Updates the _local/collections_manifest (A JSON copy of the VB::Manifest)
+* `DeleteCollectionSoft`
+  * Deletes a document called `$collection::create:fruit1`
+  * Updates the _local/collections_manifest (A JSON copy of the VB::Manifest)
+* `DeleteCollectionHard`
+  * Deletes a document called `$collection::create:fruit1`
+  * Updates the _local/collections_manifest (A JSON copy of the VB::Manifest)
+
+### SystemEvent DCP actions
+
+* `CreateCollection`
+  * Sends DcpSystem event message containing CreateCollection, collection="fruit" and revision="1"
+* `BeginDeleteCollection`
+  * Nothing
+* `DeleteCollectionSoft`
+  * Sends DcpSystem event message containing DeleteCollection, collection="fruit" and revision="1"
+* `DeleteCollectionHard`
+  * Sends DcpSystem event message containing DeleteCollection, collection="fruit" and revision="1"
+
+## Examples
+
+### create/delete
+
+1. Start with `$default=exclusive open`
+2. Receive (assume VB high-seqno = 200)
+
+   `{"revision":1 "separator":"::","collections":[{"$default", "fruit"}]}`
+
+  * `$default=exclusive open, fruit=exclusive open`
+  * stored a document `$collection::create:fruit1` at seqno 201
+  * _local/collections_manifest
+   ```
+  {"collections":[
+      {"name":"$default","revision":"0","start_seqno":"1","end_seqno":"-6"},
+      {"name":"fruit","revision":"1","start_seqno":"201","end_seqno":"-6"}]}
+   ```
+3. Receive (assume VB high-seqno = 430)
+
+   `{"revision":2 "separator":"::","collections":[{"$default"}]}`
+
+  * `$default=exclusive open, fruit=exclusive deleting`
+  * note BeginDeleteCollection will trigger a background scrub of the collection's items.
+  * _local/collections_manifest
+   ```
+  {"collections":[
+      {"name":"$default","revision":"0","start_seqno":"1","end_seqno":"-6"},
+      {"name":"fruit","revision":"1","start_seqno":"201","end_seqno":"431"}]}
+   ```
+
+4. When the background delete of fruit is complete and assuming VB high-seqno = 561
+
+  * `$default=exclusive open`
+  * Deleted a document `$collection::create:fruit1` at seqno 562
+  * _local/collections_manifest
+   ```
+  {"collections":[
+      {"name":"$default","revision":"0","start_seqno":"1","end_seqno":"-6"}]}
+   ```
+
+### create/delete/create
+
+1. Start with `$default=exclusive open`
+2. Receive (assume VB high-seqno = 836)
+
+   `{"revision":1 "separator":"::","collections":[{"$default", "fruit"}]}`
+
+  * `$default=exclusive open, fruit=exclusive open`
+  * stored a document `$collection::create:fruit1` at seqno 837
+  * _local/collections_manifest
+   ```
+  {"collections":[
+      {"name":"$default","revision":"0","start_seqno":"1","end_seqno":"-6"},
+      {"name":"fruit","revision":"1","start_seqno":"837","end_seqno":"-6"}]}
+   ```
+3. Receive (assume VB high-seqno = 919)
+
+   `{"revision":2 "separator":"::","collections":[{"$default"}]}`
+
+  * `$default=exclusive open, fruit=exclusive deleting`
+  * note BeginDeleteCollection will trigger a background scrub of the collection's items.
+  * _local/collections_manifest
+   ```
+  {"collections":[
+      {"name":"$default","revision":"0","start_seqno":"1","end_seqno":"-6"},
+      {"name":"fruit","revision":"1","start_seqno":"837","end_seqno":"920"}]}
+   ```
+
+4. Receive (before the background delete completes, VB high-seqno = 1617)
+
+   `{"revision":3 "separator":"::","collections":[{"$default", "fruit"}]}`
+
+  * `$default=exclusive open, fruit=open and deleting`
+  * stored a document `$collection::create:fruit3` at seqno 1618
+  * _local/collections_manifest
+   ```
+  {"collections":[
+      {"name":"$default","revision":"0","start_seqno":"1","end_seqno":"-6"},
+      {"name":"fruit","revision":"3","start_seqno":"1618","end_seqno":"920"}]}
+   ```
+5. When the background delete of fruit is complete and assuming VB high-seqno = 2010
+
+  * `$default=exclusive open, fruit=exclusive open`
+  * Deleted a document `$collection::create:fruit1` at seqno 2011
+  * _local/collections_manifest
+   ```
+  {"collections":[
+      {"name":"$default","revision":"0","start_seqno":"1","end_seqno":"-6"},
+      {"name":"fruit","revision":"3","start_seqno":"1618","end_seqno":"-6"}]}
+   ```
index 010bdf6..fc54e2a 100644 (file)
@@ -1,13 +1,58 @@
 
-##Delete With Meta (delwithmeta) v4.0
+##Delete With Meta (delwithmeta) v4.6
 
-The delete with meta command is used to delete data with metadata for a key. Meta data passed is cas, sequence number, flags and expiration along with an extended meta data section.
+The delete with meta command is used to delete data with metadata for a key. The
+minimum meta data passed is CAS, revision sequence number, flags and expiration.
+The command can also include an extended meta data section which allows for many
+more fields to be specified.
 
 The request:
 
 * Must have extras
 * Must have key
 
+### Conflict Resolution
+
+A delete with meta command performs conflict resolution based upon the bucket
+configuration flag, `conflict_resolution_mode`. When conflict resolution is
+performed the command's CAS or revision sequence number is compared against any
+existing document's CAS or revision sequence number.
+
+#### conflict_resolution_mode=lww (Last Write Wins)
+
+Last write wins compares the incoming CAS (from Extras) with any existing
+document's CAS; if the CAS doesn't determine a winner, RevSeqno is compared.
+
+```
+if (command.CAS > document.CAS)
+  command succeeds
+else if (command.CAS == document.CAS)
+  // Check the RevSeqno
+  if (command.RevSeqno > document.RevSeqno)
+    command succeeds
+
+command fails
+```
+
+#### conflict_resolution_mode=seqno (Revision Seqno or "Most Writes Wins")
+
+Revision seqno compares the incoming RevSeqno with any existing document's
+RevSeqno. If the RevSeqno doesn't determine a winner, CAS is compared.
+
+```
+if (command.RevSeqno > document.RevSeqno)
+  command succeeds
+else if (command.RevSeqno == document.RevSeqno)
+  // Check the CAS
+  if (command.CAS > document.CAS)
+    command succeeds
+
+command fails
+```
+
+If all of the compared values are equal, the command fails.
+
 ####Binary Implementation
 
     Delete With Meta Binary Request
@@ -40,30 +85,32 @@ The request:
       +---------------+---------------+---------------+---------------+
     44|       00      |       00      |       00      |       1E      |
       +---------------+---------------+---------------+---------------+
-    48|       00      |       00      |       6D      |       79      |
+    48|       00      |       00      |       00      |       02      |
       +---------------+---------------+---------------+---------------+
-    52|       6B      |       65      |       79      |
+    52|       00      |       00      |       6D      |       79      |
+      +---------------+---------------+---------------+---------------+
+    56|       6B      |       65      |       79      |
       +---------------+---------------+---------------+
 
     DEL_WITH_META command
-    Field        (offset) (value)
-    Magic        (0)    : 0x80 (Request)
-    Opcode       (1)    : 0xA8 (Delete With Meta)
-    Key length   (2,3)  : 0x0005 (5)
-    Extra length (4)    : 0x1A (26)
-    Data type    (5)    : 0x00
-    VBucket      (6,7)  : 0x0003 (3)
-    Total body   (8-11) : 0x0000001F (31)
-    Opaque       (12-15): 0x00000000
-    CAS          (16-23): 0x0000000000000000
-    Extras              :
-      Flags      (24-27): 0x00000007 (7)
-      Expiration (28-31): 0x0000000A (10)
-      Seqno      (32-39): 0x0000000000000014 (20)
-      Cas        (40-47): 0x000000000000001E (30)
-      Meta Len   (48-49): 0x0000 (0)
-    Key          (50-54): mykey
-
+    Field         (offset) (value)
+    Magic         (0)    : 0x80 (Request)
+    Opcode        (1)    : 0xA8 (Delete With Meta)
+    Key length    (2,3)  : 0x0005 (5)
+    Extra length  (4)    : 0x1E (30)
+    Data type     (5)    : 0x00
+    VBucket       (6,7)  : 0x0003 (3)
+    Total body    (8-11) : 0x00000023 (35)
+    Opaque        (12-15): 0x00000000
+    CAS           (16-23): 0x0000000000000000
+    Extras               :
+      Flags       (24-27): 0x00000007 (7)
+      Expiration  (28-31): 0x0000000A (10)
+      RevSeqno    (32-39): 0x0000000000000014 (20)
+      Cas         (40-47): 0x000000000000001E (30)
+      Options     (48-51): 0x00000002 (FORCE_ACCEPT_WITH_META_OPS is set)
+      Meta length (52,53): 0x0000 (0)
+    Key           (54-58): mykey
 
     Delete With Meta Binary Response
 
@@ -96,10 +143,60 @@ The request:
     Opaque       (12-15): 0x00000000
     CAS          (16-23): 0x0000000000000001 (1)
 
+###Extras
+The following fields are required by the command:
+* Flags
+* Expiration
+* RevSeqno
+* Cas
+
+The following are optional fields:
+* Options
+* Meta length
+
+Thus valid values for "Extra length" are:
+
+* 24 - The required fields
+* 26 - The required fields and the "Meta length" field
+* 28 - The required fields and the "Options" field
+* 30 - The required fields, "Options" and the "Meta length" field
+
+Any other "Extra length" values will result in `PROTOCOL_BINARY_RESPONSE_EINVAL`
+
+###Options
+
+Delete with meta supports a number of options which change the behaviour of the
+command.
+
+* `SKIP_CONFLICT_RESOLUTION_FLAG 0x01`
+* `FORCE_ACCEPT_WITH_META_OPS 0x02`
+* `REGENERATE_CAS 0x04`
+
+The options are encoded in the extras as a 4-byte value. Having no options
+encoded is valid, but certain configurations, such as last-write-wins,
+require options.
+
+#### SKIP_CONFLICT_RESOLUTION_FLAG
+
+The command will perform no conflict resolution - the incoming delete will
+always occur.
+
+#### FORCE_ACCEPT_WITH_META_OPS
+
+All Last Write Wins buckets require that the client sets the force flag.
+Revision seqno buckets will reject the command if this flag is set.
+
+#### REGENERATE_CAS
+
+This flag must be specified in conjunction with `SKIP_CONFLICT_RESOLUTION_FLAG`.
+When this flag is set the CAS of the document is regenerated by the server.
 
 ###Extended Meta Data Section
 
-The extended meta data section is used to send extra meta data for a particular mutation. This section should come at the very end, after the key. Its length should be set in the nmeta field. A length of 0 means that there is no extended meta data section.
+The extended meta data section is used to send extra meta data for a particular
+mutation. This section should come at the very end, after the key. Its length
+should be set in the "Meta length" field. A length of 0 or the omission of the
+"Meta length" field means that there is no extended meta data section.
 
 ####Version 1 (0x01)
 
@@ -115,9 +212,8 @@ Here,
 
 **Meta Data IDs:**
 
-* 0x01 - adjusted time
-* 0x02 - conflict resolution mode
-
+* 0x01 - adjusted time (4.6 ignores this).
+* 0x02 - conflict resolution mode (4.6 ignores this).
 
 ###Errors
 
@@ -125,6 +221,10 @@ Here,
 
 If the key does not exist.
 
+**PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS (0x02)**
+
+Failed conflict resolution.
+
 **PROTOCOL_BINARY_RESPONSE_EINVAL (0x04)**
 
 If data in this packet is malformed or incomplete then this error is returned.
index 3443ab6..d743ce8 100644 (file)
@@ -1,12 +1,35 @@
 
-##Get Meta (getmeta) v4.0
+##Get Meta (getmeta) v5.0
 
-The get meta command is used to fetch the meta data for a key. Extras will contain 1 byte set to 0x01, indicating that extended metadata for the key will need to be sent in the response.
+The get meta command is used to fetch the meta data for a key.
 
 The request:
 
-* Must have key
-* Can have extras (1B)
+* Must have a key
+* Can have a 1 byte extras field.
+
+On success the Get Meta command will return the document's:
+
+* Deleted flag (indicating if the document is deleted).
+* Flags
+* Expiry time
+* Sequence number
+
+### Extras - version1
+
+When the 1 byte extras field contains 0x01...
+
+Prior to v4.6, specifying version 1 in the extras meant that GET META would
+return the document's conflict resolution mode.
+
+v4.6 removed conflict resolution mode and GET META now ignores any V1 requests.
+
+### Extras - version2
+
+When the 1 byte extras field contains 0x02...
+
+v5.0 adds the ability to request the datatype of the document by setting extras to 0x02.
+
+* Returns a 1 byte datatype field in the response
 
 ####Binary Implementation
 
@@ -45,7 +68,7 @@ The request:
     Opaque       (12-15): 0x00000000
     CAS          (16-23): 0x0000000000000000
     Extras              :
-      ReqExtMeta (24)   : 0x01 (1)
+      ReqExtMeta (24)   : 0x02 (2)
     Key          (25-29): mykey
 
     Get Meta Binary Response
@@ -95,15 +118,21 @@ The request:
       Flags      (28-31): 0x00000001 (1)
       Exptime    (32-35): 0x00000007 (7)
       Seqno      (36-43): 0x0000000000000009 (9)
-      ConfRes    (44)   : 0x01 (1)
+      Datatype   (44)   : 0x03 (3)
 
 ###Extended Meta Data Section
 
-The extras section in the response packet will contain 1 extra byte indicating the conflict resolution mode that the item is eligible for. This 1 byte of extra meta information will be sent as part of the response only if the ReqExtMeta flag (set to 0x01) is sent in the request as part of the extras section.
+The extras section in the response packet may contain 1 extra byte indicating the document's datatype. This 1 byte of extra meta information will be sent as part of the response only if the ReqExtMeta flag is sent and set to 0x02.
+
+#### Datatype
 
-####ReqExtMeta (0x01)
+The datatype byte indicates the document's type using a combination of the following flags:
+* `PROTOCOL_BINARY_RAW_BYTES` = 0
+* `PROTOCOL_BINARY_DATATYPE_JSON` = 1
+* `PROTOCOL_BINARY_DATATYPE_SNAPPY` = 2
+* `PROTOCOL_BINARY_DATATYPE_XATTR` = 4
 
-ReqExtMeta: 0x01 in the request's extras' section, will ensure that an extra byte containing the conflict resolution mode will be sent along with the rest of the metadata in the extras section of the response packet.
+Thus a compressed JSON document would have the datatype of 0x03.
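+
+As a sketch of how a client might interpret the returned byte (the flag values
+are those listed above; the helper functions are invented for this example):
+
+```
+bool isJson(uint8_t datatype) {
+    return datatype & PROTOCOL_BINARY_DATATYPE_JSON;
+}
+
+bool isCompressed(uint8_t datatype) {
+    return datatype & PROTOCOL_BINARY_DATATYPE_SNAPPY;
+}
+
+// e.g. for datatype 0x03, isJson() and isCompressed() are both true.
+```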
 
 ###Errors
 
index 68cafc4..e3d1742 100644 (file)
@@ -1,13 +1,73 @@
 
-##Set With Meta (setwithmeta) v4.0
-
-The set with meta command is used to set data and metadata for a key. Meta data passed is cas, sequence number, flags and expiration along with an extended meta data section.
-
-The request:
+##Set With Meta (setwithmeta) v4.6
+
+The set with meta command is used to set data and metadata for a key. The
+minimum meta data passed is CAS, revision sequence number, flags and expiration.
+The command can also include an extended meta data section which allows for many
+more fields to be specified.
+
+### Conflict Resolution
+
+A set with meta command performs conflict resolution based upon the bucket
+configuration flag, `conflict_resolution_mode`. When conflict resolution is
+performed the command's CAS or revision sequence number is compared against any
+existing document's CAS or revision sequence number.
+
+#### conflict_resolution_mode=lww (Last Write Wins)
+
+Last write wins compares the incoming CAS (from Extras) with any existing
+document's CAS; if the CAS doesn't determine a winner, other metadata is
+compared.
+
+```
+if (command.CAS > document.CAS)
+  command succeeds
+else if (command.CAS == document.CAS)
+  // Check the RevSeqno
+  if (command.RevSeqno > document.RevSeqno)
+    command succeeds
+  else if (command.RevSeqno == document.RevSeqno)
+    // Check the expiry time
+    if (command.Expiry > document.Expiry)
+      command succeeds
+    else if (command.Expiry == document.Expiry)
+      // Finally check flags
+      if (command.Flags < document.Flags)
+        command succeeds
+
+command fails
+```
+
+#### conflict_resolution_mode=seqno (Revision Seqno or "Most Writes Wins")
+
+Revision seqno compares the incoming RevSeqno with any existing document's
+RevSeqno. If the RevSeqno doesn't determine a winner, other metadata is
+compared.
+
+```
+if (command.RevSeqno > document.RevSeqno)
+  command succeeds
+else if (command.RevSeqno == document.RevSeqno)
+  // Check the CAS
+  if (command.CAS > document.CAS)
+    command succeeds
+  else if (command.CAS == document.CAS)
+    // Check the expiry time
+    if (command.Expiry > document.Expiry)
+      command succeeds
+    else if (command.Expiry == document.Expiry)
+      // Finally check flags
+      if (command.Flags < document.Flags)
+        command succeeds
+
+command fails
+```
+
+All set_with_meta requests:
 
 * Must have extras
-* Must have key
-* Must have value
+* Must have key
+* Must have value
 
 ####Binary Implementation
 
@@ -41,35 +101,38 @@ The request:
       +---------------+---------------+---------------+---------------+
     44|       00      |       00      |       00      |       1E      |
       +---------------+---------------+---------------+---------------+
-    48|       00      |       00      |       6D      |       79      |
+    48|       00      |       00      |       00      |       02      |
       +---------------+---------------+---------------+---------------+
-    52|       6B      |       65      |       79      |       6D      |
+    52|       00      |       00      |       6D      |       79      |
       +---------------+---------------+---------------+---------------+
-    56|       79      |       76      |       61      |       6C      |
+    56|       6B      |       65      |       79      |       6D      |
       +---------------+---------------+---------------+---------------+
-    60|       75      |       65      |
+    60|       79      |       76      |       61      |       6C      |
+      +---------------+---------------+---------------+---------------+
+    64|       75      |       65      |
       +---------------+---------------+
 
     SET_WITH_META command
-    Field        (offset) (value)
-    Magic        (0)    : 0x80 (Request)
-    Opcode       (1)    : 0xA2 (setwithmeta)
-    Key length   (2,3)  : 0x0005 (5)
-    Extra length (4)    : 0x1A (26)
-    Data type    (5)    : 0x00
-    VBucket      (6,7)  : 0x0003 (3)
-    Total body   (8-11) : 0x00000026 (38)
-    Opaque       (12-15): 0x00000000
-    CAS          (16-23): 0x0000000000000000
-    Extras              :
-      Flags      (24-27): 0x00000007 (7)
-      Expiration (28-31): 0x0000000A (10)
-      Seqno      (32-39): 0x0000000000000014 (20)
-      Cas        (40-47): 0x000000000000001E (30)
-      Meta Len   (48-49): 0x0000 (0)
-    Key          (50-54): mykey
-    Value        (55-61): myvalue
-
+    Field         (offset) (value)
+    Magic         (0)    : 0x80 (Request)
+    Opcode        (1)    : 0xA2 (setwithmeta)
+    Key length    (2,3)  : 0x0005 (5)
+    Extra length  (4)    : 0x1E (30)
+    Data type     (5)    : 0x00
+    VBucket       (6,7)  : 0x0003 (3)
+    Total body    (8-11) : 0x0000002A (42)
+    Opaque        (12-15): 0x00000000
+    CAS           (16-23): 0x0000000000000000
+    Extras               :
+      Flags       (24-27): 0x00000007 (7)
+      Expiration  (28-31): 0x0000000A (10)
+      RevSeqno    (32-39): 0x0000000000000014 (20)
+      Cas         (40-47): 0x000000000000001E (30)
+      Options     (48-51): 0x00000002 (FORCE_ACCEPT_WITH_META_OPS is set)
+      Meta length (52,53): 0x0000 (0)
+    Key           (54-58): mykey
+    Value         (59-65): myvalue
+    Note: Extended Meta would appear after the value.
 
     Set With Meta Binary Response
 
@@ -102,9 +165,60 @@ The request:
     Opaque       (12-15): 0x00000000
     CAS          (16-23): 0x0000000000000001 (1)
 
+###Extras
+The following fields are required by the command:
+* Flags
+* Expiration
+* RevSeqno
+* Cas
+
+The following are optional fields:
+* Options
+* Meta length
+
+Thus valid values for "Extra length" are:
+
+* 24 - The required fields
+* 26 - The required fields and the "Meta length" field
+* 28 - The required fields and the "Options" field
+* 30 - The required fields, "Options" and the "Meta length" field
+
+Any other "Extra length" values will result in `PROTOCOL_BINARY_RESPONSE_EINVAL`
+
+###Options
+
+Set with meta supports a number of options which change the behaviour of the
+command.
+
+* `SKIP_CONFLICT_RESOLUTION_FLAG 0x01`
+* `FORCE_ACCEPT_WITH_META_OPS 0x02`
+* `REGENERATE_CAS 0x04`
+
+The options are encoded in the extras as a 4-byte value. Having no options
+encoded is valid, but certain configurations, such as last-write-wins,
+require options.
+
+#### SKIP_CONFLICT_RESOLUTION_FLAG
+
+The command will perform no conflict resolution - the incoming set will always
+occur.
+
+#### FORCE_ACCEPT_WITH_META_OPS
+
+All Last Write Wins buckets require that the client sets the force flag.
+Revision seqno buckets will reject the command if this flag is set.
+
+#### REGENERATE_CAS
+
+This flag must be specified in conjunction with `SKIP_CONFLICT_RESOLUTION_FLAG`.
+When this flag is set the CAS of the document is regenerated by the server.
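+
+The flag dependencies above could be checked as in this sketch (illustrative
+only; the validation function is invented for this example):
+
+```
+const uint32_t SKIP_CONFLICT_RESOLUTION_FLAG = 0x01;
+const uint32_t FORCE_ACCEPT_WITH_META_OPS = 0x02;
+const uint32_t REGENERATE_CAS = 0x04;
+
+// LWW buckets require the force flag, seqno buckets reject it, and
+// REGENERATE_CAS is only valid alongside SKIP_CONFLICT_RESOLUTION_FLAG.
+bool validateOptions(uint32_t options, bool isLwwBucket) {
+    if (isLwwBucket != bool(options & FORCE_ACCEPT_WITH_META_OPS)) {
+        return false;
+    }
+    if ((options & REGENERATE_CAS) &&
+        !(options & SKIP_CONFLICT_RESOLUTION_FLAG)) {
+        return false;
+    }
+    return true;
+}
+```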
+
 ###Extended Meta Data Section
 
-The extended meta data section is used to send extra meta data for a particular mutation. This section should come at the very end, after the value. Its length should be set in the nmeta field. A length of 0 means that there is no extended meta data section.
+The extended meta data section is used to send extra meta data for a particular
+mutation. This section should come at the very end, after the value. Its length
+should be set in the "Meta length" field. A length of 0 or the omission of the
+field means that there is no extended meta data section.
 
 ####Version 1 (0x01)
 
@@ -120,9 +234,8 @@ Here,
 
 **Meta Data IDs:**
 
-* 0x01 - adjusted time
-* 0x02 - conflict resolution mode
-
+* 0x01 - adjusted time (from 4.6 the server ignores this).
+* 0x02 - conflict resolution mode (from 4.6 the server ignores this).
 
 ###Errors
 
@@ -130,6 +243,10 @@ Here,
 
 If the key does not exist.
 
+**PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS (0x02)**
+
+Failed conflict resolution.
+
 **PROTOCOL_BINARY_RESPONSE_EINVAL (0x04)**
 
 If data in this packet is malformed or incomplete then this error is returned.
index 51b07ae..21193d1 100644 (file)
@@ -56,6 +56,7 @@ For introductory information on stats within Couchbase, start with the
 | ep_item_flush_expired              | Number of times an item is not flushed |
 |                                    | due to the expiry of the item          |
 | ep_queue_size                      | Number of items queued for storage     |
+| ep_vb_backfill_queue_size          | Number of items in vb backfill queues  |
 | ep_flusher_todo                    | Number of items currently being        |
 |                                    | written                                |
 | ep_flusher_state                   | Current state of the flusher thread    |
@@ -73,7 +74,7 @@ For introductory information on stats within Couchbase, start with the
 |                                    | a vbucket                              |
 | ep_pending_compactions             | Number of pending vbucket compactions  |
 | ep_rollback_count                  | Number of rollbacks on consumer        |
-| ep_flush_duration_total            | Cumulative seconds spent flushing      |
+| ep_flush_duration_total            | Cumulative milliseconds spent flushing |
 | ep_flush_all                       | True if disk flush_all is scheduled    |
 | ep_num_ops_get_meta                | Number of getMeta operations           |
 | ep_num_ops_set_meta                | Number of setWithMeta operations       |
@@ -129,6 +130,7 @@ For introductory information on stats within Couchbase, start with the
 |                                    | enabled                                |
 | ep_bg_fetched                      | Number of items fetched from disk      |
 | ep_bg_meta_fetched                 | Number of meta items fetched from disk |
+| ep_bg_remaining_items              | Number of remaining bg fetch items     |
 | ep_bg_remaining_jobs               | Number of remaining bg fetch jobs      |
 | ep_max_bg_remaining_jobs           | Max number of remaining bg fetch jobs  |
 |                                    | that we have seen in the queue so far  |
@@ -214,6 +216,7 @@ For introductory information on stats within Couchbase, start with the
 |                                    | switches modes from accounting just    |
 |                                    | non resident items and deletes to      |
 |                                    | accounting all items                   |
+| ep_bucket_type                     | The bucket type                        |
 | ep_chk_max_items                   | The number of items allowed in a       |
 |                                    | checkpoint before a new one is created |
 | ep_chk_period                      | The maximum lifetime of a checkpoint   |
@@ -385,20 +388,24 @@ For introductory information on stats within Couchbase, start with the
 | vb_active_eject               | Number of times item values got ejected    |
 | vb_active_expired             | Number of times an item was expired        |
 | vb_active_ht_memory           | Memory overhead of the hashtable           |
-| vb_active_itm_memory          | Total item memory                          |
-| vb_active_meta_data_memory    | Total metadata memory                      |
+| vb_active_itm_memory          | Total memory of all items in active        |
+|                               | vBuckets (StoredValue + key + value Blob)  |
+| vb_active_meta_data_memory    | Metadata memory of all items in active     |
+|                               | vBuckets (StoredValue + key)               |
 | vb_active_meta_data_disk      | Total metadata disk                        |
 | vb_active_ops_create          | Number of create operations                |
 | vb_active_ops_update          | Number of update operations                |
 | vb_active_ops_delete          | Number of delete operations                |
 | vb_active_ops_reject          | Number of rejected operations              |
 | vb_active_queue_size          | Active items in disk queue                 |
+| vb_active_backfill_queue_size | Items in active vbucket backfill queue     |
 | vb_active_queue_memory        | Memory used for disk queue                 |
 | vb_active_queue_age           | Sum of disk queue item age in milliseconds |
 | vb_active_queue_pending       | Total bytes of pending writes              |
 | vb_active_queue_fill          | Total enqueued items                       |
 | vb_active_queue_drain         | Total drained items                        |
 | vb_active_rollback_item_count | Num of items rolled back                   |
+| vb_active_hp_vb_req_size      | Num of async high priority requests        |
 
 *** Replica vBucket stats
 
@@ -411,20 +418,24 @@ For introductory information on stats within Couchbase, start with the
 | vb_replica_eject              | Number of times item values got ejected    |
 | vb_replica_expired            | Number of times an item was expired        |
 | vb_replica_ht_memory          | Memory overhead of the hashtable           |
-| vb_replica_itm_memory         | Total item memory                          |
-| vb_replica_meta_data_memory   | Total metadata memory                      |
+| vb_replica_itm_memory         | Total memory of all items in replica       |
+|                               | vBuckets (StoredValue + key + value Blob)  |
+| vb_replica_meta_data_memory   | Metadata memory of all items in replica    |
+|                               | vBuckets (StoredValue + key)               |
 | vb_replica_meta_data_disk     | Total metadata disk                        |
 | vb_replica_ops_create         | Number of create operations                |
 | vb_replica_ops_update         | Number of update operations                |
 | vb_replica_ops_delete         | Number of delete operations                |
 | vb_replica_ops_reject         | Number of rejected operations              |
 | vb_replica_queue_size         | Replica items in disk queue                |
+| vb_replica_backfill_queue_size| Items in replica vbucket backfill queue    |
 | vb_replica_queue_memory       | Memory used for disk queue                 |
 | vb_replica_queue_age          | Sum of disk queue item age in milliseconds |
 | vb_replica_queue_pending      | Total bytes of pending writes              |
 | vb_replica_queue_fill         | Total enqueued items                       |
 | vb_replica_queue_drain        | Total drained items                        |
 | vb_replica_rollback_item_count| Num of items rolled back                   |
+| vb_replica_hp_vb_req_size     | Num of async high priority requests        |
 
 *** Pending vBucket stats
 
@@ -437,20 +448,24 @@ For introductory information on stats within Couchbase, start with the
 | vb_pending_eject              | Number of times item values got ejected    |
 | vb_pending_expired            | Number of times an item was expired        |
 | vb_pending_ht_memory          | Memory overhead of the hashtable           |
-| vb_pending_itm_memory         | Total item memory                          |
-| vb_pending_meta_data_memory   | Total metadata memory                      |
+| vb_pending_itm_memory         | Total memory of all items in pending       |
+|                               | vBuckets (StoredValue + key + value Blob)  |
+| vb_pending_meta_data_memory   | Metadata memory of all items in pending    |
+|                               | vBuckets (StoredValue + key)               |
 | vb_pending_meta_data_disk     | Total metadata disk                        |
 | vb_pending_ops_create         | Number of create operations                |
 | vb_pending_ops_update         | Number of update operations                |
 | vb_pending_ops_delete         | Number of delete operations                |
 | vb_pending_ops_reject         | Number of rejected operations              |
 | vb_pending_queue_size         | Pending items in disk queue                |
+| vb_pending_backfill_queue_size| Items in pending vbucket backfill queue    |
 | vb_pending_queue_memory       | Memory used for disk queue                 |
 | vb_pending_queue_age          | Sum of disk queue item age in milliseconds |
 | vb_pending_queue_pending      | Total bytes of pending writes              |
 | vb_pending_queue_fill         | Total enqueued items                       |
 | vb_pending_queue_drain        | Total drained items                        |
 | vb_pending_rollback_item_count| Num of items rolled back                   |
+| vb_pending_hp_vb_req_size     | Num of async high priority requests        |
 
 
 ** vBucket detail stats
@@ -476,6 +491,7 @@ The stats below are listed for each vbucket.
 | ops_delete                    | Number of delete operations                |
 | ops_reject                    | Number of rejected operations              |
 | queue_size                    | Pending items in disk queue                |
+| backfill_queue_size           | Items in backfill queue                    |
 | queue_memory                  | Memory used for disk queue                 |
 | queue_age                     | Sum of disk queue item age in milliseconds |
 | queue_fill                    | Total enqueued items                       |
@@ -492,6 +508,7 @@ The stats below are listed for each vbucket.
 |                               | so this may not be accurate at times.      |
 | uuid                          | The current vbucket uuid                   |
 | rollback_item_count           | Num of items rolled back                   |
+| hp_vb_req_size                | Num of async high priority requests        |
 | max_cas                       | Maximum CAS of all items in the vbucket.   |
 |                               | This is a hybrid logical clock value in    |
 |                               | nanoseconds.                               |
@@ -513,6 +530,22 @@ The stats below are listed for each vbucket.
 | logical_clock_ticks           | How many times this vbucket's HLC has      |
 |                               | returned logical clock ticks.              |
 
+For Ephemeral buckets, the following additional statistics are listed for
+each vbucket:
+
+| Stat                          | Description                                                                                                                                   |
+|-------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------|
+| seqlist_count                 | Number of documents in this VBucket's sequence list.                                                                                          |
+| seqlist_deleted_count         | Count of deleted documents in this VBucket's sequence list.                                                                                   |
+| seqlist_high_seqno            | High sequence number in sequence list for this VBucket.                                                                                       |
+| seqlist_highest_deduped_seqno | Highest de-duplicated sequence number in sequence list for this VBucket.                                                                      |
+| seqlist_read_range_begin      | Starting sequence number for this VBucket's sequence list read range. Marks the lower bound of possible stale documents in the sequence list. |
+| seqlist_read_range_end        | Ending sequence number for this VBucket's sequence list read range. Marks the upper bound of possible stale documents in the sequence list.   |
+| seqlist_read_range_count      | Count of elements for this VBucket's sequence list read range (i.e. end - begin).                                                             |
+| seqlist_stale_count           | Count of stale documents in this VBucket's sequence list.                                                                                     |
+| seqlist_stale_value_bytes     | Number of bytes of stale values in this VBucket's sequence list.                                                                              |
+| seqlist_stale_metadata_bytes  | Number of bytes of stale metadata (key + fixed metadata) in this VBucket's sequence list.                                                     |
+
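For a concrete way to read these per-vBucket stats, the Python test client
shipped in management/ can be used; a minimal sketch (the 'vbucket-details'
stat group and the 'vb_<id>:' key prefix follow cbstats conventions, and the
host/port are placeholders):

    # Hedged sketch: list the Ephemeral seqlist stats of every vbucket.
    import mc_bin_client

    mc = mc_bin_client.MemcachedClient('127.0.0.1', 11210)
    details = mc.stats('vbucket-details')
    for key, value in sorted(details.iteritems()):
        if ':seqlist_' in key:          # e.g. 'vb_0:seqlist_count'
            print key, value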
 ** vBucket seqno stats
 
 | Stats                         | Description                                |
@@ -732,6 +765,7 @@ another colon.  For example, if your client is named, =slave1=, the
 | items_remaining       | The amount of items remaining to be sent               |
 | items_sent            | The amount of items already sent to the consumer       |
 | last_sent_time        | The last time this connection sent a message           |
+| last_receive_time     | The last time this connection received a message       |
 | max_buffer_bytes      | The maximum amount of bytes that can be sent without   |
 |                       | receiving an ack from the consumer                     |
 | noop_enabled          | Whether or not this connection sends noops             |
@@ -849,37 +883,41 @@ this:
 The following histograms are available from "timings" in the above
 form to describe when time was spent doing various things:
 
-| bg_wait               | bg fetches waiting in the dispatcher queue     |
-| bg_load               | bg fetches waiting for disk                    |
-| set_with_meta         | set_with_meta latencies                        |
-| access_scanner        | access scanner run times                       |
-| checkpoint_remover    | checkpoint remover run times                   |
-| item_pager            | item pager run times                           |
-| expiry_pager          | expiry pager run times                         |
-| bg_tap_wait           | tap bg fetches waiting in the dispatcher queue |
-| bg_tap_load           | tap bg fetches waiting for disk                |
-| pending_ops           | client connections blocked for operations      |
-|                       | in pending vbuckets                            |
-| storage_age           | Analogous to ep_storage_age in main stats      |
-| data_age              | Analogous to ep_data_age in main stats         |
-| get_cmd               | servicing get requests                         |
-| arith_cmd             | servicing incr/decr requests                   |
-| get_stats_cmd         | servicing get_stats requests                   |
-| get_vb_cmd            | servicing vbucket status requests              |
-| set_vb_cmd            | servicing vbucket set state commands           |
-| del_vb_cmd            | servicing vbucket deletion commands            |
-| chk_persistence_cmd   | waiting for checkpoint persistence             |
-| tap_vb_set            | servicing tap vbucket set state commands       |
-| tap_vb_reset          | servicing tap vbucket reset commands           |
-| tap_mutation          | servicing tap mutations                        |
-| notify_io             | waking blocked connections                     |
-| paged_out_time        | time (in seconds) objects are non-resident     |
-| disk_insert           | waiting for disk to store a new item           |
-| disk_update           | waiting for disk to modify an existing item    |
-| disk_del              | waiting for disk to delete an item             |
-| disk_vb_del           | waiting for disk to delete a vbucket           |
-| disk_commit           | waiting for a commit after a batch of updates  |
-| item_alloc_sizes      | Item allocation size counters (in bytes)       |
+| bg_wait                         | bg fetches waiting in the dispatcher queue     |
+| bg_load                         | bg fetches waiting for disk                    |
+| set_with_meta                   | set_with_meta latencies                        |
+| access_scanner                  | access scanner run times                       |
+| checkpoint_remover              | checkpoint remover run times                   |
+| item_pager                      | item pager run times                           |
+| expiry_pager                    | expiry pager run times                         |
+| bg_tap_wait                     | tap bg fetches waiting in the dispatcher queue |
+| bg_tap_load                     | tap bg fetches waiting for disk                |
+| pending_ops                     | client connections blocked for operations      |
+|                                 | in pending vbuckets                            |
+| storage_age                     | Analogous to ep_storage_age in main stats      |
+| data_age                        | Analogous to ep_data_age in main stats         |
+| get_cmd                         | servicing get requests                         |
+| arith_cmd                       | servicing incr/decr requests                   |
+| get_stats_cmd                   | servicing get_stats requests                   |
+| get_vb_cmd                      | servicing vbucket status requests              |
+| set_vb_cmd                      | servicing vbucket set state commands           |
+| del_vb_cmd                      | servicing vbucket deletion commands            |
+| chk_persistence_cmd             | waiting for checkpoint persistence             |
+| tap_vb_set                      | servicing tap vbucket set state commands       |
+| tap_vb_reset                    | servicing tap vbucket reset commands           |
+| tap_mutation                    | servicing tap mutations                        |
+| notify_io                       | waking blocked connections                     |
+| paged_out_time                  | time (in seconds) objects are non-resident     |
+| disk_insert                     | waiting for disk to store a new item           |
+| disk_update                     | waiting for disk to modify an existing item    |
+| disk_del                        | waiting for disk to delete an item             |
+| disk_vb_del                     | waiting for disk to delete a vbucket           |
+| disk_commit                     | waiting for a commit after a batch of updates  |
+| item_alloc_sizes                | Item allocation size counters (in bytes)       |
+| persistence_cursor_get_all_items| Time spent fetching all items by the           |
+|                                 | persistence cursor from checkpoint queues      |
+| dcp_cursors_get_all_items       | Time spent fetching all items by all dcp       |
+|                                 | cursors from checkpoint queues                 |
 
 The following histograms are available from "scheduler" and "runtimes"
 describing the scheduling overhead times and task runtimes incurred by various
@@ -1012,6 +1050,8 @@ each stat name.
 |                                  | name 'cursor_name' is pointing now        |
 | cursor_name:cursor_seqno         | The seqno at which the cursor             |
 |                                  | 'cursor_name' is pointing now             |
+| cursor_name:num_visits           | Number of times a batch of items has been |
+|                                  | drained from a checkpoint by 'cursor_name'|
 | open_checkpoint_id               | ID of the current open checkpoint         |
 | num_conn_cursors                 | Number of referencing dcp/tap cursors     |
 | num_checkpoint_items             | Number of total items in a checkpoint     |
@@ -1051,6 +1091,12 @@ Note that tcmalloc stats are not available on some operating systems
 | ep_mem_low_wat_percent              | Low water mark (as a percentage)       |
 | ep_mem_high_wat                     | High water mark for auto-evictions   |
 | ep_mem_high_wat_percent             | High water mark (as a percentage)      |
+| ep_mem_merge_bytes_threshold        | Amount of memory a thread-local      |
+|                                     | counter may accumulate before it is  |
+|                                     | merged into the bucket-level counter |
+| ep_mem_merge_count_threshold        | Number of updates to a thread-local  |
+|                                     | memory counter before it is merged   |
+|                                     | into the bucket-level counter        |
 | ep_oom_errors                       | Number of times unrecoverable OOMs   |
 |                                     | happened while processing operations |
 | ep_tmp_oom_errors                   | Number of times temporary OOMs       |
@@ -1104,6 +1150,9 @@ Note that tcmalloc stats are not available on some operating systems
 | key_valid                     | See description below                  | V|
 | key_vb_state                  | The vbucket state of this key          |KV|
 
+All of the above numeric statistics (cas, exptime, flags) are printed as
+decimal integers.
+
 =key_valid= can have the following responses:
 
 this_is_a_bug - Some case we didn't take care of.
@@ -1173,6 +1222,8 @@ The following stats are available for the CouchStore database engine:
 | io_total_write_bytes      | Number of bytes written (total, including Couchstore B-Tree and other overheads)          |
 | io_compaction_read_bytes  | Number of bytes read (compaction only, includes Couchstore B-Tree and other overheads)    |
 | io_compaction_write_bytes | Number of bytes written (compaction only, includes Couchstore B-Tree and other overheads) |
+| block_cache_hits          | Number of hits in the block (buffer) cache provided by the underlying store               |
+| block_cache_misses        | Number of misses in the block (buffer) cache provided by the underlying store             |
 
 ** KV Store Timing Stats
 
@@ -1314,6 +1365,8 @@ Reset Histograms:
 | get_vb_cmd                        |
 | notify_io                         |
 | pending_ops                       |
+| persistence_cursor_get_all_items  |
+| dcp_cursors_get_all_items         |
 | set_vb_cmd                        |
 | storage_age                       |
 | tap_mutation                      |
index 21207e3..fa92dda 100755 (executable)
@@ -4,15 +4,13 @@
 # install/bin/cbcompact 127.0.0.1:12000 compact -b beer-sample 799
 #   where 12000 is memcached port and 799 is vbucket id
 
-import clitool
+import cli_auth_utils
 from time import sleep
 import sys
 import mc_bin_client
 
 def cmd(f):
-    """Decorate a function with code to authenticate based on 1-2
-    arguments past the normal number of arguments."""
-
+    f = cli_auth_utils.cmd_decorator(f)
     def g(*args, **kwargs):
         mc = args[0]
 
@@ -22,26 +20,13 @@ def cmd(f):
             print "Must specify a vbucket id after the compact argument"
             sys.exit(1)
 
-        n = f.func_code.co_argcount
-        if len(args) > n:
-            print "Too many args, given %s, but expected a maximum of %s"\
-                    % (list(args[1:]), n - 1)
-            sys.exit(1)
-
-        bucket = kwargs.get('bucketName', None)
-        password = kwargs.get('password', None) or ""
-        purgeBeforeTs = long(kwargs.get('purgeBeforeTs', None) or 0)
-        purgeBeforeSeq = long(kwargs.get('purgeBeforeSeq', None) or 0)
-        dropDeletes = int(kwargs.get('dropDeletes', None) or 0)
-
-        if bucket:
-            try:
-                mc.sasl_auth_plain(bucket, password)
-            except mc_bin_client.MemcachedError:
-                print "Authentication error for %s" % bucket
-                sys.exit(1)
+        # These arguments are /always/ in kwargs, but if they are not specified
+        # on the command line will have the value None.
+        purgeBeforeTs = long(kwargs.pop('purgeBeforeTs') or 0)
+        purgeBeforeSeq = long(kwargs.pop('purgeBeforeSeq') or 0)
+        dropDeletes = int(kwargs.pop('dropDeletes') or 0)
 
-        f(mc, vbucket, purgeBeforeTs, purgeBeforeSeq, dropDeletes)
+        f(mc, vbucket, purgeBeforeTs, purgeBeforeSeq, dropDeletes, **kwargs)
     return g
 
 @cmd
@@ -56,12 +41,9 @@ def compact(mc, vbucket, purgeBeforeTs, purgeBeforeSeq, dropDeletes):
              % (vbucket, purgeBeforeTs, purgeBeforeSeq, dropDeletes)
 
 def main():
-    c = clitool.CliTool()
+    c = cli_auth_utils.get_authed_clitool()
 
     c.addCommand('compact', compact, 'compact vbucketid')
-    c.addOption('-b', 'bucketName',
-                'the bucket to perform compaction on (Default: default)')
-    c.addOption('-p', 'password', 'the password for the bucket if one exists')
     c.addOption('--purge-before', 'purgeBeforeTs',
                 'purge documents before this timestamp')
     c.addOption('--purge-only-upto-seq', 'purgeBeforeSeq',
index d709412..97772b3 100755 (executable)
@@ -4,41 +4,13 @@ import time
 import sys
 
 import clitool
+import cli_auth_utils
 import exceptions
 import mc_bin_client
 import memcacheConstants
 import sys
 
-def cmd(f):
-    """Decorate a function with code to authenticate based on 1-2
-    arguments past the normal number of arguments."""
-
-    def g(*args, **kwargs):
-        mc = args[0]
-        n = f.func_code.co_argcount
-
-        bucket = kwargs.get('bucketName', None)
-        password = kwargs.get('password', None) or ""
-
-        if bucket:
-            try:
-                mc.sasl_auth_plain(bucket, password)
-            except mc_bin_client.MemcachedError:
-                print "Authentication error for %s" % bucket
-                sys.exit(1)
-
-        if kwargs.get('allBuckets', None):
-            buckets = mc.stats('bucket')
-            for bucket in buckets.iterkeys():
-                print '*' * 78
-                print bucket
-                print
-                mc.bucket_select(bucket)
-                f(*args[:n])
-        else:
-            f(*args[:n])
-
-    return g
+cmd = cli_auth_utils.cmd_decorator
 
 @cmd
 def set_param(mc, type, key, val):
@@ -237,14 +209,14 @@ Available params for "set":
                                    traffic
     warmup_min_items_threshold   - Item number threshold (%) during warmup to enable
                                    traffic
-    max_num_readers              - Override default number of global threads that
-                                   prioritize read operations.
-    max_num_writers              - Override default number of global threads that
-                                   prioritize write operations.
-    max_num_auxio                - Override default number of global threads that
-                                   prioritize auxio operations.
-    max_num_nonio                - Override default number of global threads that
-                                   prioritize nonio operations.
+    num_reader_threads           - Override default number of global threads
+                                   that perform read operations.
+    num_writer_threads           - Override default number of global threads
+                                   that perform write operations.
+    num_auxio_threads            - Override default number of global threads
+                                   that perform auxio operations.
+    num_nonio_threads            - Override default number of global threads
+                                   that perform nonio operations.
 
   Available params for "set tap_param":
     tap_keepalive                    - Seconds to hold a named tap connection.
@@ -265,7 +237,7 @@ Available params for "set":
                                                         DCP processor will consume
                                                         in a single batch.
 
-Available params for "set_vbucket_param:
+Available params for "set_vbucket_param":
     max_cas - Change the max_cas of a vbucket. The value and vbucket are specified as decimal
              integers. The new-value is interpreted as an unsigned 64-bit integer.
 
@@ -273,12 +245,12 @@ Available params for "set_vbucket_param:
 
     """)
 
+    c = cli_auth_utils.get_authed_clitool()
+
     c.addCommand('drain', drain, "drain")
     c.addCommand('set', set_param, 'set type param value')
     c.addCommand('set_vbucket_param', set_vbucket_param, 'type vbucket value')
     c.addCommand('start', start, 'start')
     c.addCommand('stop', stop, 'stop')
-    c.addFlag('-a', 'allBuckets', 'iterate over all buckets (requires admin u/p)')
-    c.addOption('-b', 'bucketName', 'the bucket to get stats from (Default: default)')
-    c.addOption('-p', 'password', 'the password for the bucket if one exists')
+
     c.execute()
index 67a9ba4..49caf3a 100755 (executable)
@@ -1,12 +1,17 @@
 #!/usr/bin/env python
 
 import clitool
+import cli_auth_utils
 import sys
 import math
+import inspect
 import itertools
 import mc_bin_client
 import re
 
+from collections import defaultdict
+from operator import itemgetter
+
 try:
     import simplejson as json
 except ImportError:
@@ -23,41 +28,11 @@ SMALL_VALUE = - (2 ** 60)
 output_json = False
 
 def cmd(f):
-    """Decorate a function with code to authenticate based on 1-2
-    arguments past the normal number of arguments."""
-
+    f = cli_auth_utils.cmd_decorator(f)
     def g(*args, **kwargs):
         global output_json
-        mc = args[0]
-        n = f.func_code.co_argcount
-        if len(args) > n:
-            print >> sys.stderr, ("Error: too many arguments - expected" +
-                                  " a maximum of %s but was passed %s: %s" \
-                                  % (n - 1, len(args) - 1, list(args[1:])))
-            sys.exit(1)
-
-        bucket = kwargs.get('bucketName', None)
-        password = kwargs.get('password', None) or ""
-        output_json = kwargs.get('json', None)
-
-        if bucket:
-            try:
-                mc.sasl_auth_plain(bucket, password)
-            except mc_bin_client.MemcachedError:
-                print "Authentication error for %s" % bucket
-                sys.exit(1)
-
-        if kwargs.get('allBuckets', None):
-            buckets = mc.list_buckets()
-            for bucket in buckets:
-                print '*' * 78
-                print bucket
-                print
-                mc.bucket_select(bucket)
-                f(*args[:n])
-        else:
-            f(*args[:n])
-
+        output_json = kwargs.pop('json', None)
+        f(*args, **kwargs)
     return g
 
 def stats_perform(mc, cmd=''):
@@ -82,6 +57,190 @@ def stats_formatter(stats, prefix=" ", cmp=None):
                 s = stat + ":"
                 print "%s%s%s" % (prefix, s.ljust(longest), val)
 
+
+def table_formatter(columns, data, sort_key=None, reverse=False):
+    """Formats data in a top-style table.
+
+    Takes a list of Columns (holding a display name and alignment), and a list
+    of lists representing each row in the table. The data in the rows should be
+    in the same order as the Columns.
+    """
+    column_widths = [len(c) for c in columns]
+    for row in data:
+        for index, item in enumerate(row):
+            column_widths[index] = max([column_widths[index], len(str(item))])
+
+    template = ""
+    for index, column in enumerate(columns[:-1]):
+        align = ">" if column.ralign else "<"
+        template += "{{{0}:{1}{2}}}  ".format(index, align,
+                                              column_widths[index])
+    # Last line is not padded unless right aligned
+    # so only long lines will wrap, not all of them
+    template += ("{{{0}:>{1}}}  ".format(len(columns) - 1, column_widths[-1])
+                if columns[-1].ralign else ("{" + str(len(columns) - 1) + "}"))
+
+    print template.format(*columns), "\n"
+    for row in sorted(data, key=sort_key, reverse=reverse):
+        print template.format(*row)
+
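As a quick orientation for the two helpers above, here is a hedged usage
sketch (the column and row values are invented):

    # Column(display_name, invert_sort, ralign) and table_formatter(...) as
    # defined above; each row's values must be ordered like the columns.
    cols = [Column('TID', False, True), Column('Name', False, False)]
    rows = [(12, 'flusher'), (7, 'item_pager')]
    table_formatter(cols, rows, sort_key=itemgetter(0))
    # Prints a padded header line, then the rows sorted ascending by TID.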
+class TaskStat(object):
+    """Represents a stat which must be sorted by a different value than is
+    displayed, i.e. pretty-printed timestamps
+    """
+    def __init__(self, display_value, value):
+        self.display_value = display_value
+        self.value = value
+
+    def __eq__(self, other):
+        return self.value == (other.value if hasattr(other, "value") else other)
+
+    def __lt__(self, other):
+        return self.value < (other.value if hasattr(other, "value") else other)
+
+    # total_ordering decorator unavailable in Python 2.6, otherwise only
+    # __eq__ and one comparison would be necessary
+
+    def __gt__(self, other):
+        return self.value > (other.value if hasattr(other, "value") else other)
+
+    def __str__(self):
+        return self.display_value
+
+class Column(object):
+    def __init__(self, display_name, invert_sort, ralign):
+        self.display_name = display_name
+        self.invert_sort = invert_sort
+        self.ralign = ralign
+
+    def __str__(self):
+        return self.display_name
+
+    def __len__(self):
+        return len(str(self))
+
+def ps_time_stat(t):
+    """convenience method for constructing a stat displaying a ps-style
+    timestamp but sorting on the underlying time since epoch.
+    """
+    t = t / 1000
+    return TaskStat(ps_time_label(t), t)
+
+def tasks_stats_formatter(stats, sort_by=None, reverse=False, *args):
+    """Formats the data from ep_tasks in a top-like display"""
+    if stats:
+        if output_json:
+            stats_formatter(stats)
+        else:
+            cur_time = int(stats.pop("ep_tasks:cur_time"))
+
+            total_tasks = {"Reader":0,
+                           "Writer":0,
+                           "AuxIO":0,
+                           "NonIO":0}
+
+            running_tasks = total_tasks.copy()
+
+            states = ["R", "S", "D"]
+
+
+            tasks = json.loads(stats["ep_tasks:tasks"])
+
+            for task in tasks:
+                total_tasks[task["type"]]+=1
+
+                task["waketime_ns"] = (ps_time_stat(
+                                        (task["waketime_ns"] - cur_time))
+                                    if task["waketime_ns"] < BIG_VALUE
+                                    else TaskStat("inf", task["waketime_ns"]))
+
+                task["total_runtime_ns"] = ps_time_stat(
+                                                   task["total_runtime_ns"])
+
+                if task["state"] == "RUNNING":
+                    # task is running
+                    task["runtime"] = ps_time_stat(cur_time -
+                                                   task["last_starttime_ns"])
+                    task["waketime_ns"] = ps_time_stat(0)
+                    running_tasks[task["type"]]+=1
+                else:
+                    task["runtime"] = ps_time_stat(0)
+
+                task["state"] = task["state"][0]
+
+
+            running_tasks["Total"] = sum(running_tasks.values())
+            total_tasks["Total"] = len(tasks)
+
+            headers = (
+                "Tasks     Writer Reader AuxIO  NonIO  Total \n" +
+                "Running   {Writer:<6} {Reader:<6} "
+                "{AuxIO:<6} {NonIO:<6} {Total:<6}\n"
+                    .format(**running_tasks) +
+                "All       {Writer:<6} {Reader:<6} "
+                "{AuxIO:<6} {NonIO:<6} {Total:<6}\n"
+                    .format(**total_tasks)
+            )
+
+            print headers
+
+            table_columns = [
+                    (key, Column(*options)) for key, options in (
+                    # Stat            Display Name, Invert Sort, Right Align
+                    ('tid',              ('TID',      False, True )),
+                    ('priority',         ('Pri',      False, True )),
+                    ('state',            ('St',       False, False)),
+                    ('bucket',           ('Bucket',   False, False)),
+                    ('waketime_ns',      ('SleepFor', True,  True )),
+                    ('runtime',          ('Runtime',  True,  True )),
+                    ('total_runtime_ns', ('TotalRun', True,  True )),
+                    ('type',             ('Type',     False, False)),
+                    ('name',             ('Name',     False, False)),
+                    ('description',      ('Descr.',   False, False)),
+                )]
+
+            table_column_keys = [x[0] for x in table_columns]
+            table_column_values = [x[1] for x in table_columns]
+
+            table_data = []
+
+            for row in tasks:
+                table_data.append(tuple(row[key]
+                                        for key in table_column_keys))
+
+            sort_key = None
+            if sort_by is not None:
+                if isinstance(sort_by, int):
+                    sort_key = itemgetter(sort_by)
+
+                elif isinstance(sort_by, basestring):
+                    if sort_by.isdigit():
+                        sort_key = itemgetter(int(sort_by))
+                    else:
+                        sort_by = sort_by.lower()
+                        for index, column in enumerate(table_column_values):
+                            if sort_by == column.display_name.lower():
+                                sort_key = itemgetter(index)
+                                reverse ^= column.invert_sort
+                                break
+
+            table_formatter(table_column_values, table_data,
+                            sort_key, reverse=reverse)
+
+
+def ps_time_label(microseconds):
+    sign = "-" if microseconds < 0 else ""
+
+    centiseconds = abs(microseconds)//10000
+
+    seconds = centiseconds//100
+    centiseconds %= 100
+
+    minutes = seconds//60
+    seconds %= 60
+
+    return "{0}{1}:{2:0>2}.{3:0>2}".format(sign, minutes, seconds, centiseconds)
+
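A couple of worked examples of the label format (values chosen for
illustration): 83,450,000 microseconds is 1 minute, 23 seconds and 45
centiseconds, and the sign is carried through for negative inputs:

    assert ps_time_label(83450000) == '1:23.45'
    assert ps_time_label(-500000) == '-0:00.50'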
 def time_label(s):
     # -(2**64) -> '-inf'
     # 2**64 -> 'inf'
@@ -95,6 +254,11 @@ def time_label(s):
         return '-inf'
     elif s == 0:
         return '0'
+
+    isNegative = s < 0
+
+    s = abs(s)
+
     product = 1
     sizes = (('us', 1), ('ms', 1000), ('s', 1000), ('m', 60))
     sizeMap = []
@@ -107,8 +271,11 @@ def time_label(s):
     if lbl == 'm':
         mins = s / factor
         secs = (s % factor) / (factor / 60)
-        return '%d%s:%02ds' % (mins, lbl, secs)
-    return "%d%s" % (s / factor, lbl)
+        result = '%d%s:%02ds' % (mins, lbl, secs)
+    else:
+        result = "%d%s" % (s / factor, lbl)
+
+    return ("-" if isNegative else "") + result
 
 def sec_label(s):
     return time_label(s * 1000000)
@@ -121,8 +288,6 @@ def size_label(s):
     suffix = sizes[int(e)]
     return "%d%s" % (s/(1024 ** math.floor(e)), suffix)
 
-@cmd
-
 def histograms(mc, raw_stats):
     # Try to figure out the terminal width.  If we can't, 79 is good
     def termWidth():
@@ -501,11 +666,29 @@ def stats_dispatcher(mc, with_logs='no'):
                     print "        ---------"
 
 @cmd
+def stats_tasks(mc, *args):
+    tasks_stats_formatter(stats_perform(mc, 'tasks'), *args)
+
+@cmd
+def stats_responses(mc, all=''):
+    resps = json.loads(stats_perform(mc, 'responses')['responses'])
+    c = mc.get_error_map()['errors']
+    d = {}
+    for k, v in resps.iteritems():
+        try:
+            if v > 0 or all:
+                d[c[int(k, 16)]['name']] = v
+        except KeyError:
+            pass # Ignore it, no matching status code
+
+    stats_formatter(d)
+
+@cmd
 def reset(mc):
     stats_perform(mc, 'reset')
 
 def main():
-    c = clitool.CliTool()
+    c = cli_auth_utils.get_authed_clitool()
 
     c.addCommand('all', stats_all, 'all')
     c.addCommand('allocator', stats_allocator, 'allocator')
@@ -515,6 +698,7 @@ def main():
     c.addCommand('scheduler', stats_scheduler, 'scheduler')
     c.addCommand('runtimes', stats_runtimes, 'runtimes')
     c.addCommand('dispatcher', stats_dispatcher, 'dispatcher [logs]')
+    c.addCommand('tasks', stats_tasks, 'tasks [sort column]')
     c.addCommand('workload', stats_workload, 'workload')
     c.addCommand('failovers', stats_failovers, 'failovers [vbid]')
     c.addCommand('hash', stats_hash, 'hash [detail]')
@@ -526,6 +710,7 @@ def main():
     c.addCommand('prev-vbucket', stats_prev_vbucket, 'prev-vbucket')
     c.addCommand('raw', stats_raw, 'raw argument')
     c.addCommand('reset', reset, 'reset')
+    c.addCommand('responses', stats_responses, 'responses [all]')
     c.addCommand('slabs', stats_slabs, 'slabs (memcached bucket only)')
     c.addCommand('tap', stats_tap, 'tap')
     c.addCommand('tapagg', stats_tapagg, 'tapagg')
@@ -541,9 +726,6 @@ def main():
     c.addCommand('warmup', stats_warmup, 'warmup')
     c.addCommand('uuid', stats_uuid, 'uuid')
     c.addFlag('-j', 'json', 'output the results in json format')
-    c.addHiddenFlag('-a', 'allBuckets')
-    c.addOption('-b', 'bucketName', 'the bucket to get stats from (Default: default)')
-    c.addOption('-p', 'password', 'the password for the bucket if one exists')
 
     c.execute()
 
diff --git a/management/cli_auth_utils.py b/management/cli_auth_utils.py
new file mode 100644 (file)
index 0000000..136f80d
--- /dev/null
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+
+import clitool
+import inspect
+import mc_bin_client
+import memcacheConstants
+import sys
+import os
+
+def cmd_decorator(f):
+    """Decorate a function with code to authenticate based on 1-3
+    additional arguments."""
+
+    def g(*args, **kwargs):
+        mc = args[0]
+        spec = inspect.getargspec(f)
+        max = len(spec.args)
+        defaults = len(spec.defaults) if spec.defaults else 0
+        min = max - defaults
+
+        if len(args) < min:
+            print >> sys.stderr, ("Error: too few arguments - command "
+                                  "expected a minimum of %s but was passed "
+                                  "%s: %s"
+                                  % (min - 1, len(args) - 1, list(args[1:])))
+            sys.exit(2)
+
+        if spec.varargs is None:
+            if len(args) > max:
+                print >> sys.stderr, ("Error: too many arguments - command "
+                                      "expected a maximum of %s but was passed "
+                                      "%s: %s"
+                                      % (max - 1, len(args) - 1, list(args[1:])))
+                sys.exit(2)
+
+        bucket = kwargs.pop('bucketName', None)
+        username = kwargs.pop('username', None) or bucket
+        password = kwargs.pop('password', None)
+
+
+        if username is not None or password is not None:
+            bucket = bucket or 'default'
+            username = username or bucket
+            password = password or ''
+            try:
+                mc.sasl_auth_plain(username, password)
+            except mc_bin_client.MemcachedError:
+                print ("Authentication error for user:{0} bucket:{1}"
+                       .format(username, bucket))
+                sys.exit(1)
+
+        mc.enable_xerror()
+        mc.hello("{0} {1}".format(os.path.split(sys.argv[0])[1],
+                                os.getenv("EP_ENGINE_VERSION",
+                                          "unknown version")))
+        try:
+            if kwargs.pop('allBuckets', None):
+                buckets = mc.list_buckets()
+                for bucket in buckets:
+                    print '*' * 78
+                    print bucket
+                    print
+                    mc.bucket_select(bucket)
+                    f(*args, **kwargs)
+            elif bucket is not None:
+                mc.bucket_select(bucket)
+                f(*args, **kwargs)
+            else:
+                f(*args, **kwargs)
+        except mc_bin_client.ErrorEaccess:
+            print ("No access to bucket:{} - permission denied "
+                   "or bucket does not exist.".format(bucket))
+            sys.exit(1)
+
+    return g
+
+
+
+def get_authed_clitool():
+    c = clitool.CliTool()
+
+    c.addFlag('-a', 'allBuckets', 'iterate over all buckets')
+    c.addOption('-b', 'bucketName', 'the bucket to get stats from (Default: default)')
+    c.addOption('-u', 'username', 'the user as which to authenticate (Default: bucketName)')
+    c.addOption('-p', 'password', 'the password for the bucket if one exists')
+
+    return c
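Taken together, the intended wiring for a management script looks roughly
like the following sketch (the 'show' command is hypothetical; the pattern
mirrors how cbstats and cbepctl adopt these helpers elsewhere in this
change):

    import cli_auth_utils

    @cli_auth_utils.cmd_decorator
    def show(mc):
        # mc arrives already authenticated and bucket-selected
        print mc.stats('')

    def main():
        c = cli_auth_utils.get_authed_clitool()
        c.addCommand('show', show, 'show')
        c.execute()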
index e006bd6..5dfb1f0 100644 (file)
@@ -1,7 +1,6 @@
 import optparse
 import socket
 import sys
-import os
 
 import mc_bin_client
 
@@ -11,7 +10,9 @@ class CliTool(object):
         self.cmds = {}
         self.flags = {}
         self.extraUsage = extraUsage.strip()
-        self.parser = optparse.OptionParser()
+        self.parser = optparse.OptionParser(
+            usage="%prog host[:dataport] command [options]\n\n"
+                  "dataport [default:11210]")
 
     def addCommand(self, name, f, help=None):
         if not help:
@@ -33,18 +34,27 @@ class CliTool(object):
                                help=description)
 
     def execute(self):
+        self.parser.usage +=  "\n" + self.format_command_list()
 
-        try:
-            opts, args = self.parser.parse_args()
-        except SystemExit:
-            self.usage(True)
+        opts, args = self.parser.parse_args()
+
+        if len(args) < 2:
+            print >> sys.stderr, self.parser.error("Too few arguments")
+            sys.exit(2)
 
-        try:
-            hp, self.cmd = args[:2]
-            host, port = hp.split(':')
-            port = int(port)
-        except ValueError:
-            self.usage()
+
+        hp, self.cmd = args[:2]
+        if ':' in hp:
+            host, port = hp.split(':', 1)
+            try:
+                port = int(port)
+            except ValueError:
+                print >> sys.stderr, self.parser.error(
+                                                 "invalid host[:dataport]")
+                sys.exit(2)
+        else:
+            host = hp
+            port = 11210
 
         try:
             mc = mc_bin_client.MemcachedClient(host, port)
@@ -55,7 +65,7 @@ class CliTool(object):
         f = self.cmds.get(self.cmd)
 
         if not f:
-            self.usage()
+             print self.parser.error("Unknown command")
 
         try:
             if callable(f[0]):
@@ -71,16 +81,17 @@ class CliTool(object):
             else:
                 raise
 
-    def usage(self, skipOptions=False):
-        program=os.path.basename(sys.argv[0])
-        print "Usage: %s host:dataport command [options]" % program
-        print "       Note that the default dataport is 11210"
-        print "\nOptions:"
-        for o in self.flags.keys():
-            print >>sys.stderr," %s\t%s"%(o, self.flags[o])
+    def format_command_list(self):
+        output = ""
+
         cmds = sorted(c[1] for c in self.cmds.values())
-        print >>sys.stderr, "\nUsage: %s host:dataport %s" % (program, cmds[0])
-        for c in cmds[1:]:
-            print >>sys.stderr, "  or   %s host:dataport %s" % (program, c)
-        print >>sys.stderr, self.extraUsage
-        sys.exit(1)
+        output += "\nCommands:\n"
+
+        for c in cmds:
+            output += "    %s\n" % c
+
+        output = output[:-1]
+
+        output += self.extraUsage
+
+        return output
index 1411e95..20b9874 100644 (file)
@@ -5,14 +5,15 @@ Binary memcached test client.
 Copyright (c) 2007  Dustin Sallings <dustin@spy.net>
 """
 
-import sys
-import time
+import exceptions
 import hmac
-import socket
+import json
 import random
-import struct
-import exceptions
 import select
+import socket
+import struct
+import sys
+import time
 
 from memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
 from memcacheConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
@@ -35,11 +36,38 @@ class TimeoutError(exceptions.Exception):
         str += 'passed or the connectivity to a server to be connected'
         return str
 
+
+# This metaclass causes any instantiation of MemcachedError to transparently
+# construct the appropriate subclass if the error code is known.
+class MemcachedErrorMetaclass(type):
+    _dispatch = {}
+    def __new__(meta, name, bases, dct):
+        cls = (super(MemcachedErrorMetaclass, meta)
+               .__new__(meta, name, bases, dct))
+        if 'ERRCODE' in dct:
+            meta._dispatch[dct['ERRCODE']] = cls
+        return cls
+
+    def __call__(cls, *args, **kwargs):
+        err = None
+        if 'status' in kwargs:
+            err = kwargs['status']
+        elif len(args):
+            err = args[0]
+
+        if err in cls._dispatch:
+            cls = cls._dispatch[err]
+
+        return (super(MemcachedErrorMetaclass, cls)
+                .__call__(*args, **kwargs))
+
 class MemcachedError(exceptions.Exception):
     """Error raised when a command fails."""
 
+    __metaclass__ = MemcachedErrorMetaclass
+
     def __init__(self, status, msg):
-        supermsg='Memcached error #' + `status`
+        supermsg='Memcached error #' + repr(status)
         if msg: supermsg += ":  " + msg
         exceptions.Exception.__init__(self, supermsg)
 
@@ -49,6 +77,48 @@ class MemcachedError(exceptions.Exception):
     def __repr__(self):
         return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)
 
+class ErrorKeyEnoent(MemcachedError): ERRCODE = 0x1
+class ErrorKeyEexists(MemcachedError): ERRCODE = 0x2
+class ErrorE2big(MemcachedError): ERRCODE = 0x3
+class ErrorEinval(MemcachedError): ERRCODE = 0x4
+class ErrorNotStored(MemcachedError): ERRCODE = 0x5
+class ErrorDeltaBadval(MemcachedError): ERRCODE = 0x6
+class ErrorNotMyVbucket(MemcachedError): ERRCODE = 0x7
+class ErrorNoBucket(MemcachedError): ERRCODE = 0x8
+class ErrorLocked(MemcachedError): ERRCODE = 0x9
+class ErrorAuthStale(MemcachedError): ERRCODE = 0x1f
+class ErrorAuthError(MemcachedError): ERRCODE = 0x20
+class ErrorAuthContinue(MemcachedError): ERRCODE = 0x21
+class ErrorErange(MemcachedError): ERRCODE = 0x22
+class ErrorRollback(MemcachedError): ERRCODE = 0x23
+class ErrorEaccess(MemcachedError): ERRCODE = 0x24
+class ErrorNotInitialized(MemcachedError): ERRCODE = 0x25
+class ErrorUnknownCommand(MemcachedError): ERRCODE = 0x81
+class ErrorEnomem(MemcachedError): ERRCODE = 0x82
+class ErrorNotSupported(MemcachedError): ERRCODE = 0x83
+class ErrorEinternal(MemcachedError): ERRCODE = 0x84
+class ErrorEbusy(MemcachedError): ERRCODE = 0x85
+class ErrorEtmpfail(MemcachedError): ERRCODE = 0x86
+class ErrorXattrEinval(MemcachedError): ERRCODE = 0x87
+class ErrorUnknownCollection(MemcachedError): ERRCODE = 0x88
+class ErrorSubdocPathEnoent(MemcachedError): ERRCODE = 0xc0
+class ErrorSubdocPathMismatch(MemcachedError): ERRCODE = 0xc1
+class ErrorSubdocPathEinval(MemcachedError): ERRCODE = 0xc2
+class ErrorSubdocPathE2big(MemcachedError): ERRCODE = 0xc3
+class ErrorSubdocPathE2deep(MemcachedError): ERRCODE = 0xc4
+class ErrorSubdocValueCantinsert(MemcachedError): ERRCODE = 0xc5
+class ErrorSubdocDocNotjson(MemcachedError): ERRCODE = 0xc6
+class ErrorSubdocNumErange(MemcachedError): ERRCODE = 0xc7
+class ErrorSubdocDeltaEinval(MemcachedError): ERRCODE = 0xc8
+class ErrorSubdocPathEexists(MemcachedError): ERRCODE = 0xc9
+class ErrorSubdocValueEtoodeep(MemcachedError): ERRCODE = 0xca
+class ErrorSubdocInvalidCombo(MemcachedError): ERRCODE = 0xcb
+class ErrorSubdocMultiPathFailure(MemcachedError): ERRCODE = 0xcc
+class ErrorSubdocSuccessDeleted(MemcachedError): ERRCODE = 0xcd
+class ErrorSubdocXattrInvalidFlagCombo(MemcachedError): ERRCODE = 0xce
+class ErrorSubdocXattrInvalidKeyCombo(MemcachedError): ERRCODE = 0xcf
+class ErrorSubdocXattrUnknownMacro(MemcachedError): ERRCODE = 0xd0
+
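The net effect of the metaclass is that callers can construct and catch
errors generically while still getting the specific subclass; a small
illustration (assuming the classes above are in scope):

    err = MemcachedError(0x1, 'no such key')
    assert isinstance(err, ErrorKeyEnoent)    # dispatched on the status code

    try:
        raise MemcachedError(0x24, 'no access')
    except ErrorEaccess:
        pass                          # subclasses can be caught directly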
 class MemcachedClient(object):
     """Simple memcached client."""
 
@@ -64,6 +134,9 @@ class MemcachedClient(object):
             self.s.connect_ex((host, port))
         self.s.setblocking(0)
         self.r=random.Random()
+        self.features = []
+        self.error_map = None
+        self.error_map_version = 1
 
     def close(self):
         self.s.close()
@@ -116,7 +189,13 @@ class MemcachedClient(object):
         assert myopaque is None or opaque == myopaque, \
             "expected opaque %x, got %x" % (myopaque, opaque)
         if errcode != 0:
-            raise MemcachedError(errcode,  rv)
+            if self.error_map is None:
+                msg = rv
+            else:
+                err = self.error_map['errors'].get(errcode, rv)
+                msg = "{name} : {desc} : {rv}".format(rv=rv, **err)
+
+            raise MemcachedError(errcode,  msg)
         return cmd, opaque, cas, keylen, extralen, rv
 
     def _handleSingleResponse(self, myopaque):
@@ -136,6 +215,11 @@ class MemcachedClient(object):
     def _cat(self, cmd, key, cas, val):
         return self._doCmd(cmd, key, val, '', cas)
 
+    def hello(self, name):
+        return self._doCmd(memcacheConstants.CMD_HELLO, name,
+                           struct.pack('>' + 'H' * len(self.features),
+                                       *self.features))
+
     def append(self, key, value, cas=0):
         return self._cat(memcacheConstants.CMD_APPEND, key, cas, value)
 
@@ -410,6 +494,15 @@ class MemcachedClient(object):
                 done = True
         return rv
 
+    def get_random_key(self):
+        opaque=self.r.randint(0, 2**32)
+        self._sendCmd(memcacheConstants.CMD_GET_RANDOM_KEY, '', '', opaque)
+        cmd, opaque, cas, klen, extralen, data = self._handleKeyedResponse(None)
+        rv = {}
+        if klen:
+            rv[data[4:klen+4]] = data[klen:]
+        return rv
+
     def noop(self):
         """Send a noop command."""
         return self._doCmd(memcacheConstants.CMD_NOOP, '', '')
@@ -439,3 +532,21 @@ class MemcachedClient(object):
         opaque, cas, data = self._doCmd(
             memcacheConstants.CMD_LIST_BUCKETS, '', '', '', 0)
         return data.strip().split(' ')
+
+    def get_error_map(self):
+        _, _, errmap = self._doCmd(memcacheConstants.CMD_GET_ERROR_MAP, '',
+                    struct.pack("!H", self.error_map_version))
+
+        errmap = json.loads(errmap)
+
+        d = {}
+
+        for k,v in errmap['errors'].iteritems():
+            d[int(k, 16)] = v
+
+        errmap['errors'] = d
+        return errmap
+
+    def enable_xerror(self):
+        self.features.append(memcacheConstants.FEATURE_XERROR)
+        self.error_map = self.get_error_map()
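The expected call sequence (mirroring cli_auth_utils.cmd_decorator above) is
to enable the feature before HELLO, since hello() sends whatever has
accumulated in self.features; the client name string here is arbitrary:

    import mc_bin_client

    mc = mc_bin_client.MemcachedClient('127.0.0.1', 11210)
    mc.enable_xerror()    # fetches the error map and queues FEATURE_XERROR
    mc.hello('my-tool')   # negotiates the queued features with the server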
index 2f60009..bf04613 100644 (file)
@@ -9,11 +9,9 @@ import struct
 # Command constants
 CMD_GET = 0
 CMD_SET = 1
-CMD_SETQ = 0x11
 CMD_ADD = 2
 CMD_REPLACE = 3
 CMD_DELETE = 4
-CMD_DELETEQ = 0x14
 CMD_INCR = 5
 CMD_DECR = 6
 CMD_QUIT = 7
@@ -21,12 +19,15 @@ CMD_FLUSH = 8
 CMD_GETQ = 9
 CMD_NOOP = 10
 CMD_VERSION = 11
-CMD_STAT = 0x10
 CMD_APPEND = 0x0e
 CMD_PREPEND = 0x0f
+CMD_STAT = 0x10
+CMD_SETQ = 0x11
+CMD_DELETEQ = 0x14
 CMD_VERBOSE = 0x1b
 CMD_TOUCH = 0x1c
 CMD_GAT = 0x1d
+CMD_HELLO = 0x1f
 CMD_GET_REPLICA = 0x83
 CMD_OBSERVE = 0x92
 
@@ -83,6 +84,9 @@ CMD_DELETE_VBUCKET = 0x3f
 
 CMD_GET_LOCKED = 0x94
 CMD_COMPACT_DB = 0xb3
+CMD_GET_RANDOM_KEY = 0xb6
+
+CMD_GET_ERROR_MAP = 0xfe
 
 # event IDs for the SYNC command responses
 CMD_SYNC_EVENT_PERSISTED  = 1
@@ -111,6 +115,17 @@ ENGINE_PARAM_VBUCKET    = 5
 
 COMMAND_NAMES = dict(((globals()[k], k) for k in globals() if k.startswith("CMD_")))
 
+# Enableable features
+FEATURE_DATATYPE = 0x01
+FEATURE_TLS = 0x2
+FEATURE_TCPNODELAY = 0x03
+FEATURE_MUTATION_SEQNO = 0x04
+FEATURE_TCPDELAY = 0x05
+FEATURE_XATTR = 0x06
+FEATURE_XERROR = 0x07
+FEATURE_SELECT_BUCKET = 0x08
+FEATURE_COLLECTIONS = 0x09
+
 # TAP_OPAQUE types
 TAP_OPAQUE_ENABLE_AUTO_NACK        = 0
 TAP_OPAQUE_INITIAL_VBUCKET_STREAM  = 1
diff --git a/scripts/unmerged-commits.py b/scripts/unmerged-commits.py
new file mode 100755 (executable)
index 0000000..2a6a31d
--- /dev/null
@@ -0,0 +1,44 @@
+#!/usr/bin/env python2.7
+
+# Script to show which commit(s) are not yet merged between our release branches.
+
+from __future__ import print_function
+import subprocess
+import sys
+
+class bcolors:
+    """Define ANSI color codes, if we're running under a TTY."""
+    if sys.stdout.isatty():
+        HEADER = '\033[36m'
+        WARNING = '\033[33m'
+        ENDC = '\033[0m'
+    else:
+        HEADER = ''
+        WARNING = ''
+        ENDC = ''
+
+# Set of branches to check for unmerged patches. Ordered by ancestry;
+# i.e. the oldest supported branch to the newest, which is the order
+# patches should be merged.
+branches = ['couchbase/3.0.x',
+            'couchbase/sherlock',
+            'couchbase/watson',
+            'couchbase/master']
+
+total_unmerged = 0
+for downstream, upstream in zip(branches, branches[1:]):
+    commits = subprocess.check_output(['git', 'cherry', '-v',
+                                       upstream, downstream])
+    count = len(commits.splitlines())
+    total_unmerged += count
+    if count > 0:
+        print((bcolors.HEADER +
+               "{} commits in '{}' not present in '{}':" +
+               bcolors.ENDC).format(count, downstream, upstream))
+        print(commits)
+
+if total_unmerged:
+    print((bcolors.WARNING + "Total of {} commits outstanding" +
+           bcolors.ENDC).format(total_unmerged))
+
+sys.exit(total_unmerged)
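Because the script's exit status is the number of unmerged commits, it can
double as a CI gate; a hypothetical wrapper, run from the repository root:

    import subprocess
    rc = subprocess.call(['scripts/unmerged-commits.py'])
    if rc != 0:
        print "%d commit(s) still need forward-merging" % rc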
index c6053ef..415797b 100644 (file)
 
 #include <iostream>
 
+#include <phosphor/phosphor.h>
+#include <platform/make_unique.h>
+
 #include "access_scanner.h"
 #include "ep_engine.h"
 #include "mutation_log.h"
+#include "vb_count_visitor.h"
+
+#include <numeric>
 
 class ItemAccessVisitor : public VBucketVisitor,
-                          public HashTableVisitor {
+                          public PauseResumeHashTableVisitor {
 public:
-    ItemAccessVisitor(EventuallyPersistentStore &_store, EPStats &_stats,
-                      uint16_t sh, AtomicValue<bool> &sfin, AccessScanner &aS) :
-        store(_store), stats(_stats), startTime(ep_real_time()),
-        taskStart(gethrtime()), shardID(sh), stateFinalizer(sfin), as(aS)
-    {
+    ItemAccessVisitor(KVBucket& _store,
+                      EPStats& _stats,
+                      uint16_t sh,
+                      std::atomic<bool>& sfin,
+                      AccessScanner& aS,
+                      uint64_t items_to_scan)
+        : VBucketVisitor(VBucketFilter(
+                  _store.getVBuckets().getShard(sh)->getVBuckets())),
+          store(_store),
+          stats(_stats),
+          startTime(ep_real_time()),
+          taskStart(gethrtime()),
+          shardID(sh),
+          stateFinalizer(sfin),
+          as(aS),
+          items_to_scan(items_to_scan) {
         Configuration &conf = store.getEPEngine().getConfiguration();
         name = conf.getAlogPath();
         std::stringstream s;
@@ -39,64 +56,69 @@ public:
         prev = name + ".old";
         next = name + ".next";
 
-        log = new MutationLog(next, conf.getAlogBlockSize());
+        log = std::make_unique<MutationLog>(next, conf.getAlogBlockSize());
         log->open();
         if (!log->isOpen()) {
             LOG(EXTENSION_LOG_WARNING, "Failed to open access log: '%s'",
                 next.c_str());
-            delete log;
-            log = NULL;
+            log.reset();
         } else {
             LOG(EXTENSION_LOG_NOTICE, "Attempting to generate new access file "
                 "'%s'", next.c_str());
         }
     }
 
-    void visit(StoredValue *v) override {
-        if (log != NULL && v->isResident()) {
-            if (v->isExpired(startTime) || v->isDeleted()) {
+    bool visit(StoredValue& v) override {
+        if (log && v.isResident()) {
+            if (v.isExpired(startTime) || v.isDeleted()) {
                 LOG(EXTENSION_LOG_INFO,
-                "INFO: Skipping expired/deleted item: %s",v->getKey().c_str());
+                    "INFO: Skipping expired/deleted item: %" PRIu64,
+                    v.getBySeqno());
             } else {
-                accessed.push_back(std::make_pair(v->getBySeqno(), v->getKey()));
+                accessed.push_back(StoredDocKey(v.getKey()));
+                return ++items_scanned < items_to_scan;
             }
         }
+        return true;
     }
 
     void update() {
-        if (log != NULL) {
-            std::list<std::pair<uint64_t, std::string> >::iterator it;
-            for (it = accessed.begin(); it != accessed.end(); ++it) {
-                log->newItem(currentBucket->getId(), it->second, it->first);
+        if (log != nullptr) {
+            for (auto it = accessed.begin(); it != accessed.end(); ++it) {
+                log->newItem(currentBucket->getId(), *it);
             }
         }
         accessed.clear();
     }
 
-    void visitBucket(RCPtr<VBucket> &vb) override {
+    void visitBucket(VBucketPtr &vb) override {
         currentBucket = vb;
         update();
 
-        if (log == NULL) {
+        if (log == nullptr) {
             return;
         }
-
+        HashTable::Position ht_start;
         if (vBucketFilter(vb->getId())) {
-            vb->ht.visit(*this);
+            while (ht_start != vb->ht.endPosition()) {
+                ht_start = vb->ht.pauseResumeVisit(*this, ht_start);
+                update();
+                log->commit1();
+                log->commit2();
+                items_scanned = 0;
+            }
         }
     }
 
     void complete() override {
-        update();
 
         if (log == nullptr) {
             updateStateFinalizer(false);
         } else {
-            size_t num_items = log->itemsLogged[ML_NEW];
+            size_t num_items = log->itemsLogged[int(MutationLogType::New)];
             log->commit1();
             log->commit2();
-            delete log;
-            log = NULL;
+            log.reset();
             stats.alogRuntime.store(ep_real_time() - startTime);
             stats.alogNumItems.store(num_items);
             stats.accessScannerHisto.add((gethrtime() - taskStart) / 1000);
@@ -164,8 +186,10 @@ private:
         }
     }
 
-    EventuallyPersistentStore &store;
-    EPStats &stats;
+    VBucketFilter vBucketFilter;
+
+    KVBucket& store;
+    EPStats& stats;
     rel_time_t startTime;
     hrtime_t taskStart;
     std::string prev;
@@ -173,28 +197,38 @@ private:
     std::string name;
     uint16_t shardID;
 
-    std::list<std::pair<uint64_t, std::string> > accessed;
+    std::vector<StoredDocKey> accessed;
 
-    MutationLog *log;
-    AtomicValue<bool> &stateFinalizer;
+    std::unique_ptr<MutationLog> log;
+    std::atomic<bool> &stateFinalizer;
     AccessScanner &as;
-    RCPtr<VBucket> currentBucket;
+
+    // The number of items scanned since the last pause
+    uint64_t items_scanned;
+    // The number of items to scan before we pause
+    const uint64_t items_to_scan;
+
+    VBucketPtr currentBucket;
 };
 
-AccessScanner::AccessScanner(EventuallyPersistentStore &_store, EPStats &st,
-                             double sleeptime, bool useStartTime,
+AccessScanner::AccessScanner(KVBucket& _store,
+                             EPStats& st,
+                             double sleeptime,
+                             bool useStartTime,
                              bool completeBeforeShutdown)
-    : GlobalTask(&_store.getEPEngine(), TaskId::AccessScanner, sleeptime,
+    : GlobalTask(&_store.getEPEngine(),
+                 TaskId::AccessScanner,
+                 sleeptime,
                  completeBeforeShutdown),
       completedCount(0),
       store(_store),
       stats(st),
       sleepTime(sleeptime),
       available(true) {
-
     Configuration &conf = store.getEPEngine().getConfiguration();
     residentRatioThreshold = conf.getAlogResidentRatioThreshold();
     alogPath = conf.getAlogPath();
+    maxStoredItems = conf.getAlogMaxStoredItems();
     double initialSleep = sleeptime;
     if (useStartTime) {
         size_t startTime = conf.getAlogTaskTime();
@@ -232,6 +266,8 @@ AccessScanner::AccessScanner(EventuallyPersistentStore &_store, EPStats &st,
 }
 
 bool AccessScanner::run() {
+    TRACE_EVENT0("ep-engine/task", "AccessScanner");
+
     bool inverse = true;
     if (available.compare_exchange_strong(inverse, false)) {
         store.resetAccessScannerTasktime();
@@ -240,11 +276,9 @@ bool AccessScanner::run() {
         bool deleteAccessLogFiles = false;
         /* Get the resident ratio */
         VBucketCountAggregator aggregator;
-        VBucketCountVisitor activeCountVisitor(store.getEPEngine(),
-                                               vbucket_state_active);
+        VBucketCountVisitor activeCountVisitor(vbucket_state_active);
         aggregator.addVisitor(&activeCountVisitor);
-        VBucketCountVisitor replicaCountVisitor(store.getEPEngine(),
-                                                vbucket_state_replica);
+        VBucketCountVisitor replicaCountVisitor(vbucket_state_replica);
         aggregator.addVisitor(&replicaCountVisitor);
 
         store.visit(aggregator);
@@ -272,13 +306,15 @@ bool AccessScanner::run() {
                 deleteAlogFile(name);
                 stats.accessScannerSkips++;
             } else {
-                std::shared_ptr<ItemAccessVisitor> pv(new ItemAccessVisitor(store,
-                                                 stats, i, available, *this));
-                std::shared_ptr<VBucketVisitor> vbv(pv);
-                ExTask task = new VBucketVisitorTask(&store, vbv, i,
-                                                     "Item Access Scanner",
-                                                     sleepTime, true);
-                ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
+                auto pv = std::make_unique<ItemAccessVisitor>(
+                        store, stats, i, available, *this, maxStoredItems);
+                ExTask task = new VBCBAdaptor(&store,
+                                              TaskId::AccessScannerVisitor,
+                                              std::move(pv),
+                                              "Item Access Scanner",
+                                              sleepTime,
+                                              true);
+                ExecutorPool::get()->schedule(task);
             }
         }
     }
@@ -295,8 +331,8 @@ void AccessScanner::updateAlogTime(double sleepSecs) {
     stats.alogTime.store(_waketime.tv_sec);
 }
 
-std::string AccessScanner::getDescription() {
-    return std::string("Generating access log");
+cb::const_char_buffer AccessScanner::getDescription() {
+    return "Generating access log";
 }
 
 void AccessScanner::deleteAlogFile(const std::string& fileName) {
index f878b79..3821b76 100644 (file)
 
 #include "config.h"
 
-#include <string>
+#include "globaltask.h"
 
-#include "tasks.h"
+#include <string>
 
 // Forward declaration.
-class EventuallyPersistentStore;
+class EPStats;
+class KVBucket;
 class AccessScannerValueChangeListener;
 
 class AccessScanner : public GlobalTask {
     friend class AccessScannerValueChangeListener;
 public:
-    AccessScanner(EventuallyPersistentStore &_store, EPStats &st,
+    AccessScanner(KVBucket& _store, EPStats& st,
                   double sleeptime = 0,
                   bool useStartTime = false,
                   bool completeBeforeShutdown = false);
 
     bool run();
-    std::string getDescription();
-    AtomicValue<size_t> completedCount;
+    cb::const_char_buffer getDescription();
+    std::atomic<size_t> completedCount;
 
 private:
     void updateAlogTime(double sleepSecs);
     void deleteAlogFile(const std::string& fileName);
 
-    EventuallyPersistentStore &store;
-    EPStats &stats;
+    KVBucket& store;
+    EPStats& stats;
     double sleepTime;
     std::string alogPath;
-    AtomicValue<bool> available;
+    std::atomic<bool> available;
     uint8_t residentRatioThreshold;
+    uint64_t maxStoredItems;
 };
 
 #endif  // SRC_ACCESS_SCANNER_H_
index 6b910b1..0c7ad57 100644 (file)
 
 SpinLock::SpinLock()
 {
-    lock.clear();
+    lck.clear();
 }
 
 SpinLock::~SpinLock() {}
 
 bool SpinLock::tryAcquire(void) {
-    return !lock.test_and_set(std::memory_order_acquire);
+    return !lck.test_and_set(std::memory_order_acquire);
 }
 
 
-void SpinLock::acquire(void) {
+void SpinLock::lock(void) {
    int spin = 0;
    while (!tryAcquire()) {
       ++spin;
@@ -41,6 +41,6 @@ void SpinLock::acquire(void) {
    }
 }
 
-void SpinLock::release(void) {
-    lock.clear(std::memory_order_release);
+void SpinLock::unlock(void) {
+    lck.clear(std::memory_order_release);
 }
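Renaming acquire()/release() to lock()/unlock() (the member becomes lck to avoid colliding with the new lock() method) makes SpinLock satisfy the standard BasicLockable requirements; that is what lets the hand-rolled SpinLockHolder be deleted from the header below in favour of std::lock_guard. A minimal sketch:

    #include <mutex>

    SpinLock sl;

    void criticalSection() {
        // Calls sl.lock() on construction and sl.unlock() on scope exit,
        // including when an exception propagates.
        std::lock_guard<SpinLock> guard(sl);
        // ... work under the spinlock ...
    }

Note how RCPtr::swap below additionally nests the guard in its own scope so that deleting the old value happens outside the critical section.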
index a32418d..8945b1b 100644 (file)
 
 #include <atomic>
 
-#define AtomicValue std::atomic
-
-#include "callbacks.h"
 #include "locks.h"
 #include "utility.h"
 
 template <typename T>
-void atomic_setIfBigger(AtomicValue<T> &obj, const T &newValue) {
+void atomic_setIfBigger(std::atomic<T> &obj, const T &newValue) {
     T oldValue = obj.load();
     while (newValue > oldValue) {
         if (obj.compare_exchange_strong(oldValue, newValue)) {
@@ -40,7 +37,7 @@ void atomic_setIfBigger(AtomicValue<T> &obj, const T &newValue) {
 }
 
 template <typename T>
-void atomic_setIfLess(AtomicValue<T> &obj, const T &newValue) {
+void atomic_setIfLess(std::atomic<T> &obj, const T &newValue) {
     T oldValue = obj.load();
     while (newValue < oldValue) {
         if (obj.compare_exchange_strong(oldValue, newValue)) {
@@ -51,7 +48,7 @@ void atomic_setIfLess(AtomicValue<T> &obj, const T &newValue) {
 }
 
 template <typename T>
-T atomic_swapIfNot(AtomicValue<T> &obj, const T &badValue, const T &newValue) {
+T atomic_swapIfNot(std::atomic<T> &obj, const T &badValue, const T &newValue) {
     T oldValue;
     while (true) {
         oldValue = obj.load();
@@ -72,26 +69,26 @@ T atomic_swapIfNot(AtomicValue<T> &obj, const T &badValue, const T &newValue) {
  * This does *not* make the item that's pointed to atomic.
  */
 template <typename T>
-class AtomicPtr : public AtomicValue<T*> {
+class AtomicPtr : public std::atomic<T*> {
 public:
-    AtomicPtr(T *initial = NULL) : AtomicValue<T*>(initial) {}
+    AtomicPtr(T *initial = NULL) : std::atomic<T*>(initial) {}
 
     ~AtomicPtr() {}
 
     T *operator ->() {
-        return AtomicValue<T*>::load();
+        return std::atomic<T*>::load();
     }
 
     T &operator *() {
-        return *AtomicValue<T*>::load();
+        return *std::atomic<T*>::load();
     }
 
     operator bool() const {
-        return AtomicValue<T*>::load() != NULL;
+        return std::atomic<T*>::load() != NULL;
     }
 
     bool operator !() const {
-        return AtomicValue<T*>::load() == NULL;
+        return std::atomic<T*>::load() == NULL;
     }
 };
 
@@ -107,45 +104,16 @@ public:
     SpinLock();
     ~SpinLock();
 
-    void acquire(void);
-    void release(void);
+    void lock(void);
+    void unlock(void);
 
 private:
     bool tryAcquire(void);
 
-    std::atomic_flag lock;
+    std::atomic_flag lck;
     DISALLOW_COPY_AND_ASSIGN(SpinLock);
 };
 
-/**
- * Safe LockHolder for SpinLock instances.
- */
-class SpinLockHolder {
-public:
-    SpinLockHolder(SpinLock *theLock) : sl(theLock) {
-        lock();
-    }
-
-    ~SpinLockHolder() {
-        unlock();
-    }
-
-    void lock() {
-        sl->acquire();
-        locked = true;
-    }
-
-    void unlock() {
-        if (locked) {
-            sl->release();
-            locked = false;
-        }
-    }
-private:
-    SpinLock *sl;
-    bool locked;
-};
-
 template <class T> class RCPtr;
 template <class S> class SingleThreadedRCPtr;
 
@@ -168,7 +136,7 @@ private:
         return --_rc_refcount;
     }
 
-    mutable AtomicValue<int> _rc_refcount;
+    mutable std::atomic<int> _rc_refcount;
 };
 
 /**
@@ -230,7 +198,7 @@ public:
 
 private:
     C *gimme() const {
-        SpinLockHolder lh(&lock);
+        std::lock_guard<SpinLock> lh(lock);
         if (value) {
             static_cast<RCValue *>(value)->_rc_incref();
         }
@@ -238,9 +206,11 @@ private:
     }
 
     void swap(C *newValue) {
-        SpinLockHolder lh(&lock);
-        C *tmp(value.exchange(newValue));
-        lh.unlock();
+        C* tmp;
+        {
+            std::lock_guard<SpinLock> lh(lock);
+            tmp = value.exchange(newValue);
+        }
         if (tmp != NULL && static_cast<RCValue *>(tmp)->_rc_decref() == 0) {
             delete tmp;
         }
@@ -251,6 +221,16 @@ private:
 };
 
 /**
+ * Dynamic cast for RCPtr. Modelled on the function of the same name for
+ * std::shared_ptr.
+ */
+template <class T, class U>
+RCPtr<T> dynamic_pointer_cast(const RCPtr<U>& r) {
+    T* p = dynamic_cast<T*>(r.get());
+    return p ? RCPtr<T>(p) : RCPtr<T>();
+}
+
+/**
  * Single-threaded reference counted pointer.
  * "Single-threaded" means that the reference counted pointer should be accessed
  * by only one thread at any time or accesses to the reference counted pointer
@@ -267,6 +247,15 @@ public:
 
     SingleThreadedRCPtr(const SingleThreadedRCPtr<T> &other) : value(other.gimme()) {}
 
+    template <typename Y>
+    SingleThreadedRCPtr(const SingleThreadedRCPtr<Y>& other)
+        : value(other.gimme()) {
+    }
+
+    SingleThreadedRCPtr(std::unique_ptr<T>&& other)
+        : SingleThreadedRCPtr(other.release()) {
+    }
+
     ~SingleThreadedRCPtr() {
         if (value && static_cast<RCValue *>(value)->_rc_decref() == 0) {
             delete value;
@@ -284,6 +273,10 @@ public:
         swap(other.gimme());
     }
 
+    int refCount() const {
+        return static_cast<RCValue*>(value)->_rc_refcount.load();
+    }
+
     // safe for the lifetime of this instance
     T *get() const {
         return value;
@@ -311,6 +304,9 @@ public:
     }
 
 private:
+    template <typename Y>
+    friend class SingleThreadedRCPtr;
+
     T *gimme() const {
         if (value) {
             static_cast<RCValue *>(value)->_rc_incref();
@@ -329,6 +325,10 @@ private:
     T *value;
 };
 
+template <typename T, class... Args>
+SingleThreadedRCPtr<T> make_STRCPtr(Args&&... args) {
+    return SingleThreadedRCPtr<T>(new T(std::forward<Args>(args)...));
+}
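A usage sketch of the factory above, which mirrors std::make_shared; Blob is a hypothetical RCValue subclass invented for illustration:

    #include <string>

    // Hypothetical type: anything deriving from RCValue works.
    class Blob : public RCValue {
    public:
        Blob(size_t size, char fill) : data(size, fill) {}
    private:
        std::string data;
    };

    auto ptr = make_STRCPtr<Blob>(1024, 'x'); // args forwarded to Blob's ctor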
 
 /**
 * Debugging wrapper around std::atomic which prints all accesses to the atomic
@@ -376,6 +376,17 @@ public:
         return result;
     }
 
+    bool compare_exchange_strong(T& expected, T desired,
+                                 std::memory_order order =
+                                      std::memory_order_seq_cst ) {
+        std::lock_guard<std::mutex> lock(stderr_mutex);
+        std::cerr << "LoggedAtomic[" << this << "]::compare_exchange_strong("
+                  << "expected:" << expected << ", desired:) = " << desired;
+        auto result = value.compare_exchange_strong(expected, desired, order);
+        std::cerr << result << std::endl;
+        return result;
+    }
+
     T fetch_add(T arg,
                 std::memory_order order = std::memory_order_seq_cst ) {
         std::lock_guard<std::mutex> lock(stderr_mutex);
@@ -394,6 +405,14 @@ public:
         return value.load();
     }
 
+    T& operator++() {
+        std::lock_guard<std::mutex> lock(stderr_mutex);
+        ++value;
+        std::cerr << "LoggedAtomic[" << this << "]::pre-increment: "
+                  << value << std::endl;
+        return value;
+    }
+
 protected:
     mutable std::mutex stderr_mutex;
     std::atomic<T> value;
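How the two LoggedAtomic additions behave together; a hedged sketch (this excerpt does not show LoggedAtomic's constructors, so the initialisation below is an assumption):

    LoggedAtomic<int> counter{0};  // assumed: constructible from a value
    ++counter;                     // stderr: "LoggedAtomic[...]::pre-increment: 1"

    int expected = 1;
    counter.compare_exchange_strong(expected, 5);
    // stderr: "LoggedAtomic[...]::compare_exchange_strong(expected:1,
    //          desired:5) = 1"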
diff --git a/src/atomic_unordered_map.h b/src/atomic_unordered_map.h
new file mode 100644 (file)
index 0000000..cdcb780
--- /dev/null
@@ -0,0 +1,251 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2016 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/**
+ * AtomicUnorderedMap - A thread-safe map class.
+ *
+ * AtomicUnorderedMap is a thread-safe unordered map (associative array).
+ * Elements can be added, removed and found concurrently from different
+ * threads safely.
+ *
+ *
+ * THREAD SAFETY
+ * Items are returned by value (instead of via an iterator) - this ensures that
+ * once an item is passed back to the caller, it can safely be accessed even if
+ * another thread has concurrently deleted it from the map.
+ *
+ * While this may seem limiting, the value type can be a (smart) pointer if
+ * desired, removing the need to copy the actual underlying object. However,
+ * if a pointer type is used then operations on the pointed-to objects are
+ * *not* automatically thread-safe. In other words, while you can safely call
+ * insert(<ptr>) from multiple threads, you *cannot* safely mutate the object
+ * pointed to (via the pointer value held in the map) from multiple threads
+ * without additional synchronization. For example, having a per-object
+ * mutex, or making the object atomic.
+ *
+ *
+ * FUNCTIONALITY
+ * Implements a relatively simple set of operations modeled on
+ * std::unordered_map:
+ *
+ *   - size() to return the number of elements in the map.
+ *   - insert() to add an element
+ *   - find() to search for an element
+ *   - erase() to delete an element
+ *   - clear() to delete all elements
+ *
+ * Iteration, a la `auto it = begin(); it++; ...` isn't directly supported;
+ * the main reason is that another thread may have removed an item between
+ * calling begin() and moving to the next item, so it's not possible to ensure
+ * all elements are acted on. Instead, a number of functions similar to
+ * std::algorithm are provided:
+ *
+ *   - find_if() to search for the first element matching a given predicate.
+ *   - for_each() to apply a function to every element in the map.
+ *
+ *
+ * LOCKING STRATEGIES
+ * There are two locking strategies available:
+ * - Internal locking, where the methods themselves lock on entry (and unlock
+ *   on exit).
+ * - External locking, where a lock is acquired before calling the methods.
+ *
+ * For simple use-cases internal locking is sufficient (and safer) - the caller
+ * doesn't have to concern themselves with locking, and can rely on the object
+ * doing the right thing.
+ * However if the caller needs to ensure that multiple operations on the map
+ * are atomic (e.g. find()ing an item and then conditionally erase()ing it) then
+ * external locking can be used.
+ *
+ * For example, to atomically remove a key only if its value is false:
+ *
+ *     typedef AtomicUnorderedMap<int, bool> M; // Key:int, Value:bool
+ *     M map;
+ *     ...
+ *     { // Create new scope for external lock guard.
+ *         std::lock_guard<M> guard(map);
+ *         auto result = map.find(key_of_interest, guard);
+ *         if (result.second && result.first == false) {
+ *             map.erase(key_of_interest, guard);
+ *         }
+ *     } // end of scope, map unlocked.
+ *
+ * Note that the guard is passed into the find() and erase() functions to
+ * indicate that an external lock is already acquired (and hence an internal
+ * lock should not be acquired).
+ *
+ * See Boost Synchronization
+ * (http://www.boost.org/doc/libs/1_60_0/doc/html/thread/synchronization.html)
+ * for more details & background on the internal / external locking strategies
+ * used here.
+ */
+
+#pragma once
+
+#include "config.h"
+
+#include "atomic.h"
+
+#include <algorithm>
+#include <mutex>
+#include <unordered_map>
+
+template<class Key,
+         class T,
+         class Hash = std::hash<Key>,
+         class KeyEqual = std::equal_to<Key>,
+         class Allocator = std::allocator< std::pair<const Key, T> > >
+class AtomicUnorderedMap;
+
+template<class Key, class T, class Hash, class KeyEqual,
+         class Allocator>
+class AtomicUnorderedMap {
+public:
+
+    using map_type = AtomicUnorderedMap<Key, T, Hash, KeyEqual, Allocator>;
+
+    // Alias to simplify all the other defs
+    using base_map_type = typename std::unordered_map<Key, T, Hash,
+                                                      KeyEqual, Allocator>;
+
+    // Map to the type aliases in the underlying map.
+    using key_type = typename base_map_type::key_type;
+    using mapped_type = typename base_map_type::mapped_type;
+    using value_type = typename base_map_type::value_type;
+    using size_type = typename base_map_type::size_type;
+
+    size_type size() const {
+        std::lock_guard<std::mutex> guard(this->mutex); // internally locked
+        return map.size();
+    }
+
+    /* Lookup */
+
+    /** Searches for the given key in the map.
+     *  Returns a pair consisting of:
+     *  - the found element (or a default-constructed element if not found)
+     *  - and bool denoting if the given key was found.
+     */
+    std::pair<T, bool> find(const Key& key, std::lock_guard<map_type>&) {
+        // Externally locked
+        auto iter = map.find(key);
+        if (iter != map.end()) {
+            return {iter->second, true};
+        } else {
+            return std::make_pair(T(), false);
+        }
+    }
+    std::pair<T, bool> find(const Key& key) {
+        std::lock_guard<map_type> guard(*this); // internally locked
+        return find(key, guard);
+    }
+
+    /** Searches for first element which matches the given predicate.
+     *  Returns a pair consisting of:
+     *  - the first found element (or a default-constructed element if not found)
+     *  - and bool denoting if a matching element was found.
+     */
+    template<class UnaryPredicate>
+    std::pair<T, bool> find_if(UnaryPredicate p) {
+        std::lock_guard<map_type> guard(*this); // internally locked
+        auto iter = std::find_if(map.begin(), map.end(), p);
+        if (iter != map.end()) {
+            return {iter->second, true};
+        } else {
+            return std::make_pair(T(), false);
+        }
+    }
+
+    /* Modifiers */
+
+    void clear(std::lock_guard<map_type>&) {
+        // Externally locked
+        map.clear();
+    }
+    void clear() {
+        std::lock_guard<map_type> guard(*this); // internally locked
+        clear(guard);
+    }
+
+    /** Applies the given function object to every element in the map.
+     */
+    template<class UnaryFunction>
+    void for_each(UnaryFunction f, std::lock_guard<map_type>&) {
+        // Externally locked
+        std::for_each(map.begin(), map.end(), f);
+    }
+
+    template<class UnaryFunction>
+    void for_each(UnaryFunction f) {
+        std::lock_guard<map_type> guard(*this); // internally locked
+        for_each(f, guard);
+    }
+
+    /**
+     * Attempts to erase the given key from the map.
+     *  Returns a pair consisting of:
+     *  - the erased element (or a default-constructed element if not found)
+     *  - and bool denoting if the given key was erased.
+     */
+    std::pair<T, bool> erase(const key_type& key, std::lock_guard<map_type>&) {
+        // Externally locked
+        auto iter = map.find(key);
+        if (iter != map.end()) {
+            T result = iter->second;
+            map.erase(iter);
+            return {result, true};
+        } else {
+            return std::make_pair(T(), false);
+        }
+    }
+    std::pair<T, bool> erase(const key_type& key) {
+        std::lock_guard<map_type> guard(*this); // internally locked
+        return erase(key, guard);
+    }
+
+    /**
+     * Attempts to insert the given key into the map, if it does not already
+     * exist.
+     *  Returns true if the element was inserted, or false if an element
+     *  with the given key already exists.
+     */
+    bool insert(const value_type& value) {
+        std::lock_guard<map_type> guard(*this); // internally locked
+        auto result = map.insert(value);
+        return result.second;
+    }
+
+    /*
+     * Locking
+     *
+     * Note: Prefer to use RAII-style lock holders (e.g. std::lock_guard<>())
+     *       instead of the raw methods here.
+     */
+
+    /* Explicitly locks the container. */
+    void lock() {
+        mutex.lock();
+    }
+
+    void unlock() {
+        mutex.unlock();
+    }
+
+private:
+    std::unordered_map<Key, T, Hash, KeyEqual, Allocator> map;
+    mutable std::mutex mutex;
+};
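A short internally-locked usage sketch of the class above; each call takes and releases the map's own mutex, and values come back by copy:

    AtomicUnorderedMap<int, std::string> m;

    m.insert({1, "one"});      // false if key 1 is already present
    auto found = m.find(1);    // {copy of value, true} or {T(), false}
    if (found.second) {
        // found.first remains valid even if another thread erases key 1 now
    }
    auto erased = m.erase(1);  // {erased value, true} or {T(), false}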
index 4c05eec..eaf564e 100644 (file)
@@ -69,7 +69,7 @@ private:
 
 #include <queue>
 
-#include "atomic.h"
+#include <atomic>
 #include "threadlocal.h"
 #include "utility.h"
 
@@ -171,8 +171,8 @@ private:
 
     ThreadLocalPtr<AtomicPtr<std::queue<T> > > threadQueue;
     AtomicPtr<std::queue<T> > queues[MAX_THREADS];
-    AtomicValue<size_t> counter;
-    AtomicValue<size_t> numItems;
+    std::atomic<size_t> counter;
+    std::atomic<size_t> numItems;
     DISALLOW_COPY_AND_ASSIGN(AtomicQueue);
 };
 #endif
index 42ab731..750c633 100644 (file)
 #include <string>
 #include <vector>
 
+#include <phosphor/phosphor.h>
+
 #include "atomic.h"
 #include "backfill.h"
-#include "ep.h"
+#include "ep_engine.h"
+#include "kv_bucket_iface.h"
+#include "tapconnmap.h"
 #include "vbucket.h"
 
 class ItemResidentCallback : public Callback<CacheLookup> {
@@ -46,22 +50,26 @@ private:
 };
 
 void ItemResidentCallback::callback(CacheLookup &lookup) {
-    RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(lookup.getVBucketId());
+    VBucketPtr vb = engine->getKVBucket()->getVBucket(
+                                                        lookup.getVBucketId());
     if (!vb) {
         setStatus(ENGINE_SUCCESS);
         return;
     }
 
-    int bucket_num(0);
-    LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
-    StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num);
+    auto hbl = vb->ht.getLockedBucket(lookup.getKey());
+    StoredValue* v = vb->ht.unlocked_find(lookup.getKey(),
+                                          hbl.getBucketNum(),
+                                          WantsDeleted::No,
+                                          TrackReference::Yes);
     if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
-        Item* it = v->toItem(false, lookup.getVBucketId());
-        lh.unlock();
+        auto it = v->toItem(false, lookup.getVBucketId());
+        hbl.getHTLock().unlock();
         CompletedBGFetchTapOperation tapop(connToken,
                                            lookup.getVBucketId(), true);
-        if (!connMap.performOp(tapConnName, tapop, it)) {
-            delete it;
+        if (connMap.performOp(tapConnName, tapop, it.get())) {
+            // On success performOp has taken ownership of the item.
+            it.release();
         }
         setStatus(ENGINE_KEY_EEXISTS);
     } else {
@@ -103,8 +111,31 @@ void BackfillDiskCallback::callback(GetValue &gv) {
     }
 }
 
+BackfillDiskLoad::BackfillDiskLoad(const std::string& n,
+                                   EventuallyPersistentEngine* e,
+                                   TapConnMap& cm,
+                                   KVStore* s,
+                                   uint16_t vbid,
+                                   uint64_t start_seqno,
+                                   hrtime_t token,
+                                   double sleeptime,
+                                   bool shutdown)
+    : GlobalTask(e, TaskId::BackfillDiskLoad, sleeptime, shutdown),
+      name(n),
+      description("Loading TAP backfill from disk: vb " + std::to_string(vbid)),
+      engine(e),
+      connMap(cm),
+      store(s),
+      vbucket(vbid),
+      startSeqno(start_seqno),
+      connToken(token) {
+    ScheduleDiskBackfillTapOperation tapop;
+    cm.performOp(name, tapop, static_cast<void*>(NULL));
+}
+
 bool BackfillDiskLoad::run() {
-    if (engine->getEpStore()->isMemoryUsageTooHigh()) {
+    TRACE_EVENT0("ep-engine/task", "BackfillDiskLoad");
+    if (engine->getKVBucket()->isMemoryUsageTooHigh()) {
         LOG(EXTENSION_LOG_INFO, "VBucket %d backfill task from disk is "
          "temporarily suspended  because the current memory usage is too high",
          vbucket);
@@ -113,12 +144,11 @@ bool BackfillDiskLoad::run() {
     }
 
     if (connMap.checkConnectivity(name) &&
-                               !engine->getEpStore()->isFlushAllScheduled()) {
+        !engine->getKVBucket()->isDeleteAllScheduled()) {
         size_t num_items;
         size_t num_deleted;
         try {
-            DBFileInfo info = store->getDbFileInfo(vbucket);
-            num_items = info.itemCount;
+            num_items = store->getItemCount(vbucket);
             num_deleted = store->getNumPersistedDeletes(vbucket);
         } catch (std::system_error& e) {
             if (e.code() == std::error_code(ENOENT, std::system_category())) {
@@ -162,30 +192,38 @@ bool BackfillDiskLoad::run() {
     return false;
 }
 
-std::string BackfillDiskLoad::getDescription() {
-    std::stringstream rv;
-    rv << "Loading TAP backfill from disk: vb " << vbucket;
-    return rv.str();
+cb::const_char_buffer BackfillDiskLoad::getDescription() {
+    return description;
+}
+
+BackFillVisitor::BackFillVisitor(EventuallyPersistentEngine* e,
+                                 TapConnMap& cm,
+                                 Producer* tc,
+                                 const VBucketFilter& backfillVBfilter)
+    : VBucketVisitor(backfillVBfilter),
+      engine(e),
+      connMap(cm),
+      name(tc->getName()),
+      connToken(tc->getConnectionToken()),
+      valid(true) {
 }
 
-void BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
+void BackFillVisitor::visitBucket(VBucketPtr &vb) {
     if (vBucketFilter(vb->getId())) {
-        item_eviction_policy_t policy =
-            engine->getEpStore()->getItemEvictionPolicy();
-        double num_items = static_cast<double>(vb->getNumItems(policy));
+        double num_items = static_cast<double>(vb->getNumItems());
 
         if (num_items == 0) {
             return;
         }
 
-        KVStore *underlying(engine->getEpStore()->
-                            getROUnderlying(vb->getId()));
+        KVStore *underlying(engine->getKVBucket()->getROUnderlying(
+                                                                vb->getId()));
         LOG(EXTENSION_LOG_INFO,
             "Schedule a full backfill from disk for vbucket %d.", vb->getId());
         ExTask task = new BackfillDiskLoad(name, engine, connMap,
                                           underlying, vb->getId(), 0, connToken,
                                           0, false);
-        ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
+        ExecutorPool::get()->schedule(task);
     }
 }
 
@@ -229,9 +267,3 @@ bool BackFillVisitor::checkValidity() {
     }
     return valid;
 }
-
-bool BackfillTask::run(void) {
-    engine->getEpStore()->visit(bfv, "Backfill task", NONIO_TASK_IDX,
-                                TaskId::BackfillVisitorTask, 1);
-    return false;
-}
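ItemResidentCallback::callback above adopts a release-on-success convention for the raw-pointer TAP interface: performOp() returning true means the connection map took ownership of the Item. The same pattern in isolation, with hypothetical names:

    #include <memory>

    struct Payload {}; // stands in for Item

    // Stands in for TapConnMap::performOp: returns true when it has taken
    // ownership of p (here it simply frees it straight away).
    bool take(Payload* p) {
        delete p;
        return true;
    }

    void handoff() {
        auto item = std::make_unique<Payload>();
        if (take(item.get())) {
            item.release(); // receiver owns it; releasing prevents a double-free
        }
        // On failure the unique_ptr still owns the Payload and frees it here.
    }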
index 950f7c5..278e7f9 100644 (file)
 
 #include "config.h"
 
-#include <list>
-#include <map>
-#include <set>
 #include <string>
-#include <unordered_map>
-#include <vector>
 
-#include "ep_engine.h"
-#include "stats.h"
-#include "tasks.h"
-#include "connmap.h"
+#include "globaltask.h"
+#include "kv_bucket_iface.h"
 
 #define DEFAULT_BACKFILL_SNOOZE_TIME 1.0
 
+class EventuallyPersistentEngine;
+class TapConnMap;
+
 enum backfill_t {
     ALL_MUTATIONS = 1,
     DELETIONS_ONLY
@@ -52,20 +48,15 @@ public:
     BackfillDiskLoad(const std::string &n, EventuallyPersistentEngine* e,
                      TapConnMap &cm, KVStore *s, uint16_t vbid,
                      uint64_t start_seqno, hrtime_t token,
-                     double sleeptime = 0, bool shutdown = false)
-        : GlobalTask(e, TaskId::BackfillDiskLoad, sleeptime, shutdown),
-          name(n), engine(e), connMap(cm), store(s), vbucket(vbid),
-          startSeqno(start_seqno), connToken(token) {
-        ScheduleDiskBackfillTapOperation tapop;
-        cm.performOp(name, tapop, static_cast<void*>(NULL));
-    }
+                     double sleeptime = 0, bool shutdown = false);
 
     bool run();
 
-    std::string getDescription();
+    cb::const_char_buffer getDescription();
 
 private:
     const std::string           name;
+    const std::string           description;
     EventuallyPersistentEngine *engine;
     TapConnMap                    &connMap;
     KVStore                    *store;
@@ -75,20 +66,21 @@ private:
 };
 
 /**
- * VBucketVisitor to backfill a Producer. This visitor basically performs backfill from memory
- * for only resident items if it needs to schedule a separate disk backfill task because of
- * low resident ratio.
+ * VBucketVisitor to backfill a Producer. This visitor basically performs
+ * backfill from memory for only resident items if it needs to schedule a
+ * separate disk backfill task because of low resident ratio.
+ *
+ * The visitor will pause if the current backfill backlog for the corresponding
+ * producer is greater than the threshold (5000 by default).
  */
 class BackFillVisitor : public VBucketVisitor {
 public:
     BackFillVisitor(EventuallyPersistentEngine *e, TapConnMap &cm, Producer *tc,
-                    const VBucketFilter &backfillVBfilter):
-        VBucketVisitor(backfillVBfilter), engine(e), connMap(cm),
-        name(tc->getName()), connToken(tc->getConnectionToken()), valid(true) {}
+                    const VBucketFilter &backfillVBfilter);
 
     virtual ~BackFillVisitor() {}
 
-    void visitBucket(RCPtr<VBucket> &vb) override;
+    void visitBucket(VBucketPtr &vb) override;
 
     void complete(void) override;
 
@@ -105,31 +97,4 @@ private:
     bool valid;
 };
 
-/**
- * Backfill task is scheduled as a non-IO task. Each backfill task performs
- * backfill from memory or disk depending on the resident ratio. Each backfill
- * task can backfill more than one vbucket, but will snooze for 1 sec if the
- * current backfill backlog for the corresponding TAP producer is greater than
- * the threshold (5000 by default).
- */
-class BackfillTask : public GlobalTask {
-public:
-
-    BackfillTask(EventuallyPersistentEngine *e, TapConnMap &cm, Producer *tc,
-                 const VBucketFilter &backfillVBFilter):
-                 GlobalTask(e, TaskId::BackfillTask, 0, false),
-      bfv(new BackFillVisitor(e, cm, tc, backfillVBFilter)), engine(e) {}
-
-    virtual ~BackfillTask() {}
-
-    bool run(void);
-
-    std::string getDescription() {
-        return std::string("Backfilling items from memory and disk.");
-    }
-
-    std::shared_ptr<BackFillVisitor> bfv;
-    EventuallyPersistentEngine *engine;
-};
-
 #endif  // SRC_BACKFILL_H_
index 7bdd062..16272f9 100644 (file)
 #include <vector>
 
 #include "bgfetcher.h"
-#include "ep.h"
-#include "kvshard.h"
+#include "ep_engine.h"
 #include "executorthread.h"
+#include "kv_bucket.h"
+#include "kvshard.h"
+#include "tasks.h"
 
 const double BgFetcher::sleepInterval = MIN_SLEEP_TIME;
 
+BgFetcher::BgFetcher(KVBucket& s, KVShard& k)
+    : BgFetcher(&s, &k, s.getEPEngine().getEpStats()) {
+}
+
 void BgFetcher::start() {
     bool inverse = false;
     pendingFetch.compare_exchange_strong(inverse, true);
     ExecutorPool* iom = ExecutorPool::get();
     ExTask task = new MultiBGFetcherTask(&(store->getEPEngine()), this, false);
     this->setTaskId(task->getId());
-    iom->schedule(task, READER_TASK_IDX);
+    iom->schedule(task);
 }
 
 void BgFetcher::stop() {
@@ -43,60 +49,58 @@ void BgFetcher::stop() {
 }
 
 void BgFetcher::notifyBGEvent(void) {
-    ++stats.numRemainingBgJobs;
+    ++stats.numRemainingBgItems;
     bool inverse = false;
     if (pendingFetch.compare_exchange_strong(inverse, true)) {
         ExecutorPool::get()->wake(taskId);
     }
 }
 
-size_t BgFetcher::doFetch(VBucket::id_type vbId) {
-    hrtime_t startTime(gethrtime());
-    LOG(EXTENSION_LOG_DEBUG, "BgFetcher is fetching data, vBucket = %d "
-        "numDocs = %" PRIu64 ", startTime = %" PRIu64,
-        vbId, uint64_t(items2fetch.size()), startTime/1000000);
+size_t BgFetcher::doFetch(VBucket::id_type vbId,
+                          vb_bgfetch_queue_t& itemsToFetch) {
+    ProcessClock::time_point startTime(ProcessClock::now());
+    LOG(EXTENSION_LOG_DEBUG,
+        "BgFetcher is fetching data, vb:%" PRIu16 " numDocs:%" PRIu64 " "
+        "startTime:%" PRIu64,
+        vbId,
+        uint64_t(itemsToFetch.size()),
+        std::chrono::duration_cast<std::chrono::milliseconds>(
+                startTime.time_since_epoch())
+                .count());
 
-    shard->getROUnderlying()->getMulti(vbId, items2fetch);
+    shard->getROUnderlying()->getMulti(vbId, itemsToFetch);
 
-    size_t totalfetches = 0;
     std::vector<bgfetched_item_t> fetchedItems;
-    vb_bgfetch_queue_t::iterator itr = items2fetch.begin();
-    for (; itr != items2fetch.end(); ++itr) {
-        vb_bgfetch_item_ctx_t &bg_item_ctx = (*itr).second;
-        std::list<VBucketBGFetchItem *> &requestedItems = bg_item_ctx.bgfetched_list;
-        std::list<VBucketBGFetchItem *>::iterator itm = requestedItems.begin();
-        for(; itm != requestedItems.end(); ++itm) {
-            const std::string &key = (*itr).first;
-            fetchedItems.push_back(std::make_pair(key, *itm));
-            ++totalfetches;
+    for (const auto& fetch : itemsToFetch) {
+        auto& key = fetch.first;
+        const vb_bgfetch_item_ctx_t& bg_item_ctx = fetch.second;
+
+        for (const auto& itm : bg_item_ctx.bgfetched_list) {
+            // We don't want to transfer ownership of itm here as we clean it
+            // up at the end of this method in clearItems()
+            fetchedItems.push_back(std::make_pair(key, itm.get()));
         }
     }
 
-    if (totalfetches > 0) {
+    if (fetchedItems.size() > 0) {
         store->completeBGFetchMulti(vbId, fetchedItems, startTime);
-        stats.getMultiHisto.add((gethrtime()-startTime)/1000, totalfetches);
+        stats.getMultiHisto.add(
+                std::chrono::duration_cast<std::chrono::microseconds>(
+                        ProcessClock::now() - startTime)
+                        .count(),
+                fetchedItems.size());
     }
 
-    // failed requests will get requeued for retry within clearItems()
-    clearItems(vbId);
-    return totalfetches;
+    clearItems(vbId, itemsToFetch);
+    return fetchedItems.size();
 }
 
-void BgFetcher::clearItems(VBucket::id_type vbId) {
-    vb_bgfetch_queue_t::iterator itr = items2fetch.begin();
-
-    for(; itr != items2fetch.end(); ++itr) {
+void BgFetcher::clearItems(VBucket::id_type vbId,
+                           vb_bgfetch_queue_t& itemsToFetch) {
+    for (auto& fetch : itemsToFetch) {
         // every fetched item belonging to the same key shares
         // a single data buffer, just delete it from the first fetched item
-        vb_bgfetch_item_ctx_t& bg_item_ctx = (*itr).second;
-        std::list<VBucketBGFetchItem *> &doneItems = bg_item_ctx.bgfetched_list;
-        VBucketBGFetchItem *firstItem = doneItems.front();
-        firstItem->delValue();
-
-        std::list<VBucketBGFetchItem *>::iterator dItr = doneItems.begin();
-        for (; dItr != doneItems.end(); ++dItr) {
-            delete *dItr;
-        }
+        fetch.second.bgfetched_list.front()->delValue();
     }
 }
 
@@ -105,36 +109,35 @@ bool BgFetcher::run(GlobalTask *task) {
     bool inverse = true;
     pendingFetch.compare_exchange_strong(inverse, false);
 
-    std::vector<uint16_t> bg_vbs;
-    LockHolder lh(queueMutex);
-    std::set<uint16_t>::iterator it = pendingVbs.begin();
-    for (; it != pendingVbs.end(); ++it) {
-        bg_vbs.push_back(*it);
+    std::vector<uint16_t> bg_vbs(pendingVbs.size());
+    {
+        LockHolder lh(queueMutex);
+        bg_vbs.assign(pendingVbs.begin(), pendingVbs.end());
+        pendingVbs.clear();
     }
-    pendingVbs.clear();
-    lh.unlock();
-
-    std::vector<uint16_t>::iterator ita = bg_vbs.begin();
-    for (; ita != bg_vbs.end(); ++ita) {
-        uint16_t vbId = *ita;
-        if (store->getVBuckets().isBucketCreation(vbId)) {
-            // Requeue the bg fetch task if a vbucket DB file is not
-            // created yet.
-            lh.lock();
-            pendingVbs.insert(vbId);
-            lh.unlock();
-            bool inverse = false;
-            pendingFetch.compare_exchange_strong(inverse, true);
-            continue;
-        }
-        RCPtr<VBucket> vb = shard->getBucket(vbId);
-        if (vb && vb->getBGFetchItems(items2fetch)) {
-            num_fetched_items += doFetch(vbId);
-            items2fetch.clear();
+
+    for (const uint16_t vbId : bg_vbs) {
+        VBucketPtr vb = shard->getBucket(vbId);
+        if (vb) {
+            // Requeue the bg fetch task if vbucket DB file is not created yet.
+            if (vb->isBucketCreation()) {
+                {
+                    LockHolder lh(queueMutex);
+                    pendingVbs.insert(vbId);
+                }
+                bool inverse = false;
+                pendingFetch.compare_exchange_strong(inverse, true);
+                continue;
+            }
+
+            auto items = vb->getBGFetchItems();
+            if (items.size() > 0) {
+                num_fetched_items += doFetch(vbId, items);
+            }
         }
     }
 
-    stats.numRemainingBgJobs.fetch_sub(num_fetched_items);
+    stats.numRemainingBgItems.fetch_sub(num_fetched_items);
 
     if (!pendingFetch.load()) {
         // wait a bit until next fetch request arrives
@@ -150,9 +153,9 @@ bool BgFetcher::run(GlobalTask *task) {
     return true;
 }
 
-bool BgFetcher::pendingJob() {
-    for (auto vbid : shard->getVBuckets()) {
-        RCPtr<VBucket> vb = shard->getBucket(vbid);
+bool BgFetcher::pendingJob() const {
+    for (const auto vbid : shard->getVBuckets()) {
+        VBucketPtr vb = shard->getBucket(vbid);
         if (vb && vb->hasPendingBGFetchItems()) {
             return true;
         }
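doFetch() now measures itself with ProcessClock and std::chrono instead of raw hrtime_t arithmetic. The pattern in isolation, treating ProcessClock as a steady-clock alias (implied by its use above but not defined in this excerpt):

    #include <chrono>

    using ProcessClock = std::chrono::steady_clock; // assumption for the sketch

    long long fetchAndTimeMicros() {
        auto start = ProcessClock::now();
        // ... perform the multi-get ...
        return std::chrono::duration_cast<std::chrono::microseconds>(
                       ProcessClock::now() - start)
                .count(); // the real code feeds this to stats.getMultiHisto
    }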
index 87f09f1..e8ea1b6 100644 (file)
@@ -30,7 +30,7 @@
 #include "vbucket.h"
 
 // Forward declarations.
-class EventuallyPersistentStore;
+class KVBucket;
 class KVShard;
 class GlobalTask;
 
@@ -48,8 +48,20 @@ public:
      * @param k  The shard to which this background fetcher belongs
      * @param st reference to statistics
      */
-    BgFetcher(EventuallyPersistentStore *s, KVShard *k, EPStats &st) :
+    BgFetcher(KVBucket* s, KVShard* k, EPStats &st) :
         store(s), shard(k), taskId(0), stats(st), pendingFetch(false) {}
+
+    /**
+     * Construct a BgFetcher
+     *
+     * Equivalent to above constructor except stats reference is obtained
+     * from KVBucket's reference to EPEngine's epstats.
+     *
+     * @param s The store
+     * @param k The shard to which this background fetcher belongs
+     */
+    BgFetcher(KVBucket& s, KVShard& k);
+
     ~BgFetcher() {
         LockHolder lh(queueMutex);
         if (!pendingVbs.empty()) {
@@ -63,7 +75,7 @@ public:
     void start(void);
     void stop(void);
     bool run(GlobalTask *task);
-    bool pendingJob(void);
+    bool pendingJob(void) const;
     void notifyBGEvent(void);
     void setTaskId(size_t newId) { taskId = newId; }
     void addPendingVB(VBucket::id_type vbId) {
@@ -72,17 +84,16 @@ public:
     }
 
 private:
-    size_t doFetch(VBucket::id_type vbId);
-    void clearItems(VBucket::id_type vbId);
+    size_t doFetch(VBucket::id_type vbId, vb_bgfetch_queue_t& items);
+    void clearItems(VBucket::id_type vbId, vb_bgfetch_queue_t& items);
 
-    EventuallyPersistentStore *store;
-    KVShard *shard;
-    vb_bgfetch_queue_t items2fetch;
+    KVBucket* store;
+    KVShard* shard;
     size_t taskId;
-    Mutex queueMutex;
+    std::mutex queueMutex;
     EPStats &stats;
 
-    AtomicValue<bool> pendingFetch;
+    std::atomic<bool> pendingFetch;
     std::set<VBucket::id_type> pendingVbs;
 };
 
index a4f1a1a..f79a0d9 100644 (file)
  *   limitations under the License.
  */
 
+#include <memcached/engine.h>
+
 #include "bloomfilter.h"
+
 #include "murmurhash3.h"
 
 #include <cmath>
@@ -51,6 +54,13 @@ size_t BloomFilter::estimateNoOfHashes(size_t key_count) {
     return round(((double) filterSize / key_count) * (log(2.0)));
 }
 
+uint64_t BloomFilter::hashDocKey(const DocKey& key, uint32_t iteration) {
+    uint64_t result = 0;
+    uint32_t seed = iteration + (uint32_t(key.getDocNamespace()) * noOfHashes);
+    MURMURHASH_3(key.data(), key.size(), seed, &result);
+    return result;
+}
+
 void BloomFilter::setStatus(bfilter_status_t to) {
     switch (status) {
         case BFILTER_DISABLED:
@@ -103,13 +113,11 @@ std::string BloomFilter::getStatusString() {
     return "UNKNOWN";
 }
 
-void BloomFilter::addKey(const char *key, size_t keylen) {
+void BloomFilter::addKey(const DocKey& key) {
     if (status == BFILTER_COMPACTING || status == BFILTER_ENABLED) {
         bool overlap = true;
-        uint32_t i;
-        uint64_t result;
-        for (i = 0; i < noOfHashes; i++) {
-            MURMURHASH_3(key, keylen, i, &result);
+        for (uint32_t i = 0; i < noOfHashes; i++) {
+            uint64_t result = hashDocKey(key, i);
             if (overlap && bitArray[result % filterSize] == 0) {
                 overlap = false;
             }
@@ -121,12 +129,10 @@ void BloomFilter::addKey(const char *key, size_t keylen) {
     }
 }
 
-bool BloomFilter::maybeKeyExists(const char *key, uint32_t keylen) {
+bool BloomFilter::maybeKeyExists(const DocKey& key) {
     if (status == BFILTER_COMPACTING || status == BFILTER_ENABLED) {
-        uint32_t i;
-        uint64_t result;
-        for (i = 0; i < noOfHashes; i++) {
-            MURMURHASH_3(key, keylen, i, &result);
+        for (uint32_t i = 0; i < noOfHashes; i++) {
+            uint64_t result = hashDocKey(key, i);
             if (bitArray[result % filterSize] == 0) {
                 // The key does NOT exist.
                 return false;
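Why hashDocKey() folds the namespace into the murmur seed: each (iteration, namespace) pair gets a distinct seed, so identical key bytes in different namespaces set different filter bits. A worked sketch of the seed arithmetic:

    #include <cstdint>

    // Seed as computed by hashDocKey above. With noOfHashes == 5,
    // namespace 0 uses seeds 0..4 and namespace 1 uses seeds 5..9, so the
    // two namespaces probe disjoint seed ranges.
    uint32_t bloomSeed(uint32_t iteration, uint32_t ns, uint32_t noOfHashes) {
        return iteration + ns * noOfHashes;
    }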
index 24509e6..67033ce 100644 (file)
@@ -23,6 +23,8 @@
 #include <string>
 #include <vector>
 
+#include <memcached/dockey.h>
+
 enum bfilter_status_t {
     BFILTER_DISABLED,
     BFILTER_PENDING,
@@ -46,16 +48,18 @@ public:
     bfilter_status_t getStatus();
     std::string getStatusString();
 
-    void addKey(const char *key, size_t keylen);
-    bool maybeKeyExists(const char *key, uint32_t keylen);
+    void addKey(const DocKey& key);
+    bool maybeKeyExists(const DocKey& key);
 
     size_t getNumOfKeysInFilter();
     size_t getFilterSize();
 
-private:
+protected:
     size_t estimateFilterSize(size_t key_count, double false_positive_prob);
     size_t estimateNoOfHashes(size_t key_count);
 
+    uint64_t hashDocKey(const DocKey& key, uint32_t iteration);
+
     size_t filterSize;
     size_t noOfHashes;
 
index e364d99..ebeb9e8 100644 (file)
@@ -20,9 +20,8 @@
 
 #include "config.h"
 
-#include <string>
-
 #include "locks.h"
+#include "storeddockey.h"
 #include "syncobject.h"
 #include "utility.h"
 
@@ -30,18 +29,18 @@ class Item;
 
 class CacheLookup {
 public:
-    CacheLookup(std::string k, int64_t s, uint16_t vb) :
+    CacheLookup(const DocKey& k, int64_t s, uint16_t vb) :
         key(k), bySeqno(s), vbid(vb) {}
 
     ~CacheLookup() {}
 
-    std::string& getKey() { return key; }
+    StoredDocKey& getKey() { return key; }
 
     int64_t getBySeqno() { return bySeqno; }
 
     uint16_t getVBucketId() { return vbid; }
 private:
-    std::string key;
+    StoredDocKey key;
     int64_t bySeqno;
     uint16_t vbid;
 };
@@ -63,7 +62,9 @@ public:
     /**
      * The value retrieved for the key.
      */
-    Item* getValue() { return value; }
+    Item* getValue() const {
+        return value;
+    }
 
     /**
      * Engine code describing what happened.
index 2e5e3cb..6e0de06 100644 (file)
@@ -24,6 +24,7 @@
 
 #include "checkpoint.h"
 #include "ep_engine.h"
+#include "pre_link_document_context.h"
 #define STATWRITER_NAMESPACE checkpoint
 #include "statwriter.h"
 #undef STATWRITER_NAMESPACE
@@ -136,7 +137,7 @@ void Checkpoint::popBackCheckpointEndItem() {
     }
 }
 
-bool Checkpoint::keyExists(const std::string &key) {
+bool Checkpoint::keyExists(const DocKey& key) {
     return keyIndex.find(key) != keyIndex.end();
 }
 
@@ -148,9 +149,7 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
                         ") is not OPEN");
     }
     queue_dirty_t rv;
-
     checkpoint_index::iterator it = keyIndex.find(qi->getKey());
-
     // Check if the item is a meta item
     if (qi->isCheckPointMetaItem()) {
         // empty items act only as a dummy element for the start of the
@@ -165,7 +164,7 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
         // Check if this checkpoint already had an item for the same key
         if (it != keyIndex.end()) {
             rv = EXISTING_ITEM;
-            std::list<queued_item>::iterator currPos = it->second.position;
+            CheckpointQueue::iterator currPos = it->second.position;
             const int64_t currMutationId{it->second.mutation_id};
 
             // Given the key already exists, need to check all cursors in this
@@ -233,8 +232,8 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
         }
     }
 
-    if (qi->getNKey() > 0) {
-        std::list<queued_item>::iterator last = toWrite.end();
+    if (qi->getKey().size() > 0) {
+        CheckpointQueue::iterator last = toWrite.end();
         // --last is okay as the list is not empty now.
         index_entry entry = {--last, qi->getBySeqno()};
         // Set the index of the key to the new item that is pushed back into
@@ -246,8 +245,8 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
             keyIndex[qi->getKey()] = entry;
         }
         if (rv == NEW_ITEM) {
-            size_t newEntrySize = qi->getNKey() + sizeof(index_entry) +
-                                  sizeof(queued_item);
+            size_t newEntrySize = qi->getKey().size() +
+                                  sizeof(index_entry) + sizeof(queued_item);
             memOverhead += newEntrySize;
             stats.memOverhead->fetch_add(newEntrySize);
             if (stats.memOverhead->load() >= GIGANTOR) {
@@ -270,6 +269,11 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
     return rv;
 }
 
+const StoredDocKey Checkpoint::DummyKey("dummy_key", DocNamespace::System);
+const StoredDocKey Checkpoint::CheckpointStartKey("checkpoint_start", DocNamespace::System);
+const StoredDocKey Checkpoint::CheckpointEndKey("checkpoint_end", DocNamespace::System);
+const StoredDocKey Checkpoint::SetVBucketStateKey("set_vbucket_state", DocNamespace::System);
+
 size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
     size_t numNewItems = 0;
     size_t newEntryMemOverhead = 0;
@@ -279,13 +283,13 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
         " for vbucket %d",
         pPrevCheckpoint->getId(), checkpointId, vbucketId);
 
-    std::list<queued_item>::iterator itr = toWrite.begin();
-    uint64_t seqno = pPrevCheckpoint->getMutationIdForKey("dummy_key", true);
-    metaKeyIndex["dummy_key"].mutation_id = seqno;
+    CheckpointQueue::iterator itr = toWrite.begin();
+    uint64_t seqno = pPrevCheckpoint->getMutationIdForKey(Checkpoint::DummyKey, true);
+    metaKeyIndex[Checkpoint::DummyKey].mutation_id = seqno;
     (*itr)->setBySeqno(seqno);
 
-    seqno = pPrevCheckpoint->getMutationIdForKey("checkpoint_start", true);
-    metaKeyIndex["checkpoint_start"].mutation_id = seqno;
+    seqno = pPrevCheckpoint->getMutationIdForKey(Checkpoint::CheckpointStartKey, true);
+    metaKeyIndex[Checkpoint::CheckpointStartKey].mutation_id = seqno;
     ++itr;
     (*itr)->setBySeqno(seqno);
 
@@ -293,7 +297,7 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
     // into the current checkpoint as necessary.
     for (auto rit = pPrevCheckpoint->rbegin(); rit != pPrevCheckpoint->rend();
             ++rit) {
-        const std::string &key = (*rit)->getKey();
+        const auto key = (*rit)->getKey();
         switch ((*rit)->getOperation()) {
             case queue_op::set:
             case queue_op::del:
@@ -337,6 +341,7 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
                 break;
 
             case queue_op::set_vbucket_state:
+            case queue_op::system_event:
                 // Need to re-insert these into the correct place in the index.
                 if (metaKeyIndex.find(key) == metaKeyIndex.end()) {
                     // Skip the first two meta items (empty & checkpoint start).
@@ -373,8 +378,7 @@ size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
     return numNewItems;
 }
 
-uint64_t Checkpoint::getMutationIdForKey(const std::string &key, bool isMeta)
-{
+uint64_t Checkpoint::getMutationIdForKey(const DocKey& key, bool isMeta) {
     uint64_t mid = 0;
     checkpoint_index& chkIdx = isMeta ? metaKeyIndex : keyIndex;
 
@@ -382,7 +386,11 @@ uint64_t Checkpoint::getMutationIdForKey(const std::string &key, bool isMeta)
     if (it != chkIdx.end()) {
         mid = it->second.mutation_id;
     } else {
-        LOG(EXTENSION_LOG_WARNING, "%s not found in chk index", key.c_str());
+        throw std::invalid_argument("key{" +
+                                    std::string(reinterpret_cast<const char*>(key.data())) +
+                                    "} not found in " +
+                                    std::string(isMeta ? "meta" : "key") +
+                                    " index");
     }
     return mid;
 }
@@ -407,12 +415,36 @@ std::ostream& operator <<(std::ostream& os, const Checkpoint& c) {
     for (const auto& e : c.toWrite) {
         os << "\t{" << e->getBySeqno() << ","
            << to_string(e->getOperation()) << ","
-           << e->getKey() << "}" << std::endl;
+           << e->getKey().c_str() << "}" << std::endl;
     }
     os << "]";
     return os;
 }
 
+CheckpointManager::CheckpointManager(EPStats& st,
+                                     uint16_t vbucket,
+                                     CheckpointConfig& config,
+                                     int64_t lastSeqno,
+                                     uint64_t lastSnapStart,
+                                     uint64_t lastSnapEnd,
+                                     FlusherCallback cb)
+    : stats(st),
+      checkpointConfig(config),
+      vbucketId(vbucket),
+      numItems(0),
+      lastBySeqno(lastSeqno),
+      lastClosedChkBySeqno(lastSeqno),
+      isCollapsedCheckpoint(false),
+      pCursorPreCheckpointId(0),
+      flusherCB(cb) {
+    LockHolder lh(queueLock);
+    addNewCheckpoint_UNLOCKED(1, lastSnapStart, lastSnapEnd);
+    if (checkpointConfig.isPersistenceEnabled()) {
+        registerCursor_UNLOCKED(
+                "persistence", 1, false, MustSendCheckpointEnd::NO);
+    }
+}
+
 CheckpointManager::~CheckpointManager() {
     std::list<Checkpoint*>::iterator it = checkpointList.begin();
     while(it != checkpointList.end()) {
@@ -639,7 +671,7 @@ CursorRegResult CheckpointManager::registerCursorBySeqno(
             // Requested sequence number lies within this checkpoint.
             // Calculate which item to position the cursor at.
             size_t ckpt_meta_skipped{0};
-            std::list<queued_item>::iterator iitr = (*itr)->begin();
+            CheckpointQueue::iterator iitr = (*itr)->begin();
             while (++iitr != (*itr)->end() &&
                     (startBySeqno >=
                      static_cast<uint64_t>((*iitr)->getBySeqno()))) {
@@ -678,9 +710,9 @@ CursorRegResult CheckpointManager::registerCursorBySeqno(
          * number we are looking for is higher than anything currently assigned
          *  and there is already an assert above for this case.
          */
-        LOG(EXTENSION_LOG_WARNING, "Cursor not registered into vb %d "
-            " for stream '%s' because seqno %" PRIu64 " is too high",
-            vbucketId, name.c_str(), startBySeqno);
+        throw std::logic_error(
+                "CheckpointManager::registerCursorBySeqno the sequences number "
+                "is higher than anything currently assigned");
     }
     return result;
 }
@@ -755,7 +787,7 @@ bool CheckpointManager::registerCursor_UNLOCKED(
         (*it)->registerCursorName(name);
     } else {
         size_t offset = 0, meta_offset = 0;
-        std::list<queued_item>::iterator curr;
+        CheckpointQueue::iterator curr;
 
         LOG(EXTENSION_LOG_DEBUG,
             "Checkpoint %" PRIu64 " for vbucket %d exists in memory. "
@@ -858,32 +890,26 @@ checkpointCursorInfoList CheckpointManager::getAllCursors() {
 }
 
 bool CheckpointManager::isCheckpointCreationForHighMemUsage(
-                                              const RCPtr<VBucket> &vbucket) {
+        const VBucket& vbucket) {
     bool forceCreation = false;
     double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
     // persistence and conn cursors are all currently in the open checkpoint?
     bool allCursorsInOpenCheckpoint =
         (connCursors.size() + 1) == checkpointList.back()->getNumberOfCursors();
 
-    if (memoryUsed > stats.mem_high_wat &&
-        allCursorsInOpenCheckpoint &&
+    if (memoryUsed > stats.mem_high_wat && allCursorsInOpenCheckpoint &&
         (checkpointList.back()->getNumItems() >= MIN_CHECKPOINT_ITEMS ||
-         checkpointList.back()->getNumItems() == vbucket->ht.getNumInMemoryItems())) {
+         checkpointList.back()->getNumItems() ==
+                 vbucket.ht.getNumInMemoryItems())) {
         forceCreation = true;
     }
     return forceCreation;
 }
 
 size_t CheckpointManager::removeClosedUnrefCheckpoints(
-                                              const RCPtr<VBucket> &vbucket,
-                                              bool &newOpenCheckpointCreated) {
-
+        VBucket& vbucket, bool& newOpenCheckpointCreated) {
     // This function is executed periodically by the non-IO dispatcher.
-    LockHolder lh(queueLock);
-    if (!vbucket) {
-        throw std::invalid_argument("CheckpointManager::removeCloseUnrefCheckpoints:"
-                        " vbucket must be non-NULL");
-    }
+    std::unique_lock<std::mutex> lh(queueLock);
     uint64_t oldCheckpointId = 0;
     bool canCreateNewCheckpoint = false;
     if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
@@ -891,9 +917,7 @@ size_t CheckpointManager::removeClosedUnrefCheckpoints(
          checkpointList.front()->getNumberOfCursors() == 0)) {
         canCreateNewCheckpoint = true;
     }
-    if (vbucket->getState() == vbucket_state_active &&
-        canCreateNewCheckpoint) {
-
+    if (vbucket.getState() == vbucket_state_active && canCreateNewCheckpoint) {
         bool forceCreation = isCheckpointCreationForHighMemUsage(vbucket);
         // Check if this master active vbucket needs to create a new open
         // checkpoint.
@@ -913,11 +937,22 @@ size_t CheckpointManager::removeClosedUnrefCheckpoints(
     size_t numMetaItems = 0;
     size_t numCheckpointsRemoved = 0;
     std::list<Checkpoint*> unrefCheckpointList;
-    std::list<Checkpoint*>::iterator it = checkpointList.begin();
-    for (; it != checkpointList.end(); ++it) {
+    // Iterate through the current checkpoints (from oldest to newest), checking
+    // if the checkpoint can be removed.
+    auto it = checkpointList.begin();
+    // Note terminating condition - we stop at one before the last checkpoint -
+    // we must leave at least one checkpoint in existence.
+    for (;
+         it != checkpointList.end() && std::next(it) != checkpointList.end();
+         ++it) {
+
         removeInvalidCursorsOnCheckpoint(*it);
+
+        // When we encounter the first checkpoint which has cursor(s) in it,
+        // or if the persistence cursor is still operating, stop.
         if ((*it)->getNumberOfCursors() > 0 ||
-            (*it)->getId() > pCursorPreCheckpointId) {
+                (checkpointConfig.isPersistenceEnabled() &&
+                 (*it)->getId() > pCursorPreCheckpointId)) {
             break;
         } else {
             numUnrefItems += (*it)->getNumItems();
@@ -951,20 +986,11 @@ size_t CheckpointManager::removeClosedUnrefCheckpoints(
     // the memory overhead.
     if (checkpointConfig.isCheckpointMergeSupported() &&
         !checkpointConfig.canKeepClosedCheckpoints() &&
-        vbucket->getState() == vbucket_state_replica)
-    {
+        vbucket.getState() == vbucket_state_replica) {
         size_t curr_remains = getNumItemsForCursor_UNLOCKED(pCursorName);
         collapseClosedCheckpoints(unrefCheckpointList);
         size_t new_remains = getNumItemsForCursor_UNLOCKED(pCursorName);
-        if (curr_remains > new_remains) {
-            size_t diff = curr_remains - new_remains;
-            stats.decrDiskQueueSize(diff);
-            vbucket->decrDirtyQueueSize(diff);
-        } else if (curr_remains < new_remains) {
-            size_t diff = new_remains - curr_remains;
-            stats.diskQueueSize.fetch_add(diff);
-            vbucket->dirtyQueueSize.fetch_add(diff);
-        }
+        updateDiskQueueStats(vbucket, curr_remains, new_remains);
     }
     lh.unlock();
 
@@ -1037,7 +1063,7 @@ void CheckpointManager::collapseClosedCheckpoints(
                 (*rit)->getCursorNameList().begin();
             for (; nameItr != (*rit)->getCursorNameList().end(); ++nameItr) {
                 cursor_index::iterator cc = connCursors.find(*nameItr);
-                const std::string& key = (*(cc->second.currentPos))->getKey();
+                const auto key = (*(cc->second.currentPos))->getKey();
                 bool isMetaItem =
                             (*(cc->second.currentPos))->isCheckPointMetaItem();
                 bool cursor_on_chk_start = false;
@@ -1106,15 +1132,20 @@ void CheckpointManager::updateStatsForNewQueuedItem_UNLOCKED(const LockHolder&,
                                                              VBucket& vb,
                                                              const queued_item& qi) {
     ++stats.totalEnqueued;
-    ++stats.diskQueueSize;
-    vb.doStatsForQueueing(*qi, qi->size());
+    if (checkpointConfig.isPersistenceEnabled()) {
+        ++stats.diskQueueSize;
+        vb.doStatsForQueueing(*qi, qi->size());
+    }
     // Update the checkpoint's memory usage
     checkpointList.back()->incrementMemConsumption(qi->size());
 }
 
-bool CheckpointManager::queueDirty(VBucket& vb, queued_item& qi,
-                                   const GenerateBySeqno generateBySeqno,
-                                   const GenerateCas generateCas) {
+bool CheckpointManager::queueDirty(
+        VBucket& vb,
+        queued_item& qi,
+        const GenerateBySeqno generateBySeqno,
+        const GenerateCas generateCas,
+        PreLinkDocumentContext* preLinkDocumentContext) {
     LockHolder lh(queueLock);
 
     bool canCreateNewCheckpoint = false;
@@ -1160,7 +1191,11 @@ bool CheckpointManager::queueDirty(VBucket& vb, queued_item& qi,
     // MB-20798: Allow the HLC to be created 'atomically' with the seqno as
     // we're holding the ::queueLock.
     if (GenerateCas::Yes == generateCas) {
-        qi->setCas(vb.nextHLCCas());
+        auto cas = vb.nextHLCCas();
+        qi->setCas(cas);
+        if (preLinkDocumentContext != nullptr) {
+            preLinkDocumentContext->preLink(cas, lastBySeqno);
+        }
     }
 
     uint64_t st = checkpointList.back()->getSnapshotStartSeqno();
@@ -1242,6 +1277,8 @@ snapshot_range_t CheckpointManager::getAllItemsForCursor(
             "cursor:%s range:{%" PRIu64 ", %" PRIu64 "}",
             name.c_str(), range.start, range.end);
 
+    it->second.numVisits++;
+
     return range;
 }
 
@@ -1253,7 +1290,7 @@ queued_item CheckpointManager::nextItem(const std::string &name,
         LOG(EXTENSION_LOG_WARNING,
         "The cursor with name \"%s\" is not found in the checkpoint of vbucket"
         "%d.\n", name.c_str(), vbucketId);
-        queued_item qi(new Item(std::string(""), 0xffff,
+        queued_item qi(new Item(DocKey("", DocNamespace::System), 0xffff,
                                 queue_op::empty, 0, 0));
         return qi;
     }
@@ -1262,7 +1299,7 @@ queued_item CheckpointManager::nextItem(const std::string &name,
             "VBucket %d is still in backfill phase that doesn't allow "
             " the cursor to fetch an item from it's current checkpoint",
             vbucketId);
-        queued_item qi(new Item(std::string(""), 0xffff,
+        queued_item qi(new Item(DocKey("", DocNamespace::System), 0xffff,
                                 queue_op::empty, 0, 0));
         return qi;
     }
@@ -1273,7 +1310,7 @@ queued_item CheckpointManager::nextItem(const std::string &name,
         return *(cursor.currentPos);
     } else {
         isLastMutationItem = false;
-        queued_item qi(new Item(std::string(""), 0xffff,
+        queued_item qi(new Item(DocKey("", DocNamespace::System), 0xffff,
                                 queue_op::empty, 0, 0));
         return qi;
     }
@@ -1297,14 +1334,16 @@ void CheckpointManager::dump() const {
     std::cerr << *this << std::endl;
 }
 
-void CheckpointManager::clear(RCPtr<VBucket> &vb, uint64_t seqno) {
+void CheckpointManager::clear(VBucket& vb, uint64_t seqno) {
     LockHolder lh(queueLock);
-    clear_UNLOCKED(vb->getState(), seqno);
+    clear_UNLOCKED(vb.getState(), seqno);
 
     // Reset the disk write queue size stat for the vbucket
-    size_t currentDqSize = vb->dirtyQueueSize.load();
-    vb->decrDirtyQueueSize(currentDqSize);
-    stats.decrDiskQueueSize(currentDqSize);
+    if (checkpointConfig.isPersistenceEnabled()) {
+        size_t currentDqSize = vb.dirtyQueueSize.load();
+        vb.dirtyQueueSize.fetch_sub(currentDqSize);
+        stats.diskQueueSize.fetch_sub(currentDqSize);
+    }
 }
 
 void CheckpointManager::clear_UNLOCKED(vbucket_state_t vbState, uint64_t seqno) {
@@ -1411,14 +1450,15 @@ uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(bool forceCreation,
     return checkpoint_id;
 }
 
-size_t CheckpointManager::getNumItemsForCursor(const std::string &name) {
+size_t CheckpointManager::getNumItemsForCursor(const std::string &name) const {
     LockHolder lh(queueLock);
     return getNumItemsForCursor_UNLOCKED(name);
 }
 
-size_t CheckpointManager::getNumItemsForCursor_UNLOCKED(const std::string &name) {
+size_t CheckpointManager::getNumItemsForCursor_UNLOCKED(
+                                                const std::string &name) const {
     size_t remains = 0;
-    cursor_index::iterator it = connCursors.find(name);
+    cursor_index::const_iterator it = connCursors.find(name);
     if (it != connCursors.end()) {
         size_t offset = it->second.offset + getNumOfMetaItemsFromCursor(it->second);
         remains = (numItems > offset) ? numItems - offset : 0;
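
A worked example of the remaining-items calculation above (values arbitrary):

    size_t numItems = 100;  // total items across the checkpoint list
    size_t offset   = 90;   // cursor offset: meta + non-meta items passed
    size_t metaRead = 3;    // getNumOfMetaItemsFromCursor(...) result
    offset += metaRead;     // effective offset == 93
    size_t remains = (numItems > offset) ? numItems - offset : 0;  // == 7
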
@@ -1461,7 +1501,7 @@ void CheckpointManager::decrCursorFromCheckpointEnd(const std::string &name) {
 
 bool CheckpointManager::isLastMutationItemInCheckpoint(
                                                    CheckpointCursor &cursor) {
-    std::list<queued_item>::iterator it = cursor.currentPos;
+    CheckpointQueue::iterator it = cursor.currentPos;
     ++it;
     if (it == (*(cursor.currentCheckpoint))->end() ||
         (*it)->getOperation() == queue_op::checkpoint_end) {
@@ -1553,8 +1593,27 @@ snapshot_info_t CheckpointManager::getSnapshotInfo() {
     return info;
 }
 
+void CheckpointManager::updateDiskQueueStats(VBucket& vbucket,
+                                             size_t curr_remains,
+                                             size_t new_remains) {
+    if (!checkpointConfig.isPersistenceEnabled()) {
+        /* we do not have a disk and hence no disk stats to update */
+        return;
+    }
+
+    if (curr_remains > new_remains) {
+        size_t diff = curr_remains - new_remains;
+        stats.diskQueueSize.fetch_sub(diff);
+        vbucket.dirtyQueueSize.fetch_sub(diff);
+    } else if (curr_remains < new_remains) {
+        size_t diff = new_remains - curr_remains;
+        stats.diskQueueSize.fetch_add(diff);
+        vbucket.dirtyQueueSize.fetch_add(diff);
+    }
+}
+
 void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
-                                               const RCPtr<VBucket> &vbucket) {
+                                                 VBucket& vbucket) {
     LockHolder lh(queueLock);
 
     // Ignore CHECKPOINT_START message with ID 0 as 0 is reserved for
@@ -1613,15 +1672,7 @@ void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
         size_t curr_remains = getNumItemsForCursor_UNLOCKED(pCursorName);
         collapseCheckpoints(id);
         size_t new_remains = getNumItemsForCursor_UNLOCKED(pCursorName);
-        if (curr_remains > new_remains) {
-            size_t diff = curr_remains - new_remains;
-            stats.decrDiskQueueSize(diff);
-            vbucket->decrDirtyQueueSize(diff);
-        } else if (curr_remains < new_remains) {
-            size_t diff = new_remains - curr_remains;
-            stats.diskQueueSize.fetch_add(diff);
-            vbucket->dirtyQueueSize.fetch_add(diff);
-        }
+        updateDiskQueueStats(vbucket, curr_remains, new_remains);
     }
 }
 
@@ -1638,7 +1689,7 @@ void CheckpointManager::collapseCheckpoints(uint64_t id) {
                 queue_op::checkpoint_start;
 
         Checkpoint* chk = *(itr.second.currentCheckpoint);
-        const std::string& key = (*(itr.second.currentPos))->getKey();
+        auto key = (*(itr.second.currentPos))->getKey();
         cursorMap[itr.first] = CursorPosition{chk->getMutationIdForKey(key, isMetaItem),
                                               cursor_on_chk_start};
     }
@@ -1758,7 +1809,7 @@ bool CheckpointManager::hasNext(const std::string &name) {
     }
 
     bool hasMore = true;
-    std::list<queued_item>::iterator curr = it->second.currentPos;
+    CheckpointQueue::iterator curr = it->second.currentPos;
     ++curr;
     if (curr == (*(it->second.currentCheckpoint))->end() &&
         (*(it->second.currentCheckpoint)) == checkpointList.back()) {
@@ -1797,7 +1848,8 @@ queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
                         ") is not a valid item to create");
     }
 
-    queued_item qi(new Item(key, vbid, checkpoint_op, id, bySeqno));
+    queued_item qi(new Item(DocKey(key, DocNamespace::System), vbid,
+                   checkpoint_op, id, bySeqno));
     return qi;
 }
 
@@ -1817,9 +1869,11 @@ uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
 
 void CheckpointManager::itemsPersisted() {
     LockHolder lh(queueLock);
-    CheckpointCursor& persistenceCursor = connCursors[pCursorName];
-    std::list<Checkpoint*>::iterator itr = persistenceCursor.currentCheckpoint;
-    pCursorPreCheckpointId = ((*itr)->getId() > 0) ? (*itr)->getId() - 1 : 0;
+    auto persistenceCursor = connCursors.find(pCursorName);
+    if (persistenceCursor != connCursors.end()) {
+        auto itr = persistenceCursor->second.currentCheckpoint;
+        pCursorPreCheckpointId = ((*itr)->getId() > 0) ? (*itr)->getId() - 1 : 0;
+    }
 }
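
The switch from operator[] to find() matters because std::unordered_map::operator[] inserts a default-constructed value for a missing key; had the persistence cursor been dropped, the old code would silently materialize a bogus CheckpointCursor. A self-contained sketch of the pitfall (names hypothetical):

    #include <string>
    #include <unordered_map>

    void sketch() {
        std::unordered_map<std::string, int> m;
        int v = m["missing"];        // side effect: inserts {"missing", 0}
        (void)v;
        auto it = m.find("absent");  // read-only lookup, nothing inserted
        if (it != m.end()) {
            // only dereference when the key actually exists
        }
    }
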
 
 size_t CheckpointManager::getMemoryUsage_UNLOCKED() {
@@ -1884,6 +1938,7 @@ CheckpointConfig::CheckpointConfig(EventuallyPersistentEngine &e) {
     itemNumBasedNewCheckpoint = config.isItemNumBasedNewChk();
     keepClosedCheckpoints = config.isKeepClosedChks();
     enableChkMerge = config.isEnableChkMerge();
+    persistenceEnabled = config.getBucketType() == "persistent";
 }
 
 bool CheckpointConfig::validateCheckpointMaxItemsParam(size_t
@@ -1979,7 +2034,6 @@ void CheckpointManager::addStats(ADD_STAT add_stat, const void *cookie) {
                          vbucketId);
         add_casted_stat(buf, getNumItemsForCursor_UNLOCKED(pCursorName),
                         add_stat, cookie);
-
         checked_snprintf(buf, sizeof(buf), "vb_%d:mem_usage", vbucketId);
         add_casted_stat(buf, getMemoryUsage_UNLOCKED(), add_stat, cookie);
 
@@ -1995,6 +2049,11 @@ void CheckpointManager::addStats(ADD_STAT add_stat, const void *cookie) {
                              cur_it->first.c_str());
             add_casted_stat(buf, (*(cur_it->second.currentPos))->getBySeqno(),
                             add_stat, cookie);
+            checked_snprintf(buf, sizeof(buf), "vb_%d:%s:num_visits",
+                             vbucketId,
+                             cur_it->first.c_str());
+            add_casted_stat(buf, cur_it->second.numVisits.load(),
+                            add_stat, cookie);
         }
     } catch (std::exception& error) {
         LOG(EXTENSION_LOG_WARNING,
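
With the addition above, each cursor's stat group gains a visit counter. For vbucket 0's persistence cursor (registered under the name "persistence" elsewhere in this change) the new key would render roughly as follows; the value is hypothetical:

    vb_0:persistence:num_visits: 17
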
index 2f6de8b..9245215 100644
 
 #include "config.h"
 
+#include "callbacks.h"
+#include "ep_types.h"
+#include "item.h"
+#include "locks.h"
+#include "stats.h"
+
+#include <atomic>
 #include <list>
 #include <map>
+#include <memory>
 #include <set>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
-#include "atomic.h"
-#include "ep_types.h"
-#include "item.h"
-#include "locks.h"
-#include "stats.h"
-
 #define GIGANTOR ((size_t)1<<(sizeof(size_t)*8-1))
 
 #define MIN_CHECKPOINT_ITEMS 10
@@ -56,11 +58,15 @@ enum checkpoint_state {
 
 const char* to_string(enum checkpoint_state);
 
+// List is used for queueing mutations as vector incurs shift operations for
+// deduplication.
+typedef std::list<queued_item> CheckpointQueue;
+
 /**
  * A checkpoint index entry.
  */
 struct index_entry {
-    std::list<queued_item>::iterator position;
+    CheckpointQueue::iterator position;
     int64_t mutation_id;
 };
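
The typedef's comment is the key design note: de-duplication erases an earlier mutation of the same key at the position cached in index_entry, which a std::list supports in O(1) without invalidating any other cursor's iterator, whereas a vector would shift every later element. A sketch of that pattern, with the lookup details assumed (real code also maintains mutation_id):

    // Hypothetical de-duplication step when queueing into a Checkpoint:
    auto entry = keyIndex.find(key);           // key -> cached list position
    if (entry != keyIndex.end()) {
        toWrite.erase(entry->second.position); // O(1); other iterators valid
    }
    toWrite.push_back(qi);                     // append the newer mutation
    keyIndex[key].position = --toWrite.end();  // cache the new position
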
 
@@ -85,7 +91,7 @@ enum class MustSendCheckpointEnd {
 /**
  * The checkpoint index maps a key to a checkpoint index_entry.
  */
-typedef std::unordered_map<std::string, index_entry> checkpoint_index;
+typedef std::unordered_map<StoredDocKey, index_entry> checkpoint_index;
 
 /**
  * List of pairs containing checkpoint cursor name and corresponding flag
@@ -97,6 +103,7 @@ typedef std::list<std::pair<std::string, MustSendCheckpointEnd>>
 class Checkpoint;
 class CheckpointManager;
 class CheckpointConfig;
+class PreLinkDocumentContext;
 class VBucket;
 
 /**
@@ -149,12 +156,12 @@ public:
      */
     CheckpointCursor(const std::string &n,
                      std::list<Checkpoint*>::iterator checkpoint,
-                     std::list<queued_item>::iterator pos,
+                     CheckpointQueue::iterator pos,
                      size_t offset_,
                      size_t meta_items_read,
                      bool beginningOnChkCollapse,
                      MustSendCheckpointEnd needsCheckpointEndMetaItem) :
-        name(n), currentCheckpoint(checkpoint), currentPos(pos),
+        name(n), currentCheckpoint(checkpoint), currentPos(pos), numVisits(0),
         offset(offset_),
         ckptMetaItemsRead(meta_items_read),
         fromBeginningOnChkCollapse(beginningOnChkCollapse),
@@ -164,7 +171,8 @@ public:
     // that std::atomic implicitly deleted the assignment operator
     CheckpointCursor(const CheckpointCursor &other) :
         name(other.name), currentCheckpoint(other.currentCheckpoint),
-        currentPos(other.currentPos), offset(other.offset.load()),
+        currentPos(other.currentPos), numVisits(other.numVisits.load()),
+        offset(other.offset.load()),
         ckptMetaItemsRead(other.ckptMetaItemsRead),
         fromBeginningOnChkCollapse(other.fromBeginningOnChkCollapse),
         sendCheckpointEndMetaItem(other.sendCheckpointEndMetaItem) { }
@@ -173,6 +181,7 @@ public:
         name.assign(other.name);
         currentCheckpoint = other.currentCheckpoint;
         currentPos = other.currentPos;
+        numVisits = other.numVisits.load();
         offset.store(other.offset.load());
         setMetaItemOffset(other.ckptMetaItemsRead);
         fromBeginningOnChkCollapse = other.fromBeginningOnChkCollapse;
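
The copy members above are hand-written because, as the nearby comment notes, std::atomic deletes its copy operations; a class holding one (offset, numVisits) therefore loses its implicit copies and must transfer values via load()/store(). A minimal illustration:

    #include <atomic>
    #include <cstddef>

    void sketch() {
        std::atomic<std::size_t> a{5};
        // std::atomic<std::size_t> b = a;   // ill-formed: deleted copy ctor
        std::atomic<std::size_t> b{a.load()}; // OK: copy the contained value
        b.store(a.load());                    // likewise for assignment
    }
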
@@ -213,13 +222,16 @@ protected:
 private:
     std::string                      name;
     std::list<Checkpoint*>::iterator currentCheckpoint;
-    std::list<queued_item>::iterator currentPos;
+    CheckpointQueue::iterator currentPos;
+
+    // Number of times a cursor has been moved or processed.
+    std::atomic<size_t>              numVisits;
 
     // The offset (in terms of items) this cursor is from the start of the
     // checkpoint list. Includes meta and non-meta items. Used to calculate
     // how many items this cursor has remaining by subtracting
     // offset from CheckpointManager::numItems.
-    AtomicValue<size_t>              offset;
+    std::atomic<size_t>              offset;
     // Count of the number of meta items which have been read (processed) for
     // the *current* checkpoint.
     size_t ckptMetaItemsRead;
@@ -475,31 +487,31 @@ public:
         snapEndSeqno = seqno;
     }
 
-    std::list<queued_item>::iterator begin() {
+    CheckpointQueue::iterator begin() {
         return toWrite.begin();
     }
 
-    std::list<queued_item>::const_iterator begin() const {
+    CheckpointQueue::const_iterator begin() const {
         return toWrite.begin();
     }
 
-    std::list<queued_item>::iterator end() {
+    CheckpointQueue::iterator end() {
         return toWrite.end();
     }
 
-    std::list<queued_item>::const_iterator end() const {
+    CheckpointQueue::const_iterator end() const {
         return toWrite.end();
     }
 
-    std::list<queued_item>::reverse_iterator rbegin() {
+    CheckpointQueue::reverse_iterator rbegin() {
         return toWrite.rbegin();
     }
 
-    std::list<queued_item>::reverse_iterator rend() {
+    CheckpointQueue::reverse_iterator rend() {
         return toWrite.rend();
     }
 
-    bool keyExists(const std::string &key);
+    bool keyExists(const DocKey& key);
 
     /**
      * Return the memory overhead of this checkpoint instance, except for the memory used by
@@ -525,7 +537,7 @@ public:
      * @param isMetaKey indicates if the key is a checkpoint meta item
      * @return the mutation id for a given key
      */
-    uint64_t getMutationIdForKey(const std::string &key, bool isMetaKey);
+    uint64_t getMutationIdForKey(const DocKey& key, bool isMetaKey);
 
     /**
      * Function invoked by the cursor-dropper which checks if the
@@ -551,6 +563,11 @@ public:
         return effectiveMemUsage;
     }
 
+    static const StoredDocKey DummyKey;
+    static const StoredDocKey CheckpointStartKey;
+    static const StoredDocKey CheckpointEndKey;
+    static const StoredDocKey SetVBucketStateKey;
+
 private:
     EPStats                       &stats;
     uint64_t                       checkpointId;
@@ -564,8 +581,7 @@ private:
     /// Number of meta items (see Item::isCheckPointMetaItem).
     size_t numMetaItems;
     std::set<std::string>          cursors; // List of cursors with their unique names.
-    // List is used for queueing mutations as vector incurs shift operations for deduplication.
-    std::list<queued_item>         toWrite;
+    CheckpointQueue                toWrite;
     checkpoint_index               keyIndex;
     /* Index for meta keys like "dummy_key" */
     checkpoint_index               metaKeyIndex;
@@ -593,21 +609,13 @@ public:
 
     typedef std::shared_ptr<Callback<uint16_t> > FlusherCallback;
 
-    CheckpointManager(EPStats &st, uint16_t vbucket, CheckpointConfig &config,
-                      int64_t lastSeqno, uint64_t lastSnapStart,
+    CheckpointManager(EPStats& st,
+                      uint16_t vbucket,
+                      CheckpointConfig& config,
+                      int64_t lastSeqno,
+                      uint64_t lastSnapStart,
                       uint64_t lastSnapEnd,
-                      FlusherCallback cb,
-                      uint64_t checkpointId = 1) :
-        stats(st), checkpointConfig(config), vbucketId(vbucket), numItems(0),
-        lastBySeqno(lastSeqno), lastClosedChkBySeqno(lastSeqno),
-        isCollapsedCheckpoint(false),
-        pCursorPreCheckpointId(0),
-        flusherCB(cb) {
-        LockHolder lh(queueLock);
-        addNewCheckpoint_UNLOCKED(checkpointId, lastSnapStart, lastSnapEnd);
-            registerCursor_UNLOCKED("persistence", checkpointId, false,
-                                    MustSendCheckpointEnd::NO);
-    }
+                      FlusherCallback cb);
 
     ~CheckpointManager();
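
The constructor body now lives out of line, presumably in checkpoint.cc. Reconstructed from the deleted inline version, with the removed checkpointId default parameter folded in as the literal 1, it would look roughly like:

    CheckpointManager::CheckpointManager(EPStats& st,
                                         uint16_t vbucket,
                                         CheckpointConfig& config,
                                         int64_t lastSeqno,
                                         uint64_t lastSnapStart,
                                         uint64_t lastSnapEnd,
                                         FlusherCallback cb)
        : stats(st), checkpointConfig(config), vbucketId(vbucket),
          numItems(0), lastBySeqno(lastSeqno), lastClosedChkBySeqno(lastSeqno),
          isCollapsedCheckpoint(false), pCursorPreCheckpointId(0),
          flusherCB(cb) {
        LockHolder lh(queueLock);
        addNewCheckpoint_UNLOCKED(1, lastSnapStart, lastSnapEnd);
        registerCursor_UNLOCKED("persistence", 1, false,
                                MustSendCheckpointEnd::NO);
    }
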
 
@@ -631,8 +639,8 @@ public:
      * as a result of running this function.
      * @return the number of items that are purged from checkpoint
      */
-    size_t removeClosedUnrefCheckpoints(const RCPtr<VBucket> &vbucket,
-                                        bool &newOpenCheckpointCreated);
+    size_t removeClosedUnrefCheckpoints(VBucket& vbucket,
+                                        bool& newOpenCheckpointCreated);
 
     /**
      * Register the cursor for getting items whose bySeqno values are between
@@ -698,11 +706,18 @@ public:
      * @param vb the vbucket that a new item is pushed into.
      * @param qi item to be persisted.
      * @param generateBySeqno yes/no generate the seqno for the item
+     * @param preLinkDocumentContext A context object needed for the
+     *        pre link document API in the server API. It&