"""Validate the heading outline of a bid document and report structural issues.

The outline is a JSON payload with a ``blocks`` list of nested heading blocks
(``text``, ``level``, ``children``, optional ``policy``) and an optional
top-level ``outline_policy``.  ``check_outline`` walks the tree and collects
issues such as abstract leaf headings, wrong heading levels, or technical
branches that are too shallow or too generic to write from.
"""

from __future__ import annotations

import argparse
import re
from collections import Counter
from pathlib import Path
from typing import Any

from docx_ops_lib import QueryError, read_json, write_json

# Abstract titles that may never appear as a leaf heading.
ILLEGAL_LEAF_TITLES = {
    "技术方案",
    "服务方案",
    "实施方案",
    "服务保障及措施",
    "售后服务和质保期服务计划",
    "项目理解",
    "解决方案",
    "系统设计",
    "平台建设方案",
    "系统建设方案",
    "总体方案",
    "培训方案",
    "运维方案",
}

# Titles that mark a heading (and everything below it) as technical context.
TECHNICAL_ROOT_TITLES = {
    "技术标目录",
    "技术目录",
    "技术部分目录",
    "技术方案",
    "服务方案",
    "实施方案",
    "服务保障及措施",
    "售后服务和质保期服务计划",
}

# Titles that mark a heading (and everything below it) as business context.
BUSINESS_ROOT_TITLES = {
    "商务及其他目录",
    "商务目录",
    "商务部分目录",
    "商务及其他部分目录",
}

# Technical placeholder titles that must stay unexpanded inside business context.
TECHNICAL_PLACEHOLDER_TITLES = {
    "技术标内容详见技术标目录版",
    "技术部分详见技术标",
    "技术部分",
    "技术标",
    "技术方案",
    "服务方案",
    "实施方案",
}

# Regexes (applied to normalized titles) that identify generic technical headings.
GENERIC_TECHNICAL_PATTERNS = (
    r"^(技术|总体技术|总体|项目|整体)?方案$",
    r"^(服务|运维|培训|实施|部署|测试|验收|应急|保障)(方案|计划|措施)?$",
    r"^(系统|平台|架构|设计)(方案|设计|建设方案)?$",
    r"^(项目理解|解决方案|系统设计|总体架构|建设内容|功能设计|集成方案|响应方案|管理方案)$",
    r"^(总体设计方案|总体实施方案|总体服务方案)$",
)

# Substrings that indicate a heading names a concrete object/module/device.
OBJECT_HINTS = (
    "子系统",
    "模块",
    "设备",
    "接口",
    "功能",
    "单元",
    "终端",
    "节点",
    "链路",
    "数据库",
    "中间件",
    "服务器",
    "存储",
    "网络",
    "点位",
    "机房",
    "服务项",
    "清单",
)

# Substrings that indicate a heading is about management/process facets.
MANAGEMENT_HINTS = (
    "原则",
    "目标",
    "思路",
    "策略",
    "组织",
    "保障",
    "计划",
    "流程",
    "机制",
    "措施",
    "培训",
    "验收",
    "测试",
    "应急",
    "运维",
    "服务",
    "售后",
    "响应",
    "巡检",
    "维护",
    "风险",
)

# Trailing "<scope><kind>" words removed when computing a heading's semantic stem.
STEM_SUFFIX_PATTERN = re.compile(
    r"(总体|项目|技术|系统|平台|服务|实施|运维|售后|培训|测试|验收|保障|管理|响应|交付|部署)?"
    r"(方案|计划|步骤|措施|机制|说明|内容|设计|建设|保障)?$"
)

# Multi-level decimal numbering such as "1.2" or "3.1.4" at the start of a title.
_DOTTED_NUMBER_PREFIX = re.compile(r"^[0-9]+(?:\.[0-9]+)+")


def _normalize_heading(text: str) -> str:
    """Return *text* with all whitespace and any leading numbering removed.

    Falsy input (``None`` or ``""``) yields an empty string.
    """
    compact = re.sub(r"\s+", "", text or "")
    # Strip multi-level numbering first.  Without this step the single-level
    # rules below consume "1." and then "2" from "1.2.3标题" and leave ".3标题".
    compact = _DOTTED_NUMBER_PREFIX.sub("", compact)
    compact = re.sub(r"^[一二三四五六七八九十0-9]+[、\..]\s*", "", compact)
    compact = re.sub(r"^\(?[0-9一二三四五六七八九十]+\)?\s*", "", compact)
    compact = re.sub(r"^[0-9]+(\.[0-9]+)*\s*", "", compact)
    return compact


