rtsp_tensorrt/tests/test_rtsp_reader.cpp
sladro e13cb3659c feat: 初始化项目结构
- 创建基本项目结构和目录
- 添加CMake构建系统
- 实现基础的配置解析功能
- 添加YOLO推理框架支持
- 集成RTSP和视频流处理功能
- 添加性能监控和日志系统
2024-12-24 16:25:03 +08:00

143 lines
4.0 KiB
C++

#include <gtest/gtest.h>
#include "pipeline/input/rtsp_reader.hpp"
#include "pipeline/common/yaml_config_parser.hpp"
#include <thread>
#include <chrono>
#include <opencv2/imgcodecs.hpp>
#include <opencv2/imgproc.hpp>
using namespace pipeline;
class MockRtspSource {
public:
MockRtspSource() {
// 创建一个测试图像
frame_ = cv::Mat(480, 640, CV_8UC3, cv::Scalar(0, 255, 0));
cv::putText(frame_, "Test Frame", cv::Point(50, 50),
cv::FONT_HERSHEY_SIMPLEX, 1.0, cv::Scalar(255, 255, 255), 2);
}
cv::Mat getFrame() const {
return frame_.clone();
}
private:
cv::Mat frame_;
};
class RtspReaderTest : public ::testing::Test {
protected:
void SetUp() override {
// 配置测试参数
config_.buffer_size = 30;
config_.max_retry_count = 3;
config_.retry_interval_ms = 100; // 使用较短的间隔以加快测试
config_.frame_timeout_ms = 1000;
config_.target_fps = 30.0f;
// 使用本地测试文件作为模拟源
mock_source_ = std::make_unique<MockRtspSource>();
test_url_ = "mock://test";
}
RtspReader::Config config_;
std::string test_url_;
std::unique_ptr<MockRtspSource> mock_source_;
};
// 测试连接功能
TEST_F(RtspReaderTest, Connection) {
RtspReader reader(config_);
// 测试无效URL
EXPECT_FALSE(reader.connect("invalid_url"));
EXPECT_FALSE(reader.isConnected());
// 测试模拟URL
EXPECT_TRUE(reader.connect(test_url_));
EXPECT_TRUE(reader.isConnected());
EXPECT_EQ(reader.getUrl(), test_url_);
}
// 测试断开连接
TEST_F(RtspReaderTest, Disconnect) {
RtspReader reader(config_);
// 连接后断开
EXPECT_TRUE(reader.connect(test_url_));
EXPECT_TRUE(reader.isConnected());
reader.disconnect();
EXPECT_FALSE(reader.isConnected());
}
// 测试读取功能
TEST_F(RtspReaderTest, ReadFrame) {
RtspReader reader(config_);
cv::Mat frame;
// 连接并读取帧
EXPECT_TRUE(reader.connect(test_url_));
EXPECT_TRUE(reader.read(frame));
// 检查帧的基本属性
EXPECT_FALSE(frame.empty());
EXPECT_EQ(frame.cols, 640);
EXPECT_EQ(frame.rows, 480);
EXPECT_EQ(frame.channels(), 3);
}
// 测试重连功能
TEST_F(RtspReaderTest, Reconnection) {
RtspReader reader(config_);
// 测试正常连接
EXPECT_TRUE(reader.connect(test_url_));
// 模拟断开连接
reader.disconnect();
// 测试重连
cv::Mat frame;
EXPECT_TRUE(reader.read(frame)); // 这里会触发重连
EXPECT_TRUE(reader.isConnected());
EXPECT_FALSE(frame.empty());
}
// 测试帧率控制
TEST_F(RtspReaderTest, FrameRateControl) {
RtspReader reader(config_);
cv::Mat frame;
EXPECT_TRUE(reader.connect(test_url_));
// 测试帧率
auto start = std::chrono::steady_clock::now();
int frame_count = 0;
for (int i = 0; i < 10; ++i) { // 减少测试帧数以加快测试
if (reader.read(frame)) {
frame_count++;
}
}
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start).count() / 1000.0f;
float actual_fps = frame_count / duration;
// 允许较大的误差范围,因为是模拟测试
EXPECT_GT(actual_fps, 0.0f);
EXPECT_LT(actual_fps, config_.target_fps * 1.5f);
}
// 测试超时行为
TEST_F(RtspReaderTest, Timeout) {
config_.frame_timeout_ms = 100; // 设置较短的超时时间
RtspReader reader(config_);
cv::Mat frame;
// 测试正常读取的超时
EXPECT_TRUE(reader.connect(test_url_));
auto start = std::chrono::steady_clock::now();
EXPECT_TRUE(reader.read(frame));
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start).count();
EXPECT_LT(duration, config_.frame_timeout_ms);
}