#pragma once #include #include #include #include #include #include #include #include #include #include namespace rk3588 { enum class QueueDropStrategy { DropOldest, DropNewest, Block }; template class SpscQueue { public: struct Stats { size_t size = 0; size_t capacity = 0; size_t dropped = 0; size_t pushed = 0; size_t popped = 0; bool stopped = false; }; SpscQueue(size_t capacity, QueueDropStrategy strategy) : capacity_(capacity == 0 ? 1 : capacity), strategy_(strategy) { buffer_.resize(capacity_); for (size_t i = 0; i < capacity_; ++i) { buffer_[i].seq.store(i, std::memory_order_relaxed); } } // Called when the queue transitions from empty -> non-empty (best-effort). // Must be fast and must not call back into this queue. void SetOnDataAvailable(std::function cb) { std::lock_guard lock(cb_mu_); on_data_available_ = std::move(cb); } bool Push(T item) { while (true) { if (stop_.load(std::memory_order_acquire)) return false; if (TryEnqueue(item)) { pushed_.fetch_add(1, std::memory_order_relaxed); const size_t prev_size = size_.fetch_add(1, std::memory_order_acq_rel); if (prev_size == 0) { std::function cb; { std::lock_guard lock(cb_mu_); cb = on_data_available_; } if (cb) cb(); data_cv_.notify_one(); } return true; } // Queue full. if (strategy_ == QueueDropStrategy::DropNewest) { dropped_.fetch_add(1, std::memory_order_relaxed); return true; } if (strategy_ == QueueDropStrategy::DropOldest) { if (TryDropOneOldest()) { continue; } std::this_thread::yield(); continue; } // Block until space is available. std::unique_lock lock(wait_mu_); space_cv_.wait(lock, [&] { return stop_.load(std::memory_order_acquire) || size_.load(std::memory_order_acquire) < capacity_; }); } } bool Pop(T& out, std::chrono::milliseconds timeout) { if (TryPop(out)) return true; std::unique_lock lock(wait_mu_); if (!data_cv_.wait_for(lock, timeout, [&] { return stop_.load(std::memory_order_acquire) || size_.load(std::memory_order_acquire) > 0; })) { return false; } return TryPop(out); } // Blocks until an item is available or the queue is stopped. bool Pop(T& out) { while (true) { if (TryPop(out)) return true; std::unique_lock lock(wait_mu_); data_cv_.wait(lock, [&] { return stop_.load(std::memory_order_acquire) || size_.load(std::memory_order_acquire) > 0; }); if (stop_.load(std::memory_order_acquire) && size_.load(std::memory_order_acquire) == 0) { return false; } } } // Non-blocking pop. bool TryPop(T& out) { if (!TryDequeue(out)) return false; popped_.fetch_add(1, std::memory_order_relaxed); const size_t prev_size = size_.fetch_sub(1, std::memory_order_acq_rel); if (prev_size == capacity_) { // Queue was full before this pop. space_cv_.notify_one(); } return true; } // Pop up to max_items currently available (non-blocking). Returns true if any item was popped. bool TryPopBatch(std::vector& out, size_t max_items) { if (max_items == 0) return false; out.clear(); out.reserve(max_items); for (size_t i = 0; i < max_items; ++i) { T item; if (!TryPop(item)) break; out.push_back(std::move(item)); } return !out.empty(); } void Stop() { stop_.store(true, std::memory_order_release); data_cv_.notify_all(); space_cv_.notify_all(); } bool IsStopped() const { return stop_.load(std::memory_order_acquire); } size_t Size() const { return size_.load(std::memory_order_acquire); } size_t Capacity() const { return capacity_; } size_t DroppedCount() const { return dropped_.load(std::memory_order_acquire); } size_t PushedCount() const { return pushed_.load(std::memory_order_acquire); } size_t PoppedCount() const { return popped_.load(std::memory_order_acquire); } Stats GetStats() const { Stats s; s.size = size_.load(std::memory_order_acquire); s.capacity = capacity_; s.dropped = dropped_.load(std::memory_order_acquire); s.pushed = pushed_.load(std::memory_order_acquire); s.popped = popped_.load(std::memory_order_acquire); s.stopped = stop_.load(std::memory_order_acquire); return s; } private: struct Cell { std::atomic seq; T data; Cell() : seq(0), data() {} Cell(const Cell&) = delete; Cell& operator=(const Cell&) = delete; Cell(Cell&& other) noexcept : seq(other.seq.load(std::memory_order_relaxed)), data(std::move(other.data)) {} Cell& operator=(Cell&& other) noexcept { if (this != &other) { seq.store(other.seq.load(std::memory_order_relaxed), std::memory_order_relaxed); data = std::move(other.data); } return *this; } }; // Lock-free bounded MPMC queue (Vyukov). We still expose it as SPSC in this project, // but DropOldest may dequeue from the producer thread. bool TryEnqueue(T& item) { size_t pos = enqueue_pos_.load(std::memory_order_relaxed); while (true) { Cell& cell = buffer_[pos % capacity_]; const size_t seq = cell.seq.load(std::memory_order_acquire); const intptr_t dif = static_cast(seq) - static_cast(pos); if (dif == 0) { if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { cell.data = std::move(item); cell.seq.store(pos + 1, std::memory_order_release); return true; } } else if (dif < 0) { return false; // full } else { pos = enqueue_pos_.load(std::memory_order_relaxed); } } } bool TryDequeue(T& out) { size_t pos = dequeue_pos_.load(std::memory_order_relaxed); while (true) { Cell& cell = buffer_[pos % capacity_]; const size_t seq = cell.seq.load(std::memory_order_acquire); const intptr_t dif = static_cast(seq) - static_cast(pos + 1); if (dif == 0) { if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { out = std::move(cell.data); cell.seq.store(pos + capacity_, std::memory_order_release); return true; } } else if (dif < 0) { return false; // empty } else { pos = dequeue_pos_.load(std::memory_order_relaxed); } } } bool TryDropOneOldest() { size_t pos = dequeue_pos_.load(std::memory_order_relaxed); while (true) { Cell& cell = buffer_[pos % capacity_]; const size_t seq = cell.seq.load(std::memory_order_acquire); const intptr_t dif = static_cast(seq) - static_cast(pos + 1); if (dif == 0) { if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { // Drop the item and release its resources eagerly. T dropped_item = std::move(cell.data); (void)dropped_item; cell.seq.store(pos + capacity_, std::memory_order_release); size_.fetch_sub(1, std::memory_order_acq_rel); dropped_.fetch_add(1, std::memory_order_relaxed); return true; } } else if (dif < 0) { return false; } else { pos = dequeue_pos_.load(std::memory_order_relaxed); } } } size_t capacity_ = 0; QueueDropStrategy strategy_ = QueueDropStrategy::DropOldest; alignas(64) std::atomic enqueue_pos_{0}; alignas(64) std::atomic dequeue_pos_{0}; std::vector buffer_; alignas(64) std::atomic size_{0}; std::atomic stop_{false}; std::atomic dropped_{0}; std::atomic pushed_{0}; std::atomic popped_{0}; mutable std::mutex wait_mu_; std::condition_variable data_cv_; std::condition_variable space_cv_; mutable std::mutex cb_mu_; std::function on_data_available_; }; } // namespace rk3588