# File-viewer header captured with the source: 439 lines, 15 KiB, Python.
from __future__ import annotations

import argparse
import re
from collections import Counter
from pathlib import Path
from typing import Any

from docx_ops_lib import QueryError, read_json, write_json
# Abstract, plan-level section titles. The checker reports one of these as an
# "illegal_leaf" when it has no children, always treats it as a generic
# technical heading, and never counts it as an object-oriented title.
ILLEGAL_LEAF_TITLES = {
    "技术方案",
    "服务方案",
    "实施方案",
    "服务保障及措施",
    "售后服务和质保期服务计划",
    "项目理解",
    "解决方案",
    "系统设计",
    "平台建设方案",
    "系统建设方案",
    "总体方案",
    "培训方案",
    "运维方案",
}
# Normalized titles that mark a heading as the root of a technical outline.
# A path containing any of them is treated as "technical context", and
# top-level blocks with these titles are subject to the branch-depth check.
TECHNICAL_ROOT_TITLES = {
    "技术标目录",
    "技术目录",
    "技术部分目录",
    "技术方案",
    "服务方案",
    "实施方案",
    "服务保障及措施",
    "售后服务和质保期服务计划",
}
# Normalized titles that mark a heading as the root of a business outline.
# A path containing any of them is treated as "business context".
BUSINESS_ROOT_TITLES = {
    "商务及其他目录",
    "商务目录",
    "商务部分目录",
    "商务及其他部分目录",
}
# Normalized titles recognised as a technical placeholder inside a business
# outline. Such a heading is reported when it has heading children, because
# the placeholder must remain a single node.
TECHNICAL_PLACEHOLDER_TITLES = {
    "技术标内容详见技术标目录版",
    "技术部分详见技术标",
    "技术部分",
    "技术标",
    "技术方案",
    "服务方案",
    "实施方案",
}
# Fully anchored regexes for titles that are too generic to write from.
# They are applied with ``re.search`` to the normalized heading text.
GENERIC_TECHNICAL_PATTERNS = (
    r"^(技术|总体技术|总体|项目|整体)?方案$",
    r"^(服务|运维|培训|实施|部署|测试|验收|应急|保障)(方案|计划|措施)?$",
    r"^(系统|平台|架构|设计)(方案|设计|建设方案)?$",
    r"^(项目理解|解决方案|系统设计|总体架构|建设内容|功能设计|集成方案|响应方案|管理方案)$",
    r"^(总体设计方案|总体实施方案|总体服务方案)$",
)
# Substrings whose presence in a normalized heading marks it as describing a
# concrete object (subsystem, module, device, ...) rather than a generic plan.
OBJECT_HINTS = (
    "子系统",
    "模块",
    "设备",
    "接口",
    "功能",
    "单元",
    "终端",
    "节点",
    "链路",
    "数据库",
    "中间件",
    "服务器",
    "存储",
    "网络",
    "点位",
    "机房",
    "服务项",
    "清单",
)
# Substrings whose presence in a normalized heading (when no object hint is
# present) marks it as management/process oriented.
MANAGEMENT_HINTS = (
    "原则",
    "目标",
    "思路",
    "策略",
    "组织",
    "保障",
    "计划",
    "流程",
    "机制",
    "措施",
    "培训",
    "验收",
    "测试",
    "应急",
    "运维",
    "服务",
    "售后",
    "响应",
    "巡检",
    "维护",
    "风险",
)
# Matches an optional scope word followed by an optional plan-type word at the
# very end of a title; substituting it with "" leaves the semantic stem.
STEM_SUFFIX_PATTERN = re.compile(
    r"(总体|项目|技术|系统|平台|服务|实施|运维|售后|培训|测试|验收|保障|管理|响应|交付|部署)?"
    r"(方案|计划|步骤|措施|机制|说明|内容|设计|建设|保障)?$"
)
def _normalize_heading(text: str) -> str:
|
||
compact = re.sub(r"\s+", "", text or "")
|
||
compact = re.sub(r"^[一二三四五六七八九十0-9]+[、\..]\s*", "", compact)
|
||
compact = re.sub(r"^\(?[0-9一二三四五六七八九十]+\)?\s*", "", compact)
|
||
compact = re.sub(r"^[0-9]+(\.[0-9]+)*\s*", "", compact)
|
||
return compact
|
||
|
||
|
||
def _issue(issues: list[dict[str, Any]], issue_type: str, path: list[str], message: str) -> None:
|
||
issues.append({"type": issue_type, "path": " > ".join(path), "message": message})
|
||
|
||
|
||
def _is_heading(block: dict[str, Any]) -> bool:
|
||
return block.get("type", "heading") == "heading"
|
||
|
||
|
||
def _heading_children(children: list[Any]) -> list[dict[str, Any]]:
|
||
return [child for child in children if isinstance(child, dict) and _is_heading(child)]
|
||
|
||
|
||
def _is_technical_context(path: list[str]) -> bool:
|
||
return any(_normalize_heading(part) in TECHNICAL_ROOT_TITLES for part in path)
|
||
|
||
|
||
def _is_business_context(path: list[str]) -> bool:
|
||
return any(_normalize_heading(part) in BUSINESS_ROOT_TITLES for part in path)
|
||
|
||
|
||
def _technical_depth(path: list[str]) -> int:
|
||
for index, part in enumerate(path):
|
||
if _normalize_heading(part) in TECHNICAL_ROOT_TITLES:
|
||
return len(path) - index
|
||
return 0
|
||
|
||
|
||
def _contains_object_hint(text: str) -> bool:
    """Return True when *text* looks like it names a concrete object.

    The normalized title qualifies when it contains "系统", is longer than
    four characters and is not one of ``ILLEGAL_LEAF_TITLES``, or when it
    contains any ``OBJECT_HINTS`` keyword.
    """
    normalized = _normalize_heading(text)
    names_specific_system = (
        "系统" in normalized
        and len(normalized) > 4
        and normalized not in ILLEGAL_LEAF_TITLES
    )
    if names_specific_system:
        return True
    return any(hint in normalized for hint in OBJECT_HINTS)
