import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
import logging
from pathlib import Path
import datetime
from sklearn.preprocessing import (
    StandardScaler, MinMaxScaler, RobustScaler, Normalizer, Binarizer,
    LabelEncoder, KBinsDiscretizer, FunctionTransformer, PowerTransformer,
    QuantileTransformer, PolynomialFeatures, OneHotEncoder,
)
from sklearn.feature_extraction import FeatureHasher, DictVectorizer
from sklearn.feature_selection import SelectKBest, RFE
from sklearn.decomposition import PCA
from sklearn.experimental import enable_iterative_imputer  # noqa
from sklearn.impute import SimpleImputer, IterativeImputer, KNNImputer, MissingIndicator
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from sklearn.model_selection import train_test_split
import json


class DataProcessor:
    """Configurable tabular-data pipeline: preprocessing, feature engineering, splitting.

    The input CSV is expected to contain a label column literally named
    ``'target'``; every transformation is applied to the remaining feature
    columns and the label is re-attached afterwards.
    """

    def __init__(self, config: Dict = None):
        """Initialize the data processor.

        Args:
            config: Optional configuration dict; stored as-is (empty dict if None).
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        self._setup_logging()

        # Data preprocessing methods (name -> sklearn estimator class).
        self.preprocessing_methods = {
            # Missing-value handling
            'SimpleImputer': SimpleImputer,
            'IterativeImputer': IterativeImputer,
            'KNNImputer': KNNImputer,
            'MissingIndicator': MissingIndicator,
            # Outlier detection
            'IsolationForest': IsolationForest,
            'OneClassSVM': OneClassSVM,
            'LocalOutlierFactor': LocalOutlierFactor,
            'EllipticEnvelope': EllipticEnvelope,
            # Scaling
            'StandardScaler': StandardScaler,
            'MinMaxScaler': MinMaxScaler,
            'RobustScaler': RobustScaler,
            'Normalizer': Normalizer,
            'Binarizer': Binarizer,
        }

        # Feature engineering methods (name -> sklearn transformer class).
        self.feature_engineering_methods = {
            # Encoding
            'LabelEncoder': LabelEncoder,
            'OneHotEncoder': OneHotEncoder,
            # Discretization
            'KBinsDiscretizer': KBinsDiscretizer,
            # Transformation
            'FunctionTransformer': FunctionTransformer,
            'PowerTransformer': PowerTransformer,
            'QuantileTransformer': QuantileTransformer,
            'PolynomialFeatures': PolynomialFeatures,
            # Extraction
            'FeatureHasher': FeatureHasher,
            'DictVectorizer': DictVectorizer,
            # Selection / dimensionality reduction
            'PCA': PCA,
            'SelectKBest': SelectKBest,
            'RFE': RFE,
        }

    def _setup_logging(self):
        """Attach a timestamped file handler under the '.log' directory."""
        log_dir = Path('.log')
        log_dir.mkdir(exist_ok=True)
        file_handler = logging.FileHandler(
            log_dir / f'data_processing_{datetime.datetime.now():%Y%m%d_%H%M%S}.log'
        )
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(file_handler)
        self.logger.setLevel(logging.INFO)

    def process_dataset(
        self,
        input_path: str,
        output_dir: str,
        process_methods: List[Dict],
        feature_methods: List[Dict],
        split_params: Dict
    ) -> Dict:
        """
        Run the full pipeline on a CSV dataset and persist the results.

        Args:
            input_path: Path to the input CSV file (must contain a 'target' column).
            output_dir: Directory under which a timestamped result folder is created.
            process_methods: Preprocessing steps; each dict has 'method_name' and 'params'.
            feature_methods: Feature-engineering steps; each dict has 'method_name'
                and optionally 'params' and 'columns'.
            split_params: Split configuration with optional 'test_size' and 'val_size'
                fractions (each defaults to 0, i.e. no split).

        Returns:
            A result dict with 'status', 'message' and the full 'process_record'.

        Raises:
            Exception: any failure from loading, transforming or saving propagates
                to the caller (no internal swallowing).
        """
        # NOTE(review): an earlier version wrapped this whole method in
        # try/except and returned {'status': 'error', ...}; that handler was
        # commented out, so exceptions now propagate — confirm callers expect this.

        # Dataset name: file stem, robust to both '/' and OS-native separators.
        file_name = Path(input_path).stem
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')

        # Create the per-run output directory.
        output_path = Path(output_dir) / f'{file_name}_{timestamp}'
        output_path.mkdir(parents=True, exist_ok=True)

        # Record of every processing step, saved alongside the outputs.
        process_record = {
            'input_file': input_path,
            'timestamp': datetime.datetime.now().isoformat(),
            'process_methods': process_methods,
            'feature_methods': feature_methods,
            'split_params': split_params,
            'steps': []
        }

        # Load data.
        self.logger.info(f"Loading data from {input_path}")
        df = pd.read_csv(input_path)
        process_record['steps'].append({
            'step': 'load_data',
            'shape': df.shape
        })

        # Preprocessing.
        for method in process_methods:
            df = self._apply_process_methods(df, method)
            process_record['steps'].append({
                'step': 'cleaning',
                'method': method['method_name'],
                'params': method.get('params', {}),
                'shape': df.shape
            })

        # Feature engineering.
        for method in feature_methods:
            df = self._apply_feature_method(df, method)
            process_record['steps'].append({
                'step': 'feature_engineering',
                'method': method['method_name'],
                'params': method.get('params', {}),
                'shape': df.shape
            })

        # Train/validation/test split.
        train_data, val_data, test_data = self._split_dataset(
            df,
            test_size=split_params.get('test_size', 0),
            val_size=split_params.get('val_size', 0)
        )

        # Save the resulting datasets (only the splits that exist).
        train_path = output_path / f'train_{file_name}_{timestamp}.csv'
        val_path = output_path / f'val_{file_name}_{timestamp}.csv'
        test_path = output_path / f'test_{file_name}_{timestamp}.csv'
        if train_data is not None:
            train_data.to_csv(train_path, index=False)
        if val_data is not None:
            val_data.to_csv(val_path, index=False)
        if test_data is not None:
            test_data.to_csv(test_path, index=False)

        # Record output file paths ("" for splits that were not produced).
        process_record['output_files'] = {
            'train': str(train_path) if train_data is not None else "",
            'validation': str(val_path) if val_data is not None else "",
            'test': str(test_path) if test_data is not None else ""
        }

        # Persist the processing record.
        record_path = output_path / f'process_record__{file_name}_{timestamp}.json'
        with open(record_path, 'w', encoding='utf-8') as f:
            json.dump(process_record, f, indent=2, ensure_ascii=False)

        self.logger.info(f"Data processing completed. Results saved to {output_path}")
        return {
            'status': 'success',
            'message': 'Data processing completed successfully',
            'process_record': process_record
        }

    def _apply_process_methods(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
        """Apply one preprocessing step to the feature columns of *df*.

        Outlier detectors drop flagged rows (prediction == -1); all other
        methods transform the feature matrix in place, preserving column
        names and index. The 'target' column is never transformed.
        """
        # Extract the name before the try so the except-logger below can
        # never hit a NameError on 'method_name'.
        method_name = method['method_name']
        params = method.get('params', {})
        try:
            if method_name not in self.preprocessing_methods:
                raise ValueError(f"Unknown preprocessing method: {method_name}")

            processor = self.preprocessing_methods[method_name](**params)

            # Separate features and label.
            features = df.drop('target', axis=1)
            target = df['target']

            if method_name in ['IsolationForest', 'OneClassSVM',
                               'LocalOutlierFactor', 'EllipticEnvelope']:
                # Outlier detection: keep only inlier rows (-1 marks outliers).
                mask = processor.fit_predict(features) != -1
                features = features[mask]
                target = target[mask]
            else:
                # Column-preserving transforms (imputers, scalers, ...).
                features = pd.DataFrame(
                    processor.fit_transform(features),
                    columns=features.columns,
                    index=features.index
                )

            # Reassemble features and label.
            df = pd.concat([features, target], axis=1)
            self.logger.info(f"Applied preprocessing method {method_name}")
            return df

        except Exception as e:
            self.logger.error(f"Error applying preprocessing method {method_name}: {str(e)}")
            raise

    def _apply_feature_method(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
        """Apply one feature-engineering step to selected columns of *df*.

        The transformed columns replace the originals; untouched feature
        columns and the 'target' label are carried through unchanged.
        """
        # Extracted before the try so the except-logger cannot NameError.
        method_name = method['method_name']
        params = method.get('params', {})
        try:
            if method_name not in self.feature_engineering_methods:
                raise ValueError(f"Unknown feature engineering method: {method_name}")

            processor = self.feature_engineering_methods[method_name](**params)

            # Separate features and label; default column selection is
            # every feature column (target excluded).
            features = df.drop('target', axis=1)
            target = df['target']
            columns = method.get('columns', features.columns)

            if method_name in ['LabelEncoder', 'OneHotEncoder']:
                # Encoding methods.
                df_temp = features[columns].copy()
                if method_name == 'LabelEncoder':
                    # LabelEncoder handles one column at a time.
                    for col in columns:
                        df_temp[col] = processor.fit_transform(df_temp[col])
                else:  # OneHotEncoder
                    encoded = processor.fit_transform(df_temp)
                    # Default OneHotEncoder output is a sparse matrix;
                    # densify so it can live in a DataFrame.
                    if hasattr(encoded, 'toarray'):
                        encoded = encoded.toarray()
                    # get_feature_names_out yields correct per-category
                    # names even when columns have different cardinalities.
                    df_temp = pd.DataFrame(
                        encoded,
                        columns=processor.get_feature_names_out(list(columns)),
                        index=features.index
                    )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)

            elif method_name in ['KBinsDiscretizer']:
                # Discretization.
                transformed = processor.fit_transform(features[columns])
                # encode='onehot' produces a sparse matrix — densify it.
                if hasattr(transformed, 'toarray'):
                    transformed = transformed.toarray()
                if processor.encode.startswith('onehot'):
                    # n_bins_ is a per-feature array: bin counts can differ
                    # per column, so pair each column with its own count.
                    feature_names = [
                        f"{col}_{i}"
                        for col, n_bins in zip(columns, processor.n_bins_)
                        for i in range(n_bins)
                    ]
                else:
                    feature_names = [f"{col}_binned" for col in columns]
                df_temp = pd.DataFrame(
                    transformed,
                    columns=feature_names,
                    index=features.index
                )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)

            elif method_name in ['PCA', 'SelectKBest', 'RFE']:
                # Dimensionality reduction / feature selection: output
                # columns no longer correspond 1:1 to inputs, so they get
                # generic names.
                transformed = processor.fit_transform(features[columns])
                n_features = transformed.shape[1]
                df_temp = pd.DataFrame(
                    transformed,
                    columns=[f"feature_{i}" for i in range(n_features)],
                    index=features.index
                )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)

            else:
                # Remaining transformers assumed column-count preserving
                # (FunctionTransformer, PowerTransformer, ...).
                transformed = processor.fit_transform(features[columns])
                df_temp = pd.DataFrame(
                    transformed,
                    columns=[f"{col}_transformed" for col in columns],
                    index=features.index
                )
                features = features.drop(columns=columns)
                features = pd.concat([features, df_temp], axis=1)

            # Reassemble features and label.
            df = pd.concat([features, target], axis=1)
            self.logger.info(f"Applied feature engineering method {method_name}")
            return df

        except Exception as e:
            self.logger.error(f"Error applying feature engineering method {method_name}: {str(e)}")
            raise

    def _split_dataset(
        self,
        df: pd.DataFrame,
        test_size: float = 0,
        val_size: float = 0
    ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Split *df* into train/validation/test frames.

        Sizes are fractions of the ORIGINAL dataset; a size of 0 skips
        that split and the corresponding frame is returned as None.
        Uses a fixed random_state=42 for reproducibility.
        """
        try:
            # First carve off the test set.
            if test_size > 0:
                train_val_data, test_data = train_test_split(
                    df, test_size=test_size, random_state=42
                )
            else:
                train_val_data = df
                test_data = None

            if val_size > 0:
                # Rescale val_size so it stays a fraction of the original
                # dataset after the test rows were removed.
                val_size_adjusted = val_size / (1 - test_size)
                train_data, val_data = train_test_split(
                    train_val_data, test_size=val_size_adjusted, random_state=42
                )
            else:
                train_data = train_val_data
                val_data = None

            self.logger.info(
                f"Dataset split - Train: {len(train_data) if train_data is not None else 0}, "
                f"Val: {len(val_data) if val_data is not None else 0} , Test: {len(test_data) if test_data is not None else 0}"
            )
            return train_data, val_data, test_data

        except Exception as e:
            self.logger.error(f"Error splitting dataset: {str(e)}")
            raise