MB-23714: Make VBucketPtr deletion schedule the delete of the VBucket 00/77100/14
authorJim Walker <jim@couchbase.com>
Thu, 20 Apr 2017 16:06:46 +0000 (17:06 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 27 Apr 2017 15:46:13 +0000 (15:46 +0000)
Using the std::shared_ptr Deleter this patch hooks into the deletion
of the VBucket so that a specific AUXIO (for persistent buckets) or a
NONIO (for ephemeral buckets) task performs the removal of VBucket
resources. This to ensure that no front-end thread runs the removal of
memory and more specifically the deletion of the disk data.

The aim of this commit is also to ensure there's no "race" between a
background task deleting the VB disk file when the VB has itself been
recreated. This is achieved now by having the creation of a VB to
increment the revision of the file and ensuring a delete can only
delete the older revision of the VB.

This has a few subtle changes, as only compaction or creation are
moving the revision forward, no code paths move the revision back.

1. A flush of the bucket which calls KVStore::reset would previously
return all vbuckets to a revision of 1. Again to ensure a delete task
cannot delete a "live" file, the flush doesn't reset the revision
number. So a flush of the bucket will result in deleted data, but
with x.couch.4 etc...

2. A delete of an individual vbucket has the same change. Previously
after a delete, the revision was reset to 1. This is no longer the
case and after the delete completes the revision is left unchanged.

Change-Id: I40d2f5fd658d9f8dd28a671028544831518a90d0
Reviewed-on: http://review.couchbase.org/77100
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
33 files changed:
CMakeLists.txt
src/couch-kvstore/couch-kvstore.cc
src/couch-kvstore/couch-kvstore.h
src/ep_bucket.cc
src/ep_vb.cc
src/ep_vb.h
src/ephemeral_bucket.cc
src/ephemeral_vb.cc
src/ephemeral_vb.h
src/globaltask.cc
src/hash_table.cc
src/hash_table.h
src/kv_bucket.cc
src/kv_bucket.h
src/kv_bucket_iface.h
src/kvshard.cc
src/kvshard.h
src/kvstore.h
src/tasks.cc
src/tasks.def.h
src/tasks.h
src/vbucket.cc
src/vbucket.h
src/vbucketdeletiontask.cc [new file with mode: 0644]
src/vbucketdeletiontask.h [new file with mode: 0644]
src/vbucketmap.cc
src/vbucketmap.h
src/vbucketmemorydeletiontask.cc [deleted file]
src/vbucketmemorydeletiontask.h [deleted file]
tests/ep_testsuite_basic.cc
tests/module_tests/evp_store_single_threaded_test.cc
tests/module_tests/evp_store_test.cc
tests/module_tests/kv_bucket_test.cc

index df7b93b..ffcfe84 100644 (file)
@@ -214,7 +214,7 @@ ADD_LIBRARY(ep_objs OBJECT
             src/vb_count_visitor.cc
             src/vbucket.cc
             src/vbucketmap.cc
-            src/vbucketmemorydeletiontask.cc
+            src/vbucketdeletiontask.cc
             src/warmup.cc
             ${OBJECTREGISTRY_SOURCE}
             ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
index 00c15b3..1720059 100644 (file)
@@ -282,6 +282,7 @@ CouchKVStore::CouchKVStore(KVStoreConfig &config, FileOpsInterface& ops,
                            bool read_only)
     : KVStore(config, read_only),
       dbname(config.getDBName()),
+      dbFileRevMap(configuration.getMaxVBuckets()),
       intransaction(false),
       scanCounter(0),
       logger(config.getLogger()),
@@ -298,7 +299,6 @@ CouchKVStore::CouchKVStore(KVStoreConfig &config, FileOpsInterface& ops,
 
     // pre-allocate lookup maps (vectors) given we have a relatively
     // small, fixed number of vBuckets.
-    dbFileRevMap.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(1));
     cachedDocCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(0));
     cachedDeleteCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(-1));
     cachedFileSize.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(0));
