import cv2
import numpy as np
import time
import threading
import queue
from datetime import datetime, timedelta
from pathlib import Path
import subprocess
import logging
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from collections import deque
import json

try:
    from ultralytics import YOLO
except ImportError:
    print("请安装ultralytics: pip install ultralytics")
    raise

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class DetectionResult:
    """Per-frame object-detection result."""
    boxes: np.ndarray        # (N, 4) xyxy boxes in pixel coordinates
    confidences: np.ndarray  # (N,) confidence scores
    class_ids: np.ndarray    # (N,) integer class ids
    class_names: List[str]   # class names aligned with class_ids
    timestamp: datetime      # wall-clock time the detection ran


@dataclass
class AlarmConfig:
    """Alarm configuration."""
    target_classes: List[str]          # classes that can trigger an alarm
    confidence_threshold: float = 0.5  # minimum confidence to count a hit
    alarm_duration: int = 10           # seconds of video recorded per alarm
    cooldown_duration: int = 30        # seconds to wait between alarms
    save_path: str = "./alarm_videos"  # directory where alarm clips are written


class FrameBuffer:
    """Rolling frame buffer used to assemble alarm recordings."""

    def __init__(self, max_duration: int = 60, fps: int = 25):
        self.max_frames = max_duration * fps
        self.buffer: deque = deque(maxlen=self.max_frames)
        self.fps = fps

    def add_frame(self, frame: np.ndarray, timestamp: datetime) -> None:
        """Store a copy of `frame` (copied so later drawing cannot mutate it)."""
        self.buffer.append((frame.copy(), timestamp))

    def get_frames_in_range(self, start_time: datetime, duration: int) -> List[Tuple[np.ndarray, datetime]]:
        """Return buffered frames whose timestamps fall in [start_time, start_time + duration]."""
        end_time = start_time + timedelta(seconds=duration)
        return [(frame, ts) for frame, ts in self.buffer if start_time <= ts <= end_time]


class YOLODetector:
    """Thin wrapper around an ultralytics YOLO model."""

    def __init__(self, model_path: str = "yolov8n.pt"):
        self.model = YOLO(model_path)
        self.class_names = self.model.names

    def detect(self, frame: np.ndarray, confidence_threshold: float = 0.5) -> DetectionResult:
        """Run detection on one BGR frame and normalize the output into DetectionResult."""
        results = self.model(frame, conf=confidence_threshold, verbose=False)

        if len(results) > 0 and results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy()
            confidences = results[0].boxes.conf.cpu().numpy()
            class_ids = results[0].boxes.cls.cpu().numpy().astype(int)
            # `cls_id` rather than `id` to avoid shadowing the builtin
            class_names = [self.class_names[cls_id] for cls_id in class_ids]
        else:
            # Empty results with shapes/dtypes consistent with the non-empty branch
            boxes = np.empty((0, 4))
            confidences = np.empty(0)
            class_ids = np.empty(0, dtype=int)
            class_names = []

        return DetectionResult(
            boxes=boxes,
            confidences=confidences,
            class_ids=class_ids,
            class_names=class_names,
            timestamp=datetime.now(),
        )


