CadHubManage/app/core/software_manager.py
root e6261532f7 feat: 添加软件停止功能并修复Creo进程检测问题
## 主要修复
- 修复Creo软件运行状态检测失败问题
- 添加完整的软件停止功能支持
- 改进多进程软件的进程管理逻辑

## 技术改进
- 更新软件配置支持多进程名称检测
- 优化进程停止逻辑,增加超时配置
- 新增 stop_software WebSocket消息类型
- 完善错误处理和日志记录

## 配置更新
- configs/software_config.yaml: 支持进程名称列表和停止超时
- 添加Revit 2017配置支持

## 文档更新
- README.md: 更新软件配置说明和API列表
- frontend-api-docs.md: 添加停止软件API文档
- CHECKPOINT.md: 记录修复进展和解决方案

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-24 17:24:49 +08:00

502 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import subprocess
import uuid
from datetime import datetime
from typing import Dict, Optional, List
import psutil
from app.models.task import Task, TaskStatus, TaskType
from app.models.software import SoftwareDefinition
from app.models.operation_log import OperationStatus
from app.config import software_config, settings
class SoftwareManager:
"""软件管理核心类"""
def __init__(self):
self.running_tasks: Dict[str, Task] = {}
self.running_processes: Dict[str, int] = {} # software_id -> process_id
self._websocket_manager = None
self._log_manager = None
def set_websocket_manager(self, websocket_manager):
"""设置WebSocket管理器用于状态通知"""
self._websocket_manager = websocket_manager
def set_log_manager(self, log_manager):
"""设置日志管理器用于操作记录"""
self._log_manager = log_manager
async def _notify_websocket(self, message_type: str, data: dict):
"""发送WebSocket通知"""
if self._websocket_manager:
message = {
"type": message_type,
"data": data,
"timestamp": datetime.now().isoformat()
}
await self._websocket_manager.broadcast(message)
async def _log_system_operation(self, operation: str, details: str, software_id: str = None, **kwargs):
"""记录系统操作日志"""
if self._log_manager:
from app.models.operation_log import ActionType, OperationStatus
# 设置默认值允许通过kwargs覆盖
log_params = {
"operation_category": "software_management",
"action_type": ActionType.EXECUTE,
**kwargs
}
await self._log_manager.log_system_operation(
operation=operation,
details=details,
target_object=software_id,
**log_params
)
async def get_software_list(self) -> List[Dict]:
"""获取软件列表及状态"""
software_list = []
for software_id in software_config.get_software_list():
sw_config = software_config.get_software_config(software_id)
is_running = self.is_software_running(software_id)
process_id = self.running_processes.get(software_id)
software_list.append({
"id": software_id,
"name": sw_config["name"],
"is_running": is_running,
"process_id": process_id
})
return software_list
def is_software_running(self, software_id: str) -> bool:
"""检查软件是否在运行"""
# 首先检查记录的进程ID
if software_id in self.running_processes:
process_id = self.running_processes[software_id]
try:
process = psutil.Process(process_id)
if process.is_running():
return True
except (psutil.NoSuchProcess, psutil.AccessDenied):
# 进程不存在,清理记录
del self.running_processes[software_id]
# 如果记录中没有或进程已停止,则根据进程名称检查
sw_config = software_config.get_software_config(software_id)
if sw_config:
return self._check_process_started(sw_config["check_process_name"])
return False
async def start_software(self, software_id: str) -> Task:
"""启动软件"""
# 验证软件配置存在
if not software_config.validate_software_exists(software_id):
raise ValueError(f"软件 {software_id} 不存在")
# 检查是否已在运行
if self.is_software_running(software_id):
raise ValueError(f"软件 {software_id} 已在运行")
# 创建任务
task = Task(
id=str(uuid.uuid4()),
task_type=TaskType.START_SOFTWARE,
software_id=software_id,
status=TaskStatus.PENDING,
created_at=datetime.now()
)
self.running_tasks[task.id] = task
# 记录启动操作日志
await self._log_system_operation(
operation="start_software",
details=f"启动软件任务已创建: {software_id}",
software_id=software_id,
status=OperationStatus.PENDING
)
# 异步启动软件
asyncio.create_task(self._execute_start_task(task))
return task
async def stop_software(self, software_id: str) -> Task:
"""停止软件"""
# 验证软件配置存在
if not software_config.validate_software_exists(software_id):
raise ValueError(f"软件 {software_id} 不存在")
# 检查软件是否在运行
if not self.is_software_running(software_id):
raise ValueError(f"软件 {software_id} 未在运行")
# 创建任务
task = Task(
id=str(uuid.uuid4()),
task_type=TaskType.STOP_SOFTWARE,
software_id=software_id,
status=TaskStatus.PENDING,
created_at=datetime.now()
)
self.running_tasks[task.id] = task
# 记录停止操作日志
await self._log_system_operation(
operation="stop_software",
details=f"停止软件任务已创建: {software_id}",
software_id=software_id,
status=OperationStatus.PENDING
)
# 异步停止软件
asyncio.create_task(self._execute_stop_task(task))
return task
async def restart_software(self, software_id: str) -> Task:
"""重启软件"""
# 验证软件配置存在
if not software_config.validate_software_exists(software_id):
raise ValueError(f"软件 {software_id} 不存在")
# 创建任务
task = Task(
id=str(uuid.uuid4()),
task_type=TaskType.RESTART_SOFTWARE,
software_id=software_id,
status=TaskStatus.PENDING,
created_at=datetime.now()
)
self.running_tasks[task.id] = task
# 异步重启软件
asyncio.create_task(self._execute_restart_task(task))
return task
async def _execute_start_task(self, task: Task):
"""执行启动任务"""
try:
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
task.progress = 10
# 通知任务开始
await self._notify_websocket("task_update", {
"task_id": task.id,
"software_id": task.software_id,
"status": task.status.value,
"progress": task.progress,
"message": f"开始启动软件 {task.software_id}"
})
sw_config = software_config.get_software_config(task.software_id)
# 启动进程
cmd = [sw_config["executable_path"]] + sw_config["startup_args"]
process = subprocess.Popen(cmd)
task.progress = 30
await self._notify_websocket("task_update", {
"task_id": task.id,
"software_id": task.software_id,
"status": task.status.value,
"progress": task.progress,
"message": f"进程已启动,等待软件完全加载"
})
# 等待软件启动完成
timeout = sw_config["startup_timeout"]
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
await asyncio.sleep(1)
task.progress = min(90, task.progress + 2)
# 定期更新进度
if task.progress % 10 == 0:
await self._notify_websocket("task_update", {
"task_id": task.id,
"software_id": task.software_id,
"status": task.status.value,
"progress": task.progress,
"message": f"软件加载中... ({task.progress}%)"
})
# 检查进程是否启动成功
if self._check_process_started(sw_config["check_process_name"]):
self.running_processes[task.software_id] = process.pid
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.progress = 100
# 通知启动成功
await self._notify_websocket("software_started", {
"task_id": task.id,
"software_id": task.software_id,
"process_id": process.pid,
"message": f"软件 {task.software_id} 启动成功"
})
# 记录启动成功日志
await self._log_system_operation(
operation="start_software",
details=f"软件 {task.software_id} 启动成功进程ID: {process.pid}",
software_id=task.software_id,
status=OperationStatus.SUCCESS,
duration=int((datetime.now() - task.started_at).total_seconds() * 1000),
extra_data={"process_id": process.pid, "task_id": task.id}
)
return
# 超时处理
process.kill()
task.status = TaskStatus.FAILED
task.error_message = f"软件启动超时 ({timeout}s)"
task.completed_at = datetime.now()
# 通知启动失败
await self._notify_websocket("software_start_failed", {
"task_id": task.id,
"software_id": task.software_id,
"error": task.error_message,
"message": f"软件 {task.software_id} 启动失败"
})
# 记录启动失败日志
await self._log_system_operation(
operation="start_software",
details=f"软件 {task.software_id} 启动失败: {task.error_message}",
software_id=task.software_id,
status=OperationStatus.FAILED,
error_message=task.error_message,
extra_data={"task_id": task.id}
)
except Exception as e:
task.status = TaskStatus.FAILED
task.error_message = str(e)
task.completed_at = datetime.now()
# 通知启动异常
await self._notify_websocket("software_start_failed", {
"task_id": task.id,
"software_id": task.software_id,
"error": str(e),
"message": f"软件 {task.software_id} 启动异常"
})
# 记录启动异常日志
await self._log_system_operation(
operation="start_software",
details=f"软件 {task.software_id} 启动异常: {str(e)}",
software_id=task.software_id,
status=OperationStatus.FAILED,
error_message=str(e),
extra_data={"task_id": task.id}
)
async def _execute_stop_task(self, task: Task):
"""执行停止任务"""
try:
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
task.progress = 10
# 通知任务开始
await self._notify_websocket("task_update", {
"task_id": task.id,
"software_id": task.software_id,
"status": task.status.value,
"progress": task.progress,
"message": f"开始停止软件 {task.software_id}"
})
# 停止软件
await self._stop_software_sync(task.software_id)
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.progress = 100
# 通知停止成功
await self._notify_websocket("software_stopped", {
"task_id": task.id,
"software_id": task.software_id,
"message": f"软件 {task.software_id} 已成功停止"
})
# 记录停止成功日志
await self._log_system_operation(
operation="stop_software",
details=f"软件 {task.software_id} 停止成功",
software_id=task.software_id,
status=OperationStatus.SUCCESS,
duration=int((datetime.now() - task.started_at).total_seconds() * 1000),
extra_data={"task_id": task.id}
)
except Exception as e:
task.status = TaskStatus.FAILED
task.error_message = str(e)
task.completed_at = datetime.now()
# 通知停止失败
await self._notify_websocket("software_stop_failed", {
"task_id": task.id,
"software_id": task.software_id,
"error": str(e),
"message": f"软件 {task.software_id} 停止失败"
})
# 记录停止失败日志
await self._log_system_operation(
operation="stop_software",
details=f"软件 {task.software_id} 停止失败: {str(e)}",
software_id=task.software_id,
status=OperationStatus.FAILED,
error_message=str(e),
extra_data={"task_id": task.id}
)
async def _execute_restart_task(self, task: Task):
"""执行重启任务"""
try:
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
task.progress = 10
# 先停止
if self.is_software_running(task.software_id):
await self._stop_software_sync(task.software_id)
task.progress = 50
# 等待一段时间
await asyncio.sleep(2)
# 再启动
await self._start_software_sync(task.software_id)
task.progress = 100
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
except Exception as e:
task.status = TaskStatus.FAILED
task.error_message = str(e)
task.completed_at = datetime.now()
async def _stop_software_sync(self, software_id: str):
"""同步停止软件"""
sw_config = software_config.get_software_config(software_id)
process_names = sw_config["check_process_name"]
stop_timeout = sw_config.get("stop_timeout", 10) # 默认10秒超时
# 支持单个进程名称字符串或进程名称列表
if isinstance(process_names, str):
process_names = [process_names]
stopped_processes = []
# 查找所有匹配的进程
target_processes = []
for proc in psutil.process_iter(['pid', 'name']):
try:
if proc.info['name']:
for process_name in process_names:
if proc.info['name'].lower() == process_name.lower():
target_processes.append({
'pid': proc.info['pid'],
'name': proc.info['name']
})
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if not target_processes:
raise Exception(f"未找到运行中的进程: {', '.join(process_names)}")
# 分阶段停止进程
for proc_info in target_processes:
try:
process = psutil.Process(proc_info['pid'])
# 先尝试优雅停止
process.terminate()
stopped_processes.append(proc_info['pid'])
# 等待进程结束,使用配置的超时时间
try:
process.wait(timeout=stop_timeout)
except psutil.TimeoutExpired:
# 如果优雅停止失败,强制杀死进程
if process.is_running():
process.kill()
try:
process.wait(timeout=3)
except psutil.TimeoutExpired:
# 记录无法停止的进程
print(f"警告: 无法强制停止进程 {proc_info['name']} (PID: {proc_info['pid']})")
except (psutil.NoSuchProcess, psutil.AccessDenied):
# 进程可能已经停止或无权限访问
continue
# 清理running_processes记录
if software_id in self.running_processes:
del self.running_processes[software_id]
# 等待一段时间确保所有进程都已停止
await asyncio.sleep(2)
async def _start_software_sync(self, software_id: str):
"""同步启动软件"""
sw_config = software_config.get_software_config(software_id)
cmd = [sw_config["executable_path"]] + sw_config["startup_args"]
process = subprocess.Popen(cmd)
# 等待启动
timeout = sw_config["startup_timeout"]
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
await asyncio.sleep(1)
if self._check_process_started(sw_config["check_process_name"]):
self.running_processes[software_id] = process.pid
return
# 启动失败
process.kill()
raise Exception(f"软件启动超时 ({timeout}s)")
def _check_process_started(self, process_names) -> bool:
"""检查进程是否已启动"""
# 支持单个进程名称字符串或进程名称列表
if isinstance(process_names, str):
process_names = [process_names]
for proc in psutil.process_iter(['name']):
try:
if proc.info['name']:
for process_name in process_names:
if proc.info['name'].lower() == process_name.lower():
return True
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return False
def get_task(self, task_id: str) -> Optional[Task]:
"""获取任务信息"""
return self.running_tasks.get(task_id)
# 创建全局实例
software_manager = SoftwareManager()