MB-21143: Remove adjusted-time/drift and associated code 99/67999/10
authorJim Walker <jim@couchbase.com>
Mon, 26 Sep 2016 13:36:01 +0000 (14:36 +0100)
committerDave Rigby <daver@couchbase.com>
Wed, 12 Oct 2016 07:42:21 +0000 (07:42 +0000)
As part of simplfying the supported LWW code, remove the
adjusted-time API and associated code.

Change-Id: I4d1cb092d4fce3155d1cd1e0134333655bcb3a61
Reviewed-on: http://review.couchbase.org/67999
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
22 files changed:
configuration.json
docs/engine-params.org
src/couch-kvstore/couch-kvstore.cc
src/dcp/stream.cc
src/dcp/stream.h
src/ep.cc
src/ep_engine.cc
src/ep_engine.h
src/ext_meta_parser.cc
src/ext_meta_parser.h
src/forest-kvstore/forest-kvstore.cc
src/kvstore.cc
src/kvstore.h
src/vbucket.cc
src/vbucket.h
src/warmup.cc
tests/ep_test_apis.cc
tests/ep_test_apis.h
tests/ep_testsuite.cc
tests/ep_testsuite_dcp.cc
tests/ep_testsuite_xdcr.cc
tests/module_tests/kvstore_test.cc

index 41fa1d4..9702e9e 100644 (file)
         },
         "time_synchronization": {
             "default": "disabled",
-            "descr": "Time synchronization setting for XDCR Last Write Wins (LWW) conflict resolution",
+            "descr": "No longer supported. This config parameter has no effect.",
             "type": "std::string",
             "validator": {
                 "enum": [
index 8fa9d60..d0768d9 100644 (file)
@@ -133,6 +133,3 @@ memcached like this:
 |                                |        | resolution to use                          |
 | item_eviction_policy           | string | Item eviction policy used by the item      |
 |                                |        | pager (value_only or full_eviction)        |
-| time_synchronization           | string | Time synchronization setting for the bucket|
-|                                |        | (disabled, enabled_without_drift,          |
-|                                |        |  enabled_with_drift)                       |
index 1883b46..b89de4c 100644 (file)
@@ -544,8 +544,7 @@ void CouchKVStore::delVBucket(uint16_t vbucket) {
 
     std::string failovers("[{\"id\":0, \"seq\":0}]");
     cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
-                                                0, 0, 0, INITIAL_DRIFT,
-                                                failovers);
+                                                0, 0, 0, failovers);
     updateDbFileMap(vbucket, 1);
 }
 
@@ -1914,7 +1913,6 @@ ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
     uint64_t lastSnapStart = 0;
     uint64_t lastSnapEnd = 0;
     uint64_t maxCas = 0;
-    int64_t driftCounter = INITIAL_DRIFT;
 
     DbInfo info;
     errCode = couchstore_db_info(db, &info);
@@ -1967,8 +1965,6 @@ ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
                                 cJSON_GetObjectItem(jsonObj, "snap_end"));
         const std::string maxCasValue = getJSONObjString(
                                 cJSON_GetObjectItem(jsonObj, "max_cas"));
-        const std::string driftCount = getJSONObjString(
-                                cJSON_GetObjectItem(jsonObj, "drift_counter"));
         cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
         if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
                 || max_deleted_seqno.compare("") == 0) {
@@ -2009,10 +2005,6 @@ ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
                 }
             }
 
-            if (driftCount.compare("") != 0) {
-                parseInt64(driftCount.c_str(), &driftCounter);
-            }
-
             if (failover_json) {
                 char* json = cJSON_PrintUnformatted(failover_json);
                 failovers.assign(json);
@@ -2027,8 +2019,7 @@ ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
     cachedVBStates[vbId] = new vbucket_state(state, checkpointId,
                                              maxDeletedSeqno, highSeqno,
                                              purgeSeqno, lastSnapStart,
-                                             lastSnapEnd, maxCas, driftCounter,
-                                             failovers);
+                                             lastSnapEnd, maxCas, failovers);
 
     return couchErr2EngineErr(errCode);
 }
