diff --git a/rtsp_processor_1.py b/rtsp_processor_1.py
index af20d63..98ea2cf 100644
--- a/rtsp_processor_1.py
+++ b/rtsp_processor_1.py
@@ -313,45 +313,45 @@ class RTSPProcessor:
                  detection_interval: int = 5,
                  alarm_config: AlarmConfig = None,
                  output_stream_url: str = "rtmp://localhost:1935/live/stream"):
-        
+
         self.rtsp_url = rtsp_url
         self.detection_interval = detection_interval
         self.frame_count = 0
         self.running = False
-        
+
         # 初始化组件
         self.detector = YOLODetector(model_path)
         self.alarm_manager = AlarmManager(alarm_config or AlarmConfig(target_classes=["person"]))
         self.frame_buffer = FrameBuffer()
         self.stream_server = StreamServer(output_stream_url, use_gpu=True)  # 启用GPU加速
-        
+
         # 最后的检测结果
         self.last_detection = None
-        
+
         # 线程安全队列
         self.frame_queue = queue.Queue(maxsize=100)
-    
+
     def draw_detections(self, frame: np.ndarray, detection_result: DetectionResult) -> np.ndarray:
         """在帧上绘制检测结果"""
         if detection_result is None or len(detection_result.boxes) == 0:
             return frame
-        
+
        annotated_frame = frame.copy()
-        
+
         for box, confidence, class_name in zip(
             detection_result.boxes,
             detection_result.confidences,
             detection_result.class_names
         ):
             x1, y1, x2, y2 = box.astype(int)
-            
+
             # 绘制边界框
             color = (0, 255, 0)  # 绿色
             if class_name in self.alarm_manager.config.target_classes:
                 color = (0, 0, 255)  # 目标类别用红色
-            
+
             cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
-            
+
             # 绘制标签
             label = f"{class_name}: {confidence:.2f}"
             label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
@@ -359,42 +359,42 @@ class RTSPProcessor:
                           (x1 + label_size[0], y1), color, -1)
             cv2.putText(annotated_frame, label, (x1, y1 - 5),
                         cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
-        
+
         # 添加告警状态显示
         if self.alarm_manager.is_alarming:
             cv2.putText(annotated_frame, "ALARM ACTIVE", (10, 30),
                         cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
-        
+
         # 添加时间戳
         timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         cv2.putText(annotated_frame, timestamp, (10, annotated_frame.shape[0] - 10),
                     cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
-        
+
         return annotated_frame
-    
+
     def process_frame(self, frame: np.ndarray) -> np.ndarray:
         """处理单帧"""
         current_time = datetime.now()
         detection_result = None
-        
+
         # 按间隔进行检测
         if self.frame_count % self.detection_interval == 0:
             detection_result = self.detector.detect(frame, self.alarm_manager.config.confidence_threshold)
             self.last_detection = detection_result
-            
+
             # 检查告警触发
             if self.alarm_manager.check_alarm_trigger(detection_result):
                 if self.alarm_manager.start_alarm():
                     # 告警开始时的处理
                     pass
-        
+
         # 使用最后的检测结果绘制
         annotated_frame = self.draw_detections(frame, self.last_detection)
-        
+
         # 添加帧到缓冲区
         self.frame_buffer.add_frame(annotated_frame, current_time)
-        
+
         # 检查告警结束
         if self.alarm_manager.should_stop_alarm():
             # 获取告警期间的帧
@@ -402,7 +402,7 @@ class RTSPProcessor:
                 self.alarm_manager.alarm_start_time,
                 self.alarm_manager.config.alarm_duration
             )
-            
+
             # 保存告警视频
             if alarm_frames:
                 threading.Thread(
@@ -410,65 +410,65 @@ class RTSPProcessor:
                     args=(alarm_frames, 25),
                     daemon=True
                 ).start()
-            
+
             self.alarm_manager.stop_alarm()
-        
+
         return annotated_frame
-    
+
     def connect_to_rtsp_stream(self, url):
         """
         尝试连接到 RTSP 流
         """
         cap = cv2.VideoCapture(url)
-        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'HEVC'))
+        # cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'HEVC'))
         if not cap.isOpened():
             logger.error(f"Failed to connect to {url}")
             return None
         return cap
-    
+
     def capture_frames(self):
         """捕获RTSP流帧"""
-        cap = cv2.VideoCapture(self.rtsp_url)
-        
-        if not cap.isOpened():
-            logger.error(f"无法打开RTSP流: {self.rtsp_url}")
-            return
-        
+        cap = self.connect_to_rtsp_stream(self.rtsp_url)
+
         # 设置缓冲区大小
         cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
-        
+
         logger.info(f"开始捕获RTSP流: {self.rtsp_url}")
-        
+
         try:
             while self.running:
-                ret, frame = cap.read()
-                if not ret:
-                    logger.warning("读取帧失败,尝试重连...")
-                    cap.release()
-                    time.sleep(5)
-                    cap = cv2.VideoCapture(self.rtsp_url)
-                    continue
-                
-                if not self.frame_queue.full():
-                    self.frame_queue.put(frame)
-                else:
-                    # 丢弃最旧的帧
-                    try:
-                        self.frame_queue.get_nowait()
+                if cap:
+                    ret, frame = cap.read()
+                    if not ret:
+                        logger.warning("读取帧失败,尝试重连...")
+                        cap.release()
+                        time.sleep(1)
+                        cap = None
+                        continue
+
+                    if not self.frame_queue.full():
                         self.frame_queue.put(frame)
-                    except queue.Empty:
-                        pass
-            
+                    else:
+                        # 丢弃最旧的帧
+                        try:
+                            self.frame_queue.get_nowait()
+                            self.frame_queue.put(frame)
+                        except queue.Empty:
+                            pass
+                else:
+                    cap = self.connect_to_rtsp_stream(self.rtsp_url)
+                    time.sleep(5)
+
         finally:
             cap.release()
             logger.info("RTSP捕获已停止")
-    
+
     def process_frames(self):
         """处理帧线程"""
         first_frame = True
-        
+
         while self.running:
             try:
                 frame = self.frame_queue.get(timeout=1)
-                
+
                 # 初始化流媒体服务器
                 if first_frame:
                     height, width = frame.shape[:2]
@@ -477,38 +477,38 @@ class RTSPProcessor:
                 else:
                     logger.error("流媒体服务器启动失败")
                     break
-                
+
                 # 处理帧
                 processed_frame = self.process_frame(frame)
-                
+
                 # 发送到流媒体服务器
                 self.stream_server.send_frame(processed_frame)
-                
+
                 self.frame_count += 1
-                
+
             except queue.Empty:
                 continue
             except Exception as e:
                 logger.error(f"处理帧时出错: {e}")
-        
+
         logger.info("帧处理已停止")
-    
+
     def start(self):
         """启动处理器"""
         self.running = True
-        
+
         # 启动捕获线程
         capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
         capture_thread.start()
-        
+
         # 启动处理线程
         process_thread = threading.Thread(target=self.process_frames, daemon=True)
         process_thread.start()
-        
+
        logger.info("RTSP处理器已启动")
-        
+
         return capture_thread, process_thread
-    
+
     def stop(self):
         """停止处理器"""
         self.running = False