# rtsp_processor/rtsp_processor_1.py
# Snapshot taken 2025-09-17 11:13:12 +08:00 (repository viewer reported: 561 lines, 20 KiB, Python).
#
# NOTE: the repository viewer warned that this file contains ambiguous Unicode
# characters (characters that may be confused with others). Check that the
# non-ASCII text (the Chinese comments and log messages) is intentional.

import cv2
import numpy as np
import time
import threading
import queue
from datetime import datetime, timedelta
from pathlib import Path
import subprocess
import logging
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from collections import deque
import yaml
try:
    from ultralytics import YOLO
except ImportError:
    print("请安装ultralytics: pip install ultralytics")
    raise

# Runtime settings, loaded once at import time. The path is relative to the
# current working directory (not to this file), so the script must be started
# from the directory that contains ``config/``. The encoding is explicit so the
# YAML is not decoded with a locale-dependent default codec.
with open("./config/config.yaml", "r", encoding="utf-8") as f:
    # safe_load returns None for an empty document; normalise to {} so a
    # missing key fails with a clear KeyError instead of a TypeError on None.
    config = yaml.safe_load(f) or {}

# Logging setup shared by every component in this module.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class DetectionResult:
    """Everything one detector pass reported for a single frame.

    ``boxes``, ``confidences``, ``class_ids`` and ``class_names`` are parallel:
    position ``i`` in each refers to the same detection.
    """

    boxes: np.ndarray  # one (x1, y1, x2, y2) box per detection
    confidences: np.ndarray  # score of each detection
    class_ids: np.ndarray  # integer class index of each detection
    class_names: List[str]  # label string of each detection
    timestamp: datetime  # when the detection result was created
@dataclass
class AlarmConfig:
    """Settings that decide when an alarm fires and where its clip is stored."""

    # Detector class names that are allowed to raise an alarm.
    target_classes: List[str]
    # Minimum confidence a target-class detection needs to count.
    confidence_threshold: float = 0.5
    # Length of the recorded alarm window, in seconds.
    alarm_duration: int = 10
    # Seconds after an alarm ends during which new alarms are ignored.
    cooldown_duration: int = 30
    # Directory that receives the alarm clips.
    save_path: str = "./alarm_videos"
class FrameBuffer:
    """Bounded history of (frame, timestamp) pairs from which alarm clips are cut.

    At most ``max_duration * fps`` entries are retained; once full, appending
    evicts the oldest entry.
    """

    def __init__(self, max_duration: int = 60, fps: int = 25):
        self.max_frames = max_duration * fps
        self.buffer = deque(maxlen=self.max_frames)
        self.fps = fps

    def add_frame(self, frame: np.ndarray, timestamp: datetime):
        """Append a private copy of ``frame`` so later caller edits cannot alter it."""
        snapshot = frame.copy()
        self.buffer.append((snapshot, timestamp))

    def get_frames_in_range(self, start_time: datetime, duration: int) -> List[Tuple[np.ndarray, datetime]]:
        """Return stored entries with ``start_time <= timestamp <= start_time + duration`` seconds.

        Both bounds are inclusive and entries keep their insertion order.
        """
        stop_time = start_time + timedelta(seconds=duration)
        return [
            (img, ts)
            for img, ts in self.buffer
            if start_time <= ts <= stop_time
        ]
