bidmaster-cli/src/bidmaster/tools/parser.py
sladro bf7eb9ca9d refactor: 提取所有AI提示词到配置文件并实现统一管理
- 新增 config/prompts.yaml 提示词配置文件,包含10个核心提示词
- 新增 src/bidmaster/config/prompt_manager.py 提示词管理工具类
- 重构 src/bidmaster/tools/llm.py 使用配置化系统消息
- 重构 src/bidmaster/tools/parser.py 使用配置化解析提示词(3个)
- 重构 src/bidmaster/tools/rag.py 使用配置化RAG生成提示词
- 重构 src/bidmaster/nodes/toc/llm_helper.py 使用配置化TOC提示词(2个)
- 重构 src/bidmaster/nodes/toc/adjust_chapters.py 使用配置化章节调整提示词
- 重构 src/bidmaster/nodes/toc/optimize_with_feedback.py 使用配置化优化反馈提示词

优势:
- 集中管理: 所有提示词统一配置,易于维护
- 易于调优: 修改提示词无需改动代码
- 版本控制: 提示词变更可独立追踪
- A/B测试: 方便测试不同提示词效果
- 可扩展性: 支持未来多语言提示词

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-10 10:12:25 +08:00

574 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""招标文件解析器
解析评分要求表格、偏离表要求,并结合Word模板生成标书结构。
支持Excel、CSV、Word表格格式。
"""
import logging
from pathlib import Path
from typing import Any, List
from enum import Enum
import pandas as pd
from docx import Document
from openai import OpenAI
from pydantic import BaseModel, Field
from ..config import get_settings
from ..config.prompt_manager import get_prompt_manager
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class TechnicalCategory(str, Enum):
    """Closed set of categories a scoring item can be classified into.

    Members are also ``str`` instances, so each member compares equal to
    its raw value (for example ``"commercial"``).
    """

    # Completeness of the technical solution.
    TECHNICAL_SOLUTION = "technical_solution"
    # Equipment specification and reliability.
    EQUIPMENT_SPEC = "equipment_spec"
    # Implementation plan.
    IMPLEMENTATION = "implementation"
    # Quality and safety system.
    QUALITY_SAFETY = "quality_safety"
    # After-sales service.
    AFTER_SALES = "after_sales"
    # Compliance response.
    COMPLIANCE = "compliance"
    # Commercial conditions (price, qualifications, commercial terms, ...).
    COMMERCIAL = "commercial"
    # Anything that fits none of the categories above.
    OTHER = "other"
class ScoringCriteria(BaseModel):
    """One scoring criterion: its name, maximum score and target chapter."""

    # Name of the scoring item (required).
    item_name: str = Field(..., description="评分项名称")
    # Maximum score obtainable for this item (required).
    max_score: float = Field(..., description="最高分值")
    # Free-text description of the scoring requirement.
    description: str = Field("", description="评分要求描述")
    # Category of the item; OTHER unless something assigns a different one.
    category: TechnicalCategory = Field(
        TechnicalCategory.OTHER,
        description="技术类别",
    )
    # ID of the chapter this item corresponds to (required).
    chapter_id: str = Field(..., description="对应章节ID")
    # Order in which the item originally appeared in the scoring table.
    original_index: int = Field(0, description="在评分表中的原始出现顺序")
class DeviationItem(BaseModel):
    """One entry of a deviation table."""

    # The tender requirement being responded to (required).
    requirement: str = Field(..., description="招标要求")
    # Kind of response; defaults to "正偏离".
    response_type: str = Field("正偏离", description="响应类型")
    # ID of the chapter this entry corresponds to (required).
    chapter_id: str = Field(..., description="对应章节ID")
class DocumentChapter(BaseModel):
    """A chapter of the bid document; chapters may nest via ``children``."""

    # Chapter ID (required).
    id: str = Field(..., description="章节ID")
    # Chapter title (required).
    title: str = Field(..., description="章节标题")
    # Heading level of the chapter (required).
    level: int = Field(..., description="章节层级")
    # Score attached to the chapter, if any.
    score: float | None = Field(None, description="评分值")
    # Sub-chapters; a fresh empty list per instance.
    children: List['DocumentChapter'] = Field(
        default_factory=list,
        description="子章节",
    )
    # Placeholder text used for this chapter in the template, if any.
    template_placeholder: str | None = Field(None, description="模板占位符")
class BidStructure(BaseModel):
    """Aggregate result of parsing: criteria, deviations, chapters, sources."""

    # Project name.
    project_name: str = Field("", description="项目名称")
    # Parsed scoring criteria.
    scoring_criteria: List[ScoringCriteria] = Field(
        default_factory=list,
        description="评分标准",
    )
    # Parsed deviation-table entries.
    deviation_items: List[DeviationItem] = Field(
        default_factory=list,
        description="偏离项",
    )
    # Document chapters.
    chapters: List[DocumentChapter] = Field(
        default_factory=list,
        description="文档章节",
    )

    # --- Source file paths ---
    scoring_file: str = Field("", description="评分要求文件路径")
    deviation_file: str = Field("", description="偏离表文件路径")
    template_file: str = Field("", description="模板文件路径")

    # --- AI review result ---
    structure_review: dict[str, Any] = Field(
        default_factory=dict,
        description="AI结构审查结果",
    )
class BidParser:
    """Parser for bid (tender) requirement files."""

    def __init__(self) -> None:
        """Store the object returned by ``get_settings()`` on the instance."""
        self.settings = get_settings()
def parse_bid_requirements(
    self,
    scoring_file: str,
    deviation_file: str | None = None,
    template_file: str | None = None
) -> BidStructure:
    """Parse the tender inputs and assemble a ``BidStructure``.

    Args:
        scoring_file: Path of the scoring-requirements file (required).
        deviation_file: Optional path of the deviation-table file.
        template_file: Optional path of the Word template file.

    Returns:
        A ``BidStructure`` holding the three paths (empty string for an
        omitted optional file), the parsed scoring criteria and, when the
        corresponding file was given, the deviation items and the chapters
        read from the template.

    Raises:
        Exception: Any error raised while validating or parsing is logged
            and re-raised unchanged.
    """
    try:
        # Fail early if any of the supplied paths does not exist.
        self._validate_files(scoring_file, deviation_file, template_file)

        structure = BidStructure(
            scoring_file=scoring_file,
            deviation_file=deviation_file if deviation_file else "",
            template_file=template_file if template_file else "",
        )

        # The scoring file is mandatory; the other two inputs are optional.
        structure.scoring_criteria = self._parse_scoring_file(scoring_file)
        if deviation_file:
            structure.deviation_items = self._parse_deviation_file(deviation_file)
        if template_file:
            structure.chapters = self._parse_template_file(template_file)
    except Exception as e:
        logger.error(f"解析招标要求失败: {e}")
        raise

    # Note: chapter generation is handled entirely by TocAgent, not here.
    return structure
def _validate_files(self, scoring_file: str, deviation_file: str | None, template_file: str | None) -> None:
"""验证文件存在"""
# 评分文件是必需的
if not Path(scoring_file).exists():
raise FileNotFoundError(f"评分要求文件不存在: {scoring_file}")
# 偏离表和模板文件是可选的
if deviation_file and not Path(deviation_file).exists():
raise FileNotFoundError(f"偏离表文件不存在: {deviation_file}")
if template_file and not Path(template_file).exists():
raise FileNotFoundError(f"模板文件不存在: {template_file}")
def _parse_scoring_file(self, file_path: str) -> List[ScoringCriteria]:
    """Parse a scoring file with the parser matching its extension.

    The lower-cased suffix selects the parser: ``.xlsx``/``.xls`` use the
    Excel parser, ``.csv`` the CSV parser and ``.docx`` the Word-table
    parser.

    Args:
        file_path: Path of the scoring file.

    Returns:
        The scoring criteria produced by the selected parser.

    Raises:
        ValueError: If the suffix is not one of the supported ones.
        Exception: Any parser error is logged and re-raised unchanged.
    """
    try:
        suffix = Path(file_path).suffix.lower()
        handlers = {
            '.xlsx': self._parse_excel_scoring,
            '.xls': self._parse_excel_scoring,
            '.csv': self._parse_csv_scoring,
            '.docx': self._parse_word_table_scoring,
        }
        handler = handlers.get(suffix)
        if handler is None:
            raise ValueError(f"不支持的评分文件格式: {suffix}")
        return handler(file_path)
    except Exception as e:
        logger.error(f"解析评分文件失败: {e}")
        raise
def _parse_excel_scoring(self, file_path: str) -> List[ScoringCriteria]:
    """Read an Excel scoring file and parse it through the DataFrame path.

    Args:
        file_path: Path of the Excel file, passed to ``pd.read_excel``.

    Returns:
        Whatever ``_parse_dataframe_scoring`` yields for the loaded frame.
    """
    frame = pd.read_excel(file_path)
    criteria = self._parse_dataframe_scoring(frame)
    return criteria
def _parse_csv_scoring(self, file_path: str) -> List[ScoringCriteria]:
    """Read a CSV scoring file and parse it through the DataFrame path.

    The file is decoded as ``utf-8-sig``.

    Args:
        file_path: Path of the CSV file, passed to ``pd.read_csv``.

    Returns:
        Whatever ``_parse_dataframe_scoring`` yields for the loaded frame.
    """
    frame = pd.read_csv(file_path, encoding='utf-8-sig')
    criteria = self._parse_dataframe_scoring(frame)
    return criteria
def _parse_dataframe_scoring(self, df: pd.DataFrame) -> List[ScoringCriteria]:
    """Build scoring criteria from a DataFrame read from Excel or CSV.

    Columns are located by substring match of each header against a set of
    known Chinese header keywords. An item-name column and a score column
    are required; a description column is optional.

    Args:
        df: Table whose column headers contain the scoring column names.

    Returns:
        One ``ScoringCriteria`` per row whose item name is not missing.
        ``original_index`` is the row's zero-based position in ``df`` and
        ``chapter_id`` is ``chapter_NN`` where ``NN`` is that position plus
        one. The category is always ``TechnicalCategory.OTHER``.

    Raises:
        ValueError: If no item-name column or no score column is found.
    """
    # Header keyword -> model field. The first keyword contained in a
    # header decides which field that column feeds.
    column_mapping = {
        '评分项': 'item_name',
        '评分项目': 'item_name',
        '项目': 'item_name',
        '分值': 'max_score',
        '最高分': 'max_score',
        '满分': 'max_score',
        '描述': 'description',
        '要求': 'description',
        '评分要求': 'description'
    }

    # Resolve which DataFrame column feeds each field. When several
    # columns match the same field, the last one wins.
    columns = {}
    for col in df.columns:
        for key, value in column_mapping.items():
            if key in str(col):
                columns[value] = col
                break

    if 'item_name' not in columns or 'max_score' not in columns:
        raise ValueError("文件缺少必要的列:评分项、分值")

    item_col = columns['item_name']
    score_col = columns['max_score']
    description_col = columns.get('description')

    criteria = []
    # Number rows by position rather than by index label so the arithmetic
    # below never depends on the label type.
    for position, (_, row) in enumerate(df.iterrows()):
        item_raw = row[item_col]
        if pd.isna(item_raw):
            continue

        # A missing or unparsable score falls back to 0.0.
        score_raw = row[score_col]
        try:
            max_score = float(score_raw) if pd.notna(score_raw) else 0.0
        except (ValueError, TypeError):
            max_score = 0.0
            logger.warning(f"无法解析评分项'{item_raw}'的分值: {score_raw}")

        # A missing description cell stays empty rather than becoming the
        # literal text "nan".
        description = ""
        if description_col is not None:
            description_raw = row[description_col]
            if pd.notna(description_raw):
                description = str(description_raw).strip()

        criteria.append(
            ScoringCriteria(
                item_name=str(item_raw).strip(),
                max_score=max_score,
                description=description,
                # Table parsing does not classify; the comment in the
                # original code says AI re-classifies later.
                category=TechnicalCategory.OTHER,
                chapter_id=f"chapter_{position + 1:02d}",
                # Keep the row order, as the AI parsing path does.
                original_index=position,
            )
        )
    return criteria
def _parse_word_table_scoring(self, file_path: str) -> List[ScoringCriteria]:
    """Parse the scoring tables of a Word document with the LLM.

    Each table with at least two rows is rendered to text and classified
    by ``_identify_table_type``; tables classified as ``"scoring"`` are
    parsed by ``_ai_parse_scoring_table``.

    Args:
        file_path: Path of the ``.docx`` file.

    Returns:
        The criteria of all scoring tables, in document order.

    Raises:
        ValueError: Propagated from ``_identify_table_type`` when the LLM
            gives no reply or an unknown table type; it is not caught here.
    """
    doc = Document(file_path)
    criteria = []
    for table in doc.tables:
        # A table with fewer than two rows is skipped.
        if len(table.rows) < 2:
            continue

        # The renderer is the public ``extract_table_text``; the original
        # call used a non-existent underscore-prefixed name.
        table_text = self.extract_table_text(table)

        if self._identify_table_type(table_text) != "scoring":
            continue

        ai_results = self._ai_parse_scoring_table(table_text)
        if ai_results:
            criteria.extend(ai_results)
    return criteria
def extract_table_text(self, table) -> str:
    """Render a table as numbered, tab-separated lines of text.

    Every row is padded to the width of the widest row. Empty cells and
    padding cells are written as ``[空]``. Lines are numbered from 1 and
    formatted as ``"<n>: <cell>\\t<cell>..."``.

    Args:
        table: Object exposing ``rows``; each row exposes ``cells`` and
            each cell exposes ``text``.

    Returns:
        The rendered lines joined by newlines, or ``""`` for no rows.
    """
    placeholder = "[空]"
    rows = table.rows
    width = max(len(row.cells) for row in rows) if rows else 0

    rendered = []
    for number, row in enumerate(rows, start=1):
        cells = row.cells
        # Blank cells get the placeholder so columns stay aligned.
        values = [cell.text.strip() or placeholder for cell in cells]
        # Pad short rows up to the common width.
        values.extend([placeholder] * (width - len(cells)))
        # Tab separation keeps the columns distinguishable for the LLM.
        rendered.append(f"{number}: " + "\t".join(values))
    return "\n".join(rendered)
def _ai_parse_scoring_table(self, table_text: str) -> List[ScoringCriteria]:
    """Ask the LLM to turn a rendered scoring table into criteria.

    Args:
        table_text: Text rendering of one table.

    Returns:
        One ``ScoringCriteria`` per entry under the ``"scoring_criteria"``
        key of the parsed reply, numbered from ``chapter_01`` within this
        table. Returns ``[]`` when the LLM gives no usable reply or when
        anything fails; failures are logged and never raised.
    """
    try:
        prompt = get_prompt_manager().get_parser_prompt(
            "parse_scoring_table", table_text=table_text
        )
        response = self.call_llm(prompt)
        if not response:
            # Caught by the outer handler below, which logs and returns [].
            raise ValueError("AI解析表格失败无响应")

        try:
            # A reply made only of whitespace carries nothing to parse.
            if not response.strip():
                logger.error("AI返回空响应")
                return []

            # Shared JSON parsing helper for LLM replies.
            from ..nodes.toc.llm_helper import LLMHelper
            parsed = LLMHelper.parse_ai_json_response(response)

            results = []
            for i, item in enumerate(parsed.get("scoring_criteria", [])):
                # Unknown category values degrade to OTHER.
                try:
                    category_enum = TechnicalCategory(item.get("category", "other"))
                except ValueError:
                    category_enum = TechnicalCategory.OTHER

                # A null or unparsable score falls back to 0.0.
                max_score_raw = item.get("max_score", 0)
                try:
                    if max_score_raw is None:
                        max_score = 0.0
                    else:
                        max_score = float(max_score_raw)
                except (ValueError, TypeError):
                    max_score = 0.0
                    logger.warning(f"无法解析评分项'{item.get('item_name', '')}'的分值: {max_score_raw}")

                results.append(
                    ScoringCriteria(
                        item_name=item.get("item_name", ""),
                        max_score=max_score,
                        description=item.get("description", ""),
                        category=category_enum,
                        chapter_id=f"chapter_{i+1:02d}",
                        # Position of the item within the scoring table.
                        original_index=i,
                    )
                )
            return results
        except (ValueError, KeyError) as e:
            logger.error(f"解析AI响应失败: {e}")
            return []
    except Exception as e:
        logger.error(f"AI解析表格失败: {e}")
        return []
def call_llm(self, prompt: str) -> str | None:
    """Send ``prompt`` to the LLM through ``LLMHelper``.

    Args:
        prompt: Prompt text forwarded to the helper.

    Returns:
        The result of ``LLMHelper.call_llm_with_retry`` called with
        ``max_retries=1``.
    """
    # The helper is imported inside the method rather than at module top.
    from ..nodes.toc.llm_helper import LLMHelper

    reply = LLMHelper.call_llm_with_retry(prompt, max_retries=1)
    return reply
def _identify_table_type(self, table_text: str) -> str:
    """Ask the LLM which kind of table ``table_text`` represents.

    Args:
        table_text: Text rendering of one table.

    Returns:
        ``"scoring"``, ``"deviation"`` or ``"other"``: the LLM reply after
        stripping surrounding whitespace and lower-casing.

    Raises:
        ValueError: If the LLM gives no reply, or the normalised reply is
            not one of the three accepted values.
    """
    manager = get_prompt_manager()
    response = self.call_llm(
        manager.get_parser_prompt("identify_table_type", table_text=table_text)
    )
    if not response:
        raise ValueError("AI识别表格类型失败无响应")

    result = response.strip().lower()
    if result in ("scoring", "deviation", "other"):
        return result
    raise ValueError(f"AI返回了无效的表格类型: {result}")
def parse_single_word_document(self, file_path: str) -> tuple[List[ScoringCriteria], List[DeviationItem]]:
    """Parse one Word document holding scoring and deviation tables.

    Each table with at least two rows is rendered to text, classified by
    the LLM and parsed by the matching AI parser; tables classified as
    anything else are ignored.

    Args:
        file_path: Path of the ``.docx`` file.

    Returns:
        ``(scoring_criteria, deviation_items)``. On any exception the
        error is logged and two empty lists are returned; results
        collected before the failure are discarded.
    """
    try:
        scoring_criteria = []
        deviation_items = []
        for table in Document(file_path).tables:
            if len(table.rows) >= 2:
                table_text = self.extract_table_text(table)
                kind = self._identify_table_type(table_text)

                if kind == "scoring":
                    parsed_scoring = self._ai_parse_scoring_table(table_text)
                    if parsed_scoring:
                        scoring_criteria.extend(parsed_scoring)
                elif kind == "deviation":
                    parsed_deviation = self._ai_parse_deviation_table(table_text)
                    if parsed_deviation:
                        deviation_items.extend(parsed_deviation)
        return scoring_criteria, deviation_items
    except Exception as e:
        logger.error(f"解析Word文档失败: {e}")
        return [], []
def parse_word_with_filter(self, file_path: str) -> tuple[List[ScoringCriteria], List[ScoringCriteria], List[DeviationItem]]:
    """Parse a Word document and split criteria into technical/commercial.

    Tables are handled as in ``parse_single_word_document``; afterwards
    every criterion whose category is ``TechnicalCategory.COMMERCIAL``
    goes to the commercial list and every other one to the technical list.

    Args:
        file_path: Path of the ``.docx`` file.

    Returns:
        ``(technical_criteria, commercial_criteria, deviation_items)``.
        On any exception the error is logged and three empty lists are
        returned.
    """
    try:
        collected = []
        deviation_items = []
        for table in Document(file_path).tables:
            if len(table.rows) >= 2:
                table_text = self.extract_table_text(table)
                kind = self._identify_table_type(table_text)

                if kind == "scoring":
                    parsed_scoring = self._ai_parse_scoring_table(table_text)
                    if parsed_scoring:
                        collected.extend(parsed_scoring)
                elif kind == "deviation":
                    parsed_deviation = self._ai_parse_deviation_table(table_text)
                    if parsed_deviation:
                        deviation_items.extend(parsed_deviation)

        # Route each criterion into its bucket, preserving relative order.
        technical_criteria = []
        commercial_criteria = []
        for criterion in collected:
            is_commercial = criterion.category == TechnicalCategory.COMMERCIAL
            bucket = commercial_criteria if is_commercial else technical_criteria
            bucket.append(criterion)
        return technical_criteria, commercial_criteria, deviation_items
    except Exception as e:
        logger.error(f"解析Word文档失败: {e}")
        return [], [], []
def _ai_parse_deviation_table(self, table_text: str) -> List[DeviationItem]:
    """Ask the LLM to turn a rendered deviation table into items.

    Args:
        table_text: Text rendering of one table.

    Returns:
        One ``DeviationItem`` per entry under the ``"deviation_items"``
        key of the parsed reply, numbered from ``deviation_01`` within
        this table. Returns ``[]`` when the LLM gives no reply or when
        anything fails; failures are logged and never raised.
    """
    try:
        prompt = get_prompt_manager().get_parser_prompt(
            "parse_deviation_table", table_text=table_text
        )
        response = self.call_llm(prompt)
        if not response:
            # Caught by the outer handler below, which logs and returns [].
            raise ValueError("AI解析偏离表失败无响应")

        try:
            from ..nodes.toc.llm_helper import LLMHelper
            parsed = LLMHelper.parse_ai_json_response(response)
            entries = parsed.get("deviation_items", [])
            return [
                DeviationItem(
                    requirement=entry.get("requirement", ""),
                    response_type=entry.get("response_type", "正偏离"),
                    chapter_id=f"deviation_{number:02d}",
                )
                for number, entry in enumerate(entries, start=1)
            ]
        except (ValueError, KeyError) as e:
            logger.error(f"解析偏离表AI响应失败: {e}")
            return []
    except Exception as e:
        logger.error(f"AI解析偏离表失败: {e}")
        return []
def _parse_deviation_file(self, file_path: str) -> List[DeviationItem]:
    """Parse a deviation-requirements file according to its extension.

    ``.xlsx``/``.xls`` and ``.csv`` files are loaded into a DataFrame and
    parsed by ``_parse_deviation_from_df``; ``.docx`` files are parsed by
    ``_parse_word_table_deviation``.

    Args:
        file_path: Path of the deviation file.

    Returns:
        The deviation items produced by the selected parser.

    Raises:
        ValueError: If the suffix is not one of the supported ones.
        Exception: Any parser error is logged and re-raised unchanged.
    """
    try:
        suffix = Path(file_path).suffix.lower()

        # Word documents take their own path; no DataFrame is involved.
        if suffix == '.docx':
            return self._parse_word_table_deviation(file_path)

        if suffix in ('.xlsx', '.xls'):
            frame = pd.read_excel(file_path)
        elif suffix == '.csv':
            frame = pd.read_csv(file_path, encoding='utf-8-sig')
        else:
            raise ValueError(f"不支持的偏离文件格式: {suffix}")
        return self._parse_deviation_from_df(frame)
    except Exception as e:
        logger.error(f"解析偏离文件失败: {e}")
        raise
def _parse_deviation_from_df(self, df: pd.DataFrame) -> List[DeviationItem]:
    """Build deviation items from a DataFrame read from Excel or CSV.

    Columns are located by substring match of each header against known
    Chinese header keywords. A requirement column is required; a
    response-type column is optional.

    Args:
        df: Table whose column headers contain the deviation column names.

    Returns:
        One ``DeviationItem`` per row whose requirement is not missing.
        ``chapter_id`` is ``deviation_NN`` where ``NN`` is the row's
        one-based position in ``df``. ``response_type`` is "正偏离" when
        there is no response column or the cell is missing.

    Raises:
        ValueError: If no requirement column is found.
    """
    # Header keyword -> model field. The first keyword contained in a
    # header decides which field that column feeds.
    column_mapping = {
        '要求': 'requirement',
        '招标要求': 'requirement',
        '技术要求': 'requirement',
        '响应': 'response_type',
        '类型': 'response_type'
    }

    # When several columns match the same field, the last one wins.
    columns = {}
    for col in df.columns:
        for key, value in column_mapping.items():
            if key in str(col):
                columns[value] = col
                break

    if 'requirement' not in columns:
        raise ValueError("偏离表缺少必要的列:要求")

    requirement_col = columns['requirement']
    response_col = columns.get('response_type')

    items = []
    # Number rows by position rather than by index label so the ID never
    # depends on the label type.
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        requirement_raw = row[requirement_col]
        if pd.isna(requirement_raw):
            continue

        # A missing response cell keeps the default instead of becoming
        # the literal text "nan".
        response_type = "正偏离"
        if response_col is not None:
            response_raw = row[response_col]
            if pd.notna(response_raw):
                response_type = str(response_raw).strip()

        items.append(
            DeviationItem(
                requirement=str(requirement_raw).strip(),
                response_type=response_type,
                chapter_id=f"deviation_{position:02d}",
            )
        )
    return items
def _parse_word_table_deviation(self, file_path: str) -> List[DeviationItem]:
    """Parse deviation items from the tables of a Word document.

    For each table with at least two rows, the first row is read as the
    header. A header containing "要求" marks the requirement column;
    otherwise a header containing "响应" or "类型" marks the response
    column. Tables without a requirement column are skipped.

    Args:
        file_path: Path of the ``.docx`` file.

    Returns:
        One ``DeviationItem`` per data row that reaches the requirement
        column. ``response_type`` is "正偏离" when the table has no
        response column or the row is too short to reach it.
    """
    doc = Document(file_path)
    items = []
    for table in doc.tables:
        if len(table.rows) < 2:
            continue

        headers = [cell.text.strip() for cell in table.rows[0].cells]
        req_col = resp_col = None
        # When several headers match, the last matching one wins.
        for i, header in enumerate(headers):
            if '要求' in header:
                req_col = i
            elif '响应' in header or '类型' in header:
                resp_col = i
        if req_col is None:
            continue

        # NOTE(review): numbering restarts at 1 for every table, so IDs can
        # repeat across tables of one document; confirm whether callers
        # need them unique.
        for j, row in enumerate(table.rows[1:], 1):
            cells = [cell.text.strip() for cell in row.cells]
            if len(cells) <= req_col:
                continue

            # Compare against None explicitly: column index 0 is a valid
            # response column but is falsy.
            if resp_col is not None and len(cells) > resp_col:
                response_type = cells[resp_col]
            else:
                response_type = "正偏离"

            items.append(
                DeviationItem(
                    requirement=cells[req_col],
                    response_type=response_type,
                    chapter_id=f"deviation_{j:02d}",
                )
            )
    return items
def _parse_template_file(self, file_path: str) -> List[DocumentChapter]:
    """Extract the chapter outline from a Word template.

    Every paragraph whose style name starts with ``Heading`` becomes a
    chapter. The level is the last whitespace-separated token of the style
    name when that token is all digits, otherwise 1.

    Args:
        file_path: Path of the ``.docx`` template.

    Returns:
        A flat list of ``DocumentChapter`` in document order. IDs and
        placeholders are numbered by the paragraph's one-based position
        among all paragraphs, not among headings only.
    """
    chapters = []
    for number, paragraph in enumerate(Document(file_path).paragraphs, start=1):
        style_name = paragraph.style.name
        if not style_name.startswith('Heading'):
            continue

        last_token = style_name.split()[-1]
        level = int(last_token) if last_token.isdigit() else 1

        chapters.append(
            DocumentChapter(
                id=f"template_chapter_{number:02d}",
                title=paragraph.text.strip(),
                level=level,
                # Renders as a literal ``{{chapter_NN_content}}``.
                template_placeholder=f"{{{{chapter_{number:02d}_content}}}}",
            )
        )
    return chapters