# Skill-BidCreater/scripts/docx_ops_lib.py
# 2026-03-14 08:49:28 +08:00
#
# 854 lines
# 34 KiB
# Python

from __future__ import annotations
import json
import re
import shutil
import subprocess
from dataclasses import dataclass
from hashlib import sha1
from pathlib import Path
from typing import Any, Iterator
from docx import Document
from docx.document import Document as DocxDocument
from docx.oxml import OxmlElement
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
try:
from pdf2image import convert_from_path
except ImportError: # pragma: no cover
convert_from_path = None
try:
from docx.oxml.ns import qn
except ImportError: # pragma: no cover
qn = None
# Prefix map for the WordprocessingML ("w:") XML namespace.
# NOTE(review): not referenced by the code visible in this part of the file - confirm it is still used.
NAMESPACES = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
# Default "context_window" for query_nodes: how many characters of a node's
# text are returned in the "before"/"after" context snippets.
TEXT_WINDOW_DEFAULT = 40
@dataclass
class NodeRecord:
    """One indexed element of a .docx document.

    ``to_dict`` gives the JSON-friendly view of a record; it leaves out
    ``object_ref``, which holds the live document object the record was
    built from.
    """

    node_id: str  # sequential identifier such as "n-00001"
    node_type: str  # e.g. "heading", "paragraph", "list_item", "table"
    text: str
    style_name: str | None
    heading_level: int | None
    path: list[str]  # heading labels, outermost first
    ordinal: int
    parent_id: str | None
    anchor: str
    container: str  # "document" or "table"
    table_index: int | None = None
    row_index: int | None = None
    cell_index: int | None = None
    block_index: int | None = None
    xml_path: str | None = None
    has_image: bool = False
    object_ref: Any = None  # live object handle; never serialized

    def to_dict(self) -> dict[str, Any]:
        """Return every field except ``object_ref`` as a plain dict."""
        exported = (
            "node_id",
            "node_type",
            "text",
            "style_name",
            "heading_level",
            "path",
            "ordinal",
            "parent_id",
            "anchor",
            "container",
            "table_index",
            "row_index",
            "cell_index",
            "block_index",
            "xml_path",
            "has_image",
        )
        return {name: getattr(self, name) for name in exported}
class QueryError(RuntimeError):
    """Raised for invalid specs, queries, or patch operations handled by this module."""
def read_json(path: Path) -> Any:
    """Parse the JSON file at *path*, decoding it as UTF-8 and dropping a leading BOM."""
    raw = path.read_text(encoding="utf-8-sig")
    return json.loads(raw)
def write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON with a trailing newline.

    Missing parent directories are created. Non-ASCII characters are written
    as-is and line endings are always ``\\n``.

    Raises:
        TypeError / ValueError: if *payload* cannot be encoded as JSON. The
            payload is encoded before the file is opened, so a failure here
            leaves any existing file at *path* untouched.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Encode first: opening in "w" mode truncates, and a failure halfway
    # through a streamed dump would leave a broken file behind.
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    with path.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(serialized)
        handle.write("\n")
def heading_level_for_style(style_name: str | None) -> int | None:
    """Return the heading level encoded in a style name, or ``None``.

    Recognizes ``"Heading N"`` (case-insensitive) and ``"标题 N"`` / ``"标题N"``
    after whitespace normalization. Any other name, or an empty one, gives
    ``None``.
    """
    if not style_name:
        return None
    compact_style = normalize_text(style_name)
    candidates = (
        (r"Heading\s+(\d+)$", re.IGNORECASE),
        (r"标题\s*(\d+)$", 0),
    )
    for pattern, flags in candidates:
        found = re.match(pattern, compact_style, flags=flags)
        if found:
            return int(found.group(1))
    return None
def normalize_text(value: str) -> str:
    """Collapse every whitespace run to one space and trim both ends.

    A falsy *value* (``None`` or ``""``) yields the empty string.
    """
    source = value if value else ""
    collapsed = re.sub(r"\s+", " ", source)
    return collapsed.strip()