def _issue(issues: list[dict[str, Any]], issue_type: str, path: list[str], message: str) -> None:
    """Append one issue record to *issues*; the path is joined with ``" > "``."""
    issues.append({"type": issue_type, "path": " > ".join(path), "message": message})


def _is_heading(block: dict[str, Any]) -> bool:
    """Return True when *block* is a heading; a missing ``type`` counts as heading."""
    return block.get("type", "heading") == "heading"


def _heading_children(children: list[Any]) -> list[dict[str, Any]]:
    """Return only the dict entries of *children* that are heading blocks."""
    return [child for child in children if isinstance(child, dict) and _is_heading(child)]


def _is_technical_context(path: list[str]) -> bool:
    """Return True when any element of *path* normalizes to a technical root title."""
    return any(_normalize_heading(part) in TECHNICAL_ROOT_TITLES for part in path)


def _is_business_context(path: list[str]) -> bool:
    """Return True when any element of *path* normalizes to a business root title."""
    return any(_normalize_heading(part) in BUSINESS_ROOT_TITLES for part in path)


def _technical_depth(path: list[str]) -> int:
    """Return the depth of the last path element relative to the technical root.

    The first technical root found in *path* has depth 1, its direct children
    depth 2, and so on.  Returns 0 when the path contains no technical root.
    """
    for index, part in enumerate(path):
        if _normalize_heading(part) in TECHNICAL_ROOT_TITLES:
            return len(path) - index
    return 0


def _contains_object_hint(text: str) -> bool:
    """Return True when *text* appears to name a concrete object.

    A title containing "系统" that is longer than four characters and is not an
    illegal leaf title counts, as does any title containing an ``OBJECT_HINTS``
    substring.
    """
    normalized = _normalize_heading(text)
    if "系统" in normalized and len(normalized) > 4 and normalized not in ILLEGAL_LEAF_TITLES:
        return True
    return any(hint in normalized for hint in OBJECT_HINTS)


def _looks_management_focused(text: str) -> bool:
    """Return True when *text* has a management hint and no object hint."""
    normalized = _normalize_heading(text)
    return not _contains_object_hint(normalized) and any(hint in normalized for hint in MANAGEMENT_HINTS)


def _looks_generic_technical_heading(text: str) -> bool:
    """Return True when *text* is an abstract/generic technical heading.

    Illegal leaf titles are always generic; titles with an object hint never
    are; otherwise the title must match one of ``GENERIC_TECHNICAL_PATTERNS``.
    """
    normalized = _normalize_heading(text)
    if normalized in ILLEGAL_LEAF_TITLES:
        return True
    if _contains_object_hint(normalized):
        return False
    return any(re.search(pattern, normalized) for pattern in GENERIC_TECHNICAL_PATTERNS)


def _has_object_child(children: list[dict[str, Any]]) -> bool:
    """Return True when at least one child heading text carries an object hint."""
    return any(_contains_object_hint(str(child.get("text", "")).strip()) for child in children)


def _max_heading_depth(block: dict[str, Any]) -> int:
    """Return the number of heading levels in *block*'s subtree, counting itself as 1."""
    children = block.get("children", [])
    if not isinstance(children, list):
        return 1
    heading_children = _heading_children(children)
    if not heading_children:
        return 1
    return 1 + max(_max_heading_depth(child) for child in heading_children)


def _semantic_stem(text: str) -> str:
    """Return the normalized title with its trailing scope/kind words removed.

    Falls back to the full normalized title when stripping leaves nothing.
    """
    normalized = _normalize_heading(text)
    normalized = STEM_SUFFIX_PATTERN.sub("", normalized)
    normalized = normalized.strip("-_()()")
    return normalized or _normalize_heading(text)


def _duplicate_generic_stems(children: list[dict[str, Any]]) -> list[str]:
    """Return the sorted stems shared by two or more generic child headings.

    Only children that look generic are considered, and stems shorter than two
    characters are ignored.
    """
    stems = [
        _semantic_stem(str(child.get("text", "")).strip())
        for child in children
        if _looks_generic_technical_heading(str(child.get("text", "")).strip())
    ]
    counts = Counter(stem for stem in stems if len(stem) >= 2)
    return sorted(stem for stem, count in counts.items() if count >= 2)


def _normalize_policy(payload: dict[str, Any]) -> dict[str, bool]:
    """Read the document-wide ``outline_policy`` from *payload*.

    A missing or ``None`` policy yields both flags False.

    Raises:
        QueryError: if ``outline_policy`` is present but not an object.
    """
    raw_policy = payload.get("outline_policy", {})
    if raw_policy is None:
        raw_policy = {}
    if not isinstance(raw_policy, dict):
        raise QueryError("outline_policy must be an object when provided")
    return {
        "allow_service_facets": bool(raw_policy.get("allow_service_facets", False)),
        "respect_fixed_structure": bool(raw_policy.get("respect_fixed_structure", False)),
    }


