#include #include #include #include #include #include #include #include #include #include #include #include "node.h" #if defined(RK3588_ENABLE_FFMPEG) extern "C" { #include #include #include #include #include } #define HAS_FFMPEG 1 #else #define HAS_FFMPEG 0 #endif #if defined(RK3588_ENABLE_MPP) extern "C" { #include #include #include } #define HAS_MPP 1 #else #define HAS_MPP 0 #endif namespace rk3588 { namespace { inline int Align16(int v) { return (v + 15) & ~15; } std::string FormatTime(const std::string& pattern) { auto now = std::chrono::system_clock::now(); auto time_t_now = std::chrono::system_clock::to_time_t(now); std::tm* tm = std::localtime(&time_t_now); std::ostringstream oss; for (size_t i = 0; i < pattern.size(); ++i) { if (pattern[i] == '%' && i + 1 < pattern.size()) { char spec = pattern[++i]; switch (spec) { case 'Y': oss << std::put_time(tm, "%Y"); break; case 'm': oss << std::put_time(tm, "%m"); break; case 'd': oss << std::put_time(tm, "%d"); break; case 'H': oss << std::put_time(tm, "%H"); break; case 'M': oss << std::put_time(tm, "%M"); break; case 'S': oss << std::put_time(tm, "%S"); break; default: oss << '%' << spec; break; } } else { oss << pattern[i]; } } return oss.str(); } } // namespace #if HAS_MPP class MppStorageEncoder { public: ~MppStorageEncoder() { Shutdown(); } bool Init(int width, int height, PixelFormat fmt, const std::string& codec, int fps, int bitrate_kbps) { width_ = width; height_ = height; hor_stride_ = Align16(width); ver_stride_ = Align16(height); fps_ = fps > 0 ? fps : 25; bitrate_bps_ = (bitrate_kbps > 0 ? bitrate_kbps : 2000) * 1000; if (fmt == PixelFormat::NV12) { mpp_fmt_ = MPP_FMT_YUV420SP; } else if (fmt == PixelFormat::YUV420) { mpp_fmt_ = MPP_FMT_YUV420P; } else { std::cerr << "[storage] unsupported pixel format for MPP encoder\n"; return false; } coding_ = (codec == "h265" || codec == "hevc") ? MPP_VIDEO_CodingHEVC : MPP_VIDEO_CodingAVC; if (mpp_create(&ctx_, &mpi_) != MPP_OK) return false; if (mpp_init(ctx_, MPP_CTX_ENC, coding_) != MPP_OK) return false; if (mpp_enc_cfg_init(&cfg_) != MPP_OK) return false; if (mpi_->control(ctx_, MPP_ENC_GET_CFG, cfg_) != MPP_OK) return false; mpp_enc_cfg_set_s32(cfg_, "prep:width", width_); mpp_enc_cfg_set_s32(cfg_, "prep:height", height_); mpp_enc_cfg_set_s32(cfg_, "prep:hor_stride", hor_stride_); mpp_enc_cfg_set_s32(cfg_, "prep:ver_stride", ver_stride_); mpp_enc_cfg_set_s32(cfg_, "prep:format", mpp_fmt_); mpp_enc_cfg_set_s32(cfg_, "rc:mode", MPP_ENC_RC_MODE_CBR); mpp_enc_cfg_set_s32(cfg_, "rc:gop", fps_ * 2); mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_num", fps_); mpp_enc_cfg_set_s32(cfg_, "rc:fps_in_denorm", 1); mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_num", fps_); mpp_enc_cfg_set_s32(cfg_, "rc:fps_out_denorm", 1); mpp_enc_cfg_set_s32(cfg_, "rc:bps_target", bitrate_bps_); mpp_enc_cfg_set_s32(cfg_, "rc:bps_max", bitrate_bps_ * 3 / 2); mpp_enc_cfg_set_s32(cfg_, "rc:bps_min", bitrate_bps_ / 2); mpp_enc_cfg_set_s32(cfg_, "codec:type", coding_); if (mpi_->control(ctx_, MPP_ENC_SET_CFG, cfg_) != MPP_OK) return false; MppEncHeaderMode header_mode = MPP_ENC_HEADER_MODE_EACH_IDR; mpi_->control(ctx_, MPP_ENC_SET_HEADER_MODE, &header_mode); if (mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_DRM) != MPP_OK) { mpp_buffer_group_get_internal(&frm_grp_, MPP_BUFFER_TYPE_NORMAL); } // Get header MppPacket hdr = nullptr; if (mpi_->control(ctx_, MPP_ENC_GET_HDR_SYNC, &hdr) == MPP_OK && hdr) { auto* data = static_cast(mpp_packet_get_pos(hdr)); size_t len = mpp_packet_get_length(hdr); header_.assign(data, data + len); mpp_packet_deinit(&hdr); } initialized_ = true; return true; } void Shutdown() { if (frm_grp_) { mpp_buffer_group_put(frm_grp_); frm_grp_ = nullptr; } if (cfg_) { mpp_enc_cfg_deinit(cfg_); cfg_ = nullptr; } if (ctx_) { if (mpi_) mpi_->reset(ctx_); mpp_destroy(ctx_); ctx_ = nullptr; mpi_ = nullptr; } initialized_ = false; } bool Encode(const FramePtr& frame, std::vector& out, bool& is_key) { out.clear(); is_key = false; if (!initialized_ || !frame) return false; size_t frame_size = static_cast(hor_stride_) * ver_stride_ * 3 / 2; MppBuffer buf = nullptr; if (frame->dma_fd >= 0 && frame->data_size > 0) { MppBufferInfo info{}; info.type = MPP_BUFFER_TYPE_EXT_DMA; info.size = frame->data_size; info.fd = frame->dma_fd; mpp_buffer_import(&buf, &info); } if (!buf) { if (!frame->data) return false; if (mpp_buffer_get(frm_grp_, &buf, frame_size) != MPP_OK) return false; auto* dst = static_cast(mpp_buffer_get_ptr(buf)); std::memset(dst, 0, frame_size); // Copy Y plane for (int row = 0; row < height_; ++row) { const uint8_t* src = frame->planes[0].data ? frame->planes[0].data + row * frame->planes[0].stride : frame->data + row * frame->width; std::memcpy(dst + row * hor_stride_, src, width_); } // Copy UV plane uint8_t* dst_uv = dst + hor_stride_ * ver_stride_; int uv_rows = height_ / 2; for (int row = 0; row < uv_rows; ++row) { const uint8_t* src = frame->planes[1].data ? frame->planes[1].data + row * frame->planes[1].stride : frame->data + width_ * height_ + row * width_; std::memcpy(dst_uv + row * hor_stride_, src, width_); } } MppFrame mpp_frame = nullptr; mpp_frame_init(&mpp_frame); mpp_frame_set_width(mpp_frame, width_); mpp_frame_set_height(mpp_frame, height_); mpp_frame_set_hor_stride(mpp_frame, hor_stride_); mpp_frame_set_ver_stride(mpp_frame, ver_stride_); mpp_frame_set_fmt(mpp_frame, mpp_fmt_); mpp_frame_set_pts(mpp_frame, frame->pts); mpp_frame_set_buffer(mpp_frame, buf); if (mpi_->encode_put_frame(ctx_, mpp_frame) != MPP_OK) { mpp_frame_deinit(&mpp_frame); mpp_buffer_put(buf); return false; } mpp_frame_deinit(&mpp_frame); MppPacket packet = nullptr; while (true) { MPP_RET ret = mpi_->encode_get_packet(ctx_, &packet); if (ret == MPP_ERR_TIMEOUT) { std::this_thread::sleep_for(std::chrono::milliseconds(2)); continue; } if (ret != MPP_OK || !packet) break; auto* pos = static_cast(mpp_packet_get_pos(packet)); size_t len = mpp_packet_get_length(packet); out.assign(pos, pos + len); RK_U32 flag = mpp_packet_get_flag(packet); is_key = (flag & 0x08) != 0; mpp_packet_deinit(&packet); break; } mpp_buffer_put(buf); return !out.empty(); } const std::vector& Header() const { return header_; } bool IsH265() const { return coding_ == MPP_VIDEO_CodingHEVC; } private: MppCtx ctx_ = nullptr; MppApi* mpi_ = nullptr; MppEncCfg cfg_ = nullptr; MppBufferGroup frm_grp_ = nullptr; MppCodingType coding_ = MPP_VIDEO_CodingAVC; MppFrameFormat mpp_fmt_ = MPP_FMT_YUV420SP; int width_ = 0, height_ = 0; int hor_stride_ = 0, ver_stride_ = 0; int fps_ = 25; int bitrate_bps_ = 2000000; std::vector header_; bool initialized_ = false; }; #endif class StorageNode : public INode { public: std::string Id() const override { return id_; } std::string Type() const override { return "storage"; } bool Init(const SimpleJson& config, const NodeContext& ctx) override { id_ = config.ValueOr("id", "storage"); mode_ = config.ValueOr("mode", "continuous"); format_ = config.ValueOr("format", "mp4"); codec_ = config.ValueOr("codec", "h264"); segment_sec_ = config.ValueOr("segment_sec", 300); base_path_ = config.ValueOr("path", "/rec"); filename_pattern_ = config.ValueOr("filename_pattern", "%Y%m%d/%H%M%S"); fps_ = config.ValueOr("fps", 25); bitrate_kbps_ = config.ValueOr("bitrate_kbps", 2000); input_queue_ = ctx.input_queue; if (!input_queue_) { std::cerr << "[storage] no input queue\n"; return false; } try { std::filesystem::create_directories(base_path_); } catch (const std::exception& e) { std::cerr << "[storage] failed to create directory: " << e.what() << "\n"; return false; } std::cout << "[storage] initialized, path=" << base_path_ << " segment=" << segment_sec_ << "s format=" << format_ << "\n"; return true; } bool Start() override { running_.store(true); worker_ = std::thread(&StorageNode::WorkerLoop, this); std::cout << "[storage] started\n"; return true; } void Stop() override { running_.store(false); if (input_queue_) input_queue_->Stop(); if (worker_.joinable()) worker_.join(); CloseCurrentFile(); std::cout << "[storage] stopped, recorded " << total_frames_ << " frames\n"; } void Drain() override { CloseCurrentFile(); } private: void WorkerLoop() { using namespace std::chrono; FramePtr frame; while (running_.load()) { if (!input_queue_->Pop(frame, milliseconds(200))) continue; if (!frame) continue; ProcessFrame(frame); ++total_frames_; } } void ProcessFrame(FramePtr frame) { // Check if we need to start a new segment auto now = std::chrono::steady_clock::now(); bool need_new_segment = false; if (!file_open_) { need_new_segment = true; } else if (segment_sec_ > 0) { auto elapsed = std::chrono::duration_cast( now - segment_start_); if (elapsed.count() >= segment_sec_) { need_new_segment = true; } } if (need_new_segment) { CloseCurrentFile(); OpenNewFile(frame); } WriteFrame(frame); } void OpenNewFile(FramePtr frame) { #if HAS_FFMPEG std::string filename = FormatTime(filename_pattern_) + "." + format_; current_path_ = base_path_ + "/" + filename; try { std::filesystem::path p(current_path_); if (p.has_parent_path()) { std::filesystem::create_directories(p.parent_path()); } } catch (...) {} const char* fmt_name = format_ == "ts" ? "mpegts" : "mp4"; if (avformat_alloc_output_context2(&fmt_ctx_, nullptr, fmt_name, current_path_.c_str()) < 0) { std::cerr << "[storage] failed to create output context\n"; return; } AVCodecID codec_id = (codec_ == "h265" || codec_ == "hevc") ? AV_CODEC_ID_HEVC : AV_CODEC_ID_H264; const AVCodec* encoder = avcodec_find_encoder(codec_id); stream_ = avformat_new_stream(fmt_ctx_, encoder); if (!stream_) { avformat_free_context(fmt_ctx_); fmt_ctx_ = nullptr; return; } stream_->time_base = AVRational{1, fps_}; stream_->codecpar->codec_type = AVMEDIA_TYPE_VIDEO; stream_->codecpar->codec_id = codec_id; stream_->codecpar->width = frame->width; stream_->codecpar->height = frame->height; stream_->codecpar->format = AV_PIX_FMT_YUV420P; #if HAS_MPP if (!mpp_encoder_) { mpp_encoder_ = std::make_unique(); if (!mpp_encoder_->Init(frame->width, frame->height, frame->format, codec_, fps_, bitrate_kbps_)) { mpp_encoder_.reset(); } } if (mpp_encoder_ && !mpp_encoder_->Header().empty()) { const auto& hdr = mpp_encoder_->Header(); stream_->codecpar->extradata_size = static_cast(hdr.size()); stream_->codecpar->extradata = static_cast( av_mallocz(hdr.size() + AV_INPUT_BUFFER_PADDING_SIZE)); std::memcpy(stream_->codecpar->extradata, hdr.data(), hdr.size()); } #endif if (!(fmt_ctx_->oformat->flags & AVFMT_NOFILE)) { if (avio_open(&fmt_ctx_->pb, current_path_.c_str(), AVIO_FLAG_WRITE) < 0) { avformat_free_context(fmt_ctx_); fmt_ctx_ = nullptr; return; } } if (avformat_write_header(fmt_ctx_, nullptr) < 0) { if (fmt_ctx_->pb) avio_closep(&fmt_ctx_->pb); avformat_free_context(fmt_ctx_); fmt_ctx_ = nullptr; return; } file_open_ = true; segment_start_ = std::chrono::steady_clock::now(); segment_frames_ = 0; std::cout << "[storage] opened: " << current_path_ << "\n"; #else std::cerr << "[storage] FFmpeg not enabled\n"; #endif } void WriteFrame(FramePtr frame) { #if HAS_FFMPEG && HAS_MPP if (!file_open_ || !fmt_ctx_ || !mpp_encoder_) return; std::vector encoded; bool is_key = false; if (!mpp_encoder_->Encode(frame, encoded, is_key)) return; if (encoded.empty()) return; AVPacket* pkt = av_packet_alloc(); if (av_new_packet(pkt, static_cast(encoded.size())) < 0) { av_packet_free(&pkt); return; } std::memcpy(pkt->data, encoded.data(), encoded.size()); pkt->stream_index = stream_->index; pkt->flags = is_key ? AV_PKT_FLAG_KEY : 0; pkt->pts = segment_frames_; pkt->dts = segment_frames_; pkt->duration = 1; av_packet_rescale_ts(pkt, AVRational{1, fps_}, stream_->time_base); av_interleaved_write_frame(fmt_ctx_, pkt); av_packet_free(&pkt); ++segment_frames_; #elif HAS_FFMPEG // Software encoding fallback would go here (void)frame; #endif } void CloseCurrentFile() { #if HAS_FFMPEG if (!file_open_ || !fmt_ctx_) return; av_write_trailer(fmt_ctx_); if (!(fmt_ctx_->oformat->flags & AVFMT_NOFILE)) { avio_closep(&fmt_ctx_->pb); } avformat_free_context(fmt_ctx_); fmt_ctx_ = nullptr; stream_ = nullptr; file_open_ = false; std::cout << "[storage] closed: " << current_path_ << " (" << segment_frames_ << " frames)\n"; #endif } std::string id_; std::string mode_; std::string format_; std::string codec_; int segment_sec_ = 300; std::string base_path_; std::string filename_pattern_; int fps_ = 25; int bitrate_kbps_ = 2000; std::atomic running_{false}; std::shared_ptr> input_queue_; std::thread worker_; uint64_t total_frames_ = 0; // Current file state bool file_open_ = false; std::string current_path_; std::chrono::steady_clock::time_point segment_start_; int64_t segment_frames_ = 0; #if HAS_FFMPEG AVFormatContext* fmt_ctx_ = nullptr; AVStream* stream_ = nullptr; #endif #if HAS_MPP std::unique_ptr mpp_encoder_; #endif }; REGISTER_NODE(StorageNode, "storage"); } // namespace rk3588