// OrangePi3588Media/plugins/input_rtsp/input_rtsp_node.cpp
// (source-listing metadata: 723 lines, 30 KiB, C++)

#include <atomic>
#include <algorithm>
#include <chrono>
#include <functional>
#include <cstdio>
#include <cstring>
#include <cctype>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "node.h"
#include "hw/hw_factory.h"
#include "utils/thread_affinity.h"
#include "utils/logger.h"
#if defined(RK3588_ENABLE_MPP)
extern "C" {
#include <rockchip/mpp_buffer.h>
#include <rockchip/mpp_frame.h>
#include <rockchip/mpp_packet.h>
#include <rockchip/rk_mpi.h>
}
#include <unistd.h>
#endif
#ifdef RK3588_ENABLE_FFMPEG
extern "C" {
#include <libavcodec/bsf.h>
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/imgutils.h>
#include <libavutil/log.h>
#include <libavutil/opt.h>
}
#endif
namespace rk3588 {
// RTSP input source node: pulls a stream from `url_` and pushes decoded
// frames to every configured downstream queue. The decode backend is chosen
// in Start(): MPP hardware decode, FFmpeg CPU decode, or a synthetic stub.
class InputRtspNode : public INode {
public:
// Node identifier (config "id", default "input_rtsp").
std::string Id() const override { return id_; }
// Registry type name (see REGISTER_NODE at the bottom of this file).
std::string Type() const override { return "input_rtsp"; }
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
id_ = config.ValueOr<std::string>("id", "input_rtsp");
url_ = config.ValueOr<std::string>("url", "");
fps_ = config.ValueOr<int>("fps", 25);
width_ = config.ValueOr<int>("width", 1920);
height_ = config.ValueOr<int>("height", 1080);
platform_ = config.ValueOr<std::string>("platform", "");
hw_platform_ = config.ValueOr<std::string>("hw_platform", "");
use_ffmpeg_ = config.ValueOr<bool>("use_ffmpeg", false);
use_mpp_ = config.ValueOr<bool>("use_mpp", true);
ffmpeg_force_tcp_ = config.ValueOr<bool>("force_tcp", true);
reconnect_sec_ = config.ValueOr<int>("reconnect_sec", 5);
reconnect_backoff_max_sec_ = config.ValueOr<int>("reconnect_backoff_max_sec", 30);
fallback_to_stub_on_fail_ = config.ValueOr<bool>("fallback_to_stub_on_fail", false);
if (const SimpleJson* dbg = config.Find("debug"); dbg && dbg->IsObject()) {
stats_log_ = dbg->ValueOr<bool>("stats", stats_log_);
stats_interval_ = std::max<uint64_t>(
1, static_cast<uint64_t>(dbg->ValueOr<int>("stats_interval", static_cast<int>(stats_interval_))));
}
cpu_affinity_ = ParseCpuAffinity(config);
if (ctx.output_queues.empty()) {
LogError("[input_rtsp] no downstream queue configured for node " + id_);
return false;
}
out_queues_ = ctx.output_queues;
return true;
}
// Launches the capture/decode worker thread.
// The backend and the human-readable mode tag for the startup log are now
// chosen in a single condition tree, so the log can never disagree with the
// thread that actually runs. The previous code keyed the log string on
// RK3588_ENABLE_MPP / use_ffmpeg_ alone, which mislabelled MPP-without-FFmpeg
// builds (and no-backend builds with use_ffmpeg_ set) as real decoders while
// the stub was running.
bool Start() override {
  if (out_queues_.empty()) return false;
  running_.store(true);
  std::string mode;
#if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP)
  if (use_mpp_) {
    worker_ = std::thread(&InputRtspNode::LoopFfmpegMpp, this);
    mode = " (ffmpeg demux + mpp decode)";
  } else if (use_ffmpeg_) {
    worker_ = std::thread(&InputRtspNode::LoopFfmpegCpu, this);
    mode = std::string(" (ffmpeg cpu decode") + (ffmpeg_force_tcp_ ? ", tcp)" : ", udp)");
  } else {
    worker_ = std::thread(&InputRtspNode::LoopStub, this);
    mode = " (stub)";
  }
#elif defined(RK3588_ENABLE_FFMPEG)
  if (use_ffmpeg_) {
    worker_ = std::thread(&InputRtspNode::LoopFfmpegCpu, this);
    mode = std::string(" (ffmpeg cpu decode") + (ffmpeg_force_tcp_ ? ", tcp)" : ", udp)");
  } else {
    worker_ = std::thread(&InputRtspNode::LoopStub, this);
    mode = " (stub)";
  }
#else
  if (use_ffmpeg_ || use_mpp_) {
    LogError("[input_rtsp] requested ffmpeg/mpp but not enabled at build time");
  }
  worker_ = std::thread(&InputRtspNode::LoopStub, this);
  mode = " (stub)";
#endif
  LogInfo("[input_rtsp] start id=" + id_ + " url=" + url_ + " fps=" + std::to_string(fps_) + mode);
  return true;
}
// Stops the node: clears the run flag, signals every downstream queue to
// stop, then joins the worker thread. Queues are stopped before joining —
// presumably so a Push() blocked on a full queue unblocks and the worker
// can exit (depends on SpscQueue::Stop() semantics; confirm against queue impl).
void Stop() override {
running_.store(false);
for (auto& q : out_queues_) q->Stop();
if (worker_.joinable()) worker_.join();
}
// Drain only requests the loop to exit; it neither stops the queues nor
// joins the worker. Final teardown still happens in Stop().
void Drain() override {
running_.store(false);
}
private:
#if defined(RK3588_ENABLE_FFMPEG)
// Assembles the SimpleJson configuration handed to HwFactory::CreateDecoder.
// Fields emitted: "backend", numeric "codec_id", a friendly "codec" name for
// H.264/H.265, the optional platform hints, and the codec extradata encoded
// as an array of byte values.
static SimpleJson BuildDecoderConfig(const std::string& backend,
                                     AVCodecID codec_id,
                                     const uint8_t* extradata,
                                     size_t extradata_size,
                                     const std::string& platform,
                                     const std::string& hw_platform) {
  SimpleJson::Object cfg;
  cfg["backend"] = SimpleJson(backend);
  cfg["codec_id"] = SimpleJson(static_cast<double>(codec_id));
  switch (codec_id) {
    case AV_CODEC_ID_H264:
      cfg["codec"] = SimpleJson(std::string("h264"));
      break;
    case AV_CODEC_ID_HEVC:
      cfg["codec"] = SimpleJson(std::string("h265"));
      break;
    default:
      break;  // "codec" stays unset for any other codec id.
  }
  if (!platform.empty()) {
    cfg["platform"] = SimpleJson(platform);
  }
  if (!hw_platform.empty()) {
    cfg["hw_platform"] = SimpleJson(hw_platform);
  }
  if (extradata != nullptr && extradata_size > 0) {
    SimpleJson::Array bytes;
    bytes.reserve(extradata_size);
    for (const uint8_t* p = extradata; p != extradata + extradata_size; ++p) {
      bytes.emplace_back(static_cast<double>(*p));
    }
    cfg["extradata"] = SimpleJson(std::move(bytes));
  }
  return SimpleJson(std::move(cfg));
}
// Pulls every frame the decoder currently has ready and hands each one to
// `on_frame`. Stops at the first non-OK Receive(); the sentinel message
// "no_frame" just means the decoder is empty and is not worth logging.
static void DrainDecoder(const std::shared_ptr<IDecoder>& decoder,
                         const std::function<void(const FramePtr&)>& on_frame) {
  if (!decoder) return;
  for (;;) {
    auto result = decoder->Receive();
    if (!result.Ok()) {
      const bool benign = (result.ErrMessage() == "no_frame");
      if (!benign) {
        LogWarn("[input_rtsp] decoder receive: " + result.ErrMessage());
      }
      return;
    }
    if (on_frame) {
      on_frame(result.Value());
    }
  }
}
// Case-insensitive substring test. Null inputs never match; an empty needle
// matches any non-null haystack (same contract as the original scan loop).
static bool IContainsNoCase(const char* haystack, const char* needle) {
  if (!haystack || !needle) return false;
  auto to_lower_copy = [](const char* s) {
    std::string out(s);
    std::transform(out.begin(), out.end(), out.begin(),
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return out;
  };
  return to_lower_copy(haystack).find(to_lower_copy(needle)) != std::string::npos;
}
// Installs (once per process) an av_log callback that suppresses two known
// benign RTSP probe messages while forwarding everything else — at its
// original level — to FFmpeg's default logger. The global log level is
// deliberately left untouched.
// Fix: removed the stray `(void)level;` marker — it claimed the parameter
// was unused even though `level` is forwarded to av_log_default_callback.
static void InstallFfmpegLogFilterOnce() {
  static std::once_flag once;
  std::call_once(once, []() {
    // Filter known benign RTSP probe noise while keeping real errors.
    av_log_set_callback([](void* avcl, int level, const char* fmt, va_list vl) {
      // Render the message so it can be matched against the noise patterns.
      // A va_list may only be traversed once, so always work on copies.
      va_list vl_format;
      va_copy(vl_format, vl);
      char buf[1024];
      vsnprintf(buf, sizeof(buf), fmt, vl_format);
      va_end(vl_format);
      buf[sizeof(buf) - 1] = '\0';
      const char* item = avcl ? av_default_item_name(avcl) : "";
      // Note: item name can be "rtsp" or "RTSP" depending on build.
      if (InputRtspNode::IContainsNoCase(item, "rtsp")) {
        // Seen during startup/probing on some cameras; not fatal if we later decode fine.
        if (InputRtspNode::IContainsNoCase(buf, "decoding for stream") && InputRtspNode::IContainsNoCase(buf, "failed")) return;
        if (InputRtspNode::IContainsNoCase(buf, "not enough frames to estimate rate")) return;
      }
      va_list vl_default;
      va_copy(vl_default, vl);
      av_log_default_callback(avcl, level, fmt, vl_default);
      va_end(vl_default);
    });
    // Do not change global log level here; keep defaults and only filter specific noise.
  });
}
#endif
// Pins the calling thread to the configured CPU set; no-op when no affinity
// was configured. A failed pin is logged but never treated as fatal.
void ApplyAffinity() {
  if (cpu_affinity_.empty()) {
    return;
  }
  std::string err_msg;
  const bool pinned = SetCurrentThreadAffinity(cpu_affinity_, err_msg);
  if (!pinned) {
    LogWarn("[input_rtsp] SetCurrentThreadAffinity failed: " + err_msg);
  }
}
void PushToDownstream(FramePtr frame) {
for (auto& q : out_queues_) {
q->Push(frame);
}
}
// Stamps a decoded frame with the next sequence id, forwards it to every
// downstream queue, and optionally emits a periodic stats line based on the
// first queue's depth and drop counter.
void HandleDecodedFrame(const FramePtr& frame) {
  if (!frame) return;
  frame->frame_id = ++frame_id_;
  PushToDownstream(frame);
  const bool emit_stats =
      stats_log_ && stats_interval_ > 0 && (frame_id_ % stats_interval_) == 0;
  if (!emit_stats) return;
  const bool have_queue = !out_queues_.empty();
  LogInfo("[input_rtsp] recv frame=" + std::to_string(frame->frame_id) +
          " queue=" + std::to_string(have_queue ? out_queues_[0]->Size() : 0) +
          " drops=" + std::to_string(have_queue ? out_queues_[0]->DroppedCount() : 0));
}
// Synthetic source used when no real decode backend is compiled in (or as a
// fallback). Emits NV12 frame descriptors with null plane data at the
// configured fps so downstream nodes can be exercised without a camera.
// Fixes vs the previous version:
//  * interval computed in microseconds — the old `1000 / fps_` millisecond
//    division truncated (e.g. 60 fps -> 16 ms -> 62.5 fps actual);
//  * paced with sleep_until on an absolute deadline so per-iteration work
//    no longer accumulates as drift the way sleep_for did.
void LoopStub() {
  ApplyAffinity();
  using namespace std::chrono;
  const auto frame_interval =
      fps_ > 0 ? microseconds(1000000 / fps_) : microseconds(40000);
  auto next_deadline = steady_clock::now() + frame_interval;
  while (running_.load()) {
    auto frame = std::make_shared<Frame>();
    frame->width = width_;
    frame->height = height_;
    frame->format = PixelFormat::NV12;
    frame->plane_count = 2;
    // NV12 layout: full-size Y plane, then half-size interleaved UV plane.
    frame->planes[0] = {nullptr, width_, width_ * height_, 0};
    frame->planes[1] = {nullptr, width_, width_ * height_ / 2, width_ * height_};
    frame->frame_id = ++frame_id_;
    frame->pts = duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count();
    PushToDownstream(frame);
    if (stats_log_ && stats_interval_ > 0 && (frame_id_ % stats_interval_) == 0) {
      LogInfo("[input_rtsp] generated frame=" + std::to_string(frame_id_) +
              " queue=" + std::to_string(out_queues_.empty() ? 0 : out_queues_[0]->Size()) +
              " drops=" + std::to_string(out_queues_.empty() ? 0 : out_queues_[0]->DroppedCount()));
    }
    // sleep_until returns immediately if we are already past the deadline,
    // so a slow iteration is followed by catch-up rather than extra delay.
    std::this_thread::sleep_until(next_deadline);
    next_deadline += frame_interval;
  }
}
#if defined(RK3588_ENABLE_FFMPEG)
// FFmpeg demux + CPU decode loop (HwFactory "ffmpeg" backend).
// One outer iteration = one RTSP session. Connection failures back off
// exponentially (reconnect_sec_ .. reconnect_backoff_max_sec_); a session
// that drops after a successful open reconnects after a flat reconnect_sec_.
void LoopFfmpegCpu() {
// Redundant guard (the surrounding region already requires FFmpeg); kept as-is.
#if defined(RK3588_ENABLE_FFMPEG)
InstallFfmpegLogFilterOnce();
#endif
ApplyAffinity();
using namespace std::chrono;
// Exponential backoff state: doubles on each failed attempt, capped at
// backoff_max, reset after a successful decoder open.
int backoff = std::max(1, reconnect_sec_);
const int backoff_max = std::max(backoff, reconnect_backoff_max_sec_);
while (running_.load()) {
AVFormatContext* fmt_ctx = nullptr;
AVPacket* pkt = av_packet_alloc();
int video_stream = -1;
AVRational time_base{1, 1000};
// Low-latency RTSP options: no probe buffering, optional TCP transport.
AVDictionary* opts = nullptr;
if (ffmpeg_force_tcp_) av_dict_set(&opts, "rtsp_transport", "tcp", 0);
av_dict_set(&opts, "analyzeduration", "0", 0);
av_dict_set(&opts, "probesize", "32768", 0);
av_dict_set(&opts, "fflags", "nobuffer", 0);
av_dict_set(&opts, "flags", "low_delay", 0);
if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, &opts) < 0) {
LogError("[input_rtsp] avformat_open_input failed: " + url_);
av_dict_free(&opts);
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
// Optional one-way switch to the synthetic source when the camera is absent.
if (fallback_to_stub_on_fail_) {
LoopStub();
return;
}
std::this_thread::sleep_for(seconds(backoff));
backoff = std::min(backoff_max, backoff * 2);
continue;
}
av_dict_free(&opts);
if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) {
// For RTSP, codec parameters are often available from SDP already; keep going.
LogWarn("[input_rtsp] avformat_find_stream_info failed (continuing)");
}
// Pick the first video stream; remember its time base for pts rescaling.
for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) {
if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
video_stream = static_cast<int>(i);
time_base = fmt_ctx->streams[i]->time_base;
break;
}
}
if (video_stream < 0) {
LogError("[input_rtsp] no video stream");
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
std::this_thread::sleep_for(seconds(backoff));
backoff = std::min(backoff_max, backoff * 2);
continue;
}
// Sanity check that FFmpeg knows this codec before asking the factory.
const AVCodec* codec = avcodec_find_decoder(fmt_ctx->streams[video_stream]->codecpar->codec_id);
if (!codec) {
LogError("[input_rtsp] decoder not found");
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
std::this_thread::sleep_for(seconds(backoff));
backoff = std::min(backoff_max, backoff * 2);
continue;
}
const auto* par = fmt_ctx->streams[video_stream]->codecpar;
SimpleJson dec_cfg = BuildDecoderConfig("ffmpeg", par->codec_id,
par->extradata,
static_cast<size_t>(std::max(0, par->extradata_size)),
platform_, hw_platform_);
auto decoder = HwFactory::CreateDecoder(dec_cfg);
if (!decoder || decoder->Open(dec_cfg).Failed()) {
LogError("[input_rtsp] ffmpeg decoder open failed");
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
std::this_thread::sleep_for(seconds(backoff));
backoff = std::min(backoff_max, backoff * 2);
continue;
}
backoff = std::max(1, reconnect_sec_); // reset after successful open
// Packet pump: tolerate up to 50 consecutive read failures (~500 ms total)
// before tearing the session down and reconnecting.
int read_fail = 0;
while (running_.load()) {
if (av_read_frame(fmt_ctx, pkt) < 0) {
if (++read_fail >= 50) {
break;
}
std::this_thread::sleep_for(milliseconds(10));
continue;
}
read_fail = 0;
if (pkt->stream_index != video_stream) {
av_packet_unref(pkt);
continue;
}
// Rescale the container pts to microseconds; a missing pts becomes 0.
int64_t pts_us = 0;
if (pkt->pts != AV_NOPTS_VALUE) {
pts_us = av_rescale_q(pkt->pts, time_base, {1, 1000000});
}
DecodePacket dp;
dp.data = pkt->data;
dp.size = static_cast<size_t>(pkt->size);
dp.pts = static_cast<uint64_t>(std::max<int64_t>(0, pts_us));
dp.keyframe = (pkt->flags & AV_PKT_FLAG_KEY) != 0;
// On a successful submit, drain every frame the decoder has ready.
if (decoder->Send(dp).Ok()) {
DrainDecoder(decoder, [&](const FramePtr& frame) { HandleDecodedFrame(frame); });
}
av_packet_unref(pkt);
}
decoder->Close();
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
if (!running_.load()) break;
// Session ended (read errors or stop): flat delay, then reconnect.
std::this_thread::sleep_for(seconds(std::max(1, reconnect_sec_)));
}
}
// Frees any subset of FFmpeg demux/decode objects; every argument may be
// null. The av_* helpers take pointer-to-pointer and null out the (local)
// pointer they are given.
void Cleanup(AVFormatContext* fmt, AVCodecContext* dec, AVPacket* pkt, AVFrame* frm) {
if (dec) avcodec_free_context(&dec);
if (fmt) avformat_close_input(&fmt);
if (pkt) av_packet_free(&pkt);
if (frm) av_frame_free(&frm);
}
#endif
#if defined(RK3588_ENABLE_FFMPEG) && defined(RK3588_ENABLE_MPP)
// Detects an Annex-B elementary stream by its leading start code.
// Accepts both the 3-byte (00 00 01) and 4-byte (00 00 00 01) forms.
// Fix: the old `size < 4` guard rejected a buffer that begins with a valid
// 3-byte start code but is only 3 bytes long; the 3-byte form now only
// requires 3 readable bytes.
static bool IsAnnexB(const uint8_t* data, size_t size) {
  if (!data || size < 3) return false;
  if (data[0] == 0 && data[1] == 0 && data[2] == 1) return true;
  return size >= 4 && data[0] == 0 && data[1] == 0 && data[2] == 0 && data[3] == 1;
}
// Returns the length (3 or 4) of the Annex-B start code at data[0], or 0 if
// there is none. Fix: the old `size < 4` guard missed a bare 3-byte start
// code at the very end of a buffer — callers scan with shrinking tails
// (data + i, size - i) where exactly 3 bytes may remain.
static int GetAnnexBStartCodeSize(const uint8_t* data, size_t size) {
  if (!data || size < 3) return 0;
  // 00 00 01 — the 3-byte form. A 4-byte code (00 00 00 01) cannot be
  // misread here because its third byte is 0, not 1.
  if (data[0] == 0 && data[1] == 0 && data[2] == 1) return 3;
  if (size >= 4 && data[0] == 0 && data[1] == 0 && data[2] == 0 && data[3] == 1) return 4;
  return 0;
}
// Minimal classification of an Annex-B buffer's NAL content.
struct NalSummary {
bool has_idr = false;
bool has_param = false; // SPS/PPS (or VPS for H265)
};
// Scans an Annex-B buffer and reports whether it contains an IDR slice
// and/or parameter sets. Used by the MPP loop to gate decoding on the first
// keyframe. NAL types recognized:
//   H.264: 5 = IDR slice, 7 = SPS, 8 = PPS
//   H.265: 19/20 = IDR_W_RADL/IDR_N_LP, 32 = VPS, 33 = SPS, 34 = PPS
static NalSummary SummarizeAnnexB(AVCodecID codec_id, const uint8_t* data, size_t size) {
NalSummary s{};
if (!data || size < 5) return s;
size_t i = 0;
// Walk start code by start code; `i` advances byte-wise until a start code
// is found, then jumps past the NAL to the next one.
while (i + 4 <= size) {
int sc = GetAnnexBStartCodeSize(data + i, size - i);
if (sc == 0) {
++i;
continue;
}
// First byte after the start code is the NAL unit header.
size_t nal = i + static_cast<size_t>(sc);
if (nal >= size) break;
uint8_t b0 = data[nal];
if (codec_id == AV_CODEC_ID_H264) {
// H.264: low 5 bits of the header byte are the NAL unit type.
int t = b0 & 0x1F;
if (t == 5) s.has_idr = true;
if (t == 7 || t == 8) s.has_param = true;
} else if (codec_id == AV_CODEC_ID_HEVC) {
// H.265: bits 1..6 of the first header byte are the NAL unit type.
int t = (b0 >> 1) & 0x3F;
if (t == 19 || t == 20) s.has_idr = true;
if (t == 32 || t == 33 || t == 34) s.has_param = true;
}
// Jump to next start code by searching forward.
size_t j = nal + 1;
while (j + 3 < size) {
if ((data[j] == 0 && data[j + 1] == 0 && data[j + 2] == 1) ||
(j + 4 < size && data[j] == 0 && data[j + 1] == 0 && data[j + 2] == 0 && data[j + 3] == 1)) {
break;
}
++j;
}
i = j;
}
return s;
}
void LoopFfmpegMpp() {
#if defined(RK3588_ENABLE_FFMPEG)
InstallFfmpegLogFilterOnce();
#endif
ApplyAffinity();
using namespace std::chrono;
int backoff = std::max(1, reconnect_sec_);
const int backoff_max = std::max(backoff, reconnect_backoff_max_sec_);
while (running_.load()) {
AVFormatContext* fmt_ctx = nullptr;
AVPacket* pkt = av_packet_alloc();
AVPacket* filt_pkt = av_packet_alloc();
int video_stream = -1;
AVRational time_base{1, 1000};
AVBSFContext* bsf_ctx = nullptr;
AVDictionary* opts = nullptr;
if (ffmpeg_force_tcp_) av_dict_set(&opts, "rtsp_transport", "tcp", 0);
av_dict_set(&opts, "analyzeduration", "0", 0);
av_dict_set(&opts, "probesize", "32768", 0);
av_dict_set(&opts, "fflags", "nobuffer", 0);
av_dict_set(&opts, "flags", "low_delay", 0);
if (avformat_open_input(&fmt_ctx, url_.c_str(), nullptr, &opts) < 0) {
LogError("[input_rtsp] avformat_open_input failed: " + url_);
av_dict_free(&opts);
if (bsf_ctx) av_bsf_free(&bsf_ctx);
if (filt_pkt) av_packet_free(&filt_pkt);
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
if (fallback_to_stub_on_fail_) {
LoopStub();
return;
}
std::this_thread::sleep_for(seconds(backoff));
backoff = std::min(backoff_max, backoff * 2);
continue;
}
av_dict_free(&opts);
if (avformat_find_stream_info(fmt_ctx, nullptr) < 0) {
// For RTSP, codec parameters are often available from SDP already; keep going.
LogWarn("[input_rtsp] avformat_find_stream_info failed (continuing)");
}
for (unsigned i = 0; i < fmt_ctx->nb_streams; ++i) {
if (fmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
video_stream = static_cast<int>(i);
time_base = fmt_ctx->streams[i]->time_base;
break;
}
}
if (video_stream < 0) {
LogError("[input_rtsp] no video stream");
if (bsf_ctx) av_bsf_free(&bsf_ctx);
if (filt_pkt) av_packet_free(&filt_pkt);
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
std::this_thread::sleep_for(seconds(backoff));
backoff = std::min(backoff_max, backoff * 2);
continue;
}
auto codec_id = fmt_ctx->streams[video_stream]->codecpar->codec_id;
if (codec_id != AV_CODEC_ID_H264 && codec_id != AV_CODEC_ID_HEVC) {
LogError("[input_rtsp] unsupported codec for mpp");
if (bsf_ctx) av_bsf_free(&bsf_ctx);
if (filt_pkt) av_packet_free(&filt_pkt);
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
std::this_thread::sleep_for(seconds(backoff));
backoff = std::min(backoff_max, backoff * 2);
continue;
}
auto ensure_bsf = [&]() -> bool {
if (bsf_ctx) return true;
const char* bsf_name = nullptr;
if (codec_id == AV_CODEC_ID_H264) bsf_name = "h264_mp4toannexb";
else if (codec_id == AV_CODEC_ID_HEVC) bsf_name = "hevc_mp4toannexb";
if (!bsf_name) return false;
const AVBitStreamFilter* bsf = av_bsf_get_by_name(bsf_name);
if (!bsf) return false;
if (av_bsf_alloc(bsf, &bsf_ctx) < 0) {
bsf_ctx = nullptr;
return false;
}
if (avcodec_parameters_copy(bsf_ctx->par_in, fmt_ctx->streams[video_stream]->codecpar) < 0) {
av_bsf_free(&bsf_ctx);
return false;
}
bsf_ctx->time_base_in = fmt_ctx->streams[video_stream]->time_base;
if (av_bsf_init(bsf_ctx) < 0) {
av_bsf_free(&bsf_ctx);
return false;
}
return true;
};
bool bsf_ok = ensure_bsf();
const AVCodecParameters* par = fmt_ctx->streams[video_stream]->codecpar;
const uint8_t* extra = par ? par->extradata : nullptr;
size_t extra_size = par && par->extradata_size > 0
? static_cast<size_t>(par->extradata_size)
: 0;
if (bsf_ok && bsf_ctx && bsf_ctx->par_out && bsf_ctx->par_out->extradata && bsf_ctx->par_out->extradata_size > 0) {
extra = bsf_ctx->par_out->extradata;
extra_size = static_cast<size_t>(bsf_ctx->par_out->extradata_size);
}
SimpleJson dec_cfg = BuildDecoderConfig("mpp", codec_id, extra, extra_size,
platform_, hw_platform_);
auto decoder = HwFactory::CreateDecoder(dec_cfg);
if (!decoder || decoder->Open(dec_cfg).Failed()) {
LogError("[input_rtsp] mpp decoder open failed");
if (bsf_ctx) av_bsf_free(&bsf_ctx);
if (filt_pkt) av_packet_free(&filt_pkt);
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
std::this_thread::sleep_for(seconds(backoff));
backoff = std::min(backoff_max, backoff * 2);
continue;
}
backoff = std::max(1, reconnect_sec_);
int read_fail = 0;
uint64_t pkt_count = 0;
uint64_t decode_fail = 0;
bool need_idr = true;
uint64_t dropped_until_idr = 0;
bool announced_wait_idr = false;
const int64_t step_us = (fps_ > 0) ? (1000000LL / fps_) : 40000LL;
int64_t last_pts_us = 0;
bool have_last_pts = false;
auto handle_annexb = [&](const uint8_t* data, size_t size, bool key_flag, int64_t pts_us) {
if (!data || size == 0) return;
NalSummary ns = SummarizeAnnexB(codec_id, data, size);
const bool is_idr = key_flag || ns.has_idr;
if (need_idr && !is_idr) {
++dropped_until_idr;
if (stats_log_ && (!announced_wait_idr || dropped_until_idr % 300 == 0)) {
LogInfo("[input_rtsp] waiting for IDR/keyframe; dropped=" + std::to_string(dropped_until_idr));
announced_wait_idr = true;
}
return;
}
DecodePacket dp;
dp.data = data;
dp.size = size;
dp.pts = static_cast<uint64_t>(pts_us);
dp.keyframe = is_idr;
const bool ok = decoder->Send(dp).IsOk();
if (ok) {
DrainDecoder(decoder, [&](const FramePtr& frame) { HandleDecodedFrame(frame); });
}
if (is_idr && ok) {
need_idr = false;
decode_fail = 0;
} else if (!ok) {
// If we are in steady state and decoding starts failing, re-sync on next IDR.
if (!need_idr && ++decode_fail >= 3) {
need_idr = true;
announced_wait_idr = false;
LogWarn("[input_rtsp] decoder desync/backpressure; re-sync on next IDR");
}
}
};
while (running_.load()) {
if (av_read_frame(fmt_ctx, pkt) < 0) {
if (++read_fail >= 50) break;
std::this_thread::sleep_for(milliseconds(10));
continue;
}
read_fail = 0;
if (pkt->stream_index != video_stream) {
av_packet_unref(pkt);
continue;
}
++pkt_count;
int64_t raw_pts = pkt->pts;
int64_t raw_dts = pkt->dts;
if (raw_pts != AV_NOPTS_VALUE && raw_pts < 0) raw_pts = AV_NOPTS_VALUE;
if (raw_dts != AV_NOPTS_VALUE && raw_dts < 0) raw_dts = AV_NOPTS_VALUE;
int64_t raw_ts = (raw_pts != AV_NOPTS_VALUE) ? raw_pts : raw_dts;
int64_t pts_us = 0;
if (raw_ts == AV_NOPTS_VALUE) {
pts_us = have_last_pts ? (last_pts_us + step_us) : 0;
} else {
pts_us = av_rescale_q(raw_ts, time_base, {1, 1000000});
if (pts_us < 0) pts_us = 0;
if (have_last_pts && pts_us <= last_pts_us) pts_us = last_pts_us + step_us;
}
last_pts_us = pts_us;
have_last_pts = true;
if (stats_log_ && (pkt_count <= 3 || pkt_count % 300 == 0)) {
LogInfo("[input_rtsp] recv pkt#" + std::to_string(pkt_count) +
" size=" + std::to_string(pkt->size) +
" pts=" + (raw_pts == AV_NOPTS_VALUE ? std::string("NOPTS") : std::to_string(raw_pts)) +
" dts=" + (raw_dts == AV_NOPTS_VALUE ? std::string("NOPTS") : std::to_string(raw_dts)) +
" pts_us=" + std::to_string(pts_us) +
" key=" + std::to_string((pkt->flags & AV_PKT_FLAG_KEY) ? 1 : 0));
}
if (IsAnnexB(pkt->data, static_cast<size_t>(pkt->size))) {
if (stats_log_ && pkt_count <= 10) {
int sc = GetAnnexBStartCodeSize(pkt->data, static_cast<size_t>(pkt->size));
if (sc > 0 && static_cast<size_t>(sc) < static_cast<size_t>(pkt->size)) {
uint8_t b0 = pkt->data[sc];
int h264_type = b0 & 0x1F;
int h265_type = (b0 >> 1) & 0x3F;
LogInfo("[input_rtsp] pkt#" + std::to_string(pkt_count) +
" nal(h264)=" + std::to_string(h264_type) +
" nal(h265)=" + std::to_string(h265_type) +
" first_byte=" + std::to_string(static_cast<int>(b0)));
}
}
handle_annexb(pkt->data, static_cast<size_t>(pkt->size),
(pkt->flags & AV_PKT_FLAG_KEY) != 0, pts_us);
} else if (ensure_bsf()) {
if (av_bsf_send_packet(bsf_ctx, pkt) == 0) {
while (av_bsf_receive_packet(bsf_ctx, filt_pkt) == 0) {
handle_annexb(filt_pkt->data, static_cast<size_t>(filt_pkt->size),
(filt_pkt->flags & AV_PKT_FLAG_KEY) != 0, pts_us);
av_packet_unref(filt_pkt);
}
}
} else {
LogWarn("[input_rtsp] non-AnnexB bitstream but bsf init failed; mpp decode may produce no frames");
}
av_packet_unref(pkt);
}
if (bsf_ctx) av_bsf_free(&bsf_ctx);
if (filt_pkt) av_packet_free(&filt_pkt);
decoder->Close();
Cleanup(fmt_ctx, nullptr, pkt, nullptr);
if (!running_.load()) break;
std::this_thread::sleep_for(seconds(std::max(1, reconnect_sec_)));
}
}
#endif
// --- configuration & worker state (set in Init, used by the worker thread) ---
std::string id_;   // node identifier (config "id")
std::string url_;  // RTSP source URL; empty means nothing to connect to
int fps_ = 25;     // nominal frame rate: stub pacing and pts synthesis step
int width_ = 1920;   // frame width used by the stub generator
int height_ = 1080;  // frame height used by the stub generator
std::atomic<bool> running_{false};  // worker run flag; cleared by Stop()/Drain()
std::vector<std::shared_ptr<SpscQueue<FramePtr>>> out_queues_;  // downstream sinks
std::thread worker_;     // capture/decode thread launched in Start()
uint64_t frame_id_ = 0;  // monotonically increasing id; touched by worker only
bool use_ffmpeg_ = false;       // request FFmpeg CPU decode
bool use_mpp_ = true;           // request MPP hardware decode (wins over ffmpeg in Start())
bool ffmpeg_force_tcp_ = true;  // rtsp_transport=tcp when true, else udp
std::string platform_;     // optional platform hint forwarded to the decoder config
std::string hw_platform_;  // optional hardware platform hint
int reconnect_sec_ = 5;               // initial reconnect delay (seconds)
int reconnect_backoff_max_sec_ = 30;  // cap for the exponential connect backoff
bool fallback_to_stub_on_fail_ = false;  // switch to the stub after the first failed open
std::vector<int> cpu_affinity_;  // CPUs to pin the worker to (empty = no pinning)
bool stats_log_ = false;         // emit periodic stats log lines
uint64_t stats_interval_ = 100;  // frames between stat lines (clamped >= 1 in Init)
};
REGISTER_NODE(InputRtspNode, "input_rtsp");
} // namespace rk3588