diff --git a/function/__pycache__/data_processor_date.cpython-39.pyc b/function/__pycache__/data_processor_date.cpython-39.pyc
index d15f8e0..5fc792b 100644
Binary files a/function/__pycache__/data_processor_date.cpython-39.pyc and b/function/__pycache__/data_processor_date.cpython-39.pyc differ
diff --git a/function/data_processor_date.py b/function/data_processor_date.py
index d264cf3..39a0b5d 100644
--- a/function/data_processor_date.py
+++ b/function/data_processor_date.py
@@ -102,103 +102,107 @@ class DataProcessor:
         Returns:
             Dictionary with the processing results
         """
-        try:
+        # try:
-            # Dataset name
-            file_name = input_path.split("/")[-1].split(".")[0]
+        # Dataset name
+        file_name = input_path.split("/")[-1].split(".")[0]
-            # Generate a timestamp
-            timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
+        # Generate a timestamp
+        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
-            # Create the output directory
-            output_path = Path(output_dir+'/'+file_name+'_'+timestamp)
-            output_path.mkdir(parents=True, exist_ok=True)
-
-            # Record the processing steps
-            process_record = {
-                'input_file': input_path,
-                'timestamp': datetime.datetime.now().isoformat(),
-                'process_methods': process_methods,
-                'feature_methods': feature_methods,
-                'split_params': split_params,
-                'steps': []
-            }
-
-            # Load the data
-            self.logger.info(f"Loading data from {input_path}")
-            df = pd.read_csv(input_path)
+        # Create the output directory
+        output_path = Path(output_dir+'/'+file_name+'_'+timestamp)
+        output_path.mkdir(parents=True, exist_ok=True)
+
+        # Record the processing steps
+        process_record = {
+            'input_file': input_path,
+            'timestamp': datetime.datetime.now().isoformat(),
+            'process_methods': process_methods,
+            'feature_methods': feature_methods,
+            'split_params': split_params,
+            'steps': []
+        }
+
+        # Load the data
+        self.logger.info(f"Loading data from {input_path}")
+        df = pd.read_csv(input_path)
+        process_record['steps'].append({
+            'step': 'load_data',
+            'shape': df.shape
+        })
+
+
+        # Data preprocessing
+        for method in process_methods:
+            df = self._apply_process_methods(df, method)
             process_record['steps'].append({
-                'step': 'load_data',
+                'step': 'cleaning',
+                'method': method['method_name'],
+                'params': method['params'],
                 'shape': df.shape
             })
+
+        # Feature engineering
+        for method in feature_methods:
+            df = self._apply_feature_method(df, method)
+            process_record['steps'].append({
+                'step': 'feature_engineering',
+                'method': method['method_name'],
+                'params': method['params'],
+                'shape': df.shape
+            })
+
+        # Split the dataset
+        train_data, val_data, test_data = self._split_dataset(
+            df,
+            test_size=split_params.get('test_size', 0),
+            val_size=split_params.get('val_size', 0)
+        )
+
+
+        # Save the processed datasets
+        train_path = output_path / f'train_{file_name}_{timestamp}.csv'
+        val_path = output_path / f'val_{file_name}_{timestamp}.csv'
+        test_path = output_path / f'test_{file_name}_{timestamp}.csv'
+
+        if train_data is not None:
+            train_data.to_csv(train_path, index=False)
+        if val_data is not None:
+            val_data.to_csv(val_path, index=False)
+        if test_data is not None:
+            test_data.to_csv(test_path, index=False)
+
+        # Record the output file paths
+        process_record['output_files'] = {
+            'train': str(train_path) if train_data is not None else "",
+            'validation': str(val_path) if val_data is not None else "",
+            'test': str(test_path) if test_data is not None else ""
+        }
+
+        # Save the processing record
+        record_path = output_path / f'process_record__{file_name}_{timestamp}.json'
+        with open(record_path, 'w', encoding='utf-8') as f:
+            json.dump(process_record, f, indent=2, ensure_ascii=False)
-
-            # Data preprocessing
-            for method in process_methods:
-                df = self._apply_process_methods(df, method)
-                process_record['steps'].append({
-                    'step': 'cleaning',
-                    'method': method['method_name'],
-                    'params': method['params'],
-                    'shape': df.shape
-                })
-
-            # Feature engineering
-            for method in feature_methods:
-                df = self._apply_feature_method(df, method)
-                process_record['steps'].append({
-                    'step': 'feature_engineering',
-                    'method': method['method_name'],
-                    'params': method['params'],
-                    'shape': df.shape
-                })
-
-            # Split the dataset
-            train_data, val_data, test_data = self._split_dataset(
-                df,
-                test_size=split_params.get('test_size', 0.2),
-                val_size=split_params.get('val_size', 0.2)
-            )
-
-
-            # Save the processed datasets
-            train_path = output_path / f'train_{file_name}_{timestamp}.csv'
-            val_path = output_path / f'val_{file_name}_{timestamp}.csv'
-            test_path = output_path / f'test_{file_name}_{timestamp}.csv'
-
-            train_data.to_csv(train_path, index=False)
-            val_data.to_csv(val_path, index=False)
-            test_data.to_csv(test_path, index=False)
-
-            # Record the output file paths
-            process_record['output_files'] = {
-                'train': str(train_path),
-                'validation': str(val_path),
-                'test': str(test_path)
-            }
-
-            # Save the processing record
-            record_path = output_path / f'process_record__{file_name}_{timestamp}.json'
-            with open(record_path, 'w', encoding='utf-8') as f:
-                json.dump(process_record, f, indent=2, ensure_ascii=False)
-
-            self.logger.info(f"Data processing completed. Results saved to {output_path}")
-
-            return {
-                'status': 'success',
-                'message': 'Data processing completed successfully',
-                'process_record': process_record
-            }
-
-        except Exception as e:
-            error_msg = f"Error processing dataset: {str(e)}"
-            self.logger.error(error_msg)
-            return {
-                'status': 'error',
-                'message': error_msg
-            }
+        self.logger.info(f"Data processing completed. Results saved to {output_path}")
+
+        return {
+            'status': 'success',
+            'message': 'Data processing completed successfully',
+            'process_record': process_record
+        }
+
+        # except Exception as e:
+        #     error_msg = f"Error processing dataset: {str(e)}"
+        #     self.logger.error(error_msg)
+        #     return {
+        #         'status': 'error',
+        #         'message': error_msg
+        #     }
 
     def _apply_process_methods(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
         """Apply data preprocessing methods"""
@@ -328,29 +332,36 @@ class DataProcessor:
     def _split_dataset(
         self,
         df: pd.DataFrame,
-        test_size: float = 0.2,
-        val_size: float = 0.2
+        test_size: float = 0,
+        val_size: float = 0
     ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
         """Split the dataset"""
         try:
             # First split off the test set
-            train_val_data, test_data = train_test_split(
-                df,
-                test_size=test_size,
-                random_state=42
-            )
-
+            if test_size > 0:
+                train_val_data, test_data = train_test_split(
+                    df,
+                    test_size=test_size,
+                    random_state=42
+                )
+            else:
+                train_val_data = df
+                test_data = None
+
+            if val_size > 0:
             # Then split train into train and validation sets
-            val_size_adjusted = val_size / (1 - test_size)
-            train_data, val_data = train_test_split(
-                train_val_data,
-                test_size=val_size_adjusted,
-                random_state=42
-            )
+                val_size_adjusted = val_size / (1 - test_size)
+                train_data, val_data = train_test_split(
+                    train_val_data,
+                    test_size=val_size_adjusted,
+                    random_state=42
+                )
+            else:
+                train_data = train_val_data
+                val_data = None
 
             self.logger.info(
-                f"Dataset split - Train: {len(train_data)}, "
-                f"Val: {len(val_data)}, Test: {len(test_data)}"
+                f"Dataset split - Train: {len(train_data) if train_data is not None else 0}, "
+                f"Val: {len(val_data) if val_data is not None else 0}, Test: {len(test_data) if test_data is not None else 0}"
             )
 
             return train_data, val_data, test_data
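With this change, `_split_dataset` treats `test_size=0` and `val_size=0` (the new defaults) as "do not split", returning `None` for the missing partitions, and `process_dataset` only writes the CSVs for partitions that exist. A minimal usage sketch under those defaults — the input path and method names are borrowed from the deleted tests below and are illustrative only:

    from function.data_processor_date import DataProcessor

    processor = DataProcessor()
    result = processor.process_dataset(
        'dataset/dataset_raw/test_data.csv',   # hypothetical input CSV
        'dataset/dataset_processed',
        [{'method_name': 'SimpleImputer', 'params': {'strategy': 'mean'}}],
        [{'method_name': 'StandardScaler', 'params': {}}],
        {}  # test_size and val_size default to 0: no val/test files are written
    )
    # 'validation' and 'test' are recorded as "" when those splits are skipped
    print(result['process_record']['output_files'])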
diff --git a/tests/test_data_processor.py b/tests/test_data_processor.py
deleted file mode 100644
index cdde864..0000000
--- a/tests/test_data_processor.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import unittest
-import pandas as pd
-import numpy as np
-from pathlib import Path
-from function.data_processor_date import DataProcessor
-
-class TestDataProcessor(unittest.TestCase):
-    def setUp(self):
-        self.processor = DataProcessor()
-
-        # Create test data
-        self.test_data = pd.DataFrame({
-            'feature1': [1, 2, np.nan, 4, 5],
-            'feature2': [10, 20, 30, 40, 50],
-            'target': [0, 1, 0, 1, 0]
-        })
-
-        # Save the test data
-        self.input_path = 'dataset/dataset_raw/test_data.csv'
-        Path(self.input_path).parent.mkdir(parents=True, exist_ok=True)
-        self.test_data.to_csv(self.input_path, index=False)
-
-        # Set the output directory
-        self.output_dir = 'dataset/dataset_processed'
-
-    def test_process_dataset(self):
-        # Define the processing methods
-        cleaning_methods = [
-            {
-                'method_name': 'SimpleImputer',
-                'params': {'strategy': 'mean'}
-            }
-        ]
-
-        feature_methods = [
-            {
-                'method_name': 'StandardScaler',
-                'params': {}
-            }
-        ]
-
-        split_params = {
-            'test_size': 0.2,
-            'val_size': 0.2
-        }
-
-        # Process the dataset
-        result = self.processor.process_dataset(
-            self.input_path,
-            self.output_dir,
-            cleaning_methods,
-            feature_methods,
-            split_params
-        )
-
-        # Verify the result
-        self.assertEqual(result['status'], 'success')
-        self.assertIn('process_record', result)
-
-        # Verify the output files
-        record = result['process_record']
-        self.assertTrue(Path(record['output_files']['train']).exists())
-        self.assertTrue(Path(record['output_files']['validation']).exists())
-        self.assertTrue(Path(record['output_files']['test']).exists())
-
-    def test_invalid_method(self):
-        # Test an invalid method name
-        cleaning_methods = [
-            {
-                'method_name': 'InvalidMethod',
-                'params': {}
-            }
-        ]
-
-        result = self.processor.process_dataset(
-            self.input_path,
-            self.output_dir,
-            cleaning_methods,
-            [],
-            {'test_size': 0.2, 'val_size': 0.2}
-        )
-
-        self.assertEqual(result['status'], 'error')
-
-    def tearDown(self):
-        # Clean up test files
-        try:
-            Path(self.input_path).unlink()
-        except:
-            pass
-
-if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file
diff --git a/tests/test_method_reader.py b/tests/test_method_reader.py
deleted file mode 100644
index 5cb3f51..0000000
--- a/tests/test_method_reader.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import unittest
-from data_process.method_reader_date_process import MethodReader
-
-class TestMethodReader(unittest.TestCase):
-    def setUp(self):
-        self.reader = MethodReader()
-
-    def test_get_preprocessing_methods(self):
-        result = self.reader.get_preprocessing_methods()
-        self.assertEqual(result['status'], 'success')
-        self.assertIsInstance(result['methods'], list)
-
-        # Check the returned method list
-        methods = result['methods']
-        self.assertTrue(any(m['name'] == 'data_scaler' for m in methods))
-        self.assertTrue(any(m['name'] == 'missing_value_handler' for m in methods))
-        self.assertTrue(any(m['name'] == 'outlier_detector' for m in methods))
-
-    def test_get_method_details(self):
-        # Test fetching details for StandardScaler
-        result = self.reader.get_method_details('StandardScaler')
-        self.assertEqual(result['status'], 'success')
-        self.assertEqual(result['method']['name'], 'StandardScaler')
-
-        # Check the fields of the returned details
-        method = result['method']
-        self.assertIn('description', method)
-        self.assertIn('principle', method)
-        self.assertIn('advantages', method)
-        self.assertIn('disadvantages', method)
-        self.assertIn('applicable_scenarios', method)
-        self.assertIn('parameters', method)
-
-        # Check the parameter info
-        parameters = method['parameters']
-        self.assertIsInstance(parameters, list)
-        if parameters:
-            param = parameters[0]
-            self.assertIn('name', param)
-            self.assertIn('type', param)
-            self.assertIn('default', param)
-            self.assertIn('description', param)
-
-        # Test fetching a nonexistent method
-        result = self.reader.get_method_details('NonExistentMethod')
-        self.assertEqual(result['status'], 'error')
-
-if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file
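Note that the deleted test_process_dataset asserted that the train, validation, and test files all exist, which now only holds when nonzero sizes are passed explicitly, since both defaults are 0. A minimal sketch of how the two _split_dataset branches could be checked directly, assuming DataProcessor() still constructs without arguments as it did in the deleted tests:

    import pandas as pd
    from function.data_processor_date import DataProcessor

    df = pd.DataFrame({'feature1': range(10), 'target': [0, 1] * 5})
    processor = DataProcessor()

    # No split requested: every row stays in train, val/test come back as None.
    train, val, test = processor._split_dataset(df, test_size=0, val_size=0)
    assert len(train) == 10 and val is None and test is None

    # Explicit sizes reproduce the old 0.2/0.2 behaviour: 6/2/2 rows here.
    train, val, test = processor._split_dataset(df, test_size=0.2, val_size=0.2)
    assert len(train) + len(val) + len(test) == 10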