OrangePi3588Media/include/utils/spsc_queue.h
2026-01-29 20:44:16 +08:00

290 lines
9.4 KiB
C++

#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
#include <utility>
namespace rk3588 {

/// Policy applied by SpscQueue::Push when the queue is already full.
enum class QueueDropStrategy {
    DropOldest,  ///< Evict the oldest queued item, then enqueue the new one.
    DropNewest,  ///< Discard the item being pushed; queued items are untouched.
    Block        ///< Wait until a slot is freed or the queue is stopped.
};

/// Bounded queue used with one producer thread and one consumer thread.
///
/// Storage is a ring of sequence-numbered cells (Vyukov bounded-queue scheme),
/// so the fast paths take no lock. The ring itself tolerates concurrent
/// dequeuers, which is what allows the producer to evict the oldest item under
/// QueueDropStrategy::DropOldest while the consumer is popping.
///
/// A mutex and two condition variables are used only for sleeping: the consumer
/// in Pop(), and the producer in Push() under QueueDropStrategy::Block.
///
/// T must be default-constructible and move-assignable.
template <typename T>
class SpscQueue {
public:
    /// Snapshot of the queue counters. Each field is read independently, so the
    /// values may be mutually inconsistent while the queue is in use.
    struct Stats {
        size_t size = 0;
        size_t capacity = 0;
        size_t dropped = 0;
        size_t pushed = 0;
        size_t popped = 0;
        bool stopped = false;
    };

    /// @param capacity Maximum number of queued items; 0 is treated as 1.
    /// @param strategy What Push() does when the queue is full.
    SpscQueue(size_t capacity, QueueDropStrategy strategy)
        : capacity_(capacity == 0 ? 1 : capacity),
          // The sequence-number scheme cannot tell "free" from "occupied" with a
          // single cell, so the ring always has at least two slots. The logical
          // capacity is still enforced in TryEnqueue().
          slot_count_(capacity_ < 2 ? 2 : capacity_),
          strategy_(strategy),
          buffer_(slot_count_) {
        for (size_t i = 0; i < slot_count_; ++i) {
            buffer_[i].seq.store(i, std::memory_order_relaxed);
        }
    }

    SpscQueue(const SpscQueue&) = delete;
    SpscQueue& operator=(const SpscQueue&) = delete;

    /// Installs a callback invoked by Push() when the queue goes from empty to
    /// non-empty (best effort). It runs on the pushing thread, must be fast and
    /// must not call back into this queue.
    void SetOnDataAvailable(std::function<void()> cb) {
        std::lock_guard<std::mutex> lock(cb_mu_);
        on_data_available_ = std::move(cb);
    }

    /// Enqueues @p item according to the configured strategy.
    ///
    /// @return false only when the queue has been stopped. Note that under
    ///         DropNewest a discarded item still yields true; the loss is
    ///         visible through DroppedCount().
    bool Push(T item) {
        while (true) {
            if (stop_.load(std::memory_order_acquire)) return false;

            if (TryEnqueue(item)) {
                pushed_.fetch_add(1, std::memory_order_relaxed);
                const std::ptrdiff_t before = size_.fetch_add(1, std::memory_order_acq_rel);
                if (before == 0) NotifyDataAvailable();
                return true;
            }

            // Queue full.
            switch (strategy_) {
                case QueueDropStrategy::DropNewest:
                    dropped_.fetch_add(1, std::memory_order_relaxed);
                    return true;

                case QueueDropStrategy::DropOldest:
                    // If the consumer took the item first there is nothing to
                    // evict; give it a chance to finish and retry.
                    if (!TryDropOneOldest()) std::this_thread::yield();
                    break;

                case QueueDropStrategy::Block:
                default: {
                    std::unique_lock<std::mutex> lock(wait_mu_);
                    space_cv_.wait(lock, [this] { return HasSpaceOrStopped(); });
                    break;
                }
            }
        }
    }