def create_docx_document(spec_data: dict[str, Any]) -> dict[str, Any]:
    """Render a block specification into a new .docx file.

    Keys read from *spec_data*:

    * ``output_docx`` (required) - destination path; parent folders are created.
    * ``blocks`` - list of block dicts (default empty). Each block has a
      ``type`` of ``heading``, ``paragraph`` (the default), ``list``,
      ``table`` or ``page_break``. Heading blocks may nest further blocks
      under ``children``.
    * ``title`` - optional; stored as the document's core-properties title.

    Returns:
        A report with ``status``, ``output_docx``, ``block_count`` (top-level
        blocks), one entry per rendered block under ``blocks``, and the
        summary of re-indexing the saved file under ``final_summary``.

    Raises:
        QueryError: if ``blocks`` or any block is malformed or of an
            unsupported type.
    """
    output_docx = Path(spec_data["output_docx"]).resolve()
    blocks = spec_data.get("blocks", [])
    if not isinstance(blocks, list):
        raise QueryError("blocks must be a list")
    output_docx.parent.mkdir(parents=True, exist_ok=True)
    document = Document()
    title = spec_data.get("title")
    if title:
        document.core_properties.title = str(title)
    block_reports: list[dict[str, Any]] = []

    def try_style(styled: Any, style_name: str) -> None:
        # A style name the document does not know is ignored (KeyError), so
        # the element simply keeps its default style.
        try:
            styled.style = style_name
        except KeyError:
            pass

    def render_block(block: dict[str, Any], index_path: list[int]) -> None:
        # Dotted position of the block (e.g. "2.0.1"), used in reports and errors.
        label = ".".join(str(part) for part in index_path)
        if not isinstance(block, dict):
            raise QueryError(f"block {label} must be an object")
        block_type = block.get("type", "paragraph")
        report: dict[str, Any] = {"index": label, "type": block_type}

        if block_type == "heading":
            level = int(block.get("level", 1))
            if not 1 <= level <= 9:
                raise QueryError(f"block {label} heading level must be between 1 and 9")
            text = str(block.get("text", ""))
            document.add_paragraph(style=f"Heading {level}").add_run(text)
            report["text"] = summarize_text(text)
            report["level"] = level
            block_reports.append(report)
            children = block.get("children", [])
            if children and not isinstance(children, list):
                raise QueryError(f"block {label} children must be a list")
            if isinstance(children, list):
                for child_index, child in enumerate(children):
                    render_block(child, [*index_path, child_index])
        elif block_type == "paragraph":
            text = str(block.get("text", ""))
            paragraph = document.add_paragraph()
            style_name = block.get("style")
            if style_name:
                try_style(paragraph, str(style_name))
            paragraph.add_run(text)
            report["text"] = summarize_text(text)
            block_reports.append(report)
        elif block_type == "list":
            items = block.get("items", [])
            if not isinstance(items, list):
                raise QueryError(f"block {label} items must be a list")
            style_name = str(block.get("style", "List Bullet"))
            for item in items:
                paragraph = document.add_paragraph()
                try_style(paragraph, style_name)
                paragraph.add_run(str(item))
            report["item_count"] = len(items)
            block_reports.append(report)
        elif block_type == "table":
            rows = block.get("rows", [])
            well_formed = isinstance(rows, list) and rows and isinstance(rows[0], list) and rows[0]
            if not well_formed:
                raise QueryError(f"block {label} rows must be a non-empty 2D list")
            width = len(rows[0])
            table = document.add_table(rows=0, cols=width)
            style_name = block.get("style")
            if style_name:
                try_style(table, str(style_name))
            for row_values in rows:
                if not isinstance(row_values, list) or len(row_values) != width:
                    raise QueryError(f"block {label} table rows must have equal column counts")
                cells = table.add_row().cells
                for cell_index, value in enumerate(row_values):
                    cells[cell_index].text = str(value)
            report["row_count"] = len(rows)
            report["column_count"] = width
            block_reports.append(report)
        elif block_type == "page_break":
            document.add_page_break()
            block_reports.append(report)
        else:
            raise QueryError(f"unsupported block type: {block_type}")

    for index, block in enumerate(blocks):
        render_block(block, [index])
    document.save(str(output_docx))
    final_index = index_document(output_docx)
    return {
        "status": "ok",
        "output_docx": str(output_docx),
        "block_count": len(blocks),
        "blocks": block_reports,
        "final_summary": final_index["summary"],
    }
def export_outline_artifacts(payload: dict[str, Any]) -> dict[str, Any]:
    """Write the technical and business outlines as JSON and as .docx files.

    *payload* must provide the four destination paths
    (``technical_outline_json``, ``business_outline_json``,
    ``technical_docx``, ``business_docx``) and the two outline objects
    (``technical_outline``, ``business_outline``), each a dict holding a
    ``blocks`` list and optionally a ``title``.

    Returns:
        A report with the resolved paths and the per-document reports from
        ``create_docx_document``.

    Raises:
        QueryError: if an outline is not a dict or its ``blocks`` is not a list.
    """
    technical_outline = payload.get("technical_outline")
    business_outline = payload.get("business_outline")
    technical_json = Path(payload["technical_outline_json"]).resolve()
    business_json = Path(payload["business_outline_json"]).resolve()
    technical_docx = Path(payload["technical_docx"]).resolve()
    business_docx = Path(payload["business_docx"]).resolve()

    named_outlines = {
        "technical_outline": technical_outline,
        "business_outline": business_outline,
    }
    for outline_name, outline in named_outlines.items():
        if not isinstance(outline, dict):
            raise QueryError(f"{outline_name} must be an object")
        if not isinstance(outline.get("blocks"), list):
            raise QueryError(f"{outline_name}.blocks must be a list")

    # Both JSON files are written before either document is rendered.
    write_json(technical_json, technical_outline)
    write_json(business_json, business_outline)

    def render(outline: dict[str, Any], destination: Path, fallback_title: str) -> dict[str, Any]:
        spec = {
            "output_docx": str(destination),
            "title": str(outline.get("title", fallback_title)),
            "blocks": outline["blocks"],
        }
        return create_docx_document(spec)

    technical_report = render(technical_outline, technical_docx, "技术标目录")
    business_report = render(business_outline, business_docx, "商务及其他目录")
    return {
        "status": "ok",
        "technical_outline_json": str(technical_json),
        "business_outline_json": str(business_json),
        "technical_docx": str(technical_docx),
        "business_docx": str(business_docx),
        "technical_report": technical_report,
        "business_report": business_report,
    }
