OrangePi3588Media/include/utils/spsc_queue.h
sladro f8aa377a79
Some checks are pending
CI / host-build (push) Waiting to run
CI / rk3588-cross-build (push) Waiting to run
优化性能,对齐prd
2026-01-06 17:37:27 +08:00

152 lines
3.8 KiB
C++

#pragma once
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>
#include <utility>
namespace rk3588 {
enum class QueueDropStrategy {
DropOldest,
DropNewest,
Block
};
template <typename T>
class SpscQueue {
public:
struct Stats {
size_t size = 0;
size_t capacity = 0;
size_t dropped = 0;
size_t pushed = 0;
size_t popped = 0;
bool stopped = false;
};
SpscQueue(size_t capacity, QueueDropStrategy strategy)
: capacity_(capacity == 0 ? 1 : capacity), strategy_(strategy) {
buf_.resize(capacity_);
}
bool Push(T item) {
std::unique_lock<std::mutex> lock(mu_);
if (stop_) return false;
if (size_ >= capacity_) {
if (strategy_ == QueueDropStrategy::DropOldest) {
// Drop the oldest item.
head_ = (head_ + 1) % capacity_;
--size_;
++dropped_;
} else if (strategy_ == QueueDropStrategy::DropNewest) {
// Drop the incoming item.
++dropped_;
return true;
} else {
// Block until space is available
space_cv_.wait(lock, [&] { return size_ < capacity_ || stop_; });
if (stop_) return false;
}
}
buf_[tail_] = std::move(item);
tail_ = (tail_ + 1) % capacity_;
++size_;
++pushed_;
data_cv_.notify_one();
return true;
}
bool Pop(T& out, std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mu_);
if (!data_cv_.wait_for(lock, timeout, [&] { return size_ > 0 || stop_; })) {
return false;
}
if (size_ == 0) return false;
out = std::move(buf_[head_]);
head_ = (head_ + 1) % capacity_;
--size_;
++popped_;
space_cv_.notify_one();
return true;
}
// Blocks until an item is available or the queue is stopped.
bool Pop(T& out) {
std::unique_lock<std::mutex> lock(mu_);
data_cv_.wait(lock, [&] { return size_ > 0 || stop_; });
if (size_ == 0) return false;
out = std::move(buf_[head_]);
head_ = (head_ + 1) % capacity_;
--size_;
++popped_;
space_cv_.notify_one();
return true;
}
void Stop() {
std::lock_guard<std::mutex> lock(mu_);
stop_ = true;
data_cv_.notify_all();
space_cv_.notify_all();
}
bool IsStopped() const {
std::lock_guard<std::mutex> lock(mu_);
return stop_;
}
size_t Size() const {
std::lock_guard<std::mutex> lock(mu_);
return size_;
}
size_t Capacity() const { return capacity_; }
size_t DroppedCount() const {
std::lock_guard<std::mutex> lock(mu_);
return dropped_;
}
size_t PushedCount() const {
std::lock_guard<std::mutex> lock(mu_);
return pushed_;
}
size_t PoppedCount() const {
std::lock_guard<std::mutex> lock(mu_);
return popped_;
}
Stats GetStats() const {
std::lock_guard<std::mutex> lock(mu_);
Stats s;
s.size = size_;
s.capacity = capacity_;
s.dropped = dropped_;
s.pushed = pushed_;
s.popped = popped_;
s.stopped = stop_;
return s;
}
private:
size_t capacity_ = 0;
QueueDropStrategy strategy_ = QueueDropStrategy::DropOldest;
mutable std::mutex mu_;
std::condition_variable data_cv_;
std::condition_variable space_cv_;
std::vector<T> buf_;
size_t head_ = 0;
size_t tail_ = 0;
size_t size_ = 0;
bool stop_ = false;
size_t dropped_ = 0;
size_t pushed_ = 0;
size_t popped_ = 0;
};
} // namespace rk3588