class YOLODetector:
    """Wraps an ultralytics ``YOLO`` model and returns ``DetectionResult`` objects."""

    def __init__(self, model_path: str = "yolov8n.pt"):
        self.model = YOLO(model_path)
        # Class-index -> class-name lookup exposed by the model.
        self.class_names = self.model.names

    def detect(self, frame: np.ndarray, confidence_threshold: float = 0.5) -> DetectionResult:
        """Run the model on one frame.

        Args:
            frame: image handed straight to the model (presumably an OpenCV BGR
                frame — confirm against callers).
            confidence_threshold: forwarded to the model as ``conf``.

        Returns:
            A ``DetectionResult`` stamped with the current time. When nothing is
            detected the arrays are empty but keep the populated shapes/dtypes:
            ``boxes`` is ``(0, 4)`` and ``class_ids`` is an integer array.
        """
        results = self.model(frame, conf=confidence_threshold, verbose=False)
        det_boxes = results[0].boxes if len(results) > 0 else None

        if det_boxes is not None:
            boxes = det_boxes.xyxy.cpu().numpy()
            confidences = det_boxes.conf.cpu().numpy()
            class_ids = det_boxes.cls.cpu().numpy().astype(int)
            # Plain int keys for the name lookup (avoids shadowing builtin ``id``).
            class_names = [self.class_names[int(cls_id)] for cls_id in class_ids]
        else:
            # Same shapes/dtypes as the populated branch so downstream code can
            # index and iterate without special cases.
            boxes = np.empty((0, 4), dtype=np.float32)
            confidences = np.empty((0,), dtype=np.float32)
            class_ids = np.empty((0,), dtype=int)
            class_names = []

        return DetectionResult(
            boxes=boxes,
            confidences=confidences,
            class_ids=class_ids,
            class_names=class_names,
            timestamp=datetime.now(),
        )
class AlarmManager:
    """Alarm state machine and alarm-clip writer.

    ``is_alarming`` is True while an alarm window is open. ``alarm_start_time``
    records when it opened, and ``last_alarm_time`` records when the previous
    window closed; the cooldown is measured from ``last_alarm_time``.
    """

    def __init__(self, config: "AlarmConfig"):
        self.config = config
        self.last_alarm_time = None   # end of the most recent alarm, or None
        self.is_alarming = False
        self.alarm_start_time = None  # start of the current/last alarm, or None
        # Create the clip directory up front.
        Path(config.save_path).mkdir(parents=True, exist_ok=True)

    def check_alarm_trigger(self, detection_result: "DetectionResult") -> bool:
        """Return True if ``detection_result`` warrants an alarm.

        Returns False while the cooldown since the last alarm is still running.
        Otherwise returns True if any detection is a target class with confidence
        at or above ``confidence_threshold``.
        """
        now = datetime.now()
        if self.last_alarm_time is not None:
            if now - self.last_alarm_time < timedelta(seconds=self.config.cooldown_duration):
                return False

        return any(
            class_name in self.config.target_classes
            and confidence >= self.config.confidence_threshold
            for class_name, confidence in zip(detection_result.class_names,
                                              detection_result.confidences)
        )

    def start_alarm(self) -> bool:
        """Open an alarm window; return False (and do nothing) if one is already open."""
        if self.is_alarming:
            return False
        self.is_alarming = True
        self.alarm_start_time = datetime.now()
        logger.info(f"告警开始: {self.alarm_start_time}")
        return True

    def should_stop_alarm(self) -> bool:
        """Return True once the open alarm has lasted at least ``alarm_duration`` seconds."""
        if not self.is_alarming or self.alarm_start_time is None:
            return False
        elapsed = (datetime.now() - self.alarm_start_time).total_seconds()
        return elapsed >= self.config.alarm_duration

    def stop_alarm(self):
        """Close the open alarm (if any) and start the cooldown from now."""
        if self.is_alarming:
            self.is_alarming = False
            self.last_alarm_time = datetime.now()
            logger.info(f"告警结束: {self.last_alarm_time}")

    def save_alarm_video(self, frames: List[Tuple[np.ndarray, datetime]], fps: float = 25):
        """Write ``frames`` to ``<save_path>/alarm_<YYYYmmdd_HHMMSS>.mp4`` (mp4v codec).

        Args:
            frames: (frame, timestamp) pairs; the size of the first frame sets
                the video size.
            fps: playback frame rate written into the file.

        Returns:
            The file path as a string, or None if there were no frames, the
            writer could not be opened, or writing failed.
        """
        if not frames:
            logger.warning("没有帧数据可保存")
            return None

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = Path(self.config.save_path) / f"alarm_{timestamp}.mp4"
        height, width = frames[0][0].shape[:2]

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(str(filepath), fourcc, fps, (width, height))
        # A writer that failed to open silently discards every write(), so
        # without this check we would report success for an unusable file.
        if not out.isOpened():
            out.release()
            logger.error(f"保存视频失败: 无法打开视频写入器 {filepath}")
            return None

        try:
            for frame, _ in frames:
                out.write(frame)
        except Exception as e:
            logger.error(f"保存视频失败: {e}")
            return None
        finally:
            out.release()

        # Logged only after release(), i.e. once the file has been finalised.
        logger.info(f"告警视频已保存: {filepath}")
        return str(filepath)
