# CostPrediction/src/data_preparation.py

from sklearn.preprocessing import StandardScaler
from datetime import datetime
import os
import joblib
import pandas as pd
import numpy as np
from src.feature_analysis import FeatureAnalysis
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from sklearn.model_selection import cross_val_score, LeaveOneOut
import json
import logging
from src.database.db_connection import get_db_connection
from sklearn.metrics import mean_absolute_error, mean_squared_error
class DataPreparation:
    """Builds scaled training and validation datasets for equipment cost models."""

    def __init__(self):
        """Create the shared feature analyzer and one scaler each for features and targets."""
        # Separate StandardScaler instances so feature scaling and target
        # scaling can be fitted and inverted independently.
        self.target_scaler = StandardScaler()
        self.feature_scaler = StandardScaler()
        self.feature_analyzer = FeatureAnalysis()
def prepare_training_data(self, equipment_data, equipment_type):
"""
准备训练数据
"""
try:
logging.info(f"Preparing training data for {equipment_type}")
logging.info(f"Raw data size: {len(equipment_data)}")
# 如果输入已经是 numpy 数组,直接返回
if isinstance(equipment_data, np.ndarray):
X = equipment_data
logging.info(f"Input is already numpy array with shape: {X.shape}")
# 处理无效值
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
return {
'X': X,
'feature_names': self.feature_analyzer.get_equipment_specific_features(equipment_type),
'feature_scaler': self.feature_scaler,
'target_scaler': self.target_scaler
}
# 从原始数据中提取特征和目标值
feature_names = self.feature_analyzer.get_equipment_specific_features(equipment_type)
features = []
targets = []
for item in equipment_data:
# 提取特征值
feature_values = []
for name in feature_names:
value = item.get(name)
try:
feature_values.append(float(value) if value is not None else 0.0)
except (ValueError, TypeError):
feature_values.append(0.0)
features.append(feature_values)
# 提取目标值(成本)
try:
cost = float(item['actual_cost'])
if cost > 0: # 只使用正数成本值
targets.append(cost)
else:
logging.warning(f"Skipping non-positive cost value: {cost}")
except (ValueError, TypeError, KeyError):
logging.error(f"Invalid cost value: {item.get('actual_cost')}")
continue
# 转换为numpy数组
X = np.array(features, dtype=float)
y = np.array(targets, dtype=float)
# 记录原始数据范围
logging.info(f"Original X range: min={X.min()}, max={X.max()}")
logging.info(f"Original y range: min={y.min()}, max={y.max()}")
# 处理无效值
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
# 标准化特征和目标值
X_scaled = self.feature_scaler.fit_transform(X)
y_scaled = self.target_scaler.fit_transform(y.reshape(-1, 1)).ravel()
# 记录标准化后的数据范围
logging.info(f"Scaled X range: min={X_scaled.min()}, max={X_scaled.max()}")
logging.info(f"Scaled y range: min={y_scaled.min()}, max={y_scaled.max()}")
return {
'X': X_scaled,
'y': y_scaled,
'feature_names': feature_names,
'feature_scaler': self.feature_scaler,
'target_scaler': self.target_scaler
}
except Exception as e:
logging.error(f"Error in data preparation: {str(e)}")
raise Exception(f"Training error: {str(e)}")
def prepare_validation_data(self, validation_data, equipment_type, feature_names=None, scalers=None):
"""
准备验证数据
"""
try:
logging.info(f"Preparing validation data for {equipment_type}")
logging.info(f"Raw validation data size: {len(validation_data)}")
# 如果输入已经是 numpy 数组,直接使用
if isinstance(validation_data, np.ndarray):
X = validation_data
logging.info(f"Input is already numpy array with shape: {X.shape}")
# 处理无效值
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
# 使用训练数据的标准化器
if scalers and 'feature_scaler' in scalers:
X_scaled = scalers['feature_scaler'].transform(X)
else:
# 如果没有提供标准化器,直接返回处理后的数组
X_scaled = X
logging.info(f"Preprocessed data shape: {X_scaled.shape}")
logging.info(f"Validation features shape: {X_scaled.shape}")
logging.info(f"Validation features type: {X_scaled.dtype}")
return {
'X': X_scaled,
'y': None # 验证数据可能没有标签
}
# 否则,从原始数据中提取特征
if not feature_names:
feature_names = self.feature_analyzer.get_equipment_specific_features(equipment_type)
# 提取特征和目标值
features = []
targets = []
for item in validation_data:
# 提取特征值
feature_values = []
for name in feature_names:
value = item.get(name)
try:
feature_values.append(float(value) if value is not None else 0.0)
except (ValueError, TypeError):
feature_values.append(0.0) # 使用0替代NaN
features.append(feature_values)
# 提取目标值(成本)
try:
targets.append(float(item['actual_cost']))
except (ValueError, TypeError):
logging.error(f"Invalid cost value: {item.get('actual_cost')}")
continue
# 转换为numpy数组
X = np.array(features, dtype=float)
y = np.array(targets, dtype=float)
# 处理无效值
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
# 使用训练数据的标准化器
if scalers and 'feature_scaler' in scalers:
X_scaled = scalers['feature_scaler'].transform(X)
else:
# 如果没有提供标准化器,直接返回处理后的数组
X_scaled = X
logging.info(f"Preprocessed data shape: {X_scaled.shape}")
logging.info(f"Validation features shape: {X_scaled.shape}")
logging.info(f"Validation features type: {X_scaled.dtype}")
return {
'X': X_scaled,
'y': y # 返回原始成本值
}
except Exception as e:
logging.error(f"Error in validation data preparation: {str(e)}")
logging.error(f"Feature names: {feature_names}")
logging.error(f"Equipment type: {equipment_type}")
raise Exception(f"Validation error: {str(e)}")
def calculate_derived_features(self, data, equipment_type):
"""
计算衍生特征
"""
try:
return self.feature_analyzer.calculate_derived_features(data, equipment_type)
except Exception as e:
logging.error(f"Error calculating derived features: {str(e)}")
raise Exception(f"Feature calculation error: {str(e)}")