    /// Pops one item, waiting up to @p timeout for data.
    /// @return true if @p out received an item; false on timeout, or when the
    ///         wait ended (data signalled or queue stopped) but no item could
    ///         be taken.
    bool Pop(T& out, std::chrono::milliseconds timeout) {
        if (TryPop(out)) return true;
        {
            std::unique_lock<std::mutex> lock(wait_mu_);
            const bool ready =
                data_cv_.wait_for(lock, timeout, [this] { return HasDataOrStopped(); });
            if (!ready) return false;
        }
        // wait_mu_ must be released here: TryPop() may take it to wake a
        // blocked producer.
        return TryPop(out);
    }

    /// Blocks until an item is available or the queue is stopped.
    /// Items still queued at Stop() time are delivered before false is returned.
    bool Pop(T& out) {
        while (true) {
            if (TryPop(out)) return true;

            std::unique_lock<std::mutex> lock(wait_mu_);
            data_cv_.wait(lock, [this] { return HasDataOrStopped(); });
            if (stop_.load(std::memory_order_acquire) &&
                size_.load(std::memory_order_acquire) <= 0) {
                return false;
            }
            // Lock is released at the end of the iteration, before TryPop().
        }
    }

    /// Non-blocking pop. Must not be called while holding wait_mu_.
    bool TryPop(T& out) {
        if (!TryDequeue(out)) return false;

        popped_.fetch_add(1, std::memory_order_relaxed);
        const std::ptrdiff_t before = size_.fetch_sub(1, std::memory_order_acq_rel);

        // Only the Block strategy ever sleeps on space_cv_.
        if (strategy_ == QueueDropStrategy::Block &&
            before >= static_cast<std::ptrdiff_t>(capacity_)) {
            // Passing through the mutex orders this wake-up after a producer
            // that has evaluated its predicate but not yet started to wait.
            { std::lock_guard<std::mutex> lock(wait_mu_); }
            space_cv_.notify_one();
        }
        return true;
    }

    /// Pops up to @p max_items currently available items into @p out
    /// (non-blocking). @p out is cleared first unless @p max_items is 0.
    /// @return true if at least one item was popped.
    bool TryPopBatch(std::vector<T>& out, size_t max_items) {
        if (max_items == 0) return false;
        out.clear();

        // Reserve for what is actually queued, so a very large "take all"
        // limit does not translate into a huge allocation.
        const size_t queued = Size();
        out.reserve(max_items < queued ? max_items : queued);

        T item;
        for (size_t taken = 0; taken < max_items && TryPop(item); ++taken) {
            out.push_back(std::move(item));
        }
        return !out.empty();
    }

    /// Marks the queue stopped and wakes every waiter. Subsequent Push() calls
    /// return false; already queued items can still be popped.
    void Stop() {
        {
            // Set the flag under the mutex so a waiter between its predicate
            // check and its sleep cannot miss the notification.
            std::lock_guard<std::mutex> lock(wait_mu_);
            stop_.store(true, std::memory_order_release);
        }
        data_cv_.notify_all();
        space_cv_.notify_all();
    }

    bool IsStopped() const {
        return stop_.load(std::memory_order_acquire);
    }

    /// Approximate number of queued items (never negative, may lag briefly).
    size_t Size() const {
        const std::ptrdiff_t n = size_.load(std::memory_order_acquire);
        return n > 0 ? static_cast<size_t>(n) : 0;
    }

    size_t Capacity() const { return capacity_; }

    size_t DroppedCount() const {
        return dropped_.load(std::memory_order_acquire);
    }

    size_t PushedCount() const {
        return pushed_.load(std::memory_order_acquire);
    }

    size_t PoppedCount() const {
        return popped_.load(std::memory_order_acquire);
    }

    Stats GetStats() const {
        Stats s;
        s.size = Size();
        s.capacity = capacity_;
        s.dropped = dropped_.load(std::memory_order_acquire);
        s.pushed = pushed_.load(std::memory_order_acquire);
        s.popped = popped_.load(std::memory_order_acquire);
        s.stopped = stop_.load(std::memory_order_acquire);
        return s;
    }

private:
    /// One ring slot. `seq` encodes the slot state relative to a position p
    /// that maps to it: seq == p means free for enqueue at p, seq == p + 1
    /// means it holds the item enqueued at p.
    struct Cell {
        std::atomic<size_t> seq;
        T data;
        Cell() : seq(0), data() {}
        Cell(const Cell&) = delete;
        Cell& operator=(const Cell&) = delete;
    };

