Skill-BidCreater/scripts/docx_ops_lib.py
2026-03-14 17:02:33 +08:00

1376 lines
55 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import re
import shutil
import subprocess
from collections import defaultdict
from dataclasses import dataclass
from hashlib import sha1
from pathlib import Path
from typing import Any, Iterator
from docx import Document
from docx.document import Document as DocxDocument
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.oxml import OxmlElement
from docx.shared import Pt
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
try:
from pdf2image import convert_from_path
except ImportError: # pragma: no cover
convert_from_path = None
try:
from docx.oxml.ns import qn
except ImportError: # pragma: no cover
qn = None
# WordprocessingML main namespace, keyed by its conventional "w" prefix.
NAMESPACES = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
# Fallback for the "context_window" query option (a character count).
TEXT_WINDOW_DEFAULT = 40
# Values used when a generation payload omits the matching option.
DEFAULT_DOCX_STYLE_PROFILE = "default_bid"
DEFAULT_NUMBERING_MODE = "explicit_text"
DEFAULT_DOCUMENT_KIND = "generic"
# "<dotted number><whitespace><title>", e.g. "1.2 Scope"; groups: number, title.
HEADING_NUMBER_PATTERN = re.compile(r"^(?P<number>\d+(?:\.\d+)*)\s+(?P<title>.+)$")
# Leading dotted arabic number or Chinese numeral, then optional "、" or "." and whitespace.
LEGACY_HEADING_PREFIX_PATTERN = re.compile(r"^(?:\d+(?:\.\d+)*|[一二三四五六七八九十]+)[、\.]?\s*")
# Caption text "<图|表|附件> <chapter>-<index> <title>"; groups: kind, chapter, index, title.
CAPTION_PATTERN = re.compile(r"^(图|表|附件)\s*(\d+)-(\d+)\s+(.+)$")
# Marker words that identify unfinished placeholder text.
PLACEHOLDER_PATTERN = re.compile(r"(占位|待补充|待提供|待替换|替换提示|TODO|技术转引)")
# Formatting values of the "default_bid" style profile.
# Font sizes, spacing and indents are passed to docx.shared.Pt by the
# functions that consume this table; line_spacing is passed through as-is.
DEFAULT_BID_STYLE_SPEC: dict[str, Any] = {
    # Body text.
    "normal": {
        "font_name": "宋体",
        "font_size": 12,
        "bold": False,
        "first_line_indent": 24,
        "line_spacing": 1.5,
        "space_before": 0,
        "space_after": 0,
    },
    # Heading levels 1-4, keyed by integer level.
    "headings": {
        1: {"font_name": "黑体", "font_size": 15, "bold": True, "space_before": 18, "space_after": 12},
        2: {"font_name": "黑体", "font_size": 14, "bold": True, "space_before": 12, "space_after": 6},
        3: {"font_name": "黑体", "font_size": 12, "bold": True, "space_before": 6, "space_after": 6},
        4: {"font_name": "楷体", "font_size": 12, "bold": False, "space_before": 6, "space_after": 3},
    },
    # Table cell text; header_bold controls bolding of the first row.
    "table": {
        "font_name": "宋体",
        "font_size": 10.5,
        "header_bold": True,
    },
}
@dataclass
class NodeRecord:
    """One indexed element of a document (heading, paragraph, table part, ...)."""

    # Identity and classification.
    node_id: str
    node_type: str
    text: str
    style_name: str | None
    heading_level: int | None
    # Heading titles enclosing this node, outermost first.
    path: list[str]
    ordinal: int
    parent_id: str | None
    anchor: str
    container: str
    # Position information; None where it does not apply to the node type.
    table_index: int | None = None
    row_index: int | None = None
    cell_index: int | None = None
    block_index: int | None = None
    xml_path: str | None = None
    has_image: bool = False
    # Live object the record was built from; never serialized.
    object_ref: Any = None

    def to_dict(self) -> dict[str, Any]:
        """Return every field except ``object_ref`` as a plain dict."""
        exported = (
            "node_id",
            "node_type",
            "text",
            "style_name",
            "heading_level",
            "path",
            "ordinal",
            "parent_id",
            "anchor",
            "container",
            "table_index",
            "row_index",
            "cell_index",
            "block_index",
            "xml_path",
            "has_image",
        )
        return {name: getattr(self, name) for name in exported}
class QueryError(RuntimeError):
    """Raised for invalid queries, unsupported options and malformed block specs."""
def read_json(path: Path) -> Any:
    """Parse the JSON file at *path*, tolerating a leading UTF-8 BOM."""
    raw_text = path.read_text(encoding="utf-8-sig")
    return json.loads(raw_text)
def write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON ending in a newline.

    Missing parent directories are created. Non-ASCII characters are written
    verbatim and line endings are always ``\\n``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="\n") as handle:
        serialized = json.dumps(payload, ensure_ascii=False, indent=2)
        handle.write(serialized)
        handle.write("\n")
def heading_level_for_style(style_name: str | None) -> int | None:
    """Return the heading level encoded in a style name, or None.

    Recognizes "Heading N" (case-insensitive) and "标题 N" / "标题N" after
    whitespace normalization of the name.
    """
    if not style_name:
        return None
    compact_style = normalize_text(style_name)
    recognizers = (
        (r"Heading\s+(\d+)$", re.IGNORECASE),
        (r"标题\s*(\d+)$", 0),
    )
    for pattern, flags in recognizers:
        found = re.match(pattern, compact_style, flags=flags)
        if found:
            return int(found.group(1))
    return None
def normalize_text(value: str) -> str:
    """Collapse each whitespace run to one space and trim both ends.

    A falsy *value* (``None`` or ``""``) yields ``""``.
    """
    if not value:
        return ""
    # str.split() with no argument splits on whitespace runs and drops empties.
    return " ".join(value.split())
def get_style_spec(docx_style_profile: str) -> dict[str, Any]:
    """Return the style table for *docx_style_profile*.

    Raises:
        QueryError: if the profile is anything other than the default profile.
    """
    if docx_style_profile == DEFAULT_DOCX_STYLE_PROFILE:
        return DEFAULT_BID_STYLE_SPEC
    raise QueryError(f"unsupported docx_style_profile: {docx_style_profile}")
def resolve_generation_options(payload: dict[str, Any]) -> dict[str, Any]:
    """Extract generation options from *payload*, filling in module defaults.

    All options are coerced to ``str`` except ``template_docx``, which is
    returned exactly as found (``None`` when absent).
    """

    def text_option(key: str, fallback: str) -> str:
        return str(payload.get(key, fallback))

    options: dict[str, Any] = {}
    options["docx_style_profile"] = text_option("docx_style_profile", DEFAULT_DOCX_STYLE_PROFILE)
    options["numbering_mode"] = text_option("numbering_mode", DEFAULT_NUMBERING_MODE)
    options["template_docx"] = payload.get("template_docx")
    options["document_kind"] = text_option("document_kind", DEFAULT_DOCUMENT_KIND)
    return options
def _get_xml_element(target: Any) -> Any | None:
    """Return ``target._element`` if set, else ``target.element``, else None."""
    for attribute in ("_element", "element"):
        candidate = getattr(target, attribute, None)
        if candidate is not None:
            return candidate
    return None
def _set_font_family(target: Any, font_name: str) -> None:
    """Set *font_name* on *target* for the ascii, hAnsi and eastAsia slots.

    ``target.font.name`` is assigned first; the ``w:rFonts`` attributes are
    then written directly on the underlying XML element. The XML step is
    skipped when ``qn`` is unavailable or *target* exposes no element.
    """
    target.font.name = font_name
    element = _get_xml_element(target) if qn else None
    if element is None:
        return

    run_properties = getattr(element, "rPr", None)
    if run_properties is None:
        run_properties = OxmlElement("w:rPr")
        element.insert(0, run_properties)

    fonts = getattr(run_properties, "rFonts", None)
    if fonts is None:
        fonts = OxmlElement("w:rFonts")
        run_properties.insert(0, fonts)

    for attribute_name in ("w:ascii", "w:hAnsi", "w:eastAsia"):
        fonts.set(qn(attribute_name), font_name)
def apply_run_font(target_run: Any, *, font_name: str, font_size: float, bold: bool | None = None) -> None:
    """Set font family and point size on a run; set bold only when given."""
    _set_font_family(target_run, font_name)
    target_run.font.size = Pt(font_size)
    if bold is None:
        # Leave the run's existing bold setting untouched.
        return
    target_run.bold = bold