@@ -2043,7 +2034,6 @@ couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState) {
               << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
               << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
               << ",\"max_cas\": \"" << vbState.maxCas << "\""
-              << ",\"drift_counter\": \"" << vbState.driftCounter << "\""
               << "}";
 
     LocalDoc lDoc;
index 0d6b185..2b52204 100644 (file)
@@ -348,8 +348,7 @@ bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source
             bufferedBackfill.bytes.fetch_add(itm->size());
             bufferedBackfill.items++;
 
-            pushToReadyQ(new MutationResponse(itm, opaque_,
-                              prepareExtendedMetaData(itm->getVBucketId())));
+            pushToReadyQ(new MutationResponse(itm, opaque_, nullptr));
 
             lastReadSeqno.store(itm->getBySeqno());
             lh.unlock();
@@ -796,8 +795,7 @@ void ActiveStream::processItems(std::vector<queued_item>& items) {
                 curChkSeqno = qi->getBySeqno();
                 lastReadSeqnoUnSnapshotted = qi->getBySeqno();
 
-                mutations.push_back(new MutationResponse(qi, opaque_,
-                            prepareExtendedMetaData(qi->getVBucketId()),
+                mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
                             isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
                                                              KEY_VALUE));
             } else if (qi->getOperation() == queue_op_checkpoint_start) {
@@ -1203,19 +1201,6 @@ uint64_t ActiveStream::getLastSentSeqno() {
     return lastSentSeqno.load();
 }
 
-ExtendedMetaData* ActiveStream::prepareExtendedMetaData(uint16_t vBucketId)
-{
-    ExtendedMetaData *emd = NULL;
-    if (producer->isExtMetaDataEnabled()) {
-        RCPtr<VBucket> vb = engine->getVBucket(vBucketId);
-        if (vb && vb->getTimeSyncConfig() == time_sync_t::ENABLED_WITH_DRIFT) {
-            int64_t adjustedTime = gethrtime() + vb->getDriftCounter();
-            emd = new ExtendedMetaData(adjustedTime);
-        }
-    }
-    return emd;
-}
-
 const Logger& ActiveStream::getLogger() const
 {
     return producer->getLogger();
index ced44ed..139aa3c 100644 (file)
@@ -289,8 +289,6 @@ private:
 
     const char* getEndStreamStatusStr(end_stream_status_t status);
 
-    ExtendedMetaData* prepareExtendedMetaData(uint16_t vBucketId);
-
     bool isCurrentSnapshotCompleted() const;
 
     /* Drop the cursor registered with the checkpoint manager.
index 3fddd5b..56ac1ac 100644 (file)
--- a/src/ep.cc
+++ b/src/ep.cc
@@ -1119,11 +1119,6 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
         abort();
     }
 
-    // Update drift counter for vbucket upon a success only
-    if (ret == ENGINE_SUCCESS && emd) {
-        vb->setDriftCounter(emd->getAdjustedTime());
-    }
-
     return ret;
 }
 
@@ -1161,7 +1156,7 @@ void EventuallyPersistentStore::snapshotVBuckets(VBSnapshotTask::Priority prio,
                 vbucket_state vb_state(vb->getState(), chkId, 0,
                                        vb->getHighSeqno(), vb->getPurgeSeqno(),
                                        range.start, range.end, vb->getMaxCas(),
-                                       vb->getDriftCounter() ,failovers);
+                                       failovers);
                 states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
             }
             return false;
@@ -1251,8 +1246,7 @@ bool EventuallyPersistentStore::persistVBState(uint16_t vbid) {
     vb->getPersistedSnapshot(range);
     vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
                            vb->getPurgeSeqno(), range.start, range.end,
-                           vb->getMaxCas(), vb->getDriftCounter(),
-                           failovers);
+                           vb->getMaxCas(), failovers);
 
     KVStore *rwUnderlying = getRWUnderlying(vbid);
     if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
@@ -1343,8 +1337,6 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
             newvb->createFilter(config.getBfilterKeyCount(),
                                 config.getBfilterFpProb());
         }
-        const std::string& timeSyncConfig = config.getTimeSynchronization();
-        newvb->setTimeSyncConfig(VBucket::convertStrToTimeSyncConfig(timeSyncConfig));
 
         // The first checkpoint for active vbucket should start with id 2.
         uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
@@ -2384,11 +2376,6 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
         }
     }
 
-    // Update drift counter for vbucket upon a success only
-    if (ret == ENGINE_SUCCESS && emd) {
-        vb->setDriftCounter(emd->getAdjustedTime());
-    }
-
     return ret;
 }
 
@@ -3045,11 +3032,6 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
         ret = ENGINE_EWOULDBLOCK;
     }
 
-    // Update drift counter for vbucket upon a success only
-    if (ret == ENGINE_SUCCESS && emd) {
-        vb->setDriftCounter(emd->getAdjustedTime());
-    }
-
     return ret;
 }
 
@@ -3389,8 +3371,7 @@ int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
                                       vbMap.getPersistenceCheckpointId(vbid),
                                       maxDeletedRevSeqno, vb->getHighSeqno(),
                                       vb->getPurgeSeqno(), range.start,
-                                      range.end, maxCas, vb->getDriftCounter(),
-                                      failovers);
+                                      range.end, maxCas, failovers);
 
                 if (rwUnderlying->snapshotVBucket(vb->getId(), vbState,
                                                   NULL, false) != true) {
index 6a6e08b..9b4a32f 100644 (file)
@@ -1354,17 +1354,14 @@ extern "C" {
                    reinterpret_cast<protocol_binary_request_get_keys*>
                                                            (request), response);
             }
+        // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
         case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
-            {
-                return h->getAdjustedTime(cookie,
-                   reinterpret_cast<protocol_binary_request_get_adjusted_time*>
-                                                           (request), response);
-            }
         case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE:
             {
-                return h->setDriftCounterState(cookie,
-                reinterpret_cast<protocol_binary_request_set_drift_counter_state*>
-                                                           (request), response);
+                return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+                                    PROTOCOL_BINARY_RAW_BYTES,
+                                    PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
+                                    cookie);
             }
         }
 
@@ -6233,73 +6230,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getRandomKey(const void *cookie,
     return ret;
 }
 
-ENGINE_ERROR_CODE EventuallyPersistentEngine::getAdjustedTime(
-                             const void *cookie,
-                             protocol_binary_request_get_adjusted_time *request,
-                             ADD_RESPONSE response) {
-
-    uint16_t vbucket = ntohs(request->message.header.request.vbucket);
-    RCPtr<VBucket> vb = getVBucket(vbucket);
-    if (!vb) {
-        return sendNotMyVBucketResponse(response, cookie, 0);
-    }
-    // Will return the vbucket's adjusted time, only if
-    // time synchronization for the vbucket is enabled
-    if (vb->getTimeSyncConfig() == time_sync_t::ENABLED_WITH_DRIFT) {
-        int64_t adjusted_time = gethrtime() + vb->getDriftCounter();
-        adjusted_time = htonll(adjusted_time);
-        return sendResponse(response, NULL, 0, NULL, 0,
-                            (const void *)&adjusted_time, sizeof(int64_t),
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
-    } else {
-        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
-                            cookie);
-    }
-}
-
-ENGINE_ERROR_CODE EventuallyPersistentEngine::setDriftCounterState(
-                       const void *cookie,
-                       protocol_binary_request_set_drift_counter_state *request,
-                       ADD_RESPONSE response) {
-
-    uint16_t vbucket = ntohs(request->message.header.request.vbucket);
-    RCPtr<VBucket> vb = getVBucket(vbucket);
-    if (!vb) {
-        return sendNotMyVBucketResponse(response, cookie, 0);
-    }
-
-    if (vb->getTimeSyncConfig() != time_sync_t::ENABLED_WITH_DRIFT) {
-        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
-                            cookie);
-    }
-
-    int64_t driftCount = ntohll(request->message.body.initial_drift);
-    if (driftCount == INITIAL_DRIFT) {
-        return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
-                            PROTOCOL_BINARY_RAW_BYTES,
-                            PROTOCOL_BINARY_RESPONSE_EINVAL, 0,
-                            cookie);
-    }
-
-    set_drift_state_resp_t resp = vb->setDriftCounterState(driftCount);
-    uint64_t uuid = htonll(resp.last_vb_uuid);
-    int64_t seqno = htonll(resp.last_seqno);
-
-    std::stringstream result;
-    result.write((char*) &uuid, sizeof(uuid));
-    result.write((char*) &seqno, sizeof(seqno));
-
-    return sendResponse(response, NULL, 0, NULL, 0,
-                        result.str().data(), result.str().length(),
-                        PROTOCOL_BINARY_RAW_BYTES,
-                        PROTOCOL_BINARY_RESPONSE_SUCCESS, 0 , cookie);
-}
-
 ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(const void* cookie,
                                                        uint32_t opaque,
                                                        uint32_t seqno,
index 98e4991..0c1f411 100644 (file)
@@ -533,14 +533,6 @@ public:
                                 protocol_binary_request_get_keys *request,
                                 ADD_RESPONSE response);
 
-    ENGINE_ERROR_CODE getAdjustedTime(const void* cookie,
-                             protocol_binary_request_get_adjusted_time *request,
-                             ADD_RESPONSE response);
-
-    ENGINE_ERROR_CODE setDriftCounterState(const void* cookie,
-                       protocol_binary_request_set_drift_counter_state *request,
-                       ADD_RESPONSE response);
-
     /**
      * Visit the objects and add them to the tap/dcp connecitons queue.
      * @todo this code should honor the backfill time!
index 267102f..c6a5454 100644 (file)
 ExtendedMetaData::ExtendedMetaData(const void *meta, uint16_t nmeta) {
     len = nmeta;
     data = static_cast<const char*>(meta);
-    adjustedTime = 0;
     ret = ENGINE_SUCCESS;
     memoryAllocated = false;
     decodeMeta();
 }
 
-ExtendedMetaData::ExtendedMetaData(int64_t adjusted_time) {
-    len = 0;
-    data = NULL;
-    adjustedTime = adjusted_time;
-    ret = ENGINE_SUCCESS;
-    memoryAllocated = false;
-    encodeMeta();
-}
-
 ExtendedMetaData::~ExtendedMetaData() {
     if (memoryAllocated) {
         delete[] data;
@@ -78,9 +68,7 @@ void ExtendedMetaData::decodeMeta() {
                 }
                 switch (type) {
                     case CMD_META_ADJUSTED_TIME:
-                        memcpy(&adjustedTime, data + offset, length);
-                        adjustedTime = ntohll(adjustedTime);
-                        break;
+                        // Ignoring adjusted_time
                     case CMD_META_CONFLICT_RES_MODE:
                         // MB-21143: Now ignoring conflict_res_mode
                         // 4.6 no longer sends, but older versions
@@ -100,38 +88,3 @@ void ExtendedMetaData::decodeMeta() {
         ret = ENGINE_EINVAL;
     }
 }
-
-void ExtendedMetaData::encodeMeta() {
-    uint8_t version = META_EXT_VERSION_ONE;
-    uint8_t type;
-    int64_t adjusted_time = htonll(adjustedTime);
-    uint16_t length;
-    uint16_t nmeta = sizeof(version) + sizeof(type) + sizeof(length) +
-                sizeof(adjusted_time);
-
-    char* meta = new char[nmeta];
-    if (meta == NULL) {
-        ret = ENGINE_ENOMEM;
-    } else {
-        memoryAllocated = true;
-        uint32_t offset = 0;
-
-        memcpy(meta, &version, sizeof(version));
-        offset += sizeof(version);
-
-        type = CMD_META_ADJUSTED_TIME;
-        length = sizeof(adjusted_time);
-        length = htons(length);
-
-        memcpy(meta + offset, &type, sizeof(type));
-        offset += sizeof(type);
-
-        memcpy(meta + offset, &length, sizeof(length));
-        offset += sizeof(length);
-
-        memcpy(meta + offset, &adjusted_time, sizeof(adjusted_time));
-
-        data = (const char*)meta;
-        len = nmeta;
-    }
-}
index 5d18517..0714a46 100644 (file)
@@ -52,28 +52,31 @@ enum cmd_meta_extras_type {
  */
 class ExtendedMetaData {
 public:
+    ExtendedMetaData()
+          : data(nullptr),
+            ret(ENGINE_SUCCESS),
+            len(0),
+            memoryAllocated(false) {}
+
     ExtendedMetaData(const void *meta, uint16_t nmeta);
-    ExtendedMetaData(int64_t adjusted_time);
     ~ExtendedMetaData();
 
     ENGINE_ERROR_CODE getStatus() {
         return ret;
     }
 
-    int64_t getAdjustedTime() {
-        return adjustedTime;
-    }
-
     std::pair<const char*, uint16_t> getExtMeta() {
         return std::make_pair(data, len);
     }
 
 private:
+    /*
+    void encodeMeta(); is currently removed as there's no extmeta to encode.
+    Resurrect from history as required.
+    */
     void decodeMeta();
-    void encodeMeta();
 
     const char* data;
-    int64_t adjustedTime;
     ENGINE_ERROR_CODE ret;
     uint16_t len;
     bool memoryAllocated;
index a649d23..0606ff1 100644 (file)
@@ -304,7 +304,6 @@ ENGINE_ERROR_CODE ForestKVStore::readVBState(uint16_t vbId) {
     uint64_t lastSnapStart = 0;
     uint64_t lastSnapEnd = 0;
     uint64_t maxCas = 0;
-    int64_t driftCounter = INITIAL_DRIFT;
 
     fdb_kvs_info kvsInfo;
     fdb_kvs_handle *kvsHandle = getKvsHandle(vbId, handleType::READER);
@@ -363,9 +362,6 @@ ENGINE_ERROR_CODE ForestKVStore::readVBState(uint16_t vbId) {
         const std::string maxCasValue = getJSONObjString(
                                  cJSON_GetObjectItem(jsonObj, "max_cas"));
 
-        const std::string driftCount = getJSONObjString(
-                                 cJSON_GetObjectItem(jsonObj, "drift_counter"));
-
         cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
         if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
                || max_deleted_seqno.compare("") == 0) {
@@ -392,10 +388,6 @@ ENGINE_ERROR_CODE ForestKVStore::readVBState(uint16_t vbId) {
                 parseUint64(maxCasValue.c_str(), &maxCas);
             }
 
-            if (driftCount.compare("")) {
-                parseInt64(driftCount.c_str(), &driftCounter);
-            }
-
             if (failover_json) {
                 char* json = cJSON_PrintUnformatted(failover_json);
                 failovers.assign(json);
@@ -414,8 +406,7 @@ ENGINE_ERROR_CODE ForestKVStore::readVBState(uint16_t vbId) {
     cachedVBStates[vbId] = new vbucket_state(state, checkpointId,
                                              maxDeletedSeqno, highSeqno, 0,
                                              lastSnapStart, lastSnapEnd,
-                                             maxCas, driftCounter,
-                                             failovers);
+                                             maxCas, failovers);
     fdb_doc_free(statDoc);
     return forestErr2EngineErr(status);
 }
@@ -465,8 +456,7 @@ void ForestKVStore::delVBucket(uint16_t vbucket) {
 
     std::string failovers("[{\"id\":0, \"seq\":0}]");
     cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
-                                                0, 0, 0, INITIAL_DRIFT,
-                                                failovers);
+                                                0, 0, 0, failovers);
 
     vbucket_state *state = cachedVBStates[vbucket];
     std::string stateStr = state->toJSON();
index 516f837..84614e2 100644 (file)
@@ -96,7 +96,6 @@ bool KVStore::updateCachedVBState(uint16_t vbid, const vbucket_state& newState)
         vbState->lastSnapStart = newState.lastSnapStart;
         vbState->lastSnapEnd = newState.lastSnapEnd;
         vbState->maxCas = std::max(vbState->maxCas, newState.maxCas);
-        vbState->driftCounter = newState.driftCounter;
     } else {
         cachedVBStates[vbid] = new vbucket_state(newState);
     }
@@ -182,7 +181,6 @@ std::string vbucket_state::toJSON() const {
               << ",\"snap_start\": \"" << lastSnapStart << "\""
               << ",\"snap_end\": \"" << lastSnapEnd << "\""
               << ",\"max_cas\": \"" << maxCas << "\""
-              << ",\"drift_counter\": \"" << driftCounter << "\""
               << "}";
 
     return jsonState.str();
index 752b059..dc86c12 100644 (file)
@@ -134,12 +134,11 @@ struct vbucket_state {
     vbucket_state(vbucket_state_t _state, uint64_t _chkid,
                   uint64_t _maxDelSeqNum, int64_t _highSeqno,
                   uint64_t _purgeSeqno, uint64_t _lastSnapStart,
-                  uint64_t _lastSnapEnd, uint64_t _maxCas,
-                  uint64_t _driftCounter, std::string& _failovers) :
+                  uint64_t _lastSnapEnd, uint64_t _maxCas, std::string& _failovers) :
         state(_state), checkpointId(_chkid), maxDeletedSeqno(_maxDelSeqNum),
         highSeqno(_highSeqno), purgeSeqno(_purgeSeqno),
         lastSnapStart(_lastSnapStart), lastSnapEnd(_lastSnapEnd),
-        maxCas(_maxCas), driftCounter(_driftCounter),failovers(_failovers) { }
+        maxCas(_maxCas), failovers(_failovers) { }
 
     vbucket_state(const vbucket_state& vbstate) {
         state = vbstate.state;
@@ -151,7 +150,6 @@ struct vbucket_state {
         lastSnapStart = vbstate.lastSnapStart;
         lastSnapEnd = vbstate.lastSnapEnd;
         maxCas = vbstate.maxCas;
-        driftCounter = vbstate.driftCounter;
     }
 
     std::string toJSON() const;
@@ -177,7 +175,6 @@ struct vbucket_state {
         lastSnapStart = 0;
         lastSnapEnd = 0;
         maxCas = 0;
-        driftCounter = INITIAL_DRIFT;
         failovers.assign("[{\"id\":0, \"seq\":0}]");
     }
 
@@ -189,7 +186,6 @@ struct vbucket_state {
     uint64_t lastSnapStart;
     uint64_t lastSnapEnd;
     uint64_t maxCas;
-    int64_t driftCounter;
     std::string failovers;
 };
 
index 8cde4cb..5719214 100644 (file)
@@ -604,24 +604,13 @@ size_t VBucket::getNumOfKeysInFilter() {
 }
 
 uint64_t VBucket::nextHLCCas() {
-    int64_t adjusted_time = gethrtime();
-    uint64_t final_adjusted_time = 0;
-
-    if (time_sync_config == time_sync_t::ENABLED_WITH_DRIFT) {
-        adjusted_time += drift_counter;
-    }
-
-    if (adjusted_time < 0) {
-        LOG(EXTENSION_LOG_WARNING,
-            "Adjusted time is negative: %" PRId64 "\n", adjusted_time);
-    }
-
-    final_adjusted_time = ((uint64_t)adjusted_time) & ~((1 << 16) - 1);
+    hrtime_t now = gethrtime();
+    uint64_t now48bits = ((uint64_t)now) & ~((1 << 16) - 1);
     uint64_t local_max_cas = max_cas.load();
 
-    if (final_adjusted_time > local_max_cas) {
-        atomic_setIfBigger(max_cas, final_adjusted_time);
-        return final_adjusted_time;
+    if (now48bits > local_max_cas) {
+        atomic_setIfBigger(max_cas, now48bits);
+        return now48bits;
     }
 
     atomic_setIfBigger(max_cas, local_max_cas + 1);
@@ -662,9 +651,6 @@ void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
         addStat("bloom_filter_size", getFilterSize(), add_stat, c);
         addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
         addStat("max_cas", getMaxCas(), add_stat, c);
-        addStat("drift_counter", getDriftCounter(), add_stat, c);
-        addStat("time_sync", isTimeSyncEnabled() ? "enabled" : "disabled",
-                add_stat, c);
         addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
     }
 }
index 1003d27..1f1c75d 100644 (file)
 
 #include <queue>
 
-typedef struct {
-    uint64_t last_vb_uuid;
-    int64_t last_seqno;
-} set_drift_state_resp_t;
-
 class BgFetcher;
 
 const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
@@ -135,17 +130,6 @@ class FailoverTable;
 class KVShard;
 
 /**
- * Indicates the possible time synchronization settings
- * for the vbucket
- */
-
-enum class time_sync_t {
-    DISABLED,               //No time synchronization.
-    ENABLED_WITHOUT_DRIFT,  //Time synchronization but no usage of drift counter
-    ENABLED_WITH_DRIFT      //Time synchronization with usage of drift counter
-};
-
-/**
  * An individual vbucket.
  */
 class VBucket : public RCValue {
@@ -161,7 +145,7 @@ public:
             std::shared_ptr<Callback<id_type> > cb,
             vbucket_state_t initState = vbucket_state_dead,
             uint64_t chkId = 1, uint64_t purgeSeqno = 0,
-            uint64_t maxCas = 0, int64_t driftCounter = INITIAL_DRIFT):
+            uint64_t maxCas = 0):
         ht(st),
         checkpointManager(st, i, chkConfig, lastSeqno, lastSnapStart,
                           lastSnapEnd, cb, chkId),
@@ -186,8 +170,6 @@ public:
         stats(st),
         purge_seqno(purgeSeqno),
         max_cas(maxCas),
-        drift_counter(driftCounter),
-        time_sync_config(time_sync_t::DISABLED),
         takeover_backed_up(false),
         persisted_snapshot_start(lastSnapStart),
         persisted_snapshot_end(lastSnapEnd),
@@ -252,65 +234,10 @@ public:
         return max_cas;
     }
 
-    bool isTimeSyncEnabled() {
-        if (time_sync_config == time_sync_t::ENABLED_WITHOUT_DRIFT ||
-            time_sync_config == time_sync_t::ENABLED_WITH_DRIFT) {
-            return true;
-        }
-
-        return false;
-    }
-
-    time_sync_t getTimeSyncConfig() {
-        return time_sync_config;
-    }
-
-    void setTimeSyncConfig(time_sync_t timeSyncConfig) {
-        time_sync_config.store(timeSyncConfig);
-    }
-
     void setMaxCas(uint64_t cas) {
         atomic_setIfBigger(max_cas, cas);
     }
 
-    /**
-     * To set drift counter's initial value
-     *
-     * Returns last_vbuuid and last_seqno of vbucket (atomically)
-     */
-    set_drift_state_resp_t setDriftCounterState(int64_t initial_drift) {
-        drift_counter = initial_drift;
-
-        // Get vbucket uuid from the failover table, and then get
-        // the vbucket high seqno, return these 2 values as long as
-        // the uuid did not change after getting the high seqno.
-        uint64_t last_vbuuid = 0;
-        int64_t last_seqno = 0;
-        do {
-            last_vbuuid = failovers->getLatestUUID();
-            last_seqno = getHighSeqno();
-        } while (failovers->getLatestUUID() != last_vbuuid);
-        set_drift_state_resp_t resp;
-        resp.last_vb_uuid = last_vbuuid;
-        resp.last_seqno = last_seqno;
-        return resp;
-    }
-
-    int64_t getDriftCounter() {
-        return drift_counter;
-    }
-
-    void setDriftCounter(int64_t adjustedTime) {
-        // Update drift counter only if timeSync is enabled for
-        // the vbucket.
-        if (time_sync_config == time_sync_t::ENABLED_WITH_DRIFT) {
-            int64_t wallTime = gethrtime();
-            if ((wallTime + getDriftCounter()) < adjustedTime) {
-                drift_counter = (adjustedTime - wallTime);
-            }
-        }
-    }
-
     bool isTakeoverBackedUp() {
         return takeover_backed_up.load();
     }
@@ -444,16 +371,6 @@ public:
         }
     }
 
-    static time_sync_t convertStrToTimeSyncConfig(const std::string& timeSyncConfig) {
-        if (timeSyncConfig == "enabled_without_drift") {
-            return time_sync_t::ENABLED_WITHOUT_DRIFT;
-        } else if (timeSyncConfig == "enabled_with_drift") {
-            return time_sync_t::ENABLED_WITH_DRIFT;
-        }
-
-        return time_sync_t::DISABLED;
-    }
-
     void addHighPriorityVBEntry(uint64_t id, const void *cookie,
                                 bool isBySeqno);
     void notifyOnPersistence(EventuallyPersistentEngine &e,
@@ -580,9 +497,6 @@ private:
     uint64_t                        purge_seqno;
 
     AtomicValue<uint64_t>           max_cas;
-    AtomicValue<int64_t>            drift_counter;
-    AtomicValue<time_sync_t>        time_sync_config;
-
     AtomicValue<bool>               takeover_backed_up;
 
     Mutex pendingBGFetchesLock;
index 6d1c0e8..ddcd79a 100644 (file)
@@ -465,9 +465,6 @@ void Warmup::scheduleCreateVBuckets()
 void Warmup::createVBuckets(uint16_t shardId) {
     size_t maxEntries = store.getEPEngine().getMaxFailoverEntries();
     std::map<uint16_t, vbucket_state>& vbStates = shardVbStates[shardId];
-    Configuration& config = store.getEPEngine().getConfiguration();
-    time_sync_t timeSyncConfig =
-               VBucket::convertStrToTimeSyncConfig(config.getTimeSynchronization());
 
     std::map<uint16_t, vbucket_state>::iterator itr;
     for (itr = vbStates.begin(); itr != vbStates.end(); ++itr) {
@@ -489,10 +486,7 @@ void Warmup::createVBuckets(uint16_t shardId) {
                                  store.getEPEngine().getCheckpointConfig(),
                                  shard, vbs.highSeqno, vbs.lastSnapStart,
                                  vbs.lastSnapEnd, table, cb, vbs.state, 1,
-                                 vbs.purgeSeqno, vbs.maxCas,
-                                 vbs.driftCounter));
-
-            vb->setTimeSyncConfig(timeSyncConfig);
+                                 vbs.purgeSeqno, vbs.maxCas));
 
             if(vbs.state == vbucket_state_active && !cleanShutdown) {
                 if (static_cast<uint64_t>(vbs.highSeqno) == vbs.lastSnapEnd) {
index ce8d3a7..18a5e19 100644 (file)
@@ -346,32 +346,11 @@ protocol_binary_request_header* createPacket(uint8_t opcode,
     return req;
 }
 
-void set_drift_counter_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
-                             int64_t initialDriftCount) {
-
-    protocol_binary_request_header *request;
-
-    int64_t driftCount = htonll(initialDriftCount);
-    uint8_t timeSync = 0x00;
-    uint8_t extlen = sizeof(driftCount) + sizeof(timeSync);
-    char *ext = new char[extlen];
-    memcpy(ext, (char *)&driftCount, sizeof(driftCount));
-    memcpy(ext + sizeof(driftCount), (char *)&timeSync, sizeof(timeSync));
-
-    request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
-                           0, 0, ext, extlen);
-    h1->unknown_command(h, NULL, request, add_response);
-    check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
-            "Expected success for CMD_SET_DRIFT_COUNTER_STATE");
-    cb_free(request);
-    delete[] ext;
-}
-
 void add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const char *val, const size_t vallen,
                    const uint32_t vb, ItemMetaData *itemMeta,
                    bool skipConflictResolution, uint8_t datatype,
-                   bool includeExtMeta, int64_t adjustedTime) {
+                   bool includeExtMeta) {
     int blen = 0;
     char *ext;
     ExtendedMetaData *emd = NULL;
@@ -389,7 +368,7 @@ void add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
         blen = 26;
         ext = new char[blen];
         encodeWithMetaExt(ext, itemMeta);
-        emd = new ExtendedMetaData(adjustedTime);
+        emd = new ExtendedMetaData();
         // nmeta added to ext below
     }
 
