646 lines
22 KiB
C++
646 lines
22 KiB
C++
#include <algorithm>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <cstring>
|
|
#include <filesystem>
|
|
#include <functional>
|
|
#include <iostream>
|
|
#include <memory>
|
|
#include <string>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
#include "node.h"
|
|
|
|
#if defined(RK3588_ENABLE_FFMPEG)
|
|
extern "C" {
|
|
#include <libavformat/avformat.h>
|
|
#include <libavutil/avutil.h>
|
|
}
|
|
#endif
|
|
|
|
#if defined(RK3588_ENABLE_MPP)
|
|
extern "C" {
|
|
#include <rockchip/mpp_buffer.h>
|
|
#include <rockchip/mpp_packet.h>
|
|
#include <rockchip/rk_mpi.h>
|
|
}
|
|
#endif
|
|
|
|
namespace rk3588 {
|
|
|
|
namespace {
|
|
inline int Align16(int v) { return (v + 15) & ~15; }
|
|
}
|
|
|
|
struct OutputConfig {
|
|
std::string proto = "rtsp"; // rtsp | hls
|
|
int port = 8554;
|
|
std::string path = "/live/stream"; // rtsp: url path; hls: dir or m3u8 path
|
|
int segment_sec = 2;
|
|
};
|
|
|
|
struct EncodedPacket {
|
|
std::vector<uint8_t> data;
|
|
bool key = false;
|
|
int64_t pts_ms = 0;
|
|
};
|
|
|
|
#if defined(RK3588_ENABLE_FFMPEG)
|
|
class AvMuxer {
|
|
public:
|
|
bool Init(const OutputConfig& cfg, AVCodecID codec_id, int width, int height, int fps,
|
|
const std::vector<uint8_t>& extradata) {
|
|
proto_ = cfg.proto.empty() ? "rtsp" : cfg.proto;
|
|
fps_ = fps > 0 ? fps : 25;
|
|
url_ = BuildUrl(cfg);
|
|
avformat_network_init();
|
|
|
|
const char* fmt_name = proto_ == "hls" ? "hls" : "rtsp";
|
|
if (avformat_alloc_output_context2(&fmt_, nullptr, fmt_name, url_.c_str()) < 0 || !fmt_) {
|
|
std::cerr << "[publish] avformat_alloc_output_context2 failed for " << url_ << "\n";
|
|
return false;
|
|
}
|
|
|
|
stream_ = avformat_new_stream(fmt_, nullptr);
|
|
if (!stream_) {
|
|
std::cerr << "[publish] avformat_new_stream failed for " << url_ << "\n";
|
|
Close();
|
|
return false;
|
|
}
|
|
|
|
stream_->time_base = AVRational{1, 1000};
|
|
stream_->avg_frame_rate = AVRational{fps_, 1};
|
|
stream_->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
|
|
stream_->codecpar->codec_id = codec_id;
|
|
stream_->codecpar->width = width;
|
|
stream_->codecpar->height = height;
|
|
stream_->codecpar->format = AV_PIX_FMT_YUV420P;
|
|
stream_->codecpar->codec_tag = 0;
|
|
|
|
if (!extradata.empty()) {
|
|
stream_->codecpar->extradata_size = static_cast<int>(extradata.size());
|
|
stream_->codecpar->extradata =
|
|
static_cast<uint8_t*>(av_mallocz(extradata.size() + AV_INPUT_BUFFER_PADDING_SIZE));
|
|
if (stream_->codecpar->extradata) {
|
|
std::memcpy(stream_->codecpar->extradata, extradata.data(), extradata.size());
|
|
}
|
|
}
|
|
|
|
AVDictionary* opts = nullptr;
|
|
if (proto_ == "rtsp") {
|
|
av_dict_set(&opts, "rtsp_flags", "listen", 0);
|
|
av_dict_set(&opts, "rtsp_transport", "tcp", 0);
|
|
} else if (proto_ == "hls") {
|
|
std::string seg = std::to_string(std::max(1, cfg.segment_sec));
|
|
av_dict_set(&opts, "hls_time", seg.c_str(), 0);
|
|
av_dict_set(&opts, "hls_list_size", "0", 0);
|
|
av_dict_set(&opts, "hls_flags", "delete_segments+append_list", 0);
|
|
// Ensure HLS output directory exists (HLS muxer uses AVFMT_NOFILE)
|
|
std::filesystem::path p(url_);
|
|
if (!p.has_extension()) p /= "index.m3u8";
|
|
std::error_code ec;
|
|
std::filesystem::create_directories(p.parent_path(), ec);
|
|
}
|
|
|
|
// For non-NOFILE formats, open IO context manually.
|
|
if (!(fmt_->oformat->flags & AVFMT_NOFILE)) {
|
|
if (av_io_open(url_, opts) < 0) {
|
|
av_dict_free(&opts);
|
|
Close();
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if (avformat_write_header(fmt_, &opts) < 0) {
|
|
std::cerr << "[publish] avformat_write_header failed for " << url_ << "\n";
|
|
av_dict_free(&opts);
|
|
Close();
|
|
return false;
|
|
}
|
|
av_dict_free(&opts);
|
|
ready_ = true;
|
|
std::cout << "[publish] mux start " << proto_ << " -> " << url_ << "\n";
|
|
return true;
|
|
}
|
|
|
|
bool WriteFrame(const EncodedPacket& pkt) {
|
|
if (!ready_ || !stream_) return false;
|
|
if (pkt.data.empty()) return false;
|
|
|
|
AVPacket out;
|
|
av_init_packet(&out);
|
|
if (av_new_packet(&out, static_cast<int>(pkt.data.size())) < 0) {
|
|
return false;
|
|
}
|
|
std::memcpy(out.data, pkt.data.data(), pkt.data.size());
|
|
out.stream_index = stream_->index;
|
|
out.flags = pkt.key ? AV_PKT_FLAG_KEY : 0;
|
|
out.pts = av_rescale_q(pkt.pts_ms, AVRational{1, 1000}, stream_->time_base);
|
|
out.dts = out.pts;
|
|
out.duration = av_rescale_q(1000 / std::max(1, fps_), AVRational{1, 1000}, stream_->time_base);
|
|
|
|
int ret = av_interleaved_write_frame(fmt_, &out);
|
|
av_packet_unref(&out);
|
|
if (ret < 0 && !warned_) {
|
|
char errbuf[128];
|
|
av_strerror(ret, errbuf, sizeof(errbuf));
|
|
std::cerr << "[publish] av_interleaved_write_frame failed for " << url_ << " ret="
|
|
<< ret << " (" << errbuf << ")\n";
|
|
warned_ = true;
|
|
}
|
|
// Debug: log first few frames pts
|
|
if (write_count_ < 5 || write_count_ % 100 == 0) {
|
|
std::cout << "[publish] hls write frame " << write_count_ << " pts=" << pkt.pts_ms
|
|
<< "ms key=" << pkt.key << " ret=" << ret << "\n";
|
|
}
|
|
++write_count_;
|
|
return ret >= 0;
|
|
}
|
|
|
|
void Close() {
|
|
if (fmt_) {
|
|
if (ready_) av_write_trailer(fmt_);
|
|
if (!(fmt_->oformat->flags & AVFMT_NOFILE) && fmt_->pb) {
|
|
avio_closep(&fmt_->pb);
|
|
}
|
|
avformat_free_context(fmt_);
|
|
}
|
|
fmt_ = nullptr;
|
|
stream_ = nullptr;
|
|
ready_ = false;
|
|
warned_ = false;
|
|
}
|
|
|
|
private:
|
|
int av_io_open(const std::string& url, AVDictionary* opts) {
|
|
if (proto_ == "hls") {
|
|
std::filesystem::path p(url);
|
|
if (!p.has_extension()) p /= "index.m3u8";
|
|
if (!p.is_absolute()) p = std::filesystem::current_path() / p;
|
|
std::error_code ec;
|
|
std::filesystem::create_directories(p.parent_path(), ec);
|
|
resolved_path_ = p.string();
|
|
return avio_open2(&fmt_->pb, resolved_path_.c_str(), AVIO_FLAG_WRITE, nullptr, &opts);
|
|
}
|
|
resolved_path_ = url;
|
|
return avio_open2(&fmt_->pb, url.c_str(), AVIO_FLAG_WRITE, nullptr, &opts);
|
|
}
|
|
|
|
std::string BuildUrl(const OutputConfig& cfg) const {
|
|
if (proto_ == "hls") return cfg.path;
|
|
std::string path = cfg.path;
|
|
if (path.empty() || path[0] != '/') path = "/" + path;
|
|
return "rtsp://0.0.0.0:" + std::to_string(cfg.port) + path;
|
|
}
|
|
|
|
AVFormatContext* fmt_ = nullptr;
|
|
AVStream* stream_ = nullptr;
|
|
std::string url_;
|
|
std::string resolved_path_;
|
|
std::string proto_;
|
|
int fps_ = 25;
|
|
bool ready_ = false;
|
|
bool warned_ = false;
|
|
uint64_t write_count_ = 0;
|
|
};
|
|
|
|
class AvMuxerManager {
|
|
public:
|
|
bool Init(const std::vector<OutputConfig>& outputs, AVCodecID codec_id, int width, int height,
|
|
int fps, const std::vector<uint8_t>& extradata) {
|
|
bool ok = false;
|
|
for (const auto& o : outputs) {
|
|
auto mux = std::make_unique<AvMuxer>();
|
|
if (mux->Init(o, codec_id, width, height, fps, extradata)) {
|
|
muxers_.push_back(std::move(mux));
|
|
ok = true;
|
|
}
|
|
}
|
|
return ok;
|
|
}
|
|
|
|
void Write(const EncodedPacket& pkt) {
|
|
for (auto& m : muxers_) {
|
|
m->WriteFrame(pkt);
|
|
}
|
|
}
|
|
|
|
void Close() {
|
|
for (auto& m : muxers_) m->Close();
|
|
muxers_.clear();
|
|
}
|
|
|
|
private:
|
|
std::vector<std::unique_ptr<AvMuxer>> muxers_;
|
|
};
|
|
#endif
|
|
|
|
#if defined(RK3588_ENABLE_MPP)
|
|
class MppVencEncoder {
|
|
public:
|
|
~MppVencEncoder() { Shutdown(); }
|
|
|
|
bool InitFromFrame(const Frame& src, const std::string& codec, int fps, int gop,
|
|
int bitrate_kbps) {
|
|
width_ = src.width;
|
|
height_ = src.height;
|
|
hor_stride_ = src.planes[0].stride > 0 ? src.planes[0].stride
|
|
: (src.stride > 0 ? src.stride : Align16(src.width));
|
|
ver_stride_ = Align16(src.height);
|
|
fps_ = fps > 0 ? fps : 25;
|
|
gop_ = gop > 0 ? gop : fps_ * 2;
|
|
bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 4000) * 1000;
|
|
|
|
if (src.format == PixelFormat::NV12) {
|
|
mpp_fmt_ = MPP_FMT_YUV420SP;
|
|
} else if (src.format == PixelFormat::YUV420) {
|
|
mpp_fmt_ = MPP_FMT_YUV420P;
|
|
} else {
|
|
std::cerr << "[publish] unsupported pixel format for mpp encoder\n";
|
|
return false;
|
|
}
|
|
|
|
if (codec == "h265" || codec == "hevc") {
|
|
coding_ = MPP_VIDEO_CodingHEVC;
|
|
} else {
|
|
coding_ = MPP_VIDEO_CodingAVC;
|
|
}
|
|
|
|
if (mpp_create(&ctx_, &mpi_) != MPP_OK) {
|
|
std::cerr << "[publish] mpp_create failed\n";
|
|
return false;
|
|
}
|
|
if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) {
|
|
std::cerr << "[publish] mpp_init enc failed\n";
|
|
return false;
|
|
}
|
|
|
|
if (mpp_enc_cfg_init(&cfg_) != MPP_OK) {
|
|
std::cerr << "[publish] mpp_enc_cfg_init failed\n";
|
|
return false;
|
|
}
|
|
if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) {
|
|
std::cerr << "[publish] MPP_ENC_GET_CFG failed\n";
|
|
return false;
|
|
}
|
|
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:width", width_);
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:height", height_);
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_);
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_);
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_);
|
|
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:gop", gop_);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2);
|
|
mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_);
|
|
|
|
if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) {
|
|
std::cerr << "[publish] MPP_ENC_SET_CFG failed\n";
|
|
return false;
|
|
}
|
|
|
|
RK_S32 timeout = 2000;
|
|
mpi_->control(ctx_, MPP_SET_OUTPUT_TIMEOUT, &timeout);
|
|
|
|
MppPacket hdr = nullptr;
|
|
if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) {
|
|
auto* data = static_cast<uint8_t*>(mpp_packet_get_pos(hdr));
|
|
size_t len = mpp_packet_get_length(hdr);
|
|
header_.assign(data, data + len);
|
|
mpp_packet_deinit(&hdr);
|
|
}
|
|
|
|
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM) != MPP_OK) {
|
|
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL) != MPP_OK) {
|
|
std::cerr << "[publish] mpp_buffer_group_get_internal failed\n";
|
|
return false;
|
|
}
|
|
}
|
|
initialized_ = true;
|
|
return true;
|
|
}
|
|
|
|
void Shutdown() {
|
|
if (frm_grp_) {
|
|
mpp_buffer_group_put(frm_grp_);
|
|
frm_grp_ = nullptr;
|
|
}
|
|
if (cfg_) {
|
|
mpp_enc_cfg_deinit(cfg_);
|
|
cfg_ = nullptr;
|
|
}
|
|
if (ctx_) {
|
|
if (mpi_) mpi_->reset(ctx_);
|
|
mpp_destroy(ctx_);
|
|
ctx_ = nullptr;
|
|
mpi_ = nullptr;
|
|
}
|
|
initialized_ = false;
|
|
}
|
|
|
|
bool Encode(const FramePtr& frame, const std::function<void(const EncodedPacket&)>& on_packet) {
|
|
if (!initialized_) return false;
|
|
if (!frame) return false;
|
|
|
|
MppBuffer buf = nullptr;
|
|
bool imported = false;
|
|
size_t frame_size = CalcFrameSize();
|
|
|
|
if (frame->dma_fd >= 0 && frame->data_size > 0) {
|
|
MppBufferInfo info{};
|
|
info.type = MPP_BUFFER_TYPE_EXT_DMA;
|
|
info.size = frame->data_size;
|
|
info.fd = frame->dma_fd;
|
|
if (mpp_buffer_import(&buf, &info) == MPP_OK) {
|
|
imported = true;
|
|
}
|
|
}
|
|
|
|
if (!buf) {
|
|
if (!frame->data) return false;
|
|
if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) {
|
|
return false;
|
|
}
|
|
if (!CopyToBuffer(frame, buf)) {
|
|
mpp_buffer_put(buf);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
MppFrame mpp_frame = nullptr;
|
|
mpp_frame_init(&mpp_frame);
|
|
mpp_frame_set_width(mpp_frame, width_);
|
|
mpp_frame_set_height(mpp_frame, height_);
|
|
mpp_frame_set_hor_stride(mpp_frame, hor_stride_);
|
|
mpp_frame_set_ver_stride(mpp_frame, ver_stride_);
|
|
mpp_frame_set_fmt(mpp_frame, mpp_fmt_);
|
|
mpp_frame_set_pts(mpp_frame, static_cast<int64_t>(frame->pts));
|
|
mpp_frame_set_buffer(mpp_frame, buf);
|
|
|
|
if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) {
|
|
mpp_frame_deinit(&mpp_frame);
|
|
mpp_buffer_put(buf);
|
|
return false;
|
|
}
|
|
mpp_frame_deinit(&mpp_frame);
|
|
|
|
while (true) {
|
|
MppPacket packet = nullptr;
|
|
MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet);
|
|
if (ret == MPP_ERR_TIMEOUT) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
|
continue;
|
|
}
|
|
if (ret != MPP_OK || !packet) break;
|
|
|
|
EncodedPacket out;
|
|
auto* pos = static_cast<uint8_t*>(mpp_packet_get_pos(packet));
|
|
size_t len = mpp_packet_get_length(packet);
|
|
out.data.assign(pos, pos + len);
|
|
out.key = (mpp_packet_get_flag(packet) & 0x10) != 0; // INTRA flag
|
|
int64_t mpp_pts = mpp_packet_get_pts(packet);
|
|
// Use encoder's pts if valid, otherwise compute from frame count
|
|
out.pts_ms = mpp_pts > 0 ? mpp_pts : static_cast<int64_t>(frame_count_ * 1000 / fps_);
|
|
++frame_count_;
|
|
if (on_packet) on_packet(out);
|
|
mpp_packet_deinit(&packet);
|
|
break;
|
|
}
|
|
|
|
mpp_buffer_put(buf);
|
|
(void)imported;
|
|
return true;
|
|
}
|
|
|
|
const std::vector<uint8_t>& Header() const { return header_; }
|
|
|
|
private:
|
|
size_t CalcFrameSize() const {
|
|
if (mpp_fmt_ == MPP_FMT_YUV420SP || mpp_fmt_ == MPP_FMT_YUV420P) {
|
|
return static_cast<size_t>(hor_stride_) * ver_stride_ * 3 / 2;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
bool CopyToBuffer(const FramePtr& frame, MppBuffer buf) const {
|
|
auto* dst = static_cast<uint8_t*>(mpp_buffer_get_ptr(buf));
|
|
size_t dst_size = mpp_buffer_get_size(buf);
|
|
size_t need = CalcFrameSize();
|
|
if (!dst || dst_size < need) return false;
|
|
std::memset(dst, 0, dst_size);
|
|
|
|
if (frame->plane_count < 2) return false;
|
|
const FramePlane& y = frame->planes[0];
|
|
const FramePlane& uv = frame->planes[1];
|
|
if (!y.data || !uv.data) return false;
|
|
|
|
for (int row = 0; row < height_; ++row) {
|
|
std::memcpy(dst + row * hor_stride_, y.data + row * y.stride,
|
|
std::min(hor_stride_, y.stride));
|
|
}
|
|
|
|
uint8_t* dst_uv = dst + hor_stride_ * ver_stride_;
|
|
int uv_rows = ver_stride_ / 2;
|
|
if (mpp_fmt_ == MPP_FMT_YUV420SP) {
|
|
for (int row = 0; row < uv_rows; ++row) {
|
|
std::memcpy(dst_uv + row * hor_stride_, uv.data + row * uv.stride,
|
|
std::min(hor_stride_, uv.stride));
|
|
}
|
|
} else if (frame->plane_count >= 3) {
|
|
const FramePlane& v = frame->planes[2];
|
|
uint8_t* dst_u = dst_uv;
|
|
uint8_t* dst_v = dst_uv + (hor_stride_ / 2) * uv_rows;
|
|
for (int row = 0; row < uv_rows; ++row) {
|
|
std::memcpy(dst_u + row * (hor_stride_ / 2), uv.data + row * uv.stride,
|
|
std::min(hor_stride_ / 2, uv.stride));
|
|
std::memcpy(dst_v + row * (hor_stride_ / 2), v.data + row * v.stride,
|
|
std::min(hor_stride_ / 2, v.stride));
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
MppCtx ctx_ = nullptr;
|
|
MppApi* mpi_ = nullptr;
|
|
MppEncCfg cfg_ = nullptr;
|
|
MppBufferGroup frm_grp_ = nullptr;
|
|
MppCodingType coding_ = MPP_VIDEO_CodingAVC;
|
|
MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP;
|
|
int width_ = 0;
|
|
int height_ = 0;
|
|
int hor_stride_ = 0;
|
|
int ver_stride_ = 0;
|
|
int fps_ = 25;
|
|
int gop_ = 50;
|
|
int bitrate_bps_ = 4000000;
|
|
std::vector<uint8_t> header_;
|
|
bool initialized_ = false;
|
|
uint64_t frame_count_ = 0;
|
|
};
|
|
#endif
|
|
|
|
class PublishNode : public INode {
|
|
public:
|
|
std::string Id() const override { return id_; }
|
|
std::string Type() const override { return "publish"; }
|
|
|
|
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
|
|
id_ = config.ValueOr<std::string>("id", "publish");
|
|
input_queue_ = ctx.input_queue;
|
|
codec_ = config.ValueOr<std::string>("codec", "h264");
|
|
fps_ = config.ValueOr<int>("fps", 25);
|
|
gop_ = config.ValueOr<int>("gop", 50);
|
|
bitrate_kbps_ = config.ValueOr<int>("bitrate_kbps", 4000);
|
|
use_mpp_ = config.ValueOr<bool>("use_mpp", true);
|
|
use_ffmpeg_mux_ = config.ValueOr<bool>("use_ffmpeg_mux", true);
|
|
|
|
const SimpleJson* outputs = config.Find("outputs");
|
|
if (outputs && outputs->IsArray()) {
|
|
for (const auto& o : outputs->AsArray()) {
|
|
if (!o.IsObject()) continue;
|
|
OutputConfig cfg;
|
|
cfg.proto = o.ValueOr<std::string>("proto", cfg.proto);
|
|
cfg.port = o.ValueOr<int>("port", cfg.port);
|
|
cfg.path = o.ValueOr<std::string>("path", cfg.path);
|
|
cfg.segment_sec = o.ValueOr<int>("segment_sec", cfg.segment_sec);
|
|
outputs_.push_back(std::move(cfg));
|
|
}
|
|
}
|
|
if (outputs_.empty()) {
|
|
outputs_.push_back(OutputConfig{});
|
|
}
|
|
|
|
#if !defined(RK3588_ENABLE_MPP)
|
|
use_mpp_ = false;
|
|
#endif
|
|
#if !defined(RK3588_ENABLE_FFMPEG)
|
|
use_ffmpeg_mux_ = false;
|
|
#endif
|
|
|
|
if (!input_queue_) {
|
|
std::cerr << "[publish] no input queue for node " << id_ << "\n";
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool Start() override {
|
|
if (!input_queue_) return false;
|
|
running_.store(true);
|
|
|
|
#if defined(RK3588_ENABLE_MPP)
|
|
if (use_mpp_) {
|
|
worker_ = std::thread(&PublishNode::LoopMpp, this);
|
|
} else {
|
|
worker_ = std::thread(&PublishNode::LoopStub, this);
|
|
}
|
|
#else
|
|
worker_ = std::thread(&PublishNode::LoopStub, this);
|
|
#endif
|
|
|
|
std::cout << "[publish] start codec=" << codec_ << " fps=" << fps_ << " gop=" << gop_
|
|
<< " bitrate=" << bitrate_kbps_ << "kbps"
|
|
<< (use_mpp_ ? " (mpp venc)" : " (stub)") << "\n";
|
|
return true;
|
|
}
|
|
|
|
void Stop() override {
|
|
running_.store(false);
|
|
if (input_queue_) input_queue_->Stop();
|
|
if (worker_.joinable()) worker_.join();
|
|
}
|
|
|
|
private:
|
|
void LoopStub() {
|
|
using namespace std::chrono;
|
|
FramePtr frame;
|
|
while (running_.load()) {
|
|
if (!input_queue_->Pop(frame, milliseconds(200))) continue;
|
|
++encoded_frames_;
|
|
if (encoded_frames_ % 100 == 0 && frame) {
|
|
std::cout << "[publish] stub frame " << frame->frame_id
|
|
<< " queue=" << input_queue_->Size()
|
|
<< " drops=" << input_queue_->DroppedCount() << "\n";
|
|
}
|
|
}
|
|
}
|
|
|
|
#if defined(RK3588_ENABLE_MPP)
|
|
void LoopMpp() {
|
|
using namespace std::chrono;
|
|
FramePtr frame;
|
|
MppVencEncoder encoder;
|
|
bool encoder_ready = false;
|
|
|
|
#if defined(RK3588_ENABLE_FFMPEG)
|
|
AvMuxerManager mux_mgr;
|
|
#endif
|
|
|
|
while (running_.load()) {
|
|
if (!input_queue_->Pop(frame, milliseconds(200))) continue;
|
|
if (!frame) continue;
|
|
|
|
if (!encoder_ready) {
|
|
if (!encoder.InitFromFrame(*frame, codec_, fps_, gop_, bitrate_kbps_)) {
|
|
std::cerr << "[publish] encoder init failed, fallback to stub" << "\n";
|
|
LoopStub();
|
|
return;
|
|
}
|
|
|
|
#if defined(RK3588_ENABLE_FFMPEG)
|
|
if (use_ffmpeg_mux_) {
|
|
AVCodecID cid = (codec_ == "h265" || codec_ == "hevc") ? AV_CODEC_ID_HEVC
|
|
: AV_CODEC_ID_H264;
|
|
mux_mgr.Init(outputs_, cid, frame->width, frame->height, fps_, encoder.Header());
|
|
}
|
|
#endif
|
|
encoder_ready = true;
|
|
}
|
|
|
|
encoder.Encode(frame, [&](const EncodedPacket& pkt) {
|
|
++encoded_frames_;
|
|
if (encoded_frames_ % 100 == 0) {
|
|
std::cout << "[publish] encoded frame " << encoded_frames_
|
|
<< " queue=" << input_queue_->Size()
|
|
<< " drops=" << input_queue_->DroppedCount() << "\n";
|
|
}
|
|
#if defined(RK3588_ENABLE_FFMPEG)
|
|
if (use_ffmpeg_mux_) mux_mgr.Write(pkt);
|
|
#else
|
|
(void)pkt;
|
|
#endif
|
|
});
|
|
}
|
|
|
|
#if defined(RK3588_ENABLE_FFMPEG)
|
|
mux_mgr.Close();
|
|
#endif
|
|
}
|
|
#endif
|
|
|
|
std::string id_;
|
|
std::string codec_ = "h264";
|
|
int fps_ = 25;
|
|
int gop_ = 50;
|
|
int bitrate_kbps_ = 4000;
|
|
bool use_mpp_ = false;
|
|
bool use_ffmpeg_mux_ = false;
|
|
std::vector<OutputConfig> outputs_;
|
|
std::atomic<bool> running_{false};
|
|
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
|
|
std::thread worker_;
|
|
uint64_t encoded_frames_ = 0;
|
|
};
|
|
|
|
REGISTER_NODE(PublishNode, "publish");
|
|
|
|
} // namespace rk3588
|