YantaiVisionX/main.py

717 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
YantaiVisionX 主程序
LED灯阵检测系统的主入口和集成流水线
"""
import os
import sys

# Silence OpenCV's native logging. These variables are consumed by the OpenCV
# native library, so they are exported BEFORE cv2 is imported; setting them
# after the import risks the library having already read its log level.
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
os.environ['OPENCV_VIDEOIO_DEBUG'] = '0'

import argparse
import time
from pathlib import Path
from typing import Optional

import cv2

# Put the project root on sys.path so the ``src`` package resolves both when
# running from source and when running from a frozen bundle (``sys.frozen`` is
# set and ``sys._MEIPASS`` is used as the root in that case).
if getattr(sys, 'frozen', False):
    project_root = Path(getattr(sys, '_MEIPASS'))
else:
    project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

from src.utils.path_utils import resolve_app_path
from src.utils.performance_limiter import PerformanceLimiter
from src.camera.opencv_camera import OpenCVCamera
from src.preprocessing.image_enhancer import ImageEnhancer
from src.roi_detection.led_detector import LEDDetector
from src.roi_detection.threshold_detector import LEDState
from src.output.result_formatter import ResultFormatter
from src.output.logger import LEDLogger
from src.output.alarm_manager import AlarmManager
from src.ui.tech_ui import TechUI
class YantaiVisionXSystem:
    """Top-level YantaiVisionX system.

    Wires the camera, image enhancer, LED detector, logger, alarm manager and
    on-screen UI together into one LED detection pipeline.
    """

    def __init__(
        self,
        config_dir: str = "config",
        roi_config_path: Optional[str] = None,
        algorithm_config_path: Optional[str] = None,
    ):
        """Create the system and its long-lived helpers.

        Args:
            config_dir: Directory holding the configuration files. It is
                resolved through ``resolve_app_path`` and created if missing.
            roi_config_path: Optional explicit ROI config path; defaults to
                ``<config_dir>/roi_config.yaml``.
            algorithm_config_path: Optional explicit algorithm config path;
                defaults to ``<config_dir>/algorithm_config.yaml``.
        """
        self.config_dir = resolve_app_path(config_dir)
        self.config_dir.mkdir(parents=True, exist_ok=True)
        self._roi_config_path = (
            resolve_app_path(roi_config_path)
            if roi_config_path is not None
            else self.config_dir / "roi_config.yaml"
        )
        self._algorithm_config_path = (
            resolve_app_path(algorithm_config_path)
            if algorithm_config_path is not None
            else self.config_dir / "algorithm_config.yaml"
        )

        # Components created later by initialize_system().
        self.camera = None
        self.image_enhancer = None
        self.led_detector = None

        # Components that are available immediately.
        self.formatter = ResultFormatter()
        self.logger = LEDLogger()
        self.alarm_manager = AlarmManager(self.logger)
        self.tech_ui = TechUI()
        self.performance_limiter = PerformanceLimiter()
        self._detection_interval_seconds: float = 0.0

        # The internal name identifies the OpenCV window; the display name is
        # the title applied to it afterwards.
        self._window_internal_name = "YantaiVisionX Monitor"
        self._window_display_name = "烟台蓬莱国际机场低能见度识别软件"
        self._window_created = False
        self._default_runtime_message = "运行结果:实时检测中"

        # Runtime state.
        self.is_running = False
        self.display_enabled = False
        self.runtime_message = self._default_runtime_message

    def set_config_paths(
        self,
        roi_config_path: Optional[str] = None,
        algorithm_config_path: Optional[str] = None,
    ) -> None:
        """Update the config file paths; ``None`` leaves a path unchanged."""
        if roi_config_path is not None:
            self._roi_config_path = resolve_app_path(roi_config_path)
        if algorithm_config_path is not None:
            self._algorithm_config_path = resolve_app_path(algorithm_config_path)

    def get_config_paths(self) -> dict:
        """Return the current config paths as strings under ``roi``/``algorithm``."""
        return {
            "roi": str(self._roi_config_path),
            "algorithm": str(self._algorithm_config_path),
        }

    @staticmethod
    def _coerce_interval(value) -> float:
        """Convert ``value`` to a positive number of seconds, else ``0.0``.

        Non-numeric, ``None``, NaN, zero and negative inputs all map to
        ``0.0``, which disables the time-based detection interval.
        """
        try:
            seconds = float(value)
        except (TypeError, ValueError):
            return 0.0
        return seconds if seconds > 0 else 0.0

    def _configure_performance_limiter(self) -> None:
        """Apply the ``performance`` section of the algorithm config."""
        performance_cfg = {}
        if self.led_detector and hasattr(self.led_detector, "algorithm_config"):
            performance_cfg = self.led_detector.algorithm_config.get("performance", {}) or {}

        self.performance_limiter.update_settings(
            max_processing_fps=performance_cfg.get("max_processing_fps"),
            frame_stride=performance_cfg.get("frame_stride", 1),
        )
        self._detection_interval_seconds = self._coerce_interval(
            performance_cfg.get("detection_interval_seconds", 0)
        )
        self.logger.log_info(
            f"性能限制器配置: max_fps={self.performance_limiter.max_processing_fps}, "
            f"frame_stride={self.performance_limiter.frame_stride}, "
            f"detection_interval={self._detection_interval_seconds}s"
        )

    def update_performance_settings(
        self,
        max_processing_fps: Optional[float] = None,
        frame_stride: Optional[int] = None,
        detection_interval_seconds: Optional[float] = None,
    ) -> None:
        """Adjust the performance limits while the system is running.

        ``max_processing_fps`` and ``frame_stride`` are forwarded to the
        performance limiter as given. ``detection_interval_seconds`` is only
        applied when it is not ``None``; invalid or non-positive values
        disable the interval.
        """
        self.performance_limiter.update_settings(
            max_processing_fps=max_processing_fps,
            frame_stride=frame_stride,
        )
        if detection_interval_seconds is not None:
            self._detection_interval_seconds = self._coerce_interval(
                detection_interval_seconds
            )

    def initialize_system(self, camera_config=None) -> bool:
        """Create the enhancer, detector and camera objects.

        Args:
            camera_config: Camera configuration dict. When ``None`` a default
                configuration (device 0, 1920x1080, 30 fps) is used.

        Returns:
            bool: ``True`` on success; ``False`` if any step raised (the
            exception is logged, not propagated).
        """
        try:
            self.logger.log_info("初始化YantaiVisionX系统...")

            # 1. Image enhancer.
            self.image_enhancer = ImageEnhancer()
            self.logger.log_info("图像增强器初始化完成")

            # 2. LED detector, then the settings derived from its config.
            self.led_detector = LEDDetector(
                self._roi_config_path,
                self._algorithm_config_path,
            )
            self.logger.log_info("LED检测器初始化完成")
            self._configure_performance_limiter()
            if self.led_detector and hasattr(self.led_detector, 'algorithm_config'):
                self.alarm_manager.update_config(
                    self.led_detector.algorithm_config.get('alarm')
                )

            # 3. Camera.
            if camera_config is None:
                camera_config = {
                    'opencv_camera': {
                        'device_id': 0,
                        'resolution': {'width': 1920, 'height': 1080},
                        'fps': 30,
                    }
                }
            self.camera = OpenCVCamera(camera_config)

            self.logger.log_info("系统初始化完成")
            return True
        except Exception as e:
            self.logger.log_error("系统初始化失败", e)
            return False

    def _should_run_detection(
        self,
        frame_index: int,
        has_previous_result: bool,
        last_detection_time: Optional[float],
    ) -> tuple:
        """Decide whether the current frame should be run through detection.

        The performance limiter is consulted for every frame. A frame it
        rejects is still processed when no detection result exists yet. A
        frame that passes is additionally throttled by the configured
        detection interval.

        Args:
            frame_index: Zero-based index of the frame just read.
            has_previous_result: Whether any detection result exists so far.
            last_detection_time: ``time.perf_counter()`` value of the previous
                detection, or ``None`` if there was none.

        Returns:
            tuple: ``(should_process, last_detection_time)`` where the
            timestamp is refreshed only when the frame is to be processed.
        """
        limiter = self.performance_limiter
        allowed = True
        if limiter and not limiter.should_process_frame(frame_index):
            allowed = False
        if not has_previous_result:
            # Always produce a first result so there is something to display.
            allowed = True
        if not allowed:
            return False, last_detection_time

        now = time.perf_counter()
        interval = self._detection_interval_seconds
        if (
            interval > 0
            and last_detection_time is not None
            and (now - last_detection_time) < interval
        ):
            return False, last_detection_time
        return True, now

    def _detect_frame(self, frame):
        """Preprocess ``frame`` in ``normal`` mode and run LED detection on it."""
        enhanced_frame = self.image_enhancer.preprocess_frame(frame, mode="normal")
        return self.led_detector.detect_leds(enhanced_frame)

    def _process_alarms(self, detection_result) -> None:
        """Feed a result to the alarm manager and log its events/recoveries."""
        alarm_result = self.alarm_manager.process_detection(detection_result)
        for event in alarm_result.events:
            self.logger.log_alarm_event(event)
        for roi_name in alarm_result.recoveries:
            self.logger.log_recovery_event(
                roi_name,
                detection_result.frame_count,
                detection_result.timestamp,
            )

    @staticmethod
    def _print_performance_stats(
        frame_index: int,
        elapsed: float,
        total_display_time: float,
        display_count: int,
    ) -> None:
        """Print the running FPS and the average display time to stdout."""
        actual_fps = frame_index / elapsed if elapsed > 0 else 0
        avg_display = (total_display_time / display_count * 1000) if display_count > 0 else 0
        print(f"[性能] 帧:{frame_index} 实际FPS:{actual_fps:.1f} 显示次数:{display_count} 平均显示耗时:{avg_display:.1f}ms")

    def start_detection(self, display: bool = False, save_results: bool = True) -> None:
        """Run the live detection loop until stopped.

        The loop ends when ``is_running`` becomes false, ``q`` is pressed in
        the display window, the user interrupts, or an exception occurs.
        Resources are released through ``_cleanup`` in every case.

        Args:
            display: Whether to show the live detection window.
            save_results: Whether to save results to file (also subject to
                the logger's own save settings).
        """
        if not self._validate_system():
            self.logger.log_error("系统验证失败,无法开始检测")
            return

        self.display_enabled = display
        self.is_running = True
        self._reset_runtime_message()

        if not self.camera.open():
            self.logger.log_error("摄像头打开失败")
            # Do not leave the system flagged as running when nothing started.
            self.is_running = False
            return

        video_fps = self.camera.get_fps()
        if not video_fps or video_fps <= 0:
            video_fps = 30.0
        frame_interval = 1.0 / video_fps  # seconds per frame
        self.logger.log_info(f"开始 LED 检测... 帧率: {video_fps:.1f}fps")

        frame_index = 0
        last_detection_result = None
        last_detection_time: Optional[float] = None
        display_frame_interval = 5  # render every 5th frame to limit UI cost

        # Performance statistics.
        loop_start = time.perf_counter()
        total_display_time = 0.0
        display_count = 0

        try:
            while self.is_running:
                success, frame = self.camera.read_frame()
                if not success:
                    self.logger.log_warning("读取帧失败")
                    # Back off for one frame period instead of spinning.
                    time.sleep(frame_interval)
                    continue

                current_frame_index = frame_index
                frame_index += 1

                if frame_index % 100 == 0:
                    self._print_performance_stats(
                        frame_index,
                        time.perf_counter() - loop_start,
                        total_display_time,
                        display_count,
                    )

                should_process, last_detection_time = self._should_run_detection(
                    current_frame_index,
                    last_detection_result is not None,
                    last_detection_time,
                )
                if should_process:
                    detection_result = self._detect_frame(frame)
                    last_detection_result = detection_result

                    self.logger.log_detection_result(detection_result)
                    self._process_alarms(detection_result)

                    if (
                        save_results
                        and self.logger.save_results_enabled
                        and detection_result.frame_count % self.logger.save_results_interval == 0
                    ):
                        self.logger.save_result_to_file(detection_result)

                # Frames that were not processed are drawn with the most
                # recent detection result.
                if (
                    self.display_enabled
                    and last_detection_result is not None
                    and current_frame_index % display_frame_interval == 0
                ):
                    t0 = time.perf_counter()
                    self._display_results(frame, last_detection_result)
                    total_display_time += time.perf_counter() - t0
                    display_count += 1

                # Key presses only arrive through a HighGUI window, so the
                # keyboard is polled only when display is enabled.
                if self.display_enabled and cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except KeyboardInterrupt:
            self.logger.log_info("用户中断检测")
        except Exception as e:
            self.logger.log_error("检测过程中发生错误", e)
        finally:
            self._cleanup()

    def process_video_file(self, video_path: str,
                           output_dir: str = "results",
                           display: bool = False) -> None:
        """Run detection over a video file and optionally save a CSV batch.

        Args:
            video_path: Path of the video file to process.
            output_dir: Directory for the batch CSV; created (including
                parents) if missing.
            display: Whether to show the live window. When enabled, the last
                frame stays on screen after the video ends until the user
                quits.
        """
        # Replace the camera with one that reads from the video file.
        video_config = {
            'opencv_camera': {
                'video_file': video_path
            }
        }
        self.camera = OpenCVCamera(video_config)

        # Fail early with a clear log message when the enhancer or detector
        # has not been initialized.
        if not self._validate_system():
            return

        if not self.camera.open():
            self.logger.log_error(f"无法打开视频文件: {video_path}")
            return

        self.display_enabled = display
        self.is_running = True
        self._reset_runtime_message()

        results = [] if self.logger.save_results_enabled else None
        last_frame = None
        last_detection_result = None
        last_detection_time: Optional[float] = None
        display_frame_interval = 5  # render every 5th frame to limit UI cost
        frame_index = 0

        # Performance statistics.
        loop_start = time.perf_counter()
        total_display_time = 0.0
        display_count = 0

        try:
            # Created inside the try block so that a failure here still
            # releases the camera that was opened above.
            output_path = Path(output_dir)
            output_path.mkdir(parents=True, exist_ok=True)

            self.logger.log_info(f"开始处理视频: {video_path}")

            while self.is_running:
                success, frame = self.camera.read_frame()
                if not success:
                    # End of video: optionally hold the final frame on screen.
                    if self.display_enabled and last_frame is not None:
                        self._freeze_last_frame(last_frame, last_detection_result)
                    break

                current_frame_index = frame_index
                frame_index += 1

                if frame_index % 100 == 0:
                    self._print_performance_stats(
                        frame_index,
                        time.perf_counter() - loop_start,
                        total_display_time,
                        display_count,
                    )

                should_process, last_detection_time = self._should_run_detection(
                    current_frame_index,
                    last_detection_result is not None,
                    last_detection_time,
                )
                if should_process:
                    detection_result = self._detect_frame(frame)
                    last_detection_result = detection_result
                    if results is not None:
                        results.append(detection_result)

                    # Log only every 100th detection to keep the log small.
                    if detection_result.frame_count % 100 == 0:
                        self.logger.log_detection_result(detection_result)
                    self._process_alarms(detection_result)

                # Keep a copy of the latest frame for the end-of-video freeze.
                if self.display_enabled and frame is not None:
                    last_frame = frame.copy()
                else:
                    last_frame = None

                if (
                    self.display_enabled
                    and last_detection_result is not None
                    and current_frame_index % display_frame_interval == 0
                ):
                    t0 = time.perf_counter()
                    self._display_results(frame, last_detection_result)
                    total_display_time += time.perf_counter() - t0
                    display_count += 1
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

            # Save the accumulated batch results.
            if results:
                csv_content = self.formatter.format_to_csv(results)
                csv_path = output_path / f"batch_results_{int(time.time())}.csv"
                with open(csv_path, 'w', encoding='utf-8') as f:
                    f.write(csv_content)
                self.logger.log_info(f"批量结果已保存到: {csv_path}")
        except Exception as e:
            self.logger.log_error("处理视频文件时发生错误", e)
        finally:
            self._cleanup()

    def _validate_system(self) -> bool:
        """Check that camera, enhancer and detector exist.

        Returns:
            bool: ``True`` when all three are set; otherwise the first
            missing component is logged and ``False`` is returned.
        """
        if not self.camera:
            self.logger.log_error("摄像头未初始化")
            return False
        if not self.image_enhancer:
            self.logger.log_error("图像增强器未初始化")
            return False
        if not self.led_detector:
            self.logger.log_error("LED检测器未初始化")
            return False
        return True

    def _reset_runtime_message(self) -> None:
        """Restore the runtime message to its default text."""
        self.runtime_message = self._default_runtime_message

    def _display_results(self, original_frame, detection_result) -> None:
        """Render a detection result into the display window.

        Args:
            original_frame: The frame to draw on.
            detection_result: The detection result to visualize.
        """
        # Draw the detection result on the original frame.
        vis_frame = self.led_detector.visualize_detection_result(
            original_frame, detection_result
        )
        # Compose the full UI frame around it.
        display_frame = self.tech_ui.create_display_frame(
            vis_frame,
            detection_result,
            runtime_message=self.runtime_message,
        )
        self._ensure_window()
        cv2.imshow(self._window_internal_name, display_frame)

        # Echo the state matrix to the console every 10th detection frame.
        if detection_result.frame_count % 10 == 0:
            matrix_text = self.formatter.format_matrix_visual(
                self.formatter.format_to_matrix(detection_result)
            )
            print(f"\n{matrix_text}")

    def _freeze_last_frame(self, frame, detection_result) -> None:
        """Hold the final video frame on screen with a run summary.

        Blocks until the user presses ``q``/``Esc`` or the window is no
        longer visible. Does nothing beyond updating the runtime message when
        display is disabled or there is no frame.
        """
        self.runtime_message = self._build_runtime_summary(detection_result)
        if not self.display_enabled or frame is None:
            return
        try:
            self._display_results(frame, detection_result)
        except Exception as exc:
            self.logger.log_warning(f"最后一帧渲染失败: {exc}")
            return

        self.logger.log_info("视频播放结束按Q或Esc退出最终画面")
        while True:
            key = cv2.waitKey(100) & 0xFF
            if key in (ord('q'), 27):
                break
            try:
                visible = cv2.getWindowProperty(
                    self._window_internal_name, cv2.WND_PROP_VISIBLE
                )
            except Exception:
                visible = -1
            if visible < 1:
                # Window was closed or can no longer be queried.
                break

    def _build_runtime_summary(self, detection_result) -> str:
        """Build the one-line on/off/uncertain summary of a detection result.

        Counts come from ``detection_summary['threshold_detection']['states']``
        when they are non-zero; otherwise they are recounted from
        ``stable_states``. Each count is labelled and separated so the
        numbers cannot run together.
        """
        if not detection_result:
            return "运行结果:未获取检测结果"

        summary = getattr(detection_result, 'detection_summary', {}) or {}
        threshold_summary = summary.get('threshold_detection') or {}
        states = threshold_summary.get('states') or {}
        counts = {
            key: int(states.get(key) or 0)
            for key in ('on', 'off', 'uncertain')
        }

        if sum(counts.values()) == 0:
            # No summary counts available: tally the per-ROI stable states.
            stable_states = getattr(detection_result, 'stable_states', {}) or {}
            for result in stable_states.values():
                state = getattr(result, 'led_state', None)
                if state == LEDState.ON:
                    counts['on'] += 1
                elif state == LEDState.OFF:
                    counts['off'] += 1
                else:
                    counts['uncertain'] += 1

        total = sum(counts.values())
        total_text = f"总计{total}" if total else "总数未知"
        return (
            f"运行结果:亮{counts['on']} 灭{counts['off']} "
            f"不确定{counts['uncertain']} {total_text}"
        )

    def _cleanup(self) -> None:
        """Stop the loop and release camera, windows and logger."""
        self.is_running = False
        try:
            if self.camera:
                self.camera.close()
        finally:
            # Windows and the logger are released even if closing the camera
            # raised.
            if self.display_enabled:
                cv2.destroyAllWindows()
                self._window_created = False
            self.logger.log_info("系统关闭完成")
            self.logger.close()

    def _ensure_window(self) -> None:
        """Create and configure the OpenCV window once, if display is enabled."""
        if self._window_created or not self.display_enabled:
            return
        cv2.namedWindow(self._window_internal_name, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(
            self._window_internal_name,
            self.tech_ui.window_width,
            self.tech_ui.window_height,
        )
        cv2.setMouseCallback(self._window_internal_name, self._on_mouse_click)
        self._apply_window_title()
        self._window_created = True

    def _on_mouse_click(self, event: int, x: int, y: int, flags: int, param) -> None:
        """Mouse callback: stop the loop when the UI exit button is clicked."""
        if event == cv2.EVENT_LBUTTONDOWN:
            if self.tech_ui.is_exit_button_clicked(x, y):
                self.logger.log_info("用户点击退出按钮")
                self.is_running = False

    def _apply_window_title(self) -> None:
        """Set the window title, preferring the Win32 API on Windows.

        On Windows the title is set through ``SetWindowTextW``; if the window
        handle is not found or the call fails, and on every other platform,
        ``cv2.setWindowTitle`` is used. The title is cosmetic, so a failure is
        logged as a warning instead of being raised into the detection loop.
        """
        display_title = self._window_display_name
        if os.name == 'nt':
            try:
                import ctypes
                hwnd = ctypes.windll.user32.FindWindowW(None, self._window_internal_name)
                if hwnd:
                    ctypes.windll.user32.SetWindowTextW(hwnd, display_title)
                    return
            except Exception:
                # Fall through to the OpenCV API below.
                pass
        try:
            cv2.setWindowTitle(self._window_internal_name, display_title)
        except Exception as exc:
            self.logger.log_warning(f"窗口标题设置失败: {exc}")
def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``; passing a list lets the CLI be
            driven programmatically.

    Returns:
        int: ``0`` on normal completion, ``1`` if system initialization fails.
    """
    parser = argparse.ArgumentParser(
        description='YantaiVisionX - LED灯阵检测系统'
    )
    parser.add_argument('--video', '-v', type=str,
                        help='处理视频文件路径')
    parser.add_argument('--camera', '-c', type=int, default=0,
                        help='摄像头设备ID默认0')
    parser.add_argument('--display', '-d', action='store_true',
                        help='显示实时检测结果')
    parser.add_argument('--config', type=str, default='config',
                        help='配置文件目录路径')
    parser.add_argument('--ui', action='store_true',
                        help='启动可视化控制面板')
    args = parser.parse_args(argv)

    if args.ui:
        # Imported here so the control panel module is only loaded on demand.
        from src.ui.control_panel import launch_control_panel
        launch_control_panel()
        return 0

    # Create the system instance.
    system = YantaiVisionXSystem(args.config)

    # Build the camera configuration. For video input it stays None because
    # process_video_file() installs its own video-file camera.
    if args.video:
        camera_config = None
    else:
        camera_config = {
            'opencv_camera': {
                'device_id': args.camera,
                'resolution': {'width': 1920, 'height': 1080},
                'fps': 30,
            }
        }

    if not system.initialize_system(camera_config):
        print("系统初始化失败")
        return 1

    try:
        if args.video:
            # Process a video file.
            system.process_video_file(args.video, display=args.display)
        else:
            # Live camera detection.
            system.start_detection(display=args.display)
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    return 0


if __name__ == '__main__':
    raise SystemExit(main())