@@ -450,7 +429,7 @@ void del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const uint32_t vb,
                    ItemMetaData *itemMeta, uint64_t cas_for_delete,
                    bool skipConflictResolution, bool includeExtMeta,
-                   int64_t adjustedTime, const void *cookie) {
+                   const void *cookie) {
     int blen = 0;
     char *ext;
     ExtendedMetaData *emd = NULL;
@@ -468,7 +447,7 @@ void del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
         blen = 26;
         ext = new char[blen];
         encodeWithMetaExt(ext, itemMeta);
-        emd = new ExtendedMetaData(adjustedTime);
+        emd = new ExtendedMetaData();
         // nmeta added to ext below
     }
 
@@ -793,8 +772,7 @@ void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const char *val, const size_t vallen,
                    const uint32_t vb, ItemMetaData *itemMeta,
                    uint64_t cas_for_set, bool skipConflictResolution,
-                   uint8_t datatype, bool includeExtMeta,
-                   int64_t adjustedTime, const void *cookie) {
+                   uint8_t datatype, bool includeExtMeta, const void *cookie) {
     int blen = 0;
     char *ext;
     ExtendedMetaData *emd = NULL;
@@ -812,7 +790,7 @@ void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
         blen = 26;
         ext = new char[blen];
         encodeWithMetaExt(ext, itemMeta);
-        emd = new ExtendedMetaData(adjustedTime);
+        emd = new ExtendedMetaData();
         // nmeta added to ext below
     }
 