def configure_style(style: Any, *, font_name: str, font_size: float, bold: bool, space_before: float = 0, space_after: float = 0, first_line_indent: float | None = None, line_spacing: float | None = None) -> None:
    """Apply font and paragraph-format settings to a style object.

    Sizes, spacing and indent are given in points. ``first_line_indent`` and
    ``line_spacing`` are written only when not None.
    """
    _set_font_family(style, font_name)
    font = style.font
    font.size = Pt(font_size)
    font.bold = bold

    layout = style.paragraph_format
    layout.space_before = Pt(space_before)
    layout.space_after = Pt(space_after)
    if first_line_indent is not None:
        layout.first_line_indent = Pt(first_line_indent)
    if line_spacing is not None:
        layout.line_spacing = line_spacing
def initialize_default_bid_styles(document: Document, docx_style_profile: str) -> dict[str, Any]:
    """Configure the Normal, List Bullet and heading styles of *document*.

    "List Bullet" is configured like "Normal" and skipped on ``KeyError``.
    Every heading level present in the profile is configured on the style
    named ``"Heading <level>"``.

    Returns:
        A report dict with a fixed ``"pass"`` status and a short summary.

    Raises:
        QueryError: if the profile is unsupported (raised by get_style_spec).
    """
    spec = get_style_spec(docx_style_profile)
    body_spec = spec["normal"]
    styles = document.styles

    configure_style(styles["Normal"], **body_spec)
    try:
        configure_style(styles["List Bullet"], **body_spec)
    except KeyError:
        # Best effort: the bullet style is optional.
        pass

    for level, heading_spec in spec["headings"].items():
        configure_style(styles[f"Heading {level}"], **heading_spec)

    summary = {
        "heading_numbering": "1 / 1.1 / 1.1.1 / 1.1.1.1",
        "normal_font": body_spec["font_name"],
        "normal_font_size": body_spec["font_size"],
    }
    return {
        "status": "pass",
        "profile": docx_style_profile,
        "summary": summary,
        "issues": [],
    }
def strip_heading_prefix(text: str) -> str:
    """Return *text* without a leading heading number.

    A "<dotted number> <title>" form yields the title group. Otherwise the
    first legacy prefix (arabic or Chinese numeral with optional "、"/".")
    is removed and the result is stripped.
    """
    cleaned = normalize_text(text)
    numbered = HEADING_NUMBER_PATTERN.match(cleaned)
    if numbered is None:
        return LEGACY_HEADING_PREFIX_PATTERN.sub("", cleaned, count=1).strip()
    return numbered.group("title")
def replace_paragraph_text(paragraph: Paragraph, text: str) -> None:
    """Replace the paragraph's runs with one run holding *text*.

    The new run copies the formatting of the paragraph's first original run,
    when there was one.
    """
    # Capture the template run before clear_paragraph removes it.
    template_run = next(iter(paragraph.runs), None)
    clear_paragraph(paragraph)
    replacement = paragraph.add_run(text)
    if template_run is None:
        return
    clone_run_format(template_run, replacement)
def apply_heading_numbering(document: Document, numbering_mode: str) -> None:
    """Rewrite every heading paragraph as "<dotted number> <title>".

    One counter is kept per level: a heading bumps its own level and resets
    all deeper ones. Zero counters are left out of the prefix. Any existing
    number prefix is stripped before the new one is added.

    Raises:
        QueryError: if *numbering_mode* is not the default mode.
    """
    if numbering_mode != DEFAULT_NUMBERING_MODE:
        raise QueryError(f"unsupported numbering_mode: {numbering_mode}")

    # NOTE(review): only nine counters exist, so a level above 9 raises IndexError.
    counters = [0] * 9
    for paragraph in document.paragraphs:
        level = heading_level_for_style(paragraph.style.name if paragraph.style else None)
        if not level:
            continue
        counters[level - 1] += 1
        counters[level:] = [0] * (len(counters) - level)
        prefix = ".".join(str(count) for count in counters[:level] if count)
        title = strip_heading_prefix(paragraph.text)
        replace_paragraph_text(paragraph, f"{prefix} {title}".strip())
def apply_paragraph_profile(paragraph: Paragraph, *, font_name: str, font_size: float, bold: bool, first_line_indent: float | None = None, line_spacing: float | None = None, space_before: float | None = None, space_after: float | None = None) -> None:
    """Apply paragraph-format values and run fonts directly to *paragraph*.

    Paragraph-format arguments left as None are not touched. Font name, size
    and bold are applied to every existing run.
    """
    layout = paragraph.paragraph_format
    if first_line_indent is not None:
        layout.first_line_indent = Pt(first_line_indent)
    if line_spacing is not None:
        layout.line_spacing = line_spacing
    if space_before is not None:
        layout.space_before = Pt(space_before)
    if space_after is not None:
        layout.space_after = Pt(space_after)

    for text_run in paragraph.runs:
        apply_run_font(text_run, font_name=font_name, font_size=font_size, bold=bold)
def apply_table_profile(table: Table, docx_style_profile: str) -> None:
    """Apply the profile's table formatting to every cell paragraph.

    The table style is set to "Table Grid" (ignored on ``KeyError``). Cell
    paragraphs get single line spacing with no indent or extra spacing; the
    first row is bold when the profile's ``header_bold`` is truthy.
    """
    table_spec = get_style_spec(docx_style_profile)["table"]
    try:
        table.style = "Table Grid"
    except KeyError:
        pass

    cell_font = table_spec["font_name"]
    cell_size = table_spec["font_size"]
    for row_index, row in enumerate(table.rows):
        is_header_row = row_index == 0
        use_bold = bool(is_header_row and table_spec["header_bold"])
        cell_paragraphs = (paragraph for cell in row.cells for paragraph in cell.paragraphs)
        for paragraph in cell_paragraphs:
            apply_paragraph_profile(
                paragraph,
                font_name=cell_font,
                font_size=cell_size,
                bold=use_bold,
                first_line_indent=0,
                line_spacing=1.0,
                space_before=0,
                space_after=0,
            )
def apply_document_profile(document: Document, docx_style_profile: str) -> None:
    """Apply the profile's direct formatting to all paragraphs and tables.

    Heading paragraphs are left-aligned, kept with the next paragraph and
    formatted from the matching heading spec; levels missing from the
    profile use the level-4 spec. All other paragraphs get the "normal"
    spec. Each table is then formatted by apply_table_profile.
    """
    spec = get_style_spec(docx_style_profile)
    body_spec = spec["normal"]
    heading_specs = spec["headings"]

    for paragraph in document.paragraphs:
        level = heading_level_for_style(paragraph.style.name if paragraph.style else None)
        if not level:
            apply_paragraph_profile(
                paragraph,
                font_name=body_spec["font_name"],
                font_size=body_spec["font_size"],
                bold=body_spec["bold"],
                first_line_indent=body_spec["first_line_indent"],
                line_spacing=body_spec["line_spacing"],
                space_before=body_spec["space_before"],
                space_after=body_spec["space_after"],
            )
            continue

        heading_spec = heading_specs.get(level, heading_specs[4])
        layout = paragraph.paragraph_format
        paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT
        layout.first_line_indent = Pt(0)
        layout.space_before = Pt(heading_spec["space_before"])
        layout.space_after = Pt(heading_spec["space_after"])
        layout.keep_with_next = True
        apply_paragraph_profile(
            paragraph,
            font_name=heading_spec["font_name"],
            font_size=heading_spec["font_size"],
            bold=heading_spec["bold"],
            first_line_indent=0,
            line_spacing=1.0,
            space_before=heading_spec["space_before"],
            space_after=heading_spec["space_after"],
        )

    for table in document.tables:
        apply_table_profile(table, docx_style_profile)
def remove_initial_blank_paragraph(document: Document) -> None:
    """Delete the document's only paragraph when it has no visible text.

    Does nothing unless the document has exactly one paragraph and that
    paragraph's text is empty after normalization.
    """
    paragraphs = document.paragraphs
    if len(paragraphs) != 1:
        return
    only_paragraph = paragraphs[0]
    if not normalize_text(only_paragraph.text):
        delete_block(only_paragraph)
