284 lines
6.7 KiB
C++
284 lines
6.7 KiB
C++
#include <gtest/gtest.h>
|
|
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
#include "utils/spsc_queue.h"
|
|
|
|
namespace rk3588 {
|
|
namespace {
|
|
|
|
// Checks FIFO ordering: three values pushed into a capacity-4 queue come back
// out in insertion order, and Size() reflects the contents before and after.
TEST(SpscQueueTest, BasicPushPop) {
    SpscQueue<int> queue(4, QueueDropStrategy::DropOldest);
    EXPECT_EQ(queue.Capacity(), 4u);
    EXPECT_EQ(queue.Size(), 0u);

    // Occupy three of the four slots.
    for (int value = 1; value <= 3; ++value) {
        EXPECT_TRUE(queue.Push(value));
    }
    EXPECT_EQ(queue.Size(), 3u);

    // Drain and confirm the values arrive in the order they were pushed.
    int out = 0;
    for (int expected = 1; expected <= 3; ++expected) {
        EXPECT_TRUE(queue.TryPop(out));
        EXPECT_EQ(out, expected);
    }
    EXPECT_EQ(queue.Size(), 0u);
}
|
|
|
|
// TryPop on a freshly constructed queue must report failure and leave the
// output argument untouched.
TEST(SpscQueueTest, TryPopEmpty) {
    SpscQueue<int> queue(4, QueueDropStrategy::DropOldest);

    int sentinel = -1;
    EXPECT_FALSE(queue.TryPop(sentinel));
    EXPECT_EQ(sentinel, -1);  // the output must not have been overwritten
}
|
|
|
|
// With DropOldest, pushing into a full queue evicts the head element and
// counts it as dropped; the push itself still reports success.
TEST(SpscQueueTest, DropOldestStrategy) {
    SpscQueue<int> queue(2, QueueDropStrategy::DropOldest);

    // Fill to capacity; nothing has been dropped yet.
    for (int value = 1; value <= 2; ++value) {
        EXPECT_TRUE(queue.Push(value));
    }
    EXPECT_EQ(queue.Size(), 2u);
    EXPECT_EQ(queue.DroppedCount(), 0u);

    // One more push overflows the queue: element 1 is expected to be evicted.
    EXPECT_TRUE(queue.Push(3));
    EXPECT_EQ(queue.DroppedCount(), 1u);

    // The survivors are 2 and 3, in that order.
    int out = 0;
    for (int expected = 2; expected <= 3; ++expected) {
        EXPECT_TRUE(queue.TryPop(out));
        EXPECT_EQ(out, expected);
    }
}
|
|
|
|
// With DropNewest, a push into a full queue discards the incoming element,
// bumps the dropped counter, and leaves the queued elements intact. The test
// expects Push to return true even in that case.
TEST(SpscQueueTest, DropNewestStrategy) {
    SpscQueue<int> queue(2, QueueDropStrategy::DropNewest);

    // Fill to capacity; nothing has been dropped yet.
    for (int value = 1; value <= 2; ++value) {
        EXPECT_TRUE(queue.Push(value));
    }
    EXPECT_EQ(queue.DroppedCount(), 0u);

    // Overflowing push: the value being pushed (3) is the one thrown away.
    EXPECT_TRUE(queue.Push(3));
    EXPECT_EQ(queue.DroppedCount(), 1u);

    // The original two elements are still there, in order.
    int out = 0;
    for (int expected = 1; expected <= 2; ++expected) {
        EXPECT_TRUE(queue.TryPop(out));
        EXPECT_EQ(out, expected);
    }
    EXPECT_FALSE(queue.TryPop(out));  // 3 never made it into the queue
}
|
|
|
|
// With the Block strategy, Push on a full queue must wait until a consumer
// frees a slot rather than dropping anything.
TEST(SpscQueueTest, BlockingStrategy) {
    SpscQueue<int> q(2, QueueDropStrategy::Block);

    EXPECT_TRUE(q.Push(1));
    EXPECT_TRUE(q.Push(2));

    // Consumer thread that frees one slot after a delay. Its outcome is
    // recorded in these locals and verified on the main thread after join(),
    // which also orders the writes before the reads.
    bool popper_ok = false;
    int popper_value = -1;
    std::thread popper([&q, &popper_ok, &popper_value]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        if (q.TryPop(popper_value)) {
            popper_ok = true;
        }
    });

    // This should block until popper makes space.
    const auto start = std::chrono::steady_clock::now();
    EXPECT_TRUE(q.Push(3));
    const auto elapsed = std::chrono::steady_clock::now() - start;
    // The bound is below the popper's 50 ms delay, presumably to tolerate
    // timer granularity.
    EXPECT_GE(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(), 40);

