# MLPlatform/function/data_manager.py
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
import logging
from pathlib import Path
import datetime
from sklearn.preprocessing import (
    StandardScaler, MinMaxScaler, RobustScaler, Normalizer, Binarizer,
    LabelEncoder, KBinsDiscretizer, FunctionTransformer, PowerTransformer,
    QuantileTransformer, PolynomialFeatures, OneHotEncoder
)
from sklearn.feature_extraction import FeatureHasher, DictVectorizer
from sklearn.feature_selection import SelectKBest, RFE
from sklearn.decomposition import PCA
from sklearn.experimental import enable_iterative_imputer # noqa
from sklearn.impute import SimpleImputer, IterativeImputer, KNNImputer, MissingIndicator
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from sklearn.model_selection import train_test_split
import json
import yaml
import os
import shutil
from fastapi import UploadFile
class DataManager:
"""数据处理类"""
def __init__(self, config: Dict = None):
"""初始化数据处理器"""
self.config = config or {}
self.logger = logging.getLogger(__name__)
self._setup_logging()
self.method_config = self._load_method_config()
self.parameter_config = self._load_parameter_config()
self.dataset_processed_path = 'dataset/dataset_processed'
        # Data preprocessing methods
self.preprocessing_methods = {
            # Missing-value handling
'SimpleImputer': SimpleImputer,
'IterativeImputer': IterativeImputer,
'KNNImputer': KNNImputer,
'MissingIndicator': MissingIndicator,
            # Outlier handling
'IsolationForest': IsolationForest,
'OneClassSVM': OneClassSVM,
'LocalOutlierFactor': LocalOutlierFactor,
'EllipticEnvelope': EllipticEnvelope,
            # Data scaling
'StandardScaler': StandardScaler,
'MinMaxScaler': MinMaxScaler,
'RobustScaler': RobustScaler,
'Normalizer': Normalizer,
'Binarizer': Binarizer
}
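        # This registry (like feature_engineering_methods below) maps the
        # method_name strings received from the API to sklearn classes;
        # instances are created as cls(**params) when a method is applied.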
        # Feature engineering methods
self.feature_engineering_methods = {
            # Feature encoding
'LabelEncoder': LabelEncoder,
'OneHotEncoder': OneHotEncoder,
            # Feature discretization
'KBinsDiscretizer': KBinsDiscretizer,
            # Feature transformation
'FunctionTransformer': FunctionTransformer,
'PowerTransformer': PowerTransformer,
'QuantileTransformer': QuantileTransformer,
'PolynomialFeatures': PolynomialFeatures,
            # Feature extraction
'FeatureHasher': FeatureHasher,
'DictVectorizer': DictVectorizer,
            # Feature selection and dimensionality reduction
'PCA': PCA,
'SelectKBest': SelectKBest,
'RFE': RFE
}
self.raw_dataset_dir = "dataset/dataset_raw"
self.processed_dataset_dir = "dataset/dataset_processed"
        # Ensure the dataset directories exist
os.makedirs(self.raw_dataset_dir, exist_ok=True)
os.makedirs(self.processed_dataset_dir, exist_ok=True)
def _load_method_config(self) -> Dict:
"""加载方法配置文件"""
try:
config_path = Path('date_preprocessing/method.yaml')
if not config_path.exists():
raise FileNotFoundError(f"Method config file not found at {config_path}")
with open(config_path, 'r', encoding='utf-8') as f:
config_process = yaml.safe_load(f)
self.logger.info("Successfully loaded method config")
config_path = Path('date_feature/method.yaml')
if not config_path.exists():
raise FileNotFoundError(f"Method config file not found at {config_path}")
with open(config_path, 'r', encoding='utf-8') as f:
config_feature = yaml.safe_load(f)
self.logger.info("Successfully loaded feature config")
config = {**config_process, **config_feature}
return config
except Exception as e:
self.logger.error(f"Error loading method/feature config: {str(e)}")
raise
def _load_parameter_config(self) -> Dict:
"""加载参数配置文件"""
try:
config_path = Path('date_preprocessing/parameter.yaml')
if not config_path.exists():
raise FileNotFoundError(f"Parameter config file not found at {config_path}")
with open(config_path, 'r', encoding='utf-8') as f:
config_process = yaml.safe_load(f)
self.logger.info("Successfully loaded process parameter config")
config_path = Path('date_feature/parameter.yaml')
if not config_path.exists():
raise FileNotFoundError(f"Parameter config file not found at {config_path}")
with open(config_path, 'r', encoding='utf-8') as f:
config_feature = yaml.safe_load(f)
self.logger.info("Successfully loaded feature parameter config")
config = {**config_process, **config_feature}
return config
except Exception as e:
self.logger.error(f"Error loading parameter config: {str(e)}")
raise
def _setup_logging(self):
"""设置日志"""
log_dir = Path('.log')
log_dir.mkdir(exist_ok=True)
file_handler = logging.FileHandler(
log_dir / f'data_processing_{datetime.datetime.now():%Y%m%d_%H%M%S}.log'
)
file_handler.setFormatter(
logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
self.logger.addHandler(file_handler)
self.logger.setLevel(logging.INFO)
def process_dataset(
self,
input_path: str,
output_dir: str,
process_methods: List[Dict],
feature_methods: List[Dict],
split_params: Dict
) -> Dict:
"""
处理数据集
Args:
input_path: 输入数据路径
output_dir: 输出目录
cleaning_methods: 数据清洗方法列表每个方法是一个字典包含method_name和params
feature_methods: 特征工程方法列表每个方法是一个字典包含method_name和params
split_params: 数据集划分参数包含test_size和val_size
Returns:
处理结果字典
"""
try:
print("process_methods", process_methods)
# 数据集名
file_name = input_path.split("/")[-1].split(".")[0]
# 生成时间戳
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
# 创建输出目录
output_path = Path(output_dir+'/'+file_name+'_'+timestamp)
output_path.mkdir(parents=True, exist_ok=True)
            # Record of the processing pipeline
process_record = {
'input_file': input_path,
'timestamp': datetime.datetime.now().isoformat(),
'process_methods': process_methods,
'feature_methods': feature_methods,
'split_params': split_params,
'steps': []
}
            # Load the data
self.logger.info(f"Loading data from {input_path}")
df = pd.read_csv(input_path)
process_record['steps'].append({
'step': 'load_data',
'shape': df.shape
})
            # Data preprocessing
            for method in process_methods:
                self.logger.debug(f"Applying preprocessing method: {method}")
                df = self._apply_process_methods(df, method)
process_record['steps'].append({
'step': 'cleaning',
'method': method['method_name'],
'params': method['params'],
'shape': df.shape
})
            # Feature engineering
for method in feature_methods:
df = self._apply_feature_method(df, method)
process_record['steps'].append({
'step': 'feature_engineering',
'method': method['method_name'],
'params': method['params'],
'shape': df.shape
})
            # Split the dataset
train_data, val_data, test_data = self._split_dataset(
df,
test_size=split_params.get('test_size', 0),
val_size=split_params.get('val_size', 0)
)
            # Save the processed splits
train_path = output_path / f'train_{file_name}_{timestamp}.csv'
val_path = output_path / f'val_{file_name}_{timestamp}.csv'
test_path = output_path / f'test_{file_name}_{timestamp}.csv'
if train_data is not None:
train_data.to_csv(train_path, index=False)
if val_data is not None:
val_data.to_csv(val_path, index=False)
if test_data is not None:
test_data.to_csv(test_path, index=False)
            # Record output file paths
process_record['output_files'] = {
'train': str(train_path) if train_data is not None else "",
'validation': str(val_path) if val_data is not None else "",
'test': str(test_path) if test_data is not None else ""
}
            # Save the processing record
            record_path = output_path / f'process_record_{file_name}_{timestamp}.json'
with open(record_path, 'w', encoding='utf-8') as f:
json.dump(process_record, f, indent=2, ensure_ascii=False)
self.logger.info(f"Data processing completed. Results saved to {output_path}")
return {
'status': 'success',
'message': 'Data processing completed successfully',
'process_record': process_record
}
except Exception as e:
error_msg = f"Error processing dataset: {str(e)} process_methods: {method}"
self.logger.error(error_msg)
return {
'status': 'error',
'message': error_msg
}
def _apply_process_methods(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
"""应用数据预处理方法"""
try:
method_name = method['method_name']
params = method['params']
print("s数据预处理", method_name, params)
if method_name not in self.preprocessing_methods:
raise ValueError(f"Unknown preprocessing method: {method_name}")
processor = self.preprocessing_methods[method_name](**params)
            # Separate features and label (a 'target' column is assumed)
features = df.drop('target', axis=1)
target = df['target']
            # Dispatch on the kind of method
            if method_name in ['IsolationForest', 'OneClassSVM', 'LocalOutlierFactor', 'EllipticEnvelope']:
                # Outlier detection: fit_predict marks outliers with -1, so keep only the inlier rows
mask = processor.fit_predict(features) != -1
features = features[mask]
target = target[mask]
            else:
                # Imputation / scaling: transform the features in place
features = pd.DataFrame(
processor.fit_transform(features),
columns=features.columns,
index=features.index
)
            # Recombine features and label
df = pd.concat([features, target], axis=1)
self.logger.info(f"Applied preprocessing method {method_name}")
print("数据处理完毕")
return df
        except Exception as e:
            self.logger.error(f"Error applying preprocessing method {method.get('method_name', 'unknown')}: {str(e)}")
raise
def _apply_feature_method(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
"""应用特征工程方法"""
try:
method_name = method['method_name']
params = method.get('params', {})
            columns = method.get('columns', df.drop('target', axis=1).columns)  # default: all non-target columns
if method_name not in self.feature_engineering_methods:
raise ValueError(f"Unknown feature engineering method: {method_name}")
processor = self.feature_engineering_methods[method_name](**params)
            # Separate features and label
features = df.drop('target', axis=1)
target = df['target']
            # Dispatch on the kind of feature engineering method
            if method_name in ['LabelEncoder', 'OneHotEncoder']:
                # Encoding methods
                df_temp = features[columns].copy()
                if method_name == 'LabelEncoder':
                    # LabelEncoder handles one column at a time
                    for col in columns:
                        df_temp[col] = processor.fit_transform(df_temp[col])
                else:  # OneHotEncoder
                    encoded = processor.fit_transform(df_temp)
                    if hasattr(encoded, 'toarray'):
                        # densify the default sparse output
                        encoded = encoded.toarray()
                    df_temp = pd.DataFrame(
                        encoded,
                        columns=processor.get_feature_names_out(columns),
                        index=features.index
                    )
                # Replace the original columns with their encoded versions
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)
            elif method_name in ['KBinsDiscretizer']:
                # Discretization
                transformed = processor.fit_transform(features[columns])
                if hasattr(transformed, 'toarray'):
                    # encode='onehot' yields a sparse matrix
                    transformed = transformed.toarray()
                if processor.encode.startswith('onehot'):
                    # n_bins_ is an array with one entry per feature
                    feature_names = [
                        f"{col}_{i}"
                        for col, n in zip(columns, processor.n_bins_)
                        for i in range(n)
                    ]
                else:
                    feature_names = [f"{col}_binned" for col in columns]
                df_temp = pd.DataFrame(
                    transformed,
                    columns=feature_names,
                    index=features.index
                )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)
            elif method_name in ['PCA', 'SelectKBest', 'RFE']:
                # Dimensionality reduction / feature selection; SelectKBest
                # and RFE are supervised and need the label
                if method_name == 'PCA':
                    transformed = processor.fit_transform(features[columns])
                else:
                    transformed = processor.fit_transform(features[columns], target)
                n_features = transformed.shape[1]
df_temp = pd.DataFrame(
transformed,
columns=[f"feature_{i}" for i in range(n_features)],
index=features.index
)
features = features.drop(columns=columns)
features = pd.concat([features, df_temp], axis=1)
            else:
                # Other transformers; the output width can differ from the
                # input (e.g. PolynomialFeatures), so size the names to match
                transformed = processor.fit_transform(features[columns])
                if transformed.shape[1] == len(columns):
                    new_columns = [f"{col}_transformed" for col in columns]
                else:
                    new_columns = [f"{method_name}_feature_{i}" for i in range(transformed.shape[1])]
                df_temp = pd.DataFrame(
                    transformed,
                    columns=new_columns,
                    index=features.index
                )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)
            # Recombine features and label
df = pd.concat([features, target], axis=1)
self.logger.info(f"Applied feature engineering method {method_name}")
return df
        except Exception as e:
            self.logger.error(f"Error applying feature engineering method {method.get('method_name', 'unknown')}: {str(e)}")
raise
def _split_dataset(
self,
df: pd.DataFrame,
test_size: float = 0,
val_size: float = 0
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""划分数据集"""
try:
            # First carve off the test set
if test_size > 0:
train_val_data, test_data = train_test_split(
df,
test_size=test_size,
random_state=42
)
else:
train_val_data = df
test_data = None
            if val_size > 0:
                # val_size is given as a fraction of the full dataset, so
                # rescale it relative to the remaining train+val portion
                val_size_adjusted = val_size / (1 - test_size)
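                # Worked example: with test_size=0.2 and val_size=0.1, the
                # remaining portion is 0.8 and 0.1 / 0.8 = 0.125, i.e. 12.5%
                # of train_val_data, which is 10% of the full dataset.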
train_data, val_data = train_test_split(
train_val_data,
test_size=val_size_adjusted,
random_state=42
)
else:
train_data = train_val_data
val_data = None
self.logger.info(
f"Dataset split - Train: {len(train_data) if train_data is not None else 0}, "
f"Val: {len(val_data) if val_data is not None else 0} , Test: {len(test_data) if test_data is not None else 0}"
)
return train_data, val_data, test_data
except Exception as e:
self.logger.error(f"Error splitting dataset: {str(e)}")
raise
def get_preprocessing_methods(self) -> Dict:
"""获取预处理方法列表"""
try:
methods = []
            # Data scaling methods
scaler_methods = list(self.method_config.get('data_scaler_methods', {}).keys())
if scaler_methods:
methods.append({
"name": "data_scaler",
"description": "数据缩放处理",
"method": scaler_methods
})
            # Missing-value handling methods
missing_methods = list(self.method_config.get('missing_value_handling_methods', {}).keys())
if missing_methods:
methods.append({
"name": "missing_value_handler",
"description": "缺失值处理",
"method": missing_methods
})
            # Outlier detection methods
outlier_methods = list(self.method_config.get('outlier_detection_methods', {}).keys())
if outlier_methods:
methods.append({
"name": "outlier_detector",
"description": "异常值检测",
"method": outlier_methods
})
self.logger.info("获取预处理方法列表")
return {
"status": "success",
"methods": methods
}
except Exception as e:
self.logger.error(f"Error getting preprocessing methods: {str(e)}")
return {
"status": "error",
"error": str(e)
}
def get_preprocessing_method_details(self, method_name: str) -> Dict:
"""获取指定方法的详细信息"""
try:
            # Look up the method's principle and pros/cons across the method categories
method_info = None
for category in ['data_scaler_methods', 'missing_value_handling_methods', 'outlier_detection_methods']:
if method_name in self.method_config.get(category, {}):
method_info = self.method_config[category][method_name]
break
if method_info is None:
raise ValueError(f"Method {method_name} not found in method config")
            # Look up the method's parameter information
parameter_info = None
for category in ['data_scaler_methods', 'missing_value_handling_methods', 'outlier_detection_methods']:
if method_name in self.parameter_config.get(category, {}):
parameter_info = self.parameter_config[category][method_name]
break
if parameter_info is None:
raise ValueError(f"Method {method_name} not found in parameter config")
self.logger.info(f"获取{method_name}方法详情")
# 组合返回信息
return {
"status": "success",
"method": {
"name": method_name,
"description": parameter_info.get('description', ''),
"principle": method_info.get('principle', ''),
"advantages": method_info.get('advantages', []),
"disadvantages": method_info.get('disadvantages', []),
"applicable_scenarios": method_info.get('applicable_scenarios', []),
"parameters": parameter_info.get('parameters', [])
}
}
except Exception as e:
self.logger.error(f"Error getting method details: {str(e)}")
return {
"status": "error",
"error": str(e)
}
def get_feature_engineering_methods(self) -> Dict:
"""获取特征工程方法列表"""
try:
methods = []
            # Feature engineering methods
feature_engineering_methods = list(self.method_config.get('feature_engineering_methods', {}).keys())
if feature_engineering_methods:
methods.append({
"name": "feature_engineering_methods",
"description": "特征工程方法",
"method": feature_engineering_methods
})
self.logger.info("获取特征工程方法列表")
return {
"status": "success",
"methods": methods
}
except Exception as e:
self.logger.error(f"Error getting preprocessing methods: {str(e)}")
return {
"status": "error",
"error": str(e)
}
def get_feature_engineering_method_details(self, method_name: str) -> Dict:
"""获取指定方法的详细信息"""
try:
            # Look up the method's principle and pros/cons
method_info = None
for category in ['feature_engineering_methods']:
if method_name in self.method_config.get(category, {}):
method_info = self.method_config[category][method_name]
break
if method_info is None:
raise ValueError(f"Method {method_name} not found in method config")
            # Look up the method's parameter information
parameter_info = None
for category in ['feature_engineering_methods']:
if method_name in self.parameter_config.get(category, {}):
parameter_info = self.parameter_config[category][method_name]
break
if parameter_info is None:
raise ValueError(f"Method {method_name} not found in parameter config")
            self.logger.info(f"Fetched details for method {method_name}")
            # Combine the two sources of information
return {
"status": "success",
"method": {
"name": method_name,
"description": parameter_info.get('description', ''),
"principle": method_info.get('principle', ''),
"advantages": method_info.get('advantages', []),
"disadvantages": method_info.get('disadvantages', []),
"applicable_scenarios": method_info.get('applicable_scenarios', []),
"parameters": parameter_info.get('parameters', [])
}
}
except Exception as e:
self.logger.error(f"Error getting method details: {str(e)}")
return {
"status": "error",
"error": str(e)
}
    def _clean_json_line(self, line):
        """Replace non-standard JSON float literals so the record can be parsed.

        Note: this is a plain substring replacement, so it would also rewrite
        'NaN'/'Infinity' occurring inside string values.
        """
        line = line.replace('NaN', 'null')
        # Replace '-Infinity' before 'Infinity' so the sign is handled explicitly
        line = line.replace('-Infinity', '-1e308')
        line = line.replace('Infinity', '1e308')  # an arbitrarily large stand-in value
        return line
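    # For example, the record line '{"loss": NaN, "bound": -Infinity}'
    # becomes '{"loss": null, "bound": -1e308}', which json.loads accepts.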
    def get_dataset(self):
        """Collect the process records of all processed datasets."""
        back = list()
        dataset_files_path = os.listdir(self.dataset_processed_path)
        for dataset_file in dataset_files_path:
            path = os.path.join(self.dataset_processed_path, dataset_file)
            folder_path = Path(path)
            # Gather every .json process record in the folder
            json_files = list(folder_path.glob('*.json'))
            for json_file in json_files:
                # Strict JSON cannot represent NaN/Infinity, so clean each
                # line before parsing
                with open(json_file.as_posix(), 'r', encoding='utf-8') as f:
                    cleaned_lines = [self._clean_json_line(line) for line in f]
                    json_data = json.loads(''.join(cleaned_lines))
                back.append(json_data)
        self.logger.info("Fetched processed dataset records")
        return back
def read_csv(
self,
data_path: str,
head: int = 5,
tail: int = 5,
info: bool = True,
describe: bool = True
) -> Dict:
"""
读取并展示CSV文件内容
Args:
data_path: CSV文件路径
head: 显示前几行
tail: 显示后几行
info: 是否显示数据集信息
describe: 是否显示数据统计信息
Returns:
数据集信息字典
"""
try:
self.logger.info(f"Reading CSV file: {data_path}")
            # Read the CSV file
df = pd.read_csv(data_path)
result = {
"status": "success",
"data": {}
}
            # First rows
if head > 0:
result["data"]["head"] = df.head(head).to_dict('records')
            # Last rows
if tail > 0:
result["data"]["tail"] = df.tail(tail).to_dict('records')
            # Dataset info
            if info:
                # Missing-value count per column
                missing_values = df.isnull().sum().to_dict()
                # Data type per column
                column_types = df.dtypes.astype(str).to_dict()
                # Human-readable memory usage
memory_usage = df.memory_usage(deep=True).sum()
if memory_usage < 1024:
memory_str = f"{memory_usage} B"
elif memory_usage < 1024 * 1024:
memory_str = f"{memory_usage/1024:.1f} KB"
else:
memory_str = f"{memory_usage/(1024*1024):.1f} MB"
result["data"]["info"] = {
"rows": len(df),
"columns": len(df.columns),
"column_types": column_types,
"memory_usage": memory_str,
"missing_values": missing_values
}
            # Summary statistics
            if describe:
                # Describe the numeric columns
                numeric_describe = df.describe().to_dict()
                # Describe the categorical (object) columns
categorical_columns = df.select_dtypes(include=['object']).columns
categorical_describe = {}
                for col in categorical_columns:
                    categorical_describe[col] = {
                        # cast numpy integers to plain int for JSON serialization
                        "count": int(df[col].count()),
                        "unique": int(df[col].nunique()),
                        "top": df[col].mode()[0] if not df[col].mode().empty else None,
                        "freq": int(df[col].value_counts().iloc[0]) if not df[col].value_counts().empty else 0
                    }
result["data"]["describe"] = {
**numeric_describe,
**categorical_describe
}
self.logger.info(f"Successfully read CSV file: {data_path}")
return result
except Exception as e:
error_msg = f"Error reading CSV file: {str(e)}"
self.logger.error(error_msg)
return {
"status": "error",
"message": "读取CSV文件失败",
"details": {
"error_type": type(e).__name__,
"error_message": str(e)
}
}
async def save_dataset(self, file: UploadFile) -> dict:
"""保存上传的数据集"""
try:
            # Prefix the filename with a timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{file.filename}"
file_path = os.path.join(self.raw_dataset_dir, filename)
            # Write the upload to disk
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return {
"status": "success",
"message": "文件上传成功",
"filename": filename
}
except Exception as e:
return {
"status": "error",
"message": f"文件上传失败: {str(e)}"
}
def get_raw_datasets(self) -> list:
"""获取待处理数据集列表"""
try:
datasets = []
for filename in os.listdir(self.raw_dataset_dir):
if filename.endswith('.csv'):
file_path = os.path.join(self.raw_dataset_dir, filename)
stat = os.stat(file_path)
datasets.append({
"name": filename,
"path": file_path,
"size": stat.st_size,
"created_at": datetime.datetime.fromtimestamp(stat.st_ctime).isoformat()
})
return datasets
except Exception as e:
raise Exception(f"获取待处理数据集列表失败: {str(e)}")