}
}
+
+uint64_t VBucket::getPersistenceCheckpointId() const {
+ return persistenceCheckpointId.load();
+}
+
+void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
+ persistenceCheckpointId.store(checkpointId);
+}
+
bool VBucket::isResidentRatioUnderThreshold(float threshold,
item_eviction_policy_t policy) {
if (policy != FULL_EVICTION) {
return rollbackItemCount.load(std::memory_order_relaxed);
}
+ // Return the persistence checkpoint ID
+ uint64_t getPersistenceCheckpointId() const;
+
+ // Set the persistence checkpoint ID to the given value.
+ void setPersistenceCheckpointId(uint64_t checkpointId);
+
static const vbucket_state_t ACTIVE;
static const vbucket_state_t REPLICA;
static const vbucket_state_t PENDING;
HLC hlc;
std::string statPrefix;
+ // The persistence checkpoint ID for this vbucket.
+ AtomicValue<uint64_t> persistenceCheckpointId;
static size_t chkFlushTimeout;
EventuallyPersistentStore &store) :
bucketDeletion(new AtomicValue<bool>[config.getMaxVbuckets()]),
bucketCreation(new AtomicValue<bool>[config.getMaxVbuckets()]),
- persistenceCheckpointIds(new
- AtomicValue<uint64_t>[config.getMaxVbuckets()]),
persistenceSeqnos(new AtomicValue<uint64_t>[config.getMaxVbuckets()]),
size(config.getMaxVbuckets())
{
for (size_t i = 0; i < size; ++i) {
bucketDeletion[i].store(false);
bucketCreation[i].store(false);
- persistenceCheckpointIds[i].store(0);
persistenceSeqnos[i].store(0);
}
}
VBucketMap::~VBucketMap() {
delete[] bucketDeletion;
delete[] bucketCreation;
- delete[] persistenceCheckpointIds;
delete[] persistenceSeqnos;
while (!shards.empty()) {
delete shards.back();
}
uint64_t VBucketMap::getPersistenceCheckpointId(id_type id) const {
- return persistenceCheckpointIds[id].load();
+ if (id < size) {
+ auto vb = getBucket(id);
+ if (vb) {
+ return vb->getPersistenceCheckpointId();
+ }
+ }
+ return {};
}
void VBucketMap::setPersistenceCheckpointId(id_type id,
uint64_t checkpointId) {
- persistenceCheckpointIds[id].store(checkpointId);
+ if (id < size) {
+ auto vb = getBucket(id);
+ if (vb) {
+ getBucket(id)->setPersistenceCheckpointId(checkpointId);
+ }
+ }
}
uint64_t VBucketMap::getPersistenceSeqno(id_type id) const {
std::vector<KVShard*> shards;
AtomicValue<bool> *bucketDeletion;
AtomicValue<bool> *bucketCreation;
- AtomicValue<uint64_t> *persistenceCheckpointIds;
AtomicValue<uint64_t> *persistenceSeqnos;
const id_type size;