    bool HasDataOrStopped() const {
        return stop_.load(std::memory_order_acquire) ||
               size_.load(std::memory_order_acquire) > 0;
    }

    bool HasSpaceOrStopped() const {
        return stop_.load(std::memory_order_acquire) ||
               size_.load(std::memory_order_acquire) < static_cast<std::ptrdiff_t>(capacity_);
    }

    /// Runs the user callback (if any) and wakes one sleeping consumer.
    void NotifyDataAvailable() {
        std::function<void()> cb;
        {
            std::lock_guard<std::mutex> lock(cb_mu_);
            cb = on_data_available_;
        }
        if (cb) cb();  // invoked outside cb_mu_

        // See TryPop(): the empty critical section prevents a lost wake-up.
        { std::lock_guard<std::mutex> lock(wait_mu_); }
        data_cv_.notify_one();
    }

    /// Moves @p item into the ring. Returns false (leaving @p item untouched)
    /// when the queue is full.
    bool TryEnqueue(T& item) {
        size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
        while (true) {
            Cell& cell = buffer_[pos % slot_count_];
            const size_t seq = cell.seq.load(std::memory_order_acquire);
            const intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);

            if (dif < 0) return false;  // slot still occupied: ring is full
            if (dif > 0) {              // our view of the position is stale
                pos = enqueue_pos_.load(std::memory_order_relaxed);
                continue;
            }

            // The ring may have more slots than the logical capacity; enforce
            // the latter. dequeue_pos_ only grows, so this check can only err
            // on the side of reporting "full".
            if (slot_count_ != capacity_ &&
                pos - dequeue_pos_.load(std::memory_order_acquire) >= capacity_) {
                return false;
            }

            if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                cell.data = std::move(item);
                cell.seq.store(pos + 1, std::memory_order_release);
                return true;
            }
            // CAS failure refreshed `pos`; retry with the new value.
        }
    }

    /// Moves the oldest item into @p out. Returns false when the ring is empty.
    bool TryDequeue(T& out) {
        size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
        while (true) {
            Cell& cell = buffer_[pos % slot_count_];
            const size_t seq = cell.seq.load(std::memory_order_acquire);
            const intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);

            if (dif < 0) return false;  // nothing published at this position
            if (dif > 0) {              // our view of the position is stale
                pos = dequeue_pos_.load(std::memory_order_relaxed);
                continue;
            }

            if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                out = std::move(cell.data);
                // Mark the slot free for the enqueue one lap ahead.
                cell.seq.store(pos + slot_count_, std::memory_order_release);
                return true;
            }
            // CAS failure refreshed `pos`; retry with the new value.
        }
    }

    /// Evicts the oldest item (called from the producer under DropOldest).
    /// Returns false when there was nothing to evict.
    bool TryDropOneOldest() {
        T victim;  // destroyed on return, releasing the item's resources
        if (!TryDequeue(victim)) return false;
        size_.fetch_sub(1, std::memory_order_acq_rel);
        dropped_.fetch_add(1, std::memory_order_relaxed);
        return true;
    }

    const size_t capacity_;    // logical capacity reported to callers
    const size_t slot_count_;  // physical ring size, max(capacity_, 2)
    const QueueDropStrategy strategy_;

    alignas(64) std::atomic<size_t> enqueue_pos_{0};
    alignas(64) std::atomic<size_t> dequeue_pos_{0};
    std::vector<Cell> buffer_;

    // Signed: a pop can be accounted before the matching push, so the counter
    // may dip below zero for a moment.
    alignas(64) std::atomic<std::ptrdiff_t> size_{0};
    std::atomic<bool> stop_{false};
    std::atomic<size_t> dropped_{0};
    std::atomic<size_t> pushed_{0};
    std::atomic<size_t> popped_{0};

    mutable std::mutex wait_mu_;
    std::condition_variable data_cv_;
    std::condition_variable space_cv_;

    mutable std::mutex cb_mu_;
    std::function<void()> on_data_available_;
};

} // namespace rk3588