OrangePi3588Media/plugins/storage/storage_node.cpp
2026-01-16 21:18:18 +08:00

821 lines
29 KiB
C++

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <memory>
#include <sstream>
#include <thread>
#include <mutex>
#include "utils/logger.h"
#include "node.h"
#include "hw/hw_factory.h"
#include "media/encoded_video_meta.h"
#if defined(RK3588_ENABLE_FFMPEG)
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
#include <libswscale/swscale.h>
}
#define HAS_FFMPEG 1
#else
#define HAS_FFMPEG 0
#endif
#if defined(RK3588_ENABLE_MPP)
extern "C" {
#include <rockchip/mpp_buffer.h>
#include <rockchip/mpp_packet.h>
#include <rockchip/rk_mpi.h>
}
#define HAS_MPP 1
#else
#define HAS_MPP 0
#endif
namespace rk3588 {
namespace {
/// Rounds `v` up to the nearest multiple of 16; already-aligned values are returned unchanged.
inline int Align16(int v) {
    constexpr int kLowBits = 16 - 1;
    return (v + kLowBits) & ~kLowBits;
}
/// Returns the EncodedVideoFrameMeta attached to `frame`, or nullptr when there is none
/// or it is not usable.
///
/// "Usable" means: the magic value matches, codec info is present, the packet carries
/// payload bytes and its pts_ms is strictly positive.
std::shared_ptr<EncodedVideoFrameMeta> TryGetEncodedMeta(const FramePtr& frame) {
    if (!frame) return nullptr;
    if (!frame->user_meta) return nullptr;

    // The cast is unchecked; the magic comparison below is the only type check done here.
    auto candidate = std::static_pointer_cast<EncodedVideoFrameMeta>(frame->user_meta);
    if (!candidate) return nullptr;
    if (candidate->magic != EncodedVideoFrameMeta::kMagic) return nullptr;

    if (!candidate->codec) return nullptr;
    if (candidate->pkt.data.empty()) return nullptr;
    if (candidate->pkt.pts_ms <= 0) return nullptr;
    return candidate;
}
#if HAS_FFMPEG
/// Reports whether d[0..n) contains an Annex B start code (00 00 01, optionally preceded by
/// one extra 00) that is followed by at least one more byte.
static bool HasAnnexBStartCode(const uint8_t* d, size_t n) {
    if (d == nullptr || n < 4) return false;
    // Every 4-byte start code embeds the 3-byte one, so looking for 00 00 01 is sufficient.
    // The final byte is left out of the search window so a hit always has a byte after it.
    static const uint8_t kPrefix[3] = {0x00, 0x00, 0x01};
    const uint8_t* const window_end = d + (n - 1);
    return std::search(d, window_end, kPrefix, kPrefix + 3) != window_end;
}
/// Finds the first Annex B start code in d[0..n) at or after index `from`.
///
/// A start code only counts when at least one byte follows it.
/// @param sc_len  optional; receives 3 or 4 (length of the start code), or 0 when none is found.
/// @return index of the first byte of the start code, or `n` when none is found.
static size_t FindStartCode(const uint8_t* d, size_t n, size_t from, size_t* sc_len) {
    size_t found_at = n;
    size_t found_len = 0;
    for (size_t idx = from; idx + 3 < n; ++idx) {
        // Both kinds of start code begin with two zero bytes.
        if (d[idx] != 0 || d[idx + 1] != 0) continue;
        if (d[idx + 2] == 1) {
            found_at = idx;
            found_len = 3;
            break;
        }
        if (d[idx + 2] == 0 && idx + 4 < n && d[idx + 3] == 1) {
            found_at = idx;
            found_len = 4;
            break;
        }
    }
    if (sc_len != nullptr) *sc_len = found_len;
    return found_at;
}
/// Splits an Annex B byte stream into its NAL unit payloads (start codes removed).
///
/// Zero bytes directly in front of the next start code (or at the end of the buffer) are
/// trimmed from each payload; payloads that end up empty are skipped.
static std::vector<std::vector<uint8_t>> SplitAnnexBNals(const uint8_t* d, size_t n) {
    std::vector<std::vector<uint8_t>> nal_units;
    if (d == nullptr || n < 4) return nal_units;

    size_t cursor = 0;
    for (;;) {
        size_t prefix_len = 0;
        const size_t prefix_at = FindStartCode(d, n, cursor, &prefix_len);
        if (prefix_at >= n) break;  // no further start code

        const size_t payload_begin = prefix_at + prefix_len;
        // The payload runs up to the next start code, or to the end of the buffer.
        size_t payload_end = std::min(FindStartCode(d, n, payload_begin, nullptr), n);
        for (; payload_end > payload_begin && d[payload_end - 1] == 0; --payload_end) {
        }
        if (payload_end > payload_begin) {
            nal_units.emplace_back(d + payload_begin, d + payload_end);
        }
        cursor = payload_end;
    }
    return nal_units;
}
/// Builds an AVCDecoderConfigurationRecord ("avcC") from Annex B H.264 parameter sets.
///
/// Uses the first SPS (NAL type 7) and the first PPS (NAL type 8) found in `annexb`.
/// @param annexb    Annex B bytes expected to contain at least one SPS and one PPS.
/// @param avcc_out  receives the record; always cleared first and left empty on failure.
/// @return false when the input is empty, an SPS/PPS is missing, the SPS is shorter than
///         4 bytes, or a parameter set does not fit the record's 16-bit length field.
static bool BuildAvccFromAnnexB(const std::vector<uint8_t>& annexb, std::vector<uint8_t>& avcc_out) {
    avcc_out.clear();
    if (annexb.empty()) return false;

    const auto nals = SplitAnnexBNals(annexb.data(), annexb.size());
    const std::vector<uint8_t>* sps = nullptr;
    const std::vector<uint8_t>* pps = nullptr;
    for (const auto& nal : nals) {
        if (nal.empty()) continue;
        const uint8_t nal_type = nal[0] & 0x1F;
        if (nal_type == 7 && !sps) {
            sps = &nal;
        } else if (nal_type == 8 && !pps) {
            pps = &nal;
        }
    }
    // The SPS must at least hold its header byte plus profile / compat / level.
    if (!sps || sps->size() < 4 || !pps || pps->empty()) return false;

    // Lengths are stored as 16-bit big-endian values; anything larger would be silently
    // truncated and yield a corrupt record, so refuse it instead.
    constexpr size_t kMaxParamSetLen = 0xFFFF;
    if (sps->size() > kMaxParamSetLen || pps->size() > kMaxParamSetLen) return false;

    const auto push_be16 = [&avcc_out](size_t value) {
        avcc_out.push_back(static_cast<uint8_t>((value >> 8) & 0xFF));
        avcc_out.push_back(static_cast<uint8_t>(value & 0xFF));
    };

    avcc_out.reserve(11 + sps->size() + pps->size());
    avcc_out.push_back(1);          // configurationVersion
    avcc_out.push_back((*sps)[1]);  // profile byte, copied from the SPS
    avcc_out.push_back((*sps)[2]);  // profile-compatibility byte
    avcc_out.push_back((*sps)[3]);  // level byte
    avcc_out.push_back(0xFF);       // reserved bits + lengthSizeMinusOne = 3 (4-byte NAL lengths)
    avcc_out.push_back(0xE1);       // reserved bits + number of SPS = 1
    push_be16(sps->size());
    avcc_out.insert(avcc_out.end(), sps->begin(), sps->end());
    avcc_out.push_back(1);          // number of PPS
    push_be16(pps->size());
    avcc_out.insert(avcc_out.end(), pps->begin(), pps->end());
    return true;
}
/// Re-packs an Annex B byte stream so that each NAL unit is preceded by its size as a
/// 4-byte big-endian integer instead of a start code. Returns an empty vector when the
/// input contains no NAL unit.
static std::vector<uint8_t> AnnexBToLengthPrefixed(const uint8_t* d, size_t n) {
    const auto units = SplitAnnexBNals(d, n);

    size_t bytes_needed = 0;
    for (const auto& unit : units) bytes_needed += unit.size() + 4;

    std::vector<uint8_t> packed;
    packed.reserve(bytes_needed);
    for (const auto& unit : units) {
        const uint32_t unit_len = static_cast<uint32_t>(unit.size());
        // Most significant byte first.
        for (int shift = 24; shift >= 0; shift -= 8) {
            packed.push_back(static_cast<uint8_t>((unit_len >> shift) & 0xFF));
        }
        packed.insert(packed.end(), unit.begin(), unit.end());
    }
    return packed;
}
#endif
/// Converts `t` to broken-down local time in `out` without relying on the shared buffer of
/// std::localtime where a reentrant variant is available.
/// @return true on success; `out` is only meaningful when true is returned.
bool SafeLocalTime(std::time_t t, std::tm& out) {
#if defined(_WIN32)
    const auto err = localtime_s(&out, &t);
    return err == 0;
#elif defined(__unix__) || defined(__APPLE__)
    const std::tm* const filled = localtime_r(&t, &out);
    return filled != nullptr;
#else
    // Fallback: std::localtime returns a pointer to shared storage, so serialise callers
    // of this function and copy the result out while holding the lock.
    static std::mutex localtime_guard;
    const std::lock_guard<std::mutex> hold(localtime_guard);
    if (const std::tm* shared = std::localtime(&t)) {
        out = *shared;
        return true;
    }
    return false;
#endif
}
/// Expands the current local time into `pattern`.
///
/// Recognised specifiers are %Y %m %d %H %M %S; any other "%x" pair and a trailing '%'
/// are copied through unchanged. If the local-time conversion fails a warning is logged
/// and a zero-initialised std::tm is formatted instead.
std::string FormatTime(const std::string& pattern) {
    const std::time_t wall_clock =
        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::tm broken_down{};
    if (!SafeLocalTime(wall_clock, broken_down)) {
        LogWarn("[storage] localtime failed while formatting filename");
    }

    std::ostringstream rendered;
    const size_t len = pattern.size();
    size_t pos = 0;
    while (pos < len) {
        const char ch = pattern[pos++];
        // Plain character, or a '%' with nothing after it: emit as-is.
        if (ch != '%' || pos >= len) {
            rendered << ch;
            continue;
        }
        const char spec = pattern[pos++];
        switch (spec) {
            case 'Y':
            case 'm':
            case 'd':
            case 'H':
            case 'M':
            case 'S': {
                const char one_field[3] = {'%', spec, '\0'};
                rendered << std::put_time(&broken_down, one_field);
                break;
            }
            default:
                rendered << '%' << spec;
                break;
        }
    }
    return rendered.str();
}
} // namespace
#if HAS_MPP
class MppStorageEncoder {
public:
~MppStorageEncoder() { Shutdown(); }
bool Init(int width, int height, PixelFormat fmt, const std::string& codec,
int fps, int bitrate_kbps) {
width_ = width;
height_ = height;
hor_stride_ = Align16(width);
ver_stride_ = Align16(height);
fps_ = fps > 0 ? fps : 25;
bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 2000) * 1000;
if (fmt == PixelFormat::NV12) {
mpp_fmt_ = MPP_FMT_YUV420SP;
} else if (fmt == PixelFormat::YUV420) {
mpp_fmt_ = MPP_FMT_YUV420P;
} else {
LogError("[storage] unsupported pixel format for MPP encoder");
return false;
}
coding_ = (codec == "h265" || codec == "hevc") ? MPP_VIDEO_CodingHEVC
: MPP_VIDEO_CodingAVC;
if (mpp_create(&ctx_, &mpi_) != MPP_OK) return false;
if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) return false;
if (mpp_enc_cfg_init(&cfg_) != MPP_OK) return false;
if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) return false;
mpp_enc_cfg_set_s32(cfg_, "prep:width", width_);
mpp_enc_cfg_set_s32(cfg_, "prep:height", height_);
mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_);
mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_);
mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_);
mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR);
mpp_enc_cfg_set_s32(cfg_, "rc:gop", fps_ * 2);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2);
mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_);
if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) return false;
MppEncHeaderMode header_mode = MPP_ENC_HEADER_MODE_EACH_IDR;
mpi_->control(ctx_, MPP_ENC_SET_HEADER_MODE, &header_mode);
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM, NULL) != MPP_OK) {
mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL, NULL);
}
// Get header
MppPacket hdr = nullptr;
if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) {
auto* data = static_cast<uint8_t*>(mpp_packet_get_pos(hdr));
size_t len = mpp_packet_get_length(hdr);
header_.assign(data, data + len);
mpp_packet_deinit(&hdr);
}
initialized_ = true;
return true;
}
void Shutdown() {
if (frm_grp_) { mpp_buffer_group_put(frm_grp_); frm_grp_ = nullptr; }
if (cfg_) { mpp_enc_cfg_deinit(cfg_); cfg_ = nullptr; }
if (ctx_) {
if (mpi_) mpi_->reset(ctx_);
mpp_destroy(ctx_);
ctx_ = nullptr; mpi_ = nullptr;
}
initialized_ = false;
}
bool Encode(const FramePtr& frame, std::vector<uint8_t>& out, bool& is_key) {
out.clear();
is_key = false;
if (!initialized_ || !frame) return false;
size_t frame_size = static_cast<size_t>(hor_stride_) * ver_stride_ * 3 / 2;
MppBuffer buf = nullptr;
if (frame->DmaFd() >= 0 && frame->data_size > 0) {
MppBufferInfo info{};
info.type = MPP_BUFFER_TYPE_EXT_DMA;
info.size = frame->data_size;
info.fd = frame->DmaFd();
mpp_buffer_import(&buf, &info);
}
if (!buf) {
if (!frame->data) return false;
if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) return false;
auto* dst = static_cast<uint8_t*>(mpp_buffer_get_ptr(buf));
std::memset(dst, 0, frame_size);
// Copy Y plane
for (int row = 0; row < height_; ++row) {
const uint8_t* src = frame->planes[0].data
? frame->planes[0].data + row * frame->planes[0].stride
: frame->data + row * frame->width;
std::memcpy(dst + row * hor_stride_, src, width_);
}
// Copy UV plane
uint8_t* dst_uv = dst + hor_stride_ * ver_stride_;
int uv_rows = height_ / 2;
for (int row = 0; row < uv_rows; ++row) {
const uint8_t* src = frame->planes[1].data
? frame->planes[1].data + row * frame->planes[1].stride
: frame->data + width_ * height_ + row * width_;
std::memcpy(dst_uv + row * hor_stride_, src, width_);
}
}
MppFrame mpp_frame = nullptr;
mpp_frame_init(&mpp_frame);
mpp_frame_set_width(mpp_frame, width_);
mpp_frame_set_height(mpp_frame, height_);
mpp_frame_set_hor_stride(mpp_frame, hor_stride_);
mpp_frame_set_ver_stride(mpp_frame, ver_stride_);
mpp_frame_set_fmt(mpp_frame, mpp_fmt_);
mpp_frame_set_pts(mpp_frame, frame->pts);
mpp_frame_set_buffer(mpp_frame, buf);
if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) {
mpp_frame_deinit(&mpp_frame);
mpp_buffer_put(buf);
return false;
}
mpp_frame_deinit(&mpp_frame);
MppPacket packet = nullptr;
while (true) {
MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet);
if (ret == MPP_ERR_TIMEOUT) {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
continue;
}
if (ret != MPP_OK || !packet) break;
auto* pos = static_cast<uint8_t*>(mpp_packet_get_pos(packet));
size_t len = mpp_packet_get_length(packet);
out.assign(pos, pos + len);
RK_U32 flag = mpp_packet_get_flag(packet);
is_key = (flag & 0x08) != 0;
mpp_packet_deinit(&packet);
break;
}
mpp_buffer_put(buf);
return !out.empty();
}
const std::vector<uint8_t>& Header() const { return header_; }
bool IsH265() const { return coding_ == MPP_VIDEO_CodingHEVC; }
private:
MppCtx ctx_ = nullptr;
MppApi* mpi_ = nullptr;
MppEncCfg cfg_ = nullptr;
MppBufferGroup frm_grp_ = nullptr;
MppCodingType coding_ = MPP_VIDEO_CodingAVC;
MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP;
int width_ = 0, height_ = 0;
int hor_stride_ = 0, ver_stride_ = 0;
int fps_ = 25;
int bitrate_bps_ = 2000000;
std::vector<uint8_t> header_;
bool initialized_ = false;
};
#endif
class StorageNode : public INode {
public:
std::string Id() const override { return id_; }
std::string Type() const override { return "storage"; }
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
id_ = config.ValueOr<std::string>("id", "storage");
mode_ = config.ValueOr<std::string>("mode", "continuous");
format_ = config.ValueOr<std::string>("format", "mp4");
codec_ = config.ValueOr<std::string>("codec", "h264");
segment_sec_ = config.ValueOr<int>("segment_sec", 300);
base_path_ = config.ValueOr<std::string>("path", "/rec");
filename_pattern_ = config.ValueOr<std::string>("filename_pattern", "%Y%m%d/%H%M%S");
fps_ = config.ValueOr<int>("fps", 25);
bitrate_kbps_ = config.ValueOr<int>("bitrate_kbps", 2000);
reuse_encoded_meta_ = config.ValueOr<bool>("reuse_encoded_meta", true);
platform_ = config.ValueOr<std::string>("platform", "");
hw_platform_ = config.ValueOr<std::string>("hw_platform", "");
input_queue_ = ctx.input_queue;
if (!input_queue_) {
LogError("[storage] no input queue");
return false;
}
try {
std::filesystem::create_directories(base_path_);
} catch (const std::exception& e) {
LogError(std::string("[storage] failed to create directory: ") + e.what());
return false;
}
LogInfo("[storage] initialized, path=" + base_path_ +
" segment=" + std::to_string(segment_sec_) +
"s format=" + format_);
return true;
}
bool Start() override {
LogInfo("[storage] started");
return true;
}
void Stop() override {
if (input_queue_) input_queue_->Stop();
CloseCurrentFile();
if (encoder_) encoder_->Close();
encoder_.reset();
encoder_ready_ = false;
encoder_header_.clear();
LogInfo("[storage] stopped, recorded " + std::to_string(total_frames_) + " frames");
}
void Drain() override {
std::lock_guard<std::mutex> lock(mu_);
CloseCurrentFile();
}
bool UpdateConfig(const SimpleJson& new_config) override {
const std::string new_id = new_config.ValueOr<std::string>("id", id_);
if (!new_id.empty() && new_id != id_) return false;
// We only support a safe subset of updates; unsupported changes trigger graph rebuild.
const std::string new_mode = new_config.ValueOr<std::string>("mode", mode_);
const std::string new_format = new_config.ValueOr<std::string>("format", format_);
const std::string new_codec = new_config.ValueOr<std::string>("codec", codec_);
if (new_mode != mode_ || new_format != format_ || new_codec != codec_) {
return false;
}
int new_segment = new_config.ValueOr<int>("segment_sec", segment_sec_);
std::string new_path = new_config.ValueOr<std::string>("path", base_path_);
std::string new_pattern = new_config.ValueOr<std::string>("filename_pattern", filename_pattern_);
int new_fps = new_config.ValueOr<int>("fps", fps_);
int new_bitrate = new_config.ValueOr<int>("bitrate_kbps", bitrate_kbps_);
{
std::lock_guard<std::mutex> lock(mu_);
segment_sec_ = new_segment;
base_path_ = std::move(new_path);
filename_pattern_ = std::move(new_pattern);
fps_ = new_fps;
bitrate_kbps_ = new_bitrate;
try {
std::filesystem::create_directories(base_path_);
} catch (const std::exception& e) {
LogWarn(std::string("[storage] create_directories failed during UpdateConfig: ") + e.what());
return false;
} catch (...) {
LogWarn("[storage] create_directories failed during UpdateConfig: unknown error");
return false;
}
// Apply on next segment.
CloseCurrentFile();
if (encoder_) encoder_->Close();
encoder_.reset();
encoder_ready_ = false;
encoder_header_.clear();
}
return true;
}
NodeStatus Process(FramePtr frame) override {
if (!frame) return NodeStatus::DROP;
std::lock_guard<std::mutex> lock(mu_);
ProcessFrame(frame);
++total_frames_;
return NodeStatus::OK;
}
private:
static std::string PixelFormatToString(PixelFormat fmt) {
switch (fmt) {
case PixelFormat::NV12:
return "nv12";
case PixelFormat::YUV420:
return "yuv420";
case PixelFormat::RGB:
return "rgb";
case PixelFormat::BGR:
return "bgr";
default:
return "unknown";
}
}
void ProcessFrame(FramePtr frame) {
// Check if we need to start a new segment
auto now = std::chrono::steady_clock::now();
bool need_new_segment = false;
if (!file_open_) {
need_new_segment = true;
} else if (segment_sec_ > 0) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
now - segment_start_);
if (elapsed.count() >= segment_sec_) {
need_new_segment = true;
}
}
if (need_new_segment) {
CloseCurrentFile();
OpenNewFile(frame);
}
WriteFrame(frame);
}
void OpenNewFile(FramePtr frame) {
#if HAS_FFMPEG
auto meta = reuse_encoded_meta_ ? TryGetEncodedMeta(frame) : nullptr;
using_encoded_meta_ = (meta != nullptr);
segment_base_pts_ms_ = using_encoded_meta_ ? meta->pkt.pts_ms : 0;
last_pts90k_ = -1;
std::string filename = FormatTime(filename_pattern_) + "." + format_;
current_path_ = base_path_ + "/" + filename;
try {
std::filesystem::path p(current_path_);
if (p.has_parent_path()) {
std::filesystem::create_directories(p.parent_path());
}
} catch (const std::exception& e) {
LogWarn(std::string("[storage] failed to create directories for segment: ") + e.what());
} catch (...) {
LogWarn("[storage] failed to create directories for segment: unknown error");
}
const char* fmt_name = format_ == "ts" ? "mpegts" : "mp4";
if (avformat_alloc_output_context2(&fmt_ctx_, nullptr, fmt_name,
current_path_.c_str()) < 0) {
LogError("[storage] failed to create output context");
return;
}
AVCodecID codec_id = AV_CODEC_ID_H264;
if (using_encoded_meta_ && meta && meta->codec) {
codec_id = (meta->codec->codec == VideoCodec::H265) ? AV_CODEC_ID_HEVC : AV_CODEC_ID_H264;
} else {
codec_id = (codec_ == "h265" || codec_ == "hevc") ? AV_CODEC_ID_HEVC : AV_CODEC_ID_H264;
}
stream_ = avformat_new_stream(fmt_ctx_, nullptr);
if (!stream_) {
avformat_free_context(fmt_ctx_);
fmt_ctx_ = nullptr;
return;
}
if (using_encoded_meta_) {
stream_->time_base = AVRational{1, 90000};
const int fps_eff = (meta && meta->codec && meta->codec->fps > 0) ? meta->codec->fps : fps_;
stream_->avg_frame_rate = AVRational{std::max(1, fps_eff), 1};
stream_->r_frame_rate = stream_->avg_frame_rate;
} else {
stream_->time_base = AVRational{1, fps_};
}
stream_->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
stream_->codecpar->codec_id = codec_id;
stream_->codecpar->width = (using_encoded_meta_ && meta && meta->codec && meta->codec->width > 0)
? meta->codec->width
: frame->width;
stream_->codecpar->height = (using_encoded_meta_ && meta && meta->codec && meta->codec->height > 0)
? meta->codec->height
: frame->height;
stream_->codecpar->format = AV_PIX_FMT_YUV420P;
if (!using_encoded_meta_) {
if (!encoder_ready_) {
SimpleJson::Object enc_obj;
enc_obj["backend"] = SimpleJson(std::string("mpp"));
enc_obj["codec"] = SimpleJson(codec_);
enc_obj["fps"] = SimpleJson(static_cast<double>(fps_));
enc_obj["gop"] = SimpleJson(static_cast<double>(std::max(1, fps_) * 2));
enc_obj["bitrate_kbps"] = SimpleJson(static_cast<double>(bitrate_kbps_));
enc_obj["width"] = SimpleJson(static_cast<double>(frame->width));
enc_obj["height"] = SimpleJson(static_cast<double>(frame->height));
enc_obj["pixel_format"] = SimpleJson(PixelFormatToString(frame->format));
if (!platform_.empty()) {
enc_obj["platform"] = SimpleJson(platform_);
}
if (!hw_platform_.empty()) {
enc_obj["hw_platform"] = SimpleJson(hw_platform_);
}
SimpleJson enc_cfg(std::move(enc_obj));
if (!encoder_) {
encoder_ = HwFactory::CreateEncoder(enc_cfg);
}
if (!encoder_ || encoder_->Open(enc_cfg).Failed()) {
encoder_.reset();
} else {
encoder_header_ = encoder_->ExtraData();
encoder_ready_ = true;
}
}
if (encoder_ready_ && !encoder_header_.empty()) {
stream_->codecpar->extradata_size = static_cast<int>(encoder_header_.size());
stream_->codecpar->extradata = static_cast<uint8_t*>(
av_mallocz(encoder_header_.size() + AV_INPUT_BUFFER_PADDING_SIZE));
std::memcpy(stream_->codecpar->extradata, encoder_header_.data(), encoder_header_.size());
}
}
if (using_encoded_meta_ && meta && meta->codec && !meta->codec->extradata.empty()) {
std::vector<uint8_t> ex = meta->codec->extradata;
const bool is_mp4 = (format_ != "ts");
// MP4 prefers AVCC/HVCC. Do a best-effort conversion for H264 AnnexB extradata.
if (is_mp4 && codec_id == AV_CODEC_ID_H264) {
if (!(ex.size() >= 1 && ex[0] == 1) && HasAnnexBStartCode(ex.data(), ex.size())) {
std::vector<uint8_t> avcc;
if (BuildAvccFromAnnexB(ex, avcc)) {
ex = std::move(avcc);
}
}
}
stream_->codecpar->extradata_size = static_cast<int>(ex.size());
stream_->codecpar->extradata = static_cast<uint8_t*>(
av_mallocz(ex.size() + AV_INPUT_BUFFER_PADDING_SIZE));
std::memcpy(stream_->codecpar->extradata, ex.data(), ex.size());
}
if (!(fmt_ctx_->oformat->flags & AVFMT_NOFILE)) {
if (avio_open(&fmt_ctx_->pb, current_path_.c_str(), AVIO_FLAG_WRITE) < 0) {
avformat_free_context(fmt_ctx_);
fmt_ctx_ = nullptr;
return;
}
}
if (avformat_write_header(fmt_ctx_, nullptr) < 0) {
if (fmt_ctx_->pb) avio_closep(&fmt_ctx_->pb);
avformat_free_context(fmt_ctx_);
fmt_ctx_ = nullptr;
return;
}
file_open_ = true;
segment_start_ = std::chrono::steady_clock::now();
segment_frames_ = 0;
LogInfo("[storage] opened: " + current_path_);
#else
LogError("[storage] FFmpeg not enabled");
#endif
}
void WriteFrame(FramePtr frame) {
#if HAS_FFMPEG
if (!file_open_ || !fmt_ctx_ || !stream_) return;
if (using_encoded_meta_) {
auto meta = TryGetEncodedMeta(frame);
if (!meta || !meta->codec) return;
const bool is_mp4 = (format_ != "ts");
std::vector<uint8_t> sample = meta->pkt.data;
if (is_mp4 && stream_->codecpar->codec_id == AV_CODEC_ID_H264 &&
HasAnnexBStartCode(sample.data(), sample.size())) {
sample = AnnexBToLengthPrefixed(sample.data(), sample.size());
}
if (sample.empty()) return;
const int fps_eff = (meta->codec->fps > 0) ? meta->codec->fps : fps_;
const int64_t frame_dur = fps_eff > 0 ? (90000 / std::max(1, fps_eff)) : 0;
int64_t rel_ms = meta->pkt.pts_ms - segment_base_pts_ms_;
if (rel_ms < 0) rel_ms = 0;
int64_t pts90k = rel_ms * 90;
if (last_pts90k_ >= 0 && pts90k <= last_pts90k_) {
pts90k = last_pts90k_ + (frame_dur > 0 ? frame_dur : 1);
}
last_pts90k_ = pts90k;
AVPacket* pkt = av_packet_alloc();
if (!pkt) return;
if (av_new_packet(pkt, static_cast<int>(sample.size())) < 0) {
av_packet_free(&pkt);
return;
}
std::memcpy(pkt->data, sample.data(), sample.size());
pkt->stream_index = stream_->index;
pkt->flags = meta->pkt.key ? AV_PKT_FLAG_KEY : 0;
pkt->pts = pts90k;
pkt->dts = pts90k;
pkt->duration = frame_dur;
(void)av_interleaved_write_frame(fmt_ctx_, pkt);
av_packet_free(&pkt);
++segment_frames_;
return;
}
#endif
#if HAS_FFMPEG
if (!file_open_ || !fmt_ctx_ || !encoder_ || !encoder_ready_) return;
if (encoder_->Send(frame).Failed()) return;
while (true) {
auto out = encoder_->Receive();
if (!out.Ok()) break;
auto pkt_out = std::move(out.Value());
if (pkt_out.data.empty()) continue;
AVPacket* pkt = av_packet_alloc();
if (av_new_packet(pkt, static_cast<int>(pkt_out.data.size())) < 0) {
av_packet_free(&pkt);
continue;
}
std::memcpy(pkt->data, pkt_out.data.data(), pkt_out.data.size());
pkt->stream_index = stream_->index;
pkt->flags = pkt_out.keyframe ? AV_PKT_FLAG_KEY : 0;
pkt->pts = segment_frames_;
pkt->dts = segment_frames_;
pkt->duration = 1;
av_packet_rescale_ts(pkt, AVRational{1, fps_}, stream_->time_base);
av_interleaved_write_frame(fmt_ctx_, pkt);
av_packet_free(&pkt);
++segment_frames_;
}
#else
(void)frame;
#endif
}
void CloseCurrentFile() {
#if HAS_FFMPEG
if (!file_open_ || !fmt_ctx_) return;
av_write_trailer(fmt_ctx_);
if (!(fmt_ctx_->oformat->flags & AVFMT_NOFILE)) {
avio_closep(&fmt_ctx_->pb);
}
avformat_free_context(fmt_ctx_);
fmt_ctx_ = nullptr;
stream_ = nullptr;
file_open_ = false;
using_encoded_meta_ = false;
segment_base_pts_ms_ = 0;
last_pts90k_ = -1;
LogInfo("[storage] closed: " + current_path_ + " (" + std::to_string(segment_frames_) + " frames)");
#endif
}
std::string id_;
std::string mode_;
std::string format_;
std::string codec_;
int segment_sec_ = 300;
std::string base_path_;
std::string filename_pattern_;
int fps_ = 25;
int bitrate_kbps_ = 2000;
bool reuse_encoded_meta_ = true;
std::string platform_;
std::string hw_platform_;
std::mutex mu_;
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
uint64_t total_frames_ = 0;
// Current file state
bool file_open_ = false;
std::string current_path_;
std::chrono::steady_clock::time_point segment_start_;
int64_t segment_frames_ = 0;
bool using_encoded_meta_ = false;
int64_t segment_base_pts_ms_ = 0;
int64_t last_pts90k_ = -1;
#if HAS_FFMPEG
AVFormatContext* fmt_ctx_ = nullptr;
AVStream* stream_ = nullptr;
#endif
std::shared_ptr<IEncoder> encoder_;
bool encoder_ready_ = false;
std::vector<uint8_t> encoder_header_;
};
// Registers StorageNode under the node type name "storage" (the same string Type() returns).
// REGISTER_NODE itself is not defined in this file.
REGISTER_NODE(StorageNode, "storage");
} // namespace rk3588