#include "conflict_resolution.h"
#include "stored-value.h"
-static const char * getConflictResModeStr(enum conflict_resolution_mode confResMode) {
- switch (confResMode) {
- case revision_seqno:
- return "revision_seqno";
- case last_write_wins:
- return "last_write_wins";
- default:
- return "unknown";
- }
-}
-
/**
* A conflict resolution strategy that compares the meta data for a document
- * from a remote node and this node. This conflict resolution works by picking
+ * from a remote node and this node. The conflict strategy works by picking
* a winning document based on comparing meta data fields and finding a field
- * that has a larger value than the other document's fields. The fields are
- * compared in the following order: cas, rev seqno, expiration, flags. The cas
- * is chosen as the first field of comparison if the document's conflict
- * resolution is set last_write_wins. The last_write_wins indicates that the cas
- * generated for the document was using a Hybrid Logical Clock (HLC). If all
+ * that has a larger value than the other documents field. The fields are
+ * compared in the following order: rev seqno, cas, expiration, flags. If all
* fields are equal than the local document is chosen as the winner.
*/
-bool ConflictResolution::resolve_lww(StoredValue *v,
- const ItemMetaData &meta,
- bool deletion) {
- if (!v->isTempNonExistentItem()) {
- if (v->getCas() > meta.cas) {
+bool RevisionSeqnoResolution::resolve(const StoredValue& v,
+ const ItemMetaData& meta,
+ bool isDelete) const {
+ if (!v.isTempNonExistentItem()) {
+ if (v.getRevSeqno() > meta.revSeqno) {
return false;
- } else if (v->getCas() == meta.cas) {
- if (v->getRevSeqno() > meta.revSeqno) {
+ } else if (v.getRevSeqno() == meta.revSeqno) {
+ if (v.getCas() > meta.cas) {
return false;
- } else if (v->getRevSeqno() == meta.revSeqno) {
- if (deletion || v->getExptime() > meta.exptime) {
+ } else if (v.getCas() == meta.cas) {
+ if (isDelete || v.getExptime() > meta.exptime) {
return false;
- } else if (v->getExptime() == meta.exptime) {
- if (v->getFlags() >= meta.flags) {
+ } else if (v.getExptime() == meta.exptime) {
+ if (v.getFlags() >= meta.flags) {
return false;
}
}
}
}
return true;
+
}
/**
* A conflict resolution strategy that compares the meta data for a document
- * from a remote node and this node. The conflict strategy works by picking
+ * from a remote node and this node. This conflict resolution works by picking
* a winning document based on comparing meta data fields and finding a field
- * that has a larger value than the other documents field. The fields are
- * compared in the following order: rev seqno, cas, expiration, flags. If all
- * fields are equal than the local document is chosen as the winner.
+ * that has a larger value than the other document's fields. The fields are
+ * compared in the following order: cas, rev seqno, expiration, flags.
+ * Regardless of conflict resolution mode, all CAS values are generated from
+ * a Hybrid Logical Clock (HLC), so a larger CAS is the last write.
+ * If all fields are equal than the local document is chosen as the winner.
*/
-bool ConflictResolution::resolve_rev_seqno(StoredValue *v,
- const ItemMetaData &meta,
- bool deletion) {
- if (!v->isTempNonExistentItem()) {
- if (v->getRevSeqno() > meta.revSeqno) {
+bool LastWriteWinsResolution::resolve(const StoredValue& v,
+ const ItemMetaData& meta,
+ bool isDelete) const {
+ if (!v.isTempNonExistentItem()) {
+ if (v.getCas() > meta.cas) {
return false;
- } else if (v->getRevSeqno() == meta.revSeqno) {
- if (v->getCas() > meta.cas) {
+ } else if (v.getCas() == meta.cas) {
+ if (v.getRevSeqno() > meta.revSeqno) {
return false;
- } else if (v->getCas() == meta.cas) {
- if (deletion || v->getExptime() > meta.exptime) {
+ } else if (v.getRevSeqno() == meta.revSeqno) {
+ if (isDelete || v.getExptime() > meta.exptime) {
return false;
- } else if (v->getExptime() == meta.exptime) {
- if (v->getFlags() >= meta.flags) {
+ } else if (v.getExptime() == meta.exptime) {
+ if (v.getFlags() >= meta.flags) {
return false;
}
}
}
}
return true;
-}
-
-bool ConflictResolution::resolve(RCPtr<VBucket> &vb , StoredValue *v,
- const ItemMetaData &meta,
- bool deletion, enum conflict_resolution_mode
- itmConfResMode) {
- if (vb->isTimeSyncEnabled()) {
- if (v->getConflictResMode() == last_write_wins &&
- itmConfResMode == last_write_wins) {
- return resolve_lww(v, meta, deletion);
- } else if ((v->getConflictResMode() != last_write_wins &&
- itmConfResMode == last_write_wins) ||
- (v->getConflictResMode() == last_write_wins &&
- itmConfResMode != last_write_wins)) {
- // Log the event when the time sync is enabled and
- // the mutation is not eligible for last_write_wins
- LOG(EXTENSION_LOG_DEBUG,
- "Resolving conflict by comparing rev seqno: key: %s,"
- "source conflict resolution mode: %s, target conflict resolution"
- "mode: %s", v->getKey().c_str(),
- getConflictResModeStr(itmConfResMode),
- getConflictResModeStr(v->getConflictResMode()));
- }
- }
-
- return resolve_rev_seqno(v, meta, deletion);
-}
+}
\ No newline at end of file
// delete an item with meta data
del_with_meta(h, h1, key1, keylen, 0, &itemMeta, 0, false, false,
- 0, 0, cookie);
+ 0, cookie);
check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
check(last_seqno == high_seqno + 1, "Expected valid sequence number");
// delete an item with meta data
del_with_meta(h, h1, key2, keylen, 0, &itemMeta, 0, false, false,
- 0, 0, cookie);
+ 0, cookie);
check(last_uuid == vb_uuid, "Expected same vbucket uuid");
check(last_seqno == high_seqno + 1, "Expected same sequence number");
// do set with meta with the correct cas value. should pass.
set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set,
- false, 0, false, 0, 0, cookie);
+ false, 0, false, 0, cookie);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
check(last_uuid == vb_uuid, "Expected valid vbucket uuid");
check(last_seqno == high_seqno + 1, "Expected valid sequence number");
itm_meta.revSeqno++;
cas_for_set = last_meta.cas;
set_with_meta(h, h1, key, keylen, newVal, newValLen, 0, &itm_meta, cas_for_set,
- false, 0, false, 0, 0, cookie);
+ false, 0, false, 0, cookie);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
check(last_uuid == vb_uuid, "Expected same vbucket uuid");
check(last_seqno == high_seqno + 1, "Expected same sequence number");
"Expect zero setMeta ops");
set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 1);
+ PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
checkeq(0, get_int_stat(h, h1, "ep_bg_meta_fetched"),
"Expected no bg meta fetchs, thanks to bloom filters");
// Check all meta data is the same
set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 1);
+ PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
checkeq(1, get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail"),
"Expected set meta conflict resolution failure");
// Check that an older cas fails
itemMeta.cas = 0xdeadbeee;
set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 1);
+ PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
checkeq(2, get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail"),
"Expected set meta conflict resolution failure");
// Check that a higher cas passes
itemMeta.cas = 0xdeadbeff;
set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 1);
+ PROTOCOL_BINARY_RAW_BYTES, true, gethrtime());
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
- // Check that a higher cas, lower rev seqno and conflict resolution
- // with revision seqno will fail
- itemMeta.cas = 0xdeadbfff;
- itemMeta.revSeqno = 9;
- set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 0);
- checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
- checkeq(3, get_int_stat(h, h1, "ep_num_ops_set_meta_res_fail"),
- "Expected set meta conflict resolution failure");
-
- // Check that a lower cas, higher rev seqno and conflict resolution
- // with revision seqno will pass
- itemMeta.revSeqno = 11;
- set_with_meta(h, h1, "key", 3, NULL, 0, 0, &itemMeta, 0, false,
- PROTOCOL_BINARY_RAW_BYTES, true, gethrtime(), 0);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
return SUCCESS;
}
itemMeta.exptime = 0;
itemMeta.flags = 0xdeadbeef;
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 1);
+ del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
wait_for_flusher_to_settle(h, h1);
wait_for_stat_to_be(h, h1, "curr_items", 0);
// Check all meta data is the same
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 1);
+ del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
checkeq(1, get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail"),
"Expected delete meta conflict resolution failure");
// Check that higher rev seqno but lower cas fails
itemMeta.cas = info.cas;
itemMeta.revSeqno = 11;
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 1);
+ del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
checkeq(2, get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail"),
"Expected delete meta conflict resolution failure");
// Check that a higher cas and lower rev seqno passes
itemMeta.cas = info.cas + 2;
itemMeta.revSeqno = 9;
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 1);
+ del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime());
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected sucess");
- // Check that a higher rev seqno and lower cas and conflict resolution of
- // revision seqno passes
- itemMeta.revSeqno = 10;
- itemMeta.cas = info.cas + 1;
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 0);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
-
- // Check that a lower rev seqno and higher cas and conflict resolution of
- // revision seqno fails
- itemMeta.revSeqno = 9;
- itemMeta.cas = info.cas + 2;
- del_with_meta(h, h1, "key", 3, 0, &itemMeta, 0, false, true, gethrtime(), 0);
- checkeq(PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, last_status.load(), "Expected exists");
- checkeq(3, get_int_stat(h, h1, "ep_num_ops_del_meta_res_fail"),
- "Expected delete meta conflict resolution failure");
-
- return SUCCESS;
-}
-
-static enum test_result test_adjusted_time_apis(ENGINE_HANDLE *h,
- ENGINE_HANDLE_V1 *h1) {
-
- int64_t adjusted_time1, adjusted_time2;
- protocol_binary_request_header *request;
-
- std::string time_sync = get_str_stat(h, h1, "vb_0:time_sync", "vbucket-details");
- checkeq(std::string("enabled"), time_sync, "Time sync should've been disabled");
-
- for (int j = 0; j < 10; ++j) {
- item *i = NULL;
- std::string key("key-" + std::to_string(j));
- checkeq(ENGINE_SUCCESS,
- store(h, h1, NULL, OPERATION_SET,
- key.c_str(), "data", &i, 0, 0, 0, 0),
- "Failed to store a value");
- h1->release(h, NULL, i);
- }
- wait_for_flusher_to_settle(h, h1);
-
- uint64_t high_seqno = get_ull_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
- uint64_t vb_uuid = get_ull_stat(h, h1, "vb_0:0:id", "failovers");
-
- set_drift_counter_state(h, h1, 1000);
-
- uint64_t recvVbuuid;
- int64_t recvSeqno;
- // Change in time_sync state => last_body should've carried high_seqno
- checkeq(sizeof(recvVbuuid) + sizeof(recvSeqno), last_body.size(),
- "Bodylen didn't match expected value");
-
- memcpy(&recvVbuuid, last_body.data(), sizeof(recvVbuuid));
- memcpy(&recvSeqno, last_body.data() + sizeof(recvVbuuid), sizeof(recvSeqno));
- recvVbuuid = ntohll(recvVbuuid);
- recvSeqno = ntohll(recvSeqno);
- checkeq(vb_uuid, recvVbuuid,
- "setDriftCounterState's response carried incorrect vb_uuid");
- checkeq(static_cast<int64_t>(high_seqno), recvSeqno,
- "setDriftCounterState's response carried incorrect high_seqno");
-
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
- "Expected Success");
- checkeq(sizeof(int64_t), last_body.size(),
- "Bodylen didn't match expected value");
- memcpy(&adjusted_time1, last_body.data(), last_body.size());
- adjusted_time1 = ntohll(adjusted_time1);
-
- set_drift_counter_state(h, h1, 1000000);
-
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
- "Expected Success");
- checkeq(sizeof(int64_t), last_body.size(),
- "Bodylen didn't match expected value");
- memcpy(&adjusted_time2, last_body.data(), last_body.size());
- adjusted_time2 = ntohll(adjusted_time2);
-
- // adjusted_time2 should be greater than adjusted_time1 marginally
- // by adjusted_time1 + (difference in the 2 driftCounts set previously)
- check(adjusted_time2 >= adjusted_time1 + 999000,
- "Adjusted_time2: now what expected");
-
- // Test sending adjustedTime with SetWithMeta
- ItemMetaData itm_meta;
- itm_meta.flags = 0xdeadbeef;
- itm_meta.exptime = 0;
- itm_meta.revSeqno = 10;
- itm_meta.cas = 0xdeadbeef;
- set_with_meta(h, h1, "key", 3, "value", 5, 0, &itm_meta, last_cas,
- false, 0x00, true, adjusted_time2 * 2);
- wait_for_flusher_to_settle(h, h1);
-
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
- "Expected a SUCCESS");
- check_key_value(h, h1, "key", "value", 5, 0);
-
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
- "Expected Success");
- checkeq(sizeof(int64_t), last_body.size(),
- "Bodylen didn't match expected value");
- memcpy(&adjusted_time1, last_body.data(), last_body.size());
- adjusted_time1 = ntohll(adjusted_time1);
-
- // Check that adjusted_time1 should be marginally greater than
- // adjusted_time2 * 2
- check(adjusted_time1 >= adjusted_time2 * 2,
- "Adjusted_time1: not what is expected");
-
- // Test sending adjustedTime with DelWithMeta
- item *i = NULL;
- checkeq(ENGINE_SUCCESS,
- store(h, h1, NULL, OPERATION_SET, "key2", "value2", &i),
- "Failed set.");
- h1->release(h, NULL, i);
- del_with_meta(h, h1, "key2", 4, 0, &itm_meta, last_cas, false,
- true, adjusted_time1 * 2);
- wait_for_flusher_to_settle(h, h1);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Expected success");
-
- request = createPacket(PROTOCOL_BINARY_CMD_GET_ADJUSTED_TIME, 0, 0, NULL, 0,
- NULL, 0, NULL, 0);
- h1->unknown_command(h, NULL, request, add_response);
- cb_free(request);
- checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
- "Expected Success");
- checkeq(sizeof(int64_t), last_body.size(),
- "Bodylen didn't match expected value");
- memcpy(&adjusted_time2, last_body.data(), last_body.size());
- adjusted_time2 = ntohll(adjusted_time2);
-
- // Check that adjusted_time2 should be marginally greater than
- // adjusted_time1 * 2
- check(adjusted_time2 >= adjusted_time1 * 2,
- "Adjusted_time2: not what is expected");
-
- //Check if set drift counter state returns EINVAL when trying to set
- //to initial drift value
- int64_t initialDriftCount = -140737488355328;
- uint8_t timeSync = 0x00;
-
- int64_t driftCount = htonll(initialDriftCount);
- uint8_t extlen = sizeof(driftCount) + sizeof(timeSync);
- char *ext = new char[extlen];
- memcpy(ext, (char *)&driftCount, sizeof(driftCount));
- memcpy(ext + sizeof(driftCount), (char *)&timeSync, sizeof(timeSync));
-
- request = createPacket(PROTOCOL_BINARY_CMD_SET_DRIFT_COUNTER_STATE,
- 0, 0, ext, extlen);
- h1->unknown_command(h, NULL, request, add_response);
- checkeq(PROTOCOL_BINARY_RESPONSE_EINVAL, last_status.load(),
- "Expected invalid response");
- cb_free(request);
- delete[] ext;
-
return SUCCESS;
}
prepare, cleanup),
TestCase("test del meta lww conflict resolution",
test_del_meta_lww_conflict_resolution, test_setup, teardown,
- "time_synchronization=enabled_without_drift",prepare, cleanup),
+ "conflict_resolution_type=lww",prepare, cleanup),
TestCase("test set meta lww conflict resolution",
test_set_meta_lww_conflict_resolution, test_setup, teardown,
- "time_synchronization=enabled_without_drift",prepare, cleanup),
+ "conflict_resolution_type=lww",prepare, cleanup),
TestCase("temp item deletion", test_temp_item_deletion,
test_setup, teardown,
"exp_pager_stime=1", prepare, cleanup),
- TestCase("test getAdjustedTime, setDriftCounter apis",
- test_adjusted_time_apis, test_setup, teardown,
- "time_synchronization=enabled_with_drift", prepare, cleanup),
TestCase("test getAdjustedTime, setDriftCounter apis negative tests",
test_adjusted_time_negative_tests, test_setup, teardown,
NULL, prepare, cleanup),