def _looks_management_focused(text: str) -> bool:
    """Return True when *text* carries a management keyword and no object hint."""
    normalized = _normalize_heading(text)
    if _contains_object_hint(normalized):
        return False
    return any(hint in normalized for hint in MANAGEMENT_HINTS)
def _looks_generic_technical_heading(text: str) -> bool:
    """Return True when *text* is a generic, plan-level technical title.

    Titles in ``ILLEGAL_LEAF_TITLES`` are always generic. Any other title is
    generic only when it has no object hint and matches one of
    ``GENERIC_TECHNICAL_PATTERNS``.
    """
    normalized = _normalize_heading(text)
    if normalized in ILLEGAL_LEAF_TITLES:
        return True
    if _contains_object_hint(normalized):
        return False
    return any(re.search(pattern, normalized) for pattern in GENERIC_TECHNICAL_PATTERNS)
def _has_object_child(children: list[dict[str, Any]]) -> bool:
|
||
return any(_contains_object_hint(str(child.get("text", "")).strip()) for child in children)
|
||
|
||
|
||
def _max_heading_depth(block: dict[str, Any]) -> int:
|
||
children = block.get("children", [])
|
||
if not isinstance(children, list):
|
||
return 1
|
||
heading_children = _heading_children(children)
|
||
if not heading_children:
|
||
return 1
|
||
return 1 + max(_max_heading_depth(child) for child in heading_children)
|
||
|
||
|
||
def _semantic_stem(text: str) -> str:
    """Return the subject of a heading with its plan-type suffix removed.

    The title is normalized, the trailing match of ``STEM_SUFFIX_PATTERN`` is
    dropped, and dashes, underscores and parentheses are trimmed from both
    ends. If nothing is left, the normalized title is returned instead.
    """
    normalized = _normalize_heading(text)
    stem = STEM_SUFFIX_PATTERN.sub("", normalized)
    # \uff08 / \uff09 are the full-width parentheses common in Chinese titles.
    stem = stem.strip("-_()\uff08\uff09")
    return stem or normalized
