Declare a named type for shared pointers to VBuckets.
Change-Id: I93e121f86199617c1651c5896efc7df7cd99ea83
Reviewed-on: http://review.couchbase.org/76556
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
accessed.clear();
}
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
currentBucket = vb;
update();
// The number of items to scan before we pause
const uint64_t items_to_scan;
- RCPtr<VBucket> currentBucket;
+ VBucketPtr currentBucket;
};
AccessScanner::AccessScanner(KVBucket& _store,
};
void ItemResidentCallback::callback(CacheLookup &lookup) {
- RCPtr<VBucket> vb = engine->getKVBucket()->getVBucket(
+ VBucketPtr vb = engine->getKVBucket()->getVBucket(
lookup.getVBucketId());
if (!vb) {
setStatus(ENGINE_SUCCESS);
valid(true) {
}
-void BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
+void BackFillVisitor::visitBucket(VBucketPtr &vb) {
if (vBucketFilter(vb->getId())) {
double num_items = static_cast<double>(vb->getNumItems());
virtual ~BackFillVisitor() {}
- void visitBucket(RCPtr<VBucket> &vb) override;
+ void visitBucket(VBucketPtr &vb) override;
void complete(void) override;
}
for (const uint16_t vbId : bg_vbs) {
- RCPtr<VBucket> vb = shard->getBucket(vbId);
+ VBucketPtr vb = shard->getBucket(vbId);
if (vb) {
// Requeue the bg fetch task if vbucket DB file is not created yet.
if (vb->isBucketCreation()) {
bool BgFetcher::pendingJob() const {
for (const auto vbid : shard->getVBuckets()) {
- RCPtr<VBucket> vb = shard->getBucket(vbid);
+ VBucketPtr vb = shard->getBucket(vbid);
if (vb && vb->hasPendingBGFetchItems()) {
return true;
}
: store(s), stats(st), removed(0), taskStart(gethrtime()),
wasHighMemoryUsage(s->isMemoryUsageTooHigh()), stateFinalizer(sfin) {}
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
bool newCheckpointCreated = false;
removed = vb->checkpointManager.removeClosedUnrefCheckpoints(
*vb, newCheckpointCreated);
for (const auto& it: vbuckets) {
if (memoryCleared < amountOfMemoryToClear) {
uint16_t vbid = it.first;
- RCPtr<VBucket> vb = kvBucket->getVBucket(vbid);
+ VBucketPtr vb = kvBucket->getVBucket(vbid);
if (vb) {
// Get a list of cursors that can be dropped from the
// vbucket's checkpoint manager, so as to unreference
break;
case backfill_snooze: {
uint16_t vbid = backfill->getVBucketId();
- RCPtr<VBucket> vb = engine.getVBucket(vbid);
+ VBucketPtr vb = engine.getVBucket(vbid);
if (vb) {
snoozingBackfills.push_back(
std::make_pair(ep_current_time(), std::move(backfill)));
}
void CacheCallback::callback(CacheLookup& lookup) {
- RCPtr<VBucket> vb =
+ VBucketPtr vb =
engine_.getKVBucket()->getVBucket(lookup.getVBucketId());
if (!vb) {
setStatus(ENGINE_SUCCESS);
return ENGINE_DISCONNECT;
}
- RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
+ VBucketPtr vb = engine_.getVBucket(vbucket);
if (!vb) {
logger.log(EXTENSION_LOG_WARNING,
"(vb %d) Add stream failed because this vbucket doesn't exist",
"code from EpStore::rollback: " + std::to_string(err));
}
- RCPtr<VBucket> vb = engine_.getVBucket(vbid);
+ VBucketPtr vb = engine_.getVBucket(vbid);
auto stream = findStream(vbid);
if (stream) {
stream->reconnectStream(vb, opaque, vb->getHighSeqno());
auto stream = findStream(vbucket);
if (stream && stream->getOpaque() == opaque && stream->isPending()) {
if (status == ENGINE_SUCCESS) {
- RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
+ VBucketPtr vb = engine_.getVBucket(vbucket);
vb->failovers->replaceFailoverLog(body, bodylen);
KVBucketIface* kvBucket = engine_.getKVBucket();
kvBucket->scheduleVBStatePersist(vbucket);
return ENGINE_DISCONNECT;
}
- RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
+ VBucketPtr vb = engine_.getVBucket(vbucket);
if (!vb) {
LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
"this vbucket doesn't exist", logHeader(), vbucket);
return ENGINE_DISCONNECT;
}
- RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
+ VBucketPtr vb = engine_.getVBucket(vbucket);
if (!vb) {
LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
"because this vbucket doesn't exist", logHeader(), vbucket);
end_seqno_ = dcpMaxSeqno;
}
- RCPtr<VBucket> vbucket = engine->getVBucket(vb);
+ VBucketPtr vbucket = engine->getVBucket(vb);
if (vbucket) {
ReaderLockHolder rlh(vbucket->getStateLock());
if (vbucket->getState() == vbucket_state_replica) {
startSeqno = std::min(snap_start_seqno_, startSeqno);
firstMarkerSent = true;
- RCPtr<VBucket> vb = engine->getVBucket(vb_);
+ VBucketPtr vb = engine->getVBucket(vb_);
if (!vb) {
producer->getLogger().log(EXTENSION_LOG_WARNING,"(vb %" PRIu16 ") "
"ActiveStream::markDiskSnapshot, vbucket "
engine->getKVBucket()->setVBucketState(vb_, vbucket_state_dead,
false, false);
- RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+ VBucketPtr vbucket = engine->getVBucket(vb_);
producer->getLogger().log(EXTENSION_LOG_NOTICE,
"(vb %" PRIu16 ") Vbucket marked as dead, last sent seqno: %"
PRIu64 ", high seqno: %" PRIu64,
DcpResponse* ActiveStream::takeoverSendPhase() {
- RCPtr<VBucket> vb = engine->getVBucket(vb_);
+ VBucketPtr vb = engine->getVBucket(vb_);
if (vb && takeoverStart != 0 &&
!vb->isTakeoverBackedUp() &&
(ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
}
bool ActiveStream::nextCheckpointItem() {
- RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+ VBucketPtr vbucket = engine->getVBucket(vb_);
if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
// schedule this stream to build the next checkpoint
producer->scheduleCheckpointProcessorTask(this);
}
void ActiveStream::nextCheckpointItemTask() {
- RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+ VBucketPtr vbucket = engine->getVBucket(vb_);
if (vbucket) {
std::vector<queued_item> items;
getOutstandingItems(vbucket, items);
}
}
-void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
+void ActiveStream::getOutstandingItems(VBucketPtr &vb,
std::vector<queued_item> &items) {
// Commencing item processing - set guard flag.
chkptItemsExtractionInProgress.store(true);
return;
}
- RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+ VBucketPtr vbucket = engine->getVBucket(vb_);
if (!vbucket) {
producer->getLogger().log(EXTENSION_LOG_WARNING,
"(vb %" PRIu16 ") Failed to schedule "
break;
case StreamState::Dead:
{
- RCPtr<VBucket> vb = engine->getVBucket(vb_);
+ VBucketPtr vb = engine->getVBucket(vb_);
if (vb) {
vb->checkpointManager.removeCursor(name_);
}
}
size_t ActiveStream::getItemsRemaining() {
- RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+ VBucketPtr vbucket = engine->getVBucket(vb_);
if (!vbucket || !isActive()) {
return 0;
bool ActiveStream::isCurrentSnapshotCompleted() const
{
- RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+ VBucketPtr vbucket = engine->getVBucket(vb_);
// An atomic read of vbucket state without acquiring the
// reader lock for state should suffice here.
if (vbucket && vbucket->getState() == vbucket_state_replica) {
void ActiveStream::dropCheckpointCursor_UNLOCKED()
{
- RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
+ VBucketPtr vbucket = engine->getVBucket(vb_);
if (!vbucket) {
endStream(END_STREAM_STATE);
bool inverse = false;
snap_start_seqno, snap_end_seqno, Type::Notifier),
producer(p) {
LockHolder lh(streamMutex);
- RCPtr<VBucket> vbucket = e->getVBucket(vb_);
+ VBucketPtr vbucket = e->getVBucket(vb_);
if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
transitionState(StreamState::Dead);
}
}
-void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
+void PassiveStream::reconnectStream(VBucketPtr &vb,
uint32_t new_opaque,
uint64_t start_seqno) {
vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
}
ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
- RCPtr<VBucket> vb = engine->getVBucket(vb_);
+ VBucketPtr vb = engine->getVBucket(vb_);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
}
ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
- RCPtr<VBucket> vb = engine->getVBucket(vb_);
+ VBucketPtr vb = engine->getVBucket(vb_);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
ENGINE_ERROR_CODE PassiveStream::processSystemEvent(
const SystemEventMessage& event) {
- RCPtr<VBucket> vb = engine->getVBucket(vb_);
+ VBucketPtr vb = engine->getVBucket(vb_);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
void PassiveStream::processMarker(SnapshotMarker* marker) {
- RCPtr<VBucket> vb = engine->getVBucket(vb_);
+ VBucketPtr vb = engine->getVBucket(vb_);
cur_snapshot_start.store(marker->getStartSeqno());
cur_snapshot_end.store(marker->getEndSeqno());
}
}
-void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
+void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
if (byseqno == cur_snapshot_end.load()) {
if (cur_snapshot_type.load() == Snapshot::Disk &&
vb->isBackfillPhase()) {
protected:
// Returns the outstanding items for the stream's checkpoint cursor.
- void getOutstandingItems(RCPtr<VBucket> &vb, std::vector<queued_item> &items);
+ void getOutstandingItems(VBucketPtr &vb, std::vector<queued_item> &items);
// Given a set of queued items, create mutation responses for each item,
// and pass onto the producer associated with this stream.
void acceptStream(uint16_t status, uint32_t add_opaque);
- void reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque,
+ void reconnectStream(VBucketPtr &vb, uint32_t new_opaque,
uint64_t start_seqno);
ENGINE_ERROR_CODE messageReceived(std::unique_ptr<DcpResponse> response);
ENGINE_ERROR_CODE processSeparatorChanged(VBucket& vb,
const CollectionsEvent& event);
- void handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno);
+ void handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno);
void processMarker(SnapshotMarker* marker);
DiskStatVisitor(const void* c, ADD_STAT a) : cookie(c), add_stat(a) {
}
- void visitBucket(RCPtr<VBucket>& vb) override {
+ void visitBucket(VBucketPtr& vb) override {
char buf[32];
uint16_t vbid = vb->getId();
DBFileInfo dbInfo =
return ENGINE_SUCCESS;
}
-RCPtr<VBucket> EPBucket::makeVBucket(VBucket::id_type id,
+VBucketPtr EPBucket::makeVBucket(VBucket::id_type id,
vbucket_state_t state,
KVShard* shard,
std::unique_ptr<FailoverTable> table,
uint64_t maxCas,
const std::string& collectionsManifest) {
auto flusherCb = std::make_shared<NotifyFlusherCB>(shard);
- return RCPtr<VBucket>(new EPVBucket(id,
+ return VBucketPtr(new EPVBucket(id,
state,
stats,
engine.getCheckpointConfig(),
ENGINE_ERROR_CODE EPBucket::statsVKey(const DocKey& key,
uint16_t vbucket,
const void* cookie) {
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
gcb.waitForValue();
if (eviction_policy == FULL_EVICTION) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (vb) {
vb->completeStatsVKey(key, gcb);
}
"EPDiskRollbackCB::callback: dbHandle is NULL");
}
UniqueItemPtr itm(val.getValue());
- RCPtr<VBucket> vb = engine.getVBucket(itm->getVBucketId());
+ VBucketPtr vb = engine.getVBucket(itm->getVBucketId());
RememberingCallback<GetValue> gcb;
engine.getKVBucket()
->getROUnderlying(itm->getVBucketId())
/**
* Creates a VBucket object.
*/
- RCPtr<VBucket> makeVBucket(VBucket::id_type id,
+ VBucketPtr makeVBucket(VBucket::id_type id,
vbucket_state_t state,
KVShard* shard,
std::unique_ptr<FailoverTable> table,
}
uint16_t vbucket = ntohs(req->message.header.request.vbucket);
- RCPtr<VBucket> vb = e->getVBucket(vbucket);
+ VBucketPtr vb = e->getVBucket(vbucket);
if (!vb) {
return e->sendNotMyVBucketResponse(response, cookie, 0);
} else {
const item* itm, item_info* itm_info) {
const Item* it = reinterpret_cast<const Item*>(itm);
auto engine = acquireEngine(handle);
- RCPtr<VBucket> vb = engine->getKVBucket()->getVBucket(it->getVBucketId());
+ VBucketPtr vb = engine->getKVBucket()->getVBucket(it->getVBucketId());
uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
*itm_info = it->toItemInfo(vb_uuid);
return true;
auto* item = gv.getValue();
cb::unique_item_ptr ret{item, cb::ItemDeleter{handle}};
- const RCPtr<VBucket> vb = getKVBucket()->getVBucket(vbucket);
+ const VBucketPtr vb = getKVBucket()->getVBucket(vbucket);
const uint64_t vb_uuid = vb ? vb->failovers->getLatestUUID() : 0;
// Currently
*es = connection->specificData;
} else if (ret == TAP_CHECKPOINT_START) {
// Send the current value of the max deleted seqno
- RCPtr<VBucket> vb = getVBucket(*vbucket);
+ VBucketPtr vb = getVBucket(*vbucket);
if (!vb) {
retry = true;
return TAP_NOOP;
if (tap_event == TAP_CHECKPOINT_START &&
nengine == TapEngineSpecific::sizeRevSeqno) {
// Set the current value for the max deleted seqno
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
return ENGINE_TMPFAIL;
}
1);
}
-void VBucketCountAggregator::visitBucket(RCPtr<VBucket> &vb) {
+void VBucketCountAggregator::visitBucket(VBucketPtr &vb) {
std::map<vbucket_state_t, VBucketCountVisitor*>::iterator it;
it = visitorMap.find(vb->getState());
if ( it != visitorMap.end() ) {
isPrevState(isPrevStateRequested),
isDetailsRequested(detailsRequested) {}
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
addVBStats(cookie, add_stat, vb, eps, isPrevState,
isDetailsRequested);
}
static void addVBStats(const void *cookie, ADD_STAT add_stat,
- RCPtr<VBucket> &vb,
+ VBucketPtr &vb,
KVBucketIface* store,
bool isPrevStateRequested,
bool detailsRequested) {
if (!parseUint16(vbid.c_str(), &vbucket_id)) {
return ENGINE_EINVAL;
}
- RCPtr<VBucket> vb = getVBucket(vbucket_id);
+ VBucketPtr vb = getVBucket(vbucket_id);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
StatVBucketVisitor(const void *c, ADD_STAT a) : cookie(c),
add_stat(a) {}
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
uint16_t vbid = vb->getId();
char buf[32];
try {
ADD_STAT a) : kvBucket(kvs), cookie(c),
add_stat(a) {}
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
addCheckpointStat(cookie, add_stat, kvBucket, vb);
}
static void addCheckpointStat(const void *cookie, ADD_STAT add_stat,
KVBucketIface* eps,
- RCPtr<VBucket> &vb) {
+ VBucketPtr &vb) {
if (!vb) {
return;
}
if (!parseUint16(vbid.c_str(), &vbucket_id)) {
return ENGINE_EINVAL;
}
- RCPtr<VBucket> vb = getVBucket(vbucket_id);
+ VBucketPtr vb = getVBucket(vbucket_id);
StatCheckpointVisitor::addCheckpointStat(cookie, add_stat,
kvBucket.get(), vb);
const void *cookie,
ADD_STAT add_stat,
uint16_t vbid) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if(!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
StatVBucketVisitor(const void *c, ADD_STAT a) :
cookie(c), add_stat(a) {}
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
vb->failovers->addStats(cookie, vb->getId(), add_stat);
}
void EventuallyPersistentEngine::addSeqnoVbStats(const void *cookie,
ADD_STAT add_stat,
- const RCPtr<VBucket> &vb) {
+ const VBucketPtr &vb) {
// MB-19359: An atomic read of vbucket state without acquiring the
// reader lock for state should suffice here.
uint64_t relHighSeqno = vb->getHighSeqno();
}
int vbucket = atoi(value.c_str());
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb || vb->getState() == vbucket_state_dead) {
return ENGINE_NOT_MY_VBUCKET;
}
auto vbuckets = kvBucket->getVBuckets().getBuckets();
for (auto vbid : vbuckets) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (vb) {
addSeqnoVbStats(cookie, add_stat, vb);
}
LOG(EXTENSION_LOG_DEBUG, "Observing vbucket: %d with uuid: %" PRIu64,
vb_id, vb_uuid);
- RCPtr<VBucket> vb = kvBucket->getVBucket(vb_id);
+ VBucketPtr vb = kvBucket->getVBucket(vb_id);
if (!vb) {
return sendNotMyVBucketResponse(response, cookie, 0);
/// its checkpoint cursors.
const VBucketMap &vbuckets = getKVBucket()->getVBuckets();
for (VBucketMap::id_type vbid = 0; vbid < vbuckets.getSize(); ++vbid) {
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
+ VBucketPtr vb = vbuckets.getBucket(vbid);
if (!vb) {
continue;
}
{
std::stringstream msg;
uint16_t vbucket = ntohs(req->request.vbucket);
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
return sendNotMyVBucketResponse(response, cookie, 0);
{
std::stringstream msg;
uint16_t vbucket = ntohs(req->request.vbucket);
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
return sendNotMyVBucketResponse(response, cookie, 0);
ADD_STAT add_stat,
std::string &key,
uint16_t vbid) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
ADD_STAT add_stat,
std::string &key,
uint16_t vbid) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
}
uint16_t vbucket = ntohs(request->message.header.request.vbucket);
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
for (auto id : vbuckets) {
- RCPtr<VBucket> vb = getVBucket(id);
+ VBucketPtr vb = getVBucket(id);
if (vb) {
auto state = vb->getState();
bool getSeqnoForThisVb = false;
protocol_binary_response_status status,
uint64_t cas,
const void* cookie) {
- RCPtr<VBucket> vb = kvBucket->getVBucket(vbucket);
+ VBucketPtr vb = kvBucket->getVBucket(vbucket);
if (!vb) {
return sendErrorResponse(
response, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, cas, cookie);
*/
class VBucketCountAggregator : public VBucketVisitor {
public:
- void visitBucket(RCPtr<VBucket> &vb) override;
+ void visitBucket(VBucketPtr &vb) override;
void addVisitor(VBucketCountVisitor* visitor);
private:
protocol_binary_request_header *request,
ADD_RESPONSE response);
- RCPtr<VBucket> getVBucket(uint16_t vbucket) {
+ VBucketPtr getVBucket(uint16_t vbucket) {
return kvBucket->getVBucket(vbucket);
}
ENGINE_ERROR_CODE doSeqnoStats(const void *cookie, ADD_STAT add_stat,
const char* stat_key, int nkey);
void addSeqnoVbStats(const void *cookie, ADD_STAT add_stat,
- const RCPtr<VBucket> &vb);
+ const VBucketPtr &vb);
void addLookupResult(const void *cookie, Item *result) {
LockHolder lh(lookupMutex);
return true;
}
-RCPtr<VBucket> EphemeralBucket::makeVBucket(
+VBucketPtr EphemeralBucket::makeVBucket(
VBucket::id_type id,
vbucket_state_t state,
KVShard* shard,
/* the name is getNumPersistedDeletes, in ephemeral buckets the equivalent
meaning is the number of deletes seen by the vbucket.
This is needed by ns-server during vb-takeover */
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
return vb->getNumInMemoryDeletes();
}
/* In ephemeral buckets we must notify high priority requests as well.
We do not wait for persistence to notify high priority requests */
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
vb->notifyHighPriorityRequests(
engine, notifyCtx.bySeqno, HighPriorityVBNotify::Seqno);
}
/**
* Creates an EphemeralVBucket
*/
- RCPtr<VBucket> makeVBucket(VBucket::id_type id,
+ VBucketPtr makeVBucket(VBucket::id_type id,
vbucket_state_t state,
KVShard* shard,
std::unique_ptr<FailoverTable> table,
#include "ephemeral_vb.h"
#include "seqlist.h"
-void EphemeralVBucket::CountVisitor::visitBucket(RCPtr<VBucket>& vb) {
+void EphemeralVBucket::CountVisitor::visitBucket(VBucketPtr& vb) {
// Handle base class counts
VBucketCountVisitor::visitBucket(vb);
: VBucketCountVisitor(state) {
}
- void visitBucket(RCPtr<VBucket> &vb) override;
+ void visitBucket(VBucketPtr &vb) override;
size_t autoDeleteCount = 0;
uint64_t seqlistCount = 0;
if (!doHighPriority && shard->highPriorityCount.load() > 0) {
for (auto vbid : shard->getVBuckets()) {
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
if (vb && vb->getHighPriorityChkSize() > 0) {
hpVbs.push(vbid);
}
public:
ResizingVisitor() { }
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
vb->ht.resize();
}
};
}
}
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
update();
bool newCheckpointCreated = false;
bool wasHighMemoryUsage;
hrtime_t taskStart;
std::atomic<item_pager_phase>* pager_phase;
- RCPtr<VBucket> currentBucket;
+ VBucketPtr currentBucket;
};
ItemPager::ItemPager(EventuallyPersistentEngine *e, EPStats &st) :
}
void callback(uint16_t& vbucketId, const DocKey& key, bool& isDeleted) {
- RCPtr<VBucket> vb = store.getVBucket(vbucketId);
+ VBucketPtr vb = store.getVBucket(vbucketId);
if (vb) {
/* Check if a temporary filter has been initialized. If not,
* initialize it. If initialization fails, throw an exception
bool BloomFilterCallback::initTempFilter(uint16_t vbucketId) {
Configuration& config = store.getEPEngine().getConfiguration();
- RCPtr<VBucket> vb = store.getVBucket(vbucketId);
+ VBucketPtr vb = store.getVBucket(vbucketId);
if (!vb) {
return false;
}
class PendingOpsNotification : public GlobalTask {
public:
- PendingOpsNotification(EventuallyPersistentEngine& e, RCPtr<VBucket>& vb)
+ PendingOpsNotification(EventuallyPersistentEngine& e, VBucketPtr& vb)
: GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
engine(e),
vbucket(vb),
private:
EventuallyPersistentEngine &engine;
- RCPtr<VBucket> vbucket;
+ VBucketPtr vbucket;
const std::string description;
};
protocol_binary_response_status KVBucket::evictKey(const DocKey& key,
VBucket::id_type vbucket,
const char** msg) {
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb || (vb->getState() != vbucket_state_active)) {
return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
}
time_t startTime,
uint64_t revSeqno,
ExpireBy source) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (vb) {
// Obtain reader access to the VB state change lock so that
// the VB can't switch state whilst we're processing
}
}
-bool KVBucket::isMetaDataResident(RCPtr<VBucket> &vb, const DocKey& key) {
+bool KVBucket::isMetaDataResident(VBucketPtr &vb, const DocKey& key) {
if (!vb) {
throw std::invalid_argument("EPStore::isMetaDataResident: vb is NULL");
ENGINE_ERROR_CODE KVBucket::set(Item &itm, const void *cookie) {
- RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
+ VBucketPtr vb = getVBucket(itm.getVBucketId());
if (!vb) {
++stats.numNotMyVBuckets;
return ENGINE_NOT_MY_VBUCKET;
ENGINE_ERROR_CODE KVBucket::add(Item &itm, const void *cookie)
{
- RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
+ VBucketPtr vb = getVBucket(itm.getVBucketId());
if (!vb) {
++stats.numNotMyVBuckets;
return ENGINE_NOT_MY_VBUCKET;
}
ENGINE_ERROR_CODE KVBucket::replace(Item &itm, const void *cookie) {
- RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
+ VBucketPtr vb = getVBucket(itm.getVBucketId());
if (!vb) {
++stats.numNotMyVBuckets;
return ENGINE_NOT_MY_VBUCKET;
ENGINE_ERROR_CODE KVBucket::addBackfillItem(Item& itm,
GenerateBySeqno genBySeqno,
ExtendedMetaData* emd) {
- RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
+ VBucketPtr vb = getVBucket(itm.getVBucketId());
if (!vb) {
++stats.numNotMyVBuckets;
return ENGINE_NOT_MY_VBUCKET;
bool transfer,
bool notify_dcp,
LockHolder& vbset) {
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
if (vb && to == vb->getState()) {
return ENGINE_SUCCESS;
}
std::make_unique<FailoverTable>(engine.getMaxFailoverEntries());
KVShard* shard = vbMap.getShardByVbId(vbid);
- RCPtr<VBucket> newvb =
+ VBucketPtr newvb =
makeVBucket(vbid,
to,
shard,
}
void KVBucket::scheduleVBStatePersist(VBucket::id_type vbid) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (!vb) {
LOG(EXTENSION_LOG_WARNING,
bool bucketDeleting;
{
LockHolder lh(vbsetMutex);
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
bucketDeleting = !vb ||
vb->getState() == vbucket_state_dead ||
vb->isBucketDeletion();
return true;
}
-void KVBucket::scheduleVBDeletion(RCPtr<VBucket> &vb, const void* cookie,
+void KVBucket::scheduleVBDeletion(VBucketPtr &vb, const void* cookie,
double delay) {
ExTask delTask = make_STRCPtr<VBucketMemoryDeletionTask>(engine, vb, delay);
ExecutorPool::get()->schedule(delTask);
ENGINE_ERROR_CODE KVBucket::deleteVBucket(uint16_t vbid, const void* c) {
// Lock to prevent a race condition between a failed update and add
// (and delete).
- RCPtr<VBucket> vb;
+ VBucketPtr vb;
{
LockHolder lh(vbsetMutex);
vb = vbMap.getBucket(vbid);
ENGINE_ERROR_CODE KVBucket::checkForDBExistence(DBFileId db_file_id) {
std::string backend = engine.getConfiguration().getBackend();
if (backend.compare("couchdb") == 0) {
- RCPtr<VBucket> vb = vbMap.getBucket(db_file_id);
+ VBucketPtr vb = vbMap.getBucket(db_file_id);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
}
/* Obtain the vbucket so we can get the previous purge seqno */
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
*/
for (auto& it : ctx->max_purged_seq) {
const uint16_t vbid = it.first;
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (!vb) {
continue;
}
* the writer and compactor threads
*/
if (concWriteCompact == false) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (!vb) {
err = ENGINE_NOT_MY_VBUCKET;
engine.storeEngineSpecific(cookie, NULL);
bool KVBucket::resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset) {
bool rv(false);
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
if (vb) {
vbucket_state_t vbstate = vb->getState();
false/*transfer*/, true/*notifyDcp*/, vbset);
// Copy the all cursors from the old vbucket into the new vbucket
- RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
+ VBucketPtr newvb = vbMap.getBucket(vbid);
newvb->checkpointManager.resetCursors(cursors);
rv = true;
// Lock to prevent a race condition between a fetch for restore and delete
LockHolder lh(vbsetMutex);
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (vb) {
VBucketBGFetchItem item{gcb.val, cookie, init, isMeta};
ENGINE_ERROR_CODE status =
void KVBucket::completeBGFetchMulti(uint16_t vbId,
std::vector<bgfetched_item_t>& fetchedItems,
ProcessClock::time_point startTime) {
- RCPtr<VBucket> vb = getVBucket(vbId);
+ VBucketPtr vb = getVBucket(vbId);
if (vb) {
for (const auto& item : fetchedItems) {
auto& key = item.first;
vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
vbucket_state_replica : vbucket_state_active;
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
++stats.numNotMyVBuckets;
std::unique_ptr<Item> itm;
while (itm == NULL) {
- RCPtr<VBucket> vb = getVBucket(curr++);
+ VBucketPtr vb = getVBucket(curr++);
while (!vb || vb->getState() != vbucket_state_active) {
if (curr == start) {
return GetValue(NULL, ENGINE_KEY_ENOENT);
uint32_t& deleted,
uint8_t& datatype)
{
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
++stats.numNotMyVBuckets;
ExtendedMetaData *emd,
bool isReplication)
{
- RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
+ VBucketPtr vb = getVBucket(itm.getVBucketId());
if (!vb) {
++stats.numNotMyVBuckets;
return ENGINE_NOT_MY_VBUCKET;
GetValue KVBucket::getAndUpdateTtl(const DocKey& key, uint16_t vbucket,
const void *cookie, time_t exptime)
{
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
++stats.numNotMyVBuckets;
return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
GetValue KVBucket::getLocked(const DocKey& key, uint16_t vbucket,
rel_time_t currentTime, uint32_t lockTimeout,
const void *cookie) {
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb || vb->getState() != vbucket_state_active) {
++stats.numNotMyVBuckets;
return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
rel_time_t currentTime)
{
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb || vb->getState() != vbucket_state_active) {
++stats.numNotMyVBuckets;
return ENGINE_NOT_MY_VBUCKET;
const void* cookie,
struct key_stats& kstats,
WantsDeleted wantsDeleted) {
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
std::string KVBucket::validateKey(const DocKey& key, uint16_t vbucket,
Item &diskItem) {
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
auto hbl = vb->ht.getLockedBucket(key);
StoredValue* v = vb->fetchValidValue(
hbl, key, WantsDeleted::Yes, TrackReference::No, QueueExpired::Yes);
Item* itm,
ItemMetaData* itemMeta,
mutation_descr_t* mutInfo) {
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb || vb->getState() == vbucket_state_dead) {
++stats.numNotMyVBuckets;
return ENGINE_NOT_MY_VBUCKET;
uint64_t bySeqno,
ExtendedMetaData* emd,
bool isReplication) {
- RCPtr<VBucket> vb = getVBucket(vbucket);
+ VBucketPtr vb = getVBucket(vbucket);
if (!vb) {
++stats.numNotMyVBuckets;
void KVBucket::reset() {
auto buckets = vbMap.getBuckets();
for (auto vbid : buckets) {
- RCPtr<VBucket> vb = getVBucket(vbid);
+ VBucketPtr vb = getVBucket(vbid);
if (vb) {
LockHolder lh(vb_mutexes[vb->getId()]);
vb->ht.clear();
public Callback<int> {
public:
- PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
+ PersistenceCallback(const queued_item &qi, VBucketPtr &vb,
EPStats& s, uint64_t c)
: queuedItem(qi), vbucket(vb), stats(s), cas(c) {
if (!vb) {
}
}
- RCPtr<VBucket>& getVBucket() {
+ VBucketPtr& getVBucket() {
return vbucket;
}
}
const queued_item queuedItem;
- RCPtr<VBucket> vbucket;
+ VBucketPtr vbucket;
EPStats& stats;
uint64_t cas;
DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
void KVBucket::flushOneDeleteAll() {
for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
- RCPtr<VBucket> vb = getVBucket(i);
+ VBucketPtr vb = getVBucket(i);
// Reset the vBucket if it's non-null and not already in the middle of
// being created / destroyed.
if (vb && !(vb->isBucketCreation() || vb->isBucketDeletion())) {
int items_flushed = 0;
const hrtime_t flush_start = gethrtime();
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
if (vb) {
std::unique_lock<std::mutex> lh(vb_mutexes[vbid], std::try_to_lock);
if (!lh.owns_lock()) { // Try another bucket if this one is locked
std::unordered_set<uint16_t> vbSet;
for (auto pcbIter : pcbs) {
PersistenceCallback *pcb = pcbIter;
- RCPtr<VBucket>& vb = pcb->getVBucket();
+ VBucketPtr& vb = pcb->getVBucket();
uint16_t vbid = vb->getId();
auto found = vbSet.find(vbid);
if (found == vbSet.end()) {
}
PersistenceCallback* KVBucket::flushOneDelOrSet(const queued_item &qi,
- RCPtr<VBucket> &vb) {
+ VBucketPtr &vb) {
if (!vb) {
--stats.diskQueueSize;
void KVBucket::setAllBloomFilters(bool to) {
for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); vbid++) {
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
if (vb) {
if (to) {
vb->setFilterStatus(BFILTER_ENABLED);
void KVBucket::visit(VBucketVisitor &visitor)
{
for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); ++vbid) {
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
if (vb) {
visitor.visitBucket(vb);
}
{
uint16_t vbid = start_pos.vbucket_id;
for (; vbid < vbMap.getSize(); ++vbid) {
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
if (vb) {
bool paused = !visitor.visit(vbid, vb->ht);
if (paused) {
TRACE_EVENT("ep-engine/task", "VBCBAdaptor", vbList.front());
currentvb.store(vbList.front());
updateDescription();
- RCPtr<VBucket> vb = store->getVBucket(currentvb);
+ VBucketPtr vb = store->getVBucket(currentvb);
if (vb) {
if (visitor->pauseVisitor()) {
snooze(sleepTime);
return ENGINE_TMPFAIL; // Reschedule a vbucket rollback task.
}
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ VBucketPtr vb = vbMap.getBucket(vbid);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
}
}
if (resetVBucket_UNLOCKED(vbid, vbset)) {
- RCPtr<VBucket> newVb = vbMap.getBucket(vbid);
+ VBucketPtr newVb = vbMap.getBucket(vbid);
newVb->incrRollbackItemCount(prevHighSeqno);
return ENGINE_SUCCESS;
}
}
ENGINE_ERROR_CODE KVBucket::forceMaxCas(uint16_t vbucket, uint64_t cas) {
- RCPtr<VBucket> vb = vbMap.getBucket(vbucket);
+ VBucketPtr vb = vbMap.getBucket(vbucket);
if (vb) {
vb->forceMaxCas(cas);
return ENGINE_SUCCESS;
std::vector<bgfetched_item_t>& fetchedItems,
ProcessClock::time_point start);
- RCPtr<VBucket> getVBucket(uint16_t vbid) {
+ VBucketPtr getVBucket(uint16_t vbid) {
return vbMap.getBucket(vbid);
}
bfilterResidencyThreshold = to;
}
- bool isMetaDataResident(RCPtr<VBucket> &vb, const DocKey& key);
+ bool isMetaDataResident(VBucketPtr &vb, const DocKey& key);
void logQTime(TaskId taskType, const ProcessClock::duration enqTime) {
const auto ns_count = std::chrono::duration_cast
/**
* Create a VBucket object appropriate for this Bucket class.
*/
- virtual RCPtr<VBucket> makeVBucket(
+ virtual VBucketPtr makeVBucket(
VBucket::id_type id,
vbucket_state_t state,
KVShard* shard,
*/
void compactInternal(compaction_ctx *ctx);
- void scheduleVBDeletion(RCPtr<VBucket> &vb,
+ void scheduleVBDeletion(VBucketPtr &vb,
const void* cookie,
double delay = 0);
void flushOneDeleteAll(void);
PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
- RCPtr<VBucket> &vb);
+ VBucketPtr &vb);
GetValue getInternal(const DocKey& key, uint16_t vbucket, const void *cookie,
vbucket_state_t allowedState,
*
* @param vb the vbucket we are beginning to visit
*/
- virtual void visitBucket(RCPtr<VBucket> &vb) = 0;
+ virtual void visitBucket(VBucketPtr &vb) = 0;
const VBucketFilter &getVBucketFilter() {
return vBucketFilter;
std::vector<bgfetched_item_t>& fetchedItems,
ProcessClock::time_point start) = 0;
- virtual RCPtr<VBucket> getVBucket(uint16_t vbid) = 0;
+ virtual VBucketPtr getVBucket(uint16_t vbid) = 0;
/**
* Returns the last persisted checkpoint Id for the specified vBucket.
virtual void setBfiltersResidencyThreshold(float to) = 0;
- virtual bool isMetaDataResident(RCPtr<VBucket> &vb,
+ virtual bool isMetaDataResident(VBucketPtr &vb,
const DocKey& key) = 0;
virtual void logQTime(TaskId taskType,
/**
* Create a VBucket object appropriate for this Bucket class.
*/
- virtual RCPtr<VBucket> makeVBucket(
+ virtual VBucketPtr makeVBucket(
VBucket::id_type id,
vbucket_state_t state,
KVShard* shard,
*/
virtual void compactInternal(compaction_ctx *ctx) = 0;
- virtual void scheduleVBDeletion(RCPtr<VBucket> &vb,
+ virtual void scheduleVBDeletion(VBucketPtr &vb,
const void* cookie,
double delay = 0) = 0;
virtual void flushOneDeleteAll(void) = 0;
virtual PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
- RCPtr<VBucket> &vb) = 0;
+ VBucketPtr &vb) = 0;
virtual GetValue getInternal(const DocKey& key, uint16_t vbucket,
const void *cookie,
return bgFetcher.get();
}
-RCPtr<VBucket> KVShard::getBucket(uint16_t id) const {
+VBucketPtr KVShard::getBucket(uint16_t id) const {
if (id < vbuckets.size()) {
return vbuckets[id];
} else {
}
}
-void KVShard::setBucket(const RCPtr<VBucket> &vb) {
+void KVShard::setBucket(const VBucketPtr &vb) {
vbuckets[vb->getId()].reset(vb);
}
for (int state = vbucket_state_active;
state <= vbucket_state_dead;
++state) {
- for (RCPtr<VBucket> b : vbuckets) {
+ for (VBucketPtr b : vbuckets) {
if (b && b->getState() == state) {
rv.push_back(b->getId());
}
std::vector<VBucket::id_type> KVShard::getVBuckets() {
std::vector<VBucket::id_type> rv;
- for (RCPtr<VBucket> b : vbuckets) {
+ for (VBucketPtr b : vbuckets) {
if (b) {
rv.push_back(b->getId());
}
Flusher *getFlusher();
BgFetcher *getBgFetcher();
- RCPtr<VBucket> getBucket(VBucket::id_type id) const;
- void setBucket(const RCPtr<VBucket> &b);
+ VBucketPtr getBucket(VBucket::id_type id) const;
+ void setBucket(const VBucketPtr &b);
void resetBucket(VBucket::id_type id);
KVShard::id_type getId() const {
private:
KVStoreConfig kvConfig;
- std::vector<RCPtr<VBucket>> vbuckets;
+ std::vector<VBucketPtr> vbuckets;
std::unique_ptr<KVStore> rwStore;
std::unique_ptr<KVStore> roStore;
"when engine is NULL");
}
for (const uint16_t vb : vbid_set) {
- RCPtr<VBucket> vbucket = engine->getKVBucket()->getVBucket(vb);
+ VBucketPtr vbucket = engine->getKVBucket()->getVBucket(vb);
if (!vbucket) {
continue;
}
// Remove TAP cursors from the vbuckets that don't belong to the new vbucket filter.
for (std::set<uint16_t>::const_iterator it = vset.begin(); it != vset.end(); ++it) {
if (vbucketFilter(*it)) {
- RCPtr<VBucket> vb = vbMap.getBucket(*it);
+ VBucketPtr vb = vbMap.getBucket(*it);
if (vb) {
vb->checkpointManager.removeCursor(getName());
}
std::map<uint16_t, CheckpointState>::iterator it = checkpointState_.begin();
for (; it != checkpointState_.end(); ++it) {
uint16_t vbid = it->first;
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
+ VBucketPtr vb = vbuckets.getBucket(vbid);
if (!vb || (vb->getState() == vbucket_state_dead && !doTakeOver)) {
logger.log(EXTENSION_LOG_WARNING,
"Skip vbucket %d checkpoint queue as it's in invalid state.",
std::map<uint16_t, CheckpointState>::iterator it = checkpointState_.begin();
for (; it != checkpointState_.end(); ++it) {
uint16_t vbid = it->first;
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
+ VBucketPtr vb = vbuckets.getBucket(vbid);
if (!vb || (vb->getState() == vbucket_state_dead && !doTakeOver)) {
continue;
}
std::map<uint16_t, CheckpointState>::iterator it = checkpointState_.begin();
for (; it != checkpointState_.end(); ++it) {
uint16_t vbid = it->first;
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
+ VBucketPtr vb = vbuckets.getBucket(vbid);
if (!vb || (vb->getState() == vbucket_state_dead && !doTakeOver)) {
continue;
}
// Skip all the vbuckets that are (1) receiving backfill from their master nodes
// or (2) already scheduled for backfill.
for (; vbit != vblist.end(); ++vbit) {
- RCPtr<VBucket> vb = vbuckets.getBucket(*vbit);
+ VBucketPtr vb = vbuckets.getBucket(*vbit);
if (!vb || vb->isBackfillPhase() ||
backfillVBuckets.find(*vbit) != backfillVBuckets.end()) {
continue;
std::vector<uint16_t>::iterator it = new_vblist.begin();
for (; it != new_vblist.end(); ++it) {
- RCPtr<VBucket> vb = vbuckets.getBucket(*it);
+ VBucketPtr vb = vbuckets.getBucket(*it);
if (!vb) {
logger.log(EXTENSION_LOG_WARNING,
"VBucket %d not exist for backfill. Skip it.", *it);
if (mayCompleteDumpOrTakeover_UNLOCKED()) {
ev = nextVBucketLowPriority_UNLOCKED();
if (ev.event != TAP_PAUSE) {
- RCPtr<VBucket> vb = engine_.getVBucket(ev.vbucket);
+ VBucketPtr vb = engine_.getVBucket(ev.vbucket);
vbucket_state_t myState(vb ? vb->getState() : vbucket_state_dead);
if (ev.event != TAP_VBUCKET_SET) {
throw std::logic_error("TapProducer::checkDumpOrTakeOverCompletion: "
const VBucketMap &vbuckets = engine_.getKVBucket()->getVBuckets();
for (VBucketMap::id_type vbid = 0; vbid < vbuckets.getSize(); ++vbid) {
if (vbucketFilter(vbid)) {
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
+ VBucketPtr vb = vbuckets.getBucket(vbid);
if (!vb) {
checkpointState_.erase(vbid);
logger.log(EXTENSION_LOG_WARNING,
void Consumer::setBackfillPhase(bool isBackfill, uint16_t vbucket) {
const VBucketMap &vbuckets = engine_.getKVBucket()->getVBuckets();
- RCPtr<VBucket> vb = vbuckets.getBucket(vbucket);
+ VBucketPtr vb = vbuckets.getBucket(vbucket);
if (!(vb && supportCheckpointSync_)) {
return;
}
bool Consumer::isBackfillPhase(uint16_t vbucket) {
const VBucketMap &vbuckets = engine_.getKVBucket()->getVBuckets();
- RCPtr<VBucket> vb = vbuckets.getBucket(vbucket);
+ VBucketPtr vb = vbuckets.getBucket(vbucket);
if (vb && vb->isBackfillPhase()) {
return true;
}
void Consumer::checkVBOpenCheckpoint(uint16_t vbucket) {
const VBucketMap &vbuckets = engine_.getKVBucket()->getVBuckets();
- RCPtr<VBucket> vb = vbuckets.getBucket(vbucket);
+ VBucketPtr vb = vbuckets.getBucket(vbucket);
if (!vb || vb->getState() == vbucket_state_active) {
return;
}
bool TapConsumer::processCheckpointCommand(uint8_t event, uint16_t vbucket,
uint64_t checkpointId) {
const VBucketMap &vbuckets = engine_.getKVBucket()->getVBuckets();
- RCPtr<VBucket> vb = vbuckets.getBucket(vbucket);
+ VBucketPtr vb = vbuckets.getBucket(vbucket);
if (!vb) {
return false;
}
const VBucketMap &vbuckets = engine.getKVBucket()->getVBuckets();
// Remove all the cursors belonging to the TAP connection to be purged.
for (VBucketMap::id_type vbid = 0; vbid < vbuckets.getSize(); ++vbid) {
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
+ VBucketPtr vb = vbuckets.getBucket(vbid);
if (!vb) {
continue;
}
#include "vb_count_visitor.h"
-void VBucketCountVisitor::visitBucket(RCPtr<VBucket>& vb) {
+void VBucketCountVisitor::visitBucket(VBucketPtr& vb) {
++numVbucket;
numItems += vb->getNumItems();
numTempItems += vb->getNumTempItems();
totalHLCDriftExceptionCounters() {
}
- void visitBucket(RCPtr<VBucket>& vb) override;
+ void visitBucket(VBucketPtr& vb) override;
vbucket_state_t getVBucketState() {
return desired_state;
DISALLOW_COPY_AND_ASSIGN(VBucket);
};
+
+using VBucketPtr = RCPtr<VBucket>;
new VBucketConfigChangeListener(*this));
}
-RCPtr<VBucket> VBucketMap::getBucket(id_type id) const {
- static RCPtr<VBucket> emptyVBucket;
+VBucketPtr VBucketMap::getBucket(id_type id) const {
+ static VBucketPtr emptyVBucket;
if (id < size) {
return getShardByVbId(id)->getBucket(id);
} else {
}
}
-ENGINE_ERROR_CODE VBucketMap::addBucket(const RCPtr<VBucket> &b) {
+ENGINE_ERROR_CODE VBucketMap::addBucket(const VBucketPtr &b) {
if (b->getId() < size) {
getShardByVbId(b->getId())->setBucket(b);
LOG(EXTENSION_LOG_INFO, "Mapped new vbucket %d in state %s",
std::vector<VBucketMap::id_type> VBucketMap::getBuckets(void) const {
std::vector<id_type> rv;
for (id_type i = 0; i < size; ++i) {
- RCPtr<VBucket> b(getBucket(i));
+ VBucketPtr b(getBucket(i));
if (b) {
rv.push_back(b->getId());
}
for (int state = vbucket_state_active;
state <= vbucket_state_dead; ++state) {
for (size_t i = 0; i < size; ++i) {
- RCPtr<VBucket> b = getBucket(i);
+ VBucketPtr b = getBucket(i);
if (b && b->getState() == state) {
rv.push_back(b->getId());
}
VBucketMap::getActiveVBucketsSortedByChkMgrMem(void) const {
std::vector<std::pair<id_type, size_t> > rv;
for (id_type i = 0; i < size; ++i) {
- RCPtr<VBucket> b = getBucket(i);
+ VBucketPtr b = getBucket(i);
if (b && b->getState() == vbucket_state_active) {
rv.push_back(std::make_pair(b->getId(), b->getChkMgrMemUsage()));
}
void VBucketMap::addBuckets(const std::vector<VBucket*> &newBuckets) {
std::vector<VBucket*>::const_iterator it;
for (it = newBuckets.begin(); it != newBuckets.end(); ++it) {
- RCPtr<VBucket> v(*it);
+ VBucketPtr v(*it);
addBucket(v);
}
}
VBucketMap(Configuration& config, KVBucket& store);
- ENGINE_ERROR_CODE addBucket(const RCPtr<VBucket> &b);
+ ENGINE_ERROR_CODE addBucket(const VBucketPtr &b);
void removeBucket(id_type id);
void addBuckets(const std::vector<VBucket*> &newBuckets);
- RCPtr<VBucket> getBucket(id_type id) const;
+ VBucketPtr getBucket(id_type id) const;
// Returns the size of the map, i.e. the total number of VBuckets it can
// contain.
#include <sstream>
VBucketMemoryDeletionTask::VBucketMemoryDeletionTask(
- EventuallyPersistentEngine& eng, RCPtr<VBucket>& vb, double delay)
+ EventuallyPersistentEngine& eng, VBucketPtr& vb, double delay)
: GlobalTask(&eng, TaskId::VBucketMemoryDeletionTask, delay, true),
e(eng),
vbucket(vb) {
class VBucketMemoryDeletionTask : public GlobalTask {
public:
VBucketMemoryDeletionTask(EventuallyPersistentEngine& eng,
- RCPtr<VBucket>& vb,
+ VBucketPtr& vb,
double delay);
cb::const_char_buffer getDescription();
private:
EventuallyPersistentEngine& e;
- RCPtr<VBucket> vbucket;
+ VBucketPtr vbucket;
std::string description;
};
bool stopLoading = false;
if (i != NULL && !epstore.getWarmup()->isComplete()) {
- RCPtr<VBucket> vb = vbuckets.getBucket(i->getVBucketId());
+ VBucketPtr vb = vbuckets.getBucket(i->getVBucketId());
if (!vb) {
setStatus(ENGINE_NOT_MY_VBUCKET);
return;
EmergencyPurgeVisitor(KVBucket& store) :
epstore(store) {}
- void visitBucket(RCPtr<VBucket> &vb) override {
+ void visitBucket(VBucketPtr &vb) override {
if (vBucketFilter(vb->getId())) {
currentBucket = vb;
vb->ht.visit(*this);
private:
KVBucket& epstore;
- RCPtr<VBucket> currentBucket;
+ VBucketPtr currentBucket;
};
auto vbucketIds(vbuckets.getBuckets());
EmergencyPurgeVisitor epv(epstore);
for (auto vbid : vbucketIds) {
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
+ VBucketPtr vb = vbuckets.getBucket(vbid);
if (vb) {
epv.visitBucket(vb);
}
void LoadValueCallback::callback(CacheLookup &lookup)
{
if (warmupState == WarmupState::LoadingData) {
- RCPtr<VBucket> vb = vbuckets.getBucket(lookup.getVBucketId());
+ VBucketPtr vb = vbuckets.getBucket(lookup.getVBucketId());
if (!vb) {
return;
}
uint16_t vbid = itr.first;
vbucket_state vbs = itr.second;
- RCPtr<VBucket> vb = store.getVBucket(vbid);
+ VBucketPtr vb = store.getVBucket(vbid);
if (!vb) {
std::unique_ptr<FailoverTable> table;
if (vbs.failovers.empty()) {
for (const auto vbid : shardVbIds[shardId]) {
size_t vbItemCount = store.getROUnderlyingByShard(shardId)->
getItemCount(vbid);
- RCPtr<VBucket> vb = store.getVBucket(vbid);
+ VBucketPtr vb = store.getVBucket(vbid);
if (vb) {
vb->ht.numTotalItems = vbItemCount;
}
}
// Expose underlying protected ActiveStream methods as public
- void public_getOutstandingItems(RCPtr<VBucket>& vb,
+ void public_getOutstandingItems(VBucketPtr& vb,
std::vector<queued_item>& items) {
getOutstandingItems(vb, items);
}
};
static void assertVBucket(const VBucketMap& vbm, int id) {
- RCPtr<VBucket> v = vbm.getBucket(id);
+ VBucketPtr v = vbm.getBucket(id);
cb_assert(v);
cb_assert(v->getId() == id);
}
for (size_t j = 0; j < vbucketsEach; j++) {
int newId = ++i;
- RCPtr<VBucket> v(new VBucket(newId, vbucket_state_active,
+ VBucketPtr v(new VBucket(newId, vbucket_state_active,
global_stats, checkpoint_config, NULL));
vbm->addBucket(v);
cb_assert(vbm->getBucket(newId) == v);
int st = vbucket_state_dead;
for (int id = 0; id < 4; id++, st--) {
- RCPtr<VBucket> v(new VBucket(id, (vbucket_state_t)st, global_stats,
+ VBucketPtr v(new VBucket(id, (vbucket_state_t)st, global_stats,
checkpoint_config, NULL));
vbm.addBucket(v);
cb_assert(vbm.getBucket(id) == v);
store_item(vbid,
{"$collections::create:meat1", DocNamespace::DefaultCollection},
"value");
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
// Add the meat collection
vb->updateFromManifest(
{R"({"revision":1,)"
0,
{cb::engine_errc::unknown_collection});
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
// Add the meat collection
vb->updateFromManifest(
std::string CollectionsFlushTest::createCollectionAndFlush(
const std::string& json, const std::string& collection, int items) {
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
vb->updateFromManifest(json);
storeItems(collection, DocNamespace::Collections, items);
flush_vbucket_to_disk(vbid, 1 + items); // create event + items
std::string CollectionsFlushTest::deleteCollectionAndFlush(
const std::string& json, const std::string& collection, int items) {
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
storeItems(collection, DocNamespace::Collections, items);
vb->updateFromManifest(json);
flush_vbucket_to_disk(vbid, items); // only flush items
std::string CollectionsFlushTest::completeDeletionAndFlush(
const std::string& collection, int revision, int items) {
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
vb->completeDeletion(collection, revision);
storeItems("defaultcollection", DocNamespace::DefaultCollection, items);
flush_vbucket_to_disk(vbid, 1 + items); // delete event + items
// or after a delete.
//
TEST_F(CollectionsTest, checkpoint_consistency) {
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
CollectionsThreadTest threadTest(*this, *vb, 256, 256);
threadTest.run();
// persisted collection state and should have the collection accessible.
//
TEST_F(CollectionsWarmupTest, warmup) {
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
// Add the meat collection
vb->updateFromManifest(
/*end_seqno*/ 100,
/*flags*/ 0));
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
EXPECT_FALSE(vb->lockCollections().doesKeyContainValidCollection(
{"meat::bacon", DocNamespace::Collections}));
* The test replicates VBn to VBn+1
*/
TEST_F(CollectionsDcpTest, test_dcp) {
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
// Add a collection, then remove it. This generated events into the CP which
// we'll manually replicate with calls to step
// Next step which will process a snapshot marker
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
- RCPtr<VBucket> replica = store->getVBucket(replicaVB);
+ VBucketPtr replica = store->getVBucket(replicaVB);
// 1. Replica does not know about meat
EXPECT_FALSE(vb->lockCollections().doesKeyContainValidCollection(
}
TEST_F(CollectionsDcpTest, test_dcp_separator) {
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
// Change the separator
vb->updateFromManifest(
// Next step which should process a snapshot marker
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
- RCPtr<VBucket> replica = store->getVBucket(replicaVB);
+ VBucketPtr replica = store->getVBucket(replicaVB);
// Now step the producer to transfer the separator
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
dcp_producer_t producer;
stream_t stream;
- RCPtr<VBucket> vb0;
+ VBucketPtr vb0;
};
/* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
const void* cookie;
SingleThreadedRCPtr<MockDcpConsumer> consumer;
std::unique_ptr<dcp_message_producers> producers;
- RCPtr<VBucket> vb;
+ VBucketPtr vb;
};
RollbackDcpTest::StreamRequestData RollbackDcpTest::streamRequestData = {};
store_item(vbid, makeStoredDocKey("key"), "value");
// Force a new checkpoint.
- RCPtr<VBucket> vb = store->getVBuckets().getBucket(vbid);
+ VBucketPtr vb = store->getVBuckets().getBucket(vbid);
CheckpointManager& ckpt_mgr = vb->checkpointManager;
ckpt_mgr.createNewCheckpoint();
auto lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
unlock_mock_cookie(cookie);
// Manually run the VBucketMemoryDeletionTask task
- RCPtr<VBucket> vb = store->getVBucket(vbid);
+ VBucketPtr vb = store->getVBucket(vbid);
VBucketMemoryDeletionTask deletionTask(*engine, vb, /*delay*/0.0);
deletionTask.run();