def validate_format_profile(document: Document, docx_style_profile: str) -> dict[str, Any]:
    """Compare the Normal and Heading 1-4 styles against the profile.

    A style fails when its font name differs from the profile or its size is
    unset or more than 0.2 pt away from the expected size.

    Returns:
        A report with ``status`` ("pass"/"fail"), ``profile`` and ``issues``.
    """
    spec = get_style_spec(docx_style_profile)
    styles = document.styles
    expectations = [("Normal", spec["normal"])]
    expectations.extend((f"Heading {level}", spec["headings"][level]) for level in (1, 2, 3, 4))

    issues: list[str] = []
    for style_name, expected in expectations:
        font = styles[style_name].font
        actual_size = None if font.size is None else font.size.pt
        if font.name != expected["font_name"]:
            issues.append(f"{style_name} font should be {expected['font_name']}, got {font.name!r}")
        size_matches = actual_size is not None and abs(actual_size - expected["font_size"]) <= 0.2
        if not size_matches:
            issues.append(f"{style_name} size should be {expected['font_size']}, got {actual_size!r}")

    return {
        "status": "fail" if issues else "pass",
        "profile": docx_style_profile,
        "issues": issues,
    }
def validate_heading_numbering(document: Document, numbering_mode: str) -> dict[str, Any]:
    """Check that each heading starts with the number its position implies.

    Expected numbers use the same counter scheme as apply_heading_numbering.
    An unsupported mode yields a "fail" report instead of an exception.

    Returns:
        A report with ``status``, ``mode``, ``checked_headings`` and ``issues``.
    """
    if numbering_mode != DEFAULT_NUMBERING_MODE:
        return {
            "status": "fail",
            "mode": numbering_mode,
            "checked_headings": 0,
            "issues": [f"unsupported numbering_mode: {numbering_mode}"],
        }

    counters = [0] * 9
    issues: list[str] = []
    checked = 0
    for paragraph in document.paragraphs:
        level = heading_level_for_style(paragraph.style.name if paragraph.style else None)
        if not level:
            continue
        checked += 1
        counters[level - 1] += 1
        counters[level:] = [0] * (len(counters) - level)
        expected = ".".join(str(count) for count in counters[:level] if count)
        numbered = HEADING_NUMBER_PATTERN.match(normalize_text(paragraph.text))
        actual = None if numbered is None else numbered.group("number")
        if actual != expected:
            issues.append(f"{paragraph.text!r} should use heading number {expected}")

    return {
        "status": "fail" if issues else "pass",
        "mode": numbering_mode,
        "checked_headings": checked,
        "issues": issues,
    }
def validate_caption_numbering(document: Document) -> dict[str, Any]:
    """Check that caption indices count up from 1 per (kind, chapter).

    Only body paragraphs matching CAPTION_PATTERN are treated as captions.

    Returns:
        A report with ``status``, ``caption_count`` and ``issues``.
    """
    seen: dict[tuple[str, int], int] = defaultdict(int)
    issues: list[str] = []
    caption_count = 0

    for paragraph in document.paragraphs:
        caption = CAPTION_PATTERN.match(normalize_text(paragraph.text))
        if caption is None:
            continue
        caption_count += 1
        series = (caption.group(1), int(caption.group(2)))
        declared_index = int(caption.group(3))
        seen[series] += 1
        if seen[series] != declared_index:
            issues.append(f"{paragraph.text!r} caption index should be {seen[series]}")

    return {
        "status": "fail" if issues else "pass",
        "caption_count": caption_count,
        "issues": issues,
    }
def document_has_toc(document: Document) -> bool:
    """Return True if any ``instrText`` element in the body mentions "TOC"."""
    return any(
        element.tag.endswith("}instrText") and "TOC" in "".join(element.itertext())
        for element in document.element.body.iter()
    )
def validate_toc(document: Document, document_kind: str) -> dict[str, Any]:
    """Report whether the document has a TOC field.

    The status is always "pass"; a missing TOC only adds an informational
    entry to ``issues``, worded differently for "outline" documents.
    """
    if document_has_toc(document):
        return {"status": "pass", "has_toc": True, "issues": []}

    if document_kind == "outline":
        note = "outline documents can use heading numbering directly as visible目录内容"
    else:
        note = "TOC field not found; current workflow allows user to insert or update TOC in Word"
    return {"status": "pass", "has_toc": False, "issues": [note]}
def collect_placeholder_hits(document: Document) -> list[str]:
    """Return normalized texts that contain a placeholder marker.

    Body paragraphs are scanned first, then every cell of every table.
    """

    def candidate_texts() -> Iterator[str]:
        for paragraph in document.paragraphs:
            yield paragraph.text
        for table in document.tables:
            for row in table.rows:
                for cell in row.cells:
                    yield cell.text

    hits: list[str] = []
    for raw_text in candidate_texts():
        text = normalize_text(raw_text)
        if text and PLACEHOLDER_PATTERN.search(text):
            hits.append(text)
    return hits
def validate_placeholders(document: Document, document_kind: str) -> dict[str, Any]:
    """Fail when placeholder text remains, except in "outline" documents.

    ``issues`` lists at most the first 20 hits; ``placeholder_count`` is the
    full number of hits.
    """
    hits = collect_placeholder_hits(document)
    if document_kind == "outline" or not hits:
        status = "pass"
    else:
        status = "fail"
    return {
        "status": status,
        "placeholder_count": len(hits),
        "issues": hits[:20],
    }
def build_acceptance_checks(*, format_profile: dict[str, Any], numbering_validation: dict[str, Any], caption_validation: dict[str, Any], toc_validation: dict[str, Any], placeholder_validation: dict[str, Any], render_status: str | None = None) -> dict[str, Any]:
    """Combine individual validation reports into one acceptance summary.

    Each report contributes its ``status``. When *render_status* is given, a
    "render_validation" check is added: "ok" maps to "pass",
    "render_skipped" to "warn", anything else to "fail". The overall status
    is "fail" if any check failed, otherwise "pass".
    """
    named_reports = (
        ("format_profile", format_profile),
        ("numbering_validation", numbering_validation),
        ("caption_validation", caption_validation),
        ("toc_validation", toc_validation),
        ("placeholder_validation", placeholder_validation),
    )
    checks = [{"name": name, "status": report["status"]} for name, report in named_reports]

    if render_status is not None:
        if render_status == "ok":
            render_check = "pass"
        elif render_status == "render_skipped":
            render_check = "warn"
        else:
            render_check = "fail"
        checks.append({"name": "render_validation", "status": render_check})

    has_failure = any(check["status"] == "fail" for check in checks)
    return {
        "status": "fail" if has_failure else "pass",
        "checks": checks,
    }
def inspect_document_quality(docx_path: Path, *, docx_style_profile: str, numbering_mode: str, document_kind: str, render_status: str | None = None) -> dict[str, Any]:
    """Open the .docx at *docx_path* and run every validation on it.

    Returns:
        A dict with the five individual reports plus ``acceptance_checks``,
        the summary built from them (and from *render_status* when given).
    """
    document = Document(str(docx_path))
    reports: dict[str, Any] = {
        "format_profile": validate_format_profile(document, docx_style_profile),
        "numbering_validation": validate_heading_numbering(document, numbering_mode),
        "caption_validation": validate_caption_numbering(document),
        "toc_validation": validate_toc(document, document_kind),
        "placeholder_validation": validate_placeholders(document, document_kind),
    }
    reports["acceptance_checks"] = build_acceptance_checks(
        format_profile=reports["format_profile"],
        numbering_validation=reports["numbering_validation"],
        caption_validation=reports["caption_validation"],
        toc_validation=reports["toc_validation"],
        placeholder_validation=reports["placeholder_validation"],
        render_status=render_status,
    )
    return reports
