#include #include #include #include #include #include "utils/spsc_queue.h" namespace rk3588 { namespace { TEST(SpscQueueTest, BasicPushPop) { SpscQueue q(4, QueueDropStrategy::DropOldest); EXPECT_EQ(q.Capacity(), 4u); EXPECT_EQ(q.Size(), 0u); EXPECT_TRUE(q.Push(1)); EXPECT_TRUE(q.Push(2)); EXPECT_TRUE(q.Push(3)); EXPECT_EQ(q.Size(), 3u); int v = 0; EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 1); EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 2); EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 3); EXPECT_EQ(q.Size(), 0u); } TEST(SpscQueueTest, TryPopEmpty) { SpscQueue q(4, QueueDropStrategy::DropOldest); int v = -1; EXPECT_FALSE(q.TryPop(v)); EXPECT_EQ(v, -1); // unchanged } TEST(SpscQueueTest, DropOldestStrategy) { SpscQueue q(2, QueueDropStrategy::DropOldest); EXPECT_TRUE(q.Push(1)); EXPECT_TRUE(q.Push(2)); EXPECT_EQ(q.Size(), 2u); EXPECT_EQ(q.DroppedCount(), 0u); // Queue full, should drop oldest (1) EXPECT_TRUE(q.Push(3)); EXPECT_EQ(q.DroppedCount(), 1u); int v = 0; EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 2); EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 3); } TEST(SpscQueueTest, DropNewestStrategy) { SpscQueue q(2, QueueDropStrategy::DropNewest); EXPECT_TRUE(q.Push(1)); EXPECT_TRUE(q.Push(2)); EXPECT_EQ(q.DroppedCount(), 0u); // Queue full, should drop newest (the one being pushed) EXPECT_TRUE(q.Push(3)); EXPECT_EQ(q.DroppedCount(), 1u); int v = 0; EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 1); EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 2); EXPECT_FALSE(q.TryPop(v)); // 3 was dropped } TEST(SpscQueueTest, BlockingStrategy) { SpscQueue q(2, QueueDropStrategy::Block); EXPECT_TRUE(q.Push(1)); EXPECT_TRUE(q.Push(2)); // Start a thread that will pop after a delay std::thread popper([&q]() { std::this_thread::sleep_for(std::chrono::milliseconds(50)); int v; q.TryPop(v); }); // This should block until popper makes space auto start = std::chrono::steady_clock::now(); EXPECT_TRUE(q.Push(3)); auto elapsed = std::chrono::steady_clock::now() - start; EXPECT_GE(std::chrono::duration_cast(elapsed).count(), 40); popper.join(); int v; EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 2); EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 3); } TEST(SpscQueueTest, PopWithTimeout) { SpscQueue q(4, QueueDropStrategy::DropOldest); // Pop on empty queue should timeout int v = -1; auto start = std::chrono::steady_clock::now(); bool got = q.Pop(v, std::chrono::milliseconds(50)); auto elapsed = std::chrono::steady_clock::now() - start; EXPECT_FALSE(got); EXPECT_GE(std::chrono::duration_cast(elapsed).count(), 40); } TEST(SpscQueueTest, PopWithTimeoutSuccess) { SpscQueue q(4, QueueDropStrategy::DropOldest); std::thread pusher([&q]() { std::this_thread::sleep_for(std::chrono::milliseconds(30)); q.Push(42); }); int v = -1; bool got = q.Pop(v, std::chrono::milliseconds(200)); EXPECT_TRUE(got); EXPECT_EQ(v, 42); pusher.join(); } TEST(SpscQueueTest, Stop) { SpscQueue q(4, QueueDropStrategy::Block); q.Push(1); EXPECT_FALSE(q.IsStopped()); q.Stop(); EXPECT_TRUE(q.IsStopped()); // Push should fail after stop EXPECT_FALSE(q.Push(2)); // Pop should still work for existing items int v = 0; EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 1); // After draining, pop should fail EXPECT_FALSE(q.TryPop(v)); } TEST(SpscQueueTest, StopUnblocksWaiters) { SpscQueue q(4, QueueDropStrategy::Block); std::atomic popped{false}; std::thread waiter([&q, &popped]() { int v; q.Pop(v); // Will block popped = true; }); std::this_thread::sleep_for(std::chrono::milliseconds(30)); EXPECT_FALSE(popped.load()); q.Stop(); waiter.join(); EXPECT_TRUE(popped.load()); } TEST(SpscQueueTest, Stats) { SpscQueue q(2, QueueDropStrategy::DropOldest); q.Push(1); q.Push(2); q.Push(3); // drops 1 int v; q.TryPop(v); auto stats = q.GetStats(); EXPECT_EQ(stats.capacity, 2u); EXPECT_EQ(stats.size, 1u); EXPECT_EQ(stats.pushed, 3u); EXPECT_EQ(stats.popped, 1u); EXPECT_EQ(stats.dropped, 1u); EXPECT_FALSE(stats.stopped); } TEST(SpscQueueTest, TryPopBatch) { SpscQueue q(10, QueueDropStrategy::DropOldest); for (int i = 0; i < 5; ++i) { q.Push(i); } std::vector batch; bool got = q.TryPopBatch(batch, 3); EXPECT_TRUE(got); EXPECT_EQ(batch.size(), 3u); EXPECT_EQ(batch[0], 0); EXPECT_EQ(batch[1], 1); EXPECT_EQ(batch[2], 2); got = q.TryPopBatch(batch, 10); // request more than available EXPECT_TRUE(got); EXPECT_EQ(batch.size(), 2u); EXPECT_EQ(batch[0], 3); EXPECT_EQ(batch[1], 4); got = q.TryPopBatch(batch, 5); // empty queue EXPECT_FALSE(got); EXPECT_TRUE(batch.empty()); } TEST(SpscQueueTest, OnDataAvailableCallback) { SpscQueue q(4, QueueDropStrategy::DropOldest); std::atomic callback_count{0}; q.SetOnDataAvailable([&callback_count]() { callback_count.fetch_add(1); }); q.Push(1); // empty -> non-empty, should trigger EXPECT_EQ(callback_count.load(), 1); q.Push(2); // not empty -> still not empty, no trigger EXPECT_EQ(callback_count.load(), 1); int v; q.TryPop(v); q.TryPop(v); // now empty q.Push(3); // empty -> non-empty, should trigger EXPECT_EQ(callback_count.load(), 2); } TEST(SpscQueueTest, ConcurrentPushPop) { SpscQueue q(100, QueueDropStrategy::DropOldest); constexpr int kNumItems = 10000; std::atomic sum{0}; std::thread producer([&q]() { for (int i = 1; i <= kNumItems; ++i) { q.Push(i); } }); std::thread consumer([&q, &sum]() { int received = 0; while (received < kNumItems) { int v; if (q.TryPop(v)) { sum.fetch_add(v); ++received; } else { std::this_thread::yield(); } } }); producer.join(); consumer.join(); // Sum of 1 to kNumItems int expected = kNumItems * (kNumItems + 1) / 2; EXPECT_EQ(sum.load(), expected); } TEST(SpscQueueTest, ZeroCapacityBecomesOne) { SpscQueue q(0, QueueDropStrategy::DropOldest); EXPECT_EQ(q.Capacity(), 1u); EXPECT_TRUE(q.Push(42)); int v; EXPECT_TRUE(q.TryPop(v)); EXPECT_EQ(v, 42); } } // namespace } // namespace rk3588