lastBySeqno = seqno;
}
- int64_t getHighSeqno() {
+ int64_t getHighSeqno() const {
LockHolder lh(queueLock);
return lastBySeqno;
}
return true;
}
-bool CouchKVStore::snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
+bool CouchKVStore::snapshotVBucket(uint16_t vbucketId,
+ const vbucket_state &vbstate,
Callback<kvstats_ctx> *cb, bool persist) {
if (isReadOnly()) {
LOG(EXTENSION_LOG_WARNING,
* @param persist - whether to persist snapshot state or not
* @return true if the snapshot is done successfully
*/
- bool snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
+ bool snapshotVBucket(uint16_t vbucketId,
+ const vbucket_state &vbstate,
Callback<kvstats_ctx> *cb, bool persist);
/**
: vbuckets(vb_map), shardId(sid) { }
bool visitBucket(RCPtr<VBucket> &vb) {
if (vbuckets.getShardByVbId(vb->getId())->getId() == shardId) {
- const snapshot_range_t range = vb->getPersistedSnapshot();
- std::string failovers = vb->failovers->toJSON();
- uint64_t chkId = vbuckets.getPersistenceCheckpointId(vb->getId());
-
- vbucket_state vb_state(vb->getState(), chkId, 0,
- vb->getHighSeqno(), vb->getPurgeSeqno(),
- range.start, range.end, vb->getMaxCas(),
- failovers);
- states.insert(std::pair<uint16_t, vbucket_state>(vb->getId(), vb_state));
+ states.insert(std::make_pair(vb->getId(),
+ vb->getVBucketState()));
}
return false;
}
const hrtime_t start = gethrtime();
KVStatsCallback kvcb(this);
- uint64_t chkId = vbMap.getPersistenceCheckpointId(vbid);
- std::string failovers = vb->failovers->toJSON();
- const snapshot_range_t range = vb->getPersistedSnapshot();
- vbucket_state vb_state(vb->getState(), chkId, 0, vb->getHighSeqno(),
- vb->getPurgeSeqno(), range.start, range.end,
- vb->getMaxCas(), failovers);
+ const vbucket_state vb_state(vb->getVBucketState());
KVStore *rwUnderlying = getRWUnderlying(vbid);
if (rwUnderlying->snapshotVBucket(vbid, vb_state, &kvcb)) {
}
}
- std::string failovers = vb->failovers->toJSON();
- vbucket_state vbState(vb->getState(),
- vbMap.getPersistenceCheckpointId(vbid),
- maxDeletedRevSeqno, vb->getHighSeqno(),
- vb->getPurgeSeqno(), range.start,
- range.end, maxCas, failovers);
+ // Get current vbstate, then update based on the changes we
+ // have just made.
+ auto vbstate = vb->getVBucketState();
+ vbstate.lastSnapStart = range.start;
+ vbstate.lastSnapEnd = range.end;
+ vbstate.maxCas = maxCas;
+ vbstate.maxDeletedSeqno = maxDeletedRevSeqno;
- if (rwUnderlying->snapshotVBucket(vb->getId(), vbState,
+ if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
NULL, false) != true) {
return RETRY_FLUSH_VBUCKET;
}
return dbInfo;
}
-bool ForestKVStore::snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
+bool ForestKVStore::snapshotVBucket(uint16_t vbucketId,
+ const vbucket_state &vbstate,
Callback<kvstats_ctx> *cb, bool persist) {
if (updateCachedVBState(vbucketId, vbstate) && persist) {
* @param cb - callback for updating kv stats
* @return true if the snapshot is done successfully
*/
- bool snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
+ bool snapshotVBucket(uint16_t vbucketId, const vbucket_state &vbstate,
Callback<kvstats_ctx> *cb, bool persist);
/**
Configuration &config = store.getEPEngine().getConfiguration();
maxVbuckets = config.getMaxVbuckets();
- vbuckets = new RCPtr<VBucket>[maxVbuckets];
+ vbuckets.resize(maxVbuckets);
std::string backend = kvConfig.getBackend();
uint16_t commitInterval = 1;
if (kvConfig.getBackend().compare("couchdb") == 0) {
delete roUnderlying;
}
-
- delete[] vbuckets;
}
KVStore *KVShard::getRWUnderlying() {
}
private:
- RCPtr<VBucket> *vbuckets;
+ std::vector<RCPtr<VBucket>> vbuckets;
KVStore *rwUnderlying;
KVStore *roUnderlying;
} RollbackResult;
struct vbucket_state {
+ vbucket_state() = default;
+
vbucket_state(vbucket_state_t _state, uint64_t _chkid,
uint64_t _maxDelSeqNum, int64_t _highSeqno,
uint64_t _purgeSeqno, uint64_t _lastSnapStart,
- uint64_t _lastSnapEnd, uint64_t _maxCas, std::string& _failovers) :
- state(_state), checkpointId(_chkid), maxDeletedSeqno(_maxDelSeqNum),
- highSeqno(_highSeqno), purgeSeqno(_purgeSeqno),
- lastSnapStart(_lastSnapStart), lastSnapEnd(_lastSnapEnd),
- maxCas(_maxCas), failovers(_failovers) { }
+ uint64_t _lastSnapEnd, uint64_t _maxCas,
+ std::string _failovers) :
+ state(_state),
+ checkpointId(_chkid),
+ maxDeletedSeqno(_maxDelSeqNum),
+ highSeqno(_highSeqno),
+ purgeSeqno(_purgeSeqno),
+ lastSnapStart(_lastSnapStart),
+ lastSnapEnd(_lastSnapEnd),
+ maxCas(_maxCas),
+ failovers(std::move(_failovers)) { }
vbucket_state(const vbucket_state& vbstate) {
state = vbstate.state;
* @param cb stats callback
* @param persist whether state needs to be persisted to disk
*/
- virtual bool snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
+ virtual bool snapshotVBucket(uint16_t vbucketId,
+ const vbucket_state &vbstate,
Callback<kvstats_ctx> *cb,
bool persist = true) = 0;
}
}
+vbucket_state VBucket::getVBucketState() const {
+ auto persisted_range = getPersistedSnapshot();
+
+ return vbucket_state{getState(),
+ getPersistenceCheckpointId(), 0, getHighSeqno(),
+ getPurgeSeqno(),
+ persisted_range.start, persisted_range.end,
+ getMaxCas(), failovers->toJSON()};
+}
+
+
+
void VBucket::doStatsForQueueing(Item& qi, size_t itemBytes)
{
++dirtyQueueSize;
~VBucket();
- int64_t getHighSeqno() {
+ int64_t getHighSeqno() const {
return checkpointManager.getHighSeqno();
}
return checkpointManager.getMemoryUsageOfUnrefCheckpoints();
}
- uint64_t getPurgeSeqno() {
+ uint64_t getPurgeSeqno() const {
return purge_seqno;
}
return {persisted_snapshot_start, persisted_snapshot_end};
}
- uint64_t getMaxCas() {
+ uint64_t getMaxCas() const {
return hlc.getMaxHLC();
}
initialState = initState;
}
+ vbucket_state getVBucketState() const;
+
bool addPendingOp(const void *cookie) {
LockHolder lh(pendingOpLock);
if (state != vbucket_state_pending) {