index f42314d..9ee0afb 100644 (file)
@@ -363,22 +363,20 @@ void add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const char *val, const size_t vallen,
                    const uint32_t vb, ItemMetaData *itemMeta,
                    bool skipConflictResolution = false,
-                   uint8_t datatype = 0x00, bool includeExtMeta = false,
-                   int64_t adjusted_time = 0);
+                   uint8_t datatype = 0x00, bool includeExtMeta = false);
 bool get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
               bool reqExtMeta = false);
 void del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const uint32_t vb,
                    ItemMetaData *itemMeta, uint64_t cas_for_delete = 0,
                    bool skipConflictResolution = false,
-                   bool includeExtMeta = false,
-                   int64_t adjustedTime = 0, const void *cookie = NULL);
+                   bool includeExtMeta = false, const void *cookie = NULL);
 void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                    const size_t keylen, const char *val, const size_t vallen,
                    const uint32_t vb, ItemMetaData *itemMeta,
                    uint64_t cas_for_set, bool skipConflictResolution = false,
                    uint8_t datatype = 0x00, bool includeExtMeta = false,
-                   int64_t adjustedTime = 0, const void *cookie = NULL);
+                   const void *cookie = NULL);
 void return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
                  const size_t keylen, const char *val, const size_t vallen,
                  const uint32_t vb, const uint64_t cas, const uint32_t flags,
