OrangePi3588Media/plugins/publish/publish_node.cpp

674 lines
23 KiB
C++

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <filesystem>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include "node.h"
#if defined(RK3588_ENABLE_FFMPEG)
extern "C" {
#include <libavformat/avformat.h>
#include <libavutil/avutil.h>
}
#endif
#if defined(RK3588_ENABLE_MPP)
extern "C" {
#include <rockchip/mpp_buffer.h>
#include <rockchip/mpp_packet.h>
#include <rockchip/rk_mpi.h>
}
#endif
namespace rk3588 {
namespace {
inline int Align16(int v) { return (v + 15) & ~15; }
}
struct OutputConfig {
std::string proto = "rtsp"; // rtsp | hls
int port = 8554;
std::string path = "/live/stream"; // rtsp: url path; hls: dir or m3u8 path
int segment_sec = 2;
};
struct EncodedPacket {
std::vector<uint8_t> data;
bool key = false;
int64_t pts_ms = 0;
};
#if defined(RK3588_ENABLE_FFMPEG)
class AvMuxer {
public:
bool Init(const OutputConfig& cfg, AVCodecID codec_id, int width, int height, int fps,
const std::vector<uint8_t>& extradata) {
proto_ = cfg.proto.empty() ? "rtsp" : cfg.proto;
fps_ = fps > 0 ? fps : 25;
url_ = BuildUrl(cfg);
avformat_network_init();
const char* fmt_name = proto_ == "hls" ? "hls" : "rtsp";
if (avformat_alloc_output_context2(&fmt_, nullptr, fmt_name, url_.c_str()) < 0 || !fmt_) {
std::cerr << "[publish] avformat_alloc_output_context2 failed for " << url_ << "\n";
return false;
}
stream_ = avformat_new_stream(fmt_, nullptr);
if (!stream_) {
std::cerr << "[publish] avformat_new_stream failed for " << url_ << "\n";
Close();
return false;
}
stream_->time_base = AVRational{1, 1000};
stream_->avg_frame_rate = AVRational{fps_, 1};
stream_->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
stream_->codecpar->codec_id = codec_id;
stream_->codecpar->width = width;
stream_->codecpar->height = height;
stream_->codecpar->format = AV_PIX_FMT_YUV420P;
stream_->codecpar->codec_tag = 0;
if (!extradata.empty()) {
stream_->codecpar->extradata_size = static_cast<int>(extradata.size());
stream_->codecpar->extradata =
static_cast<uint8_t*>(av_mallocz(extradata.size() + AV_INPUT_BUFFER_PADDING_SIZE));
if (stream_->codecpar->extradata) {
std::memcpy(stream_->codecpar->extradata, extradata.data(), extradata.size());
}
}
AVDictionary* opts = nullptr;
if (proto_ == "rtsp") {
av_dict_set(&opts, "rtsp_flags", "listen", 0);
av_dict_set(&opts, "rtsp_transport", "tcp", 0);
} else if (proto_ == "hls") {
std::string seg = std::to_string(std::max(1, cfg.segment_sec));
av_dict_set(&opts, "hls_time", seg.c_str(), 0);
av_dict_set(&opts, "hls_list_size", "0", 0);
av_dict_set(&opts, "hls_flags", "delete_segments+append_list", 0);
// Ensure HLS output directory exists (HLS muxer uses AVFMT_NOFILE)
std::filesystem::path p(url_);
if (!p.has_extension()) p /= "index.m3u8";
std::error_code ec;
std::filesystem::create_directories(p.parent_path(), ec);
}
// For non-NOFILE formats, open IO context manually.
if (!(fmt_->oformat->flags & AVFMT_NOFILE)) {
if (av_io_open(url_, opts) < 0) {
av_dict_free(&opts);
Close();
return false;
}
}
if (avformat_write_header(fmt_, &opts) < 0) {
std::cerr << "[publish] avformat_write_header failed for " << url_ << "\n";
av_dict_free(&opts);
Close();
return false;
}
av_dict_free(&opts);
ready_ = true;
std::cout << "[publish] mux start " << proto_ << " -> " << url_ << "\n";
return true;
}
bool WriteFrame(const EncodedPacket& pkt) {
if (!ready_ || !stream_) return false;
if (pkt.data.empty()) return false;
AVPacket out;
av_init_packet(&out);
if (av_new_packet(&out, static_cast<int>(pkt.data.size())) < 0) {
return false;
}
std::memcpy(out.data, pkt.data.data(), pkt.data.size());
out.stream_index = stream_->index;
out.flags = pkt.key ? AV_PKT_FLAG_KEY : 0;
int64_t pts = av_rescale_q(pkt.pts_ms, AVRational{1, 1000}, stream_->time_base);
// Ensure monotonically increasing PTS/DTS
if (pts <= last_pts_) {
pts = last_pts_ + 1;
}
last_pts_ = pts;
out.pts = pts;
out.dts = pts;
out.duration = av_rescale_q(1000 / std::max(1, fps_), AVRational{1, 1000}, stream_->time_base);
int ret = av_interleaved_write_frame(fmt_, &out);
av_packet_unref(&out);
if (ret < 0 && !warned_) {
char errbuf[128];
av_strerror(ret, errbuf, sizeof(errbuf));
std::cerr << "[publish] av_interleaved_write_frame failed for " << url_ << " ret="
<< ret << " (" << errbuf << ")\n";
warned_ = true;
}
// Debug: log first few frames pts
if (write_count_ < 5 || write_count_ % 100 == 0) {
std::cout << "[publish] hls write frame " << write_count_ << " pts=" << pkt.pts_ms
<< "ms key=" << pkt.key << " ret=" << ret << "\n";
}
++write_count_;
return ret >= 0;
}
void Close() {
if (fmt_) {
if (ready_) av_write_trailer(fmt_);
if (!(fmt_->oformat->flags & AVFMT_NOFILE) && fmt_->pb) {
avio_closep(&fmt_->pb);
}
avformat_free_context(fmt_);
}
fmt_ = nullptr;
stream_ = nullptr;
ready_ = false;
warned_ = false;
}
private:
int av_io_open(const std::string& url, AVDictionary* opts) {
if (proto_ == "hls") {
std::filesystem::path p(url);
if (!p.has_extension()) p /= "index.m3u8";
if (!p.is_absolute()) p = std::filesystem::current_path() / p;
std::error_code ec;
std::filesystem::create_directories(p.parent_path(), ec);
resolved_path_ = p.string();
return avio_open2(&fmt_->pb, resolved_path_.c_str(), AVIO_FLAG_WRITE, nullptr, &opts);
}
resolved_path_ = url;
return avio_open2(&fmt_->pb, url.c_str(), AVIO_FLAG_WRITE, nullptr, &opts);
}
std::string BuildUrl(const OutputConfig& cfg) const {
if (proto_ == "hls") return cfg.path;
std::string path = cfg.path;
if (path.empty() || path[0] != '/') path = "/" + path;
return "rtsp://0.0.0.0:" + std::to_string(cfg.port) + path;
}
AVFormatContext* fmt_ = nullptr;
AVStream* stream_ = nullptr;
std::string url_;
std::string resolved_path_;
std::string proto_;
int fps_ = 25;
bool ready_ = false;
bool warned_ = false;
uint64_t write_count_ = 0;
int64_t last_pts_ = -1;
};
class AvMuxerManager {
public:
bool Init(const std::vector<OutputConfig>& outputs, AVCodecID codec_id, int width, int height,
int fps, const std::vector<uint8_t>& extradata) {
bool ok = false;
for (const auto& o : outputs) {
auto mux = std::make_unique<AvMuxer>();
if (mux->Init(o, codec_id, width, height, fps, extradata)) {
muxers_.push_back(std::move(mux));
ok = true;
}
}
return ok;
}
void Write(const EncodedPacket& pkt) {
for (auto& m : muxers_) {
m->WriteFrame(pkt);
}
}
void Close() {
for (auto& m : muxers_) m->Close();
muxers_.clear();
}
private:
std::vector<std::unique_ptr<AvMuxer>> muxers_;
};
#endif
#if defined(RK3588_ENABLE_MPP)
class MppVencEncoder {
public:
~MppVencEncoder() { Shutdown(); }
bool InitFromFrame(const Frame& src, const std::string& codec, int fps, int gop,
int bitrate_kbps) {
width_ = src.width;
height_ = src.height;
hor_stride_ = src.planes[0].stride > 0 ? src.planes[0].stride
: (src.stride > 0 ? src.stride : Align16(src.width));
ver_stride_ = Align16(src.height);
fps_ = fps > 0 ? fps : 25;
gop_ = gop > 0 ? gop : fps_ * 2;
bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 4000) * 1000;
if (src.format == PixelFormat::NV12) {
mpp_fmt_ = MPP_FMT_YUV420SP;
} else if (src.format == PixelFormat::YUV420) {
mpp_fmt_ = MPP_FMT_YUV420P;
} else {
std::cerr << "[publish] unsupported pixel format for mpp encoder\n";
return false;
}
if (codec == "h265" || codec == "hevc") {
coding_ = MPP_VIDEO_CodingHEVC;
} else {
coding_ = MPP_VIDEO_CodingAVC;
}
if (mpp_create(&ctx_, &mpi_) != MPP_OK) {
std::cerr << "[publish] mpp_create failed\n";
return false;
}
if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) {
std::cerr << "[publish] mpp_init enc failed\n";
return false;
}
if (mpp_enc_cfg_init(&cfg_) != MPP_OK) {
std::cerr << "[publish] mpp_enc_cfg_init failed\n";
return false;
}
if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) {
std::cerr << "[publish] MPP_ENC_GET_CFG failed\n";
return false;
}
mpp_enc_cfg_set_s32(cfg_, "prep:width", width_);
mpp_enc_cfg_set_s32(cfg_, "prep:height", height_);
mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_);
mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_);
mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_);
mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR);
mpp_enc_cfg_set_s32(cfg_, "rc:gop", gop_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2);
mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_);
if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) {
std::cerr << "[publish] MPP_ENC_SET_CFG failed\n";
return false;
}
RK_S32 timeout = 2000;
mpi_->control(ctx_, MPP_SET_OUTPUT_TIMEOUT, &timeout);
MppPacket hdr = nullptr;
if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) {
auto* data = static_cast<uint8_t*>(mpp_packet_get_pos(hdr));
size_t len = mpp_packet_get_length(hdr);
header_.assign(data, data + len);
mpp_packet_deinit(&hdr);
}
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM) != MPP_OK) {
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL) != MPP_OK) {
std::cerr << "[publish] mpp_buffer_group_get_internal failed\n";
return false;
}
}
initialized_ = true;
return true;
}
void Shutdown() {
if (frm_grp_) {
mpp_buffer_group_put(frm_grp_);
frm_grp_ = nullptr;
}
if (cfg_) {
mpp_enc_cfg_deinit(cfg_);
cfg_ = nullptr;
}
if (ctx_) {
if (mpi_) mpi_->reset(ctx_);
mpp_destroy(ctx_);
ctx_ = nullptr;
mpi_ = nullptr;
}
initialized_ = false;
}
bool Encode(const FramePtr& frame, const std::function<void(const EncodedPacket&)>& on_packet) {
if (!initialized_) return false;
if (!frame) return false;
MppBuffer buf = nullptr;
bool imported = false;
size_t frame_size = CalcFrameSize();
if (frame->dma_fd >= 0 && frame->data_size > 0) {
MppBufferInfo info{};
info.type = MPP_BUFFER_TYPE_EXT_DMA;
info.size = frame->data_size;
info.fd = frame->dma_fd;
if (mpp_buffer_import(&buf, &info) == MPP_OK) {
imported = true;
}
}
if (!buf) {
if (!frame->data) return false;
if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) {
return false;
}
if (!CopyToBuffer(frame, buf)) {
mpp_buffer_put(buf);
return false;
}
}
MppFrame mpp_frame = nullptr;
mpp_frame_init(&mpp_frame);
mpp_frame_set_width(mpp_frame, width_);
mpp_frame_set_height(mpp_frame, height_);
mpp_frame_set_hor_stride(mpp_frame, hor_stride_);
mpp_frame_set_ver_stride(mpp_frame, ver_stride_);
mpp_frame_set_fmt(mpp_frame, mpp_fmt_);
mpp_frame_set_pts(mpp_frame, static_cast<int64_t>(frame->pts));
mpp_frame_set_buffer(mpp_frame, buf);
if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) {
mpp_frame_deinit(&mpp_frame);
mpp_buffer_put(buf);
return false;
}
mpp_frame_deinit(&mpp_frame);
while (true) {
MppPacket packet = nullptr;
MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet);
if (ret == MPP_ERR_TIMEOUT) {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
continue;
}
if (ret != MPP_OK || !packet) break;
EncodedPacket out;
auto* pos = static_cast<uint8_t*>(mpp_packet_get_pos(packet));
size_t len = mpp_packet_get_length(packet);
out.data.assign(pos, pos + len);
// MPP_PACKET_FLAG_INTRA = 0x08, also check NAL type for H264 IDR (type 5)
RK_U32 flag = mpp_packet_get_flag(packet);
out.key = (flag & 0x08) != 0; // MPP_PACKET_FLAG_INTRA
// Fallback: check H264 NAL unit type if MPP flag not set
if (!out.key && len >= 5) {
// Find NAL start code and check type
for (size_t i = 0; i + 4 < len; ++i) {
if (pos[i] == 0 && pos[i+1] == 0 && pos[i+2] == 0 && pos[i+3] == 1) {
uint8_t nal_type = pos[i+4] & 0x1F;
if (nal_type == 5 || nal_type == 7) { // IDR or SPS
out.key = true;
break;
}
} else if (pos[i] == 0 && pos[i+1] == 0 && pos[i+2] == 1) {
uint8_t nal_type = pos[i+3] & 0x1F;
if (nal_type == 5 || nal_type == 7) {
out.key = true;
break;
}
}
}
}
int64_t mpp_pts = mpp_packet_get_pts(packet);
// Use encoder's pts if valid, otherwise compute from frame count
out.pts_ms = mpp_pts > 0 ? mpp_pts : static_cast<int64_t>(frame_count_ * 1000 / fps_);
++frame_count_;
if (on_packet) on_packet(out);
mpp_packet_deinit(&packet);
break;
}
mpp_buffer_put(buf);
(void)imported;
return true;
}
const std::vector<uint8_t>& Header() const { return header_; }
private:
size_t CalcFrameSize() const {
if (mpp_fmt_ == MPP_FMT_YUV420SP || mpp_fmt_ == MPP_FMT_YUV420P) {
return static_cast<size_t>(hor_stride_) * ver_stride_ * 3 / 2;
}
return 0;
}
bool CopyToBuffer(const FramePtr& frame, MppBuffer buf) const {
auto* dst = static_cast<uint8_t*>(mpp_buffer_get_ptr(buf));
size_t dst_size = mpp_buffer_get_size(buf);
size_t need = CalcFrameSize();
if (!dst || dst_size < need) return false;
std::memset(dst, 0, dst_size);
if (frame->plane_count < 2) return false;
const FramePlane& y = frame->planes[0];
const FramePlane& uv = frame->planes[1];
if (!y.data || !uv.data) return false;
for (int row = 0; row < height_; ++row) {
std::memcpy(dst + row * hor_stride_, y.data + row * y.stride,
std::min(hor_stride_, y.stride));
}
uint8_t* dst_uv = dst + hor_stride_ * ver_stride_;
int uv_rows = ver_stride_ / 2;
if (mpp_fmt_ == MPP_FMT_YUV420SP) {
for (int row = 0; row < uv_rows; ++row) {
std::memcpy(dst_uv + row * hor_stride_, uv.data + row * uv.stride,
std::min(hor_stride_, uv.stride));
}
} else if (frame->plane_count >= 3) {
const FramePlane& v = frame->planes[2];
uint8_t* dst_u = dst_uv;
uint8_t* dst_v = dst_uv + (hor_stride_ / 2) * uv_rows;
for (int row = 0; row < uv_rows; ++row) {
std::memcpy(dst_u + row * (hor_stride_ / 2), uv.data + row * uv.stride,
std::min(hor_stride_ / 2, uv.stride));
std::memcpy(dst_v + row * (hor_stride_ / 2), v.data + row * v.stride,
std::min(hor_stride_ / 2, v.stride));
}
}
return true;
}
MppCtx ctx_ = nullptr;
MppApi* mpi_ = nullptr;
MppEncCfg cfg_ = nullptr;
MppBufferGroup frm_grp_ = nullptr;
MppCodingType coding_ = MPP_VIDEO_CodingAVC;
MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP;
int width_ = 0;
int height_ = 0;
int hor_stride_ = 0;
int ver_stride_ = 0;
int fps_ = 25;
int gop_ = 50;
int bitrate_bps_ = 4000000;
std::vector<uint8_t> header_;
bool initialized_ = false;
uint64_t frame_count_ = 0;
};
#endif
class PublishNode : public INode {
public:
std::string Id() const override { return id_; }
std::string Type() const override { return "publish"; }
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
id_ = config.ValueOr<std::string>("id", "publish");
input_queue_ = ctx.input_queue;
codec_ = config.ValueOr<std::string>("codec", "h264");
fps_ = config.ValueOr<int>("fps", 25);
gop_ = config.ValueOr<int>("gop", 50);
bitrate_kbps_ = config.ValueOr<int>("bitrate_kbps", 4000);
use_mpp_ = config.ValueOr<bool>("use_mpp", true);
use_ffmpeg_mux_ = config.ValueOr<bool>("use_ffmpeg_mux", true);
const SimpleJson* outputs = config.Find("outputs");
if (outputs && outputs->IsArray()) {
for (const auto& o : outputs->AsArray()) {
if (!o.IsObject()) continue;
OutputConfig cfg;
cfg.proto = o.ValueOr<std::string>("proto", cfg.proto);
cfg.port = o.ValueOr<int>("port", cfg.port);
cfg.path = o.ValueOr<std::string>("path", cfg.path);
cfg.segment_sec = o.ValueOr<int>("segment_sec", cfg.segment_sec);
outputs_.push_back(std::move(cfg));
}
}
if (outputs_.empty()) {
outputs_.push_back(OutputConfig{});
}
#if !defined(RK3588_ENABLE_MPP)
use_mpp_ = false;
#endif
#if !defined(RK3588_ENABLE_FFMPEG)
use_ffmpeg_mux_ = false;
#endif
if (!input_queue_) {
std::cerr << "[publish] no input queue for node " << id_ << "\n";
return false;
}
return true;
}
bool Start() override {
if (!input_queue_) return false;
running_.store(true);
#if defined(RK3588_ENABLE_MPP)
if (use_mpp_) {
worker_ = std::thread(&PublishNode::LoopMpp, this);
} else {
worker_ = std::thread(&PublishNode::LoopStub, this);
}
#else
worker_ = std::thread(&PublishNode::LoopStub, this);
#endif
std::cout << "[publish] start codec=" << codec_ << " fps=" << fps_ << " gop=" << gop_
<< " bitrate=" << bitrate_kbps_ << "kbps"
<< (use_mpp_ ? " (mpp venc)" : " (stub)") << "\n";
return true;
}
void Stop() override {
running_.store(false);
if (input_queue_) input_queue_->Stop();
if (worker_.joinable()) worker_.join();
}
private:
void LoopStub() {
using namespace std::chrono;
FramePtr frame;
while (running_.load()) {
if (!input_queue_->Pop(frame, milliseconds(200))) continue;
++encoded_frames_;
if (encoded_frames_ % 100 == 0 && frame) {
std::cout << "[publish] stub frame " << frame->frame_id
<< " queue=" << input_queue_->Size()
<< " drops=" << input_queue_->DroppedCount() << "\n";
}
}
}
#if defined(RK3588_ENABLE_MPP)
void LoopMpp() {
using namespace std::chrono;
FramePtr frame;
MppVencEncoder encoder;
bool encoder_ready = false;
#if defined(RK3588_ENABLE_FFMPEG)
AvMuxerManager mux_mgr;
#endif
while (running_.load()) {
if (!input_queue_->Pop(frame, milliseconds(200))) continue;
if (!frame) continue;
if (!encoder_ready) {
if (!encoder.InitFromFrame(*frame, codec_, fps_, gop_, bitrate_kbps_)) {
std::cerr << "[publish] encoder init failed, fallback to stub" << "\n";
LoopStub();
return;
}
#if defined(RK3588_ENABLE_FFMPEG)
if (use_ffmpeg_mux_) {
AVCodecID cid = (codec_ == "h265" || codec_ == "hevc") ? AV_CODEC_ID_HEVC
: AV_CODEC_ID_H264;
mux_mgr.Init(outputs_, cid, frame->width, frame->height, fps_, encoder.Header());
}
#endif
encoder_ready = true;
}
encoder.Encode(frame, [&](const EncodedPacket& pkt) {
++encoded_frames_;
if (encoded_frames_ % 100 == 0) {
std::cout << "[publish] encoded frame " << encoded_frames_
<< " queue=" << input_queue_->Size()
<< " drops=" << input_queue_->DroppedCount() << "\n";
}
#if defined(RK3588_ENABLE_FFMPEG)
if (use_ffmpeg_mux_) mux_mgr.Write(pkt);
#else
(void)pkt;
#endif
});
}
#if defined(RK3588_ENABLE_FFMPEG)
mux_mgr.Close();
#endif
}
#endif
std::string id_;
std::string codec_ = "h264";
int fps_ = 25;
int gop_ = 50;
int bitrate_kbps_ = 4000;
bool use_mpp_ = false;
bool use_ffmpeg_mux_ = false;
std::vector<OutputConfig> outputs_;
std::atomic<bool> running_{false};
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
std::thread worker_;
uint64_t encoded_frames_ = 0;
};
REGISTER_NODE(PublishNode, "publish");
} // namespace rk3588