From 87c8f8a8aa8cc67da324170d870c8a05142a522a Mon Sep 17 00:00:00 2001 From: sladro Date: Mon, 1 Dec 2025 15:11:57 +0800 Subject: [PATCH] refactor: simplify control panel UI --- src/ui/control_panel.py | 326 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 326 insertions(+) create mode 100644 src/ui/control_panel.py diff --git a/src/ui/control_panel.py b/src/ui/control_panel.py new file mode 100644 index 0000000..dcf313c --- /dev/null +++ b/src/ui/control_panel.py @@ -0,0 +1,326 @@ +"""可视化控制面板,用于启动监控与ROI标注流程。""" + +from __future__ import annotations + +import queue +import threading +import tkinter as tk +from tkinter import ttk, filedialog, messagebox +import subprocess +from pathlib import Path +from typing import Any, Dict, Optional + +from main import YantaiVisionXSystem +from tools.roi_calibration_tool import ROICalibrationTool +from src.utils.path_utils import resolve_app_path, get_app_root + + +class ControlPanelApp: + """Control panel UI for managing monitoring and calibration workflows.""" + + def __init__(self, root: tk.Tk): + self.root = root + self.root.title("烟台蓬莱国际机场低能见度识别软件") + self.root.geometry("880x640") + + self.project_root = get_app_root() + self.config_dir = resolve_app_path("config") + self.log_queue: "queue.Queue[str]" = queue.Queue() + + self.monitoring_thread: Optional[threading.Thread] = None + self.monitoring_system: Optional[YantaiVisionXSystem] = None + self.monitoring_stop_requested = False + + self.calibration_thread: Optional[threading.Thread] = None + self.external_processes: list[subprocess.Popen] = [] + + self._init_variables() + self._build_ui() + self.refresh_config_options() + + self.root.after(200, self._poll_log_queue) + + # ------------------------------------------------------------------ + # UI construction + # ------------------------------------------------------------------ + def _init_variables(self) -> None: + self.device_id_var = tk.IntVar(value=0) + self.video_file_var = tk.StringVar() + self.monitor_exe_var = tk.StringVar(value=str(self.project_root / "YantaiVisionX.exe")) + + self.calib_image_var = tk.StringVar() + self.calib_output_var = tk.StringVar(value=str(self.config_dir / "roi_config.yaml")) + self.calib_exe_var = tk.StringVar(value=str(self.project_root / "ROIConfigTool.exe")) + + def _build_ui(self) -> None: + main_frame = ttk.Frame(self.root, padding=10) + main_frame.pack(fill=tk.BOTH, expand=True) + + notebook = ttk.Notebook(main_frame) + notebook.pack(fill=tk.BOTH, expand=True) + + monitor_tab = ttk.Frame(notebook, padding=10) + calib_tab = ttk.Frame(notebook, padding=10) + notebook.add(monitor_tab, text="监控") + notebook.add(calib_tab, text="标注") + + self._build_monitor_tab(monitor_tab) + self._build_calibration_tab(calib_tab) + + log_label = ttk.Label(main_frame, text="运行日志") + log_label.pack(anchor=tk.W, pady=(10, 2)) + self.log_text = tk.Text(main_frame, height=8, state=tk.DISABLED, bg="#111", fg="#0f0") + self.log_text.pack(fill=tk.BOTH, expand=False) + + def _build_monitor_tab(self, parent: ttk.Frame) -> None: + form = ttk.Frame(parent) + form.pack(fill=tk.BOTH, expand=True) + + row = 0 + ttk.Label(form, text="摄像头ID").grid(row=row, column=0, sticky=tk.W, pady=5) + ttk.Entry(form, textvariable=self.device_id_var, width=10).grid(row=row, column=1, sticky=tk.W) + + row += 1 + ttk.Label(form, text="视频文件(可选)").grid(row=row, column=0, sticky=tk.W, pady=5) + ttk.Entry(form, textvariable=self.video_file_var, width=50).grid(row=row, column=1, sticky=tk.EW, padx=5) + ttk.Button(form, text="选择", command=lambda: self._browse_file(self.video_file_var, filetypes=[("视频文件", "*.mp4 *.avi *.mov"), ("所有文件", "*.*")])).grid(row=row, column=2) + + row += 1 + ttk.Label(form, text="监控EXE").grid(row=row, column=0, sticky=tk.W, pady=5) + ttk.Entry(form, textvariable=self.monitor_exe_var, width=50).grid(row=row, column=1, sticky=tk.EW, padx=5) + buttons_frame = ttk.Frame(form) + buttons_frame.grid(row=row, column=2, sticky=tk.W) + ttk.Button(buttons_frame, text="选择", command=lambda: self._browse_exe(self.monitor_exe_var)).pack(side=tk.LEFT) + ttk.Button(buttons_frame, text="运行EXE", command=self.launch_monitor_exe).pack(side=tk.LEFT, padx=5) + + form.columnconfigure(1, weight=1) + + button_frame = ttk.Frame(parent) + button_frame.pack(anchor=tk.E, pady=10, fill=tk.X) + self.start_monitor_btn = ttk.Button(button_frame, text="开始监控", command=self.start_monitoring) + self.start_monitor_btn.pack(side=tk.LEFT) + self.stop_monitor_btn = ttk.Button(button_frame, text="停止", command=self.stop_monitoring, state=tk.DISABLED) + self.stop_monitor_btn.pack(side=tk.LEFT, padx=10) + + def _build_calibration_tab(self, parent: ttk.Frame) -> None: + form = ttk.Frame(parent) + form.pack(fill=tk.BOTH, expand=True) + + ttk.Label(form, text="标定图像路径").grid(row=0, column=0, sticky=tk.W, pady=5) + ttk.Entry(form, textvariable=self.calib_image_var, width=50).grid(row=0, column=1, sticky=tk.EW, padx=5) + ttk.Button(form, text="选择", command=lambda: self._browse_file(self.calib_image_var, filetypes=[("图像", "*.jpg *.png *.bmp"), ("所有文件", "*.*")])).grid(row=0, column=2) + + ttk.Label(form, text="输出ROI配置").grid(row=1, column=0, sticky=tk.W, pady=5) + ttk.Entry(form, textvariable=self.calib_output_var, width=50).grid(row=1, column=1, sticky=tk.EW, padx=5) + ttk.Button(form, text="选择", command=lambda: self._save_file_dialog(self.calib_output_var)).grid(row=1, column=2) + + ttk.Label(form, text="标注EXE").grid(row=2, column=0, sticky=tk.W, pady=5) + ttk.Entry(form, textvariable=self.calib_exe_var, width=50).grid(row=2, column=1, sticky=tk.EW, padx=5) + exe_buttons = ttk.Frame(form) + exe_buttons.grid(row=2, column=2, sticky=tk.W) + ttk.Button(exe_buttons, text="选择", command=lambda: self._browse_exe(self.calib_exe_var)).pack(side=tk.LEFT) + ttk.Button(exe_buttons, text="运行EXE", command=self.launch_calib_exe).pack(side=tk.LEFT, padx=5) + + form.columnconfigure(1, weight=1) + + self.start_calib_btn = ttk.Button(parent, text="启动标注", command=self.start_calibration) + self.start_calib_btn.pack(anchor=tk.E, pady=10) + + # ------------------------------------------------------------------ + # Event handlers + # ------------------------------------------------------------------ + def refresh_config_options(self) -> None: + pass + + def start_monitoring(self) -> None: + if self.monitoring_thread and self.monitoring_thread.is_alive(): + messagebox.showinfo("提示", "监控正在运行") + return + + try: + settings = self._collect_monitoring_settings() + except ValueError as exc: + messagebox.showerror("参数错误", str(exc)) + return + + self.monitoring_stop_requested = False + self._set_monitoring_controls_state(disabled=True) + + self.monitoring_thread = threading.Thread( + target=self._monitoring_worker, + args=(settings,), + daemon=True, + ) + self.monitoring_thread.start() + self.log("监控启动中...") + + def stop_monitoring(self) -> None: + if not self.monitoring_system: + return + + self.monitoring_stop_requested = True + self.monitoring_system.is_running = False + self.log("已请求停止监控") + + def start_calibration(self) -> None: + if self.calibration_thread and self.calibration_thread.is_alive(): + messagebox.showinfo("提示", "标注流程正在运行") + return + + image_path = self.calib_image_var.get().strip() + if not image_path: + messagebox.showerror("缺少参数", "请先选择标定图像") + return + + if not Path(image_path).exists(): + messagebox.showerror("无效路径", "标定图像不存在") + return + + output_path = self.calib_output_var.get().strip() or str(self.config_dir / "roi_config.yaml") + + self.start_calib_btn.config(state=tk.DISABLED) + self.calibration_thread = threading.Thread( + target=self._calibration_worker, + args=(image_path, output_path), + daemon=True, + ) + self.calibration_thread.start() + self.log("ROI 标注工具启动中...") + + # ------------------------------------------------------------------ + # Worker functions + # ------------------------------------------------------------------ + def _monitoring_worker(self, settings: Dict[str, Any]) -> None: + try: + system = YantaiVisionXSystem(config_dir=str(self.config_dir)) + + if not system.initialize_system(camera_config=settings["camera_config"]): + raise RuntimeError("系统初始化失败,请检查摄像头或配置文件") + + self.monitoring_system = system + + video_path = settings["video_path"] + if video_path: + system.process_video_file(video_path, display=True) + else: + system.start_detection(display=True, save_results=True) + + self.log("监控已结束") + + except Exception as exc: + if not self.monitoring_stop_requested: + self.log(f"监控出错: {exc}") + self._schedule(lambda: messagebox.showerror("监控失败", str(exc))) + finally: + self.monitoring_system = None + self.monitoring_thread = None + self._schedule(lambda: self._set_monitoring_controls_state(disabled=False)) + + def _calibration_worker(self, image_path: str, output_path: str) -> None: + try: + tool = ROICalibrationTool(default_output_path=output_path) + tool.run_calibration(image_path) + self.log("标注流程已结束") + except Exception as exc: + self.log(f"标注出错: {exc}") + self._schedule(lambda: messagebox.showerror("标注失败", str(exc))) + finally: + self.calibration_thread = None + self._schedule(lambda: self.start_calib_btn.config(state=tk.NORMAL)) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + def _collect_monitoring_settings(self) -> Dict[str, Any]: + video_path = self.video_file_var.get().strip() + if video_path and not Path(video_path).exists(): + raise ValueError("视频文件不存在") + + camera_config: Dict[str, Any] = { + "opencv_camera": { + "device_id": self.device_id_var.get(), + "resolution": {"width": 1920, "height": 1080}, + } + } + + return { + "camera_config": camera_config, + "video_path": video_path, + } + + def _browse_file(self, variable: tk.StringVar, filetypes=None) -> None: + filetypes = filetypes or [("YAML 文件", "*.yaml"), ("所有文件", "*.*")] + path = filedialog.askopenfilename(title="选择文件", filetypes=filetypes) + if path: + variable.set(path) + + def _browse_exe(self, variable: tk.StringVar) -> None: + filetypes = [("可执行文件", "*.exe"), ("所有文件", "*.*")] + path = filedialog.askopenfilename(title="选择可执行文件", filetypes=filetypes) + if path: + variable.set(path) + + def _save_file_dialog(self, variable: tk.StringVar) -> None: + path = filedialog.asksaveasfilename( + title="选择输出文件", + defaultextension=".yaml", + filetypes=[("YAML 文件", "*.yaml"), ("所有文件", "*.*")], + ) + if path: + variable.set(path) + + def launch_monitor_exe(self) -> None: + self._launch_external_exe(self.monitor_exe_var, "监控EXE") + + def launch_calib_exe(self) -> None: + self._launch_external_exe(self.calib_exe_var, "标注EXE") + + def _launch_external_exe(self, path_var: tk.StringVar, friendly_name: str) -> None: + raw_path = path_var.get().strip() + if not raw_path: + messagebox.showerror("缺少路径", f"请先设置{friendly_name}路径") + return + exe_path = Path(raw_path) + if not exe_path.exists(): + messagebox.showerror("路径无效", f"未找到{friendly_name}: {exe_path}") + return + + try: + proc = subprocess.Popen([str(exe_path)], cwd=str(exe_path.parent)) + self.external_processes.append(proc) + self.log(f"已启动{friendly_name}: {exe_path}") + except Exception as exc: + messagebox.showerror("启动失败", str(exc)) + + def _set_monitoring_controls_state(self, *, disabled: bool) -> None: + self.start_monitor_btn.config(state=tk.DISABLED if disabled else tk.NORMAL) + self.stop_monitor_btn.config(state=tk.NORMAL if disabled else tk.DISABLED) + + def _poll_log_queue(self) -> None: + while not self.log_queue.empty(): + message = self.log_queue.get_nowait() + self.log_text.configure(state=tk.NORMAL) + self.log_text.insert(tk.END, message + "\n") + self.log_text.configure(state=tk.DISABLED) + self.log_text.see(tk.END) + + self.root.after(200, self._poll_log_queue) + + def log(self, message: str) -> None: + self.log_queue.put(message) + + def _schedule(self, func) -> None: + self.root.after(0, func) + + +def launch_control_panel() -> None: + root = tk.Tk() + ControlPanelApp(root) + root.mainloop() + + +__all__ = ["launch_control_panel", "ControlPanelApp"] + + +if __name__ == "__main__": + launch_control_panel()