# Listing metadata from the source viewer: 498 lines, 19 KiB, Python
import json
import logging
import platform
import re
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import GPUtil
import mlflow
import pandas as pd
import psutil
from mlflow.tracking import MlflowClient
|
|
|
|
class SystemMonitor:
    """Report host resources, MLflow training history and parsed log records.

    The public ``get_*`` methods never raise: each returns a dict whose
    ``status`` key is ``'success'`` or ``'error'``; failures are logged and
    described in the returned dict.
    """

    # Log directory, resolved against the current working directory.
    _LOG_DIR = Path('.log')
    _BYTES_PER_MB = 1024 * 1024

    # Mirrors the formatter installed by ``_setup_logging``:
    # "<asctime> - <name> - <level> - <message> [{json details}]".
    _LOG_PATTERN = re.compile(
        r'(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})\s+-\s+'
        r'(?P<name>[\w.]+)\s+-\s+'
        r'(?P<level>\w+)\s+-\s+'
        r'(?P<message>.*?)(?:\s+\{(?P<details>.*)\})?$'
    )
    # strptime layout of the ``%(asctime)s`` field (comma before milliseconds).
    _LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S,%f'

    def __init__(self):
        """Bind the module logger, enable file logging and create the MLflow client."""
        self.logger = logging.getLogger(__name__)
        self._setup_logging()
        self.mlflow_client = MlflowClient()

    def _setup_logging(self):
        """Attach a timestamped UTF-8 file handler to the logger, at most once.

        The logger is shared by every instance (it is looked up by module
        name), so a handler installed by an earlier instance is reused instead
        of adding another one, which would duplicate every log line.
        """
        self.logger.setLevel(logging.INFO)

        for handler in self.logger.handlers:
            if (isinstance(handler, logging.FileHandler)
                    and Path(handler.baseFilename).name.startswith('system_monitor_')):
                return

        log_dir = self._LOG_DIR
        log_dir.mkdir(exist_ok=True)

        # Written as UTF-8 because get_system_logs() reads the files as UTF-8.
        file_handler = logging.FileHandler(
            log_dir / f'system_monitor_{datetime.now():%Y%m%d_%H%M%S}.log',
            encoding='utf-8',
        )
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(file_handler)

    def _get_gpu_info(self) -> List[Dict[str, Any]]:
        """Return one dict per GPU reported by GPUtil, or ``[]`` on failure.

        Memory values are in MB, utilization in percent, temperature in °C.
        ``power`` entries are ``None`` when the GPU object lacks the attribute.
        ``processes`` is always an empty list (per-process data is not collected).
        """
        try:
            return [
                {
                    'id': gpu.id,
                    'name': gpu.name,
                    'memory': {
                        'total': gpu.memoryTotal,  # MB
                        'used': gpu.memoryUsed,  # MB
                        'free': gpu.memoryFree,  # MB
                    },
                    'utilization': {
                        'gpu': gpu.load * 100,  # %
                        'memory': gpu.memoryUtil * 100,  # %
                    },
                    'temperature': gpu.temperature,  # °C
                    'power': {
                        'draw': getattr(gpu, 'powerDraw', None),  # W
                        'limit': getattr(gpu, 'powerLimit', None),  # W
                    },
                    'processes': [],
                }
                for gpu in GPUtil.getGPUs()
            ]
        except Exception as e:
            self.logger.error(f"获取GPU信息失败: {str(e)}")
            return []

    def _get_cpu_info(self) -> Dict[str, Any]:
        """Return CPU counts, utilization, frequency (GHz), temperature, memory and swap.

        ``psutil.cpu_percent(interval=1)`` blocks for one second while sampling.
        Temperature is ``None`` when the platform offers no ``coretemp`` reading.
        Returns ``{}`` if collection fails.
        """
        try:
            cpu_freq = psutil.cpu_freq()

            # sensors_temperatures() is not defined by psutil on every
            # platform; a missing sensor must not discard the other CPU stats.
            read_sensors = getattr(psutil, 'sensors_temperatures', None)
            core_readings = (read_sensors().get('coretemp') or []) if read_sensors else []
            cpu_temp = core_readings[0] if core_readings else None

            return {
                'count': {
                    'physical': psutil.cpu_count(logical=False),
                    'logical': psutil.cpu_count(logical=True),
                },
                'utilization': psutil.cpu_percent(interval=1),  # %
                'frequency': {
                    'current': cpu_freq.current / 1000 if cpu_freq else None,  # GHz
                    'min': cpu_freq.min / 1000 if cpu_freq else None,  # GHz
                    'max': cpu_freq.max / 1000 if cpu_freq else None,  # GHz
                },
                'temperature': cpu_temp.current if cpu_temp else None,  # °C
                'memory': self._get_memory_info(),
                'swap': self._get_swap_info(),
            }
        except Exception as e:
            self.logger.error(f"获取CPU信息失败: {str(e)}")
            return {}

    def _get_memory_info(self) -> Dict[str, Any]:
        """Return virtual-memory total/used/free in MB plus percent used; ``{}`` on failure."""
        try:
            mem = psutil.virtual_memory()
            mb = self._BYTES_PER_MB
            return {
                'total': mem.total // mb,  # MB
                'used': mem.used // mb,  # MB
                'free': mem.free // mb,  # MB
                'percent': mem.percent,  # %
            }
        except Exception as e:
            self.logger.error(f"获取内存信息失败: {str(e)}")
            return {}

    def _get_swap_info(self) -> Dict[str, Any]:
        """Return swap total/used/free in MB plus percent used; ``{}`` on failure."""
        try:
            swap = psutil.swap_memory()
            mb = self._BYTES_PER_MB
            return {
                'total': swap.total // mb,  # MB
                'used': swap.used // mb,  # MB
                'free': swap.free // mb,  # MB
                'percent': swap.percent,  # %
            }
        except Exception as e:
            self.logger.error(f"获取交换内存信息失败: {str(e)}")
            return {}

    def _get_disk_info(self) -> Dict[str, Any]:
        """Return usage (MB and percent) keyed by mount point; ``{}`` on failure.

        Partitions whose usage cannot be read (``PermissionError``/``OSError``)
        are skipped.
        """
        try:
            mb = self._BYTES_PER_MB
            disk_info = {}
            for partition in psutil.disk_partitions():
                try:
                    usage = psutil.disk_usage(partition.mountpoint)
                except (PermissionError, OSError):
                    continue
                disk_info[partition.mountpoint] = {
                    'total': usage.total // mb,  # MB
                    'used': usage.used // mb,  # MB
                    'free': usage.free // mb,  # MB
                    'percent': usage.percent,  # %
                }
            return disk_info
        except Exception as e:
            self.logger.error(f"获取磁盘信息失败: {str(e)}")
            return {}

    def _get_process_info(self) -> Dict[str, Any]:
        """Return the total process count and how many are running / sleeping.

        Processes that vanish or deny access while being inspected are skipped.
        Returns ``{}`` on failure.
        """
        try:
            total = len(psutil.pids())
            status_counts = Counter()
            for proc in psutil.process_iter(['status']):
                try:
                    status_counts[proc.info['status']] += 1
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue
            return {
                'total': total,
                'running': status_counts['running'],
                'sleeping': status_counts['sleeping'],
            }
        except Exception as e:
            self.logger.error(f"获取进程信息失败: {str(e)}")
            return {}

    def get_system_resources(self) -> Dict[str, Any]:
        """Collect GPU, CPU, disk and process statistics.

        Returns:
            On success ``{'status': 'success', 'resources': {...},
            'timestamp': 'YYYY-MM-DDTHH:MM:SS'}``; on failure a dict with
            ``status == 'error'``, a message and the exception type/message.
        """
        try:
            resources = {
                'gpu': self._get_gpu_info(),
                'cpu': self._get_cpu_info(),
                'disk': self._get_disk_info(),
                'processes': self._get_process_info(),
            }

            self.logger.info("成功获取系统资源信息")

            return {
                'status': 'success',
                'resources': resources,
                'timestamp': datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
            }
        except Exception as e:
            error_msg = f"获取系统资源信息失败: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': '获取资源信息失败',
                'details': {
                    'error_type': type(e).__name__,
                    'error_message': str(e),
                },
            }

    @staticmethod
    def _page_bounds(total_items: int, page: int,
                     page_size: int) -> Tuple[int, int, int, int, int]:
        """Clamp paging arguments and compute the slice for the requested page.

        ``page`` and ``page_size`` below 1 are raised to 1 so that the slice
        indices are never negative and the page count never divides by zero.

        Returns:
            ``(page, page_size, total_pages, start_idx, end_idx)``.
        """
        page = max(1, page)
        page_size = max(1, page_size)
        total_pages = (total_items + page_size - 1) // page_size
        start_idx = min((page - 1) * page_size, total_items)
        end_idx = min(start_idx + page_size, total_items)
        return page, page_size, total_pages, start_idx, end_idx

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for ``None``/NaN/NaT scalars, False for anything else."""
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # Non-scalar values (e.g. sequences) are treated as present.
            return False

    def _format_run(self, run: Dict[str, Any], experiment_name: str) -> Dict[str, Any]:
        """Convert one ``mlflow.search_runs`` row (as a dict) into a history item.

        Columns prefixed ``params.``/``metrics.``/``tags.`` are grouped with
        only that leading prefix removed; cells that are missing for this run
        (None/NaN) are omitted. A run without an end time is measured up to
        the current time, using the same timezone as its start time.
        """
        params: Dict[str, Any] = {}
        metrics: Dict[str, Any] = {}
        tags: Dict[str, Any] = {}
        groups = (('params.', params), ('metrics.', metrics), ('tags.', tags))
        for key, value in run.items():
            if self._is_missing(value):
                continue
            for prefix, bucket in groups:
                if key.startswith(prefix):
                    # Slice instead of str.replace so that inner occurrences
                    # of the prefix in the key are left untouched.
                    bucket[key[len(prefix):]] = value
                    break

        started = pd.to_datetime(run['start_time'])
        raw_end = run.get('end_time')
        if self._is_missing(raw_end):
            # Match the start's timezone; naive/aware values cannot be subtracted.
            ended = pd.Timestamp.now(tz=started.tz)
        else:
            ended = pd.to_datetime(raw_end)
        duration = ended - started

        return {
            'run_id': run['run_id'],
            'experiment_id': run['experiment_id'],
            'experiment_name': experiment_name,
            'model_name': params.get('algorithm', 'Unknown'),
            'dataset': params.get('dataset', 'Unknown'),
            'start_time': started.strftime('%Y-%m-%dT%H:%M:%S'),
            'end_time': ended.strftime('%Y-%m-%dT%H:%M:%S'),
            'duration': str(duration).split('.')[0],  # drop fractional seconds
            'status': run['status'],
            'parameters': params,
            'metrics': metrics,
            'tags': tags,
        }

    def get_training_history(
        self,
        page: int = 1,
        page_size: int = 10,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        status: Optional[str] = None,
        experiment_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Return a page of MLflow runs, newest first.

        Args:
            page: 1-based page number (values below 1 are treated as 1).
            page_size: Runs per page (values below 1 are treated as 1).
            start_time: Lower bound on the run start time (YYYY-MM-DD).
            end_time: Upper bound on the run start time (YYYY-MM-DD). The
                value is converted with ``pd.Timestamp`` as given, so a
                date-only string bounds at that date's 00:00:00.
            status: Run status filter; upper-cased before use.
            experiment_name: Restrict to this experiment; all experiments
                are searched when omitted.

        Returns:
            ``{'status': 'success', 'history': [...], 'pagination': {...}}``,
            or a dict with ``status == 'error'`` when the experiment does not
            exist or the query fails.
        """
        try:
            # Build the MLflow filter expression.
            conditions = []
            if status:
                conditions.append(f"status = '{status.upper()}'")
            if start_time:
                start_timestamp = int(pd.Timestamp(start_time).timestamp() * 1000)
                conditions.append(f"start_time >= {start_timestamp}")
            if end_time:
                end_timestamp = int(pd.Timestamp(end_time).timestamp() * 1000)
                conditions.append(f"start_time <= {end_timestamp}")
            filter_string = " and ".join(conditions) if conditions else None

            # Resolve which experiments to search.
            if experiment_name:
                experiment = mlflow.get_experiment_by_name(experiment_name)
                if experiment is None:
                    return {
                        'status': 'error',
                        'message': f'实验 {experiment_name} 不存在'
                    }
                experiments = [experiment]
            else:
                experiments = mlflow.search_experiments()
            # Remember names now so formatting needs no lookup per run.
            experiment_names = {exp.experiment_id: exp.name for exp in experiments}

            # Gather matching runs from every experiment.
            all_runs = []
            for exp_id in experiment_names:
                runs = mlflow.search_runs(
                    experiment_ids=[exp_id],
                    filter_string=filter_string,
                    order_by=['start_time DESC']
                )
                if not runs.empty:
                    all_runs.extend(runs.to_dict('records'))

            # Each query is ordered only within its experiment; order the
            # combined list so pages are newest-first across experiments.
            all_runs.sort(key=lambda record: record['start_time'], reverse=True)

            total_items = len(all_runs)
            page, page_size, total_pages, start_idx, end_idx = self._page_bounds(
                total_items, page, page_size
            )

            history = []
            for run in all_runs[start_idx:end_idx]:
                name = experiment_names.get(run['experiment_id'])
                if name is None:
                    name = mlflow.get_experiment(run['experiment_id']).name
                history.append(self._format_run(run, name))

            self.logger.info(f"成功获取训练历史记录, 共{total_items}条记录")

            return {
                'status': 'success',
                'history': history,
                'pagination': {
                    'current_page': page,
                    'page_size': page_size,
                    'total_pages': total_pages,
                    'total_items': total_items,
                },
            }
        except Exception as e:
            error_msg = f"获取训练历史失败: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': '获取训练历史失败',
                'details': {
                    'error_type': type(e).__name__,
                    'error_message': str(e),
                },
            }

    @staticmethod
    def _parse_details(raw: Optional[str]) -> Dict[str, Any]:
        """Parse the brace-stripped JSON tail of a log line; ``{}`` if absent or invalid."""
        if not raw:
            return {}
        try:
            parsed = json.loads('{' + raw + '}')
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def get_system_logs(
        self,
        level: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        module: Optional[str] = None,
        page: int = 1,
        page_size: int = 20
    ) -> Dict[str, Any]:
        """Parse ``.log/*.log`` and return a filtered page of records, newest first.

        Lines that do not match the expected log layout are ignored.

        Args:
            level: Keep only records of this level (case-insensitive).
            start_time: Earliest timestamp to keep (YYYY-MM-DDThh:mm:ss).
            end_time: Latest timestamp to keep (YYYY-MM-DDThh:mm:ss);
                defaults to the current time.
            module: Keep only records whose logger name equals this value.
            page: 1-based page number (values below 1 are treated as 1).
            page_size: Records per page (values below 1 are treated as 1).

        Returns:
            ``{'status': 'success', 'logs': [...], 'pagination': {...},
            'summary': {...}}`` where the summary counts cover every record
            that passed the filters, or a dict with ``status == 'error'``
            when no log file exists or reading fails.
        """
        try:
            log_files = sorted(self._LOG_DIR.glob('*.log'), reverse=True)

            if not log_files:
                return {
                    'status': 'error',
                    'message': '未找到日志文件',
                    'details': {
                        'error_type': 'FileNotFoundError',
                        'error_message': 'No log files found'
                    }
                }

            # Time window, as plain datetimes to compare with parsed log times.
            start_dt = pd.to_datetime(start_time).to_pydatetime() if start_time else None
            end_dt = pd.to_datetime(end_time).to_pydatetime() if end_time else datetime.now()
            wanted_level = level.upper() if level else None

            records = []  # (log_time, entry) pairs, ordered after reading
            level_counts = Counter()
            error_types = Counter()
            module_counts = Counter()

            for log_file in log_files:
                # errors='replace' keeps a file written in another encoding
                # from aborting the whole query.
                with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                    for line in f:
                        match = self._LOG_PATTERN.match(line.strip())
                        if not match:
                            continue

                        log_data = match.groupdict()
                        try:
                            log_time = datetime.strptime(
                                ' '.join(log_data['timestamp'].split()),
                                self._LOG_TIME_FORMAT,
                            )
                        except ValueError:
                            # Digits matched the layout but are not a real date/time.
                            continue

                        # Time-range filter.
                        if start_dt is not None and log_time < start_dt:
                            continue
                        if log_time > end_dt:
                            continue

                        # Level filter.
                        log_level = log_data['level'].upper()
                        if wanted_level and log_level != wanted_level:
                            continue

                        # Module filter.
                        log_module = log_data['name']
                        if module and log_module != module:
                            continue

                        details = self._parse_details(log_data['details'])

                        # Summary statistics.
                        level_counts[log_level] += 1
                        if log_level == 'ERROR':
                            error_types[log_data['message'].split(':')[0]] += 1
                        module_counts[log_module] += 1

                        records.append((log_time, {
                            'timestamp': log_time.strftime('%Y-%m-%dT%H:%M:%S'),
                            'level': log_level,
                            'module': log_module,
                            'message': log_data['message'],
                            'details': details,
                            'context': {
                                k: v for k, v in details.items()
                                if k in ['experiment_id', 'run_id', 'model']
                            },
                        }))

            # Files are visited newest-first but each file is written
            # oldest-first; order everything by timestamp for stable paging.
            records.sort(key=lambda pair: pair[0], reverse=True)
            all_logs = [entry for _, entry in records]

            total_items = len(all_logs)
            page, page_size, total_pages, start_idx, end_idx = self._page_bounds(
                total_items, page, page_size
            )

            summary = {
                'error_count': level_counts.get('ERROR', 0),
                'warning_count': level_counts.get('WARNING', 0),
                'info_count': level_counts.get('INFO', 0),
                'most_frequent_error': error_types.most_common(1)[0][0] if error_types else None,
                'most_affected_module': module_counts.most_common(1)[0][0] if module_counts else None
            }

            self.logger.info(f"成功获取系统日志, 共{total_items}条记录")

            return {
                'status': 'success',
                'logs': all_logs[start_idx:end_idx],
                'pagination': {
                    'current_page': page,
                    'page_size': page_size,
                    'total_pages': total_pages,
                    'total_items': total_items,
                },
                'summary': summary,
            }
        except Exception as e:
            error_msg = f"获取系统日志失败: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': '获取系统日志失败',
                'details': {
                    'error_type': type(e).__name__,
                    'error_message': str(e),
                },
            }