def slugify_text(value: str, *, limit: int = 32) -> str:
    """Turn *value* into a lowercase, hyphen-separated slug of at most *limit* characters.

    Runs of characters other than word characters, CJK ideographs
    (U+4E00-U+9FFF) and hyphens become a single hyphen. Returns ``"empty"``
    when nothing is left.
    """
    compact = normalize_text(value)
    if not compact:
        return "empty"
    hyphenated = re.sub(r"[^\w\u4e00-\u9fff-]+", "-", compact, flags=re.UNICODE)
    collapsed = re.sub(r"-+", "-", hyphenated)
    slug = collapsed.strip("-").lower()[:limit]
    return slug if slug else "empty"
def summarize_text(value: str, *, limit: int = 80) -> str:
    """Return the whitespace-normalized *value*, cut to its first *limit* characters."""
    normalized = normalize_text(value)
    return normalized[:limit]
def iter_block_items(parent: DocxDocument | _Cell) -> Iterator[Paragraph | Table]:
    """Yield the direct paragraph and table children of *parent* in document order.

    *parent* is either a whole document (its body is walked) or a table
    cell. Child elements that are neither ``p`` nor ``tbl`` are skipped.
    """
    if isinstance(parent, DocxDocument):
        container = parent.element.body
    else:
        container = parent._tc
    wrappers = (("}p", Paragraph), ("}tbl", Table))
    for child in container.iterchildren():
        for suffix, wrapper in wrappers:
            if child.tag.endswith(suffix):
                yield wrapper(child, parent)
                break
def paragraph_has_image(paragraph: Paragraph) -> bool:
    """Report whether the paragraph's XML contains at least one ``w:drawing`` element."""
    drawings = paragraph._element.xpath(".//w:drawing")
    return True if drawings else False
def paragraph_is_list_item(paragraph: Paragraph) -> bool:
    """Report whether *paragraph* looks like a list item.

    A paragraph counts as a list item when its style name starts with
    "list" (case-insensitive) or when its paragraph properties carry
    numbering (``numPr``).
    """
    style = paragraph.style
    # A style can exist without a name; treat that the same as "no style name"
    # instead of failing on ``None.lower()``.
    style_name = (style.name or "") if style else ""
    if style_name.lower().startswith("list"):
        return True
    p_pr = paragraph._element.pPr
    return p_pr is not None and p_pr.numPr is not None
def build_anchor(path: list[str], node_type: str, text: str, ordinal: int) -> str:
    """Compose the anchor string for a node.

    The anchor has the form ``path-slug:node_type:text-slug:ordinal:digest``
    where *digest* is the first 10 hex characters of the SHA-1 of the
    heading path, node type, a 32-character text summary and the ordinal.
    """
    seed_parts = [
        "/".join(path),
        node_type,
        summarize_text(text, limit=32),
        str(ordinal),
    ]
    digest = sha1("|".join(seed_parts).encode("utf-8")).hexdigest()[:10]
    text_slug = slugify_text(text, limit=24)
    path_slug = slugify_text("-".join(path), limit=24)
    return f"{path_slug}:{node_type}:{text_slug}:{ordinal}:{digest}"
