Decouple StoredValue and HashTable class
[ep-engine.git] / src / hash_table.cc
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #include "hash_table.h"
19
20 #include "stored_value_factories.h"
21
22 #include <cstring>
23
24 static const ssize_t prime_size_table[] = {
25     3, 7, 13, 23, 47, 97, 193, 383, 769, 1531, 3079, 6143, 12289, 24571, 49157,
26     98299, 196613, 393209, 786433, 1572869, 3145721, 6291449, 12582917,
27     25165813, 50331653, 100663291, 201326611, 402653189, 805306357,
28     1610612741, -1
29 };
30
31
32 std::ostream& operator<<(std::ostream& os, const HashTable::Position& pos) {
33     os << "{lock:" << pos.lock << " bucket:" << pos.hash_bucket << "/" << pos.ht_size << "}";
34     return os;
35 }
36
37 HashTable::HashTable(EPStats& st,
38                      std::unique_ptr<AbstractStoredValueFactory> svFactory,
39                      size_t initialSize,
40                      size_t locks)
41     : maxDeletedRevSeqno(0),
42       numTotalItems(0),
43       numNonResidentItems(0),
44       numDeletedItems(0),
45       datatypeCounts(),
46       numEjects(0),
47       memSize(0),
48       cacheSize(0),
49       metaDataMemory(0),
50       initialSize(initialSize),
51       size(initialSize),
52       n_locks(locks),
53       stats(st),
54       valFact(std::move(svFactory)),
55       visitors(0),
56       numItems(0),
57       numResizes(0),
58       numTempItems(0) {
59     values.resize(size);
60     mutexes = new std::mutex[n_locks];
61     activeState = true;
62 }
63
64 HashTable::~HashTable() {
65     // Use unlocked clear for the destructor, avoids lock inversions on VBucket
66     // delete
67     clear_UNLOCKED(true);
68     // Wait for any outstanding visitors to finish.
69     while (visitors > 0) {
70 #ifdef _MSC_VER
71         Sleep(1);
72 #else
73         usleep(100);
74 #endif
75     }
76     delete []mutexes;
77 }
78
79 void HashTable::clear(bool deactivate) {
80     if (!deactivate) {
81         // If not deactivating, assert we're already active.
82         if (!isActive()) {
83             throw std::logic_error("HashTable::clear: Cannot call on a "
84                     "non-active object");
85         }
86     }
87     MultiLockHolder mlh(mutexes, n_locks);
88     clear_UNLOCKED(deactivate);
89 }
90
91 void HashTable::clear_UNLOCKED(bool deactivate) {
92     if (deactivate) {
93         setActiveState(false);
94     }
95     size_t clearedMemSize = 0;
96     size_t clearedValSize = 0;
97     for (int i = 0; i < (int)size; i++) {
98         while (values[i]) {
99             // Take ownership of the StoredValue from the vector, update
100             // statistics and release it.
101             auto v = std::move(values[i]);
102             clearedMemSize += v->size();
103             clearedValSize += v->valuelen();
104             values[i] = std::move(v->getNext());
105         }
106     }
107
108     stats.currentSize.fetch_sub(clearedMemSize - clearedValSize);
109
110     datatypeCounts.fill(0);
111     numTotalItems.store(0);
112     numItems.store(0);
113     numTempItems.store(0);
114     numNonResidentItems.store(0);
115     memSize.store(0);
116     cacheSize.store(0);
117 }
118
119 static size_t distance(size_t a, size_t b) {
120     return std::max(a, b) - std::min(a, b);
121 }
122
123 static size_t nearest(size_t n, size_t a, size_t b) {
124     return (distance(n, a) < distance(b, n)) ? a : b;
125 }
126
127 static bool isCurrently(size_t size, ssize_t a, ssize_t b) {
128     ssize_t current(static_cast<ssize_t>(size));
129     return (current == a || current == b);
130 }
131
132 void HashTable::resize() {
133     size_t ni = getNumInMemoryItems();
134     int i(0);
135     size_t new_size(0);
136
137     // Figure out where in the prime table we are.
138     ssize_t target(static_cast<ssize_t>(ni));
139     for (i = 0; prime_size_table[i] > 0 && prime_size_table[i] < target; ++i) {
140         // Just looking...
141     }
142
143     if (prime_size_table[i] == -1) {
144         // We're at the end, take the biggest
145         new_size = prime_size_table[i-1];
146     } else if (prime_size_table[i] < static_cast<ssize_t>(initialSize)) {
147         // Was going to be smaller than the initial size.
148         new_size = initialSize;
149     } else if (0 == i) {
150         new_size = prime_size_table[i];
151     }else if (isCurrently(size, prime_size_table[i-1], prime_size_table[i])) {
152         // If one of the candidate sizes is the current size, maintain
153         // the current size in order to remain stable.
154         new_size = size;
155     } else {
156         // Somewhere in the middle, use the one we're closer to.
157         new_size = nearest(ni, prime_size_table[i-1], prime_size_table[i]);
158     }
159
160     resize(new_size);
161 }
162
163 void HashTable::resize(size_t newSize) {
164     if (!isActive()) {
165         throw std::logic_error("HashTable::resize: Cannot call on a "
166                 "non-active object");
167     }
168
169     // Due to the way hashing works, we can't fit anything larger than
170     // an int.
171     if (newSize > static_cast<size_t>(std::numeric_limits<int>::max())) {
172         return;
173     }
174
175     // Don't resize to the same size, either.
176     if (newSize == size) {
177         return;
178     }
179
180     MultiLockHolder mlh(mutexes, n_locks);
181     if (visitors.load() > 0) {
182         // Do not allow a resize while any visitors are actually
183         // processing.  The next attempt will have to pick it up.  New
184         // visitors cannot start doing meaningful work (we own all
185         // locks at this point).
186         return;
187     }
188
189     // Get a place for the new items.
190     table_type newValues(newSize);
191
192     stats.memOverhead->fetch_sub(memorySize());
193     ++numResizes;
194
195     // Set the new size so all the hashy stuff works.
196     size_t oldSize = size;
197     size.store(newSize);
198
199     // Move existing records into the new space.
200     for (size_t i = 0; i < oldSize; i++) {
201         while (values[i]) {
202             // unlink the front element from the hash chain at values[i].
203             auto v = std::move(values[i]);
204             values[i] = std::move(v->getNext());
205
206             // And re-link it into the correct place in newValues.
207             int newBucket = getBucketForHash(v->getKey().hash());
208             v->setNext(std::move(newValues[newBucket]));
209             newValues[newBucket] = std::move(v);
210         }
211     }
212
213     // Finally assign the new table to values.
214     values = std::move(newValues);
215
216     stats.memOverhead->fetch_add(memorySize());
217 }
218
219 StoredValue* HashTable::find(const DocKey& key,
220                              TrackReference trackReference,
221                              WantsDeleted wantsDeleted) {
222     if (!isActive()) {
223         throw std::logic_error("HashTable::find: Cannot call on a "
224                 "non-active object");
225     }
226     HashBucketLock hbl = getLockedBucket(key);
227     return unlocked_find(key, hbl.getBucketNum(), wantsDeleted, trackReference);
228 }
229
230 std::unique_ptr<Item> HashTable::getRandomKey(long rnd) {
231     /* Try to locate a partition */
232     size_t start = rnd % size;
233     size_t curr = start;
234     std::unique_ptr<Item> ret;
235
236     do {
237         ret = getRandomKeyFromSlot(curr++);
238         if (curr == size) {
239             curr = 0;
240         }
241     } while (ret == NULL && curr != start);
242
243     return ret;
244 }
245
246 MutationStatus HashTable::set(Item& val) {
247     if (!StoredValue::hasAvailableSpace(stats, val, false)) {
248         return MutationStatus::NoMem;
249     }
250
251     HashBucketLock hbl = getLockedBucket(val.getKey());
252     StoredValue* v = unlocked_find(val.getKey(),
253                                    hbl.getBucketNum(),
254                                    WantsDeleted::Yes,
255                                    TrackReference::No);
256     if (v) {
257         return unlocked_updateStoredValue(hbl.getHTLock(), *v, val);
258     } else {
259         unlocked_addNewStoredValue(hbl, val);
260         return MutationStatus::WasClean;
261     }
262 }
263
264 MutationStatus HashTable::unlocked_updateStoredValue(
265         const std::unique_lock<std::mutex>& htLock,
266         StoredValue& v,
267         const Item& itm) {
268     if (!htLock) {
269         throw std::invalid_argument(
270                 "HashTable::unlocked_updateStoredValue: htLock "
271                 "not held");
272     }
273
274     if (!isActive()) {
275         throw std::logic_error(
276                 "HashTable::unlocked_updateStoredValue: Cannot "
277                 "call on a non-active HT object");
278     }
279
280     MutationStatus status =
281             v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
282     if (!v.isResident() && !v.isDeleted() && !v.isTempItem()) {
283         decrNumNonResidentItems();
284     }
285
286     if (v.isTempItem()) {
287         --numTempItems;
288         ++numItems;
289         ++numTotalItems;
290     }
291
292     if (v.isDeleted() && !itm.isDeleted()) {
293         --numDeletedItems;
294     }
295     if (!v.isDeleted() && itm.isDeleted()) {
296         ++numDeletedItems;
297     }
298
299     // If the item we are replacing is resident then we need to make sure we
300     // appropriately alter the datatype stats.
301     if (v.getDatatype() != itm.getDataType()) {
302         --datatypeCounts[v.getDatatype()];
303         ++datatypeCounts[itm.getDataType()];
304     }
305
306     /* setValue() will mark v as undeleted if required */
307     setValue(itm, v);
308
309     return status;
310 }
311
312 StoredValue* HashTable::unlocked_addNewStoredValue(const HashBucketLock& hbl,
313                                                    const Item& itm) {
314     if (!hbl.getHTLock()) {
315         throw std::invalid_argument(
316                 "HashTable::unlocked_addNewStoredValue: htLock "
317                 "not held");
318     }
319
320     if (!isActive()) {
321         throw std::invalid_argument(
322                 "HashTable::unlocked_addNewStoredValue: Cannot "
323                 "call on a non-active HT object");
324     }
325
326     // Create a new StoredValue and link it into the head of the bucket chain.
327     auto v = (*valFact)(itm, std::move(values[hbl.getBucketNum()]));
328     increaseMetaDataSize(stats, v->metaDataSize());
329     increaseCacheSize(v->size());
330
331     if (v->isTempItem()) {
332         ++numTempItems;
333     } else {
334         ++numItems;
335         ++numTotalItems;
336         ++datatypeCounts[v->getDatatype()];
337     }
338     if (v->isDeleted()) {
339         ++numDeletedItems;
340     }
341     values[hbl.getBucketNum()] = std::move(v);
342
343     return values[hbl.getBucketNum()].get();
344 }
345
346 std::pair<StoredValue*, StoredValue::UniquePtr>
347 HashTable::unlocked_replaceByCopy(const HashBucketLock& hbl,
348                                   const StoredValue& vToCopy) {
349     if (!hbl.getHTLock()) {
350         throw std::invalid_argument(
351                 "HashTable::unlocked_replaceByCopy: htLock "
352                 "not held");
353     }
354
355     if (!isActive()) {
356         throw std::invalid_argument(
357                 "HashTable::unlocked_replaceByCopy: Cannot "
358                 "call on a non-active HT object");
359     }
360
361     /* Release (remove) the StoredValue from the hash table */
362     auto releasedSv = unlocked_release(hbl, vToCopy.getKey());
363
364     /* Copy the StoredValue and link it into the head of the bucket chain. */
365     auto newSv = valFact->copyStoredValue(
366             vToCopy, std::move(values[hbl.getBucketNum()]));
367     if (newSv->isTempItem()) {
368         ++numTempItems;
369     } else {
370         ++numItems;
371         ++numTotalItems;
372     }
373     values[hbl.getBucketNum()] = std::move(newSv);
374
375     return {values[hbl.getBucketNum()].get(), std::move(releasedSv)};
376 }
377
378 void HashTable::unlocked_softDelete(const std::unique_lock<std::mutex>& htLock,
379                                     StoredValue& v,
380                                     bool onlyMarkDeleted) {
381     const bool alreadyDeleted = v.isDeleted();
382     if (!v.isResident() && !v.isDeleted() && !v.isTempItem()) {
383         decrNumNonResidentItems();
384     }
385
386     --datatypeCounts[v.getDatatype()];
387
388     if (onlyMarkDeleted) {
389         v.markDeleted();
390     } else {
391         if (v.isTempItem()) {
392             --numTempItems;
393             ++numItems;
394             ++numTotalItems;
395         }
396         size_t len = v.valuelen();
397         if (v.del()) {
398             reduceCacheSize(len);
399         }
400     }
401     if (!alreadyDeleted) {
402         ++numDeletedItems;
403     }
404 }
405
406 StoredValue* HashTable::unlocked_find(const DocKey& key,
407                                       int bucket_num,
408                                       WantsDeleted wantsDeleted,
409                                       TrackReference trackReference) {
410     for (StoredValue* v = values[bucket_num].get(); v; v = v->getNext().get()) {
411         if (v->hasKey(key)) {
412             if (trackReference == TrackReference::Yes && !v->isDeleted()) {
413                 v->referenced();
414             }
415             if (wantsDeleted == WantsDeleted::Yes || !v->isDeleted()) {
416                 return v;
417             } else {
418                 return NULL;
419             }
420         }
421     }
422     return NULL;
423 }
424
425 void HashTable::unlocked_del(const HashBucketLock& hbl, const DocKey& key) {
426     unlocked_release(hbl, key).reset();
427 }
428
429 StoredValue::UniquePtr HashTable::unlocked_release(
430         const HashBucketLock& hbl, const DocKey& key) {
431     if (!hbl.getHTLock()) {
432         throw std::invalid_argument(
433                 "HashTable::unlocked_remove: htLock "
434                 "not held");
435     }
436
437     if (!isActive()) {
438         throw std::logic_error(
439                 "HashTable::unlocked_remove: Cannot call on a "
440                 "non-active object");
441     }
442
443     // Remove the first (should only be one) StoredValue with the given key.
444     auto released = hashChainRemoveFirst(
445             values[hbl.getBucketNum()],
446             [key](const StoredValue* v) { return v->hasKey(key); });
447
448     if (!released) {
449         /* We shouldn't reach here, we must delete the StoredValue in the
450            HashTable */
451         throw std::logic_error(
452                 "HashTable::unlocked_del: StoredValue to be deleted "
453                 "not found in HashTable; possibly HashTable leak");
454     }
455
456     // Update statistics now the item has been removed.
457     reduceCacheSize(released->size());
458     reduceMetaDataSize(stats, released->metaDataSize());
459     if (released->isTempItem()) {
460         --numTempItems;
461     } else {
462         decrNumItems();
463         decrNumTotalItems();
464         --datatypeCounts[released->getDatatype()];
465         if (released->isDeleted()) {
466             --numDeletedItems;
467         }
468     }
469     return released;
470 }
471
472 void HashTable::visit(HashTableVisitor &visitor) {
473     if ((numItems.load() + numTempItems.load()) == 0 || !isActive()) {
474         return;
475     }
476
477     // Acquire one (any) of the mutexes before incrementing {visitors}, this
478     // prevents any race between this visitor and the HashTable resizer.
479     // See comments in pauseResumeVisit() for further details.
480     std::unique_lock<std::mutex> lh(mutexes[0]);
481     VisitorTracker vt(&visitors);
482     lh.unlock();
483
484     bool aborted = !visitor.shouldContinue();
485     size_t visited = 0;
486     for (int l = 0; isActive() && !aborted && l < static_cast<int>(n_locks);
487          l++) {
488         for (int i = l; i < static_cast<int>(size); i+= n_locks) {
489             // (re)acquire mutex on each HashBucket, to minimise any impact
490             // on front-end threads.
491             HashBucketLock lh(i, mutexes[l]);
492
493             StoredValue* v = values[i].get();
494             if (v) {
495                 // TODO: Perf: This check seems costly - do we think it's still
496                 // worth keeping?
497                 auto hashbucket = getBucketForHash(v->getKey().hash());
498                 if (i != hashbucket) {
499                     throw std::logic_error("HashTable::visit: inconsistency "
500                             "between StoredValue's calculated hashbucket "
501                             "(which is " + std::to_string(hashbucket) +
502                             ") and bucket is is located in (which is " +
503                             std::to_string(i) + ")");
504                 }
505             }
506             while (v) {
507                 StoredValue* tmp = v->getNext().get();
508                 visitor.visit(lh, v);
509                 v = tmp;
510             }
511             ++visited;
512         }
513         aborted = !visitor.shouldContinue();
514     }
515 }
516
517 void HashTable::visitDepth(HashTableDepthVisitor &visitor) {
518     if (numItems.load() == 0 || !isActive()) {
519         return;
520     }
521     size_t visited = 0;
522     VisitorTracker vt(&visitors);
523
524     for (int l = 0; l < static_cast<int>(n_locks); l++) {
525         LockHolder lh(mutexes[l]);
526         for (int i = l; i < static_cast<int>(size); i+= n_locks) {
527             size_t depth = 0;
528             StoredValue* p = values[i].get();
529             if (p) {
530                 // TODO: Perf: This check seems costly - do we think it's still
531                 // worth keeping?
532                 auto hashbucket = getBucketForHash(p->getKey().hash());
533                 if (i != hashbucket) {
534                     throw std::logic_error("HashTable::visit: inconsistency "
535                             "between StoredValue's calculated hashbucket "
536                             "(which is " + std::to_string(hashbucket) +
537                             ") and bucket it is located in (which is " +
538                             std::to_string(i) + ")");
539                 }
540             }
541             size_t mem(0);
542             while (p) {
543                 depth++;
544                 mem += p->size();
545                 p = p->getNext().get();
546             }
547             visitor.visit(i, depth, mem);
548             ++visited;
549         }
550     }
551 }
552
553 HashTable::Position
554 HashTable::pauseResumeVisit(PauseResumeHashTableVisitor& visitor,
555                             Position& start_pos) {
556     if ((numItems.load() + numTempItems.load()) == 0 || !isActive()) {
557         // Nothing to visit
558         return endPosition();
559     }
560
561     bool paused = false;
562
563     // To attempt to minimize the impact the visitor has on normal frontend
564     // operations, we deliberately acquire (and release) the mutex between
565     // each hash_bucket - see `lh` in the inner for() loop below. This means we
566     // hold a given mutex for a large number of short durations, instead of just
567     // one single, long duration.
568     // *However*, there is a potential race with this approach - the {size} of
569     // the HashTable may be changed (by the Resizer task) between us first
570     // reading it to calculate the starting hash_bucket, and then reading it
571     // inside the inner for() loop. To prevent this race, we explicitly acquire
572     // (any) mutex, increment {visitors} and then release the mutex. This
573     //avoids the race as if visitors >0 then Resizer will not attempt to resize.
574     std::unique_lock<std::mutex> lh(mutexes[0]);
575     VisitorTracker vt(&visitors);
576     lh.unlock();
577
578     // Start from the requested lock number if in range.
579     size_t lock = (start_pos.lock < n_locks) ? start_pos.lock : 0;
580     size_t hash_bucket = 0;
581
582     for (; isActive() && !paused && lock < n_locks; lock++) {
583
584         // If the bucket position is *this* lock, then start from the
585         // recorded bucket (as long as we haven't resized).
586         hash_bucket = lock;
587         if (start_pos.lock == lock &&
588             start_pos.ht_size == size &&
589             start_pos.hash_bucket < size) {
590             hash_bucket = start_pos.hash_bucket;
591         }
592
593         // Iterate across all values in the hash buckets owned by this lock.
594         // Note: we don't record how far into the bucket linked-list we
595         // pause at; so any restart will begin from the next bucket.
596         for (; !paused && hash_bucket < size; hash_bucket += n_locks) {
597             LockHolder lh(mutexes[lock]);
598
599             StoredValue* v = values[hash_bucket].get();
600             while (!paused && v) {
601                 StoredValue* tmp = v->getNext().get();
602                 paused = !visitor.visit(*v);
603                 v = tmp;
604             }
605         }
606
607         // If the visitor paused us before we visited all hash buckets owned
608         // by this lock, we don't want to skip the remaining hash buckets, so
609         // stop the outer for loop from advancing to the next lock.
610         if (paused && hash_bucket < size) {
611             break;
612         }
613
614         // Finished all buckets owned by this lock. Set hash_bucket to 'size'
615         // to give a consistent marker for "end of lock".
616         hash_bucket = size;
617     }
618
619     // Return the *next* location that should be visited.
620     return HashTable::Position(size, lock, hash_bucket);
621 }
622
623 HashTable::Position HashTable::endPosition() const  {
624     return HashTable::Position(size, n_locks, size);
625 }
626
627 bool HashTable::unlocked_ejectItem(StoredValue*& vptr,
628                                    item_eviction_policy_t policy) {
629     if (vptr == nullptr) {
630         throw std::invalid_argument("HashTable::unlocked_ejectItem: "
631                 "Unable to delete NULL StoredValue");
632     }
633     if (policy == VALUE_ONLY) {
634         if (vptr->eligibleForEviction(policy)) {
635             reduceCacheSize(vptr->valuelen());
636             vptr->ejectValue();
637             ++stats.numValueEjects;
638             ++numNonResidentItems;
639             ++numEjects;
640             return true;
641         } else {
642             ++stats.numFailedEjects;
643             return false;
644         }
645     } else { // full eviction.
646         if (vptr->eligibleForEviction(policy)) {
647             reduceMetaDataSize(stats, vptr->metaDataSize());
648             reduceCacheSize(vptr->size());
649             int bucket_num = getBucketForHash(vptr->getKey().hash());
650
651             // Remove the item from the hash table.
652             auto removed = hashChainRemoveFirst(
653                     values[bucket_num],
654                     [vptr](const StoredValue* v) { return v == vptr; });
655
656             if (removed->isResident()) {
657                 ++stats.numValueEjects;
658             }
659             if (!removed->isResident() && !removed->isTempItem()) {
660                 decrNumNonResidentItems(); // Decrement because the item is
661                                            // fully evicted.
662             }
663             decrNumItems(); // Decrement because the item is fully evicted.
664             --datatypeCounts[vptr->getDatatype()];
665             ++numEjects;
666             updateMaxDeletedRevSeqno(vptr->getRevSeqno());
667
668             return true;
669         } else {
670             ++stats.numFailedEjects;
671             return false;
672         }
673     }
674 }
675
676 std::unique_ptr<Item> HashTable::getRandomKeyFromSlot(int slot) {
677     auto lh = getLockedBucket(slot);
678     for (StoredValue* v = values[slot].get(); v; v = v->getNext().get()) {
679         if (!v->isTempItem() && !v->isDeleted() && v->isResident()) {
680             return v->toItem(false, 0);
681         }
682     }
683
684     return nullptr;
685 }
686
687 bool HashTable::unlocked_restoreValue(
688         const std::unique_lock<std::mutex>& htLock,
689         const Item& itm,
690         StoredValue& v) {
691     if (!htLock || !isActive() || v.isResident()) {
692         return false;
693     }
694
695     if (v.isTempInitialItem()) { // Regular item with the full eviction
696         --numTempItems;
697         ++numItems;
698         /* set it back to false as we created a temp item by setting it to true
699            when bg fetch is scheduled (full eviction mode). */
700         v.setNewCacheItem(false);
701         ++datatypeCounts[itm.getDataType()];
702     } else {
703         decrNumNonResidentItems();
704     }
705
706     v.restoreValue(itm);
707
708     increaseCacheSize(v.getValue()->length());
709     return true;
710 }
711
712 void HashTable::unlocked_restoreMeta(const std::unique_lock<std::mutex>& htLock,
713                                      const Item& itm,
714                                      StoredValue& v) {
715     if (!htLock) {
716         throw std::invalid_argument(
717                 "HashTable::unlocked_restoreMeta: htLock "
718                 "not held");
719     }
720
721     if (!isActive()) {
722         throw std::logic_error(
723                 "HashTable::unlocked_restoreMeta: Cannot "
724                 "call on a non-active HT object");
725     }
726
727     v.restoreMeta(itm);
728     if (!itm.isDeleted()) {
729         --numTempItems;
730         ++numItems;
731         ++numNonResidentItems;
732         ++datatypeCounts[v.getDatatype()];
733     }
734 }
735
736 void HashTable::increaseCacheSize(size_t by) {
737     cacheSize.fetch_add(by);
738     memSize.fetch_add(by);
739 }
740
741 void HashTable::reduceCacheSize(size_t by) {
742     cacheSize.fetch_sub(by);
743     memSize.fetch_sub(by);
744 }
745
746 void HashTable::increaseMetaDataSize(EPStats& st, size_t by) {
747     metaDataMemory.fetch_add(by);
748     st.currentSize.fetch_add(by);
749 }
750
751 void HashTable::reduceMetaDataSize(EPStats &st, size_t by) {
752     metaDataMemory.fetch_sub(by);
753     st.currentSize.fetch_sub(by);
754 }
755
756 void HashTable::setValue(const Item& itm, StoredValue& v) {
757     reduceCacheSize(v.size());
758     v.setValue(itm);
759     increaseCacheSize(v.size());
760 }
761
762 std::ostream& operator<<(std::ostream& os, const HashTable& ht) {
763     os << "HashTable[" << &ht << "] with"
764        << " numInMemory:" << ht.getNumInMemoryItems()
765        << " numDeleted:" << ht.getNumDeletedItems()
766        << " values: " << std::endl;
767     for (const auto& chain : ht.values) {
768         if (chain) {
769             for (StoredValue* sv = chain.get(); sv != nullptr;
770                  sv = sv->getNext().get()) {
771                 os << "    " << *sv << std::endl;
772             }
773         }
774     }
775     return os;
776 }