def create_docx_document(spec_data: dict[str, Any]) -> dict[str, Any]:
    """Build a .docx file from a block specification and validate the result.

    *spec_data* must contain ``output_docx``; ``blocks`` (a list), ``title``
    and the generation options are optional. Supported block types are
    "heading" (with optional nested ``children``), "paragraph", "list",
    "table" and "page_break". After rendering, headings are numbered, the
    style profile is applied, the file is saved, re-indexed and inspected.

    Returns:
        A report with the output path, per-block reports, the index summary,
        the options used, and the quality reports merged in at top level.

    Raises:
        QueryError: for a malformed spec or an unsupported block type,
            style profile or numbering mode.
    """
    output_docx = Path(spec_data["output_docx"]).resolve()
    blocks = spec_data.get("blocks", [])
    if not isinstance(blocks, list):
        raise QueryError("blocks must be a list")

    options = resolve_generation_options(spec_data)
    template_docx = options["template_docx"]
    profile = options["docx_style_profile"]
    output_docx.parent.mkdir(parents=True, exist_ok=True)

    if template_docx:
        document = Document(str(Path(template_docx).resolve()))
    else:
        document = Document()
    title = spec_data.get("title")
    if title:
        document.core_properties.title = str(title)

    initialize_default_bid_styles(document, profile)
    remove_initial_blank_paragraph(document)
    # The profile was accepted by initialize_default_bid_styles above, so
    # this lookup cannot raise.
    style_spec = get_style_spec(profile)
    block_reports: list[dict[str, Any]] = []

    def add_styled_run(paragraph: Any, text: str, font_spec: dict[str, Any]) -> None:
        """Append *text* as a run formatted with the font in *font_spec*."""
        run = paragraph.add_run(text)
        apply_run_font(run, font_name=font_spec["font_name"], font_size=font_spec["font_size"], bold=font_spec["bold"])

    def try_set_style(target: Any, style_name: str) -> None:
        """Assign a style by name, ignoring the KeyError of an unknown style."""
        try:
            target.style = style_name
        except KeyError:
            pass

    def render_block(block: dict[str, Any], index_path: list[int]) -> None:
        label = ".".join(str(part) for part in index_path)
        if not isinstance(block, dict):
            raise QueryError(f"block {label} must be an object")
        block_type = block.get("type", "paragraph")

        if block_type == "heading":
            level = int(block.get("level", 1))
            if not 1 <= level <= 9:
                raise QueryError(f"block {label} heading level must be between 1 and 9")
            text = str(block.get("text", ""))
            paragraph = document.add_paragraph(style=f"Heading {level}")
            heading_specs = style_spec["headings"]
            add_styled_run(paragraph, text, heading_specs.get(level, heading_specs[4]))
            block_reports.append({"index": label, "type": block_type, "text": summarize_text(text), "level": level})
            children = block.get("children", [])
            if isinstance(children, list):
                for child_index, child in enumerate(children):
                    render_block(child, index_path + [child_index])
            elif children:
                raise QueryError(f"block {label} children must be a list")
            return

        if block_type == "paragraph":
            text = str(block.get("text", ""))
            paragraph = document.add_paragraph()
            style_name = block.get("style")
            if style_name:
                try_set_style(paragraph, str(style_name))
            add_styled_run(paragraph, text, style_spec["normal"])
            block_reports.append({"index": label, "type": block_type, "text": summarize_text(text)})
            return

        if block_type == "list":
            items = block.get("items", [])
            if not isinstance(items, list):
                raise QueryError(f"block {label} items must be a list")
            style_name = str(block.get("style", "List Bullet"))
            for item in items:
                paragraph = document.add_paragraph()
                try_set_style(paragraph, style_name)
                add_styled_run(paragraph, str(item), style_spec["normal"])
            block_reports.append({"index": label, "type": block_type, "item_count": len(items)})
            return

        if block_type == "table":
            rows = block.get("rows", [])
            has_first_row = isinstance(rows, list) and rows and isinstance(rows[0], list) and rows[0]
            if not has_first_row:
                raise QueryError(f"block {label} rows must be a non-empty 2D list")
            column_count = len(rows[0])
            table = document.add_table(rows=0, cols=column_count)
            style_name = block.get("style")
            if style_name:
                try_set_style(table, str(style_name))
            for row_values in rows:
                if not isinstance(row_values, list) or len(row_values) != column_count:
                    raise QueryError(f"block {label} table rows must have equal column counts")
                row = table.add_row()
                for cell_index, value in enumerate(row_values):
                    row.cells[cell_index].text = str(value)
            # NOTE(review): apply_table_profile assigns "Table Grid", which
            # replaces any style set from the block just above.
            apply_table_profile(table, profile)
            block_reports.append({"index": label, "type": block_type, "row_count": len(rows), "column_count": column_count})
            return

        if block_type == "page_break":
            document.add_page_break()
            block_reports.append({"index": label, "type": block_type})
            return

        raise QueryError(f"unsupported block type: {block_type}")

    for index, block in enumerate(blocks):
        render_block(block, [index])

    apply_heading_numbering(document, options["numbering_mode"])
    apply_document_profile(document, profile)
    document.save(str(output_docx))

    final_index = index_document(output_docx)
    quality = inspect_document_quality(
        output_docx,
        docx_style_profile=profile,
        numbering_mode=options["numbering_mode"],
        document_kind=options["document_kind"],
    )
    return {
        "status": "ok",
        "output_docx": str(output_docx),
        "block_count": len(blocks),
        "blocks": block_reports,
        "final_summary": final_index["summary"],
        "docx_style_profile": profile,
        "numbering_mode": options["numbering_mode"],
        "document_kind": options["document_kind"],
        "template_docx": str(Path(template_docx).resolve()) if template_docx else None,
        **quality,
    }
def export_outline_artifacts(payload: dict[str, Any]) -> dict[str, Any]:
    """Write the technical and business outlines as JSON and as .docx files.

    *payload* must provide both outline objects (each with a ``blocks``
    list) and the four output paths. Both documents are generated with
    ``document_kind`` forced to "outline".

    Returns:
        A report with the resolved output paths, the options used and the
        two per-document reports from create_docx_document.

    Raises:
        QueryError: if an outline is not an object or its blocks not a list.
    """
    technical_outline = payload.get("technical_outline")
    business_outline = payload.get("business_outline")
    technical_json = Path(payload["technical_outline_json"]).resolve()
    business_json = Path(payload["business_outline_json"]).resolve()
    technical_docx = Path(payload["technical_docx"]).resolve()
    business_docx = Path(payload["business_docx"]).resolve()
    options = resolve_generation_options(payload)

    named_outlines = {
        "technical_outline": technical_outline,
        "business_outline": business_outline,
    }
    for outline_name, outline in named_outlines.items():
        if not isinstance(outline, dict):
            raise QueryError(f"{outline_name} must be an object")
        if not isinstance(outline.get("blocks"), list):
            raise QueryError(f"{outline_name}.blocks must be a list")

    write_json(technical_json, technical_outline)
    write_json(business_json, business_outline)

    def render_outline(outline: dict[str, Any], docx_path: Path, fallback_title: str) -> dict[str, Any]:
        """Generate one outline document with the shared options."""
        return create_docx_document(
            {
                "output_docx": str(docx_path),
                "title": str(outline.get("title", fallback_title)),
                "blocks": outline["blocks"],
                "docx_style_profile": options["docx_style_profile"],
                "numbering_mode": options["numbering_mode"],
                "template_docx": options["template_docx"],
                "document_kind": "outline",
            }
        )

    technical_report = render_outline(technical_outline, technical_docx, "技术标目录")
    business_report = render_outline(business_outline, business_docx, "商务及其他目录")

    template = options["template_docx"]
    return {
        "status": "ok",
        "docx_style_profile": options["docx_style_profile"],
        "numbering_mode": options["numbering_mode"],
        "document_kind": "outline",
        "template_docx": str(Path(template).resolve()) if template else None,
        "technical_outline_json": str(technical_json),
        "business_outline_json": str(business_json),
        "technical_docx": str(technical_docx),
        "business_docx": str(business_docx),
        "technical_report": technical_report,
        "business_report": business_report,
    }
def slugify_text(value: str, *, limit: int = 32) -> str:
    """Turn *value* into a lowercase, hyphen-separated slug of at most *limit* chars.

    Runs of characters other than word characters, CJK ideographs
    (U+4E00-U+9FFF) and "-" become a single "-". Returns "empty" when
    nothing usable remains.
    """
    normalized = normalize_text(value)
    if not normalized:
        return "empty"
    hyphenated = re.sub(r"[^\w\u4e00-\u9fff-]+", "-", normalized, flags=re.UNICODE)
    squeezed = re.sub(r"-+", "-", hyphenated)
    slug = squeezed.strip("-").lower()[:limit]
    return slug if slug else "empty"
def summarize_text(value: str, *, limit: int = 80) -> str:
    """Return the first *limit* characters of the whitespace-normalized text."""
    normalized = normalize_text(value)
    return normalized[:limit]
