1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * Copyright 2010 Couchbase, Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
23 #include <memcached/engine.h>
41 #include "bgfetcher.h"
42 #include "item_pager.h"
45 #include "executorpool.h"
47 #include "stored-value.h"
49 #include "vbucketmap.h"
52 * vbucket-aware hashtable visitor.
54 class VBucketVisitor : public HashTableVisitor {
57 VBucketVisitor() : HashTableVisitor() { }
59 VBucketVisitor(const VBucketFilter &filter) :
60 HashTableVisitor(), vBucketFilter(filter) { }
63 * Begin visiting a bucket.
65 * @param vb the vbucket we are beginning to visit
67 * @return true iff we want to walk the hashtable in this vbucket
69 virtual bool visitBucket(RCPtr<VBucket> &vb) {
70 if (vBucketFilter(vb->getId())) {
77 // This is unused in all implementations so far.
78 void visit(StoredValue* v) {
83 const VBucketFilter &getVBucketFilter() {
88 * Called after all vbuckets have been visited.
90 virtual void complete() { }
93 * Return true if visiting vbuckets should be paused temporarily.
95 virtual bool pauseVisitor() {
100 VBucketFilter vBucketFilter;
101 RCPtr<VBucket> currentBucket;
104 // Forward declaration
105 class BGFetchCallback;
106 class ConflictResolution;
107 class EventuallyPersistentStore;
110 class PersistenceCallback;
114 * VBucket visitor callback adaptor.
116 class VBCBAdaptor : public GlobalTask {
119 VBCBAdaptor(EventuallyPersistentStore *s,
120 shared_ptr<VBucketVisitor> v, const char *l, const Priority &p,
/**
 * Describe this task: the task label followed by " on vb " and the
 * value of currentvb.
 */
123 std::string getDescription() {
124 std::stringstream rv;
125 rv << label << " on vb " << currentvb;
132 std::queue<uint16_t> vbList;
133 EventuallyPersistentStore *store;
134 shared_ptr<VBucketVisitor> visitor;
139 DISALLOW_COPY_AND_ASSIGN(VBCBAdaptor);
144 * Vbucket visitor task for a generic scheduler.
146 class VBucketVisitorTask : public GlobalTask {
149 VBucketVisitorTask(EventuallyPersistentStore *s,
150 shared_ptr<VBucketVisitor> v, uint16_t sh,
151 const char *l, double sleep=0, bool shutdown=true);
/**
 * Describe this task: the task label followed by " on vb " and the
 * value of currentvb.
 */
153 std::string getDescription() {
154 std::stringstream rv;
155 rv << label << " on vb " << currentvb;
162 std::queue<uint16_t> vbList;
163 EventuallyPersistentStore *store;
164 shared_ptr<VBucketVisitor> visitor;
171 const uint16_t EP_PRIMARY_SHARD = 0;
174 typedef std::pair<uint16_t, ExTask> CompTaskEntry;
177 * Manager of all interaction with the persistence.
179 class EventuallyPersistentStore {
182 EventuallyPersistentStore(EventuallyPersistentEngine &theEngine);
183 ~EventuallyPersistentStore();
188 * Set an item in the store.
189 * @param item the item to set
190 * @param cookie the cookie representing the client to store the item
191 * @param force override access to the vbucket even if the state of the
192 * vbucket would deny mutations.
193 * @param nru the nru bit value for the item
194 * @return the result of the store operation
196 ENGINE_ERROR_CODE set(const Item &item,
202 * Add an item in the store.
203 * @param item the item to add
204 * @param cookie the cookie representing the client to store the item
205 * @return the result of the operation
207 ENGINE_ERROR_CODE add(const Item &item, const void *cookie);
210 * Replace an item in the store.
211 * @param item the item to replace
212 * @param cookie the cookie representing the client to store the item
213 * @return the result of the operation
215 ENGINE_ERROR_CODE replace(const Item &item, const void *cookie);
218 * Add a TAP backfill item into its corresponding vbucket
219 * @param item the item to be added
220 * @param meta contains meta info or not
221 * @param nru the nru bit for the item
222 * @return the result of the operation
224 ENGINE_ERROR_CODE addTAPBackfillItem(const Item &item, uint8_t nru = 0xff,
225 bool genBySeqno = true);
230 * @param key the key to fetch
231 * @param vbucket the vbucket from which to retrieve the key
232 * @param cookie the connection cookie
233 * @param queueBG if true, automatically queue a background fetch if necessary
234 * @param honorStates if false, fetch a result regardless of state
235 * @param trackReference true if we want to set the nru bit for the item
237 * @return a GetValue representing the result of the request
239 GetValue get(const std::string &key, uint16_t vbucket,
240 const void *cookie, bool queueBG=true,
241 bool honorStates=true, bool trackReference=true) {
242 return getInternal(key, vbucket, cookie, queueBG, honorStates,
243 vbucket_state_active, trackReference);
246 GetValue getRandomKey(void);
249 * Retrieve a value from a vbucket in replica state.
251 * @param key the key to fetch
252 * @param vbucket the vbucket from which to retrieve the key
253 * @param cookie the connection cookie
254 * @param queueBG if true, automatically queue a background fetch if necessary
256 * @return a GetValue representing the result of the request
258 GetValue getReplica(const std::string &key, uint16_t vbucket,
259 const void *cookie, bool queueBG=true) {
260 return getInternal(key, vbucket, cookie, queueBG, true,
261 vbucket_state_replica);
266 * Retrieve the meta data for an item
268 * @param key the key to get the meta data for
269 * @param vbucket the vbucket from which to retrieve the key
270 * @param cookie the connection cookie
271 * @param metadata where to store the meta information
272 * @param trackReference true if we want to set the nru bit for the item
273 * @param deleted specifies whether or not the key is deleted
275 ENGINE_ERROR_CODE getMetaData(const std::string &key,
278 ItemMetaData &metadata,
280 bool trackReference = false);
283 * Set an item in the store.
284 * @param item the item to set
285 * @param cas value to match
286 * @param cookie the cookie representing the client to store the item
287 * @param force override vbucket states
288 * @param allowExisting set to false if you want set to fail if the
289 * item exists already
290 * @param nru the nru bit for the item
291 * @param isReplication set to true if we are to use replication
293 * @return the result of the store operation
295 ENGINE_ERROR_CODE setWithMeta(const Item &item,
301 bool genBySeqno = true,
302 bool isReplication = false);
305 * Retrieve a value, but update its TTL first
307 * @param key the key to fetch
308 * @param vbucket the vbucket from which to retrieve the key
309 * @param cookie the connection cookie
310 * @param exptime the new expiry time for the object
312 * @return a GetValue representing the result of the request
314 GetValue getAndUpdateTtl(const std::string &key, uint16_t vbucket,
315 const void *cookie, time_t exptime);
318 * Retrieve an item from the disk for vkey stats
320 * @param key the key to fetch
321 * @param vbucket the vbucket from which to retrieve the key
322 * @param cookie the connection cookie
323 * @param cb callback to return an item fetched from the disk
325 * @return a status resulting from executing the method
327 ENGINE_ERROR_CODE statsVKey(const std::string &key,
331 void completeStatsVKey(const void* cookie, std::string &key, uint16_t vbid,
334 protocol_binary_response_status evictKey(const std::string &key,
341 * delete an item in the store.
342 * @param key the key of the item
343 * @param cas the CAS ID for a CASed delete (0 to override)
344 * @param vbucket the vbucket for the key
345 * @param cookie the cookie representing the client
346 * @param force override access to the vbucket even if the state of the
347 * vbucket would deny mutations.
348 * @param itemMeta the pointer to the metadata memory.
349 * @param tapBackfill true if an item deletion is from TAP backfill stream
351 * @param isReplication set to true if we are to use replication
353 * @return the result of the delete operation
355 ENGINE_ERROR_CODE deleteItem(const std::string &key,
360 ItemMetaData *itemMeta,
361 bool tapBackfill=false);
363 ENGINE_ERROR_CODE deleteWithMeta(const std::string &key,
368 ItemMetaData *itemMeta,
369 bool tapBackfill=false,
370 bool genBySeqno=true,
372 bool isReplication=false);
377 * Set the background fetch delay.
379 * This exists for debugging and testing purposes. It
380 * artificially injects delays into background fetches that are
381 * performed when the user requests an item whose value is not
382 * currently resident.
384 * @param to how long to delay before performing a bg fetch
386 void setBGFetchDelay(uint32_t to) {
/**
 * Get the background fetch delay.
 *
 * @return the stored bgFetchDelay value converted to double
 */
390 double getBGFetchDelay(void) { return (double)bgFetchDelay; }
392 void stopFlusher(void);
394 bool startFlusher(void);
396 bool pauseFlusher(void);
397 bool resumeFlusher(void);
398 void wakeUpFlusher(void);
400 bool startBgFetcher(void);
401 void stopBgFetcher(void);
404 * Takes a snapshot of the current stats and persists them to disk.
406 void snapshotStats(void);
409 * Enqueue a background fetch for a key.
411 * @param key the key to be bg fetched
412 * @param vbucket the vbucket in which the key lives
413 * @param rowid the rowid of the record within its shard
414 * @param cookie the cookie of the requestor
415 * @param type whether the fetch is for a non-resident value or metadata of
416 * a (possibly) deleted item
418 void bgFetch(const std::string &key,
422 bool isMeta = false);
425 * Complete a background fetch of a non resident value or metadata.
427 * @param key the key that was fetched
428 * @param vbucket the vbucket in which the key lived
429 * @param rowid the rowid of the record within its shard
430 * @param cookie the cookie of the requestor
431 * @param init the timestamp of when the request came in
432 * @param type whether the fetch is for a non-resident value or metadata of
433 * a (possibly) deleted item
435 void completeBGFetch(const std::string &key,
442 * Complete a batch of background fetch of a non resident value or metadata.
444 * @param vbId the vbucket in which the requested key lived
445 * @param fetchedItems vector of completed background fetches containing key,
446 * value, client cookies
447 * @param start the time when the background fetch was started
450 void completeBGFetchMulti(uint16_t vbId,
451 std::vector<bgfetched_item_t> &fetchedItems,
455 * Helper function to update stats after completion of a background fetch
456 * for either the value or metadata of a key.
458 * @param init the time of epstore's initialization
459 * @param start the time when the background fetch was started
460 * @param stop the time when the background fetch completed
462 void updateBGStats(const hrtime_t init,
463 const hrtime_t start,
464 const hrtime_t stop);
/**
 * Look up a vbucket by id.
 *
 * @param vbid the id of the vbucket to look up
 * @return the result of vbMap.getBucket() for that id
 */
466 RCPtr<VBucket> getVBucket(uint16_t vbid) {
467 return vbMap.getBucket(vbid);
/**
 * Get the persistence checkpoint id that vbMap holds for a vbucket.
 *
 * @param vb the id of the vbucket to query
 * @return the result of vbMap.getPersistenceCheckpointId() for that id
 */
470 uint64_t getLastPersistedCheckpointId(uint16_t vb) {
471 return vbMap.getPersistenceCheckpointId(vb);
/**
 * Get the persistence sequence number that vbMap holds for a vbucket.
 *
 * @param vb the id of the vbucket to query
 * @return the result of vbMap.getPersistenceSeqno() for that id
 */
474 uint64_t getLastPersistedSeqno(uint16_t vb) {
475 return vbMap.getPersistenceSeqno(vb);
478 void snapshotVBuckets(const Priority &priority, uint16_t shardId);
480 /* transfer should be set to true *only* if this vbucket is becoming master
481 * as the result of the previous master cleanly handing off control. */
482 ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t state,
483 bool transfer, bool notify_dcp = true);
486 * Physically deletes a VBucket from disk. This function should only
487 * be called on a VBucket that has already been logically deleted.
489 * @param vbid vbucket id
490 * @param cookie The connection that requested the deletion
492 bool completeVBucketDeletion(uint16_t vbid, const void* cookie);
497 * @param vbid The vbucket to delete.
498 * @param c The cookie for this connection.
499 * Used in synchronous bucket deletes
500 * to notify the connection of operation completion.
502 ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL);
505 * Triggers compaction of a vbucket
507 * @param vbid The vbucket to compact.
508 * @param c The context for compaction of a DB file
509 * @param ck cookie used to notify connection of operation completion
511 ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c,
515 * Callback to do the compaction of a vbucket
517 * @param vbid The Id of the VBucket which needs to be compacted
518 * @param ctx Context for couchstore compaction hooks
519 * @param ck cookie used to notify connection of operation completion
521 bool compactVBucket(const uint16_t vbid, compaction_ctx *ctx,
525 * Reset a given vbucket from memory and disk. This differs from vbucket deletion in that
526 * it does not delete the vbucket instance from memory hash table.
528 bool resetVBucket(uint16_t vbid);
530 void visit(VBucketVisitor &visitor);
533 * Run a vbucket visitor with separate jobs per vbucket.
535 * Note that this is asynchronous.
537 size_t visit(shared_ptr<VBucketVisitor> visitor, const char *lbl,
538 task_type_t taskGroup, const Priority &prio,
539 double sleepTime=0) {
540 return ExecutorPool::get()->schedule(new VBCBAdaptor(this, visitor,
541 lbl, prio, sleepTime), taskGroup);
544 const Flusher* getFlusher(uint16_t shardId);
545 Warmup* getWarmup(void) const;
547 ENGINE_ERROR_CODE getKeyStats(const std::string &key, uint16_t vbucket,
548 const void* cookie, key_stats &kstats,
549 bool bgfetch, bool wantsDeleted=false);
551 std::string validateKey(const std::string &key, uint16_t vbucket,
554 bool getLocked(const std::string &key, uint16_t vbucket,
555 Callback<GetValue> &cb,
556 rel_time_t currentTime, uint32_t lockTimeout,
559 ENGINE_ERROR_CODE unlockKey(const std::string &key,
562 rel_time_t currentTime);
/**
 * Get the read-write KVStore of the shard that vbMap returns for a
 * vbucket id.
 *
 * @param vbId the vbucket id used to select the shard
 * @return the shard's getRWUnderlying() result
 */
565 KVStore* getRWUnderlying(uint16_t vbId) {
566 return vbMap.getShard(vbId)->getRWUnderlying();
/**
 * Get the read-write KVStore of a shard addressed directly by index.
 *
 * @param shardId index into vbMap.shards; no range check is visible
 *                here, so callers presumably must pass a valid index
 * @return the shard's getRWUnderlying() result
 */
569 KVStore* getRWUnderlyingByShard(size_t shardId) {
570 return vbMap.shards[shardId]->getRWUnderlying();
/**
 * Get the read-only KVStore of a shard addressed directly by index.
 *
 * @param shardId index into vbMap.shards; no range check is visible
 *                here, so callers presumably must pass a valid index
 * @return the shard's getROUnderlying() result
 */
573 KVStore* getROUnderlyingByShard(size_t shardId) {
574 return vbMap.shards[shardId]->getROUnderlying();
/**
 * Get the read-only KVStore of the shard that vbMap returns for a
 * vbucket id.
 *
 * @param vbId the vbucket id used to select the shard
 * @return the shard's getROUnderlying() result
 */
577 KVStore* getROUnderlying(uint16_t vbId) {
578 return vbMap.getShard(vbId)->getROUnderlying();
581 void deleteExpiredItem(uint16_t, std::string &, time_t, uint64_t );
582 void deleteExpiredItems(std::list<std::pair<uint16_t, std::string> > &);
586 * Get the memoized storage properties from the DB.kv
588 const StorageProperties getStorageProperties() const {
589 return *storageProperties;
593 * schedule a vb_state snapshot task for all the shards.
595 bool scheduleVBSnapshot(const Priority &priority);
598 * schedule a vb_state snapshot task for a given shard.
600 void scheduleVBSnapshot(const Priority &priority, uint16_t shardId,
604 * Schedule a vbstate persistence task for a given vbucket.
606 void scheduleVBStatePersist(const Priority &priority, uint16_t vbid,
610 * Persist a vbucket's state.
612 bool persistVBState(const Priority &priority, uint16_t vbid);
614 const VBucketMap &getVBuckets() {
618 EventuallyPersistentEngine& getEPEngine() {
/**
 * Get the expiry pager's sleeptime.
 *
 * The value is read while holding expiryPager.mutex.
 *
 * @return the current expiryPager.sleeptime
 */
622 size_t getExpiryPagerSleeptime(void) {
623 LockHolder lh(expiryPager.mutex);
624 return expiryPager.sleeptime;
/**
 * Get the stored per-item transaction time.
 *
 * @return the current value of lastTransTimePerItem
 */
627 size_t getTransactionTimePerItem() {
628 return lastTransTimePerItem;
/**
 * Check the diskFlushAll flag.
 *
 * @return the value loaded from the atomic diskFlushAll
 */
631 bool isFlushAllScheduled() {
632 return diskFlushAll.load();
635 void setBackfillMemoryThreshold(double threshold);
637 void setExpiryPagerSleeptime(size_t val);
639 void enableAccessScannerTask();
640 void disableAccessScannerTask();
641 void setAccessScannerSleeptime(size_t val);
642 void resetAccessScannerStartTime();
/**
 * Set the access scanner's lastTaskRuntime to the current gethrtime()
 * value.
 */
644 void resetAccessScannerTasktime() {
645 accessScanner.lastTaskRuntime = gethrtime();
648 void incExpirationStat(RCPtr<VBucket> &vb, bool byPager = true) {
650 ++stats.expired_pager;
652 ++stats.expired_access;
654 ++vb->numExpiredItems;
/**
 * Record a sample in the scheduling histogram of a task type.
 *
 * @param taskType selects the entry in stats.schedulingHisto
 * @param enqTime the value added to that histogram
 */
657 void logQTime(type_id_t taskType, hrtime_t enqTime) {
658 stats.schedulingHisto[taskType].add(enqTime);
/**
 * Record a sample in the runtime histogram of a task type.
 *
 * @param taskType selects the entry in stats.taskRuntimeHisto
 * @param runTime the value added to that histogram
 */
661 void logRunTime(type_id_t taskType, hrtime_t runTime) {
662 stats.taskRuntimeHisto[taskType].add(runTime);
/**
 * Report whether multi background fetch is enabled.
 *
 * @return the result of storageProperties->hasEfficientGet()
 */
665 bool multiBGFetchEnabled() {
666 return storageProperties->hasEfficientGet();
/**
 * Update the cached resident ratios.
 *
 * @param activePerc value stored into cachedResidentRatio.activeRatio
 *                   (presumably a percentage, judging by the name)
 * @param replicaPerc value stored into cachedResidentRatio.replicaRatio
 *                    (presumably a percentage, judging by the name)
 */
669 void updateCachedResidentRatio(size_t activePerc, size_t replicaPerc) {
670 cachedResidentRatio.activeRatio.store(activePerc);
671 cachedResidentRatio.replicaRatio.store(replicaPerc);
676 bool maybeEnableTraffic(void);
679 * Checks the memory consumption.
680 * To be used by backfill tasks (tap & dcp).
682 bool isMemoryUsageTooHigh();
685 * Flushes all items waiting for persistence in a given vbucket
686 * @param vbid The id of the vbucket to flush
687 * @return The amount of items flushed
689 int flushVBucket(uint16_t vbid);
691 void addKVStoreStats(ADD_STAT add_stat, const void* cookie);
693 void addKVStoreTimingStats(ADD_STAT add_stat, const void* cookie);
695 void resetUnderlyingStats(void);
696 KVStore *getOneROUnderlying(void);
697 KVStore *getOneRWUnderlying(void);
/**
 * Get the item eviction policy.
 *
 * @return the current value of eviction_policy
 */
699 item_eviction_policy_t getItemEvictionPolicy(void) const {
700 return eviction_policy;
703 ENGINE_ERROR_CODE rollback(uint16_t vbid, uint64_t rollbackSeqno);
705 ExTask &fetchItemPagerTask() {
/**
 * Wake chkTask through the executor pool, but only when its state is
 * TASK_SNOOZED.
 */
709 void wakeUpCheckpointRemover() {
710 if (chkTask->getState() == TASK_SNOOZED) {
711 ExecutorPool::get()->wake(chkTask->getId());
/**
 * Set the compaction write queue cap.
 *
 * @param to the new value stored in compactionWriteQueueCap
 */
715 void setCompactionWriteQueueCap(size_t to) {
716 compactionWriteQueueCap = to;
/**
 * Set the memory threshold used when deciding whether compaction may
 * expire items.
 *
 * @param to the new threshold; it is divided by 100 before being
 *           stored, so it is presumably given as a percentage
 */
719 void setCompactionExpMemThreshold(size_t to) {
720 compactionExpMemThreshold = static_cast<double>(to) / 100.0;
/**
 * Decide whether compaction may process expired items.
 *
 * NOTE(review): compactionWriteQueueCap is assigned by
 * setCompactionWriteQueueCap() but is not consulted here; the queue
 * check uses stats.tapThrottleWriteQueueCap instead - confirm that this
 * is intended.
 *
 * @return true only when total memory used is below
 *         max data size * compactionExpMemThreshold and the disk queue
 *         size is below stats.tapThrottleWriteQueueCap (a cap of -1
 *         skips the queue size check)
 */
723 bool compactionCanExpireItems() {
724 // Process expired items only if memory usage is lesser than
725 // compaction_exp_mem_threshold and disk queue is small
726 // enough (marked by tap_throttle_queue_cap)
728 bool isMemoryUsageOk = (stats.getTotalMemoryUsed() <
729 (stats.getMaxDataSize() * compactionExpMemThreshold));
731 size_t queueSize = stats.diskQueueSize.load();
732 bool isQueueSizeOk = ((stats.tapThrottleWriteQueueCap == -1) ||
733 (queueSize < static_cast<size_t>(stats.tapThrottleWriteQueueCap)));
735 return (isMemoryUsageOk && isQueueSizeOk);
739 // During the warmup phase we might want to enable external traffic
740 // at a given point in time.. The LoadStorageKvPairCallback will be
741 // triggered whenever we want to check if we could enable traffic..
742 friend class LoadStorageKVPairCallback;
744 // Methods called during warmup
745 std::vector<vbucket_state *> loadVBucketState();
747 void warmupCompleted();
748 void stopWarmup(void);
752 void scheduleVBDeletion(RCPtr<VBucket> &vb,
756 RCPtr<VBucket> getVBucket(uint16_t vbid, vbucket_state_t wanted_state);
758 /* Queue an item for persistence and replication
760 * The caller of this function must hold the lock of the hash table
761 * partition that contains the StoredValue being Queued.
763 * @param vb the vbucket that contains the dirty item
764 * @param v the dirty item
765 * @param plh the pointer to the hash table partition lock for the dirty item.
766 * Note that the lock is released inside this function.
767 * @param tapBackfill if the item is from backfill replication
768 * @param notifyReplicator whether or not to notify the replicator
770 void queueDirty(RCPtr<VBucket> &vb,
773 bool tapBackfill = false,
774 bool notifyReplicator = true,
775 bool genBySeqno = true);
778 * Retrieve a StoredValue and invoke a method on it.
780 * Note that because of complications with void/non-void methods
781 * and potentially missing StoredValues along with the way I
782 * actually intend to use this, I don't return any values from
785 * @param key the item's key to retrieve
786 * @param vbid the vbucket containing the item
787 * @param f the method to invoke on the item
789 * @return true if the object was found and method was invoked
791 bool invokeOnLockedStoredValue(const std::string &key, uint16_t vbid,
792 void (StoredValue::* f)()) {
793 RCPtr<VBucket> vb = getVBucket(vbid);
799 LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
800 StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true);
808 void flushOneDeleteAll(void);
809 PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
812 StoredValue *fetchValidValue(RCPtr<VBucket> &vb, const std::string &key,
813 int bucket_num, bool wantsDeleted=false,
814 bool trackReference=true, bool queueExpired=true);
816 GetValue getInternal(const std::string &key, uint16_t vbucket,
817 const void *cookie, bool queueBG,
819 vbucket_state_t allowedState,
820 bool trackReference=true);
822 ENGINE_ERROR_CODE addTempItemForBgFetch(LockHolder &lock, int bucket_num,
823 const std::string &key, RCPtr<VBucket> &vb,
824 const void *cookie, bool metadataOnly,
825 bool isReplication = false);
828 friend class Flusher;
829 friend class BGFetchCallback;
830 friend class VKeyStatBGFetchCallback;
831 friend class PersistenceCallback;
832 friend class Deleter;
833 friend class VBCBAdaptor;
834 friend class VBucketVisitorTask;
835 friend class ItemPager;
836 friend class PagingVisitor;
838 EventuallyPersistentEngine &engine;
840 StorageProperties *storageProperties;
842 ConflictResolution *conflictResolver;
847 size_t compactionWriteQueueCap;
848 float compactionExpMemThreshold;
850 /* Array of mutexes for each vbucket
851 * Used by flush operations: flushVB, deleteVB, compactVB, snapshotVB */
853 AtomicValue<bool> *schedule_vbstate_persist;
854 std::vector<MutationLog*> accessLog;
856 AtomicValue<size_t> bgFetchQueue;
857 AtomicValue<bool> diskFlushAll;
859 uint32_t bgFetchDelay;
860 double backfillMemoryThreshold;
861 struct ExpiryPagerDelta {
862 ExpiryPagerDelta() : sleeptime(0), task(0) {}
868 ALogTask() : sleeptime(0), task(0), lastTaskRuntime(gethrtime()),
873 hrtime_t lastTaskRuntime;
876 struct ResidentRatio {
877 AtomicValue<size_t> activeRatio;
878 AtomicValue<size_t> replicaRatio;
879 } cachedResidentRatio;
880 size_t statsSnapshotTaskId;
881 size_t lastTransTimePerItem;
882 item_eviction_policy_t eviction_policy;
884 Mutex compactionLock;
885 std::list<CompTaskEntry> compactionTasks;
887 DISALLOW_COPY_AND_ASSIGN(EventuallyPersistentStore);