MLPlatform/function_old/data_processor_date.py

371 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
import logging
from pathlib import Path
import datetime
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, Normalizer, Binarizer, LabelEncoder, KBinsDiscretizer, FunctionTransformer, PowerTransformer, QuantileTransformer, PolynomialFeatures, OneHotEncoder
from sklearn.feature_extraction import FeatureHasher, DictVectorizer
from sklearn.feature_selection import SelectKBest, RFE
from sklearn.decomposition import PCA
from sklearn.experimental import enable_iterative_imputer # noqa
from sklearn.impute import SimpleImputer, IterativeImputer, KNNImputer, MissingIndicator
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from sklearn.model_selection import train_test_split
import json
class DataProcessor:
    """Dataset preprocessing pipeline.

    Wraps scikit-learn preprocessing / feature-engineering estimators behind
    string-keyed registries, applies a configured sequence of them to a CSV
    dataset (which must contain a 'target' column), splits the result into
    train/val/test sets, and writes the outputs plus a JSON processing record
    into a timestamped directory.
    """

    def __init__(self, config: Dict = None):
        """Initialize the data processor.

        Args:
            config: optional configuration dict; stored as-is (empty dict when
                None).
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        self._setup_logging()
        # Registry of preprocessing estimators, keyed by the 'method_name'
        # strings accepted by process_dataset().
        self.preprocessing_methods = {
            # Missing-value handling
            'SimpleImputer': SimpleImputer,
            'IterativeImputer': IterativeImputer,
            'KNNImputer': KNNImputer,
            'MissingIndicator': MissingIndicator,
            # Outlier detection
            'IsolationForest': IsolationForest,
            'OneClassSVM': OneClassSVM,
            'LocalOutlierFactor': LocalOutlierFactor,
            'EllipticEnvelope': EllipticEnvelope,
            # Scaling
            'StandardScaler': StandardScaler,
            'MinMaxScaler': MinMaxScaler,
            'RobustScaler': RobustScaler,
            'Normalizer': Normalizer,
            'Binarizer': Binarizer
        }
        # Registry of feature-engineering estimators.
        self.feature_engineering_methods = {
            # Encoding
            'LabelEncoder': LabelEncoder,
            'OneHotEncoder': OneHotEncoder,
            # Discretization
            'KBinsDiscretizer': KBinsDiscretizer,
            # Transformation
            'FunctionTransformer': FunctionTransformer,
            'PowerTransformer': PowerTransformer,
            'QuantileTransformer': QuantileTransformer,
            'PolynomialFeatures': PolynomialFeatures,
            # Extraction
            'FeatureHasher': FeatureHasher,
            'DictVectorizer': DictVectorizer,
            # Selection / dimensionality reduction
            'PCA': PCA,
            'SelectKBest': SelectKBest,
            'RFE': RFE
        }

    def _setup_logging(self):
        """Attach a timestamped file handler under ./.log to this logger.

        NOTE(review): each DataProcessor instance adds one more handler to the
        same module-level logger, so constructing several processors duplicates
        log lines across files — confirm whether that is intended.
        """
        log_dir = Path('.log')
        log_dir.mkdir(exist_ok=True)
        file_handler = logging.FileHandler(
            log_dir / f'data_processing_{datetime.datetime.now():%Y%m%d_%H%M%S}.log'
        )
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(file_handler)
        self.logger.setLevel(logging.INFO)

    def process_dataset(
        self,
        input_path: str,
        output_dir: str,
        process_methods: List[Dict],
        feature_methods: List[Dict],
        split_params: Dict
    ) -> Dict:
        """Run the full processing pipeline on one CSV dataset.

        Args:
            input_path: path of the input CSV file (expects a 'target' column
                when any processing/feature method is applied).
            output_dir: directory under which a '<name>_<timestamp>' result
                folder is created.
            process_methods: preprocessing steps; each a dict with
                'method_name' and optional 'params'.
            feature_methods: feature-engineering steps; each a dict with
                'method_name', optional 'params' and optional 'columns'.
            split_params: dict with optional 'test_size' and 'val_size'
                fractions of the full dataset.

        Returns:
            On success: {'status': 'success', 'message': ..., 'process_record': ...}.
            On failure: {'status': 'error', 'message': ...} (never raises).
        """
        try:
            # Dataset name = filename without its extension; Path handles both
            # '/' and OS-specific separators.
            file_name = Path(input_path).stem
            # Timestamp shared by the output directory and every file in it.
            timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
            output_path = Path(output_dir) / f'{file_name}_{timestamp}'
            output_path.mkdir(parents=True, exist_ok=True)
            # Audit record of every step, dumped as JSON at the end.
            process_record = {
                'input_file': input_path,
                'timestamp': datetime.datetime.now().isoformat(),
                'process_methods': process_methods,
                'feature_methods': feature_methods,
                'split_params': split_params,
                'steps': []
            }
            self.logger.info(f"Loading data from {input_path}")
            df = pd.read_csv(input_path)
            process_record['steps'].append({
                'step': 'load_data',
                'shape': df.shape
            })
            # Preprocessing (imputation / outlier removal / scaling).
            for method in process_methods:
                df = self._apply_process_methods(df, method)
                process_record['steps'].append({
                    'step': 'cleaning',
                    'method': method['method_name'],
                    # .get keeps the record consistent with the appliers,
                    # which treat 'params' as optional.
                    'params': method.get('params', {}),
                    'shape': df.shape
                })
            # Feature engineering (encoding / discretization / selection ...).
            for method in feature_methods:
                df = self._apply_feature_method(df, method)
                process_record['steps'].append({
                    'step': 'feature_engineering',
                    'method': method['method_name'],
                    'params': method.get('params', {}),
                    'shape': df.shape
                })
            # Train / validation / test split (each split optional).
            train_data, val_data, test_data = self._split_dataset(
                df,
                test_size=split_params.get('test_size', 0),
                val_size=split_params.get('val_size', 0)
            )
            train_path = output_path / f'train_{file_name}_{timestamp}.csv'
            val_path = output_path / f'val_{file_name}_{timestamp}.csv'
            test_path = output_path / f'test_{file_name}_{timestamp}.csv'
            if train_data is not None:
                train_data.to_csv(train_path, index=False)
            if val_data is not None:
                val_data.to_csv(val_path, index=False)
            if test_data is not None:
                test_data.to_csv(test_path, index=False)
            # Only record paths for splits that were actually written.
            process_record['output_files'] = {
                'train': str(train_path) if train_data is not None else "",
                'validation': str(val_path) if val_data is not None else "",
                'test': str(test_path) if test_data is not None else ""
            }
            record_path = output_path / f'process_record__{file_name}_{timestamp}.json'
            with open(record_path, 'w', encoding='utf-8') as f:
                json.dump(process_record, f, indent=2, ensure_ascii=False)
            self.logger.info(f"Data processing completed. Results saved to {output_path}")
            return {
                'status': 'success',
                'message': 'Data processing completed successfully',
                'process_record': process_record
            }
        except Exception as e:
            # Error path restored from the previously commented-out handler so
            # the method honors its documented dict-result contract.
            error_msg = f"Error processing dataset: {str(e)}"
            self.logger.error(error_msg)
            return {
                'status': 'error',
                'message': error_msg
            }

    def _apply_process_methods(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
        """Apply one preprocessing step to df (expects a 'target' column).

        Outlier detectors drop the rows they flag; every other estimator
        transforms the feature columns in place, keeping names and index.

        Raises:
            ValueError: if 'method_name' is not in the registry.
        """
        # Extracted before try so the except-logger never sees it unbound.
        method_name = method.get('method_name')
        try:
            params = method.get('params', {})
            if method_name not in self.preprocessing_methods:
                raise ValueError(f"Unknown preprocessing method: {method_name}")
            processor = self.preprocessing_methods[method_name](**params)
            # Never let the estimator touch the label column.
            features = df.drop('target', axis=1)
            target = df['target']
            if method_name in ('IsolationForest', 'OneClassSVM', 'LocalOutlierFactor', 'EllipticEnvelope'):
                # Outlier detectors: fit_predict marks outliers as -1; keep inliers.
                mask = processor.fit_predict(features) != -1
                features = features[mask]
                target = target[mask]
            else:
                # Imputers/scalers return an ndarray; rebuild the frame with the
                # original column names and index.
                # NOTE(review): assumes the transform preserves the column
                # count — MissingIndicator (and imputers dropping all-NaN
                # columns) would violate that; confirm against callers.
                features = pd.DataFrame(
                    processor.fit_transform(features),
                    columns=features.columns,
                    index=features.index
                )
            df = pd.concat([features, target], axis=1)
            self.logger.info(f"Applied preprocessing method {method_name}")
            return df
        except Exception as e:
            self.logger.error(f"Error applying preprocessing method {method_name}: {str(e)}")
            raise

    def _apply_feature_method(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
        """Apply one feature-engineering step to df (expects a 'target' column).

        Transformed columns replace the originals; 'target' is never touched.

        Raises:
            ValueError: if 'method_name' is not in the registry.
        """
        method_name = method.get('method_name')
        try:
            params = method.get('params', {})
            # Default: transform every feature column (target excluded).
            columns = method.get('columns', df.drop('target', axis=1).columns)
            if method_name not in self.feature_engineering_methods:
                raise ValueError(f"Unknown feature engineering method: {method_name}")
            processor = self.feature_engineering_methods[method_name](**params)
            features = df.drop('target', axis=1)
            target = df['target']
            if method_name in ('LabelEncoder', 'OneHotEncoder'):
                df_temp = features[columns].copy()
                if method_name == 'LabelEncoder':
                    # fit_transform refits the single encoder per column.
                    for col in columns:
                        df_temp[col] = processor.fit_transform(df_temp[col])
                else:  # OneHotEncoder
                    encoded = processor.fit_transform(df_temp)
                    # Recent sklearn returns a sparse matrix by default;
                    # densify so it can be wrapped in a DataFrame.
                    if hasattr(encoded, 'toarray'):
                        encoded = encoded.toarray()
                    try:
                        # Correct per-category names (categories can differ
                        # across columns).
                        new_cols = list(processor.get_feature_names_out(columns))
                    except AttributeError:
                        # Legacy fallback: assumes equal category counts.
                        new_cols = [f"{col}_{i}" for col in columns
                                    for i in range(encoded.shape[1] // len(columns))]
                    df_temp = pd.DataFrame(
                        encoded,
                        columns=new_cols,
                        index=features.index
                    )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)
            elif method_name == 'KBinsDiscretizer':
                transformed = processor.fit_transform(features[columns])
                # encode='onehot' produces a sparse matrix — densify first.
                if hasattr(transformed, 'toarray'):
                    transformed = transformed.toarray()
                if processor.encode in ('onehot', 'onehot-dense'):
                    # n_bins_ is a per-column array after fitting; index it
                    # rather than passing the whole array to range().
                    feature_names = [f"{col}_{i}"
                                     for j, col in enumerate(columns)
                                     for i in range(int(processor.n_bins_[j]))]
                else:
                    feature_names = [f"{col}_binned" for col in columns]
                df_temp = pd.DataFrame(
                    transformed,
                    columns=feature_names,
                    index=features.index
                )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)
            elif method_name in ('PCA', 'SelectKBest', 'RFE'):
                # SelectKBest and RFE are supervised and require y;
                # PCA accepts and ignores it.
                transformed = processor.fit_transform(features[columns], target)
                n_features = transformed.shape[1]
                df_temp = pd.DataFrame(
                    transformed,
                    columns=[f"feature_{i}" for i in range(n_features)],
                    index=features.index
                )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)
            else:
                # Generic transformers; naming assumes one output column per
                # input column — PolynomialFeatures and the extraction methods
                # violate that. NOTE(review): confirm intended inputs here.
                transformed = processor.fit_transform(features[columns])
                df_temp = pd.DataFrame(
                    transformed,
                    columns=[f"{col}_transformed" for col in columns],
                    index=features.index
                )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)
            df = pd.concat([features, target], axis=1)
            self.logger.info(f"Applied feature engineering method {method_name}")
            return df
        except Exception as e:
            self.logger.error(f"Error applying feature engineering method {method_name}: {str(e)}")
            raise

    def _split_dataset(
        self,
        df: pd.DataFrame,
        test_size: float = 0,
        val_size: float = 0
    ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Split df into (train, val, test); splits with size 0 come back as None.

        Both sizes are fractions of the FULL dataset; the validation fraction
        is rescaled after the test rows have been removed.
        """
        try:
            # First carve off the test set (if requested).
            if test_size > 0:
                train_val_data, test_data = train_test_split(
                    df,
                    test_size=test_size,
                    random_state=42
                )
            else:
                train_val_data = df
                test_data = None
            if val_size > 0:
                # val_size is relative to the full dataset, but this split only
                # sees the remaining (1 - test_size) rows — rescale.
                val_size_adjusted = val_size / (1 - test_size)
                train_data, val_data = train_test_split(
                    train_val_data,
                    test_size=val_size_adjusted,
                    random_state=42
                )
            else:
                train_data = train_val_data
                val_data = None
            self.logger.info(
                f"Dataset split - Train: {len(train_data) if train_data is not None else 0}, "
                f"Val: {len(val_data) if val_data is not None else 0} , Test: {len(test_data) if test_data is not None else 0}"
            )
            return train_data, val_data, test_data
        except Exception as e:
            self.logger.error(f"Error splitting dataset: {str(e)}")
            raise