def iter_block_items(parent: DocxDocument | _Cell) -> Iterator[Paragraph | Table]:
    """Yield the direct paragraph and table children of *parent* in order.

    *parent* is a document (its body is walked) or a table cell. Children
    that are neither ``p`` nor ``tbl`` elements are skipped.
    """
    if isinstance(parent, DocxDocument):
        container = parent.element.body
    else:
        container = parent._tc

    wrappers = (("}p", Paragraph), ("}tbl", Table))
    for child in container.iterchildren():
        for tag_suffix, wrapper in wrappers:
            if child.tag.endswith(tag_suffix):
                yield wrapper(child, parent)
                break
def paragraph_has_image(paragraph: Paragraph) -> bool:
    """Return True if the paragraph contains at least one ``w:drawing`` element."""
    drawings = paragraph._element.xpath(".//w:drawing")
    return bool(drawings)
def paragraph_is_list_item(paragraph: Paragraph) -> bool:
    """Return True for a paragraph with a list style or numbering properties.

    A style whose name starts with "list" (case-insensitive) counts, as
    does a ``numPr`` element in the paragraph properties.
    """
    style_name = paragraph.style.name if paragraph.style else ""
    if style_name.lower().startswith("list"):
        return True
    properties = paragraph._element.pPr
    if properties is None:
        return False
    return properties.numPr is not None
def build_anchor(path: list[str], node_type: str, text: str, ordinal: int) -> str:
    """Build the anchor "<path slug>:<type>:<text slug>:<ordinal>:<digest>".

    The digest is the first 10 hex characters of the SHA-1 of the joined
    path, node type, a 32-character text summary and the ordinal.
    """
    seed_parts = ["/".join(path), node_type, summarize_text(text, limit=32), str(ordinal)]
    digest = sha1("|".join(seed_parts).encode("utf-8")).hexdigest()[:10]
    path_slug = slugify_text("-".join(path), limit=24)
    text_slug = slugify_text(text, limit=24)
    return ":".join([path_slug, node_type, text_slug, str(ordinal), digest])
def _index_document_core(document: Document) -> list[NodeRecord]:
    """Walk the document body and return one NodeRecord per indexed element.

    Paragraphs become "heading", "list_item" or "paragraph" records; a
    paragraph containing a drawing is followed by an "image_placeholder"
    record. A table produces a "table" record, then one "table_row" record
    per row and one "table_cell" record per cell. ``table_index`` on row
    and cell records is the table's block index within the body.

    The heading hierarchy is tracked by heading level, not by stack depth,
    so documents that skip levels or start below level 1 still get sibling
    headings as siblings. A heading's ``parent_id`` is the nearest open
    heading with a shallower level, or None when there is none.
    """
    nodes: list[NodeRecord] = []
    # Currently open headings as (level, title) pairs, outermost first.
    heading_chain: list[tuple[int, str]] = []
    # node_id of the open heading at each level.
    heading_ids: dict[int, str] = {}
    ordinal = 0

    def current_path() -> list[str]:
        """Titles of the open headings, outermost first (fresh list)."""
        return [title for _, title in heading_chain]

    def current_parent_id() -> str | None:
        """node_id of the deepest open heading, or None outside any heading."""
        if not heading_ids:
            return None
        return heading_ids[max(heading_ids)]

    def add_record(
        *,
        node_type: str,
        text: str,
        style_name: str | None,
        heading_level: int | None,
        path: list[str],
        parent_id: str | None,
        container: str,
        object_ref: Any,
        table_index: int | None = None,
        row_index: int | None = None,
        cell_index: int | None = None,
        block_index: int | None = None,
        xml_path: str | None = None,
        has_image: bool = False,
    ) -> NodeRecord:
        """Create, register and return a record with the next ordinal."""
        nonlocal ordinal
        ordinal += 1
        record = NodeRecord(
            node_id=f"n-{ordinal:05d}",
            node_type=node_type,
            text=normalize_text(text),
            style_name=style_name,
            heading_level=heading_level,
            path=path,
            ordinal=ordinal,
            parent_id=parent_id,
            anchor=build_anchor(path, node_type, text, ordinal),
            container=container,
            table_index=table_index,
            row_index=row_index,
            cell_index=cell_index,
            block_index=block_index,
            xml_path=xml_path,
            has_image=has_image,
            object_ref=object_ref,
        )
        nodes.append(record)
        return record

    def add_paragraph_records(
        block: Paragraph,
        block_index: int,
        *,
        node_type: str,
        text: str,
        style_name: str | None,
        heading_level: int | None,
        parent_id: str | None,
    ) -> NodeRecord:
        """Add a paragraph-level record plus its image placeholder, if any."""
        record = add_record(
            node_type=node_type,
            text=text,
            style_name=style_name,
            heading_level=heading_level,
            path=current_path(),
            parent_id=parent_id,
            container="document",
            object_ref=block,
            block_index=block_index,
            has_image=paragraph_has_image(block),
        )
        if record.has_image:
            add_record(
                node_type="image_placeholder",
                text=text or "[image]",
                style_name=style_name,
                heading_level=heading_level,
                path=current_path(),
                parent_id=record.node_id,
                container="document",
                object_ref=block,
                block_index=block_index,
                has_image=True,
            )
        return record

    for block_index, block in enumerate(iter_block_items(document)):
        if isinstance(block, Paragraph):
            text = normalize_text(block.text)
            style_name = block.style.name if block.style else None
            level = heading_level_for_style(style_name)

            if level is not None:
                # Close every open heading at this level or deeper. Comparing
                # levels (not stack depth) keeps siblings from nesting when
                # the document skips heading levels.
                while heading_chain and heading_chain[-1][0] >= level:
                    heading_chain.pop()
                heading_chain.append((level, text or f"Heading {level}"))
                heading_ids = {key: value for key, value in heading_ids.items() if key < level}
                record = add_paragraph_records(
                    block,
                    block_index,
                    node_type="heading",
                    text=text,
                    style_name=style_name,
                    heading_level=level,
                    # Only shallower headings remain in heading_ids here, so
                    # this is the nearest ancestor.
                    parent_id=current_parent_id(),
                )
                heading_ids[level] = record.node_id
                continue

            add_paragraph_records(
                block,
                block_index,
                node_type="list_item" if paragraph_is_list_item(block) else "paragraph",
                text=text,
                style_name=style_name,
                heading_level=None,
                parent_id=current_parent_id(),
            )
            continue

        # Anything that is not a Paragraph is a Table.
        table_text = "\n".join(
            " | ".join(normalize_text(cell.text) for cell in row.cells)
            for row in block.rows
        )
        table_record = add_record(
            node_type="table",
            text=table_text,
            style_name=block.style.name if block.style else None,
            heading_level=None,
            path=current_path(),
            parent_id=current_parent_id(),
            container="document",
            object_ref=block,
            block_index=block_index,
            xml_path=f"table[{block_index}]",
        )
        for row_index, row in enumerate(block.rows):
            row_text = " | ".join(normalize_text(cell.text) for cell in row.cells)
            row_record = add_record(
                node_type="table_row",
                text=row_text,
                style_name=table_record.style_name,
                heading_level=None,
                path=current_path(),
                parent_id=table_record.node_id,
                container="table",
                object_ref=row,
                table_index=block_index,
                row_index=row_index,
                xml_path=f"table[{block_index}]/row[{row_index}]",
            )
            for cell_index, cell in enumerate(row.cells):
                cell_lines = [normalize_text(paragraph.text) for paragraph in cell.paragraphs]
                add_record(
                    node_type="table_cell",
                    text="\n".join(line for line in cell_lines if line),
                    style_name=None,
                    heading_level=None,
                    path=current_path(),
                    parent_id=row_record.node_id,
                    container="table",
                    object_ref=cell,
                    table_index=block_index,
                    row_index=row_index,
                    cell_index=cell_index,
                    xml_path=f"table[{block_index}]/row[{row_index}]/cell[{cell_index}]",
                )
    return nodes
