OrangePi3588Media/plugins/storage/storage_node.cpp

493 lines
16 KiB
C++

#include <atomic>
#include <chrono>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <sstream>
#include <thread>
#include "node.h"
#if defined(RK3588_ENABLE_FFMPEG)
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/imgutils.h>
#include <libavutil/opt.h>
#include <libswscale/swscale.h>
}
#define HAS_FFMPEG 1
#else
#define HAS_FFMPEG 0
#endif
#if defined(RK3588_ENABLE_MPP)
extern "C" {
#include <rockchip/mpp_buffer.h>
#include <rockchip/mpp_packet.h>
#include <rockchip/rk_mpi.h>
}
#define HAS_MPP 1
#else
#define HAS_MPP 0
#endif
namespace rk3588 {
namespace {
inline int Align16(int v) { return (v + 15) & ~15; }
std::string FormatTime(const std::string& pattern) {
auto now = std::chrono::system_clock::now();
auto time_t_now = std::chrono::system_clock::to_time_t(now);
std::tm* tm = std::localtime(&time_t_now);
std::ostringstream oss;
for (size_t i = 0; i < pattern.size(); ++i) {
if (pattern[i] == '%' && i + 1 < pattern.size()) {
char spec = pattern[++i];
switch (spec) {
case 'Y': oss << std::put_time(tm, "%Y"); break;
case 'm': oss << std::put_time(tm, "%m"); break;
case 'd': oss << std::put_time(tm, "%d"); break;
case 'H': oss << std::put_time(tm, "%H"); break;
case 'M': oss << std::put_time(tm, "%M"); break;
case 'S': oss << std::put_time(tm, "%S"); break;
default: oss << '%' << spec; break;
}
} else {
oss << pattern[i];
}
}
return oss.str();
}
} // namespace
#if HAS_MPP
class MppStorageEncoder {
public:
~MppStorageEncoder() { Shutdown(); }
bool Init(int width, int height, PixelFormat fmt, const std::string& codec,
int fps, int bitrate_kbps) {
width_ = width;
height_ = height;
hor_stride_ = Align16(width);
ver_stride_ = Align16(height);
fps_ = fps > 0 ? fps : 25;
bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 2000) * 1000;
if (fmt == PixelFormat::NV12) {
mpp_fmt_ = MPP_FMT_YUV420SP;
} else if (fmt == PixelFormat::YUV420) {
mpp_fmt_ = MPP_FMT_YUV420P;
} else {
std::cerr << "[storage] unsupported pixel format for MPP encoder\n";
return false;
}
coding_ = (codec == "h265" || codec == "hevc") ? MPP_VIDEO_CodingHEVC
: MPP_VIDEO_CodingAVC;
if (mpp_create(&ctx_, &mpi_) != MPP_OK) return false;
if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) return false;
if (mpp_enc_cfg_init(&cfg_) != MPP_OK) return false;
if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) return false;
mpp_enc_cfg_set_s32(cfg_, "prep:width", width_);
mpp_enc_cfg_set_s32(cfg_, "prep:height", height_);
mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_);
mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_);
mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_);
mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR);
mpp_enc_cfg_set_s32(cfg_, "rc:gop", fps_ * 2);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_);
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2);
mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2);
mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_);
if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) return false;
MppEncHeaderMode header_mode = MPP_ENC_HEADER_MODE_EACH_IDR;
mpi_->control(ctx_, MPP_ENC_SET_HEADER_MODE, &header_mode);
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM, NULL) != MPP_OK) {
mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL, NULL);
}
// Get header
MppPacket hdr = nullptr;
if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) {
auto* data = static_cast<uint8_t*>(mpp_packet_get_pos(hdr));
size_t len = mpp_packet_get_length(hdr);
header_.assign(data, data + len);
mpp_packet_deinit(&hdr);
}
initialized_ = true;
return true;
}
void Shutdown() {
if (frm_grp_) { mpp_buffer_group_put(frm_grp_); frm_grp_ = nullptr; }
if (cfg_) { mpp_enc_cfg_deinit(cfg_); cfg_ = nullptr; }
if (ctx_) {
if (mpi_) mpi_->reset(ctx_);
mpp_destroy(ctx_);
ctx_ = nullptr; mpi_ = nullptr;
}
initialized_ = false;
}
bool Encode(const FramePtr& frame, std::vector<uint8_t>& out, bool& is_key) {
out.clear();
is_key = false;
if (!initialized_ || !frame) return false;
size_t frame_size = static_cast<size_t>(hor_stride_) * ver_stride_ * 3 / 2;
MppBuffer buf = nullptr;
if (frame->dma_fd >= 0 && frame->data_size > 0) {
MppBufferInfo info{};
info.type = MPP_BUFFER_TYPE_EXT_DMA;
info.size = frame->data_size;
info.fd = frame->dma_fd;
mpp_buffer_import(&buf, &info);
}
if (!buf) {
if (!frame->data) return false;
if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) return false;
auto* dst = static_cast<uint8_t*>(mpp_buffer_get_ptr(buf));
std::memset(dst, 0, frame_size);
// Copy Y plane
for (int row = 0; row < height_; ++row) {
const uint8_t* src = frame->planes[0].data
? frame->planes[0].data + row * frame->planes[0].stride
: frame->data + row * frame->width;
std::memcpy(dst + row * hor_stride_, src, width_);
}
// Copy UV plane
uint8_t* dst_uv = dst + hor_stride_ * ver_stride_;
int uv_rows = height_ / 2;
for (int row = 0; row < uv_rows; ++row) {
const uint8_t* src = frame->planes[1].data
? frame->planes[1].data + row * frame->planes[1].stride
: frame->data + width_ * height_ + row * width_;
std::memcpy(dst_uv + row * hor_stride_, src, width_);
}
}
MppFrame mpp_frame = nullptr;
mpp_frame_init(&mpp_frame);
mpp_frame_set_width(mpp_frame, width_);
mpp_frame_set_height(mpp_frame, height_);
mpp_frame_set_hor_stride(mpp_frame, hor_stride_);
mpp_frame_set_ver_stride(mpp_frame, ver_stride_);
mpp_frame_set_fmt(mpp_frame, mpp_fmt_);
mpp_frame_set_pts(mpp_frame, frame->pts);
mpp_frame_set_buffer(mpp_frame, buf);
if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) {
mpp_frame_deinit(&mpp_frame);
mpp_buffer_put(buf);
return false;
}
mpp_frame_deinit(&mpp_frame);
MppPacket packet = nullptr;
while (true) {
MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet);
if (ret == MPP_ERR_TIMEOUT) {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
continue;
}
if (ret != MPP_OK || !packet) break;
auto* pos = static_cast<uint8_t*>(mpp_packet_get_pos(packet));
size_t len = mpp_packet_get_length(packet);
out.assign(pos, pos + len);
RK_U32 flag = mpp_packet_get_flag(packet);
is_key = (flag & 0x08) != 0;
mpp_packet_deinit(&packet);
break;
}
mpp_buffer_put(buf);
return !out.empty();
}
const std::vector<uint8_t>& Header() const { return header_; }
bool IsH265() const { return coding_ == MPP_VIDEO_CodingHEVC; }
private:
MppCtx ctx_ = nullptr;
MppApi* mpi_ = nullptr;
MppEncCfg cfg_ = nullptr;
MppBufferGroup frm_grp_ = nullptr;
MppCodingType coding_ = MPP_VIDEO_CodingAVC;
MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP;
int width_ = 0, height_ = 0;
int hor_stride_ = 0, ver_stride_ = 0;
int fps_ = 25;
int bitrate_bps_ = 2000000;
std::vector<uint8_t> header_;
bool initialized_ = false;
};
#endif
class StorageNode : public INode {
public:
std::string Id() const override { return id_; }
std::string Type() const override { return "storage"; }
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
id_ = config.ValueOr<std::string>("id", "storage");
mode_ = config.ValueOr<std::string>("mode", "continuous");
format_ = config.ValueOr<std::string>("format", "mp4");
codec_ = config.ValueOr<std::string>("codec", "h264");
segment_sec_ = config.ValueOr<int>("segment_sec", 300);
base_path_ = config.ValueOr<std::string>("path", "/rec");
filename_pattern_ = config.ValueOr<std::string>("filename_pattern", "%Y%m%d/%H%M%S");
fps_ = config.ValueOr<int>("fps", 25);
bitrate_kbps_ = config.ValueOr<int>("bitrate_kbps", 2000);
input_queue_ = ctx.input_queue;
if (!input_queue_) {
std::cerr << "[storage] no input queue\n";
return false;
}
try {
std::filesystem::create_directories(base_path_);
} catch (const std::exception& e) {
std::cerr << "[storage] failed to create directory: " << e.what() << "\n";
return false;
}
std::cout << "[storage] initialized, path=" << base_path_
<< " segment=" << segment_sec_ << "s format=" << format_ << "\n";
return true;
}
bool Start() override {
std::cout << "[storage] started\n";
return true;
}
void Stop() override {
if (input_queue_) input_queue_->Stop();
CloseCurrentFile();
std::cout << "[storage] stopped, recorded " << total_frames_ << " frames\n";
}
void Drain() override {
CloseCurrentFile();
}
NodeStatus Process(FramePtr frame) override {
if (!frame) return NodeStatus::DROP;
ProcessFrame(frame);
++total_frames_;
return NodeStatus::OK;
}
private:
void ProcessFrame(FramePtr frame) {
// Check if we need to start a new segment
auto now = std::chrono::steady_clock::now();
bool need_new_segment = false;
if (!file_open_) {
need_new_segment = true;
} else if (segment_sec_ > 0) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
now - segment_start_);
if (elapsed.count() >= segment_sec_) {
need_new_segment = true;
}
}
if (need_new_segment) {
CloseCurrentFile();
OpenNewFile(frame);
}
WriteFrame(frame);
}
void OpenNewFile(FramePtr frame) {
#if HAS_FFMPEG
std::string filename = FormatTime(filename_pattern_) + "." + format_;
current_path_ = base_path_ + "/" + filename;
try {
std::filesystem::path p(current_path_);
if (p.has_parent_path()) {
std::filesystem::create_directories(p.parent_path());
}
} catch (...) {}
const char* fmt_name = format_ == "ts" ? "mpegts" : "mp4";
if (avformat_alloc_output_context2(&fmt_ctx_, nullptr, fmt_name,
current_path_.c_str()) < 0) {
std::cerr << "[storage] failed to create output context\n";
return;
}
AVCodecID codec_id = (codec_ == "h265" || codec_ == "hevc")
? AV_CODEC_ID_HEVC : AV_CODEC_ID_H264;
const AVCodec* encoder = avcodec_find_encoder(codec_id);
stream_ = avformat_new_stream(fmt_ctx_, encoder);
if (!stream_) {
avformat_free_context(fmt_ctx_);
fmt_ctx_ = nullptr;
return;
}
stream_->time_base = AVRational{1, fps_};
stream_->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
stream_->codecpar->codec_id = codec_id;
stream_->codecpar->width = frame->width;
stream_->codecpar->height = frame->height;
stream_->codecpar->format = AV_PIX_FMT_YUV420P;
#if HAS_MPP
if (!mpp_encoder_) {
mpp_encoder_ = std::make_unique<MppStorageEncoder>();
if (!mpp_encoder_->Init(frame->width, frame->height, frame->format,
codec_, fps_, bitrate_kbps_)) {
mpp_encoder_.reset();
}
}
if (mpp_encoder_ && !mpp_encoder_->Header().empty()) {
const auto& hdr = mpp_encoder_->Header();
stream_->codecpar->extradata_size = static_cast<int>(hdr.size());
stream_->codecpar->extradata = static_cast<uint8_t*>(
av_mallocz(hdr.size() + AV_INPUT_BUFFER_PADDING_SIZE));
std::memcpy(stream_->codecpar->extradata, hdr.data(), hdr.size());
}
#endif
if (!(fmt_ctx_->oformat->flags & AVFMT_NOFILE)) {
if (avio_open(&fmt_ctx_->pb, current_path_.c_str(), AVIO_FLAG_WRITE) < 0) {
avformat_free_context(fmt_ctx_);
fmt_ctx_ = nullptr;
return;
}
}
if (avformat_write_header(fmt_ctx_, nullptr) < 0) {
if (fmt_ctx_->pb) avio_closep(&fmt_ctx_->pb);
avformat_free_context(fmt_ctx_);
fmt_ctx_ = nullptr;
return;
}
file_open_ = true;
segment_start_ = std::chrono::steady_clock::now();
segment_frames_ = 0;
std::cout << "[storage] opened: " << current_path_ << "\n";
#else
std::cerr << "[storage] FFmpeg not enabled\n";
#endif
}
void WriteFrame(FramePtr frame) {
#if HAS_FFMPEG && HAS_MPP
if (!file_open_ || !fmt_ctx_ || !mpp_encoder_) return;
std::vector<uint8_t> encoded;
bool is_key = false;
if (!mpp_encoder_->Encode(frame, encoded, is_key)) return;
if (encoded.empty()) return;
AVPacket* pkt = av_packet_alloc();
if (av_new_packet(pkt, static_cast<int>(encoded.size())) < 0) {
av_packet_free(&pkt);
return;
}
std::memcpy(pkt->data, encoded.data(), encoded.size());
pkt->stream_index = stream_->index;
pkt->flags = is_key ? AV_PKT_FLAG_KEY : 0;
pkt->pts = segment_frames_;
pkt->dts = segment_frames_;
pkt->duration = 1;
av_packet_rescale_ts(pkt, AVRational{1, fps_}, stream_->time_base);
av_interleaved_write_frame(fmt_ctx_, pkt);
av_packet_free(&pkt);
++segment_frames_;
#elif HAS_FFMPEG
// Software encoding fallback would go here
(void)frame;
#endif
}
void CloseCurrentFile() {
#if HAS_FFMPEG
if (!file_open_ || !fmt_ctx_) return;
av_write_trailer(fmt_ctx_);
if (!(fmt_ctx_->oformat->flags & AVFMT_NOFILE)) {
avio_closep(&fmt_ctx_->pb);
}
avformat_free_context(fmt_ctx_);
fmt_ctx_ = nullptr;
stream_ = nullptr;
file_open_ = false;
std::cout << "[storage] closed: " << current_path_
<< " (" << segment_frames_ << " frames)\n";
#endif
}
std::string id_;
std::string mode_;
std::string format_;
std::string codec_;
int segment_sec_ = 300;
std::string base_path_;
std::string filename_pattern_;
int fps_ = 25;
int bitrate_kbps_ = 2000;
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
uint64_t total_frames_ = 0;
// Current file state
bool file_open_ = false;
std::string current_path_;
std::chrono::steady_clock::time_point segment_start_;
int64_t segment_frames_ = 0;
#if HAS_FFMPEG
AVFormatContext* fmt_ctx_ = nullptr;
AVStream* stream_ = nullptr;
#endif
#if HAS_MPP
std::unique_ptr<MppStorageEncoder> mpp_encoder_;
#endif
};
REGISTER_NODE(StorageNode, "storage");
} // namespace rk3588