Word 填充后新增 Markdown 清理节点

This commit is contained in:
sladro 2025-12-22 13:57:23 +08:00
parent 58d628347e
commit c285523c1a
6 changed files with 270 additions and 5 deletions

View File

@ -12,6 +12,7 @@ from ...nodes.content import (
PrepareChapterNode,
GenerateContentNode,
SaveToWordNode,
CleanupMarkdownInWordNode,
CollectResultsNode,
)
@ -67,6 +68,7 @@ class ContentWriterAgentBuilder(AgentBuilder):
.add_node(PrepareChapterNode()) \
.add_node(GenerateContentNode()) \
.add_node(SaveToWordNode()) \
.add_node(CleanupMarkdownInWordNode()) \
.add_node(CollectResultsNode())
# 设置入口点
@ -89,9 +91,12 @@ class ContentWriterAgentBuilder(AgentBuilder):
# generate_content → save_to_word
self.add_edge("generate_content", "save_to_word")
# save_to_word → 条件分支(是否继续循环)
# save_to_word → cleanup_markdown_in_word
self.add_edge("save_to_word", "cleanup_markdown_in_word")
# cleanup_markdown_in_word → 条件分支(是否继续循环)
self.add_conditional_edge(
"save_to_word",
"cleanup_markdown_in_word",
should_continue_loop,
{"continue": "prepare_chapter", "collect": "collect_results"},
)

View File

@ -10,6 +10,7 @@ from .base import BaseAgent, AgentBuilder
from ..nodes.content import (
GenerateContentNode,
SaveToWordNode,
CleanupMarkdownInWordNode,
)
logger = logging.getLogger(__name__)
@ -25,14 +26,16 @@ class SingleChapterAgentBuilder(AgentBuilder):
# 添加节点（去掉 PrepareChapterNode，因为外层已准备好）
builder.add_node(GenerateContentNode()) \
.add_node(SaveToWordNode())
.add_node(SaveToWordNode()) \
.add_node(CleanupMarkdownInWordNode())
# 设置入口
builder.set_entry("generate_content")
# 配置流程:线性流程
builder.add_edge("generate_content", "save_to_word")
builder.add_edge("save_to_word", "END")
builder.add_edge("save_to_word", "cleanup_markdown_in_word")
builder.add_edge("cleanup_markdown_in_word", "END")
return builder

View File