def _index_document_core(document: Document) -> list[NodeRecord]:
    """Walk the top-level blocks of *document* and return one record per node.

    Records are emitted in document order with consecutive ordinals:

    * each body paragraph becomes a ``heading``, ``list_item`` or
      ``paragraph`` record, followed by an ``image_placeholder`` record when
      the paragraph contains a drawing;
    * each body table becomes a ``table`` record followed by a ``table_row``
      record per row and a ``table_cell`` record per cell.

    ``path`` is the chain of open heading labels and ``parent_id`` the id of
    the innermost open heading. Open headings are tracked by *level*: a new
    heading closes every open heading of the same or a deeper level, so
    documents that skip levels or start below level 1 still get sibling
    headings as siblings and the nearest shallower heading as parent.

    Note that ``table_index`` and the ``table[...]`` part of ``xml_path``
    carry the table's block index within the body, not a count of tables.
    """
    nodes: list[NodeRecord] = []
    # Currently open headings, outermost first: (level, label, node_id).
    open_headings: list[tuple[int, str, str]] = []
    ordinal = 0

    def current_path() -> list[str]:
        return [label for _, label, _ in open_headings]

    def current_parent_id() -> str | None:
        return open_headings[-1][2] if open_headings else None

    def add_record(
        *,
        node_type: str,
        text: str,
        style_name: str | None,
        heading_level: int | None,
        path: list[str],
        parent_id: str | None,
        container: str,
        object_ref: Any,
        table_index: int | None = None,
        row_index: int | None = None,
        cell_index: int | None = None,
        block_index: int | None = None,
        xml_path: str | None = None,
        has_image: bool = False,
    ) -> NodeRecord:
        # Assign the next ordinal/id, store the record and hand it back.
        nonlocal ordinal
        ordinal += 1
        record = NodeRecord(
            node_id=f"n-{ordinal:05d}",
            node_type=node_type,
            text=normalize_text(text),
            style_name=style_name,
            heading_level=heading_level,
            path=path,
            ordinal=ordinal,
            parent_id=parent_id,
            anchor=build_anchor(path, node_type, text, ordinal),
            container=container,
            table_index=table_index,
            row_index=row_index,
            cell_index=cell_index,
            block_index=block_index,
            xml_path=xml_path,
            has_image=has_image,
            object_ref=object_ref,
        )
        nodes.append(record)
        return record

    for block_index, block in enumerate(iter_block_items(document)):
        if isinstance(block, Paragraph):
            text = normalize_text(block.text)
            style_name = block.style.name if block.style else None
            level = heading_level_for_style(style_name)
            label = ""
            if level is not None:
                # Close headings at the same or a deeper level before opening
                # this one; comparing levels (not stack length) keeps paths
                # right when levels are skipped.
                while open_headings and open_headings[-1][0] >= level:
                    open_headings.pop()
                label = text or f"Heading {level}"
                node_type = "heading"
                path = current_path() + [label]
            else:
                node_type = "list_item" if paragraph_is_list_item(block) else "paragraph"
                path = current_path()
            record = add_record(
                node_type=node_type,
                text=text,
                style_name=style_name,
                heading_level=level,
                path=path,
                parent_id=current_parent_id(),
                container="document",
                object_ref=block,
                block_index=block_index,
                has_image=paragraph_has_image(block),
            )
            if level is not None:
                open_headings.append((level, label, record.node_id))
            if record.has_image:
                add_record(
                    node_type="image_placeholder",
                    text=text or "[image]",
                    style_name=style_name,
                    heading_level=level,
                    path=list(path),
                    parent_id=record.node_id,
                    container="document",
                    object_ref=block,
                    block_index=block_index,
                    has_image=True,
                )
            continue

        # Anything that is not a paragraph is a table.
        table_path = current_path()
        # Read every row's cells once and reuse them for the table text, the
        # row text and the cell records.
        row_cells = [(row, list(row.cells)) for row in block.rows]
        row_texts = [
            " | ".join(normalize_text(cell.text) for cell in cells)
            for _, cells in row_cells
        ]
        table_record = add_record(
            node_type="table",
            text="\n".join(row_texts),
            style_name=block.style.name if block.style else None,
            heading_level=None,
            path=table_path,
            parent_id=current_parent_id(),
            container="document",
            object_ref=block,
            block_index=block_index,
            xml_path=f"table[{block_index}]",
        )
        for row_index, ((row, cells), row_text) in enumerate(zip(row_cells, row_texts)):
            row_record = add_record(
                node_type="table_row",
                text=row_text,
                style_name=table_record.style_name,
                heading_level=None,
                path=list(table_path),
                parent_id=table_record.node_id,
                container="table",
                object_ref=row,
                table_index=block_index,
                row_index=row_index,
                xml_path=f"table[{block_index}]/row[{row_index}]",
            )
            for cell_index, cell in enumerate(cells):
                paragraph_texts = [normalize_text(paragraph.text) for paragraph in cell.paragraphs]
                add_record(
                    node_type="table_cell",
                    text="\n".join(piece for piece in paragraph_texts if piece),
                    style_name=None,
                    heading_level=None,
                    path=list(table_path),
                    parent_id=row_record.node_id,
                    container="table",
                    object_ref=cell,
                    table_index=block_index,
                    row_index=row_index,
                    cell_index=cell_index,
                    xml_path=f"table[{block_index}]/row[{row_index}]/cell[{cell_index}]",
                )
    return nodes
def index_document(docx_path: Path) -> dict[str, Any]:
    """Open the .docx at *docx_path* and return its serializable node index.

    The result holds ``status``, the path as given, per-type counts under
    ``summary`` and the list of node dicts under ``nodes``.
    """
    nodes = _index_document_core(Document(str(docx_path)))
    kinds = [node.node_type for node in nodes]
    summary: dict[str, int] = {"node_count": len(nodes)}
    counted = (
        ("heading_count", "heading"),
        ("paragraph_count", "paragraph"),
        ("list_item_count", "list_item"),
        ("table_count", "table"),
        ("image_placeholder_count", "image_placeholder"),
    )
    for key, kind in counted:
        summary[key] = kinds.count(kind)
    return {
        "status": "ok",
        "docx": str(docx_path),
        "summary": summary,
        "nodes": [node.to_dict() for node in nodes],
    }
