// Bounded producer/consumer queue with configurable overflow policy.
#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

namespace rk3588 {

enum class QueueDropStrategy {
|
|
DropOldest,
|
|
DropNewest,
|
|
Block
|
|
};
|
|
|
|
template <typename T>
|
|
class SpscQueue {
|
|
public:
|
|
struct Stats {
|
|
size_t size = 0;
|
|
size_t capacity = 0;
|
|
size_t dropped = 0;
|
|
size_t pushed = 0;
|
|
size_t popped = 0;
|
|
bool stopped = false;
|
|
};
|
|
|
|
SpscQueue(size_t capacity, QueueDropStrategy strategy)
|
|
: capacity_(capacity == 0 ? 1 : capacity), strategy_(strategy) {
|
|
buffer_.resize(capacity_);
|
|
for (size_t i = 0; i < capacity_; ++i) {
|
|
buffer_[i].seq.store(i, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
|
|
// Called when the queue transitions from empty -> non-empty (best-effort).
|
|
// Must be fast and must not call back into this queue.
|
|
void SetOnDataAvailable(std::function<void()> cb) {
|
|
std::lock_guard<std::mutex> lock(cb_mu_);
|
|
on_data_available_ = std::move(cb);
|
|
}
|
|
|
|
bool Push(T item) {
|
|
while (true) {
|
|
if (stop_.load(std::memory_order_acquire)) return false;
|
|
|
|
if (TryEnqueue(item)) {
|
|
pushed_.fetch_add(1, std::memory_order_relaxed);
|
|
const size_t prev_size = size_.fetch_add(1, std::memory_order_acq_rel);
|
|
if (prev_size == 0) {
|
|
std::function<void()> cb;
|
|
{
|
|
std::lock_guard<std::mutex> lock(cb_mu_);
|
|
cb = on_data_available_;
|
|
}
|
|
if (cb) cb();
|
|
data_cv_.notify_one();
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// Queue full.
|
|
if (strategy_ == QueueDropStrategy::DropNewest) {
|
|
dropped_.fetch_add(1, std::memory_order_relaxed);
|
|
return true;
|
|
}
|
|
|
|
if (strategy_ == QueueDropStrategy::DropOldest) {
|
|
if (TryDropOneOldest()) {
|
|
continue;
|
|
}
|
|
std::this_thread::yield();
|
|
continue;
|
|
}
|
|
|
|
// Block until space is available.
|
|
std::unique_lock<std::mutex> lock(wait_mu_);
|
|
space_cv_.wait(lock, [&] {
|
|
return stop_.load(std::memory_order_acquire) ||
|
|
size_.load(std::memory_order_acquire) < capacity_;
|
|
});
|
|
}
|
|
}
|
|
|
|
bool Pop(T& out, std::chrono::milliseconds timeout) {
|
|
if (TryPop(out)) return true;
|
|
std::unique_lock<std::mutex> lock(wait_mu_);
|
|
if (!data_cv_.wait_for(lock, timeout, [&] {
|
|
return stop_.load(std::memory_order_acquire) ||
|
|
size_.load(std::memory_order_acquire) > 0;
|
|
})) {
|
|
return false;
|
|
}
|
|
return TryPop(out);
|
|
}
|
|
|
|
// Blocks until an item is available or the queue is stopped.
|
|
bool Pop(T& out) {
|
|
while (true) {
|
|
if (TryPop(out)) return true;
|
|
std::unique_lock<std::mutex> lock(wait_mu_);
|
|
data_cv_.wait(lock, [&] {
|
|
return stop_.load(std::memory_order_acquire) ||
|
|
size_.load(std::memory_order_acquire) > 0;
|
|
});
|
|
if (stop_.load(std::memory_order_acquire) &&
|
|
size_.load(std::memory_order_acquire) == 0) {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Non-blocking pop.
|
|
bool TryPop(T& out) {
|
|
if (!TryDequeue(out)) return false;
|
|
popped_.fetch_add(1, std::memory_order_relaxed);
|
|
const size_t prev_size = size_.fetch_sub(1, std::memory_order_acq_rel);
|
|
if (prev_size == capacity_) {
|
|
// Queue was full before this pop.
|
|
space_cv_.notify_one();
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// Pop up to max_items currently available (non-blocking). Returns true if any item was popped.
|
|
bool TryPopBatch(std::vector<T>& out, size_t max_items) {
|
|
if (max_items == 0) return false;
|
|
out.clear();
|
|
out.reserve(max_items);
|
|
for (size_t i = 0; i < max_items; ++i) {
|
|
T item;
|
|
if (!TryPop(item)) break;
|
|
out.push_back(std::move(item));
|
|
}
|
|
return !out.empty();
|
|
}
|
|
|
|
void Stop() {
|
|
stop_.store(true, std::memory_order_release);
|
|
data_cv_.notify_all();
|
|
space_cv_.notify_all();
|
|
}
|
|
|
|
bool IsStopped() const {
|
|
return stop_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
size_t Size() const {
|
|
return size_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
size_t Capacity() const { return capacity_; }
|
|
|
|
size_t DroppedCount() const {
|
|
return dropped_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
size_t PushedCount() const {
|
|
return pushed_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
size_t PoppedCount() const {
|
|
return popped_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
Stats GetStats() const {
|
|
Stats s;
|
|
s.size = size_.load(std::memory_order_acquire);
|
|
s.capacity = capacity_;
|
|
s.dropped = dropped_.load(std::memory_order_acquire);
|
|
s.pushed = pushed_.load(std::memory_order_acquire);
|
|
s.popped = popped_.load(std::memory_order_acquire);
|
|
s.stopped = stop_.load(std::memory_order_acquire);
|
|
return s;
|
|
}
|
|
|
|
private:
|
|
struct Cell {
|
|
std::atomic<size_t> seq;
|
|
T data;
|
|
|
|
Cell() : seq(0), data() {}
|
|
Cell(const Cell&) = delete;
|
|
Cell& operator=(const Cell&) = delete;
|
|
Cell(Cell&& other) noexcept
|
|
: seq(other.seq.load(std::memory_order_relaxed)), data(std::move(other.data)) {}
|
|
Cell& operator=(Cell&& other) noexcept {
|
|
if (this != &other) {
|
|
seq.store(other.seq.load(std::memory_order_relaxed), std::memory_order_relaxed);
|
|
data = std::move(other.data);
|
|
}
|
|
return *this;
|
|
}
|
|
};
|
|
|
|
// Lock-free bounded MPMC queue (Vyukov). We still expose it as SPSC in this project,
|
|
// but DropOldest may dequeue from the producer thread.
|
|
bool TryEnqueue(T& item) {
|
|
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
|
|
while (true) {
|
|
Cell& cell = buffer_[pos % capacity_];
|
|
const size_t seq = cell.seq.load(std::memory_order_acquire);
|
|
const intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
|
|
if (dif == 0) {
|
|
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
|
|
cell.data = std::move(item);
|
|
cell.seq.store(pos + 1, std::memory_order_release);
|
|
return true;
|
|
}
|
|
} else if (dif < 0) {
|
|
return false; // full
|
|
} else {
|
|
pos = enqueue_pos_.load(std::memory_order_relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
bool TryDequeue(T& out) {
|
|
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
|
|
while (true) {
|
|
Cell& cell = buffer_[pos % capacity_];
|
|
const size_t seq = cell.seq.load(std::memory_order_acquire);
|
|
const intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
|
|
if (dif == 0) {
|
|
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
|
|
out = std::move(cell.data);
|
|
cell.seq.store(pos + capacity_, std::memory_order_release);
|
|
return true;
|
|
}
|
|
} else if (dif < 0) {
|
|
return false; // empty
|
|
} else {
|
|
pos = dequeue_pos_.load(std::memory_order_relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
bool TryDropOneOldest() {
|
|
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
|
|
while (true) {
|
|
Cell& cell = buffer_[pos % capacity_];
|
|
const size_t seq = cell.seq.load(std::memory_order_acquire);
|
|
const intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
|
|
if (dif == 0) {
|
|
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
|
|
// Drop the item and release its resources eagerly.
|
|
T dropped_item = std::move(cell.data);
|
|
(void)dropped_item;
|
|
cell.seq.store(pos + capacity_, std::memory_order_release);
|
|
size_.fetch_sub(1, std::memory_order_acq_rel);
|
|
dropped_.fetch_add(1, std::memory_order_relaxed);
|
|
return true;
|
|
}
|
|
} else if (dif < 0) {
|
|
return false;
|
|
} else {
|
|
pos = dequeue_pos_.load(std::memory_order_relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
size_t capacity_ = 0;
|
|
QueueDropStrategy strategy_ = QueueDropStrategy::DropOldest;
|
|
alignas(64) std::atomic<size_t> enqueue_pos_{0};
|
|
alignas(64) std::atomic<size_t> dequeue_pos_{0};
|
|
std::vector<Cell> buffer_;
|
|
|
|
alignas(64) std::atomic<size_t> size_{0};
|
|
std::atomic<bool> stop_{false};
|
|
|
|
std::atomic<size_t> dropped_{0};
|
|
std::atomic<size_t> pushed_{0};
|
|
std::atomic<size_t> popped_{0};
|
|
|
|
mutable std::mutex wait_mu_;
|
|
std::condition_variable data_cv_;
|
|
std::condition_variable space_cv_;
|
|
|
|
mutable std::mutex cb_mu_;
|
|
std::function<void()> on_data_available_;
|
|
};

}  // namespace rk3588