@ -7,6 +7,7 @@ from .init_config import InitConfigNode
from .prepare_chapter import PrepareChapterNode
from .generate_content import GenerateContentNode
from .save_to_word import SaveToWordNode
from .cleanup_markdown_in_word import CleanupMarkdownInWordNode
from .collect_results import CollectResultsNode
__all__ = [
@ -14,5 +15,6 @@ __all__ = [
"PrepareChapterNode",
"GenerateContentNode",
"SaveToWordNode",
"CleanupMarkdownInWordNode",
"CollectResultsNode",
]

View File

@ -0,0 +1,102 @@
"""Word 填充后 Markdown 清理节点。
在章节内容写入 Word 后，若检测到生成内容包含 Markdown 语法，则遍历文档文本并剥离 Markdown 符号。
"""
from __future__ import annotations
import logging
import re
from pathlib import Path
from typing import Any, Dict
from docx import Document
from ..base import BaseNode, NodeContext
from ...utils.markdown_cleanup import clean_markdown_text, contains_markdown
logger = logging.getLogger(__name__)
_PLACEHOLDER_RE = re.compile(r"\{\{.*?\}\}")
class CleanupMarkdownInWordNode(BaseNode):
    """Workflow node that strips leftover Markdown syntax from a Word file.

    The document is only opened when ``state["word_file"]`` is set and
    ``contains_markdown`` flags ``state["last_generated_content"]``.
    """

    @property
    def name(self) -> str:
        """Return the identifier of this node."""
        return "cleanup_markdown_in_word"

    @property
    def description(self) -> str:
        """Return a short description of this node."""
        return "填充后清理Word中的Markdown格式符号"

    def execute(self, state: Dict[str, Any], context: NodeContext) -> Dict[str, Any]:
        """Clean the Word file referenced by *state* when cleanup is needed.

        Args:
            state: Workflow state. Reads ``word_file`` and
                ``last_generated_content``; when a cleanup runs, its counters
                are stored under ``markdown_cleanup_stats``.
            context: Node context; not used by this node.

        Returns:
            The result of ``self._update_state(state)``.

        Raises:
            FileNotFoundError: If cleanup is needed but ``word_file`` does not
                exist on disk.
        """
        word_file = state.get("word_file")
        needs_cleanup = bool(word_file) and contains_markdown(
            state.get("last_generated_content")
        )
        if needs_cleanup:
            word_path = Path(str(word_file))
            if not word_path.exists():
                raise FileNotFoundError(f"Word文档不存在: {word_file}")
            state["markdown_cleanup_stats"] = self._cleanup_document(word_path)
        return self._update_state(state)

    def _cleanup_document(self, word_path: Path) -> Dict[str, int]:
        """Strip Markdown from every run in body paragraphs and table cells.

        Runs whose text contains a ``{{...}}`` placeholder are left untouched.
        The file is written back only when at least one run was modified.

        Args:
            word_path: Location of the ``.docx`` file to clean in place.

        Returns:
            Counters for changed runs, scanned runs, scanned paragraphs and
            scanned table cells.
        """
        document = Document(str(word_path))
        stats = {
            "changed_runs": 0,
            "scanned_runs": 0,
            "scanned_paragraphs": 0,
            "scanned_table_cells": 0,
        }

        def clean_paragraphs(paragraphs) -> None:
            # Shared by body paragraphs and table-cell paragraphs.
            for paragraph in paragraphs:
                stats["scanned_paragraphs"] += 1
                for run in paragraph.runs:
                    stats["scanned_runs"] += 1
                    original = run.text
                    # Skip empty runs and runs that still carry a placeholder.
                    if not original or _PLACEHOLDER_RE.search(original):
                        continue
                    stripped = clean_markdown_text(original)
                    if stripped != original:
                        run.text = stripped
                        stats["changed_runs"] += 1

        clean_paragraphs(document.paragraphs)
        for table in document.tables:
            for row in table.rows:
                for cell in row.cells:
                    stats["scanned_table_cells"] += 1
                    clean_paragraphs(cell.paragraphs)

        if stats["changed_runs"]:
            document.save(str(word_path))
            logger.info(
                "Markdown清理完成: %s (changed_runs=%s, scanned_runs=%s)",
                word_path,
                stats["changed_runs"],
                stats["scanned_runs"],
            )
        else:
            logger.debug(
                "Markdown清理无需修改: %s (scanned_runs=%s)",
                word_path,
                stats["scanned_runs"],
            )
        return stats

View File

@ -0,0 +1,77 @@
"""Markdown 清理工具。
用于将生成文本中的 Markdown 语法剥离为纯文本，避免残留符号写入 Word。
"""
from __future__ import annotations
import re
from typing import Any
# Horizontal whitespace only. Under re.MULTILINE a leading ``\s*`` can also
# match newlines, so ``^\s*-`` would swallow the blank line in front of a
# list item when the marker is removed.
_LINE_INDENT = r"[^\S\r\n]*"

_FENCED_CODE_BLOCK_RE = re.compile(r"```[\w+-]*\n([\s\S]*?)```", re.MULTILINE)
_INLINE_CODE_RE = re.compile(r"`([^`\n]+)`")
_IMAGE_RE = re.compile(r"!\[([^\]]*)\]\([^\)]+\)")
_LINK_RE = re.compile(r"\[([^\]]+)\]\([^\)]+\)")
_HEADING_LINE_RE = re.compile(r"^" + _LINE_INDENT + r"#{1,6}\s+", re.MULTILINE)
_BLOCKQUOTE_RE = re.compile(r"^" + _LINE_INDENT + r">\s+", re.MULTILINE)
_UNORDERED_LIST_RE = re.compile(r"^" + _LINE_INDENT + r"[-*+•]\s+", re.MULTILINE)
# An ordered-list marker needs a "." or ")" delimiter. A bare number followed
# by a space ("2023 年", "1 技术方案") is ordinary text and must not be stripped.
_ORDERED_LIST_RE = re.compile(r"^" + _LINE_INDENT + r"\d+[.)]\s+", re.MULTILINE)
_BOLD_RE = re.compile(r"\*\*([^*\n]+)\*\*")
_ITALIC_RE = re.compile(r"(?<!\*)\*([^*\n]+)\*(?!\*)")
_STRIKETHROUGH_RE = re.compile(r"~~([^~\n]+)~~")

# Substrings whose mere presence marks a string as Markdown.
_INLINE_MARKERS = ("**", "~~", "`")

# Line-anchored constructs; detection and cleaning share these patterns so
# that every string flagged by detection is actually handled by cleaning.
_LINE_MARKER_RES = (
    _HEADING_LINE_RE,
    _BLOCKQUOTE_RE,
    _UNORDERED_LIST_RE,
    _ORDERED_LIST_RE,
)

# (pattern, replacement) pairs applied in this order by clean_markdown_text.
_CLEANUP_STEPS = (
    (_FENCED_CODE_BLOCK_RE, r"\1"),
    (_INLINE_CODE_RE, r"\1"),
    (_IMAGE_RE, r"\1"),
    (_LINK_RE, r"\1"),
    (_HEADING_LINE_RE, ""),
    (_BLOCKQUOTE_RE, ""),
    (_UNORDERED_LIST_RE, ""),
    (_ORDERED_LIST_RE, ""),
    (_STRIKETHROUGH_RE, r"\1"),
    (_BOLD_RE, r"\1"),
    (_ITALIC_RE, r"\1"),
)


def contains_markdown(value: Any) -> bool:
    """Return True when *value* holds text that looks like Markdown.

    Strings are checked for inline markers (``**``, ``~~``, backticks), a
    ``[...](...)`` link shape, and line-leading heading, blockquote or list
    markers. Dicts are searched through their values; lists, tuples and sets
    through their items. ``None`` and any other type yield False.

    Single-asterisk emphasis is not a detection trigger on its own.
    """
    if isinstance(value, str):
        if any(marker in value for marker in _INLINE_MARKERS):
            return True
        if "[" in value and "](" in value and ")" in value:
            return True
        return any(pattern.search(value) for pattern in _LINE_MARKER_RES)
    if isinstance(value, dict):
        return any(contains_markdown(item) for item in value.values())
    if isinstance(value, (list, tuple, set)):
        return any(contains_markdown(item) for item in value)
    return False


def clean_markdown_text(text: str) -> str:
    """Strip Markdown syntax from *text*, keeping the visible content.

    Text that is empty or not flagged by ``contains_markdown`` is returned
    unchanged. Otherwise code fences, inline code, images, links, heading,
    blockquote and list markers, strikethrough, bold and italic markup are
    removed in that order.

    Args:
        text: The text to clean.

    Returns:
        The text with Markdown markup removed.
    """
    if not text or not contains_markdown(text):
        return text
    cleaned = text
    for pattern, replacement in _CLEANUP_STEPS:
        cleaned = pattern.sub(replacement, cleaned)
    return cleaned

View File

@ -8,6 +8,8 @@ import pytest
from docx import Document
from bidmaster.nodes.content.save_to_word import SaveToWordNode
from bidmaster.nodes.content.cleanup_markdown_in_word import CleanupMarkdownInWordNode
from bidmaster.nodes.base import NodeContext
from bidmaster.tools.word import WordProcessor
@ -291,4 +293,78 @@ def test_fill_with_structured_table_block(word_processor: WordProcessor) -> None
assert len(doc.tables) == 1
table = doc.tables[0]
assert table.cell(0, 0).text == "名称"
assert table.cell(1, 0).text == "A型设备"
assert table.cell(1, 0).text == "A型设备"
def test_cleanup_node_preserves_placeholders() -> None:
    """A ``{{...}}`` placeholder paragraph must survive the cleanup node."""
    placeholder = "{{chapter_1_content}}"
    with TemporaryDirectory() as workdir:
        target = Path(workdir) / "placeholder.docx"
        template = Document()
        template.add_paragraph(placeholder)
        template.save(target)

        # Markdown in the generated content is what makes the node open the file.
        state = {"word_file": str(target), "last_generated_content": "**触发清理**"}
        CleanupMarkdownInWordNode().execute(state, NodeContext())

        paragraph_texts = [para.text for para in Document(target).paragraphs]
        assert any(placeholder in text for text in paragraph_texts)
def test_cleanup_node_removes_markdown_from_structured_blocks(word_processor: WordProcessor) -> None:
    """Markdown in filled paragraphs and table cells is stripped by the node."""
    markdown_blocks = [
        {"type": "paragraph", "text": "## 子标题"},
        {
            "type": "paragraph",
            "text": "这是 **加粗**、`代码` 和 [链接](http://example.com) 以及 ~~删除线~~",
        },
        {
            "type": "table",
            "headers": ["**名称**", "`数量`"],
            "rows": [["**A**", "`1`"], ["B", "2"]],
        },
    ]
    content = {"blocks": markdown_blocks}

    with TemporaryDirectory() as workdir:
        doc_path = Path(workdir) / "cleanup_structured.docx"
        _create_document(doc_path, [("1 技术方案", 1)])
        chapter_meta = {
            "id": "chapter_1",
            "title": "技术方案",
            "level": 1,
            "placeholder": "{{chapter_1_content}}",
            "normalized_title": word_processor._normalize_heading_text("技术方案"),
            "heading_number": "1",
            "order_index": 1,
        }
        word_processor.fill_chapter_content(doc_path, chapter_meta, content)

        state = {"word_file": str(doc_path), "last_generated_content": content}
        CleanupMarkdownInWordNode().execute(state, NodeContext())

        cleaned_doc = Document(doc_path)
        paragraph_texts = [para.text for para in cleaned_doc.paragraphs]
        body = "\n".join(paragraph_texts)

        # No Markdown marker may remain in the body paragraphs.
        for marker in ("##", "**", "`", "](", "~~"):
            assert marker not in body
        assert any(text.strip() == "子标题" for text in paragraph_texts)

        assert cleaned_doc.tables
        first_table = cleaned_doc.tables[0]
        expected_cells = {
            (0, 0): "名称",
            (0, 1): "数量",
            (1, 0): "A",
            (1, 1): "1",
        }
        for (row_idx, col_idx), expected in expected_cells.items():
            assert first_table.cell(row_idx, col_idx).text.strip() == expected