#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "node.h" #include "hw/hw_factory.h" #include "utils/logger.h" #include "media/encoded_video_meta.h" #if defined(RK3588_ENABLE_FFMPEG) extern "C" { #include #include } #endif #if defined(RK3588_ENABLE_MPP) extern "C" { #include #include #include } #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) extern "C" { #include "mk_mediakit.h" } #endif namespace rk3588 { namespace { inline int Align16(int v) { return (v + 15) & ~15; } #if defined(RK3588_ENABLE_FFMPEG) static int FfmpegLogLevelFromString(std::string s) { for (auto& c : s) c = static_cast(std::tolower(static_cast(c))); if (s == "quiet" || s == "0") return AV_LOG_QUIET; if (s == "panic" || s == "1") return AV_LOG_PANIC; if (s == "fatal" || s == "2") return AV_LOG_FATAL; if (s == "error" || s == "3") return AV_LOG_ERROR; if (s == "warning" || s == "warn" || s == "4") return AV_LOG_WARNING; if (s == "info" || s == "5") return AV_LOG_INFO; if (s == "verbose" || s == "6") return AV_LOG_VERBOSE; if (s == "debug" || s == "7") return AV_LOG_DEBUG; if (s == "trace" || s == "8") return AV_LOG_TRACE; return AV_LOG_ERROR; } static void ApplyFfmpegLogLevel(const std::string& level) { const int lv = FfmpegLogLevelFromString(level); av_log_set_level(lv); } #endif } struct OutputConfig { // proto: // - rtsp: push to an external RTSP server (client mode) // - rtsp_server: embedded RTSP server via ZLMediaKit (listen) // - hls: write local HLS std::string proto = "rtsp"; std::string host = "127.0.0.1"; // rtsp: destination host int port = 8554; std::string path = "/live/stream"; // rtsp: url path; hls: dir or m3u8 path int segment_sec = 2; }; struct EncodedPacket { std::vector data; bool key = false; int64_t pts_ms = 0; }; #if defined(RK3588_ENABLE_FFMPEG) class AvMuxer { public: AvMuxer() = default; ~AvMuxer() { Close(); } bool Init(const OutputConfig& cfg, AVCodecID codec_id, int width, int height, int fps, const std::vector& extradata) { cfg_ = cfg; codec_id_ = codec_id; width_ = width; height_ = height; fps_ = fps > 0 ? fps : 25; extradata_ = extradata; // Copy extradata proto_ = cfg.proto.empty() ? "rtsp" : cfg.proto; url_ = BuildUrl(cfg); // Global init (safe to call multiple times) avformat_network_init(); // Async write queue: bound memory and never block the realtime pipeline. queue_ = std::make_shared>(queue_size_, QueueDropStrategy::DropOldest); running_ = true; worker_thread_ = std::thread(&AvMuxer::WorkerLoop, this); LogInfo("[publish] muxer init async for " + url_); return true; } bool WriteFrame(const EncodedPacket& pkt) { if (!running_.load(std::memory_order_acquire) || !queue_) return false; if (pkt.data.empty()) return false; EncodedPacket copy = pkt; return queue_->Push(std::move(copy)); } void Close() { running_ = false; if (queue_) queue_->Stop(); if (worker_thread_.joinable()) worker_thread_.join(); } private: void WorkerLoop() { while (running_.load(std::memory_order_acquire)) { if (!TryOpen()) { if (!running_.load(std::memory_order_acquire)) break; std::this_thread::sleep_for(std::chrono::seconds(1)); continue; } ready_ = true; warned_ = false; last_pts_ = -1; LogInfo("[publish] muxer server ready: " + url_); while (running_.load(std::memory_order_acquire) && ready_) { EncodedPacket pkt; if (!queue_ || !queue_->Pop(pkt, std::chrono::milliseconds(100))) { continue; } if (!WriteFrameInternal(pkt)) { ready_ = false; } } CleanupContext(); ready_ = false; if (running_.load(std::memory_order_acquire)) { std::this_thread::sleep_for(std::chrono::seconds(1)); } } CleanupContext(); } bool WriteFrameInternal(const EncodedPacket& pkt) { if (!fmt_ || !stream_ || pkt.data.empty()) return false; AVPacket* out = av_packet_alloc(); if (!out) return false; if (av_new_packet(out, static_cast(pkt.data.size())) < 0) { av_packet_free(&out); return false; } std::memcpy(out->data, pkt.data.data(), pkt.data.size()); out->stream_index = stream_->index; out->flags = pkt.key ? AV_PKT_FLAG_KEY : 0; // Simple PTS mapping int64_t pts = av_rescale_q(pkt.pts_ms, AVRational{1, 1000}, stream_->time_base); if (pts <= last_pts_) pts = last_pts_ + 1; last_pts_ = pts; out->pts = pts; out->dts = pts; out->duration = av_rescale_q(1000 / std::max(1, fps_), AVRational{1, 1000}, stream_->time_base); const int ret = av_interleaved_write_frame(fmt_, out); av_packet_free(&out); if (ret < 0) { if (!warned_) { char errbuf[128]; av_strerror(ret, errbuf, sizeof(errbuf)); LogWarn(std::string("[publish] write failed for ") + url_ + ": " + errbuf + ", resetting..."); warned_ = true; } return false; } return true; } void CleanupContext() { if (!fmt_) return; if (fmt_->pb) { avio_closep(&fmt_->pb); } avformat_free_context(fmt_); fmt_ = nullptr; stream_ = nullptr; } bool TryOpen() { AVFormatContext* fmt = nullptr; const char* fmt_name = proto_ == "hls" ? "hls" : "rtsp"; if (avformat_alloc_output_context2(&fmt, nullptr, fmt_name, url_.c_str()) < 0 || !fmt) { LogError("[publish] alloc context failed " + url_); return false; } // Set interrupt callback to allow breaking blocking calls fmt->interrupt_callback.callback = CheckInterrupt; fmt->interrupt_callback.opaque = this; AVStream* stream = avformat_new_stream(fmt, nullptr); if (!stream) { avformat_free_context(fmt); return false; } stream->time_base = AVRational{1, 1000}; stream->avg_frame_rate = AVRational{fps_, 1}; stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO; stream->codecpar->codec_id = codec_id_; stream->codecpar->width = width_; stream->codecpar->height = height_; stream->codecpar->format = AV_PIX_FMT_YUV420P; if (!extradata_.empty()) { stream->codecpar->extradata_size = static_cast(extradata_.size()); stream->codecpar->extradata = static_cast(av_mallocz(extradata_.size() + AV_INPUT_BUFFER_PADDING_SIZE)); std::memcpy(stream->codecpar->extradata, extradata_.data(), extradata_.size()); } AVDictionary* opts = nullptr; if (proto_ == "rtsp") { av_dict_set(&opts, "rtsp_transport", "tcp", 0); } else if (proto_ == "hls") { std::string seg = std::to_string(std::max(1, cfg_.segment_sec)); av_dict_set(&opts, "hls_time", seg.c_str(), 0); av_dict_set(&opts, "hls_list_size", "15", 0); av_dict_set(&opts, "hls_flags", "delete_segments+append_list", 0); std::filesystem::create_directories(std::filesystem::path(url_).parent_path()); } if (!(fmt->oformat->flags & AVFMT_NOFILE)) { if (av_io_open_helper(fmt, url_, &opts) < 0) { av_dict_free(&opts); avformat_free_context(fmt); return false; } } if (avformat_write_header(fmt, &opts) < 0) { av_dict_free(&opts); if (fmt->pb) avio_closep(&fmt->pb); avformat_free_context(fmt); return false; } av_dict_free(&opts); { fmt_ = fmt; stream_ = stream; } return true; } int av_io_open_helper(AVFormatContext* fmt, const std::string& url, AVDictionary** opts) { // IMPORTANT: options must be passed by pointer-to-pointer so FFmpeg can consume/remove entries safely. // Passing a copied AVDictionary* leads to use-after-free / double-free when FFmpeg updates the dict. return avio_open2(&fmt->pb, url.c_str(), AVIO_FLAG_WRITE, &fmt->interrupt_callback, opts); } static int CheckInterrupt(void* opaque) { auto* self = static_cast(opaque); return self->running_ ? 0 : 1; } std::string BuildUrl(const OutputConfig& cfg) const { if (proto_ == "hls") return cfg.path; std::string host = cfg.host.empty() ? "127.0.0.1" : cfg.host; std::string path = cfg.path; if (path.empty() || path[0] != '/') path = "/" + path; return "rtsp://" + host + ":" + std::to_string(cfg.port) + path; } OutputConfig cfg_; AVCodecID codec_id_; int width_ = 0; int height_ = 0; int fps_ = 25; std::vector extradata_; std::string proto_; std::string url_; std::atomic running_{false}; std::thread worker_thread_; std::shared_ptr> queue_; size_t queue_size_ = 64; bool ready_ = false; bool warned_ = false; AVFormatContext* fmt_ = nullptr; AVStream* stream_ = nullptr; int64_t last_pts_ = -1; }; class AvMuxerManager { public: bool Init(const std::vector& outputs, AVCodecID codec_id, int width, int height, int fps, const std::vector& extradata) { bool ok = false; for (const auto& o : outputs) { auto mux = std::make_unique(); if (mux->Init(o, codec_id, width, height, fps, extradata)) { muxers_.push_back(std::move(mux)); ok = true; } } return ok; } void Write(const EncodedPacket& pkt) { for (auto& m : muxers_) { m->WriteFrame(pkt); } } void Close() { for (auto& m : muxers_) m->Close(); muxers_.clear(); } private: std::vector> muxers_; }; #endif #if defined(RK3588_ENABLE_MPP) class MppVencEncoder { public: ~MppVencEncoder() { Shutdown(); } bool InitFromFrame(const Frame& src, const std::string& codec, int fps, int gop, int bitrate_kbps) { width_ = src.width; height_ = src.height; hor_stride_ = src.planes[0].stride > 0 ? src.planes[0].stride : (src.stride > 0 ? src.stride : Align16(src.width)); ver_stride_ = Align16(src.height); fps_ = fps > 0 ? fps : 25; gop_ = gop > 0 ? gop : fps_ * 2; bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 4000) * 1000; if (src.format == PixelFormat::NV12) { mpp_fmt_ = MPP_FMT_YUV420SP; } else if (src.format == PixelFormat::YUV420) { mpp_fmt_ = MPP_FMT_YUV420P; } else { LogError("[publish] unsupported pixel format for mpp encoder"); return false; } if (codec == "h265" || codec == "hevc") { coding_ = MPP_VIDEO_CodingHEVC; } else { coding_ = MPP_VIDEO_CodingAVC; } if (mpp_create(&ctx_, &mpi_) != MPP_OK) { LogError("[publish] mpp_create failed"); return false; } if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) { LogError("[publish] mpp_init enc failed"); return false; } if (mpp_enc_cfg_init(&cfg_) != MPP_OK) { LogError("[publish] mpp_enc_cfg_init failed"); return false; } if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) { LogError("[publish] MPP_ENC_GET_CFG failed"); return false; } mpp_enc_cfg_set_s32(cfg_, "prep:width", width_); mpp_enc_cfg_set_s32(cfg_, "prep:height", height_); mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_); mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_); mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_); mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR); mpp_enc_cfg_set_s32(cfg_, "rc:gop", gop_); mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_); mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1); mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_); mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1); mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_); mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2); mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2); mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_); if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) { LogError("[publish] MPP_ENC_SET_CFG failed"); return false; } // Ensure SPS/PPS (and VPS for H265) are emitted with IDR frames so RTSP clients can get SDP. if (coding_ == MPP_VIDEO_CodingAVC || coding_ == MPP_VIDEO_CodingHEVC) { MppEncHeaderMode header_mode = MPP_ENC_HEADER_MODE_EACH_IDR; if (mpi_->control(ctx_, MPP_ENC_SET_HEADER_MODE, &header_mode) != MPP_OK) { LogWarn("[publish] MPP_ENC_SET_HEADER_MODE failed"); } } RK_S32 timeout = 2000; mpi_->control(ctx_, MPP_SET_OUTPUT_TIMEOUT, &timeout); MppPacket hdr = nullptr; if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) { auto* data = static_cast(mpp_packet_get_pos(hdr)); size_t len = mpp_packet_get_length(hdr); header_.assign(data, data + len); mpp_packet_deinit(&hdr); } if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM, NULL) != MPP_OK) { if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL, NULL) != MPP_OK) { LogError("[publish] mpp_buffer_group_get_internal failed"); return false; } } initialized_ = true; return true; } void Shutdown() { if (frm_grp_) { mpp_buffer_group_put(frm_grp_); frm_grp_ = nullptr; } if (cfg_) { mpp_enc_cfg_deinit(cfg_); cfg_ = nullptr; } if (ctx_) { if (mpi_) mpi_->reset(ctx_); mpp_destroy(ctx_); ctx_ = nullptr; mpi_ = nullptr; } initialized_ = false; } bool Encode(const FramePtr& frame, const std::function& on_packet) { if (!initialized_) return false; if (!frame) return false; MppBuffer buf = nullptr; bool imported = false; size_t frame_size = CalcFrameSize(); if (frame->DmaFd() >= 0 && frame->data_size > 0) { MppBufferInfo info{}; info.type = MPP_BUFFER_TYPE_EXT_DMA; info.size = frame->data_size; info.fd = frame->DmaFd(); if (mpp_buffer_import(&buf, &info) == MPP_OK) { imported = true; } } if (!buf) { if (!frame->data) return false; if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) { return false; } if (!CopyToBuffer(frame, buf)) { mpp_buffer_put(buf); return false; } } MppFrame mpp_frame = nullptr; mpp_frame_init(&mpp_frame); mpp_frame_set_width(mpp_frame, width_); mpp_frame_set_height(mpp_frame, height_); mpp_frame_set_hor_stride(mpp_frame, hor_stride_); mpp_frame_set_ver_stride(mpp_frame, ver_stride_); mpp_frame_set_fmt(mpp_frame, mpp_fmt_); mpp_frame_set_pts(mpp_frame, static_cast(frame->pts)); mpp_frame_set_buffer(mpp_frame, buf); if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) { mpp_frame_deinit(&mpp_frame); mpp_buffer_put(buf); return false; } mpp_frame_deinit(&mpp_frame); while (true) { MppPacket packet = nullptr; MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet); if (ret == MPP_ERR_TIMEOUT) { std::this_thread::sleep_for(std::chrono::milliseconds(2)); continue; } if (ret != MPP_OK || !packet) break; EncodedPacket out; auto* pos = static_cast(mpp_packet_get_pos(packet)); size_t len = mpp_packet_get_length(packet); out.data.assign(pos, pos + len); // MPP_PACKET_FLAG_INTRA = 0x08, also check NAL type for H264 IDR (type 5) RK_U32 flag = mpp_packet_get_flag(packet); out.key = (flag & 0x08) != 0; // MPP_PACKET_FLAG_INTRA // Fallback: check H264 NAL unit type if MPP flag not set if (!out.key && len >= 5) { // Find NAL start code and check type for (size_t i = 0; i + 4 < len; ++i) { if (pos[i] == 0 && pos[i+1] == 0 && pos[i+2] == 0 && pos[i+3] == 1) { uint8_t nal_type = pos[i+4] & 0x1F; if (nal_type == 5 || nal_type == 7) { // IDR or SPS out.key = true; break; } } else if (pos[i] == 0 && pos[i+1] == 0 && pos[i+2] == 1) { uint8_t nal_type = pos[i+3] & 0x1F; if (nal_type == 5 || nal_type == 7) { out.key = true; break; } } } } int64_t mpp_pts = mpp_packet_get_pts(packet); // Frame::pts is microseconds; ZLMediaKit/FFmpeg muxer paths use millisecond timestamps. out.pts_ms = mpp_pts > 0 ? (mpp_pts / 1000) : static_cast(frame_count_ * 1000 / fps_); ++frame_count_; if (on_packet) on_packet(out); mpp_packet_deinit(&packet); break; } mpp_buffer_put(buf); (void)imported; return true; } const std::vector& Header() const { return header_; } private: size_t CalcFrameSize() const { if (mpp_fmt_ == MPP_FMT_YUV420SP || mpp_fmt_ == MPP_FMT_YUV420P) { return static_cast(hor_stride_) * ver_stride_ * 3 / 2; } return 0; } bool CopyToBuffer(const FramePtr& frame, MppBuffer buf) const { auto* dst = static_cast(mpp_buffer_get_ptr(buf)); size_t dst_size = mpp_buffer_get_size(buf); size_t need = CalcFrameSize(); if (!dst || dst_size < need) return false; std::memset(dst, 0, dst_size); if (frame->plane_count < 2) return false; const FramePlane& y = frame->planes[0]; const FramePlane& uv = frame->planes[1]; if (!y.data || !uv.data) return false; for (int row = 0; row < height_; ++row) { std::memcpy(dst + row * hor_stride_, y.data + row * y.stride, std::min(hor_stride_, y.stride)); } uint8_t* dst_uv = dst + hor_stride_ * ver_stride_; int uv_rows = ver_stride_ / 2; if (mpp_fmt_ == MPP_FMT_YUV420SP) { for (int row = 0; row < uv_rows; ++row) { std::memcpy(dst_uv + row * hor_stride_, uv.data + row * uv.stride, std::min(hor_stride_, uv.stride)); } } else if (frame->plane_count >= 3) { const FramePlane& v = frame->planes[2]; uint8_t* dst_u = dst_uv; uint8_t* dst_v = dst_uv + (hor_stride_ / 2) * uv_rows; for (int row = 0; row < uv_rows; ++row) { std::memcpy(dst_u + row * (hor_stride_ / 2), uv.data + row * uv.stride, std::min(hor_stride_ / 2, uv.stride)); std::memcpy(dst_v + row * (hor_stride_ / 2), v.data + row * v.stride, std::min(hor_stride_ / 2, v.stride)); } } return true; } MppCtx ctx_ = nullptr; MppApi* mpi_ = nullptr; MppEncCfg cfg_ = nullptr; MppBufferGroup frm_grp_ = nullptr; MppCodingType coding_ = MPP_VIDEO_CodingAVC; MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP; int width_ = 0; int height_ = 0; int hor_stride_ = 0; int ver_stride_ = 0; int fps_ = 25; int gop_ = 50; int bitrate_bps_ = 4000000; std::vector header_; bool initialized_ = false; uint64_t frame_count_ = 0; }; #endif class PublishNode : public INode { public: std::string Id() const override { return id_; } std::string Type() const override { return "publish"; } bool Init(const SimpleJson& config, const NodeContext& ctx) override { id_ = config.ValueOr("id", "publish"); input_queue_ = ctx.input_queue; output_queues_ = ctx.output_queues; codec_ = config.ValueOr("codec", "h264"); fps_ = config.ValueOr("fps", 25); gop_ = config.ValueOr("gop", 50); bitrate_kbps_ = config.ValueOr("bitrate_kbps", 4000); platform_ = config.ValueOr("platform", ""); hw_platform_ = config.ValueOr("hw_platform", ""); use_mpp_ = config.ValueOr("use_mpp", true); use_ffmpeg_mux_ = config.ValueOr("use_ffmpeg_mux", true); attach_encoded_meta_ = config.ValueOr("attach_encoded_meta", !output_queues_.empty()); if (const SimpleJson* dbg = config.Find("debug"); dbg && dbg->IsObject()) { stats_log_ = dbg->ValueOr("stats", stats_log_); stats_interval_ = std::max( 1, static_cast(dbg->ValueOr("stats_interval", static_cast(stats_interval_)))); #if defined(RK3588_ENABLE_FFMPEG) ffmpeg_log_level_ = dbg->ValueOr("ffmpeg_log_level", ffmpeg_log_level_); #endif } #if defined(RK3588_ENABLE_FFMPEG) // Reduce FFmpeg's internal info logs (e.g., HLS "Opening ...") unless explicitly enabled. ApplyFfmpegLogLevel(ffmpeg_log_level_); #endif const SimpleJson* outputs = config.Find("outputs"); if (outputs && outputs->IsArray()) { for (const auto& o : outputs->AsArray()) { if (!o.IsObject()) continue; OutputConfig cfg; cfg.proto = o.ValueOr("proto", cfg.proto); cfg.host = o.ValueOr("host", cfg.host); cfg.port = o.ValueOr("port", cfg.port); cfg.path = o.ValueOr("path", cfg.path); cfg.segment_sec = o.ValueOr("segment_sec", cfg.segment_sec); outputs_.push_back(std::move(cfg)); } } if (outputs_.empty()) { outputs_.push_back(OutputConfig{}); } #if !defined(RK3588_ENABLE_MPP) use_mpp_ = false; #endif #if !defined(RK3588_ENABLE_FFMPEG) use_ffmpeg_mux_ = false; #endif if (!input_queue_) { LogError("[publish] no input queue for node " + id_); return false; } for (const auto& o : outputs_) { if (o.proto == "rtsp_server") { zlm_outputs_.push_back(o); } else { ff_outputs_.push_back(o); } } return true; } bool Start() override { LogInfo("[publish] start id=" + id_ + " codec=" + codec_ + " fps=" + std::to_string(fps_) + " gop=" + std::to_string(gop_) + " bitrate=" + std::to_string(bitrate_kbps_) + "kbps" + (use_mpp_ ? " (mpp venc)" : " (stub)")); return true; } bool UpdateConfig(const SimpleJson& new_config) override { const std::string new_id = new_config.ValueOr("id", id_); if (!new_id.empty() && new_id != id_) return false; const std::string new_codec = new_config.ValueOr("codec", codec_); bool new_use_mpp = new_config.ValueOr("use_mpp", use_mpp_); bool new_use_ffmpeg_mux = new_config.ValueOr("use_ffmpeg_mux", use_ffmpeg_mux_); // outputs change requires rebuild (ports/paths might conflict) if (const SimpleJson* outs = new_config.Find("outputs")) { if (!outs->IsArray()) return false; std::vector new_outputs; for (const auto& ov : outs->AsArray()) { if (!ov.IsObject()) continue; OutputConfig cfg; cfg.proto = ov.ValueOr("proto", cfg.proto); cfg.host = ov.ValueOr("host", cfg.host); cfg.port = ov.ValueOr("port", cfg.port); cfg.path = ov.ValueOr("path", cfg.path); cfg.segment_sec = ov.ValueOr("segment_sec", cfg.segment_sec); new_outputs.push_back(std::move(cfg)); } if (new_outputs.empty()) new_outputs.push_back(OutputConfig{}); if (new_outputs.size() != outputs_.size()) return false; for (size_t i = 0; i < outputs_.size(); ++i) { if (new_outputs[i].proto != outputs_[i].proto || new_outputs[i].host != outputs_[i].host || new_outputs[i].port != outputs_[i].port || new_outputs[i].path != outputs_[i].path || new_outputs[i].segment_sec != outputs_[i].segment_sec) { return false; } } } int new_fps = new_config.ValueOr("fps", fps_); int new_gop = new_config.ValueOr("gop", gop_); int new_bitrate = new_config.ValueOr("bitrate_kbps", bitrate_kbps_); if (new_codec != codec_) return false; { std::lock_guard lock(mu_); fps_ = new_fps; gop_ = new_gop; bitrate_kbps_ = new_bitrate; use_mpp_ = new_use_mpp; use_ffmpeg_mux_ = new_use_ffmpeg_mux; // Force re-init on next frame to apply new params. #if defined(RK3588_ENABLE_FFMPEG) if (mux_mgr_) mux_mgr_->Close(); mux_mgr_.reset(); #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) for (auto& p : zlm_pubs_) p->Close(); zlm_pubs_.clear(); #endif if (encoder_) encoder_->Close(); encoder_.reset(); encoder_header_.clear(); encoder_ready_ = false; codec_info_.reset(); } return true; } bool GetCustomMetrics(SimpleJson& out) const override { std::lock_guard lock(mu_); uint64_t clients = 0; #if defined(RK3588_ENABLE_ZLMEDIAKIT) for (const auto& p : zlm_pubs_) { if (!p) continue; clients += static_cast(p->TotalReaderCount()); } #endif SimpleJson::Object o; o["clients"] = SimpleJson(static_cast(clients)); o["encoded_frames"] = SimpleJson(static_cast(encoded_frames_)); out = SimpleJson(std::move(o)); return true; } void Stop() override { if (input_queue_) input_queue_->Stop(); #if defined(RK3588_ENABLE_FFMPEG) if (mux_mgr_) mux_mgr_->Close(); #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) for (auto& p : zlm_pubs_) p->Close(); zlm_pubs_.clear(); #endif if (encoder_) encoder_->Close(); } NodeStatus Process(FramePtr frame) override { if (!frame) return NodeStatus::DROP; { std::lock_guard lock(mu_); #if defined(RK3588_ENABLE_MPP) if (use_mpp_) { ProcessMpp(frame); } else { ProcessStub(frame); } #else ProcessStub(frame); #endif } PushToDownstream(frame); return NodeStatus::OK; } private: void PushToDownstream(const FramePtr& frame) { if (output_queues_.empty()) return; for (auto& q : output_queues_) { if (q) q->Push(frame); } } #if defined(RK3588_ENABLE_ZLMEDIAKIT) struct ZlmAppStream { std::string app; std::string stream; }; static ZlmAppStream ParseRtspPath(const std::string& path, const std::string& fallback_stream) { std::string p = path; while (!p.empty() && p.front() == '/') p.erase(p.begin()); ZlmAppStream out; out.app = "live"; out.stream = fallback_stream.empty() ? "stream" : fallback_stream; if (p.empty()) return out; std::istringstream iss(p); std::string token; std::vector parts; while (std::getline(iss, token, '/')) { if (!token.empty()) parts.push_back(token); } if (parts.size() == 1) { out.stream = parts[0]; } else if (parts.size() >= 2) { out.app = parts[0]; out.stream = parts[1]; } return out; } class ZlmRtspPublisher { public: ~ZlmRtspPublisher() { Close(); } bool Init(int port, const std::string& path, const std::string& fallback_stream, const std::string& codec, int width, int height, int fps, int bitrate_kbps) { EnsureZlmEnv(); if (!StartRtspServerOnce(port)) { LogError("[publish] zlm rtsp server start failed on port " + std::to_string(port)); return false; } auto as = ParseRtspPath(path, fallback_stream); app_ = as.app; stream_ = as.stream; port_ = port; int codec_id = (codec == "h265" || codec == "hevc") ? 1 : 0; // 0:H264, 1:H265 media_ = mk_media_create("__defaultVhost__", app_.c_str(), stream_.c_str(), 0, 0, 0); if (!media_) { LogError("[publish] zlm mk_media_create failed app=" + app_ + " stream=" + stream_); return false; } int ok = mk_media_init_video(media_, codec_id, width, height, static_cast(fps), bitrate_kbps * 1000); if (!ok) { LogError("[publish] zlm mk_media_init_video failed"); mk_media_release(media_); media_ = nullptr; return false; } mk_media_init_complete(media_); is_h265_ = (codec_id == 1); splitter_ = mk_h264_splitter_create(&OnSplitFrame, this); if (!splitter_) { LogError("[publish] zlm mk_h264_splitter_create failed"); mk_media_release(media_); media_ = nullptr; return false; } LogInfo("[publish] zlm rtsp server ready: rtsp://0.0.0.0:" + std::to_string(port_) + "/" + app_ + "/" + stream_); return true; } void Close() { if (splitter_) { mk_h264_splitter_release(splitter_); splitter_ = nullptr; } if (media_) { mk_media_release(media_); media_ = nullptr; } } int TotalReaderCount() const { return media_ ? mk_media_total_reader_count(media_) : 0; } void Write(const EncodedPacket& pkt, const std::vector& header, bool is_h265) { if (!media_ || pkt.data.empty()) return; // Feed encoder header first to provide SPS/PPS(/VPS). if (!sent_header_ && !header.empty()) { if (FeedHeader(pkt.pts_ms, header, is_h265)) { sent_header_ = true; } } FeedPacket(pkt.pts_ms, pkt.data, is_h265); // NOTE: We intentionally do not send data directly via mk_media_input_h264/h265. // Using mk_h264_splitter + mk_media_input_frame is more robust for SPS/PPS handling. } private: static uint16_t ReadBe16(const uint8_t* p) { return static_cast(p[0] << 8) | static_cast(p[1]); } static bool HasAnnexBStartCode(const uint8_t* d, size_t n) { if (!d || n < 3) return false; for (size_t i = 0; i + 3 < n; ++i) { if (d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 1) return true; if (i + 4 < n && d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 0 && d[i + 3] == 1) return true; } return false; } static size_t FindStartCode(const uint8_t* d, size_t n, size_t from, size_t* sc_len) { for (size_t i = from; i + 3 < n; ++i) { if (d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 1) { if (sc_len) *sc_len = 3; return i; } if (i + 4 < n && d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 0 && d[i + 3] == 1) { if (sc_len) *sc_len = 4; return i; } } return n; } static bool IsConfigNal(const uint8_t* nal, size_t nal_len, bool is_h265) { if (!nal || nal_len < 1) return false; if (!is_h265) { uint8_t t = nal[0] & 0x1F; return t == 7 || t == 8; // SPS/PPS } if (nal_len < 2) return false; uint8_t t = (nal[0] >> 1) & 0x3F; return t == 32 || t == 33 || t == 34; // VPS/SPS/PPS } static std::vector> ExtractConfigFromAnnexB(const uint8_t* d, size_t n, bool is_h265) { std::vector> out; if (!d || n < 4) return out; size_t pos = 0; while (true) { size_t sc_len = 0; size_t sc = FindStartCode(d, n, pos, &sc_len); if (sc >= n) break; size_t nal_start = sc + sc_len; size_t next_sc = FindStartCode(d, n, nal_start, nullptr); size_t nal_end = (next_sc >= n) ? n : next_sc; if (nal_start < nal_end) { const uint8_t* nal = d + nal_start; size_t nal_len = nal_end - nal_start; if (IsConfigNal(nal, nal_len, is_h265)) { std::vector one{0, 0, 0, 1}; one.insert(one.end(), nal, nal + nal_len); out.push_back(std::move(one)); } } pos = nal_end; } return out; } static std::vector ConvertLengthPrefixedToAnnexB(const uint8_t* d, size_t n) { // Best-effort conversion for AVCC-style length-prefixed NAL units (assume 4-byte length). std::vector out; if (!d || n < 8) return out; size_t pos = 0; while (pos + 4 <= n) { uint32_t len = (static_cast(d[pos]) << 24) | (static_cast(d[pos + 1]) << 16) | (static_cast(d[pos + 2]) << 8) | (static_cast(d[pos + 3])); pos += 4; if (len == 0 || pos + len > n) break; out.insert(out.end(), {0, 0, 0, 1}); out.insert(out.end(), d + pos, d + pos + len); pos += len; } return out; } static std::vector> ExtractConfigFromAvcc(const std::vector& avcc) { std::vector> out; if (avcc.size() < 7 || avcc[0] != 1) return out; size_t pos = 5; uint8_t num_sps = avcc[pos++] & 0x1F; for (uint8_t i = 0; i < num_sps; ++i) { if (pos + 2 > avcc.size()) return {}; uint16_t len = ReadBe16(&avcc[pos]); pos += 2; if (pos + len > avcc.size()) return {}; std::vector one{0, 0, 0, 1}; one.insert(one.end(), avcc.begin() + static_cast(pos), avcc.begin() + static_cast(pos + len)); out.push_back(std::move(one)); pos += len; } if (pos + 1 > avcc.size()) return {}; uint8_t num_pps = avcc[pos++]; for (uint8_t i = 0; i < num_pps; ++i) { if (pos + 2 > avcc.size()) return {}; uint16_t len = ReadBe16(&avcc[pos]); pos += 2; if (pos + len > avcc.size()) return {}; std::vector one{0, 0, 0, 1}; one.insert(one.end(), avcc.begin() + static_cast(pos), avcc.begin() + static_cast(pos + len)); out.push_back(std::move(one)); pos += len; } return out; } static std::vector> ExtractConfigFromHvcc(const std::vector& hvcc) { std::vector> out; if (hvcc.size() < 23 || hvcc[0] != 1) return out; size_t pos = 22; if (pos >= hvcc.size()) return out; uint8_t num_arrays = hvcc[pos++]; for (uint8_t i = 0; i < num_arrays; ++i) { if (pos + 3 > hvcc.size()) return {}; uint8_t nal_type = hvcc[pos++] & 0x3F; uint16_t num_nalus = ReadBe16(&hvcc[pos]); pos += 2; for (uint16_t j = 0; j < num_nalus; ++j) { if (pos + 2 > hvcc.size()) return {}; uint16_t len = ReadBe16(&hvcc[pos]); pos += 2; if (pos + len > hvcc.size()) return {}; if (nal_type == 32 || nal_type == 33 || nal_type == 34) { std::vector one{0, 0, 0, 1}; one.insert(one.end(), hvcc.begin() + static_cast(pos), hvcc.begin() + static_cast(pos + len)); out.push_back(std::move(one)); } pos += len; } } return out; } bool FeedHeader(int64_t ts_ms, const std::vector& header, bool is_h265) { if (!splitter_ || header.empty()) return false; cur_ts_ms_ = ts_ms; std::vector> cfg; if (HasAnnexBStartCode(header.data(), header.size())) { cfg = ExtractConfigFromAnnexB(header.data(), header.size(), is_h265); } else { // AVCC/HVCC extradata: extract config NALs (AnnexB) cfg = is_h265 ? ExtractConfigFromHvcc(header) : ExtractConfigFromAvcc(header); } if (cfg.empty()) return false; for (const auto& nal : cfg) { if (nal.empty()) continue; if (is_h265) { mk_media_input_h265(media_, nal.data(), static_cast(nal.size()), cur_ts_ms_, cur_ts_ms_); } else { mk_media_input_h264(media_, nal.data(), static_cast(nal.size()), cur_ts_ms_, cur_ts_ms_); } } return true; } void FeedPacket(int64_t ts_ms, const std::vector& data, bool /*is_h265*/) { if (!splitter_ || data.empty()) return; cur_ts_ms_ = ts_ms; if (HasAnnexBStartCode(data.data(), data.size())) { mk_h264_splitter_input_data(splitter_, reinterpret_cast(data.data()), static_cast(data.size())); return; } auto annexb = ConvertLengthPrefixedToAnnexB(data.data(), data.size()); if (!annexb.empty()) { mk_h264_splitter_input_data(splitter_, reinterpret_cast(annexb.data()), static_cast(annexb.size())); } } static void API_CALL OnSplitFrame(void* user_data, mk_h264_splitter /*splitter*/, const char* frame, int size) { auto* self = static_cast(user_data); if (!self || !self->media_ || !frame || size <= 0) return; const auto* d = reinterpret_cast(frame); bool has_start_code = false; if (size >= 3 && d[0] == 0 && d[1] == 0 && d[2] == 1) { has_start_code = true; } else if (size >= 4 && d[0] == 0 && d[1] == 0 && d[2] == 0 && d[3] == 1) { has_start_code = true; } // Best-effort: detect and log config NAL once (helps diagnosing unready track). if (!self->seen_config_nal_ && has_start_code) { size_t off = (size >= 4 && d[0] == 0 && d[1] == 0 && d[2] == 0 && d[3] == 1) ? 4 : 3; if (static_cast(size) > off) { const uint8_t* nal = d + off; size_t nal_len = static_cast(size) - off; bool is_cfg = false; if (!self->is_h265_) { uint8_t t = nal[0] & 0x1F; is_cfg = (t == 7 || t == 8); } else if (nal_len >= 2) { uint8_t t = (nal[0] >> 1) & 0x3F; is_cfg = (t == 32 || t == 33 || t == 34); } if (is_cfg) { self->seen_config_nal_ = true; LogDebug("[publish] zlm saw config nal"); } } } if (!has_start_code) { thread_local std::vector prefixed; prefixed.resize(static_cast(size) + 4); prefixed[0] = 0; prefixed[1] = 0; prefixed[2] = 0; prefixed[3] = 1; std::memcpy(prefixed.data() + 4, frame, static_cast(size)); if (self->is_h265_) { mk_media_input_h265(self->media_, prefixed.data(), static_cast(prefixed.size()), self->cur_ts_ms_, self->cur_ts_ms_); } else { mk_media_input_h264(self->media_, prefixed.data(), static_cast(prefixed.size()), self->cur_ts_ms_, self->cur_ts_ms_); } return; } if (self->is_h265_) { mk_media_input_h265(self->media_, frame, size, self->cur_ts_ms_, self->cur_ts_ms_); } else { mk_media_input_h264(self->media_, frame, size, self->cur_ts_ms_, self->cur_ts_ms_); } } static void EnsureZlmEnv() { static std::once_flag once; std::call_once(once, [] { mk_config cfg; std::memset(&cfg, 0, sizeof(cfg)); cfg.thread_num = 0; // auto cfg.log_level = 2; cfg.log_mask = LOG_CONSOLE; mk_env_init(&cfg); }); } static bool StartRtspServerOnce(int port) { static std::mutex mu; static std::unordered_set started; std::lock_guard lock(mu); if (started.count(port)) return true; auto actual = mk_rtsp_server_start(static_cast(port), 0); if (actual == 0) return false; started.insert(port); return true; } mk_media media_ = nullptr; mk_h264_splitter splitter_ = nullptr; bool is_h265_ = false; uint64_t cur_ts_ms_ = 0; bool seen_config_nal_ = false; std::string app_; std::string stream_; int port_ = 0; bool sent_header_ = false; }; #endif void ProcessStub(FramePtr frame) { ++encoded_frames_; if (stats_log_ && stats_interval_ > 0 && (encoded_frames_ % stats_interval_) == 0) { LogInfo("[publish] stub frame=" + std::to_string(frame->frame_id) + " queue=" + std::to_string(input_queue_->Size()) + " drops=" + std::to_string(input_queue_->DroppedCount())); } } static std::string PixelFormatToString(PixelFormat fmt) { switch (fmt) { case PixelFormat::NV12: return "nv12"; case PixelFormat::YUV420: return "yuv420"; case PixelFormat::RGB: return "rgb"; case PixelFormat::BGR: return "bgr"; default: return "unknown"; } } #if defined(RK3588_ENABLE_MPP) void ProcessMpp(FramePtr frame) { if (!encoder_ready_) { SimpleJson::Object enc_obj; enc_obj["backend"] = SimpleJson(std::string("mpp")); enc_obj["codec"] = SimpleJson(codec_); enc_obj["fps"] = SimpleJson(static_cast(fps_)); enc_obj["gop"] = SimpleJson(static_cast(gop_)); enc_obj["bitrate_kbps"] = SimpleJson(static_cast(bitrate_kbps_)); enc_obj["width"] = SimpleJson(static_cast(frame->width)); enc_obj["height"] = SimpleJson(static_cast(frame->height)); enc_obj["pixel_format"] = SimpleJson(PixelFormatToString(frame->format)); if (!platform_.empty()) { enc_obj["platform"] = SimpleJson(platform_); } if (!hw_platform_.empty()) { enc_obj["hw_platform"] = SimpleJson(hw_platform_); } SimpleJson enc_cfg(std::move(enc_obj)); if (!encoder_) { encoder_ = HwFactory::CreateEncoder(enc_cfg); } if (!encoder_ || encoder_->Open(enc_cfg).Failed()) { LogWarn("[publish] encoder init failed, fallback to stub"); use_mpp_ = false; ProcessStub(frame); return; } encoder_header_ = encoder_->ExtraData(); #if defined(RK3588_ENABLE_FFMPEG) if (use_ffmpeg_mux_) { AVCodecID cid = (codec_ == "h265" || codec_ == "hevc") ? AV_CODEC_ID_HEVC : AV_CODEC_ID_H264; if (!mux_mgr_) mux_mgr_ = std::make_unique(); mux_mgr_->Init(ff_outputs_, cid, frame->width, frame->height, fps_, encoder_header_); } #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) for (const auto& o : zlm_outputs_) { auto pub = std::make_unique(); if (pub->Init(o.port, o.path, id_, codec_, frame->width, frame->height, fps_, bitrate_kbps_)) { zlm_pubs_.push_back(std::move(pub)); } } #endif encoder_ready_ = true; } const bool is_h265 = (codec_ == "h265" || codec_ == "hevc"); if (!codec_info_) codec_info_ = std::make_shared(); codec_info_->codec = is_h265 ? VideoCodec::H265 : VideoCodec::H264; codec_info_->width = frame->width; codec_info_->height = frame->height; codec_info_->fps = fps_; if (!encoder_header_.empty()) { codec_info_->extradata = encoder_header_; } if (encoder_->Send(frame).Ok()) { while (true) { auto out = encoder_->Receive(); if (!out.Ok()) { if (out.ErrMessage() != "no_packet") { LogWarn("[publish] encoder receive: " + out.ErrMessage()); } break; } auto pkt = std::move(out.Value()); if (!pkt.extra_data.empty()) { encoder_header_ = pkt.extra_data; if (codec_info_) codec_info_->extradata = encoder_header_; } EncodedPacket pub_pkt; pub_pkt.data = std::move(pkt.data); pub_pkt.key = pkt.keyframe; pub_pkt.pts_ms = pkt.pts > 0 ? static_cast(pkt.pts / 1000) : static_cast(encoded_frames_ * 1000 / std::max(1, fps_)); if (attach_encoded_meta_ && codec_info_) { auto meta = std::make_shared(); meta->codec = codec_info_; meta->pkt.data = pub_pkt.data; meta->pkt.key = pub_pkt.key; meta->pkt.pts_ms = pub_pkt.pts_ms; frame->user_meta = meta; } ++encoded_frames_; if (stats_log_ && stats_interval_ > 0 && (encoded_frames_ % stats_interval_) == 0) { LogInfo("[publish] encoded frames=" + std::to_string(encoded_frames_) + " queue=" + std::to_string(input_queue_->Size()) + " drops=" + std::to_string(input_queue_->DroppedCount())); } #if defined(RK3588_ENABLE_FFMPEG) if (use_ffmpeg_mux_ && mux_mgr_) mux_mgr_->Write(pub_pkt); #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) for (auto& p : zlm_pubs_) { p->Write(pub_pkt, encoder_header_, is_h265); } #endif } } } #endif std::string id_; std::string codec_ = "h264"; int fps_ = 25; int gop_ = 50; int bitrate_kbps_ = 4000; std::string platform_; std::string hw_platform_; bool use_mpp_ = false; bool use_ffmpeg_mux_ = false; std::vector outputs_; std::vector ff_outputs_; std::vector zlm_outputs_; std::shared_ptr> input_queue_; std::vector>> output_queues_; uint64_t encoded_frames_ = 0; bool stats_log_ = false; uint64_t stats_interval_ = 100; #if defined(RK3588_ENABLE_FFMPEG) std::string ffmpeg_log_level_ = "error"; #endif mutable std::mutex mu_; bool attach_encoded_meta_ = false; std::shared_ptr codec_info_; std::shared_ptr encoder_; bool encoder_ready_ = false; std::vector encoder_header_; #if defined(RK3588_ENABLE_FFMPEG) std::unique_ptr mux_mgr_; #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) std::vector> zlm_pubs_; #endif }; REGISTER_NODE(PublishNode, "publish"); } // namespace rk3588