[BP] MB-16366: Obtain vbstate readlock in numerous operations 65/62965/4
authorJim Walker <jim@couchbase.com>
Fri, 9 Oct 2015 14:14:28 +0000 (15:14 +0100)
committerChiyoung Seo <chiyoung@couchbase.com>
Sat, 23 Apr 2016 01:39:32 +0000 (01:39 +0000)
Any KV update operations grab the lock early and test that VB state
is active, they keep the lock until complete, this certainly protects
queueDirty from colliding with a VB state change and also any other
paths we're unaware of.

The GET operations only use the read lock if the GET has triggered a
expiry/queueDirty.

A couple of other locations that trigger queueDirty are also interlocked
with VB state changes.

(Already Reviewed-on: http://review.couchbase.org/55868)

Change-Id: Icaee69520da230a9fdde6eb85365a7ddae790fd6
Reviewed-on: http://review.couchbase.org/62965
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Manu Dhundi <manu@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
src/ep.cc
src/ep.h

index 4bcd493..0e0895c 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -466,18 +466,6 @@ void EventuallyPersistentStore::stopBgFetcher() {
     }
 }
 
-RCPtr<VBucket> EventuallyPersistentStore::getVBucket(uint16_t vbid,
-                                                vbucket_state_t wanted_state) {
-    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
-    vbucket_state_t found_state(vb ? vb->getState() : vbucket_state_dead);
-    if (found_state == wanted_state) {
-        return vb;
-    } else {
-        RCPtr<VBucket> rv;
-        return rv;
-    }
-}
-
 void
 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
                                              time_t startTime,
@@ -546,7 +534,9 @@ StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
             if (vb->getState() != vbucket_state_active) {
                 return wantDeleted ? v : NULL;
             }
-            if (queueExpired) {
+            ReaderLockHolder(vb->getStateLock());
+            // queueDirty only allowed on active VB
+            if (queueExpired && vb->getState() == vbucket_state_active) {
                 incExpirationStat(vb, false);
                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
                 queueDirty(vb, v, NULL, false, true);
@@ -635,7 +625,15 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
                                                  uint8_t nru) {
 
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb || vb->getState() == vbucket_state_dead) {
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    // Obtain read-lock on VB state to ensure VB state changes are interlocked
+    // with this set
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
     } else if (vb->getState() == vbucket_state_replica && !force) {
@@ -706,7 +704,15 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
                                                  const void *cookie)
 {
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb || vb->getState() == vbucket_state_dead ||
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    // Obtain read-lock on VB state to ensure VB state changes are interlocked
+    // with this add
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead ||
         vb->getState() == vbucket_state_replica) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
@@ -751,7 +757,15 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                                                      const void *cookie) {
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb || vb->getState() == vbucket_state_dead ||
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    // Obtain read-lock on VB state to ensure VB state changes are interlocked
+    // with this replace
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead ||
         vb->getState() == vbucket_state_replica) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
@@ -827,8 +841,15 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
                                                                 bool genBySeqno) {
 
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb ||
-        vb->getState() == vbucket_state_dead ||
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    // Obtain read-lock on VB state to ensure VB state changes are interlocked
+    // with this add-tapbackfill
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead ||
         vb->getState() == vbucket_state_active) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
@@ -1452,6 +1473,7 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
                     cb_assert(v->isResident());
+                    ReaderLockHolder(vb->getStateLock());
                     if (vb->getState() == vbucket_state_active &&
                         v->getExptime() != gcb.val.getValue()->getExptime() &&
                         v->getCas() == gcb.val.getValue()->getCas()) {
@@ -1557,6 +1579,7 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
                 if (status == ENGINE_SUCCESS) {
                     v->unlocked_restoreValue(fetchedValue, vb->ht);
                     cb_assert(v->isResident());
+                    ReaderLockHolder(vb->getStateLock());
                     if (vb->getState() == vbucket_state_active &&
                         v->getExptime() != fetchedValue->getExptime() &&
                         v->getCas() == fetchedValue->getCas()) {
@@ -1811,7 +1834,13 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
                                                          bool isReplication)
 {
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb || vb->getState() == vbucket_state_dead) {
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
     } else if (vb->getState() == vbucket_state_replica && !force) {
@@ -1938,9 +1967,12 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
                     ENGINE_SUCCESS, v->getBySeqno());
 
         if (exptime_mutated) {
-            // persist the itme in the underlying storage for
-            // mutated exptime
-            queueDirty(vb, v, &lh);
+            ReaderLockHolder(vb->getStateLock());
+            if (vb->getState() == vbucket_state_active) {
+                // persist the item in the underlying storage for
+                // mutated exptime but only if VB is active.
+                queueDirty(vb, v, &lh);
+            }
         }
         return rv;
     } else {
@@ -2064,8 +2096,8 @@ bool EventuallyPersistentStore::getLocked(const std::string &key,
                                           rel_time_t currentTime,
                                           uint32_t lockTimeout,
                                           const void *cookie) {
-    RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
-    if (!vb) {
+    RCPtr<VBucket> vb = getVBucket(vbucket);
+    if (!vb || vb->getState() != vbucket_state_active) {
         ++stats.numNotMyVBuckets;
         GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
         cb.callback(rv);
@@ -2133,8 +2165,8 @@ EventuallyPersistentStore::unlockKey(const std::string &key,
                                      rel_time_t currentTime)
 {
 
-    RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
-    if (!vb) {
+    RCPtr<VBucket> vb = getVBucket(vbucket);
+    if (!vb || vb->getState() != vbucket_state_active) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
     }
index fcac2e7..1fa757b 100644 (file)
--- a/src/ep.h
+++ b/src/ep.h
@@ -753,8 +753,6 @@ private:
                             const void* cookie,
                             double delay = 0);
 
-    RCPtr<VBucket> getVBucket(uint16_t vbid, vbucket_state_t wanted_state);
-
     /* Queue an item for persistence and replication
      *
      * The caller of this function must hold the lock of the hash table