ceph/src/common/QueueRing.h
Jesse Williamson 1fa473ae15 common: migrate atomic_t to <atomic>
Signed-off-by: Jesse Williamson <jwilliamson@suse.de>
2017-06-01 15:36:27 -07:00

64 lines
1.3 KiB
C++

#ifndef QUEUE_RING_H
#define QUEUE_RING_H
#include "common/Mutex.h"
#include "common/Cond.h"
#include <list>
#include <atomic>
#include <vector>
template <class T>
class QueueRing {
struct QueueBucket {
Mutex lock;
Cond cond;
typename std::list<T> entries;
QueueBucket() : lock("QueueRing::QueueBucket::lock") {}
QueueBucket(const QueueBucket& rhs) : lock("QueueRing::QueueBucket::lock") {
entries = rhs.entries;
}
void enqueue(const T& entry) {
lock.Lock();
if (entries.empty()) {
cond.Signal();
}
entries.push_back(entry);
lock.Unlock();
}
void dequeue(T *entry) {
lock.Lock();
if (entries.empty()) {
cond.Wait(lock);
};
assert(!entries.empty());
*entry = entries.front();
entries.pop_front();
lock.Unlock();
};
};
std::vector<QueueBucket> buckets;
int num_buckets;
std::atomic<int64_t> cur_read_bucket = { 0 };
std::atomic<int64_t> cur_write_bucket = { 0 };
public:
QueueRing(int n) : buckets(n), num_buckets(n) {
}
void enqueue(const T& entry) {
buckets[++cur_write_bucket % num_buckets].enqueue(entry);
};
void dequeue(T *entry) {
buckets[++cur_read_bucket % num_buckets].dequeue(entry);
}
};
#endif