OrangePi3588Media/docs/architecture/dag_graph_node_edge.md
2026-02-26 12:14:03 +08:00

16 KiB
Raw Blame History

DAG 架构:图、节点与边

本文档介绍 RK3588 智能视频分析系统的核心架构设计——DAG有向无环图流水线架构


1. 核心概念

系统采用 DAG有向无环图 作为数据处理流水线的组织方式,包含三个核心概念:

概念 含义 类比
Graph 一个完整的业务通道/流水线 一条生产线
Node节点 功能基本单元,由动态加载的插件实现 生产线上的一台机器
Edge 节点间的数据传输通道 传送带

1.1 Graph

  • 一个 Graph 代表一个完整的视频处理业务流
  • 例如:摄像头的 "拉流→预处理→AI检测→OSD→推流" 就是一个 Graph
  • 系统可以同时运行多个 Graph如多路摄像头
  • Graph 由 JSON 配置定义,支持热更新

1.2 Node节点

  • 节点是功能基本单元,每个节点是一个独立的 .so 插件
  • 节点类型包括输入、预处理、AI推理、OSD、报警、存储、推流等
  • 每个节点通过 REGISTER_NODE 宏注册,实现 INode 接口

节点角色Role

角色 说明 示例
source 数据源,产生 Frame input_rtsp, input_file
filter 处理 Frame可修改元数据 preprocess, ai_yolo, osd
sink 消费 Frame无下游 alarm, storage, publish

1.3 Edge

  • 边是节点间的数据通道,每条边对应一个 SPSC单生产者单消费者环形队列
  • 队列中传递的是 std::shared_ptr<Frame>,实现多下游共享
  • 支持多分支:一个节点可以有多个下游节点(一对多)

2. 节点类型详解

系统目前支持的节点类型(插件)如下:

2.1 角色分类Role

Source数据源

  • 职责:从外部获取数据,产生 Frame 并注入流水线
  • 特点
    • Start() 中启动内部采集线程
    • Process() 通常为空实现或仅做心跳/监控
    • 无上游节点

Filter过滤器/处理器)

  • 职责:接收 Frame进行处理输出到下游
  • 特点
    • 可以修改 Frame 的元数据(如 det, face_det, face_recog
    • 可以有多个下游节点(分支)
    • 不修改图像数据本身(只读),除非特殊处理节点

Sink汇聚点/终点)

  • 职责:消费 Frame执行最终动作报警、存储、推流
  • 特点
    • 数据流的终点,无下游节点
    • 负责最终的数据消费和输出

2.2 输入类节点Source

节点类型 功能 关键参数
input_rtsp 拉取 RTSP 视频流,支持断线重连 url, reconnect_sec, force_tcp
input_file 读取本地视频文件MP4/MKV支持循环播放 path, loop, fps

2.3 处理类节点Filter

节点类型 功能 关键参数 硬件依赖
preprocess 图像预处理缩放、裁剪、格式转换NV12→RGB/BGR dst_w, dst_h, dst_format, use_rga RGA / FFmpeg
ai_yolo 通用目标检测YOLOv5/v8支持 80 类 COCO 目标 model_path, conf, nms, class_filter NPU(RKNN)
ai_face_det 人脸检测,输出人脸框和 5 点关键点 model_path, conf, nms, max_faces NPU(RKNN)
ai_face_recog 人脸识别:人脸对齐、特征提取、人脸库匹配 model_path, gallery, threshold, align NPU(RKNN)
det_post 检测结果后处理,支持按类别配置过滤策略 per_class, processors CPU
tracker 目标跟踪,为检测框分配稳定的 track_id mode, track_classes, max_age_ms CPU
osd 屏幕显示叠加,绘制检测框、文字、关键点 draw_bbox, draw_text, line_width RGA / CPU
gate 门控节点,根据条件控制数据是否放行 condition, drop_policy CPU

2.4 输出类节点Sink

节点类型 功能 关键参数 说明
alarm 报警处理:规则匹配 + 动作执行 rules, actions 支持 HTTP/MinIO/截图/录像
storage 持续录像存储,按时间切片 path, segment_sec, format 保存为 MP4/TS
publish 视频推流输出RTSP Server / HLS outputs, codec, bitrate 多协议输出
zlm_http 内嵌 HTTP 文件服务器,为 HLS 提供访问 port, root, prefix 基于 ZLMediaKit