class AlarmManager:
    """Tracks alarm state, cooldown, and alarm-clip saving."""

    def __init__(self, config: AlarmConfig):
        self.config = config
        self.last_alarm_time: Optional[datetime] = None
        self.is_alarming = False
        self.alarm_start_time: Optional[datetime] = None

        # Make sure the clip directory exists up front
        Path(config.save_path).mkdir(parents=True, exist_ok=True)

    def check_alarm_trigger(self, detection_result: DetectionResult) -> bool:
        """Return True if this detection should start an alarm (respects cooldown)."""
        current_time = datetime.now()

        # Still cooling down from the previous alarm?
        if (self.last_alarm_time and
                current_time - self.last_alarm_time < timedelta(seconds=self.config.cooldown_duration)):
            return False

        # Any target class at or above the configured confidence?
        for class_name, confidence in zip(detection_result.class_names, detection_result.confidences):
            if (class_name in self.config.target_classes and
                    confidence >= self.config.confidence_threshold):
                return True

        return False

    def start_alarm(self) -> bool:
        """Enter the alarm state; returns True only on the transition edge."""
        if not self.is_alarming:
            self.is_alarming = True
            self.alarm_start_time = datetime.now()
            logger.info(f"告警开始: {self.alarm_start_time}")
            return True
        return False

    def should_stop_alarm(self) -> bool:
        """True once the active alarm has lasted at least `alarm_duration` seconds."""
        if self.is_alarming and self.alarm_start_time:
            duration = (datetime.now() - self.alarm_start_time).total_seconds()
            return duration >= self.config.alarm_duration
        return False

    def stop_alarm(self) -> None:
        """Leave the alarm state and start the cooldown clock."""
        if self.is_alarming:
            self.is_alarming = False
            self.last_alarm_time = datetime.now()
            logger.info(f"告警结束: {self.last_alarm_time}")

    def save_alarm_video(self, frames: List[Tuple[np.ndarray, datetime]], fps: int = 25) -> Optional[str]:
        """Write `frames` to an mp4 clip; returns the file path, or None on failure."""
        if not frames:
            logger.warning("没有帧数据可保存")
            return None

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"alarm_{timestamp}.mp4"
        filepath = Path(self.config.save_path) / filename

        # All frames are assumed to share the first frame's size
        height, width = frames[0][0].shape[:2]

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(str(filepath), fourcc, fps, (width, height))

        try:
            for frame, _ in frames:
                out.write(frame)

            logger.info(f"告警视频已保存: {filepath}")
            return str(filepath)

        except Exception as e:
            logger.error(f"保存视频失败: {e}")
            return None

        finally:
            out.release()