class StreamServer:
    """Publishes frames by piping raw BGR bytes into an ``ffmpeg`` process that
    encodes them and sends FLV to ``output_url``."""

    def __init__(self, output_url: str = "rtmp://127.0.0.1:1935/live/stream", use_gpu: bool = True):
        self.output_url = output_url
        self.ffmpeg_process = None
        self.use_gpu = use_gpu
        self.gpu_encoder = None  # chosen by start_streaming() when use_gpu is True
        # Thread that keeps reading ffmpeg's stderr while the process runs.
        self._stderr_thread = None

    def detect_gpu_encoder(self):
        """Return the first NVENC encoder ffmpeg can actually use, or None.

        Each candidate is probed, in preference order, by encoding a 1-second
        synthetic clip to ffmpeg's null muxer.
        NOTE(review): classic FLV/RTMP carries H.264; the HEVC/AV1 fallbacks need
        an ffmpeg build and RTMP server with enhanced-RTMP support — confirm.
        """
        gpu_encoders = ['h264_nvenc', 'hevc_nvenc', 'av1_nvenc']
        for encoder in gpu_encoders:
            test_cmd = ['ffmpeg', '-hide_banner', '-f', 'lavfi', '-i', 'testsrc2=size=320x240:duration=1',
                        '-c:v', encoder, '-f', 'null', '-']
            try:
                result = subprocess.run(test_cmd, capture_output=True, timeout=10)
            except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
                continue
            if result.returncode == 0:
                logger.info(f"检测到可用的GPU编码器: {encoder}")
                return encoder
        logger.warning("未检测到可用的GPU编码器将使用CPU编码")
        return None

    def _build_ffmpeg_cmd(self, width: int, height: int, fps: float) -> List[str]:
        """Assemble the ffmpeg argument list for the currently selected encoder."""
        # Keyframe options must be integers even if a fractional fps is passed.
        gop = max(1, int(round(fps * 2)))     # keyframe roughly every 2 s
        keyint_min = max(1, int(round(fps)))

        cmd = [
            'ffmpeg',
            '-hide_banner',
            '-loglevel', 'warning',  # only warnings/errors on stderr
            '-nostats',              # no periodic progress lines on stderr
            '-y',
            '-f', 'rawvideo',
            '-vcodec', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', f'{width}x{height}',
            '-r', str(fps),
            '-i', '-',               # frames arrive on stdin
        ]

        if self.gpu_encoder and 'nvenc' in self.gpu_encoder:
            cmd.extend([
                '-c:v', self.gpu_encoder,
                '-pix_fmt', 'yuv420p',
                '-preset', 'p1',         # fastest NVENC preset (p1..p7)
                '-tune', 'll',           # low-latency tuning
                '-rc', 'cbr',            # constant bitrate
                '-b:v', '2M',
                '-maxrate', '2M',
                '-bufsize', '4M',
                '-g', str(gop),
                '-keyint_min', str(keyint_min),
                '-bf', '0',              # no B-frames, lower latency
                '-refs', '1',
                '-spatial_aq', '1',
                '-temporal_aq', '1',
                '-rc-lookahead', '8',
                '-surfaces', '8',
            ])
        elif self.gpu_encoder:
            # Generic settings for any other hardware encoder.
            cmd.extend([
                '-c:v', self.gpu_encoder,
                '-pix_fmt', 'yuv420p',
                '-preset', 'ultrafast',
                '-b:v', '2M',
            ])
        else:
            # CPU fallback; same ~2 s keyframe interval as the NVENC path so
            # players joining the live stream do not wait for libx264's default GOP.
            cmd.extend([
                '-c:v', 'libx264',
                '-pix_fmt', 'yuv420p',
                '-preset', 'ultrafast',
                '-tune', 'zerolatency',
                '-crf', '23',
                '-maxrate', '2M',
                '-bufsize', '4M',
                '-g', str(gop),
            ])

        cmd.extend(['-f', 'flv', self.output_url])
        return cmd

    @staticmethod
    def _drain_stderr(pipe):
        """Forward ffmpeg's stderr to the logger until the pipe closes.

        stderr is a pipe; if nobody reads it the OS buffer fills, ffmpeg blocks
        on its own log writes, stops reading stdin, and send_frame() stalls.
        """
        try:
            for raw in iter(pipe.readline, b''):
                line = raw.decode('utf-8', errors='replace').rstrip()
                if line:
                    logger.warning(f"ffmpeg: {line}")
        except (OSError, ValueError):
            pass  # pipe closed underneath us during shutdown
        finally:
            try:
                pipe.close()
            except OSError:
                pass

    def start_streaming(self, width: int, height: int, fps: int = 25):
        """Spawn ffmpeg for ``width`` x ``height`` BGR frames at ``fps``.

        Returns True if the process was spawned and False if spawning failed.
        A True result does not prove the RTMP connection works; ffmpeg reports
        such failures asynchronously on stderr, which is forwarded to the log.
        """
        if self.use_gpu:
            self.gpu_encoder = self.detect_gpu_encoder()

        ffmpeg_cmd = self._build_ffmpeg_cmd(width, height, fps)
        try:
            self.ffmpeg_process = subprocess.Popen(
                ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=0,  # unbuffered stdin: each frame is handed over immediately
            )
        except Exception as e:
            logger.error(f"启动流媒体失败: {e}")
            return False

        self._stderr_thread = threading.Thread(
            target=self._drain_stderr,
            args=(self.ffmpeg_process.stderr,),
            daemon=True,
        )
        self._stderr_thread.start()

        encoder_info = self.gpu_encoder if self.gpu_encoder else "libx264 (CPU)"
        logger.info(f"流媒体服务启动: {self.output_url}, 编码器: {encoder_info}")
        return True

    def send_frame(self, frame: np.ndarray):
        """Write one frame's raw bytes to ffmpeg; a no-op when no process is running.

        The frame is expected to match the size given to start_streaming();
        write errors are logged, not raised.
        """
        # Local reference: stop_streaming() may clear the attribute from another thread.
        proc = self.ffmpeg_process
        if proc is None or proc.stdin is None:
            return
        try:
            proc.stdin.write(frame.tobytes())
            proc.stdin.flush()
        except Exception as e:
            logger.error(f"发送帧失败: {e}")

    def stop_streaming(self, timeout: float = 5.0):
        """Close ffmpeg's input and wait for it to exit, killing it after ``timeout`` seconds."""
        proc = self.ffmpeg_process
        if proc is None:
            return
        self.ffmpeg_process = None  # stop send_frame() from writing any further

        try:
            if proc.stdin:
                proc.stdin.close()  # EOF lets ffmpeg flush and finish the stream
        except OSError:
            pass  # ffmpeg already exited (e.g. broken pipe); nothing left to flush

        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            logger.warning("ffmpeg未在超时时间内退出, 强制结束")
            proc.kill()
            proc.wait()
        logger.info("流媒体服务已停止")