def index_document(docx_path: Path) -> dict[str, Any]:
    """Open the .docx at *docx_path* and return its node index.

    Returns:
        A dict with ``status``, the path as a string, per-type node counts
        under ``summary`` and the serialized records under ``nodes``.
    """
    nodes = _index_document_core(Document(str(docx_path)))

    def count_of(node_type: str) -> int:
        return sum(1 for node in nodes if node.node_type == node_type)

    summary = {
        "node_count": len(nodes),
        "heading_count": count_of("heading"),
        "paragraph_count": count_of("paragraph"),
        "list_item_count": count_of("list_item"),
        "table_count": count_of("table"),
        "image_placeholder_count": count_of("image_placeholder"),
    }
    return {
        "status": "ok",
        "docx": str(docx_path),
        "summary": summary,
        "nodes": [node.to_dict() for node in nodes],
    }
def query_nodes(index_data: dict[str, Any], query: dict[str, Any]) -> dict[str, Any]:
    """Find the nodes of an index that satisfy *query*.

    Query keys:
        match_mode: one of "exact_text", "contains_text" (default), "regex",
            "heading_path", "heading_text", "table_title", "style_name",
            "node_type", "anchor", "node_id".
        value: the value to match; required except for "node_type" mode.
        node_type / style_name / heading_level: optional extra filters.
        occurrence: keep only the match at this zero-based position.
        allow_multiple: when true, several matches are not ambiguous and
            the first one becomes ``best_match``.
        context_window: characters of context to return (default
            TEXT_WINDOW_DEFAULT). Missing or None uses the default; negative
            values are treated as 0.

    Each returned match carries a ``context`` dict whose "before" is the
    first and "after" the last ``context_window`` characters of the node's
    own text.

    Raises:
        QueryError: if ``value`` is missing, or the mode is unsupported and
            at least one node reaches the mode check.
    """
    nodes = index_data.get("nodes", [])
    mode = query.get("match_mode", "contains_text")
    value = query.get("value")
    if value is None and mode not in {"node_type"}:
        raise QueryError("query.value is required")

    node_type_filter = query.get("node_type")
    style_name_filter = query.get("style_name")
    heading_level = query.get("heading_level")
    allow_multiple = bool(query.get("allow_multiple", False))
    occurrence = query.get("occurrence")

    raw_window = query.get("context_window")
    if raw_window is None:
        raw_window = TEXT_WINDOW_DEFAULT
    # A negative window has no sensible meaning as a slice length.
    window = max(0, int(raw_window))

    def node_matches(node: dict[str, Any]) -> bool:
        if node_type_filter and node.get("node_type") != node_type_filter:
            return False
        if style_name_filter and node.get("style_name") != style_name_filter:
            return False
        if heading_level is not None and node.get("heading_level") != heading_level:
            return False
        node_text = node.get("text", "")
        if mode == "exact_text":
            return node_text == value
        if mode == "contains_text":
            return value in node_text
        if mode == "regex":
            return re.search(value, node_text) is not None
        if mode == "heading_path":
            return node.get("node_type") == "heading" and " > ".join(node.get("path", [])) == value
        if mode == "heading_text":
            return node.get("node_type") == "heading" and node_text == value
        if mode == "table_title":
            path_parts = node.get("path", [])
            return node.get("node_type") == "table" and bool(path_parts) and path_parts[-1] == value
        if mode == "style_name":
            return node.get("style_name") == value
        if mode == "node_type":
            return node.get("node_type") == value
        if mode == "anchor":
            return node.get("anchor") == value
        if mode == "node_id":
            return node.get("node_id") == value
        raise QueryError(f"unsupported match_mode: {mode}")

    matches = [node for node in nodes if node_matches(node)]
    if occurrence is not None:
        matches = [matches[occurrence]] if 0 <= occurrence < len(matches) else []

    ambiguous = len(matches) > 1 and not allow_multiple
    best_match = matches[0] if len(matches) == 1 or (allow_multiple and matches) else None

    def with_context(node: dict[str, Any]) -> dict[str, Any]:
        text = node.get("text", "")
        return {
            **node,
            "context": {
                "before": text[:window],
                # text[-0:] would return the whole text, so a zero window
                # needs its own branch.
                "after": text[-window:] if window else "",
            },
        }

    return {
        "status": "ok",
        "query": query,
        "match_count": len(matches),
        "ambiguous": ambiguous,
        "best_match": with_context(best_match) if best_match else None,
        "candidate_anchors": [match["anchor"] for match in matches],
        "matches": [with_context(match) for match in matches],
        "errors": ["query matched multiple nodes"] if ambiguous else [],
        "warnings": [],
    }
def find_records(index_data: dict[str, Any], query: dict[str, Any]) -> list[dict[str, Any]]:
    """Run ``query_nodes`` and enforce the query's ambiguity/missing policies.

    Args:
        index_data: Index payload handed through to ``query_nodes``.
        query: Query description; ``on_ambiguous`` and ``on_missing`` are
            read here and both default to ``"error"``.

    Returns:
        The ``matches`` list from the ``query_nodes`` report.

    Raises:
        QueryError: If the result is ambiguous or empty while the matching
            policy is ``"error"``.
    """
    outcome = query_nodes(index_data, query)
    ambiguity_policy = query.get("on_ambiguous", "error")
    missing_policy = query.get("on_missing", "error")

    if ambiguity_policy == "error" and outcome["ambiguous"]:
        raise QueryError("query matched multiple nodes")
    if missing_policy == "error" and outcome["match_count"] == 0:
        raise QueryError("query matched no nodes")
    return outcome["matches"]
def clone_run_format(source_run: Any, target_run: Any) -> None:
    """Copy character formatting from ``source_run`` onto ``target_run``.

    Copies bold, italic, underline, font name and font size. The RGB colour
    is copied only when the source defines one, and the East Asian font
    (``w:eastAsia`` on ``w:rFonts``) only when ``qn`` is available and the
    source run declares it.

    Args:
        source_run: Run whose formatting is read.
        target_run: Run that receives the formatting.
    """
    target_run.bold = source_run.bold
    target_run.italic = source_run.italic
    target_run.underline = source_run.underline
    target_run.font.name = source_run.font.name
    target_run.font.size = source_run.font.size

    source_color = source_run.font.color
    if source_color and source_color.rgb:
        target_run.font.color.rgb = source_color.rgb

    source_rpr = source_run._element.rPr
    if qn and source_rpr is not None and source_rpr.rFonts is not None:
        east_asia = source_rpr.rFonts.get(qn("w:eastAsia"))
        if east_asia:
            # The target may have no w:rFonts child at this point (for
            # example when the copied font name was None), so create it on
            # demand instead of dereferencing a possibly-None ``rFonts``.
            target_rpr = target_run._element.get_or_add_rPr()
            target_rpr.get_or_add_rFonts().set(qn("w:eastAsia"), east_asia)
def clear_paragraph(paragraph: Paragraph) -> None:
    """Remove every direct run and hyperlink child from the paragraph element.

    Children whose tag ends in ``}r`` or ``}hyperlink`` are removed; all
    other children are left in place.

    Args:
        paragraph: Paragraph whose underlying ``_element`` is edited in place.
    """
    container = paragraph._element
    removable_suffixes = ("}r", "}hyperlink")
    # Snapshot the children first: removing while iterating the live element
    # would skip siblings.
    for node in list(container):
        if node.tag.endswith(removable_suffixes):
            container.remove(node)
def replace_text_in_paragraph(paragraph: Paragraph, old_text: str, new_text: str) -> bool:
    """Replace the first occurrence of ``old_text`` inside ``paragraph``.

    If a single run contains ``old_text`` only that run is edited, so the
    formatting of the other runs is untouched. Otherwise the match spans
    several runs: the paragraph is collapsed into one run that holds the
    full paragraph text with the replacement applied, formatted like the
    original first run.

    Args:
        paragraph: Paragraph to edit in place.
        old_text: Text to look for.
        new_text: Replacement text.

    Returns:
        ``True`` when a replacement was made, ``False`` when ``old_text``
        does not occur in the paragraph text.
    """
    full_text = paragraph.text
    if old_text not in full_text:
        return False

    for run in paragraph.runs:
        if old_text in run.text:
            run.text = run.text.replace(old_text, new_text, 1)
            return True

    # The match straddles run boundaries. Rebuild the paragraph from its
    # complete text so the content around the match survives; writing only
    # ``new_text`` here would silently drop everything else.
    existing_runs = list(paragraph.runs)
    merged_text = full_text.replace(old_text, new_text, 1)
    clear_paragraph(paragraph)
    new_run = paragraph.add_run(merged_text)
    if existing_runs:
        clone_run_format(existing_runs[0], new_run)
    return True
