MLPlatform/function/system_monitor.py

498 lines
19 KiB
Python

import psutil
import GPUtil
import platform
from datetime import datetime
from typing import Dict, List, Optional
import logging
from pathlib import Path
import time
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
import json
from collections import Counter
import re
class SystemMonitor:
    """Collect host resource usage, MLflow training history and parsed log records.

    The public ``get_*`` methods do not raise: they return a dict whose
    ``'status'`` is ``'success'`` or ``'error'`` (error payloads carry a
    ``'message'`` and usually a ``'details'`` dict with the exception type/text).
    """

    # Parses lines written with the formatter configured in _setup_logging
    # ('%(asctime)s - %(name)s - %(levelname)s - %(message)s'). A trailing
    # ' {...}' is captured (without the braces) as JSON "details". Lines that do
    # not match, such as traceback continuation lines, are ignored.
    _LOG_PATTERN = re.compile(
        r'(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+-\s+'
        r'(?P<name>[\w.]+)\s+-\s+'
        r'(?P<level>\w+)\s+-\s+'
        r'(?P<message>.*?)(?:\s+\{(?P<details>.*)\})?$'
    )
    # Run status is interpolated into an MLflow filter string, so only plain
    # identifiers such as FINISHED or RUNNING are accepted.
    _STATUS_RE = re.compile(r'[A-Za-z_]+')
    # An end_time consisting only of a date covers that whole day.
    _DATE_ONLY_RE = re.compile(r'\d{4}-\d{2}-\d{2}')

    def __init__(self):
        """Create the monitor: configure file logging and an MLflow tracking client."""
        self.logger = logging.getLogger(__name__)
        self._setup_logging()
        self.mlflow_client = MlflowClient()

    def _setup_logging(self):
        """Attach a timestamped UTF-8 file handler under ``.log/`` to the logger.

        ``logging.getLogger(__name__)`` returns the same logger for every
        instance, so the handler is added only once; otherwise each new
        ``SystemMonitor`` would duplicate every log line and leak a file handle.
        """
        self.logger.setLevel(logging.INFO)
        if any(getattr(h, '_system_monitor_handler', False) for h in self.logger.handlers):
            return
        log_dir = Path('.log')
        log_dir.mkdir(exist_ok=True)
        # Explicit UTF-8: get_system_logs reads these files back as UTF-8, and the
        # messages are non-ASCII, so the platform default encoding is not safe.
        file_handler = logging.FileHandler(
            log_dir / f'system_monitor_{datetime.now():%Y%m%d_%H%M%S}.log',
            encoding='utf-8',
        )
        file_handler._system_monitor_handler = True  # marker checked by the guard above
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(file_handler)

    @staticmethod
    def _usage_to_mb(usage) -> Dict:
        """Convert a psutil usage tuple (total/used/free in bytes, percent) to MB."""
        mb = 1024 * 1024
        return {
            'total': usage.total // mb,
            'used': usage.used // mb,
            'free': usage.free // mb,
            'percent': usage.percent,
        }

    @staticmethod
    def _error_response(message: str, exc: Exception) -> Dict:
        """Build the standard error payload returned by the public methods."""
        return {
            'status': 'error',
            'message': message,
            'details': {
                'error_type': type(exc).__name__,
                'error_message': str(exc),
            },
        }

    @staticmethod
    def _validate_pagination(page: int, page_size: int) -> None:
        """Raise ValueError unless both ``page`` and ``page_size`` are >= 1."""
        if page < 1 or page_size < 1:
            raise ValueError(
                f'page and page_size must be >= 1 (got page={page}, page_size={page_size})'
            )

    @staticmethod
    def _paginate(items: List, page: int, page_size: int) -> tuple:
        """Return ``(items on the 1-based page, pagination metadata dict)``."""
        total_items = len(items)
        start_idx = (page - 1) * page_size
        pagination = {
            'current_page': page,
            'page_size': page_size,
            'total_pages': (total_items + page_size - 1) // page_size,
            'total_items': total_items,
        }
        return items[start_idx:start_idx + page_size], pagination

    def _get_gpu_info(self) -> List[Dict]:
        """Return one dict per GPU reported by GPUtil, or ``[]`` on any failure.

        Memory is in MB, utilization in percent, temperature in °C and power in W
        (``None`` when the GPUtil object has no such attribute).
        """
        try:
            return [
                {
                    'id': gpu.id,
                    'name': gpu.name,
                    'memory': {
                        'total': gpu.memoryTotal,
                        'used': gpu.memoryUsed,
                        'free': gpu.memoryFree,
                    },
                    'utilization': {
                        'gpu': gpu.load * 100,
                        'memory': gpu.memoryUtil * 100,
                    },
                    'temperature': gpu.temperature,
                    'power': {
                        'draw': getattr(gpu, 'powerDraw', None),
                        'limit': getattr(gpu, 'powerLimit', None),
                    },
                    # Per-process GPU usage is not collected; the key is kept so
                    # the response shape stays stable.
                    'processes': [],
                }
                for gpu in GPUtil.getGPUs()
            ]
        except Exception as e:
            self.logger.error(f"获取GPU信息失败: {str(e)}")
            return []

    @staticmethod
    def _read_cpu_temperature() -> Optional[float]:
        """Return the first 'coretemp' sensor reading in °C, or None if unavailable.

        ``psutil.sensors_temperatures`` only exists on some platforms, and the
        'coretemp' entry may be missing or empty.
        """
        read_temps = getattr(psutil, 'sensors_temperatures', None)
        if read_temps is None:
            return None
        readings = read_temps().get('coretemp') or []
        return readings[0].current if readings else None

    def _get_cpu_info(self) -> Dict:
        """Return CPU counts, utilization, frequency, temperature, memory and swap.

        Frequencies are in GHz (None when psutil cannot report them). Blocks for
        about one second because utilization is sampled with ``interval=1``.
        Returns ``{}`` on failure.
        """
        try:
            cpu_freq = psutil.cpu_freq()
            return {
                'count': {
                    'physical': psutil.cpu_count(logical=False),
                    'logical': psutil.cpu_count(logical=True),
                },
                'utilization': psutil.cpu_percent(interval=1),  # %
                'frequency': {
                    'current': cpu_freq.current / 1000 if cpu_freq else None,
                    'min': cpu_freq.min / 1000 if cpu_freq else None,
                    'max': cpu_freq.max / 1000 if cpu_freq else None,
                },
                'temperature': self._read_cpu_temperature(),
                'memory': self._get_memory_info(),
                'swap': self._get_swap_info(),
            }
        except Exception as e:
            self.logger.error(f"获取CPU信息失败: {str(e)}")
            return {}

    def _get_memory_info(self) -> Dict:
        """Return virtual-memory usage in MB plus percent, or ``{}`` on failure."""
        try:
            return self._usage_to_mb(psutil.virtual_memory())
        except Exception as e:
            self.logger.error(f"获取内存信息失败: {str(e)}")
            return {}

    def _get_swap_info(self) -> Dict:
        """Return swap usage in MB plus percent, or ``{}`` on failure."""
        try:
            return self._usage_to_mb(psutil.swap_memory())
        except Exception as e:
            self.logger.error(f"获取交换内存信息失败: {str(e)}")
            return {}

    def _get_disk_info(self) -> Dict:
        """Map each partition mountpoint to its usage in MB plus percent.

        Partitions whose usage cannot be read (OSError, including PermissionError)
        are skipped. Returns ``{}`` on any other failure.
        """
        try:
            disk_info = {}
            for partition in psutil.disk_partitions():
                try:
                    usage = psutil.disk_usage(partition.mountpoint)
                except OSError:
                    continue
                disk_info[partition.mountpoint] = self._usage_to_mb(usage)
            return disk_info
        except Exception as e:
            self.logger.error(f"获取磁盘信息失败: {str(e)}")
            return {}

    def _get_process_info(self) -> Dict:
        """Count processes: total PIDs plus those reporting 'running' / 'sleeping'.

        The total and the per-status counts come from two separate scans, so
        they need not be perfectly consistent. Returns ``{}`` on failure.
        """
        try:
            processes = {'total': len(psutil.pids()), 'running': 0, 'sleeping': 0}
            for proc in psutil.process_iter(['status']):
                try:
                    status = proc.info['status']
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue
                if status in ('running', 'sleeping'):
                    processes[status] += 1
            return processes
        except Exception as e:
            self.logger.error(f"获取进程信息失败: {str(e)}")
            return {}

    def get_system_resources(self) -> Dict:
        """Snapshot GPU, CPU (incl. memory/swap), disk and process statistics.

        Returns:
            ``{'status': 'success', 'resources': {...}, 'timestamp': 'YYYY-MM-DDThh:mm:ss'}``
            or an error payload. Each section degrades to ``[]``/``{}`` on its own
            failure rather than failing the whole call.
        """
        try:
            resources = {
                'gpu': self._get_gpu_info(),
                'cpu': self._get_cpu_info(),
                'disk': self._get_disk_info(),
                'processes': self._get_process_info(),
            }
            self.logger.info("成功获取系统资源信息")
            return {
                'status': 'success',
                'resources': resources,
                'timestamp': datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
            }
        except Exception as e:
            self.logger.exception(f"获取系统资源信息失败: {str(e)}")
            return self._error_response('获取资源信息失败', e)

    @classmethod
    def _build_run_filter(
        cls,
        status: Optional[str],
        start_time: Optional[str],
        end_time: Optional[str],
    ) -> Optional[str]:
        """Build the MLflow ``filter_string`` for the history query (None if no filter).

        Timestamps are converted to epoch milliseconds; naive values are taken
        as UTC by ``pd.Timestamp.timestamp``. A date-only ``end_time`` becomes an
        exclusive bound at the start of the following day, so that whole day is
        included.

        Raises:
            ValueError: if ``status`` contains anything but letters/underscores.
        """
        clauses = []
        if status:
            if not cls._STATUS_RE.fullmatch(status):
                raise ValueError(f'invalid run status: {status!r}')
            clauses.append(f"status = '{status.upper()}'")
        if start_time:
            start_ms = int(pd.Timestamp(start_time).timestamp() * 1000)
            clauses.append(f"start_time >= {start_ms}")
        if end_time:
            end_ts = pd.Timestamp(end_time)
            if isinstance(end_time, str) and cls._DATE_ONLY_RE.fullmatch(end_time.strip()):
                next_day_ms = int((end_ts + pd.Timedelta(days=1)).timestamp() * 1000)
                clauses.append(f"start_time < {next_day_ms}")
            else:
                clauses.append(f"start_time <= {int(end_ts.timestamp() * 1000)}")
        return ' and '.join(clauses) or None

    @staticmethod
    def _format_run(run: Dict, experiment_name: str) -> Dict:
        """Convert one ``mlflow.search_runs`` record into a history item.

        Records come from a DataFrame whose columns are the union over all runs
        of an experiment, so params/metrics/tags a run does not have show up as
        NaN/None; those are dropped. A run without an ``end_time`` (NaT/None) is
        measured up to now.
        """
        started = pd.to_datetime(run['start_time'])
        raw_end = run.get('end_time')
        if raw_end is None or pd.isna(raw_end):
            # Use started's timezone (if any) so naive and aware timestamps are
            # never subtracted from each other.
            finished = pd.Timestamp.now(tz=started.tzinfo)
        else:
            finished = pd.to_datetime(raw_end)
        duration = finished - started

        groups = {'params': {}, 'metrics': {}, 'tags': {}}
        for key, value in run.items():
            prefix, sep, name = key.partition('.')
            bucket = groups.get(prefix) if sep else None
            if bucket is None:
                continue
            if pd.api.types.is_scalar(value) and pd.isna(value):
                continue  # column belongs to a different run
            bucket[name] = value
        params = groups['params']

        return {
            'run_id': run['run_id'],
            'experiment_id': run['experiment_id'],
            'experiment_name': experiment_name,
            'model_name': params.get('algorithm', 'Unknown'),
            'dataset': params.get('dataset', 'Unknown'),
            'start_time': started.strftime('%Y-%m-%dT%H:%M:%S'),
            'end_time': finished.strftime('%Y-%m-%dT%H:%M:%S'),
            'duration': str(duration).split('.')[0],  # drop the sub-second part
            'status': run['status'],
            'parameters': params,
            'metrics': groups['metrics'],
            'tags': groups['tags'],
        }

    def get_training_history(
        self,
        page: int = 1,
        page_size: int = 10,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        status: Optional[str] = None,
        experiment_name: Optional[str] = None
    ) -> Dict:
        """Return MLflow runs as paginated history items, newest first.

        Args:
            page: 1-based page number (must be >= 1).
            page_size: items per page (must be >= 1).
            start_time: earliest run start (YYYY-MM-DD); naive values are UTC.
            end_time: latest run start (YYYY-MM-DD). A bare date includes that
                whole day; a value with a time component is an inclusive bound.
            status: run status such as ``'finished'``; upper-cased before
                filtering, only letters and underscores are allowed.
            experiment_name: restrict to this experiment; otherwise every
                experiment from ``mlflow.search_experiments()`` is searched.

        Returns:
            ``{'status': 'success', 'history': [...], 'pagination': {...}}`` or an
            error payload (also when ``experiment_name`` does not exist).
        """
        try:
            self._validate_pagination(page, page_size)
            filter_string = self._build_run_filter(status, start_time, end_time)

            if experiment_name:
                experiment = mlflow.get_experiment_by_name(experiment_name)
                if experiment is None:
                    return {
                        'status': 'error',
                        'message': f'实验 {experiment_name} 不存在'
                    }
                experiments = [experiment]
            else:
                experiments = mlflow.search_experiments()
            # Resolve names once instead of one get_experiment() call per run.
            experiment_names = {exp.experiment_id: exp.name for exp in experiments}

            all_runs = []
            for exp_id in experiment_names:
                runs = mlflow.search_runs(
                    experiment_ids=[exp_id],
                    filter_string=filter_string,
                    order_by=['start_time DESC'],
                )
                if not runs.empty:
                    all_runs.extend(runs.to_dict('records'))
            # Each search is ordered only within its experiment; re-sort so that
            # pages are newest-first across all experiments.
            all_runs.sort(key=lambda run: run['start_time'], reverse=True)

            page_runs, pagination = self._paginate(all_runs, page, page_size)
            history = []
            for run in page_runs:
                exp_id = run['experiment_id']
                if exp_id not in experiment_names:
                    experiment_names[exp_id] = mlflow.get_experiment(exp_id).name
                history.append(self._format_run(run, experiment_names[exp_id]))

            self.logger.info(f"成功获取训练历史记录, 共{pagination['total_items']}条记录")
            return {
                'status': 'success',
                'history': history,
                'pagination': pagination,
            }
        except Exception as e:
            self.logger.exception(f"获取训练历史失败: {str(e)}")
            return self._error_response('获取训练历史失败', e)

    @classmethod
    def _parse_log_line(cls, line: str) -> Optional[Dict]:
        """Parse one formatted log line, or return None if it is not a log record.

        The result has keys time (naive datetime), level (upper-cased), module,
        message and details: the JSON object from a trailing ``{...}``, or ``{}``
        when that part is absent or not valid JSON.
        """
        match = cls._LOG_PATTERN.match(line.strip())
        if match is None:
            return None
        fields = match.groupdict()
        try:
            # The pattern allows any whitespace between date and time; collapse it
            # to the single space that logging's asctime writes.
            log_time = datetime.strptime(
                ' '.join(fields['timestamp'].split()), '%Y-%m-%d %H:%M:%S,%f'
            )
        except ValueError:  # digits fit the pattern but are not a real date/time
            return None
        details = {}
        if fields['details']:
            try:
                details = json.loads('{' + fields['details'] + '}')
            except ValueError:  # json.JSONDecodeError subclasses ValueError
                details = {}
        return {
            'time': log_time,
            'level': fields['level'].upper(),
            'module': fields['name'],
            'message': fields['message'],
            'details': details,
        }

    def _iter_log_records(self, log_file: Path):
        """Yield parsed records from one log file; an unreadable file is logged and skipped."""
        try:
            # errors='replace' so a single undecodable byte cannot abort the query.
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    record = self._parse_log_line(line)
                    if record is not None:
                        yield record
        except OSError as e:
            self.logger.warning(f"读取日志文件失败 {log_file}: {str(e)}")

    def get_system_logs(
        self,
        level: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        module: Optional[str] = None,
        page: int = 1,
        page_size: int = 20
    ) -> Dict:
        """Read, filter and paginate records from every ``*.log`` file in ``.log/``.

        Args:
            level: keep only this level (case-insensitive), e.g. ``'error'``.
            start_time: inclusive lower bound (YYYY-MM-DDThh:mm:ss). Must be
                timezone-naive, since the log timestamps carry no zone.
            end_time: inclusive upper bound (YYYY-MM-DDThh:mm:ss); defaults to now.
            module: keep only records whose logger name equals this exactly.
            page: 1-based page number (must be >= 1).
            page_size: records per page (must be >= 1).

        Returns:
            ``{'status': 'success', 'logs': [...], 'pagination': {...}, 'summary': {...}}``
            with logs newest first, or an error payload (also when no log file
            exists). Summary counts cover all filtered records, not just the page.
        """
        try:
            self._validate_pagination(page, page_size)
            log_files = sorted(Path('.log').glob('*.log'), reverse=True)
            if not log_files:
                return {
                    'status': 'error',
                    'message': '未找到日志文件',
                    'details': {
                        'error_type': 'FileNotFoundError',
                        'error_message': 'No log files found'
                    }
                }

            start_dt = pd.to_datetime(start_time) if start_time else None
            end_dt = pd.to_datetime(end_time) if end_time else pd.Timestamp.now()
            wanted_level = level.upper() if level else None

            matched = []  # (record time, formatted entry) pairs
            level_counts = Counter()
            error_types = Counter()
            module_counts = Counter()
            for log_file in log_files:
                for record in self._iter_log_records(log_file):
                    log_time = record['time']
                    if start_dt is not None and log_time < start_dt:
                        continue
                    if log_time > end_dt:
                        continue
                    if wanted_level and record['level'] != wanted_level:
                        continue
                    if module and record['module'] != module:
                        continue

                    log_level = record['level']
                    log_module = record['module']
                    details = record['details']
                    level_counts[log_level] += 1
                    if log_level == 'ERROR':
                        error_types[record['message'].split(':')[0]] += 1
                    module_counts[log_module] += 1
                    matched.append((log_time, {
                        'timestamp': log_time.strftime('%Y-%m-%dT%H:%M:%S'),
                        'level': log_level,
                        'module': log_module,
                        'message': record['message'],
                        'details': details,
                        'context': {
                            k: v for k, v in details.items()
                            if k in ['experiment_id', 'run_id', 'model']
                        },
                    }))

            # Files are visited newest-first but lines inside a file are
            # oldest-first; sort globally so every page is newest-first.
            matched.sort(key=lambda pair: pair[0], reverse=True)
            page_logs, pagination = self._paginate(
                [entry for _, entry in matched], page, page_size
            )

            summary = {
                'error_count': level_counts.get('ERROR', 0),
                'warning_count': level_counts.get('WARNING', 0),
                'info_count': level_counts.get('INFO', 0),
                'most_frequent_error': error_types.most_common(1)[0][0] if error_types else None,
                'most_affected_module': module_counts.most_common(1)[0][0] if module_counts else None,
            }
            self.logger.info(f"成功获取系统日志, 共{pagination['total_items']}条记录")
            return {
                'status': 'success',
                'logs': page_logs,
                'pagination': pagination,
                'summary': summary,
            }
        except Exception as e:
            self.logger.exception(f"获取系统日志失败: {str(e)}")
            return self._error_response('获取系统日志失败', e)