def _duplicate_generic_stems(children: list[dict[str, Any]]) -> list[str]:
|
||
stems = [
|
||
_semantic_stem(str(child.get("text", "")).strip())
|
||
for child in children
|
||
if _looks_generic_technical_heading(str(child.get("text", "")).strip())
|
||
]
|
||
counts = Counter(stem for stem in stems if len(stem) >= 2)
|
||
return sorted(stem for stem, count in counts.items() if count >= 2)
|
||
|
||
|
||
def _normalize_policy(payload: dict[str, Any]) -> dict[str, bool]:
|
||
raw_policy = payload.get("outline_policy", {})
|
||
if raw_policy is None:
|
||
raw_policy = {}
|
||
if not isinstance(raw_policy, dict):
|
||
raise QueryError("outline_policy must be an object when provided")
|
||
return {
|
||
"allow_service_facets": bool(raw_policy.get("allow_service_facets", False)),
|
||
"respect_fixed_structure": bool(raw_policy.get("respect_fixed_structure", False)),
|
||
}
|
||
|
||
|
||
def _merge_policy(raw_policy: Any, inherited_policy: dict[str, bool]) -> dict[str, bool]:
|
||
if raw_policy is None:
|
||
return dict(inherited_policy)
|
||
if not isinstance(raw_policy, dict):
|
||
raise QueryError("policy must be an object when provided on a heading block")
|
||
return {
|
||
"allow_service_facets": bool(raw_policy.get("allow_service_facets", inherited_policy["allow_service_facets"])),
|
||
"respect_fixed_structure": bool(raw_policy.get("respect_fixed_structure", inherited_policy["respect_fixed_structure"])),
|
||
}
|
||
|
||
|
||
def _parse_heading_level(
|
||
block: dict[str, Any],
|
||
path: list[str],
|
||
issues: list[dict[str, Any]],
|
||
*,
|
||
parent_level: int | None,
|
||
) -> int | None:
|
||
raw_level = block.get("level")
|
||
if not isinstance(raw_level, int):
|
||
_issue(issues, "invalid_heading_level", path, "heading level must be an integer between 1 and 9")
|
||
return None
|
||
if raw_level < 1 or raw_level > 9:
|
||
_issue(issues, "invalid_heading_level", path, "heading level must be between 1 and 9")
|
||
return None
|
||
if parent_level is None:
|
||
if raw_level != 1:
|
||
_issue(issues, "invalid_root_heading_level", path, "top-level heading must use level 1")
|
||
elif raw_level != parent_level + 1:
|
||
_issue(
|
||
issues,
|
||
"invalid_heading_hierarchy",
|
||
path,
|
||
f"child heading level must be parent level + 1; expected {parent_level + 1}, got {raw_level}",
|
||
)
|
||
return raw_level
|
||
|
||
|
||
def _check_technical_depth(blocks: list[dict[str, Any]], issues: list[dict[str, Any]], policy: dict[str, bool]) -> None:
    """Check that each top-level technical root is broken down deeply enough.

    For every heading in *blocks* whose normalized title is a technical root:

    * no heading children at all -> ``technical_outline_too_shallow``;
    * a direct child with no heading children of its own ->
      ``technical_branch_too_shallow``, unless the branch's effective policy
      sets ``respect_fixed_structure``.

    The effective policy is inherited document -> root -> branch, the same
    chain ``_walk_blocks`` uses, so a ``policy`` set on the root heading
    applies to its branches here as well.
    """
    for block in blocks:
        if not isinstance(block, dict) or not _is_heading(block):
            continue
        root_text = str(block.get("text", "")).strip()
        if _normalize_heading(root_text) not in TECHNICAL_ROOT_TITLES:
            continue
        root_children = block.get("children", [])
        if not isinstance(root_children, list):
            continue

        branch_children = _heading_children(root_children)
        if not branch_children:
            _issue(
                issues,
                "technical_outline_too_shallow",
                [root_text],
                "technical outline must include at least one level-2 branch under the root",
            )
            continue

        root_policy = _merge_policy(block.get("policy"), policy)
        for child in branch_children:
            child_text = str(child.get("text", "")).strip()
            branch_policy = _merge_policy(child.get("policy"), root_policy)
            if _max_heading_depth(child) >= 2 or branch_policy["respect_fixed_structure"]:
                continue
            _issue(
                issues,
                "technical_branch_too_shallow",
                [root_text, child_text],
                f"technical branch '{child_text}' must reach at least level 3",
            )
