CadHubManage/app/core/process_monitor.py
root e6261532f7 feat: 添加软件停止功能并修复Creo进程检测问题
## 主要修复
- 修复Creo软件运行状态检测失败问题
- 添加完整的软件停止功能支持
- 改进多进程软件的进程管理逻辑

## 技术改进
- 更新软件配置支持多进程名称检测
- 优化进程停止逻辑,增加超时配置
- 新增 stop_software WebSocket消息类型
- 完善错误处理和日志记录

## 配置更新
- configs/software_config.yaml: 支持进程名称列表和停止超时
- 添加Revit 2017配置支持

## 文档更新
- README.md: 更新软件配置说明和API列表
- frontend-api-docs.md: 添加停止软件API文档
- CHECKPOINT.md: 记录修复进展和解决方案

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-24 17:24:49 +08:00

266 lines
10 KiB
Python

import asyncio
import time
from typing import Dict, List, Optional, Callable
from datetime import datetime
import psutil
from app.config import software_config
class ProcessInfo:
"""进程信息类"""
def __init__(self, pid: int, name: str, software_id: str):
self.pid = pid
self.name = name
self.software_id = software_id
self.start_time = datetime.now()
self.cpu_percent = 0.0
self.memory_percent = 0.0
self.status = "running"
self.last_check = datetime.now()
class ProcessMonitor:
"""进程监控类"""
def __init__(self):
self.monitored_processes: Dict[str, ProcessInfo] = {} # software_id -> ProcessInfo
self.monitoring = False
self.monitor_task: Optional[asyncio.Task] = None
self.check_interval = 5 # 监控间隔(秒)
self.callbacks: List[Callable] = [] # 状态变化回调函数
def add_callback(self, callback: Callable):
"""添加状态变化回调函数"""
self.callbacks.append(callback)
def remove_callback(self, callback: Callable):
"""移除回调函数"""
if callback in self.callbacks:
self.callbacks.remove(callback)
async def _notify_callbacks(self, event_type: str, software_id: str, data: dict):
"""通知所有回调函数"""
for callback in self.callbacks:
try:
if asyncio.iscoroutinefunction(callback):
await callback(event_type, software_id, data)
else:
callback(event_type, software_id, data)
except Exception as e:
print(f"回调函数执行失败: {e}")
def add_process(self, software_id: str, pid: int):
"""添加需要监控的进程"""
try:
process = psutil.Process(pid)
process_info = ProcessInfo(pid, process.name(), software_id)
self.monitored_processes[software_id] = process_info
# 启动监控(如果未启动)
if not self.monitoring:
asyncio.create_task(self.start_monitoring())
except psutil.NoSuchProcess:
print(f"进程 {pid} 不存在")
def remove_process(self, software_id: str):
"""移除监控的进程"""
if software_id in self.monitored_processes:
del self.monitored_processes[software_id]
async def start_monitoring(self):
"""开始监控"""
if self.monitoring:
return
self.monitoring = True
self.monitor_task = asyncio.create_task(self._monitor_loop())
print("进程监控已启动")
async def stop_monitoring(self):
"""停止监控"""
self.monitoring = False
if self.monitor_task:
self.monitor_task.cancel()
try:
await self.monitor_task
except asyncio.CancelledError:
pass
print("进程监控已停止")
async def _monitor_loop(self):
"""监控循环"""
while self.monitoring:
try:
await self._check_processes()
await asyncio.sleep(self.check_interval)
except asyncio.CancelledError:
break
except Exception as e:
print(f"监控循环错误: {e}")
await asyncio.sleep(self.check_interval)
async def _check_processes(self):
"""检查所有监控的进程"""
for software_id in list(self.monitored_processes.keys()):
process_info = self.monitored_processes[software_id]
try:
process = psutil.Process(process_info.pid)
# 检查进程是否还在运行
if not process.is_running():
await self._handle_process_stopped(software_id, process_info)
continue
# 更新进程状态信息
old_status = process_info.status
old_cpu = process_info.cpu_percent
old_memory = process_info.memory_percent
process_info.cpu_percent = process.cpu_percent()
process_info.memory_percent = process.memory_percent()
process_info.status = process.status()
process_info.last_check = datetime.now()
# 检查是否有显著变化需要通知
if self._should_notify_change(old_cpu, process_info.cpu_percent, old_memory, process_info.memory_percent):
await self._notify_callbacks("process_update", software_id, {
"pid": process_info.pid,
"cpu_percent": process_info.cpu_percent,
"memory_percent": process_info.memory_percent,
"status": process_info.status
})
except psutil.NoSuchProcess:
await self._handle_process_stopped(software_id, process_info)
except psutil.AccessDenied:
# 权限不足,但进程可能还在运行
process_info.status = "access_denied"
process_info.last_check = datetime.now()
except Exception as e:
print(f"检查进程 {process_info.pid} 时出错: {e}")
async def _handle_process_stopped(self, software_id: str, process_info: ProcessInfo):
"""处理进程停止"""
# 通知进程已停止
await self._notify_callbacks("process_stopped", software_id, {
"pid": process_info.pid,
"name": process_info.name,
"running_time": (datetime.now() - process_info.start_time).total_seconds()
})
# 从监控列表中移除
del self.monitored_processes[software_id]
# 如果没有更多进程需要监控,停止监控
if not self.monitored_processes:
await self.stop_monitoring()
def _should_notify_change(self, old_cpu: float, new_cpu: float,
old_memory: float, new_memory: float) -> bool:
"""判断是否应该通知状态变化"""
# CPU使用率变化超过10%或内存使用率变化超过5%时通知
cpu_change = abs(new_cpu - old_cpu)
memory_change = abs(new_memory - old_memory)
return cpu_change > 10.0 or memory_change > 5.0
def get_process_info(self, software_id: str) -> Optional[ProcessInfo]:
"""获取进程信息"""
return self.monitored_processes.get(software_id)
def get_all_processes(self) -> Dict[str, ProcessInfo]:
"""获取所有监控的进程信息"""
return self.monitored_processes.copy()
def is_process_running(self, software_id: str) -> bool:
"""检查指定软件的进程是否在运行"""
if software_id not in self.monitored_processes:
return False
process_info = self.monitored_processes[software_id]
try:
process = psutil.Process(process_info.pid)
return process.is_running()
except (psutil.NoSuchProcess, psutil.AccessDenied):
return False
async def get_system_info(self) -> dict:
"""获取系统信息"""
try:
# CPU信息
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
# 内存信息
memory = psutil.virtual_memory()
# 磁盘信息
disk = psutil.disk_usage('/')
return {
"cpu": {
"percent": cpu_percent,
"count": cpu_count
},
"memory": {
"total": memory.total,
"available": memory.available,
"percent": memory.percent
},
"disk": {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100
},
"timestamp": datetime.now().isoformat()
}
except Exception as e:
print(f"获取系统信息失败: {e}")
return {}
async def find_software_processes(self) -> Dict[str, List[dict]]:
"""查找所有配置的软件进程"""
software_processes = {}
try:
# 获取所有进程
all_processes = list(psutil.process_iter(['pid', 'name', 'create_time', 'cpu_percent', 'memory_percent']))
# 检查每个配置的软件
for software_id in software_config.get_software_list():
sw_config = software_config.get_software_config(software_id)
if not sw_config:
continue
process_name = sw_config.get("check_process_name", "").lower()
if not process_name:
continue
matching_processes = []
for proc in all_processes:
try:
if proc.info['name'].lower() == process_name:
matching_processes.append({
"pid": proc.info['pid'],
"name": proc.info['name'],
"create_time": proc.info['create_time'],
"cpu_percent": proc.info['cpu_percent'],
"memory_percent": proc.info['memory_percent']
})
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if matching_processes:
software_processes[software_id] = matching_processes
except Exception as e:
print(f"查找软件进程失败: {e}")
return software_processes
# 创建全局实例
process_monitor = ProcessMonitor()