# MLPlatform/function/optimize_manager.py
import importlib
import json
import logging
import os
import shutil
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Union

import mlflow
import numpy as np
import optuna
import pandas as pd
import yaml
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from mlflow.tracking import MlflowClient
from optuna import create_study
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
    mean_absolute_error, mean_squared_error, r2_score, explained_variance_score
)
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
class OptimizeManager:
    """Manage model hyperparameter-optimization tasks.

    The manager reads the catalogue of optimization methods from
    ``optimize/method.yaml`` and per-method parameter specs from
    ``optimize/parameter.yaml``, runs scikit-learn search strategies
    (GridSearchCV / RandomizedSearchCV) against a model previously logged
    to MLflow, logs the optimized model back to MLflow, and persists
    per-task metadata under ``optimized_models/<task_id>/`` so tasks can
    later be listed, inspected, cancelled or deleted.
    """

    def __init__(self, config: Optional[Dict] = None) -> None:
        """Initialize the manager.

        Args:
            config: Optional settings dict; the only key read here is
                ``mlflow_uri`` (default ``http://127.0.0.1:5000``).

        Raises:
            FileNotFoundError: if either YAML configuration file is missing.
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        self._setup_logging()
        self.method_config = self._load_method_config()
        self.parameter_config = self._load_parameter_config()
        # MLflow client used to look up source runs and manage optimization runs.
        self.mlflow_uri = self.config.get('mlflow_uri', 'http://127.0.0.1:5000')
        mlflow.set_tracking_uri(self.mlflow_uri)
        self.client = MlflowClient()
        # Per-task artifacts live under this directory (relative to the CWD).
        self.optimize_dir = Path('optimized_models')
        self.optimize_dir.mkdir(exist_ok=True)

    def _setup_logging(self) -> None:
        """Attach a timestamped file handler under ``.log/``.

        Only one FileHandler is ever attached to the module-level logger:
        constructing several managers previously stacked handlers, writing
        every record multiple times.
        """
        # Relative to the process working directory.
        log_dir = Path('.log')
        log_dir.mkdir(exist_ok=True)
        self.logger.setLevel(logging.INFO)
        if any(isinstance(h, logging.FileHandler) for h in self.logger.handlers):
            return  # a previous instance already configured file logging
        file_handler = logging.FileHandler(
            log_dir / f'optimize_manager_{datetime.now():%Y%m%d_%H%M%S}.log'
        )
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(file_handler)

    def _load_method_config(self) -> Dict:
        """Load the optimization-method catalogue from ``optimize/method.yaml``."""
        return self._load_yaml_config(Path('optimize/method.yaml'), 'method')

    def _load_parameter_config(self) -> Dict:
        """Load per-method parameter specs from ``optimize/parameter.yaml``."""
        return self._load_yaml_config(Path('optimize/parameter.yaml'), 'parameter')

    def _load_yaml_config(self, config_path: Path, label: str) -> Dict:
        """Read one YAML config file with consistent logging.

        Args:
            config_path: Path to the YAML file.
            label: 'method' or 'parameter', used in log/error messages.

        Raises:
            FileNotFoundError: if the file does not exist.
        """
        if not config_path.exists():
            self.logger.error(f"Error loading optimize {label} config")
            raise FileNotFoundError(f"{label.capitalize()} config file not found at {config_path}")
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        self.logger.info(f"Successfully loaded optimize {label} config")
        return config

    def get_optimize_methods(self) -> Dict:
        """Return a summary list of all configured optimization methods."""
        try:
            methods = [
                {
                    'name': name,
                    'description': details.get('description', ''),
                    'advantages': details.get('advantages', []),
                    'disadvantages': details.get('disadvantages', []),
                    'applicable_scenarios': details.get('applicable_scenarios', []),
                }
                for name, details in self.method_config['optimization_methods'].items()
            ]
            return {
                'status': 'success',
                'methods': methods
            }
        except Exception as e:
            error_msg = f"Error getting optimization methods: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def get_optimize_method_details(self, method_name: str) -> Dict:
        """Return the full description (plus parameter spec) of one method.

        Args:
            method_name: Key into the ``optimization_methods`` section of the
                method catalogue, e.g. ``'GridSearchCV'``.
        """
        try:
            if method_name not in self.method_config['optimization_methods']:
                return {
                    'status': 'error',
                    'message': f"Method {method_name} not found"
                }
            method_details = self.method_config['optimization_methods'][method_name]
            # Parameter specs are optional: methods without an entry in
            # parameter.yaml simply expose an empty list.
            parameters = []
            if method_name in self.parameter_config['optimization_methods']:
                parameters = self.parameter_config['optimization_methods'][method_name].get('parameters', [])
            return {
                'status': 'success',
                'method': {
                    'name': method_name,
                    'description': method_details.get('description', ''),
                    'advantages': method_details.get('advantages', []),
                    'disadvantages': method_details.get('disadvantages', []),
                    'applicable_scenarios': method_details.get('applicable_scenarios', []),
                    'parameters': parameters
                }
            }
        except Exception as e:
            error_msg = f"Error getting optimization method details: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def _get_model_from_run(self, run_id: str):
        """Fetch a logged sklearn model and its metadata from an MLflow run.

        Returns:
            Tuple ``(model, model_type, dataset)`` where *model_type* is the
            run's ``algorithm`` param and *dataset* its ``dataset`` param
            (``None`` if absent).

        Raises:
            ValueError: wrapping any MLflow/loading failure.
        """
        try:
            run = self.client.get_run(run_id)
            model_uri = f"runs:/{run_id}/model"
            model = mlflow.sklearn.load_model(model_uri)
            dataset = run.data.params.get('dataset', None)
            model_type = run.data.params.get('algorithm', 'Unknown')
            return model, model_type, dataset
        except Exception as e:
            error_msg = f"Error loading model from run {run_id}: {str(e)}"
            self.logger.error(error_msg)
            # Chain the cause so the original traceback is not lost.
            raise ValueError(error_msg) from e

    def _load_data(self, dataset: str):
        """Load the train/validation CSV pair stored inside *dataset*.

        Expects ``train_<name>.csv`` and ``val_<name>.csv`` inside the
        dataset directory, each containing a ``target`` column.

        Returns:
            ``(X_train, y_train, X_val, y_val)``

        Raises:
            ValueError: wrapping any I/O or parsing failure.
        """
        try:
            if not os.path.exists(dataset):
                raise FileNotFoundError(f"Data file not found at {dataset}")
            # os.path.basename is more robust than splitting on '/' by hand
            # (handles platform separators).
            base_name = os.path.basename(dataset)
            data_train = pd.read_csv(os.path.join(dataset, f'train_{base_name}.csv'))
            data_val = pd.read_csv(os.path.join(dataset, f'val_{base_name}.csv'))
            X_train = data_train.drop('target', axis=1)
            y_train = data_train['target']
            X_val = data_val.drop('target', axis=1)
            y_val = data_val['target']
            return X_train, y_train, X_val, y_val
        except Exception as e:
            error_msg = f"Error loading data from {dataset}: {str(e)}"
            self.logger.error(error_msg)
            raise ValueError(error_msg) from e

    def _calculate_metrics(self, y_true, y_pred, task_type: str) -> Dict:
        """Compute evaluation metrics.

        Args:
            y_true: Ground-truth targets.
            y_pred: Model predictions (hard class labels for classification).
            task_type: ``'classification'`` or ``'regression'``; anything
                else yields an empty dict.
        """
        metrics: Dict[str, float] = {}
        if task_type == 'classification':
            metrics['accuracy'] = float(accuracy_score(y_true, y_pred))
            # zero_division=0 suppresses the warning when a class is never
            # predicted; the resulting score is 0 in that case either way.
            metrics['precision'] = float(precision_score(y_true, y_pred, average='weighted', zero_division=0))
            metrics['recall'] = float(recall_score(y_true, y_pred, average='weighted', zero_division=0))
            metrics['f1'] = float(f1_score(y_true, y_pred, average='weighted', zero_division=0))
            # ROC AUC only for binary problems.
            if len(np.unique(y_true)) == 2:
                # NOTE(review): computed from hard labels, not probabilities,
                # which degrades AUC — consider predict_proba upstream.
                metrics['roc_auc'] = float(roc_auc_score(y_true, y_pred))
        elif task_type == 'regression':
            metrics['mae'] = float(mean_absolute_error(y_true, y_pred))
            metrics['mse'] = float(mean_squared_error(y_true, y_pred))
            metrics['rmse'] = float(np.sqrt(metrics['mse']))
            metrics['r2'] = float(r2_score(y_true, y_pred))
            metrics['explained_variance'] = float(explained_variance_score(y_true, y_pred))
        return metrics

    def _get_optimizer(self, method: str, model, parameters: Dict, scoring: str, cv: int, n_jobs: int, verbose: int):
        """Build a scikit-learn search object for *method*.

        Args:
            method: ``'GridSearchCV'`` or ``'RandomizedSearchCV'``.
            model: Estimator to tune.
            parameters: Search config; keys ``param_grid`` /
                ``param_distributions``, ``n_iter``, ``random_state``.
            scoring/cv/n_jobs/verbose: Passed through to the search object.

        Raises:
            ValueError: for an unsupported method name.
        """
        if method == 'GridSearchCV':
            return GridSearchCV(
                estimator=model,
                param_grid=parameters.get('param_grid', {}),
                scoring=scoring,
                cv=cv,
                n_jobs=n_jobs,
                verbose=verbose,
                return_train_score=True
            )
        elif method == 'RandomizedSearchCV':
            return RandomizedSearchCV(
                estimator=model,
                param_distributions=parameters.get('param_distributions', {}),
                n_iter=parameters.get('n_iter', 10),
                scoring=scoring,
                cv=cv,
                n_jobs=n_jobs,
                verbose=verbose,
                random_state=parameters.get('random_state', 42),
                return_train_score=True
            )
        else:
            raise ValueError(f"Unsupported optimization method: {method}")

    def run_optimization(self, run_id: str, method: str, parameters: Dict, data_path: str,
                         output_dir: str, experiment_name: str) -> Dict:
        """Run a hyperparameter search against a previously logged model.

        Args:
            run_id: MLflow run id of the model to optimize.
            method: ``'GridSearchCV'`` or ``'RandomizedSearchCV'``.
            parameters: Search configuration (``param_grid`` /
                ``param_distributions``, ``scoring``, ``cv``, ``n_jobs``,
                ``verbose``, ...).
            data_path: Unused — the dataset path is read from the source
                run's params; kept for interface compatibility.
            output_dir: Unused — artifacts go under
                ``optimized_models/<task_id>``; kept for compatibility.
            experiment_name: MLflow experiment to log the optimization run to.

        Returns:
            ``{'status': 'success', 'optimization': {...}}`` on success, an
            error payload with ``details`` on failure.
        """
        try:
            # Unique task id, e.g. opt_20240101_a1b2c3.
            task_id = f"opt_{datetime.now():%Y%m%d}_{uuid.uuid4().hex[:6]}"
            task_dir = self.optimize_dir / task_id
            task_dir.mkdir(exist_ok=True)
            start_time = datetime.now()
            # Load the original model plus the dataset path recorded on its run.
            model, model_type, dataset = self._get_model_from_run(run_id)
            X_train, y_train, X_val, y_val = self._load_data(dataset)
            run = self.client.get_run(run_id)
            task_type = run.data.params.get('task_type', 'classification')
            mlflow.set_experiment(experiment_name)
            with mlflow.start_run(run_name=f"optimize_{method}_{task_id}") as opt_run:
                # Provenance of the optimization run.
                mlflow.log_param('original_run_id', run_id)
                mlflow.log_param('original_model', model_type)
                mlflow.log_param('optimization_method', method)
                mlflow.log_param('data_path', dataset)
                # Log the search configuration; nested dicts as JSON strings.
                for param_name, param_value in parameters.items():
                    if isinstance(param_value, dict):
                        mlflow.log_param(param_name, json.dumps(param_value))
                    else:
                        mlflow.log_param(param_name, param_value)
                optimizer = self._get_optimizer(
                    method=method,
                    model=model,
                    parameters=parameters,
                    scoring=parameters.get('scoring', 'accuracy'),
                    cv=parameters.get('cv', 5),
                    n_jobs=parameters.get('n_jobs', -1),
                    verbose=parameters.get('verbose', 1)
                )
                self.logger.info(f"Starting optimization with {method} for model {model_type}")
                optimizer.fit(X_train, y_train)
                mlflow.log_param('best_params', json.dumps(optimizer.best_params_))
                mlflow.log_metric('best_score', optimizer.best_score_)
                # Persist the full cross-validation table as an artifact.
                cv_results = pd.DataFrame(optimizer.cv_results_)
                cv_results_path = task_dir / 'cv_results.csv'
                cv_results.to_csv(cv_results_path, index=False)
                mlflow.log_artifact(str(cv_results_path))
                cv_results_summary = {
                    'mean_fit_time': float(cv_results['mean_fit_time'].mean()),
                    'mean_score_time': float(cv_results['mean_score_time'].mean()),
                    'param_combinations': len(cv_results),
                    'score_range': [float(cv_results['mean_test_score'].min()),
                                    float(cv_results['mean_test_score'].max())]
                }
                # The search object already refit on the full training set.
                best_model = optimizer.best_estimator_
                mlflow.sklearn.log_model(best_model, "optimized_model")
                end_time = datetime.now()
                execution_time = end_time - start_time
                mlflow.log_param('execution_time', str(execution_time))
                mlflow.log_param('start_time', start_time.isoformat())
                mlflow.log_param('end_time', end_time.isoformat())
                # Logged (was a stray print) so it lands in the task log file.
                self.logger.info(f"Optimization MLflow run id: {opt_run.info.run_id}")
                task_info = {
                    'id': task_id,
                    'run_id': opt_run.info.run_id,
                    'experiment_id': opt_run.info.experiment_id,
                    'task_id': task_id,
                    'method': method,
                    'original_model': model_type,
                    'parameters': parameters,
                    'best_params': optimizer.best_params_,
                    'best_score': float(optimizer.best_score_),
                    'cv_results_summary': cv_results_summary,
                    'cv_results_file': str(cv_results_path),
                    # NOTE(review): despite the name, this stores the ORIGINAL
                    # run id, not the optimized run's — kept for compatibility.
                    'optimized_model_id': run_id,
                    'execution_time': str(execution_time),
                    'experiment_name': experiment_name,
                    'status': 'completed',
                    'start_time': start_time.isoformat(),
                    'end_time': end_time.isoformat()
                }
                # UTF-8 + ensure_ascii=False so Chinese payloads stay readable.
                with open(task_dir / 'task_info.json', 'w', encoding='utf-8') as f:
                    json.dump(task_info, f, indent=2, ensure_ascii=False)
                self.logger.info(f"Optimization completed. Task ID: {task_id}, Run ID: {opt_run.info.run_id}")
                return {
                    'status': 'success',
                    'optimization': {
                        'id': task_id,
                        'run_id': opt_run.info.run_id,
                        'experiment_id': opt_run.info.experiment_id,
                        'task_id': task_id,
                        'method': method,
                        'original_model': model_type,
                        'best_params': optimizer.best_params_,
                        'best_score': float(optimizer.best_score_),
                        'cv_results_file': str(cv_results_path),
                        'optimized_model_id': run_id,
                        'execution_time': str(execution_time),
                        'status': 'completed',
                        'timestamp': end_time.isoformat()
                    }
                }
        except Exception as e:
            error_msg = f"Error running optimization: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg,
                'details': {
                    'error_type': type(e).__name__,
                    'error_message': str(e)
                }
            }

    def get_optimization_tasks(self, experiment_name: Optional[str] = None,
                               page: int = 1, page_size: int = 10,
                               status: Optional[str] = None) -> Dict:
        """List stored optimization tasks, newest first, with pagination.

        Args:
            experiment_name: If set, keep only tasks from this experiment.
            page: 1-based page number.
            page_size: Items per page.
            status: If set, keep only tasks with this status (e.g.
                ``'completed'``, ``'cancelled'``). The parameter existed
                before but was silently ignored; a falsy value (the old
                default was ``0``) still disables the filter.
        """
        try:
            tasks = []
            # Task directories sort reverse-lexicographically, which for the
            # opt_YYYYMMDD_* naming puts the newest first.
            for task_dir in sorted(self.optimize_dir.glob('*'), reverse=True):
                if not task_dir.is_dir():
                    continue
                task_info_path = task_dir / 'task_info.json'
                if not task_info_path.exists():
                    continue
                with open(task_info_path, 'r', encoding='utf-8') as f:
                    task_info = json.load(f)
                if experiment_name and task_info.get('experiment_name', None) != experiment_name:
                    continue
                if status and task_info.get('status') != status:
                    continue
                # Summary view only; full details via get_optimization_task().
                tasks.append({
                    'id': task_info['id'],
                    'run_id': task_info['run_id'],
                    'task_id': task_info['task_id'],
                    'method': task_info['method'],
                    'original_model': task_info['original_model'],
                    'best_score': task_info['best_score'],
                    'status': task_info['status'],
                    'start_time': task_info['start_time'],
                    'end_time': task_info['end_time'],
                    'experiment_name': task_info.get('experiment_name', None),
                    'optimized_model_id': task_info['optimized_model_id']
                })
            total_count = len(tasks)
            start_idx = (page - 1) * page_size
            paginated_tasks = tasks[start_idx:start_idx + page_size]
            return {
                'status': 'success',
                'tasks': paginated_tasks,
                'total_count': total_count,
                'page': page,
                'page_size': page_size
            }
        except Exception as e:
            error_msg = f"Error getting optimization tasks: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def get_optimization_task(self, task_id: str) -> Dict:
        """Return the full stored metadata for one optimization task."""
        try:
            task_dir = self.optimize_dir / task_id
            task_info_path = task_dir / 'task_info.json'
            if not task_info_path.exists():
                return {
                    'status': 'error',
                    'message': f"Task {task_id} not found"
                }
            with open(task_info_path, 'r', encoding='utf-8') as f:
                task_info = json.load(f)
            return {
                'status': 'success',
                'task': task_info
            }
        except Exception as e:
            error_msg = f"Error getting optimization task {task_id}: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def cancel_optimization_task(self, task_id: str) -> Dict:
        """Mark a task as cancelled and try to terminate its MLflow run.

        Note: ``task_info.json`` is currently only written once a task
        completes (see run_optimization), so this path is mostly defensive
        against a future asynchronous execution mode.
        """
        try:
            task_dir = self.optimize_dir / task_id
            task_info_path = task_dir / 'task_info.json'
            if not task_info_path.exists():
                return {
                    'status': 'error',
                    'message': "任务不存在",
                    'details': {
                        'reason': "任务不存在"
                    }
                }
            with open(task_info_path, 'r', encoding='utf-8') as f:
                task_info = json.load(f)
            if task_info['status'] == 'completed':
                return {
                    'status': 'error',
                    'message': "无法取消任务",
                    'details': {
                        'reason': "任务已完成"
                    }
                }
            # Persist the cancellation before touching MLflow.
            task_info['status'] = 'cancelled'
            task_info['cancel_time'] = datetime.now().isoformat()
            with open(task_info_path, 'w', encoding='utf-8') as f:
                json.dump(task_info, f, indent=2, ensure_ascii=False)
            # Terminate the optimization run itself. (The original code passed
            # 'optimized_model_id', which actually holds the SOURCE run id.)
            if 'run_id' in task_info:
                try:
                    self.client.set_terminated(task_info['run_id'], status='KILLED')
                except Exception as e:
                    # Best effort: the run may already be finished or deleted.
                    self.logger.warning(f"Could not terminate MLflow run for task {task_id}: {e}")
            return {
                'status': 'success',
                'message': "优化任务已取消",
                'task_id': task_id,
                'cancel_time': task_info['cancel_time']
            }
        except Exception as e:
            error_msg = f"Error cancelling optimization task {task_id}: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def delete_optimization_task(self, task_id: str) -> Dict:
        """Delete a task's MLflow run (best effort) and its local directory."""
        try:
            task_dir = self.optimize_dir / task_id
            task_info_path = task_dir / 'task_info.json'
            if not task_info_path.exists():
                return {
                    'status': 'error',
                    'message': "删除任务失败",
                    'details': {
                        'reason': "任务不存在"
                    }
                }
            with open(task_info_path, 'r', encoding='utf-8') as f:
                task_info = json.load(f)
            if task_info['status'] == 'running':
                return {
                    'status': 'error',
                    'message': "删除任务失败",
                    'details': {
                        'reason': "任务正在运行中"
                    }
                }
            # Best-effort MLflow cleanup: a run that is already gone must not
            # block removing the local files (previously this aborted deletes).
            try:
                self.client.delete_run(task_info['run_id'])
            except Exception as e:
                self.logger.warning(f"Could not delete MLflow run {task_info.get('run_id')}: {e}")
            # Record what is about to be removed for the response payload.
            deleted_files = [str(file_path) for file_path in task_dir.glob('*')]
            shutil.rmtree(task_dir)
            return {
                'status': 'success',
                'message': "优化任务已删除",
                'details': {
                    'task_id': task_id,
                    'deleted_files': deleted_files
                }
            }
        except Exception as e:
            error_msg = f"Error deleting optimization task {task_id}: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }