#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "node.h" #if defined(RK3588_ENABLE_FFMPEG) extern "C" { #include #include } #endif #if defined(RK3588_ENABLE_MPP) extern "C" { #include #include #include } #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) extern "C" { #include "mk_mediakit.h" } #endif namespace rk3588 { namespace { inline int Align16(int v) { return (v + 15) & ~15; } } struct OutputConfig { // proto: // - rtsp: push to an external RTSP server (client mode) // - rtsp_server: embedded RTSP server via ZLMediaKit (listen) // - hls: write local HLS std::string proto = "rtsp"; std::string host = "127.0.0.1"; // rtsp: destination host int port = 8554; std::string path = "/live/stream"; // rtsp: url path; hls: dir or m3u8 path int segment_sec = 2; }; struct EncodedPacket { std::vector data; bool key = false; int64_t pts_ms = 0; }; #if defined(RK3588_ENABLE_FFMPEG) class AvMuxer { public: AvMuxer() = default; ~AvMuxer() { Close(); } bool Init(const OutputConfig& cfg, AVCodecID codec_id, int width, int height, int fps, const std::vector& extradata) { cfg_ = cfg; codec_id_ = codec_id; width_ = width; height_ = height; fps_ = fps > 0 ? fps : 25; extradata_ = extradata; // Copy extradata proto_ = cfg.proto.empty() ? "rtsp" : cfg.proto; url_ = BuildUrl(cfg); // Global init (safe to call multiple times) avformat_network_init(); running_ = true; monitor_thread_ = std::thread(&AvMuxer::MonitorLoop, this); std::cout << "[publish] Muxer initialized async for " << url_ << "\n"; return true; } bool WriteFrame(const EncodedPacket& pkt) { std::lock_guard lock(mutex_); if (!ready_ || !fmt_ || !stream_) return false; if (pkt.data.empty()) return false; AVPacket* out = av_packet_alloc(); if (!out) return false; if (av_new_packet(out, static_cast(pkt.data.size())) < 0) { av_packet_free(&out); return false; } std::memcpy(out->data, pkt.data.data(), pkt.data.size()); out->stream_index = stream_->index; out->flags = pkt.key ? AV_PKT_FLAG_KEY : 0; // Simple PTS mapping int64_t pts = av_rescale_q(pkt.pts_ms, AVRational{1, 1000}, stream_->time_base); if (pts <= last_pts_) pts = last_pts_ + 1; last_pts_ = pts; out->pts = pts; out->dts = pts; out->duration = av_rescale_q(1000 / std::max(1, fps_), AVRational{1, 1000}, stream_->time_base); int ret = av_interleaved_write_frame(fmt_, out); av_packet_free(&out); if (ret < 0) { if (!warned_) { char errbuf[128]; av_strerror(ret, errbuf, sizeof(errbuf)); std::cerr << "[publish] write failed for " << url_ << ": " << errbuf << ", resetting...\n"; warned_ = true; } // Signal monitor thread to reset ready_ = false; cv_.notify_all(); } return ret >= 0; } void Close() { running_ = false; // Break the av_io_open wait if possible (via CheckInterrupt) cv_.notify_all(); if (monitor_thread_.joinable()) monitor_thread_.join(); } private: void MonitorLoop() { while (running_) { if (TryOpen()) { { std::lock_guard lock(mutex_); ready_ = true; warned_ = false; std::cout << "[publish] Server ready: " << url_ << "\n"; } // Wait until error occurs or stop requested std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return !running_ || !ready_; }); } // Cleanup context { std::lock_guard lock(mutex_); if (fmt_) { // Try to write trailer if logical end (not socket error) // But usually we are here because of error, so av_write_trailer might hang/fail. // We skip trailer on error reset to avoid blocking. if (fmt_->pb) { avio_closep(&fmt_->pb); } avformat_free_context(fmt_); fmt_ = nullptr; stream_ = nullptr; } ready_ = false; } if (running_) { // Delay before retry std::this_thread::sleep_for(std::chrono::seconds(1)); } } } bool TryOpen() { AVFormatContext* fmt = nullptr; const char* fmt_name = proto_ == "hls" ? "hls" : "rtsp"; if (avformat_alloc_output_context2(&fmt, nullptr, fmt_name, url_.c_str()) < 0 || !fmt) { std::cerr << "[publish] alloc context failed " << url_ << "\n"; return false; } // Set interrupt callback to allow breaking blocking calls fmt->interrupt_callback.callback = CheckInterrupt; fmt->interrupt_callback.opaque = this; AVStream* stream = avformat_new_stream(fmt, nullptr); if (!stream) { avformat_free_context(fmt); return false; } stream->time_base = AVRational{1, 1000}; stream->avg_frame_rate = AVRational{fps_, 1}; stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO; stream->codecpar->codec_id = codec_id_; stream->codecpar->width = width_; stream->codecpar->height = height_; stream->codecpar->format = AV_PIX_FMT_YUV420P; if (!extradata_.empty()) { stream->codecpar->extradata_size = static_cast(extradata_.size()); stream->codecpar->extradata = static_cast(av_mallocz(extradata_.size() + AV_INPUT_BUFFER_PADDING_SIZE)); std::memcpy(stream->codecpar->extradata, extradata_.data(), extradata_.size()); } AVDictionary* opts = nullptr; if (proto_ == "rtsp") { av_dict_set(&opts, "rtsp_transport", "tcp", 0); } else if (proto_ == "hls") { std::string seg = std::to_string(std::max(1, cfg_.segment_sec)); av_dict_set(&opts, "hls_time", seg.c_str(), 0); av_dict_set(&opts, "hls_list_size", "0", 0); av_dict_set(&opts, "hls_flags", "delete_segments+append_list", 0); std::filesystem::create_directories(std::filesystem::path(url_).parent_path()); } if (!(fmt->oformat->flags & AVFMT_NOFILE)) { if (av_io_open_helper(fmt, url_, opts) < 0) { av_dict_free(&opts); avformat_free_context(fmt); return false; } } if (avformat_write_header(fmt, &opts) < 0) { av_dict_free(&opts); if (fmt->pb) avio_closep(&fmt->pb); avformat_free_context(fmt); return false; } av_dict_free(&opts); { std::lock_guard lock(mutex_); fmt_ = fmt; stream_ = stream; } return true; } int av_io_open_helper(AVFormatContext* fmt, const std::string& url, AVDictionary* opts) { if (proto_ == "hls") { // HLS open is slightly different handled in logic but avio_open2 can take callbacks return avio_open2(&fmt->pb, url.c_str(), AVIO_FLAG_WRITE, &fmt->interrupt_callback, &opts); } return avio_open2(&fmt->pb, url.c_str(), AVIO_FLAG_WRITE, &fmt->interrupt_callback, &opts); } static int CheckInterrupt(void* opaque) { auto* self = static_cast(opaque); return self->running_ ? 0 : 1; } std::string BuildUrl(const OutputConfig& cfg) const { if (proto_ == "hls") return cfg.path; std::string host = cfg.host.empty() ? "127.0.0.1" : cfg.host; std::string path = cfg.path; if (path.empty() || path[0] != '/') path = "/" + path; return "rtsp://" + host + ":" + std::to_string(cfg.port) + path; } OutputConfig cfg_; AVCodecID codec_id_; int width_ = 0; int height_ = 0; int fps_ = 25; std::vector extradata_; std::string proto_; std::string url_; std::atomic running_{false}; std::thread monitor_thread_; std::mutex mutex_; std::condition_variable cv_; bool ready_ = false; bool warned_ = false; AVFormatContext* fmt_ = nullptr; AVStream* stream_ = nullptr; int64_t last_pts_ = -1; }; class AvMuxerManager { public: bool Init(const std::vector& outputs, AVCodecID codec_id, int width, int height, int fps, const std::vector& extradata) { bool ok = false; for (const auto& o : outputs) { auto mux = std::make_unique(); if (mux->Init(o, codec_id, width, height, fps, extradata)) { muxers_.push_back(std::move(mux)); ok = true; } } return ok; } void Write(const EncodedPacket& pkt) { for (auto& m : muxers_) { m->WriteFrame(pkt); } } void Close() { for (auto& m : muxers_) m->Close(); muxers_.clear(); } private: std::vector> muxers_; }; #endif #if defined(RK3588_ENABLE_MPP) class MppVencEncoder { public: ~MppVencEncoder() { Shutdown(); } bool InitFromFrame(const Frame& src, const std::string& codec, int fps, int gop, int bitrate_kbps) { width_ = src.width; height_ = src.height; hor_stride_ = src.planes[0].stride > 0 ? src.planes[0].stride : (src.stride > 0 ? src.stride : Align16(src.width)); ver_stride_ = Align16(src.height); fps_ = fps > 0 ? fps : 25; gop_ = gop > 0 ? gop : fps_ * 2; bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 4000) * 1000; if (src.format == PixelFormat::NV12) { mpp_fmt_ = MPP_FMT_YUV420SP; } else if (src.format == PixelFormat::YUV420) { mpp_fmt_ = MPP_FMT_YUV420P; } else { std::cerr << "[publish] unsupported pixel format for mpp encoder\n"; return false; } if (codec == "h265" || codec == "hevc") { coding_ = MPP_VIDEO_CodingHEVC; } else { coding_ = MPP_VIDEO_CodingAVC; } if (mpp_create(&ctx_, &mpi_) != MPP_OK) { std::cerr << "[publish] mpp_create failed\n"; return false; } if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) { std::cerr << "[publish] mpp_init enc failed\n"; return false; } if (mpp_enc_cfg_init(&cfg_) != MPP_OK) { std::cerr << "[publish] mpp_enc_cfg_init failed\n"; return false; } if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) { std::cerr << "[publish] MPP_ENC_GET_CFG failed\n"; return false; } mpp_enc_cfg_set_s32(cfg_, "prep:width", width_); mpp_enc_cfg_set_s32(cfg_, "prep:height", height_); mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_); mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_); mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_); mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR); mpp_enc_cfg_set_s32(cfg_, "rc:gop", gop_); mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_); mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1); mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_); mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1); mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_); mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2); mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2); mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_); if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) { std::cerr << "[publish] MPP_ENC_SET_CFG failed\n"; return false; } // Ensure SPS/PPS (and VPS for H265) are emitted with IDR frames so RTSP clients can get SDP. if (coding_ == MPP_VIDEO_CodingAVC || coding_ == MPP_VIDEO_CodingHEVC) { MppEncHeaderMode header_mode = MPP_ENC_HEADER_MODE_EACH_IDR; if (mpi_->control(ctx_, MPP_ENC_SET_HEADER_MODE, &header_mode) != MPP_OK) { std::cerr << "[publish] MPP_ENC_SET_HEADER_MODE failed\n"; } } RK_S32 timeout = 2000; mpi_->control(ctx_, MPP_SET_OUTPUT_TIMEOUT, &timeout); MppPacket hdr = nullptr; if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) { auto* data = static_cast(mpp_packet_get_pos(hdr)); size_t len = mpp_packet_get_length(hdr); header_.assign(data, data + len); mpp_packet_deinit(&hdr); } if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM, NULL) != MPP_OK) { if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL, NULL) != MPP_OK) { std::cerr << "[publish] mpp_buffer_group_get_internal failed\n"; return false; } } initialized_ = true; return true; } void Shutdown() { if (frm_grp_) { mpp_buffer_group_put(frm_grp_); frm_grp_ = nullptr; } if (cfg_) { mpp_enc_cfg_deinit(cfg_); cfg_ = nullptr; } if (ctx_) { if (mpi_) mpi_->reset(ctx_); mpp_destroy(ctx_); ctx_ = nullptr; mpi_ = nullptr; } initialized_ = false; } bool Encode(const FramePtr& frame, const std::function& on_packet) { if (!initialized_) return false; if (!frame) return false; MppBuffer buf = nullptr; bool imported = false; size_t frame_size = CalcFrameSize(); if (frame->dma_fd >= 0 && frame->data_size > 0) { MppBufferInfo info{}; info.type = MPP_BUFFER_TYPE_EXT_DMA; info.size = frame->data_size; info.fd = frame->dma_fd; if (mpp_buffer_import(&buf, &info) == MPP_OK) { imported = true; } } if (!buf) { if (!frame->data) return false; if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) { return false; } if (!CopyToBuffer(frame, buf)) { mpp_buffer_put(buf); return false; } } MppFrame mpp_frame = nullptr; mpp_frame_init(&mpp_frame); mpp_frame_set_width(mpp_frame, width_); mpp_frame_set_height(mpp_frame, height_); mpp_frame_set_hor_stride(mpp_frame, hor_stride_); mpp_frame_set_ver_stride(mpp_frame, ver_stride_); mpp_frame_set_fmt(mpp_frame, mpp_fmt_); mpp_frame_set_pts(mpp_frame, static_cast(frame->pts)); mpp_frame_set_buffer(mpp_frame, buf); if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) { mpp_frame_deinit(&mpp_frame); mpp_buffer_put(buf); return false; } mpp_frame_deinit(&mpp_frame); while (true) { MppPacket packet = nullptr; MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet); if (ret == MPP_ERR_TIMEOUT) { std::this_thread::sleep_for(std::chrono::milliseconds(2)); continue; } if (ret != MPP_OK || !packet) break; EncodedPacket out; auto* pos = static_cast(mpp_packet_get_pos(packet)); size_t len = mpp_packet_get_length(packet); out.data.assign(pos, pos + len); // MPP_PACKET_FLAG_INTRA = 0x08, also check NAL type for H264 IDR (type 5) RK_U32 flag = mpp_packet_get_flag(packet); out.key = (flag & 0x08) != 0; // MPP_PACKET_FLAG_INTRA // Fallback: check H264 NAL unit type if MPP flag not set if (!out.key && len >= 5) { // Find NAL start code and check type for (size_t i = 0; i + 4 < len; ++i) { if (pos[i] == 0 && pos[i+1] == 0 && pos[i+2] == 0 && pos[i+3] == 1) { uint8_t nal_type = pos[i+4] & 0x1F; if (nal_type == 5 || nal_type == 7) { // IDR or SPS out.key = true; break; } } else if (pos[i] == 0 && pos[i+1] == 0 && pos[i+2] == 1) { uint8_t nal_type = pos[i+3] & 0x1F; if (nal_type == 5 || nal_type == 7) { out.key = true; break; } } } } int64_t mpp_pts = mpp_packet_get_pts(packet); // Use encoder's pts if valid, otherwise compute from frame count out.pts_ms = mpp_pts > 0 ? mpp_pts : static_cast(frame_count_ * 1000 / fps_); ++frame_count_; if (on_packet) on_packet(out); mpp_packet_deinit(&packet); break; } mpp_buffer_put(buf); (void)imported; return true; } const std::vector& Header() const { return header_; } private: size_t CalcFrameSize() const { if (mpp_fmt_ == MPP_FMT_YUV420SP || mpp_fmt_ == MPP_FMT_YUV420P) { return static_cast(hor_stride_) * ver_stride_ * 3 / 2; } return 0; } bool CopyToBuffer(const FramePtr& frame, MppBuffer buf) const { auto* dst = static_cast(mpp_buffer_get_ptr(buf)); size_t dst_size = mpp_buffer_get_size(buf); size_t need = CalcFrameSize(); if (!dst || dst_size < need) return false; std::memset(dst, 0, dst_size); if (frame->plane_count < 2) return false; const FramePlane& y = frame->planes[0]; const FramePlane& uv = frame->planes[1]; if (!y.data || !uv.data) return false; for (int row = 0; row < height_; ++row) { std::memcpy(dst + row * hor_stride_, y.data + row * y.stride, std::min(hor_stride_, y.stride)); } uint8_t* dst_uv = dst + hor_stride_ * ver_stride_; int uv_rows = ver_stride_ / 2; if (mpp_fmt_ == MPP_FMT_YUV420SP) { for (int row = 0; row < uv_rows; ++row) { std::memcpy(dst_uv + row * hor_stride_, uv.data + row * uv.stride, std::min(hor_stride_, uv.stride)); } } else if (frame->plane_count >= 3) { const FramePlane& v = frame->planes[2]; uint8_t* dst_u = dst_uv; uint8_t* dst_v = dst_uv + (hor_stride_ / 2) * uv_rows; for (int row = 0; row < uv_rows; ++row) { std::memcpy(dst_u + row * (hor_stride_ / 2), uv.data + row * uv.stride, std::min(hor_stride_ / 2, uv.stride)); std::memcpy(dst_v + row * (hor_stride_ / 2), v.data + row * v.stride, std::min(hor_stride_ / 2, v.stride)); } } return true; } MppCtx ctx_ = nullptr; MppApi* mpi_ = nullptr; MppEncCfg cfg_ = nullptr; MppBufferGroup frm_grp_ = nullptr; MppCodingType coding_ = MPP_VIDEO_CodingAVC; MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP; int width_ = 0; int height_ = 0; int hor_stride_ = 0; int ver_stride_ = 0; int fps_ = 25; int gop_ = 50; int bitrate_bps_ = 4000000; std::vector header_; bool initialized_ = false; uint64_t frame_count_ = 0; }; #endif class PublishNode : public INode { public: std::string Id() const override { return id_; } std::string Type() const override { return "publish"; } bool Init(const SimpleJson& config, const NodeContext& ctx) override { id_ = config.ValueOr("id", "publish"); input_queue_ = ctx.input_queue; codec_ = config.ValueOr("codec", "h264"); fps_ = config.ValueOr("fps", 25); gop_ = config.ValueOr("gop", 50); bitrate_kbps_ = config.ValueOr("bitrate_kbps", 4000); use_mpp_ = config.ValueOr("use_mpp", true); use_ffmpeg_mux_ = config.ValueOr("use_ffmpeg_mux", true); const SimpleJson* outputs = config.Find("outputs"); if (outputs && outputs->IsArray()) { for (const auto& o : outputs->AsArray()) { if (!o.IsObject()) continue; OutputConfig cfg; cfg.proto = o.ValueOr("proto", cfg.proto); cfg.host = o.ValueOr("host", cfg.host); cfg.port = o.ValueOr("port", cfg.port); cfg.path = o.ValueOr("path", cfg.path); cfg.segment_sec = o.ValueOr("segment_sec", cfg.segment_sec); outputs_.push_back(std::move(cfg)); } } if (outputs_.empty()) { outputs_.push_back(OutputConfig{}); } #if !defined(RK3588_ENABLE_MPP) use_mpp_ = false; #endif #if !defined(RK3588_ENABLE_FFMPEG) use_ffmpeg_mux_ = false; #endif if (!input_queue_) { std::cerr << "[publish] no input queue for node " << id_ << "\n"; return false; } for (const auto& o : outputs_) { if (o.proto == "rtsp_server") { zlm_outputs_.push_back(o); } else { ff_outputs_.push_back(o); } } return true; } bool Start() override { std::cout << "[publish] start codec=" << codec_ << " fps=" << fps_ << " gop=" << gop_ << " bitrate=" << bitrate_kbps_ << "kbps" << (use_mpp_ ? " (mpp venc)" : " (stub)") << "\n"; return true; } void Stop() override { if (input_queue_) input_queue_->Stop(); #if defined(RK3588_ENABLE_FFMPEG) if (mux_mgr_) mux_mgr_->Close(); #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) for (auto& p : zlm_pubs_) p->Close(); zlm_pubs_.clear(); #endif #if defined(RK3588_ENABLE_MPP) if (mpp_encoder_) mpp_encoder_->Shutdown(); #endif } NodeStatus Process(FramePtr frame) override { if (!frame) return NodeStatus::DROP; #if defined(RK3588_ENABLE_MPP) if (use_mpp_) { ProcessMpp(frame); } else { ProcessStub(frame); } #else ProcessStub(frame); #endif return NodeStatus::OK; } private: #if defined(RK3588_ENABLE_ZLMEDIAKIT) struct ZlmAppStream { std::string app; std::string stream; }; static ZlmAppStream ParseRtspPath(const std::string& path, const std::string& fallback_stream) { std::string p = path; while (!p.empty() && p.front() == '/') p.erase(p.begin()); ZlmAppStream out; out.app = "live"; out.stream = fallback_stream.empty() ? "stream" : fallback_stream; if (p.empty()) return out; std::istringstream iss(p); std::string token; std::vector parts; while (std::getline(iss, token, '/')) { if (!token.empty()) parts.push_back(token); } if (parts.size() == 1) { out.stream = parts[0]; } else if (parts.size() >= 2) { out.app = parts[0]; out.stream = parts[1]; } return out; } class ZlmRtspPublisher { public: ~ZlmRtspPublisher() { Close(); } bool Init(int port, const std::string& path, const std::string& fallback_stream, const std::string& codec, int width, int height, int fps, int bitrate_kbps) { EnsureZlmEnv(); if (!StartRtspServerOnce(port)) { std::cerr << "[publish] zlm rtsp server start failed on port " << port << "\n"; return false; } auto as = ParseRtspPath(path, fallback_stream); app_ = as.app; stream_ = as.stream; port_ = port; int codec_id = (codec == "h265" || codec == "hevc") ? 1 : 0; // 0:H264, 1:H265 media_ = mk_media_create("__defaultVhost__", app_.c_str(), stream_.c_str(), 0, 0, 0); if (!media_) { std::cerr << "[publish] zlm mk_media_create failed app=" << app_ << " stream=" << stream_ << "\n"; return false; } int ok = mk_media_init_video(media_, codec_id, width, height, static_cast(fps), bitrate_kbps * 1000); if (!ok) { std::cerr << "[publish] zlm mk_media_init_video failed" << "\n"; mk_media_release(media_); media_ = nullptr; return false; } mk_media_init_complete(media_); is_h265_ = (codec_id == 1); splitter_ = mk_h264_splitter_create(&OnSplitFrame, this); if (!splitter_) { std::cerr << "[publish] zlm mk_h264_splitter_create failed" << "\n"; mk_media_release(media_); media_ = nullptr; return false; } std::cout << "[publish] zlm rtsp server ready: rtsp://0.0.0.0:" << port_ << "/" << app_ << "/" << stream_ << "\n"; return true; } void Close() { if (splitter_) { mk_h264_splitter_release(splitter_); splitter_ = nullptr; } if (media_) { mk_media_release(media_); media_ = nullptr; } } void Write(const EncodedPacket& pkt, const std::vector& header, bool is_h265) { if (!media_ || pkt.data.empty()) return; // Feed encoder header first to provide SPS/PPS(/VPS). if (!sent_header_ && !header.empty()) { if (FeedHeader(pkt.pts_ms, header, is_h265)) { sent_header_ = true; } } FeedPacket(pkt.pts_ms, pkt.data, is_h265); // NOTE: We intentionally do not send data directly via mk_media_input_h264/h265. // Using mk_h264_splitter + mk_media_input_frame is more robust for SPS/PPS handling. } private: static uint16_t ReadBe16(const uint8_t* p) { return static_cast(p[0] << 8) | static_cast(p[1]); } static bool HasAnnexBStartCode(const uint8_t* d, size_t n) { if (!d || n < 3) return false; for (size_t i = 0; i + 3 < n; ++i) { if (d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 1) return true; if (i + 4 < n && d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 0 && d[i + 3] == 1) return true; } return false; } static size_t FindStartCode(const uint8_t* d, size_t n, size_t from, size_t* sc_len) { for (size_t i = from; i + 3 < n; ++i) { if (d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 1) { if (sc_len) *sc_len = 3; return i; } if (i + 4 < n && d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 0 && d[i + 3] == 1) { if (sc_len) *sc_len = 4; return i; } } return n; } static bool IsConfigNal(const uint8_t* nal, size_t nal_len, bool is_h265) { if (!nal || nal_len < 1) return false; if (!is_h265) { uint8_t t = nal[0] & 0x1F; return t == 7 || t == 8; // SPS/PPS } if (nal_len < 2) return false; uint8_t t = (nal[0] >> 1) & 0x3F; return t == 32 || t == 33 || t == 34; // VPS/SPS/PPS } static std::vector> ExtractConfigFromAnnexB(const uint8_t* d, size_t n, bool is_h265) { std::vector> out; if (!d || n < 4) return out; size_t pos = 0; while (true) { size_t sc_len = 0; size_t sc = FindStartCode(d, n, pos, &sc_len); if (sc >= n) break; size_t nal_start = sc + sc_len; size_t next_sc = FindStartCode(d, n, nal_start, nullptr); size_t nal_end = (next_sc >= n) ? n : next_sc; if (nal_start < nal_end) { const uint8_t* nal = d + nal_start; size_t nal_len = nal_end - nal_start; if (IsConfigNal(nal, nal_len, is_h265)) { std::vector one{0, 0, 0, 1}; one.insert(one.end(), nal, nal + nal_len); out.push_back(std::move(one)); } } pos = nal_end; } return out; } static std::vector ConvertLengthPrefixedToAnnexB(const uint8_t* d, size_t n) { // Best-effort conversion for AVCC-style length-prefixed NAL units (assume 4-byte length). std::vector out; if (!d || n < 8) return out; size_t pos = 0; while (pos + 4 <= n) { uint32_t len = (static_cast(d[pos]) << 24) | (static_cast(d[pos + 1]) << 16) | (static_cast(d[pos + 2]) << 8) | (static_cast(d[pos + 3])); pos += 4; if (len == 0 || pos + len > n) break; out.insert(out.end(), {0, 0, 0, 1}); out.insert(out.end(), d + pos, d + pos + len); pos += len; } return out; } static std::vector> ExtractConfigFromAvcc(const std::vector& avcc) { std::vector> out; if (avcc.size() < 7 || avcc[0] != 1) return out; size_t pos = 5; uint8_t num_sps = avcc[pos++] & 0x1F; for (uint8_t i = 0; i < num_sps; ++i) { if (pos + 2 > avcc.size()) return {}; uint16_t len = ReadBe16(&avcc[pos]); pos += 2; if (pos + len > avcc.size()) return {}; std::vector one{0, 0, 0, 1}; one.insert(one.end(), avcc.begin() + static_cast(pos), avcc.begin() + static_cast(pos + len)); out.push_back(std::move(one)); pos += len; } if (pos + 1 > avcc.size()) return {}; uint8_t num_pps = avcc[pos++]; for (uint8_t i = 0; i < num_pps; ++i) { if (pos + 2 > avcc.size()) return {}; uint16_t len = ReadBe16(&avcc[pos]); pos += 2; if (pos + len > avcc.size()) return {}; std::vector one{0, 0, 0, 1}; one.insert(one.end(), avcc.begin() + static_cast(pos), avcc.begin() + static_cast(pos + len)); out.push_back(std::move(one)); pos += len; } return out; } static std::vector> ExtractConfigFromHvcc(const std::vector& hvcc) { std::vector> out; if (hvcc.size() < 23 || hvcc[0] != 1) return out; size_t pos = 22; if (pos >= hvcc.size()) return out; uint8_t num_arrays = hvcc[pos++]; for (uint8_t i = 0; i < num_arrays; ++i) { if (pos + 3 > hvcc.size()) return {}; uint8_t nal_type = hvcc[pos++] & 0x3F; uint16_t num_nalus = ReadBe16(&hvcc[pos]); pos += 2; for (uint16_t j = 0; j < num_nalus; ++j) { if (pos + 2 > hvcc.size()) return {}; uint16_t len = ReadBe16(&hvcc[pos]); pos += 2; if (pos + len > hvcc.size()) return {}; if (nal_type == 32 || nal_type == 33 || nal_type == 34) { std::vector one{0, 0, 0, 1}; one.insert(one.end(), hvcc.begin() + static_cast(pos), hvcc.begin() + static_cast(pos + len)); out.push_back(std::move(one)); } pos += len; } } return out; } bool FeedHeader(int64_t ts_ms, const std::vector& header, bool is_h265) { if (!splitter_ || header.empty()) return false; cur_ts_ms_ = ts_ms; std::vector> cfg; if (HasAnnexBStartCode(header.data(), header.size())) { cfg = ExtractConfigFromAnnexB(header.data(), header.size(), is_h265); } else { // AVCC/HVCC extradata: extract config NALs (AnnexB) cfg = is_h265 ? ExtractConfigFromHvcc(header) : ExtractConfigFromAvcc(header); } if (cfg.empty()) return false; for (const auto& nal : cfg) { if (nal.empty()) continue; if (is_h265) { mk_media_input_h265(media_, nal.data(), static_cast(nal.size()), cur_ts_ms_, cur_ts_ms_); } else { mk_media_input_h264(media_, nal.data(), static_cast(nal.size()), cur_ts_ms_, cur_ts_ms_); } } return true; } void FeedPacket(int64_t ts_ms, const std::vector& data, bool /*is_h265*/) { if (!splitter_ || data.empty()) return; cur_ts_ms_ = ts_ms; if (HasAnnexBStartCode(data.data(), data.size())) { mk_h264_splitter_input_data(splitter_, reinterpret_cast(data.data()), static_cast(data.size())); return; } auto annexb = ConvertLengthPrefixedToAnnexB(data.data(), data.size()); if (!annexb.empty()) { mk_h264_splitter_input_data(splitter_, reinterpret_cast(annexb.data()), static_cast(annexb.size())); } } static void API_CALL OnSplitFrame(void* user_data, mk_h264_splitter /*splitter*/, const char* frame, int size) { auto* self = static_cast(user_data); if (!self || !self->media_ || !frame || size <= 0) return; const auto* d = reinterpret_cast(frame); bool has_start_code = false; if (size >= 3 && d[0] == 0 && d[1] == 0 && d[2] == 1) { has_start_code = true; } else if (size >= 4 && d[0] == 0 && d[1] == 0 && d[2] == 0 && d[3] == 1) { has_start_code = true; } // Best-effort: detect and log config NAL once (helps diagnosing unready track). if (!self->seen_config_nal_ && has_start_code) { size_t off = (size >= 4 && d[0] == 0 && d[1] == 0 && d[2] == 0 && d[3] == 1) ? 4 : 3; if (static_cast(size) > off) { const uint8_t* nal = d + off; size_t nal_len = static_cast(size) - off; bool is_cfg = false; if (!self->is_h265_) { uint8_t t = nal[0] & 0x1F; is_cfg = (t == 7 || t == 8); } else if (nal_len >= 2) { uint8_t t = (nal[0] >> 1) & 0x3F; is_cfg = (t == 32 || t == 33 || t == 34); } if (is_cfg) { self->seen_config_nal_ = true; std::cout << "[publish] zlm saw config nal" << "\n"; } } } if (!has_start_code) { thread_local std::vector prefixed; prefixed.resize(static_cast(size) + 4); prefixed[0] = 0; prefixed[1] = 0; prefixed[2] = 0; prefixed[3] = 1; std::memcpy(prefixed.data() + 4, frame, static_cast(size)); if (self->is_h265_) { mk_media_input_h265(self->media_, prefixed.data(), static_cast(prefixed.size()), self->cur_ts_ms_, self->cur_ts_ms_); } else { mk_media_input_h264(self->media_, prefixed.data(), static_cast(prefixed.size()), self->cur_ts_ms_, self->cur_ts_ms_); } return; } if (self->is_h265_) { mk_media_input_h265(self->media_, frame, size, self->cur_ts_ms_, self->cur_ts_ms_); } else { mk_media_input_h264(self->media_, frame, size, self->cur_ts_ms_, self->cur_ts_ms_); } } static void EnsureZlmEnv() { static std::once_flag once; std::call_once(once, [] { mk_config cfg; std::memset(&cfg, 0, sizeof(cfg)); cfg.thread_num = 0; // auto cfg.log_level = 2; cfg.log_mask = LOG_CONSOLE; mk_env_init(&cfg); }); } static bool StartRtspServerOnce(int port) { static std::mutex mu; static std::unordered_set started; std::lock_guard lock(mu); if (started.count(port)) return true; auto actual = mk_rtsp_server_start(static_cast(port), 0); if (actual == 0) return false; started.insert(port); return true; } mk_media media_ = nullptr; mk_h264_splitter splitter_ = nullptr; bool is_h265_ = false; uint64_t cur_ts_ms_ = 0; bool seen_config_nal_ = false; std::string app_; std::string stream_; int port_ = 0; bool sent_header_ = false; }; #endif void ProcessStub(FramePtr frame) { ++encoded_frames_; if (encoded_frames_ % 100 == 0) { std::cout << "[publish] stub frame " << frame->frame_id << " queue=" << input_queue_->Size() << " drops=" << input_queue_->DroppedCount() << "\n"; } } #if defined(RK3588_ENABLE_MPP) void ProcessMpp(FramePtr frame) { if (!mpp_encoder_) { mpp_encoder_ = std::make_unique(); } if (!encoder_ready_) { if (!mpp_encoder_->InitFromFrame(*frame, codec_, fps_, gop_, bitrate_kbps_)) { std::cerr << "[publish] encoder init failed, fallback to stub\n"; use_mpp_ = false; ProcessStub(frame); return; } #if defined(RK3588_ENABLE_FFMPEG) if (use_ffmpeg_mux_) { AVCodecID cid = (codec_ == "h265" || codec_ == "hevc") ? AV_CODEC_ID_HEVC : AV_CODEC_ID_H264; if (!mux_mgr_) mux_mgr_ = std::make_unique(); mux_mgr_->Init(ff_outputs_, cid, frame->width, frame->height, fps_, mpp_encoder_->Header()); } #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) for (const auto& o : zlm_outputs_) { auto pub = std::make_unique(); if (pub->Init(o.port, o.path, id_, codec_, frame->width, frame->height, fps_, bitrate_kbps_)) { zlm_pubs_.push_back(std::move(pub)); } } #endif encoder_ready_ = true; } const bool is_h265 = (codec_ == "h265" || codec_ == "hevc"); mpp_encoder_->Encode(frame, [&](const EncodedPacket& pkt) { ++encoded_frames_; if (encoded_frames_ % 100 == 0) { std::cout << "[publish] encoded frame " << encoded_frames_ << " queue=" << input_queue_->Size() << " drops=" << input_queue_->DroppedCount() << "\n"; } #if defined(RK3588_ENABLE_FFMPEG) if (use_ffmpeg_mux_ && mux_mgr_) mux_mgr_->Write(pkt); #else (void)pkt; #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) for (auto& p : zlm_pubs_) { p->Write(pkt, mpp_encoder_->Header(), is_h265); } #endif }); } #endif std::string id_; std::string codec_ = "h264"; int fps_ = 25; int gop_ = 50; int bitrate_kbps_ = 4000; bool use_mpp_ = false; bool use_ffmpeg_mux_ = false; std::vector outputs_; std::vector ff_outputs_; std::vector zlm_outputs_; std::shared_ptr> input_queue_; uint64_t encoded_frames_ = 0; #if defined(RK3588_ENABLE_MPP) std::unique_ptr mpp_encoder_; bool encoder_ready_ = false; #endif #if defined(RK3588_ENABLE_FFMPEG) std::unique_ptr mux_mgr_; #endif #if defined(RK3588_ENABLE_ZLMEDIAKIT) std::vector> zlm_pubs_; #endif }; REGISTER_NODE(PublishNode, "publish"); } // namespace rk3588