    popper.join();

    // The popper must have removed the oldest element to unblock the push.
    EXPECT_TRUE(popper_ok);
    EXPECT_EQ(popper_value, 1);

    // Initialized so that a failed TryPop produces a clean assertion failure
    // instead of a read of an indeterminate value.
    int v = -1;
    EXPECT_TRUE(q.TryPop(v));
    EXPECT_EQ(v, 2);
    EXPECT_TRUE(q.TryPop(v));
    EXPECT_EQ(v, 3);
}
|
|
|
|
// Pop with a timeout on an empty queue must give up and return false, and
// only after roughly the requested wait has elapsed.
TEST(SpscQueueTest, PopWithTimeout) {
    SpscQueue<int> queue(4, QueueDropStrategy::DropOldest);

    // Nothing is ever pushed, so the 50 ms wait has to run out.
    int out = -1;
    const auto begin = std::chrono::steady_clock::now();
    const bool popped = queue.Pop(out, std::chrono::milliseconds(50));
    const auto waited = std::chrono::steady_clock::now() - begin;
    const auto waited_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(waited).count();

    EXPECT_FALSE(popped);
    EXPECT_GE(waited_ms, 40);
}
|
|
|
|
// Pop with a timeout must return the value as soon as a producer delivers
// one within the allowed wait.
TEST(SpscQueueTest, PopWithTimeoutSuccess) {
    SpscQueue<int> queue(4, QueueDropStrategy::DropOldest);

    // Producer thread: delivers a single value after 30 ms.
    std::thread producer([&queue]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(30));
        queue.Push(42);
    });

    // The 200 ms budget comfortably covers the producer's delay.
    int out = -1;
    const bool popped = queue.Pop(out, std::chrono::milliseconds(200));
    EXPECT_TRUE(popped);
    EXPECT_EQ(out, 42);

    producer.join();
}
|
|
|
|
// Stop() flips the stopped flag, rejects further pushes, and still lets the
// consumer drain whatever was queued beforehand.
TEST(SpscQueueTest, Stop) {
    SpscQueue<int> queue(4, QueueDropStrategy::Block);
    queue.Push(1);

    EXPECT_FALSE(queue.IsStopped());
    queue.Stop();
    EXPECT_TRUE(queue.IsStopped());

    // A stopped queue accepts no new elements.
    EXPECT_FALSE(queue.Push(2));

    // The element queued before Stop() is still retrievable.
    int out = 0;
    EXPECT_TRUE(queue.TryPop(out));
    EXPECT_EQ(out, 1);

    // Once drained, further pops fail.
    EXPECT_FALSE(queue.TryPop(out));
}
|
|
|
|
// A consumer blocked in Pop() on an empty queue must be released by Stop().
TEST(SpscQueueTest, StopUnblocksWaiters) {
    SpscQueue<int> queue(4, QueueDropStrategy::Block);

    std::atomic<bool> pop_returned{false};
    std::thread consumer([&queue, &pop_returned]() {
        int ignored;
        queue.Pop(ignored);  // queue is empty, so this is expected to block
        pop_returned.store(true);
    });

    // Give the consumer time to enter Pop(); it must still be waiting.
    std::this_thread::sleep_for(std::chrono::milliseconds(30));
    EXPECT_FALSE(pop_returned.load());

    queue.Stop();

    // join() only returns if Stop() actually woke the consumer.
    consumer.join();
    EXPECT_TRUE(pop_returned.load());
}
|
|
|
|
// GetStats() must report capacity, current size, and the pushed / popped /
// dropped counters after a known sequence of operations.
TEST(SpscQueueTest, Stats) {
    SpscQueue<int> queue(2, QueueDropStrategy::DropOldest);

    // Three pushes into a capacity-2 queue: the third one evicts element 1.
    for (int value = 1; value <= 3; ++value) {
        queue.Push(value);
    }

    // A single pop leaves one element behind.
    int ignored;
    queue.TryPop(ignored);

    const auto snapshot = queue.GetStats();
    EXPECT_EQ(snapshot.capacity, 2u);
    EXPECT_EQ(snapshot.size, 1u);
    EXPECT_EQ(snapshot.pushed, 3u);
    EXPECT_EQ(snapshot.popped, 1u);
    EXPECT_EQ(snapshot.dropped, 1u);
    EXPECT_FALSE(snapshot.stopped);
}
|
|
|
|
// TryPopBatch() must hand back up to the requested number of elements in FIFO
// order, return fewer when the queue runs short, and report false (with an
// empty batch) when the queue is empty.
TEST(SpscQueueTest, TryPopBatch) {
    SpscQueue<int> q(10, QueueDropStrategy::DropOldest);

    for (int i = 0; i < 5; ++i) {
        q.Push(i);
    }

    std::vector<int> batch;
    bool got = q.TryPopBatch(batch, 3);
    EXPECT_TRUE(got);
    // Fatal check: the element accesses below would index out of bounds if
    // the batch came back shorter than expected.
    ASSERT_EQ(batch.size(), 3u);
    EXPECT_EQ(batch[0], 0);
    EXPECT_EQ(batch[1], 1);
    EXPECT_EQ(batch[2], 2);

    // Request more than available. The same vector is reused, so the expected
    // size of 2 also requires that the previous contents were replaced.
    got = q.TryPopBatch(batch, 10);
    EXPECT_TRUE(got);
    ASSERT_EQ(batch.size(), 2u);
    EXPECT_EQ(batch[0], 3);
    EXPECT_EQ(batch[1], 4);

    // Empty queue: nothing to return.
    got = q.TryPopBatch(batch, 5);
    EXPECT_FALSE(got);
    EXPECT_TRUE(batch.empty());
}
|
|
|
|
// The data-available callback must fire only on the empty -> non-empty
// transition, not on every push.
TEST(SpscQueueTest, OnDataAvailableCallback) {
    SpscQueue<int> queue(4, QueueDropStrategy::DropOldest);

    std::atomic<int> notifications{0};
    queue.SetOnDataAvailable([&notifications]() { notifications.fetch_add(1); });

    // First element into an empty queue: one notification.
    queue.Push(1);
    EXPECT_EQ(notifications.load(), 1);

    // Queue already holds data: no additional notification.
    queue.Push(2);
    EXPECT_EQ(notifications.load(), 1);

    // Drain both elements so the queue is empty again.
    int ignored;
    queue.TryPop(ignored);
    queue.TryPop(ignored);

    // Empty -> non-empty once more: second notification.
    queue.Push(3);
    EXPECT_EQ(notifications.load(), 2);
}
|
|
|
|
// One producer and one consumer run concurrently; every pushed value must be
// received exactly once, which is checked via the count and the sum 1..N.
TEST(SpscQueueTest, ConcurrentPushPop) {
    constexpr int kNumItems = 10000;
    SpscQueue<int> q(kNumItems, QueueDropStrategy::Block);

    std::atomic<int> sum{0};
    std::atomic<int> failed_pushes{0};
    std::atomic<int> received_total{0};
    // Lets the consumer terminate even if some pushes failed; without it a
    // single rejected Push would leave the consumer spinning forever and the
    // test would hang instead of failing.
    std::atomic<bool> producer_done{false};

    std::thread producer([&q, &failed_pushes, &producer_done, kNumItems]() {
        for (int i = 1; i <= kNumItems; ++i) {
            if (!q.Push(i)) {
                failed_pushes.fetch_add(1);
            }
        }
        producer_done.store(true);
    });

    std::thread consumer([&q, &sum, &received_total, &producer_done, kNumItems]() {
        int received = 0;
        int v = 0;
        while (received < kNumItems) {
            if (q.TryPop(v)) {
                sum.fetch_add(v);
                ++received;
                continue;
            }
            if (producer_done.load()) {
                // The producer has finished. Elements may have arrived between
                // the failed TryPop above and this check, so drain whatever is
                // left before giving up.
                while (q.TryPop(v)) {
                    sum.fetch_add(v);
                    ++received;
                }
                break;
            }
            std::this_thread::yield();
        }
        received_total.store(received);
    });

    producer.join();
    consumer.join();

    EXPECT_EQ(failed_pushes.load(), 0);
    EXPECT_EQ(received_total.load(), kNumItems);

    // Sum of 1 to kNumItems
    int expected = kNumItems * (kNumItems + 1) / 2;
    EXPECT_EQ(sum.load(), expected);
}
|
|
|
|
// A requested capacity of zero is expected to be promoted to one, leaving a
// queue that can still hold and return a single element.
TEST(SpscQueueTest, ZeroCapacityBecomesOne) {
    SpscQueue<int> q(0, QueueDropStrategy::DropOldest);
    EXPECT_EQ(q.Capacity(), 1u);

    EXPECT_TRUE(q.Push(42));
    // Initialized so that a failed TryPop yields a clean assertion failure
    // below rather than a read of an indeterminate value.
    int v = 0;
    EXPECT_TRUE(q.TryPop(v));
    EXPECT_EQ(v, 42);
}
|
|
|
|
} // namespace
|
|
} // namespace rk3588
|