// -----------------------------------------------------------------------------
// InputFileNode ("input_file") - pipeline source node that reads a video file,
// decodes it and pushes the resulting frames to every downstream queue.
//
// NOTE(review): this copy of the file is collapsed onto a few physical lines
// and everything that was written between angle brackets (include targets,
// template arguments such as the ones of static_cast / std::make_shared /
// std::atomic / std::vector) is missing. Restore the file from version control
// before building; the notes below only describe what is still visible.
//
// Configuration keys read by Init() (default in parentheses):
//   id ("input_file"), path (""), loop (true), realtime (true), fps (0),
//   width (1920), height (1080), platform (""), hw_platform (""),
//   use_ffmpeg (true), use_mpp (true), fallback_to_stub_on_fail (false),
//   plus whatever ParseCpuAffinity(config) extracts.
//   Init() fails when ctx.output_queues is empty, or when path is empty and
//   fallback_to_stub_on_fail is false.
//
// Lifecycle:
//   Start() - returns false if there are no output queues; otherwise sets
//             running_ and launches exactly one worker thread:
//               * FFMPEG and MPP both compiled in:
//                   use_ffmpeg && use_mpp -> LoopFfmpegMpp
//                   use_ffmpeg only       -> LoopFfmpegCpu
//                   otherwise             -> LoopStub
//               * only FFMPEG compiled in: LoopFfmpegCpu or LoopStub
//               * neither: logs an error if ffmpeg/mpp was requested and
//                 runs LoopStub.
//   Stop()  - clears running_, calls Stop() on every output queue, joins the
//             worker.
//   Drain() - only clears running_; the worker loops observe the flag and
//             call StopDownstreamQueues() on their way out.
//
// Worker loops:
//   LoopStub       - produces zero-filled NV12 frames of width_ x height_
//                    (Y plane followed by a half-size second plane). The rate
//                    is fps_, or 25 when fps_ <= 0; pacing via sleep_until is
//                    applied only when realtime_ is set.
//   LoopFfmpegCpu  - per outer iteration: OpenFile(), build a decoder config
//                    with backend "ffmpeg", create/open the decoder through
//                    HwFactory, then read packets, forward packets of the
//                    selected video stream to the decoder and hand every
//                    received frame to HandleDecodedFrame(). When the read
//                    loop ends the decoder and demuxer are released; the file
//                    is reopened only if loop_ is set and running_ is true.
//   LoopFfmpegMpp  - same flow with backend "mpp"; rejects any codec other
//                    than H.264 / HEVC.
//   Both FFmpeg loops switch to LoopStub() when OpenFile() fails and
//   fallback_to_stub_on_fail_ is set; any other setup failure clears running_
//   and ends the worker.
//
// Helpers:
//   BuildDecoderConfig - packs backend, codec_id, an optional "codec" name
//                        ("h264"/"h265"), platform hints and the extradata
//                        bytes into a SimpleJson object.
//   DrainDecoder       - calls decoder->Receive() until it fails; an error
//                        message other than "no_frame" is logged as a warning.
//   OpenFile           - opens path_, picks the first video stream, reports
//                        its time base and an fps value (fps_ if > 0, else
//                        the rounded avg_frame_rate, else 25). On failure the
//                        format context is closed and err is filled in.
//   Cleanup            - frees whichever of the four FFmpeg objects are
//                        non-null.
//   HandleDecodedFrame - assigns the next frame_id_ and pushes the frame to
//                        every output queue.
//
// NOTE(review): the "mode" string logged by Start() is chosen by its own #if
//   chain (keyed on RK3588_ENABLE_MPP only), not by the chain that picks the
//   worker. In a build without RK3588_ENABLE_FFMPEG it can report
//   "ffmpeg cpu decode" / "ffmpeg demux + mpp decode" while LoopStub is what
//   actually runs.
// NOTE(review): Start() assigns to worker_ without checking joinable().
//   Assigning to a joinable std::thread calls std::terminate, so a second
//   Start() without an intervening Stop() would abort - confirm callers never
//   do that, or guard it.
// NOTE(review): when av_read_frame() returns < 0 the loops go straight to
//   decoder->Close() without another Receive pass. Whether Close() delivers
//   frames still buffered in the decoder is not visible here - verify,
//   otherwise the tail of each file is dropped.
// NOTE(review): width_ / height_ come from config unchecked and are used to
//   size the stub buffer; zero or negative values are not rejected in Init().
// NOTE(review): pacing advances next_tp by a fixed interval; after a stall the
//   deadline is already in the past, so sleep_until returns immediately and
//   frames are emitted back-to-back until the schedule catches up.
// -----------------------------------------------------------------------------
#include #include #include #include #include #include #include #include #include "node.h" #include "hw/hw_factory.h" #include "utils/thread_affinity.h" #include "utils/logger.h" #if defined(RK3588_ENABLE_MPP) extern "C" { #include #include #include #include } #include #endif #ifdef RK3588_ENABLE_FFMPEG extern "C" { #include #include #include } #endif namespace rk3588 { class InputFileNode : public INode { public: std::string Id() const override { return id_; } std::string Type() const override { return "input_file"; } bool Init(const SimpleJson& config, const NodeContext& ctx) override { id_ = config.ValueOr("id", "input_file"); path_ = config.ValueOr("path", ""); loop_ = config.ValueOr("loop", true); realtime_ = config.ValueOr("realtime", true); fps_ = config.ValueOr("fps", 0); width_ = config.ValueOr("width", 1920); height_ = config.ValueOr("height", 1080); platform_ = config.ValueOr("platform", ""); hw_platform_ = config.ValueOr("hw_platform", ""); use_ffmpeg_ = config.ValueOr("use_ffmpeg", true); use_mpp_ = config.ValueOr("use_mpp", true); fallback_to_stub_on_fail_ = config.ValueOr("fallback_to_stub_on_fail", false); cpu_affinity_ = ParseCpuAffinity(config); if (ctx.output_queues.empty()) { LogError("[input_file] no downstream queue configured for node " + id_); return false; } out_queues_ = ctx.output_queues; if (path_.empty() && !fallback_to_stub_on_fail_) { LogError("[input_file] path is required"); return false; } return true; } bool Start() override { if (out_queues_.empty()) return false; running_.store(true); #if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP) if (use_ffmpeg_ && use_mpp_) { worker_ = std::thread(&InputFileNode::LoopFfmpegMpp, this); } else if (use_ffmpeg_) { worker_ = std::thread(&InputFileNode::LoopFfmpegCpu, this); } else { worker_ = std::thread(&InputFileNode::LoopStub, this); } #elif defined(RK3588_ENABLE_FFMPEG) if (use_ffmpeg_) { worker_ = std::thread(&InputFileNode::LoopFfmpegCpu, this); } else { worker_ = 
std::thread(&InputFileNode::LoopStub, this); } #else if (use_ffmpeg_ || use_mpp_) { LogError("[input_file] requested ffmpeg/mpp but not enabled at build time"); } worker_ = std::thread(&InputFileNode::LoopStub, this); #endif std::string mode; #if defined(RK3588_ENABLE_MPP) if (use_ffmpeg_ && use_mpp_) mode = "ffmpeg demux + mpp decode"; else if (use_ffmpeg_) mode = "ffmpeg cpu decode"; else mode = "stub"; #else if (use_ffmpeg_) mode = "ffmpeg cpu decode"; else mode = "stub"; #endif LogInfo("[input_file] start path=" + path_ + " loop=" + (loop_ ? std::string("true") : std::string("false")) + " realtime=" + (realtime_ ? std::string("true") : std::string("false")) + " fps=" + std::to_string(fps_) + " (" + mode + ")"); return true; } void Stop() override { running_.store(false); for (auto& q : out_queues_) q->Stop(); if (worker_.joinable()) worker_.join(); LogInfo("[input_file] stopped"); } void Drain() override { running_.store(false); } private: #if defined(RK3588_ENABLE_FFMPEG) static SimpleJson BuildDecoderConfig(const std::string& backend, AVCodecID codec_id, const uint8_t* extradata, size_t extradata_size, const std::string& platform, const std::string& hw_platform) { SimpleJson::Object obj; obj["backend"] = SimpleJson(backend); obj["codec_id"] = SimpleJson(static_cast(codec_id)); if (codec_id == AV_CODEC_ID_H264) { obj["codec"] = SimpleJson(std::string("h264")); } else if (codec_id == AV_CODEC_ID_HEVC) { obj["codec"] = SimpleJson(std::string("h265")); } if (!platform.empty()) { obj["platform"] = SimpleJson(platform); } if (!hw_platform.empty()) { obj["hw_platform"] = SimpleJson(hw_platform); } if (extradata && extradata_size > 0) { SimpleJson::Array arr; arr.reserve(extradata_size); for (size_t i = 0; i < extradata_size; ++i) { arr.emplace_back(static_cast(extradata[i])); } obj["extradata"] = SimpleJson(std::move(arr)); } return SimpleJson(std::move(obj)); } static void DrainDecoder(const std::shared_ptr& decoder, const std::function& on_frame) { if (!decoder) 
return; while (true) { auto out = decoder->Receive(); if (!out.Ok()) { if (out.ErrMessage() != "no_frame") { LogWarn("[input_file] decoder receive: " + out.ErrMessage()); } break; } if (on_frame) on_frame(out.Value()); } } #endif void ApplyAffinity() { if (cpu_affinity_.empty()) return; std::string aerr; if (!SetCurrentThreadAffinity(cpu_affinity_, aerr)) { LogWarn("[input_file] SetCurrentThreadAffinity failed: " + aerr); } } void PushToDownstream(FramePtr frame) { for (auto& q : out_queues_) { q->Push(frame); } } void HandleDecodedFrame(const FramePtr& frame) { if (!frame) return; frame->frame_id = ++frame_id_; PushToDownstream(frame); } void StopDownstreamQueues() { for (auto& q : out_queues_) q->Stop(); } void LoopStub() { ApplyAffinity(); using namespace std::chrono; const int fps = fps_ > 0 ? fps_ : 25; auto interval = milliseconds(1000 / std::max(1, fps)); auto next_tp = steady_clock::now(); while (running_.load()) { auto frame = std::make_shared(); frame->width = width_; frame->height = height_; frame->format = PixelFormat::NV12; frame->plane_count = 2; const size_t y_size = static_cast(width_) * height_; const size_t total = y_size * 3 / 2; auto buffer = std::make_shared>(total, 0); frame->data = buffer->data(); frame->data_size = buffer->size(); frame->SetOwner(buffer); frame->stride = width_; frame->planes[0] = {frame->data, width_, static_cast(y_size), 0}; frame->planes[1] = {frame->data + y_size, width_, static_cast(y_size / 2), static_cast(y_size)}; frame->SyncBufferFromFrame(); frame->frame_id = ++frame_id_; frame->pts = duration_cast(steady_clock::now().time_since_epoch()).count(); PushToDownstream(frame); if (realtime_ && interval.count() > 0) { next_tp += interval; std::this_thread::sleep_until(next_tp); } } StopDownstreamQueues(); } #ifdef RK3588_ENABLE_FFMPEG bool OpenFile(AVFormatContext*& fmt_ctx, int& video_stream, AVRational& time_base, int& out_fps, std::string& err) { err.clear(); fmt_ctx = nullptr; video_stream = -1; time_base = 
AVRational{1, 1000}; out_fps = fps_ > 0 ? fps_ : 0; if (path_.empty()) { err = "path is empty"; return false; } if (avformat_open_input(&fmt_ctx, path_.c_str(), nullptr, nullptr) < 0) { err = "avformat_open_input failed"; return false; } if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) { avformat_close_input(&fmt_ctx); err = "avformat_find_stream_info failed"; return false; } for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) { if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { video_stream = static_cast(i); time_base = fmt_ctx->streams[i]->time_base; if (out_fps <= 0) { const AVRational r = fmt_ctx->streams[i]->avg_frame_rate; if (r.num > 0 && r.den > 0) { out_fps = static_cast((static_cast(r.num) / static_cast(r.den)) + 0.5); } } break; } } if (video_stream < 0) { avformat_close_input(&fmt_ctx); err = "no video stream"; return false; } if (out_fps <= 0) out_fps = 25; return true; } void Cleanup(AVFormatContext* fmt, AVCodecContext* dec, AVPacket* pkt, AVFrame* frm) { if (dec) avcodec_free_context(&dec); if (fmt) avformat_close_input(&fmt); if (pkt) av_packet_free(&pkt); if (frm) av_frame_free(&frm); } void LoopFfmpegCpu() { ApplyAffinity(); using namespace std::chrono; while (running_.load()) { AVFormatContext* fmt_ctx = nullptr; AVPacket* pkt = av_packet_alloc(); int video_stream = -1; AVRational time_base{1, 1000}; int fps_out = 25; std::string oerr; if (!OpenFile(fmt_ctx, video_stream, time_base, fps_out, oerr)) { LogError("[input_file] open failed: " + oerr + " path=" + path_); Cleanup(fmt_ctx, nullptr, pkt, nullptr); if (fallback_to_stub_on_fail_) { LoopStub(); return; } running_.store(false); break; } const AVCodec* codec = avcodec_find_decoder(fmt_ctx->streams[video_stream]->codecpar->codec_id); if (!codec) { LogError("[input_file] decoder not found"); Cleanup(fmt_ctx, nullptr, pkt, nullptr); running_.store(false); break; } const auto* par = fmt_ctx->streams[video_stream]->codecpar; SimpleJson dec_cfg = BuildDecoderConfig("ffmpeg", 
par->codec_id, par->extradata, static_cast(std::max(0, par->extradata_size)), platform_, hw_platform_); auto decoder = HwFactory::CreateDecoder(dec_cfg); if (!decoder || decoder->Open(dec_cfg).Failed()) { LogError("[input_file] ffmpeg decoder open failed"); Cleanup(fmt_ctx, nullptr, pkt, nullptr); running_.store(false); break; } auto interval = milliseconds(1000 / std::max(1, fps_out)); auto next_tp = steady_clock::now(); while (running_.load()) { if (av_read_frame(fmt_ctx, pkt) < 0) { break; } if (pkt->stream_index != video_stream) { av_packet_unref(pkt); continue; } int64_t pts_us = 0; if (pkt->pts != AV_NOPTS_VALUE) { pts_us = av_rescale_q(pkt->pts, time_base, {1, 1000000}); } DecodePacket dp; dp.data = pkt->data; dp.size = static_cast(pkt->size); dp.pts = static_cast(std::max(0, pts_us)); dp.keyframe = (pkt->flags & AV_PKT_FLAG_KEY) != 0; if (decoder->Send(dp).Ok()) { DrainDecoder(decoder, [&](const FramePtr& frame) { HandleDecodedFrame(frame); if (realtime_ && interval.count() > 0) { next_tp += interval; std::this_thread::sleep_until(next_tp); } }); } av_packet_unref(pkt); } decoder->Close(); Cleanup(fmt_ctx, nullptr, pkt, nullptr); if (!running_.load()) break; if (!loop_) { break; } } StopDownstreamQueues(); } #endif // RK3588_ENABLE_FFMPEG #if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP) void LoopFfmpegMpp() { ApplyAffinity(); using namespace std::chrono; while (running_.load()) { AVFormatContext* fmt_ctx = nullptr; AVPacket* pkt = av_packet_alloc(); int video_stream = -1; AVRational time_base{1, 1000}; int fps_out = 25; std::string oerr; if (!OpenFile(fmt_ctx, video_stream, time_base, fps_out, oerr)) { LogError("[input_file] open failed: " + oerr + " path=" + path_); Cleanup(fmt_ctx, nullptr, pkt, nullptr); if (fallback_to_stub_on_fail_) { LoopStub(); return; } running_.store(false); break; } auto codec_id = fmt_ctx->streams[video_stream]->codecpar->codec_id; if (codec_id != AV_CODEC_ID_H264 && codec_id != AV_CODEC_ID_HEVC) { 
LogError("[input_file] unsupported codec for mpp"); Cleanup(fmt_ctx, nullptr, pkt, nullptr); running_.store(false); break; } const auto* par = fmt_ctx->streams[video_stream]->codecpar; SimpleJson dec_cfg = BuildDecoderConfig("mpp", codec_id, par->extradata, static_cast(std::max(0, par->extradata_size)), platform_, hw_platform_); auto decoder = HwFactory::CreateDecoder(dec_cfg); if (!decoder || decoder->Open(dec_cfg).Failed()) { LogError("[input_file] mpp decoder open failed"); Cleanup(fmt_ctx, nullptr, pkt, nullptr); running_.store(false); break; } auto interval = milliseconds(1000 / std::max(1, fps_out)); auto next_tp = steady_clock::now(); while (running_.load()) { if (av_read_frame(fmt_ctx, pkt) < 0) { break; } if (pkt->stream_index != video_stream) { av_packet_unref(pkt); continue; } int64_t pts_us = pkt->pts == AV_NOPTS_VALUE ? 0 : av_rescale_q(pkt->pts, time_base, {1, 1000000}); DecodePacket dp; dp.data = pkt->data; dp.size = static_cast(pkt->size); dp.pts = static_cast(std::max(0, pts_us)); dp.keyframe = (pkt->flags & AV_PKT_FLAG_KEY) != 0; if (decoder->Send(dp).Ok()) { DrainDecoder(decoder, [&](const FramePtr& frame) { HandleDecodedFrame(frame); if (realtime_ && interval.count() > 0) { next_tp += interval; std::this_thread::sleep_until(next_tp); } }); } av_packet_unref(pkt); } decoder->Close(); Cleanup(fmt_ctx, nullptr, pkt, nullptr); if (!running_.load()) break; if (!loop_) { break; } } StopDownstreamQueues(); } #endif std::string id_; std::string path_; bool loop_ = true; bool realtime_ = true; int fps_ = 0; int width_ = 1920; int height_ = 1080; bool use_ffmpeg_ = true; bool use_mpp_ = true; bool fallback_to_stub_on_fail_ = false; std::vector cpu_affinity_; std::string platform_; std::string hw_platform_; std::atomic running_{false}; std::vector>> out_queues_; std::thread worker_; uint64_t frame_id_ = 0; }; REGISTER_NODE(InputFileNode, "input_file"); } // namespace rk3588