Skill-BidCreater/scripts/common.py
2026-03-09 22:20:38 +08:00

248 lines
7.7 KiB
Python

from __future__ import annotations
import json
import os
import re
import tempfile
from pathlib import Path
from typing import Any
import yaml
# Repository root: the third ancestor of this file's resolved path
# (parents[0] is the directory that contains this file).
REPO_ROOT = Path(__file__).resolve().parents[2]
# "input" and "output" directories located directly under the repository root.
INPUT_ROOT = REPO_ROOT / "input"
OUTPUT_ROOT = REPO_ROOT / "output"
# Canonical bundle identifiers; normalize_bundle lists them in its error message.
VALID_BUNDLES = ("technical", "business-other")
# Accepted spellings mapped to the canonical bundle identifier. The lookup in
# normalize_bundle is exact (after stripping whitespace), so every tolerated
# variant must be listed here explicitly.
BUNDLE_ALIASES = {
    "technical": "technical",
    "business-other": "business-other",
    "business_other": "business-other",
}
# Per-bundle default file names and document titles, keyed by canonical bundle
# identifier. The get_bundle_*_path helpers join the "*_json" names onto the
# layout's "work" directory and the "*_docx" names onto its "final" directory.
# NOTE(review): the "*_title" entries are not consumed in this module —
# presumably they are read by the DOCX builders; confirm against callers.
BUNDLE_DEFAULTS: dict[str, dict[str, str]] = {
    "technical": {
        "outline_json": "final_outline_technical.json",
        "content_json": "final_bid_content_technical.json",
        "outline_docx": "技术标_目录版.docx",
        "bid_docx": "技术标.docx",
        "outline_doc_title": "技术标(目录版)",
        "outline_toc_title": "目录",
        "bid_doc_title": "技术标",
        "bid_toc_title": "目录",
    },
    "business-other": {
        "outline_json": "final_outline_business_other.json",
        "content_json": "final_bid_content_business_other.json",
        "outline_docx": "商务及其他_目录版.docx",
        "bid_docx": "商务及其他.docx",
        "outline_doc_title": "商务及其他(目录版)",
        "outline_toc_title": "目录",
        "bid_doc_title": "商务及其他",
        "bid_toc_title": "目录",
    },
}
# Chinese hedging / placeholder phrases ("maybe", "roughly", "should",
# "I think", "AI suggests", "to be confirmed").
# NOTE(review): not referenced in this module — presumably used to flag such
# wording in generated bid text; confirm against callers.
BANNED_WORDS = ["可能", "大概", "应该", "我觉得", "AI建议", "待确认"]
# Weak filename hints only. These hints may help AI label discovered files,
# but they must never be treated as workflow routing, directory semantics,
# or mandatory material categories.
# Each entry carries a stable "key", a Chinese display "label", and a list of
# "keywords" (Chinese and lowercase English fragments).
MATERIAL_CATALOG = [
    {"key": "business_license", "label": "营业执照副本", "keywords": ["营业执照", "license"]},
    {"key": "qualification_certificate", "label": "资质证书", "keywords": ["资质", "证书", "许可", "qualification"]},
    {"key": "legal_representative_id", "label": "法定代表人身份证明", "keywords": ["法人", "法定代表人", "身份证明"]},
    {"key": "authorization_letter", "label": "授权委托书", "keywords": ["授权", "委托书", "authorization"]},
    {"key": "project_manager_certificate", "label": "项目经理证书", "keywords": ["项目经理", "pmp", "建造师"]},
    {"key": "similar_project_case", "label": "类似项目业绩证明", "keywords": ["业绩", "案例", "合同", "验收", "case"]},
    {"key": "quotation_basis", "label": "报价依据说明", "keywords": ["报价", "清单", "预算", "quote", "price"]},
]
# Entry names that is_reserved_project_entry treats as reserved. The lookup is
# done on the lowercased entry name, so every member must be lowercase.
RESERVED_PROJECT_DIRS = {
    # Directories this module itself creates or reads (see ensure_output_layout
    # and find_rfp_docx).
    "rfp",
    "work",
    "reports",
    "final",
    # Tooling / VCS / environment directories.
    "__pycache__",
    ".git",
    ".hg",
    ".svn",
    ".idea",
    ".vscode",
    ".venv",
    "venv",
    "node_modules",
}
def ensure_dir(path: Path) -> Path:
    """Make sure ``path`` exists as a directory and hand it back.

    Missing parent directories are created as well; an already existing
    directory is left untouched.
    """
    path.mkdir(exist_ok=True, parents=True)
    return path
def write_text(path: Path, text: str) -> None:
    """Write ``text`` to ``path`` as UTF-8, creating the parent directory first.

    Line endings are written exactly as given (``newline="\\n"`` disables
    platform newline translation).
    """
    parent_dir = path.parent
    ensure_dir(parent_dir)
    path.write_text(text, encoding="utf-8", newline="\n")
