503 lines
16 KiB
C++
503 lines
16 KiB
C++
#include <atomic>
|
|
#include <chrono>
|
|
#include <ctime>
|
|
#include <filesystem>
|
|
#include <fstream>
|
|
#include <iomanip>
|
|
#include <iostream>
|
|
#include <memory>
|
|
#include <sstream>
|
|
#include <thread>
|
|
|
|
#include "node.h"
|
|
|
|
#if defined(RK3588_ENABLE_FFMPEG)
|
|
extern "C" {
|
|
#include <libavcodec/avcodec.h>
|
|
#include <libavformat/avformat.h>
|
|
#include <libavutil/imgutils.h>
|
|
#include <libavutil/opt.h>
|
|
#include <libswscale/swscale.h>
|
|
}
|
|
#define HAS_FFMPEG 1
|
|
#else
|
|
#define HAS_FFMPEG 0
|
|
#endif
|
|
|
|
#if defined(RK3588_ENABLE_MPP)
|
|
extern "C" {
|
|
#include <rockchip/mpp_buffer.h>
|
|
#include <rockchip/mpp_packet.h>
|
|
#include <rockchip/rk_mpi.h>
|
|
}
|
|
#define HAS_MPP 1
|
|
#else
|
|
#define HAS_MPP 0
|
|
#endif
|
|
|
|
namespace rk3588 {
|
|
|
|
namespace {
|
|
inline int Align16(int v) { return (v + 15) & ~15; }
|
|
|
|
std::string FormatTime(const std::string& pattern) {
|
|
auto now = std::chrono::system_clock::now();
|
|
auto time_t_now = std::chrono::system_clock::to_time_t(now);
|
|
std::tm* tm = std::localtime(&time_t_now);
|
|
|
|
std::ostringstream oss;
|
|
for (size_t i = 0; i < pattern.size(); ++i) {
|
|
if (pattern[i] == '%' && i + 1 < pattern.size()) {
|
|
char spec = pattern[++i];
|
|
switch (spec) {
|
|
case 'Y': oss << std::put_time(tm, "%Y"); break;
|
|
case 'm': oss << std::put_time(tm, "%m"); break;
|
|
case 'd': oss << std::put_time(tm, "%d"); break;
|
|
case 'H': oss << std::put_time(tm, "%H"); break;
|
|
case 'M': oss << std::put_time(tm, "%M"); break;
|
|
case 'S': oss << std::put_time(tm, "%S"); break;
|
|
default: oss << '%' << spec; break;
|
|
}
|
|
} else {
|
|
oss << pattern[i];
|
|
}
|
|
}
|
|
return oss.str();
|
|
}
|
|
} // namespace
|
|
|
|
#if HAS_MPP
|
|
class MppStorageEncoder {
|
|
public:
|
|
~MppStorageEncoder() { Shutdown(); }
|
|
|
|
bool Init(int width, int height, PixelFormat fmt, const std::string& codec,
|
|
int fps, int bitrate_kbps) {
|
|
width_ = width;
|
|
height_ = height;
|
|
hor_stride_ = Align16(width);
|
|
ver_stride_ = Align16(height);
|
|
fps_ = fps > 0 ? fps : 25;
|
|
bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 2000) * 1000;
|
|
|
|
if (fmt == PixelFormat::NV12) {
|
|
mpp_fmt_ = MPP_FMT_YUV420SP;
|
|
} else if (fmt == PixelFormat::YUV420) {
|
|
mpp_fmt_ = MPP_FMT_YUV420P;
|
|
} else {
|
|
std::cerr << "[storage] unsupported pixel format for MPP encoder\n";
|
|
return false;
|
|
}
|
|
|
|
coding_ = (codec == "h265" || codec == "hevc") ? MPP_VIDEO_CodingHEVC
|
|
: MPP_VIDEO_CodingAVC;
|
|
|
|
if (mpp_create(&ctx_, &mpi_) != MPP_OK) return false;
|
|
if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) return false;
|
|
if (mpp_enc_cfg_init(&cfg_) != MPP_OK) return false;
|
|
if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) return false;
|
|
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:width", width_);
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:height", height_);
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_);
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_);
|
|
mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:gop", fps_ * 2);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2);
|
|
mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2);
|
|
mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_);
|
|
|
|
if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) return false;
|
|
|
|
MppEncHeaderMode header_mode = MPP_ENC_HEADER_MODE_EACH_IDR;
|
|
mpi_->control(ctx_, MPP_ENC_SET_HEADER_MODE, &header_mode);
|
|
|
|
if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM) != MPP_OK) {
|
|
mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL);
|
|
}
|
|
|
|
// Get header
|
|
MppPacket hdr = nullptr;
|
|
if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) {
|
|
auto* data = static_cast<uint8_t*>(mpp_packet_get_pos(hdr));
|
|
size_t len = mpp_packet_get_length(hdr);
|
|
header_.assign(data, data + len);
|
|
mpp_packet_deinit(&hdr);
|
|
}
|
|
|
|
initialized_ = true;
|
|
return true;
|
|
}
|
|
|
|
void Shutdown() {
|
|
if (frm_grp_) { mpp_buffer_group_put(frm_grp_); frm_grp_ = nullptr; }
|
|
if (cfg_) { mpp_enc_cfg_deinit(cfg_); cfg_ = nullptr; }
|
|
if (ctx_) {
|
|
if (mpi_) mpi_->reset(ctx_);
|
|
mpp_destroy(ctx_);
|
|
ctx_ = nullptr; mpi_ = nullptr;
|
|
}
|
|
initialized_ = false;
|
|
}
|
|
|
|
bool Encode(const FramePtr& frame, std::vector<uint8_t>& out, bool& is_key) {
|
|
out.clear();
|
|
is_key = false;
|
|
if (!initialized_ || !frame) return false;
|
|
|
|
size_t frame_size = static_cast<size_t>(hor_stride_) * ver_stride_ * 3 / 2;
|
|
MppBuffer buf = nullptr;
|
|
|
|
if (frame->dma_fd >= 0 && frame->data_size > 0) {
|
|
MppBufferInfo info{};
|
|
info.type = MPP_BUFFER_TYPE_EXT_DMA;
|
|
info.size = frame->data_size;
|
|
info.fd = frame->dma_fd;
|
|
mpp_buffer_import(&buf, &info);
|
|
}
|
|
|
|
if (!buf) {
|
|
if (!frame->data) return false;
|
|
if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) return false;
|
|
|
|
auto* dst = static_cast<uint8_t*>(mpp_buffer_get_ptr(buf));
|
|
std::memset(dst, 0, frame_size);
|
|
|
|
// Copy Y plane
|
|
for (int row = 0; row < height_; ++row) {
|
|
const uint8_t* src = frame->planes[0].data
|
|
? frame->planes[0].data + row * frame->planes[0].stride
|
|
: frame->data + row * frame->width;
|
|
std::memcpy(dst + row * hor_stride_, src, width_);
|
|
}
|
|
|
|
// Copy UV plane
|
|
uint8_t* dst_uv = dst + hor_stride_ * ver_stride_;
|
|
int uv_rows = height_ / 2;
|
|
for (int row = 0; row < uv_rows; ++row) {
|
|
const uint8_t* src = frame->planes[1].data
|
|
? frame->planes[1].data + row * frame->planes[1].stride
|
|
: frame->data + width_ * height_ + row * width_;
|
|
std::memcpy(dst_uv + row * hor_stride_, src, width_);
|
|
}
|
|
}
|
|
|
|
MppFrame mpp_frame = nullptr;
|
|
mpp_frame_init(&mpp_frame);
|
|
mpp_frame_set_width(mpp_frame, width_);
|
|
mpp_frame_set_height(mpp_frame, height_);
|
|
mpp_frame_set_hor_stride(mpp_frame, hor_stride_);
|
|
mpp_frame_set_ver_stride(mpp_frame, ver_stride_);
|
|
mpp_frame_set_fmt(mpp_frame, mpp_fmt_);
|
|
mpp_frame_set_pts(mpp_frame, frame->pts);
|
|
mpp_frame_set_buffer(mpp_frame, buf);
|
|
|
|
if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) {
|
|
mpp_frame_deinit(&mpp_frame);
|
|
mpp_buffer_put(buf);
|
|
return false;
|
|
}
|
|
mpp_frame_deinit(&mpp_frame);
|
|
|
|
MppPacket packet = nullptr;
|
|
while (true) {
|
|
MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet);
|
|
if (ret == MPP_ERR_TIMEOUT) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
|
continue;
|
|
}
|
|
if (ret != MPP_OK || !packet) break;
|
|
|
|
auto* pos = static_cast<uint8_t*>(mpp_packet_get_pos(packet));
|
|
size_t len = mpp_packet_get_length(packet);
|
|
out.assign(pos, pos + len);
|
|
|
|
RK_U32 flag = mpp_packet_get_flag(packet);
|
|
is_key = (flag & 0x08) != 0;
|
|
mpp_packet_deinit(&packet);
|
|
break;
|
|
}
|
|
|
|
mpp_buffer_put(buf);
|
|
return !out.empty();
|
|
}
|
|
|
|
const std::vector<uint8_t>& Header() const { return header_; }
|
|
bool IsH265() const { return coding_ == MPP_VIDEO_CodingHEVC; }
|
|
|
|
private:
|
|
MppCtx ctx_ = nullptr;
|
|
MppApi* mpi_ = nullptr;
|
|
MppEncCfg cfg_ = nullptr;
|
|
MppBufferGroup frm_grp_ = nullptr;
|
|
MppCodingType coding_ = MPP_VIDEO_CodingAVC;
|
|
MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP;
|
|
int width_ = 0, height_ = 0;
|
|
int hor_stride_ = 0, ver_stride_ = 0;
|
|
int fps_ = 25;
|
|
int bitrate_bps_ = 2000000;
|
|
std::vector<uint8_t> header_;
|
|
bool initialized_ = false;
|
|
};
|
|
#endif
|
|
|
|
class StorageNode : public INode {
|
|
public:
|
|
std::string Id() const override { return id_; }
|
|
std::string Type() const override { return "storage"; }
|
|
|
|
bool Init(const SimpleJson& config, const NodeContext& ctx) override {
|
|
id_ = config.ValueOr<std::string>("id", "storage");
|
|
mode_ = config.ValueOr<std::string>("mode", "continuous");
|
|
format_ = config.ValueOr<std::string>("format", "mp4");
|
|
codec_ = config.ValueOr<std::string>("codec", "h264");
|
|
segment_sec_ = config.ValueOr<int>("segment_sec", 300);
|
|
base_path_ = config.ValueOr<std::string>("path", "/rec");
|
|
filename_pattern_ = config.ValueOr<std::string>("filename_pattern", "%Y%m%d/%H%M%S");
|
|
fps_ = config.ValueOr<int>("fps", 25);
|
|
bitrate_kbps_ = config.ValueOr<int>("bitrate_kbps", 2000);
|
|
|
|
input_queue_ = ctx.input_queue;
|
|
if (!input_queue_) {
|
|
std::cerr << "[storage] no input queue\n";
|
|
return false;
|
|
}
|
|
|
|
try {
|
|
std::filesystem::create_directories(base_path_);
|
|
} catch (const std::exception& e) {
|
|
std::cerr << "[storage] failed to create directory: " << e.what() << "\n";
|
|
return false;
|
|
}
|
|
|
|
std::cout << "[storage] initialized, path=" << base_path_
|
|
<< " segment=" << segment_sec_ << "s format=" << format_ << "\n";
|
|
return true;
|
|
}
|
|
|
|
bool Start() override {
|
|
running_.store(true);
|
|
worker_ = std::thread(&StorageNode::WorkerLoop, this);
|
|
std::cout << "[storage] started\n";
|
|
return true;
|
|
}
|
|
|
|
void Stop() override {
|
|
running_.store(false);
|
|
if (input_queue_) input_queue_->Stop();
|
|
if (worker_.joinable()) worker_.join();
|
|
CloseCurrentFile();
|
|
std::cout << "[storage] stopped, recorded " << total_frames_ << " frames\n";
|
|
}
|
|
|
|
void Drain() override {
|
|
CloseCurrentFile();
|
|
}
|
|
|
|
private:
|
|
void WorkerLoop() {
|
|
using namespace std::chrono;
|
|
FramePtr frame;
|
|
|
|
while (running_.load()) {
|
|
if (!input_queue_->Pop(frame, milliseconds(200))) continue;
|
|
if (!frame) continue;
|
|
|
|
ProcessFrame(frame);
|
|
++total_frames_;
|
|
}
|
|
}
|
|
|
|
void ProcessFrame(FramePtr frame) {
|
|
// Check if we need to start a new segment
|
|
auto now = std::chrono::steady_clock::now();
|
|
bool need_new_segment = false;
|
|
|
|
if (!file_open_) {
|
|
need_new_segment = true;
|
|
} else if (segment_sec_ > 0) {
|
|
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
|
|
now - segment_start_);
|
|
if (elapsed.count() >= segment_sec_) {
|
|
need_new_segment = true;
|
|
}
|
|
}
|
|
|
|
if (need_new_segment) {
|
|
CloseCurrentFile();
|
|
OpenNewFile(frame);
|
|
}
|
|
|
|
WriteFrame(frame);
|
|
}
|
|
|
|
void OpenNewFile(FramePtr frame) {
|
|
#if HAS_FFMPEG
|
|
std::string filename = FormatTime(filename_pattern_) + "." + format_;
|
|
current_path_ = base_path_ + "/" + filename;
|
|
|
|
try {
|
|
std::filesystem::path p(current_path_);
|
|
if (p.has_parent_path()) {
|
|
std::filesystem::create_directories(p.parent_path());
|
|
}
|
|
} catch (...) {}
|
|
|
|
const char* fmt_name = format_ == "ts" ? "mpegts" : "mp4";
|
|
if (avformat_alloc_output_context2(&fmt_ctx_, nullptr, fmt_name,
|
|
current_path_.c_str()) < 0) {
|
|
std::cerr << "[storage] failed to create output context\n";
|
|
return;
|
|
}
|
|
|
|
AVCodecID codec_id = (codec_ == "h265" || codec_ == "hevc")
|
|
? AV_CODEC_ID_HEVC : AV_CODEC_ID_H264;
|
|
const AVCodec* encoder = avcodec_find_encoder(codec_id);
|
|
stream_ = avformat_new_stream(fmt_ctx_, encoder);
|
|
if (!stream_) {
|
|
avformat_free_context(fmt_ctx_);
|
|
fmt_ctx_ = nullptr;
|
|
return;
|
|
}
|
|
|
|
stream_->time_base = AVRational{1, fps_};
|
|
stream_->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
|
|
stream_->codecpar->codec_id = codec_id;
|
|
stream_->codecpar->width = frame->width;
|
|
stream_->codecpar->height = frame->height;
|
|
stream_->codecpar->format = AV_PIX_FMT_YUV420P;
|
|
|
|
#if HAS_MPP
|
|
if (!mpp_encoder_) {
|
|
mpp_encoder_ = std::make_unique<MppStorageEncoder>();
|
|
if (!mpp_encoder_->Init(frame->width, frame->height, frame->format,
|
|
codec_, fps_, bitrate_kbps_)) {
|
|
mpp_encoder_.reset();
|
|
}
|
|
}
|
|
|
|
if (mpp_encoder_ && !mpp_encoder_->Header().empty()) {
|
|
const auto& hdr = mpp_encoder_->Header();
|
|
stream_->codecpar->extradata_size = static_cast<int>(hdr.size());
|
|
stream_->codecpar->extradata = static_cast<uint8_t*>(
|
|
av_mallocz(hdr.size() + AV_INPUT_BUFFER_PADDING_SIZE));
|
|
std::memcpy(stream_->codecpar->extradata, hdr.data(), hdr.size());
|
|
}
|
|
#endif
|
|
|
|
if (!(fmt_ctx_->oformat->flags & AVFMT_NOFILE)) {
|
|
if (avio_open(&fmt_ctx_->pb, current_path_.c_str(), AVIO_FLAG_WRITE) < 0) {
|
|
avformat_free_context(fmt_ctx_);
|
|
fmt_ctx_ = nullptr;
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (avformat_write_header(fmt_ctx_, nullptr) < 0) {
|
|
if (fmt_ctx_->pb) avio_closep(&fmt_ctx_->pb);
|
|
avformat_free_context(fmt_ctx_);
|
|
fmt_ctx_ = nullptr;
|
|
return;
|
|
}
|
|
|
|
file_open_ = true;
|
|
segment_start_ = std::chrono::steady_clock::now();
|
|
segment_frames_ = 0;
|
|
std::cout << "[storage] opened: " << current_path_ << "\n";
|
|
#else
|
|
std::cerr << "[storage] FFmpeg not enabled\n";
|
|
#endif
|
|
}
|
|
|
|
void WriteFrame(FramePtr frame) {
|
|
#if HAS_FFMPEG && HAS_MPP
|
|
if (!file_open_ || !fmt_ctx_ || !mpp_encoder_) return;
|
|
|
|
std::vector<uint8_t> encoded;
|
|
bool is_key = false;
|
|
|
|
if (!mpp_encoder_->Encode(frame, encoded, is_key)) return;
|
|
if (encoded.empty()) return;
|
|
|
|
AVPacket* pkt = av_packet_alloc();
|
|
if (av_new_packet(pkt, static_cast<int>(encoded.size())) < 0) {
|
|
av_packet_free(&pkt);
|
|
return;
|
|
}
|
|
|
|
std::memcpy(pkt->data, encoded.data(), encoded.size());
|
|
pkt->stream_index = stream_->index;
|
|
pkt->flags = is_key ? AV_PKT_FLAG_KEY : 0;
|
|
pkt->pts = segment_frames_;
|
|
pkt->dts = segment_frames_;
|
|
pkt->duration = 1;
|
|
|
|
av_packet_rescale_ts(pkt, AVRational{1, fps_}, stream_->time_base);
|
|
av_interleaved_write_frame(fmt_ctx_, pkt);
|
|
av_packet_free(&pkt);
|
|
++segment_frames_;
|
|
#elif HAS_FFMPEG
|
|
// Software encoding fallback would go here
|
|
(void)frame;
|
|
#endif
|
|
}
|
|
|
|
void CloseCurrentFile() {
|
|
#if HAS_FFMPEG
|
|
if (!file_open_ || !fmt_ctx_) return;
|
|
|
|
av_write_trailer(fmt_ctx_);
|
|
if (!(fmt_ctx_->oformat->flags & AVFMT_NOFILE)) {
|
|
avio_closep(&fmt_ctx_->pb);
|
|
}
|
|
avformat_free_context(fmt_ctx_);
|
|
fmt_ctx_ = nullptr;
|
|
stream_ = nullptr;
|
|
file_open_ = false;
|
|
|
|
std::cout << "[storage] closed: " << current_path_
|
|
<< " (" << segment_frames_ << " frames)\n";
|
|
#endif
|
|
}
|
|
|
|
std::string id_;
|
|
std::string mode_;
|
|
std::string format_;
|
|
std::string codec_;
|
|
int segment_sec_ = 300;
|
|
std::string base_path_;
|
|
std::string filename_pattern_;
|
|
int fps_ = 25;
|
|
int bitrate_kbps_ = 2000;
|
|
|
|
std::atomic<bool> running_{false};
|
|
std::shared_ptr<SpscQueue<FramePtr>> input_queue_;
|
|
std::thread worker_;
|
|
uint64_t total_frames_ = 0;
|
|
|
|
// Current file state
|
|
bool file_open_ = false;
|
|
std::string current_path_;
|
|
std::chrono::steady_clock::time_point segment_start_;
|
|
int64_t segment_frames_ = 0;
|
|
|
|
#if HAS_FFMPEG
|
|
AVFormatContext* fmt_ctx_ = nullptr;
|
|
AVStream* stream_ = nullptr;
|
|
#endif
|
|
|
|
#if HAS_MPP
|
|
std::unique_ptr<MppStorageEncoder> mpp_encoder_;
|
|
#endif
|
|
};
|
|
|
|
REGISTER_NODE(StorageNode, "storage");
|
|
|
|
} // namespace rk3588
|