def _check_technical_heading(
    text: str,
    normalized: str,
    current_path: list[str],
    heading_children: list[dict[str, Any]],
    policy: dict[str, bool],
    issues: list[dict[str, Any]],
) -> None:
    """Apply the technical-outline rules to one heading in technical context."""
    technical_depth = _technical_depth(current_path)
    is_generic_heading = _looks_generic_technical_heading(text)
    allow_service_facets = policy["allow_service_facets"]
    allow_fixed_structure = policy["respect_fixed_structure"]

    # Titles in ILLEGAL_LEAF_TITLES are covered by the "illegal_leaf" rule.
    if is_generic_heading and normalized not in ILLEGAL_LEAF_TITLES and not heading_children:
        _issue(
            issues,
            "generic_technical_leaf",
            current_path,
            f"technical heading '{text}' is still too generic to write from directly",
        )

    if is_generic_heading and len(heading_children) == 1:
        _issue(
            issues,
            "single_child_breakdown",
            current_path,
            f"technical heading '{text}' cannot be expanded with only one direct child",
        )

    if (
        is_generic_heading
        and heading_children
        and not allow_service_facets
        and not allow_fixed_structure
        and not _has_object_child(heading_children)
    ):
        _issue(
            issues,
            "missing_object_breakdown",
            current_path,
            f"technical heading '{text}' must include at least one object/module/subsystem oriented child",
        )

    duplicate_stems = _duplicate_generic_stems(heading_children)
    if duplicate_stems:
        joined = ", ".join(duplicate_stems)
        _issue(
            issues,
            "duplicate_technical_facets",
            current_path,
            f"technical heading '{text}' has repeated generic child facets: {joined}",
        )

    if (
        technical_depth >= 3
        and not heading_children
        and not allow_service_facets
        and _looks_management_focused(text)
    ):
        _issue(
            issues,
            "management_leaf_too_generic",
            current_path,
            f"management-style leaf '{text}' is too generic; refine it to an object or concrete deliverable",
        )

    if (
        technical_depth == 2
        and heading_children
        and not allow_service_facets
        and not allow_fixed_structure
        and all(_looks_management_focused(str(child.get("text", "")).strip()) for child in heading_children)
    ):
        _issue(
            issues,
            "top_branch_missing_object_nodes",
            current_path,
            f"technical branch '{text}' is expanded only by management facets; add module/subsystem/device oriented nodes",
        )