def query_nodes(index_data: dict[str, Any], query: dict[str, Any]) -> dict[str, Any]:
    """Select nodes from a serialized index according to *query*.

    Keys read from *query*:

    * ``match_mode`` - one of ``exact_text``, ``contains_text`` (default),
      ``regex``, ``heading_path`` (labels joined with ``" > "``),
      ``heading_text``, ``table_title`` (last path label of a table),
      ``style_name``, ``node_type``, ``anchor``, ``node_id``.
    * ``value`` - what to match; required for every mode except ``node_type``.
    * ``node_type``, ``style_name``, ``heading_level`` - optional pre-filters.
    * ``occurrence`` - keep only the match at this zero-based position.
    * ``allow_multiple`` - when true, several matches are not ambiguous and
      the first one is reported as ``best_match``.
    * ``context_window`` - length of the ``before``/``after`` text snippets
      (default ``TEXT_WINDOW_DEFAULT``; ``None`` also selects the default,
      negative values are treated as 0).

    Returns:
        A report with ``match_count``, ``ambiguous``, ``best_match``,
        ``candidate_anchors``, ``matches``, ``errors`` and ``warnings``.

    Raises:
        QueryError: if ``value`` is missing or ``match_mode`` is unsupported.
            The mode is checked up front, so the error does not depend on
            whether any node happens to pass the pre-filters.
    """
    nodes = index_data.get("nodes", [])
    mode = query.get("match_mode", "contains_text")
    value = query.get("value")
    if value is None and mode != "node_type":
        raise QueryError("query.value is required")
    supported_modes = (
        "exact_text",
        "contains_text",
        "regex",
        "heading_path",
        "heading_text",
        "table_title",
        "style_name",
        "node_type",
        "anchor",
        "node_id",
    )
    if mode not in supported_modes:
        raise QueryError(f"unsupported match_mode: {mode}")

    node_type_filter = query.get("node_type")
    style_name_filter = query.get("style_name")
    heading_level = query.get("heading_level")
    allow_multiple = bool(query.get("allow_multiple", False))
    occurrence = query.get("occurrence")
    requested_window = query.get("context_window")
    window = max(0, int(TEXT_WINDOW_DEFAULT if requested_window is None else requested_window))
    # Compile once instead of handing the raw pattern to re.search per node.
    pattern = re.compile(value) if mode == "regex" else None

    def node_matches(node: dict[str, Any]) -> bool:
        if node_type_filter and node.get("node_type") != node_type_filter:
            return False
        if style_name_filter and node.get("style_name") != style_name_filter:
            return False
        if heading_level is not None and node.get("heading_level") != heading_level:
            return False
        node_text = node.get("text", "")
        kind = node.get("node_type")
        if mode == "exact_text":
            return node_text == value
        if mode == "contains_text":
            return value in node_text
        if mode == "regex":
            return pattern.search(node_text) is not None
        if mode == "heading_path":
            return kind == "heading" and " > ".join(node.get("path", [])) == value
        if mode == "heading_text":
            return kind == "heading" and node_text == value
        if mode == "table_title":
            path_parts = node.get("path", [])
            return kind == "table" and bool(path_parts) and path_parts[-1] == value
        if mode == "style_name":
            return node.get("style_name") == value
        if mode == "node_type":
            return kind == value
        if mode == "anchor":
            return node.get("anchor") == value
        return node.get("node_id") == value  # mode == "node_id"

    matches = [node for node in nodes if node_matches(node)]
    if occurrence is not None:
        matches = [matches[occurrence]] if 0 <= occurrence < len(matches) else []
    ambiguous = len(matches) > 1 and not allow_multiple
    best_match = matches[0] if len(matches) == 1 or (allow_multiple and matches) else None

    def with_context(node: dict[str, Any]) -> dict[str, Any]:
        text = node.get("text", "")
        return {
            **node,
            "context": {
                "before": text[:window],
                # text[-0:] would be the whole text, so a zero-width window
                # needs its own branch.
                "after": text[-window:] if window else "",
            },
        }

    return {
        "status": "ok",
        "query": query,
        "match_count": len(matches),
        "ambiguous": ambiguous,
        "best_match": with_context(best_match) if best_match else None,
        "candidate_anchors": [match["anchor"] for match in matches],
        "matches": [with_context(match) for match in matches],
        "errors": ["query matched multiple nodes"] if ambiguous else [],
        "warnings": [],
    }
def find_records(index_data: dict[str, Any], query: dict[str, Any]) -> list[dict[str, Any]]:
    """Run *query* and return its matches, enforcing the query's strictness flags.

    ``on_ambiguous`` and ``on_missing`` are read from *query*; both default
    to ``"error"``.

    Raises:
        QueryError: on an ambiguous result or an empty result, unless the
            corresponding flag is set to something other than ``"error"``.
    """
    result = query_nodes(index_data, query)
    strict_on_ambiguous = query.get("on_ambiguous", "error") == "error"
    strict_on_missing = query.get("on_missing", "error") == "error"
    if strict_on_ambiguous and result["ambiguous"]:
        raise QueryError("query matched multiple nodes")
    if strict_on_missing and result["match_count"] == 0:
        raise QueryError("query matched no nodes")
    return result["matches"]
def clone_run_format(source_run: Any, target_run: Any) -> None:
    """Copy basic character formatting from *source_run* onto *target_run*.

    Copies bold, italic, underline, font name and size, the RGB colour when
    one is set, and the east-Asian font (``w:eastAsia`` on ``w:rFonts``)
    when the source declares one and ``qn`` is available.
    """
    target_run.bold = source_run.bold
    target_run.italic = source_run.italic
    target_run.underline = source_run.underline
    target_run.font.name = source_run.font.name
    target_run.font.size = source_run.font.size
    source_color = source_run.font.color
    if source_color and source_color.rgb:
        target_run.font.color.rgb = source_color.rgb
    source_rpr = source_run._element.rPr
    if qn and source_rpr is not None and source_rpr.rFonts is not None:
        east_asia = source_rpr.rFonts.get(qn("w:eastAsia"))
        if east_asia:
            target_rpr = target_run._element.get_or_add_rPr()
            # The target has no <w:rFonts> yet when the copied font name was
            # None, so create the element on demand instead of dereferencing
            # a possibly missing one.
            target_rpr.get_or_add_rFonts().set(qn("w:eastAsia"), east_asia)
