import cv2
import numpy as np
import time
import threading
import queue
from datetime import datetime, timedelta
from pathlib import Path
import subprocess
import logging
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from collections import deque
import json

try:
    from ultralytics import YOLO
except ImportError:
    print("请安装ultralytics: pip install ultralytics")
    raise

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class DetectionResult:
    """Result of running the detector on one frame."""
    boxes: np.ndarray          # one row per detection, read as x1, y1, x2, y2
    confidences: np.ndarray    # per-detection confidence score
    class_ids: np.ndarray      # per-detection integer class id
    class_names: List[str]     # per-detection class name, parallel to class_ids
    timestamp: datetime        # when the result object was created


@dataclass
class AlarmConfig:
    """Alarm configuration."""
    target_classes: List[str]            # class names that may trigger an alarm
    confidence_threshold: float = 0.5    # minimum confidence for a trigger
    alarm_duration: int = 10             # length of an alarm recording (seconds)
    cooldown_duration: int = 30          # minimum gap between alarms (seconds)
    save_path: str = "./alarm_videos"    # directory for saved alarm videos


class FrameBuffer:
    """Bounded buffer of recent frames, used for alarm recording."""

    def __init__(self, max_duration: int = 60, fps: int = 25):
        """Create a buffer holding at most ``max_duration * fps`` frames."""
        self.max_frames = max_duration * fps
        self.buffer = deque(maxlen=self.max_frames)
        self.fps = fps

    def add_frame(self, frame: np.ndarray, timestamp: datetime):
        """Append a copy of ``frame`` together with its timestamp."""
        self.buffer.append((frame.copy(), timestamp))

    def get_frames_in_range(self, start_time: datetime,
                            duration: int) -> List[Tuple[np.ndarray, datetime]]:
        """Return buffered frames with ``start_time <= ts <= start_time + duration``.

        ``duration`` is in seconds. Frames are returned in buffer order.
        """
        end_time = start_time + timedelta(seconds=duration)
        return [
            (frame, timestamp)
            for frame, timestamp in self.buffer
            if start_time <= timestamp <= end_time
        ]


class YOLODetector:
    """YOLO detector wrapper."""

    def __init__(self, model_path: str = "yolov8n.pt"):
        """Load the model from ``model_path`` and cache its class-name table."""
        self.model = YOLO(model_path)
        self.class_names = self.model.names

    def detect(self, frame: np.ndarray,
               confidence_threshold: float = 0.5) -> DetectionResult:
        """Run object detection on ``frame``.

        Returns a ``DetectionResult``; when the model yields no boxes, all
        arrays are empty (``boxes`` has shape ``(0, 4)``).
        """
        results = self.model(frame, conf=confidence_threshold, verbose=False)

        if len(results) > 0 and results[0].boxes is not None:
            detections = results[0].boxes
            boxes = detections.xyxy.cpu().numpy()
            confidences = detections.conf.cpu().numpy()
            class_ids = detections.cls.cpu().numpy().astype(int)
            class_names = [self.class_names[class_id] for class_id in class_ids]
        else:
            # Keep the empty result shaped like a real one so callers can
            # index or iterate without special-casing.
            boxes = np.empty((0, 4), dtype=np.float32)
            confidences = np.empty((0,), dtype=np.float32)
            class_ids = np.empty((0,), dtype=int)
            class_names = []

        return DetectionResult(
            boxes=boxes,
            confidences=confidences,
            class_ids=class_ids,
            class_names=class_names,
            timestamp=datetime.now()
        )