index 83590d0..d2fa262 100644 (file)
@@ -2581,7 +2581,7 @@ static enum test_result test_datatype(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     itm_meta.exptime = info.exptime;
     itm_meta.flags = info.flags;
     set_with_meta(h, h1, key1, strlen(key1), val1, strlen(val1), 0, &itm_meta,
-                  last_cas, false, info.datatype, false, 0, cookie);
+                  last_cas, false, info.datatype, false, cookie);
 
     checkeq(ENGINE_SUCCESS,
             h1->get(h, cookie, &itm, key1, strlen(key1), 0),
@@ -2613,7 +2613,7 @@ static enum test_result test_datatype_with_unknown_command(ENGINE_HANDLE *h,
 
     //SET_WITH_META
     set_with_meta(h, h1, key, strlen(key), val, strlen(val), 0, &itm_meta,
-                  0, false, datatype, false, 0, cookie);
+                  0, false, datatype, false, cookie);
 
     checkeq(ENGINE_SUCCESS,
             h1->get(h, cookie, &itm, key, strlen(key), 0),
@@ -5543,8 +5543,6 @@ static enum test_result test_hlc_cas(ENGINE_HANDLE *h,
 
     memset(&info, 0, sizeof(info));
 
-    //Set a really large drift value
-    set_drift_counter_state(h, h1, 100000);
     checkeq(ENGINE_SUCCESS,
             store(h, h1, NULL, OPERATION_ADD, key, "data1", &i, 0, 0),
             "Failed to store an item");
@@ -5555,10 +5553,6 @@ static enum test_result test_hlc_cas(ENGINE_HANDLE *h,
     check(curr_cas > prev_cas, "CAS is not monotonically increasing");
     prev_cas = curr_cas;
 
-    //set a lesser drift and ensure that the CAS is monotonically
-    //increasing
-    set_drift_counter_state(h, h1, 100);
-
     checkeq(ENGINE_SUCCESS,
             store(h, h1, NULL, OPERATION_SET, key, "data2", &i, 0, 0),
             "Failed to store an item");
@@ -5569,26 +5563,6 @@ static enum test_result test_hlc_cas(ENGINE_HANDLE *h,
     check(curr_cas > prev_cas, "CAS is not monotonically increasing");
     prev_cas = curr_cas;
 
-    //ensure that the adjusted time will be negative
-    int64_t drift_counter = (-1) * (gethrtime() * 2);
-    set_drift_counter_state(h, h1, drift_counter);
-
-    protocol_binary_request_header *request;
-    int64_t adjusted_time;
-    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
-                           NULL, 0, NULL, 0);
-    h1->unknown_command(h, NULL, request, add_response);
-    cb_free(request);
-    checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
-            "Expected Success");
-    checkeq(sizeof(int64_t), last_body.size(),
-            "Bodylen didn't match expected value");
-    memcpy(&adjusted_time, last_body.data(), last_body.size());
-    adjusted_time = ntohll(adjusted_time);
-    std::string err_msg("Adjusted time " + std::to_string(adjusted_time) +
-                        " is supposed to have been negative!");
-    check(adjusted_time < 0, err_msg.c_str());
-
     checkeq(ENGINE_SUCCESS,
             store(h, h1, NULL, OPERATION_REPLACE, key, "data3", &i, 0, 0),
             "Failed to store an item");
@@ -5599,9 +5573,6 @@ static enum test_result test_hlc_cas(ENGINE_HANDLE *h,
     check(curr_cas > prev_cas, "CAS is not monotonically increasing");
     prev_cas = curr_cas;
 
-    //Set the drift value to 0
-    set_drift_counter_state(h, h1, 0);
-
     getl(h, h1, key, 0, 10);
     checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
           "Expected to be able to getl on first try");
@@ -5900,7 +5871,6 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "vb_0:bloom_filter_size",
                 "vb_0:db_data_size",
                 "vb_0:db_file_size",
-                "vb_0:drift_counter",
                 "vb_0:high_seqno",
                 "vb_0:ht_cache_size",
                 "vb_0:ht_item_memory",
@@ -5922,7 +5892,6 @@ static enum test_result test_mb19687_fixed(ENGINE_HANDLE* h,
                 "vb_0:queue_memory",
                 "vb_0:queue_size",
                 "vb_0:rollback_item_count",
-                "vb_0:time_sync",
                 "vb_0:uuid"
             }
         },
@@ -7046,7 +7015,7 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("test failover log behavior", test_failover_log_behavior,
                  test_setup, teardown, NULL, prepare, cleanup),
         TestCase("test hlc cas", test_hlc_cas, test_setup, teardown,
-                 "time_synchronization=enabled_with_drift", prepare, cleanup),
+                 NULL, prepare, cleanup),
 
         TestCaseV2("multi_bucket set/get ", test_multi_bucket_set_get, NULL,
                    teardown_v2, NULL, prepare, cleanup),
index 52551a7..8fa647b 100644 (file)
@@ -69,7 +69,6 @@ public:
           exp_markers(0),
           extra_takeover_ops(0),
           exp_disk_snapshot(false),
-          time_sync_enabled(false),
           exp_conflict_res(0),
           skip_estimate_check(false),
           live_frontend_client(false),
@@ -101,8 +100,6 @@ public:
     size_t extra_takeover_ops;
     /* Flag - expect disk snapshot or not */
     bool exp_disk_snapshot;
-    /* Flag - indicating time sync status */
-    bool time_sync_enabled;
     /* Expected conflict resolution flag */
     uint8_t exp_conflict_res;
     /* Skip estimate check during takeover */
@@ -188,7 +185,6 @@ private:
               last_by_seqno(0),
               extra_takeover_ops(0),
               exp_disk_snapshot(false),
-              time_sync_enabled(false),
               exp_conflict_res(0) { }
 
         size_t num_mutations;
@@ -201,7 +197,6 @@ private:
         uint64_t last_by_seqno;
         size_t extra_takeover_ops;
         bool exp_disk_snapshot;
-        bool time_sync_enabled;
         uint8_t exp_conflict_res;
     };
 