def _merge_policy(raw_policy: Any, inherited_policy: dict[str, bool]) -> dict[str, bool]:
    """Overlay a heading-level ``policy`` on top of *inherited_policy*.

    Returns a new dict; flags absent from *raw_policy* keep the inherited value.

    Raises:
        QueryError: if *raw_policy* is neither ``None`` nor an object.
    """
    if raw_policy is None:
        return dict(inherited_policy)
    if not isinstance(raw_policy, dict):
        raise QueryError("policy must be an object when provided on a heading block")
    return {
        "allow_service_facets": bool(
            raw_policy.get("allow_service_facets", inherited_policy["allow_service_facets"])
        ),
        "respect_fixed_structure": bool(
            raw_policy.get("respect_fixed_structure", inherited_policy["respect_fixed_structure"])
        ),
    }


def _parse_heading_level(
    block: dict[str, Any],
    path: list[str],
    issues: list[dict[str, Any]],
    *,
    parent_level: int | None,
) -> int | None:
    """Validate ``block["level"]`` and return it, or ``None`` when unusable.

    A non-integer or out-of-range level is reported and yields ``None``.  A
    level that is in range but does not fit the hierarchy (top-level must be 1,
    children must be parent + 1) is reported and still returned.
    """
    raw_level = block.get("level")
    # bool is a subclass of int; reject it so a JSON `true` is not read as level 1.
    if isinstance(raw_level, bool) or not isinstance(raw_level, int):
        _issue(issues, "invalid_heading_level", path, "heading level must be an integer between 1 and 9")
        return None
    if raw_level < 1 or raw_level > 9:
        _issue(issues, "invalid_heading_level", path, "heading level must be between 1 and 9")
        return None
    if parent_level is None:
        if raw_level != 1:
            _issue(issues, "invalid_root_heading_level", path, "top-level heading must use level 1")
    elif raw_level != parent_level + 1:
        _issue(
            issues,
            "invalid_heading_hierarchy",
            path,
            f"child heading level must be parent level + 1; expected {parent_level + 1}, got {raw_level}",
        )
    return raw_level


def _check_technical_depth(
    blocks: list[dict[str, Any]],
    issues: list[dict[str, Any]],
    policy: dict[str, bool],
) -> None:
    """Report top-level technical roots whose branches are too shallow.

    A technical root must have at least one heading child, and each such branch
    must itself have heading children unless ``respect_fixed_structure`` is in
    effect for that branch.
    """
    for block in blocks:
        if not isinstance(block, dict) or not _is_heading(block):
            continue
        root_text = str(block.get("text", "")).strip()
        if _normalize_heading(root_text) not in TECHNICAL_ROOT_TITLES:
            continue
        root_children = block.get("children", [])
        if not isinstance(root_children, list):
            continue
        branch_children = _heading_children(root_children)
        if not branch_children:
            _issue(
                issues,
                "technical_outline_too_shallow",
                [root_text],
                "technical outline must include at least one level-2 branch under the root",
            )
            continue
        # Branches inherit from the root's own policy override, matching the
        # inheritance chain used by _walk_blocks.
        root_policy = _merge_policy(block.get("policy"), policy)
        for child in branch_children:
            child_text = str(child.get("text", "")).strip()
            if _max_heading_depth(child) >= 2:
                continue
            branch_policy = _merge_policy(child.get("policy"), root_policy)
            if branch_policy["respect_fixed_structure"]:
                continue
            _issue(
                issues,
                "technical_branch_too_shallow",
                [root_text, child_text],
                f"technical branch '{child_text}' must reach at least level 3",
            )