class AlarmManager:
    """Tracks alarm state (trigger, duration, cooldown) and saves alarm videos."""

    def __init__(self, config: AlarmConfig):
        """Store ``config`` and make sure the save directory exists."""
        self.config = config
        self.last_alarm_time = None
        self.is_alarming = False
        self.alarm_start_time = None

        # Create the output directory
        Path(config.save_path).mkdir(parents=True, exist_ok=True)

    def check_alarm_trigger(self, detection_result: DetectionResult) -> bool:
        """Return True when a target class is detected outside the cooldown window."""
        current_time = datetime.now()

        # Still cooling down after the previous alarm
        if self.last_alarm_time is not None:
            cooldown = timedelta(seconds=self.config.cooldown_duration)
            if current_time - self.last_alarm_time < cooldown:
                return False

        # Any target-class detection at or above the threshold triggers
        return any(
            class_name in self.config.target_classes
            and confidence >= self.config.confidence_threshold
            for class_name, confidence in zip(detection_result.class_names,
                                              detection_result.confidences)
        )

    def start_alarm(self) -> bool:
        """Enter the alarming state. Returns False if an alarm is already active."""
        if self.is_alarming:
            return False
        self.is_alarming = True
        self.alarm_start_time = datetime.now()
        logger.info(f"告警开始: {self.alarm_start_time}")
        return True

    def should_stop_alarm(self) -> bool:
        """Return True once the active alarm has lasted ``alarm_duration`` seconds."""
        if self.is_alarming and self.alarm_start_time:
            duration = (datetime.now() - self.alarm_start_time).total_seconds()
            return duration >= self.config.alarm_duration
        return False

    def stop_alarm(self):
        """Leave the alarming state and start the cooldown period."""
        if self.is_alarming:
            self.is_alarming = False
            self.last_alarm_time = datetime.now()
            logger.info(f"告警结束: {self.last_alarm_time}")

    def save_alarm_video(self, frames: List[Tuple[np.ndarray, datetime]],
                         fps: int = 25):
        """Write ``frames`` to an mp4 file under ``config.save_path``.

        Returns the file path as a string, or None when there are no frames,
        the writer cannot be opened, or writing fails.
        """
        if not frames:
            logger.warning("没有帧数据可保存")
            return None

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"alarm_{timestamp}.mp4"
        filepath = Path(self.config.save_path) / filename

        # Frame size is taken from the first frame
        height, width = frames[0][0].shape[:2]

        # Create the video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(str(filepath), fourcc, fps, (width, height))

        try:
            # VideoWriter does not raise when it fails to open; without this
            # check a failed open would be reported as a successful save.
            if not out.isOpened():
                logger.error(f"保存视频失败: 无法打开视频写入器 {filepath}")
                return None

            for frame, _ in frames:
                out.write(frame)

            logger.info(f"告警视频已保存: {filepath}")
            return str(filepath)

        except Exception as e:
            logger.error(f"保存视频失败: {e}")
            return None
        finally:
            out.release()


class StreamServer:
    """Pushes raw BGR frames to a streaming URL through an ffmpeg subprocess."""

    def __init__(self, output_url: str = "rtmp://localhost:1935/live/stream"):
        """Remember the output URL; ffmpeg is not started until ``start_streaming``."""
        self.output_url = output_url
        self.ffmpeg_process = None

    def start_streaming(self, width: int, height: int, fps: int = 25):
        """Start ffmpeg reading raw ``bgr24`` frames of ``width`` x ``height`` on stdin.

        Returns True on success, False if the process could not be started.
        """
        ffmpeg_cmd = [
            'ffmpeg',
            '-y',  # overwrite output
            '-f', 'rawvideo',
            '-vcodec', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', f'{width}x{height}',
            '-r', str(fps),
            '-i', '-',  # read from stdin
            '-c:v', 'libx264',
            '-pix_fmt', 'yuv420p',
            '-preset', 'ultrafast',
            '-f', 'flv',
            self.output_url
        ]

        try:
            # stderr is discarded rather than piped: nothing in this class
            # reads it, and an unread pipe eventually fills up and blocks ffmpeg.
            self.ffmpeg_process = subprocess.Popen(
                ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stderr=subprocess.DEVNULL
            )
            logger.info(f"流媒体服务启动: {self.output_url}")
            return True
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            logger.error(f"启动流媒体失败: {e}")
            return False

    def send_frame(self, frame: np.ndarray):
        """Write one frame's raw bytes to ffmpeg's stdin. Errors are logged, not raised."""
        # Snapshot the attribute: stop_streaming() may set it to None from
        # another thread between the check and the write.
        process = self.ffmpeg_process
        if process is None or process.stdin is None:
            return
        try:
            process.stdin.write(frame.tobytes())
            process.stdin.flush()
        except (OSError, ValueError) as e:
            # OSError: ffmpeg exited (broken pipe); ValueError: stdin already closed
            logger.error(f"发送帧失败: {e}")

    def stop_streaming(self):
        """Close ffmpeg's stdin and wait for it to exit, killing it if it hangs."""
        process = self.ffmpeg_process
        if process is None:
            return
        self.ffmpeg_process = None

        try:
            if process.stdin:
                process.stdin.close()
        except OSError as e:
            # ffmpeg may already have exited; closing then fails on flush
            logger.debug(f"closing ffmpeg stdin failed: {e}")

        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

        logger.info("流媒体服务已停止")


