修改--增加了划分数据集时test/val为0时的判断

This commit is contained in:
haotian 2025-02-19 15:08:59 +08:00
parent 3b5dbcb62c
commit f19a56f90b
4 changed files with 116 additions and 247 deletions

View File

@ -102,103 +102,107 @@ class DataProcessor:
Returns:
处理结果字典
"""
try:
# try:
# 数据集名
file_name = input_path.split("/")[-1].split(".")[0]
# 数据集名
file_name = input_path.split("/")[-1].split(".")[0]
# 生成时间戳
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
# 生成时间戳
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
# 创建输出目录
output_path = Path(output_dir+'/'+file_name+'_'+timestamp)
output_path.mkdir(parents=True, exist_ok=True)
# 记录处理过程
process_record = {
'input_file': input_path,
'timestamp': datetime.datetime.now().isoformat(),
'process_methods': process_methods,
'feature_methods': feature_methods,
'split_params': split_params,
'steps': []
}
# 读取数据
self.logger.info(f"Loading data from {input_path}")
df = pd.read_csv(input_path)
# 创建输出目录
output_path = Path(output_dir+'/'+file_name+'_'+timestamp)
output_path.mkdir(parents=True, exist_ok=True)
# 记录处理过程
process_record = {
'input_file': input_path,
'timestamp': datetime.datetime.now().isoformat(),
'process_methods': process_methods,
'feature_methods': feature_methods,
'split_params': split_params,
'steps': []
}
# 读取数据
self.logger.info(f"Loading data from {input_path}")
df = pd.read_csv(input_path)
process_record['steps'].append({
'step': 'load_data',
'shape': df.shape
})
# 数据预处理
for method in process_methods:
df = self._apply_process_methods(df, method)
process_record['steps'].append({
'step': 'load_data',
'step': 'cleaning',
'method': method['method_name'],
'params': method['params'],
'shape': df.shape
})
# 特征工程
for method in feature_methods:
df = self._apply_feature_method(df, method)
process_record['steps'].append({
'step': 'feature_engineering',
'method': method['method_name'],
'params': method['params'],
'shape': df.shape
})
# 数据集划分
train_data, val_data, test_data = self._split_dataset(
df,
test_size=split_params.get('test_size', 0),
val_size=split_params.get('val_size', 0)
)
# 保存处理后的数据集
train_path = output_path / f'train_{file_name}_{timestamp}.csv'
val_path = output_path / f'val_{file_name}_{timestamp}.csv'
test_path = output_path / f'test_{file_name}_{timestamp}.csv'
if train_data is not None:
train_data.to_csv(train_path, index=False)
if val_data is not None:
val_data.to_csv(val_path, index=False)
if test_data is not None:
test_data.to_csv(test_path, index=False)
# 记录输出文件路径
process_record['output_files'] = {
'train': str(train_path) if train_data is not None else "",
'validation': str(val_path) if val_data is not None else "",
'test': str(test_path) if test_data is not None else ""
}
# 保存处理记录
record_path = output_path / f'process_record__{file_name}_{timestamp}.json'
with open(record_path, 'w', encoding='utf-8') as f:
json.dump(process_record, f, indent=2, ensure_ascii=False)
# 数据预处理
for method in process_methods:
df = self._apply_process_methods(df, method)
process_record['steps'].append({
'step': 'cleaning',
'method': method['method_name'],
'params': method['params'],
'shape': df.shape
})
# 特征工程
for method in feature_methods:
df = self._apply_feature_method(df, method)
process_record['steps'].append({
'step': 'feature_engineering',
'method': method['method_name'],
'params': method['params'],
'shape': df.shape
})
# 数据集划分
train_data, val_data, test_data = self._split_dataset(
df,
test_size=split_params.get('test_size', 0.2),
val_size=split_params.get('val_size', 0.2)
)
# 保存处理后的数据集
train_path = output_path / f'train_{file_name}_{timestamp}.csv'
val_path = output_path / f'val_{file_name}_{timestamp}.csv'
test_path = output_path / f'test_{file_name}_{timestamp}.csv'
train_data.to_csv(train_path, index=False)
val_data.to_csv(val_path, index=False)
test_data.to_csv(test_path, index=False)
# 记录输出文件路径
process_record['output_files'] = {
'train': str(train_path),
'validation': str(val_path),
'test': str(test_path)
}
# 保存处理记录
record_path = output_path / f'process_record__{file_name}_{timestamp}.json'
with open(record_path, 'w', encoding='utf-8') as f:
json.dump(process_record, f, indent=2, ensure_ascii=False)
self.logger.info(f"Data processing completed. Results saved to {output_path}")
return {
'status': 'success',
'message': 'Data processing completed successfully',
'process_record': process_record
}
except Exception as e:
error_msg = f"Error processing dataset: {str(e)}"
self.logger.error(error_msg)
return {
'status': 'error',
'message': error_msg
}
self.logger.info(f"Data processing completed. Results saved to {output_path}")
return {
'status': 'success',
'message': 'Data processing completed successfully',
'process_record': process_record
}
# except Exception as e:
# error_msg = f"Error processing dataset: {str(e)}"
# self.logger.error(error_msg)
# return {
# 'status': 'error',
# 'message': error_msg
# }
def _apply_process_methods(self, df: pd.DataFrame, method: Dict) -> pd.DataFrame:
"""应用数据预处理方法"""
@ -328,29 +332,36 @@ class DataProcessor:
def _split_dataset(
self,
df: pd.DataFrame,
test_size: float = 0.2,
val_size: float = 0.2
test_size: float = 0,
val_size: float = 0
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""划分数据集"""
try:
# 首先划分训练集和测试集
train_val_data, test_data = train_test_split(
df,
test_size=test_size,
random_state=42
)
if test_size > 0:
train_val_data, test_data = train_test_split(
df,
test_size=test_size,
random_state=42
)
else:
train_val_data = df
test_data = None
if val_size > 0:
# 再划分训练集和验证集
val_size_adjusted = val_size / (1 - test_size)
train_data, val_data = train_test_split(
train_val_data,
test_size=val_size_adjusted,
random_state=42
)
val_size_adjusted = val_size / (1 - test_size)
train_data, val_data = train_test_split(
train_val_data,
test_size=val_size_adjusted,
random_state=42
)
else:
train_data = train_val_data
val_data = None
self.logger.info(
f"Dataset split - Train: {len(train_data)}, "
f"Val: {len(val_data)}, Test: {len(test_data)}"
f"Dataset split - Train: {len(train_data) if train_data is not None else 0}, "
f"Val: {len(val_data) if val_data is not None else 0} , Test: {len(test_data) if test_data is not None else 0}"
)
return train_data, val_data, test_data

