MB-16181: Add collections checks to more commands 90/78390/3
authorJim Walker <jim@couchbase.com>
Mon, 22 May 2017 09:30:20 +0000 (10:30 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 24 May 2017 09:29:36 +0000 (09:29 +0000)
Currently only get/set have collections validation in-place, this
needs extending to cover all "commands-paths" where a client is
operating on a key.

Change-Id: I976533788e3365fb4eb789bce6132812a4339e4c
Reviewed-on: http://review.couchbase.org/78390
Reviewed-by: Daniel Owen <owend@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
src/kv_bucket.cc

index 6acb324..f5975ca 100644 (file)
@@ -678,7 +678,14 @@ ENGINE_ERROR_CODE KVBucket::add(Item &itm, const void *cookie)
         return ENGINE_NOT_STORED;
     }
 
-    return vb->add(itm, cookie, engine, bgFetchDelay);
+    { // collections read-lock scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(itm.getKey())) {
+            return ENGINE_UNKNOWN_COLLECTION;
+        } // now hold collections read access for the duration of the set
+
+        return vb->add(itm, cookie, engine, bgFetchDelay);
+    }
 }
 
 ENGINE_ERROR_CODE KVBucket::replace(Item &itm, const void *cookie) {
@@ -701,7 +708,14 @@ ENGINE_ERROR_CODE KVBucket::replace(Item &itm, const void *cookie) {
         }
     }
 
-    return vb->replace(itm, cookie, engine, bgFetchDelay);
+    { // collections read-lock scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(itm.getKey())) {
+            return ENGINE_UNKNOWN_COLLECTION;
+        } // now hold collections read access for the duration of the set
+
+        return vb->replace(itm, cookie, engine, bgFetchDelay);
+    }
 }
 
 ENGINE_ERROR_CODE KVBucket::addBackfillItem(Item& itm,
@@ -1501,9 +1515,15 @@ ENGINE_ERROR_CODE KVBucket::getMetaData(const DocKey& key,
         return ENGINE_NOT_MY_VBUCKET;
     }
 
-    return vb->getMetaData(
-            key, cookie, engine, bgFetchDelay, metadata,
-            deleted, datatype);
+    { // collections read scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(key)) {
+            return ENGINE_UNKNOWN_COLLECTION;
+        }
+
+        return vb->getMetaData(
+                key, cookie, engine, bgFetchDelay, metadata, deleted, datatype);
+    }
 }
 
 ENGINE_ERROR_CODE KVBucket::setWithMeta(Item &itm,
@@ -1545,17 +1565,24 @@ ENGINE_ERROR_CODE KVBucket::setWithMeta(Item &itm,
         return ENGINE_KEY_EEXISTS;
     }
 
-    return vb->setWithMeta(itm,
-                           cas,
-                           seqno,
-                           cookie,
-                           engine,
-                           bgFetchDelay,
-                           force,
-                           allowExisting,
-                           genBySeqno,
-                           genCas,
-                           isReplication);
+    { // collections read scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(itm.getKey())) {
+            return ENGINE_UNKNOWN_COLLECTION;
+        }
+
+        return vb->setWithMeta(itm,
+                               cas,
+                               seqno,
+                               cookie,
+                               engine,
+                               bgFetchDelay,
+                               force,
+                               allowExisting,
+                               genBySeqno,
+                               genCas,
+                               isReplication);
+    }
 }
 
 GetValue KVBucket::getAndUpdateTtl(const DocKey& key, uint16_t vbucket,
@@ -1580,7 +1607,14 @@ GetValue KVBucket::getAndUpdateTtl(const DocKey& key, uint16_t vbucket,
         }
     }
 
