OrangePi3588Media/plugins/input_file/input_file_node.cpp
sladro a12f91da63
Some checks failed
CI / host-build (push) Has been cancelled
CI / rk3588-cross-build (push) Has been cancelled
修复功能bug,对齐prd
2026-01-03 16:12:03 +08:00

635 lines
22 KiB
C++

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include "node.h"
#include "utils/thread_affinity.h"
#if defined(RK3588_ENABLE_MPP)
extern "C" {
#include <rockchip/mpp_buffer.h>
#include <rockchip/mpp_frame.h>
#include <rockchip/mpp_packet.h>
#include <rockchip/rk_mpi.h>
}
#include <unistd.h>
#endif
#ifdef RK3588_ENABLE_FFMPEG
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/imgutils.h>
}
#endif
namespace rk3588 {
class InputFileNode : public INode {
public:
std::string Id() const override { return id_; }
std::string Type() const override { return "input_file"; }
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
id_ = config.ValueOr<std::string>("id", "input_file");
path_ = config.ValueOr<std::string>("path", "");
loop_ = config.ValueOr<bool>("loop", true);
realtime_ = config.ValueOr<bool>("realtime", true);
fps_ = config.ValueOr<int>("fps", 0);
width_ = config.ValueOr<int>("width", 1920);
height_ = config.ValueOr<int>("height", 1080);
use_ffmpeg_ = config.ValueOr<bool>("use_ffmpeg", true);
use_mpp_ = config.ValueOr<bool>("use_mpp", true);
fallback_to_stub_on_fail_ = config.ValueOr<bool>("fallback_to_stub_on_fail", false);
cpu_affinity_ = ParseCpuAffinity(config);
if (ctx.output_queues.empty()) {
std::cerr << "[input_file] no downstream queue configured for node " << id_ << "\n";
return false;
}
out_queues_ = ctx.output_queues;
if (path_.empty() && !fallback_to_stub_on_fail_) {
std::cerr << "[input_file] path is required\n";
return false;
}
return true;
}
bool Start() override {
if (out_queues_.empty()) return false;
running_.store(true);
#if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP)
if (use_ffmpeg_ && use_mpp_) {
worker_ = std::thread(&InputFileNode::LoopFfmpegMpp, this);
} else if (use_ffmpeg_) {
worker_ = std::thread(&InputFileNode::LoopFfmpegCpu, this);
} else {
worker_ = std::thread(&InputFileNode::LoopStub, this);
}
#elif defined(RK3588_ENABLE_FFMPEG)
if (use_ffmpeg_) {
worker_ = std::thread(&InputFileNode::LoopFfmpegCpu, this);
} else {
worker_ = std::thread(&InputFileNode::LoopStub, this);
}
#else
if (use_ffmpeg_ || use_mpp_) {
std::cerr << "[input_file] requested ffmpeg/mpp but not enabled at build time\n";
}
worker_ = std::thread(&InputFileNode::LoopStub, this);
#endif
std::cout << "[input_file] start path=" << path_ << " loop=" << (loop_ ? "true" : "false")
<< " realtime=" << (realtime_ ? "true" : "false")
<< " fps=" << fps_;
#if defined(RK3588_ENABLE_MPP)
if (use_ffmpeg_ && use_mpp_) std::cout << " (ffmpeg demux + mpp decode)";
else if (use_ffmpeg_) std::cout << " (ffmpeg cpu decode)";
else std::cout << " (stub)";
#else
if (use_ffmpeg_) std::cout << " (ffmpeg cpu decode)";
else std::cout << " (stub)";
#endif
std::cout << "\n";
return true;
}
void Stop() override {
running_.store(false);
for (auto& q : out_queues_) q->Stop();
if (worker_.joinable()) worker_.join();
std::cout << "[input_file] stopped\n";
}
void Drain() override {
running_.store(false);
}
private:
void ApplyAffinity() {
if (cpu_affinity_.empty()) return;
std::string aerr;
if (!SetCurrentThreadAffinity(cpu_affinity_, aerr)) {
std::cerr << "[input_file] SetCurrentThreadAffinity failed: " << aerr << "\n";
}
}
void PushToDownstream(FramePtr frame) {
for (auto& q : out_queues_) {
q->Push(frame);
}
}
void StopDownstreamQueues() {
for (auto& q : out_queues_) q->Stop();
}
void LoopStub() {
ApplyAffinity();
using namespace std::chrono;
const int fps = fps_ > 0 ? fps_ : 25;
auto interval = milliseconds(1000 / std::max(1, fps));
auto next_tp = steady_clock::now();
while (running_.load()) {
auto frame = std::make_shared<Frame>();
frame->width = width_;
frame->height = height_;
frame->format = PixelFormat::NV12;
frame->plane_count = 2;
const size_t y_size = static_cast<size_t>(width_) * height_;
const size_t total = y_size * 3 / 2;
auto buffer = std::make_shared<std::vector<uint8_t>>(total, 0);
frame->data = buffer->data();
frame->data_size = buffer->size();
frame->data_owner = buffer;
frame->stride = width_;
frame->planes[0] = {frame->data, width_, static_cast<int>(y_size), 0};
frame->planes[1] = {frame->data + y_size, width_, static_cast<int>(y_size / 2),
static_cast<int>(y_size)};
frame->frame_id = ++frame_id_;
frame->pts = duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count();
PushToDownstream(frame);
if (realtime_ && interval.count() > 0) {
next_tp += interval;
std::this_thread::sleep_until(next_tp);
}
}
StopDownstreamQueues();
}
#ifdef RK3588_ENABLE_FFMPEG
bool OpenFile(AVFormatContext*& fmt_ctx, int& video_stream, AVRational& time_base,
int& out_fps, std::string& err) {
err.clear();
fmt_ctx = nullptr;
video_stream = -1;
time_base = AVRational{1, 1000};
out_fps = fps_ > 0 ? fps_ : 0;
if (path_.empty()) {
err = "path is empty";
return false;
}
if (avformat_open_input(&fmt_ctx, path_.c_str(), nullptr, nullptr) < 0) {
err = "avformat_open_input failed";
return false;
}
if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) {
avformat_close_input(&fmt_ctx);
err = "avformat_find_stream_info failed";
return false;
}
for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) {
if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
video_stream = static_cast<int>(i);
time_base = fmt_ctx->streams[i]->time_base;
if (out_fps <= 0) {
const AVRational r = fmt_ctx->streams[i]->avg_frame_rate;
if (r.num > 0 && r.den > 0) {
out_fps = static_cast<int>((static_cast<double>(r.num) / static_cast<double>(r.den)) + 0.5);
}
}
break;
}
}
if (video_stream < 0) {
avformat_close_input(&fmt_ctx);
err = "no video stream";
return false;
}
if (out_fps <= 0) out_fps = 25;
return true;
}
void Cleanup(AVFormatContext* fmt, AVCodecContext* dec, AVPacket* pkt, AVFrame* frm) {
if (dec) avcodec_free_context(&dec);
if (fmt) avformat_close_input(&fmt);
if (pkt) av_packet_free(&pkt);
if (frm) av_frame_free(&frm);
}
void PushFrameFromAVFrame(const AVFrame& f, AVRational time_base) {
PixelFormat fmt = PixelFormat::UNKNOWN;
int plane_count = 0;
int width = f.width;
int height = f.height;
if (f.format == AV_PIX_FMT_NV12) {
fmt = PixelFormat::NV12;
plane_count = 2;
} else if (f.format == AV_PIX_FMT_YUV420P || f.format == AV_PIX_FMT_YUVJ420P) {
fmt = PixelFormat::YUV420;
plane_count = 3;
}
if (plane_count == 0) return;
int buf_size = av_image_get_buffer_size(static_cast<AVPixelFormat>(f.format), width, height, 1);
if (buf_size <= 0) return;
auto buffer = std::make_shared<std::vector<uint8_t>>(static_cast<size_t>(buf_size));
if (av_image_copy_to_buffer(buffer->data(), buffer->size(), f.data, f.linesize,
static_cast<AVPixelFormat>(f.format), width, height, 1) < 0) {
return;
}
auto frame = std::make_shared<Frame>();
frame->width = width;
frame->height = height;
frame->format = fmt;
frame->stride = (fmt == PixelFormat::RGB || fmt == PixelFormat::BGR) ? width * 3 : width;
frame->data = buffer->data();
frame->data_size = buffer->size();
frame->plane_count = plane_count;
frame->data_owner = buffer;
frame->frame_id = ++frame_id_;
const int64_t best_pts = f.pts;
frame->pts = best_pts == AV_NOPTS_VALUE
? 0
: static_cast<uint64_t>(av_rescale_q(best_pts, time_base, {1, 1000000}));
if (fmt == PixelFormat::NV12 && plane_count == 2) {
int y_size = width * height;
frame->planes[0] = {buffer->data(), width, y_size, 0};
frame->planes[1] = {buffer->data() + y_size, width, y_size / 2, y_size};
} else if (fmt == PixelFormat::YUV420 && plane_count == 3) {
int y_size = width * height;
int uv_stride = width / 2;
int uv_h = height / 2;
int u_size = uv_stride * uv_h;
frame->planes[0] = {buffer->data(), width, y_size, 0};
frame->planes[1] = {buffer->data() + y_size, uv_stride, u_size, y_size};
frame->planes[2] = {buffer->data() + y_size + u_size, uv_stride, u_size, y_size + u_size};
}
PushToDownstream(frame);
}
void LoopFfmpegCpu() {
ApplyAffinity();
using namespace std::chrono;
while (running_.load()) {
AVFormatContext* fmt_ctx = nullptr;
AVCodecContext* codec_ctx = nullptr;
AVPacket* pkt = av_packet_alloc();
AVFrame* frm = av_frame_alloc();
int video_stream = -1;
AVRational time_base{1, 1000};
int fps_out = 25;
std::string oerr;
if (!OpenFile(fmt_ctx, video_stream, time_base, fps_out, oerr)) {
std::cerr << "[input_file] open failed: " << oerr << " path=" << path_ << "\n";
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
if (fallback_to_stub_on_fail_) {
LoopStub();
return;
}
running_.store(false);
break;
}
const AVCodec* codec = avcodec_find_decoder(fmt_ctx->streams[video_stream]->codecpar->codec_id);
if (!codec) {
std::cerr << "[input_file] decoder not found\n";
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
running_.store(false);
break;
}
codec_ctx = avcodec_alloc_context3(codec);
avcodec_parameters_to_context(codec_ctx, fmt_ctx->streams[video_stream]->codecpar);
if (avcodec_open2(codec_ctx, codec, nullptr) < 0) {
std::cerr << "[input_file] avcodec_open2 failed\n";
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
running_.store(false);
break;
}
auto interval = milliseconds(1000 / std::max(1, fps_out));
auto next_tp = steady_clock::now();
while (running_.load()) {
if (av_read_frame(fmt_ctx, pkt) < 0) {
break;
}
if (pkt->stream_index != video_stream) {
av_packet_unref(pkt);
continue;
}
if (avcodec_send_packet(codec_ctx, pkt) == 0) {
while (avcodec_receive_frame(codec_ctx, frm) == 0) {
PushFrameFromAVFrame(*frm, time_base);
av_frame_unref(frm);
if (realtime_ && interval.count() > 0) {
next_tp += interval;
std::this_thread::sleep_until(next_tp);
}
}
}
av_packet_unref(pkt);
}
Cleanup(fmt_ctx, codec_ctx, pkt, frm);
if (!running_.load()) break;
if (!loop_) {
break;
}
}
StopDownstreamQueues();
}
#endif // RK3588_ENABLE_FFMPEG
#if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP)
struct MppDecoderWrapper {
~MppDecoderWrapper() { Shutdown(); }
bool Init(MppCodingType type) {
MPP_RET ret = mpp_create(&ctx, &mpi);
if (ret != MPP_OK) return false;
ret = mpp_init(ctx, MPP_CTX_DEC, type);
if (ret != MPP_OK) return false;
MppDecCfg cfg = nullptr;
mpp_dec_cfg_init(&cfg);
ret = mpi->control(ctx, MPP_DEC_GET_CFG, cfg);
if (ret != MPP_OK) return false;
ret = mpp_dec_cfg_set_u32(cfg, "base:split_parse", 1);
if (ret != MPP_OK) return false;
ret = mpi->control(ctx, MPP_DEC_SET_CFG, cfg);
mpp_dec_cfg_deinit(cfg);
return ret == MPP_OK;
}
void Shutdown() {
if (packet) {
mpp_packet_deinit(&packet);
packet = nullptr;
}
if (frame_group) {
mpp_buffer_group_put(frame_group);
frame_group = nullptr;
}
if (ctx) {
mpp_destroy(ctx);
ctx = nullptr;
mpi = nullptr;
}
}
bool Decode(const uint8_t* data, size_t size, bool eos, int64_t pts_us,
const std::function<void(MppFrame)>& on_frame) {
if (!ctx || !mpi || !data || size == 0) return false;
if (!packet) {
if (mpp_packet_init(&packet, nullptr, 0) != MPP_OK) return false;
}
void* pkt_data = const_cast<uint8_t*>(data);
mpp_packet_set_data(packet, pkt_data);
mpp_packet_set_size(packet, size);
mpp_packet_set_pos(packet, pkt_data);
mpp_packet_set_length(packet, size);
mpp_packet_set_pts(packet, pts_us);
if (eos) mpp_packet_set_eos(packet);
bool pkt_done = false;
while (true) {
if (!pkt_done) {
if (mpi->decode_put_packet(ctx, packet) == MPP_OK) {
pkt_done = true;
} else {
usleep(2000);
}
}
int get_frm = 0;
MPP_RET ret = mpi->decode_get_frame(ctx, &frame);
if (ret == MPP_ERR_TIMEOUT) {
usleep(2000);
continue;
}
if (ret != MPP_OK) break;
if (frame) {
if (mpp_frame_get_info_change(frame)) {
HandleInfoChange();
} else {
if (on_frame) on_frame(frame);
}
get_frm = 1;
mpp_frame_deinit(&frame);
frame = nullptr;
}
if (get_frm) continue;
break;
}
mpp_packet_deinit(&packet);
packet = nullptr;
return true;
}
void HandleInfoChange() {
if (!ctx || !mpi || !frame) return;
MppBufferGroup group = frame_group;
size_t buf_size = mpp_frame_get_buf_size(frame);
if (!group) {
if (mpp_buffer_group_get_internal(&group, MPP_BUFFER_TYPE_DRM, NULL) != MPP_OK) return;
frame_group = group;
} else {
mpp_buffer_group_clear(group);
}
mpp_buffer_group_limit_config(group, buf_size, 24);
mpi->control(ctx, MPP_DEC_SET_EXT_BUF_GROUP, group);
mpi->control(ctx, MPP_DEC_SET_INFO_CHANGE_READY, nullptr);
}
MppCtx ctx = nullptr;
MppApi* mpi = nullptr;
MppBufferGroup frame_group = nullptr;
MppPacket packet = nullptr;
MppFrame frame = nullptr;
};
void PushFrameFromMpp(MppFrame frm) {
MppBuffer buf = mpp_frame_get_buffer(frm);
if (!buf) return;
mpp_buffer_inc_ref(buf);
auto owner = std::shared_ptr<void>(buf, [](void* p) { mpp_buffer_put(reinterpret_cast<MppBuffer>(p)); });
int width = mpp_frame_get_width(frm);
int height = mpp_frame_get_height(frm);
int hor_stride = mpp_frame_get_hor_stride(frm);
int ver_stride = mpp_frame_get_ver_stride(frm);
MppFrameFormat mpp_fmt = mpp_frame_get_fmt(frm);
PixelFormat fmt = PixelFormat::UNKNOWN;
int plane_count = 0;
if (mpp_fmt == MPP_FMT_YUV420SP
#ifdef MPP_FMT_YUV420SP_10BIT
|| mpp_fmt == MPP_FMT_YUV420SP_10BIT
#endif
) {
fmt = PixelFormat::NV12;
plane_count = 2;
} else if (mpp_fmt == MPP_FMT_YUV420P
#ifdef MPP_FMT_YUV420P10
|| mpp_fmt == MPP_FMT_YUV420P10
#endif
) {
fmt = PixelFormat::YUV420;
plane_count = 3;
}
if (plane_count == 0) return;
auto frame = std::make_shared<Frame>();
frame->width = width;
frame->height = height;
frame->format = fmt;
frame->stride = hor_stride;
frame->plane_count = plane_count;
frame->dma_fd = mpp_buffer_get_fd(buf);
frame->data = static_cast<uint8_t*>(mpp_buffer_get_ptr(buf));
frame->data_size = mpp_buffer_get_size(buf);
frame->data_owner = owner;
frame->frame_id = ++frame_id_;
frame->pts = mpp_frame_get_pts(frm);
if (fmt == PixelFormat::NV12 && plane_count == 2) {
int y_size = hor_stride * ver_stride;
frame->planes[0] = {frame->data, hor_stride, y_size, 0};
frame->planes[1] = {frame->data + y_size, hor_stride, y_size / 2, y_size};
} else if (fmt == PixelFormat::YUV420 && plane_count == 3) {
int y_size = hor_stride * ver_stride;
int uv_stride = hor_stride / 2;
int uv_h = ver_stride / 2;
int u_size = uv_stride * uv_h;
frame->planes[0] = {frame->data, hor_stride, y_size, 0};
frame->planes[1] = {frame->data + y_size, uv_stride, u_size, y_size};
frame->planes[2] = {frame->data + y_size + u_size, uv_stride, u_size, y_size + u_size};
}
PushToDownstream(frame);
}
void LoopFfmpegMpp() {
ApplyAffinity();
using namespace std::chrono;
while (running_.load()) {
AVFormatContext* fmt_ctx = nullptr;
AVPacket* pkt = av_packet_alloc();
int video_stream = -1;
AVRational time_base{1, 1000};
int fps_out = 25;
std::string oerr;
if (!OpenFile(fmt_ctx, video_stream, time_base, fps_out, oerr)) {
std::cerr << "[input_file] open failed: " << oerr << " path=" << path_ << "\n";
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
if (fallback_to_stub_on_fail_) {
LoopStub();
return;
}
running_.store(false);
break;
}
MppCodingType coding = MPP_VIDEO_CodingAVC;
auto codec_id = fmt_ctx->streams[video_stream]->codecpar->codec_id;
if (codec_id == AV_CODEC_ID_H264) coding = MPP_VIDEO_CodingAVC;
else if (codec_id == AV_CODEC_ID_HEVC) coding = MPP_VIDEO_CodingHEVC;
else {
std::cerr << "[input_file] unsupported codec for mpp\n";
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
running_.store(false);
break;
}
MppDecoderWrapper dec;
if (!dec.Init(coding)) {
std::cerr << "[input_file] mpp init failed\n";
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
running_.store(false);
break;
}
auto interval = milliseconds(1000 / std::max(1, fps_out));
auto next_tp = steady_clock::now();
while (running_.load()) {
if (av_read_frame(fmt_ctx, pkt) < 0) {
break;
}
if (pkt->stream_index != video_stream) {
av_packet_unref(pkt);
continue;
}
int64_t pts_us = pkt->pts == AV_NOPTS_VALUE ? 0
: av_rescale_q(pkt->pts, time_base, {1, 1000000});
dec.Decode(pkt->data, pkt->size, false, pts_us,
[&](MppFrame frm) {
PushFrameFromMpp(frm);
if (realtime_ && interval.count() > 0) {
next_tp += interval;
std::this_thread::sleep_until(next_tp);
}
});
av_packet_unref(pkt);
}
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
if (!running_.load()) break;
if (!loop_) {
break;
}
}
StopDownstreamQueues();
}
#endif
std::string id_;
std::string path_;
bool loop_ = true;
bool realtime_ = true;
int fps_ = 0;
int width_ = 1920;
int height_ = 1080;
bool use_ffmpeg_ = true;
bool use_mpp_ = true;
bool fallback_to_stub_on_fail_ = false;
std::vector<int> cpu_affinity_;
std::atomic<bool> running_{false};
std::vector<std::shared_ptr<SpscQueue<FramePtr>>> out_queues_;
std::thread worker_;
uint64_t frame_id_ = 0;
};
REGISTER_NODE(InputFileNode, "input_file");
} // namespace rk3588