from __future__ import annotations import json import re import shutil import subprocess from collections import defaultdict from dataclasses import dataclass from hashlib import sha1 from pathlib import Path from typing import Any, Iterator from docx import Document from docx.document import Document as DocxDocument from docx.enum.text import WD_PARAGRAPH_ALIGNMENT from docx.oxml import OxmlElement from docx.shared import Pt from docx.table import Table, _Cell from docx.text.paragraph import Paragraph try: from pdf2image import convert_from_path except ImportError: # pragma: no cover convert_from_path = None try: from docx.oxml.ns import qn except ImportError: # pragma: no cover qn = None NAMESPACES = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"} TEXT_WINDOW_DEFAULT = 40 DEFAULT_DOCX_STYLE_PROFILE = "default_bid" DEFAULT_NUMBERING_MODE = "explicit_text" DEFAULT_DOCUMENT_KIND = "generic" HEADING_NUMBER_PATTERN = re.compile(r"^(?P\d+(?:\.\d+)*)\s+(?P.+)$") LEGACY_HEADING_PREFIX_PATTERN = re.compile(r"^(?:\d+(?:\.\d+)*|[一二三四五六七八九十]+)[、\..]?\s*") CAPTION_PATTERN = re.compile(r"^(图|表|附件)\s*(\d+)-(\d+)\s+(.+)$") PLACEHOLDER_PATTERN = re.compile(r"(占位|待补充|待提供|待替换|替换提示|TODO|技术转引)") DEFAULT_BID_STYLE_SPEC: dict[str, Any] = { "normal": { "font_name": "宋体", "font_size": 12, "bold": False, "first_line_indent": 24, "line_spacing": 1.5, "space_before": 0, "space_after": 0, }, "headings": { 1: { "font_name": "黑体", "font_size": 15, "bold": True, "space_before": 18, "space_after": 12, }, 2: { "font_name": "黑体", "font_size": 14, "bold": True, "space_before": 12, "space_after": 6, }, 3: { "font_name": "黑体", "font_size": 12, "bold": True, "space_before": 6, "space_after": 6, }, 4: { "font_name": "楷体", "font_size": 12, "bold": False, "space_before": 6, "space_after": 3, }, }, "table": { "font_name": "宋体", "font_size": 10.5, "header_bold": True, }, } @dataclass class NodeRecord: node_id: str node_type: str text: str style_name: str | None heading_level: int | None path: list[str] ordinal: int parent_id: str | None anchor: str container: str table_index: int | None = None row_index: int | None = None cell_index: int | None = None block_index: int | None = None xml_path: str | None = None has_image: bool = False object_ref: Any = None def to_dict(self) -> dict[str, Any]: return { "node_id": self.node_id, "node_type": self.node_type, "text": self.text, "style_name": self.style_name, "heading_level": self.heading_level, "path": self.path, "ordinal": self.ordinal, "parent_id": self.parent_id, "anchor": self.anchor, "container": self.container, "table_index": self.table_index, "row_index": self.row_index, "cell_index": self.cell_index, "block_index": self.block_index, "xml_path": self.xml_path, "has_image": self.has_image, } class QueryError(RuntimeError): pass def read_json(path: Path) -> Any: with path.open("r", encoding="utf-8-sig") as handle: return json.load(handle) def write_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8", newline="\n") as handle: json.dump(payload, handle, ensure_ascii=False, indent=2) handle.write("\n") def heading_level_for_style(style_name: str | None) -> int | None: if not style_name: return None compact_style = normalize_text(style_name) match = re.match(r"Heading\s+(\d+)$", compact_style, flags=re.IGNORECASE) if match: return int(match.group(1)) match = re.match(r"标题\s*(\d+)$", compact_style) return int(match.group(1)) if match else None def normalize_text(value: str) -> str: return re.sub(r"\s+", " ", value or "").strip() def get_style_spec(docx_style_profile: str) -> dict[str, Any]: if docx_style_profile != DEFAULT_DOCX_STYLE_PROFILE: raise QueryError(f"unsupported docx_style_profile: {docx_style_profile}") return DEFAULT_BID_STYLE_SPEC def resolve_generation_options(payload: dict[str, Any]) -> dict[str, Any]: return { "docx_style_profile": str(payload.get("docx_style_profile", DEFAULT_DOCX_STYLE_PROFILE)), "numbering_mode": str(payload.get("numbering_mode", DEFAULT_NUMBERING_MODE)), "template_docx": payload.get("template_docx"), "document_kind": str(payload.get("document_kind", DEFAULT_DOCUMENT_KIND)), } def _get_xml_element(target: Any) -> Any | None: element = getattr(target, "_element", None) if element is not None: return element return getattr(target, "element", None) def _set_font_family(target: Any, font_name: str) -> None: target.font.name = font_name if not qn: return element = _get_xml_element(target) if element is None: return r_pr = getattr(element, "rPr", None) if r_pr is None: r_pr = OxmlElement("w:rPr") element.insert(0, r_pr) r_fonts = getattr(r_pr, "rFonts", None) if r_fonts is None: r_fonts = OxmlElement("w:rFonts") r_pr.insert(0, r_fonts) for attr in ("w:ascii", "w:hAnsi", "w:eastAsia"): r_fonts.set(qn(attr), font_name) def apply_run_font(target_run: Any, *, font_name: str, font_size: float, bold: bool | None = None) -> None: _set_font_family(target_run, font_name) target_run.font.size = Pt(font_size) if bold is not None: target_run.bold = bold def configure_style(style: Any, *, font_name: str, font_size: float, bold: bool, space_before: float = 0, space_after: float = 0, first_line_indent: float | None = None, line_spacing: float | None = None) -> None: _set_font_family(style, font_name) style.font.size = Pt(font_size) style.font.bold = bold paragraph_format = style.paragraph_format paragraph_format.space_before = Pt(space_before) paragraph_format.space_after = Pt(space_after) if first_line_indent is not None: paragraph_format.first_line_indent = Pt(first_line_indent) if line_spacing is not None: paragraph_format.line_spacing = line_spacing def initialize_default_bid_styles(document: Document, docx_style_profile: str) -> dict[str, Any]: spec = get_style_spec(docx_style_profile) styles = document.styles configure_style(styles["Normal"], **spec["normal"]) try: configure_style(styles["List Bullet"], **spec["normal"]) except KeyError: pass for level, heading_spec in spec["headings"].items(): configure_style(styles[f"Heading {level}"], **heading_spec) return { "status": "pass", "profile": docx_style_profile, "summary": { "heading_numbering": "1 / 1.1 / 1.1.1 / 1.1.1.1", "normal_font": spec["normal"]["font_name"], "normal_font_size": spec["normal"]["font_size"], }, "issues": [], } def strip_heading_prefix(text: str) -> str: normalized = normalize_text(text) numbered = HEADING_NUMBER_PATTERN.match(normalized) if numbered: return numbered.group("title") return LEGACY_HEADING_PREFIX_PATTERN.sub("", normalized, count=1).strip() def replace_paragraph_text(paragraph: Paragraph, text: str) -> None: existing_runs = list(paragraph.runs) source_run = existing_runs[0] if existing_runs else None clear_paragraph(paragraph) new_run = paragraph.add_run(text) if source_run is not None: clone_run_format(source_run, new_run) def apply_heading_numbering(document: Document, numbering_mode: str) -> None: if numbering_mode != DEFAULT_NUMBERING_MODE: raise QueryError(f"unsupported numbering_mode: {numbering_mode}") counters = [0] * 9 for paragraph in document.paragraphs: style_name = paragraph.style.name if paragraph.style else None level = heading_level_for_style(style_name) if not level: continue counters[level - 1] += 1 for index in range(level, len(counters)): counters[index] = 0 prefix = ".".join(str(value) for value in counters[:level] if value) base_text = strip_heading_prefix(paragraph.text) replace_paragraph_text(paragraph, f"{prefix} {base_text}".strip()) def apply_paragraph_profile(paragraph: Paragraph, *, font_name: str, font_size: float, bold: bool, first_line_indent: float | None = None, line_spacing: float | None = None, space_before: float | None = None, space_after: float | None = None) -> None: if first_line_indent is not None: paragraph.paragraph_format.first_line_indent = Pt(first_line_indent) if line_spacing is not None: paragraph.paragraph_format.line_spacing = line_spacing if space_before is not None: paragraph.paragraph_format.space_before = Pt(space_before) if space_after is not None: paragraph.paragraph_format.space_after = Pt(space_after) for run in paragraph.runs: apply_run_font(run, font_name=font_name, font_size=font_size, bold=bold) def apply_table_profile(table: Table, docx_style_profile: str) -> None: table_spec = get_style_spec(docx_style_profile)["table"] try: table.style = "Table Grid" except KeyError: pass for row_index, row in enumerate(table.rows): for cell in row.cells: for paragraph in cell.paragraphs: apply_paragraph_profile( paragraph, font_name=table_spec["font_name"], font_size=table_spec["font_size"], bold=bool(row_index == 0 and table_spec["header_bold"]), first_line_indent=0, line_spacing=1.0, space_before=0, space_after=0, ) def apply_document_profile(document: Document, docx_style_profile: str) -> None: spec = get_style_spec(docx_style_profile) for paragraph in document.paragraphs: style_name = paragraph.style.name if paragraph.style else None level = heading_level_for_style(style_name) if level: heading_spec = spec["headings"].get(level, spec["headings"][4]) paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT paragraph.paragraph_format.first_line_indent = Pt(0) paragraph.paragraph_format.space_before = Pt(heading_spec["space_before"]) paragraph.paragraph_format.space_after = Pt(heading_spec["space_after"]) paragraph.paragraph_format.keep_with_next = True apply_paragraph_profile( paragraph, font_name=heading_spec["font_name"], font_size=heading_spec["font_size"], bold=heading_spec["bold"], first_line_indent=0, line_spacing=1.0, space_before=heading_spec["space_before"], space_after=heading_spec["space_after"], ) continue apply_paragraph_profile( paragraph, font_name=spec["normal"]["font_name"], font_size=spec["normal"]["font_size"], bold=spec["normal"]["bold"], first_line_indent=spec["normal"]["first_line_indent"], line_spacing=spec["normal"]["line_spacing"], space_before=spec["normal"]["space_before"], space_after=spec["normal"]["space_after"], ) for table in document.tables: apply_table_profile(table, docx_style_profile) def remove_initial_blank_paragraph(document: Document) -> None: if len(document.paragraphs) != 1: return paragraph = document.paragraphs[0] if normalize_text(paragraph.text): return delete_block(paragraph) def validate_format_profile(document: Document, docx_style_profile: str) -> dict[str, Any]: spec = get_style_spec(docx_style_profile) issues: list[str] = [] styles = document.styles for style_name, expected in ( ("Normal", spec["normal"]), ("Heading 1", spec["headings"][1]), ("Heading 2", spec["headings"][2]), ("Heading 3", spec["headings"][3]), ("Heading 4", spec["headings"][4]), ): style = styles[style_name] actual_size = style.font.size.pt if style.font.size is not None else None if style.font.name != expected["font_name"]: issues.append(f"{style_name} font should be {expected['font_name']}, got {style.font.name!r}") if actual_size is None or abs(actual_size - expected["font_size"]) > 0.2: issues.append(f"{style_name} size should be {expected['font_size']}, got {actual_size!r}") return { "status": "pass" if not issues else "fail", "profile": docx_style_profile, "issues": issues, } def validate_heading_numbering(document: Document, numbering_mode: str) -> dict[str, Any]: if numbering_mode != DEFAULT_NUMBERING_MODE: return { "status": "fail", "mode": numbering_mode, "checked_headings": 0, "issues": [f"unsupported numbering_mode: {numbering_mode}"], } counters = [0] * 9 issues: list[str] = [] checked = 0 for paragraph in document.paragraphs: style_name = paragraph.style.name if paragraph.style else None level = heading_level_for_style(style_name) if not level: continue checked += 1 counters[level - 1] += 1 for index in range(level, len(counters)): counters[index] = 0 expected = ".".join(str(value) for value in counters[:level] if value) match = HEADING_NUMBER_PATTERN.match(normalize_text(paragraph.text)) actual = match.group("number") if match else None if actual != expected: issues.append(f"{paragraph.text!r} should use heading number {expected}") return { "status": "pass" if not issues else "fail", "mode": numbering_mode, "checked_headings": checked, "issues": issues, } def validate_caption_numbering(document: Document) -> dict[str, Any]: counters: dict[tuple[str, int], int] = defaultdict(int) issues: list[str] = [] caption_count = 0 for paragraph in document.paragraphs: match = CAPTION_PATTERN.match(normalize_text(paragraph.text)) if not match: continue caption_count += 1 kind, chapter_text, index_text, _ = match.groups() chapter = int(chapter_text) index = int(index_text) counters[(kind, chapter)] += 1 if counters[(kind, chapter)] != index: issues.append(f"{paragraph.text!r} caption index should be {counters[(kind, chapter)]}") return { "status": "pass" if not issues else "fail", "caption_count": caption_count, "issues": issues, } def document_has_toc(document: Document) -> bool: body = document.element.body for element in body.iter(): if element.tag.endswith("}instrText") and "TOC" in "".join(element.itertext()): return True return False def validate_toc(document: Document, document_kind: str) -> dict[str, Any]: has_toc = document_has_toc(document) if has_toc: return {"status": "pass", "has_toc": True, "issues": []} if document_kind == "outline": return { "status": "pass", "has_toc": False, "issues": ["outline documents can use heading numbering directly as visible目录内容"], } return { "status": "pass", "has_toc": False, "issues": ["TOC field not found; current workflow allows user to insert or update TOC in Word"], } def collect_placeholder_hits(document: Document) -> list[str]: hits: list[str] = [] for paragraph in document.paragraphs: text = normalize_text(paragraph.text) if text and PLACEHOLDER_PATTERN.search(text): hits.append(text) for table in document.tables: for row in table.rows: for cell in row.cells: text = normalize_text(cell.text) if text and PLACEHOLDER_PATTERN.search(text): hits.append(text) return hits def validate_placeholders(document: Document, document_kind: str) -> dict[str, Any]: hits = collect_placeholder_hits(document) allow_hits = document_kind == "outline" status = "pass" if allow_hits or not hits else "fail" return { "status": status, "placeholder_count": len(hits), "issues": hits[:20], } def build_acceptance_checks(*, format_profile: dict[str, Any], numbering_validation: dict[str, Any], caption_validation: dict[str, Any], toc_validation: dict[str, Any], placeholder_validation: dict[str, Any], render_status: str | None = None) -> dict[str, Any]: checks = [ {"name": "format_profile", "status": format_profile["status"]}, {"name": "numbering_validation", "status": numbering_validation["status"]}, {"name": "caption_validation", "status": caption_validation["status"]}, {"name": "toc_validation", "status": toc_validation["status"]}, {"name": "placeholder_validation", "status": placeholder_validation["status"]}, ] if render_status is not None: checks.append( { "name": "render_validation", "status": "pass" if render_status == "ok" else ("warn" if render_status == "render_skipped" else "fail"), } ) overall_status = "fail" if any(item["status"] == "fail" for item in checks) else "pass" return { "status": overall_status, "checks": checks, } def inspect_document_quality(docx_path: Path, *, docx_style_profile: str, numbering_mode: str, document_kind: str, render_status: str | None = None) -> dict[str, Any]: document = Document(str(docx_path)) format_profile = validate_format_profile(document, docx_style_profile) numbering_validation = validate_heading_numbering(document, numbering_mode) caption_validation = validate_caption_numbering(document) toc_validation = validate_toc(document, document_kind) placeholder_validation = validate_placeholders(document, document_kind) acceptance_checks = build_acceptance_checks( format_profile=format_profile, numbering_validation=numbering_validation, caption_validation=caption_validation, toc_validation=toc_validation, placeholder_validation=placeholder_validation, render_status=render_status, ) return { "format_profile": format_profile, "numbering_validation": numbering_validation, "caption_validation": caption_validation, "toc_validation": toc_validation, "placeholder_validation": placeholder_validation, "acceptance_checks": acceptance_checks, } def create_docx_document(spec_data: dict[str, Any]) -> dict[str, Any]: output_docx = Path(spec_data["output_docx"]).resolve() blocks = spec_data.get("blocks", []) if not isinstance(blocks, list): raise QueryError("blocks must be a list") options = resolve_generation_options(spec_data) template_docx = options["template_docx"] output_docx.parent.mkdir(parents=True, exist_ok=True) document = Document(str(Path(template_docx).resolve())) if template_docx else Document() title = spec_data.get("title") if title: document.core_properties.title = str(title) initialize_default_bid_styles(document, options["docx_style_profile"]) remove_initial_blank_paragraph(document) block_reports: list[dict[str, Any]] = [] def render_block(block: dict[str, Any], index_path: list[int]) -> None: if not isinstance(block, dict): raise QueryError(f"block {'.'.join(str(part) for part in index_path)} must be an object") block_type = block.get("type", "paragraph") if block_type == "heading": level = int(block.get("level", 1)) if level < 1 or level > 9: raise QueryError(f"block {'.'.join(str(part) for part in index_path)} heading level must be between 1 and 9") text = str(block.get("text", "")) paragraph = document.add_paragraph(style=f"Heading {level}") run = paragraph.add_run(text) heading_spec = get_style_spec(options["docx_style_profile"])["headings"].get(level, get_style_spec(options["docx_style_profile"])["headings"][4]) apply_run_font(run, font_name=heading_spec["font_name"], font_size=heading_spec["font_size"], bold=heading_spec["bold"]) block_reports.append({"index": ".".join(str(part) for part in index_path), "type": block_type, "text": summarize_text(text), "level": level}) children = block.get("children", []) if children and not isinstance(children, list): raise QueryError(f"block {'.'.join(str(part) for part in index_path)} children must be a list") if isinstance(children, list): for child_index, child in enumerate(children): render_block(child, index_path + [child_index]) return if block_type == "paragraph": text = str(block.get("text", "")) paragraph = document.add_paragraph() style_name = block.get("style") if style_name: try: paragraph.style = str(style_name) except KeyError: pass run = paragraph.add_run(text) normal_spec = get_style_spec(options["docx_style_profile"])["normal"] apply_run_font(run, font_name=normal_spec["font_name"], font_size=normal_spec["font_size"], bold=normal_spec["bold"]) block_reports.append({"index": ".".join(str(part) for part in index_path), "type": block_type, "text": summarize_text(text)}) return if block_type == "list": items = block.get("items", []) if not isinstance(items, list): raise QueryError(f"block {'.'.join(str(part) for part in index_path)} items must be a list") style_name = str(block.get("style", "List Bullet")) for item in items: paragraph = document.add_paragraph() try: paragraph.style = style_name except KeyError: pass run = paragraph.add_run(str(item)) normal_spec = get_style_spec(options["docx_style_profile"])["normal"] apply_run_font(run, font_name=normal_spec["font_name"], font_size=normal_spec["font_size"], bold=normal_spec["bold"]) block_reports.append({"index": ".".join(str(part) for part in index_path), "type": block_type, "item_count": len(items)}) return if block_type == "table": rows = block.get("rows", []) if not isinstance(rows, list) or not rows or not isinstance(rows[0], list) or not rows[0]: raise QueryError(f"block {'.'.join(str(part) for part in index_path)} rows must be a non-empty 2D list") table = document.add_table(rows=0, cols=len(rows[0])) style_name = block.get("style") if style_name: try: table.style = str(style_name) except KeyError: pass for row_values in rows: if not isinstance(row_values, list) or len(row_values) != len(rows[0]): raise QueryError(f"block {'.'.join(str(part) for part in index_path)} table rows must have equal column counts") row = table.add_row() for cell_index, value in enumerate(row_values): row.cells[cell_index].text = str(value) apply_table_profile(table, options["docx_style_profile"]) block_reports.append({"index": ".".join(str(part) for part in index_path), "type": block_type, "row_count": len(rows), "column_count": len(rows[0])}) return if block_type == "page_break": document.add_page_break() block_reports.append({"index": ".".join(str(part) for part in index_path), "type": block_type}) return raise QueryError(f"unsupported block type: {block_type}") for index, block in enumerate(blocks): render_block(block, [index]) apply_heading_numbering(document, options["numbering_mode"]) apply_document_profile(document, options["docx_style_profile"]) document.save(str(output_docx)) final_index = index_document(output_docx) quality = inspect_document_quality( output_docx, docx_style_profile=options["docx_style_profile"], numbering_mode=options["numbering_mode"], document_kind=options["document_kind"], ) return { "status": "ok", "output_docx": str(output_docx), "block_count": len(blocks), "blocks": block_reports, "final_summary": final_index["summary"], "docx_style_profile": options["docx_style_profile"], "numbering_mode": options["numbering_mode"], "document_kind": options["document_kind"], "template_docx": str(Path(template_docx).resolve()) if template_docx else None, **quality, } def export_outline_artifacts(payload: dict[str, Any]) -> dict[str, Any]: technical_outline = payload.get("technical_outline") business_outline = payload.get("business_outline") technical_json = Path(payload["technical_outline_json"]).resolve() business_json = Path(payload["business_outline_json"]).resolve() technical_docx = Path(payload["technical_docx"]).resolve() business_docx = Path(payload["business_docx"]).resolve() options = resolve_generation_options(payload) for outline_name, outline in (("technical_outline", technical_outline), ("business_outline", business_outline)): if not isinstance(outline, dict): raise QueryError(f"{outline_name} must be an object") if not isinstance(outline.get("blocks"), list): raise QueryError(f"{outline_name}.blocks must be a list") write_json(technical_json, technical_outline) write_json(business_json, business_outline) technical_report = create_docx_document( { "output_docx": str(technical_docx), "title": str(technical_outline.get("title", "技术标目录")), "blocks": technical_outline["blocks"], "docx_style_profile": options["docx_style_profile"], "numbering_mode": options["numbering_mode"], "template_docx": options["template_docx"], "document_kind": "outline", } ) business_report = create_docx_document( { "output_docx": str(business_docx), "title": str(business_outline.get("title", "商务及其他目录")), "blocks": business_outline["blocks"], "docx_style_profile": options["docx_style_profile"], "numbering_mode": options["numbering_mode"], "template_docx": options["template_docx"], "document_kind": "outline", } ) return { "status": "ok", "docx_style_profile": options["docx_style_profile"], "numbering_mode": options["numbering_mode"], "document_kind": "outline", "template_docx": str(Path(options["template_docx"]).resolve()) if options["template_docx"] else None, "technical_outline_json": str(technical_json), "business_outline_json": str(business_json), "technical_docx": str(technical_docx), "business_docx": str(business_docx), "technical_report": technical_report, "business_report": business_report, } def slugify_text(value: str, *, limit: int = 32) -> str: compact = normalize_text(value) if not compact: return "empty" compact = re.sub(r"[^\w\u4e00-\u9fff-]+", "-", compact, flags=re.UNICODE) compact = re.sub(r"-+", "-", compact).strip("-").lower() return compact[:limit] or "empty" def summarize_text(value: str, *, limit: int = 80) -> str: return normalize_text(value)[:limit] def iter_block_items(parent: DocxDocument | _Cell) -> Iterator[Paragraph | Table]: parent_element = parent.element.body if isinstance(parent, DocxDocument) else parent._tc for child in parent_element.iterchildren(): if child.tag.endswith("}p"): yield Paragraph(child, parent) elif child.tag.endswith("}tbl"): yield Table(child, parent) def paragraph_has_image(paragraph: Paragraph) -> bool: return bool(paragraph._element.xpath(".//w:drawing")) def paragraph_is_list_item(paragraph: Paragraph) -> bool: style_name = paragraph.style.name if paragraph.style else "" if style_name.lower().startswith("list"): return True p_pr = paragraph._element.pPr return p_pr is not None and p_pr.numPr is not None def build_anchor(path: list[str], node_type: str, text: str, ordinal: int) -> str: seed = "|".join(["/".join(path), node_type, summarize_text(text, limit=32), str(ordinal)]) digest = sha1(seed.encode("utf-8")).hexdigest()[:10] slug = slugify_text(text, limit=24) path_slug = slugify_text("-".join(path), limit=24) return f"{path_slug}:{node_type}:{slug}:{ordinal}:{digest}" def _index_document_core(document: Document) -> list[NodeRecord]: nodes: list[NodeRecord] = [] heading_stack: list[str] = [] heading_ids: dict[int, str] = {} ordinal = 0 def current_parent_id() -> str | None: if not heading_ids: return None return heading_ids[max(heading_ids)] def add_record( *, node_type: str, text: str, style_name: str | None, heading_level: int | None, path: list[str], parent_id: str | None, container: str, object_ref: Any, table_index: int | None = None, row_index: int | None = None, cell_index: int | None = None, block_index: int | None = None, xml_path: str | None = None, has_image: bool = False, ) -> NodeRecord: nonlocal ordinal ordinal += 1 node_id = f"n-{ordinal:05d}" record = NodeRecord( node_id=node_id, node_type=node_type, text=normalize_text(text), style_name=style_name, heading_level=heading_level, path=path, ordinal=ordinal, parent_id=parent_id, anchor=build_anchor(path, node_type, text, ordinal), container=container, table_index=table_index, row_index=row_index, cell_index=cell_index, block_index=block_index, xml_path=xml_path, has_image=has_image, object_ref=object_ref, ) nodes.append(record) return record for block_index, block in enumerate(iter_block_items(document)): if isinstance(block, Paragraph): text = normalize_text(block.text) style_name = block.style.name if block.style else None level = heading_level_for_style(style_name) if level is not None: while len(heading_stack) >= level: heading_stack.pop() heading_stack.append(text or f"Heading {level}") heading_ids = {key: value for key, value in heading_ids.items() if key < level} record = add_record( node_type="heading", text=text, style_name=style_name, heading_level=level, path=list(heading_stack), parent_id=heading_ids.get(level - 1), container="document", object_ref=block, block_index=block_index, has_image=paragraph_has_image(block), ) heading_ids[level] = record.node_id if record.has_image: add_record( node_type="image_placeholder", text=text or "[image]", style_name=style_name, heading_level=level, path=list(heading_stack), parent_id=record.node_id, container="document", object_ref=block, block_index=block_index, has_image=True, ) continue record = add_record( node_type="list_item" if paragraph_is_list_item(block) else "paragraph", text=text, style_name=style_name, heading_level=None, path=list(heading_stack), parent_id=current_parent_id(), container="document", object_ref=block, block_index=block_index, has_image=paragraph_has_image(block), ) if record.has_image: add_record( node_type="image_placeholder", text=text or "[image]", style_name=style_name, heading_level=None, path=list(heading_stack), parent_id=record.node_id, container="document", object_ref=block, block_index=block_index, has_image=True, ) else: table_text = "\n".join( " | ".join(normalize_text(cell.text) for cell in row.cells) for row in block.rows ) table_record = add_record( node_type="table", text=table_text, style_name=block.style.name if block.style else None, heading_level=None, path=list(heading_stack), parent_id=current_parent_id(), container="document", object_ref=block, block_index=block_index, xml_path=f"table[{block_index}]", ) for row_index, row in enumerate(block.rows): row_text = " | ".join(normalize_text(cell.text) for cell in row.cells) row_record = add_record( node_type="table_row", text=row_text, style_name=table_record.style_name, heading_level=None, path=list(heading_stack), parent_id=table_record.node_id, container="table", object_ref=row, table_index=block_index, row_index=row_index, xml_path=f"table[{block_index}]/row[{row_index}]", ) for cell_index, cell in enumerate(row.cells): add_record( node_type="table_cell", text="\n".join( normalize_text(paragraph.text) for paragraph in cell.paragraphs if normalize_text(paragraph.text) ), style_name=None, heading_level=None, path=list(heading_stack), parent_id=row_record.node_id, container="table", object_ref=cell, table_index=block_index, row_index=row_index, cell_index=cell_index, xml_path=f"table[{block_index}]/row[{row_index}]/cell[{cell_index}]", ) return nodes def index_document(docx_path: Path) -> dict[str, Any]: document = Document(str(docx_path)) nodes = _index_document_core(document) return { "status": "ok", "docx": str(docx_path), "summary": { "node_count": len(nodes), "heading_count": sum(1 for node in nodes if node.node_type == "heading"), "paragraph_count": sum(1 for node in nodes if node.node_type == "paragraph"), "list_item_count": sum(1 for node in nodes if node.node_type == "list_item"), "table_count": sum(1 for node in nodes if node.node_type == "table"), "image_placeholder_count": sum(1 for node in nodes if node.node_type == "image_placeholder"), }, "nodes": [node.to_dict() for node in nodes], } def query_nodes(index_data: dict[str, Any], query: dict[str, Any]) -> dict[str, Any]: nodes = index_data.get("nodes", []) mode = query.get("match_mode", "contains_text") value = query.get("value") if value is None and mode not in {"node_type"}: raise QueryError("query.value is required") node_type_filter = query.get("node_type") style_name_filter = query.get("style_name") heading_level = query.get("heading_level") allow_multiple = bool(query.get("allow_multiple", False)) occurrence = query.get("occurrence") window = int(query.get("context_window", TEXT_WINDOW_DEFAULT)) def node_matches(node: dict[str, Any]) -> bool: if node_type_filter and node.get("node_type") != node_type_filter: return False if style_name_filter and node.get("style_name") != style_name_filter: return False if heading_level is not None and node.get("heading_level") != heading_level: return False node_text = node.get("text", "") if mode == "exact_text": return node_text == value if mode == "contains_text": return value in node_text if mode == "regex": return re.search(value, node_text) is not None if mode == "heading_path": return node.get("node_type") == "heading" and " > ".join(node.get("path", [])) == value if mode == "heading_text": return node.get("node_type") == "heading" and node_text == value if mode == "table_title": path_parts = node.get("path", []) return node.get("node_type") == "table" and bool(path_parts) and path_parts[-1] == value if mode == "style_name": return node.get("style_name") == value if mode == "node_type": return node.get("node_type") == query.get("value") if mode == "anchor": return node.get("anchor") == value if mode == "node_id": return node.get("node_id") == value raise QueryError(f"unsupported match_mode: {mode}") matches = [node for node in nodes if node_matches(node)] if occurrence is not None: matches = [matches[occurrence]] if 0 <= occurrence < len(matches) else [] ambiguous = len(matches) > 1 and not allow_multiple best_match = matches[0] if len(matches) == 1 or (allow_multiple and matches) else None def with_context(node: dict[str, Any]) -> dict[str, Any]: text = node.get("text", "") return { **node, "context": { "before": text[:window], "after": text[-window:] if text else "", }, } return { "status": "ok", "query": query, "match_count": len(matches), "ambiguous": ambiguous, "best_match": with_context(best_match) if best_match else None, "candidate_anchors": [match["anchor"] for match in matches], "matches": [with_context(match) for match in matches], "errors": ["query matched multiple nodes"] if ambiguous else [], "warnings": [], } def find_records(index_data: dict[str, Any], query: dict[str, Any]) -> list[dict[str, Any]]: result = query_nodes(index_data, query) if result["ambiguous"] and query.get("on_ambiguous", "error") == "error": raise QueryError("query matched multiple nodes") if result["match_count"] == 0 and query.get("on_missing", "error") == "error": raise QueryError("query matched no nodes") return result["matches"] def clone_run_format(source_run: Any, target_run: Any) -> None: target_run.bold = source_run.bold target_run.italic = source_run.italic target_run.underline = source_run.underline target_run.font.name = source_run.font.name target_run.font.size = source_run.font.size if source_run.font.color and source_run.font.color.rgb: target_run.font.color.rgb = source_run.font.color.rgb if qn and source_run._element.rPr is not None and source_run._element.rPr.rFonts is not None: east_asia = source_run._element.rPr.rFonts.get(qn("w:eastAsia")) if east_asia: target_run._element.get_or_add_rPr().rFonts.set(qn("w:eastAsia"), east_asia) def clear_paragraph(paragraph: Paragraph) -> None: p_element = paragraph._element for child in list(p_element): if child.tag.endswith("}r") or child.tag.endswith("}hyperlink"): p_element.remove(child) def replace_text_in_paragraph(paragraph: Paragraph, old_text: str, new_text: str) -> bool: if old_text not in paragraph.text: return False for run in paragraph.runs: if old_text in run.text: run.text = run.text.replace(old_text, new_text, 1) return True existing_runs = list(paragraph.runs) first_run = existing_runs[0] if existing_runs else paragraph.add_run() clear_paragraph(paragraph) new_run = paragraph.add_run(new_text) if existing_runs: clone_run_format(first_run, new_run) return True def delete_block(block: Paragraph | Table) -> None: element = block._element parent = element.getparent() if parent is not None: parent.remove(element) def insert_paragraph_relative(target: Paragraph | Table, *, after: bool, style_name: str | None = None) -> Paragraph: new_p = OxmlElement("w:p") if after: target._element.addnext(new_p) else: target._element.addprevious(new_p) paragraph = Paragraph(new_p, target._parent) if style_name: try: paragraph.style = style_name except KeyError: pass return paragraph def append_paragraph_contents(paragraph: Paragraph, text: str, source: Paragraph | None = None) -> None: if source is not None and source.style is not None: paragraph.style = source.style paragraph.paragraph_format.left_indent = source.paragraph_format.left_indent paragraph.paragraph_format.right_indent = source.paragraph_format.right_indent paragraph.paragraph_format.first_line_indent = source.paragraph_format.first_line_indent paragraph.paragraph_format.space_before = source.paragraph_format.space_before paragraph.paragraph_format.space_after = source.paragraph_format.space_after paragraph.paragraph_format.line_spacing = source.paragraph_format.line_spacing paragraph.alignment = source.alignment if source is not None and source.runs: run = paragraph.add_run(text) clone_run_format(source.runs[0], run) else: paragraph.add_run(text) def create_table_after(target: Paragraph | Table, rows: list[list[str]], style_name: str | None = None) -> Table: parent = target._parent cols = len(rows[0]) if rows else 1 table = parent.add_table(rows=0, cols=cols) if style_name: try: table.style = style_name except KeyError: pass for row_values in rows: row = table.add_row() for index, value in enumerate(row_values): row.cells[index].text = value target._element.addnext(table._element) return table def build_live_index(document: Document) -> tuple[dict[str, Any], dict[str, NodeRecord]]: nodes = _index_document_core(document) return { "status": "ok", "summary": {"node_count": len(nodes)}, "nodes": [node.to_dict() for node in nodes], }, {node.anchor: node for node in nodes} def insert_blocks(record: NodeRecord, operation: dict[str, Any], *, after: bool) -> None: content_type = operation.get("content_type", "paragraphs") content = operation.get("content") if record.node_type not in {"paragraph", "list_item", "heading", "table"}: raise QueryError("insert operations only support block nodes") target = record.object_ref if content_type == "paragraphs": paragraphs = content if isinstance(content, list) else [str(content)] previous: Paragraph | Table = target for index, paragraph_text in enumerate(paragraphs): new_paragraph = insert_paragraph_relative( previous, after=after if index == 0 else True, style_name=record.style_name if record.node_type in {"paragraph", "list_item"} else "Normal", ) source_paragraph = target if isinstance(target, Paragraph) and record.node_type in {"paragraph", "list_item"} else None append_paragraph_contents(new_paragraph, str(paragraph_text), source=source_paragraph) previous = new_paragraph return if content_type == "heading": payload = content if isinstance(content, dict) else {"text": str(content)} level = int(payload.get("level", record.heading_level or 1)) new_paragraph = insert_paragraph_relative(target, after=after, style_name=f"Heading {level}") append_paragraph_contents(new_paragraph, str(payload.get("text", "")), source=target if isinstance(target, Paragraph) else None) try: new_paragraph.style = f"Heading {level}" except KeyError: pass return if content_type == "list": items = content if isinstance(content, list) else [] previous: Paragraph | Table = target for index, item in enumerate(items): new_paragraph = insert_paragraph_relative( previous, after=after if index == 0 else True, style_name="List Bullet", ) source_paragraph = target if isinstance(target, Paragraph) and record.node_type == "list_item" else None append_paragraph_contents(new_paragraph, str(item), source=source_paragraph) try: new_paragraph.style = "List Bullet" except KeyError: pass previous = new_paragraph return if content_type == "table": rows = content.get("rows") if isinstance(content, dict) else content if not isinstance(rows, list) or not rows: raise QueryError("table content must provide rows") style_name = None if isinstance(target, Table) and target.style is not None: style_name = target.style.name create_table_after(target, rows, style_name=style_name) return raise QueryError(f"unsupported content_type: {content_type}") def replace_block(record: NodeRecord, operation: dict[str, Any]) -> None: target = record.object_ref insert_blocks(record, operation, after=False) delete_block(target) def apply_patch_document(patch_data: dict[str, Any]) -> dict[str, Any]: source_docx = Path(patch_data["source_docx"]).resolve() output_docx = Path(patch_data.get("output_docx", source_docx)).resolve() in_place = bool(patch_data.get("in_place", False)) options = resolve_generation_options(patch_data) if not in_place and output_docx == source_docx: raise QueryError("output_docx must differ from source_docx unless in_place is true") output_docx.parent.mkdir(parents=True, exist_ok=True) if not in_place: shutil.copy2(source_docx, output_docx) document = Document(str(output_docx)) initialize_default_bid_styles(document, options["docx_style_profile"]) operations = patch_data.get("operations", []) operation_reports: list[dict[str, Any]] = [] for index, operation in enumerate(operations): live_index, record_map = build_live_index(document) matches = find_records(live_index, operation.get("target", {})) if len(matches) > 1 and operation.get("on_ambiguous", "error") == "error": raise QueryError(f"operation {index} matched multiple nodes") selected = matches if operation.get("allow_multiple") else matches[:1] if not selected and operation.get("on_missing", "error") == "error": raise QueryError(f"operation {index} matched no nodes") affected: list[dict[str, Any]] = [] for match in selected: record = record_map[match["anchor"]] before_summary = summarize_text(record.text) op_name = operation["op"] if op_name == "replace_text": old_text = operation["old_text"] new_text = operation["new_text"] if record.node_type not in {"paragraph", "list_item", "heading"}: raise QueryError("replace_text only supports paragraph-like nodes") if not replace_text_in_paragraph(record.object_ref, old_text, new_text): raise QueryError(f"text not found in node {record.anchor}") elif op_name == "delete_node": if record.node_type not in {"paragraph", "list_item", "heading", "table"}: raise QueryError("delete_node only supports block nodes") delete_block(record.object_ref) elif op_name == "insert_before": insert_blocks(record, operation, after=False) elif op_name == "insert_after": insert_blocks(record, operation, after=True) elif op_name == "replace_node": replace_block(record, operation) else: raise QueryError(f"unsupported op: {op_name}") affected.append( { "anchor": record.anchor, "node_type": record.node_type, "before": before_summary, "op": op_name, } ) document.save(str(output_docx)) operation_reports.append( { "index": index, "op": operation["op"], "match_count": len(selected), "affected": affected, } ) apply_heading_numbering(document, options["numbering_mode"]) apply_document_profile(document, options["docx_style_profile"]) document.save(str(output_docx)) final_index = index_document(output_docx) quality = inspect_document_quality( output_docx, docx_style_profile=options["docx_style_profile"], numbering_mode=options["numbering_mode"], document_kind=options["document_kind"], ) return { "status": "ok", "source_docx": str(source_docx), "output_docx": str(output_docx), "in_place": in_place, "docx_style_profile": options["docx_style_profile"], "numbering_mode": options["numbering_mode"], "document_kind": options["document_kind"], "template_docx": str(Path(options["template_docx"]).resolve()) if options["template_docx"] else None, "operation_count": len(operations), "operations": operation_reports, "errors": [], "warnings": [], "final_summary": final_index["summary"], **quality, } def render_docx(docx_path: Path, out_dir: Path, *, docx_style_profile: str = DEFAULT_DOCX_STYLE_PROFILE, numbering_mode: str = DEFAULT_NUMBERING_MODE, document_kind: str = DEFAULT_DOCUMENT_KIND) -> dict[str, Any]: out_dir.mkdir(parents=True, exist_ok=True) pdf_path = out_dir / f"{docx_path.stem}.pdf" png_dir = out_dir / "pages" png_dir.mkdir(parents=True, exist_ok=True) soffice = shutil.which("soffice") if not soffice: report = { "status": "render_skipped", "docx": str(docx_path), "docx_style_profile": docx_style_profile, "numbering_mode": numbering_mode, "document_kind": document_kind, "pdf": None, "page_count": 0, "images": [], "errors": [], "warnings": ["LibreOffice/soffice not found"], } report.update( inspect_document_quality( docx_path, docx_style_profile=docx_style_profile, numbering_mode=numbering_mode, document_kind=document_kind, render_status=report["status"], ) ) return report process = subprocess.run( [soffice, "--headless", "--convert-to", "pdf", "--outdir", str(out_dir), str(docx_path)], capture_output=True, text=True, encoding="utf-8", ) if process.returncode != 0 or not pdf_path.exists(): report = { "status": "error", "docx": str(docx_path), "docx_style_profile": docx_style_profile, "numbering_mode": numbering_mode, "document_kind": document_kind, "pdf": str(pdf_path), "page_count": 0, "images": [], "errors": [process.stderr.strip() or "failed to convert docx to pdf"], "warnings": [], } report.update( inspect_document_quality( docx_path, docx_style_profile=docx_style_profile, numbering_mode=numbering_mode, document_kind=document_kind, render_status=report["status"], ) ) return report images: list[str] = [] warnings: list[str] = [] if convert_from_path is None: warnings.append("pdf2image not installed") else: try: for page_number, image in enumerate(convert_from_path(str(pdf_path)), start=1): image_path = png_dir / f"page-{page_number:03d}.png" image.save(str(image_path), "PNG") images.append(str(image_path)) except Exception as exc: # pragma: no cover warnings.append(f"PNG render skipped: {exc}") report = { "status": "ok", "docx": str(docx_path), "docx_style_profile": docx_style_profile, "numbering_mode": numbering_mode, "document_kind": document_kind, "pdf": str(pdf_path), "page_count": len(images), "images": images, "errors": [], "warnings": warnings, } report.update( inspect_document_quality( docx_path, docx_style_profile=docx_style_profile, numbering_mode=numbering_mode, document_kind=document_kind, render_status=report["status"], ) ) return report