def delete_block(block: Paragraph | Table) -> None:
    """Detach the block's XML element from its parent element.

    Does nothing when the element has no parent.

    Args:
        block: Paragraph or table whose ``_element`` is removed.
    """
    node = block._element
    container = node.getparent()
    if container is None:
        return
    container.remove(node)
def insert_paragraph_relative(target: Paragraph | Table, *, after: bool, style_name: str | None = None) -> Paragraph:
    """Create an empty paragraph directly before or after ``target``.

    Args:
        target: Existing block used as the insertion anchor.
        after: Insert after ``target`` when true, before it otherwise.
        style_name: Optional style to assign; a ``KeyError`` raised while
            assigning it is ignored and the paragraph keeps its default style.

    Returns:
        The new paragraph, sharing ``target``'s parent.
    """
    new_p = OxmlElement("w:p")
    anchor_element = target._element
    place = anchor_element.addnext if after else anchor_element.addprevious
    place(new_p)

    paragraph = Paragraph(new_p, target._parent)
    if not style_name:
        return paragraph
    try:
        paragraph.style = style_name
    except KeyError:
        # Best effort: leave the paragraph unstyled if the assignment fails.
        pass
    return paragraph
def append_paragraph_contents(paragraph: Paragraph, text: str, source: Paragraph | None = None) -> None:
    """Add ``text`` to ``paragraph`` as one run, optionally mimicking ``source``.

    When ``source`` has a style, that style, its indents, spacing, line
    spacing and alignment are copied onto ``paragraph``. When ``source`` has
    runs, the new run takes the character formatting of its first run.

    Args:
        paragraph: Paragraph that receives the new run.
        text: Text of the new run.
        source: Optional paragraph used as a formatting template.
    """
    if source is not None and source.style is not None:
        paragraph.style = source.style
        copied_format_fields = (
            "left_indent",
            "right_indent",
            "first_line_indent",
            "space_before",
            "space_after",
            "line_spacing",
        )
        for field_name in copied_format_fields:
            setattr(
                paragraph.paragraph_format,
                field_name,
                getattr(source.paragraph_format, field_name),
            )
        paragraph.alignment = source.alignment

    has_template_run = source is not None and bool(source.runs)
    new_run = paragraph.add_run(text)
    if has_template_run:
        clone_run_format(source.runs[0], new_run)
def create_table_after(target: Paragraph | Table, rows: list[list[str]], style_name: str | None = None) -> Table:
    """Build a table from ``rows`` and place it directly after ``target``.

    The table is created through ``target``'s parent and then moved behind
    ``target``'s element.

    Args:
        target: Block after which the table is placed.
        rows: Cell values, one inner list per table row. Rows may differ in
            length; shorter rows leave their trailing cells untouched.
        style_name: Optional table style; a ``KeyError`` raised while
            assigning it is ignored.

    Returns:
        The newly created table.
    """
    parent = target._parent
    # Size the table for the widest row, not just the first one, so a longer
    # later row cannot index past the available cells.
    cols = max((len(row_values) for row_values in rows), default=1) or 1
    table = parent.add_table(rows=0, cols=cols)
    if style_name:
        try:
            table.style = style_name
        except KeyError:
            # Best effort: keep the default table style.
            pass

    for row_values in rows:
        row = table.add_row()
        for cell, value in zip(row.cells, row_values):
            # Patch payloads may carry numbers or nulls; cell text must be a
            # string.
            cell.text = "" if value is None else str(value)

    target._element.addnext(table._element)
    return table
def build_live_index(document: Document) -> tuple[dict[str, Any], dict[str, NodeRecord]]:
    """Index an open document and return both serialised and live views.

    Args:
        document: Document passed to ``_index_document_core``.

    Returns:
        A pair ``(index_payload, records_by_anchor)``: the payload holds
        ``status``, a ``summary`` with the node count and the nodes as
        dicts; the mapping gives each record by its ``anchor``.
    """
    records = _index_document_core(document)
    index_payload = {
        "status": "ok",
        "summary": {"node_count": len(records)},
        "nodes": [record.to_dict() for record in records],
    }
    records_by_anchor = {record.anchor: record for record in records}
    return index_payload, records_by_anchor
def insert_blocks(record: NodeRecord, operation: dict[str, Any], *, after: bool) -> None:
    """Insert new content next to the block referenced by ``record``.

    Args:
        record: Anchor node; its ``node_type`` must be ``paragraph``,
            ``list_item``, ``heading`` or ``table``.
        operation: Patch operation. ``content_type`` (default
            ``"paragraphs"``) selects the shape of ``content``:
            ``"paragraphs"`` takes a list of texts (any other value is
            stringified into one paragraph), ``"heading"`` takes a dict with
            ``text`` and optional ``level`` (or a plain value used as the
            text), ``"list"`` takes a list of item texts, and ``"table"``
            takes a list of rows or a dict with a ``rows`` list.
        after: Insert after the anchor when true, before it otherwise.

    Raises:
        QueryError: If the anchor is not a block node, table content has no
            rows, or ``content_type`` is unsupported.
    """
    content_type = operation.get("content_type", "paragraphs")
    content = operation.get("content")
    if record.node_type not in {"paragraph", "list_item", "heading", "table"}:
        raise QueryError("insert operations only support block nodes")
    target = record.object_ref
    target_is_paragraph = isinstance(target, Paragraph)

    if content_type == "paragraphs":
        paragraphs = content if isinstance(content, list) else [str(content)]
        is_body_text = record.node_type in {"paragraph", "list_item"}
        style_name = record.style_name if is_body_text else "Normal"
        source_paragraph = target if target_is_paragraph and is_body_text else None
        previous: Paragraph | Table = target
        for index, paragraph_text in enumerate(paragraphs):
            # Only the first paragraph honours ``after``; the rest chain
            # behind the previously inserted one to keep the given order.
            new_paragraph = insert_paragraph_relative(
                previous,
                after=after if index == 0 else True,
                style_name=style_name,
            )
            append_paragraph_contents(new_paragraph, str(paragraph_text), source=source_paragraph)
            previous = new_paragraph
        return

    if content_type == "heading":
        payload = content if isinstance(content, dict) else {"text": str(content)}
        level = int(payload.get("level", record.heading_level or 1))
        heading_style = f"Heading {level}"
        new_paragraph = insert_paragraph_relative(target, after=after, style_name=heading_style)
        append_paragraph_contents(
            new_paragraph,
            str(payload.get("text", "")),
            source=target if target_is_paragraph else None,
        )
        # append_paragraph_contents copies the source paragraph's style, so
        # the heading style has to be applied again afterwards.
        try:
            new_paragraph.style = heading_style
        except KeyError:
            pass
        return

    if content_type == "list":
        items = content if isinstance(content, list) else []
        source_paragraph = target if target_is_paragraph and record.node_type == "list_item" else None
        previous = target
        for index, item in enumerate(items):
            new_paragraph = insert_paragraph_relative(
                previous,
                after=after if index == 0 else True,
                style_name="List Bullet",
            )
            append_paragraph_contents(new_paragraph, str(item), source=source_paragraph)
            # Re-apply the list style for the same reason as for headings.
            try:
                new_paragraph.style = "List Bullet"
            except KeyError:
                pass
            previous = new_paragraph
        return

    if content_type == "table":
        rows = content.get("rows") if isinstance(content, dict) else content
        if not isinstance(rows, list) or not rows:
            raise QueryError("table content must provide rows")
        style_name = None
        if isinstance(target, Table) and target.style is not None:
            style_name = target.style.name
        table = create_table_after(target, rows, style_name=style_name)
        if not after:
            # create_table_after always places the table behind the anchor;
            # move it in front so an insert-before request is honoured.
            target._element.addprevious(table._element)
        return

    raise QueryError(f"unsupported content_type: {content_type}")
def replace_block(record: NodeRecord, operation: dict[str, Any]) -> None:
    """Replace the block behind ``record`` with the operation's content.

    The new content is inserted via ``insert_blocks`` with ``after=False``
    and the original block is deleted afterwards.

    Args:
        record: Node whose ``object_ref`` is replaced.
        operation: Patch operation describing the replacement content.
    """
    doomed_block = record.object_ref
    insert_blocks(record, operation, after=False)
    # Delete only after inserting: the old block is the insertion anchor.
    delete_block(doomed_block)