def write_json(path: Path, data: Any) -> None:
    """Serialize ``data`` as pretty-printed JSON and write it to ``path``.

    The parent directory is created when missing. Output is UTF-8 with
    non-ASCII characters kept verbatim and a two-space indent.

    Args:
        path: Destination file.
        data: Any JSON-serializable value.

    Raises:
        TypeError: If ``data`` contains a value ``json`` cannot serialize.
    """
    ensure_dir(path.parent)
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    # Force LF line endings so the result matches write_text and
    # write_json_atomic on every platform (the default would emit CRLF on
    # Windows).
    path.write_text(payload, encoding="utf-8", newline="\n")
def write_json_atomic(path: Path, data: Any, *, indent: int = 2, ensure_ascii: bool = False) -> None:
    """Write ``data`` as JSON to ``path`` via a temporary file and a rename.

    The JSON is streamed into a temporary file created next to ``path``,
    flushed and fsynced, and only then moved over ``path``. Readers therefore
    see either the previous file or the complete new one.

    Args:
        path: Destination file; its parent directory is created when missing.
        data: Any JSON-serializable value.
        indent: Indent width passed to the JSON encoder.
        ensure_ascii: Whether non-ASCII characters are escaped.

    Raises:
        TypeError: If ``data`` contains a value ``json`` cannot serialize.
        OSError: If writing or replacing the file fails.
    """
    ensure_dir(path.parent)
    temp_path: Path | None = None
    encoder = json.JSONEncoder(ensure_ascii=ensure_ascii, indent=indent)
    try:
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            newline="\n",
            dir=str(path.parent),  # same directory, so the final rename stays on one filesystem
            prefix=f"{path.stem}.",
            suffix=".tmp",
            delete=False,  # the file must survive the close so it can be renamed
        ) as temp_file:
            temp_path = Path(temp_file.name)
            for chunk in encoder.iterencode(data):
                temp_file.write(chunk)
            temp_file.flush()
            os.fsync(temp_file.fileno())
        # Replace only after the handle is closed; Windows refuses to rename
        # a file that is still open.
        temp_path.replace(path)
    except BaseException:
        # BaseException (not just Exception) so that Ctrl-C / SystemExit during
        # the write does not leave a stray *.tmp file behind.
        if temp_path is not None:
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                # Best-effort cleanup: never let it mask the original error.
                pass
        raise
def read_json(path: Path) -> Any:
    """Parse and return the JSON document stored at ``path``.

    The file is decoded as ``utf-8-sig``, so a leading UTF-8 byte-order mark
    is accepted and skipped.
    """
    with path.open(encoding="utf-8-sig") as handle:
        return json.load(handle)
def load_yaml(path: Path) -> dict[str, Any]:
    """Load a YAML mapping from ``path``.

    Returns an empty dict when the file does not exist or when its top-level
    value is not a mapping. The file is decoded as ``utf-8-sig`` and parsed
    with ``yaml.safe_load``.
    """
    if path.exists():
        parsed = yaml.safe_load(path.read_text(encoding="utf-8-sig"))
        if isinstance(parsed, dict):
            return parsed
    return {}
def normalize_text(text: str) -> str:
    """Collapse every run of whitespace into one space and trim both ends.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    if not text:
        return ""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def normalize_bundle(bundle: str | None) -> str | None:
if bundle is None:
return None
normalized = BUNDLE_ALIASES.get(bundle.strip())
if normalized:
return normalized
raise ValueError(f"不支持的 bundle: {bundle}。允许值:{', '.join(VALID_BUNDLES)}")
def ensure_output_layout(project_dir: Path) -> dict[str, Path]:
    """Create the project's output directories and return them by role.

    The keys ``"artifacts"``, ``"tables"`` and ``"work"`` all point at the
    same ``work`` directory; ``"root"`` is ``project_dir`` itself.
    """
    work_dir = project_dir / "work"
    layout = {
        "root": project_dir,
        "final": project_dir / "final",
        "artifacts": work_dir,
        "tables": work_dir,
        "reports": project_dir / "reports",
        "work": work_dir,
    }
    for directory in layout.values():
        ensure_dir(directory)
    return layout
def get_bundle_defaults(bundle: str) -> dict[str, str]:
    """Return the default file names and titles for ``bundle``.

    The name is normalized first, so any accepted alias works. The returned
    dict is the entry stored in ``BUNDLE_DEFAULTS`` itself, not a copy.

    Raises:
        ValueError: If ``bundle`` is ``None`` or not a known alias.
    """
    canonical = normalize_bundle(bundle)
    if canonical is not None:
        return BUNDLE_DEFAULTS[canonical]
    raise ValueError("bundle 不能为空。")
def get_bundle_outline_path(output_layout: dict[str, Path], bundle: str) -> Path:
    """Return the outline JSON path for ``bundle`` inside the layout's work directory."""
    work_dir = output_layout["work"]
    filename = get_bundle_defaults(bundle)["outline_json"]
    return work_dir / filename
def get_bundle_content_path(output_layout: dict[str, Path], bundle: str) -> Path:
    """Return the bid-content JSON path for ``bundle`` inside the layout's work directory."""
    work_dir = output_layout["work"]
    filename = get_bundle_defaults(bundle)["content_json"]
    return work_dir / filename
