import asyncio
import inspect
import time
from datetime import datetime
from typing import Callable, Dict, List, Optional

import psutil

from app.config import software_config


class ProcessInfo:
    """Mutable record describing one monitored process.

    Args:
        pid: Operating-system process id.
        name: Process name as reported when monitoring started.
        software_id: Key of the software entry this process belongs to.
        process: Optional psutil handle for the process. Keeping a single
            handle alive is what lets ``cpu_percent()`` return meaningful
            values and lets ``is_running()`` detect PID reuse. When omitted,
            the monitor creates the handle lazily on first use.
    """

    def __init__(
        self,
        pid: int,
        name: str,
        software_id: str,
        process: Optional[psutil.Process] = None,
    ):
        self.pid = pid
        self.name = name
        self.software_id = software_id
        self.process = process
        self.start_time = datetime.now()
        self.cpu_percent = 0.0
        self.memory_percent = 0.0
        self.status = "running"
        self.last_check = datetime.now()


class ProcessMonitor:
    """Periodically samples registered processes and reports changes.

    Registered callbacks are invoked as
    ``callback(event_type, software_id, data)`` with ``event_type`` being
    ``"process_update"`` or ``"process_stopped"``. Callbacks may be plain
    callables or return an awaitable, which is then awaited.
    """

    def __init__(self):
        # software_id -> ProcessInfo
        self.monitored_processes: Dict[str, ProcessInfo] = {}
        self.monitoring = False
        self.monitor_task: Optional[asyncio.Task] = None
        self.check_interval = 5  # seconds between two sampling passes
        self.callbacks: List[Callable] = []  # state-change callbacks

    def add_callback(self, callback: Callable):
        """Register a state-change callback."""
        self.callbacks.append(callback)

    def remove_callback(self, callback: Callable):
        """Unregister a callback; unknown callbacks are ignored."""
        if callback in self.callbacks:
            self.callbacks.remove(callback)

    async def _notify_callbacks(self, event_type: str, software_id: str, data: dict):
        """Invoke every registered callback, isolating failures per callback."""
        # Iterate over a snapshot: a callback may add/remove callbacks while
        # we are suspended in an await, which would otherwise skip entries.
        for callback in list(self.callbacks):
            try:
                result = callback(event_type, software_id, data)
                # Covers ``async def`` functions as well as partials and
                # callable objects that merely return an awaitable.
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                print(f"回调函数执行失败: {e}")

    @staticmethod
    def _get_handle(process_info: ProcessInfo) -> psutil.Process:
        """Return the cached psutil handle of *process_info*, creating it if needed.

        Raises:
            psutil.NoSuchProcess: if no handle is cached and the PID is gone.
        """
        handle = getattr(process_info, "process", None)
        if handle is None:
            handle = psutil.Process(process_info.pid)
            process_info.process = handle
        return handle

    def _launch_monitor_task(self) -> None:
        """Create the background loop task on the running event loop.

        Raises:
            RuntimeError: if called without a running event loop.
        """
        # Resolve the loop first so a missing loop fails before any state is
        # changed and before a coroutine object is created.
        loop = asyncio.get_running_loop()
        self.monitoring = True
        # Keep a reference: the event loop only holds weak references to tasks.
        self.monitor_task = loop.create_task(self._monitor_loop())
        print("进程监控已启动")

    def add_process(self, software_id: str, pid: int):
        """Start monitoring *pid* under *software_id*.

        Replaces any process previously registered under the same id and
        starts the background loop if it is not running yet. Must be called
        from code running inside an event loop when the loop has to be
        started.

        Raises:
            RuntimeError: if monitoring has to be started and there is no
                running event loop (the process is still registered).
        """
        try:
            process = psutil.Process(pid)
            process_info = ProcessInfo(pid, process.name(), software_id, process)
        except psutil.NoSuchProcess:
            print(f"进程 {pid} 不存在")
            return

        try:
            # Prime the CPU counter: the first non-blocking cpu_percent() call
            # on a handle only records a baseline and returns 0.0.
            process.cpu_percent()
        except psutil.Error:
            # Best effort only; a vanished or inaccessible process is
            # reported by the next regular check.
            pass

        self.monitored_processes[software_id] = process_info

        # Start monitoring if it is not running yet.
        if not self.monitoring:
            self._launch_monitor_task()

    def remove_process(self, software_id: str):
        """Stop monitoring the process registered under *software_id*."""
        self.monitored_processes.pop(software_id, None)

    async def start_monitoring(self):
        """Start the background monitoring loop (no-op if already running)."""
        if self.monitoring:
            return
        self._launch_monitor_task()

    async def stop_monitoring(self):
        """Stop the background monitoring loop and wait for it to finish."""
        self.monitoring = False
        task = self.monitor_task
        self.monitor_task = None

        # When called from inside the loop task itself (e.g. after the last
        # process stopped, or from a callback) we must not cancel/await the
        # current task; the loop exits on its own via the ``monitoring`` flag.
        if task is not None and task is not asyncio.current_task():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        print("进程监控已停止")

    async def _monitor_loop(self):
        """Run sampling passes every ``check_interval`` seconds until stopped."""
        while self.monitoring:
            try:
                await self._check_processes()
                # A pass may have stopped monitoring; exit without sleeping.
                if not self.monitoring:
                    break
                await asyncio.sleep(self.check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"监控循环错误: {e}")
                await asyncio.sleep(self.check_interval)

    async def _check_processes(self):
        """Refresh every monitored process and emit update/stop events."""
        for software_id in list(self.monitored_processes):
            # Callbacks awaited earlier in this pass may have removed entries.
            process_info = self.monitored_processes.get(software_id)
            if process_info is None:
                continue

            try:
                process = self._get_handle(process_info)

                # is_running() on the cached handle also detects PID reuse.
                if not process.is_running():
                    await self._handle_process_stopped(software_id, process_info)
                    continue

                # Refresh the sampled values.
                old_cpu = process_info.cpu_percent
                old_memory = process_info.memory_percent

                process_info.cpu_percent = process.cpu_percent()
                process_info.memory_percent = process.memory_percent()
                process_info.status = process.status()
                process_info.last_check = datetime.now()

                # Only notify on significant changes.
                if self._should_notify_change(
                    old_cpu,
                    process_info.cpu_percent,
                    old_memory,
                    process_info.memory_percent,
                ):
                    await self._notify_callbacks("process_update", software_id, {
                        "pid": process_info.pid,
                        "cpu_percent": process_info.cpu_percent,
                        "memory_percent": process_info.memory_percent,
                        "status": process_info.status,
                    })

            except psutil.NoSuchProcess:
                await self._handle_process_stopped(software_id, process_info)
            except psutil.AccessDenied:
                # Insufficient privileges; the process may still be running.
                process_info.status = "access_denied"
                process_info.last_check = datetime.now()
            except Exception as e:
                print(f"检查进程 {process_info.pid} 时出错: {e}")

    async def _handle_process_stopped(self, software_id: str, process_info: ProcessInfo):
        """Announce that a process stopped and drop it from the monitored set."""
        await self._notify_callbacks("process_stopped", software_id, {
            "pid": process_info.pid,
            "name": process_info.name,
            "running_time": (datetime.now() - process_info.start_time).total_seconds(),
        })

        # Remove only the entry that actually stopped: a callback may already
        # have removed it or registered a new process under the same id.
        if self.monitored_processes.get(software_id) is process_info:
            del self.monitored_processes[software_id]

        # Nothing left to watch: stop the loop.
        if not self.monitored_processes:
            await self.stop_monitoring()

    def _should_notify_change(self, old_cpu: float, new_cpu: float,
                              old_memory: float, new_memory: float) -> bool:
        """Return True when CPU moved by more than 10 points or memory by more than 5."""
        cpu_change = abs(new_cpu - old_cpu)
        memory_change = abs(new_memory - old_memory)
        return cpu_change > 10.0 or memory_change > 5.0

    def get_process_info(self, software_id: str) -> Optional[ProcessInfo]:
        """Return the record for *software_id*, or None if it is not monitored."""
        return self.monitored_processes.get(software_id)

    def get_all_processes(self) -> Dict[str, ProcessInfo]:
        """Return a shallow copy of the software_id -> ProcessInfo mapping."""
        return self.monitored_processes.copy()

    def is_process_running(self, software_id: str) -> bool:
        """Return True if the process registered under *software_id* is alive."""
        process_info = self.monitored_processes.get(software_id)
        if process_info is None:
            return False

        try:
            # Use the cached handle so a recycled PID is not mistaken for
            # the original process.
            return self._get_handle(process_info).is_running()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return False

    async def get_system_info(self) -> dict:
        """Collect CPU, memory and root-disk usage.

        Returns:
            A dict with ``cpu``, ``memory``, ``disk`` and ``timestamp`` keys,
            or an empty dict if collection failed.
        """
        try:
            # cpu_percent(interval=1) blocks for a full second; run it in the
            # default executor so the event loop keeps serving other tasks.
            loop = asyncio.get_running_loop()
            cpu_percent = await loop.run_in_executor(
                None, lambda: psutil.cpu_percent(interval=1)
            )
            cpu_count = psutil.cpu_count()

            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            # Guard against a zero-sized filesystem instead of failing the
            # whole report with ZeroDivisionError.
            disk_percent = (disk.used / disk.total) * 100 if disk.total else 0.0

            return {
                "cpu": {
                    "percent": cpu_percent,
                    "count": cpu_count,
                },
                "memory": {
                    "total": memory.total,
                    "available": memory.available,
                    "percent": memory.percent,
                },
                "disk": {
                    "total": disk.total,
                    "used": disk.used,
                    "free": disk.free,
                    "percent": disk_percent,
                },
                "timestamp": datetime.now().isoformat(),
            }
        except Exception as e:
            print(f"获取系统信息失败: {e}")
            return {}

    async def find_software_processes(self) -> Dict[str, List[dict]]:
        """Find running processes for every configured software entry.

        A process matches when its name equals the entry's
        ``check_process_name`` case-insensitively.

        Returns:
            Mapping of software_id to a list of process dicts (``pid``,
            ``name``, ``create_time``, ``cpu_percent``, ``memory_percent``).
            Entries without a match are omitted.
        """
        software_processes = {}

        try:
            # Snapshot all processes once and lower-case each name once.
            candidates = []
            for proc in psutil.process_iter(
                ['pid', 'name', 'create_time', 'cpu_percent', 'memory_percent']
            ):
                info = proc.info
                # The name is None when psutil was denied access to it; such
                # a process cannot match any configured name.
                name = info.get('name')
                if not name:
                    continue
                candidates.append((name.lower(), info))

            # Match each configured software entry against the snapshot.
            for software_id in software_config.get_software_list():
                sw_config = software_config.get_software_config(software_id)
                if not sw_config:
                    continue

                process_name = (sw_config.get("check_process_name") or "").lower()
                if not process_name:
                    continue

                matching_processes = [
                    {
                        "pid": info['pid'],
                        "name": info['name'],
                        "create_time": info['create_time'],
                        "cpu_percent": info['cpu_percent'],
                        "memory_percent": info['memory_percent'],
                    }
                    for lowered, info in candidates
                    if lowered == process_name
                ]

                if matching_processes:
                    software_processes[software_id] = matching_processes

        except Exception as e:
            print(f"查找软件进程失败: {e}")

        return software_processes


# Module-level singleton instance.
process_monitor = ProcessMonitor()