def apply_patch_document(patch_data: dict[str, Any]) -> dict[str, Any]:
    """Apply a list of patch operations to a DOCX file and report the result.

    Args:
        patch_data: Patch description. Keys read here: ``source_docx``
            (required), ``output_docx`` (defaults to the source path),
            ``in_place`` and ``operations``. The whole dict is also passed
            to ``resolve_generation_options``. Each operation provides
            ``op`` (``replace_text``, ``delete_node``, ``insert_before``,
            ``insert_after`` or ``replace_node``), a ``target`` query, and
            optionally ``allow_multiple``, ``on_ambiguous`` and
            ``on_missing``.

    Returns:
        A report dict with the resolved paths and options, one entry per
        operation listing the affected nodes, the final index summary and
        the keys returned by ``inspect_document_quality``.

    Raises:
        QueryError: If the output path equals the source without
            ``in_place``, an operation matches no node or several nodes
            under an ``"error"`` policy, or an operation is unsupported for
            the matched node.
    """
    source_docx = Path(patch_data["source_docx"]).resolve()
    output_docx = Path(patch_data.get("output_docx", source_docx)).resolve()
    in_place = bool(patch_data.get("in_place", False))
    options = resolve_generation_options(patch_data)
    if not in_place and output_docx == source_docx:
        raise QueryError("output_docx must differ from source_docx unless in_place is true")
    output_docx.parent.mkdir(parents=True, exist_ok=True)
    if not in_place:
        shutil.copy2(source_docx, output_docx)

    document = Document(str(output_docx))
    initialize_default_bid_styles(document, options["docx_style_profile"])
    operations = patch_data.get("operations", [])
    operation_reports: list[dict[str, Any]] = []

    for index, operation in enumerate(operations):
        # Re-index before every operation: earlier edits change the tree.
        live_index, record_map = build_live_index(document)
        matches = find_records(live_index, operation.get("target", {}))
        allow_multiple = bool(operation.get("allow_multiple"))
        # Several matches are only ambiguous when the operation did not ask
        # to touch all of them.
        if (
            len(matches) > 1
            and not allow_multiple
            and operation.get("on_ambiguous", "error") == "error"
        ):
            raise QueryError(f"operation {index} matched multiple nodes")
        selected = matches if allow_multiple else matches[:1]
        if not selected and operation.get("on_missing", "error") == "error":
            raise QueryError(f"operation {index} matched no nodes")

        affected: list[dict[str, Any]] = []
        for match in selected:
            record = record_map[match["anchor"]]
            before_summary = summarize_text(record.text)
            op_name = operation["op"]
            if op_name == "replace_text":
                old_text = operation["old_text"]
                new_text = operation["new_text"]
                if record.node_type not in {"paragraph", "list_item", "heading"}:
                    raise QueryError("replace_text only supports paragraph-like nodes")
                if not replace_text_in_paragraph(record.object_ref, old_text, new_text):
                    raise QueryError(f"text not found in node {record.anchor}")
            elif op_name == "delete_node":
                if record.node_type not in {"paragraph", "list_item", "heading", "table"}:
                    raise QueryError("delete_node only supports block nodes")
                delete_block(record.object_ref)
            elif op_name == "insert_before":
                insert_blocks(record, operation, after=False)
            elif op_name == "insert_after":
                insert_blocks(record, operation, after=True)
            elif op_name == "replace_node":
                replace_block(record, operation)
            else:
                raise QueryError(f"unsupported op: {op_name}")
            affected.append(
                {
                    "anchor": record.anchor,
                    "node_type": record.node_type,
                    "before": before_summary,
                    "op": op_name,
                }
            )

        # Persist after every operation.
        document.save(str(output_docx))
        operation_reports.append(
            {
                "index": index,
                "op": operation["op"],
                "match_count": len(selected),
                "affected": affected,
            }
        )

    apply_heading_numbering(document, options["numbering_mode"])
    apply_document_profile(document, options["docx_style_profile"])
    document.save(str(output_docx))

    final_index = index_document(output_docx)
    quality = inspect_document_quality(
        output_docx,
        docx_style_profile=options["docx_style_profile"],
        numbering_mode=options["numbering_mode"],
        document_kind=options["document_kind"],
    )
    return {
        "status": "ok",
        "source_docx": str(source_docx),
        "output_docx": str(output_docx),
        "in_place": in_place,
        "docx_style_profile": options["docx_style_profile"],
        "numbering_mode": options["numbering_mode"],
        "document_kind": options["document_kind"],
        "template_docx": str(Path(options["template_docx"]).resolve()) if options["template_docx"] else None,
        "operation_count": len(operations),
        "operations": operation_reports,
        "errors": [],
        "warnings": [],
        "final_summary": final_index["summary"],
        **quality,
    }
def render_docx(docx_path: Path, out_dir: Path, *, docx_style_profile: str = DEFAULT_DOCX_STYLE_PROFILE, numbering_mode: str = DEFAULT_NUMBERING_MODE, document_kind: str = DEFAULT_DOCUMENT_KIND) -> dict[str, Any]:
    """Convert a DOCX file to PDF and per-page PNGs, and report the result.

    The PDF is produced by running LibreOffice headless; page images are
    produced with ``pdf2image`` when it is installed. Every returned report
    is merged with the output of ``inspect_document_quality``.

    Args:
        docx_path: Document to render.
        out_dir: Directory for the PDF; PNGs go to its ``pages`` subfolder.
            Both directories are created when missing.
        docx_style_profile: Passed through to the quality inspection.
        numbering_mode: Passed through to the quality inspection.
        document_kind: Passed through to the quality inspection.

    Returns:
        A report dict whose ``status`` is ``"render_skipped"`` when no
        LibreOffice binary is found, ``"error"`` when the conversion fails
        or produces no PDF, and ``"ok"`` otherwise. ``page_count`` is the
        number of PNG images written, so it is 0 when PNG rendering is
        unavailable.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    pdf_path = out_dir / f"{docx_path.stem}.pdf"
    png_dir = out_dir / "pages"
    png_dir.mkdir(parents=True, exist_ok=True)

    def build_report(
        status: str,
        *,
        pdf: str | None,
        images: list[str] | None = None,
        errors: list[str] | None = None,
        warnings: list[str] | None = None,
    ) -> dict[str, Any]:
        # Single place that shapes the report for all three outcomes and
        # merges in the quality inspection.
        page_images = images or []
        report: dict[str, Any] = {
            "status": status,
            "docx": str(docx_path),
            "docx_style_profile": docx_style_profile,
            "numbering_mode": numbering_mode,
            "document_kind": document_kind,
            "pdf": pdf,
            "page_count": len(page_images),
            "images": page_images,
            "errors": errors or [],
            "warnings": warnings or [],
        }
        report.update(
            inspect_document_quality(
                docx_path,
                docx_style_profile=docx_style_profile,
                numbering_mode=numbering_mode,
                document_kind=document_kind,
                render_status=status,
            )
        )
        return report

    # Some installations expose only the ``libreoffice`` launcher.
    soffice = shutil.which("soffice") or shutil.which("libreoffice")
    if not soffice:
        return build_report(
            "render_skipped",
            pdf=None,
            warnings=["LibreOffice/soffice not found"],
        )

    process = subprocess.run(
        [soffice, "--headless", "--convert-to", "pdf", "--outdir", str(out_dir), str(docx_path)],
        capture_output=True,
        text=True,
        encoding="utf-8",
        # The converter's output is not guaranteed to be UTF-8; never let a
        # decode error mask the actual conversion result.
        errors="replace",
    )
    if process.returncode != 0 or not pdf_path.exists():
        return build_report(
            "error",
            pdf=str(pdf_path),
            errors=[process.stderr.strip() or "failed to convert docx to pdf"],
        )

    images: list[str] = []
    warnings: list[str] = []
    if convert_from_path is None:
        warnings.append("pdf2image not installed")
    else:
        try:
            for page_number, image in enumerate(convert_from_path(str(pdf_path)), start=1):
                image_path = png_dir / f"page-{page_number:03d}.png"
                image.save(str(image_path), "PNG")
                images.append(str(image_path))
        except Exception as exc:  # pragma: no cover
            # PNG output is best effort; the PDF itself was produced.
            warnings.append(f"PNG render skipped: {exc}")

    return build_report("ok", pdf=str(pdf_path), images=images, warnings=warnings)