@@ -311,12 +311,16 @@ CouchKVStore::CouchKVStore(KVStoreConfig &config, FileOpsInterface& ops,
 CouchKVStore::CouchKVStore(const CouchKVStore &copyFrom)
     : KVStore(copyFrom),
       dbname(copyFrom.dbname),
-      dbFileRevMap(copyFrom.dbFileRevMap),
+      dbFileRevMap(copyFrom.dbFileRevMap.size()),
       numDbFiles(copyFrom.numDbFiles),
       intransaction(false),
       logger(copyFrom.logger),
       base_ops(copyFrom.base_ops)
 {
+    // std::atomic needs to be manually copied (not copy constructor)
+    for (size_t ii = 0; ii < dbFileRevMap.size(); ii++) {
+        dbFileRevMap[ii].store(copyFrom.dbFileRevMap[ii].load());
+    }
     createDataDir(dbname);
     statCollectingFileOps = getCouchstoreStatsOps(st.fsStats, base_ops);
     statCollectingFileOpsCompaction = getCouchstoreStatsOps(
@@ -385,11 +389,15 @@ void CouchKVStore::reset(uint16_t vbucketId) {
         cachedFileSize[vbucketId] = 0;
         cachedSpaceUsed[vbucketId] = 0;
 
-        //Unlink the couchstore file upon reset
+        // Unlink the current revision and then increment it to ensure any
+        // pending delete doesn't delete us. Note that the expectation is that
+        // some higher level per VB lock is required to prevent data-races here.
+        // KVBucket::vb_mutexes is used in this case.
         unlinkCouchFile(vbucketId, dbFileRevMap[vbucketId]);
+        incrementRevision(vbucketId);
+
         setVBucketState(vbucketId, *state, VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT,
                         true);
-        updateDbFileMap(vbucketId, 1);
     } else {
         throw std::invalid_argument("CouchKVStore::reset: No entry in cached "
                         "states for vbucket " + std::to_string(vbucketId));
@@ -581,29 +589,13 @@ void CouchKVStore::del(const Item &itm,
     pendingReqsQ.push_back(req);
 }
 
-bool CouchKVStore::delVBucket(uint16_t vbucket) {
+void CouchKVStore::delVBucket(uint16_t vbucket, uint64_t fileRev) {
     if (isReadOnly()) {
         throw std::logic_error("CouchKVStore::delVBucket: Not valid on a "
                         "read-only object.");
     }
 
-    unlinkCouchFile(vbucket, dbFileRevMap[vbucket]);
-
-    if (cachedVBStates[vbucket]) {
-        delete cachedVBStates[vbucket];
-    }
-
-    cachedDocCount[vbucket] = 0;
-    cachedDeleteCount[vbucket] = 0;
-    cachedFileSize[vbucket] = 0;
-    cachedSpaceUsed[vbucket] = 0;
-
-    std::string failovers("[{\"id\":0, \"seq\":0}]");
-    cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
-                                                0, 0, 0, failovers);
-    updateDbFileMap(vbucket, 1);
-
-    return true;
+    unlinkCouchFile(vbucket, fileRev);
 }
 
 std::vector<vbucket_state *> CouchKVStore::listPersistedVbuckets() {
@@ -2764,4 +2756,18 @@ std::string CouchKVStore::getCollectionsManifest(uint16_t vbid) {
     return readCollectionsManifest(*db.getDb());
 }
 
+void CouchKVStore::incrementRevision(uint16_t vbid) {
+    dbFileRevMap[vbid]++;
+}
+
+uint64_t CouchKVStore::prepareToDelete(uint16_t vbid) {
+    // Clear the stats so it looks empty (real deletion of the disk data occurs
+    // later)
+    cachedDocCount[vbid] = 0;
+    cachedDeleteCount[vbid] = 0;
+    cachedFileSize[vbid] = 0;
+    cachedSpaceUsed[vbid] = 0;
+    return dbFileRevMap[vbid];
+}
+
 /* end of couch-kvstore.cc */
index 5d6d956..9ee1492 100644 (file)
@@ -184,7 +184,7 @@ public:
     void initialize();
 
     /**
-     * Reset database to a clean state.
+     * Reset vbucket to a clean state.
      */
     void reset(uint16_t vbucketId) override;
 
@@ -281,7 +281,13 @@ public:
      */
     void del(const Item &itm, Callback<int> &cb) override;
 
-    bool delVBucket(uint16_t vbucket) override;
+    /**
+     * Delete a given vbucket database instance from underlying storage
+     *
+     * @param vbucket vbucket id
+     * @param fileRev the revision of the file to delete
+     */
+    void delVBucket(uint16_t vbucket, uint64_t fileRev) override;
 
     /**
      * Retrieve the list of persisted vbucket states
@@ -433,13 +439,28 @@ public:
 
     std::string getCollectionsManifest(uint16_t vbid) override;
 
+    /**
+     * Increment the revision number of the vbucket.
+     * @param vbid ID of the vbucket to change.
+     */
+    void incrementRevision(uint16_t vbid) override;
+
+    /**
+     * Prepare for delete of the vbucket file, this just removes the in-memory
+     * stats for the vbucket and returns the current file revision (which is
+     * the revision that must later be unlinked).
+     *
+     * @param vbid ID of the vbucket being deleted
+     * @return the revision ID to delete (via ::delVBucket)
+     */
+    uint64_t prepareToDelete(uint16_t vbid) override;
+
 protected:
     /*
      * Returns the DbInfo for the given vbucket database.
      */
     DbInfo getDbInfo(uint16_t vbid);
 
-protected:
     bool setVBucketState(uint16_t vbucketId, const vbucket_state &vbstate,
                          VBStatePersist options, bool reset=false);
 
@@ -535,10 +556,10 @@ protected:
 
     const std::string dbname;
 
-    // Map of the fileRev for each vBucket. Using RelaxedAtomic so
-    // stats gathering (doDcpVbTakeoverStats) can get a snapshot
-    // without having to lock.
-    std::vector<Couchbase::RelaxedAtomic<uint64_t>> dbFileRevMap;
+    /**
+     * Per-vbucket file revision atomic to ensure writer threads see increments
+     */
+    std::vector<std::atomic<uint64_t>> dbFileRevMap;
 
     uint16_t numDbFiles;
     std::vector<CouchRequest *> pendingReqsQ;
index 9ce29b0..4a6d7ec 100644 (file)
@@ -231,23 +231,28 @@ VBucketPtr EPBucket::makeVBucket(VBucket::id_type id,
                                      uint64_t maxCas,
                                      const std::string& collectionsManifest) {
     auto flusherCb = std::make_shared<NotifyFlusherCB>(shard);
-    return std::make_shared<EPVBucket>(id,
-                                       state,
-                                       stats,
-                                       engine.getCheckpointConfig(),
-                                       shard,
-                                       lastSeqno,
-                                       lastSnapStart,
-                                       lastSnapEnd,
-                                       std::move(table),
-                                       flusherCb,
-                                       std::move(newSeqnoCb),
-                                       engine.getConfiguration(),
-                                       eviction_policy,
-                                       initState,
-                                       purgeSeqno,
-                                       maxCas,
-                                       collectionsManifest);
+    // Not using make_shared or allocate_shared
+    // 1. make_shared doesn't accept a Deleter
+    // 2. allocate_shared has inconsistencies between platforms in calling
+    //    alloc.destroy (libc++ doesn't call it)
+    return VBucketPtr(new EPVBucket(id,
+                                    state,
+                                    stats,
+                                    engine.getCheckpointConfig(),
+                                    shard,
+                                    lastSeqno,
+                                    lastSnapStart,
+                                    lastSnapEnd,
+                                    std::move(table),
+                                    flusherCb,
+                                    std::move(newSeqnoCb),
+                                    engine.getConfiguration(),
+                                    eviction_policy,
+                                    initState,
+                                    purgeSeqno,
+                                    maxCas,
+                                    collectionsManifest),
+                      VBucket::DeferredDeleter(engine));
 }
 
 ENGINE_ERROR_CODE EPBucket::statsVKey(const DocKey& key,
index 5de7507..06f0d13 100644 (file)
@@ -23,6 +23,7 @@
 #include "failover-table.h"
 #include "kvshard.h"
 #include "stored_value_factories.h"
+#include "vbucketdeletiontask.h"
 
 EPVBucket::EPVBucket(id_type i,
                      vbucket_state_t newState,
@@ -589,3 +590,15 @@ GetValue EPVBucket::getInternalNonResident(const DocKey& key,
     return GetValue(
             NULL, ENGINE_EWOULDBLOCK, v.getBySeqno(), true, v.getNRUValue());
 }
+
+void EPVBucket::setupDeferredDeletion(const void* cookie) {
+    setDeferredDeletionCookie(cookie);
+    deferredDeletionFileRevision.store(
+            getShard()->getRWUnderlying()->prepareToDelete(getId()));
+    setDeferredDeletion(true);
+}
+
+void EPVBucket::scheduleDeferredDeletion(EventuallyPersistentEngine& engine) {
+    ExTask task = new VBucketMemoryAndDiskDeletionTask(engine, *shard, this);
+    ExecutorPool::get()->schedule(task);
+}
\ No newline at end of file
index 81a9a84..1c0dcb9 100644 (file)
@@ -108,6 +108,28 @@ public:
     void queueBackfillItem(queued_item& qi,
                            const GenerateBySeqno generateBySeqno) override;
 
+    /**
+     * Setup deferred deletion, this is where deletion of the vbucket is
+     * deferred and completed by an AUXIO task as it will hit disk for the data
+     * file unlink.
+     *
+     * @param cookie A cookie to notify when the deletion task completes.
+     */
+    void setupDeferredDeletion(const void* cookie) override;
+
+    /**
+     * @return the file revision to be unlinked by the deferred deletion task
+     */
+    uint64_t getDeferredDeletionFileRevision() const {
+        return deferredDeletionFileRevision;
+    }
+
+    /**
+     * Schedule a VBucketMemoryAndDiskDeletionTask to delete this object.
+     * @param engine owning engine (required for task construction)
+     */
+    void scheduleDeferredDeletion(EventuallyPersistentEngine& engine) override;
+
 protected:
     /**
      * queue a background fetch of the specified item.
@@ -180,5 +202,11 @@ private:
     /* Pointer to the shard to which this VBucket belongs to */
     KVShard* shard;
 
+    /**
+     * When deferred deletion is enabled for this object we store the database
+     * file revision we will unlink from disk.
+     */
+    std::atomic<uint64_t> deferredDeletionFileRevision;
+
     friend class EPVBucketTest;
-};
+};
\ No newline at end of file
index 2eaaf45..3ff1fdc 100644 (file)
@@ -146,22 +146,27 @@ VBucketPtr EphemeralBucket::makeVBucket(
         uint64_t purgeSeqno,
         uint64_t maxCas,
         const std::string& collectionsManifest) {
-    return std::make_shared<EphemeralVBucket>(id,
-                                              state,
-                                              stats,
-                                              engine.getCheckpointConfig(),
-                                              shard,
-                                              lastSeqno,
-                                              lastSnapStart,
-                                              lastSnapEnd,
-                                              std::move(table),
-                                              std::move(newSeqnoCb),
-                                              engine.getConfiguration(),
-                                              eviction_policy,
-                                              initState,
-                                              purgeSeqno,
-                                              maxCas,
-                                              collectionsManifest);
+    // Not using make_shared or allocate_shared
+    // 1. make_shared doesn't accept a Deleter
+    // 2. allocate_shared has inconsistencies between platforms in calling
+    //    alloc.destroy (libc++ doesn't call it)
+    return VBucketPtr(new EphemeralVBucket(id,
+                                           state,
+                                           stats,
+                                           engine.getCheckpointConfig(),
+                                           shard,
+                                           lastSeqno,
+                                           lastSnapStart,
+                                           lastSnapEnd,
+                                           std::move(table),
+                                           std::move(newSeqnoCb),
+                                           engine.getConfiguration(),
+                                           eviction_policy,
+                                           initState,
+                                           purgeSeqno,
+                                           maxCas,
+                                           collectionsManifest),
+                      VBucket::DeferredDeleter(engine));
 }
 
 void EphemeralBucket::completeStatsVKey(const void* cookie,
index 87da513..92fdd6c 100644 (file)
@@ -22,6 +22,7 @@
 #include "failover-table.h"
 #include "linked_list.h"
 #include "stored_value_factories.h"
+#include "vbucketdeletiontask.h"
 
 EphemeralVBucket::EphemeralVBucket(id_type i,
                                    vbucket_state_t newState,
@@ -503,3 +504,14 @@ GetValue EphemeralVBucket::getInternalNonResident(
     /* We reach here only if the v is deleted and does not have any value */
     return GetValue();
 }
+
+void EphemeralVBucket::setupDeferredDeletion(const void* cookie) {
+    setDeferredDeletionCookie(cookie);
+    setDeferredDeletion(true);
+}
+
+void EphemeralVBucket::scheduleDeferredDeletion(
+        EventuallyPersistentEngine& engine) {
+    ExTask task = new VBucketMemoryDeletionTask(engine, this);
+    ExecutorPool::get()->schedule(task);
+}
index c28733d..ddfb754 100644 (file)
@@ -141,6 +141,14 @@ public:
      */
     size_t purgeTombstones(rel_time_t purgeAge);
 
+    void setupDeferredDeletion(const void* cookie) override;
+
+    /**
+     * Schedule a VBucketMemoryDeletionTask to delete this object.
+     * @param engine owning engine (required for task construction)
+     */
+    void scheduleDeferredDeletion(EventuallyPersistentEngine& engine) override;
+
 protected:
     /* Data structure for in-memory sequential storage */
     std::unique_ptr<SequenceList> seqList;
index a541dc1..b989d0b 100644 (file)
@@ -24,8 +24,9 @@
 static_assert(TaskPriority::MultiBGFetcherTask < TaskPriority::BGFetchCallback,
               "MultiBGFetcherTask not less than BGFetchCallback");
 
-static_assert(TaskPriority::BGFetchCallback == TaskPriority::VBDeleteTask,
-              "BGFetchCallback not equal VBDeleteTask");
+static_assert(TaskPriority::BGFetchCallback ==
+                      TaskPriority::VBucketMemoryAndDiskDeletionTask,
+              "BGFetchCallback not equal VBucketMemoryAndDiskDeletionTask");
 
 static_assert(TaskPriority::VKeyStatBGFetchTask < TaskPriority::FlusherTask,
               "VKeyStatBGFetchTask not less than FlusherTask");
index 8bc1431..3890043 100644 (file)
@@ -66,7 +66,9 @@ HashTable::HashTable(EPStats& st,
 }
 
 HashTable::~HashTable() {
-    clear(true);
+    // Use unlocked clear for the destructor, avoids lock inversions on VBucket
+    // delete
+    clear_UNLOCKED(true);
     // Wait for any outstanding visitors to finish.
     while (visitors > 0) {
 #ifdef _MSC_VER
@@ -87,6 +89,10 @@ void HashTable::clear(bool deactivate) {
         }
     }
     MultiLockHolder mlh(mutexes, n_locks);
+    clear_UNLOCKED(deactivate);
+}
+
+void HashTable::clear_UNLOCKED(bool deactivate) {
     if (deactivate) {
         setActiveState(false);
     }
index ee73118..53a48e7 100644 (file)
@@ -670,6 +670,8 @@ private:
         return nullptr;
     }
 
+    void clear_UNLOCKED(bool deactivate);
+
     DISALLOW_COPY_AND_ASSIGN(HashTable);
 };
 
index a498147..8e1b9bc 100644 (file)
@@ -52,7 +52,7 @@
 #include "replicationthrottle.h"
 #include "statwriter.h"
 #include "tapconnmap.h"
-#include "vbucketmemorydeletiontask.h"
+#include "vbucketdeletiontask.h"
 #include "vb_count_visitor.h"
 
 class StatsValueChangeListener : public ValueChangedListener {
@@ -724,15 +724,16 @@ ENGINE_ERROR_CODE KVBucket::setVBucketState(uint16_t vbid,
                                             bool transfer,
                                             bool notify_dcp) {
     // Lock to prevent a race condition between a failed update and add.
-    LockHolder lh(vbsetMutex);
+    std::unique_lock<std::mutex> lh(vbsetMutex);
     return setVBucketState_UNLOCKED(vbid, to, transfer, notify_dcp, lh);
 }
 
-ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(uint16_t vbid,
-                                                     vbucket_state_t to,
-                                                     bool transfer,
-                                                     bool notify_dcp,
-                                                     LockHolder& vbset) {
+ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(
+        uint16_t vbid,
+        vbucket_state_t to,
+        bool transfer,
+        bool notify_dcp,
+        std::unique_lock<std::mutex>& vbset) {
     VBucketPtr vb = vbMap.getBucket(vbid);
     if (vb && to == vb->getState()) {
         return ENGINE_SUCCESS;
@@ -804,6 +805,10 @@ ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(uint16_t vbid,
         // The first checkpoint for active vbucket should start with id 2.
         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
         newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
+
+        // Before adding the VB to the map increment the revision
+        getRWUnderlying(vbid)->incrementRevision(vbid);
+
         if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
             return ENGINE_ERANGE;
         }
@@ -836,70 +841,29 @@ void KVBucket::scheduleVBStatePersist(VBucket::id_type vbid) {
     vb->checkpointManager.queueSetVBState(*vb);
 }
 
-bool KVBucket::completeVBucketDeletion(uint16_t vbid, const void* cookie) {
-    hrtime_t start_time(gethrtime());
-    bool bucketDeleting;
-    {
-        LockHolder lh(vbsetMutex);
-        VBucketPtr vb = vbMap.getBucket(vbid);
-        bucketDeleting = !vb ||
-                vb->getState() == vbucket_state_dead ||
-                vb->isBucketDeletion();
-
-        if (bucketDeleting) {
-            LockHolder vlh(vb_mutexes[vbid]);
-            if (!getRWUnderlying(vbid)->delVBucket(vbid)) {
-                return false;
-            }
-            if (vb) {
-                vb->setBucketDeletion(false);
-                vb->setBucketCreation(false);
-                vb->setPersistenceSeqno(0);
-            }
-            ++stats.vbucketDeletions;
-        }
-    }
-
-    hrtime_t spent(gethrtime() - start_time);
-    hrtime_t wall_time = spent / 1000;
-    BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
-    stats.diskVBDelHisto.add(wall_time);
-    atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
-    stats.vbucketDelTotWalltime.fetch_add(wall_time);
-    if (cookie) {
-        engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
-    }
-
-    return true;
-}
-
-void KVBucket::scheduleVBDeletion(VBucketPtr &vb, const void* cookie,
-                                  double delay) {
-    ExTask delTask = make_STRCPtr<VBucketMemoryDeletionTask>(engine, vb, delay);
-    ExecutorPool::get()->schedule(delTask);
-
-    if (vb->setBucketDeletion(true)) {
-        ExTask task = make_STRCPtr<VBDeleteTask>(&engine, vb->getId(), cookie);
-        ExecutorPool::get()->schedule(task);
-    }
-}
-
 ENGINE_ERROR_CODE KVBucket::deleteVBucket(uint16_t vbid, const void* c) {
     // Lock to prevent a race condition between a failed update and add
     // (and delete).
-    VBucketPtr vb;
+    VBucketPtr vb = vbMap.getBucket(vbid);
+    if (!vb) {
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
     {
-        LockHolder lh(vbsetMutex);
-        vb = vbMap.getBucket(vbid);
-        if (!vb) {
-            return ENGINE_NOT_MY_VBUCKET;
-        }
+        std::unique_lock<std::mutex> vbSetLh(vbsetMutex);
+        // Obtain the vb_mutex for the VB to ensure we interlock with other
+        // threads that are manipulating the VB (particularly ones which may
+        // try and change the disk revision e.g. deleteAll and compaction).
+        std::unique_lock<std::mutex> vbMutLh(vb_mutexes[vbid]);
 
         vb->setState(vbucket_state_dead);
         engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
-        vbMap.removeBucket(vbid);
+
+        // Drop the VB to begin the delete, the last holder of the VB will
+        // unknowingly trigger the destructor which schedules a deletion task.
+        vbMap.dropVBucketAndSetupDeferredDeletion(vbid, c);
     }
-    scheduleVBDeletion(vb, c);
+
     if (c) {
         return ENGINE_EWOULDBLOCK;
     }
@@ -1071,23 +1035,28 @@ void KVBucket::updateCompactionTasks(DBFileId db_file_id) {
 }
 
 bool KVBucket::resetVBucket(uint16_t vbid) {
-    LockHolder lh(vbsetMutex);
-    return resetVBucket_UNLOCKED(vbid, lh);
+    std::unique_lock<std::mutex> lh(vbsetMutex);
+    // Obtain the vb_mutex for the VB to ensure we interlock with other
+    // threads that are manipulating the VB (particularly ones which may
+    // try and change the disk revision).
+    std::unique_lock<std::mutex> vbMutLh(vb_mutexes[vbid]);
+    return resetVBucket_UNLOCKED(vbid, lh, vbMutLh);
 }
 
-bool KVBucket::resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset) {
+bool KVBucket::resetVBucket_UNLOCKED(uint16_t vbid,
+                                     std::unique_lock<std::mutex>& vbset,
+                                     std::unique_lock<std::mutex>& vbMutex) {
     bool rv(false);
 
     VBucketPtr vb = vbMap.getBucket(vbid);
     if (vb) {
         vbucket_state_t vbstate = vb->getState();
 
-        vbMap.removeBucket(vbid);
+        vbMap.dropVBucketAndSetupDeferredDeletion(vbid, nullptr /*no cookie*/);
 
         checkpointCursorInfoList cursors =
                                         vb->checkpointManager.getAllCursors();
         // Delete and recreate the vbucket database file
-        scheduleVBDeletion(vb, NULL, 0);
         setVBucketState_UNLOCKED(vbid, vbstate,
                                  false/*transfer*/, true/*notifyDcp*/, vbset);
 
@@ -1920,7 +1889,7 @@ public:
 private:
 
     void redirty() {
-        if (vbucket->isBucketDeletion()) {
+        if (vbucket->isDeletionDeferred()) {
             // updating the member stats for the vbucket is not really necessary
             // as the vbucket is about to be deleted
             vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
@@ -1971,7 +1940,7 @@ void KVBucket::flushOneDeleteAll() {
         VBucketPtr vb = getVBucket(i);
         // Reset the vBucket if it's non-null and not already in the middle of
         // being created / destroyed.
-        if (vb && !(vb->isBucketCreation() || vb->isBucketDeletion())) {
+        if (vb && !(vb->isBucketCreation() || vb->isDeletionDeferred())) {
             LockHolder lh(vb_mutexes[vb->getId()]);
             getRWUnderlying(vb->getId())->reset(i);
         }
@@ -2696,11 +2665,11 @@ KVStore *KVBucket::getOneRWUnderlying(void) {
 }
 
 ENGINE_ERROR_CODE KVBucket::rollback(uint16_t vbid, uint64_t rollbackSeqno) {
-    LockHolder vbset(vbsetMutex);
+    std::unique_lock<std::mutex> vbset(vbsetMutex);
 
-    std::unique_lock<std::mutex> lh(vb_mutexes[vbid], std::try_to_lock);
+    std::unique_lock<std::mutex> vbMutexLh(vb_mutexes[vbid], std::try_to_lock);
 
-    if (!lh.owns_lock()) {
+    if (!vbMutexLh.owns_lock()) {
         return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
     }
 
@@ -2728,7 +2697,7 @@ ENGINE_ERROR_CODE KVBucket::rollback(uint16_t vbid, uint64_t rollbackSeqno) {
             }
         }
 
-        if (resetVBucket_UNLOCKED(vbid, vbset)) {
+        if (resetVBucket_UNLOCKED(vbid, vbset, vbMutexLh)) {
             VBucketPtr newVb = vbMap.getBucket(vbid);
             newVb->incrRollbackItemCount(prevHighSeqno);
             return ENGINE_SUCCESS;
index e447620..76262b8 100644 (file)
@@ -337,15 +337,6 @@ public:
                                       bool transfer, bool notify_dcp = true);
 
     /**
-     * Physically deletes a VBucket from disk. This function should only
-     * be called on a VBucket that has already been logically deleted.
-     *
-     * @param vbid vbucket id
-     * @param cookie The connection that requested the deletion
-     */
-    bool completeVBucketDeletion(uint16_t vbid, const void* cookie);
-
-    /**
      * Deletes a vbucket
      *
      * @param vbid The vbucket to delete.
@@ -769,10 +760,6 @@ protected:
      */
     void compactInternal(compaction_ctx *ctx);
 
-    void scheduleVBDeletion(VBucketPtr &vb,
-                            const void* cookie,
-                            double delay = 0);
-
     void flushOneDeleteAll(void);
     PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
                                           VBucketPtr &vb);
@@ -781,11 +768,16 @@ protected:
                          vbucket_state_t allowedState,
                          get_options_t options = TRACK_REFERENCE);
 
-    bool resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset);
+    bool resetVBucket_UNLOCKED(uint16_t vbid,
+                               std::unique_lock<std::mutex>& vbset,
+                               std::unique_lock<std::mutex>& vbMutex);
 
-    ENGINE_ERROR_CODE setVBucketState_UNLOCKED(uint16_t vbid, vbucket_state_t state,
-                                               bool transfer, bool notify_dcp,
-                                               LockHolder& vbset);
+    ENGINE_ERROR_CODE setVBucketState_UNLOCKED(
+            uint16_t vbid,
+            vbucket_state_t state,
+            bool transfer,
+            bool notify_dcp,
+            std::unique_lock<std::mutex>& vbset);
 
     /* Notify flusher of a new seqno being added in the vbucket */
     virtual void notifyFlusher(const uint16_t vbid);
index b79fafd..9a7c610 100644 (file)
@@ -466,15 +466,6 @@ public:
                                               bool notify_dcp = true) = 0;
 
     /**
-     * Physically deletes a VBucket from disk. This function should only
-     * be called on a VBucket that has already been logically deleted.
-     *
-     * @param vbid vbucket id
-     * @param cookie The connection that requested the deletion
-     */
-    virtual bool completeVBucketDeletion(uint16_t vbid, const void* cookie) = 0;
-
-    /**
      * Deletes a vbucket
      *
      * @param vbid The vbucket to delete.
@@ -845,10 +836,6 @@ protected:
      */
     virtual void compactInternal(compaction_ctx *ctx) = 0;
 
-    virtual void scheduleVBDeletion(VBucketPtr &vb,
-                            const void* cookie,
-                            double delay = 0) = 0;
-
     virtual void flushOneDeleteAll(void) = 0;
     virtual PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
                                                   VBucketPtr &vb) = 0;
index 8a0aa99..e0e8388 100644 (file)
@@ -77,8 +77,12 @@ void KVShard::setBucket(VBucketPtr vb) {
     vbuckets[vb->getId()].lock().set(vb);
 }
 
-void KVShard::resetBucket(uint16_t id) {
-    vbuckets[id].lock().reset();
+void KVShard::dropVBucketAndSetupDeferredDeletion(VBucket::id_type id,
+                                                  const void* cookie) {
+    auto vb = vbuckets[id].lock();
+    auto vbPtr = vb.get();
+    vbPtr->setupDeferredDeletion(cookie);
+    vb.reset();
 }
 
 std::vector<VBucket::id_type> KVShard::getVBucketsSortedByState() {
index db95063..b6ccdfe 100644 (file)
@@ -21,6 +21,7 @@
 
 #include "kvstore.h"
 #include "utility.h"
+#include "vbucket.h"
 
 #include <atomic>
 
@@ -79,7 +80,20 @@ public:
 
     VBucketPtr getBucket(VBucket::id_type id) const;
     void setBucket(VBucketPtr vb);
-    void resetBucket(VBucket::id_type id);
+
+    /**
+     * Drop the vbucket from the map and setup deferred deletion of the VBucket.
+     * Once the VBucketPtr has no more references the vbucket is deleted, but
+     * deletion occurs via a task that is scheduled by the VBucketPtr deleter,
+     * ensuring no front-end thread deletes the memory/disk associated with the
+     * VBucket.
+     *
+     * @param id The VB to drop
+     * @param cookie Optional connection cookie, this cookie will be notified
+     *        when the deletion task is completed.
+     */
+    void dropVBucketAndSetupDeferredDeletion(VBucket::id_type id,
+                                             const void* cookie);
 
     KVShard::id_type getId() const {
         return kvConfig.getShardId();
index fa32a07..2620e7a 100644 (file)
@@ -721,9 +721,9 @@ public:
     }
 
     /**
-     * Reset the store to a clean state.
+     * Reset the vbucket to a clean state.
      */
-    virtual void reset(uint16_t shardId) = 0;
+    virtual void reset(uint16_t vbid) = 0;
 
     /**
      * Begin a transaction (if not already in one).
@@ -791,9 +791,9 @@ public:
      * Delete a given vbucket database instance from underlying storage
      *
      * @param vbucket vbucket id
-     * return true, if vbucket deletion was successful. Else, false.
+     * @param fileRev the revision of the file to delete
      */
-    virtual bool delVBucket(uint16_t vbucket) = 0;
+    virtual void delVBucket(uint16_t vbucket, uint64_t fileRev) = 0;
 
     /**
      * Get a list of all persisted vbuckets (with their states).
@@ -969,6 +969,20 @@ public:
      */
     virtual std::string getCollectionsManifest(uint16_t vbid) = 0;
 
+    /**
+     * Increment the revision number of the vbucket.
+     * @param vbid ID of the vbucket to change.
+     */
+    virtual void incrementRevision(uint16_t vbid) = 0;
+
+    /**
+     * Prepare for delete of the vbucket file
+     *
+     * @param vbid ID of the vbucket being deleted
+     * @return the revision ID to delete (via ::delVBucket)
+     */
+    virtual uint64_t prepareToDelete(uint16_t vbid) = 0;
+
 protected:
 
     /* all stats */
@@ -982,8 +996,6 @@ protected:
     Couchbase::RelaxedAtomic<uint16_t> cachedValidVBCount;
     std::list<PersistenceCallback *> pcbs;
 
-protected:
-
     void createDataDir(const std::string& dbname);
     template <typename T>
     void addStat(const std::string& prefix, const char* nm, T& val,
index 7b589a1..c5ddb3f 100644 (file)
@@ -36,11 +36,6 @@ bool FlusherTask::run() {
     return flusher->step(this);
 }
 
-bool VBDeleteTask::run() {
-    TRACE_EVENT("ep-engine/task", "VBDeleteTask", vbucketId, cookie);
-    return !engine->getKVBucket()->completeVBucketDeletion(vbucketId, cookie);
-}
-
 bool CompactTask::run() {
     TRACE_EVENT("ep-engine/task", "CompactTask", compactCtx.db_file_id);
     return engine->getKVBucket()->doCompact(&compactCtx, cookie);
index 747a8c0..9ffbce2 100644 (file)
@@ -50,9 +50,9 @@ TASK(AccessScanner, AUXIO_TASK_IDX, 3)
 TASK(AccessScannerVisitor, AUXIO_TASK_IDX, 3)
 TASK(ActiveStreamCheckpointProcessorTask, AUXIO_TASK_IDX, 5)
 TASK(BackfillManagerTask, AUXIO_TASK_IDX, 8)
+TASK(VBucketMemoryAndDiskDeletionTask, AUXIO_TASK_IDX, 1)
 
 // Read/Write IO tasks
-TASK(VBDeleteTask, WRITER_TASK_IDX, 1)
 TASK(RollbackTask, WRITER_TASK_IDX, 1)
 TASK(CompactVBucketTask, WRITER_TASK_IDX, 2)
 TASK(FlusherTask, WRITER_TASK_IDX, 5)
index 914981f..3ef5c86 100644 (file)
@@ -56,32 +56,6 @@ private:
 };
 
 /**
- * A task for deleting VBucket files from disk and cleaning up any outstanding
- * writes for that VBucket file.
- * sid (shard ID) passed on to GlobalTask indicates that task needs to be
- *     serialized with other tasks that require serialization on its shard
- */
-class VBDeleteTask : public GlobalTask {
-public:
-    VBDeleteTask(EventuallyPersistentEngine *e, uint16_t vbid, const void* c,
-                 bool completeBeforeShutdown = true)
-        : GlobalTask(e, TaskId::VBDeleteTask, 0, completeBeforeShutdown),
-          vbucketId(vbid), cookie(c),
-          description("Deleting VBucket:" + std::to_string(vbucketId)) {}
-
-    bool run();
-
-    cb::const_char_buffer getDescription() {
-        return description;
-    }
-
-private:
-    uint16_t vbucketId;
-    const void* cookie;
-    const std::string description;
-};
-
-/**
  * A task for compacting a vbucket db file
  */
 class CompactTask : public GlobalTask {
index d4aff69..6f6ff9a 100644 (file)
@@ -31,6 +31,7 @@
 #include "failover-table.h"
 #include "flusher.h"
 #include "pre_link_document_context.h"
+#include "vbucketdeletiontask.h"
 
 #define STATWRITER_NAMESPACE vbucket
 #include "statwriter.h"
@@ -188,7 +189,8 @@ VBucket::VBucket(id_type i,
       statPrefix("vb_" + std::to_string(i)),
       persistenceCheckpointId(0),
       bucketCreation(false),
-      bucketDeletion(false),
+      deferredDeletion(false),
+      deferredDeletionCookie(nullptr),
       newSeqnoCb(std::move(newSeqnoCb)),
       manifest(collectionsManifest) {
     if (config.getConflictResolutionType().compare("lww") == 0) {
@@ -2403,3 +2405,13 @@ std::unique_ptr<Item> VBucket::pruneXattrDocument(
         return {};
     }
 }
+
+void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
+    // If the vbucket is marked as deleting then we must schedule task to
+    // perform the resource destruction (memory/disk).
+    if (vb->isDeletionDeferred()) {
+        vb->scheduleDeferredDeletion(engine);
+        return;
+    }
+    delete vb;
+}
index 374f1e1..4f1ebad 100644 (file)
@@ -186,6 +186,7 @@ private:
 class EventuallyPersistentEngine;
 class FailoverTable;
 class KVShard;
+class VBucketMemoryDeletionTask;
 
 /**
  * An individual vbucket.
@@ -298,16 +299,48 @@ public:
         return bucketCreation.compare_exchange_strong(inverse, rv);
     }
 
-    // States whether the VBucket is in the process of being deleted
-    bool isBucketDeletion() const {
-        return bucketDeletion.load();
+    /**
+     * @return true if the vbucket deletion is to be deferred to a background
+     *         task.
+     */
+    bool isDeletionDeferred() const {
+        return deferredDeletion.load();
     }
 
-    bool setBucketDeletion(bool delBucket) {
-        bool inverse = !delBucket;
-        return bucketDeletion.compare_exchange_strong(inverse, delBucket);
+    /**
+     * @param value true if the vbucket's deletion should be deferred to a
+     *        background task. This is for VBucket objects created by
+     *        makeVBucket and owned by a VBucketPtr. If the VBucket was manually
+     *        created this will have no effect on deletion.
+     */
+    void setDeferredDeletion(bool value) {
+        deferredDeletion.store(true);
     }
 
+    /**
+     * @param A cookie to notify when the deferred deletion completes.
+     */
+    void setDeferredDeletionCookie(const void* cookie) {
+        deferredDeletionCookie = cookie;
+    }
+
+    /**
+     * @return the cookie which could of been set when setupDeferredDeletion was
+     *         called.
+     */
+    const void* getDeferredDeletionCookie() const {
+        return deferredDeletionCookie;
+    }
+
+    /**
+     * Setup deferred deletion, this is where deletion of the vbucket is
+     * deferred and completed by an AUXIO/NONIO task. AUXIO for EPVBucket
+     * as it will hit disk for the data file unlink, NONIO is used for
+     * EphemeralVBucket as only memory resources need freeing.
+     * @param cookie A cookie to notify when the deletion task completes.
+     */
+    virtual void setupDeferredDeletion(const void* cookie) = 0;
+
     // Returns the last persisted sequence number for the VBucket
     virtual uint64_t getPersistenceSeqno() const = 0;
 
@@ -1168,6 +1201,27 @@ public:
 
     std::atomic<size_t>  numExpiredItems;
 
+    /**
+     * A custom delete function for deleting VBucket objects. Any thread could
+     * be the last thread to release a VBucketPtr and deleting a VB will
+     * eventually hit the I/O sub-system when we unlink the file, to be sure no
+     * front-end thread does this work, we schedule the deletion to a background
+     * task. This task scheduling is triggered by the shared_ptr/VBucketPtr
+     * using this object as the deleter.
+     */
+    struct DeferredDeleter {
+        DeferredDeleter(EventuallyPersistentEngine& engine) : engine(engine) {
+        }
+
+        /**
+         * Called when the VBucketPtr has no more owners and runs delete on
+         * the object.
+         */
+        void operator()(VBucket* vb) const;
+
+        EventuallyPersistentEngine& engine;
+    };
+
 protected:
     /**
      * This function checks cas, expiry and other partition (vbucket) related
@@ -1388,6 +1442,15 @@ protected:
     /* size of list hpVBReqs (to avoid MB-9434) */
     Couchbase::RelaxedAtomic<size_t> numHpVBReqs;
 
+    /**
+     * VBucket sub-classes must implement a function that will schedule
+     * an appropriate task that will delete the VBucket and its resources.
+     *
+     * @param engine owning engine (required for task construction)
+     */
+    virtual void scheduleDeferredDeletion(
+            EventuallyPersistentEngine& engine) = 0;
+
 private:
     void fireAllOps(EventuallyPersistentEngine& engine, ENGINE_ERROR_CODE code);
 
@@ -1594,10 +1657,13 @@ private:
     std::string statPrefix;
     // The persistence checkpoint ID for this vbucket.
     std::atomic<uint64_t> persistenceCheckpointId;
-    // Flag to indicate the bucket is being created
+    // Flag to indicate the vbucket is being created
     std::atomic<bool> bucketCreation;
-    // Flag to indicate the bucket is being deleted
-    std::atomic<bool> bucketDeletion;
+    // Flag to indicate the vbucket deletion is deferred
+    std::atomic<bool> deferredDeletion;
+    /// A cookie that can be set when the vbucket is deletion is deferred, the
+    /// cookie will be notified when the deferred deletion completes
+    const void* deferredDeletionCookie;
 
     // Ptr to the item conflict resolution module
     std::unique_ptr<ConflictResolution> conflictResolver;
diff --git a/src/vbucketdeletiontask.cc b/src/vbucketdeletiontask.cc
new file mode 100644 (file)
index 0000000..30c7315
--- /dev/null
@@ -0,0 +1,98 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#include "vbucketdeletiontask.h"
+#include "ep_engine.h"
+#include "ep_vb.h"
+#include "executorpool.h"
+#include "kvshard.h"
+
+#include <phosphor/phosphor.h>
+#include <platform/processclock.h>
+
+VBucketMemoryDeletionTask::VBucketMemoryDeletionTask(
+        EventuallyPersistentEngine& eng, VBucket* vb, TaskId tid)
+    : GlobalTask(&eng, tid, 0.0, true), vbucket(vb) {
+    if (!vbucket) {
+        throw std::logic_error(
+                "VBucketMemoryDeletionTask::VBucketMemoryDeletionTask no "
+                "vbucket");
+    }
+    description = "Removing (dead) vb:" + std::to_string(vbucket->getId()) +
+                  " from memory";
+}
+
+cb::const_char_buffer VBucketMemoryDeletionTask::getDescription() {
+    return description;
+}
+
+bool VBucketMemoryDeletionTask::run() {
+    TRACE_EVENT(
+            "ep-engine/task", "VBucketMemoryDeletionTask", vbucket->getId());
+
+    notifyAllPendingConnsFailed(true);
+
+    return false;
+}
+
+void VBucketMemoryDeletionTask::notifyAllPendingConnsFailed(
+        bool notifyIfCookieSet) {
+    vbucket->notifyAllPendingConnsFailed(*engine);
+
+    if (notifyIfCookieSet && vbucket->getDeferredDeletionCookie()) {
+        engine->notifyIOComplete(vbucket->getDeferredDeletionCookie(),
+                                 ENGINE_SUCCESS);
+    }
+}
+
+VBucketMemoryAndDiskDeletionTask::VBucketMemoryAndDiskDeletionTask(
+        EventuallyPersistentEngine& eng, KVShard& shard, EPVBucket* vb)
+    : VBucketMemoryDeletionTask(eng,
+                                static_cast<VBucket*>(vb),
+                                TaskId::VBucketMemoryAndDiskDeletionTask),
+      shard(shard),
+      vbDeleteRevision(vb->getDeferredDeletionFileRevision()) {
+    description += " and disk";
+}
+
+bool VBucketMemoryAndDiskDeletionTask::run() {
+    TRACE_EVENT("ep-engine/task",
+                "VBucketMemoryAndDiskDeletionTask",
+                vbucket->getId());
+    notifyAllPendingConnsFailed(false);
+
+    auto start = ProcessClock::now();
+    shard.getRWUnderlying()->delVBucket(vbucket->getId(), vbDeleteRevision);
+    auto elapsed = ProcessClock::now() - start;
+    auto wallTime =
+            std::chrono::duration_cast<std::chrono::microseconds>(elapsed);
+
+    engine->getEpStats().vbucketDeletions++;
+    BlockTimer::log(
+            elapsed.count(), "disk_vb_del", engine->getEpStats().timingLog);
+    engine->getEpStats().diskVBDelHisto.add(wallTime.count());
+    atomic_setIfBigger(engine->getEpStats().vbucketDelMaxWalltime,
+                       hrtime_t(wallTime.count()));
+    engine->getEpStats().vbucketDelTotWalltime.fetch_add(wallTime.count());
+
+    if (vbucket->getDeferredDeletionCookie()) {
+        engine->notifyIOComplete(vbucket->getDeferredDeletionCookie(),
+                                 ENGINE_SUCCESS);
+    }
+
+    return false;
+}
diff --git a/src/vbucketdeletiontask.h b/src/vbucketdeletiontask.h
new file mode 100644 (file)
index 0000000..f36fa5f
--- /dev/null
@@ -0,0 +1,92 @@
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ *     Copyright 2017 Couchbase, Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+#pragma once
+
+#include "globaltask.h"
+#include "vbucket.h"
+
+class EPVBucket;
+
+/*
+ * This is a NONIO task called as part of VB deletion.  The task is responsible
+ * for clearing all the VBucket's pending operations and for deleting the
+ * VBucket (via a smart pointer).
+ *
+ * This task is designed to be invoked only when the VBucket has no owners.
+ */
+class VBucketMemoryDeletionTask : public GlobalTask {
+public:
+    /**
+     * @param engine required for GlobalTask construction
+     * @param vbucket the vbucket object to delete
+     */
+    VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
+                              VBucket* vbucket,
+                              TaskId tid = TaskId::VBucketMemoryDeletionTask);
+
+    cb::const_char_buffer getDescription();
+
+    bool run();
+
+protected:
+    /**
+     * Call vbucket->notifyAllPendingConnsFailed and optionally perform
+     * notifyIOComplete
+     *
+     * @pararm notifyIfCookieSet set to true if the function should perform
+     *         notifyIOComplete on vbucket->getDeletingCookie()
+     */
+    void notifyAllPendingConnsFailed(bool notifyIfCookieSet);
+
+    /**
+     * The vbucket we are deleting is stored in a unique_ptr for RAII deletion
+     * once this task is finished and itself deleted, the VBucket will be
+     * deleted.
+     */
+    std::unique_ptr<VBucket> vbucket;
+    std::string description;
+};
+
+/*
+ * This is an AUXIO task called as part of EPVBucket deletion.  The task is
+ * responsible for clearing all the VBucket's pending operations and for
+ * clearing the VBucket's hash table and removing the disk file.
+ *
+ * This task is designed to be invoked only when the EPVBucket has no owners.
+ */
+class VBucketMemoryAndDiskDeletionTask : public VBucketMemoryDeletionTask {
+public:
+    /**
+     * This task will as part of construction increase the vbucket's disk
+     * revision so that the delete can remove the file without new
+     * instances of the same vbucket writing to the file.
+     *
+     * @param engine requird for GlobalTask construction
+     * @param shard the KVShard to use for deletion
+     * @param vbucket the Eventually Persistent vbucket object to delete
+     */
+    VBucketMemoryAndDiskDeletionTask(EventuallyPersistentEngine& engine,
+                                     KVShard& shard,
+                                     EPVBucket* vbucket);
+
+    bool run();
+
+protected:
+    KVShard& shard;
+    uint64_t vbDeleteRevision;
+};
\ No newline at end of file
index b4e4895..bd49573 100644 (file)
@@ -63,11 +63,10 @@ ENGINE_ERROR_CODE VBucketMap::addBucket(VBucketPtr vb) {
     return ENGINE_ERANGE;
 }
 
-void VBucketMap::removeBucket(id_type id) {
+void VBucketMap::dropVBucketAndSetupDeferredDeletion(id_type id,
+                                                     const void* cookie) {
     if (id < size) {
-        // Theoretically, this could be off slightly.  In
-        // practice, this happens only on dead vbuckets.
-        getShardByVbId(id)->resetBucket(id);
+        getShardByVbId(id)->dropVBucketAndSetupDeferredDeletion(id, cookie);
     }
 }
 
index cc3b7aa..05df5cd 100644 (file)
@@ -58,12 +58,23 @@ public:
 
     /**
      * Add the VBucket to the map - extending the lifetime of the object until
-     * it is removed from the map via removeBucket.
+     * it is removed from the map via dropAndDeleteVBucket.
      * @param vb shared pointer to the VBucket we are storing.
      */
     ENGINE_ERROR_CODE addBucket(VBucketPtr vb);
 
-    void removeBucket(id_type id);
+    /**
+     * Drop the vbucket from the map and setup deferred deletion of the VBucket.
+     * Once the VBucketPtr has no more references the vbucket is deleted, but
+     * deletion occurs via a task that is scheduled by the VBucketPtr deleter,
+     * ensuring no front-end thread deletes the memory/disk associated with the
+     * VBucket.
+     *
+     * @param id The VB to drop
+     * @param cookie Optional connection cookie, this cookie will be notified
+     *        when the deletion task is completed.
+     */
+    void dropVBucketAndSetupDeferredDeletion(id_type id, const void* cookie);
     VBucketPtr getBucket(id_type id) const;
 
     // Returns the size of the map, i.e. the total number of VBuckets it can
diff --git a/src/vbucketmemorydeletiontask.cc b/src/vbucketmemorydeletiontask.cc
deleted file mode 100644 (file)
index c66ceef..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- *     Copyright 2016 Couchbase, Inc
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-#include "vbucketmemorydeletiontask.h"
-
-#include <phosphor/phosphor.h>
-
-#include <sstream>
-
-VBucketMemoryDeletionTask::VBucketMemoryDeletionTask(
-        EventuallyPersistentEngine& eng, VBucketPtr& vb, double delay)
-    : GlobalTask(&eng, TaskId::VBucketMemoryDeletionTask, delay, true),
-      e(eng),
-      vbucket(vb) {
-    if (!vb) {
-        throw std::invalid_argument(
-                "VBucketMemoryDeletionTask: vb to delete cannot be null");
-    }
-    description = "Removing (dead) vb:" + std::to_string(vbucket->getId()) +
-                  " from memory";
-}
-
-cb::const_char_buffer VBucketMemoryDeletionTask::getDescription() {
-    return description;
-}
-
-bool VBucketMemoryDeletionTask::run() {
-    TRACE_EVENT("ep-engine/task", "VBucketMemoryDeletionTask",
-                vbucket->getId());
-    vbucket->notifyAllPendingConnsFailed(e);
-    vbucket.reset();
-    return false;
-}
diff --git a/src/vbucketmemorydeletiontask.h b/src/vbucketmemorydeletiontask.h
deleted file mode 100644 (file)
index b26da60..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-/*
- *     Copyright 2016 Couchbase, Inc
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-#pragma once
-
-#include "globaltask.h"
-#include "vbucket.h"
-/*
- * This is a NONIO task called as part of VB deletion.  The task is responsible
- * for clearing all the VBucket's pending operations and for clearing the
- * VBucket's hash table.
- */
-class VBucketMemoryDeletionTask : public GlobalTask {
-public:
-    VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
-                              VBucketPtr& vb,
-                              double delay);
-
-    cb::const_char_buffer getDescription();
-
-    bool run();
-
-private:
-    EventuallyPersistentEngine& e;
-    VBucketPtr vbucket;
-    std::string description;
-};
index 42f860b..7c0fc3f 100644 (file)
@@ -1667,18 +1667,17 @@ static enum test_result test_bug2509(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
 static enum test_result test_bug7023(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     std::vector<std::string> keys;
     // Make a vbucket mess.
-    for (int j = 0; j < 10000; ++j) {
-        std::stringstream ss;
-        ss << "key" << j;
-        std::string key(ss.str());
-        keys.push_back(key);
+    const int nitems = 10000;
+    const int iterations = 5;
+    for (int j = 0; j < nitems; ++j) {
+        keys.push_back("key" + std::to_string(j));
     }
 
     std::vector<std::string>::iterator it;
-    for (int j = 0; j < 5; ++j) {
+    for (int j = 0; j < iterations; ++j) {
         check(set_vbucket_state(h, h1, 0, vbucket_state_dead),
               "Failed set set vbucket 0 dead.");
-        vbucketDelete(h, h1, 0, "async=0");
+        vbucketDelete(h, h1, 0);
         checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS,
                 last_status.load(),
                 "Expected vbucket deletion to work.");
@@ -1702,7 +1701,9 @@ static enum test_result test_bug7023(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
                                   testHarness.get_current_testcase()->cfg,
                                   true, false);
         wait_for_warmup_complete(h, h1);
-        return get_int_stat(h, h1, "ep_warmup_value_count", "warmup") == 10000 ? SUCCESS : FAIL;
+        checkeq(nitems,
+                get_int_stat(h, h1, "ep_warmup_value_count", "warmup"),
+                "Incorrect items following warmup");
     }
     return SUCCESS;
 }
index a92c872..9925665 100644 (file)
@@ -484,23 +484,17 @@ TEST_F(SingleThreadedEPBucketTest, MB19695_doTapVbTakeoverStats) {
     // [[1] Set our state to replica.
     setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
 
-    auto& lpWriterQ = *task_executor->getLpTaskQ()[WRITER_TASK_IDX];
-    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
-
     // [[2]] Perform a vbucket reset. This will perform some work synchronously,
-    // but also created 2 tasks and notifies the flusher:
-    //   1. vbucket memory deletion (NONIO)
-    //   2. vbucket disk deletion (WRITER)
-    //   3. FlusherTask notified (WRITER)
+    // but also creates the task that will delete the VB.
+    //   * vbucket memory and disk deletion (AUXIO)
     // MB-19695: If we try to get the number of persisted deletes between
-    // steps (2) and (3) running then an exception is thrown (and client
+    // steps [[2]] and [[3]] running then an exception is thrown (and client
     // disconnected).
     EXPECT_TRUE(store->resetVBucket(vbid));
+    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
+    runNextTask(lpAuxioQ, "Removing (dead) vb:0 from memory and disk");
 
-    runNextTask(lpNonioQ, "Removing (dead) vb:0 from memory");
-    runNextTask(lpWriterQ, "Deleting VBucket:0");
-
-    // [[2]] Ok, let's see if we can get TAP takeover stats. This will
+    // [[3]] Ok, let's see if we can get TAP takeover stats. This will
     // fail with MB-19695.
     // Dummy callback to pass into the stats function below.
     auto dummy_cb = [](const char *key, const uint16_t klen,
index d0a6189..4ac54c1 100644 (file)
@@ -34,7 +34,7 @@
 #include "tapconnmap.h"
 #include "tests/mock/mock_global_task.h"
 #include "tests/module_tests/test_helpers.h"
-#include "vbucketmemorydeletiontask.h"
+#include "vbucketdeletiontask.h"
 
 #include <thread>
 
@@ -339,7 +339,7 @@ TEST_P(EPStoreEvictionTest, MB_21976) {
                                                        HIDE_LOCKED_CAS |
                                                        TRACK_STATISTICS);
     GetValue gv = store->get(makeStoredDocKey("key"), vbid, cookie, options);
-    EXPECT_EQ(ENGINE_EWOULDBLOCK,gv.getStatus());
+    EXPECT_EQ(ENGINE_EWOULDBLOCK, gv.getStatus());
 
     // Mark the status of the cookie so that we can see if notify is called
     lock_mock_cookie(cookie);
@@ -347,14 +347,19 @@ TEST_P(EPStoreEvictionTest, MB_21976) {
     c->status = ENGINE_E2BIG;
     unlock_mock_cookie(cookie);
 
-    // Manually run the VBucketMemoryDeletionTask task
-    VBucketPtr vb = store->getVBucket(vbid);
-    VBucketMemoryDeletionTask deletionTask(*engine, vb, /*delay*/0.0);
-    deletionTask.run();
+    const void* deleteCookie = create_mock_cookie();
+
+    // lock the cookie, waitfor will release and enter the internal cond-var
+    lock_mock_cookie(deleteCookie);
+    store->deleteVBucket(vbid, deleteCookie);
+    waitfor_mock_cookie(deleteCookie);
+    unlock_mock_cookie(deleteCookie);
 
     // Check the status of the cookie to see if the cookie status has changed
     // to ENGINE_NOT_MY_VBUCKET, which means the notify was sent
     EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, c->status);
+
+    destroy_mock_cookie(deleteCookie);
 }
 
 TEST_P(EPStoreEvictionTest, TouchCmdDuringBgFetch) {
index a585eaf..121b10f 100644 (file)
@@ -34,7 +34,7 @@
 #include "tasks.h"
 #include "tests/mock/mock_global_task.h"
 #include "tests/module_tests/test_helpers.h"
-#include "vbucketmemorydeletiontask.h"
+#include "vbucketdeletiontask.h"
 
 #include <platform/dirutils.h>
 #include <chrono>