class StreamServer:
    """Pushes processed frames to an RTMP endpoint through an ffmpeg subprocess."""

    def __init__(self, output_url: str = "rtmp://localhost:1935/live/stream"):
        self.output_url = output_url
        self.ffmpeg_process: Optional[subprocess.Popen] = None

    def start_streaming(self, width: int, height: int, fps: int = 25) -> bool:
        """Spawn ffmpeg reading raw BGR frames from stdin; returns True on success."""
        ffmpeg_cmd = [
            'ffmpeg',
            '-y',                       # overwrite output
            '-f', 'rawvideo',
            '-vcodec', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', f'{width}x{height}',
            '-r', str(fps),
            '-i', '-',                  # read frames from stdin
            '-c:v', 'libx264',
            '-pix_fmt', 'yuv420p',
            '-preset', 'ultrafast',
            '-f', 'flv',
            self.output_url
        ]

        try:
            self.ffmpeg_process = subprocess.Popen(
                ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            logger.info(f"流媒体服务启动: {self.output_url}")
            return True
        except Exception as e:
            logger.error(f"启动流媒体失败: {e}")
            return False

    def send_frame(self, frame: np.ndarray) -> None:
        """Write one frame to ffmpeg; tears the pipe down if ffmpeg has died."""
        if self.ffmpeg_process and self.ffmpeg_process.stdin:
            try:
                self.ffmpeg_process.stdin.write(frame.tobytes())
                self.ffmpeg_process.stdin.flush()
            except (BrokenPipeError, OSError) as e:
                # ffmpeg exited: release the dead process instead of
                # logging a failure for every subsequent frame
                logger.error(f"发送帧失败: {e}")
                self.stop_streaming()
            except Exception as e:
                logger.error(f"发送帧失败: {e}")

    def stop_streaming(self) -> None:
        """Close stdin and reap ffmpeg; safe to call when already stopped or crashed."""
        if self.ffmpeg_process:
            try:
                if self.ffmpeg_process.stdin:
                    self.ffmpeg_process.stdin.close()
                # Don't block forever if ffmpeg ignores EOF
                self.ffmpeg_process.wait(timeout=10)
            except Exception:
                self.ffmpeg_process.kill()
            finally:
                self.ffmpeg_process = None
            logger.info("流媒体服务已停止")


class RTSPProcessor:
    """End-to-end pipeline: RTSP capture -> YOLO detection -> alarm clips -> RTMP out."""

    def __init__(self,
                 rtsp_url: str,
                 model_path: str = "yolov8n.pt",
                 detection_interval: int = 5,
                 alarm_config: AlarmConfig = None,
                 output_stream_url: str = "rtmp://localhost:1935/live/stream"):

        self.rtsp_url = rtsp_url
        self.detection_interval = detection_interval
        self.frame_count = 0
        self.running = False

        # Pipeline components
        self.detector = YOLODetector(model_path)
        self.alarm_manager = AlarmManager(alarm_config or AlarmConfig(target_classes=["person"]))
        self.frame_buffer = FrameBuffer()
        self.stream_server = StreamServer(output_stream_url)

        # Most recent detection, reused for drawing on non-detection frames
        self.last_detection: Optional[DetectionResult] = None

        # Hand-off queue between the capture thread and the processing thread
        self.frame_queue: queue.Queue = queue.Queue(maxsize=100)

    def draw_detections(self, frame: np.ndarray, detection_result: Optional[DetectionResult]) -> np.ndarray:
        """Return a copy of `frame` annotated with boxes, labels, alarm banner and timestamp."""
        if detection_result is None or len(detection_result.boxes) == 0:
            return frame

        annotated_frame = frame.copy()

        for box, confidence, class_name in zip(
            detection_result.boxes,
            detection_result.confidences,
            detection_result.class_names
        ):
            x1, y1, x2, y2 = box.astype(int)

            # Bounding box: red for alarm-target classes, green otherwise
            color = (0, 255, 0)
            if class_name in self.alarm_manager.config.target_classes:
                color = (0, 0, 255)

            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

            # Label with a filled background for readability
            label = f"{class_name}: {confidence:.2f}"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
            cv2.rectangle(annotated_frame, (x1, y1 - label_size[1] - 10),
                          (x1 + label_size[0], y1), color, -1)
            cv2.putText(annotated_frame, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Alarm banner
        if self.alarm_manager.is_alarming:
            cv2.putText(annotated_frame, "ALARM ACTIVE", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        # Wall-clock timestamp in the bottom-left corner
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(annotated_frame, timestamp, (10, annotated_frame.shape[0] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        return annotated_frame

    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Process one frame: periodic detection, alarm bookkeeping, annotation, buffering."""
        current_time = datetime.now()
        detection_result = None

        # Run the detector only every `detection_interval` frames
        if self.frame_count % self.detection_interval == 0:
            detection_result = self.detector.detect(frame,
                                                    self.alarm_manager.config.confidence_threshold)
            self.last_detection = detection_result

            # May start an alarm (cooldown is enforced inside the manager)
            if self.alarm_manager.check_alarm_trigger(detection_result):
                if self.alarm_manager.start_alarm():
                    # Hook for alarm-start side effects (notifications, etc.)
                    pass

        # Draw using the most recent detection so boxes persist between detections
        annotated_frame = self.draw_detections(frame, self.last_detection)

        # Buffer the annotated frame for potential alarm recording
        self.frame_buffer.add_frame(annotated_frame, current_time)

        # Finish an alarm whose recording window has elapsed
        if self.alarm_manager.should_stop_alarm():
            alarm_frames = self.frame_buffer.get_frames_in_range(
                self.alarm_manager.alarm_start_time,
                self.alarm_manager.config.alarm_duration
            )

            # Save off-thread so encoding does not stall the live stream.
            # NOTE(review): fps is hard-coded to 25 here and in FrameBuffer —
            # confirm it matches the actual capture rate.
            if alarm_frames:
                threading.Thread(
                    target=self.alarm_manager.save_alarm_video,
                    args=(alarm_frames, 25),
                    daemon=True
                ).start()

            self.alarm_manager.stop_alarm()

        return annotated_frame

    def capture_frames(self) -> None:
        """Capture thread: read RTSP frames, keep the queue fresh, reconnect on failure."""
        cap = cv2.VideoCapture(self.rtsp_url)

        if not cap.isOpened():
            logger.error(f"无法打开RTSP流: {self.rtsp_url}")
            return

        # Keep the driver-side buffer small to reduce latency
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        logger.info(f"开始捕获RTSP流: {self.rtsp_url}")

        try:
            while self.running:
                ret, frame = cap.read()
                if not ret:
                    logger.warning("读取帧失败,尝试重连...")
                    cap.release()
                    time.sleep(5)
                    cap = cv2.VideoCapture(self.rtsp_url)
                    if cap.isOpened():
                        # Re-apply settings lost with the old handle
                        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                    continue

                if not self.frame_queue.full():
                    self.frame_queue.put(frame)
                else:
                    # Queue full: drop the oldest frame so the stream stays live
                    try:
                        self.frame_queue.get_nowait()
                        self.frame_queue.put(frame)
                    except queue.Empty:
                        pass

        finally:
            cap.release()
            logger.info("RTSP捕获已停止")

    def process_frames(self) -> None:
        """Processing thread: pull frames off the queue, annotate, and push to RTMP."""
        first_frame = True

        while self.running:
            try:
                frame = self.frame_queue.get(timeout=1)

                # Lazily start ffmpeg once the frame size is known
                if first_frame:
                    height, width = frame.shape[:2]
                    if self.stream_server.start_streaming(width, height):
                        first_frame = False
                    else:
                        logger.error("流媒体服务器启动失败")
                        break

                processed_frame = self.process_frame(frame)

                self.stream_server.send_frame(processed_frame)

                self.frame_count += 1

            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"处理帧时出错: {e}")

        logger.info("帧处理已停止")

    def start(self) -> Tuple[threading.Thread, threading.Thread]:
        """Start the capture and processing threads; returns both thread handles."""
        self.running = True

        capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
        capture_thread.start()

        process_thread = threading.Thread(target=self.process_frames, daemon=True)
        process_thread.start()

        logger.info("RTSP处理器已启动")

        return capture_thread, process_thread

    def stop(self) -> None:
        """Signal both threads to stop and shut down the output stream."""
        self.running = False
        self.stream_server.stop_streaming()
        logger.info("RTSP处理器已停止")


def main():
    """Wire up configuration, run the processor until Ctrl+C."""
    RTSP_URL = "rtsp://10.0.0.17:8554/camera_test/2"  # replace with the real RTSP address
    MODEL_PATH = "yolov8n.pt"                          # YOLO model weights
    DETECTION_INTERVAL = 5                             # detect every 5th frame
    OUTPUT_STREAM_URL = "rtmp://localhost:1935/live/processed"  # output stream

    # Alarm configuration
    alarm_config = AlarmConfig(
        target_classes=["person"],   # classes that trigger alarms
        confidence_threshold=0.6,
        alarm_duration=15,           # record 15 seconds per alarm
        cooldown_duration=60*5,      # cooldown: 5 minutes (300 s)
        save_path="./alarm_videos"
    )

    processor = RTSPProcessor(
        rtsp_url=RTSP_URL,
        model_path=MODEL_PATH,
        detection_interval=DETECTION_INTERVAL,
        alarm_config=alarm_config,
        output_stream_url=OUTPUT_STREAM_URL
    )

    try:
        threads = processor.start()

        # Keep the main thread alive; the workers are daemons
        logger.info("系统运行中,按Ctrl+C停止...")
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        logger.info("收到停止信号")

    finally:
        processor.stop()


if __name__ == "__main__":
    main()
import cv2
import numpy as np
import time
import threading
import queue
from datetime import datetime, timedelta
from pathlib import Path
import subprocess
import logging
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
from collections import deque
import json

try:
    from ultralytics import YOLO
except ImportError:
    print("请安装ultralytics: pip install ultralytics")
    raise

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class DetectionResult:
    """Per-frame object-detection result."""
    boxes: np.ndarray        # (N, 4) xyxy boxes in pixel coordinates
    confidences: np.ndarray  # (N,) confidence scores
    class_ids: np.ndarray    # (N,) integer class ids
    class_names: List[str]   # class names aligned with class_ids
    timestamp: datetime      # wall-clock time the detection ran


@dataclass
class AlarmConfig:
    """Alarm configuration."""
    target_classes: List[str]          # classes that can trigger an alarm
    confidence_threshold: float = 0.5  # minimum confidence to count a hit
    alarm_duration: int = 10           # seconds of video recorded per alarm
    cooldown_duration: int = 30        # seconds to wait between alarms
    save_path: str = "./alarm_videos"  # directory where alarm clips are written


class FrameBuffer:
    """Rolling frame buffer used to assemble alarm recordings."""

    def __init__(self, max_duration: int = 60, fps: int = 25):
        self.max_frames = max_duration * fps
        self.buffer: deque = deque(maxlen=self.max_frames)
        self.fps = fps

    def add_frame(self, frame: np.ndarray, timestamp: datetime) -> None:
        """Store a copy of `frame` (copied so later drawing cannot mutate it)."""
        self.buffer.append((frame.copy(), timestamp))

    def get_frames_in_range(self, start_time: datetime, duration: int) -> List[Tuple[np.ndarray, datetime]]:
        """Return buffered frames whose timestamps fall in [start_time, start_time + duration]."""
        end_time = start_time + timedelta(seconds=duration)
        return [(frame, ts) for frame, ts in self.buffer if start_time <= ts <= end_time]


class YOLODetector:
    """Thin wrapper around an ultralytics YOLO model."""

    def __init__(self, model_path: str = "yolov8n.pt"):
        self.model = YOLO(model_path)
        self.class_names = self.model.names

    def detect(self, frame: np.ndarray, confidence_threshold: float = 0.5) -> DetectionResult:
        """Run detection on one BGR frame and normalize the output into DetectionResult."""
        results = self.model(frame, conf=confidence_threshold, verbose=False)

        if len(results) > 0 and results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy()
            confidences = results[0].boxes.conf.cpu().numpy()
            class_ids = results[0].boxes.cls.cpu().numpy().astype(int)
            # `cls_id` rather than `id` to avoid shadowing the builtin
            class_names = [self.class_names[cls_id] for cls_id in class_ids]
        else:
            # Empty results with shapes/dtypes consistent with the non-empty branch
            boxes = np.empty((0, 4))
            confidences = np.empty(0)
            class_ids = np.empty(0, dtype=int)
            class_names = []

        return DetectionResult(
            boxes=boxes,
            confidences=confidences,
            class_ids=class_ids,
            class_names=class_names,
            timestamp=datetime.now(),
        )


class AlarmManager:
    """Tracks alarm state, cooldown, and alarm-clip saving."""

    def __init__(self, config: AlarmConfig):
        self.config = config
        self.last_alarm_time: Optional[datetime] = None
        self.is_alarming = False
        self.alarm_start_time: Optional[datetime] = None

        # Make sure the clip directory exists up front
        Path(config.save_path).mkdir(parents=True, exist_ok=True)

    def check_alarm_trigger(self, detection_result: DetectionResult) -> bool:
        """Return True if this detection should start an alarm (respects cooldown)."""
        current_time = datetime.now()

        # Still cooling down from the previous alarm?
        if (self.last_alarm_time and
                current_time - self.last_alarm_time < timedelta(seconds=self.config.cooldown_duration)):
            return False

        # Any target class at or above the configured confidence?
        for class_name, confidence in zip(detection_result.class_names, detection_result.confidences):
            if (class_name in self.config.target_classes and
                    confidence >= self.config.confidence_threshold):
                return True

        return False

    def start_alarm(self) -> bool:
        """Enter the alarm state; returns True only on the transition edge."""
        if not self.is_alarming:
            self.is_alarming = True
            self.alarm_start_time = datetime.now()
            logger.info(f"告警开始: {self.alarm_start_time}")
            return True
        return False

    def should_stop_alarm(self) -> bool:
        """True once the active alarm has lasted at least `alarm_duration` seconds."""
        if self.is_alarming and self.alarm_start_time:
            duration = (datetime.now() - self.alarm_start_time).total_seconds()
            return duration >= self.config.alarm_duration
        return False

    def stop_alarm(self) -> None:
        """Leave the alarm state and start the cooldown clock."""
        if self.is_alarming:
            self.is_alarming = False
            self.last_alarm_time = datetime.now()
            logger.info(f"告警结束: {self.last_alarm_time}")

    def save_alarm_video(self, frames: List[Tuple[np.ndarray, datetime]], fps: int = 25) -> Optional[str]:
        """Write `frames` to an mp4 clip; returns the file path, or None on failure."""
        if not frames:
            logger.warning("没有帧数据可保存")
            return None

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"alarm_{timestamp}.mp4"
        filepath = Path(self.config.save_path) / filename

        # All frames are assumed to share the first frame's size
        height, width = frames[0][0].shape[:2]

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(str(filepath), fourcc, fps, (width, height))

        try:
            for frame, _ in frames:
                out.write(frame)

            logger.info(f"告警视频已保存: {filepath}")
            return str(filepath)

        except Exception as e:
            logger.error(f"保存视频失败: {e}")
            return None

        finally:
            out.release()


class StreamServer:
    """Pushes processed frames to an RTMP endpoint through ffmpeg, preferring NVENC."""

    def __init__(self, output_url: str = "rtmp://localhost:1935/live/stream", use_gpu: bool = True):
        self.output_url = output_url
        self.ffmpeg_process: Optional[subprocess.Popen] = None
        self.use_gpu = use_gpu
        self.gpu_encoder: Optional[str] = None

    def detect_gpu_encoder(self) -> Optional[str]:
        """Probe ffmpeg for a working NVIDIA encoder; returns its name or None."""
        # Candidate NVIDIA encoders, in order of preference
        gpu_encoders = ['h264_nvenc', 'hevc_nvenc', 'av1_nvenc']

        for encoder in gpu_encoders:
            try:
                # Encode one second of a synthetic source to prove the encoder works
                test_cmd = ['ffmpeg', '-hide_banner', '-f', 'lavfi', '-i', 'testsrc2=size=320x240:duration=1',
                            '-c:v', encoder, '-f', 'null', '-']
                result = subprocess.run(test_cmd, capture_output=True, timeout=10)
                if result.returncode == 0:
                    logger.info(f"检测到可用的GPU编码器: {encoder}")
                    return encoder
            except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
                # Missing ffmpeg or a hung probe just means "not available"
                continue

        logger.warning("未检测到可用的GPU编码器,将使用CPU编码")
        return None

    def start_streaming(self, width: int, height: int, fps: int = 25) -> bool:
        """Spawn ffmpeg reading raw BGR frames from stdin; returns True on success."""
        # Pick an encoder (GPU if requested and available)
        if self.use_gpu:
            self.gpu_encoder = self.detect_gpu_encoder()

        # Input side: raw frames over stdin
        ffmpeg_cmd = [
            'ffmpeg',
            '-y',                       # overwrite output
            '-f', 'rawvideo',
            '-vcodec', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', f'{width}x{height}',
            '-r', str(fps),
            '-i', '-',                  # read frames from stdin
        ]

        # Encoder side
        if self.gpu_encoder:
            if 'nvenc' in self.gpu_encoder:
                # NVIDIA NVENC, tuned for low-latency CBR streaming
                ffmpeg_cmd.extend([
                    '-c:v', self.gpu_encoder,
                    '-pix_fmt', 'yuv420p',
                    '-preset', 'p1',          # fastest preset (p1..p7)
                    '-tune', 'll',            # low-latency tuning
                    '-rc', 'cbr',             # constant bitrate
                    '-b:v', '2M',             # target bitrate
                    '-maxrate', '2M',         # bitrate cap
                    '-bufsize', '4M',         # VBV buffer
                    '-g', str(fps * 2),       # 2-second GOP
                    '-keyint_min', str(fps),  # minimum keyframe interval
                    '-bf', '0',               # no B-frames for low latency
                    '-refs', '1',             # single reference frame
                    '-spatial_aq', '1',       # spatial adaptive quantization
                    '-temporal_aq', '1',      # temporal adaptive quantization
                    '-rc-lookahead', '8',     # lookahead frames
                    '-surfaces', '8',         # encoder surfaces
                ])
            else:
                # Generic settings for other hardware encoders
                ffmpeg_cmd.extend([
                    '-c:v', self.gpu_encoder,
                    '-pix_fmt', 'yuv420p',
                    '-preset', 'ultrafast',
                    '-b:v', '2M',
                ])
        else:
            # CPU fallback: x264 tuned for zero latency
            ffmpeg_cmd.extend([
                '-c:v', 'libx264',
                '-pix_fmt', 'yuv420p',
                '-preset', 'ultrafast',
                '-tune', 'zerolatency',
                '-crf', '23',
                '-maxrate', '2M',
                '-bufsize', '4M',
            ])

        # Output container and URL
        ffmpeg_cmd.extend([
            '-f', 'flv',
            self.output_url
        ])

        try:
            self.ffmpeg_process = subprocess.Popen(
                ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=0  # unbuffered stdin to minimize latency
            )

            encoder_info = self.gpu_encoder if self.gpu_encoder else "libx264 (CPU)"
            logger.info(f"流媒体服务启动: {self.output_url}, 编码器: {encoder_info}")
            return True
        except Exception as e:
            logger.error(f"启动流媒体失败: {e}")
            return False

    def send_frame(self, frame: np.ndarray) -> None:
        """Write one frame to ffmpeg; tears the pipe down if ffmpeg has died."""
        if self.ffmpeg_process and self.ffmpeg_process.stdin:
            try:
                self.ffmpeg_process.stdin.write(frame.tobytes())
                self.ffmpeg_process.stdin.flush()
            except (BrokenPipeError, OSError) as e:
                # ffmpeg exited: release the dead process instead of
                # logging a failure for every subsequent frame
                logger.error(f"发送帧失败: {e}")
                self.stop_streaming()
            except Exception as e:
                logger.error(f"发送帧失败: {e}")

    def stop_streaming(self) -> None:
        """Close stdin and reap ffmpeg; safe to call when already stopped or crashed."""
        if self.ffmpeg_process:
            try:
                if self.ffmpeg_process.stdin:
                    self.ffmpeg_process.stdin.close()
                # Don't block forever if ffmpeg ignores EOF
                self.ffmpeg_process.wait(timeout=10)
            except Exception:
                self.ffmpeg_process.kill()
            finally:
                self.ffmpeg_process = None
            logger.info("流媒体服务已停止")


class RTSPProcessor:
    """End-to-end pipeline: RTSP capture -> YOLO detection -> alarm clips -> RTMP out."""

    def __init__(self,
                 rtsp_url: str,
                 model_path: str = "yolov8n.pt",
                 detection_interval: int = 5,
                 alarm_config: AlarmConfig = None,
                 output_stream_url: str = "rtmp://localhost:1935/live/stream"):

        self.rtsp_url = rtsp_url
        self.detection_interval = detection_interval
        self.frame_count = 0
        self.running = False

        # Pipeline components (GPU encoding enabled when available)
        self.detector = YOLODetector(model_path)
        self.alarm_manager = AlarmManager(alarm_config or AlarmConfig(target_classes=["person"]))
        self.frame_buffer = FrameBuffer()
        self.stream_server = StreamServer(output_stream_url, use_gpu=True)

        # Most recent detection, reused for drawing on non-detection frames
        self.last_detection: Optional[DetectionResult] = None

        # Hand-off queue between the capture thread and the processing thread
        self.frame_queue: queue.Queue = queue.Queue(maxsize=100)

    def draw_detections(self, frame: np.ndarray, detection_result: Optional[DetectionResult]) -> np.ndarray:
        """Return a copy of `frame` annotated with boxes, labels, alarm banner and timestamp."""
        if detection_result is None or len(detection_result.boxes) == 0:
            return frame

        annotated_frame = frame.copy()

        for box, confidence, class_name in zip(
            detection_result.boxes,
            detection_result.confidences,
            detection_result.class_names
        ):
            x1, y1, x2, y2 = box.astype(int)

            # Bounding box: red for alarm-target classes, green otherwise
            color = (0, 255, 0)
            if class_name in self.alarm_manager.config.target_classes:
                color = (0, 0, 255)

            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

            # Label with a filled background for readability
            label = f"{class_name}: {confidence:.2f}"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
            cv2.rectangle(annotated_frame, (x1, y1 - label_size[1] - 10),
                          (x1 + label_size[0], y1), color, -1)
            cv2.putText(annotated_frame, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Alarm banner
        if self.alarm_manager.is_alarming:
            cv2.putText(annotated_frame, "ALARM ACTIVE", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        # Wall-clock timestamp in the bottom-left corner
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(annotated_frame, timestamp, (10, annotated_frame.shape[0] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        return annotated_frame

    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Process one frame: periodic detection, alarm bookkeeping, annotation, buffering."""
        current_time = datetime.now()
        detection_result = None

        # Run the detector only every `detection_interval` frames
        if self.frame_count % self.detection_interval == 0:
            detection_result = self.detector.detect(frame,
                                                    self.alarm_manager.config.confidence_threshold)
            self.last_detection = detection_result

            # May start an alarm (cooldown is enforced inside the manager)
            if self.alarm_manager.check_alarm_trigger(detection_result):
                if self.alarm_manager.start_alarm():
                    # Hook for alarm-start side effects (notifications, etc.)
                    pass

        # Draw using the most recent detection so boxes persist between detections
        annotated_frame = self.draw_detections(frame, self.last_detection)

        # Buffer the annotated frame for potential alarm recording
        self.frame_buffer.add_frame(annotated_frame, current_time)

        # Finish an alarm whose recording window has elapsed
        if self.alarm_manager.should_stop_alarm():
            alarm_frames = self.frame_buffer.get_frames_in_range(
                self.alarm_manager.alarm_start_time,
                self.alarm_manager.config.alarm_duration
            )

            # Save off-thread so encoding does not stall the live stream.
            # NOTE(review): fps is hard-coded to 25 here and in FrameBuffer —
            # confirm it matches the actual capture rate.
            if alarm_frames:
                threading.Thread(
                    target=self.alarm_manager.save_alarm_video,
                    args=(alarm_frames, 25),
                    daemon=True
                ).start()

            self.alarm_manager.stop_alarm()

        return annotated_frame

    def capture_frames(self) -> None:
        """Capture thread: read RTSP frames, keep the queue fresh, reconnect on failure."""
        cap = cv2.VideoCapture(self.rtsp_url)

        if not cap.isOpened():
            logger.error(f"无法打开RTSP流: {self.rtsp_url}")
            return

        # Keep the driver-side buffer small to reduce latency
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        logger.info(f"开始捕获RTSP流: {self.rtsp_url}")

        try:
            while self.running:
                ret, frame = cap.read()
                if not ret:
                    logger.warning("读取帧失败,尝试重连...")
                    cap.release()
                    time.sleep(5)
                    cap = cv2.VideoCapture(self.rtsp_url)
                    if cap.isOpened():
                        # Re-apply settings lost with the old handle
                        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                    continue

                if not self.frame_queue.full():
                    self.frame_queue.put(frame)
                else:
                    # Queue full: drop the oldest frame so the stream stays live
                    try:
                        self.frame_queue.get_nowait()
                        self.frame_queue.put(frame)
                    except queue.Empty:
                        pass

        finally:
            cap.release()
            logger.info("RTSP捕获已停止")

    def process_frames(self) -> None:
        """Processing thread: pull frames off the queue, annotate, and push to RTMP."""
        first_frame = True

        while self.running:
            try:
                frame = self.frame_queue.get(timeout=1)

                # Lazily start ffmpeg once the frame size is known
                if first_frame:
                    height, width = frame.shape[:2]
                    if self.stream_server.start_streaming(width, height):
                        first_frame = False
                    else:
                        logger.error("流媒体服务器启动失败")
                        break

                processed_frame = self.process_frame(frame)

                self.stream_server.send_frame(processed_frame)

                self.frame_count += 1

            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"处理帧时出错: {e}")

        logger.info("帧处理已停止")

    def start(self) -> Tuple[threading.Thread, threading.Thread]:
        """Start the capture and processing threads; returns both thread handles."""
        self.running = True

        capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
        capture_thread.start()

        process_thread = threading.Thread(target=self.process_frames, daemon=True)
        process_thread.start()

        logger.info("RTSP处理器已启动")

        return capture_thread, process_thread

    def stop(self) -> None:
        """Signal both threads to stop and shut down the output stream."""
        self.running = False
        self.stream_server.stop_streaming()
        logger.info("RTSP处理器已停止")


def main():
    """Wire up configuration, run the processor until Ctrl+C."""
    RTSP_URL = "rtsp://10.0.0.17:8554/camera_test/2"  # replace with the real RTSP address
    MODEL_PATH = "yolov8n.pt"                          # YOLO model weights
    DETECTION_INTERVAL = 5                             # detect every 5th frame
    OUTPUT_STREAM_URL = "rtmp://localhost:1935/live/processed"  # output stream

    # Alarm configuration
    alarm_config = AlarmConfig(
        target_classes=["person", "car", "truck"],  # classes that trigger alarms
        confidence_threshold=0.6,
        alarm_duration=15,       # record 15 seconds per alarm
        cooldown_duration=60,    # 60-second cooldown
        save_path="./alarm_videos"
    )

    processor = RTSPProcessor(
        rtsp_url=RTSP_URL,
        model_path=MODEL_PATH,
        detection_interval=DETECTION_INTERVAL,
        alarm_config=alarm_config,
        output_stream_url=OUTPUT_STREAM_URL
    )

    try:
        threads = processor.start()

        # Keep the main thread alive; the workers are daemons
        logger.info("系统运行中,按Ctrl+C停止...")
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        logger.info("收到停止信号")

    finally:
        processor.stop()


if __name__ == "__main__":
    main()