YantaiVisionX/main.py
sladro 6fba8a981a feat: add alarm workflow and packaging support
Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
2025-11-13 11:38:45 +08:00

390 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
YantaiVisionX 主程序
LED灯阵检测系统的主入口和集成流水线
"""
import cv2
import time
import argparse
import sys
import os
from pathlib import Path
# Silence OpenCV's own log output.
# NOTE(review): cv2 is already imported earlier in this file; OpenCV may read
# these variables before this point, in which case they have no effect —
# confirm, and if so move these assignments above `import cv2`.
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
os.environ['OPENCV_VIDEOIO_DEBUG'] = '0'

# Put the project root on sys.path so the `src.*` packages below can be imported.
if getattr(sys, 'frozen', False):
    # Frozen executable: PyInstaller exposes its unpack directory as
    # sys._MEIPASS. Other freezers set sys.frozen without providing it, so
    # fall back to the directory that holds the executable instead of raising
    # AttributeError.
    project_root = Path(getattr(sys, '_MEIPASS', Path(sys.executable).parent))
else:
    # Resolve to an absolute path so the sys.path entry stays valid even if
    # the working directory changes later.
    project_root = Path(__file__).resolve().parent
sys.path.insert(0, str(project_root))
from src.camera.opencv_camera import OpenCVCamera
from src.preprocessing.image_enhancer import ImageEnhancer
from src.roi_detection.led_detector import LEDDetector
from src.output.result_formatter import ResultFormatter
from src.output.logger import LEDLogger
from src.output.alarm_manager import AlarmManager
from src.ui.tech_ui import TechUI
class YantaiVisionXSystem:
    """
    Main system class for YantaiVisionX.

    Wires the camera, image enhancer, LED detector, alarm manager, logger and
    UI together into one detection pipeline that can run against a live
    camera (`start_detection`) or a video file (`process_video_file`).
    """

    # Pause after a failed frame read so a camera that stops delivering
    # frames does not turn the capture loop into a busy spin.
    _READ_RETRY_DELAY_S = 0.01
    # While reads keep failing, warn on the first failure of a streak and
    # then once every this many consecutive failures.
    _READ_FAILURE_LOG_INTERVAL = 100

    def __init__(self, config_dir: str = "config"):
        """
        Create the system object.

        The camera, enhancer and detector are left as None here and are
        created by `initialize_system`.

        Args:
            config_dir: Directory that contains the configuration files.
        """
        self.config_dir = Path(config_dir)

        # Components
        self.camera = None
        self.image_enhancer = None
        self.led_detector = None
        self.formatter = ResultFormatter()
        self.logger = LEDLogger()
        self.alarm_manager = AlarmManager(self.logger)
        self.tech_ui = TechUI()

        # State flags
        self.is_running = False
        self.display_enabled = False

    def initialize_system(self, camera_config=None) -> bool:
        """
        Create the image enhancer, LED detector and camera objects.

        Args:
            camera_config: Camera configuration dict. When None, a default
                configuration (device 0, 1920x1080, 30 fps) is used.

        Returns:
            bool: True if every step completed, False if any step raised
            (the exception is logged, not propagated).
        """
        try:
            self.logger.log_info("初始化YantaiVisionX系统...")

            # 1. Image enhancer
            self.image_enhancer = ImageEnhancer()
            self.logger.log_info("图像增强器初始化完成")

            # 2. LED detector, configured from two YAML files in config_dir
            roi_config_path = self.config_dir / "roi_config.yaml"
            algorithm_config_path = self.config_dir / "algorithm_config.yaml"
            self.led_detector = LEDDetector(
                str(roi_config_path),
                str(algorithm_config_path)
            )
            self.logger.log_info("LED检测器初始化完成")

            # Forward the 'alarm' section of the algorithm config (if the
            # detector exposes one) to the alarm manager.
            if self.led_detector and hasattr(self.led_detector, 'algorithm_config'):
                self.alarm_manager.update_config(
                    self.led_detector.algorithm_config.get('alarm')
                )

            # 3. Camera object; it is opened later by start_detection().
            if camera_config is None:
                camera_config = {
                    'opencv_camera': {
                        'device_id': 0,
                        'resolution': {'width': 1920, 'height': 1080},
                        'fps': 30
                    }
                }
            self.camera = OpenCVCamera(camera_config)

            self.logger.log_info("系统初始化完成")
            return True
        except Exception as e:
            self.logger.log_error("系统初始化失败", e)
            return False

    def start_detection(self, display: bool = False, save_results: bool = True) -> None:
        """
        Run the detection loop against the configured camera.

        The loop runs until `is_running` is cleared, 'q' is pressed in the
        display window (display mode only), Ctrl+C is received, or an
        unexpected exception occurs. Resources are released via `_cleanup`
        once the loop has been entered.

        Args:
            display: Whether to show the live detection view.
            save_results: Whether to persist results to file (also subject to
                the logger's own save settings).
        """
        if not self._validate_system():
            self.logger.log_error("系统验证失败,无法开始检测")
            return

        self.display_enabled = display

        if not self.camera.open():
            self.logger.log_error("摄像头打开失败")
            return

        # Mark the system as running only once the camera is actually open,
        # so a failed open does not leave a stale "running" flag behind.
        self.is_running = True
        self.logger.log_info("开始 LED 检测...")

        failed_reads = 0
        try:
            while self.is_running:
                success, frame = self.camera.read_frame()
                if not success:
                    # Throttle the warning and back off briefly instead of
                    # spinning and logging on every iteration.
                    failed_reads += 1
                    if failed_reads % self._READ_FAILURE_LOG_INTERVAL == 1:
                        self.logger.log_warning("读取帧失败")
                    time.sleep(self._READ_RETRY_DELAY_S)
                    continue
                failed_reads = 0

                detection_result = self._run_detection(frame)

                # Log the result and any alarm / recovery events
                self.logger.log_detection_result(detection_result)
                self._handle_alarms(detection_result)

                # Persist every save_results_interval-th frame
                if (
                    save_results
                    and self.logger.save_results_enabled
                    and detection_result.frame_count % self.logger.save_results_interval == 0
                ):
                    self.logger.save_result_to_file(detection_result)

                if self.display_enabled:
                    self._display_results(frame, detection_result)
                    if self._quit_requested():
                        break
        except KeyboardInterrupt:
            self.logger.log_info("用户中断检测")
        except Exception as e:
            self.logger.log_error("检测过程中发生错误", e)
        finally:
            self._cleanup()

    def process_video_file(self, video_path: str,
                           output_dir: str = "results",
                           display: bool = False) -> None:
        """
        Run the detection pipeline over a video file.

        Replaces `self.camera` with a file-backed camera, processes frames
        until the first failed read, and, if the logger has result saving
        enabled, writes all results to one CSV file in `output_dir`.

        Args:
            video_path: Path of the video file to process.
            output_dir: Directory for the batch CSV; created (including
                missing parents) if it does not exist.
            display: Whether to show the live detection view.
        """
        video_config = {
            'opencv_camera': {
                'video_file': video_path
            }
        }
        self.camera = OpenCVCamera(video_config)

        # Fail early (with a specific log message) when the enhancer or the
        # detector has not been initialized, instead of crashing mid-loop.
        if not self._validate_system():
            return

        if not self.camera.open():
            self.logger.log_error(f"无法打开视频文件: {video_path}")
            return

        self.display_enabled = display
        self.is_running = True

        output_path = Path(output_dir)
        # Results are only accumulated when the logger has saving enabled.
        results = [] if self.logger.save_results_enabled else None

        self.logger.log_info(f"开始处理视频: {video_path}")
        try:
            # Created inside the try block so that a failure here still
            # releases the already-opened video through _cleanup().
            output_path.mkdir(parents=True, exist_ok=True)

            while self.is_running:
                success, frame = self.camera.read_frame()
                if not success:
                    break  # end of video

                detection_result = self._run_detection(frame)
                if results is not None:
                    results.append(detection_result)

                # Log only every 100th frame to keep the log readable
                if detection_result.frame_count % 100 == 0:
                    self.logger.log_detection_result(detection_result)
                self._handle_alarms(detection_result)

                if self.display_enabled:
                    self._display_results(frame, detection_result)
                    if self._quit_requested():
                        break

            # Write the accumulated results as one CSV file
            if results:
                csv_content = self.formatter.format_to_csv(results)
                csv_path = output_path / f"batch_results_{int(time.time())}.csv"
                with open(csv_path, 'w', encoding='utf-8') as f:
                    f.write(csv_content)
                self.logger.log_info(f"批量结果已保存到: {csv_path}")
        except Exception as e:
            self.logger.log_error("处理视频文件时发生错误", e)
        finally:
            self._cleanup()

    def _run_detection(self, frame):
        """Preprocess one frame in "normal" mode and run LED detection on it."""
        enhanced_frame = self.image_enhancer.preprocess_frame(
            frame, mode="normal"
        )
        return self.led_detector.detect_leds(enhanced_frame)

    def _handle_alarms(self, detection_result) -> None:
        """Feed a detection result to the alarm manager and log its alarm and recovery events."""
        alarm_result = self.alarm_manager.process_detection(detection_result)
        for event in alarm_result.events:
            self.logger.log_alarm_event(event)
        for roi_name in alarm_result.recoveries:
            self.logger.log_recovery_event(
                roi_name,
                detection_result.frame_count,
                detection_result.timestamp
            )

    @staticmethod
    def _quit_requested() -> bool:
        """Poll the OpenCV window for 1 ms and report whether 'q' was pressed."""
        return cv2.waitKey(1) & 0xFF == ord('q')

    def _validate_system(self) -> bool:
        """
        Check that the camera, image enhancer and LED detector exist.

        Returns:
            bool: True when all three are set; otherwise False, after logging
            which component is missing.
        """
        if not self.camera:
            self.logger.log_error("摄像头未初始化")
            return False
        if not self.image_enhancer:
            self.logger.log_error("图像增强器未初始化")
            return False
        if not self.led_detector:
            self.logger.log_error("LED检测器未初始化")
            return False
        return True

    def _display_results(self, original_frame, detection_result) -> None:
        """
        Show the detection result in a window and, every 10th frame, on the console.

        Args:
            original_frame: The unprocessed frame read from the camera.
            detection_result: Detection result for that frame.
        """
        # Let the detector draw its visualization onto the original frame
        vis_frame = self.led_detector.visualize_detection_result(
            original_frame, detection_result
        )
        # Compose the final display frame with the UI layer
        display_frame = self.tech_ui.create_display_frame(vis_frame, detection_result)
        cv2.imshow('烟台蓬莱国际机场低能见度识别软件', display_frame)

        # Print the LED matrix state to the console every 10th frame
        if detection_result.frame_count % 10 == 0:
            matrix_text = self.formatter.format_matrix_visual(
                self.formatter.format_to_matrix(detection_result)
            )
            print(f"\n{matrix_text}")

    def _cleanup(self) -> None:
        """
        Stop the loop and release the camera, display windows and logger.
        """
        self.is_running = False
        if self.camera:
            self.camera.close()
        if self.display_enabled:
            cv2.destroyAllWindows()
        self.logger.log_info("系统关闭完成")
        self.logger.close()
def main(argv=None) -> int:
    """
    Command-line entry point.

    Parses the arguments, builds and initializes a YantaiVisionXSystem, then
    either processes a video file (--video) or runs live camera detection.

    Args:
        argv: Argument list to parse. When None (the default), argparse reads
            the process command line, which keeps plain `main()` calls
            working as before.

    Returns:
        int: 1 if system initialization failed, otherwise 0.
    """
    parser = argparse.ArgumentParser(
        description='YantaiVisionX - LED灯阵检测系统'
    )
    parser.add_argument('--video', '-v', type=str,
                        help='处理视频文件路径')
    parser.add_argument('--camera', '-c', type=int, default=0,
                        help='摄像头设备ID默认0')
    parser.add_argument('--display', '-d', action='store_true',
                        help='显示实时检测结果')
    parser.add_argument('--config', type=str, default='config',
                        help='配置文件目录路径')
    args = parser.parse_args(argv)

    system = YantaiVisionXSystem(args.config)

    # Camera configuration: in video mode process_video_file() builds its own
    # camera, so nothing specific is passed to initialize_system().
    if args.video:
        camera_config = None
    else:
        camera_config = {
            'opencv_camera': {
                'device_id': args.camera,
                'resolution': {'width': 1920, 'height': 1080},
                'fps': 30
            }
        }

    if not system.initialize_system(camera_config):
        print("系统初始化失败")
        return 1

    try:
        if args.video:
            system.process_video_file(args.video, display=args.display)
        else:
            system.start_detection(display=args.display)
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    sys.exit(main())