2.5 节点类型速查表

【数据流向】

Source          Filter                         Sink
─────────────────────────────────────────────────────────
input_rtsp  →   preprocess  →  ai_yolo    →   alarm
input_file  →   ai_face_det →  osd        →   storage
                ai_face_recog   tracker    →   publish
                det_post        gate       →   zlm_http

3. 架构示例

3.1 人脸识别 Pipeline

以人脸识别流程为例:

[in_cam1] ──→ [pre_cam1] ──→ [face_det_cam1] ──→ [face_recog_cam1] ──→ [osd_cam1] ──→ [pub_cam1]
   ↑              ↑              ↑                  ↑              ↑           ↑
source       filter         filter             filter         filter       sink
(输入)      (预处理)       (人脸检测)          (人脸识别)      (绘制)      (输出)

对应的 JSON 配置:

{
  "name": "cam1_face_recog",
  "nodes": [
    { "id": "in_cam1", "type": "input_rtsp", "role": "source", "url": "rtsp://..." },
    { "id": "pre_cam1", "type": "preprocess", "role": "filter", "dst_w": 1280, "dst_h": 720 },
    { "id": "face_det_cam1", "type": "ai_face_det", "role": "filter", "model_path": "..." },
    { "id": "face_recog_cam1", "type": "ai_face_recog", "role": "filter", "model_path": "..." },
    { "id": "osd_cam1", "type": "osd", "role": "filter", "draw_face_det": true },
    { "id": "pub_cam1", "type": "publish", "role": "sink", "outputs": [...] }
  ],
  "edges": [
    ["in_cam1", "pre_cam1"],
    ["pre_cam1", "face_det_cam1"],
    ["face_det_cam1", "face_recog_cam1"],
    ["face_recog_cam1", "osd_cam1"],
    ["osd_cam1", "pub_cam1"]
  ]
}

3.2 多分支 Pipeline

支持一个节点输出到多个下游(分支处理):

[ai_yolo] ──→ [alarm]         (报警分支)
      └────→ [osd] ──→ [storage]   (可视化+存储分支)
             └────→ [publish]       (推流分支)

对应的 JSON 配置:

{
  "nodes": [
    { "id": "ai", "type": "ai_yolo", ... },
    { "id": "alarm", "type": "alarm", "role": "sink", ... },
    { "id": "osd", "type": "osd", ... },
    { "id": "storage", "type": "storage", "role": "sink", ... },
    { "id": "publish", "type": "publish", "role": "sink", ... }
  ],
  "edges": [
    ["ai", "alarm"],
    ["ai", "osd"],
    ["osd", "storage"],
    ["osd", "publish"]
  ]
}

4. 为什么采用 DAG 架构?

4.1 模块化 / 插件化(低耦合)

┌─────────────────────────────────────────────────────────┐
│  每个节点是一个独立的 .so 插件(动态库)                   │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │input_rtsp│  │ ai_yolo  │  │  alarm   │  ...          │
│  │  .so     │  │  .so     │  │  .so     │               │
│  └──────────┘  └──────────┘  └──────────┘              │
│       ↑            ↑            ↑                       │
│   独立开发      独立开发      独立开发                    │
│   独立测试      独立测试      独立测试                    │
└─────────────────────────────────────────────────────────┘
  • 算法与框架解耦AI 算法、业务逻辑与底层框架完全分离
  • 独立演进:新增功能只需开发新插件,不改动现有代码
  • 通过 REGISTER_NODE 宏自动注册插件

4.2 灵活组合(配置驱动)

同一套代码,通过不同配置实现不同场景:

