152 lines
3.8 KiB
C++
152 lines
3.8 KiB
C++
#pragma once
|
|
|
|
#include <chrono>
|
|
#include <condition_variable>
|
|
#include <cstddef>
|
|
#include <mutex>
|
|
#include <vector>
|
|
#include <utility>
|
|
|
|
namespace rk3588 {
|
|
|
|
enum class QueueDropStrategy {
|
|
DropOldest,
|
|
DropNewest,
|
|
Block
|
|
};
|
|
|
|
template <typename T>
|
|
class SpscQueue {
|
|
public:
|
|
struct Stats {
|
|
size_t size = 0;
|
|
size_t capacity = 0;
|
|
size_t dropped = 0;
|
|
size_t pushed = 0;
|
|
size_t popped = 0;
|
|
bool stopped = false;
|
|
};
|
|
|
|
SpscQueue(size_t capacity, QueueDropStrategy strategy)
|
|
: capacity_(capacity == 0 ? 1 : capacity), strategy_(strategy) {
|
|
buf_.resize(capacity_);
|
|
}
|
|
|
|
bool Push(T item) {
|
|
std::unique_lock<std::mutex> lock(mu_);
|
|
if (stop_) return false;
|
|
if (size_ >= capacity_) {
|
|
if (strategy_ == QueueDropStrategy::DropOldest) {
|
|
// Drop the oldest item.
|
|
head_ = (head_ + 1) % capacity_;
|
|
--size_;
|
|
++dropped_;
|
|
} else if (strategy_ == QueueDropStrategy::DropNewest) {
|
|
// Drop the incoming item.
|
|
++dropped_;
|
|
return true;
|
|
} else {
|
|
// Block until space is available
|
|
space_cv_.wait(lock, [&] { return size_ < capacity_ || stop_; });
|
|
if (stop_) return false;
|
|
}
|
|
}
|
|
buf_[tail_] = std::move(item);
|
|
tail_ = (tail_ + 1) % capacity_;
|
|
++size_;
|
|
++pushed_;
|
|
data_cv_.notify_one();
|
|
return true;
|
|
}
|
|
|
|
bool Pop(T& out, std::chrono::milliseconds timeout) {
|
|
std::unique_lock<std::mutex> lock(mu_);
|
|
if (!data_cv_.wait_for(lock, timeout, [&] { return size_ > 0 || stop_; })) {
|
|
return false;
|
|
}
|
|
if (size_ == 0) return false;
|
|
out = std::move(buf_[head_]);
|
|
head_ = (head_ + 1) % capacity_;
|
|
--size_;
|
|
++popped_;
|
|
space_cv_.notify_one();
|
|
return true;
|
|
}
|
|
|
|
// Blocks until an item is available or the queue is stopped.
|
|
bool Pop(T& out) {
|
|
std::unique_lock<std::mutex> lock(mu_);
|
|
data_cv_.wait(lock, [&] { return size_ > 0 || stop_; });
|
|
if (size_ == 0) return false;
|
|
out = std::move(buf_[head_]);
|
|
head_ = (head_ + 1) % capacity_;
|
|
--size_;
|
|
++popped_;
|
|
space_cv_.notify_one();
|
|
return true;
|
|
}
|
|
|
|
void Stop() {
|
|
std::lock_guard<std::mutex> lock(mu_);
|
|
stop_ = true;
|
|
data_cv_.notify_all();
|
|
space_cv_.notify_all();
|
|
}
|
|
|
|
bool IsStopped() const {
|
|
std::lock_guard<std::mutex> lock(mu_);
|
|
return stop_;
|
|
}
|
|
|
|
size_t Size() const {
|
|
std::lock_guard<std::mutex> lock(mu_);
|
|
return size_;
|
|
}
|
|
|
|
size_t Capacity() const { return capacity_; }
|
|
|
|
size_t DroppedCount() const {
|
|
std::lock_guard<std::mutex> lock(mu_);
|
|
return dropped_;
|
|
}
|
|
|
|
size_t PushedCount() const {
|
|
std::lock_guard<std::mutex> lock(mu_);
|
|
return pushed_;
|
|
}
|
|
|
|
size_t PoppedCount() const {
|
|
std::lock_guard<std::mutex> lock(mu_);
|
|
return popped_;
|
|
}
|
|
|
|
Stats GetStats() const {
|
|
std::lock_guard<std::mutex> lock(mu_);
|
|
Stats s;
|
|
s.size = size_;
|
|
s.capacity = capacity_;
|
|
s.dropped = dropped_;
|
|
s.pushed = pushed_;
|
|
s.popped = popped_;
|
|
s.stopped = stop_;
|
|
return s;
|
|
}
|
|
|
|
private:
|
|
size_t capacity_ = 0;
|
|
QueueDropStrategy strategy_ = QueueDropStrategy::DropOldest;
|
|
mutable std::mutex mu_;
|
|
std::condition_variable data_cv_;
|
|
std::condition_variable space_cv_;
|
|
std::vector<T> buf_;
|
|
size_t head_ = 0;
|
|
size_t tail_ = 0;
|
|
size_t size_ = 0;
|
|
bool stop_ = false;
|
|
size_t dropped_ = 0;
|
|
size_t pushed_ = 0;
|
|
size_t popped_ = 0;
|
|
};
|
|
|
|
} // namespace rk3588
|