CostPrediction/src/demo_service.py

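"""Demo model service for the CostPrediction app.

Loads a bundled equipment-cost CSV, trains a configurable set of
regression models (scikit-learn built-ins, plus XGBoost and LightGBM
when installed), and returns metrics, feature importances, and
chart-ready predictions for the demo front end.
"""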
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR


@dataclass(frozen=True)
class AlgorithmDefinition:
    """One selectable regression algorithm plus its display metadata."""

    key: str
    name: str  # Chinese display name
    english_name: str
    family: str  # model family, e.g. linear model / tree ensemble
    description: str
    estimator: Any


class DemoModelService:
    """Loads the demo dataset, trains the selected models, and packages results."""

    target_column = "actual_cost"
    ignored_columns = {"name", "type", target_column}

    def __init__(self, dataset_path: Path | str | None = None):
        # Default to the CSV bundled with the repository when no path is given.
        root = Path(__file__).resolve().parent.parent
        self.dataset_path = Path(dataset_path) if dataset_path else root / "data" / "demo_equipment_costs.csv"

    def get_algorithms(self) -> list[dict[str, str]]:
        """Return display metadata for every algorithm available in this environment."""
        algorithms, _ = self._available_algorithms()
        return [
            {
                "key": item.key,
                "name": item.name,
                "english_name": item.english_name,
                "family": item.family,
                "description": item.description,
            }
            for item in algorithms.values()
        ]

    def get_dataset_summary(self) -> dict[str, Any]:
        """Describe the dataset: shape, columns, equipment types, and a short preview."""
        frame = self._load_dataset()
        feature_columns = self._feature_columns(frame)
        return {
            "source": "local-file",
            "path": str(self.dataset_path),
            "row_count": int(len(frame)),
            "columns": list(frame.columns),
            "features": feature_columns,
            "target": self.target_column,
            "target_label": "实际成本",  # "actual cost", the UI label for the target
            "equipment_types": sorted(frame["type"].dropna().unique().tolist()),
            "preview": frame.head(8).to_dict(orient="records"),
        }

    def run_demo(self, selected_algorithms: list[str] | None = None) -> dict[str, Any]:
        """Train the requested algorithms, score them on a held-out split, and
        return metrics, feature importances, and chart-ready prediction points."""
        frame = self._load_dataset()
        feature_columns = self._feature_columns(frame)
        algorithms, availability_warnings = self._available_algorithms()
        requested = selected_algorithms or list(algorithms.keys())
        warnings = list(availability_warnings)

        # Keep only the algorithms that exist in this environment.
        selected = []
        for key in requested:
            if key in algorithms:
                selected.append(key)
            else:
                warnings.append(f"算法 '{key}' 不可用,已自动跳过。")  # algorithm unavailable, skipped
        if not selected:
            selected = ["linear"]
            warnings.append("所选算法均不可用,已自动使用线性回归。")  # none available; fell back to linear regression

        X = frame[feature_columns]
        y = frame[self.target_column]
        train_x, test_x, train_y, test_y = train_test_split(
            X,
            y,
            test_size=0.3,
            random_state=42,
        )

        metrics: dict[str, dict[str, float | str]] = {}
        predictions: dict[str, list[float]] = {}
        feature_importance: dict[str, list[dict[str, float | str]]] = {}
        for key in selected:
            definition = algorithms[key]
            model = definition.estimator
            model.fit(train_x, train_y)
            predicted = model.predict(test_x)
            predictions[key] = [float(value) for value in predicted]
            metrics[key] = {
                "name": definition.name,
                "r2": float(r2_score(test_y, predicted)),
                "mae": float(mean_absolute_error(test_y, predicted)),
                "rmse": float(np.sqrt(mean_squared_error(test_y, predicted))),
            }
            feature_importance[key] = self._feature_importance(model, feature_columns)
        best_model = min(metrics, key=lambda k: float(metrics[k]["rmse"]))

        # Build chart points sorted by actual cost; look up each test row's
        # position once instead of rescanning the index for every algorithm.
        ordered_test = test_x.copy()
        ordered_test["actual"] = test_y
        ordered_test["name"] = frame.loc[test_x.index, "name"]
        positions = {index: position for position, index in enumerate(test_x.index)}
        prediction_points = []
        for index, row in ordered_test.sort_values("actual").iterrows():
            point = {
                "name": row["name"],
                "actual": float(row["actual"]),
            }
            for key in selected:
                point[key] = predictions[key][positions[index]]
            prediction_points.append(point)

        # Showcase one median-cost row; the estimators are refit on the full
        # dataset here, after all held-out metrics above have been computed.
        sample = frame.sort_values(self.target_column).iloc[len(frame) // 2]
        sample_x = pd.DataFrame([sample[feature_columns].to_dict()])
        sample_predictions = {
            key: float(algorithms[key].estimator.fit(X, y).predict(sample_x)[0])
            for key in selected
        }
        return {
            "source": "local-file",
            "dataset": self.get_dataset_summary(),
            "algorithms": self.get_algorithms(),
            "selected_algorithms": selected,
            "best_model": best_model,
            "metrics": metrics,
            "feature_importance": feature_importance,
            "prediction_points": prediction_points,
            "sample_prediction": {
                "input": sample.drop(labels=[self.target_column]).to_dict(),
                "actual": float(sample[self.target_column]),
                "predictions": sample_predictions,
            },
            "warnings": warnings,
        }

    def _load_dataset(self) -> pd.DataFrame:
        if not self.dataset_path.exists():
            raise FileNotFoundError(f"Demo dataset not found: {self.dataset_path}")
        frame = pd.read_csv(self.dataset_path)
        if self.target_column not in frame.columns:
            raise ValueError(f"Demo dataset must include '{self.target_column}'.")
        return frame

    def _feature_columns(self, frame: pd.DataFrame) -> list[str]:
        # Every numeric column except the target and identifier columns is a feature.
        columns = [
            column
            for column in frame.columns
            if column not in self.ignored_columns and pd.api.types.is_numeric_dtype(frame[column])
        ]
        if not columns:
            raise ValueError("Demo dataset has no numeric feature columns.")
        return columns

    def _available_algorithms(self) -> tuple[dict[str, AlgorithmDefinition], list[str]]:
        """Build the algorithm registry; optional boosters are appended when importable."""
        algorithms = {
            "linear": AlgorithmDefinition(
                "linear",
                "线性回归",
                "Linear Regression",
                "线性模型",
                "快速建立基准模型,用于展示参数与成本之间的线性关系。",
                Pipeline([("scaler", StandardScaler()), ("model", LinearRegression())]),
            ),
            "ridge": AlgorithmDefinition(
                "ridge",
                "岭回归",
                "Ridge Regression",
                "线性模型",
                "带正则化的线性模型,适合特征存在相关性的场景。",
                Pipeline([("scaler", StandardScaler()), ("model", Ridge(alpha=1.0))]),
            ),
            "random_forest": AlgorithmDefinition(
                "random_forest",
                "随机森林",
                "Random Forest",
                "树模型集成",
                "通过多棵决策树集成预测,能够捕捉非线性特征影响。",
                RandomForestRegressor(n_estimators=160, max_depth=6, random_state=42),
            ),
            "gradient_boosting": AlgorithmDefinition(
                "gradient_boosting",
                "梯度提升树",
                "Gradient Boosting",
                "树模型集成",
                "逐步修正误差的提升模型,常用于表格数据回归任务。",
                GradientBoostingRegressor(n_estimators=120, learning_rate=0.06, max_depth=3, random_state=42),
            ),
            "svr": AlgorithmDefinition(
                "svr",
                "支持向量回归",
                "Support Vector Regression",
                "核方法",
                "使用核函数拟合平滑回归关系,适合展示不同算法偏好。",
                Pipeline([("scaler", StandardScaler()), ("model", SVR(C=500000, epsilon=50000))]),
            ),
            "knn": AlgorithmDefinition(
                "knn",
                "近邻回归",
                "KNN Regression",
                "实例学习",
                "基于相似样本进行预测,便于解释局部相似性。",
                Pipeline([("scaler", StandardScaler()), ("model", KNeighborsRegressor(n_neighbors=4))]),
            ),
        }

        warnings = []
        try:
            from xgboost import XGBRegressor

            algorithms["xgboost"] = AlgorithmDefinition(
                "xgboost",
                "XGBoost",
                "XGBoost",
                "提升模型",
                "面向表格数据的高性能梯度提升实现。",
                XGBRegressor(
                    n_estimators=120,
                    max_depth=3,
                    learning_rate=0.05,
                    subsample=0.9,
                    colsample_bytree=0.9,
                    random_state=42,
                    objective="reg:squarederror",
                ),
            )
        except Exception:
            warnings.append("当前环境未安装 XGBoost,页面已自动隐藏该算法。")  # XGBoost not installed; hidden in the UI
        try:
            from lightgbm import LGBMRegressor

            algorithms["lightgbm"] = AlgorithmDefinition(
                "lightgbm",
                "LightGBM",
                "LightGBM",
                "提升模型",
                "基于直方图优化的快速梯度提升模型。",
                LGBMRegressor(
                    n_estimators=120,
                    learning_rate=0.05,
                    max_depth=4,
                    random_state=42,
                    verbose=-1,
                ),
            )
        except Exception:
            warnings.append("当前环境未安装 LightGBM,页面已自动隐藏该算法。")  # LightGBM not installed; hidden in the UI
        return algorithms, warnings

    def _feature_importance(self, model: Any, feature_columns: list[str]) -> list[dict[str, float | str]]:
        # Unwrap pipelines so we inspect the final estimator, not the scaler.
        estimator = model
        if isinstance(model, Pipeline):
            estimator = model.named_steps["model"]
        if hasattr(estimator, "feature_importances_"):
            values = estimator.feature_importances_
        elif hasattr(estimator, "coef_"):
            values = np.abs(np.ravel(estimator.coef_))
        else:
            # Models without importances or coefficients (e.g. KNN, kernel SVR).
            values = np.zeros(len(feature_columns))
        total = float(np.sum(values))
        if total > 0:
            values = values / total  # normalise so importances sum to 1
        ranked = sorted(
            [
                {"feature": feature, "importance": float(value)}
                for feature, value in zip(feature_columns, values)
            ],
            key=lambda item: item["importance"],
            reverse=True,
        )
        return ranked[:8]
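

if __name__ == "__main__":
    # Minimal smoke-test sketch (not part of the original service API): run the
    # demo with two of the built-in algorithms and print the headline output.
    import json

    service = DemoModelService()
    result = service.run_demo(["linear", "random_forest"])
    print("best model:", result["best_model"])
    print(json.dumps(result["metrics"], ensure_ascii=False, indent=2))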