489 lines
17 KiB
C++
489 lines
17 KiB
C++
#include <algorithm>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <string>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
#include "node.h"
|
|
#include "hw/hw_factory.h"
|
|
#include "utils/thread_affinity.h"
|
|
#include "utils/logger.h"
|
|
|
|
#if defined(RK3588_ENABLE_MPP)
|
|
extern "C" {
|
|
#include <rockchip/mpp_buffer.h>
|
|
#include <rockchip/mpp_frame.h>
|
|
#include <rockchip/mpp_packet.h>
|
|
#include <rockchip/rk_mpi.h>
|
|
}
|
|
#include <unistd.h>
|
|
#endif
|
|
|
|
#ifdef RK3588_ENABLE_FFMPEG
|
|
extern "C" {
|
|
#include <libavcodec/avcodec.h>
|
|
#include <libavformat/avformat.h>
|
|
#include <libavutil/imgutils.h>
|
|
}
|
|
#endif
|
|
|
|
namespace rk3588 {
|
|
|
|
class InputFileNode : public INode {
|
|
public:
|
|
/// Returns the node identifier that Init() read from the "id" config key.
std::string Id() const override {
    return id_;
}
|
|
/// Returns the fixed type tag of this node.
std::string Type() const override {
    return std::string("input_file");
}
|
|
|
|
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
|
|
id_ = config.ValueOr<std::string>("id", "input_file");
|
|
path_ = config.ValueOr<std::string>("path", "");
|
|
loop_ = config.ValueOr<bool>("loop", true);
|
|
realtime_ = config.ValueOr<bool>("realtime", true);
|
|
fps_ = config.ValueOr<int>("fps", 0);
|
|
width_ = config.ValueOr<int>("width", 1920);
|
|
height_ = config.ValueOr<int>("height", 1080);
|
|
platform_ = config.ValueOr<std::string>("platform", "");
|
|
hw_platform_ = config.ValueOr<std::string>("hw_platform", "");
|
|
use_ffmpeg_ = config.ValueOr<bool>("use_ffmpeg", true);
|
|
use_mpp_ = config.ValueOr<bool>("use_mpp", true);
|
|
fallback_to_stub_on_fail_ = config.ValueOr<bool>("fallback_to_stub_on_fail", false);
|
|
cpu_affinity_ = ParseCpuAffinity(config);
|
|
|
|
if (ctx.output_queues.empty()) {
|
|
LogError("[input_file] no downstream queue configured for node " + id_);
|
|
return false;
|
|
}
|
|
out_queues_ = ctx.output_queues;
|
|
|
|
if (path_.empty() && !fallback_to_stub_on_fail_) {
|
|
LogError("[input_file] path is required");
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool Start() override {
|
|
if (out_queues_.empty()) return false;
|
|
running_.store(true);
|
|
|
|
#if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP)
|
|
if (use_ffmpeg_ && use_mpp_) {
|
|
worker_ = std::thread(&InputFileNode::LoopFfmpegMpp, this);
|
|
} else if (use_ffmpeg_) {
|
|
worker_ = std::thread(&InputFileNode::LoopFfmpegCpu, this);
|
|
} else {
|
|
worker_ = std::thread(&InputFileNode::LoopStub, this);
|
|
}
|
|
#elif defined(RK3588_ENABLE_FFMPEG)
|
|
if (use_ffmpeg_) {
|
|
worker_ = std::thread(&InputFileNode::LoopFfmpegCpu, this);
|
|
} else {
|
|
worker_ = std::thread(&InputFileNode::LoopStub, this);
|
|
}
|
|
#else
|
|
if (use_ffmpeg_ || use_mpp_) {
|
|
LogError("[input_file] requested ffmpeg/mpp but not enabled at build time");
|
|
}
|
|
worker_ = std::thread(&InputFileNode::LoopStub, this);
|
|
#endif
|
|
|
|
std::string mode;
|
|
#if defined(RK3588_ENABLE_MPP)
|
|
if (use_ffmpeg_ && use_mpp_) mode = "ffmpeg demux + mpp decode";
|
|
else if (use_ffmpeg_) mode = "ffmpeg cpu decode";
|
|
else mode = "stub";
|
|
#else
|
|
if (use_ffmpeg_) mode = "ffmpeg cpu decode";
|
|
else mode = "stub";
|
|
#endif
|
|
|
|
LogInfo("[input_file] start path=" + path_ +
|
|
" loop=" + (loop_ ? std::string("true") : std::string("false")) +
|
|
" realtime=" + (realtime_ ? std::string("true") : std::string("false")) +
|
|
" fps=" + std::to_string(fps_) +
|
|
" (" + mode + ")");
|
|
return true;
|
|
}
|
|
|
|
void Stop() override {
|
|
running_.store(false);
|
|
for (auto& q : out_queues_) q->Stop();
|
|
if (worker_.joinable()) worker_.join();
|
|
LogInfo("[input_file] stopped");
|
|
}
|
|
|
|
/// Asks the worker loop to finish by clearing the running flag it polls.
/// Does not join the thread; each worker loop stops the downstream queues on its way out.
void Drain() override {
    running_ = false;
}
|
|
|
|
private:
|
|
#if defined(RK3588_ENABLE_FFMPEG)
|
|
/// Assembles the JSON object handed to HwFactory::CreateDecoder() and IDecoder::Open().
///
/// Keys written:
///   - "backend":     `backend`, verbatim.
///   - "codec_id":    numeric AVCodecID value, stored as a double.
///   - "codec":       "h264" or "h265"; omitted for any other codec.
///   - "platform":    only when `platform` is non-empty.
///   - "hw_platform": only when `hw_platform` is non-empty.
///   - "extradata":   array with one number per byte; only when the buffer is non-empty.
static SimpleJson BuildDecoderConfig(const std::string& backend,
                                     AVCodecID codec_id,
                                     const uint8_t* extradata,
                                     size_t extradata_size,
                                     const std::string& platform,
                                     const std::string& hw_platform) {
    SimpleJson::Object fields;
    fields["backend"] = SimpleJson(backend);
    fields["codec_id"] = SimpleJson(static_cast<double>(codec_id));

    switch (codec_id) {
        case AV_CODEC_ID_H264:
            fields["codec"] = SimpleJson(std::string("h264"));
            break;
        case AV_CODEC_ID_HEVC:
            fields["codec"] = SimpleJson(std::string("h265"));
            break;
        default:
            break;  // no symbolic name for other codecs
    }

    if (!platform.empty()) fields["platform"] = SimpleJson(platform);
    if (!hw_platform.empty()) fields["hw_platform"] = SimpleJson(hw_platform);

    if (extradata != nullptr && extradata_size != 0) {
        SimpleJson::Array bytes;
        bytes.reserve(extradata_size);
        const uint8_t* const end = extradata + extradata_size;
        for (const uint8_t* cursor = extradata; cursor != end; ++cursor) {
            bytes.emplace_back(static_cast<double>(*cursor));
        }
        fields["extradata"] = SimpleJson(std::move(bytes));
    }

    return SimpleJson(std::move(fields));
}
|
|
|
|
static void DrainDecoder(const std::shared_ptr<IDecoder>& decoder,
|
|
const std::function<void(const FramePtr&)>& on_frame) {
|
|
if (!decoder) return;
|
|
while (true) {
|
|
auto out = decoder->Receive();
|
|
if (!out.Ok()) {
|
|
if (out.ErrMessage() != "no_frame") {
|
|
LogWarn("[input_file] decoder receive: " + out.ErrMessage());
|
|
}
|
|
break;
|
|
}
|
|
if (on_frame) on_frame(out.Value());
|
|
}
|
|
}
|
|
#endif
|
|
|
|
void ApplyAffinity() {
|
|
if (cpu_affinity_.empty()) return;
|
|
std::string aerr;
|
|
if (!SetCurrentThreadAffinity(cpu_affinity_, aerr)) {
|
|
LogWarn("[input_file] SetCurrentThreadAffinity failed: " + aerr);
|
|
}
|
|
}
|
|
|
|
/// Pushes the same frame pointer into every downstream queue, in queue order.
void PushToDownstream(FramePtr frame) {
    std::for_each(out_queues_.begin(), out_queues_.end(),
                  [&frame](std::shared_ptr<SpscQueue<FramePtr>>& queue) {
                      queue->Push(frame);
                  });
}
|
|
|
|
/// Stamps a decoded frame with the next sequential frame id and forwards it downstream.
/// Null frames are ignored and do not consume an id.
void HandleDecodedFrame(const FramePtr& frame) {
    if (frame) {
        frame_id_ += 1;
        frame->frame_id = frame_id_;
        PushToDownstream(frame);
    }
}
|
|
|
|
void StopDownstreamQueues() {
|
|
for (auto& q : out_queues_) q->Stop();
|
|
}
|
|
|
|
void LoopStub() {
|
|
ApplyAffinity();
|
|
using namespace std::chrono;
|
|
|
|
const int fps = fps_ > 0 ? fps_ : 25;
|
|
auto interval = milliseconds(1000 / std::max(1, fps));
|
|
auto next_tp = steady_clock::now();
|
|
|
|
while (running_.load()) {
|
|
auto frame = std::make_shared<Frame>();
|
|
frame->width = width_;
|
|
frame->height = height_;
|
|
frame->format = PixelFormat::NV12;
|
|
frame->plane_count = 2;
|
|
|
|
const size_t y_size = static_cast<size_t>(width_) * height_;
|
|
const size_t total = y_size * 3 / 2;
|
|
auto buffer = std::make_shared<std::vector<uint8_t>>(total, 0);
|
|
frame->data = buffer->data();
|
|
frame->data_size = buffer->size();
|
|
frame->SetOwner(buffer);
|
|
frame->stride = width_;
|
|
frame->planes[0] = {frame->data, width_, static_cast<int>(y_size), 0};
|
|
frame->planes[1] = {frame->data + y_size, width_, static_cast<int>(y_size / 2),
|
|
static_cast<int>(y_size)};
|
|
frame->SyncBufferFromFrame();
|
|
|
|
frame->frame_id = ++frame_id_;
|
|
frame->pts = duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count();
|
|
PushToDownstream(frame);
|
|
|
|
if (realtime_ && interval.count() > 0) {
|
|
next_tp += interval;
|
|
std::this_thread::sleep_until(next_tp);
|
|
}
|
|
}
|
|
|
|
StopDownstreamQueues();
|
|
}
|
|
|
|
#ifdef RK3588_ENABLE_FFMPEG
|
|
/// Opens `path_` with FFmpeg and locates its first video stream.
///
/// All output parameters are reset on entry. On success `fmt_ctx` is an open demuxer
/// context owned by the caller, `video_stream` is the index of the first video stream,
/// `time_base` is that stream's time base, and `out_fps` is, in order of preference, the
/// configured `fps_`, the stream's rounded average frame rate, or 25. On failure the
/// context is closed again, `err` holds a short reason and false is returned.
bool OpenFile(AVFormatContext*& fmt_ctx, int& video_stream, AVRational& time_base,
              int& out_fps, std::string& err) {
    err.clear();
    fmt_ctx = nullptr;
    video_stream = -1;
    time_base = AVRational{1, 1000};
    out_fps = 0;
    if (fps_ > 0) out_fps = fps_;

    if (path_.empty()) {
        err = "path is empty";
        return false;
    }

    const int open_rc = avformat_open_input(&fmt_ctx, path_.c_str(), nullptr, nullptr);
    if (open_rc < 0) {
        err = "avformat_open_input failed";
        return false;
    }

    const int info_rc = avformat_find_stream_info(fmt_ctx, nullptr);
    if (info_rc < 0) {
        avformat_close_input(&fmt_ctx);
        err = "avformat_find_stream_info failed";
        return false;
    }

    // Walk forward to the first stream whose codec type is video.
    unsigned index = 0;
    while (index < fmt_ctx->nb_streams &&
           fmt_ctx->streams[index]->codecpar->codec_type != AVMEDIA_TYPE_VIDEO) {
        ++index;
    }
    if (index == fmt_ctx->nb_streams) {
        avformat_close_input(&fmt_ctx);
        err = "no video stream";
        return false;
    }

    video_stream = static_cast<int>(index);
    time_base = fmt_ctx->streams[index]->time_base;

    if (out_fps <= 0) {
        // No configured rate: round the container's average frame rate to the nearest int.
        const AVRational rate = fmt_ctx->streams[index]->avg_frame_rate;
        if (rate.num > 0 && rate.den > 0) {
            const double exact = static_cast<double>(rate.num) / static_cast<double>(rate.den);
            out_fps = static_cast<int>(exact + 0.5);
        }
    }
    if (out_fps <= 0) out_fps = 25;

    return true;
}
|
|
|
|
/// Releases whichever of the given FFmpeg objects are non-null, in the order
/// codec context, format context, packet, frame. The pointers are passed by value, so
/// the caller's copies are left dangling and must not be reused.
void Cleanup(AVFormatContext* fmt, AVCodecContext* dec, AVPacket* pkt, AVFrame* frm) {
    if (dec != nullptr) {
        avcodec_free_context(&dec);
    }
    if (fmt != nullptr) {
        avformat_close_input(&fmt);
    }
    if (pkt != nullptr) {
        av_packet_free(&pkt);
    }
    if (frm != nullptr) {
        av_frame_free(&frm);
    }
}
|
|
|
|
void LoopFfmpegCpu() {
|
|
ApplyAffinity();
|
|
using namespace std::chrono;
|
|
|
|
while (running_.load()) {
|
|
AVFormatContext* fmt_ctx = nullptr;
|
|
AVPacket* pkt = av_packet_alloc();
|
|
int video_stream = -1;
|
|
AVRational time_base{1, 1000};
|
|
int fps_out = 25;
|
|
|
|
std::string oerr;
|
|
if (!OpenFile(fmt_ctx, video_stream, time_base, fps_out, oerr)) {
|
|
LogError("[input_file] open failed: " + oerr + " path=" + path_);
|
|
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
|
|
if (fallback_to_stub_on_fail_) {
|
|
LoopStub();
|
|
return;
|
|
}
|
|
running_.store(false);
|
|
break;
|
|
}
|
|
|
|
const AVCodec* codec = avcodec_find_decoder(fmt_ctx->streams[video_stream]->codecpar->codec_id);
|
|
if (!codec) {
|
|
LogError("[input_file] decoder not found");
|
|
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
|
|
running_.store(false);
|
|
break;
|
|
}
|
|
const auto* par = fmt_ctx->streams[video_stream]->codecpar;
|
|
SimpleJson dec_cfg = BuildDecoderConfig("ffmpeg", par->codec_id,
|
|
par->extradata,
|
|
static_cast<size_t>(std::max(0, par->extradata_size)),
|
|
platform_, hw_platform_);
|
|
auto decoder = HwFactory::CreateDecoder(dec_cfg);
|
|
if (!decoder || decoder->Open(dec_cfg).Failed()) {
|
|
LogError("[input_file] ffmpeg decoder open failed");
|
|
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
|
|
running_.store(false);
|
|
break;
|
|
}
|
|
|
|
auto interval = milliseconds(1000 / std::max(1, fps_out));
|
|
auto next_tp = steady_clock::now();
|
|
|
|
while (running_.load()) {
|
|
if (av_read_frame(fmt_ctx, pkt) < 0) {
|
|
break;
|
|
}
|
|
if (pkt->stream_index != video_stream) {
|
|
av_packet_unref(pkt);
|
|
continue;
|
|
}
|
|
int64_t pts_us = 0;
|
|
if (pkt->pts != AV_NOPTS_VALUE) {
|
|
pts_us = av_rescale_q(pkt->pts, time_base, {1, 1000000});
|
|
}
|
|
DecodePacket dp;
|
|
dp.data = pkt->data;
|
|
dp.size = static_cast<size_t>(pkt->size);
|
|
dp.pts = static_cast<uint64_t>(std::max<int64_t>(0, pts_us));
|
|
dp.keyframe = (pkt->flags & AV_PKT_FLAG_KEY) != 0;
|
|
if (decoder->Send(dp).Ok()) {
|
|
DrainDecoder(decoder, [&](const FramePtr& frame) {
|
|
HandleDecodedFrame(frame);
|
|
if (realtime_ && interval.count() > 0) {
|
|
next_tp += interval;
|
|
std::this_thread::sleep_until(next_tp);
|
|
}
|
|
});
|
|
}
|
|
av_packet_unref(pkt);
|
|
}
|
|
|
|
decoder->Close();
|
|
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
|
|
|
|
if (!running_.load()) break;
|
|
if (!loop_) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
StopDownstreamQueues();
|
|
}
|
|
#endif // RK3588_ENABLE_FFMPEG
|
|
|
|
#if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP)
|
|
void LoopFfmpegMpp() {
|
|
ApplyAffinity();
|
|
using namespace std::chrono;
|
|
|
|
while (running_.load()) {
|
|
AVFormatContext* fmt_ctx = nullptr;
|
|
AVPacket* pkt = av_packet_alloc();
|
|
int video_stream = -1;
|
|
AVRational time_base{1, 1000};
|
|
int fps_out = 25;
|
|
|
|
std::string oerr;
|
|
if (!OpenFile(fmt_ctx, video_stream, time_base, fps_out, oerr)) {
|
|
LogError("[input_file] open failed: " + oerr + " path=" + path_);
|
|
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
|
|
if (fallback_to_stub_on_fail_) {
|
|
LoopStub();
|
|
return;
|
|
}
|
|
running_.store(false);
|
|
break;
|
|
}
|
|
|
|
auto codec_id = fmt_ctx->streams[video_stream]->codecpar->codec_id;
|
|
if (codec_id != AV_CODEC_ID_H264 && codec_id != AV_CODEC_ID_HEVC) {
|
|
LogError("[input_file] unsupported codec for mpp");
|
|
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
|
|
running_.store(false);
|
|
break;
|
|
}
|
|
|
|
const auto* par = fmt_ctx->streams[video_stream]->codecpar;
|
|
SimpleJson dec_cfg = BuildDecoderConfig("mpp", codec_id,
|
|
par->extradata,
|
|
static_cast<size_t>(std::max(0, par->extradata_size)),
|
|
platform_, hw_platform_);
|
|
auto decoder = HwFactory::CreateDecoder(dec_cfg);
|
|
if (!decoder || decoder->Open(dec_cfg).Failed()) {
|
|
LogError("[input_file] mpp decoder open failed");
|
|
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
|
|
running_.store(false);
|
|
break;
|
|
}
|
|
|
|
auto interval = milliseconds(1000 / std::max(1, fps_out));
|
|
auto next_tp = steady_clock::now();
|
|
|
|
while (running_.load()) {
|
|
if (av_read_frame(fmt_ctx, pkt) < 0) {
|
|
break;
|
|
}
|
|
if (pkt->stream_index != video_stream) {
|
|
av_packet_unref(pkt);
|
|
continue;
|
|
}
|
|
int64_t pts_us = pkt->pts == AV_NOPTS_VALUE ? 0
|
|
: av_rescale_q(pkt->pts, time_base, {1, 1000000});
|
|
DecodePacket dp;
|
|
dp.data = pkt->data;
|
|
dp.size = static_cast<size_t>(pkt->size);
|
|
dp.pts = static_cast<uint64_t>(std::max<int64_t>(0, pts_us));
|
|
dp.keyframe = (pkt->flags & AV_PKT_FLAG_KEY) != 0;
|
|
if (decoder->Send(dp).Ok()) {
|
|
DrainDecoder(decoder, [&](const FramePtr& frame) {
|
|
HandleDecodedFrame(frame);
|
|
if (realtime_ && interval.count() > 0) {
|
|
next_tp += interval;
|
|
std::this_thread::sleep_until(next_tp);
|
|
}
|
|
});
|
|
}
|
|
av_packet_unref(pkt);
|
|
}
|
|
|
|
decoder->Close();
|
|
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
|
|
|
|
if (!running_.load()) break;
|
|
if (!loop_) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
StopDownstreamQueues();
|
|
}
|
|
#endif
|
|
|
|
std::string id_;
|
|
std::string path_;
|
|
bool loop_ = true;
|
|
bool realtime_ = true;
|
|
int fps_ = 0;
|
|
int width_ = 1920;
|
|
int height_ = 1080;
|
|
bool use_ffmpeg_ = true;
|
|
bool use_mpp_ = true;
|
|
bool fallback_to_stub_on_fail_ = false;
|
|
std::vector<int> cpu_affinity_;
|
|
|
|
std::string platform_;
|
|
std::string hw_platform_;
|
|
|
|
std::atomic<bool> running_{false};
|
|
std::vector<std::shared_ptr<SpscQueue<FramePtr>>> out_queues_;
|
|
std::thread worker_;
|
|
uint64_t frame_id_ = 0;
|
|
};
|
|
|
|
// Associates InputFileNode with the "input_file" type string, the same string Type() returns.
// NOTE(review): REGISTER_NODE is defined outside this file; presumably it registers the
// class with the node factory at static-initialisation time — confirm against its definition.
REGISTER_NODE(InputFileNode, "input_file");
|
|
|
|
} // namespace rk3588
|