class RTSPProcessor:
    """Reads an RTSP stream, runs detection, manages alarms and re-streams frames."""

    def __init__(self,
                 rtsp_url: str,
                 model_path: str = "yolov8n.pt",
                 detection_interval: int = 5,
                 alarm_config: Optional[AlarmConfig] = None,
                 output_stream_url: str = "rtmp://localhost:1935/live/stream"):
        """Build the pipeline components.

        ``detection_interval`` is the number of frames between detector runs.
        When ``alarm_config`` is None, a default config targeting "person" is used.
        """
        self.rtsp_url = rtsp_url
        self.detection_interval = detection_interval
        self.frame_count = 0
        self.running = False

        # Components
        self.detector = YOLODetector(model_path)
        self.alarm_manager = AlarmManager(
            alarm_config or AlarmConfig(target_classes=["person"])
        )
        self.frame_buffer = FrameBuffer()
        self.stream_server = StreamServer(output_stream_url)

        # Most recent detection result, reused for frames between detector runs
        self.last_detection = None

        # Hand-off queue between the capture and processing threads
        self.frame_queue = queue.Queue(maxsize=100)

        # Worker threads started by start(), joined by stop()
        self._threads: List[threading.Thread] = []

    def draw_detections(self, frame: np.ndarray,
                        detection_result: DetectionResult) -> np.ndarray:
        """Return a copy of ``frame`` with boxes, labels and status overlays drawn.

        The alarm banner and timestamp are drawn on every frame, including
        frames with no detections, so the overlays do not flicker.
        """
        annotated_frame = frame.copy()

        if detection_result is not None:
            for box, confidence, class_name in zip(
                detection_result.boxes,
                detection_result.confidences,
                detection_result.class_names
            ):
                x1, y1, x2, y2 = box.astype(int)

                # Bounding box: red for target classes, green otherwise
                color = (0, 255, 0)
                if class_name in self.alarm_manager.config.target_classes:
                    color = (0, 0, 255)

                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

                # Label on a filled background above the box
                label = f"{class_name}: {confidence:.2f}"
                label_size = cv2.getTextSize(
                    label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
                cv2.rectangle(annotated_frame,
                              (x1, y1 - label_size[1] - 10),
                              (x1 + label_size[0], y1),
                              color, -1)
                cv2.putText(annotated_frame, label, (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Alarm status banner
        if self.alarm_manager.is_alarming:
            cv2.putText(annotated_frame, "ALARM ACTIVE", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        # Timestamp in the bottom-left corner
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(annotated_frame, timestamp,
                    (10, annotated_frame.shape[0] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        return annotated_frame

    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Process one frame: detect (on interval), annotate, buffer, handle alarms.

        Returns the annotated frame.
        """
        current_time = datetime.now()

        # Run the detector only every ``detection_interval`` frames
        if self.frame_count % self.detection_interval == 0:
            detection_result = self.detector.detect(
                frame, self.alarm_manager.config.confidence_threshold)
            self.last_detection = detection_result

            # start_alarm() is a no-op when an alarm is already active
            if self.alarm_manager.check_alarm_trigger(detection_result):
                self.alarm_manager.start_alarm()

        # Draw using the most recent detection result
        annotated_frame = self.draw_detections(frame, self.last_detection)

        # Buffer the annotated frame for alarm recording
        self.frame_buffer.add_frame(annotated_frame, current_time)

        # Alarm finished: collect its frames and save them in the background
        if self.alarm_manager.should_stop_alarm():
            alarm_frames = self.frame_buffer.get_frames_in_range(
                self.alarm_manager.alarm_start_time,
                self.alarm_manager.config.alarm_duration
            )

            if alarm_frames:
                # Use the buffer's configured fps rather than a separate constant
                threading.Thread(
                    target=self.alarm_manager.save_alarm_video,
                    args=(alarm_frames, self.frame_buffer.fps),
                    daemon=True
                ).start()

            self.alarm_manager.stop_alarm()

        return annotated_frame

    def _open_capture(self):
        """Open the RTSP stream and request a capture buffer size of 1."""
        cap = cv2.VideoCapture(self.rtsp_url)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        return cap

    def _enqueue_latest(self, frame: np.ndarray):
        """Put ``frame`` on the queue, discarding the oldest frame when it is full."""
        try:
            self.frame_queue.put_nowait(frame)
            return
        except queue.Full:
            pass

        try:
            self.frame_queue.get_nowait()  # drop the oldest frame
        except queue.Empty:
            pass  # the consumer emptied the queue in the meantime

        try:
            self.frame_queue.put_nowait(frame)
        except queue.Full:
            pass  # not expected with a single producer; drop the frame

    def capture_frames(self):
        """Capture loop: read frames from the RTSP stream into the queue.

        Runs until ``self.running`` becomes False. On a read failure the
        stream is reopened after a 5 second pause.
        """
        reconnect_delay = 5  # seconds

        cap = self._open_capture()
        try:
            if not cap.isOpened():
                logger.error(f"无法打开RTSP流: {self.rtsp_url}")
                return

            logger.info(f"开始捕获RTSP流: {self.rtsp_url}")

            while self.running:
                ret, frame = cap.read()
                if not ret:
                    logger.warning("读取帧失败,尝试重连...")
                    cap.release()
                    # Sleep in short slices so stop() is not held up for the
                    # whole reconnect delay.
                    deadline = time.monotonic() + reconnect_delay
                    while self.running and time.monotonic() < deadline:
                        time.sleep(0.1)
                    cap = self._open_capture()
                    continue

                self._enqueue_latest(frame)
        finally:
            cap.release()
            logger.info("RTSP捕获已停止")

    def process_frames(self):
        """Processing loop: take frames from the queue, process and stream them.

        The stream server is started on the first frame, using that frame's
        size. The loop ends when ``self.running`` becomes False or the stream
        server fails to start.
        """
        first_frame = True

        while self.running:
            try:
                frame = self.frame_queue.get(timeout=1)

                # Start the stream server once the frame size is known
                if first_frame:
                    height, width = frame.shape[:2]
                    if self.stream_server.start_streaming(width, height):
                        first_frame = False
                    else:
                        logger.error("流媒体服务器启动失败")
                        break

                processed_frame = self.process_frame(frame)
                self.stream_server.send_frame(processed_frame)

                self.frame_count += 1

            except queue.Empty:
                continue
            except Exception as e:
                # Top-level boundary of the worker thread: log with traceback
                # and keep the loop alive.
                logger.exception(f"处理帧时出错: {e}")

        logger.info("帧处理已停止")

    def start(self):
        """Start the capture and processing threads.

        Returns the tuple ``(capture_thread, process_thread)``.
        """
        self.running = True

        capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
        capture_thread.start()

        process_thread = threading.Thread(target=self.process_frames, daemon=True)
        process_thread.start()

        self._threads = [capture_thread, process_thread]

        logger.info("RTSP处理器已启动")

        return capture_thread, process_thread

    def stop(self):
        """Signal the worker threads to stop, wait briefly, then stop streaming."""
        self.running = False

        # Let the workers finish their current iteration before ffmpeg's stdin
        # is closed underneath them. The timeout keeps shutdown bounded if a
        # thread is stuck in a blocking read.
        current = threading.current_thread()
        for worker in self._threads:
            if worker is not current and worker.is_alive():
                worker.join(timeout=2.0)
        self._threads = []

        self.stream_server.stop_streaming()
        logger.info("RTSP处理器已停止")


def main():
    """Entry point: configure the processor and run until interrupted."""
    # Settings
    RTSP_URL = "rtsp://10.0.0.17:8554/camera_test/2"  # replace with the real RTSP address
    MODEL_PATH = "yolov8n.pt"  # YOLO model path
    DETECTION_INTERVAL = 5  # run detection every 5 frames
    OUTPUT_STREAM_URL = "rtmp://localhost:1935/live/processed"  # output stream address

    # Alarm configuration
    alarm_config = AlarmConfig(
        target_classes=["person"],  # target classes
        confidence_threshold=0.6,
        alarm_duration=15,  # record 15 seconds per alarm
        cooldown_duration=60*5,  # 5-minute cooldown (300 seconds)
        save_path="./alarm_videos"
    )

    # Create the processor
    processor = RTSPProcessor(
        rtsp_url=RTSP_URL,
        model_path=MODEL_PATH,
        detection_interval=DETECTION_INTERVAL,
        alarm_config=alarm_config,
        output_stream_url=OUTPUT_STREAM_URL
    )

    try:
        processor.start()

        # Keep the main thread alive until Ctrl+C
        logger.info("系统运行中,按Ctrl+C停止...")
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        logger.info("收到停止信号")
    finally:
        processor.stop()


if __name__ == "__main__":
    main()