Merge remote-tracking branch 'couchbase/3.0.x' into sherlock
[ep-engine.git] / src / ep.cc
index 4c22f0d..9902709 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -502,18 +502,6 @@ void EventuallyPersistentStore::stopBgFetcher() {
     }
 }
 
-RCPtr<VBucket> EventuallyPersistentStore::getVBucket(uint16_t vbid,
-                                                vbucket_state_t wanted_state) {
-    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
-    vbucket_state_t found_state(vb ? vb->getState() : vbucket_state_dead);
-    if (found_state == wanted_state) {
-        return vb;
-    } else {
-        RCPtr<VBucket> rv;
-        return rv;
-    }
-}
-
 void
 EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
                                              time_t startTime,
@@ -585,7 +573,9 @@ StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
             if (vb->getState() != vbucket_state_active) {
                 return wantDeleted ? v : NULL;
             }
-            if (queueExpired) {
+            ReaderLockHolder(vb->getStateLock());
+            // queueDirty only allowed on active VB
+            if (queueExpired && vb->getState() == vbucket_state_active) {
                 incExpirationStat(vb, false);
                 vb->ht.unlocked_softDelete(v, 0, eviction_policy);
                 queueDirty(vb, v, NULL, NULL, false, true);
@@ -694,7 +684,15 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
                                                  uint8_t nru) {
 
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb || vb->getState() == vbucket_state_dead) {
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    // Obtain read-lock on VB state to ensure VB state changes are interlocked
+    // with this set
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
     } else if (vb->getState() == vbucket_state_replica && !force) {
@@ -781,7 +779,15 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
                                                  const void *cookie)
 {
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb || vb->getState() == vbucket_state_dead ||
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    // Obtain read-lock on VB state to ensure VB state changes are interlocked
+    // with this add
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead ||
         vb->getState() == vbucket_state_replica) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
@@ -843,7 +849,15 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
 ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
                                                      const void *cookie) {
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb || vb->getState() == vbucket_state_dead ||
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    // Obtain read-lock on VB state to ensure VB state changes are interlocked
+    // with this replace
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead ||
         vb->getState() == vbucket_state_replica) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
@@ -932,8 +946,15 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
                                                         ExtendedMetaData *emd) {
 
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb ||
-        vb->getState() == vbucket_state_dead ||
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    // Obtain read-lock on VB state to ensure VB state changes are interlocked
+    // with this add-tapbackfill
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead ||
         vb->getState() == vbucket_state_active) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
@@ -1660,6 +1681,7 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                 if (gcb.val.getStatus() == ENGINE_SUCCESS) {
                     v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
                     cb_assert(v->isResident());
+                    ReaderLockHolder(vb->getStateLock());
                     if (vb->getState() == vbucket_state_active &&
                         v->getExptime() != gcb.val.getValue()->getExptime() &&
                         v->getCas() == gcb.val.getValue()->getCas()) {
@@ -1773,6 +1795,7 @@ void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
                 if (status == ENGINE_SUCCESS) {
                     v->unlocked_restoreValue(fetchedValue, vb->ht);
                     cb_assert(v->isResident());
+                    ReaderLockHolder(vb->getStateLock());
                     if (vb->getState() == vbucket_state_active &&
                         v->getExptime() != fetchedValue->getExptime() &&
                         v->getCas() == fetchedValue->getCas()) {
@@ -2062,7 +2085,13 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
                                                      bool isReplication)
 {
     RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
-    if (!vb || vb->getState() == vbucket_state_dead) {
+    if (!vb) {
+        ++stats.numNotMyVBuckets;
+        return ENGINE_NOT_MY_VBUCKET;
+    }
+
+    ReaderLockHolder(vb->getStateLock());
+    if (vb->getState() == vbucket_state_dead) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
     } else if (vb->getState() == vbucket_state_replica && !force) {
@@ -2230,9 +2259,12 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
                     ENGINE_SUCCESS, v->getBySeqno());
 
         if (exptime_mutated) {
-            // persist the item in the underlying storage for
-            // mutated exptime
-            queueDirty(vb, v, &lh, NULL);
+            ReaderLockHolder(vb->getStateLock());
+            if (vb->getState() == vbucket_state_active) {
+                // persist the item in the underlying storage for
+                // mutated exptime but only if VB is active.
+                queueDirty(vb, v, &lh, NULL);
+            }
         }
         return rv;
     } else {
@@ -2365,8 +2397,8 @@ bool EventuallyPersistentStore::getLocked(const std::string &key,
                                           rel_time_t currentTime,
                                           uint32_t lockTimeout,
                                           const void *cookie) {
-    RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
-    if (!vb) {
+    RCPtr<VBucket> vb = getVBucket(vbucket);
+    if (!vb || vb->getState() != vbucket_state_active) {
         ++stats.numNotMyVBuckets;
         GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
         cb.callback(rv);
@@ -2443,8 +2475,8 @@ EventuallyPersistentStore::unlockKey(const std::string &key,
                                      rel_time_t currentTime)
 {
 
-    RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
-    if (!vb) {
+    RCPtr<VBucket> vb = getVBucket(vbucket);
+    if (!vb || vb->getState() != vbucket_state_active) {
         ++stats.numNotMyVBuckets;
         return ENGINE_NOT_MY_VBUCKET;
     }