""" YantaiVisionX 主程序 LED灯阵检测系统的主入口和集成流水线 """ import cv2 import time import argparse import sys import os from pathlib import Path from typing import Optional # 抑制OpenCV日志输出 os.environ['OPENCV_LOG_LEVEL'] = 'ERROR' os.environ['OPENCV_VIDEOIO_DEBUG'] = '0' # 添加项目根目录到Python路径 if getattr(sys, 'frozen', False): project_root = Path(getattr(sys, '_MEIPASS')) else: project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) from src.utils.path_utils import resolve_app_path from src.utils.performance_limiter import PerformanceLimiter from src.camera.opencv_camera import OpenCVCamera from src.preprocessing.image_enhancer import ImageEnhancer from src.roi_detection.led_detector import LEDDetector from src.roi_detection.threshold_detector import LEDState from src.output.result_formatter import ResultFormatter from src.output.logger import LEDLogger from src.output.alarm_manager import AlarmManager from src.ui.tech_ui import TechUI class YantaiVisionXSystem: """ YantaiVisionX主系统类 集成所有模块实现完整的LED检测流水线 """ def __init__( self, config_dir: str = "config", roi_config_path: Optional[str] = None, algorithm_config_path: Optional[str] = None, ): """ 初始化系统 Args: config_dir: 配置文件目录 """ self.config_dir = resolve_app_path(config_dir) self.config_dir.mkdir(parents=True, exist_ok=True) self._roi_config_path = ( resolve_app_path(roi_config_path) if roi_config_path is not None else self.config_dir / "roi_config.yaml" ) self._algorithm_config_path = ( resolve_app_path(algorithm_config_path) if algorithm_config_path is not None else self.config_dir / "algorithm_config.yaml" ) # 初始化各个组件 self.camera = None self.image_enhancer = None self.led_detector = None self.formatter = ResultFormatter() self.logger = LEDLogger() self.alarm_manager = AlarmManager(self.logger) self.tech_ui = TechUI() self.performance_limiter = PerformanceLimiter() self._detection_interval_seconds: float = 0.0 self._window_internal_name = "YantaiVisionX Monitor" self._window_display_name = "烟台蓬莱国际机场低能见度识别软件" self._window_created = False self._default_runtime_message = "运行结果:实时检测中" # 状态变量 self.is_running = False self.display_enabled = False self.runtime_message = self._default_runtime_message def set_config_paths( self, roi_config_path: Optional[str] = None, algorithm_config_path: Optional[str] = None, ) -> None: """动态更新配置文件路径""" if roi_config_path is not None: self._roi_config_path = resolve_app_path(roi_config_path) if algorithm_config_path is not None: self._algorithm_config_path = resolve_app_path(algorithm_config_path) def get_config_paths(self) -> dict: """获取当前配置文件路径""" return { "roi": str(self._roi_config_path), "algorithm": str(self._algorithm_config_path), } def _configure_performance_limiter(self) -> None: """根据算法配置更新性能限制。""" performance_cfg = {} if self.led_detector and hasattr(self.led_detector, "algorithm_config"): performance_cfg = self.led_detector.algorithm_config.get("performance", {}) or {} max_fps = performance_cfg.get("max_processing_fps") frame_stride = performance_cfg.get("frame_stride", 1) detection_interval = performance_cfg.get("detection_interval_seconds", 0) self.performance_limiter.update_settings(max_processing_fps=max_fps, frame_stride=frame_stride) try: detection_interval_value = float(detection_interval) except (TypeError, ValueError): detection_interval_value = 0.0 self._detection_interval_seconds = detection_interval_value if detection_interval_value and detection_interval_value > 0 else 0.0 self.logger.log_info( f"性能限制器配置: max_fps={self.performance_limiter.max_processing_fps}, " f"frame_stride={self.performance_limiter.frame_stride}, " f"detection_interval={self._detection_interval_seconds}s" ) def update_performance_settings( self, max_processing_fps: Optional[float] = None, frame_stride: Optional[int] = None, detection_interval_seconds: Optional[float] = None, ) -> None: """运行中动态调整性能限制参数""" self.performance_limiter.update_settings( max_processing_fps=max_processing_fps, frame_stride=frame_stride, ) if detection_interval_seconds is not None: try: value = float(detection_interval_seconds) except (TypeError, ValueError): value = 0.0 self._detection_interval_seconds = value if value > 0 else 0.0 def initialize_system(self, camera_config=None) -> bool: """ 初始化整个系统 Args: camera_config: 摄像头配置,如果为None则使用默认配置 Returns: bool: 初始化是否成功 """ try: self.logger.log_info("初始化YantaiVisionX系统...") # 1. 初始化图像增强器 self.image_enhancer = ImageEnhancer() self.logger.log_info("图像增强器初始化完成") # 2. 初始化LED检测器 roi_config_path = self._roi_config_path algorithm_config_path = self._algorithm_config_path self.led_detector = LEDDetector( roi_config_path, algorithm_config_path ) self.logger.log_info("LED检测器初始化完成") self._configure_performance_limiter() if self.led_detector and hasattr(self.led_detector, 'algorithm_config'): self.alarm_manager.update_config( self.led_detector.algorithm_config.get('alarm') ) # 3. 初始化摄像头 if camera_config is None: camera_config = { 'opencv_camera': { 'device_id': 0, 'resolution': {'width': 1920, 'height': 1080}, 'fps': 30 } } self.camera = OpenCVCamera(camera_config) self.logger.log_info("系统初始化完成") return True except Exception as e: self.logger.log_error("系统初始化失败", e) return False def start_detection(self, display: bool = False, save_results: bool = True) -> None: """ 开始检测 Args: display: 是否显示实时检测结果 save_results: 是否保存检测结果 """ if not self._validate_system(): self.logger.log_error("系统验证失败,无法开始检测") return self.display_enabled = display self.is_running = True self._reset_runtime_message() # 打开摄像头 if not self.camera.open(): self.logger.log_error("摄像头打开失败") return # 获取视频/摄像头帧率,用于控制播放速度 video_fps = self.camera.get_fps() if video_fps <= 0: video_fps = 30.0 frame_interval = 1.0 / video_fps # 每帧间隔(秒) self.logger.log_info(f"开始 LED 检测... 帧率: {video_fps:.1f}fps") limiter = self.performance_limiter frame_index = 0 last_detection_result = None last_detection_time: Optional[float] = None display_frame_interval = 5 # 每5帧显示一次,降低UI绑定开销 # 性能统计 loop_start = time.perf_counter() total_display_time = 0.0 display_count = 0 try: while self.is_running: # 读取帧 success, frame = self.camera.read_frame() if not success: self.logger.log_warning("读取帧失败") continue current_frame_index = frame_index frame_index += 1 current_time = time.perf_counter() # 每100帧输出一次性能统计 if frame_index % 100 == 0: elapsed = current_time - loop_start actual_fps = frame_index / elapsed if elapsed > 0 else 0 avg_display = (total_display_time / display_count * 1000) if display_count > 0 else 0 print(f"[性能] 帧:{frame_index} 实际FPS:{actual_fps:.1f} 显示次数:{display_count} 平均显示耗时:{avg_display:.1f}ms") should_process = True if limiter and not limiter.should_process_frame(current_frame_index): should_process = False if last_detection_result is None: should_process = True if should_process and self._detection_interval_seconds > 0: now = time.perf_counter() if ( last_detection_time is not None and (now - last_detection_time) < self._detection_interval_seconds ): should_process = False else: last_detection_time = now elif should_process: last_detection_time = time.perf_counter() detection_result = None if should_process: # 图像预处理 enhanced_frame = self.image_enhancer.preprocess_frame( frame, mode="normal" ) # LED检测 detection_result = self.led_detector.detect_leds(enhanced_frame) last_detection_result = detection_result # 记录结果 self.logger.log_detection_result(detection_result) alarm_result = self.alarm_manager.process_detection(detection_result) for event in alarm_result.events: self.logger.log_alarm_event(event) for roi_name in alarm_result.recoveries: self.logger.log_recovery_event( roi_name, detection_result.frame_count, detection_result.timestamp ) # 保存结果 if ( save_results and self.logger.save_results_enabled and detection_result.frame_count % self.logger.save_results_interval == 0 ): self.logger.save_result_to_file(detection_result) display_detection_result = last_detection_result if last_detection_result is not None else detection_result # 控制显示帧率:每N帧显示一次,避免UI绑定拖慢速度 should_display = (current_frame_index % display_frame_interval == 0) # 显示结果 if self.display_enabled and display_detection_result is not None and should_display: t0 = time.perf_counter() self._display_results(frame, display_detection_result) total_display_time += time.perf_counter() - t0 display_count += 1 # 检查退出条件 if cv2.waitKey(1) & 0xFF == ord('q'): break except KeyboardInterrupt: self.logger.log_info("用户中断检测") except Exception as e: self.logger.log_error("检测过程中发生错误", e) finally: self._cleanup() def process_video_file(self, video_path: str, output_dir: str = "results", display: bool = False) -> None: """ 处理视频文件 Args: video_path: 视频文件路径 output_dir: 结果输出目录 display: 是否显示实时结果 """ # 创建视频文件摄像头 video_config = { 'opencv_camera': { 'video_file': video_path } } self.camera = OpenCVCamera(video_config) if not self.camera.open(): self.logger.log_error(f"无法打开视频文件: {video_path}") return self.display_enabled = display self.is_running = True self._reset_runtime_message() # 创建输出目录 output_path = Path(output_dir) output_path.mkdir(exist_ok=True) results = [] if self.logger.save_results_enabled else None last_frame = None last_detection_result = None last_detection_time: Optional[float] = None display_frame_interval = 5 # 每5帧显示一次,降低UI绑定开销 self.logger.log_info(f"开始处理视频: {video_path}") limiter = self.performance_limiter frame_index = 0 # 性能统计 loop_start = time.perf_counter() total_display_time = 0.0 display_count = 0 try: while self.is_running: current_time = time.perf_counter() success, frame = self.camera.read_frame() if not success: if self.display_enabled and last_frame is not None: self._freeze_last_frame(last_frame, last_detection_result) break # 视频结束 current_frame_index = frame_index frame_index += 1 # 每100帧输出一次性能统计 if frame_index % 100 == 0: elapsed = current_time - loop_start actual_fps = frame_index / elapsed if elapsed > 0 else 0 avg_display = (total_display_time / display_count * 1000) if display_count > 0 else 0 print(f"[性能] 帧:{frame_index} 实际FPS:{actual_fps:.1f} 显示次数:{display_count} 平均显示耗时:{avg_display:.1f}ms") should_process = True if limiter and not limiter.should_process_frame(current_frame_index): should_process = False if last_detection_result is None: should_process = True if should_process and self._detection_interval_seconds > 0: now = time.perf_counter() if ( last_detection_time is not None and (now - last_detection_time) < self._detection_interval_seconds ): should_process = False else: last_detection_time = now elif should_process: last_detection_time = time.perf_counter() detection_result = None if should_process: # 图像预处理 enhanced_frame = self.image_enhancer.preprocess_frame( frame, mode="normal" ) # LED检测 detection_result = self.led_detector.detect_leds(enhanced_frame) last_detection_result = detection_result if results is not None: results.append(detection_result) # 记录结果 if detection_result.frame_count % 100 == 0: self.logger.log_detection_result(detection_result) alarm_result = self.alarm_manager.process_detection(detection_result) for event in alarm_result.events: self.logger.log_alarm_event(event) for roi_name in alarm_result.recoveries: self.logger.log_recovery_event( roi_name, detection_result.frame_count, detection_result.timestamp ) if self.display_enabled and frame is not None: last_frame = frame.copy() else: last_frame = None display_detection_result = last_detection_result if last_detection_result is not None else detection_result # 控制显示帧率:每N帧显示一次,避免UI绑定拖慢速度 should_display = (current_frame_index % display_frame_interval == 0) # 显示结果 if self.display_enabled and display_detection_result is not None and should_display: t0 = time.perf_counter() self._display_results(frame, display_detection_result) total_display_time += time.perf_counter() - t0 display_count += 1 if cv2.waitKey(1) & 0xFF == ord('q'): break # 保存批量结果 if results: csv_content = self.formatter.format_to_csv(results) csv_path = output_path / f"batch_results_{int(time.time())}.csv" with open(csv_path, 'w', encoding='utf-8') as f: f.write(csv_content) self.logger.log_info(f"批量结果已保存到: {csv_path}") except Exception as e: self.logger.log_error("处理视频文件时发生错误", e) finally: self._cleanup() def _validate_system(self) -> bool: """ 验证系统是否准备就绪 Returns: bool: 系统是否就绪 """ if not self.camera: self.logger.log_error("摄像头未初始化") return False if not self.image_enhancer: self.logger.log_error("图像增强器未初始化") return False if not self.led_detector: self.logger.log_error("LED检测器未初始化") return False return True def _reset_runtime_message(self) -> None: self.runtime_message = self._default_runtime_message def _display_results(self, original_frame, detection_result) -> None: """ 显示检测结果 Args: original_frame: 原始帧 detection_result: 检测结果 """ # 可视化检测结果(在原始帧上绘制ROI等信息) vis_frame = self.led_detector.visualize_detection_result( original_frame, detection_result ) # 使用科技感UI创建完整显示帧 display_frame = self.tech_ui.create_display_frame( vis_frame, detection_result, runtime_message=self.runtime_message, ) self._ensure_window() # 显示图像 cv2.imshow(self._window_internal_name, display_frame) # 在控制台显示矩阵状态(每10帧显示一次) if detection_result.frame_count % 10 == 0: matrix_text = self.formatter.format_matrix_visual( self.formatter.format_to_matrix(detection_result) ) print(f"\n{matrix_text}") def _freeze_last_frame(self, frame, detection_result) -> None: """视频播放结束后保持最后一帧并显示运行结果""" self.runtime_message = self._build_runtime_summary(detection_result) if not self.display_enabled or frame is None: return try: self._display_results(frame, detection_result) except Exception as exc: self.logger.log_warning(f"最后一帧渲染失败: {exc}") return self.logger.log_info("视频播放结束,按Q或Esc退出最终画面") while True: key = cv2.waitKey(100) & 0xFF if key in (ord('q'), 27): break try: visible = cv2.getWindowProperty(self._window_internal_name, cv2.WND_PROP_VISIBLE) except Exception: visible = -1 if visible < 1: break def _build_runtime_summary(self, detection_result) -> str: if not detection_result: return "运行结果:未获取检测结果" summary = getattr(detection_result, 'detection_summary', {}) or {} threshold_summary = summary.get('threshold_detection') or {} states = threshold_summary.get('states') or {} counts = { 'on': int(states.get('on', 0)), 'off': int(states.get('off', 0)), 'uncertain': int(states.get('uncertain', 0)), } if counts['on'] + counts['off'] + counts['uncertain'] == 0: stable_states = getattr(detection_result, 'stable_states', {}) or {} for result in stable_states.values(): state = getattr(result, 'led_state', None) if state == LEDState.ON: counts['on'] += 1 elif state == LEDState.OFF: counts['off'] += 1 else: counts['uncertain'] += 1 total = counts['on'] + counts['off'] + counts['uncertain'] total_text = f"共{total}盏" if total else "总数未知" return ( f"运行结果:亮{counts['on']} 灭{counts['off']} 不确定{counts['uncertain']}({total_text})" ) def _cleanup(self) -> None: """ 清理资源 """ self.is_running = False if self.camera: self.camera.close() if self.display_enabled: cv2.destroyAllWindows() self._window_created = False self.logger.log_info("系统关闭完成") self.logger.close() def _ensure_window(self) -> None: """确保OpenCV窗口存在并设置标题""" if self._window_created or not self.display_enabled: return cv2.namedWindow(self._window_internal_name, cv2.WINDOW_NORMAL) cv2.resizeWindow( self._window_internal_name, self.tech_ui.window_width, self.tech_ui.window_height, ) cv2.setMouseCallback(self._window_internal_name, self._on_mouse_click) self._apply_window_title() self._window_created = True def _on_mouse_click(self, event: int, x: int, y: int, flags: int, param) -> None: """鼠标点击回调函数""" if event == cv2.EVENT_LBUTTONDOWN: if self.tech_ui.is_exit_button_clicked(x, y): self.logger.log_info("用户点击退出按钮") self.is_running = False def _apply_window_title(self) -> None: """在不同平台上设置窗口标题,优先保证中文显示正常""" display_title = self._window_display_name try: if os.name == 'nt': import ctypes hwnd = ctypes.windll.user32.FindWindowW(None, self._window_internal_name) if hwnd: ctypes.windll.user32.SetWindowTextW(hwnd, display_title) else: cv2.setWindowTitle(self._window_internal_name, display_title) else: cv2.setWindowTitle(self._window_internal_name, display_title) except Exception: cv2.setWindowTitle(self._window_internal_name, display_title) def main(): """ 主程序入口 """ parser = argparse.ArgumentParser( description='YantaiVisionX - LED灯阵检测系统' ) parser.add_argument('--video', '-v', type=str, help='处理视频文件路径') parser.add_argument('--camera', '-c', type=int, default=0, help='摄像头设备ID(默认0)') parser.add_argument('--display', '-d', action='store_true', help='显示实时检测结果') parser.add_argument('--config', type=str, default='config', help='配置文件目录路径') parser.add_argument('--ui', action='store_true', help='启动可视化控制面板') args = parser.parse_args() if args.ui: from src.ui.control_panel import launch_control_panel launch_control_panel() return 0 # 创建系统实例 system = YantaiVisionXSystem(args.config) # 准备摄像头配置 if args.video: camera_config = None # 将在process_video_file中设置 else: camera_config = { 'opencv_camera': { 'device_id': args.camera, 'resolution': {'width': 1920, 'height': 1080}, 'fps': 30 } } # 初始化系统 if not system.initialize_system(camera_config): print("系统初始化失败") return 1 try: if args.video: # 处理视频文件 system.process_video_file(args.video, display=args.display) else: # 实时摄像头检测 system.start_detection(display=args.display) except KeyboardInterrupt: print("\n程序被用户中断") return 0 if __name__ == '__main__': raise SystemExit(main())