OrangePi3588Media/plugins/publish/publish_node.cpp
2025-12-31 11:00:08 +08:00

1258 lines
45 KiB
C++

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <filesystem>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <condition_variable>
#include <mutex>
#include <sstream>
#include <unordered_set>
#include "node.h"
#if defined(RK3588_ENABLE_FFMPEG)
extern "C" {
#include <libavformat/avformat.h>
#include <libavutil/avutil.h>
}
#endif
#if defined(RK3588_ENABLE_MPP)
extern "C" {
#include <rockchip/mpp_buffer.h>
#include <rockchip/mpp_packet.h>
#include <rockchip/rk_mpi.h>
}
#endif
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
extern "C" {
#include "mk_mediakit.h"
}
#endif
namespace rk3588 {
namespace {
inline int Align16(int v) { return (v + 15) & ~15; }
}
struct OutputConfig {
// proto:
// - rtsp: push to an external RTSP server (client mode)
// - rtsp_server: embedded RTSP server via ZLMediaKit (listen)
// - hls: write local HLS
std::string proto = "rtsp";
std::string host = "127.0.0.1"; // rtsp: destination host
int port = 8554;
std::string path = "/live/stream"; // rtsp: url path; hls: dir or m3u8 path
int segment_sec = 2;
};
struct EncodedPacket {
std::vector<uint8_t> data;
bool key = false;
int64_t pts_ms = 0;
};
#if defined(RK3588_ENABLE_FFMPEG)
class AvMuxer {
public:
AvMuxer() = default;
~AvMuxer() { Close(); }
bool Init(const OutputConfig& cfg, AVCodecID codec_id, int width, int height, int fps,
const std::vector<uint8_t>& extradata) {
cfg_ = cfg;
codec_id_ = codec_id;
width_ = width;
height_ = height;
fps_ = fps > 0 ? fps : 25;
extradata_ = extradata; // Copy extradata
proto_ = cfg.proto.empty() ? "rtsp" : cfg.proto;
url_ = BuildUrl(cfg);
// Global init (safe to call multiple times)
avformat_network_init();
running_ = true;
monitor_thread_ = std::thread(&AvMuxer::MonitorLoop, this);
std::cout << "[publish] Muxer initialized async for " << url_ << "\n";
return true;
}
bool WriteFrame(const EncodedPacket& pkt) {
std::lock_guard<std::mutex> lock(mutex_);
if (!ready_ || !fmt_ || !stream_) return false;
if (pkt.data.empty()) return false;
AVPacket* out = av_packet_alloc();
if (!out) return false;
if (av_new_packet(out, static_cast<int>(pkt.data.size())) < 0) {
av_packet_free(&out);
return false;
}
std::memcpy(out->data, pkt.data.data(), pkt.data.size());
out->stream_index = stream_->index;
out->flags = pkt.key ? AV_PKT_FLAG_KEY : 0;
// Simple PTS mapping
int64_t pts = av_rescale_q(pkt.pts_ms, AVRational{1, 1000}, stream_->time_base);
if (pts <= last_pts_) pts = last_pts_ + 1;
last_pts_ = pts;
out->pts = pts;
out->dts = pts;
out->duration = av_rescale_q(1000 / std::max(1, fps_), AVRational{1, 1000}, stream_->time_base);
int ret = av_interleaved_write_frame(fmt_, out);
av_packet_free(&out);
if (ret < 0) {
if (!warned_) {
char errbuf[128];
av_strerror(ret, errbuf, sizeof(errbuf));
std::cerr << "[publish] write failed for " << url_ << ": " << errbuf << ", resetting...\n";
warned_ = true;
}
// Signal monitor thread to reset
ready_ = false;
cv_.notify_all();
}
return ret >= 0;
}
void Close() {
running_ = false;
// Break the av_io_open wait if possible (via CheckInterrupt)
cv_.notify_all();
if (monitor_thread_.joinable()) monitor_thread_.join();
}
private:
void MonitorLoop() {
while (running_) {
if (TryOpen()) {
{
std::lock_guard<std::mutex> lock(mutex_);
ready_ = true;
warned_ = false;
std::cout << "[publish] Server ready: " << url_ << "\n";
}
// Wait until error occurs or stop requested
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !running_ || !ready_; });
}
// Cleanup context
{
std::lock_guard<std::mutex> lock(mutex_);
if (fmt_) {
// Try to write trailer if logical end (not socket error)
// But usually we are here because of error, so av_write_trailer might hang/fail.
// We skip trailer on error reset to avoid blocking.
if (fmt_->pb) {
avio_closep(&fmt_->pb);
}
avformat_free_context(fmt_);
fmt_ = nullptr;
stream_ = nullptr;
}
ready_ = false;
}
if (running_) {
// Delay before retry
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}
bool TryOpen() {
AVFormatContext* fmt = nullptr;
const char* fmt_name = proto_ == "hls" ? "hls" : "rtsp";
if (avformat_alloc_output_context2(&fmt, nullptr, fmt_name, url_.c_str()) < 0 || !fmt) {
std::cerr << "[publish] alloc context failed " << url_ << "\n";
return false;
}
// Set interrupt callback to allow breaking blocking calls
fmt->interrupt_callback.callback = CheckInterrupt;
fmt->interrupt_callback.opaque = this;
AVStream* stream = avformat_new_stream(fmt, nullptr);
if (!stream) {
avformat_free_context(fmt);
return false;
}
stream->time_base = AVRational{1, 1000};
stream->avg_frame_rate = AVRational{fps_, 1};
stream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
stream->codecpar->codec_id = codec_id_;
stream->codecpar->width = width_;
stream->codecpar->height = height_;
stream->codecpar->format = AV_PIX_FMT_YUV420P;
if (!extradata_.empty()) {
stream->codecpar->extradata_size = static_cast<int>(extradata_.size());
stream->codecpar->extradata = static_cast<uint8_t*>(av_mallocz(extradata_.size() + AV_INPUT_BUFFER_PADDING_SIZE));
std::memcpy(stream->codecpar->extradata, extradata_.data(), extradata_.size());
}
AVDictionary* opts = nullptr;
if (proto_ == "rtsp") {
av_dict_set(&opts, "rtsp_transport", "tcp", 0);
} else if (proto_ == "hls") {
std::string seg = std::to_string(std::max(1, cfg_.segment_sec));
av_dict_set(&opts, "hls_time", seg.c_str(), 0);
av_dict_set(&opts, "hls_list_size", "0", 0);
av_dict_set(&opts, "hls_flags", "delete_segments+append_list", 0);
std::filesystem::create_directories(std::filesystem::path(url_).parent_path());
}
if (!(fmt->oformat->flags & AVFMT_NOFILE)) {
if (av_io_open_helper(fmt, url_, opts) < 0) {
av_dict_free(&opts);
avformat_free_context(fmt);
return false;
}
}
if (avformat_write_header(fmt, &opts) < 0) {
av_dict_free(&opts);
if (fmt->pb) avio_closep(&fmt->pb);
avformat_free_context(fmt);
return false;
}
av_dict_free(&opts);
{
std::lock_guard<std::mutex> lock(mutex_);
fmt_ = fmt;
stream_ = stream;
}
return true;
}
int av_io_open_helper(AVFormatContext* fmt, const std::string& url, AVDictionary* opts) {
if (proto_ == "hls") {
// HLS open is slightly different handled in logic but avio_open2 can take callbacks
return avio_open2(&fmt->pb, url.c_str(), AVIO_FLAG_WRITE, &fmt->interrupt_callback, &opts);
}
return avio_open2(&fmt->pb, url.c_str(), AVIO_FLAG_WRITE, &fmt->interrupt_callback, &opts);
}
static int CheckInterrupt(void* opaque) {
auto* self = static_cast<AvMuxer*>(opaque);
return self->running_ ? 0 : 1;
}
std::string BuildUrl(const OutputConfig& cfg) const {
if (proto_ == "hls") return cfg.path;
std::string host = cfg.host.empty() ? "127.0.0.1" : cfg.host;
std::string path = cfg.path;
if (path.empty() || path[0] != '/') path = "/" + path;
return "rtsp://" + host + ":" + std::to_string(cfg.port) + path;
}
OutputConfig cfg_;
AVCodecID codec_id_;
int width_ = 0;
int height_ = 0;
int fps_ = 25;
std::vector<uint8_t> extradata_;
std::string proto_;
std::string url_;
std::atomic<bool> running_{false};
std::thread monitor_thread_;
std::mutex mutex_;
std::condition_variable cv_;
bool ready_ = false;
bool warned_ = false;
AVFormatContext* fmt_ = nullptr;
AVStream* stream_ = nullptr;
int64_t last_pts_ = -1;
};
class AvMuxerManager {
public:
bool Init(const std::vector<OutputConfig>& outputs, AVCodecID codec_id, int width, int height,
int fps, const std::vector<uint8_t>& extradata) {
bool ok = false;
for (const auto& o : outputs) {
auto mux = std::make_unique<AvMuxer>();
if (mux->Init(o, codec_id, width, height, fps, extradata)) {
muxers_.push_back(std::move(mux));
ok = true;
}
}
return ok;
}
void Write(const EncodedPacket& pkt) {
for (auto& m : muxers_) {
m->WriteFrame(pkt);
}
}
void Close() {
for (auto& m : muxers_) m->Close();
muxers_.clear();
}
private:
std::vector<std::unique_ptr<AvMuxer>> muxers_;
};
#endif
#if defined(RK3588_ENABLE_MPP)
class MppVencEncoder {
public:
~MppVencEncoder() { Shutdown(); }
bool InitFromFrame(const Frame& src, const std::string& codec, int fps, int gop,
int bitrate_kbps) {
width_ = src.width;
height_ = src.height;
hor_stride_ = src.planes[0].stride > 0 ? src.planes[0].stride
: (src.stride > 0 ? src.stride : Align16(src.width));
ver_stride_ = Align16(src.height);
fps_ = fps > 0 ? fps : 25;
gop_ = gop > 0 ? gop : fps_ * 2;
bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 4000) * 1000;
if (src.format == PixelFormat::NV12) {
mpp_fmt_ = MPP_FMT_YUV420SP;
} else if (src.format == PixelFormat::YUV420) {
mpp_fmt_ = MPP_FMT_YUV420P;
} else {
std::cerr << "[publish] unsupported pixel format for mpp encoder\n";
return false;
}
if (codec == "h265" || codec == "hevc") {
coding_ = MPP_VIDEO_CodingHEVC;
} else {
coding_ = MPP_VIDEO_CodingAVC;
}
if (mpp_create(&ctx_, &mpi_) != MPP_OK) {
std::cerr << "[publish] mpp_create failed\n";
return false;
}
if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) {
std::cerr << "[publish] mpp_init enc failed\n";
return false;
}
if (mpp_enc_cfg_init(&cfg_) != MPP_OK) {
std::cerr << "[publish] mpp_enc_cfg_init failed\n";
return false;
}
if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) {
std::cerr << "[publish] MPP_ENC_GET_CFG failed\n";
return false;
}
mpp_enc_cfg_set_s32(cfg_, "prep:width", width_);
mpp_enc_cfg_set_s32(cfg_, "prep:height", height_);
mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_);
mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_);
mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_);
mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR);
mpp_enc_cfg_set_s32(cfg_, "rc:gop", gop_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2);
mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_);
if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) {
std::cerr << "[publish] MPP_ENC_SET_CFG failed\n";
return false;
}
// Ensure SPS/PPS (and VPS for H265) are emitted with IDR frames so RTSP clients can get SDP.
if (coding_ == MPP_VIDEO_CodingAVC || coding_ == MPP_VIDEO_CodingHEVC) {
MppEncHeaderMode header_mode = MPP_ENC_HEADER_MODE_EACH_IDR;
if (mpi_->control(ctx_, MPP_ENC_SET_HEADER_MODE, &header_mode) != MPP_OK) {
std::cerr << "[publish] MPP_ENC_SET_HEADER_MODE failed\n";
}
}
RK_S32 timeout = 2000;
mpi_->control(ctx_, MPP_SET_OUTPUT_TIMEOUT, &timeout);
MppPacket hdr = nullptr;
if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) {
auto* data = static_cast<uint8_t*>(mpp_packet_get_pos(hdr));
size_t len = mpp_packet_get_length(hdr);
header_.assign(data, data + len);
mpp_packet_deinit(&hdr);
}
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM, NULL) != MPP_OK) {
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL, NULL) != MPP_OK) {
std::cerr << "[publish] mpp_buffer_group_get_internal failed\n";
return false;
}
}
initialized_ = true;
return true;
}
void Shutdown() {
if (frm_grp_) {
mpp_buffer_group_put(frm_grp_);
frm_grp_ = nullptr;
}
if (cfg_) {
mpp_enc_cfg_deinit(cfg_);
cfg_ = nullptr;
}
if (ctx_) {
if (mpi_) mpi_->reset(ctx_);
mpp_destroy(ctx_);
ctx_ = nullptr;
mpi_ = nullptr;
}
initialized_ = false;
}
bool Encode(const FramePtr& frame, const std::function<void(const EncodedPacket&)>& on_packet) {
if (!initialized_) return false;
if (!frame) return false;
MppBuffer buf = nullptr;
bool imported = false;
size_t frame_size = CalcFrameSize();
if (frame->dma_fd >= 0 && frame->data_size > 0) {
MppBufferInfo info{};
info.type = MPP_BUFFER_TYPE_EXT_DMA;
info.size = frame->data_size;
info.fd = frame->dma_fd;
if (mpp_buffer_import(&buf, &info) == MPP_OK) {
imported = true;
}
}
if (!buf) {
if (!frame->data) return false;
if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) {
return false;
}
if (!CopyToBuffer(frame, buf)) {
mpp_buffer_put(buf);
return false;
}
}
MppFrame mpp_frame = nullptr;
mpp_frame_init(&mpp_frame);
mpp_frame_set_width(mpp_frame, width_);
mpp_frame_set_height(mpp_frame, height_);
mpp_frame_set_hor_stride(mpp_frame, hor_stride_);
mpp_frame_set_ver_stride(mpp_frame, ver_stride_);
mpp_frame_set_fmt(mpp_frame, mpp_fmt_);
mpp_frame_set_pts(mpp_frame, static_cast<int64_t>(frame->pts));
mpp_frame_set_buffer(mpp_frame, buf);
if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) {
mpp_frame_deinit(&mpp_frame);
mpp_buffer_put(buf);
return false;
}
mpp_frame_deinit(&mpp_frame);
while (true) {
MppPacket packet = nullptr;
MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet);
if (ret == MPP_ERR_TIMEOUT) {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
continue;
}
if (ret != MPP_OK || !packet) break;
EncodedPacket out;
auto* pos = static_cast<uint8_t*>(mpp_packet_get_pos(packet));
size_t len = mpp_packet_get_length(packet);
out.data.assign(pos, pos + len);
// MPP_PACKET_FLAG_INTRA = 0x08, also check NAL type for H264 IDR (type 5)
RK_U32 flag = mpp_packet_get_flag(packet);
out.key = (flag & 0x08) != 0; // MPP_PACKET_FLAG_INTRA
// Fallback: check H264 NAL unit type if MPP flag not set
if (!out.key && len >= 5) {
// Find NAL start code and check type
for (size_t i = 0; i + 4 < len; ++i) {
if (pos[i] == 0 && pos[i+1] == 0 && pos[i+2] == 0 && pos[i+3] == 1) {
uint8_t nal_type = pos[i+4] & 0x1F;
if (nal_type == 5 || nal_type == 7) { // IDR or SPS
out.key = true;
break;
}
} else if (pos[i] == 0 && pos[i+1] == 0 && pos[i+2] == 1) {
uint8_t nal_type = pos[i+3] & 0x1F;
if (nal_type == 5 || nal_type == 7) {
out.key = true;
break;
}
}
}
}
int64_t mpp_pts = mpp_packet_get_pts(packet);
// Use encoder's pts if valid, otherwise compute from frame count
out.pts_ms = mpp_pts > 0 ? mpp_pts : static_cast<int64_t>(frame_count_ * 1000 / fps_);
++frame_count_;
if (on_packet) on_packet(out);
mpp_packet_deinit(&packet);
break;
}
mpp_buffer_put(buf);
(void)imported;
return true;
}
const std::vector<uint8_t>& Header() const { return header_; }
private:
size_t CalcFrameSize() const {
if (mpp_fmt_ == MPP_FMT_YUV420SP || mpp_fmt_ == MPP_FMT_YUV420P) {
return static_cast<size_t>(hor_stride_) * ver_stride_ * 3 / 2;
}
return 0;
}
bool CopyToBuffer(const FramePtr& frame, MppBuffer buf) const {
auto* dst = static_cast<uint8_t*>(mpp_buffer_get_ptr(buf));
size_t dst_size = mpp_buffer_get_size(buf);
size_t need = CalcFrameSize();
if (!dst || dst_size < need) return false;
std::memset(dst, 0, dst_size);
if (frame->plane_count < 2) return false;
const FramePlane& y = frame->planes[0];
const FramePlane& uv = frame->planes[1];
if (!y.data || !uv.data) return false;
for (int row = 0; row < height_; ++row) {
std::memcpy(dst + row * hor_stride_, y.data + row * y.stride,
std::min(hor_stride_, y.stride));
}
uint8_t* dst_uv = dst + hor_stride_ * ver_stride_;
int uv_rows = ver_stride_ / 2;
if (mpp_fmt_ == MPP_FMT_YUV420SP) {
for (int row = 0; row < uv_rows; ++row) {
std::memcpy(dst_uv + row * hor_stride_, uv.data + row * uv.stride,
std::min(hor_stride_, uv.stride));
}
} else if (frame->plane_count >= 3) {
const FramePlane& v = frame->planes[2];
uint8_t* dst_u = dst_uv;
uint8_t* dst_v = dst_uv + (hor_stride_ / 2) * uv_rows;
for (int row = 0; row < uv_rows; ++row) {
std::memcpy(dst_u + row * (hor_stride_ / 2), uv.data + row * uv.stride,
std::min(hor_stride_ / 2, uv.stride));
std::memcpy(dst_v + row * (hor_stride_ / 2), v.data + row * v.stride,
std::min(hor_stride_ / 2, v.stride));
}
}
return true;
}
MppCtx ctx_ = nullptr;
MppApi* mpi_ = nullptr;
MppEncCfg cfg_ = nullptr;
MppBufferGroup frm_grp_ = nullptr;
MppCodingType coding_ = MPP_VIDEO_CodingAVC;
MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP;
int width_ = 0;
int height_ = 0;
int hor_stride_ = 0;
int ver_stride_ = 0;
int fps_ = 25;
int gop_ = 50;
int bitrate_bps_ = 4000000;
std::vector<uint8_t> header_;
bool initialized_ = false;
uint64_t frame_count_ = 0;
};
#endif
class PublishNode : public INode {
public:
std::string Id() const override { return id_; }
std::string Type() const override { return "publish"; }
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
id_ = config.ValueOr<std::string>("id", "publish");
input_queue_ = ctx.input_queue;
codec_ = config.ValueOr<std::string>("codec", "h264");
fps_ = config.ValueOr<int>("fps", 25);
gop_ = config.ValueOr<int>("gop", 50);
bitrate_kbps_ = config.ValueOr<int>("bitrate_kbps", 4000);
use_mpp_ = config.ValueOr<bool>("use_mpp", true);
use_ffmpeg_mux_ = config.ValueOr<bool>("use_ffmpeg_mux", true);
const SimpleJson* outputs = config.Find("outputs");
if (outputs && outputs->IsArray()) {
for (const auto& o : outputs->AsArray()) {
if (!o.IsObject()) continue;
OutputConfig cfg;
cfg.proto = o.ValueOr<std::string>("proto", cfg.proto);
cfg.host = o.ValueOr<std::string>("host", cfg.host);
cfg.port = o.ValueOr<int>("port", cfg.port);
cfg.path = o.ValueOr<std::string>("path", cfg.path);
cfg.segment_sec = o.ValueOr<int>("segment_sec", cfg.segment_sec);
outputs_.push_back(std::move(cfg));
}
}
if (outputs_.empty()) {
outputs_.push_back(OutputConfig{});
}
#if !defined(RK3588_ENABLE_MPP)
use_mpp_ = false;
#endif
#if !defined(RK3588_ENABLE_FFMPEG)
use_ffmpeg_mux_ = false;
#endif
if (!input_queue_) {
std::cerr << "[publish] no input queue for node " << id_ << "\n";
return false;
}
for (const auto& o : outputs_) {
if (o.proto == "rtsp_server") {
zlm_outputs_.push_back(o);
} else {
ff_outputs_.push_back(o);
}
}
return true;
}
bool Start() override {
std::cout << "[publish] start codec=" << codec_ << " fps=" << fps_ << " gop=" << gop_
<< " bitrate=" << bitrate_kbps_ << "kbps"
<< (use_mpp_ ? " (mpp venc)" : " (stub)") << "\n";
return true;
}
bool UpdateConfig(const SimpleJson& new_config) override {
const std::string new_id = new_config.ValueOr<std::string>("id", id_);
if (!new_id.empty() && new_id != id_) return false;
const std::string new_codec = new_config.ValueOr<std::string>("codec", codec_);
bool new_use_mpp = new_config.ValueOr<bool>("use_mpp", use_mpp_);
bool new_use_ffmpeg_mux = new_config.ValueOr<bool>("use_ffmpeg_mux", use_ffmpeg_mux_);
// outputs change requires rebuild (ports/paths might conflict)
if (const SimpleJson* outs = new_config.Find("outputs")) {
if (!outs->IsArray()) return false;
std::vector<OutputConfig> new_outputs;
for (const auto& ov : outs->AsArray()) {
if (!ov.IsObject()) continue;
OutputConfig cfg;
cfg.proto = ov.ValueOr<std::string>("proto", cfg.proto);
cfg.host = ov.ValueOr<std::string>("host", cfg.host);
cfg.port = ov.ValueOr<int>("port", cfg.port);
cfg.path = ov.ValueOr<std::string>("path", cfg.path);
cfg.segment_sec = ov.ValueOr<int>("segment_sec", cfg.segment_sec);
new_outputs.push_back(std::move(cfg));
}
if (new_outputs.empty()) new_outputs.push_back(OutputConfig{});
if (new_outputs.size() != outputs_.size()) return false;
for (size_t i = 0; i < outputs_.size(); ++i) {
if (new_outputs[i].proto != outputs_[i].proto || new_outputs[i].host != outputs_[i].host ||
new_outputs[i].port != outputs_[i].port || new_outputs[i].path != outputs_[i].path ||
new_outputs[i].segment_sec != outputs_[i].segment_sec) {
return false;
}
}
}
int new_fps = new_config.ValueOr<int>("fps", fps_);
int new_gop = new_config.ValueOr<int>("gop", gop_);
int new_bitrate = new_config.ValueOr<int>("bitrate_kbps", bitrate_kbps_);
if (new_codec != codec_) return false;
{
std::lock_guard<std::mutex> lock(mu_);
fps_ = new_fps;
gop_ = new_gop;
bitrate_kbps_ = new_bitrate;
use_mpp_ = new_use_mpp;
use_ffmpeg_mux_ = new_use_ffmpeg_mux;
// Force re-init on next frame to apply new params.
#if defined(RK3588_ENABLE_FFMPEG)
if (mux_mgr_) mux_mgr_->Close();
mux_mgr_.reset();
#endif
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
for (auto& p : zlm_pubs_) p->Close();
zlm_pubs_.clear();
#endif
#if defined(RK3588_ENABLE_MPP)
if (mpp_encoder_) mpp_encoder_->Shutdown();
encoder_ready_ = false;
#endif
}
return true;
}
bool GetCustomMetrics(SimpleJson& out) const override {
std::lock_guard<std::mutex> lock(mu_);
uint64_t clients = 0;
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
for (const auto& p : zlm_pubs_) {
if (!p) continue;
clients += static_cast<uint64_t>(p->TotalReaderCount());
}
#endif
SimpleJson::Object o;
o["clients"] = SimpleJson(static_cast<double>(clients));
o["encoded_frames"] = SimpleJson(static_cast<double>(encoded_frames_));
out = SimpleJson(std::move(o));
return true;
}
void Stop() override {
if (input_queue_) input_queue_->Stop();
#if defined(RK3588_ENABLE_FFMPEG)
if (mux_mgr_) mux_mgr_->Close();
#endif
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
for (auto& p : zlm_pubs_) p->Close();
zlm_pubs_.clear();
#endif
#if defined(RK3588_ENABLE_MPP)
if (mpp_encoder_) mpp_encoder_->Shutdown();
#endif
}
NodeStatus Process(FramePtr frame) override {
if (!frame) return NodeStatus::DROP;
std::lock_guard<std::mutex> lock(mu_);
#if defined(RK3588_ENABLE_MPP)
if (use_mpp_) {
ProcessMpp(frame);
} else {
ProcessStub(frame);
}
#else
ProcessStub(frame);
#endif
return NodeStatus::OK;
}
private:
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
struct ZlmAppStream {
std::string app;
std::string stream;
};
static ZlmAppStream ParseRtspPath(const std::string& path, const std::string& fallback_stream) {
std::string p = path;
while (!p.empty() && p.front() == '/') p.erase(p.begin());
ZlmAppStream out;
out.app = "live";
out.stream = fallback_stream.empty() ? "stream" : fallback_stream;
if (p.empty()) return out;
std::istringstream iss(p);
std::string token;
std::vector<std::string> parts;
while (std::getline(iss, token, '/')) {
if (!token.empty()) parts.push_back(token);
}
if (parts.size() == 1) {
out.stream = parts[0];
} else if (parts.size() >= 2) {
out.app = parts[0];
out.stream = parts[1];
}
return out;
}
class ZlmRtspPublisher {
public:
~ZlmRtspPublisher() { Close(); }
bool Init(int port, const std::string& path, const std::string& fallback_stream,
const std::string& codec, int width, int height, int fps, int bitrate_kbps) {
EnsureZlmEnv();
if (!StartRtspServerOnce(port)) {
std::cerr << "[publish] zlm rtsp server start failed on port " << port << "\n";
return false;
}
auto as = ParseRtspPath(path, fallback_stream);
app_ = as.app;
stream_ = as.stream;
port_ = port;
int codec_id = (codec == "h265" || codec == "hevc") ? 1 : 0; // 0:H264, 1:H265
media_ = mk_media_create("__defaultVhost__", app_.c_str(), stream_.c_str(), 0, 0, 0);
if (!media_) {
std::cerr << "[publish] zlm mk_media_create failed app=" << app_ << " stream=" << stream_ << "\n";
return false;
}
int ok = mk_media_init_video(media_, codec_id, width, height, static_cast<float>(fps), bitrate_kbps * 1000);
if (!ok) {
std::cerr << "[publish] zlm mk_media_init_video failed" << "\n";
mk_media_release(media_);
media_ = nullptr;
return false;
}
mk_media_init_complete(media_);
is_h265_ = (codec_id == 1);
splitter_ = mk_h264_splitter_create(&OnSplitFrame, this);
if (!splitter_) {
std::cerr << "[publish] zlm mk_h264_splitter_create failed" << "\n";
mk_media_release(media_);
media_ = nullptr;
return false;
}
std::cout << "[publish] zlm rtsp server ready: rtsp://0.0.0.0:" << port_ << "/" << app_ << "/" << stream_ << "\n";
return true;
}
void Close() {
if (splitter_) {
mk_h264_splitter_release(splitter_);
splitter_ = nullptr;
}
if (media_) {
mk_media_release(media_);
media_ = nullptr;
}
}
int TotalReaderCount() const {
return media_ ? mk_media_total_reader_count(media_) : 0;
}
void Write(const EncodedPacket& pkt, const std::vector<uint8_t>& header, bool is_h265) {
if (!media_ || pkt.data.empty()) return;
// Feed encoder header first to provide SPS/PPS(/VPS).
if (!sent_header_ && !header.empty()) {
if (FeedHeader(pkt.pts_ms, header, is_h265)) {
sent_header_ = true;
}
}
FeedPacket(pkt.pts_ms, pkt.data, is_h265);
// NOTE: We intentionally do not send data directly via mk_media_input_h264/h265.
// Using mk_h264_splitter + mk_media_input_frame is more robust for SPS/PPS handling.
}
private:
static uint16_t ReadBe16(const uint8_t* p) {
return static_cast<uint16_t>(p[0] << 8) | static_cast<uint16_t>(p[1]);
}
static bool HasAnnexBStartCode(const uint8_t* d, size_t n) {
if (!d || n < 3) return false;
for (size_t i = 0; i + 3 < n; ++i) {
if (d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 1) return true;
if (i + 4 < n && d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 0 && d[i + 3] == 1) return true;
}
return false;
}
static size_t FindStartCode(const uint8_t* d, size_t n, size_t from, size_t* sc_len) {
for (size_t i = from; i + 3 < n; ++i) {
if (d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 1) {
if (sc_len) *sc_len = 3;
return i;
}
if (i + 4 < n && d[i] == 0 && d[i + 1] == 0 && d[i + 2] == 0 && d[i + 3] == 1) {
if (sc_len) *sc_len = 4;
return i;
}
}
return n;
}
static bool IsConfigNal(const uint8_t* nal, size_t nal_len, bool is_h265) {
if (!nal || nal_len < 1) return false;
if (!is_h265) {
uint8_t t = nal[0] & 0x1F;
return t == 7 || t == 8; // SPS/PPS
}
if (nal_len < 2) return false;
uint8_t t = (nal[0] >> 1) & 0x3F;
return t == 32 || t == 33 || t == 34; // VPS/SPS/PPS
}
static std::vector<std::vector<uint8_t>> ExtractConfigFromAnnexB(const uint8_t* d, size_t n, bool is_h265) {
std::vector<std::vector<uint8_t>> out;
if (!d || n < 4) return out;
size_t pos = 0;
while (true) {
size_t sc_len = 0;
size_t sc = FindStartCode(d, n, pos, &sc_len);
if (sc >= n) break;
size_t nal_start = sc + sc_len;
size_t next_sc = FindStartCode(d, n, nal_start, nullptr);
size_t nal_end = (next_sc >= n) ? n : next_sc;
if (nal_start < nal_end) {
const uint8_t* nal = d + nal_start;
size_t nal_len = nal_end - nal_start;
if (IsConfigNal(nal, nal_len, is_h265)) {
std::vector<uint8_t> one{0, 0, 0, 1};
one.insert(one.end(), nal, nal + nal_len);
out.push_back(std::move(one));
}
}
pos = nal_end;
}
return out;
}
static std::vector<uint8_t> ConvertLengthPrefixedToAnnexB(const uint8_t* d, size_t n) {
// Best-effort conversion for AVCC-style length-prefixed NAL units (assume 4-byte length).
std::vector<uint8_t> out;
if (!d || n < 8) return out;
size_t pos = 0;
while (pos + 4 <= n) {
uint32_t len = (static_cast<uint32_t>(d[pos]) << 24) |
(static_cast<uint32_t>(d[pos + 1]) << 16) |
(static_cast<uint32_t>(d[pos + 2]) << 8) |
(static_cast<uint32_t>(d[pos + 3]));
pos += 4;
if (len == 0 || pos + len > n) break;
out.insert(out.end(), {0, 0, 0, 1});
out.insert(out.end(), d + pos, d + pos + len);
pos += len;
}
return out;
}
static std::vector<std::vector<uint8_t>> ExtractConfigFromAvcc(const std::vector<uint8_t>& avcc) {
std::vector<std::vector<uint8_t>> out;
if (avcc.size() < 7 || avcc[0] != 1) return out;
size_t pos = 5;
uint8_t num_sps = avcc[pos++] & 0x1F;
for (uint8_t i = 0; i < num_sps; ++i) {
if (pos + 2 > avcc.size()) return {};
uint16_t len = ReadBe16(&avcc[pos]);
pos += 2;
if (pos + len > avcc.size()) return {};
std::vector<uint8_t> one{0, 0, 0, 1};
one.insert(one.end(), avcc.begin() + static_cast<long>(pos), avcc.begin() + static_cast<long>(pos + len));
out.push_back(std::move(one));
pos += len;
}
if (pos + 1 > avcc.size()) return {};
uint8_t num_pps = avcc[pos++];
for (uint8_t i = 0; i < num_pps; ++i) {
if (pos + 2 > avcc.size()) return {};
uint16_t len = ReadBe16(&avcc[pos]);
pos += 2;
if (pos + len > avcc.size()) return {};
std::vector<uint8_t> one{0, 0, 0, 1};
one.insert(one.end(), avcc.begin() + static_cast<long>(pos), avcc.begin() + static_cast<long>(pos + len));
out.push_back(std::move(one));
pos += len;
}
return out;
}
static std::vector<std::vector<uint8_t>> ExtractConfigFromHvcc(const std::vector<uint8_t>& hvcc) {
std::vector<std::vector<uint8_t>> out;
if (hvcc.size() < 23 || hvcc[0] != 1) return out;
size_t pos = 22;
if (pos >= hvcc.size()) return out;
uint8_t num_arrays = hvcc[pos++];
for (uint8_t i = 0; i < num_arrays; ++i) {
if (pos + 3 > hvcc.size()) return {};
uint8_t nal_type = hvcc[pos++] & 0x3F;
uint16_t num_nalus = ReadBe16(&hvcc[pos]);
pos += 2;
for (uint16_t j = 0; j < num_nalus; ++j) {
if (pos + 2 > hvcc.size()) return {};
uint16_t len = ReadBe16(&hvcc[pos]);
pos += 2;
if (pos + len > hvcc.size()) return {};
if (nal_type == 32 || nal_type == 33 || nal_type == 34) {
std::vector<uint8_t> one{0, 0, 0, 1};
one.insert(one.end(), hvcc.begin() + static_cast<long>(pos), hvcc.begin() + static_cast<long>(pos + len));
out.push_back(std::move(one));
}
pos += len;
}
}
return out;
}
bool FeedHeader(int64_t ts_ms, const std::vector<uint8_t>& header, bool is_h265) {
if (!splitter_ || header.empty()) return false;
cur_ts_ms_ = ts_ms;
std::vector<std::vector<uint8_t>> cfg;
if (HasAnnexBStartCode(header.data(), header.size())) {
cfg = ExtractConfigFromAnnexB(header.data(), header.size(), is_h265);
} else {
// AVCC/HVCC extradata: extract config NALs (AnnexB)
cfg = is_h265 ? ExtractConfigFromHvcc(header) : ExtractConfigFromAvcc(header);
}
if (cfg.empty()) return false;
for (const auto& nal : cfg) {
if (nal.empty()) continue;
if (is_h265) {
mk_media_input_h265(media_, nal.data(), static_cast<int>(nal.size()), cur_ts_ms_, cur_ts_ms_);
} else {
mk_media_input_h264(media_, nal.data(), static_cast<int>(nal.size()), cur_ts_ms_, cur_ts_ms_);
}
}
return true;
}
void FeedPacket(int64_t ts_ms, const std::vector<uint8_t>& data, bool /*is_h265*/) {
if (!splitter_ || data.empty()) return;
cur_ts_ms_ = ts_ms;
if (HasAnnexBStartCode(data.data(), data.size())) {
mk_h264_splitter_input_data(splitter_, reinterpret_cast<const char*>(data.data()), static_cast<int>(data.size()));
return;
}
auto annexb = ConvertLengthPrefixedToAnnexB(data.data(), data.size());
if (!annexb.empty()) {
mk_h264_splitter_input_data(splitter_, reinterpret_cast<const char*>(annexb.data()), static_cast<int>(annexb.size()));
}
}
static void API_CALL OnSplitFrame(void* user_data, mk_h264_splitter /*splitter*/, const char* frame, int size) {
auto* self = static_cast<ZlmRtspPublisher*>(user_data);
if (!self || !self->media_ || !frame || size <= 0) return;
const auto* d = reinterpret_cast<const uint8_t*>(frame);
bool has_start_code = false;
if (size >= 3 && d[0] == 0 && d[1] == 0 && d[2] == 1) {
has_start_code = true;
} else if (size >= 4 && d[0] == 0 && d[1] == 0 && d[2] == 0 && d[3] == 1) {
has_start_code = true;
}
// Best-effort: detect and log config NAL once (helps diagnosing unready track).
if (!self->seen_config_nal_ && has_start_code) {
size_t off = (size >= 4 && d[0] == 0 && d[1] == 0 && d[2] == 0 && d[3] == 1) ? 4 : 3;
if (static_cast<size_t>(size) > off) {
const uint8_t* nal = d + off;
size_t nal_len = static_cast<size_t>(size) - off;
bool is_cfg = false;
if (!self->is_h265_) {
uint8_t t = nal[0] & 0x1F;
is_cfg = (t == 7 || t == 8);
} else if (nal_len >= 2) {
uint8_t t = (nal[0] >> 1) & 0x3F;
is_cfg = (t == 32 || t == 33 || t == 34);
}
if (is_cfg) {
self->seen_config_nal_ = true;
std::cout << "[publish] zlm saw config nal" << "\n";
}
}
}
if (!has_start_code) {
thread_local std::vector<uint8_t> prefixed;
prefixed.resize(static_cast<size_t>(size) + 4);
prefixed[0] = 0;
prefixed[1] = 0;
prefixed[2] = 0;
prefixed[3] = 1;
std::memcpy(prefixed.data() + 4, frame, static_cast<size_t>(size));
if (self->is_h265_) {
mk_media_input_h265(self->media_, prefixed.data(), static_cast<int>(prefixed.size()), self->cur_ts_ms_, self->cur_ts_ms_);
} else {
mk_media_input_h264(self->media_, prefixed.data(), static_cast<int>(prefixed.size()), self->cur_ts_ms_, self->cur_ts_ms_);
}
return;
}
if (self->is_h265_) {
mk_media_input_h265(self->media_, frame, size, self->cur_ts_ms_, self->cur_ts_ms_);
} else {
mk_media_input_h264(self->media_, frame, size, self->cur_ts_ms_, self->cur_ts_ms_);
}
}
static void EnsureZlmEnv() {
static std::once_flag once;
std::call_once(once, [] {
mk_config cfg;
std::memset(&cfg, 0, sizeof(cfg));
cfg.thread_num = 0; // auto
cfg.log_level = 2;
cfg.log_mask = LOG_CONSOLE;
mk_env_init(&cfg);
});
}
static bool StartRtspServerOnce(int port) {
static std::mutex mu;
static std::unordered_set<int> started;
std::lock_guard<std::mutex> lock(mu);
if (started.count(port)) return true;
auto actual = mk_rtsp_server_start(static_cast<uint16_t>(port), 0);
if (actual == 0) return false;
started.insert(port);
return true;
}
mk_media media_ = nullptr;
mk_h264_splitter splitter_ = nullptr;
bool is_h265_ = false;
uint64_t cur_ts_ms_ = 0;
bool seen_config_nal_ = false;
std::string app_;
std::string stream_;
int port_ = 0;
bool sent_header_ = false;
};
#endif
void ProcessStub(FramePtr frame) {
++encoded_frames_;
if (encoded_frames_ % 100 == 0) {
std::cout << "[publish] stub frame " << frame->frame_id
<< " queue=" << input_queue_->Size()
<< " drops=" << input_queue_->DroppedCount() << "\n";
}
}
#if defined(RK3588_ENABLE_MPP)
void ProcessMpp(FramePtr frame) {
if (!mpp_encoder_) {
mpp_encoder_ = std::make_unique<MppVencEncoder>();
}
if (!encoder_ready_) {
if (!mpp_encoder_->InitFromFrame(*frame, codec_, fps_, gop_, bitrate_kbps_)) {
std::cerr << "[publish] encoder init failed, fallback to stub\n";
use_mpp_ = false;
ProcessStub(frame);
return;
}
#if defined(RK3588_ENABLE_FFMPEG)
if (use_ffmpeg_mux_) {
AVCodecID cid = (codec_ == "h265" || codec_ == "hevc") ? AV_CODEC_ID_HEVC
: AV_CODEC_ID_H264;
if (!mux_mgr_) mux_mgr_ = std::make_unique<AvMuxerManager>();
mux_mgr_->Init(ff_outputs_, cid, frame->width, frame->height, fps_, mpp_encoder_->Header());
}
#endif
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
for (const auto& o : zlm_outputs_) {
auto pub = std::make_unique<ZlmRtspPublisher>();
if (pub->Init(o.port, o.path, id_, codec_, frame->width, frame->height, fps_, bitrate_kbps_)) {
zlm_pubs_.push_back(std::move(pub));
}
}
#endif
encoder_ready_ = true;
}
const bool is_h265 = (codec_ == "h265" || codec_ == "hevc");
mpp_encoder_->Encode(frame, [&](const EncodedPacket& pkt) {
++encoded_frames_;
if (encoded_frames_ % 100 == 0) {
std::cout << "[publish] encoded frame " << encoded_frames_
<< " queue=" << input_queue_->Size()
<< " drops=" << input_queue_->DroppedCount() << "\n";
}
#if defined(RK3588_ENABLE_FFMPEG)
if (use_ffmpeg_mux_ && mux_mgr_) mux_mgr_->Write(pkt);
#else
(void)pkt;
#endif
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
for (auto& p : zlm_pubs_) {
p->Write(pkt, mpp_encoder_->Header(), is_h265);
}
#endif
});
}
#endif
std::string id_;
std::string codec_ = "h264";
int fps_ = 25;
int gop_ = 50;
int bitrate_kbps_ = 4000;
bool use_mpp_ = false;
bool use_ffmpeg_mux_ = false;
std::vector<OutputConfig> outputs_;
std::vector<OutputConfig> ff_outputs_;
std::vector<OutputConfig> zlm_outputs_;
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
uint64_t encoded_frames_ = 0;
mutable std::mutex mu_;
#if defined(RK3588_ENABLE_MPP)
std::unique_ptr<MppVencEncoder> mpp_encoder_;
bool encoder_ready_ = false;
#endif
#if defined(RK3588_ENABLE_FFMPEG)
std::unique_ptr<AvMuxerManager> mux_mgr_;
#endif
#if defined(RK3588_ENABLE_ZLMEDIAKIT)
std::vector<std::unique_ptr<ZlmRtspPublisher>> zlm_pubs_;
#endif
};
REGISTER_NODE(PublishNode, "publish");
} // namespace rk3588