@@ -291,10 +286,6 @@ void TestDcpConsumer::run(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
                                    PROTOCOL_BINARY_RESPONSE_SUCCESS,
                                    dcp_last_opaque);
                     }
-                    if (stats.time_sync_enabled) {
-                        checkeq(static_cast<size_t>(16), dcp_last_meta.size(),
-                                "Expected extended meta in mutation packet");
-                    }
 
                     break;
                 case PROTOCOL_BINARY_CMD_DCP_DELETION:
@@ -312,11 +303,6 @@ void TestDcpConsumer::run(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
                                    PROTOCOL_BINARY_RESPONSE_SUCCESS,
                                    dcp_last_opaque);
                     }
-                    if (stats.time_sync_enabled) {
-                        checkeq(static_cast<size_t>(16),
-                                dcp_last_meta.size(),
-                                "Expected adjusted time in mutation packet");
-                    }
 
                     break;
                 case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
@@ -634,7 +620,6 @@ ENGINE_ERROR_CODE TestDcpConsumer::openStreams(ENGINE_HANDLE *h, ENGINE_HANDLE_V
         VBStats stats;
         stats.extra_takeover_ops = ctx.extra_takeover_ops;
         stats.exp_disk_snapshot = ctx.exp_disk_snapshot;
-        stats.time_sync_enabled = ctx.time_sync_enabled;
         stats.exp_conflict_res = ctx.exp_conflict_res;
 
         vb_stats[ctx.vbucket] = stats;
@@ -1589,54 +1574,6 @@ static enum test_result test_dcp_producer_stream_req_partial(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
-static enum test_result test_dcp_producer_stream_req_partial_with_time_sync(
-                                                             ENGINE_HANDLE *h,
-                                                             ENGINE_HANDLE_V1 *h1) {
-    /*
-     * temporarily skip this testcase to prevent CV regr run failures
-     * till fix for it will be implemented and committed (MB-18669)
-     */
-    return SKIPPED;
-
-    set_drift_counter_state(h, h1, /* initial drift */1000);
-
-    const int num_items = 200;
-    write_items(h, h1, num_items);
-
-    wait_for_flusher_to_settle(h, h1);
-    stop_persistence(h, h1);
-
-    for (int j = 0; j < (num_items / 2); ++j) {
-        std::stringstream ss;
-        ss << "key" << j;
-        checkeq(ENGINE_SUCCESS,
-                del(h, h1, ss.str().c_str(), 0, 0),
-                "Expected delete to succeed");
-    }
-
-    wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 2, "checkpoint");
-
-    const void *cookie = testHarness.create_cookie();
-
-    DcpStreamCtx ctx;
-    ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
-    ctx.seqno = {95, 209};
-    ctx.snapshot = {95, 95};
-    ctx.exp_mutations = 105;
-    ctx.exp_deletions = 100;
-    ctx.exp_markers = 2;
-    ctx.time_sync_enabled = true;
-    ctx.exp_conflict_res = 1;
-
-    TestDcpConsumer tdc("unittest", cookie);
-    tdc.addStreamCtx(ctx);
-    tdc.run(h, h1);
-
-    testHarness.destroy_cookie(cookie);
-
-    return SUCCESS;
-}
-
 static enum test_result test_dcp_producer_stream_req_full(ENGINE_HANDLE *h,
                                                           ENGINE_HANDLE_V1 *h1) {
     const int num_items = 300, batch_items = 100;
@@ -4088,100 +4025,6 @@ static enum test_result test_dcp_consumer_mutate(ENGINE_HANDLE *h,
     return SUCCESS;
 }
 
-static enum test_result test_dcp_consumer_mutate_with_time_sync(
-                                                        ENGINE_HANDLE *h,
-                                                        ENGINE_HANDLE_V1 *h1) {
-
-    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
-          "Failed to set vbucket state.");
-
-    set_drift_counter_state(h, h1, /* initial_drift */1000);
-
-    const void *cookie = testHarness.create_cookie();
-    uint32_t opaque = 0xFFFF0000;
-    uint32_t seqno = 0;
-    uint32_t flags = 0;
-    const char *name = "unittest";
-    uint16_t nname = strlen(name);
-
-    // Open an DCP connection
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
-            "Failed dcp producer open connection.");
-
-    std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
-    checkeq(0, type.compare("consumer"), "Consumer not found");
-
-    opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
-                                     PROTOCOL_BINARY_RESPONSE_SUCCESS);
-
-    uint32_t dataLen = 100;
-    char *data = static_cast<char *>(cb_malloc(dataLen));
-    memset(data, 'x', dataLen);
-
-    uint8_t cas = 0x1;
-    uint16_t vbucket = 0;
-    uint8_t datatype = 1;
-    uint64_t bySeqno = 10;
-    uint64_t revSeqno = 0;
-    uint32_t exprtime = 0;
-    uint32_t lockTime = 0;
-
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1),
-            "Failed to send snapshot marker");
-
-    // Consume a DCP mutation with extended meta
-    int64_t adjusted_time1 = gethrtime() * 2;
-    ExtendedMetaData *emd = new ExtendedMetaData(adjusted_time1);
-    cb_assert(emd && emd->getStatus() == ENGINE_SUCCESS);
-    std::pair<const char*, uint16_t> meta = emd->getExtMeta();
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.mutation(h, cookie, opaque, "key", 3, data, dataLen, cas,
-                           vbucket, flags, datatype,
-                           bySeqno, revSeqno, exprtime,
-                           lockTime, meta.first, meta.second, 0),
-            "Failed dcp mutate.");
-    delete emd;
-
-    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0, "dcp");
-
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.close_stream(h, cookie, opaque, 0),
-            "Expected success");
-
-    check(set_vbucket_state(h, h1, 0, vbucket_state_active),
-          "Failed to set vbucket state.");
-    wait_for_flusher_to_settle(h, h1);
-
-    check_key_value(h, h1, "key", data, dataLen);
-
-    testHarness.destroy_cookie(cookie);
-    cb_free(data);
-
-    protocol_binary_request_header *request;
-    int64_t adjusted_time2;
-    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
-                           NULL, 0, NULL, 0);
-    h1->unknown_command(h, NULL, request, add_response);
-    cb_free(request);
-    checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
-            "Expected Success");
-    checkeq(last_body.size(), sizeof(int64_t),
-            "Bodylen didn't match expected value");
-    memcpy(&adjusted_time2, last_body.data(), last_body.size());
-    adjusted_time2 = ntohll(adjusted_time2);
-
-    /**
-     * Check that adjusted_time2 is marginally greater than
-     * adjusted_time1.
-     */
-    check(adjusted_time2 >= adjusted_time1,
-            "Adjusted time after mutation: Not what is expected");
-
-    return SUCCESS;
-}
-
 static enum test_result test_dcp_consumer_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
     // Store an item
     item *i = NULL;
