MB-20247: Check callback return value for failure 37/68937/3
author Jim Walker <jim@couchbase.com>
Wed, 19 Oct 2016 16:04:18 +0000 (17:04 +0100)
committer Dave Rigby <daver@couchbase.com>
Thu, 20 Oct 2016 20:09:07 +0000 (20:09 +0000)
The batchWarmupCallback applies many items to the vbucket yet
is not coded to handle failures, e.g. ENOMEM.

These errors are now checked for and the warmup stops when an error
is found.

Testing of this is difficult as it was always quite theoretical:

1. The real failure case here is very hard to hit; I think you could
encounter it if you warm up close to DGM whilst racing deletes/evictions
from the frontend.

2. warmup has very little unit-testing that can be adapted.

However, I've built an instrumented ep-engine that forces the
error condition after n callbacks, and I've run that version under
valgrind and ASAN.

Change-Id: I05d35e10e577a3b5c2c7d21807996ab7b8455cc1
Reviewed-on: http://review.couchbase.org/68937
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
src/ep.cc
src/warmup.cc

index 4cca8d5..023d383 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -3635,11 +3635,15 @@ bool EventuallyPersistentStore::maybeEnableTraffic()
 
     if (memoryUsed  >= stats.mem_low_wat) {
         LOG(EXTENSION_LOG_NOTICE,
-            "Total memory use reached to the low water mark, stop warmup");
+            "Total memory use reached to the low water mark, stop warmup"
+            ": memoryUsed (%f) >= low water mark (%" PRIu64 ")",
+            memoryUsed, uint64_t(stats.mem_low_wat.load()));
         return true;
     } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
         LOG(EXTENSION_LOG_NOTICE,
-                "Enough MB of data loaded to enable traffic");
+                "Enough MB of data loaded to enable traffic"
+                ": memoryUsed (%f) > (maxSize(%f) * warmupMemUsedCap(%f))",
+                 memoryUsed, maxSize, stats.warmupMemUsedCap.load());
         return true;
     } else if (eviction_policy == VALUE_ONLY &&
                stats.warmedUpValues >=
@@ -3647,7 +3651,10 @@ bool EventuallyPersistentStore::maybeEnableTraffic()
         // Let ep-engine think we're done with the warmup phase
         // (we should refactor this into "enableTraffic")
         LOG(EXTENSION_LOG_NOTICE,
-            "Enough number of items loaded to enable traffic");
+            "Enough number of items loaded to enable traffic (value eviction)"
+            ": warmedUpValues(%" PRIu64 ") >= (warmedUpKeys(%" PRIu64 ") * "
+            "warmupNumReadCap(%f))",  uint64_t(stats.warmedUpValues.load()),
+            uint64_t(stats.warmedUpKeys.load()), stats.warmupNumReadCap.load());
         return true;
     } else if (eviction_policy == FULL_EVICTION &&
                stats.warmedUpValues >=
@@ -3657,7 +3664,11 @@ bool EventuallyPersistentStore::maybeEnableTraffic()
         // of warmed up values, therefore for honoring the min_item threshold
         // in this scenario, we can consider warmup's estimated item count.
         LOG(EXTENSION_LOG_NOTICE,
-            "Enough number of items loaded to enable traffic");
+            "Enough number of items loaded to enable traffic (full eviction)"
+            ": warmedUpValues(%" PRIu64 ") >= (warmup est items(%" PRIu64 ") * "
+            "warmupNumReadCap(%f))",  uint64_t(stats.warmedUpValues.load()),
+            uint64_t(warmupTask->getEstimatedItemCount()),
+            stats.warmupNumReadCap.load());
         return true;
     }
     return false;
