Refactor setWithMeta/delWithMeta in EventuallyPersistentEngine 55/76255/5
authorJim Walker <jim@couchbase.com>
Tue, 4 Apr 2017 12:50:06 +0000 (13:50 +0100)
committerDave Rigby <daver@couchbase.com>
Thu, 6 Apr 2017 15:02:48 +0000 (15:02 +0000)
The refactoring separates the new object (Item/ExtendedMetaData)
allocation from the packet decoding/validation layer.

Change-Id: Iddcbd8d60bd743f396631043765a4052ffc3a74e
Reviewed-on: http://review.couchbase.org/76255
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
src/ep_engine.cc
src/ep_engine.h

index d56e207..f8c2b13 100644 (file)
@@ -5133,16 +5133,17 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
     // so 27, 25 etc... are illegal
     if ((extlen != 24 && extlen != 26 && extlen != 28  && extlen != 30)
         || keylen == 0) {
-        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
+        return sendErrorResponse(response,
+                                 PROTOCOL_BINARY_RESPONSE_EINVAL,
+                                 0,
+                                 cookie);
     }
 
     if (isDegradedMode()) {
-        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_ETMPFAIL,
-                            0, cookie);
+        return sendErrorResponse(response,
+                                 PROTOCOL_BINARY_RESPONSE_ETMPFAIL,
+                                 0,
+                                 cookie);
     }
 
     uint8_t opcode = request->message.header.request.opcode;
@@ -5159,7 +5160,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
 
     bool skipConflictResolution = false;
     GenerateCas generateCas = GenerateCas::No;
-    std::unique_ptr<ExtendedMetaData> emd;
     int keyOffset = 0;
     protocol_binary_response_status error = PROTOCOL_BINARY_RESPONSE_SUCCESS;
     if ((error = decodeWithMetaOptions(request, generateCas,
@@ -5169,6 +5169,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
                             PROTOCOL_BINARY_RAW_BYTES, error, 0, cookie);
     }
 
+    cb::const_byte_buffer emd;
     if (extlen == 26 || extlen == 30) {
         uint16_t nmeta = 0;
         memcpy(&nmeta, key + keyOffset, sizeof(nmeta));
@@ -5177,20 +5178,8 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
         if (nmeta > 0) {
             // Correct the vallen
             vallen -= nmeta;
-            try {
-                emd.reset(new ExtendedMetaData(key + keylen + keyOffset + vallen,
-                                               nmeta));
-            } catch (const std::bad_alloc&) {
-                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_ENOMEM, 0, cookie);
-            }
-
-            if (emd->getStatus() == ENGINE_EINVAL) {
-                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
-            }
+            emd = cb::const_byte_buffer{key + keylen + keyOffset + vallen,
+                                        nmeta};
         }
     }
 
@@ -5198,28 +5187,13 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
         LOG(EXTENSION_LOG_WARNING,
             "Item value size %ld for setWithMeta is bigger "
             "than the max size %ld allowed!!!\n", vallen, maxItemSize);
-        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_E2BIG, 0, cookie);
+        return sendErrorResponse(response,
+                                 PROTOCOL_BINARY_RESPONSE_E2BIG,
+                                 0,
+                                 cookie);
     }
 
-    uint8_t* dta = key + keyOffset + keylen;
 
-    datatype = checkForDatatypeJson(
-            cookie, datatype, {reinterpret_cast<const char*>(dta), vallen});
-
-    uint8_t ext_meta[1];
-    uint8_t ext_len = EXT_META_LEN;
-    *(ext_meta) = datatype;
-    Item *itm = new Item(DocKey(key + keyOffset, keylen, docNamespace),
-                         flags, expiration, dta, vallen,
-                         ext_meta, ext_len, cas, -1, vbucket);
-
-    if (itm == NULL) {
-        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_ENOMEM, 0, cookie);
-    }
 
     void *startTimeC = getEngineSpecific(cookie);
     hrtime_t startTime;
@@ -5229,35 +5203,45 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
         startTime = gethrtime();
     }
 
-    itm->setRevSeqno(seqno);
-
     bool allowExisting = (opcode == PROTOCOL_BINARY_CMD_SET_WITH_META ||
                           opcode == PROTOCOL_BINARY_CMD_SETQ_WITH_META);
 