@@ -4244,94 +4087,6 @@ static enum test_result test_dcp_consumer_delete(ENGINE_HANDLE *h, ENGINE_HANDLE
     return SUCCESS;
 }
 
-static enum test_result test_dcp_consumer_delete_with_time_sync(
-                                                        ENGINE_HANDLE *h,
-                                                        ENGINE_HANDLE_V1 *h1) {
-
-    //Set drift value
-    set_drift_counter_state(h, h1, /* initial drift */1000);
-
-    // Store an item
-    item *i = NULL;
-    checkeq(ENGINE_SUCCESS,
-            store(h, h1, NULL, OPERATION_ADD,"key", "value", &i),
-            "Failed to fail to store an item.");
-    h1->release(h, NULL, i);
-    verify_curr_items(h, h1, 1, "one item stored");
-
-    wait_for_flusher_to_settle(h, h1);
-
-    check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
-          "Failed to set vbucket state.");
-
-    const void *cookie = testHarness.create_cookie();
-    uint32_t opaque = 0;
-    uint8_t cas = 0x1;
-    uint16_t vbucket = 0;
-    uint32_t flags = 0;
-    uint64_t bySeqno = 10;
-    uint64_t revSeqno = 0;
-    const char *name = "unittest";
-    uint16_t nname = strlen(name);
-    uint32_t seqno = 0;
-
-    // Open an DCP connection
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
-            "Failed dcp producer open connection.");
-
-    std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
-    checkeq(0, type.compare("consumer"), "Consumer not found");
-
-    opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
-                                     PROTOCOL_BINARY_RESPONSE_SUCCESS);
-
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1),
-            "Failed to send snapshot marker");
-
-    // Consume an DCP deletion
-    int64_t adjusted_time1 = gethrtime() * 2;
-    ExtendedMetaData *emd = new ExtendedMetaData(adjusted_time1);
-    cb_assert(emd && emd->getStatus() == ENGINE_SUCCESS);
-    std::pair<const char*, uint16_t> meta = emd->getExtMeta();
-    checkeq(ENGINE_SUCCESS,
-            h1->dcp.deletion(h, cookie, opaque, "key", 3, cas, vbucket,
-                             bySeqno, revSeqno, meta.first, meta.second),
-            "Failed dcp delete.");
-    delete emd;
-
-    wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
-                        "dcp");
-
-    wait_for_stat_change(h, h1, "curr_items", 1);
-    verify_curr_items(h, h1, 0, "one item deleted");
-    testHarness.destroy_cookie(cookie);
-
-    protocol_binary_request_header *request;
-    int64_t adjusted_time2;
-    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
-                           NULL, 0, NULL, 0);
-    h1->unknown_command(h, NULL, request, add_response);
-    cb_free(request);
-    checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
-            "Expected Success");
-    checkeq(sizeof(int64_t), last_body.size(),
-            "Bodylen didn't match expected value");
-    memcpy(&adjusted_time2, last_body.data(), last_body.size());
-    adjusted_time2 = ntohll(adjusted_time2);
-
-    /**
-     * Check that adjusted_time2 is marginally greater than
-     * adjusted_time1.
-     */
-    check(adjusted_time2 >= adjusted_time1,
-            "Adjusted time after deletion: Not what is expected");
-
-    return SUCCESS;
-}
-
-
 static enum test_result test_dcp_replica_stream_backfill(ENGINE_HANDLE *h,
                                                          ENGINE_HANDLE_V1 *h1)
 {
@@ -5725,10 +5480,6 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("test producer stream request (partial)",
                  test_dcp_producer_stream_req_partial, test_setup, teardown,
                  "chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
-        TestCase("test producer stream request with time sync (partial)",
-                 test_dcp_producer_stream_req_partial_with_time_sync,
-                 test_setup, teardown, "chk_remover_stime=1;chk_max_items=100;"
-                 "time_synchronization=enabled_with_drift", prepare, cleanup),
         TestCase("test producer stream request (full)",
                  test_dcp_producer_stream_req_full, test_setup, teardown,
                  "chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
@@ -5868,16 +5619,8 @@ BaseTestCase testsuite_testcases[] = {
                  cleanup),
         TestCase("dcp consumer mutate", test_dcp_consumer_mutate, test_setup,
                  teardown, "dcp_enable_noop=false", prepare, cleanup),
-        TestCase("dcp consumer mutate with time sync",
-                 test_dcp_consumer_mutate_with_time_sync, test_setup, teardown,
-                 "dcp_enable_noop=false;time_synchronization=enabled_with_drift",
-                 prepare, cleanup),
         TestCase("dcp consumer delete", test_dcp_consumer_delete, test_setup,
                  teardown, "dcp_enable_noop=false", prepare, cleanup),
-        TestCase("dcp consumer delete with time sync",
-                 test_dcp_consumer_delete_with_time_sync, test_setup, teardown,
-                 "dcp_enable_noop=false;time_synchronization=enabled_with_drift",
-                 prepare, cleanup),
         TestCase("dcp failover log", test_failover_log_dcp, test_setup,
                  teardown, NULL, prepare, cleanup),
         TestCase("dcp persistence seqno", test_dcp_persistence_seqno, test_setup,
index 24ebdad..628c996 100644 (file)
@@ -404,8 +404,7 @@ static enum test_result test_delete_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1
     const void *cookie = testHarness.create_cookie();
 
     // delete an item with meta data
-    del_with_meta(h, h1, key1, keylen, 0, &itemMeta, 0, false, false,
-                  0, cookie);
+    del_with_meta(h, h1, key1, keylen, 0, &itemMeta, 0, false, false, cookie);
 
     check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
     check(last_seqno == high_seqno + 1, "Expected valid sequence number");
@@ -417,8 +416,7 @@ static enum test_result test_delete_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1
     testHarness.set_mutation_extras_handling(cookie, false);
 
     // delete an item with meta data
-    del_with_meta(h, h1, key2, keylen, 0, &itemMeta, 0, false, false,
-                  0, cookie);
+    del_with_meta(h, h1, key2, keylen, 0, &itemMeta, 0, false, false, cookie);
 
     check(last_uuid == vb_uuid, "Expected same vbucket uuid");
     check(last_seqno == high_seqno + 1, "Expected same sequence number");
@@ -829,7 +827,7 @@ static enum test_result test_set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
 
     // do set with meta with the correct cas value. should pass.
     set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set,
-                  false, 0, false, 0, cookie);
+                  false, 0, false, cookie);
     checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
     check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
     check(last_seqno == high_seqno + 1, "Expected valid sequence number");
@@ -851,7 +849,7 @@ static enum test_result test_set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
     itm_meta.revSeqno++;
     cas_for_set = last_meta.cas;
     set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set,
-                  false, 0, false, 0, cookie);
+                  false, 0, false, cookie);
     checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
     check(last_uuid == vb_uuid, "Expected same vbucket uuid");
     check(last_seqno == high_seqno + 1, "Expected same sequence number");