def clear_paragraph(paragraph: Paragraph) -> None:
    """Remove the run and hyperlink children of the paragraph element.

    Other direct children (for example paragraph properties) stay in place.
    """
    p_element = paragraph._element
    doomed = [child for child in p_element if child.tag.endswith(("}r", "}hyperlink"))]
    for child in doomed:
        p_element.remove(child)
def replace_text_in_paragraph(paragraph: Paragraph, old_text: str, new_text: str) -> bool:
    """Replace the first occurrence of *old_text* in *paragraph* with *new_text*.

    When a single run contains *old_text*, only that run's text is edited
    and all other runs keep their formatting. When the match is spread over
    several runs, the paragraph's runs and hyperlinks are replaced by one
    run holding the complete paragraph text with the replacement applied,
    formatted like the former first run.

    Returns:
        ``True`` if a replacement was made, ``False`` if *old_text* does not
        occur in the paragraph text.
    """
    full_text = paragraph.text
    if old_text not in full_text:
        return False
    existing_runs = list(paragraph.runs)
    for run in existing_runs:
        if old_text in run.text:
            run.text = run.text.replace(old_text, new_text, 1)
            return True
    # The match crosses run boundaries. Rebuild from the full paragraph text
    # so the text around the match survives, not just the replacement.
    clear_paragraph(paragraph)
    new_run = paragraph.add_run(full_text.replace(old_text, new_text, 1))
    if existing_runs:
        clone_run_format(existing_runs[0], new_run)
    return True
def delete_block(block: Paragraph | Table) -> None:
    """Detach the block's XML element from its parent; do nothing if it has none."""
    element = block._element
    owner = element.getparent()
    if owner is None:
        return
    owner.remove(element)
def insert_paragraph_relative(target: Paragraph | Table, *, after: bool, style_name: str | None = None) -> Paragraph:
    """Create an empty paragraph directly after (or before) *target* and return it.

    If *style_name* is given it is applied to the new paragraph; a name the
    document does not know (``KeyError``) is ignored.
    """
    new_p = OxmlElement("w:p")
    attach = target._element.addnext if after else target._element.addprevious
    attach(new_p)
    paragraph = Paragraph(new_p, target._parent)
    if not style_name:
        return paragraph
    try:
        paragraph.style = style_name
    except KeyError:
        pass
    return paragraph
def append_paragraph_contents(paragraph: Paragraph, text: str, source: Paragraph | None = None) -> None:
    """Append *text* to *paragraph* as a new run, optionally mimicking *source*.

    When *source* has a style, its style, indents, spacing, line spacing and
    alignment are copied to *paragraph*. When *source* has runs, the new run
    takes the character formatting of the first one.
    """
    if source is not None and source.style is not None:
        paragraph.style = source.style
        copied_format_fields = (
            "left_indent",
            "right_indent",
            "first_line_indent",
            "space_before",
            "space_after",
            "line_spacing",
        )
        for field_name in copied_format_fields:
            setattr(paragraph.paragraph_format, field_name, getattr(source.paragraph_format, field_name))
        paragraph.alignment = source.alignment
    run = paragraph.add_run(text)
    if source is not None and source.runs:
        clone_run_format(source.runs[0], run)
def create_table_after(target: Paragraph | Table, rows: list[list[str]], style_name: str | None = None) -> Table:
    """Build a table from *rows* and place it directly after *target*.

    The table is created through the target's parent container and then
    moved behind the target element. It is as wide as the longest row;
    shorter rows leave their remaining cells empty. Cell values are
    converted with ``str`` so numbers coming from JSON are accepted. With no
    rows, an empty one-column table is created.

    If *style_name* is given it is applied to the table; a name the document
    does not know (``KeyError``) is ignored.
    """
    parent = target._parent
    # Size by the widest row so a later, longer row cannot index past the
    # last cell.
    cols = max((len(row_values) for row_values in rows), default=1)
    table = parent.add_table(rows=0, cols=cols)
    if style_name:
        try:
            table.style = style_name
        except KeyError:
            pass
    for row_values in rows:
        cells = table.add_row().cells
        for index, value in enumerate(row_values):
            cells[index].text = str(value)
    target._element.addnext(table._element)
    return table
def build_live_index(document: Document) -> tuple[dict[str, Any], dict[str, NodeRecord]]:
    """Index an open document and return ``(serialized index, records by anchor)``.

    The second element maps each anchor to its record, which still carries
    the live ``object_ref`` needed to edit the document.
    """
    nodes = _index_document_core(document)
    serialized_index = {
        "status": "ok",
        "summary": {"node_count": len(nodes)},
        "nodes": [node.to_dict() for node in nodes],
    }
    records_by_anchor: dict[str, NodeRecord] = {}
    for node in nodes:
        records_by_anchor[node.anchor] = node
    return serialized_index, records_by_anchor
