},
"time_synchronization": {
"default": "disabled",
- "descr": "Time synchronization setting for XDCR Last Write Wins (LWW) conflict resolution",
+ "descr": "No longer supported. This config parameter has no effect.",
"type": "std::string",
"validator": {
"enum": [
| | | resolution to use |
| item_eviction_policy | string | Item eviction policy used by the item |
| | | pager (value_only or full_eviction) |
-| time_synchronization | string | Time synchronization setting for the bucket|
-| | | (disabled, enabled_without_drift, |
-| | | enabled_with_drift) |
std::string failovers("[{\"id\":0, \"seq\":0}]");
cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
- 0, 0, 0, INITIAL_DRIFT,
- failovers);
+ 0, 0, 0, failovers);
updateDbFileMap(vbucket, 1);
}
uint64_t lastSnapStart = 0;
uint64_t lastSnapEnd = 0;
uint64_t maxCas = 0;
- int64_t driftCounter = INITIAL_DRIFT;
DbInfo info;
errCode = couchstore_db_info(db, &info);
cJSON_GetObjectItem(jsonObj, "snap_end"));
const std::string maxCasValue = getJSONObjString(
cJSON_GetObjectItem(jsonObj, "max_cas"));
- const std::string driftCount = getJSONObjString(
- cJSON_GetObjectItem(jsonObj, "drift_counter"));
cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
|| max_deleted_seqno.compare("") == 0) {
}
}
- if (driftCount.compare("") != 0) {
- parseInt64(driftCount.c_str(), &driftCounter);
- }
-
if (failover_json) {
char* json = cJSON_PrintUnformatted(failover_json);
failovers.assign(json);
cachedVBStates[vbId] = new vbucket_state(state, checkpointId,
maxDeletedSeqno, highSeqno,
purgeSeqno, lastSnapStart,
- lastSnapEnd, maxCas, driftCounter,
- failovers);
+ lastSnapEnd, maxCas, failovers);
return couchErr2EngineErr(errCode);
}
<< ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
<< ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
<< ",\"max_cas\": \"" << vbState.maxCas << "\""
- << ",\"drift_counter\": \"" << vbState.driftCounter << "\""
<< "}";
LocalDoc lDoc;
bufferedBackfill.bytes.fetch_add(itm->size());
bufferedBackfill.items++;
- pushToReadyQ(new MutationResponse(itm, opaque_,
- prepareExtendedMetaData(itm->getVBucketId())));
+ pushToReadyQ(new MutationResponse(itm, opaque_, nullptr));
lastReadSeqno.store(itm->getBySeqno());
lh.unlock();
curChkSeqno = qi->getBySeqno();
lastReadSeqnoUnSnapshotted = qi->getBySeqno();
- mutations.push_back(new MutationResponse(qi, opaque_,
- prepareExtendedMetaData(qi->getVBucketId()),
+ mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
KEY_VALUE));
} else if (qi->getOperation() == queue_op_checkpoint_start) {
return lastSentSeqno.load();
}
-ExtendedMetaData* ActiveStream::prepareExtendedMetaData(uint16_t vBucketId)
-{
- ExtendedMetaData *emd = NULL;
- if (producer->isExtMetaDataEnabled()) {
- RCPtr<VBucket> vb = engine->getVBucket(vBucketId);
- if (vb && vb->getTimeSyncConfig() == time_sync_t::ENABLED_WITH_DRIFT) {
- int64_t adjustedTime = gethrtime() + vb->getDriftCounter();
- emd = new ExtendedMetaData(adjustedTime);
- }
- }
- return emd;
-}
-
const Logger& ActiveStream::getLogger() const
{
return producer->getLogger();
const char* getEndStreamStatusStr(end_stream_status_t status);
- ExtendedMetaData* prepareExtendedMetaData(uint16_t vBucketId);
-
bool isCurrentSnapshotCompleted() const;
/* Drop the cursor registered with the checkpoint manager.
abort();
}
- // Update drift counter for vbucket upon a success only
- if (ret == ENGINE_SUCCESS && emd) {
- vb->setDriftCounter(emd->getAdjustedTime());
- }
-
return ret;
}
vbucket_state vb_state(vb->getState(), chkId, 0,
vb->getHighSeqno(), vb->getPurgeSeqno(),
range.start, range.end, vb->getMaxCas(),
- vb->getDriftCounter() ,failovers);
+ failovers);
states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
}
return false;
vb->getPersistedSnapshot(range);
vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
vb->getPurgeSeqno(), range.start, range.end,
- vb->getMaxCas(), vb->getDriftCounter(),
- failovers);
+ vb->getMaxCas(), failovers);
KVStore *rwUnderlying = getRWUnderlying(vbid);
if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
newvb->createFilter(config.getBfilterKeyCount(),
config.getBfilterFpProb());
}
- const std::string& timeSyncConfig = config.getTimeSynchronization();
- newvb->setTimeSyncConfig(VBucket::convertStrToTimeSyncConfig(timeSyncConfig));
// The first checkpoint for active vbucket should start with id 2.
uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
}
}
- // Update drift counter for vbucket upon a success only
- if (ret == ENGINE_SUCCESS && emd) {
- vb->setDriftCounter(emd->getAdjustedTime());
- }
-
return ret;
}
ret = ENGINE_EWOULDBLOCK;
}
- // Update drift counter for vbucket upon a success only
- if (ret == ENGINE_SUCCESS && emd) {
- vb->setDriftCounter(emd->getAdjustedTime());
- }
-
return ret;
}
vbMap.getPersistenceCheckpointId(vbid),
maxDeletedRevSeqno, vb->getHighSeqno(),
vb->getPurgeSeqno(), range.start,
- range.end, maxCas, vb->getDriftCounter(),
- failovers);
+ range.end, maxCas, failovers);
if (rwUnderlying->snapshotVBucket(vb->getId(), vbState,
NULL, false) != true) {
reinterpret_cast<protocol_binary_request_get_keys*>
(request), response);
}
+ // MB-21143: Remove adjusted time/drift API, but return NOT_SUPPORTED
case PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME:
- {
- return h->getAdjustedTime(cookie,
- reinterpret_cast<protocol_binary_request_get_adjusted_time*>
- (request), response);
- }
case PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE:
{
- return h->setDriftCounterState(cookie,
- reinterpret_cast<protocol_binary_request_set_drift_counter_state*>
- (request), response);
+ return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
+ cookie);
}
}
return ret;
}
-ENGINE_ERROR_CODE EventuallyPersistentEngine::getAdjustedTime(
- const void *cookie,
- protocol_binary_request_get_adjusted_time *request,
- ADD_RESPONSE response) {
-
- uint16_t vbucket = ntohs(request->message.header.request.vbucket);
- RCPtr<VBucket> vb = getVBucket(vbucket);
- if (!vb) {
- return sendNotMyVBucketResponse(response, cookie, 0);
- }
- // Will return the vbucket's adjusted time, only if
- // time synchronization for the vbucket is enabled
- if (vb->getTimeSyncConfig() == time_sync_t::ENABLED_WITH_DRIFT) {
- int64_t adjusted_time = gethrtime() + vb->getDriftCounter();
- adjusted_time = htonll(adjusted_time);
- return sendResponse(response, NULL, 0, NULL, 0,
- (const void *)&adjusted_time, sizeof(int64_t),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
- } else {
- return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
- cookie);
- }
-}
-
-ENGINE_ERROR_CODE EventuallyPersistentEngine::setDriftCounterState(
- const void *cookie,
- protocol_binary_request_set_drift_counter_state *request,
- ADD_RESPONSE response) {
-
- uint16_t vbucket = ntohs(request->message.header.request.vbucket);
- RCPtr<VBucket> vb = getVBucket(vbucket);
- if (!vb) {
- return sendNotMyVBucketResponse(response, cookie, 0);
- }
-
- if (vb->getTimeSyncConfig() != time_sync_t::ENABLED_WITH_DRIFT) {
- return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0,
- cookie);
- }
-
- int64_t driftCount = ntohll(request->message.body.initial_drift);
- if (driftCount == INITIAL_DRIFT) {
- return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_EINVAL, 0,
- cookie);
- }
-
- set_drift_state_resp_t resp = vb->setDriftCounterState(driftCount);
- uint64_t uuid = htonll(resp.last_vb_uuid);
- int64_t seqno = htonll(resp.last_seqno);
-
- std::stringstream result;
- result.write((char*) &uuid, sizeof(uuid));
- result.write((char*) &seqno, sizeof(seqno));
-
- return sendResponse(response, NULL, 0, NULL, 0,
- result.str().data(), result.str().length(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_SUCCESS, 0 , cookie);
-}
-
ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(const void* cookie,
uint32_t opaque,
uint32_t seqno,
protocol_binary_request_get_keys *request,
ADD_RESPONSE response);
- ENGINE_ERROR_CODE getAdjustedTime(const void* cookie,
- protocol_binary_request_get_adjusted_time *request,
- ADD_RESPONSE response);
-
- ENGINE_ERROR_CODE setDriftCounterState(const void* cookie,
- protocol_binary_request_set_drift_counter_state *request,
- ADD_RESPONSE response);
-
/**
* Visit the objects and add them to the tap/dcp connecitons queue.
* @todo this code should honor the backfill time!
ExtendedMetaData::ExtendedMetaData(const void *meta, uint16_t nmeta) {
len = nmeta;
data = static_cast<const char*>(meta);
- adjustedTime = 0;
ret = ENGINE_SUCCESS;
memoryAllocated = false;
decodeMeta();
}
-ExtendedMetaData::ExtendedMetaData(int64_t adjusted_time) {
- len = 0;
- data = NULL;
- adjustedTime = adjusted_time;
- ret = ENGINE_SUCCESS;
- memoryAllocated = false;
- encodeMeta();
-}
-
ExtendedMetaData::~ExtendedMetaData() {
if (memoryAllocated) {
delete[] data;
}
switch (type) {
case CMD_META_ADJUSTED_TIME:
- memcpy(&adjustedTime, data + offset, length);
- adjustedTime = ntohll(adjustedTime);
- break;
+ // Ignoring adjusted_time
case CMD_META_CONFLICT_RES_MODE:
// MB-21143: Now ignoring conflict_res_mode
// 4.6 no longer sends, but older versions
ret = ENGINE_EINVAL;
}
}
-
-void ExtendedMetaData::encodeMeta() {
- uint8_t version = META_EXT_VERSION_ONE;
- uint8_t type;
- int64_t adjusted_time = htonll(adjustedTime);
- uint16_t length;
- uint16_t nmeta = sizeof(version) + sizeof(type) + sizeof(length) +
- sizeof(adjusted_time);
-
- char* meta = new char[nmeta];
- if (meta == NULL) {
- ret = ENGINE_ENOMEM;
- } else {
- memoryAllocated = true;
- uint32_t offset = 0;
-
- memcpy(meta, &version, sizeof(version));
- offset += sizeof(version);
-
- type = CMD_META_ADJUSTED_TIME;
- length = sizeof(adjusted_time);
- length = htons(length);
-
- memcpy(meta + offset, &type, sizeof(type));
- offset += sizeof(type);
-
- memcpy(meta + offset, &length, sizeof(length));
- offset += sizeof(length);
-
- memcpy(meta + offset, &adjusted_time, sizeof(adjusted_time));
-
- data = (const char*)meta;
- len = nmeta;
- }
-}
*/
class ExtendedMetaData {
public:
+ ExtendedMetaData()
+ : data(nullptr),
+ ret(ENGINE_SUCCESS),
+ len(0),
+ memoryAllocated(false) {}
+
ExtendedMetaData(const void *meta, uint16_t nmeta);
- ExtendedMetaData(int64_t adjusted_time);
~ExtendedMetaData();
ENGINE_ERROR_CODE getStatus() {
return ret;
}
- int64_t getAdjustedTime() {
- return adjustedTime;
- }
-
std::pair<const char*, uint16_t> getExtMeta() {
return std::make_pair(data, len);
}
private:
+ /*
+ void encodeMeta(); is currently removed as there's no extmeta to encode.
+ Resurrect from history as required.
+ */
void decodeMeta();
- void encodeMeta();
const char* data;
- int64_t adjustedTime;
ENGINE_ERROR_CODE ret;
uint16_t len;
bool memoryAllocated;
uint64_t lastSnapStart = 0;
uint64_t lastSnapEnd = 0;
uint64_t maxCas = 0;
- int64_t driftCounter = INITIAL_DRIFT;
fdb_kvs_info kvsInfo;
fdb_kvs_handle *kvsHandle = getKvsHandle(vbId, handleType::READER);
const std::string maxCasValue = getJSONObjString(
cJSON_GetObjectItem(jsonObj, "max_cas"));
- const std::string driftCount = getJSONObjString(
- cJSON_GetObjectItem(jsonObj, "drift_counter"));
-
cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
|| max_deleted_seqno.compare("") == 0) {
parseUint64(maxCasValue.c_str(), &maxCas);
}
- if (driftCount.compare("")) {
- parseInt64(driftCount.c_str(), &driftCounter);
- }
-
if (failover_json) {
char* json = cJSON_PrintUnformatted(failover_json);
failovers.assign(json);
cachedVBStates[vbId] = new vbucket_state(state, checkpointId,
maxDeletedSeqno, highSeqno, 0,
lastSnapStart, lastSnapEnd,
- maxCas, driftCounter,
- failovers);
+ maxCas, failovers);
fdb_doc_free(statDoc);
return forestErr2EngineErr(status);
}
std::string failovers("[{\"id\":0, \"seq\":0}]");
cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
- 0, 0, 0, INITIAL_DRIFT,
- failovers);
+ 0, 0, 0, failovers);
vbucket_state *state = cachedVBStates[vbucket];
std::string stateStr = state->toJSON();
vbState->lastSnapStart = newState.lastSnapStart;
vbState->lastSnapEnd = newState.lastSnapEnd;
vbState->maxCas = std::max(vbState->maxCas, newState.maxCas);
- vbState->driftCounter = newState.driftCounter;
} else {
cachedVBStates[vbid] = new vbucket_state(newState);
}
<< ",\"snap_start\": \"" << lastSnapStart << "\""
<< ",\"snap_end\": \"" << lastSnapEnd << "\""
<< ",\"max_cas\": \"" << maxCas << "\""
- << ",\"drift_counter\": \"" << driftCounter << "\""
<< "}";
return jsonState.str();
vbucket_state(vbucket_state_t _state, uint64_t _chkid,
uint64_t _maxDelSeqNum, int64_t _highSeqno,
uint64_t _purgeSeqno, uint64_t _lastSnapStart,
- uint64_t _lastSnapEnd, uint64_t _maxCas,
- uint64_t _driftCounter, std::string& _failovers) :
+ uint64_t _lastSnapEnd, uint64_t _maxCas, std::string& _failovers) :
state(_state), checkpointId(_chkid), maxDeletedSeqno(_maxDelSeqNum),
highSeqno(_highSeqno), purgeSeqno(_purgeSeqno),
lastSnapStart(_lastSnapStart), lastSnapEnd(_lastSnapEnd),
- maxCas(_maxCas), driftCounter(_driftCounter),failovers(_failovers) { }
+ maxCas(_maxCas), failovers(_failovers) { }
vbucket_state(const vbucket_state& vbstate) {
state = vbstate.state;
lastSnapStart = vbstate.lastSnapStart;
lastSnapEnd = vbstate.lastSnapEnd;
maxCas = vbstate.maxCas;
- driftCounter = vbstate.driftCounter;
}
std::string toJSON() const;
lastSnapStart = 0;
lastSnapEnd = 0;
maxCas = 0;
- driftCounter = INITIAL_DRIFT;
failovers.assign("[{\"id\":0, \"seq\":0}]");
}
uint64_t lastSnapStart;
uint64_t lastSnapEnd;
uint64_t maxCas;
- int64_t driftCounter;
std::string failovers;
};
}
uint64_t VBucket::nextHLCCas() {
- int64_t adjusted_time = gethrtime();
- uint64_t final_adjusted_time = 0;
-
- if (time_sync_config == time_sync_t::ENABLED_WITH_DRIFT) {
- adjusted_time += drift_counter;
- }
-
- if (adjusted_time < 0) {
- LOG(EXTENSION_LOG_WARNING,
- "Adjusted time is negative: %" PRId64 "\n", adjusted_time);
- }
-
- final_adjusted_time = ((uint64_t)adjusted_time) & ~((1 << 16) - 1);
+ hrtime_t now = gethrtime();
+ uint64_t now48bits = ((uint64_t)now) & ~((1 << 16) - 1);
uint64_t local_max_cas = max_cas.load();
- if (final_adjusted_time > local_max_cas) {
- atomic_setIfBigger(max_cas, final_adjusted_time);
- return final_adjusted_time;
+ if (now48bits > local_max_cas) {
+ atomic_setIfBigger(max_cas, now48bits);
+ return now48bits;
}
atomic_setIfBigger(max_cas, local_max_cas + 1);
addStat("bloom_filter_size", getFilterSize(), add_stat, c);
addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
addStat("max_cas", getMaxCas(), add_stat, c);
- addStat("drift_counter", getDriftCounter(), add_stat, c);
- addStat("time_sync", isTimeSyncEnabled() ? "enabled" : "disabled",
- add_stat, c);
addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
}
}
#include <queue>
-typedef struct {
- uint64_t last_vb_uuid;
- int64_t last_seqno;
-} set_drift_state_resp_t;
-
class BgFetcher;
const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
class KVShard;
/**
- * Indicates the possible time synchronization settings
- * for the vbucket
- */
-
-enum class time_sync_t {
- DISABLED, //No time synchronization.
- ENABLED_WITHOUT_DRIFT, //Time synchronization but no usage of drift counter
- ENABLED_WITH_DRIFT //Time synchronization with usage of drift counter
-};
-
-/**
* An individual vbucket.
*/
class VBucket : public RCValue {
std::shared_ptr<Callback<id_type> > cb,
vbucket_state_t initState = vbucket_state_dead,
uint64_t chkId = 1, uint64_t purgeSeqno = 0,
- uint64_t maxCas = 0, int64_t driftCounter = INITIAL_DRIFT):
+ uint64_t maxCas = 0):
ht(st),
checkpointManager(st, i, chkConfig, lastSeqno, lastSnapStart,
lastSnapEnd, cb, chkId),
stats(st),
purge_seqno(purgeSeqno),
max_cas(maxCas),
- drift_counter(driftCounter),
- time_sync_config(time_sync_t::DISABLED),
takeover_backed_up(false),
persisted_snapshot_start(lastSnapStart),
persisted_snapshot_end(lastSnapEnd),
return max_cas;
}
- bool isTimeSyncEnabled() {
- if (time_sync_config == time_sync_t::ENABLED_WITHOUT_DRIFT ||
- time_sync_config == time_sync_t::ENABLED_WITH_DRIFT) {
- return true;
- }
-
- return false;
- }
-
- time_sync_t getTimeSyncConfig() {
- return time_sync_config;
- }
-
- void setTimeSyncConfig(time_sync_t timeSyncConfig) {
- time_sync_config.store(timeSyncConfig);
- }
-
void setMaxCas(uint64_t cas) {
atomic_setIfBigger(max_cas, cas);
}
- /**
- * To set drift counter's initial value
- *
- * Returns last_vbuuid and last_seqno of vbucket (atomically)
- */
- set_drift_state_resp_t setDriftCounterState(int64_t initial_drift) {
- drift_counter = initial_drift;
-
- // Get vbucket uuid from the failover table, and then get
- // the vbucket high seqno, return these 2 values as long as
- // the uuid did not change after getting the high seqno.
- uint64_t last_vbuuid = 0;
- int64_t last_seqno = 0;
- do {
- last_vbuuid = failovers->getLatestUUID();
- last_seqno = getHighSeqno();
- } while (failovers->getLatestUUID() != last_vbuuid);
- set_drift_state_resp_t resp;
- resp.last_vb_uuid = last_vbuuid;
- resp.last_seqno = last_seqno;
- return resp;
- }
-
- int64_t getDriftCounter() {
- return drift_counter;
- }
-
- void setDriftCounter(int64_t adjustedTime) {
- // Update drift counter only if timeSync is enabled for
- // the vbucket.
- if (time_sync_config == time_sync_t::ENABLED_WITH_DRIFT) {
- int64_t wallTime = gethrtime();
- if ((wallTime + getDriftCounter()) < adjustedTime) {
- drift_counter = (adjustedTime - wallTime);
- }
- }
- }
-
bool isTakeoverBackedUp() {
return takeover_backed_up.load();
}
}
}
- static time_sync_t convertStrToTimeSyncConfig(const std::string& timeSyncConfig) {
- if (timeSyncConfig == "enabled_without_drift") {
- return time_sync_t::ENABLED_WITHOUT_DRIFT;
- } else if (timeSyncConfig == "enabled_with_drift") {
- return time_sync_t::ENABLED_WITH_DRIFT;
- }
-
- return time_sync_t::DISABLED;
- }
-
void addHighPriorityVBEntry(uint64_t id, const void *cookie,
bool isBySeqno);
void notifyOnPersistence(EventuallyPersistentEngine &e,
uint64_t purge_seqno;
AtomicValue<uint64_t> max_cas;
- AtomicValue<int64_t> drift_counter;
- AtomicValue<time_sync_t> time_sync_config;
-
AtomicValue<bool> takeover_backed_up;
Mutex pendingBGFetchesLock;
void Warmup::createVBuckets(uint16_t shardId) {
size_t maxEntries = store.getEPEngine().getMaxFailoverEntries();
std::map<uint16_t, vbucket_state>& vbStates = shardVbStates[shardId];
- Configuration& config = store.getEPEngine().getConfiguration();
- time_sync_t timeSyncConfig =
- VBucket::convertStrToTimeSyncConfig(config.getTimeSynchronization());
std::map<uint16_t, vbucket_state>::iterator itr;
for (itr = vbStates.begin(); itr != vbStates.end(); ++itr) {
store.getEPEngine().getCheckpointConfig(),
shard, vbs.highSeqno, vbs.lastSnapStart,
vbs.lastSnapEnd, table, cb, vbs.state, 1,
- vbs.purgeSeqno, vbs.maxCas,
- vbs.driftCounter));
-
- vb->setTimeSyncConfig(timeSyncConfig);
+ vbs.purgeSeqno, vbs.maxCas));
if(vbs.state == vbucket_state_active && !cleanShutdown) {
if (static_cast<uint64_t>(vbs.highSeqno) == vbs.lastSnapEnd) {
return req;
}
-void set_drift_counter_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
- int64_t initialDriftCount) {
-
- protocol_binary_request_header *request;
-
- int64_t driftCount = htonll(initialDriftCount);
- uint8_t timeSync = 0x00;
- uint8_t extlen = sizeof(driftCount) + sizeof(timeSync);
- char *ext = new char[extlen];
- memcpy(ext, (char *)&driftCount, sizeof(driftCount));
- memcpy(ext + sizeof(driftCount), (char *)&timeSync, sizeof(timeSync));
-
- request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
- 0, 0, ext, extlen);
- h1->unknown_command(h, NULL, request, add_response);
- check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
- "Expected success for CMD_SET_DRIFT_COUNTER_STATE");
- cb_free(request);
- delete[] ext;
-}
-
void add_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
const size_t keylen, const char *val, const size_t vallen,
const uint32_t vb, ItemMetaData *itemMeta,
bool skipConflictResolution, uint8_t datatype,
- bool includeExtMeta, int64_t adjustedTime) {
+ bool includeExtMeta) {
int blen = 0;
char *ext;
ExtendedMetaData *emd = NULL;
blen = 26;
ext = new char[blen];
encodeWithMetaExt(ext, itemMeta);
- emd = new ExtendedMetaData(adjustedTime);
+ emd = new ExtendedMetaData();
// nmeta added to ext below
}
const size_t keylen, const uint32_t vb,
ItemMetaData *itemMeta, uint64_t cas_for_delete,
bool skipConflictResolution, bool includeExtMeta,
- int64_t adjustedTime, const void *cookie) {
+ const void *cookie) {
int blen = 0;
char *ext;
ExtendedMetaData *emd = NULL;
blen = 26;
ext = new char[blen];
encodeWithMetaExt(ext, itemMeta);
- emd = new ExtendedMetaData(adjustedTime);
+ emd = new ExtendedMetaData();
// nmeta added to ext below
}
const size_t keylen, const char *val, const size_t vallen,
const uint32_t vb, ItemMetaData *itemMeta,
uint64_t cas_for_set, bool skipConflictResolution,
- uint8_t datatype, bool includeExtMeta,
- int64_t adjustedTime, const void *cookie) {
+ uint8_t datatype, bool includeExtMeta, const void *cookie) {
int blen = 0;
char *ext;
ExtendedMetaData *emd = NULL;
blen = 26;
ext = new char[blen];
encodeWithMetaExt(ext, itemMeta);
- emd = new ExtendedMetaData(adjustedTime);
+ emd = new ExtendedMetaData();
// nmeta added to ext below
}
const size_t keylen, const char *val, const size_t vallen,
const uint32_t vb, ItemMetaData *itemMeta,
bool skipConflictResolution = false,
- uint8_t datatype = 0x00, bool includeExtMeta = false,
- int64_t adjusted_time = 0);
+ uint8_t datatype = 0x00, bool includeExtMeta = false);
bool get_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
bool reqExtMeta = false);
void del_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
const size_t keylen, const uint32_t vb,
ItemMetaData *itemMeta, uint64_t cas_for_delete = 0,
bool skipConflictResolution = false,
- bool includeExtMeta = false,
- int64_t adjustedTime = 0, const void *cookie = NULL);
+ bool includeExtMeta = false, const void *cookie = NULL);
void set_with_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
const size_t keylen, const char *val, const size_t vallen,
const uint32_t vb, ItemMetaData *itemMeta,
uint64_t cas_for_set, bool skipConflictResolution = false,
uint8_t datatype = 0x00, bool includeExtMeta = false,
- int64_t adjustedTime = 0, const void *cookie = NULL);
+ const void *cookie = NULL);
void return_meta(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *key,
const size_t keylen, const char *val, const size_t vallen,
const uint32_t vb, const uint64_t cas, const uint32_t flags,
itm_meta.exptime = info.exptime;
itm_meta.flags = info.flags;
set_with_meta(h, h1, key1, strlen(key1), val1, strlen(val1), 0, &itm_meta,
- last_cas, false, info.datatype, false, 0, cookie);
+ last_cas, false, info.datatype, false, cookie);
checkeq(ENGINE_SUCCESS,
h1->get(h, cookie, &itm, key1, strlen(key1), 0),
//SET_WITH_META
set_with_meta(h, h1, key, strlen(key), val, strlen(val), 0, &itm_meta,
- 0, false, datatype, false, 0, cookie);
+ 0, false, datatype, false, cookie);
checkeq(ENGINE_SUCCESS,
h1->get(h, cookie, &itm, key, strlen(key), 0),
memset(&info, 0, sizeof(info));
- //Set a really large drift value
- set_drift_counter_state(h, h1, 100000);
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_ADD, key, "data1", &i, 0, 0),
"Failed to store an item");
check(curr_cas > prev_cas, "CAS is not monotonically increasing");
prev_cas = curr_cas;
- //set a lesser drift and ensure that the CAS is monotonically
- //increasing
- set_drift_counter_state(h, h1, 100);
-
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, key, "data2", &i, 0, 0),
"Failed to store an item");
check(curr_cas > prev_cas, "CAS is not monotonically increasing");
prev_cas = curr_cas;
- //ensure that the adjusted time will be negative
- int64_t drift_counter = (-1) * (gethrtime() * 2);
- set_drift_counter_state(h, h1, drift_counter);
-
- protocol_binary_request_header *request;
- int64_t adjusted_time;
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
- "Expected Success");
- checkeq(sizeof(int64_t), last_body.size(),
- "Bodylen didn't match expected value");
- memcpy(&adjusted_time, last_body.data(), last_body.size());
- adjusted_time = ntohll(adjusted_time);
- std::string err_msg("Adjusted time " + std::to_string(adjusted_time) +
- " is supposed to have been negative!");
- check(adjusted_time < 0, err_msg.c_str());
-
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_REPLACE, key, "data3", &i, 0, 0),
"Failed to store an item");
check(curr_cas > prev_cas, "CAS is not monotonically increasing");
prev_cas = curr_cas;
- //Set the drift value to 0
- set_drift_counter_state(h, h1, 0);
-
getl(h, h1, key, 0, 10);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
"Expected to be able to getl on first try");
"vb_0:bloom_filter_size",
"vb_0:db_data_size",
"vb_0:db_file_size",
- "vb_0:drift_counter",
"vb_0:high_seqno",
"vb_0:ht_cache_size",
"vb_0:ht_item_memory",
"vb_0:queue_memory",
"vb_0:queue_size",
"vb_0:rollback_item_count",
- "vb_0:time_sync",
"vb_0:uuid"
}
},
TestCase("test failover log behavior", test_failover_log_behavior,
test_setup, teardown, NULL, prepare, cleanup),
TestCase("test hlc cas", test_hlc_cas, test_setup, teardown,
- "time_synchronization=enabled_with_drift", prepare, cleanup),
+ NULL, prepare, cleanup),
TestCaseV2("multi_bucket set/get ", test_multi_bucket_set_get, NULL,
teardown_v2, NULL, prepare, cleanup),
exp_markers(0),
extra_takeover_ops(0),
exp_disk_snapshot(false),
- time_sync_enabled(false),
exp_conflict_res(0),
skip_estimate_check(false),
live_frontend_client(false),
size_t extra_takeover_ops;
/* Flag - expect disk snapshot or not */
bool exp_disk_snapshot;
- /* Flag - indicating time sync status */
- bool time_sync_enabled;
/* Expected conflict resolution flag */
uint8_t exp_conflict_res;
/* Skip estimate check during takeover */
last_by_seqno(0),
extra_takeover_ops(0),
exp_disk_snapshot(false),
- time_sync_enabled(false),
exp_conflict_res(0) { }
size_t num_mutations;
uint64_t last_by_seqno;
size_t extra_takeover_ops;
bool exp_disk_snapshot;
- bool time_sync_enabled;
uint8_t exp_conflict_res;
};
PROTOCOL_BINARY_RESPONSE_SUCCESS,
dcp_last_opaque);
}
- if (stats.time_sync_enabled) {
- checkeq(static_cast<size_t>(16), dcp_last_meta.size(),
- "Expected extended meta in mutation packet");
- }
break;
case PROTOCOL_BINARY_CMD_DCP_DELETION:
PROTOCOL_BINARY_RESPONSE_SUCCESS,
dcp_last_opaque);
}
- if (stats.time_sync_enabled) {
- checkeq(static_cast<size_t>(16),
- dcp_last_meta.size(),
- "Expected adjusted time in mutation packet");
- }
break;
case PROTOCOL_BINARY_CMD_DCP_STREAM_END:
VBStats stats;
stats.extra_takeover_ops = ctx.extra_takeover_ops;
stats.exp_disk_snapshot = ctx.exp_disk_snapshot;
- stats.time_sync_enabled = ctx.time_sync_enabled;
stats.exp_conflict_res = ctx.exp_conflict_res;
vb_stats[ctx.vbucket] = stats;
return SUCCESS;
}
-static enum test_result test_dcp_producer_stream_req_partial_with_time_sync(
- ENGINE_HANDLE *h,
- ENGINE_HANDLE_V1 *h1) {
- /*
- * temporarily skip this testcase to prevent CV regr run failures
- * till fix for it will be implemented and committed (MB-18669)
- */
- return SKIPPED;
-
- set_drift_counter_state(h, h1, /* initial drift */1000);
-
- const int num_items = 200;
- write_items(h, h1, num_items);
-
- wait_for_flusher_to_settle(h, h1);
- stop_persistence(h, h1);
-
- for (int j = 0; j < (num_items / 2); ++j) {
- std::stringstream ss;
- ss << "key" << j;
- checkeq(ENGINE_SUCCESS,
- del(h, h1, ss.str().c_str(), 0, 0),
- "Expected delete to succeed");
- }
-
- wait_for_stat_to_be(h, h1, "vb_0:num_checkpoints", 2, "checkpoint");
-
- const void *cookie = testHarness.create_cookie();
-
- DcpStreamCtx ctx;
- ctx.vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
- ctx.seqno = {95, 209};
- ctx.snapshot = {95, 95};
- ctx.exp_mutations = 105;
- ctx.exp_deletions = 100;
- ctx.exp_markers = 2;
- ctx.time_sync_enabled = true;
- ctx.exp_conflict_res = 1;
-
- TestDcpConsumer tdc("unittest", cookie);
- tdc.addStreamCtx(ctx);
- tdc.run(h, h1);
-
- testHarness.destroy_cookie(cookie);
-
- return SUCCESS;
-}
-
static enum test_result test_dcp_producer_stream_req_full(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
const int num_items = 300, batch_items = 100;
return SUCCESS;
}
-static enum test_result test_dcp_consumer_mutate_with_time_sync(
- ENGINE_HANDLE *h,
- ENGINE_HANDLE_V1 *h1) {
-
- check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
- "Failed to set vbucket state.");
-
- set_drift_counter_state(h, h1, /* initial_drift */1000);
-
- const void *cookie = testHarness.create_cookie();
- uint32_t opaque = 0xFFFF0000;
- uint32_t seqno = 0;
- uint32_t flags = 0;
- const char *name = "unittest";
- uint16_t nname = strlen(name);
-
- // Open an DCP connection
- checkeq(ENGINE_SUCCESS,
- h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
- "Failed dcp producer open connection.");
-
- std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
- checkeq(0, type.compare("consumer"), "Consumer not found");
-
- opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
- PROTOCOL_BINARY_RESPONSE_SUCCESS);
-
- uint32_t dataLen = 100;
- char *data = static_cast<char *>(cb_malloc(dataLen));
- memset(data, 'x', dataLen);
-
- uint8_t cas = 0x1;
- uint16_t vbucket = 0;
- uint8_t datatype = 1;
- uint64_t bySeqno = 10;
- uint64_t revSeqno = 0;
- uint32_t exprtime = 0;
- uint32_t lockTime = 0;
-
- checkeq(ENGINE_SUCCESS,
- h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1),
- "Failed to send snapshot marker");
-
- // Consume a DCP mutation with extended meta
- int64_t adjusted_time1 = gethrtime() * 2;
- ExtendedMetaData *emd = new ExtendedMetaData(adjusted_time1);
- cb_assert(emd && emd->getStatus() == ENGINE_SUCCESS);
- std::pair<const char*, uint16_t> meta = emd->getExtMeta();
- checkeq(ENGINE_SUCCESS,
- h1->dcp.mutation(h, cookie, opaque, "key", 3, data, dataLen, cas,
- vbucket, flags, datatype,
- bySeqno, revSeqno, exprtime,
- lockTime, meta.first, meta.second, 0),
- "Failed dcp mutate.");
- delete emd;
-
- wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0, "dcp");
-
- checkeq(ENGINE_SUCCESS,
- h1->dcp.close_stream(h, cookie, opaque, 0),
- "Expected success");
-
- check(set_vbucket_state(h, h1, 0, vbucket_state_active),
- "Failed to set vbucket state.");
- wait_for_flusher_to_settle(h, h1);
-
- check_key_value(h, h1, "key", data, dataLen);
-
- testHarness.destroy_cookie(cookie);
- cb_free(data);
-
- protocol_binary_request_header *request;
- int64_t adjusted_time2;
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
- "Expected Success");
- checkeq(last_body.size(), sizeof(int64_t),
- "Bodylen didn't match expected value");
- memcpy(&adjusted_time2, last_body.data(), last_body.size());
- adjusted_time2 = ntohll(adjusted_time2);
-
- /**
- * Check that adjusted_time2 is marginally greater than
- * adjusted_time1.
- */
- check(adjusted_time2 >= adjusted_time1,
- "Adjusted time after mutation: Not what is expected");
-
- return SUCCESS;
-}
-
static enum test_result test_dcp_consumer_delete(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
// Store an item
item *i = NULL;
return SUCCESS;
}
-static enum test_result test_dcp_consumer_delete_with_time_sync(
- ENGINE_HANDLE *h,
- ENGINE_HANDLE_V1 *h1) {
-
- //Set drift value
- set_drift_counter_state(h, h1, /* initial drift */1000);
-
- // Store an item
- item *i = NULL;
- checkeq(ENGINE_SUCCESS,
- store(h, h1, NULL, OPERATION_ADD,"key", "value", &i),
- "Failed to fail to store an item.");
- h1->release(h, NULL, i);
- verify_curr_items(h, h1, 1, "one item stored");
-
- wait_for_flusher_to_settle(h, h1);
-
- check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
- "Failed to set vbucket state.");
-
- const void *cookie = testHarness.create_cookie();
- uint32_t opaque = 0;
- uint8_t cas = 0x1;
- uint16_t vbucket = 0;
- uint32_t flags = 0;
- uint64_t bySeqno = 10;
- uint64_t revSeqno = 0;
- const char *name = "unittest";
- uint16_t nname = strlen(name);
- uint32_t seqno = 0;
-
- // Open an DCP connection
- checkeq(ENGINE_SUCCESS,
- h1->dcp.open(h, cookie, opaque, seqno, flags, (void*)name, nname),
- "Failed dcp producer open connection.");
-
- std::string type = get_str_stat(h, h1, "eq_dcpq:unittest:type", "dcp");
- checkeq(0, type.compare("consumer"), "Consumer not found");
-
- opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
- PROTOCOL_BINARY_RESPONSE_SUCCESS);
-
- checkeq(ENGINE_SUCCESS,
- h1->dcp.snapshot_marker(h, cookie, opaque, 0, 10, 10, 1),
- "Failed to send snapshot marker");
-
- // Consume an DCP deletion
- int64_t adjusted_time1 = gethrtime() * 2;
- ExtendedMetaData *emd = new ExtendedMetaData(adjusted_time1);
- cb_assert(emd && emd->getStatus() == ENGINE_SUCCESS);
- std::pair<const char*, uint16_t> meta = emd->getExtMeta();
- checkeq(ENGINE_SUCCESS,
- h1->dcp.deletion(h, cookie, opaque, "key", 3, cas, vbucket,
- bySeqno, revSeqno, meta.first, meta.second),
- "Failed dcp delete.");
- delete emd;
-
- wait_for_stat_to_be(h, h1, "eq_dcpq:unittest:stream_0_buffer_items", 0,
- "dcp");
-
- wait_for_stat_change(h, h1, "curr_items", 1);
- verify_curr_items(h, h1, 0, "one item deleted");
- testHarness.destroy_cookie(cookie);
-
- protocol_binary_request_header *request;
- int64_t adjusted_time2;
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
- "Expected Success");
- checkeq(sizeof(int64_t), last_body.size(),
- "Bodylen didn't match expected value");
- memcpy(&adjusted_time2, last_body.data(), last_body.size());
- adjusted_time2 = ntohll(adjusted_time2);
-
- /**
- * Check that adjusted_time2 is marginally greater than
- * adjusted_time1.
- */
- check(adjusted_time2 >= adjusted_time1,
- "Adjusted time after deletion: Not what is expected");
-
- return SUCCESS;
-}
-
-
static enum test_result test_dcp_replica_stream_backfill(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1)
{
TestCase("test producer stream request (partial)",
test_dcp_producer_stream_req_partial, test_setup, teardown,
"chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
- TestCase("test producer stream request with time sync (partial)",
- test_dcp_producer_stream_req_partial_with_time_sync,
- test_setup, teardown, "chk_remover_stime=1;chk_max_items=100;"
- "time_synchronization=enabled_with_drift", prepare, cleanup),
TestCase("test producer stream request (full)",
test_dcp_producer_stream_req_full, test_setup, teardown,
"chk_remover_stime=1;chk_max_items=100", prepare, cleanup),
cleanup),
TestCase("dcp consumer mutate", test_dcp_consumer_mutate, test_setup,
teardown, "dcp_enable_noop=false", prepare, cleanup),
- TestCase("dcp consumer mutate with time sync",
- test_dcp_consumer_mutate_with_time_sync, test_setup, teardown,
- "dcp_enable_noop=false;time_synchronization=enabled_with_drift",
- prepare, cleanup),
TestCase("dcp consumer delete", test_dcp_consumer_delete, test_setup,
teardown, "dcp_enable_noop=false", prepare, cleanup),
- TestCase("dcp consumer delete with time sync",
- test_dcp_consumer_delete_with_time_sync, test_setup, teardown,
- "dcp_enable_noop=false;time_synchronization=enabled_with_drift",
- prepare, cleanup),
TestCase("dcp failover log", test_failover_log_dcp, test_setup,
teardown, NULL, prepare, cleanup),
TestCase("dcp persistence seqno", test_dcp_persistence_seqno, test_setup,
const void *cookie = testHarness.create_cookie();
// delete an item with meta data
- del_with_meta(h, h1, key1, keylen, 0, &itemMeta, 0, false, false,
- 0, cookie);
+ del_with_meta(h, h1, key1, keylen, 0, &itemMeta, 0, false, false, cookie);
check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
check(last_seqno == high_seqno + 1, "Expected valid sequence number");
testHarness.set_mutation_extras_handling(cookie, false);
// delete an item with meta data
- del_with_meta(h, h1, key2, keylen, 0, &itemMeta, 0, false, false,
- 0, cookie);
+ del_with_meta(h, h1, key2, keylen, 0, &itemMeta, 0, false, false, cookie);
check(last_uuid == vb_uuid, "Expected same vbucket uuid");
check(last_seqno == high_seqno + 1, "Expected same sequence number");
// do set with meta with the correct cas value. should pass.
set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set,
- false, 0, false, 0, cookie);
+ false, 0, false, cookie);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
check(last_seqno == high_seqno + 1, "Expected valid sequence number");
itm_meta.revSeqno++;
cas_for_set = last_meta.cas;
set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set,
- false, 0, false, 0, cookie);
+ false, 0, false, cookie);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
check(last_uuid == vb_uuid, "Expected same vbucket uuid");
check(last_seqno == high_seqno + 1, "Expected same sequence number");
"Expect zero setMeta ops");
set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
+ PROTOCOL_BINARY_RAW_BYTES, false);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
checkeq(0, get_int_stat(h, h1, "ep_bg_meta_fetched"),
"Expected no bg meta fetchs, thanks to bloom filters");
// Check all meta data is the same
set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
+ PROTOCOL_BINARY_RAW_BYTES, false);
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
checkeq(1, get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail"),
"Expected set meta conflict resolution failure");
// Check that an older cas fails
itemMeta.cas = 0xdeadbeee;
set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
+ PROTOCOL_BINARY_RAW_BYTES, false);
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
checkeq(2, get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail"),
"Expected set meta conflict resolution failure");
// Check that a higher cas passes
itemMeta.cas = 0xdeadbeff;
set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
+ PROTOCOL_BINARY_RAW_BYTES, false);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
return SUCCESS;
itemMeta.exptime = 0;
itemMeta.flags = 0xdeadbeef;
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
+ del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
wait_for_flusher_to_settle(h, h1);
wait_for_stat_to_be(h, h1, "curr_items", 0);
// Check all meta data is the same
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
+ del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true);
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
checkeq(1, get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail"),
"Expected delete meta conflict resolution failure");
// Check that higher rev seqno but lower cas fails
itemMeta.cas = info.cas;
itemMeta.revSeqno = 11;
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
+ del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true);
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
checkeq(2, get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail"),
"Expected delete meta conflict resolution failure");
// Check that a higher cas and lower rev seqno passes
itemMeta.cas = info.cas + 2;
itemMeta.revSeqno = 9;
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
+ del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected sucess");
return SUCCESS;
}
-static enum test_result test_adjusted_time_negative_tests(ENGINE_HANDLE *h,
- ENGINE_HANDLE_V1 *h1) {
- protocol_binary_request_header *request;
-
- /* GET_ADJUSTED_TIME with a non-existent vbucket: vbid 1 */
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 1, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, last_status.load(),
- "Expected not my vbucket");
-
- /* GET_ADJUSTED_TIME when time_synchronization is "disabled" */
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, last_status.load(),
- "Expected not supported response");
-
- int64_t initialDriftCount = 1000;
- uint8_t timeSync = 0x00;
-
- int64_t driftCount = htonll(initialDriftCount);
- uint8_t extlen = sizeof(driftCount) + sizeof(timeSync);
- char *ext = new char[extlen];
- memcpy(ext, (char *)&driftCount, sizeof(driftCount));
- memcpy(ext + sizeof(driftCount), (char *)&timeSync, sizeof(timeSync));
-
- /* SET_DRIFT_COUNTER_STATE with non-existent vbucket: vbid 1 */
- request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
- 1, 0, ext, extlen);
- h1->unknown_command(h, NULL, request, add_response);
- checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, last_status.load(),
- "Expected not my vbucket");
- cb_free(request);
-
- /* SET_DRIFT_COUNTER_STATE when time_synchronization is "disabled" */
- request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
- 0, 0, ext, extlen);
- h1->unknown_command(h, NULL, request, add_response);
- checkeq(PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, last_status.load(),
- "Expected not supported");
- cb_free(request);
- delete[] ext;
-
- return SUCCESS;
-}
-
static enum test_result test_getMeta_with_item_eviction(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1)
{
TestCase("get meta", test_get_meta, test_setup,
teardown, NULL, prepare, cleanup),
TestCase("get meta with extras", test_get_meta_with_extras,
- test_setup, teardown,
- "time_synchronization=enabled_without_drift", prepare, cleanup),
+ test_setup, teardown, NULL, prepare, cleanup),
TestCase("get meta deleted", test_get_meta_deleted,
test_setup, teardown, NULL, prepare, cleanup),
TestCase("get meta nonexistent", test_get_meta_nonexistent,
TestCase("temp item deletion", test_temp_item_deletion,
test_setup, teardown,
"exp_pager_stime=1", prepare, cleanup),
- TestCase("test getAdjustedTime, setDriftCounter apis negative tests",
- test_adjusted_time_negative_tests, test_setup, teardown,
- NULL, prepare, cleanup),
-
TestCase("test get_meta with item_eviction",
test_getMeta_with_item_eviction, test_setup, teardown,
"item_eviction_policy=full_eviction", prepare, cleanup),
StatsCallback sc;
std::string failoverLog("");
- vbucket_state state(vbucket_state_active, 0, 0, 0, 0, 0, 0, 0, 0,
- failoverLog);
+ vbucket_state state(vbucket_state_active, 0, 0, 0, 0, 0, 0, 0, failoverLog);
kvstore->snapshotVBucket(0, state, &sc);
return kvstore;
std::string failoverLog("[]");
vbucket_state state(vbucket_state_active, /*ckid*/0, /*maxDelSeqNum*/0,
/*highSeqno*/0, /*purgeSeqno*/0, /*lastSnapStart*/0,
- /*lastSnapEnd*/0, /*maxCas*/-1, /*driftCounter*/0,
- failoverLog);
+ /*lastSnapEnd*/0, /*maxCas*/-1, failoverLog);
EXPECT_TRUE(kvstore->snapshotVBucket(/*vbid*/0, state, nullptr));
EXPECT_EQ(~0ull, kvstore->listPersistedVbuckets()[0]->maxCas);
kvstore.reset(new MockCouchKVStore(config));
StatsCallback sc;
std::string failoverLog("");
- vbucket_state state(vbucket_state_active, 0, 0, 0, 0, 0, 0, 0, 0,
+ vbucket_state state(vbucket_state_active, 0, 0, 0, 0, 0, 0, 0,
failoverLog);
kvstore->snapshotVBucket(0, state, &sc, true);
}