-    uint8_t meta[16];
-    uint64_t by_seqno = 0;
-    uint64_t vb_uuid = 0;
-
-    ENGINE_ERROR_CODE ret = kvBucket->setWithMeta(*itm,
-                                                  ntohll(request->
-                                                    message.header.request.cas),
-                                                  &by_seqno, cookie,
-                                                  skipConflictResolution,
-                                                  allowExisting,
-                                                  GenerateBySeqno::Yes,
-                                                  generateCas,
-                                                  emd.get(), false);
-
-
+    uint64_t bySeqno = 0;
+    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+    uint64_t commandCas = ntohll(request->message.header.request.cas);
+    try {
+        uint8_t* value = key + keyOffset + keylen;
+        ret = setWithMeta(vbucket,
+                          DocKey(key + keyOffset, keylen, docNamespace),
+                          {value, vallen},
+                          {cas, seqno, flags, time_t(expiration)},
+                          false /*isDeleted*/,
+                          datatype,
+                          commandCas,
+                          &bySeqno,
+                          cookie,
+                          skipConflictResolution,
+                          allowExisting,
+                          GenerateBySeqno::Yes,
+                          generateCas,
+                          emd);
+    } catch (const std::bad_alloc&) {
+        return sendErrorResponse(response,
+                                 PROTOCOL_BINARY_RESPONSE_ENOMEM,
+                                 0,
+                                 cookie);
+    }
+
+    cas = 0;
     if (ret == ENGINE_SUCCESS) {
         ++stats.numOpsSetMeta;
         hrtime_t endTime(gethrtime());
         hrtime_t elapsed = (endTime - startTime) / 1000;
         stats.setWithMetaHisto.add(elapsed);
+        cas = commandCas;
     } else if (ret == ENGINE_ENOMEM) {
         ret = memoryCondition();
     } else if (ret == ENGINE_EWOULDBLOCK) {
-        delete itm;
         ++stats.numOpsGetMetaOnSetWithMeta;
         if (!startTimeC) {
             startTimeC = cb_malloc(sizeof(hrtime_t));
@@ -5269,18 +5253,11 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
 
     auto rc = serverApi->cookie->engine_error2mcbp(cookie, ret);
 
-    if (ret == ENGINE_SUCCESS) {
-        cas = itm->getCas();
-    } else {
-        cas = 0;
-    }
-
     if (startTimeC) {
         cb_free(startTimeC);
-        startTimeC = NULL;
+        startTimeC = nullptr;
         storeEngineSpecific(cookie, startTimeC);
     }
-    delete itm;
 
     if ((opcode == PROTOCOL_BINARY_CMD_SETQ_WITH_META || opcode == PROTOCOL_BINARY_CMD_ADDQ_WITH_META) &&
         rc == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
@@ -5292,17 +5269,71 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(const void* cookie,
     }
 
     if (ret == ENGINE_SUCCESS && isMutationExtrasSupported(cookie)) {
-        RCPtr<VBucket> vb = kvBucket->getVBucket(vbucket);
-        vb_uuid = htonll(vb->failovers->getLatestUUID());
-        by_seqno = htonll(by_seqno);
-        memcpy(meta, &vb_uuid, sizeof(vb_uuid));
-        memcpy(meta + sizeof(vb_uuid), &by_seqno, sizeof(by_seqno));
-        return sendResponse(response, NULL, 0, (const void *)meta, sizeof(meta),
-                            NULL, 0, PROTOCOL_BINARY_RAW_BYTES, rc, cas, cookie);
+        return sendMutationExtras(response, vbucket, bySeqno, rc, cas, cookie);
+    }
+    return sendErrorResponse(response, rc, cas, cookie);
+}
+
+ENGINE_ERROR_CODE EventuallyPersistentEngine::setWithMeta(
+        uint16_t vbucket,
+        DocKey key,
+        cb::const_byte_buffer value,
+        ItemMetaData itemMeta,
+        bool isDeleted,
+        protocol_binary_datatype_t datatype,
+        uint64_t& cas,
+        uint64_t* seqno,
+        const void* cookie,
+        bool force,
+        bool allowExisting,
+        GenerateBySeqno genBySeqno,
+        GenerateCas genCas,
+        cb::const_byte_buffer emd) {
+    std::unique_ptr<ExtendedMetaData> extendedMetaData;
+    if (emd.data()) {
+        extendedMetaData =
+                std::make_unique<ExtendedMetaData>(emd.data(), emd.size());
+        if (extendedMetaData->getStatus() == ENGINE_EINVAL) {
+            return ENGINE_EINVAL;
+        }
     }
 
-    return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
-                        rc, cas, cookie);
+    datatype = checkForDatatypeJson(
+            cookie,
+            datatype,
+            {reinterpret_cast<const char*>(value.data()), value.size()});
+    uint8_t extMeta[1] = {uint8_t(datatype)};
+    auto item = std::make_unique<Item>(key,
+                                       itemMeta.flags,
+                                       itemMeta.exptime,
+                                       value.data(),
+                                       value.size(),
+                                       extMeta,
+                                       EXT_META_LEN,
+                                       itemMeta.cas,
+                                       -1,
+                                       vbucket);
+    item->setRevSeqno(itemMeta.revSeqno);
+    if (isDeleted) {
+        item->setDeleted();
+    }
+    auto ret = kvBucket->setWithMeta(*item,
+                                     cas,
+                                     seqno,
+                                     cookie,
+                                     force,
+                                     allowExisting,
+                                     genBySeqno,
+                                     genCas,
+                                     extendedMetaData.get(),
+                                     false /*isReplication*/);
+
+    if (ret == ENGINE_SUCCESS) {
+        cas = item->getCas();
+    } else {
+        cas = 0;
+    }
+    return ret;
 }
 
 ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
@@ -5344,7 +5375,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
 
     bool skipConflictResolution = false;
     GenerateCas generateCas = GenerateCas::No;
-    std::unique_ptr<ExtendedMetaData> emd;
     int keyOffset = 0;
     protocol_binary_response_status error = PROTOCOL_BINARY_RESPONSE_SUCCESS;
     if ((error = decodeWithMetaOptions(request, generateCas,
@@ -5354,6 +5384,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
                             PROTOCOL_BINARY_RAW_BYTES, error, 0, cookie);
     }
 
+    cb::const_byte_buffer emd;
     if (extlen == 26 || extlen == 30) {
         uint16_t nmeta = 0;
         memcpy(&nmeta, request->bytes + sizeof(request->bytes),
@@ -5361,47 +5392,33 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
         keyOffset += 2; // 2 bytes for nmeta
         nmeta = ntohs(nmeta);
         if (nmeta > 0) {
-            try {
-                emd.reset(new ExtendedMetaData(request->bytes +
-                                               sizeof(request->bytes) +
-                                               nkey +
-                                               keyOffset, nmeta));
-            } catch (const std::bad_alloc&) {
-                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_ENOMEM, 0, cookie);
-            }
-
-            if (emd->getStatus() == ENGINE_EINVAL) {
-                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
-            }
+            emd = cb::const_byte_buffer(
+                    request->bytes + sizeof(request->bytes) + nkey + keyOffset,
+                    nmeta);
         }
     }
 
     const uint8_t *keyPtr = request->bytes + keyOffset + sizeof(request->bytes);
     DocKey key(keyPtr, nkey, docNamespace);
-
-    ItemMetaData itm_meta(metacas, seqno, flags, expiration);
-
-    uint8_t meta[16];
-    uint64_t by_seqno = 0;
-    uint64_t vb_uuid = 0;
-
-    ENGINE_ERROR_CODE ret = kvBucket->deleteWithMeta(key,
-                                                     cas,
-                                                     &by_seqno,
-                                                     vbucket,
-                                                     cookie,
-                                                     skipConflictResolution,
-                                                     itm_meta,
-                                                     false,
-                                                     GenerateBySeqno::Yes,
-                                                     generateCas,
-                                                     0,
-                                                     emd.get(),
-                                                     false);
+    uint64_t bySeqno = 0;
+    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
+    try {
+        ret = deleteWithMeta(vbucket,
+                             key,
+                             {metacas, seqno, flags, time_t(expiration)},
+                             cas,
+                             &bySeqno,
+                             cookie,
+                             skipConflictResolution,
+                             GenerateBySeqno::Yes,
+                             generateCas,
+                             emd);
+    } catch (const std::bad_alloc&) {
+        return sendErrorResponse(response,
+                                 PROTOCOL_BINARY_RESPONSE_ENOMEM,
+                                 0,
+                                 cookie);
+    }
 
     if (ret == ENGINE_SUCCESS) {
         stats.numOpsDelMeta++;
@@ -5423,17 +5440,45 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
     }
 
     if (ret == ENGINE_SUCCESS && isMutationExtrasSupported(cookie)) {
-        RCPtr<VBucket> vb = kvBucket->getVBucket(vbucket);
-        vb_uuid = htonll(vb->failovers->getLatestUUID());
-        by_seqno = htonll(by_seqno);
-        memcpy(meta, &vb_uuid, 8);
-        memcpy(meta + 8, &by_seqno, 8);
-        return sendResponse(response, NULL, 0, (const void *)meta, sizeof(meta),
-                            NULL, 0, PROTOCOL_BINARY_RAW_BYTES, rc, cas, cookie);
+        return sendMutationExtras(response, vbucket, bySeqno, rc, cas, cookie);
     }
 
-    return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                        PROTOCOL_BINARY_RAW_BYTES, rc, cas, cookie);
+    return sendErrorResponse(response, rc, cas, cookie);
+}
+
+ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
+        uint16_t vbucket,
+        DocKey key,
+        ItemMetaData itemMeta,
+        uint64_t& cas,
+        uint64_t* seqno,
+        const void* cookie,
+        bool force,
+        GenerateBySeqno genBySeqno,
+        GenerateCas genCas,
+        cb::const_byte_buffer emd) {
+    std::unique_ptr<ExtendedMetaData> extendedMetaData;
+    if (emd.data()) {
+        extendedMetaData =
+                std::make_unique<ExtendedMetaData>(emd.data(), emd.size());
+        if (extendedMetaData->getStatus() == ENGINE_EINVAL) {
+            return ENGINE_EINVAL;
+        }
+    }
+
+    return kvBucket->deleteWithMeta(key,
+                                    cas,
+                                    seqno,
+                                    vbucket,
+                                    cookie,
+                                    force,
+                                    itemMeta,
+                                    false /*allowExisting*/,
+                                    genBySeqno,
+                                    genCas,
+                                    0 /*bySeqno*/,
+                                    extendedMetaData.get(),
+                                    false /*isReplication*/);
 }
 
 ENGINE_ERROR_CODE