def insert_blocks(record: NodeRecord, operation: dict[str, Any], *, after: bool) -> None:
    """Insert new content next to the block that *record* refers to.

    ``operation["content_type"]`` (default ``"paragraphs"``) selects what is
    built from ``operation["content"]``:

    * ``"paragraphs"`` - a list of texts (any other value becomes a single
      paragraph); formatting is copied from the target when it is a
      paragraph or list item, otherwise the "Normal" style is used.
    * ``"heading"`` - a dict with ``text`` and optional ``level`` (any other
      value is used as the text); the level defaults to the target's heading
      level, or 1.
    * ``"list"`` - a list of item texts, styled "List Bullet".
    * ``"table"`` - a list of rows, or a dict holding it under ``rows``; the
      style is taken from the target when the target is a table.

    Args:
        record: Indexed node whose ``object_ref`` is the insertion target.
        operation: The patch operation carrying ``content_type``/``content``.
        after: Insert behind the target when true, in front of it otherwise.
            Multi-item content keeps its own order either way.

    Raises:
        QueryError: if the target is not a block node, table content has no
            rows, or the content type is unsupported.
    """
    content_type = operation.get("content_type", "paragraphs")
    content = operation.get("content")
    if record.node_type not in {"paragraph", "list_item", "heading", "table"}:
        raise QueryError("insert operations only support block nodes")
    target = record.object_ref
    if content_type == "paragraphs":
        paragraphs = content if isinstance(content, list) else [str(content)]
        previous: Paragraph | Table = target
        for index, paragraph_text in enumerate(paragraphs):
            new_paragraph = insert_paragraph_relative(
                previous,
                # Only the first paragraph is placed relative to the target;
                # the rest chain after the previously inserted one.
                after=after if index == 0 else True,
                style_name=record.style_name if record.node_type in {"paragraph", "list_item"} else "Normal",
            )
            source_paragraph = target if isinstance(target, Paragraph) and record.node_type in {"paragraph", "list_item"} else None
            append_paragraph_contents(new_paragraph, str(paragraph_text), source=source_paragraph)
            previous = new_paragraph
        return
    if content_type == "heading":
        payload = content if isinstance(content, dict) else {"text": str(content)}
        level = int(payload.get("level", record.heading_level or 1))
        new_paragraph = insert_paragraph_relative(target, after=after, style_name=f"Heading {level}")
        append_paragraph_contents(new_paragraph, str(payload.get("text", "")), source=target if isinstance(target, Paragraph) else None)
        # append_paragraph_contents may have copied the target's style, so
        # the heading style is applied again afterwards.
        try:
            new_paragraph.style = f"Heading {level}"
        except KeyError:
            pass
        return
    if content_type == "list":
        items = content if isinstance(content, list) else []
        previous = target
        for index, item in enumerate(items):
            new_paragraph = insert_paragraph_relative(
                previous,
                after=after if index == 0 else True,
                style_name="List Bullet",
            )
            source_paragraph = target if isinstance(target, Paragraph) and record.node_type == "list_item" else None
            append_paragraph_contents(new_paragraph, str(item), source=source_paragraph)
            # Same reason as for headings: restore the list style after the copy.
            try:
                new_paragraph.style = "List Bullet"
            except KeyError:
                pass
            previous = new_paragraph
        return
    if content_type == "table":
        rows = content.get("rows") if isinstance(content, dict) else content
        if not isinstance(rows, list) or not rows:
            raise QueryError("table content must provide rows")
        style_name = None
        if isinstance(target, Table) and target.style is not None:
            style_name = target.style.name
        table = create_table_after(target, rows, style_name=style_name)
        if not after:
            # create_table_after always places the table behind the target;
            # move it in front so an insert-before request is honoured.
            target._element.addprevious(table._element)
        return
    raise QueryError(f"unsupported content_type: {content_type}")
def replace_block(record: NodeRecord, operation: dict[str, Any]) -> None:
    """Swap the block behind *record* for the content described by *operation*.

    The new content is inserted first (requested in front of the old block),
    then the old block is removed.
    """
    original_block = record.object_ref
    insert_blocks(record, operation, after=False)
    delete_block(original_block)