场景 配置方式
纯转码网关 input_rtsp → publish(跳过所有 AI
AI 检测+报警 input → preprocess → ai_yolo → alarm
人脸识别 input → preprocess → face_det → face_recog → osd → publish
存储+推流 input → storage + publish(分支)

4.3 热更新(零停机)

旧配置运行中 ──→ 修改 config.json ──→ 框架自动 diff 
                                              ↓
                     能 UpdateConfig 的节点 ──→ 就地更新
                     需重建的节点 ──→ 创建新节点 → 原子切换边 → 销毁旧节点
                                              ↓
                                       失败则自动回滚
  • 改报警阈值、换模型参数、调整码率等无需重启进程
  • 业务不中断,运维友好

4.4 高性能流水线

┌────────────────────────────────────────────┐
│  边Edge= SPSC 无锁环形队列               │
│  ┌─────────┐    Queue    ┌─────────┐       │
│  │ Node A  │ ──────────→ │ Node B  │       │
│  │ (解码)  │  FramePtr   │ (AI推理)│       │
│  └─────────┘   共享指针   └─────────┘       │
│       ↓                                    │
│  零拷贝dma_fd + shared_ptr<Frame>         │
└────────────────────────────────────────────┘
  • SPSC 队列:单生产者单消费者,无锁、高吞吐
  • 零拷贝DMA-BUF 传递,视频数据不经过 CPU 复制
  • 并行处理:各节点在不同线程运行,流水线并行

4.5 多硬件解耦

┌─────────────────────────────────────────────┐
│  统一接口层          硬件实现层               │
│  ┌──────────────┐  ┌─────────────────────┐  │
│  │IInferBackend │← │Rk3588InferBackend   │  │
│  │(推理接口)     │  │(RKNN NPU)           │  │
│  └──────────────┘  └─────────────────────┘  │
│  ┌──────────────┐  ┌─────────────────────┐  │
│  │IImageProcess │← │Rk3588ImageProcessor │  │
│  │(图像处理)     │  │(RGA硬件加速)         │  │
│  └──────────────┘  └─────────────────────┘  │
│  ┌──────────────┐  ┌─────────────────────┐  │
│  │IDecoder      │← │Rk3588Decoder        │  │
│  │(解码接口)     │  │(MPP VDEC)           │  │
│  └──────────────┘  └─────────────────────┘  │
└─────────────────────────────────────────────┘
  • 业务节点只依赖抽象接口
  • 切换硬件平台(如从 RK3588 换到 Atlas/Jetson只需替换底层实现
  • 保持业务配置完全不变

4.6 数据契约标准化

所有节点通过统一的 Frame 结构传递数据:

struct Frame {
    // 图像数据
    int dma_fd;              // DMA-BUF 文件描述符(零拷贝)
    uint8_t* data;           // CPU 内存指针
    
    // AI 结果(各节点写入,下游只读)
    shared_ptr<DetectionResult> det;        // 通用检测结果
    shared_ptr<FaceDetResult> face_det;     // 人脸检测结果
    shared_ptr<FaceRecogResult> face_recog; // 人脸识别结果
    
    // 元数据
    uint64_t pts;            // 时间戳
    uint64_t frame_id;       // 帧序号
};
  • 约定优于配置:检测结果写 frame->det,人脸识别写 frame->face_recog
  • 下游节点OSD/Alarm读取标准字段不依赖具体 AI 模型

5. 线程模型

┌─────────────────────────────────────────────────────────────┐
│  Main Thread                                                │
│  ├── Graph Manager管理所有 Graph                          │
│  └── HTTP ServerREST API                                 │
└─────────────────────────────────────────────────────────────┘
                              │
┌─────────────────────────────────────────────────────────────┐
│  Graph "cam1_face_recog"  (独立线程池)                        │
│                                                             │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐ │
│  │ input    │──→│ preprocess│──→│ face_det │──→│ face_recog│ │
│  │ (IO线程)  │   │ (RGA线程) │   │ (NPU线程)│   │ (NPU线程) │ │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘ │
│                                     ↑                       │
│                              ┌────────────┐                │
│                              │ AiScheduler│ (NPU 统一调度)  │
│                              │ (NPU队列)   │                │
│                              └────────────┘                │
└─────────────────────────────────────────────────────────────┘
  • Source 节点(如 input_rtsp):内部有自己的采集线程
  • Filter/Sink 节点:由框架线程调用 Process(frame)
  • AiScheduler:统一管理 NPU 资源,避免多节点争抢

6. 总结

设计目标 实现方式
高性能 DAG 流水线 + SPSC 无锁队列 + DMA 零拷贝
低耦合 插件化(.so+ 接口抽象 + 数据契约
易运维 JSON 配置驱动 + 热更新 + 模板复用
可扩展 新增节点类型 → 编译 .so → 配置接入
跨平台 硬件抽象层HAL+ 运行时后端选择

这种设计让系统能够:

  • 单 RK3588 支持 8-16 路 1080p 实时分析
  • 端到端延迟 < 500ms
  • 同一套代码通过配置覆盖 安防、转码、存储、人脸识别 等多种场景

7. 相关文档