@@ -1379,14 +1377,14 @@ static enum test_result test_set_meta_lww_conflict_resolution(ENGINE_HANDLE *h,
           "Expect zero setMeta ops");
 
     set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
-                  PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
+                  PROTOCOL_BINARY_RAW_BYTES, false);
     checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
     checkeq(0, get_int_stat(h, h1, "ep_bg_meta_fetched"),
             "Expected no bg meta fetchs, thanks to bloom filters");
 
     // Check all meta data is the same
     set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
-                  PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
+                  PROTOCOL_BINARY_RAW_BYTES, false);
     checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
     checkeq(1, get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail"),
           "Expected set meta conflict resolution failure");
@@ -1394,7 +1392,7 @@ static enum test_result test_set_meta_lww_conflict_resolution(ENGINE_HANDLE *h,
     // Check that an older cas fails
     itemMeta.cas = 0xdeadbeee;
     set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
-                  PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
+                  PROTOCOL_BINARY_RAW_BYTES, false);
     checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
     checkeq(2, get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail"),
           "Expected set meta conflict resolution failure");
@@ -1402,7 +1400,7 @@ static enum test_result test_set_meta_lww_conflict_resolution(ENGINE_HANDLE *h,
     // Check that a higher cas passes
     itemMeta.cas = 0xdeadbeff;
     set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
-                  PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
+                  PROTOCOL_BINARY_RAW_BYTES, false);
     checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
 
     return SUCCESS;
@@ -1488,13 +1486,13 @@ static enum test_result test_del_meta_lww_conflict_resolution(ENGINE_HANDLE *h,
     itemMeta.exptime = 0;
     itemMeta.flags = 0xdeadbeef;
 
-    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true);
     checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
     wait_for_flusher_to_settle(h, h1);
     wait_for_stat_to_be(h, h1, "curr_items", 0);
 
     // Check all meta data is the same
-    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true);
     checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
     checkeq(1, get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail"),
           "Expected delete meta conflict resolution failure");
@@ -1502,7 +1500,7 @@ static enum test_result test_del_meta_lww_conflict_resolution(ENGINE_HANDLE *h,
     // Check that higher rev seqno but lower cas fails
     itemMeta.cas = info.cas;
     itemMeta.revSeqno = 11;
-    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true);
     checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
     checkeq(2, get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail"),
           "Expected delete meta conflict resolution failure");
@@ -1510,61 +1508,12 @@ static enum test_result test_del_meta_lww_conflict_resolution(ENGINE_HANDLE *h,
     // Check that a higher cas and lower rev seqno passes
     itemMeta.cas = info.cas + 2;
     itemMeta.revSeqno = 9;
-    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
+    del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true);
     checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected sucess");
 
     return SUCCESS;
 }
 
-static enum test_result test_adjusted_time_negative_tests(ENGINE_HANDLE *h,
-                                                          ENGINE_HANDLE_V1 *h1) {
-    protocol_binary_request_header *request;
-
-    /* GET_ADJUSTED_TIME with a non-existent vbucket: vbid 1 */
-    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 1, 0, NULL, 0,
-                           NULL, 0, NULL, 0);
-    h1->unknown_command(h, NULL, request, add_response);
-    cb_free(request);
-    checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, last_status.load(),
-            "Expected not my vbucket");
-
-    /* GET_ADJUSTED_TIME when time_synchronization is "disabled" */
-    request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
-                           NULL, 0, NULL, 0);
-    h1->unknown_command(h, NULL, request, add_response);
-    cb_free(request);
-    checkeq(PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, last_status.load(),
-            "Expected not supported response");
-
-    int64_t initialDriftCount = 1000;
-    uint8_t timeSync = 0x00;
-
-    int64_t driftCount = htonll(initialDriftCount);
-    uint8_t extlen = sizeof(driftCount) + sizeof(timeSync);
-    char *ext = new char[extlen];
-    memcpy(ext, (char *)&driftCount, sizeof(driftCount));
-    memcpy(ext + sizeof(driftCount), (char *)&timeSync, sizeof(timeSync));
-
-    /* SET_DRIFT_COUNTER_STATE with non-existent vbucket: vbid 1 */
-    request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
-                           1, 0, ext, extlen);
-    h1->unknown_command(h, NULL, request, add_response);
-    checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, last_status.load(),
-            "Expected not my vbucket");
-    cb_free(request);
-
-    /* SET_DRIFT_COUNTER_STATE when time_synchronization is "disabled" */
-    request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
-                           0, 0, ext, extlen);
-    h1->unknown_command(h, NULL, request, add_response);
-    checkeq(PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, last_status.load(),
-            "Expected not supported");
-    cb_free(request);
-    delete[] ext;
-
-    return SUCCESS;
-}
-
 static enum test_result test_getMeta_with_item_eviction(ENGINE_HANDLE *h,
                                                         ENGINE_HANDLE_V1 *h1)
 {
@@ -1599,8 +1548,7 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("get meta", test_get_meta, test_setup,
                  teardown, NULL, prepare, cleanup),
         TestCase("get meta with extras", test_get_meta_with_extras,
-                 test_setup, teardown,
-                 "time_synchronization=enabled_without_drift", prepare, cleanup),
+                 test_setup, teardown, NULL, prepare, cleanup),
         TestCase("get meta deleted", test_get_meta_deleted,
                  test_setup, teardown, NULL, prepare, cleanup),
         TestCase("get meta nonexistent", test_get_meta_nonexistent,
@@ -1667,10 +1615,6 @@ BaseTestCase testsuite_testcases[] = {
         TestCase("temp item deletion", test_temp_item_deletion,
                  test_setup, teardown,
                  "exp_pager_stime=1", prepare, cleanup),
-        TestCase("test getAdjustedTime, setDriftCounter apis negative tests",
-                 test_adjusted_time_negative_tests, test_setup, teardown,
-                 NULL, prepare, cleanup),
-
         TestCase("test get_meta with item_eviction",
                  test_getMeta_with_item_eviction, test_setup, teardown,
                  "item_eviction_policy=full_eviction", prepare, cleanup),
index 4a907db..fd298a0 100644 (file)
@@ -138,8 +138,7 @@ static std::unique_ptr<KVStore> setup_kv_store(KVStoreConfig& config) {
 
     StatsCallback sc;
     std::string failoverLog("");
-    vbucket_state state(vbucket_state_active, 0, 0, 0, 0, 0, 0, 0, 0,
-                        failoverLog);
+    vbucket_state state(vbucket_state_active, 0, 0, 0, 0, 0, 0, 0, failoverLog);
     kvstore->snapshotVBucket(0, state, &sc);
 
     return kvstore;
@@ -318,8 +317,7 @@ TEST(CouchKVStoreTest, MB_17517MaxCasOfMinus1) {
     std::string failoverLog("[]");
     vbucket_state state(vbucket_state_active, /*ckid*/0, /*maxDelSeqNum*/0,
                         /*highSeqno*/0, /*purgeSeqno*/0, /*lastSnapStart*/0,
-                        /*lastSnapEnd*/0, /*maxCas*/-1, /*driftCounter*/0,
-                        failoverLog);
+                        /*lastSnapEnd*/0, /*maxCas*/-1, failoverLog);
     EXPECT_TRUE(kvstore->snapshotVBucket(/*vbid*/0, state, nullptr));
     EXPECT_EQ(~0ull, kvstore->listPersistedVbuckets()[0]->maxCas);
 
@@ -461,7 +459,7 @@ public:
         kvstore.reset(new MockCouchKVStore(config));
         StatsCallback sc;
         std::string failoverLog("");
-        vbucket_state state(vbucket_state_active, 0, 0, 0, 0, 0, 0, 0, 0,
+        vbucket_state state(vbucket_state_active, 0, 0, 0, 0, 0, 0, 0,
                             failoverLog);
         kvstore->snapshotVBucket(0, state, &sc, true);
     }