def apply_patch_document(patch_data: dict[str, Any]) -> dict[str, Any]:
    """Apply a list of patch operations to a .docx file and save the result.

    Keys read from *patch_data*:

    * ``source_docx`` (required) and ``output_docx`` (defaults to the source).
    * ``in_place`` - when false (default) the source is copied to the output
      first, and the two paths must differ.
    * ``operations`` - each with ``op`` (``replace_text``, ``delete_node``,
      ``insert_before``, ``insert_after`` or ``replace_node``), a ``target``
      query, and optional ``allow_multiple``, ``on_ambiguous`` and
      ``on_missing`` flags.

    The document is re-indexed before every operation and saved after every
    operation, and once more at the end.

    Returns:
        A report with the paths, one entry per operation under
        ``operations`` and the summary of the final re-index under
        ``final_summary``.

    Raises:
        QueryError: for path conflicts, unmatched or ambiguous targets,
            unsupported operations or node types, or text that is not found.
    """
    source_docx = Path(patch_data["source_docx"]).resolve()
    output_docx = Path(patch_data.get("output_docx", source_docx)).resolve()
    in_place = bool(patch_data.get("in_place", False))
    if output_docx == source_docx and not in_place:
        raise QueryError("output_docx must differ from source_docx unless in_place is true")
    output_docx.parent.mkdir(parents=True, exist_ok=True)
    if not in_place:
        shutil.copy2(source_docx, output_docx)
    document = Document(str(output_docx))
    operations = patch_data.get("operations", [])
    operation_reports: list[dict[str, Any]] = []

    def apply_operation(record: NodeRecord, operation: dict[str, Any], op_name: Any) -> None:
        # Carry out one operation on one matched record.
        if op_name == "replace_text":
            old_text = operation["old_text"]
            new_text = operation["new_text"]
            if record.node_type not in {"paragraph", "list_item", "heading"}:
                raise QueryError("replace_text only supports paragraph-like nodes")
            if not replace_text_in_paragraph(record.object_ref, old_text, new_text):
                raise QueryError(f"text not found in node {record.anchor}")
            return
        if op_name == "delete_node":
            if record.node_type not in {"paragraph", "list_item", "heading", "table"}:
                raise QueryError("delete_node only supports block nodes")
            delete_block(record.object_ref)
            return
        if op_name == "insert_before":
            insert_blocks(record, operation, after=False)
            return
        if op_name == "insert_after":
            insert_blocks(record, operation, after=True)
            return
        if op_name == "replace_node":
            replace_block(record, operation)
            return
        raise QueryError(f"unsupported op: {op_name}")

    for index, operation in enumerate(operations):
        # Earlier operations may have changed the document, so index afresh.
        live_index, record_map = build_live_index(document)
        matches = find_records(live_index, operation.get("target", {}))
        if len(matches) > 1 and operation.get("on_ambiguous", "error") == "error":
            raise QueryError(f"operation {index} matched multiple nodes")
        selected = matches if operation.get("allow_multiple") else matches[:1]
        if not selected and operation.get("on_missing", "error") == "error":
            raise QueryError(f"operation {index} matched no nodes")
        affected: list[dict[str, Any]] = []
        for match in selected:
            record = record_map[match["anchor"]]
            before_summary = summarize_text(record.text)
            op_name = operation["op"]
            apply_operation(record, operation, op_name)
            affected.append(
                {
                    "anchor": record.anchor,
                    "node_type": record.node_type,
                    "before": before_summary,
                    "op": op_name,
                }
            )
        document.save(str(output_docx))
        operation_reports.append(
            {
                "index": index,
                "op": operation["op"],
                "match_count": len(selected),
                "affected": affected,
            }
        )

    document.save(str(output_docx))
    final_index = index_document(output_docx)
    return {
        "status": "ok",
        "source_docx": str(source_docx),
        "output_docx": str(output_docx),
        "in_place": in_place,
        "operation_count": len(operations),
        "operations": operation_reports,
        "errors": [],
        "warnings": [],
        "final_summary": final_index["summary"],
    }
def render_docx(docx_path: Path, out_dir: Path) -> dict[str, Any]:
    """Convert a .docx to PDF with LibreOffice and, if possible, to page PNGs.

    The PDF is written to ``out_dir/<stem>.pdf`` and page images to
    ``out_dir/pages/page-NNN.png``. Both directories are created.

    Returns:
        A report whose ``status`` is

        * ``"render_skipped"`` when no ``soffice`` executable is on PATH,
        * ``"error"`` when the conversion fails or produces no PDF,
        * ``"ok"`` otherwise. A missing ``pdf2image`` or a failing PNG
          export is reported under ``warnings`` and leaves ``images`` empty
          or partial; ``page_count`` is the number of images written.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    pdf_path = out_dir / f"{docx_path.stem}.pdf"
    png_dir = out_dir / "pages"
    png_dir.mkdir(parents=True, exist_ok=True)
    soffice = shutil.which("soffice")
    if not soffice:
        return {
            "status": "render_skipped",
            "docx": str(docx_path),
            "pdf": None,
            "page_count": 0,
            "images": [],
            "errors": [],
            "warnings": ["LibreOffice/soffice not found"],
        }
    # Success is detected by the PDF existing afterwards, so a PDF left over
    # from an earlier run must not be mistaken for this run's output.
    if pdf_path.is_file():
        pdf_path.unlink()
    process = subprocess.run(
        [soffice, "--headless", "--convert-to", "pdf", "--outdir", str(out_dir), str(docx_path)],
        capture_output=True,
        text=True,
        encoding="utf-8",
        # The tool's output is not guaranteed to be UTF-8; undecodable bytes
        # must not turn a report into a UnicodeDecodeError.
        errors="replace",
    )
    if process.returncode != 0 or not pdf_path.exists():
        return {
            "status": "error",
            "docx": str(docx_path),
            "pdf": str(pdf_path),
            "page_count": 0,
            "images": [],
            "errors": [(process.stderr or "").strip() or "failed to convert docx to pdf"],
            "warnings": [],
        }
    images: list[str] = []
    warnings: list[str] = []
    if convert_from_path is None:
        warnings.append("pdf2image not installed")
    else:
        try:
            for page_number, image in enumerate(convert_from_path(str(pdf_path)), start=1):
                image_path = png_dir / f"page-{page_number:03d}.png"
                image.save(str(image_path), "PNG")
                images.append(str(image_path))
        except Exception as exc:  # pragma: no cover
            # PNG export is best-effort: keep the PDF result and report why
            # the images are missing.
            warnings.append(f"PNG render skipped: {exc}")
    return {
        "status": "ok",
        "docx": str(docx_path),
        "pdf": str(pdf_path),
        "page_count": len(images),
        "images": images,
        "errors": [],
        "warnings": warnings,
    }