-    return vb->getAndUpdateTtl(key, cookie, engine, bgFetchDelay, exptime);
+    { // collections read scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(key)) {
+            return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION);
+        }
+
+        return vb->getAndUpdateTtl(key, cookie, engine, bgFetchDelay, exptime);
+    }
 }
 
 GetValue KVBucket::getLocked(const DocKey& key, uint16_t vbucket,
@@ -1592,8 +1626,15 @@ GetValue KVBucket::getLocked(const DocKey& key, uint16_t vbucket,
         return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
     }
 
-    return vb->getLocked(
-            key, currentTime, lockTimeout, cookie, engine, bgFetchDelay);
+    { // collections read scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(key)) {
+            return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION);
+        }
+
+        return vb->getLocked(
+                key, currentTime, lockTimeout, cookie, engine, bgFetchDelay);
+    }
 }
 
 ENGINE_ERROR_CODE KVBucket::unlockKey(const DocKey& key,
@@ -1608,6 +1649,11 @@ ENGINE_ERROR_CODE KVBucket::unlockKey(const DocKey& key,
         return ENGINE_NOT_MY_VBUCKET;
     }
 
+    auto collectionsRHandle = vb->lockCollections();
+    if (!collectionsRHandle.doesKeyContainValidCollection(key)) {
+        return ENGINE_UNKNOWN_COLLECTION;
+    }
+
     auto hbl = vb->ht.getLockedBucket(key);
     StoredValue* v = vb->fetchValidValue(hbl,
                                          key,
@@ -1652,8 +1698,15 @@ ENGINE_ERROR_CODE KVBucket::getKeyStats(const DocKey& key,
         return ENGINE_NOT_MY_VBUCKET;
     }
 
-    return vb->getKeyStats(
-            key, cookie, engine, bgFetchDelay, kstats, wantsDeleted);
+    { // collections read scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(key)) {
+            return ENGINE_UNKNOWN_COLLECTION;
+        }
+
+        return vb->getKeyStats(
+                key, cookie, engine, bgFetchDelay, kstats, wantsDeleted);
+}
 }
 
 std::string KVBucket::validateKey(const DocKey& key, uint16_t vbucket,
@@ -1706,14 +1759,15 @@ ENGINE_ERROR_CODE KVBucket::deleteItem(const DocKey& key,
                 ", becuase takeover is lagging", vb->getId());
         return ENGINE_TMPFAIL;
     }
+    { // collections read scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(key)) {
+            return ENGINE_UNKNOWN_COLLECTION;
+        }
 
-    return vb->deleteItem(key,
-                          cas,
-                          cookie,
-                          engine,
-                          bgFetchDelay,
-                          itemMeta,
-                          mutInfo);
+        return vb->deleteItem(
+                key, cas, cookie, engine, bgFetchDelay, itemMeta, mutInfo);
+    }
 }
 
 ENGINE_ERROR_CODE KVBucket::deleteWithMeta(const DocKey& key,
@@ -1758,19 +1812,26 @@ ENGINE_ERROR_CODE KVBucket::deleteWithMeta(const DocKey& key,
         return ENGINE_KEY_EEXISTS;
     }
 
-    return vb->deleteWithMeta(key,
-                              cas,
-                              seqno,
-                              cookie,
-                              engine,
-                              bgFetchDelay,
-                              force,
-                              itemMeta,
-                              backfill,
-                              genBySeqno,
-                              generateCas,
-                              bySeqno,
-                              isReplication);
+    { // collections read scope
+        auto collectionsRHandle = vb->lockCollections();
+        if (!collectionsRHandle.doesKeyContainValidCollection(key)) {
+            return ENGINE_UNKNOWN_COLLECTION;
+        }
+
+        return vb->deleteWithMeta(key,
+                                  cas,
+                                  seqno,
+                                  cookie,
+                                  engine,
+                                  bgFetchDelay,
+                                  force,
+                                  itemMeta,
+                                  backfill,
+                                  genBySeqno,
+                                  generateCas,
+                                  bySeqno,
+                                  isReplication);
+    }
 }
 
 void KVBucket::reset() {