"""Model optimization manager.

Wraps scikit-learn hyper-parameter search (GridSearchCV / RandomizedSearchCV)
around models previously logged to MLflow, tracks each optimization run in
MLflow, and keeps per-task metadata as JSON files under
``optimized_models/<task_id>/``.
"""

import importlib
import json
import logging
import os
import shutil
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
import yaml

import mlflow
from mlflow.tracking import MlflowClient
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, mean_absolute_error, mean_squared_error,
    r2_score, explained_variance_score
)
# NOTE(review): hyperopt/optuna are imported but no code path below uses them
# yet — presumably planned optimization backends; kept for compatibility.
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from optuna import create_study
import optuna


class OptimizeManager:
    """Model optimization management class.

    Responsibilities:
      * load the optimization-method and parameter catalogs from YAML,
      * run a search over a model loaded from an existing MLflow run,
      * persist and serve task metadata (list / detail / cancel / delete).

    All filesystem paths (``optimize/*.yaml``, ``.log``, ``optimized_models``)
    are resolved relative to the current working directory.
    """

    def __init__(self, config: Dict = None) -> None:
        """Initialize the manager.

        Args:
            config: Optional settings dict; recognizes ``mlflow_uri``
                (defaults to ``http://127.0.0.1:5000``).

        Raises:
            FileNotFoundError: if either YAML catalog file is missing.
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        self._setup_logging()
        self.method_config = self._load_method_config()
        self.parameter_config = self._load_parameter_config()
        # MLflow client used for run lookup, termination and deletion.
        self.mlflow_uri = self.config.get('mlflow_uri', 'http://127.0.0.1:5000')
        mlflow.set_tracking_uri(self.mlflow_uri)
        self.client = MlflowClient()
        # Storage directory for optimization task artifacts.
        self.optimize_dir = Path('optimized_models')
        self.optimize_dir.mkdir(exist_ok=True)

    def _setup_logging(self) -> None:
        """Attach a timestamped file handler (relative to the run directory).

        Guarded so that creating several OptimizeManager instances does not
        stack duplicate handlers on the shared module logger (which would
        write every message multiple times).
        """
        log_dir = Path('.log')
        log_dir.mkdir(exist_ok=True)
        if not self.logger.handlers:  # fix: avoid duplicate handlers
            file_handler = logging.FileHandler(
                log_dir / f'optimize_manager_{datetime.now():%Y%m%d_%H%M%S}.log'
            )
            file_handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            self.logger.addHandler(file_handler)
        self.logger.setLevel(logging.INFO)

    def _load_method_config(self) -> Dict:
        """Load the optimization-method catalog from ``optimize/method.yaml``.

        Returns:
            Parsed YAML dict (expected top-level key: ``optimization_methods``).

        Raises:
            FileNotFoundError: if the file does not exist.
        """
        config_path = Path('optimize/method.yaml')
        if not config_path.exists():
            raise FileNotFoundError(f"Method config file not found at {config_path}")
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        self.logger.info("Successfully loaded optimize method config")
        return config

    def _load_parameter_config(self) -> Dict:
        """Load the method-parameter catalog from ``optimize/parameter.yaml``.

        Returns:
            Parsed YAML dict keyed by method name under ``optimization_methods``.

        Raises:
            FileNotFoundError: if the file does not exist.
        """
        config_path = Path('optimize/parameter.yaml')
        if not config_path.exists():
            self.logger.error("Error loading optimize parameter config")
            raise FileNotFoundError(f"Parameter config file not found at {config_path}")
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        self.logger.info("Successfully loaded optimize parameter config")
        return config

    def get_optimize_methods(self) -> Dict:
        """Return a summary list of all available optimization methods.

        Returns:
            ``{'status': 'success', 'methods': [...]}`` on success, or
            ``{'status': 'error', 'message': ...}`` on failure.
        """
        try:
            methods = [
                {
                    'name': name,
                    'description': details.get('description', ''),
                    'advantages': details.get('advantages', []),
                    'disadvantages': details.get('disadvantages', []),
                    'applicable_scenarios': details.get('applicable_scenarios', [])
                }
                for name, details in self.method_config['optimization_methods'].items()
            ]
            return {'status': 'success', 'methods': methods}
        except Exception as e:
            error_msg = f"Error getting optimization methods: {str(e)}"
            self.logger.error(error_msg)
            return {'status': 'error', 'message': error_msg}

    def get_optimize_method_details(self, method_name: str) -> Dict:
        """Return full details (including parameters) for one method.

        Args:
            method_name: key into the method catalog.

        Returns:
            ``{'status': 'success', 'method': {...}}`` or an error dict when
            the method is unknown or an exception occurs.
        """
        try:
            if method_name not in self.method_config['optimization_methods']:
                return {'status': 'error', 'message': f"Method {method_name} not found"}
            method_details = self.method_config['optimization_methods'][method_name]
            # Parameter descriptions live in the separate parameter catalog.
            parameters = []
            if method_name in self.parameter_config['optimization_methods']:
                parameters = self.parameter_config['optimization_methods'][method_name].get('parameters', [])
            return {
                'status': 'success',
                'method': {
                    'name': method_name,
                    'description': method_details.get('description', ''),
                    'advantages': method_details.get('advantages', []),
                    'disadvantages': method_details.get('disadvantages', []),
                    'applicable_scenarios': method_details.get('applicable_scenarios', []),
                    'parameters': parameters
                }
            }
        except Exception as e:
            error_msg = f"Error getting optimization method details: {str(e)}"
            self.logger.error(error_msg)
            return {'status': 'error', 'message': error_msg}

    def _get_model_from_run(self, run_id: str):
        """Load a sklearn model and its metadata from an MLflow run.

        Args:
            run_id: MLflow run id the model was logged under.

        Returns:
            Tuple ``(model, model_type, dataset)`` where ``model_type`` comes
            from the run's ``algorithm`` param and ``dataset`` from its
            ``dataset`` param (may be None).

        Raises:
            ValueError: wrapping any underlying load failure.
        """
        try:
            run = self.client.get_run(run_id)
            model_uri = f"runs:/{run_id}/model"
            model = mlflow.sklearn.load_model(model_uri)
            dataset = run.data.params.get('dataset', None)
            model_type = run.data.params.get('algorithm', 'Unknown')
            return model, model_type, dataset
        except Exception as e:
            error_msg = f"Error loading model from run {run_id}: {str(e)}"
            self.logger.error(error_msg)
            raise ValueError(error_msg)

    def _load_data(self, dataset: str):
        """Load train/validation splits from a dataset directory.

        Expects ``<dataset>/train_<name>.csv`` and ``<dataset>/val_<name>.csv``
        where ``<name>`` is the last path component of ``dataset``, each with a
        ``target`` column.

        Returns:
            ``(X_train, y_train, X_val, y_val)`` as pandas objects.

        Raises:
            ValueError: wrapping missing files or read errors.
        """
        try:
            if not os.path.exists(dataset):
                raise FileNotFoundError(f"Data file not found at {dataset}")
            base_name = dataset.split('/')[-1]
            dataset_train_name = 'train_' + base_name + '.csv'
            dataset_val_name = 'val_' + base_name + '.csv'
            data_train = pd.read_csv(os.path.join(dataset, dataset_train_name))
            data_val = pd.read_csv(os.path.join(dataset, dataset_val_name))
            X_train = data_train.drop('target', axis=1)
            y_train = data_train['target']
            X_val = data_val.drop('target', axis=1)
            y_val = data_val['target']
            return X_train, y_train, X_val, y_val
        except Exception as e:
            error_msg = f"Error loading data from {dataset}: {str(e)}"
            self.logger.error(error_msg)
            raise ValueError(error_msg)

    def _calculate_metrics(self, y_true, y_pred, task_type: str) -> Dict:
        """Compute evaluation metrics for a prediction vector.

        Args:
            y_true: ground-truth labels/values.
            y_pred: predicted labels/values (hard labels for classification).
            task_type: ``'classification'`` or ``'regression'``; any other
                value yields an empty dict.

        Returns:
            Dict of metric name -> float.
        """
        metrics = {}
        if task_type == 'classification':
            metrics['accuracy'] = float(accuracy_score(y_true, y_pred))
            metrics['precision'] = float(precision_score(y_true, y_pred, average='weighted'))
            metrics['recall'] = float(recall_score(y_true, y_pred, average='weighted'))
            metrics['f1'] = float(f1_score(y_true, y_pred, average='weighted'))
            # Binary problems only: AUC from hard labels is coarse — it would
            # be better computed from predicted probabilities if available.
            if len(np.unique(y_true)) == 2:
                metrics['roc_auc'] = float(roc_auc_score(y_true, y_pred))
        elif task_type == 'regression':
            metrics['mae'] = float(mean_absolute_error(y_true, y_pred))
            metrics['mse'] = float(mean_squared_error(y_true, y_pred))
            metrics['rmse'] = float(np.sqrt(metrics['mse']))
            metrics['r2'] = float(r2_score(y_true, y_pred))
            metrics['explained_variance'] = float(explained_variance_score(y_true, y_pred))
        return metrics

    def _get_optimizer(self, method: str, model, parameters: Dict,
                       scoring: str, cv: int, n_jobs: int, verbose: int):
        """Build a fitted-search estimator for the requested method.

        Args:
            method: ``'GridSearchCV'`` or ``'RandomizedSearchCV'``.
            model: base estimator to tune.
            parameters: search configuration (``param_grid`` /
                ``param_distributions``, ``n_iter``, ``random_state``).
            scoring / cv / n_jobs / verbose: forwarded to the sklearn search.

        Raises:
            ValueError: for unsupported method names.
        """
        if method == 'GridSearchCV':
            return GridSearchCV(
                estimator=model,
                param_grid=parameters.get('param_grid', {}),
                scoring=scoring,
                cv=cv,
                n_jobs=n_jobs,
                verbose=verbose,
                return_train_score=True
            )
        elif method == 'RandomizedSearchCV':
            return RandomizedSearchCV(
                estimator=model,
                param_distributions=parameters.get('param_distributions', {}),
                n_iter=parameters.get('n_iter', 10),
                scoring=scoring,
                cv=cv,
                n_jobs=n_jobs,
                verbose=verbose,
                random_state=parameters.get('random_state', 42),
                return_train_score=True
            )
        else:
            raise ValueError(f"Unsupported optimization method: {method}")

    def run_optimization(self, run_id: str, method: str, parameters: Dict,
                         data_path: str, output_dir: str,
                         experiment_name: str) -> Dict:
        """Run a hyper-parameter search for an existing MLflow-logged model.

        Loads the model from ``run_id``, re-reads its dataset, fits the chosen
        search, logs everything to a new MLflow run under ``experiment_name``,
        and writes ``task_info.json`` plus CV results under the task directory.

        Args:
            run_id: MLflow run id of the original model.
            method: optimization method name (see ``_get_optimizer``).
            parameters: search configuration; may also carry ``scoring``,
                ``cv``, ``n_jobs``, ``verbose``.
            data_path: unused here — the dataset recorded on the original run
                takes precedence (kept for interface compatibility).
            output_dir: unused here; task artifacts go to ``optimized_models``.
            experiment_name: MLflow experiment to log the optimization under.

        Returns:
            ``{'status': 'success', 'optimization': {...}}`` or an error dict.
        """
        try:
            # Task id: date prefix plus short random suffix for uniqueness.
            task_id = f"opt_{datetime.now():%Y%m%d}_{uuid.uuid4().hex[:6]}"
            task_dir = self.optimize_dir / task_id
            task_dir.mkdir(exist_ok=True)

            start_time = datetime.now()

            # Load the original model and its recorded dataset.
            model, model_type, dataset = self._get_model_from_run(run_id)
            X_train, y_train, X_val, y_val = self._load_data(dataset)

            # Task type comes from the original run; default to classification.
            run = self.client.get_run(run_id)
            task_type = run.data.params.get('task_type', 'classification')

            mlflow.set_experiment(experiment_name)
            with mlflow.start_run(run_name=f"optimize_{method}_{task_id}") as opt_run:
                # Provenance of the optimization.
                mlflow.log_param('original_run_id', run_id)
                mlflow.log_param('original_model', model_type)
                mlflow.log_param('optimization_method', method)
                mlflow.log_param('data_path', dataset)

                # Record the search configuration (dicts as JSON strings).
                for param_name, param_value in parameters.items():
                    if isinstance(param_value, dict):
                        mlflow.log_param(param_name, json.dumps(param_value))
                    else:
                        mlflow.log_param(param_name, param_value)

                optimizer = self._get_optimizer(
                    method=method,
                    model=model,
                    parameters=parameters,
                    scoring=parameters.get('scoring', 'accuracy'),
                    cv=parameters.get('cv', 5),
                    n_jobs=parameters.get('n_jobs', -1),
                    verbose=parameters.get('verbose', 1)
                )

                self.logger.info(f"Starting optimization with {method} for model {model_type}")
                optimizer.fit(X_train, y_train)

                mlflow.log_param('best_params', json.dumps(optimizer.best_params_))
                mlflow.log_metric('best_score', optimizer.best_score_)

                # Persist full CV results as an artifact.
                cv_results = pd.DataFrame(optimizer.cv_results_)
                cv_results_path = task_dir / 'cv_results.csv'
                cv_results.to_csv(cv_results_path, index=False)
                mlflow.log_artifact(str(cv_results_path))

                cv_results_summary = {
                    'mean_fit_time': float(cv_results['mean_fit_time'].mean()),
                    'mean_score_time': float(cv_results['mean_score_time'].mean()),
                    'param_combinations': len(cv_results),
                    'score_range': [float(cv_results['mean_test_score'].min()),
                                    float(cv_results['mean_test_score'].max())]
                }

                # best_estimator_ is already refit on the full training data.
                best_model = optimizer.best_estimator_
                mlflow.sklearn.log_model(best_model, "optimized_model")

                end_time = datetime.now()
                execution_time = end_time - start_time
                mlflow.log_param('execution_time', str(execution_time))
                # Also log as a numeric metric so it can be charted/compared.
                mlflow.log_metric('execution_time_seconds', execution_time.total_seconds())
                mlflow.log_param('start_time', start_time.isoformat())
                mlflow.log_param('end_time', end_time.isoformat())

                # fix: use the logger instead of a stray debug print.
                self.logger.info("Optimization run_id: %s", opt_run.info.run_id)
                task_info = {
                    'id': task_id,
                    'run_id': opt_run.info.run_id,
                    'experiment_id': opt_run.info.experiment_id,
                    'task_id': task_id,
                    'method': method,
                    'original_model': model_type,
                    'parameters': parameters,
                    'best_params': optimizer.best_params_,
                    'best_score': float(optimizer.best_score_),
                    'cv_results_summary': cv_results_summary,
                    'cv_results_file': str(cv_results_path),
                    # NOTE(review): this field stores the ORIGINAL run id, not
                    # the optimized model's run id — kept for compatibility
                    # with existing task_info.json consumers.
                    'optimized_model_id': run_id,
                    'execution_time': str(execution_time),
                    'experiment_name': experiment_name,
                    'status': 'completed',
                    'start_time': start_time.isoformat(),
                    'end_time': end_time.isoformat()
                }
                with open(task_dir / 'task_info.json', 'w') as f:
                    json.dump(task_info, f, indent=2)

                self.logger.info(
                    f"Optimization completed. Task ID: {task_id}, Run ID: {opt_run.info.run_id}")
                return {
                    'status': 'success',
                    'optimization': {
                        'id': task_id,
                        'run_id': opt_run.info.run_id,
                        'experiment_id': opt_run.info.experiment_id,
                        'task_id': task_id,
                        'method': method,
                        'original_model': model_type,
                        'best_params': optimizer.best_params_,
                        'best_score': float(optimizer.best_score_),
                        'cv_results_file': str(cv_results_path),
                        'optimized_model_id': run_id,
                        'execution_time': str(execution_time),
                        'status': 'completed',
                        'timestamp': end_time.isoformat()
                    }
                }
        except Exception as e:
            error_msg = f"Error running optimization: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg,
                'details': {
                    'error_type': type(e).__name__,
                    'error_message': str(e)
                }
            }

    def get_optimization_tasks(self, experiment_name: Optional[str] = None,
                               page: int = 1, page_size: int = 10,
                               status: Union[str, int] = 0) -> Dict:
        """List optimization tasks with optional filtering and pagination.

        Args:
            experiment_name: if given, only tasks for this experiment.
            page / page_size: 1-based pagination of the filtered list.
            status: if truthy, only tasks whose status matches (the original
                signature accepted this but ignored it; default ``0`` keeps
                the unfiltered behavior).

        Returns:
            ``{'status': 'success', 'tasks': [...], 'total_count': n, ...}``
            or an error dict.
        """
        try:
            tasks = []
            # Newest-first by directory name (task ids carry a date prefix).
            for task_dir in sorted(self.optimize_dir.glob('*'), reverse=True):
                if not task_dir.is_dir():
                    continue
                task_info_path = task_dir / 'task_info.json'
                if not task_info_path.exists():
                    continue
                with open(task_info_path, 'r') as f:
                    task_info = json.load(f)
                if experiment_name and task_info.get('experiment_name', None) != experiment_name:
                    continue
                # fix: apply the status filter that was previously ignored.
                if status and task_info.get('status') != status:
                    continue
                tasks.append({
                    'id': task_info['id'],
                    'run_id': task_info['run_id'],
                    'task_id': task_info['task_id'],
                    'method': task_info['method'],
                    'original_model': task_info['original_model'],
                    'best_score': task_info['best_score'],
                    'status': task_info['status'],
                    'start_time': task_info['start_time'],
                    'end_time': task_info['end_time'],
                    'experiment_name': task_info.get('experiment_name', None),
                    'optimized_model_id': task_info['optimized_model_id']
                })
            total_count = len(tasks)
            start_idx = (page - 1) * page_size
            end_idx = start_idx + page_size
            paginated_tasks = tasks[start_idx:end_idx]
            return {
                'status': 'success',
                'tasks': paginated_tasks,
                'total_count': total_count,
                'page': page,
                'page_size': page_size
            }
        except Exception as e:
            error_msg = f"Error getting optimization tasks: {str(e)}"
            self.logger.error(error_msg)
            return {'status': 'error', 'message': error_msg}

    def get_optimization_task(self, task_id: str) -> Dict:
        """Return the full task_info.json for one task.

        Args:
            task_id: task directory name under ``optimized_models``.

        Returns:
            ``{'status': 'success', 'task': {...}}`` or an error dict.
        """
        try:
            task_dir = self.optimize_dir / task_id
            task_info_path = task_dir / 'task_info.json'
            if not task_info_path.exists():
                return {'status': 'error', 'message': f"Task {task_id} not found"}
            with open(task_info_path, 'r') as f:
                task_info = json.load(f)
            return {'status': 'success', 'task': task_info}
        except Exception as e:
            error_msg = f"Error getting optimization task {task_id}: {str(e)}"
            self.logger.error(error_msg)
            return {'status': 'error', 'message': error_msg}

    def cancel_optimization_task(self, task_id: str) -> Dict:
        """Cancel a pending/running optimization task.

        Marks the task cancelled on disk and best-effort terminates its
        MLflow run. Completed tasks cannot be cancelled.

        Args:
            task_id: task directory name under ``optimized_models``.
        """
        try:
            task_dir = self.optimize_dir / task_id
            task_info_path = task_dir / 'task_info.json'
            if not task_info_path.exists():
                return {
                    'status': 'error',
                    'message': "任务不存在",
                    'details': {'reason': "任务不存在"}
                }
            with open(task_info_path, 'r') as f:
                task_info = json.load(f)
            if task_info['status'] == 'completed':
                return {
                    'status': 'error',
                    'message': "无法取消任务",
                    'details': {'reason': "任务已完成"}
                }
            task_info['status'] = 'cancelled'
            task_info['cancel_time'] = datetime.now().isoformat()
            with open(task_info_path, 'w') as f:
                json.dump(task_info, f, indent=2)
            # fix: terminate the optimization run itself ('run_id'); the old
            # code passed 'optimized_model_id', which holds the ORIGINAL
            # training run id and would have terminated the wrong run.
            if 'run_id' in task_info:
                try:
                    self.client.set_terminated(task_info['run_id'])
                except Exception as term_err:  # best-effort; run may be gone
                    self.logger.warning(
                        "Could not terminate MLflow run for task %s: %s",
                        task_id, term_err)
            return {
                'status': 'success',
                'message': "优化任务已取消",
                'task_id': task_id,
                'cancel_time': task_info['cancel_time']
            }
        except Exception as e:
            error_msg = f"Error cancelling optimization task {task_id}: {str(e)}"
            self.logger.error(error_msg)
            return {'status': 'error', 'message': error_msg}

    def delete_optimization_task(self, task_id: str) -> Dict:
        """Delete a finished task: its MLflow run and local directory.

        Running tasks are refused. MLflow deletion is best-effort so a
        missing/already-deleted run does not block local cleanup.

        Args:
            task_id: task directory name under ``optimized_models``.
        """
        try:
            task_dir = self.optimize_dir / task_id
            task_info_path = task_dir / 'task_info.json'
            if not task_info_path.exists():
                return {
                    'status': 'error',
                    'message': "删除任务失败",
                    'details': {'reason': "任务不存在"}
                }
            with open(task_info_path, 'r') as f:
                task_info = json.load(f)
            if task_info['status'] == 'running':
                return {
                    'status': 'error',
                    'message': "删除任务失败",
                    'details': {'reason': "任务正在运行中"}
                }
            # Best-effort MLflow run deletion: don't let a stale/absent run
            # leave orphaned task directories on disk.
            try:
                self.client.delete_run(task_info['run_id'])
            except Exception as del_err:
                self.logger.warning(
                    "Could not delete MLflow run for task %s: %s", task_id, del_err)
            deleted_files = [str(file_path) for file_path in task_dir.glob('*')]
            shutil.rmtree(task_dir)
            return {
                'status': 'success',
                'message': "优化任务已删除",
                'details': {
                    'task_id': task_id,
                    'deleted_files': deleted_files
                }
            }
        except Exception as e:
            error_msg = f"Error deleting optimization task {task_id}: {str(e)}"
            self.logger.error(error_msg)
            return {'status': 'error', 'message': error_msg}