OrangePi3588Media/include/utils/spsc_queue.h

79 lines
2.0 KiB
C++

#pragma once
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
namespace rk3588 {
enum class QueueDropStrategy {
DropOldest,
Block
};
template <typename T>
class SpscQueue {
public:
SpscQueue(size_t capacity, QueueDropStrategy strategy)
: capacity_(capacity), strategy_(strategy) {}
bool Push(T item) {
std::unique_lock<std::mutex> lock(mu_);
if (queue_.size() >= capacity_) {
if (strategy_ == QueueDropStrategy::DropOldest) {
queue_.pop_front();
++dropped_;
} else {
// Block until space is available
space_cv_.wait(lock, [&] { return queue_.size() < capacity_ || stop_; });
if (stop_) return false;
}
}
queue_.push_back(std::move(item));
data_cv_.notify_one();
return true;
}
bool Pop(T& out, std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mu_);
if (!data_cv_.wait_for(lock, timeout, [&] { return !queue_.empty() || stop_; })) {
return false;
}
if (queue_.empty()) return false;
out = std::move(queue_.front());
queue_.pop_front();
space_cv_.notify_one();
return true;
}
void Stop() {
std::lock_guard<std::mutex> lock(mu_);
stop_ = true;
data_cv_.notify_all();
space_cv_.notify_all();
}
size_t Size() const {
std::lock_guard<std::mutex> lock(mu_);
return queue_.size();
}
size_t Capacity() const { return capacity_; }
size_t DroppedCount() const { return dropped_; }
private:
size_t capacity_ = 0;
QueueDropStrategy strategy_ = QueueDropStrategy::DropOldest;
mutable std::mutex mu_;
std::condition_variable data_cv_;
std::condition_variable space_cv_;
std::deque<T> queue_;
bool stop_ = false;
size_t dropped_ = 0;
};
} // namespace rk3588