#include #include #include #include #include #include #include #include #include #include #include #include "node.h" #include "utils/thread_affinity.h" #include "utils/logger.h" #if defined(RK3588_ENABLE_MPP) extern "C" { #include #include #include #include } #include #endif #ifdef RK3588_ENABLE_FFMPEG extern "C" { #include #include #include #include #include #include } #endif namespace rk3588 { class InputRtspNode : public INode { public: std::string Id() const override { return id_; } std::string Type() const override { return "input_rtsp"; } bool Init(const SimpleJson& config, const NodeContext& ctx) override { id_ = config.ValueOr("id", "input_rtsp"); url_ = config.ValueOr("url", ""); fps_ = config.ValueOr("fps", 25); width_ = config.ValueOr("width", 1920); height_ = config.ValueOr("height", 1080); use_ffmpeg_ = config.ValueOr("use_ffmpeg", false); use_mpp_ = config.ValueOr("use_mpp", true); ffmpeg_force_tcp_ = config.ValueOr("force_tcp", true); reconnect_sec_ = config.ValueOr("reconnect_sec", 5); reconnect_backoff_max_sec_ = config.ValueOr("reconnect_backoff_max_sec", 30); fallback_to_stub_on_fail_ = config.ValueOr("fallback_to_stub_on_fail", false); if (const SimpleJson* dbg = config.Find("debug"); dbg && dbg->IsObject()) { stats_log_ = dbg->ValueOr("stats", stats_log_); stats_interval_ = std::max( 1, static_cast(dbg->ValueOr("stats_interval", static_cast(stats_interval_)))); } cpu_affinity_ = ParseCpuAffinity(config); if (ctx.output_queues.empty()) { LogError("[input_rtsp] no downstream queue configured for node " + id_); return false; } out_queues_ = ctx.output_queues; return true; } bool Start() override { if (out_queues_.empty()) return false; running_.store(true); #if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP) if (use_mpp_) { worker_ = std::thread(&InputRtspNode::LoopFfmpegMpp, this); } else if (use_ffmpeg_) { worker_ = std::thread(&InputRtspNode::LoopFfmpegCpu, this); } else { worker_ = std::thread(&InputRtspNode::LoopStub, this); } #elif defined(RK3588_ENABLE_FFMPEG) if (use_ffmpeg_) { worker_ = std::thread(&InputRtspNode::LoopFfmpegCpu, this); } else { worker_ = std::thread(&InputRtspNode::LoopStub, this); } #else if (use_ffmpeg_ || use_mpp_) { LogError("[input_rtsp] requested ffmpeg/mpp but not enabled at build time"); } worker_ = std::thread(&InputRtspNode::LoopStub, this); #endif std::string mode; #if defined(RK3588_ENABLE_MPP) if (use_mpp_) mode = " (ffmpeg demux + mpp decode)"; else if (use_ffmpeg_) mode = std::string(" (ffmpeg cpu decode") + (ffmpeg_force_tcp_ ? ", tcp)" : ", udp)"); else mode = " (stub)"; #else if (use_ffmpeg_) mode = std::string(" (ffmpeg cpu decode") + (ffmpeg_force_tcp_ ? ", tcp)" : ", udp)"); else mode = " (stub)"; #endif LogInfo("[input_rtsp] start id=" + id_ + " url=" + url_ + " fps=" + std::to_string(fps_) + mode); return true; } void Stop() override { running_.store(false); for (auto& q : out_queues_) q->Stop(); if (worker_.joinable()) worker_.join(); } void Drain() override { running_.store(false); } private: #if defined(RK3588_ENABLE_FFMPEG) static bool IContainsNoCase(const char* haystack, const char* needle) { if (!haystack || !needle) return false; const size_t nlen = std::strlen(needle); if (nlen == 0) return true; for (const char* p = haystack; *p; ++p) { size_t i = 0; while (i < nlen && p[i] && std::tolower(static_cast(p[i])) == std::tolower(static_cast(needle[i]))) { ++i; } if (i == nlen) return true; } return false; } static void InstallFfmpegLogFilterOnce() { static std::once_flag once; std::call_once(once, []() { // Filter known benign RTSP probe noise while keeping real errors. av_log_set_callback([](void* avcl, int level, const char* fmt, va_list vl) { (void)level; va_list vl_format; va_copy(vl_format, vl); char buf[1024]; vsnprintf(buf, sizeof(buf), fmt, vl_format); va_end(vl_format); buf[sizeof(buf) - 1] = '\0'; const char* item = avcl ? av_default_item_name(avcl) : ""; // Note: item name can be "rtsp" or "RTSP" depending on build. if (InputRtspNode::IContainsNoCase(item, "rtsp")) { // Seen during startup/probing on some cameras; not fatal if we later decode fine. if (InputRtspNode::IContainsNoCase(buf, "decoding for stream") && InputRtspNode::IContainsNoCase(buf, "failed")) return; if (InputRtspNode::IContainsNoCase(buf, "not enough frames to estimate rate")) return; } va_list vl_default; va_copy(vl_default, vl); av_log_default_callback(avcl, level, fmt, vl_default); va_end(vl_default); }); // Do not change global log level here; keep defaults and only filter specific noise. }); } #endif void ApplyAffinity() { if (cpu_affinity_.empty()) return; std::string aerr; if (!SetCurrentThreadAffinity(cpu_affinity_, aerr)) { LogWarn("[input_rtsp] SetCurrentThreadAffinity failed: " + aerr); } } void PushToDownstream(FramePtr frame) { for (auto& q : out_queues_) { q->Push(frame); } } void LoopStub() { ApplyAffinity(); using namespace std::chrono; auto frame_interval = fps_ > 0 ? milliseconds(1000 / fps_) : milliseconds(40); while (running_.load()) { auto frame = std::make_shared(); frame->width = width_; frame->height = height_; frame->format = PixelFormat::NV12; frame->plane_count = 2; frame->planes[0] = {nullptr, width_, width_ * height_, 0}; frame->planes[1] = {nullptr, width_, width_ * height_ / 2, width_ * height_}; frame->frame_id = ++frame_id_; frame->pts = duration_cast(steady_clock::now().time_since_epoch()).count(); PushToDownstream(frame); if (stats_log_ && stats_interval_ > 0 && (frame_id_ % stats_interval_) == 0) { LogInfo("[input_rtsp] generated frame=" + std::to_string(frame_id_) + " queue=" + std::to_string(out_queues_.empty() ? 0 : out_queues_[0]->Size()) + " drops=" + std::to_string(out_queues_.empty() ? 0 : out_queues_[0]->DroppedCount())); } std::this_thread::sleep_for(frame_interval); } } #if defined(RK3588_ENABLE_FFMPEG) void LoopFfmpegCpu() { #if defined(RK3588_ENABLE_FFMPEG) InstallFfmpegLogFilterOnce(); #endif ApplyAffinity(); using namespace std::chrono; int backoff = std::max(1, reconnect_sec_); const int backoff_max = std::max(backoff, reconnect_backoff_max_sec_); while (running_.load()) { AVFormatContext* fmt_ctx = nullptr; AVCodecContext* codec_ctx = nullptr; AVPacket* pkt = av_packet_alloc(); AVFrame* frm = av_frame_alloc(); int video_stream = -1; AVRational time_base{1, 1000}; AVDictionary* opts = nullptr; if (ffmpeg_force_tcp_) av_dict_set(&opts, "rtsp_transport", "tcp", 0); av_dict_set(&opts, "analyzeduration", "0", 0); av_dict_set(&opts, "probesize", "32768", 0); av_dict_set(&opts, "fflags", "nobuffer", 0); av_dict_set(&opts, "flags", "low_delay", 0); if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, &opts) < 0) { LogError("[input_rtsp] avformat_open_input failed: " + url_); av_dict_free(&opts); Cleanup(fmt_ctx, codec_ctx, pkt, frm); if (fallback_to_stub_on_fail_) { LoopStub(); return; } std::this_thread::sleep_for(seconds(backoff)); backoff = std::min(backoff_max, backoff * 2); continue; } av_dict_free(&opts); if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) { // For RTSP, codec parameters are often available from SDP already; keep going. LogWarn("[input_rtsp] avformat_find_stream_info failed (continuing)"); } for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) { if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { video_stream = static_cast(i); time_base = fmt_ctx->streams[i]->time_base; break; } } if (video_stream < 0) { LogError("[input_rtsp] no video stream"); Cleanup(fmt_ctx, codec_ctx, pkt, frm); std::this_thread::sleep_for(seconds(backoff)); backoff = std::min(backoff_max, backoff * 2); continue; } const AVCodec* codec = avcodec_find_decoder(fmt_ctx->streams[video_stream]->codecpar->codec_id); if (!codec) { LogError("[input_rtsp] decoder not found"); Cleanup(fmt_ctx, codec_ctx, pkt, frm); std::this_thread::sleep_for(seconds(backoff)); backoff = std::min(backoff_max, backoff * 2); continue; } codec_ctx = avcodec_alloc_context3(codec); avcodec_parameters_to_context(codec_ctx, fmt_ctx->streams[video_stream]->codecpar); if (avcodec_open2(codec_ctx, codec, nullptr) < 0) { LogError("[input_rtsp] avcodec_open2 failed"); Cleanup(fmt_ctx, codec_ctx, pkt, frm); std::this_thread::sleep_for(seconds(backoff)); backoff = std::min(backoff_max, backoff * 2); continue; } backoff = std::max(1, reconnect_sec_); // reset after successful open int read_fail = 0; while (running_.load()) { if (av_read_frame(fmt_ctx, pkt) < 0) { if (++read_fail >= 50) { break; } std::this_thread::sleep_for(milliseconds(10)); continue; } read_fail = 0; if (pkt->stream_index != video_stream) { av_packet_unref(pkt); continue; } if (avcodec_send_packet(codec_ctx, pkt) == 0) { while (avcodec_receive_frame(codec_ctx, frm) == 0) { PushFrameFromAVFrame(*frm, time_base); av_frame_unref(frm); } } av_packet_unref(pkt); } Cleanup(fmt_ctx, codec_ctx, pkt, frm); if (!running_.load()) break; std::this_thread::sleep_for(seconds(std::max(1, reconnect_sec_))); } } void PushFrameFromAVFrame(const AVFrame& f, AVRational time_base) { PixelFormat fmt = PixelFormat::UNKNOWN; int plane_count = 0; int width = f.width; int height = f.height; if (f.format == AV_PIX_FMT_NV12) { fmt = PixelFormat::NV12; plane_count = 2; } else if (f.format == AV_PIX_FMT_YUV420P || f.format == AV_PIX_FMT_YUVJ420P) { fmt = PixelFormat::YUV420; plane_count = 3; } int buf_size = av_image_get_buffer_size(static_cast(f.format), width, height, 1); if (buf_size <= 0) return; auto buffer = std::make_shared>(buf_size); if (av_image_copy_to_buffer(buffer->data(), buffer->size(), f.data, f.linesize, static_cast(f.format), width, height, 1) < 0) { return; } auto frame = std::make_shared(); frame->width = width; frame->height = height; frame->format = fmt; frame->data = buffer->data(); frame->data_size = buffer->size(); frame->plane_count = plane_count; frame->frame_id = ++frame_id_; auto best_pts = f.pts; // av_frame_get_best_effort_timestamp is deprecated frame->pts = best_pts == AV_NOPTS_VALUE ? 0 : static_cast(av_rescale_q(best_pts, time_base, {1, 1000000})); frame->data_owner = buffer; if (fmt == PixelFormat::NV12 && plane_count == 2) { int y_size = width * height; frame->planes[0] = {buffer->data(), width, y_size, 0}; frame->planes[1] = {buffer->data() + y_size, width, y_size / 2, y_size}; } else if (fmt == PixelFormat::YUV420 && plane_count == 3) { int y_size = width * height; int uv_stride = width / 2; int uv_h = height / 2; int u_size = uv_stride * uv_h; frame->planes[0] = {buffer->data(), width, y_size, 0}; frame->planes[1] = {buffer->data() + y_size, uv_stride, u_size, y_size}; frame->planes[2] = {buffer->data() + y_size + u_size, uv_stride, u_size, y_size + u_size}; } PushToDownstream(frame); if (stats_log_ && stats_interval_ > 0 && (frame_id_ % stats_interval_) == 0) { LogInfo("[input_rtsp] recv frame=" + std::to_string(frame->frame_id) + " queue=" + std::to_string(out_queues_.empty() ? 0 : out_queues_[0]->Size()) + " drops=" + std::to_string(out_queues_.empty() ? 0 : out_queues_[0]->DroppedCount())); } } void Cleanup(AVFormatContext* fmt, AVCodecContext* dec, AVPacket* pkt, AVFrame* frm) { if (dec) avcodec_free_context(&dec); if (fmt) avformat_close_input(&fmt); if (pkt) av_packet_free(&pkt); if (frm) av_frame_free(&frm); } #endif #if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP) static bool IsAnnexB(const uint8_t* data, size_t size) { if (!data || size < 4) return false; // 00 00 01 or 00 00 00 01 if (data[0] == 0 && data[1] == 0 && data[2] == 1) return true; if (data[0] == 0 && data[1] == 0 && data[2] == 0 && data[3] == 1) return true; return false; } static int GetAnnexBStartCodeSize(const uint8_t* data, size_t size) { if (!data || size < 4) return 0; if (data[0] == 0 && data[1] == 0 && data[2] == 1) return 3; if (data[0] == 0 && data[1] == 0 && data[2] == 0 && data[3] == 1) return 4; return 0; } struct NalSummary { bool has_idr = false; bool has_param = false; // SPS/PPS (or VPS for H265) }; static NalSummary SummarizeAnnexB(AVCodecID codec_id, const uint8_t* data, size_t size) { NalSummary s{}; if (!data || size < 5) return s; size_t i = 0; while (i + 4 <= size) { int sc = GetAnnexBStartCodeSize(data + i, size - i); if (sc == 0) { ++i; continue; } size_t nal = i + static_cast(sc); if (nal >= size) break; uint8_t b0 = data[nal]; if (codec_id == AV_CODEC_ID_H264) { int t = b0 & 0x1F; if (t == 5) s.has_idr = true; if (t == 7 || t == 8) s.has_param = true; } else if (codec_id == AV_CODEC_ID_HEVC) { int t = (b0 >> 1) & 0x3F; if (t == 19 || t == 20) s.has_idr = true; if (t == 32 || t == 33 || t == 34) s.has_param = true; } // Jump to next start code by searching forward. size_t j = nal + 1; while (j + 3 < size) { if ((data[j] == 0 && data[j + 1] == 0 && data[j + 2] == 1) || (j + 4 < size && data[j] == 0 && data[j + 1] == 0 && data[j + 2] == 0 && data[j + 3] == 1)) { break; } ++j; } i = j; } return s; } struct MppDecoderWrapper { ~MppDecoderWrapper() { Shutdown(); } bool Init(MppCodingType type) { MPP_RET ret = mpp_create(&ctx, &mpi); if (ret != MPP_OK) return false; ret = mpp_init(ctx, MPP_CTX_DEC, type); if (ret != MPP_OK) return false; MppDecCfg cfg = nullptr; mpp_dec_cfg_init(&cfg); ret = mpi->control(ctx, MPP_DEC_GET_CFG, cfg); if (ret != MPP_OK) return false; ret = mpp_dec_cfg_set_u32(cfg, "base:split_parse", 1); if (ret != MPP_OK) return false; ret = mpi->control(ctx, MPP_DEC_SET_CFG, cfg); mpp_dec_cfg_deinit(cfg); return ret == MPP_OK; } void Shutdown() { if (frame_group) { mpp_buffer_group_put(frame_group); frame_group = nullptr; } if (ctx) { mpp_destroy(ctx); ctx = nullptr; mpi = nullptr; } } bool Decode(const uint8_t* data, size_t size, bool eos, int64_t pts_us, const std::function& on_frame) { if (!ctx || !mpi || !data || size == 0) return false; // Use mpp_packet_copy_init so packet memory lifetime is independent of AVPacket. MppPacket src = nullptr; MPP_RET ret = mpp_packet_init(&src, const_cast(data), size); if (ret != MPP_OK || !src) { if (++fail_copy_init_ <= 3 || fail_copy_init_ % 200 == 0) { LogWarn("[input_rtsp] mpp_packet_init failed ret=" + std::to_string(ret) + " size=" + std::to_string(size)); } return false; } MppPacket packet = nullptr; ret = mpp_packet_copy_init(&packet, src); mpp_packet_deinit(&src); if (ret != MPP_OK || !packet) { if (++fail_copy_init_ <= 3 || fail_copy_init_ % 200 == 0) { LogWarn("[input_rtsp] mpp_packet_copy_init failed ret=" + std::to_string(ret) + " size=" + std::to_string(size)); } return false; } mpp_packet_set_pos(packet, mpp_packet_get_data(packet)); mpp_packet_set_length(packet, size); mpp_packet_set_pts(packet, pts_us); mpp_packet_clr_eos(packet); if (eos) mpp_packet_set_eos(packet); auto drain_frames = [&]() { for (int n = 0; n < 8; ++n) { MPP_RET gr = mpi->decode_get_frame(ctx, &frame); if (gr == MPP_ERR_TIMEOUT) return; if (gr != MPP_OK) { if (++fail_get_frame_ <= 3 || fail_get_frame_ % 200 == 0) { LogWarn("[input_rtsp] decode_get_frame ret=" + std::to_string(gr)); } return; } if (!frame) return; if (mpp_frame_get_info_change(frame)) { HandleInfoChange(); } else { if (on_frame) on_frame(frame); } mpp_frame_deinit(&frame); frame = nullptr; } }; bool pkt_done = false; MPP_RET last_put = MPP_NOK; int put_tries = 0; while (!pkt_done && put_tries++ < 200) { MPP_RET put_ret = mpi->decode_put_packet(ctx, packet); if (put_ret == MPP_OK) { pkt_done = true; break; } last_put = put_ret; // Buffer full is normal backpressure; try draining and retry. drain_frames(); usleep(2000); } // Always try to drain after put attempt(s). drain_frames(); mpp_packet_deinit(&packet); if (!pkt_done) { // Treat as packet drop to keep pipeline progressing; caller can decide to wait for next IDR. if (last_put != MPP_ERR_BUFFER_FULL) { if (++fail_put_packet_ <= 3 || fail_put_packet_ % 200 == 0) { LogWarn("[input_rtsp] decode_put_packet failed ret=" + std::to_string(last_put)); } } } return pkt_done; } bool SendExtraData(const uint8_t* data, size_t size) { if (!ctx || !mpi || !data || size == 0) return false; if (!IsAnnexB(data, size)) { LogWarn("[input_rtsp] extra_data is not AnnexB; mpp may ignore it"); } MppPacket src = nullptr; MPP_RET ret = mpp_packet_init(&src, const_cast(data), size); if (ret != MPP_OK || !src) return false; MppPacket pkt = nullptr; ret = mpp_packet_copy_init(&pkt, src); mpp_packet_deinit(&src); if (ret != MPP_OK || !pkt) return false; mpp_packet_set_pos(pkt, mpp_packet_get_data(pkt)); mpp_packet_set_length(pkt, size); mpp_packet_set_pts(pkt, 0); mpp_packet_set_extra_data(pkt); int tries = 0; while (tries++ < 200) { MPP_RET put_ret = mpi->decode_put_packet(ctx, pkt); if (put_ret == MPP_OK) { mpp_packet_deinit(&pkt); return true; } // Drain output to relieve buffer full. MppFrame tmp = nullptr; if (mpi->decode_get_frame(ctx, &tmp) == MPP_OK && tmp) { if (mpp_frame_get_info_change(tmp)) { frame = tmp; HandleInfoChange(); frame = nullptr; } mpp_frame_deinit(&tmp); } usleep(2000); } LogWarn("[input_rtsp] extra_data decode_put_packet timeout"); mpp_packet_deinit(&pkt); return false; } void HandleInfoChange() { if (!ctx || !mpi || !frame) return; MppBufferGroup group = frame_group; size_t buf_size = mpp_frame_get_buf_size(frame); if (!group) { auto try_group = [&](MppBufferType t) -> bool { return mpp_buffer_group_get(&group, t, MPP_BUFFER_INTERNAL, MODULE_TAG, __FUNCTION__) == MPP_OK; }; // Prefer DMA32 buffers to keep physical addresses <4GB; some RGA drivers can't map >4GB. if (!try_group(static_cast(MPP_BUFFER_TYPE_DMA_HEAP | MPP_BUFFER_FLAGS_CACHABLE | MPP_BUFFER_FLAGS_DMA32)) && !try_group(static_cast(MPP_BUFFER_TYPE_DMA_HEAP | MPP_BUFFER_FLAGS_DMA32)) && !try_group(static_cast(MPP_BUFFER_TYPE_DMA_HEAP | MPP_BUFFER_FLAGS_CACHABLE)) && !try_group(static_cast(MPP_BUFFER_TYPE_DMA_HEAP)) && !try_group(static_cast(MPP_BUFFER_TYPE_DRM | MPP_BUFFER_FLAGS_CACHABLE)) && !try_group(static_cast(MPP_BUFFER_TYPE_DRM)) && !try_group(static_cast(MPP_BUFFER_TYPE_ION | MPP_BUFFER_FLAGS_CACHABLE)) && !try_group(static_cast(MPP_BUFFER_TYPE_ION))) { LogError("[input_rtsp] mpp info_change: failed to alloc frame buffer group"); return; } frame_group = group; } else { mpp_buffer_group_clear(group); } mpp_buffer_group_limit_config(group, buf_size, 24); mpi->control(ctx, MPP_DEC_SET_EXT_BUF_GROUP, group); mpi->control(ctx, MPP_DEC_SET_INFO_CHANGE_READY, nullptr); } MppCtx ctx = nullptr; MppApi* mpi = nullptr; MppBufferGroup frame_group = nullptr; MppFrame frame = nullptr; uint64_t fail_copy_init_ = 0; uint64_t fail_put_packet_ = 0; uint64_t fail_get_frame_ = 0; }; void LoopFfmpegMpp() { #if defined(RK3588_ENABLE_FFMPEG) InstallFfmpegLogFilterOnce(); #endif ApplyAffinity(); using namespace std::chrono; int backoff = std::max(1, reconnect_sec_); const int backoff_max = std::max(backoff, reconnect_backoff_max_sec_); while (running_.load()) { AVFormatContext* fmt_ctx = nullptr; AVPacket* pkt = av_packet_alloc(); AVPacket* filt_pkt = av_packet_alloc(); int video_stream = -1; AVRational time_base{1, 1000}; AVBSFContext* bsf_ctx = nullptr; AVDictionary* opts = nullptr; if (ffmpeg_force_tcp_) av_dict_set(&opts, "rtsp_transport", "tcp", 0); av_dict_set(&opts, "analyzeduration", "0", 0); av_dict_set(&opts, "probesize", "32768", 0); av_dict_set(&opts, "fflags", "nobuffer", 0); av_dict_set(&opts, "flags", "low_delay", 0); if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, &opts) < 0) { LogError("[input_rtsp] avformat_open_input failed: " + url_); av_dict_free(&opts); if (bsf_ctx) av_bsf_free(&bsf_ctx); if (filt_pkt) av_packet_free(&filt_pkt); Cleanup(fmt_ctx, nullptr, pkt, nullptr); if (fallback_to_stub_on_fail_) { LoopStub(); return; } std::this_thread::sleep_for(seconds(backoff)); backoff = std::min(backoff_max, backoff * 2); continue; } av_dict_free(&opts); if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) { // For RTSP, codec parameters are often available from SDP already; keep going. LogWarn("[input_rtsp] avformat_find_stream_info failed (continuing)"); } for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) { if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { video_stream = static_cast(i); time_base = fmt_ctx->streams[i]->time_base; break; } } if (video_stream < 0) { LogError("[input_rtsp] no video stream"); if (bsf_ctx) av_bsf_free(&bsf_ctx); if (filt_pkt) av_packet_free(&filt_pkt); Cleanup(fmt_ctx, nullptr, pkt, nullptr); std::this_thread::sleep_for(seconds(backoff)); backoff = std::min(backoff_max, backoff * 2); continue; } MppCodingType coding = MPP_VIDEO_CodingAVC; auto codec_id = fmt_ctx->streams[video_stream]->codecpar->codec_id; if (codec_id == AV_CODEC_ID_H264) coding = MPP_VIDEO_CodingAVC; else if (codec_id == AV_CODEC_ID_HEVC) coding = MPP_VIDEO_CodingHEVC; else { LogError("[input_rtsp] unsupported codec for mpp"); if (bsf_ctx) av_bsf_free(&bsf_ctx); if (filt_pkt) av_packet_free(&filt_pkt); Cleanup(fmt_ctx, nullptr, pkt, nullptr); std::this_thread::sleep_for(seconds(backoff)); backoff = std::min(backoff_max, backoff * 2); continue; } MppDecoderWrapper dec; if (!dec.Init(coding)) { LogError("[input_rtsp] mpp init failed"); if (bsf_ctx) av_bsf_free(&bsf_ctx); if (filt_pkt) av_packet_free(&filt_pkt); Cleanup(fmt_ctx, nullptr, pkt, nullptr); std::this_thread::sleep_for(seconds(backoff)); backoff = std::min(backoff_max, backoff * 2); continue; } auto ensure_bsf = [&]() -> bool { if (bsf_ctx) return true; const char* bsf_name = nullptr; if (codec_id == AV_CODEC_ID_H264) bsf_name = "h264_mp4toannexb"; else if (codec_id == AV_CODEC_ID_HEVC) bsf_name = "hevc_mp4toannexb"; if (!bsf_name) return false; const AVBitStreamFilter* bsf = av_bsf_get_by_name(bsf_name); if (!bsf) return false; if (av_bsf_alloc(bsf, &bsf_ctx) < 0) { bsf_ctx = nullptr; return false; } if (avcodec_parameters_copy(bsf_ctx->par_in, fmt_ctx->streams[video_stream]->codecpar) < 0) { av_bsf_free(&bsf_ctx); return false; } bsf_ctx->time_base_in = fmt_ctx->streams[video_stream]->time_base; if (av_bsf_init(bsf_ctx) < 0) { av_bsf_free(&bsf_ctx); return false; } return true; }; // Send codec extra data (SPS/PPS or VPS/SPS/PPS). Many RTSP streams rely on SDP extradata. // Prefer bsf-converted extradata when available. bool bsf_ok = ensure_bsf(); const AVCodecParameters* par = fmt_ctx->streams[video_stream]->codecpar; if (par && par->extradata && par->extradata_size > 0) { const uint8_t* extra = par->extradata; size_t extra_size = static_cast(par->extradata_size); if (bsf_ok && bsf_ctx && bsf_ctx->par_out && bsf_ctx->par_out->extradata && bsf_ctx->par_out->extradata_size > 0) { extra = bsf_ctx->par_out->extradata; extra_size = static_cast(bsf_ctx->par_out->extradata_size); } if (extra && extra_size > 0) { if (stats_log_) { LogInfo("[input_rtsp] send extra_data size=" + std::to_string(extra_size) + " annexb=" + std::to_string(IsAnnexB(extra, extra_size) ? 1 : 0)); } if (!dec.SendExtraData(extra, extra_size)) { LogWarn("[input_rtsp] send extra_data failed"); } } } backoff = std::max(1, reconnect_sec_); int read_fail = 0; uint64_t pkt_count = 0; uint64_t decode_fail = 0; bool need_idr = true; uint64_t dropped_until_idr = 0; bool announced_wait_idr = false; const int64_t step_us = (fps_ > 0) ? (1000000LL / fps_) : 40000LL; int64_t last_pts_us = 0; bool have_last_pts = false; auto handle_annexb = [&](const uint8_t* data, size_t size, bool key_flag, int64_t pts_us) { if (!data || size == 0) return; NalSummary ns = SummarizeAnnexB(codec_id, data, size); const bool is_idr = key_flag || ns.has_idr; if (need_idr && !is_idr) { ++dropped_until_idr; if (stats_log_ && (!announced_wait_idr || dropped_until_idr % 300 == 0)) { LogInfo("[input_rtsp] waiting for IDR/keyframe; dropped=" + std::to_string(dropped_until_idr)); announced_wait_idr = true; } return; } const bool ok = dec.Decode(data, size, false, pts_us, [&](MppFrame frm) { PushFrameFromMpp(frm); }); if (is_idr && ok) { need_idr = false; decode_fail = 0; } else if (!ok) { // If we are in steady state and decoding starts failing, re-sync on next IDR. if (!need_idr && ++decode_fail >= 3) { need_idr = true; announced_wait_idr = false; LogWarn("[input_rtsp] decoder desync/backpressure; re-sync on next IDR"); } } }; while (running_.load()) { if (av_read_frame(fmt_ctx, pkt) < 0) { if (++read_fail >= 50) break; std::this_thread::sleep_for(milliseconds(10)); continue; } read_fail = 0; if (pkt->stream_index != video_stream) { av_packet_unref(pkt); continue; } ++pkt_count; int64_t raw_pts = pkt->pts; int64_t raw_dts = pkt->dts; if (raw_pts != AV_NOPTS_VALUE && raw_pts < 0) raw_pts = AV_NOPTS_VALUE; if (raw_dts != AV_NOPTS_VALUE && raw_dts < 0) raw_dts = AV_NOPTS_VALUE; int64_t raw_ts = (raw_pts != AV_NOPTS_VALUE) ? raw_pts : raw_dts; int64_t pts_us = 0; if (raw_ts == AV_NOPTS_VALUE) { pts_us = have_last_pts ? (last_pts_us + step_us) : 0; } else { pts_us = av_rescale_q(raw_ts, time_base, {1, 1000000}); if (pts_us < 0) pts_us = 0; if (have_last_pts && pts_us <= last_pts_us) pts_us = last_pts_us + step_us; } last_pts_us = pts_us; have_last_pts = true; if (stats_log_ && (pkt_count <= 3 || pkt_count % 300 == 0)) { LogInfo("[input_rtsp] recv pkt#" + std::to_string(pkt_count) + " size=" + std::to_string(pkt->size) + " pts=" + (raw_pts == AV_NOPTS_VALUE ? std::string("NOPTS") : std::to_string(raw_pts)) + " dts=" + (raw_dts == AV_NOPTS_VALUE ? std::string("NOPTS") : std::to_string(raw_dts)) + " pts_us=" + std::to_string(pts_us) + " key=" + std::to_string((pkt->flags & AV_PKT_FLAG_KEY) ? 1 : 0)); } if (IsAnnexB(pkt->data, static_cast(pkt->size))) { if (stats_log_ && pkt_count <= 10) { int sc = GetAnnexBStartCodeSize(pkt->data, static_cast(pkt->size)); if (sc > 0 && static_cast(sc) < static_cast(pkt->size)) { uint8_t b0 = pkt->data[sc]; int h264_type = b0 & 0x1F; int h265_type = (b0 >> 1) & 0x3F; LogInfo("[input_rtsp] pkt#" + std::to_string(pkt_count) + " nal(h264)=" + std::to_string(h264_type) + " nal(h265)=" + std::to_string(h265_type) + " first_byte=" + std::to_string(static_cast(b0))); } } handle_annexb(pkt->data, static_cast(pkt->size), (pkt->flags & AV_PKT_FLAG_KEY) != 0, pts_us); } else if (ensure_bsf()) { if (av_bsf_send_packet(bsf_ctx, pkt) == 0) { while (av_bsf_receive_packet(bsf_ctx, filt_pkt) == 0) { handle_annexb(filt_pkt->data, static_cast(filt_pkt->size), (filt_pkt->flags & AV_PKT_FLAG_KEY) != 0, pts_us); av_packet_unref(filt_pkt); } } } else { LogWarn("[input_rtsp] non-AnnexB bitstream but bsf init failed; mpp decode may produce no frames"); } av_packet_unref(pkt); } if (bsf_ctx) av_bsf_free(&bsf_ctx); if (filt_pkt) av_packet_free(&filt_pkt); Cleanup(fmt_ctx, nullptr, pkt, nullptr); if (!running_.load()) break; std::this_thread::sleep_for(seconds(std::max(1, reconnect_sec_))); } } void PushFrameFromMpp(MppFrame frm) { MppBuffer buf = mpp_frame_get_buffer(frm); if (!buf) return; mpp_buffer_inc_ref(buf); auto owner = std::shared_ptr(buf, [](void* p) { mpp_buffer_put(reinterpret_cast(p)); }); int width = mpp_frame_get_width(frm); int height = mpp_frame_get_height(frm); int hor_stride = mpp_frame_get_hor_stride(frm); int ver_stride = mpp_frame_get_ver_stride(frm); MppFrameFormat mpp_fmt = mpp_frame_get_fmt(frm); PixelFormat fmt = PixelFormat::UNKNOWN; int plane_count = 0; if (mpp_fmt == MPP_FMT_YUV420SP #ifdef MPP_FMT_YUV420SP_10BIT || mpp_fmt == MPP_FMT_YUV420SP_10BIT #endif ) { fmt = PixelFormat::NV12; plane_count = 2; } else if (mpp_fmt == MPP_FMT_YUV420P #ifdef MPP_FMT_YUV420P10 || mpp_fmt == MPP_FMT_YUV420P10 #endif ) { fmt = PixelFormat::YUV420; plane_count = 3; } if (plane_count == 0) { return; } auto frame = std::make_shared(); frame->width = width; frame->height = height; frame->format = fmt; frame->stride = hor_stride; frame->plane_count = plane_count; frame->dma_fd = mpp_buffer_get_fd(buf); frame->data = static_cast(mpp_buffer_get_ptr(buf)); frame->data_size = mpp_buffer_get_size(buf); frame->data_owner = owner; frame->frame_id = ++frame_id_; frame->pts = mpp_frame_get_pts(frm); if (fmt == PixelFormat::NV12 && plane_count == 2) { int y_size = hor_stride * ver_stride; frame->planes[0] = {frame->data, hor_stride, y_size, 0}; frame->planes[1] = {frame->data + y_size, hor_stride, y_size / 2, y_size}; } else if (fmt == PixelFormat::YUV420 && plane_count == 3) { int y_size = hor_stride * ver_stride; int uv_stride = hor_stride / 2; int uv_h = ver_stride / 2; int u_size = uv_stride * uv_h; frame->planes[0] = {frame->data, hor_stride, y_size, 0}; frame->planes[1] = {frame->data + y_size, uv_stride, u_size, y_size}; frame->planes[2] = {frame->data + y_size + u_size, uv_stride, u_size, y_size + u_size}; } PushToDownstream(frame); if (stats_log_ && (frame_id_ <= 3 || (stats_interval_ > 0 && (frame_id_ % stats_interval_) == 0))) { LogInfo("[input_rtsp] mpp frame=" + std::to_string(frame->frame_id) + " queue=" + std::to_string(out_queues_.empty() ? 0 : out_queues_[0]->Size()) + " drops=" + std::to_string(out_queues_.empty() ? 0 : out_queues_[0]->DroppedCount())); } } #endif std::string id_; std::string url_; int fps_ = 25; int width_ = 1920; int height_ = 1080; std::atomic running_{false}; std::vector>> out_queues_; std::thread worker_; uint64_t frame_id_ = 0; bool use_ffmpeg_ = false; bool use_mpp_ = true; bool ffmpeg_force_tcp_ = true; int reconnect_sec_ = 5; int reconnect_backoff_max_sec_ = 30; bool fallback_to_stub_on_fail_ = false; std::vector cpu_affinity_; bool stats_log_ = false; uint64_t stats_interval_ = 100; }; REGISTER_NODE(InputRtspNode, "input_rtsp"); } // namespace rk3588