def get_bundle_outline_docx_path(output_layout: dict[str, Path], bundle: str) -> Path:
    """Return the outline DOCX path for ``bundle`` inside the layout's final directory."""
    final_dir = output_layout["final"]
    filename = get_bundle_defaults(bundle)["outline_docx"]
    return final_dir / filename
def get_bundle_bid_docx_path(output_layout: dict[str, Path], bundle: str) -> Path:
    """Return the bid DOCX path for ``bundle`` inside the layout's final directory."""
    final_dir = output_layout["final"]
    filename = get_bundle_defaults(bundle)["bid_docx"]
    return final_dir / filename
def find_rfp_docx(project_dir: Path) -> Path:
    """Return the first DOCX tender document found in ``project_dir / "rfp"``.

    Candidates are regular files whose name ends in ``.docx`` (compared
    case-insensitively, so the result does not depend on the platform's
    filename case rules). Word owner/lock files (``~$name.docx``), which exist
    while a document is open in Word, are skipped: they are not valid DOCX
    archives and would otherwise sort ahead of CJK file names.

    Args:
        project_dir: Project directory expected to contain an ``rfp`` folder.

    Returns:
        The first matching file in sorted path order.

    Raises:
        FileNotFoundError: If the ``rfp`` directory is missing or holds no
            usable DOCX file.
    """
    rfp_dir = project_dir / "rfp"
    if not rfp_dir.is_dir():
        raise FileNotFoundError(f"未找到招标文件目录: {rfp_dir}")
    docx_files = sorted(
        entry
        for entry in rfp_dir.iterdir()
        if entry.name.lower().endswith(".docx")
        and not entry.name.startswith("~$")
        and entry.is_file()
    )
    if not docx_files:
        raise FileNotFoundError(f"未找到 DOCX 招标文件: {rfp_dir}")
    return docx_files[0]
def get_project_config(project_dir: Path) -> dict[str, Any]:
    """Load ``config/project.yaml`` from ``project_dir`` via ``load_yaml``."""
    config_file = project_dir.joinpath("config", "project.yaml")
    return load_yaml(config_file)
def is_reserved_project_entry(path: Path) -> bool:
    """Tell whether the entry's name, lowercased, is in ``RESERVED_PROJECT_DIRS``."""
    lowered_name = path.name.lower()
    return lowered_name in RESERVED_PROJECT_DIRS
def is_hidden_project_entry(path: Path) -> bool:
    """Tell whether the entry's final path component begins with a dot."""
    first_char = path.name[:1]
    return first_char == "."
def iter_material_entries(project_dir: Path) -> list[Path]:
    """List the direct children of ``project_dir`` that may hold materials.

    Reserved and hidden entries are left out; the rest is returned in sorted
    order. A missing ``project_dir`` yields an empty list.
    """
    if not project_dir.exists():
        return []
    return [
        entry
        for entry in sorted(project_dir.iterdir())
        if not (is_reserved_project_entry(entry) or is_hidden_project_entry(entry))
    ]
def safe_filename(name: str) -> str:
    """Turn ``name`` into a string usable as a file name.

    Each run of the characters ``< > : " / \\ | ? *`` becomes one underscore,
    then leading and trailing spaces and dots are removed. If nothing is
    left, ``"untitled"`` is returned.
    """
    replaced = re.sub(r'[<>:"/\\\\|?*]+', "_", name)
    trimmed = replaced.strip(" .")
    if trimmed:
        return trimmed
    return "untitled"
def markdown_table(headers: list[str], rows: list[list[str]]) -> str:
    """Render a pipe-delimited Markdown table.

    The output is the header row, a ``---`` separator row with one cell per
    header, and then one line per entry of ``rows``, joined by newlines. Cell
    text is inserted as-is.
    """

    def render(cells: list[str]) -> str:
        # One table line: cells separated by " | " and wrapped in outer pipes.
        return "| " + " | ".join(cells) + " |"

    header_line = render(headers)
    separator_line = render(["---"] * len(headers))
    return "\n".join([header_line, separator_line, *(render(row) for row in rows)])
def get_font_candidates() -> list[Path]:
    """Return candidate font file paths under ``C:/Windows/Fonts``, in lookup order.

    The paths are built unconditionally; none of them is checked for existence.
    """
    fonts_dir = Path("C:/Windows/Fonts")
    font_files = ("msyh.ttc", "msyhbd.ttc", "simhei.ttf", "simsun.ttc")
    return [fonts_dir / font_file for font_file in font_files]
def find_font_path() -> Path | None:
    """Return the first candidate font file that exists, or ``None`` if none does."""
    existing = (candidate for candidate in get_font_candidates() if candidate.exists())
    return next(existing, None)
def list_files(path: Path) -> list[Path]:
    """Return every regular file beneath ``path``, recursively, in sorted order.

    A ``path`` that does not exist yields an empty list.
    """
    if path.exists():
        return sorted(item for item in path.rglob("*") if item.is_file())
    return []