@@ -6210,6 +6255,59 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::sendNotSupportedResponse(
                         cookie);
 }
 
+/**
+ * Call the response callback and return the appropriate value so that
+ * the core knows what to do..
+ */
+ENGINE_ERROR_CODE EventuallyPersistentEngine::sendErrorResponse(
+        ADD_RESPONSE response,
+        protocol_binary_response_status status,
+        uint64_t cas,
+        const void* cookie) {
+    // no body/ext data for the error
+    return sendResponse(response,
+                        nullptr,
+                        0,
+                        nullptr,
+                        0,
+                        nullptr,
+                        0,
+                        0,
+                        status,
+                        cas,
+                        cookie);
+}
+
+ENGINE_ERROR_CODE EventuallyPersistentEngine::sendMutationExtras(
+        ADD_RESPONSE response,
+        uint16_t vbucket,
+        uint64_t bySeqno,
+        protocol_binary_response_status status,
+        uint64_t cas,
+        const void* cookie) {
+    RCPtr<VBucket> vb = kvBucket->getVBucket(vbucket);
+    if (!vb) {
+        return sendErrorResponse(
+                response, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, cas, cookie);
+    }
+    const uint64_t uuid = htonll(vb->failovers->getLatestUUID());
+    bySeqno = htonll(bySeqno);
+    uint8_t meta[16];
+    memcpy(meta, &uuid, sizeof(uuid));
+    memcpy(meta + sizeof(uuid), &bySeqno, sizeof(bySeqno));
+    return sendResponse(response,
+                        nullptr,
+                        0,
+                        (const void*)meta,
+                        sizeof(meta),
+                        nullptr,
+                        0,
+                        PROTOCOL_BINARY_RAW_BYTES,
+                        status,
+                        cas,
+                        cookie);
+}
+
 std::unique_ptr<KVBucket> EventuallyPersistentEngine::makeBucket(
         Configuration& config) {
     const auto bucketType = config.getBucketType();
index fbcac07..7efde97 100644 (file)
@@ -798,6 +798,40 @@ protected:
     ENGINE_ERROR_CODE sendNotSupportedResponse(ADD_RESPONSE response,
                                                const void* cookie);
 
+    /**
+     * Sends error response, using the specified error and response callback
+     * to the specified connection via it's cookie.
+     *
+     * @param response callback func to send the response
+     * @param status error status to send
+     * @param cas a cas value to send
+     * @param cookie conn cookie
+     *
+     * @return status of sending response
+     */
+    ENGINE_ERROR_CODE sendErrorResponse(ADD_RESPONSE response,
+                                        protocol_binary_response_status status,
+                                        uint64_t cas,
+                                        const void* cookie);
+
+    /**
+     * Sends a response that includes the mutation extras, the VB uuid and
+     * seqno of the mutation.
+     *
+     * @param response callback func to send the response
+     * @param vbucket vbucket that was mutated
+     * @param bySeqno the seqno to send
+     * @param status a mcbp status code
+     * @param cas cas assigned to the mutation
+     * @param cookie conn cookie
+     * @returns NMVB if VB can't be located, or the ADD_RESPONSE return code.
+     */
+    ENGINE_ERROR_CODE sendMutationExtras(ADD_RESPONSE response,
+                                         uint16_t vbucket,
+                                         uint64_t bySeqno,
+                                         protocol_binary_response_status status,
+                                         uint64_t cas,
+                                         const void* cookie);
 
     /**
      * Factory method for constructing the correct bucket type given the
@@ -822,6 +856,66 @@ protected:
             protocol_binary_datatype_t datatype,
             cb::const_char_buffer body);
 
+    /**
+     * Process the set_with_meta with the given buffers/values.
+     *
+     * @param vbucket VB to mutate
+     * @param key DocKey initialised with key data
+     * @param value buffer for the mutation's value
+     * @param itemMeta mutation's cas/revseq/flags/expiration
+     * @param isDeleted the Item is deleted (with value)
+     * @param datatype datatype of the mutation
+     * @param cas [in,out] CAS for the command (updated with new CAS)
+     * @param seqno [out] optional - returns the seqno allocated to the mutation
+     * @param cookie connection's cookie
+     * @param force Should the set skip conflict resolution?
+     * @param allowExisting true if the set can overwrite existing key
+     * @param genBySeqno generate a new seqno? (yes/no)
+     * @param genCas generate a new CAS? (yes/no)
+     * @param emd buffer referencing ExtendedMetaData
+     * @returns state of the operation as an ENGINE_ERROR_CODE
+     */
+    ENGINE_ERROR_CODE setWithMeta(uint16_t vbucket,
+                                  DocKey key,
+                                  cb::const_byte_buffer value,
+                                  ItemMetaData itemMeta,
+                                  bool isDeleted,
+                                  protocol_binary_datatype_t datatype,
+                                  uint64_t& cas,
+                                  uint64_t* seqno,
+                                  const void* cookie,
+                                  bool force,
+                                  bool allowExisting,
+                                  GenerateBySeqno genBySeqno,
+                                  GenerateCas genCas,
+                                  cb::const_byte_buffer emd);
+
+    /**
+     * Process the del_with_meta with the given buffers/values.
+     *
+     * @param vbucket VB to mutate
+     * @param key DocKey initialised with key data
+     * @param itemMeta mutation's cas/revseq/flags/expiration
+     * @param cas [in,out] CAS for the command (updated with new CAS)
+     * @param seqno [out] optional - returns the seqno allocated to the mutation
+     * @param cookie connection's cookie
+     * @param force Should the set skip conflict resolution?
+     * @param genBySeqno generate a new seqno? (yes/no)
+     * @param genCas generate a new CAS? (yes/no)
+     * @param emd buffer referencing ExtendedMetaData
+     * @returns state of the operation as an ENGINE_ERROR_CODE
+     */
+    ENGINE_ERROR_CODE deleteWithMeta(uint16_t vbucket,
+                                     DocKey key,
+                                     ItemMetaData itemMeta,
+                                     uint64_t& cas,
+                                     uint64_t* seqno,
+                                     const void* cookie,
+                                     bool force,
+                                     GenerateBySeqno genBySeqno,
+                                     GenerateCas genCas,
+                                     cb::const_byte_buffer emd);
+
     SERVER_HANDLE_V1 *serverApi;
     std::unique_ptr<KVBucket> kvBucket;
     WorkLoadPolicy *workload;