import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
from typing import Dict, List, Optional
import logging
from pathlib import Path
from datetime import datetime
import yaml
import json
import time
import os
import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, mean_absolute_error, mean_squared_error, r2_score,
    explained_variance_score, adjusted_rand_score, homogeneity_score,
    completeness_score, silhouette_score
)
import torch
from torch.utils.data import DataLoader, TensorDataset


class ModelManager:
    """Manage trained models tracked by an MLflow server.

    Responsibilities visible in this class:
      * paginated listing of FINISHED runs (``get_finished_models``)
      * paginated listing of experiments (``get_experiments``)
      * deleting a run (``delete_model``)
      * batch prediction from a logged model plus optional evaluation
        (``predict``)

    All public methods return a result dict with a ``'status'`` key of
    ``'success'`` or ``'error'`` instead of raising to the caller.
    """

    def __init__(self, config: Dict = None):
        """Initialize the model manager.

        Args:
            config: Optional configuration dict. Recognized key:
                ``mlflow_uri`` — tracking-server URI
                (defaults to ``http://10.0.0.202:5000``).
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        self._setup_logging()
        self._metrics_map()

        # Point the mlflow module and a low-level client at the same server.
        self.mlflow_uri = self.config.get('mlflow_uri', 'http://10.0.0.202:5000')
        mlflow.set_tracking_uri(self.mlflow_uri)
        self.client = MlflowClient()

    def _setup_logging(self):
        """Attach a timestamped file handler writing under ``.log/``."""
        log_dir = Path('.log')
        log_dir.mkdir(exist_ok=True)
        file_handler = logging.FileHandler(
            log_dir / f'model_manager_{datetime.now():%Y%m%d_%H%M%S}.log'
        )
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(file_handler)
        self.logger.setLevel(logging.INFO)

    def _metrics_map(self):
        """Build the metric-name -> callable lookup used by ``predict``.

        Every callable is invoked as ``fn(y_true, y_pred)``.
        """
        self.metrics_map = {
            'accuracy': accuracy_score,
            'precision': precision_score,
            'recall': recall_score,
            'f1': f1_score,
            'mae': mean_absolute_error,
            'mse': mean_squared_error,
            # FIX: rmse needs special handling — it is sqrt(MSE), expressed
            # here as a (y_true, y_pred) callable like the other entries.
            'rmse': lambda y_true, y_pred: float(
                np.sqrt(mean_squared_error(y_true, y_pred))
            ),
            'r2': r2_score,
            'explained_variance': explained_variance_score,
            'adjusted_rand': adjusted_rand_score,
            'homogeneity': homogeneity_score,
            'completeness': completeness_score,
            # NOTE(review): sklearn's silhouette_score expects (X, labels),
            # not (y_true, y_pred); invoking it through this map as
            # ``predict`` does will not compute a meaningful value — confirm
            # intended usage before relying on it.
            'silhouette': silhouette_score
        }

    def get_finished_models(
        self,
        page: int = 1,
        page_size: int = 10,
        experiment_name: Optional[str] = None
    ) -> Dict:
        """Return a paginated list of FINISHED runs, formatted as model info.

        Args:
            page: 1-based page number.
            page_size: Number of models per page.
            experiment_name: If given, restrict to that experiment.

        Returns:
            Dict with ``status``; on success also ``models``,
            ``total_count``, ``page`` and ``page_size``.
        """
        try:
            # Resolve which experiments to scan.
            if experiment_name:
                experiment = mlflow.get_experiment_by_name(experiment_name)
                if experiment is None:
                    return {
                        'status': 'error',
                        'message': f'Experiment {experiment_name} not found'
                    }
                experiments = [experiment]
            else:
                experiments = mlflow.search_experiments()

            # Collect every finished run across the selected experiments.
            all_runs = []
            for exp in experiments:
                runs = mlflow.search_runs(
                    experiment_ids=[exp.experiment_id],
                    filter_string="status = 'FINISHED'"
                )
                all_runs.extend(runs.to_dict('records'))

            # Paginate in memory.
            total_count = len(all_runs)
            start_idx = (page - 1) * page_size
            end_idx = start_idx + page_size
            page_runs = all_runs[start_idx:end_idx]

            models = []
            for run in page_runs:
                # search_runs flattens params/metrics into 'params.x' /
                # 'metrics.x' columns; split them back out. Strip only the
                # prefix (str.replace would also mangle later occurrences).
                params = {}
                metrics = {}
                for key, value in run.items():
                    if key.startswith('params.'):
                        params[key[len('params.'):]] = value
                    elif key.startswith('metrics.'):
                        metrics[key[len('metrics.'):]] = value

                # Convert run timestamps to local (Shanghai) time.
                start_time = pd.to_datetime(run['start_time']).tz_convert('Asia/Shanghai')
                end_time = pd.to_datetime(run['end_time']).tz_convert('Asia/Shanghai')

                model_info = {
                    'run_id': run['run_id'],
                    'experiment_id': run['experiment_id'],
                    # FIX: .get() so a run missing these logged params does
                    # not abort the whole page with a KeyError.
                    'algorithm': params.get('algorithm', 'Unknown'),
                    'task_type': params.get('task_type', 'Unknown'),
                    'dataset': params.get('dataset', 'Unknown'),
                    'training_start_time': start_time.strftime('%Y-%m-%d %H:%M:%S'),
                    'training_end_time': end_time.strftime('%Y-%m-%d %H:%M:%S'),
                    'metrics': metrics,
                    # Descriptive texts are reported separately below.
                    'parameters': {
                        k: v for k, v in params.items()
                        if k not in ['principle', 'advantages', 'disadvantages']
                    },
                    'algorithm_info': {
                        'principle': params.get('principle', ''),
                        'advantages': params.get('advantages', ''),
                        'disadvantages': params.get('disadvantages', '')
                    },
                    'mlflow_run_id': run['run_id'],
                    'model_path': f"models/{run['run_id']}"
                }
                models.append(model_info)

            return {
                'status': 'success',
                'models': models,
                'total_count': total_count,
                'page': page,
                'page_size': page_size
            }

        except Exception as e:
            error_msg = f"Error getting finished models: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def get_experiments(
        self,
        page: int = 1,
        page_size: int = 10,
        include_deleted: bool = False
    ) -> Dict:
        """Return a paginated list of MLflow experiments.

        Args:
            page: 1-based page number.
            page_size: Number of experiments per page.
            include_deleted: Whether to include deleted experiments.

        Returns:
            Dict with ``status``; on success also ``experiments``,
            ``total_count``, ``page`` and ``page_size``.
        """
        try:
            experiments = mlflow.search_experiments()

            # Drop deleted experiments unless explicitly requested.
            if not include_deleted:
                experiments = [exp for exp in experiments
                               if exp.lifecycle_stage == 'active']

            # Paginate in memory.
            total_count = len(experiments)
            start_idx = (page - 1) * page_size
            end_idx = start_idx + page_size
            page_experiments = experiments[start_idx:end_idx]

            experiment_list = []
            for exp in page_experiments:
                # Finished-run count for this experiment.
                runs = mlflow.search_runs(
                    experiment_ids=[exp.experiment_id],
                    filter_string="status = 'FINISHED'"
                )

                # Last update = latest run end time, else experiment creation.
                # FIX: exp.creation_time is epoch *milliseconds*; without
                # unit='ms', pandas interprets the int as nanoseconds and
                # yields a 1970 date.
                if len(runs) > 0:
                    last_update_time = pd.to_datetime(runs['end_time'].max())
                else:
                    last_update_time = pd.to_datetime(exp.creation_time, unit='ms')

                creation_time = pd.to_datetime(exp.creation_time, unit='ms')

                experiment_info = {
                    'experiment_id': exp.experiment_id,
                    'name': exp.name,
                    'artifact_location': exp.artifact_location,
                    'lifecycle_stage': exp.lifecycle_stage,
                    'creation_time': creation_time.strftime('%Y-%m-%d %H:%M:%S'),
                    'last_update_time': last_update_time.strftime('%Y-%m-%d %H:%M:%S'),
                    'tags': exp.tags,
                    'runs_count': len(runs)
                }
                experiment_list.append(experiment_info)

            return {
                'status': 'success',
                'experiments': experiment_list,
                'total_count': total_count,
                'page': page,
                'page_size': page_size
            }

        except Exception as e:
            error_msg = f"Error getting experiments: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def delete_model(self, run_id: str) -> Dict:
        """Delete a trained model (MLflow run) by run id.

        Args:
            run_id: MLflow run id.

        Returns:
            Dict with ``status``; on success also a ``details`` dict
            listing the deleted run and its artifact paths.
        """
        try:
            # NOTE(review): client.get_run raises for an unknown id rather
            # than returning None, so the guard below is normally reached
            # only via the except branch — kept for defensiveness.
            run = self.client.get_run(run_id)
            if not run:
                return {
                    'status': 'error',
                    'message': f'未找到运行ID为 {run_id} 的模型'
                }

            experiment_id = run.info.experiment_id

            # Resolve a human-readable model name from the logged params.
            run_data = mlflow.get_run(run_id)
            model_name = run_data.data.params.get('algorithm', 'Unknown')

            # Snapshot the artifact paths before deleting the run.
            artifacts = [artifact.path
                         for artifact in self.client.list_artifacts(run_id)]

            self.client.delete_run(run_id)

            self.logger.info(
                f"已删除模型 - Run ID: {run_id}, 实验ID: {experiment_id}, 模型: {model_name}"
            )

            return {
                'status': 'success',
                'message': '模型删除成功',
                'details': {
                    'run_id': run_id,
                    'experiment_id': experiment_id,
                    'model_name': model_name,
                    'deleted_artifacts': artifacts
                }
            }

        except Exception as e:
            error_msg = f"删除模型时发生错误: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def predict(
        self,
        run_id: str,
        data_path: str,
        output_path: str,
        batch_size: int = 32,
        device: str = 'cuda' if torch.cuda.is_available() else 'cpu',
        return_proba: bool = True,
        metrics: List[str] = None
    ) -> Dict:
        """Run batch prediction with a logged model and save results to CSV.

        Args:
            run_id: MLflow run id identifying the model.
            data_path: Path to the input CSV. A ``label`` or ``target``
                column, if present, is used as ground truth for metrics.
            output_path: Path for the prediction-results CSV.
            batch_size: Batch size for the torch DataLoader path.
            device: Compute device ('cuda' or 'cpu').
            return_proba: Whether to also emit class probabilities.
            metrics: Names from ``self.metrics_map`` to evaluate when
                ground-truth labels are available.

        Returns:
            Dict with ``status``; on success a ``prediction`` dict with
            output path, sample count, metrics and timing.
        """
        # FIX: the error-handling wrapper had been commented out, letting
        # every failure propagate instead of returning the documented
        # error dict — restored.
        try:
            start_time = time.time()

            run = self.client.get_run(run_id)
            if not run:
                return {
                    'status': 'error',
                    'message': f'未找到运行ID为 {run_id} 的模型'
                }

            model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
            model_name = run.data.params.get('algorithm', 'Unknown')

            # Load features; pop a 'label'/'target' column as ground truth.
            try:
                data = pd.read_csv(data_path)
                if 'label' in data.columns:
                    y_true = data.pop('label').values
                    has_labels = True
                elif 'target' in data.columns:
                    y_true = data.pop('target').values
                    has_labels = True
                else:
                    has_labels = False
                X = data.values
            except Exception as e:
                return {
                    'status': 'error',
                    'message': '数据加载失败',
                    'details': {
                        'error_type': type(e).__name__,
                        'error_message': str(e)
                    }
                }

            pred_id = f"pred_{datetime.now():%Y%m%d_%H%M%S}"

            # NOTE(review): mlflow.pyfunc.load_model returns a PyFuncModel
            # wrapper, so the nn.Module branch below looks unreachable as
            # written — confirm whether a raw torch model can arrive here.
            if isinstance(model, torch.nn.Module):
                # Batched torch inference (argmax class, optional softmax).
                model.to(device)
                model.eval()
                dataset = TensorDataset(torch.FloatTensor(X))
                dataloader = DataLoader(dataset, batch_size=batch_size)

                predictions = []
                probas = []
                with torch.no_grad():
                    for batch in dataloader:
                        batch = batch[0].to(device)
                        outputs = model(batch)
                        if return_proba:
                            proba = torch.softmax(outputs, dim=1)
                            probas.append(proba.cpu().numpy())
                        preds = outputs.argmax(dim=1)
                        predictions.append(preds.cpu().numpy())

                predictions = np.concatenate(predictions)
                if return_proba:
                    probas = np.concatenate(probas)
            else:
                # Generic predict / predict_proba path.
                predictions = model.predict(X)
                if return_proba and hasattr(model, 'predict_proba'):
                    probas = model.predict_proba(X)
                else:
                    probas = []

            # Evaluate requested metrics against ground truth, if any.
            metrics_results = {}
            if has_labels and metrics:
                for metric in metrics:
                    if metric in self.metrics_map:
                        metrics_results[metric] = float(
                            self.metrics_map[metric](y_true, predictions)
                        )

            # Assemble and persist the results CSV.
            results_df = pd.DataFrame({'prediction': predictions})
            if return_proba and len(probas) > 0:
                for i in range(probas.shape[1]):
                    results_df[f'probability_{i}'] = probas[:, i]

            # FIX: dirname may be '' for a bare filename, and makedirs('')
            # raises — only create the directory when one is given.
            out_dir = os.path.dirname(output_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
            results_df.to_csv(output_path, index=False)

            execution_time = time.time() - start_time

            self.logger.info(
                f"预测完成 - Run ID: {run_id}, 模型: {model_name}, "
                f"样本数: {len(predictions)}, 耗时: {execution_time:.2f}s"
            )

            return {
                'status': 'success',
                'prediction': {
                    'id': pred_id,
                    'run_id': run_id,
                    'model_name': model_name,
                    'output_file': output_path,
                    'prediction_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'samples_count': len(predictions),
                    'metrics': metrics_results,
                    'execution_time': f"{execution_time:.2f}s"
                }
            }

        except Exception as e:
            error_msg = f"预测过程发生错误: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': '模型预测失败',
                'details': {
                    'error_type': type(e).__name__,
                    'error_message': str(e)
                }
            }