def _walk_blocks(
    blocks: list[dict[str, Any]],
    path: list[str],
    issues: list[dict[str, Any]],
    policy: dict[str, bool],
    parent_level: int | None = None,
) -> None:
    """Recursively validate heading *blocks*, appending findings to *issues*.

    Args:
        blocks: sibling blocks at the current nesting level.
        path: titles of the ancestor headings, outermost first.
        issues: list that receives the findings (mutated in place).
        policy: effective policy inherited from the ancestors.
        parent_level: validated level of the parent heading, or ``None`` for
            top-level blocks (and below a parent whose level was invalid).

    Non-dict entries are reported as ``invalid_block``; non-heading blocks
    are skipped. Only heading children are descended into.

    Raises:
        QueryError: propagated from ``_merge_policy`` when a heading carries
            a ``policy`` that is not an object.
    """
    for index, block in enumerate(blocks):
        if not isinstance(block, dict):
            _issue(issues, "invalid_block", path + [str(index)], "block must be an object")
            continue
        if not _is_heading(block):
            continue

        text = str(block.get("text", "")).strip()
        children = block.get("children", [])
        # Untitled headings are addressed by their position among siblings.
        current_path = path + [text or str(index)]

        current_policy = _merge_policy(block.get("policy"), policy)
        current_level = _parse_heading_level(block, current_path, issues, parent_level=parent_level)

        normalized = _normalize_heading(text)
        in_technical_context = _is_technical_context(current_path)
        # A technical placeholder inside a business outline is required to
        # stay a single node, so it must not also be rejected for being a leaf.
        is_business_placeholder = (
            _is_business_context(current_path) and normalized in TECHNICAL_PLACEHOLDER_TITLES
        )

        # Compare the normalized title so numbered headings ("1. 技术方案")
        # are caught too; the generic-leaf rule excludes them the same way.
        if normalized in ILLEGAL_LEAF_TITLES and not children and not is_business_placeholder:
            _issue(
                issues,
                "illegal_leaf",
                current_path,
                f"abstract heading '{text}' cannot be a leaf",
            )

        if not isinstance(children, list):
            # Falsy non-list values (None, "", 0) just mean "no children".
            if children:
                _issue(issues, "invalid_children", current_path, "children must be a list")
            continue

        direct_heading_children = _heading_children(children)

        if is_business_placeholder and direct_heading_children:
            _issue(
                issues,
                "business_technical_placeholder_expanded",
                current_path,
                f"business outline technical placeholder '{text}' must remain a single placeholder node",
            )

        if in_technical_context:
            _check_technical_heading(
                text,
                normalized,
                current_path,
                direct_heading_children,
                current_policy,
                issues,
            )

        _walk_blocks(direct_heading_children, current_path, issues, current_policy, current_level)
def check_outline(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate an outline payload and return a report.

    The payload's ``blocks`` list is walked for per-heading issues and then
    checked for technical-branch depth, both under the document-level policy.

    Returns:
        A dict with ``status`` ("ok" when there are no issues, otherwise
        "failed"), ``issue_count`` and ``issues``.

    Raises:
        QueryError: if *payload* is not an object, ``blocks`` is not a list,
            or a policy value is malformed.
    """
    if not isinstance(payload, dict):
        # Fail with the module's own error type instead of an AttributeError.
        raise QueryError("outline payload must be an object")
    blocks = payload.get("blocks", [])
    if not isinstance(blocks, list):
        raise QueryError("blocks must be a list")

    policy = _normalize_policy(payload)
    issues: list[dict[str, Any]] = []
    _walk_blocks(blocks, [], issues, policy)
    _check_technical_depth(blocks, issues, policy)
    return {
        "status": "failed" if issues else "ok",
        "issue_count": len(issues),
        "issues": issues,
    }
def main() -> None:
    """Command-line entry point.

    Takes the required options ``--outline-file`` and ``--report``, passes the
    resolved outline path to ``read_json``, runs ``check_outline`` on the
    result and hands the report to ``write_json`` with the resolved report
    path.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--outline-file", required=True)
    parser.add_argument("--report", required=True)
    args = parser.parse_args()

    payload = read_json(Path(args.outline_file).resolve())
    report = check_outline(payload)
    write_json(Path(args.report).resolve(), report)
# Run the checker only when the file is executed as a script.
if __name__ == "__main__":
    main()