MB-23714: Make VBucketPtr a std::shared_ptr 57/76557/10
authorJim Walker <jim@couchbase.com>
Wed, 12 Apr 2017 13:39:18 +0000 (14:39 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 19 Apr 2017 11:32:43 +0000 (11:32 +0000)
Change the VBucketPtr definition so that VBucket poiners are managed
using std::shared_ptr.

To enable some functions to turn this* into a VBucketPtr, VBucket
inherits std::enable_shared_from_this. The only current user of this
is EphemeralVBucket where it constructs the DCPBackfillMemory with a
shared pointer to itself. Arguably EphemeralVBucket could be the class
to inherit std::enable_shared_from_this, but giving the base-class the
shared_from_this method seems more flexible for future enhancement.

Change-Id: Id2f39ece3983509b5c6742107de56b1dcba2d844
Reviewed-on: http://review.couchbase.org/76557
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
13 files changed:
src/dcp/backfill_memory.cc
src/dcp/backfill_memory.h
src/ep_bucket.cc
src/ephemeral_bucket.cc
src/ephemeral_tombstone_purger.cc
src/ephemeral_tombstone_purger.h
src/ephemeral_vb.cc
src/ephemeral_vb.h
src/kvshard.cc
src/kvshard.h
src/vbucket.h
src/vbucketmap.cc
src/vbucketmap.h

index 04384cd..771e330 100644 (file)
@@ -23,7 +23,7 @@
 #include "ephemeral_vb.h"
 #include "seqlist.h"
 
-DCPBackfillMemory::DCPBackfillMemory(SingleThreadedRCPtr<EphemeralVBucket> evb,
+DCPBackfillMemory::DCPBackfillMemory(EphemeralVBucketPtr evb,
                                      const active_stream_t& s,
                                      uint64_t startSeqno,
                                      uint64_t endSeqno)
index ee8a75c..64936e1 100644 (file)
@@ -21,8 +21,7 @@
 
 #include "callbacks.h"
 #include "dcp/backfill.h"
-
-class EphemeralVBucket;
+#include "ephemeral_vb.h"
 
 /**
  * Concrete class that does backfill from in-memory ordered data strucuture and
@@ -34,7 +33,7 @@ class EphemeralVBucket;
  */
 class DCPBackfillMemory : public DCPBackfill {
 public:
-    DCPBackfillMemory(SingleThreadedRCPtr<EphemeralVBucket> evb,
+    DCPBackfillMemory(EphemeralVBucketPtr evb,
                       const active_stream_t& s,
                       uint64_t startSeqno,
                       uint64_t endSeqno);
@@ -54,5 +53,5 @@ private:
     /**
      * Ref counted ptr to EphemeralVBucket
      */
-    SingleThreadedRCPtr<EphemeralVBucket> evb;
+    EphemeralVBucketPtr evb;
 };
index 33ff843..9ce29b0 100644 (file)
@@ -231,23 +231,23 @@ VBucketPtr EPBucket::makeVBucket(VBucket::id_type id,
                                      uint64_t maxCas,
                                      const std::string& collectionsManifest) {
     auto flusherCb = std::make_shared<NotifyFlusherCB>(shard);
-    return VBucketPtr(new EPVBucket(id,
-                                        state,
-                                        stats,
-                                        engine.getCheckpointConfig(),
-                                        shard,
-                                        lastSeqno,
-                                        lastSnapStart,
-                                        lastSnapEnd,
-                                        std::move(table),
-                                        flusherCb,
-                                        std::move(newSeqnoCb),
-                                        engine.getConfiguration(),
-                                        eviction_policy,
-                                        initState,
-                                        purgeSeqno,
-                                        maxCas,
-                                        collectionsManifest));
+    return std::make_shared<EPVBucket>(id,
+                                       state,
+                                       stats,
+                                       engine.getCheckpointConfig(),
+                                       shard,
+                                       lastSeqno,
+                                       lastSnapStart,
+                                       lastSnapEnd,
+                                       std::move(table),
+                                       flusherCb,
+                                       std::move(newSeqnoCb),
+                                       engine.getConfiguration(),
+                                       eviction_policy,
+                                       initState,
+                                       purgeSeqno,
+                                       maxCas,
+                                       collectionsManifest);
 }
 
 ENGINE_ERROR_CODE EPBucket::statsVKey(const DocKey& key,
index 54ba4e6..a6ab370 100644 (file)
@@ -147,22 +147,22 @@ VBucketPtr EphemeralBucket::makeVBucket(
         uint64_t purgeSeqno,
         uint64_t maxCas,
         const std::string& collectionsManifest) {
-    return RCPtr<VBucket>(new EphemeralVBucket(id,
-                                               state,
-                                               stats,
-                                               engine.getCheckpointConfig(),
-                                               shard,
-                                               lastSeqno,
-                                               lastSnapStart,
-                                               lastSnapEnd,
-                                               std::move(table),
-                                               std::move(newSeqnoCb),
-                                               engine.getConfiguration(),
-                                               eviction_policy,
-                                               initState,
-                                               purgeSeqno,
-                                               maxCas,
-                                               collectionsManifest));
+    return std::make_shared<EphemeralVBucket>(id,
+                                              state,
+                                              stats,
+                                              engine.getCheckpointConfig(),
+                                              shard,
+                                              lastSeqno,
+                                              lastSnapStart,
+                                              lastSnapEnd,
+                                              std::move(table),
+                                              std::move(newSeqnoCb),
+                                              engine.getConfiguration(),
+                                              eviction_policy,
+                                              initState,
+                                              purgeSeqno,
+                                              maxCas,
+                                              collectionsManifest);
 }
 
 void EphemeralBucket::completeStatsVKey(const void* cookie,
index 3c3bfd8..9e4a1f6 100644 (file)
@@ -54,7 +54,7 @@ EphemeralVBucket::VBTombstonePurger::VBTombstonePurger(rel_time_t purgeAge)
     : purgeAge(purgeAge), numPurgedItems(0) {
 }
 
-void EphemeralVBucket::VBTombstonePurger::visitBucket(RCPtr<VBucket>& vb) {
+void EphemeralVBucket::VBTombstonePurger::visitBucket(VBucketPtr& vb) {
     auto vbucket = dynamic_cast<EphemeralVBucket*>(vb.get());
     if (!vbucket) {
         throw std::invalid_argument(
index cb1e36b..22361ed 100644 (file)
@@ -69,7 +69,7 @@ class EphemeralVBucket::VBTombstonePurger : public VBucketVisitor {
 public:
     VBTombstonePurger(rel_time_t purgeAge);
 
-    void visitBucket(RCPtr<VBucket>& vb) override;
+    void visitBucket(VBucketPtr& vb) override;
 
     size_t getNumPurgedItems() const {
         return numPurgedItems;
index 6f225c5..28b1c45 100644 (file)
@@ -233,8 +233,10 @@ std::unique_ptr<DCPBackfill> EphemeralVBucket::createDCPBackfill(
         uint64_t startSeqno,
         uint64_t endSeqno) {
     /* create a memory backfill object */
+    EphemeralVBucketPtr evb =
+            std::static_pointer_cast<EphemeralVBucket>(shared_from_this());
     return std::make_unique<DCPBackfillMemory>(
-            this, stream, startSeqno, endSeqno);
+            evb, stream, startSeqno, endSeqno);
 }
 
 std::pair<ENGINE_ERROR_CODE, std::vector<UniqueItemPtr>>
index 103f669..d4d2586 100644 (file)
 #pragma once
 
 #include "config.h"
+#include "seqlist.h"
 #include "vbucket.h"
 
-/* Forward declarations */
-class SequenceList;
-
 class EphemeralVBucket : public VBucket {
 public:
     class CountVisitor;
@@ -215,3 +213,5 @@ private:
      */
     EPStats::Counter seqListPurgeCount;
 };
+
+using EphemeralVBucketPtr = std::shared_ptr<EphemeralVBucket>;
index 19c2531..8a0aa99 100644 (file)
@@ -67,18 +67,18 @@ BgFetcher *KVShard::getBgFetcher() {
 
 VBucketPtr KVShard::getBucket(uint16_t id) const {
     if (id < vbuckets.size()) {
-        return vbuckets[id];
+        return vbuckets[id].lock().get();
     } else {
         return NULL;
     }
 }
 
-void KVShard::setBucket(const VBucketPtr &vb) {
-    vbuckets[vb->getId()].reset(vb);
+void KVShard::setBucket(VBucketPtr vb) {
+    vbuckets[vb->getId()].lock().set(vb);
 }
 
 void KVShard::resetBucket(uint16_t id) {
-    vbuckets[id].reset();
+    vbuckets[id].lock().reset();
 }
 
 std::vector<VBucket::id_type> KVShard::getVBucketsSortedByState() {
@@ -86,9 +86,11 @@ std::vector<VBucket::id_type> KVShard::getVBucketsSortedByState() {
     for (int state = vbucket_state_active;
          state <= vbucket_state_dead;
          ++state) {
-        for (VBucketPtr b : vbuckets) {
-            if (b && b->getState() == state) {
-                rv.push_back(b->getId());
+        for (const auto& b : vbuckets) {
+            auto vb = b.lock();
+            auto vbPtr = vb.get();
+            if (vbPtr && vbPtr->getState() == state) {
+                rv.push_back(vbPtr->getId());
             }
         }
     }
@@ -97,9 +99,11 @@ std::vector<VBucket::id_type> KVShard::getVBucketsSortedByState() {
 
 std::vector<VBucket::id_type> KVShard::getVBuckets() {
     std::vector<VBucket::id_type> rv;
-    for (VBucketPtr b : vbuckets) {
-        if (b) {
-            rv.push_back(b->getId());
+    for (const auto& b : vbuckets) {
+        auto vb = b.lock();
+        auto vbPtr = vb.get();
+        if (vbPtr) {
+            rv.push_back(vbPtr->getId());
         }
     }
     return rv;
index 825f5e8..db95063 100644 (file)
@@ -78,7 +78,7 @@ public:
     BgFetcher *getBgFetcher();
 
     VBucketPtr getBucket(VBucket::id_type id) const;
-    void setBucket(const VBucketPtr &b);
+    void setBucket(VBucketPtr vb);
     void resetBucket(VBucket::id_type id);
 
     KVShard::id_type getId() const {
@@ -90,7 +90,91 @@ public:
 
 private:
     KVStoreConfig kvConfig;
-    std::vector<VBucketPtr> vbuckets;
+
+    /**
+     * VBMapElement comprises the VBucket smart pointer and a mutex.
+     * Access to the smart pointer must be performed through the ::Access object
+     * which will perform RAII locking of the mutex.
+     */
+    class VBMapElement {
+    public:
+        /**
+         * Access for const/non-const VBMapElement (using enable_if to hide
+         * const methods for non-const users and vice-versa)
+         *
+         * This class did have an -> operator for directly reading the VBucket*
+         * but MSVC could not cope with it, so now you must use:
+         *    access.get()-><Vbucket-method>
+         */
+        template <class T>
+        class Access {
+        public:
+            Access(std::mutex& m, T e) : lock(m), element(e) {
+            }
+
+            /**
+             * @return a const VBBucketPtr& (which may have no real pointer)
+             */
+            template <typename U = T>
+            typename std::enable_if<
+                    std::is_const<
+                            typename std::remove_reference<U>::type>::value,
+                    const VBucketPtr&>::type
+            get() const {
+                return element.vbPtr;
+            }
+
+            /**
+             * @return the VBBucketPtr (which may have no real pointer)
+             */
+            template <typename U = T>
+            typename std::enable_if<
+                    !std::is_const<
+                            typename std::remove_reference<U>::type>::value,
+                    VBucketPtr>::type
+            get() const {
+                return element.vbPtr;
+            }
+
+            /**
+             * @param set a new VBBucketPtr for the VB
+             */
+            template <typename U = T>
+            typename std::enable_if<!std::is_const<
+                    typename std::remove_reference<U>::type>::value>::type
+            set(VBucketPtr vb) {
+                element.vbPtr = vb;
+            }
+
+            /**
+             * @param reset VBBucketPtr for the VB
+             */
+            template <typename U = T>
+            typename std::enable_if<!std::is_const<
+                    typename std::remove_reference<U>::type>::value>::type
+            reset() {
+                element.vbPtr.reset();
+            }
+
+        private:
+            std::unique_lock<std::mutex> lock;
+            T& element;
+        };
+
+        Access<VBMapElement&> lock() {
+            return {mutex, *this};
+        }
+
+        Access<const VBMapElement&> lock() const {
+            return {mutex, *this};
+        }
+
+    private:
+        mutable std::mutex mutex;
+        VBucketPtr vbPtr;
+    };
+
+    std::vector<VBMapElement> vbuckets;
 
     std::unique_ptr<KVStore> rwStore;
     std::unique_ptr<KVStore> roStore;
index af9b310..8dba942 100644 (file)
@@ -190,7 +190,7 @@ class KVShard;
 /**
  * An individual vbucket.
  */
-class VBucket : public RCValue {
+class VBucket : public std::enable_shared_from_this<VBucket> {
 public:
 
     // Identifier for a vBucket
@@ -1568,4 +1568,4 @@ private:
     DISALLOW_COPY_AND_ASSIGN(VBucket);
 };
 
-using VBucketPtr = RCPtr<VBucket>;
+using VBucketPtr = std::shared_ptr<VBucket>;
index 28b265c..b4e4895 100644 (file)
@@ -47,15 +47,18 @@ VBucketPtr VBucketMap::getBucket(id_type id) const {
     }
 }
 
-ENGINE_ERROR_CODE VBucketMap::addBucket(const VBucketPtr &b) {
-    if (b->getId() < size) {
-        getShardByVbId(b->getId())->setBucket(b);
-        LOG(EXTENSION_LOG_INFO, "Mapped new vbucket %d in state %s",
-            b->getId(), VBucket::toString(b->getState()));
+ENGINE_ERROR_CODE VBucketMap::addBucket(VBucketPtr vb) {
+    if (vb->getId() < size) {
+        getShardByVbId(vb->getId())->setBucket(vb);
+        LOG(EXTENSION_LOG_INFO,
+            "Mapped new vbucket %d in state %s",
+            vb->getId(),
+            VBucket::toString(vb->getState()));
         return ENGINE_SUCCESS;
     }
     LOG(EXTENSION_LOG_WARNING,
-        "Cannot create vb %" PRIu16", max vbuckets is %" PRIu16, b->getId(),
+        "Cannot create vb %" PRIu16 ", max vbuckets is %" PRIu16,
+        vb->getId(),
         size);
     return ENGINE_ERANGE;
 }
@@ -115,14 +118,6 @@ VBucketMap::getActiveVBucketsSortedByChkMgrMem(void) const {
     return rv;
 }
 
-void VBucketMap::addBuckets(const std::vector<VBucket*> &newBuckets) {
-    std::vector<VBucket*>::const_iterator it;
-    for (it = newBuckets.begin(); it != newBuckets.end(); ++it) {
-        VBucketPtr v(*it);
-        addBucket(v);
-    }
-}
-
 KVShard* VBucketMap::getShardByVbId(id_type id) const {
     return shards[id % shards.size()].get();
 }
index 4e694ed..cc3b7aa 100644 (file)
@@ -56,9 +56,14 @@ public:
 
     VBucketMap(Configuration& config, KVBucket& store);
 
-    ENGINE_ERROR_CODE addBucket(const VBucketPtr &b);
+    /**
+     * Add the VBucket to the map - extending the lifetime of the object until
+     * it is removed from the map via removeBucket.
+     * @param vb shared pointer to the VBucket we are storing.
+     */
+    ENGINE_ERROR_CODE addBucket(VBucketPtr vb);
+
     void removeBucket(id_type id);
-    void addBuckets(const std::vector<VBucket*> &newBuckets);
     VBucketPtr getBucket(id_type id) const;
 
     // Returns the size of the map, i.e. the total number of VBuckets it can