index ede15c1..1a66a40 100644 (file)
@@ -53,6 +53,7 @@ static bool batchWarmupCallback(uint16_t vbId,
     if (!c->epstore->maybeEnableTraffic()) {
         vb_bgfetch_queue_t items2fetch;
         for (auto& key : fetches) {
+            // Deleted below via a unique_ptr in the next loop
             VBucketBGFetchItem *fit = new VBucketBGFetchItem(NULL, false);
             vb_bgfetch_item_ctx_t& bg_itm_ctx = items2fetch[key];
             bg_itm_ctx.isMetaOnly = false;
@@ -61,23 +62,41 @@ static bool batchWarmupCallback(uint16_t vbId,
 
         c->epstore->getROUnderlying(vbId)->getMulti(vbId, items2fetch);
 
-        vb_bgfetch_queue_t::iterator items = items2fetch.begin();
-        for (; items != items2fetch.end(); items++) {
-           vb_bgfetch_item_ctx_t& bg_itm_ctx = (*items).second;
-           VBucketBGFetchItem * fetchedItem = bg_itm_ctx.bgfetched_list.back();
-           GetValue &val = fetchedItem->value;
-           if (val.getStatus() == ENGINE_SUCCESS) {
-                c->loaded++;
-                c->cb.callback(val);
-           } else {
-                LOG(EXTENSION_LOG_WARNING,
-                "Warmup failed to load data for vBucket = %d"
-                " key = %s error = %X\n",
-                vbId,
-                    (*items).first.c_str(), val.getStatus());
-                c->error++;
-          }
-          delete fetchedItem;
+        // applyItem controls the  mode this loop operates in.
+        // true we will attempt the callback (attempt a HashTable insert)
+        // false we don't attempt the callback
+        // in both cases the loop must delete the VBucketBGFetchItem we
+        // allocated above.
+        bool applyItem = true;
+        for (auto items : items2fetch) {
+            vb_bgfetch_item_ctx_t& bg_itm_ctx = items.second;
+            std::unique_ptr<VBucketBGFetchItem> fetchedItem(bg_itm_ctx.bgfetched_list.back());
+            if (applyItem) {
+                GetValue &val = fetchedItem->value;
+                if (val.getStatus() == ENGINE_SUCCESS) {
+                    // NB: callback will delete the GetValue's Item
+                    c->cb.callback(val);
+                } else {
+                    LOG(EXTENSION_LOG_WARNING,
+                    "Warmup failed to load data for vBucket = %d"
+                    " key = %s error = %X\n",
+                    vbId, items.first.c_str(), val.getStatus());
+                    c->error++;
+                }
+
+                if (c->cb.getStatus() == ENGINE_SUCCESS) {
+                    c->loaded++;
+                } else {
+                    // Failed to apply an Item, so fail the rest
+                    applyItem = false;
+                }
+            } else {
+                // Providing that the status is SUCCESS, delete the Item
+                if (fetchedItem->value.getStatus() == ENGINE_SUCCESS) {
+                    delete fetchedItem->value.getValue();
+                }
+                c->skipped++;
+            }
         }
 
         return true;
@@ -198,13 +217,14 @@ std::ostream& operator <<(std::ostream &out, const WarmupState &state)
     out << state.toString();
     return out;
 }
-
 void LoadStorageKVPairCallback::callback(GetValue &val) {
-    Item *i = val.getValue();
+    // This callback method is responsible for deleting the Item
+    std::unique_ptr<Item> i(val.getValue());
     bool stopLoading = false;
     if (i != NULL && !epstore.getWarmup()->isComplete()) {
         RCPtr<VBucket> vb = vbuckets.getBucket(i->getVBucketId());
         if (!vb) {
+            setStatus(ENGINE_NOT_MY_VBUCKET);
             return;
         }
         bool succeeded(false);
@@ -253,7 +273,6 @@ void LoadStorageKVPairCallback::callback(GetValue &val) {
             }
         } while (!succeeded && retry-- > 0);
 
-        delete i;
         val.setValue(NULL);
 
         if (maybeEnableTraffic) {
@@ -282,7 +301,6 @@ void LoadStorageKVPairCallback::callback(GetValue &val) {
         }
     } else {
         stopLoading = true;
-        delete i;
     }
 
     if (stopLoading) {