642 lines
25 KiB
Python
642 lines
25 KiB
Python
import pandas as pd
|
||
import numpy as np
|
||
from typing import Dict, List, Tuple, Optional
|
||
import logging
|
||
from pathlib import Path
|
||
import datetime
|
||
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, Normalizer, Binarizer, LabelEncoder, KBinsDiscretizer, FunctionTransformer, PowerTransformer, QuantileTransformer, PolynomialFeatures, OneHotEncoder
|
||
from sklearn.feature_extraction import FeatureHasher, DictVectorizer
|
||
from sklearn.feature_selection import SelectKBest, RFE
|
||
from sklearn.decomposition import PCA
|
||
from sklearn.experimental import enable_iterative_imputer # noqa
|
||
from sklearn.impute import SimpleImputer, IterativeImputer, KNNImputer, MissingIndicator
|
||
from sklearn.ensemble import IsolationForest
|
||
from sklearn.svm import OneClassSVM
|
||
from sklearn.neighbors import LocalOutlierFactor
|
||
from sklearn.covariance import EllipticEnvelope
|
||
from sklearn.model_selection import train_test_split
|
||
import json
|
||
import yaml
|
||
import os
|
||
|
||
|
||
class DataManager:
    """Data-processing manager: preprocessing, feature engineering, splitting."""

    def __init__(self, config: Optional[Dict] = None):
        """Initialize the data manager.

        Args:
            config: Optional configuration dict; stored as-is (empty dict if None).
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        self._setup_logging()

        # Merged method descriptions loaded from the two method.yaml files.
        self.method_config = self._load_method_config()
        # Merged parameter descriptions loaded from the two parameter.yaml files.
        self.parameter_config = self._load_parameter_config()

        # Root directory where processed datasets (and their records) are written.
        self.dataset_processed_path = 'dataset/dataset_processed'

        # Data-preprocessing estimators, keyed by the names used in the configs.
        self.preprocessing_methods = {
            # Missing-value handling
            'SimpleImputer': SimpleImputer,
            'IterativeImputer': IterativeImputer,
            'KNNImputer': KNNImputer,
            'MissingIndicator': MissingIndicator,
            # Outlier detection
            'IsolationForest': IsolationForest,
            'OneClassSVM': OneClassSVM,
            'LocalOutlierFactor': LocalOutlierFactor,
            'EllipticEnvelope': EllipticEnvelope,
            # Scaling
            'StandardScaler': StandardScaler,
            'MinMaxScaler': MinMaxScaler,
            'RobustScaler': RobustScaler,
            'Normalizer': Normalizer,
            'Binarizer': Binarizer
        }

        # Feature-engineering transformers, keyed by the names used in the configs.
        self.feature_engineering_methods = {
            # Encoding
            'LabelEncoder': LabelEncoder,
            'OneHotEncoder': OneHotEncoder,
            # Discretization
            'KBinsDiscretizer': KBinsDiscretizer,
            # Transformation
            'FunctionTransformer': FunctionTransformer,
            'PowerTransformer': PowerTransformer,
            'QuantileTransformer': QuantileTransformer,
            'PolynomialFeatures': PolynomialFeatures,
            # Extraction
            'FeatureHasher': FeatureHasher,
            'DictVectorizer': DictVectorizer,
            # Feature selection / dimensionality reduction
            'PCA': PCA,
            'SelectKBest': SelectKBest,
            'RFE': RFE
        }
||
|
||
def _load_method_config(self) -> Dict:
|
||
"""加载方法配置文件"""
|
||
try:
|
||
config_path = Path('date_preprocessing/method.yaml')
|
||
if not config_path.exists():
|
||
raise FileNotFoundError(f"Method config file not found at {config_path}")
|
||
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config_process = yaml.safe_load(f)
|
||
|
||
self.logger.info("Successfully loaded method config")
|
||
|
||
config_path = Path('date_feature/method.yaml')
|
||
if not config_path.exists():
|
||
raise FileNotFoundError(f"Method config file not found at {config_path}")
|
||
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config_feature = yaml.safe_load(f)
|
||
|
||
self.logger.info("Successfully loaded feature config")
|
||
|
||
config = {**config_process, **config_feature}
|
||
|
||
return config
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error loading method/feature config: {str(e)}")
|
||
raise
|
||
|
||
def _load_parameter_config(self) -> Dict:
|
||
"""加载参数配置文件"""
|
||
try:
|
||
config_path = Path('date_preprocessing/parameter.yaml')
|
||
if not config_path.exists():
|
||
raise FileNotFoundError(f"Parameter config file not found at {config_path}")
|
||
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config_process = yaml.safe_load(f)
|
||
|
||
self.logger.info("Successfully loaded process parameter config")
|
||
|
||
config_path = Path('date_feature/parameter.yaml')
|
||
if not config_path.exists():
|
||
raise FileNotFoundError(f"Parameter config file not found at {config_path}")
|
||
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config_feature = yaml.safe_load(f)
|
||
|
||
self.logger.info("Successfully loaded feature parameter config")
|
||
|
||
config = {**config_process, **config_feature}
|
||
|
||
|
||
return config
|
||
except Exception as e:
|
||
self.logger.error(f"Error loading parameter config: {str(e)}")
|
||
raise
|
||
|
||
|
||
|
||
def _setup_logging(self):
|
||
"""设置日志"""
|
||
log_dir = Path('.log')
|
||
log_dir.mkdir(exist_ok=True)
|
||
|
||
file_handler = logging.FileHandler(
|
||
log_dir / f'data_processing_{datetime.datetime.now():%Y%m%d_%H%M%S}.log'
|
||
)
|
||
file_handler.setFormatter(
|
||
logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||
)
|
||
self.logger.addHandler(file_handler)
|
||
self.logger.setLevel(logging.INFO)
|
||
|
||
def process_dataset(
|
||
self,
|
||
input_path: str,
|
||
output_dir: str,
|
||
process_methods: List[Dict],
|
||
feature_methods: List[Dict],
|
||
split_params: Dict
|
||
) -> Dict:
|
||
"""
|
||
处理数据集
|
||
|
||
Args:
|
||
input_path: 输入数据路径
|
||
output_dir: 输出目录
|
||
cleaning_methods: 数据清洗方法列表,每个方法是一个字典,包含method_name和params
|
||
feature_methods: 特征工程方法列表,每个方法是一个字典,包含method_name和params
|
||
split_params: 数据集划分参数,包含test_size和val_size
|
||
|
||
Returns:
|
||
处理结果字典
|
||
"""
|
||
try:
|
||
|
||
# 数据集名
|
||
file_name = input_path.split("/")[-1].split(".")[0]
|
||
|
||
# 生成时间戳
|
||
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
|
||
|
||
# 创建输出目录
|
||
output_path = Path(output_dir+'/'+file_name+'_'+timestamp)
|
||
output_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 记录处理过程
|
||
process_record = {
|
||
'input_file': input_path,
|
||
'timestamp': datetime.datetime.now().isoformat(),
|
||
'process_methods': process_methods,
|
||
'feature_methods': feature_methods,
|
||
'split_params': split_params,
|
||
'steps': []
|
||
}
|
||
|
||
# 读取数据
|
||
self.logger.info(f"Loading data from {input_path}")
|
||
df = pd.read_csv(input_path)
|
||
process_record['steps'].append({
|
||
'step': 'load_data',
|
||
'shape': df.shape
|
||
})
|
||
|
||
|
||
|
||
# 数据预处理
|
||
for method in process_methods:
|
||
df = self._apply_process_methods(df, method)
|
||
process_record['steps'].append({
|
||
'step': 'cleaning',
|
||
'method': method['method_name'],
|
||
'params': method['params'],
|
||
'shape': df.shape
|
||
})
|
||
|
||
# 特征工程
|
||
for method in feature_methods:
|
||
df = self._apply_feature_method(df, method)
|
||
process_record['steps'].append({
|
||
'step': 'feature_engineering',
|
||
'method': method['method_name'],
|
||
'params': method['params'],
|
||
'shape': df.shape
|
||
})
|
||
|
||
# 数据集划分
|
||
train_data, val_data, test_data = self._split_dataset(
|
||
df,
|
||
test_size=split_params.get('test_size', 0),
|
||
val_size=split_params.get('val_size', 0)
|
||
)
|
||
|
||
|
||
|
||
# 保存处理后的数据集
|
||
train_path = output_path / f'train_{file_name}_{timestamp}.csv'
|
||
val_path = output_path / f'val_{file_name}_{timestamp}.csv'
|
||
test_path = output_path / f'test_{file_name}_{timestamp}.csv'
|
||
|
||
if train_data is not None:
|
||
train_data.to_csv(train_path, index=False)
|
||
|
||
if val_data is not None:
|
||
val_data.to_csv(val_path, index=False)
|
||
if test_data is not None:
|
||
test_data.to_csv(test_path, index=False)
|
||
|
||
# 记录输出文件路径
|
||
process_record['output_files'] = {
|
||
'train': str(train_path) if train_data is not None else "",
|
||
'validation': str(val_path) if val_data is not None else "",
|
||
'test': str(test_path) if test_data is not None else ""
|
||
}
|
||
|
||
# 保存处理记录
|
||
record_path = output_path / f'process_record__{file_name}_{timestamp}.json'
|
||
with open(record_path, 'w', encoding='utf-8') as f:
|
||
json.dump(process_record, f, indent=2, ensure_ascii=False)
|
||
|
||
self.logger.info(f"Data processing completed. Results saved to {output_path}")
|
||
|
||
return {
|
||
'status': 'success',
|
||
'message': 'Data processing completed successfully',
|
||
'process_record': process_record
|
||
}
|
||
|
||
except Exception as e:
|
||
error_msg = f"Error processing dataset: {str(e)}"
|
||
self.logger.error(error_msg)
|
||
return {
|
||
'status': 'error',
|
||
'message': error_msg
|
||
}
|
||
|
||
def _apply_process_methods(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
|
||
"""应用数据预处理方法"""
|
||
try:
|
||
method_name = method['method_name']
|
||
params = method['params']
|
||
|
||
if method_name not in self.preprocessing_methods:
|
||
raise ValueError(f"Unknown preprocessing method: {method_name}")
|
||
|
||
processor = self.preprocessing_methods[method_name](**params)
|
||
|
||
# 分离特征和标签
|
||
features = df.drop('target', axis=1)
|
||
target = df['target']
|
||
|
||
# 根据不同类型的方法进行处理
|
||
if method_name in ['IsolationForest', 'OneClassSVM', 'LocalOutlierFactor', 'EllipticEnvelope']:
|
||
# 异常值检测方法
|
||
mask = processor.fit_predict(features) != -1
|
||
features = features[mask]
|
||
target = target[mask]
|
||
else:
|
||
# 其他预处理方法
|
||
features = pd.DataFrame(
|
||
processor.fit_transform(features),
|
||
columns=features.columns,
|
||
index=features.index
|
||
)
|
||
|
||
# 重新组合特征和标签
|
||
df = pd.concat([features, target], axis=1)
|
||
|
||
self.logger.info(f"Applied preprocessing method {method_name}")
|
||
return df
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error applying preprocessing method {method_name}: {str(e)}")
|
||
raise
|
||
|
||
def _apply_feature_method(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
|
||
"""应用特征工程方法"""
|
||
try:
|
||
method_name = method['method_name']
|
||
params = method.get('params', {})
|
||
columns = method.get('columns', df.drop('target', axis=1).columns) # 排除target列
|
||
|
||
if method_name not in self.feature_engineering_methods:
|
||
raise ValueError(f"Unknown feature engineering method: {method_name}")
|
||
|
||
processor = self.feature_engineering_methods[method_name](**params)
|
||
|
||
# 分离特征和标签
|
||
features = df.drop('target', axis=1)
|
||
target = df['target']
|
||
|
||
# 根据不同类型的特征工程方法进行处理
|
||
if method_name in ['LabelEncoder', 'OneHotEncoder']:
|
||
# 编码方法
|
||
df_temp = features[columns].copy()
|
||
if method_name == 'LabelEncoder':
|
||
for col in columns:
|
||
df_temp[col] = processor.fit_transform(df_temp[col])
|
||
else: # OneHotEncoder
|
||
encoded = processor.fit_transform(df_temp)
|
||
if isinstance(encoded, np.ndarray):
|
||
encoded = pd.DataFrame(
|
||
encoded,
|
||
columns=[f"{col}_{i}" for col in columns for i in range(encoded.shape[1]//len(columns))],
|
||
index=features.index
|
||
)
|
||
df_temp = encoded
|
||
|
||
# 更新特征数据框
|
||
features = features.drop(columns=columns)
|
||
features = pd.concat([features, df_temp], axis=1)
|
||
|
||
elif method_name in ['KBinsDiscretizer']:
|
||
# 离散化方法
|
||
transformed = processor.fit_transform(features[columns])
|
||
if processor.encode == 'onehot':
|
||
feature_names = [f"{col}_{i}" for col in columns for i in range(processor.n_bins_)]
|
||
else:
|
||
feature_names = [f"{col}_binned" for col in columns]
|
||
|
||
df_temp = pd.DataFrame(
|
||
transformed,
|
||
columns=feature_names,
|
||
index=features.index
|
||
)
|
||
features = features.drop(columns=columns)
|
||
features = pd.concat([features, df_temp], axis=1)
|
||
|
||
elif method_name in ['PCA', 'SelectKBest', 'RFE']:
|
||
# 降维和特征选择方法
|
||
transformed = processor.fit_transform(features[columns])
|
||
n_features = transformed.shape[1]
|
||
df_temp = pd.DataFrame(
|
||
transformed,
|
||
columns=[f"feature_{i}" for i in range(n_features)],
|
||
index=features.index
|
||
)
|
||
features = features.drop(columns=columns)
|
||
features = pd.concat([features, df_temp], axis=1)
|
||
|
||
else:
|
||
# 其他特征工程方法
|
||
transformed = processor.fit_transform(features[columns])
|
||
df_temp = pd.DataFrame(
|
||
transformed,
|
||
columns=[f"{col}_transformed" for col in columns],
|
||
index=features.index
|
||
)
|
||
features = features.drop(columns=columns)
|
||
features = pd.concat([features, df_temp], axis=1)
|
||
|
||
# 重新组合特征和标签
|
||
df = pd.concat([features, target], axis=1)
|
||
|
||
self.logger.info(f"Applied feature engineering method {method_name}")
|
||
return df
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error applying feature engineering method {method_name}: {str(e)}")
|
||
raise
|
||
|
||
def _split_dataset(
|
||
self,
|
||
df: pd.DataFrame,
|
||
test_size: float = 0,
|
||
val_size: float = 0
|
||
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
|
||
"""划分数据集"""
|
||
try:
|
||
# 首先划分训练集和测试集
|
||
if test_size > 0:
|
||
train_val_data, test_data = train_test_split(
|
||
df,
|
||
test_size=test_size,
|
||
random_state=42
|
||
)
|
||
else:
|
||
train_val_data = df
|
||
test_data = None
|
||
if val_size > 0:
|
||
# 再划分训练集和验证集
|
||
val_size_adjusted = val_size / (1 - test_size)
|
||
train_data, val_data = train_test_split(
|
||
train_val_data,
|
||
test_size=val_size_adjusted,
|
||
random_state=42
|
||
)
|
||
else:
|
||
train_data = train_val_data
|
||
val_data = None
|
||
|
||
self.logger.info(
|
||
f"Dataset split - Train: {len(train_data) if train_data is not None else 0}, "
|
||
f"Val: {len(val_data) if val_data is not None else 0} , Test: {len(test_data) if test_data is not None else 0}"
|
||
)
|
||
|
||
return train_data, val_data, test_data
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error splitting dataset: {str(e)}")
|
||
raise
|
||
|
||
def get_preprocessing_methods(self) -> Dict:
|
||
"""获取预处理方法列表"""
|
||
try:
|
||
methods = []
|
||
|
||
# 数据缩放方法
|
||
scaler_methods = list(self.method_config.get('data_scaler_methods', {}).keys())
|
||
if scaler_methods:
|
||
methods.append({
|
||
"name": "data_scaler",
|
||
"description": "数据缩放处理",
|
||
"method": scaler_methods
|
||
})
|
||
|
||
# 缺失值处理方法
|
||
missing_methods = list(self.method_config.get('missing_value_handling_methods', {}).keys())
|
||
if missing_methods:
|
||
methods.append({
|
||
"name": "missing_value_handler",
|
||
"description": "缺失值处理",
|
||
"method": missing_methods
|
||
})
|
||
|
||
# 异常值检测方法
|
||
outlier_methods = list(self.method_config.get('outlier_detection_methods', {}).keys())
|
||
if outlier_methods:
|
||
methods.append({
|
||
"name": "outlier_detector",
|
||
"description": "异常值检测",
|
||
"method": outlier_methods
|
||
})
|
||
|
||
self.logger.info("获取预处理方法列表")
|
||
|
||
return {
|
||
"status": "success",
|
||
"methods": methods
|
||
}
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error getting preprocessing methods: {str(e)}")
|
||
return {
|
||
"status": "error",
|
||
"error": str(e)
|
||
}
|
||
|
||
def get_preprocessing_method_details(self, method_name: str) -> Dict:
|
||
"""获取指定方法的详细信息"""
|
||
try:
|
||
# 在各个方法类别中查找方法原理和优缺点
|
||
method_info = None
|
||
for category in ['data_scaler_methods', 'missing_value_handling_methods', 'outlier_detection_methods']:
|
||
if method_name in self.method_config.get(category, {}):
|
||
method_info = self.method_config[category][method_name]
|
||
break
|
||
|
||
if method_info is None:
|
||
raise ValueError(f"Method {method_name} not found in method config")
|
||
|
||
# 查找方法参数信息
|
||
parameter_info = None
|
||
for category in ['data_scaler_methods', 'missing_value_handling_methods', 'outlier_detection_methods']:
|
||
if method_name in self.parameter_config.get(category, {}):
|
||
parameter_info = self.parameter_config[category][method_name]
|
||
break
|
||
|
||
if parameter_info is None:
|
||
raise ValueError(f"Method {method_name} not found in parameter config")
|
||
|
||
self.logger.info(f"获取{method_name}方法详情")
|
||
|
||
# 组合返回信息
|
||
return {
|
||
"status": "success",
|
||
"method": {
|
||
"name": method_name,
|
||
"description": parameter_info.get('description', ''),
|
||
"principle": method_info.get('principle', ''),
|
||
"advantages": method_info.get('advantages', []),
|
||
"disadvantages": method_info.get('disadvantages', []),
|
||
"applicable_scenarios": method_info.get('applicable_scenarios', []),
|
||
"parameters": parameter_info.get('parameters', [])
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error getting method details: {str(e)}")
|
||
return {
|
||
"status": "error",
|
||
"error": str(e)
|
||
}
|
||
|
||
def get_feature_engineering_methods(self) -> Dict:
|
||
"""获取特征工程方法列表"""
|
||
try:
|
||
methods = []
|
||
|
||
|
||
# 获取特征工程方法
|
||
feature_engineering_methods = list(self.method_config.get('feature_engineering_methods', {}).keys())
|
||
if feature_engineering_methods:
|
||
methods.append({
|
||
"name": "feature_engineering_methods",
|
||
"description": "特征工程方法",
|
||
"method": feature_engineering_methods
|
||
})
|
||
|
||
self.logger.info("获取特征工程方法列表")
|
||
|
||
return {
|
||
"status": "success",
|
||
"methods": methods
|
||
}
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error getting preprocessing methods: {str(e)}")
|
||
return {
|
||
"status": "error",
|
||
"error": str(e)
|
||
}
|
||
|
||
def get_feature_engineering_method_details(self, method_name: str) -> Dict:
|
||
"""获取指定方法的详细信息"""
|
||
try:
|
||
# 在各个方法类别中查找方法原理和优缺点
|
||
method_info = None
|
||
for category in ['feature_engineering_methods']:
|
||
if method_name in self.method_config.get(category, {}):
|
||
method_info = self.method_config[category][method_name]
|
||
break
|
||
|
||
if method_info is None:
|
||
raise ValueError(f"Method {method_name} not found in method config")
|
||
|
||
# 查找方法参数信息
|
||
parameter_info = None
|
||
for category in ['feature_engineering_methods']:
|
||
if method_name in self.parameter_config.get(category, {}):
|
||
parameter_info = self.parameter_config[category][method_name]
|
||
break
|
||
|
||
if parameter_info is None:
|
||
raise ValueError(f"Method {method_name} not found in parameter config")
|
||
|
||
self.logger.info(f'获取{method_name}方法详情')
|
||
|
||
# 组合返回信息
|
||
return {
|
||
"status": "success",
|
||
"method": {
|
||
"name": method_name,
|
||
"description": parameter_info.get('description', ''),
|
||
"principle": method_info.get('principle', ''),
|
||
"advantages": method_info.get('advantages', []),
|
||
"disadvantages": method_info.get('disadvantages', []),
|
||
"applicable_scenarios": method_info.get('applicable_scenarios', []),
|
||
"parameters": parameter_info.get('parameters', [])
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error getting method details: {str(e)}")
|
||
return {
|
||
"status": "error",
|
||
"error": str(e)
|
||
}
|
||
|
||
|
||
def _clean_json_line(self,line):
|
||
# 替换掉不符合JSON标准的特殊浮点数值
|
||
line = line.replace('NaN', 'null')
|
||
line = line.replace('Infinity', '1e308') # 或者选择一个合适的替代值
|
||
line = line.replace('-Infinity', '-1e308') # 同上
|
||
return line
|
||
|
||
def get_dataset(self):
|
||
back = list()
|
||
|
||
dataset_files_path = os.listdir(self.dataset_processed_path)
|
||
|
||
for dataset_file in dataset_files_path:
|
||
path = os.path.join(self.dataset_processed_path, dataset_file)
|
||
# 指定要查看的文件夹路径
|
||
folder_path = Path(path)
|
||
|
||
# 获取文件夹下所有以 .json 结尾的文件
|
||
json_files = list(folder_path.glob('*.json'))
|
||
|
||
for json_file in json_files:
|
||
|
||
|
||
# json是不能处理NaN这类的数值,需要单独处理他们
|
||
with open(json_file.as_posix(), 'r', encoding='utf-8') as f:
|
||
cleaned_lines = [self._clean_json_line(line) for line in f]
|
||
json_data = json.loads(''.join(cleaned_lines))
|
||
# json_data = json.load(f ,allow_nan=True)
|
||
|
||
back.append(json_data)
|
||
self.logger.info("获取处理好的数据集")
|
||
|
||
# print("可用数据集", back)
|
||
return back |