refactor: simplify control panel UI

This commit is contained in:
sladro 2025-12-01 15:11:57 +08:00
parent 54ed5e0682
commit 87c8f8a8aa

326
src/ui/control_panel.py Normal file
View File

@ -0,0 +1,326 @@
"""可视化控制面板用于启动监控与ROI标注流程。"""
from __future__ import annotations
import queue
import threading
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import subprocess
from pathlib import Path
from typing import Any, Dict, Optional
from main import YantaiVisionXSystem
from tools.roi_calibration_tool import ROICalibrationTool
from src.utils.path_utils import resolve_app_path, get_app_root
class ControlPanelApp:
"""Control panel UI for managing monitoring and calibration workflows."""
def __init__(self, root: tk.Tk):
self.root = root
self.root.title("烟台蓬莱国际机场低能见度识别软件")
self.root.geometry("880x640")
self.project_root = get_app_root()
self.config_dir = resolve_app_path("config")
self.log_queue: "queue.Queue[str]" = queue.Queue()
self.monitoring_thread: Optional[threading.Thread] = None
self.monitoring_system: Optional[YantaiVisionXSystem] = None
self.monitoring_stop_requested = False
self.calibration_thread: Optional[threading.Thread] = None
self.external_processes: list[subprocess.Popen] = []
self._init_variables()
self._build_ui()
self.refresh_config_options()
self.root.after(200, self._poll_log_queue)
# ------------------------------------------------------------------
# UI construction
# ------------------------------------------------------------------
def _init_variables(self) -> None:
self.device_id_var = tk.IntVar(value=0)
self.video_file_var = tk.StringVar()
self.monitor_exe_var = tk.StringVar(value=str(self.project_root / "YantaiVisionX.exe"))
self.calib_image_var = tk.StringVar()
self.calib_output_var = tk.StringVar(value=str(self.config_dir / "roi_config.yaml"))
self.calib_exe_var = tk.StringVar(value=str(self.project_root / "ROIConfigTool.exe"))
def _build_ui(self) -> None:
main_frame = ttk.Frame(self.root, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
notebook = ttk.Notebook(main_frame)
notebook.pack(fill=tk.BOTH, expand=True)
monitor_tab = ttk.Frame(notebook, padding=10)
calib_tab = ttk.Frame(notebook, padding=10)
notebook.add(monitor_tab, text="监控")
notebook.add(calib_tab, text="标注")
self._build_monitor_tab(monitor_tab)
self._build_calibration_tab(calib_tab)
log_label = ttk.Label(main_frame, text="运行日志")
log_label.pack(anchor=tk.W, pady=(10, 2))
self.log_text = tk.Text(main_frame, height=8, state=tk.DISABLED, bg="#111", fg="#0f0")
self.log_text.pack(fill=tk.BOTH, expand=False)
def _build_monitor_tab(self, parent: ttk.Frame) -> None:
form = ttk.Frame(parent)
form.pack(fill=tk.BOTH, expand=True)
row = 0
ttk.Label(form, text="摄像头ID").grid(row=row, column=0, sticky=tk.W, pady=5)
ttk.Entry(form, textvariable=self.device_id_var, width=10).grid(row=row, column=1, sticky=tk.W)
row += 1
ttk.Label(form, text="视频文件(可选)").grid(row=row, column=0, sticky=tk.W, pady=5)
ttk.Entry(form, textvariable=self.video_file_var, width=50).grid(row=row, column=1, sticky=tk.EW, padx=5)
ttk.Button(form, text="选择", command=lambda: self._browse_file(self.video_file_var, filetypes=[("视频文件", "*.mp4 *.avi *.mov"), ("所有文件", "*.*")])).grid(row=row, column=2)
row += 1
ttk.Label(form, text="监控EXE").grid(row=row, column=0, sticky=tk.W, pady=5)
ttk.Entry(form, textvariable=self.monitor_exe_var, width=50).grid(row=row, column=1, sticky=tk.EW, padx=5)
buttons_frame = ttk.Frame(form)
buttons_frame.grid(row=row, column=2, sticky=tk.W)
ttk.Button(buttons_frame, text="选择", command=lambda: self._browse_exe(self.monitor_exe_var)).pack(side=tk.LEFT)
ttk.Button(buttons_frame, text="运行EXE", command=self.launch_monitor_exe).pack(side=tk.LEFT, padx=5)
form.columnconfigure(1, weight=1)
button_frame = ttk.Frame(parent)
button_frame.pack(anchor=tk.E, pady=10, fill=tk.X)
self.start_monitor_btn = ttk.Button(button_frame, text="开始监控", command=self.start_monitoring)
self.start_monitor_btn.pack(side=tk.LEFT)
self.stop_monitor_btn = ttk.Button(button_frame, text="停止", command=self.stop_monitoring, state=tk.DISABLED)
self.stop_monitor_btn.pack(side=tk.LEFT, padx=10)
def _build_calibration_tab(self, parent: ttk.Frame) -> None:
form = ttk.Frame(parent)
form.pack(fill=tk.BOTH, expand=True)
ttk.Label(form, text="标定图像路径").grid(row=0, column=0, sticky=tk.W, pady=5)
ttk.Entry(form, textvariable=self.calib_image_var, width=50).grid(row=0, column=1, sticky=tk.EW, padx=5)
ttk.Button(form, text="选择", command=lambda: self._browse_file(self.calib_image_var, filetypes=[("图像", "*.jpg *.png *.bmp"), ("所有文件", "*.*")])).grid(row=0, column=2)
ttk.Label(form, text="输出ROI配置").grid(row=1, column=0, sticky=tk.W, pady=5)
ttk.Entry(form, textvariable=self.calib_output_var, width=50).grid(row=1, column=1, sticky=tk.EW, padx=5)
ttk.Button(form, text="选择", command=lambda: self._save_file_dialog(self.calib_output_var)).grid(row=1, column=2)
ttk.Label(form, text="标注EXE").grid(row=2, column=0, sticky=tk.W, pady=5)
ttk.Entry(form, textvariable=self.calib_exe_var, width=50).grid(row=2, column=1, sticky=tk.EW, padx=5)
exe_buttons = ttk.Frame(form)
exe_buttons.grid(row=2, column=2, sticky=tk.W)
ttk.Button(exe_buttons, text="选择", command=lambda: self._browse_exe(self.calib_exe_var)).pack(side=tk.LEFT)
ttk.Button(exe_buttons, text="运行EXE", command=self.launch_calib_exe).pack(side=tk.LEFT, padx=5)
form.columnconfigure(1, weight=1)
self.start_calib_btn = ttk.Button(parent, text="启动标注", command=self.start_calibration)
self.start_calib_btn.pack(anchor=tk.E, pady=10)
# ------------------------------------------------------------------
# Event handlers
# ------------------------------------------------------------------
def refresh_config_options(self) -> None:
pass
def start_monitoring(self) -> None:
if self.monitoring_thread and self.monitoring_thread.is_alive():
messagebox.showinfo("提示", "监控正在运行")
return
try:
settings = self._collect_monitoring_settings()
except ValueError as exc:
messagebox.showerror("参数错误", str(exc))
return
self.monitoring_stop_requested = False
self._set_monitoring_controls_state(disabled=True)
self.monitoring_thread = threading.Thread(
target=self._monitoring_worker,
args=(settings,),
daemon=True,
)
self.monitoring_thread.start()
self.log("监控启动中...")
def stop_monitoring(self) -> None:
if not self.monitoring_system:
return
self.monitoring_stop_requested = True
self.monitoring_system.is_running = False
self.log("已请求停止监控")
def start_calibration(self) -> None:
if self.calibration_thread and self.calibration_thread.is_alive():
messagebox.showinfo("提示", "标注流程正在运行")
return
image_path = self.calib_image_var.get().strip()
if not image_path:
messagebox.showerror("缺少参数", "请先选择标定图像")
return
if not Path(image_path).exists():
messagebox.showerror("无效路径", "标定图像不存在")
return
output_path = self.calib_output_var.get().strip() or str(self.config_dir / "roi_config.yaml")
self.start_calib_btn.config(state=tk.DISABLED)
self.calibration_thread = threading.Thread(
target=self._calibration_worker,
args=(image_path, output_path),
daemon=True,
)
self.calibration_thread.start()
self.log("ROI 标注工具启动中...")
# ------------------------------------------------------------------
# Worker functions
# ------------------------------------------------------------------
def _monitoring_worker(self, settings: Dict[str, Any]) -> None:
try:
system = YantaiVisionXSystem(config_dir=str(self.config_dir))
if not system.initialize_system(camera_config=settings["camera_config"]):
raise RuntimeError("系统初始化失败,请检查摄像头或配置文件")
self.monitoring_system = system
video_path = settings["video_path"]
if video_path:
system.process_video_file(video_path, display=True)
else:
system.start_detection(display=True, save_results=True)
self.log("监控已结束")
except Exception as exc:
if not self.monitoring_stop_requested:
self.log(f"监控出错: {exc}")
self._schedule(lambda: messagebox.showerror("监控失败", str(exc)))
finally:
self.monitoring_system = None
self.monitoring_thread = None
self._schedule(lambda: self._set_monitoring_controls_state(disabled=False))
def _calibration_worker(self, image_path: str, output_path: str) -> None:
try:
tool = ROICalibrationTool(default_output_path=output_path)
tool.run_calibration(image_path)
self.log("标注流程已结束")
except Exception as exc:
self.log(f"标注出错: {exc}")
self._schedule(lambda: messagebox.showerror("标注失败", str(exc)))
finally:
self.calibration_thread = None
self._schedule(lambda: self.start_calib_btn.config(state=tk.NORMAL))
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _collect_monitoring_settings(self) -> Dict[str, Any]:
video_path = self.video_file_var.get().strip()
if video_path and not Path(video_path).exists():
raise ValueError("视频文件不存在")
camera_config: Dict[str, Any] = {
"opencv_camera": {
"device_id": self.device_id_var.get(),
"resolution": {"width": 1920, "height": 1080},
}
}
return {
"camera_config": camera_config,
"video_path": video_path,
}
def _browse_file(self, variable: tk.StringVar, filetypes=None) -> None:
filetypes = filetypes or [("YAML 文件", "*.yaml"), ("所有文件", "*.*")]
path = filedialog.askopenfilename(title="选择文件", filetypes=filetypes)
if path:
variable.set(path)
def _browse_exe(self, variable: tk.StringVar) -> None:
filetypes = [("可执行文件", "*.exe"), ("所有文件", "*.*")]
path = filedialog.askopenfilename(title="选择可执行文件", filetypes=filetypes)
if path:
variable.set(path)
def _save_file_dialog(self, variable: tk.StringVar) -> None:
path = filedialog.asksaveasfilename(
title="选择输出文件",
defaultextension=".yaml",
filetypes=[("YAML 文件", "*.yaml"), ("所有文件", "*.*")],
)
if path:
variable.set(path)
def launch_monitor_exe(self) -> None:
self._launch_external_exe(self.monitor_exe_var, "监控EXE")
def launch_calib_exe(self) -> None:
self._launch_external_exe(self.calib_exe_var, "标注EXE")
def _launch_external_exe(self, path_var: tk.StringVar, friendly_name: str) -> None:
raw_path = path_var.get().strip()
if not raw_path:
messagebox.showerror("缺少路径", f"请先设置{friendly_name}路径")
return
exe_path = Path(raw_path)
if not exe_path.exists():
messagebox.showerror("路径无效", f"未找到{friendly_name}: {exe_path}")
return
try:
proc = subprocess.Popen([str(exe_path)], cwd=str(exe_path.parent))
self.external_processes.append(proc)
self.log(f"已启动{friendly_name}: {exe_path}")
except Exception as exc:
messagebox.showerror("启动失败", str(exc))
def _set_monitoring_controls_state(self, *, disabled: bool) -> None:
self.start_monitor_btn.config(state=tk.DISABLED if disabled else tk.NORMAL)
self.stop_monitor_btn.config(state=tk.NORMAL if disabled else tk.DISABLED)
def _poll_log_queue(self) -> None:
while not self.log_queue.empty():
message = self.log_queue.get_nowait()
self.log_text.configure(state=tk.NORMAL)
self.log_text.insert(tk.END, message + "\n")
self.log_text.configure(state=tk.DISABLED)
self.log_text.see(tk.END)
self.root.after(200, self._poll_log_queue)
def log(self, message: str) -> None:
self.log_queue.put(message)
def _schedule(self, func) -> None:
self.root.after(0, func)
def launch_control_panel() -> None:
root = tk.Tk()
ControlPanelApp(root)
root.mainloop()
__all__ = ["launch_control_panel", "ControlPanelApp"]
if __name__ == "__main__":
launch_control_panel()