View File

@ -1,93 +0,0 @@
import unittest
import pandas as pd
import numpy as np
from pathlib import Path
from function.data_processor_date import DataProcessor
class TestDataProcessor(unittest.TestCase):
    """Unit tests for DataProcessor.process_dataset (end-to-end pipeline)."""

    def setUp(self):
        """Write a small CSV fixture to disk before each test."""
        self.processor = DataProcessor()
        # One NaN in feature1 so the imputation step has real work to do.
        self.test_data = pd.DataFrame({
            'feature1': [1, 2, np.nan, 4, 5],
            'feature2': [10, 20, 30, 40, 50],
            'target': [0, 1, 0, 1, 0]
        })
        # Persist the fixture where process_dataset expects to read it.
        self.input_path = 'dataset/dataset_raw/test_data.csv'
        Path(self.input_path).parent.mkdir(parents=True, exist_ok=True)
        self.test_data.to_csv(self.input_path, index=False)
        # Directory that process_dataset writes its outputs into.
        self.output_dir = 'dataset/dataset_processed'

    def test_process_dataset(self):
        """Happy path: impute, scale, split, and write all three split files."""
        cleaning_methods = [
            {
                'method_name': 'SimpleImputer',
                'params': {'strategy': 'mean'}
            }
        ]
        feature_methods = [
            {
                'method_name': 'StandardScaler',
                'params': {}
            }
        ]
        split_params = {
            'test_size': 0.2,
            'val_size': 0.2
        }
        result = self.processor.process_dataset(
            self.input_path,
            self.output_dir,
            cleaning_methods,
            feature_methods,
            split_params
        )
        # The pipeline must report success and return its process record.
        self.assertEqual(result['status'], 'success')
        self.assertIn('process_record', result)
        # All three split files must actually exist on disk.
        record = result['process_record']
        self.assertTrue(Path(record['output_files']['train']).exists())
        self.assertTrue(Path(record['output_files']['validation']).exists())
        self.assertTrue(Path(record['output_files']['test']).exists())

    def test_invalid_method(self):
        """An unknown preprocessing method name must yield an error status."""
        cleaning_methods = [
            {
                'method_name': 'InvalidMethod',
                'params': {}
            }
        ]
        result = self.processor.process_dataset(
            self.input_path,
            self.output_dir,
            cleaning_methods,
            [],
            {'test_size': 0.2, 'val_size': 0.2}
        )
        self.assertEqual(result['status'], 'error')

    def tearDown(self):
        """Remove the CSV fixture created by setUp."""
        # Fix: the original used a bare `except: pass`, which swallows every
        # exception (even KeyboardInterrupt/SystemExit) and hides real bugs.
        # unlink(missing_ok=True) only tolerates the file already being gone.
        Path(self.input_path).unlink(missing_ok=True)
# Allow running this test module directly (e.g. `python this_file.py`)
# in addition to discovery via `python -m unittest`.
if __name__ == '__main__':
    unittest.main()

View File

@ -1,49 +0,0 @@
import unittest
from data_process.method_reader_date_process import MethodReader
class TestMethodReader(unittest.TestCase):
    """Tests for MethodReader's method catalogue and per-method detail lookup."""

    def setUp(self):
        self.reader = MethodReader()

    def test_get_preprocessing_methods(self):
        """The catalogue call succeeds and lists the known method groups."""
        result = self.reader.get_preprocessing_methods()
        self.assertEqual(result['status'], 'success')
        self.assertIsInstance(result['methods'], list)
        methods = result['methods']
        # Each of these groups must appear somewhere in the returned list.
        for expected in ('data_scaler', 'missing_value_handler', 'outlier_detector'):
            self.assertTrue(any(m['name'] == expected for m in methods))

    def test_get_method_details(self):
        """Detail lookup returns a fully described method; unknown names fail."""
        result = self.reader.get_method_details('StandardScaler')
        self.assertEqual(result['status'], 'success')
        self.assertEqual(result['method']['name'], 'StandardScaler')
        method = result['method']
        # Every descriptive field must be present on the method record.
        for field in ('description', 'principle', 'advantages',
                      'disadvantages', 'applicable_scenarios', 'parameters'):
            self.assertIn(field, method)
        parameters = method['parameters']
        self.assertIsInstance(parameters, list)
        if parameters:
            # Spot-check the first parameter's required keys.
            param = parameters[0]
            for field in ('name', 'type', 'default', 'description'):
                self.assertIn(field, param)
        # A method name that does not exist must report an error.
        result = self.reader.get_method_details('NonExistentMethod')
        self.assertEqual(result['status'], 'error')
# Allow running this test module directly (e.g. `python this_file.py`)
# in addition to discovery via `python -m unittest`.
if __name__ == '__main__':
    unittest.main()