MB-20054: Fix windows build error by adding size() func in class AtomicQueue
[ep-engine.git] / src / atomicqueue.h
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17
18 #ifndef SRC_ATOMICQUEUE_H_
19 #define SRC_ATOMICQUEUE_H_ 1
20
21 #include "atomic.h"
22
23 #ifdef _MSC_VER
24
25 #include <queue>
26 #include <thread>
27 #include <mutex>
28
29 /**
30  * Create a simple version of the AtomicQueue for windows right now to
31  * avoid the threadlocal usage which is currently using pthreads
32  */
33 template <typename T>
34 class AtomicQueue {
35 public:
36     void push(T &value) {
37         std::lock_guard<std::mutex> lock(mutex);
38         queue.push(value);
39     }
40
41     void getAll(std::queue<T> &outQueue) {
42         std::lock_guard<std::mutex> lock(mutex);
43         while (!queue.empty()) {
44             outQueue.push(queue.front());
45             queue.pop();
46         }
47     }
48
49     bool empty() {
50         std::lock_guard<std::mutex> lock(mutex);
51         return queue.empty();
52     }
53
54     /**
55      * Return the number of queued items.
56      */
57     size_t size() {
58         std::lock_guard<std::mutex> lock(mutex);
59         return queue.size();
60     }
61 private:
62     std::queue<T> queue;
63     std::mutex mutex;
64 };
65
66 #else
67
68 #include "threadlocal.h"
69
70 template <typename T>
71 class CouchbaseAtomicPtr : public CouchbaseAtomic<T*> {
72 public:
73     CouchbaseAtomicPtr(T *initial = NULL) : CouchbaseAtomic<T*>(initial) {}
74
75     ~CouchbaseAtomicPtr() {}
76
77     T *operator ->() {
78         return CouchbaseAtomic<T*>::load();
79     }
80
81     T &operator *() {
82         return *CouchbaseAtomic<T*>::load();
83     }
84
85     operator bool() const {
86         return CouchbaseAtomic<T*>::load() != NULL;
87     }
88
89     bool operator !() const {
90         return CouchbaseAtomic<T*>::load() == NULL;
91     }
92 };
93
94
95
96 /**
97  * Efficient approximate-FIFO queue optimize for concurrent writers.
98  */
99 template <typename T>
100 class AtomicQueue {
101 public:
102     AtomicQueue() : counter(0), numItems(0) {}
103
104     ~AtomicQueue() {
105         size_t i;
106         for (i = 0; i < counter; ++i) {
107             delete queues[i];
108         }
109     }
110
111     /**
112      * Place an item in the queue.
113      */
114     void push(T &value) {
115         std::queue<T> *q = swapQueue(); // steal our queue
116         q->push(value);
117         ++numItems;
118         q = swapQueue(q);
119     }
120
121     /**
122      * Grab all items from this queue an place them into the provided
123      * output queue.
124      *
125      * @param outQueue a destination queue to fill
126      */
127     void getAll(std::queue<T> &outQueue) {
128         std::queue<T> *q(swapQueue()); // Grab my own queue
129         std::queue<T> *newQueue(NULL);
130         int count(0);
131
132         // Will start empty unless this thread is adding stuff
133         while (!q->empty()) {
134             outQueue.push(q->front());
135             q->pop();
136             ++count;
137         }
138
139         size_t c(counter);
140         for (size_t i = 0; i < c; ++i) {
141             // Swap with another thread
142             newQueue = queues[i].swapIfNot(NULL, q);
143             // Empty the queue
144             if (newQueue != NULL) {
145                 q = newQueue;
146                 while (!q->empty()) {
147                     outQueue.push(q->front());
148                     q->pop();
149                     ++count;
150                 }
151             }
152         }
153
154         q = swapQueue(q);
155         numItems.fetch_sub(count);
156     }
157
158     /**
159      * True if this queue is empty.
160      */
161     bool empty() const {
162         return size() == 0;
163     }
164
165     /**
166      * Return the number of queued items.
167      */
168     size_t size() const {
169         return numItems;
170     }
171 private:
172     CouchbaseAtomicPtr<std::queue<T> > *initialize() {
173         std::queue<T> *q = new std::queue<T>;
174         size_t i(counter++);
175         cb_assert(counter <= MAX_THREADS);
176         queues[i] = q;
177         threadQueue = &queues[i];
178         return &queues[i];
179     }
180
181     std::queue<T> *swapQueue(std::queue<T> *newQueue = NULL) {
182         CouchbaseAtomicPtr<std::queue<T> > *qPtr(threadQueue);
183         if (qPtr == NULL) {
184             qPtr = initialize();
185         }
186         return qPtr->exchange(newQueue);
187     }
188
189     ThreadLocalPtr<CouchbaseAtomicPtr<std::queue<T> > > threadQueue;
190     CouchbaseAtomicPtr<std::queue<T> > queues[MAX_THREADS];
191     AtomicValue<size_t> counter;
192     AtomicValue<size_t> numItems;
193     DISALLOW_COPY_AND_ASSIGN(AtomicQueue);
194 };
195 #endif
196
197
198 #endif  // SRC_ATOMICQUEUE_H_