OrangePi3588Media/plugins/input_rtsp/input_rtsp_node.cpp

245 lines
8.7 KiB
C++

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>
#include "node.h"
#ifdef RK3588_ENABLE_FFMPEG
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
}
#endif
namespace rk3588 {
class InputRtspNode : public INode {
public:
std::string Id() const override { return id_; }
std::string Type() const override { return "input_rtsp"; }
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
id_ = config.ValueOr<std::string>("id", "input_rtsp");
url_ = config.ValueOr<std::string>("url", "");
fps_ = config.ValueOr<int>("fps", 25);
width_ = config.ValueOr<int>("width", 1920);
height_ = config.ValueOr<int>("height", 1080);
use_ffmpeg_ = config.ValueOr<bool>("use_ffmpeg", false);
if (ctx.output_queues.empty()) {
std::cerr << "[input_rtsp] no downstream queue configured for node " << id_ << "\n";
return false;
}
out_queue_ = ctx.output_queues[0];
return true;
}
bool Start() override {
if (!out_queue_) return false;
running_.store(true);
#if defined(RK3588_ENABLE_FFMPEG)
if (use_ffmpeg_) {
worker_ = std::thread(&InputRtspNode::LoopFfmpeg, this);
} else {
worker_ = std::thread(&InputRtspNode::LoopStub, this);
}
#else
if (use_ffmpeg_) {
std::cerr << "[input_rtsp] use_ffmpeg requested but not enabled at build time" << "\n";
}
worker_ = std::thread(&InputRtspNode::LoopStub, this);
#endif
std::cout << "[input_rtsp] start url=" << url_ << " fps=" << fps_
<< (use_ffmpeg_ ? " (ffmpeg)" : " (stub)") << "\n";
return true;
}
void Stop() override {
running_.store(false);
if (out_queue_) out_queue_->Stop();
if (worker_.joinable()) worker_.join();
}
private:
void LoopStub() {
using namespace std::chrono;
auto frame_interval = fps_ > 0 ? milliseconds(1000 / fps_) : milliseconds(40);
while (running_.load()) {
auto frame = std::make_shared<Frame>();
frame->width = width_;
frame->height = height_;
frame->format = PixelFormat::NV12;
frame->plane_count = 2;
frame->planes[0] = {nullptr, width_, width_ * height_, 0};
frame->planes[1] = {nullptr, width_, width_ * height_ / 2, width_ * height_};
frame->frame_id = ++frame_id_;
frame->pts = duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
out_queue_->Push(frame);
if (frame_id_ % 100 == 0) {
std::cout << "[input_rtsp] generated frame " << frame_id_ << " queue="
<< out_queue_->Size() << " drops=" << out_queue_->DroppedCount() << "\n";
}
std::this_thread::sleep_for(frame_interval);
}
}
#if defined(RK3588_ENABLE_FFMPEG)
void LoopFfmpeg() {
using namespace std::chrono;
AVFormatContext* fmt_ctx = nullptr;
AVCodecContext* codec_ctx = nullptr;
AVPacket* pkt = av_packet_alloc();
AVFrame* frm = av_frame_alloc();
int video_stream = -1;
AVRational time_base{1, 1000};
if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, nullptr) < 0) {
std::cerr << "[input_rtsp] avformat_open_input failed: " << url_ << "\n";
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
LoopStub();
return;
}
if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) {
std::cerr << "[input_rtsp] avformat_find_stream_info failed\n";
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
LoopStub();
return;
}
for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) {
if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
video_stream = static_cast<int>(i);
time_base = fmt_ctx->streams[i]->time_base;
break;
}
}
if (video_stream < 0) {
std::cerr << "[input_rtsp] no video stream\n";
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
LoopStub();
return;
}
const AVCodec* codec = avcodec_find_decoder(fmt_ctx->streams[video_stream]->codecpar->codec_id);
if (!codec) {
std::cerr << "[input_rtsp] decoder not found\n";
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
LoopStub();
return;
}
codec_ctx = avcodec_alloc_context3(codec);
avcodec_parameters_to_context(codec_ctx, fmt_ctx->streams[video_stream]->codecpar);
if (avcodec_open2(codec_ctx, codec, nullptr) < 0) {
std::cerr << "[input_rtsp] avcodec_open2 failed\n";
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
LoopStub();
return;
}
while (running_.load()) {
if (av_read_frame(fmt_ctx, pkt) < 0) {
std::this_thread::sleep_for(milliseconds(10));
continue;
}
if (pkt->stream_index != video_stream) {
av_packet_unref(pkt);
continue;
}
if (avcodec_send_packet(codec_ctx, pkt) == 0) {
while (avcodec_receive_frame(codec_ctx, frm) == 0) {
PushFrameFromAVFrame(*frm, time_base);
av_frame_unref(frm);
}
}
av_packet_unref(pkt);
}
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
}
void PushFrameFromAVFrame(const AVFrame& f, AVRational time_base) {
PixelFormat fmt = PixelFormat::UNKNOWN;
int plane_count = 0;
int width = f.width;
int height = f.height;
if (f.format == AV_PIX_FMT_NV12) {
fmt = PixelFormat::NV12;
plane_count = 2;
} else if (f.format == AV_PIX_FMT_YUV420P || f.format == AV_PIX_FMT_YUVJ420P) {
fmt = PixelFormat::YUV420;
plane_count = 3;
}
int buf_size = av_image_get_buffer_size(static_cast<AVPixelFormat>(f.format), width, height, 1);
if (buf_size <= 0) return;
auto buffer = std::make_shared<std::vector<uint8_t>>(buf_size);
if (av_image_copy_to_buffer(buffer->data(), buffer->size(), f.data, f.linesize,
static_cast<AVPixelFormat>(f.format), width, height, 1) < 0) {
return;
}
auto frame = std::make_shared<Frame>();
frame->width = width;
frame->height = height;
frame->format = fmt;
frame->data = buffer->data();
frame->data_size = buffer->size();
frame->plane_count = plane_count;
frame->frame_id = ++frame_id_;
auto best_pts = av_frame_get_best_effort_timestamp(&f);
frame->pts = best_pts == AV_NOPTS_VALUE ? 0
: static_cast<uint64_t>(av_rescale_q(best_pts, time_base, {1, 1000}));
frame->data_owner = buffer;
if (fmt == PixelFormat::NV12 && plane_count == 2) {
int y_size = width * height;
frame->planes[0] = {buffer->data(), width, y_size, 0};
frame->planes[1] = {buffer->data() + y_size, width, y_size / 2, y_size};
} else if (fmt == PixelFormat::YUV420 && plane_count == 3) {
int y_size = width * height;
int uv_stride = width / 2;
int uv_h = height / 2;
int u_size = uv_stride * uv_h;
frame->planes[0] = {buffer->data(), width, y_size, 0};
frame->planes[1] = {buffer->data() + y_size, uv_stride, u_size, y_size};
frame->planes[2] = {buffer->data() + y_size + u_size, uv_stride, u_size, y_size + u_size};
}
if (!out_queue_->Push(frame)) {
// dropped by queue policy
}
if (frame_id_ % 100 == 0) {
std::cout << "[input_rtsp] recv frame " << frame->frame_id
<< " queue=" << out_queue_->Size()
<< " drops=" << out_queue_->DroppedCount() << "\n";
}
}
void Cleanup(AVFormatContext* fmt, AVCodecContext* dec, AVPacket* pkt, AVFrame* frm) {
if (dec) avcodec_free_context(&dec);
if (fmt) avformat_close_input(&fmt);
if (pkt) av_packet_free(&pkt);
if (frm) av_frame_free(&frm);
}
#endif
std::string id_;
std::string url_;
int fps_ = 25;
int width_ = 1920;
int height_ = 1080;
std::atomic<bool> running_{false};
std::shared_ptr<SpscQueue<FramePtr>> out_queue_;
std::thread worker_;
uint64_t frame_id_ = 0;
bool use_ffmpeg_ = false;
};
REGISTER_NODE(InputRtspNode, "input_rtsp");
} // namespace rk3588