def _walk_blocks(
    blocks: list[dict[str, Any]],
    path: list[str],
    issues: list[dict[str, Any]],
    policy: dict[str, bool],
    parent_level: int | None = None,
) -> None:
    """Recursively validate heading *blocks*, appending findings to *issues*.

    Args:
        blocks: sibling blocks at the current level.
        path: titles (or indices for untitled blocks) of the ancestors.
        issues: list mutated in place with issue records.
        policy: policy inherited from the ancestors.
        parent_level: validated level of the parent heading, or ``None`` at the
            top level or when the parent's level was invalid.
    """
    for index, block in enumerate(blocks):
        if not isinstance(block, dict):
            _issue(issues, "invalid_block", path + [str(index)], "block must be an object")
            continue

        text = str(block.get("text", "")).strip()
        children = block.get("children", [])
        current_path = path + ([text] if text else [str(index)])
        if block.get("type", "heading") != "heading":
            continue

        current_policy = _merge_policy(block.get("policy"), policy)
        current_level = _parse_heading_level(block, current_path, issues, parent_level=parent_level)
        normalized = _normalize_heading(text)

        # Compare the normalized title so a numbered abstract heading such as
        # "一、技术方案" is caught too; the generic-leaf check below deliberately
        # skips illegal titles and relies on this check to report them.
        if normalized in ILLEGAL_LEAF_TITLES and not children:
            _issue(
                issues,
                "illegal_leaf",
                current_path,
                f"abstract heading '{text}' cannot be a leaf",
            )

        if not isinstance(children, list):
            # Only a truthy non-list is reported; falsy values are skipped silently.
            if children:
                _issue(issues, "invalid_children", current_path, "children must be a list")
            continue

        direct_heading_children = _heading_children(children)

        if (
            _is_business_context(current_path)
            and normalized in TECHNICAL_PLACEHOLDER_TITLES
            and direct_heading_children
        ):
            _issue(
                issues,
                "business_technical_placeholder_expanded",
                current_path,
                f"business outline technical placeholder '{text}' must remain a single placeholder node",
            )

        if _is_technical_context(current_path):
            technical_depth = _technical_depth(current_path)
            is_generic_heading = _looks_generic_technical_heading(text)
            allow_service_facets = current_policy["allow_service_facets"]
            allow_fixed_structure = current_policy["respect_fixed_structure"]

            if is_generic_heading and normalized not in ILLEGAL_LEAF_TITLES and not direct_heading_children:
                _issue(
                    issues,
                    "generic_technical_leaf",
                    current_path,
                    f"technical heading '{text}' is still too generic to write from directly",
                )

            if is_generic_heading and len(direct_heading_children) == 1:
                _issue(
                    issues,
                    "single_child_breakdown",
                    current_path,
                    f"technical heading '{text}' cannot be expanded with only one direct child",
                )

            if (
                is_generic_heading
                and direct_heading_children
                and not allow_service_facets
                and not allow_fixed_structure
                and not _has_object_child(direct_heading_children)
            ):
                _issue(
                    issues,
                    "missing_object_breakdown",
                    current_path,
                    f"technical heading '{text}' must include at least one object/module/subsystem oriented child",
                )

            duplicate_stems = _duplicate_generic_stems(direct_heading_children)
            if duplicate_stems:
                joined = ", ".join(duplicate_stems)
                _issue(
                    issues,
                    "duplicate_technical_facets",
                    current_path,
                    f"technical heading '{text}' has repeated generic child facets: {joined}",
                )

            if (
                technical_depth >= 3
                and not direct_heading_children
                and not allow_service_facets
                and _looks_management_focused(text)
            ):
                _issue(
                    issues,
                    "management_leaf_too_generic",
                    current_path,
                    f"management-style leaf '{text}' is too generic; refine it to an object or concrete deliverable",
                )

            if (
                technical_depth == 2
                and direct_heading_children
                and not allow_service_facets
                and not allow_fixed_structure
                and all(
                    _looks_management_focused(str(child.get("text", "")).strip())
                    for child in direct_heading_children
                )
            ):
                _issue(
                    issues,
                    "top_branch_missing_object_nodes",
                    current_path,
                    f"technical branch '{text}' is expanded only by management facets; add module/subsystem/device oriented nodes",
                )

        _walk_blocks(direct_heading_children, current_path, issues, current_policy, current_level)


def check_outline(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate an outline payload and return a report dict.

    Returns:
        ``{"status": "ok" | "failed", "issue_count": int, "issues": [...]}``.

    Raises:
        QueryError: if ``blocks`` is not a list or a policy is not an object.
    """
    blocks = payload.get("blocks", [])
    if not isinstance(blocks, list):
        raise QueryError("blocks must be a list")
    policy = _normalize_policy(payload)
    issues: list[dict[str, Any]] = []
    _walk_blocks(blocks, [], issues, policy)
    _check_technical_depth(blocks, issues, policy)
    return {
        "status": "ok" if not issues else "failed",
        "issue_count": len(issues),
        "issues": issues,
    }


def main() -> None:
    """CLI entry point: check ``--outline-file`` and write the result to ``--report``."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--outline-file", required=True)
    parser.add_argument("--report", required=True)
    args = parser.parse_args()
    # read_json / write_json come from docx_ops_lib; presumably they load and
    # dump JSON at the given path -- confirm against that module.
    payload = read_json(Path(args.outline_file).resolve())
    report = check_outline(payload)
    write_json(Path(args.report).resolve(), report)


if __name__ == "__main__":
    main()