Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
582 lines
22 KiB
Python
582 lines
22 KiB
Python
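"""Bid document parser.

Parses scoring-requirement tables and deviation-table requirements, and
combines them with a Word template to produce the bid structure.
Supports Excel, CSV, and Word table formats.
"""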
|
||
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Any, List
|
||
from enum import Enum
|
||
|
||
import pandas as pd
|
||
from docx import Document
|
||
from pydantic import BaseModel, Field
|
||
|
||
from ..config import get_settings
|
||
from ..config.prompt_manager import get_prompt_manager
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TechnicalCategory(str, Enum):
    """Category assigned to a scoring criterion.

    Members are also ``str`` instances, so they compare equal to their
    string values.
    """
    TECHNICAL_SOLUTION = "technical_solution"  # completeness of the technical solution
    EQUIPMENT_SPEC = "equipment_spec"  # equipment specification and reliability
    IMPLEMENTATION = "implementation"  # implementation plan
    QUALITY_SAFETY = "quality_safety"  # quality and safety system
    AFTER_SALES = "after_sales"  # after-sales service
    COMPLIANCE = "compliance"  # compliance response
    COMMERCIAL = "commercial"  # commercial terms (price, qualifications, commercial clauses, etc.)
    OTHER = "other"  # any other category
|
||
|
||
|
||
class ScoringCriteria(BaseModel):
    """One scoring criterion taken from a scoring-requirements table."""

    # Name of the scoring item (required).
    item_name: str = Field(..., description="评分项名称")
    # Maximum score obtainable for this item (required).
    max_score: float = Field(..., description="最高分值")
    # Free-text description of the scoring requirement; empty by default.
    description: str = Field(default="", description="评分要求描述")
    # Category of the item; defaults to OTHER.
    category: TechnicalCategory = Field(default=TechnicalCategory.OTHER, description="技术类别")
    # ID of the chapter this criterion maps to (required).
    chapter_id: str = Field(..., description="对应章节ID")
    # Original order of appearance in the scoring table; defaults to 0.
    original_index: int = Field(default=0, description="在评分表中的原始出现顺序")
|
||
|
||
|
||
class DeviationItem(BaseModel):
    """One deviation-table entry."""

    # Text of the tender requirement (required).
    requirement: str = Field(..., description="招标要求")
    # Response type; defaults to "正偏离".
    response_type: str = Field(default="正偏离", description="响应类型")
    # ID of the chapter this item maps to (required).
    chapter_id: str = Field(..., description="对应章节ID")
|
||
|
||
|
||
class ChapterGuidance(BaseModel):
    """Writing hints attached to a chapter."""

    # Key scoring requirements; empty list by default.
    key_requirements: List[str] = Field(default_factory=list, description="关键评分要求")
    # Suggested supporting material; empty list by default.
    suggested_evidence: List[str] = Field(default_factory=list, description="建议支撑材料")
    # Context excerpts; empty list by default.
    context_snippets: List[str] = Field(default_factory=list, description="上下文摘录")
|
||
|
||
|
||
class DocumentChapter(BaseModel):
    """A chapter of the bid document; chapters nest through ``children``."""

    # Chapter ID (required).
    id: str = Field(..., description="章节ID")
    # Chapter title (required).
    title: str = Field(..., description="章节标题")
    # Heading level of the chapter (required).
    level: int = Field(..., description="章节层级")
    # Score value for the chapter, if any.
    score: float | None = Field(default=None, description="评分值")
    # Sub-chapters; empty list by default.
    children: List['DocumentChapter'] = Field(default_factory=list, description="子章节")
    # Template placeholder string for the chapter, if any.
    template_placeholder: str | None = Field(default=None, description="模板占位符")
    # Writing hints for the chapter, if any.
    guidance: ChapterGuidance | None = Field(default=None, description="章节写作提示")
|
||
|
||
|
||
|
||
|
||
class BidStructure(BaseModel):
    """Aggregate result of parsing: criteria, deviation items, chapters and source paths."""

    # Project name; empty by default.
    project_name: str = Field(default="", description="项目名称")
    # Parsed scoring criteria.
    scoring_criteria: List[ScoringCriteria] = Field(default_factory=list, description="评分标准")
    # Parsed deviation items.
    deviation_items: List[DeviationItem] = Field(default_factory=list, description="偏离项")
    # Document chapters.
    chapters: List[DocumentChapter] = Field(default_factory=list, description="文档章节")

    # File paths
    scoring_file: str = Field(default="", description="评分要求文件路径")
    deviation_file: str = Field(default="", description="偏离表文件路径")
    template_file: str = Field(default="", description="模板文件路径")

    # AI review result
    structure_review: dict[str, Any] = Field(default_factory=dict, description="AI结构审查结果")
|
||
|
||
|
||
class BidParser:
|
||
"""招标文件解析器"""
|
||
|
||
def __init__(self) -> None:
    """Create the parser and store the result of ``get_settings()`` on ``self.settings``."""
    self.settings = get_settings()
|
||
|
||
def parse_bid_requirements(
    self,
    scoring_file: str,
    deviation_file: str | None = None,
    template_file: str | None = None
) -> BidStructure:
    """Parse the tender requirement files and assemble a ``BidStructure``.

    Args:
        scoring_file: Path to the scoring-requirements file (required).
        deviation_file: Optional path to the deviation-table file.
        template_file: Optional path to the Word template file.

    Returns:
        A ``BidStructure`` holding the file paths, the parsed scoring
        criteria and, when the optional files are given, the deviation
        items and template chapters.

    Raises:
        Exception: Any error raised while validating or parsing is logged
            and re-raised unchanged.
    """
    try:
        # Fail early when any referenced file is missing.
        self._validate_files(scoring_file, deviation_file, template_file)

        structure = BidStructure(
            scoring_file=scoring_file,
            deviation_file=deviation_file if deviation_file else "",
            template_file=template_file if template_file else "",
        )

        # The scoring file is always parsed.
        structure.scoring_criteria = self._parse_scoring_file(scoring_file)

        # The deviation table is optional.
        if deviation_file:
            structure.deviation_items = self._parse_deviation_file(deviation_file)

        # The Word template is optional.
        # Note: chapter generation is handled entirely by TocAgent, not here.
        if template_file:
            structure.chapters = self._parse_template_file(template_file)

        return structure

    except Exception as e:
        logger.error(f"解析招标要求失败: {e}")
        raise
|
||
|
||
def _validate_files(self, scoring_file: str, deviation_file: str | None, template_file: str | None) -> None:
|
||
"""验证文件存在"""
|
||
# 评分文件是必需的
|
||
if not Path(scoring_file).exists():
|
||
raise FileNotFoundError(f"评分要求文件不存在: {scoring_file}")
|
||
|
||
# 偏离表和模板文件是可选的
|
||
if deviation_file and not Path(deviation_file).exists():
|
||
raise FileNotFoundError(f"偏离表文件不存在: {deviation_file}")
|
||
|
||
if template_file and not Path(template_file).exists():
|
||
raise FileNotFoundError(f"模板文件不存在: {template_file}")
|
||
|
||
def _parse_scoring_file(self, file_path: str) -> List[ScoringCriteria]:
    """Parse a scoring-requirements file, dispatching on its extension.

    ``.xlsx``/``.xls`` go to the Excel parser, ``.csv`` to the CSV parser
    and ``.docx`` to the Word-table parser. The extension comparison is
    case-insensitive.

    Raises:
        ValueError: If the extension is not one of the supported ones.
        Exception: Any parsing error is logged and re-raised unchanged.
    """
    try:
        suffix = Path(file_path).suffix.lower()

        if suffix == '.docx':
            return self._parse_word_table_scoring(file_path)
        if suffix == '.csv':
            return self._parse_csv_scoring(file_path)
        if suffix in ('.xlsx', '.xls'):
            return self._parse_excel_scoring(file_path)

        raise ValueError(f"不支持的评分文件格式: {suffix}")

    except Exception as e:
        logger.error(f"解析评分文件失败: {e}")
        raise
|
||
|
||
def _parse_excel_scoring(self, file_path: str) -> List[ScoringCriteria]:
    """Read an Excel scoring file with pandas and parse it as a DataFrame."""
    return self._parse_dataframe_scoring(pd.read_excel(file_path))
|
||
|
||
def _parse_csv_scoring(self, file_path: str) -> List[ScoringCriteria]:
|
||
"""解析CSV评分文件"""
|
||
df = pd.read_csv(file_path, encoding='utf-8-sig')
|
||
return self._parse_dataframe_scoring(df)
|
||
|
||
def _parse_dataframe_scoring(self, df: pd.DataFrame) -> List[ScoringCriteria]:
    """Build scoring criteria from a DataFrame (shared by the Excel and CSV paths).

    Columns are recognised by substring match of known Chinese header
    keywords against each column name. Rows whose item-name cell is
    missing are skipped. A score that is missing or cannot be converted
    to ``float`` becomes ``0.0`` (the latter with a warning).

    Args:
        df: Table read from the scoring file.

    Returns:
        One ``ScoringCriteria`` per usable row, in row order.

    Raises:
        ValueError: If no item-name column or no score column is found.
    """
    # Header keyword -> model field.
    column_mapping = {
        '评分项': 'item_name',
        '评分项目': 'item_name',
        '项目': 'item_name',
        '分值': 'max_score',
        '最高分': 'max_score',
        '满分': 'max_score',
        '描述': 'description',
        '要求': 'description',
        '评分要求': 'description'
    }

    # Resolve which DataFrame column feeds each model field.
    columns = {}
    for col in df.columns:
        for key, value in column_mapping.items():
            if key in str(col):
                columns[value] = col
                break

    if 'item_name' not in columns or 'max_score' not in columns:
        raise ValueError("文件缺少必要的列:评分项、分值")

    item_col = columns['item_name']
    score_col = columns['max_score']
    description_col = columns.get('description')  # optional column

    criteria = []
    for i, row in df.iterrows():
        item_name_raw = row[item_col]
        if pd.isna(item_name_raw):
            continue

        # Guard against NaN/None or non-numeric score cells.
        max_score_raw = row[score_col]
        try:
            max_score = float(max_score_raw) if pd.notna(max_score_raw) else 0.0
        except (ValueError, TypeError):
            max_score = 0.0
            logger.warning(f"无法解析评分项'{item_name_raw}'的分值: {max_score_raw}")

        # An empty description cell is NaN; str(NaN) would yield the
        # literal text "nan", so map missing values to an empty string.
        description = ""
        if description_col is not None:
            description_raw = row[description_col]
            if pd.notna(description_raw):
                description = str(description_raw).strip()

        criteria.append(
            ScoringCriteria(
                item_name=str(item_name_raw).strip(),
                max_score=max_score,
                description=description,
                # Table parsing defaults to OTHER; the AI step reclassifies later.
                category=TechnicalCategory.OTHER,
                chapter_id=f"chapter_{i+1:02d}",
                # Record order of appearance, as the AI table parser does.
                original_index=len(criteria),
            )
        )

    return criteria
|
||
|
||
def _parse_word_table_scoring(self, file_path: str) -> List[ScoringCriteria]:
    """Parse the scoring tables of a Word document using the AI helpers.

    Every table with at least two rows is rendered to text and classified
    by ``_identify_table_type``; tables classified as ``"scoring"`` are
    parsed by ``_ai_parse_scoring_table`` and their results concatenated.

    Args:
        file_path: Path to the ``.docx`` file.

    Returns:
        The scoring criteria collected from all scoring tables.
    """
    doc = Document(file_path)
    criteria = []

    for table in doc.tables:
        # A table needs a header row plus at least one data row.
        if len(table.rows) < 2:
            continue

        # Fix: the method is named ``extract_table_text``; the previous
        # ``self._extract_table_text`` does not exist on this class and
        # raised AttributeError for every Word scoring file.
        table_text = self.extract_table_text(table)

        if self._identify_table_type(table_text) != "scoring":
            continue

        ai_results = self._ai_parse_scoring_table(table_text)
        if ai_results:
            criteria.extend(ai_results)

    return criteria
|
||
|
||
def extract_table_text(self, table) -> str:
    """Render a table as text, one tab-separated line per row.

    Each row is padded to the widest row's cell count, and empty or
    absent cells are written as ``[空]``. Every line is prefixed with its
    1-based row number.

    Args:
        table: Object exposing ``rows``; each row exposes ``cells`` and
            each cell a ``text`` string.

    Returns:
        The rendered lines joined by newlines; an empty string for a
        table without rows.
    """
    rows = table.rows
    # Widest row decides how many columns every line gets.
    max_cols = max(len(row.cells) for row in rows) if rows else 0

    lines = []
    for i, row in enumerate(rows):
        row_cells = row.cells
        texts = [(cell.text.strip() or "[空]") for cell in row_cells[:max_cols]]
        # Pad short rows so every line has the same column count.
        texts.extend(["[空]"] * (max_cols - len(texts)))

        # Tab-separated so the AI can tell the columns apart.
        line = "\t".join(texts)
        lines.append(f"行{i+1}: {line}")

    return "\n".join(lines)
|
||
|
||
def _ai_parse_scoring_table(self, table_text: str) -> List[ScoringCriteria]:
    """Ask the LLM to turn a rendered scoring table into criteria.

    Builds the ``parse_scoring_table`` prompt, calls the LLM and parses
    the JSON reply via ``LLMHelper.parse_ai_json_response``. The reply's
    ``scoring_criteria`` list is converted entry by entry; unknown
    categories fall back to ``OTHER`` and unparseable scores to ``0.0``.

    Args:
        table_text: Table content rendered as text.

    Returns:
        The parsed criteria, or an empty list on any failure (failures
        are logged, never raised).
    """
    try:
        prompt = get_prompt_manager().get_parser_prompt("parse_scoring_table", table_text=table_text)

        # Call the LLM API.
        response = self.call_llm(prompt)
        if not response:
            raise ValueError("AI解析表格失败:无响应")

        try:
            # A whitespace-only reply is treated as empty.
            if not response.strip():
                logger.error("AI返回空响应")
                return []

            # Shared JSON parsing helper.
            from ..nodes.toc.llm_helper import LLMHelper
            result_data = LLMHelper.parse_ai_json_response(response)

            criteria = []
            for i, item in enumerate(result_data.get("scoring_criteria", [])):
                # Validate the category, falling back to OTHER.
                try:
                    category_enum = TechnicalCategory(item.get("category", "other"))
                except ValueError:
                    category_enum = TechnicalCategory.OTHER

                # Convert the score defensively; None counts as 0.
                max_score_raw = item.get("max_score", 0)
                if max_score_raw is None:
                    max_score = 0.0
                else:
                    try:
                        max_score = float(max_score_raw)
                    except (ValueError, TypeError):
                        max_score = 0.0
                        logger.warning(f"无法解析评分项'{item.get('item_name', '')}'的分值: {max_score_raw}")

                criteria.append(
                    ScoringCriteria(
                        item_name=item.get("item_name", ""),
                        max_score=max_score,
                        description=item.get("description", ""),
                        category=category_enum,
                        chapter_id=f"chapter_{i+1:02d}",
                        original_index=i,  # order of appearance in the scoring table
                    )
                )

            return criteria

        except (ValueError, KeyError) as e:
            logger.error(f"解析AI响应失败: {e}")
            return []

    except Exception as e:
        logger.error(f"AI解析表格失败: {e}")
        return []
|
||
|
||
def call_llm(self, prompt: str) -> str | None:
    """Send ``prompt`` to the LLM through ``LLMHelper.call_llm_with_retry`` (``max_retries=1``).

    Returns whatever the helper returns.
    """
    # Imported lazily inside the method, as in the other AI helpers.
    from ..nodes.toc.llm_helper import LLMHelper

    reply = LLMHelper.call_llm_with_retry(prompt, max_retries=1)
    return reply
|
||
|
||
def _identify_table_type(self, table_text: str) -> str:
    """Classify a rendered table with the LLM.

    Args:
        table_text: Table content rendered as text.

    Returns:
        ``"scoring"``, ``"deviation"`` or ``"other"``: the LLM reply
        after stripping whitespace and lower-casing.

    Raises:
        ValueError: If the LLM gives no reply, or the normalised reply is
            not one of the three accepted values.
    """
    prompt = get_prompt_manager().get_parser_prompt("identify_table_type", table_text=table_text)

    response = self.call_llm(prompt)
    if not response:
        raise ValueError("AI识别表格类型失败:无响应")

    result = response.strip().lower()
    if result in ("scoring", "deviation", "other"):
        return result

    raise ValueError(f"AI返回了无效的表格类型: {result}")
|
||
|
||
def parse_single_word_document(self, file_path: str) -> tuple[List[ScoringCriteria], List[DeviationItem]]:
    """Parse one Word document containing both scoring and deviation tables.

    Every table with at least two rows is rendered to text, classified,
    and handed to the matching AI parser.

    Args:
        file_path: Path to the ``.docx`` file.

    Returns:
        ``(scoring_criteria, deviation_items)``. On any exception the
        error is logged and ``([], [])`` is returned.
    """
    try:
        doc = Document(file_path)
        scoring_criteria = []
        deviation_items = []

        for table in doc.tables:
            # Skip tables without at least a header and one data row.
            if len(table.rows) < 2:
                continue

            table_text = self.extract_table_text(table)
            table_type = self._identify_table_type(table_text)

            if table_type == "scoring":
                scoring_criteria.extend(self._ai_parse_scoring_table(table_text) or [])
            elif table_type == "deviation":
                deviation_items.extend(self._ai_parse_deviation_table(table_text) or [])

        return scoring_criteria, deviation_items

    except Exception as e:
        logger.error(f"解析Word文档失败: {e}")
        return [], []
|
||
|
||
def parse_word_with_filter(self, file_path: str) -> tuple[List[ScoringCriteria], List[ScoringCriteria], List[DeviationItem]]:
    """Parse a Word document and split its criteria into technical and commercial.

    Tables are processed as in ``parse_single_word_document``; the
    collected criteria are then partitioned by whether their category is
    ``TechnicalCategory.COMMERCIAL``.

    Args:
        file_path: Path to the ``.docx`` file.

    Returns:
        ``(technical_criteria, commercial_criteria, deviation_items)``.
        On any exception the error is logged and ``([], [], [])`` is
        returned.
    """
    try:
        doc = Document(file_path)
        all_criteria = []
        deviation_items = []

        for table in doc.tables:
            # Skip tables without at least a header and one data row.
            if len(table.rows) < 2:
                continue

            table_text = self.extract_table_text(table)
            table_type = self._identify_table_type(table_text)

            if table_type == "scoring":
                all_criteria.extend(self._ai_parse_scoring_table(table_text) or [])
            elif table_type == "deviation":
                deviation_items.extend(self._ai_parse_deviation_table(table_text) or [])

        # Everything that is not commercial counts as technical.
        technical_criteria = [
            entry for entry in all_criteria
            if not (entry.category == TechnicalCategory.COMMERCIAL)
        ]
        commercial_criteria = [
            entry for entry in all_criteria
            if entry.category == TechnicalCategory.COMMERCIAL
        ]

        return technical_criteria, commercial_criteria, deviation_items

    except Exception as e:
        logger.error(f"解析Word文档失败: {e}")
        return [], [], []
|
||
|
||
def _ai_parse_deviation_table(self, table_text: str) -> List[DeviationItem]:
    """Ask the LLM to turn a rendered deviation table into deviation items.

    Builds the ``parse_deviation_table`` prompt, calls the LLM and parses
    the JSON reply via ``LLMHelper.parse_ai_json_response``; each entry
    of the reply's ``deviation_items`` list becomes a ``DeviationItem``.

    Args:
        table_text: Table content rendered as text.

    Returns:
        The parsed items, or an empty list on any failure (failures are
        logged, never raised).
    """
    try:
        prompt = get_prompt_manager().get_parser_prompt("parse_deviation_table", table_text=table_text)

        response = self.call_llm(prompt)
        if not response:
            raise ValueError("AI解析偏离表失败:无响应")

        try:
            from ..nodes.toc.llm_helper import LLMHelper
            result_data = LLMHelper.parse_ai_json_response(response)

            # IDs are numbered from 1 in reply order.
            return [
                DeviationItem(
                    requirement=item.get("requirement", ""),
                    response_type=item.get("response_type", "正偏离"),
                    chapter_id=f"deviation_{i+1:02d}",
                )
                for i, item in enumerate(result_data.get("deviation_items", []))
            ]

        except (ValueError, KeyError) as e:
            logger.error(f"解析偏离表AI响应失败: {e}")
            return []

    except Exception as e:
        logger.error(f"AI解析偏离表失败: {e}")
        return []
|
||
|
||
def _parse_deviation_file(self, file_path: str) -> List[DeviationItem]:
    """Parse a deviation-table file, dispatching on its extension.

    ``.docx`` files go to the Word-table parser; ``.xlsx``/``.xls`` and
    ``.csv`` files are loaded with pandas and parsed as a DataFrame. The
    extension comparison is case-insensitive.

    Raises:
        ValueError: If the extension is not one of the supported ones.
        Exception: Any parsing error is logged and re-raised unchanged.
    """
    try:
        suffix = Path(file_path).suffix.lower()

        if suffix == '.docx':
            return self._parse_word_table_deviation(file_path)

        if suffix in ('.xlsx', '.xls'):
            frame = pd.read_excel(file_path)
        elif suffix == '.csv':
            frame = pd.read_csv(file_path, encoding='utf-8-sig')
        else:
            raise ValueError(f"不支持的偏离文件格式: {suffix}")

        return self._parse_deviation_from_df(frame)

    except Exception as e:
        logger.error(f"解析偏离文件失败: {e}")
        raise
|
||
|
||
def _parse_deviation_from_df(self, df: pd.DataFrame) -> List[DeviationItem]:
    """Build deviation items from a DataFrame.

    Columns are recognised by substring match of known Chinese header
    keywords against each column name. Rows whose requirement cell is
    missing are skipped. A missing or blank response cell yields the
    default response type ``"正偏离"``.

    Args:
        df: Table read from the deviation file.

    Returns:
        One ``DeviationItem`` per usable row, in row order.

    Raises:
        ValueError: If no requirement column is found.
    """
    # Header keyword -> model field.
    column_mapping = {
        '要求': 'requirement',
        '招标要求': 'requirement',
        '技术要求': 'requirement',
        '响应': 'response_type',
        '类型': 'response_type'
    }

    columns = {}
    for col in df.columns:
        for key, value in column_mapping.items():
            if key in str(col):
                columns[value] = col
                break

    if 'requirement' not in columns:
        raise ValueError("偏离表缺少必要的列:要求")

    requirement_col = columns['requirement']
    response_col = columns.get('response_type')  # optional column

    items = []
    for i, row in df.iterrows():
        requirement_raw = row[requirement_col]
        if pd.isna(requirement_raw):
            continue

        # An empty response cell is NaN; str(NaN) would yield the literal
        # text "nan", so fall back to the default response type instead.
        response_type = "正偏离"
        if response_col is not None:
            response_raw = row[response_col]
            if pd.notna(response_raw):
                response_type = str(response_raw).strip() or "正偏离"

        items.append(
            DeviationItem(
                requirement=str(requirement_raw).strip(),
                response_type=response_type,
                chapter_id=f"deviation_{i+1:02d}",
            )
        )

    return items
|
||
|
||
def _parse_word_table_deviation(self, file_path: str) -> List[DeviationItem]:
    """Parse deviation items from the tables of a Word document.

    For each table with at least two rows, the first row is the header: a
    header cell containing ``要求`` marks the requirement column, and one
    containing ``响应`` or ``类型`` marks the response column. Tables
    without a requirement column are skipped. Rows too short to reach
    the requirement column are skipped as well.

    Args:
        file_path: Path to the ``.docx`` file.

    Returns:
        The deviation items of all matching tables. ``chapter_id``
        numbering restarts at 1 for every table.
    """
    doc = Document(file_path)
    items = []

    for table in doc.tables:
        if len(table.rows) < 2:
            continue

        headers = [cell.text.strip() for cell in table.rows[0].cells]
        req_col = resp_col = None

        # When several headers match, the last matching one wins.
        for i, header in enumerate(headers):
            if '要求' in header:
                req_col = i
            elif '响应' in header or '类型' in header:
                resp_col = i

        if req_col is None:
            continue

        for j, row in enumerate(table.rows[1:], 1):
            cells = [cell.text.strip() for cell in row.cells]

            if len(cells) <= req_col:
                continue

            # Fix: compare against None explicitly. A response column at
            # index 0 is falsy, so the old ``if resp_col and ...`` check
            # ignored it and always used the default.
            if resp_col is not None and len(cells) > resp_col:
                response_type = cells[resp_col]
            else:
                response_type = "正偏离"

            items.append(
                DeviationItem(
                    requirement=cells[req_col],
                    response_type=response_type,
                    chapter_id=f"deviation_{j:02d}",
                )
            )

    return items
|
||
|
||
def _parse_template_file(self, file_path: str) -> List[DocumentChapter]:
    """Extract the chapter list from a Word template's heading paragraphs.

    A paragraph counts as a chapter when its style name starts with
    ``Heading``. The level is the trailing number of the style name, or 1
    when the last word is not a number. IDs and placeholders use the
    paragraph's 1-based position in the document.

    Args:
        file_path: Path to the ``.docx`` template.

    Returns:
        A flat list of chapters in document order.
    """
    chapters = []

    for i, paragraph in enumerate(Document(file_path).paragraphs):
        style_name = paragraph.style.name
        if not style_name.startswith('Heading'):
            continue

        # e.g. "Heading 2" -> 2; a bare "Heading..." name -> 1.
        last_word = style_name.split()[-1]
        level = int(last_word) if last_word.isdigit() else 1

        chapters.append(
            DocumentChapter(
                id=f"template_chapter_{i+1:02d}",
                title=paragraph.text.strip(),
                level=level,
                template_placeholder=f"{{{{chapter_{i+1:02d}_content}}}}",
            )
        )

    return chapters
|
||
|