class RTSPProcessor:
    """RTSP stream processor.

    A capture thread reads frames from ``rtsp_url`` into ``frame_queue``. A
    processing thread runs detection every ``detection_interval`` frames, draws
    the results, feeds the alarm logic and clip buffer, and pushes annotated
    frames to ``StreamServer``.
    """

    # Frame rate assumed when the source does not report a usable one.
    DEFAULT_FPS = 25.0
    # Reported frame rates outside [1, MAX_PLAUSIBLE_FPS] are treated as unreliable.
    MAX_PLAUSIBLE_FPS = 120.0
    # Extra seconds of history kept in the frame buffer beyond one alarm clip.
    BUFFER_MARGIN_SECONDS = 5
    # Pause between RTSP (re)connection attempts, in seconds.
    RECONNECT_DELAY_SECONDS = 5

    def __init__(self,
                 rtsp_url: str,
                 model_path: str = "yolov8n.pt",
                 detection_interval: int = 5,
                 alarm_config: Optional[AlarmConfig] = None,
                 output_stream_url: str = "rtmp://localhost:1935/live/stream"):
        """
        Args:
            rtsp_url: input stream, opened with ``cv2.VideoCapture``.
            model_path: weights passed to ``YOLODetector``.
            detection_interval: detect on every N-th processed frame; values
                below 1 are treated as 1.
            alarm_config: alarm settings; defaults to alarming on ``"person"``.
            output_stream_url: URL the annotated stream is pushed to.
        """
        self.rtsp_url = rtsp_url
        # Clamp: 0 would raise ZeroDivisionError in process_frame's modulo.
        self.detection_interval = max(1, int(detection_interval))
        self.frame_count = 0
        self.running = False

        self.detector = YOLODetector(model_path)
        self.alarm_manager = AlarmManager(alarm_config or AlarmConfig(target_classes=["person"]))
        # Source frame rate; refreshed by capture_frames() whenever it connects.
        self.source_fps = self.DEFAULT_FPS
        self.frame_buffer = self._make_frame_buffer(self.source_fps)
        self.stream_server = StreamServer(output_stream_url, use_gpu=True)  # GPU encoder when available

        # Latest detection, reused to annotate frames that are not detected on.
        self.last_detection = None
        # Hand-off between the capture and processing threads.
        self.frame_queue = queue.Queue(maxsize=100)
        # Worker threads created by start(), joined by stop().
        self._threads = []

    def _make_frame_buffer(self, fps: float) -> FrameBuffer:
        """Build a FrameBuffer that holds one alarm clip plus a small margin at ``fps``.

        Every buffered frame is a full-resolution copy, so the buffer is sized to
        the alarm window instead of a fixed 60 s (1500 frames at 25 fps, several
        GB at 1080p).
        """
        seconds = int(self.alarm_manager.config.alarm_duration) + self.BUFFER_MARGIN_SECONDS
        return FrameBuffer(max_duration=max(1, seconds), fps=max(1, int(round(fps))))

    def _read_source_fps(self, cap) -> float:
        """Return the capture's reported frame rate, or DEFAULT_FPS if it is unusable."""
        fps = cap.get(cv2.CAP_PROP_FPS)
        # The chained comparison is also False for 0 and NaN.
        if not (1.0 <= fps <= self.MAX_PLAUSIBLE_FPS):
            return self.DEFAULT_FPS
        return fps

    def draw_detections(self, frame: np.ndarray, detection_result: Optional[DetectionResult]) -> np.ndarray:
        """Return a copy of ``frame`` with boxes, labels, alarm banner and timestamp drawn on it.

        The banner and timestamp are drawn on every frame, including frames
        with no detections. Target-class boxes are red, all other boxes green.
        """
        annotated_frame = frame.copy()

        if detection_result is not None:
            for box, confidence, class_name in zip(
                detection_result.boxes,
                detection_result.confidences,
                detection_result.class_names,
            ):
                x1, y1, x2, y2 = box.astype(int)
                if class_name in self.alarm_manager.config.target_classes:
                    color = (0, 0, 255)  # target class: red
                else:
                    color = (0, 255, 0)  # other classes: green
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

                label = f"{class_name}: {confidence:.2f}"
                (text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
                # Keep the label inside the image when the box touches the top edge.
                label_bottom = max(int(y1), text_h + 10)
                cv2.rectangle(annotated_frame, (x1, label_bottom - text_h - 10),
                              (x1 + text_w, label_bottom), color, -1)
                cv2.putText(annotated_frame, label, (x1, label_bottom - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        if self.alarm_manager.is_alarming:
            cv2.putText(annotated_frame, "ALARM ACTIVE", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(annotated_frame, timestamp, (10, annotated_frame.shape[0] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        return annotated_frame

    def _effective_fps(self, frames: List[Tuple[np.ndarray, datetime]]) -> float:
        """Frame rate actually achieved over ``frames``, derived from their timestamps.

        Frames can be dropped before processing (the capture queue discards its
        oldest entry when full), so the nominal source rate would make a saved
        clip play too fast. Falls back to ``source_fps`` for degenerate spans.
        """
        if len(frames) < 2:
            return self.source_fps
        span = (frames[-1][1] - frames[0][1]).total_seconds()
        if span <= 0:
            return self.source_fps
        return (len(frames) - 1) / span

    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Detect (every ``detection_interval`` frames), annotate, buffer and handle alarms.

        Returns the annotated frame to stream.
        """
        if self.frame_count % self.detection_interval == 0:
            detection_result = self.detector.detect(frame,
                                                    self.alarm_manager.config.confidence_threshold)
            self.last_detection = detection_result
            if self.alarm_manager.check_alarm_trigger(detection_result):
                # start_alarm() does nothing if an alarm is already open.
                self.alarm_manager.start_alarm()

        # Taken after a possible start_alarm(), so the triggering frame is not
        # stamped earlier than alarm_start_time and ends up in the saved clip.
        current_time = datetime.now()
        annotated_frame = self.draw_detections(frame, self.last_detection)
        self.frame_buffer.add_frame(annotated_frame, current_time)

        if self.alarm_manager.should_stop_alarm():
            alarm_frames = self.frame_buffer.get_frames_in_range(
                self.alarm_manager.alarm_start_time,
                self.alarm_manager.config.alarm_duration,
            )
            if alarm_frames:
                # Encode off the processing thread. Not a daemon, so a clip still
                # being written at program exit is completed rather than truncated.
                threading.Thread(
                    target=self.alarm_manager.save_alarm_video,
                    args=(alarm_frames, self._effective_fps(alarm_frames)),
                ).start()
            self.alarm_manager.stop_alarm()

        return annotated_frame

    def connect_to_rtsp_stream(self, url):
        """Open ``url`` requesting an HEVC FOURCC; return the capture, or None on failure.

        NOTE(review): not used by this class (capture_frames() uses _open_capture());
        kept in case external code calls it.
        """
        cap = cv2.VideoCapture(url)
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'HEVC'))
        if not cap.isOpened():
            logger.error(f"Failed to connect to {url}")
            cap.release()
            return None
        return cap

    def _open_capture(self):
        """Open the RTSP source with a 1-frame buffer and record its frame rate; None on failure."""
        cap = cv2.VideoCapture(self.rtsp_url)
        if not cap.isOpened():
            logger.error(f"无法打开RTSP流: {self.rtsp_url}")
            cap.release()
            return None
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.source_fps = self._read_source_fps(cap)
        logger.info(f"开始捕获RTSP流: {self.rtsp_url}")
        return cap

    def _enqueue_latest(self, frame: np.ndarray):
        """Queue ``frame``; when the queue is full, drop the oldest frame first."""
        try:
            self.frame_queue.put_nowait(frame)
            return
        except queue.Full:
            pass
        try:
            self.frame_queue.get_nowait()
        except queue.Empty:
            pass
        try:
            self.frame_queue.put_nowait(frame)
        except queue.Full:
            pass  # consumer-side race; losing this one frame is acceptable

    def capture_frames(self):
        """Capture-thread body: read frames while ``running``, (re)connecting as needed.

        The initial connection is retried like any later disconnect, rather than
        ending the thread for good.
        """
        cap = None
        try:
            while self.running:
                if cap is None:
                    cap = self._open_capture()
                    if cap is None:
                        time.sleep(self.RECONNECT_DELAY_SECONDS)
                        continue

                ret, frame = cap.read()
                if not ret:
                    logger.warning("读取帧失败,尝试重连...")
                    cap.release()
                    cap = None
                    time.sleep(self.RECONNECT_DELAY_SECONDS)
                    continue

                self._enqueue_latest(frame)
        finally:
            if cap is not None:
                cap.release()
            logger.info("RTSP捕获已停止")

    def process_frames(self):
        """Processing-thread body: start the output stream on the first frame, then process each frame."""
        stream_started = False
        while self.running:
            try:
                frame = self.frame_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                if not stream_started:
                    height, width = frame.shape[:2]
                    # source_fps was set by the capture thread before it queued
                    # this frame; size buffer and encoder for the real rate.
                    self.frame_buffer = self._make_frame_buffer(self.source_fps)
                    stream_fps = max(1, int(round(self.source_fps)))
                    if not self.stream_server.start_streaming(width, height, stream_fps):
                        logger.error("流媒体服务器启动失败")
                        break
                    stream_started = True

                processed_frame = self.process_frame(frame)
                self.stream_server.send_frame(processed_frame)
                self.frame_count += 1
            except Exception as e:
                # logger.exception keeps the traceback for diagnosis.
                logger.exception(f"处理帧时出错: {e}")

        logger.info("帧处理已停止")

    def start(self):
        """Start the capture and processing threads; return them as ``(capture, process)``."""
        self.running = True
        capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
        process_thread = threading.Thread(target=self.process_frames, daemon=True)
        capture_thread.start()
        process_thread.start()
        self._threads = [capture_thread, process_thread]
        logger.info("RTSP处理器已启动")
        return capture_thread, process_thread

    def stop(self, join_timeout: float = 10.0):
        """Stop the worker threads, then the output stream.

        Workers are joined first (up to ``join_timeout`` seconds each), so
        process_frames() is not writing into ffmpeg while it is shut down.
        """
        self.running = False
        current = threading.current_thread()
        for worker in self._threads:
            if worker is not current:  # joining oneself would raise RuntimeError
                worker.join(timeout=join_timeout)
        self._threads = []
        self.stream_server.stop_streaming()
        logger.info("RTSP处理器已停止")
def main():
    """Build an RTSPProcessor from the module-level ``config`` and run it.

    Reads the keys RTSP_URL, MODEL_PATH, DETECTION_INTERVAL, OUTPUT_STREAM_URL,
    TARGET_CLASSES, confidence_threshold, alarm_duration, cooldown_duration and
    alarm_save_path; a missing key raises KeyError. Runs until Ctrl+C or until
    one of the worker threads exits, then stops the processor.
    """
    rtsp_url = config["RTSP_URL"]                      # input RTSP address
    model_path = config["MODEL_PATH"]                  # YOLO weights
    detection_interval = config["DETECTION_INTERVAL"]  # detect on every N-th frame
    output_stream_url = config["OUTPUT_STREAM_URL"]    # where the annotated stream is pushed

    alarm_config = AlarmConfig(
        target_classes=config["TARGET_CLASSES"],
        confidence_threshold=config["confidence_threshold"],
        alarm_duration=config["alarm_duration"],        # clip length in seconds
        cooldown_duration=config["cooldown_duration"],  # seconds between alarms
        save_path=config["alarm_save_path"],
    )

    processor = RTSPProcessor(
        rtsp_url=rtsp_url,
        model_path=model_path,
        detection_interval=detection_interval,
        alarm_config=alarm_config,
        output_stream_url=output_stream_url,
    )

    try:
        workers = processor.start()
        logger.info("系统运行中按Ctrl+C停止...")
        # Poll the workers instead of sleeping forever, so the program ends if
        # one of them exits (e.g. the output stream could not be started).
        while all(worker.is_alive() for worker in workers):
            time.sleep(1)
        logger.error("工作线程已退出, 系统停止")
    except KeyboardInterrupt:
        logger.info("收到停止信号")
    finally:
        processor.stop()
if __name__ == "__main__":
    # Script entry point only; importing this module does not start the pipeline.
    main()