feat: Enhance SerialBatchExecutor with pre-batch cleanup and task execution improvements

- Added pre-batch cleanup functionality to SerialBatchExecutor, allowing for cleanup tasks before processing items.
- Introduced new task execution phases and improved error handling for task submissions.
- Implemented inter-step delays and between-items delays for better task management.
- Updated logging to capture detailed events during batch processing.
- Enhanced configuration options for plugins in software_config.yaml to support new features.
- Added tests for pre-batch cleanup and auto-close scenarios to ensure robust handling of edge cases.
- Created a PowerShell script for automated callback handling from Revit.
This commit is contained in:
sladro 2026-03-03 16:13:19 +08:00
parent 5cf541aa21
commit 08623bf4d6
10 changed files with 1218 additions and 58 deletions

View File

@ -152,6 +152,13 @@ async def handle_client_message(message: dict, client_id: str, user_id: str):
from app.models.operation_log import ActionType, OperationStatus
message_type = message.get("type")
logger.info(
"WS recv: client_id=%s user_id=%s type=%s keys=%s",
client_id,
user_id,
message_type,
sorted(list(message.keys())),
)
if message_type == WSMessageType.PING:
# 心跳响应
@ -912,6 +919,13 @@ async def handle_client_message(message: dict, client_id: str, user_id: str):
items = message.get("items")
batch_name = message.get("batch_name")
metadata = message.get("metadata", {})
logger.info(
"WS submit_batch_tasks: client_id=%s user_id=%s batch_name=%s item_count=%s",
client_id,
user_id,
batch_name,
len(items) if isinstance(items, list) else "invalid",
)
if not items or not isinstance(items, list):
await websocket_manager.send_personal_message({
@ -946,6 +960,7 @@ async def handle_client_message(message: dict, client_id: str, user_id: str):
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
logger.exception("WS submit_batch_tasks failed: client_id=%s error=%s", client_id, e)
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"failed to submit batch tasks: {str(e)}",
@ -1167,6 +1182,7 @@ async def handle_client_message(message: dict, client_id: str, user_id: str):
else:
# 未知消息类型
logger.warning("WS unknown message type: client_id=%s type=%s", client_id, message_type)
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"未知的消息类型: {message_type}",

View File

@ -1,6 +1,7 @@
from __future__ import annotations
from __future__ import annotations
import asyncio
import logging
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Set
@ -21,6 +22,9 @@ from app.models.cad_batch import (
from app.models.operation_log import OperationStatus
logger = logging.getLogger(__name__)
TERMINAL_BATCH_STATUSES: Set[BatchStatus] = {
BatchStatus.COMPLETED,
BatchStatus.COMPLETED_WITH_ERRORS,
@ -178,10 +182,127 @@ class CadBatchManager:
def get_task_completion_mode(self, software_id: str, task_type: str) -> str:
    """Return how completion of (software_id, task_type) is detected.

    Reads ``completion_mode`` from the plugin task config; any value other
    than the three supported modes falls back to the safe default
    ``"callback"``. Fix: removed the stale pre-change duplicate of the
    membership test left over from the diff rendering.
    """
    task_cfg = software_config.get_plugin_task_config(software_id, task_type) or {}
    mode = task_cfg.get("completion_mode", "callback")
    # Guard against typos/unknown values in config.
    if mode not in {"callback", "sync", "submit_only"}:
        return "callback"
    return mode
def should_auto_open_model(self, software_id: str, task_type: str) -> bool:
    """Decide whether an open_model step should run before *task_type*.

    Controlled by ``auto_open_model_before_tasks`` plus optional
    exclude/include task-type lists in the plugin config.
    """
    cfg = software_config.get_plugin_config(software_id) or {}
    if not cfg.get("auto_open_model_before_tasks", False):
        return False
    excluded = cfg.get("auto_open_exclude_task_types", ["open_model", "close_model"])
    if isinstance(excluded, list) and task_type in excluded:
        return False
    included = cfg.get("auto_open_include_task_types")
    # A non-empty include list acts as a whitelist.
    if isinstance(included, list) and included:
        return task_type in included
    return True
def should_auto_close_model(self, software_id: str, task_type: str) -> bool:
    """Decide whether a close_model step should run after *task_type*.

    Mirrors should_auto_open_model but keyed on the ``auto_close_*``
    plugin-config options.
    """
    cfg = software_config.get_plugin_config(software_id) or {}
    if not cfg.get("auto_close_model_after_tasks", False):
        return False
    excluded = cfg.get("auto_close_exclude_task_types", ["open_model", "close_model"])
    if isinstance(excluded, list) and task_type in excluded:
        return False
    included = cfg.get("auto_close_include_task_types")
    # A non-empty include list acts as a whitelist.
    if isinstance(included, list) and included:
        return task_type in included
    return True
def get_inter_step_delay_sec(self, software_id: str) -> float:
    """Return the configured delay (seconds) between steps of one item.

    Invalid or negative config values degrade to ``0.0``.
    Fix: the parse-failure path now returns float ``0.0`` to match the
    declared return type (previously returned int ``0``).
    """
    plugin_cfg = software_config.get_plugin_config(software_id) or {}
    value = plugin_cfg.get("inter_step_delay_sec", 0)
    try:
        delay = float(value)
    except (TypeError, ValueError):
        return 0.0
    return max(0.0, delay)
def get_between_items_delay_sec(self, software_id: str) -> float:
    """Return the configured delay (seconds) between two queued items.

    Invalid or negative config values degrade to ``0.0``.
    Fix: the parse-failure path now returns float ``0.0`` to match the
    declared return type (previously returned int ``0``).
    """
    plugin_cfg = software_config.get_plugin_config(software_id) or {}
    value = plugin_cfg.get("between_items_delay_sec", 0)
    try:
        delay = float(value)
    except (TypeError, ValueError):
        return 0.0
    return max(0.0, delay)
def should_run_pre_batch_cleanup(self, software_id: str) -> bool:
    """Whether a one-time cleanup task must run before a batch's first item."""
    cfg = software_config.get_plugin_config(software_id) or {}
    return bool(cfg.get("pre_batch_cleanup_enabled", False))
def get_pre_batch_cleanup_task_type(self, software_id: str) -> Optional[str]:
    """Configured cleanup task type, or None when unset, non-string, or blank."""
    cfg = software_config.get_plugin_config(software_id) or {}
    raw = cfg.get("pre_batch_cleanup_task_type")
    if not isinstance(raw, str):
        return None
    stripped = raw.strip()
    return stripped or None
def get_pre_batch_cleanup_task_params(self, software_id: str) -> Dict[str, object]:
    """Extra params for the cleanup task; {} when missing or not a dict."""
    cfg = software_config.get_plugin_config(software_id) or {}
    raw = cfg.get("pre_batch_cleanup_task_params")
    if isinstance(raw, dict):
        return raw
    return {}
def get_pre_batch_cleanup_ignore_error_markers(self, software_id: str) -> List[str]:
    """Return configured substrings that mark a cleanup error as ignorable.

    Non-list config, non-string entries, and blank entries are dropped.
    """
    cfg = software_config.get_plugin_config(software_id) or {}
    raw = cfg.get("pre_batch_cleanup_ignore_error_markers")
    if not isinstance(raw, list):
        return []
    stripped = (m.strip() for m in raw if isinstance(m, str))
    return [m for m in stripped if m]
def get_close_model_ignore_error_markers(self, software_id: str) -> List[str]:
    """Return configured substrings that mark a close_model error as ignorable.

    Non-list config, non-string entries, and blank entries are dropped.
    """
    cfg = software_config.get_plugin_config(software_id) or {}
    raw = cfg.get("close_model_ignore_error_markers")
    if not isinstance(raw, list):
        return []
    stripped = (m.strip() for m in raw if isinstance(m, str))
    return [m for m in stripped if m]
def should_check_open_before_pre_batch_cleanup(self, software_id: str) -> bool:
    """Whether to probe for an open model before running pre-batch cleanup."""
    cfg = software_config.get_plugin_config(software_id) or {}
    return bool(cfg.get("pre_batch_check_open_enabled", False))
def get_pre_batch_check_task_type(self, software_id: str) -> Optional[str]:
    """Task type used for the open-model probe, or None when unset/blank."""
    cfg = software_config.get_plugin_config(software_id) or {}
    raw = cfg.get("pre_batch_check_task_type")
    if not isinstance(raw, str):
        return None
    stripped = raw.strip()
    return stripped or None
async def log_execution_event(
    self,
    operation: str,
    details: str,
    status: OperationStatus = OperationStatus.SUCCESS,
    extra_data: Optional[dict] = None,
):
    """Public logging hook for executors; forwards to _log_system_operation."""
    payload = extra_data or {}
    await self._log_system_operation(
        operation=operation,
        details=details,
        status=status,
        extra_data=payload,
    )
def validate_callback_token(self, software_id: str, token: Optional[str]) -> bool:
plugin_cfg = software_config.get_plugin_config(software_id) or {}
expected = plugin_cfg.get("callback_token")
@ -375,6 +496,13 @@ class CadBatchManager:
)
async def _log_system_operation(self, operation: str, details: str, status=OperationStatus.SUCCESS, **kwargs):
extra_data = kwargs.get("extra_data")
prefix = "[batch]"
if status == OperationStatus.FAILED:
logger.warning("%s %s | %s | extra=%s", prefix, operation, details, extra_data or {})
else:
logger.info("%s %s | %s | extra=%s", prefix, operation, details, extra_data or {})
if self._log_manager:
await self._log_manager.log_system_operation(
operation=operation,

View File

@ -25,8 +25,8 @@ class PluginHttpClient:
base_url = plugin_config.get("base_url")
submit_path = plugin_config.get("submit_path", "/api/plugin/tasks")
timeout = int(plugin_config.get("request_timeout_sec", 10))
task_config = self._config_provider.get_plugin_task_config(software_id, task_type) or {}
timeout = int(task_config.get("request_timeout_sec", plugin_config.get("request_timeout_sec", 10)))
submit_path = task_config.get("path", submit_path)
body_mode = task_config.get("body_mode", "passthrough")
@ -43,6 +43,35 @@ class PluginHttpClient:
except Exception as exc:
raise PluginSubmitError(str(exc)) from exc
async def get_task_status(
    self,
    software_id: str,
    task_id: Optional[str] = None,
    status_url: Optional[str] = None,
    timeout_sec: int = 10,
) -> Dict[str, Any]:
    """Fetch a plugin task's status via HTTP GET.

    Resolves the target from ``status_url`` (preferred) or ``task_id``
    against the plugin's base_url. Raises PluginSubmitError on missing
    config, missing arguments, or any transport failure.
    """
    plugin_config = self._config_provider.get_plugin_config(software_id)
    if not plugin_config:
        raise PluginSubmitError(f"Plugin config for software '{software_id}' not found")
    base_url = plugin_config.get("base_url")
    if not base_url:
        raise PluginSubmitError(f"Plugin base_url for software '{software_id}' is missing")
    # Prefer an explicit status URL; otherwise derive the path from task_id.
    if status_url:
        relative = str(status_url).lstrip("/")
    elif task_id:
        relative = f"api/task/{task_id}"
    else:
        raise PluginSubmitError("Either task_id or status_url must be provided for polling")
    url = parse.urljoin(base_url.rstrip("/") + "/", relative)
    try:
        # urlopen is blocking; run it off the event loop.
        return await asyncio.to_thread(self._get_json, url, int(timeout_sec))
    except PluginSubmitError:
        raise
    except Exception as exc:
        raise PluginSubmitError(str(exc)) from exc
@staticmethod
def _build_payload(payload: Dict[str, Any], body_mode: str) -> Dict[str, Any]:
task_params = payload.get("task_params", {}) if isinstance(payload.get("task_params"), dict) else {}
@ -98,3 +127,26 @@ class PluginHttpClient:
raise PluginSubmitError(f"Plugin HTTP {exc.code}: {body}") from exc
except error.URLError as exc:
raise PluginSubmitError(f"Plugin request failed: {exc.reason}") from exc
@staticmethod
def _get_json(url: str, timeout: int) -> Dict[str, Any]:
    """Blocking GET returning parsed JSON.

    Empty bodies yield {}; non-JSON bodies are wrapped as {"raw": body}.
    HTTP and transport errors are re-raised as PluginSubmitError.
    """
    req = request.Request(
        url=url,
        headers={"Accept": "application/json"},
        method="GET",
    )
    try:
        with request.urlopen(req, timeout=timeout) as resp:
            body = "" if resp.length == 0 else resp.read().decode("utf-8")
            if not body:
                return {}
            try:
                return json.loads(body)
            except json.JSONDecodeError:
                return {"raw": body}
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="ignore") if exc.fp else ""
        raise PluginSubmitError(f"Plugin HTTP {exc.code}: {body}") from exc
    except error.URLError as exc:
        raise PluginSubmitError(f"Plugin request failed: {exc.reason}") from exc

View File

@ -1,11 +1,12 @@
from __future__ import annotations
from __future__ import annotations
import asyncio
import uuid
from typing import Optional
from typing import Any, Dict, Optional
from app.core.plugin_http_client import PluginSubmitError
from app.models.cad_batch import PluginResultStatus
from app.models.operation_log import OperationStatus
class SerialBatchExecutor:
@ -19,6 +20,7 @@ class SerialBatchExecutor:
self._queue: asyncio.Queue[str] = asyncio.Queue()
self._worker_task: Optional[asyncio.Task] = None
self._running = False
self._pre_batch_cleanup_done: set[tuple[str, str]] = set()
async def start(self):
if self._worker_task and not self._worker_task.done():
@ -37,6 +39,7 @@ class SerialBatchExecutor:
except asyncio.CancelledError:
pass
self._worker_task = None
self._pre_batch_cleanup_done.clear()
async def enqueue(self, item_id: str):
await self._queue.put(item_id)
@ -44,77 +47,797 @@ class SerialBatchExecutor:
async def _worker_loop(self):
    """Serially drain the queue: process one item, then optionally pause.

    Fix: removed stale removed-side diff lines (the old un-assigned
    ``_process_item`` call and the old ``_process_item`` def header) that
    were interleaved into this body by the diff rendering.
    """
    while self._running:
        item_id = await self._queue.get()
        software_id = None
        try:
            software_id = await self._process_item(item_id)
        finally:
            self._queue.task_done()
        # Optional pacing between items, per plugin config.
        if software_id:
            between_items_delay = self._batch_manager.get_between_items_delay_sec(software_id)
            if between_items_delay > 0:
                await self._log_event(
                    operation="batch_between_items_delay",
                    details=f"Delay {between_items_delay}s before next queued item",
                    extra_data={"software_id": software_id, "delay_sec": between_items_delay},
                )
                await asyncio.sleep(between_items_delay)
async def _process_item(self, item_id: str) -> Optional[str]:
    """Execute one batch item end-to-end with retries.

    Per attempt: optional pre-batch cleanup, optional open_model, the main
    task, optional close_model (with ignorable-error handling), then marks
    the item succeeded/retrying/failed. Returns the item's software_id (or
    None when the item is missing/unroutable) so the caller can apply the
    between-items delay.

    Fix: removed the removed-side diff residue (old payload construction,
    old direct submit/sync/callback handling) that the diff rendering had
    interleaved with this post-commit body, which made the text invalid.
    """
    item = await self._batch_manager.get_item(item_id)
    if not item:
        return None
    # Item may already be marked failed during routing.
    if item.software_id is None:
        return None
    for attempt in range(item.max_retries + 1):
        error_message = None
        # Tracks which step failed, for logging and retry diagnostics.
        current_phase = "main_task"
        open_result = None
        main_result = None
        close_result = None
        auto_open = self._batch_manager.should_auto_open_model(item.software_id, item.task_type)
        auto_close = self._batch_manager.should_auto_close_model(item.software_id, item.task_type)
        try:
            current_phase = "pre_batch_cleanup"
            await self._ensure_pre_batch_cleanup(item=item, item_id=item_id, attempt=attempt)
            if auto_open:
                current_phase = "open_model"
                open_result = await self._execute_task_step(
                    item=item,
                    item_id=item_id,
                    attempt=attempt,
                    task_type="open_model",
                    task_params=self._extract_open_task_params(item.task_params),
                    phase=current_phase,
                )
                await self._sleep_inter_step(item, phase="open_model_to_main")
            current_phase = item.task_type
            main_result = await self._execute_task_step(
                item=item,
                item_id=item_id,
                attempt=attempt,
                task_type=item.task_type,
                task_params=item.task_params,
                phase=current_phase,
            )
            if auto_close:
                await self._sleep_inter_step(item, phase="main_to_close_model")
                current_phase = "close_model"
                try:
                    close_result = await self._execute_task_step(
                        item=item,
                        item_id=item_id,
                        attempt=attempt,
                        task_type="close_model",
                        task_params=self._extract_close_task_params(item.task_params),
                        phase=current_phase,
                    )
                except PluginSubmitError as exc:
                    error_message = str(exc)
                    # Configured markers let a failed close be tolerated.
                    if not self._is_ignorable_close_model_error(item.software_id, error_message):
                        raise
                    close_result = {"ignored_error": error_message}
                    await self._log_event(
                        operation="batch_close_model_ignored",
                        details=(
                            f"Close model ignored item={item.id} attempt={attempt}: "
                            f"{error_message}"
                        ),
                        extra_data={
                            "batch_id": item.batch_id,
                            "item_id": item.id,
                            "attempt": attempt,
                            "software_id": item.software_id,
                            "error_message": error_message,
                        },
                    )
            success_result = self._compose_success_result(
                auto_open=auto_open,
                auto_close=auto_close,
                open_result=open_result,
                main_result=main_result,
                close_result=close_result,
            )
            await self._batch_manager.mark_item_succeeded(item_id, success_result)
            await self._log_event(
                operation="batch_item_succeeded",
                details=f"Item succeeded item={item.id} attempt={attempt}",
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "software_id": item.software_id,
                    "auto_open": auto_open,
                    "auto_close": auto_close,
                },
            )
            return item.software_id
        except PluginSubmitError as exc:
            error_message = str(exc)
            await self._log_event(
                operation="batch_step_failed",
                details=(
                    f"Step failed item={item.id} phase={current_phase} attempt={attempt}: "
                    f"{error_message}"
                ),
                status=OperationStatus.FAILED,
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "phase": current_phase,
                    "error_message": error_message,
                },
            )
        except Exception as exc:
            error_message = f"Unexpected execution error: {exc}"
            await self._log_event(
                operation="batch_step_unexpected_error",
                details=(
                    f"Unexpected error item={item.id} phase={current_phase} attempt={attempt}: "
                    f"{exc}"
                ),
                status=OperationStatus.FAILED,
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "phase": current_phase,
                    "error_message": str(exc),
                },
            )
        if attempt < item.max_retries:
            await self._batch_manager.mark_item_retrying(item_id, error_message, attempt + 1)
            backoff = self._batch_manager.get_retry_backoff_sec(item.software_id, attempt)
            await self._log_event(
                operation="batch_retry_scheduled",
                details=(
                    f"Retry scheduled item={item.id} phase={current_phase} next_attempt={attempt + 1} "
                    f"backoff_sec={backoff}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "next_attempt": attempt + 1,
                    "phase": current_phase,
                    "backoff_sec": backoff,
                    "error_message": error_message,
                },
            )
            if backoff > 0:
                await asyncio.sleep(backoff)
            continue
        await self._batch_manager.mark_item_failed(item_id, error_message)
        await self._log_event(
            operation="batch_item_failed_final",
            details=f"Item failed item={item.id} phase={current_phase}: {error_message}",
            status=OperationStatus.FAILED,
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "attempt": attempt,
                "phase": current_phase,
                "error_message": error_message,
            },
        )
        return item.software_id
    return item.software_id
async def _execute_task_step(
    self,
    item,
    item_id: str,
    attempt: int,
    task_type: str,
    task_params: Dict[str, Any],
    phase: str,
) -> Dict[str, Any]:
    """Submit one plugin task step and resolve its completion.

    Dispatches *task_type* for *item*, then completes according to the
    configured completion_mode ("submit_only", "sync", or "callback"),
    with a direct-polling path for Revit shell_execute. Returns the step
    result dict; raises PluginSubmitError on any failure or timeout.
    """
    execution_id = str(uuid.uuid4())
    await self._batch_manager.mark_item_dispatching(item_id, execution_id, attempt)
    completion_mode = self._batch_manager.get_task_completion_mode(item.software_id, task_type)
    # open_model must confirm completion before later steps may run.
    if phase == "open_model" and completion_mode == "submit_only":
        raise PluginSubmitError(
            "open_model completion_mode=submit_only is not allowed; use sync or callback"
        )
    callback_url = self._batch_manager.get_callback_endpoint_url()
    payload = {
        "execution_id": execution_id,
        "batch_id": item.batch_id,
        "item_id": item.id,
        "model_path": item.model_path,
        "task_type": task_type,
        "task_params": task_params,
        "callback_url": callback_url,
        "attempt": attempt,
    }
    await self._log_event(
        operation="batch_step_start",
        details=(
            f"Step start item={item.id} phase={phase} attempt={attempt} "
            f"task_type={task_type} completion_mode={completion_mode}"
        ),
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
            "software_id": item.software_id,
            "task_type": task_type,
            "model_path": item.model_path,
            "completion_mode": completion_mode,
        },
    )
    plugin_response = await self._plugin_client.submit_task(item.software_id, task_type, payload)
    await self._log_event(
        operation="batch_step_submitted",
        details=f"Step submitted item={item.id} phase={phase} execution_id={execution_id}",
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
            "completion_mode": completion_mode,
            "plugin_response": plugin_response,
        },
    )
    # Revit shell_execute is async by taskId/statusUrl. Poll plugin task status directly.
    if item.software_id == "revit" and task_type == "shell_execute":
        polled_result = await self._poll_revit_shell_execute_result(
            item=item,
            attempt=attempt,
            phase=phase,
            plugin_response=plugin_response if isinstance(plugin_response, dict) else {},
            execution_id=execution_id,
        )
        # None means the response carried no task handle; fall through to
        # the regular completion_mode handling below.
        if polled_result is not None:
            return polled_result
    if completion_mode == "submit_only":
        # Fire-and-forget: the submission response itself is the result.
        submit_result = {"completion_mode": "submit_only"}
        if isinstance(plugin_response, dict):
            submit_result.update(plugin_response)
        else:
            submit_result["raw_response"] = str(plugin_response)
        await self._log_event(
            operation="batch_step_submit_only_accepted",
            details=f"Submit-only accepted item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
            },
        )
        return submit_result
    if completion_mode == "sync":
        # open_model responses are judged by a dedicated heuristic.
        if phase == "open_model":
            success_flag = self._is_successful_open_response(plugin_response)
        else:
            success_flag = self._is_successful_sync_response(plugin_response)
        if not success_flag:
            error_message = "Plugin sync task failed"
            if isinstance(plugin_response, dict):
                error_message = plugin_response.get("error") or error_message
            raise PluginSubmitError(f"{phase} sync failed: {error_message}")
        await self._log_event(
            operation="batch_step_sync_succeeded",
            details=f"Sync succeeded item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
            },
        )
        return plugin_response if isinstance(plugin_response, dict) else {"raw_response": str(plugin_response)}
    # Remaining mode is "callback": wait for the plugin to call back.
    await self._batch_manager.mark_item_waiting_callback(item_id, execution_id, plugin_response)
    await self._log_event(
        operation="batch_step_waiting_callback",
        details=f"Waiting callback item={item.id} phase={phase} execution_id={execution_id}",
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
        },
    )
    timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id)
    try:
        callback_payload = await self._callback_registry.wait_for_callback(execution_id, timeout_sec)
    except asyncio.TimeoutError as exc:
        raise PluginSubmitError(f"{phase} callback timeout for execution_id={execution_id}") from exc
    if callback_payload.status != PluginResultStatus.SUCCESS:
        error_message = callback_payload.error_message or "Plugin returned failed status"
        raise PluginSubmitError(f"{phase} callback failed: {error_message}")
    await self._log_event(
        operation="batch_step_callback_succeeded",
        details=f"Callback succeeded item={item.id} phase={phase} execution_id={execution_id}",
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
        },
    )
    return callback_payload.result if isinstance(callback_payload.result, dict) else {"result": callback_payload.result}
async def _poll_revit_shell_execute_result(
    self,
    item,
    attempt: int,
    phase: str,
    plugin_response: Dict[str, Any],
    execution_id: str,
) -> Optional[Dict[str, Any]]:
    """Poll the plugin for an async shell_execute result.

    Returns the final status payload on success, None when polling is not
    applicable (client lacks get_task_status, or the response carries no
    taskId/statusUrl), and raises PluginSubmitError on failure/timeout.

    Fix: use ``asyncio.get_running_loop()`` for the monotonic clock —
    ``asyncio.get_event_loop()`` inside a coroutine is deprecated.
    """
    if not hasattr(self._plugin_client, "get_task_status"):
        return None
    data = plugin_response.get("data")
    if not isinstance(data, dict):
        data = {}
    # Accept both camelCase and snake_case handles from the plugin.
    task_id = data.get("taskId") or data.get("task_id")
    status_url = data.get("statusUrl") or data.get("status_url")
    if not task_id and not status_url:
        return None
    timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id)
    poll_interval_sec = 2.0
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_sec
    last_status = None
    await self._log_event(
        operation="batch_step_polling_started",
        details=(
            f"Polling started item={item.id} phase={phase} execution_id={execution_id} "
            f"task_id={task_id or 'unknown'} timeout_sec={timeout_sec}"
        ),
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
            "task_id": task_id,
            "status_url": status_url,
            "poll_interval_sec": poll_interval_sec,
            "timeout_sec": timeout_sec,
        },
    )
    while loop.time() < deadline:
        status_response = await self._plugin_client.get_task_status(
            software_id=item.software_id,
            task_id=str(task_id) if task_id else None,
            status_url=str(status_url) if status_url else None,
        )
        status_data = status_response.get("data") if isinstance(status_response.get("data"), dict) else {}
        status_value = (
            status_data.get("status")
            or status_response.get("status")
            or ""
        )
        lowered = str(status_value).strip().lower()
        # Log only on status transitions to keep the log volume down.
        if lowered and lowered != last_status:
            last_status = lowered
            await self._log_event(
                operation="batch_step_polling_status",
                details=(
                    f"Polling status item={item.id} phase={phase} execution_id={execution_id} "
                    f"status={status_value}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "execution_id": execution_id,
                    "attempt": attempt,
                    "phase": phase,
                    "task_id": task_id,
                    "status": status_value,
                },
            )
        if lowered in {"completed", "success", "succeeded", "done"}:
            await self._log_event(
                operation="batch_step_polling_succeeded",
                details=(
                    f"Polling succeeded item={item.id} phase={phase} execution_id={execution_id} "
                    f"task_id={task_id}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "execution_id": execution_id,
                    "attempt": attempt,
                    "phase": phase,
                    "task_id": task_id,
                },
            )
            return status_response if isinstance(status_response, dict) else {"raw_response": str(status_response)}
        if lowered in {"failed", "cancelled", "canceled", "error"}:
            error_message = (
                status_data.get("errorMessage")
                or status_data.get("error_message")
                or status_data.get("error")
                or status_response.get("message")
                or "Plugin task failed"
            )
            raise PluginSubmitError(f"{phase} polling failed: {error_message}")
        await asyncio.sleep(poll_interval_sec)
    raise PluginSubmitError(
        f"{phase} polling timeout for task_id={task_id or 'unknown'} execution_id={execution_id}"
    )
async def _ensure_pre_batch_cleanup(self, item, item_id: str, attempt: int):
    """Run the configured pre-batch cleanup once per (batch_id, software_id).

    Optionally probes first whether a model is actually open (skipping the
    cleanup when not), then executes the cleanup task, tolerating errors
    whose text matches the configured ignore markers. Raises
    PluginSubmitError when required config is missing or the cleanup fails
    non-ignorably.
    """
    # De-dup key: cleanup runs at most once per batch per plugin.
    key = (item.batch_id, item.software_id)
    if key in self._pre_batch_cleanup_done:
        return
    if not self._batch_manager.should_run_pre_batch_cleanup(item.software_id):
        # Feature disabled — remember so config isn't re-queried per item.
        self._pre_batch_cleanup_done.add(key)
        return
    await self._log_event(
        operation="batch_precheck_start",
        details=f"Precheck start batch={item.batch_id} software={item.software_id}",
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "software_id": item.software_id,
            "attempt": attempt,
        },
    )
    # Default: assume something may be open and needs closing.
    should_close = True
    if self._batch_manager.should_check_open_before_pre_batch_cleanup(item.software_id):
        check_task_type = self._batch_manager.get_pre_batch_check_task_type(item.software_id)
        if not check_task_type:
            raise PluginSubmitError(
                f"pre_batch_check_open_enabled is true but pre_batch_check_task_type is missing for {item.software_id}"
            )
        check_result = await self._execute_task_step(
            item=item,
            item_id=item_id,
            attempt=attempt,
            task_type=check_task_type,
            task_params={},
            phase="pre_batch_check_open",
        )
        should_close = self._is_model_open_from_check_result(check_result)
        await self._log_event(
            operation="batch_precheck_state_checked",
            details=(
                f"Precheck state checked batch={item.batch_id} software={item.software_id} "
                f"should_close={should_close}"
            ),
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "software_id": item.software_id,
                "attempt": attempt,
                "should_close": should_close,
                "check_result": check_result,
            },
        )
    if not should_close:
        # Nothing is open — cleanup unnecessary for this batch.
        self._pre_batch_cleanup_done.add(key)
        await self._log_event(
            operation="batch_precheck_skip_cleanup",
            details=f"Precheck skip cleanup batch={item.batch_id} software={item.software_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "software_id": item.software_id,
                "attempt": attempt,
            },
        )
        return
    cleanup_task_type = self._batch_manager.get_pre_batch_cleanup_task_type(item.software_id)
    if not cleanup_task_type:
        raise PluginSubmitError(
            f"pre_batch_cleanup_enabled is true but pre_batch_cleanup_task_type is missing for {item.software_id}"
        )
    cleanup_params = self._batch_manager.get_pre_batch_cleanup_task_params(item.software_id)
    try:
        await self._execute_task_step(
            item=item,
            item_id=item_id,
            attempt=attempt,
            task_type=cleanup_task_type,
            task_params=cleanup_params,
            phase=f"pre_batch_cleanup_{cleanup_task_type}",
        )
        self._pre_batch_cleanup_done.add(key)
        await self._log_event(
            operation="batch_precheck_cleanup_succeeded",
            details=(
                f"Precheck cleanup succeeded batch={item.batch_id} software={item.software_id} "
                f"task_type={cleanup_task_type}"
            ),
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "software_id": item.software_id,
                "attempt": attempt,
                "cleanup_task_type": cleanup_task_type,
            },
        )
    except PluginSubmitError as exc:
        error_message = str(exc)
        if not self._is_ignorable_pre_batch_cleanup_error(item.software_id, error_message):
            raise
        # Error text matches a configured ignore marker — treat cleanup as done.
        self._pre_batch_cleanup_done.add(key)
        await self._log_event(
            operation="batch_precheck_cleanup_ignored",
            details=(
                f"Precheck cleanup ignored batch={item.batch_id} software={item.software_id} "
                f"task_type={cleanup_task_type}: {error_message}"
            ),
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "software_id": item.software_id,
                "attempt": attempt,
                "cleanup_task_type": cleanup_task_type,
                "error_message": error_message,
            },
        )
async def _sleep_inter_step(self, item, phase: str):
    """Pause between two steps of one item, per plugin config; no-op when 0."""
    delay = self._batch_manager.get_inter_step_delay_sec(item.software_id)
    if delay > 0:
        await self._log_event(
            operation="batch_inter_step_delay",
            details=f"Delay {delay}s between steps ({phase})",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "software_id": item.software_id,
                "phase": phase,
                "delay_sec": delay,
            },
        )
        await asyncio.sleep(delay)
@staticmethod
def _compose_success_result(
auto_open: bool,
auto_close: bool,
open_result: Optional[Dict[str, Any]],
main_result: Optional[Dict[str, Any]],
close_result: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
if not auto_open and not auto_close:
return main_result or {}
result: Dict[str, Any] = {"main_result": main_result or {}}
if auto_open:
result["open_model_result"] = open_result or {}
if auto_close:
result["close_model_result"] = close_result or {}
return result
@staticmethod
def _extract_open_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(task_params, dict):
return {}
open_params = task_params.get("open_model_params")
if isinstance(open_params, dict):
return open_params
open_params = task_params.get("open_params")
if isinstance(open_params, dict):
return open_params
return {}
@staticmethod
def _extract_close_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(task_params, dict):
return {}
close_params = task_params.get("close_model_params")
if isinstance(close_params, dict):
return close_params
close_params = task_params.get("close_params")
if isinstance(close_params, dict):
return close_params
return {}
def _is_ignorable_pre_batch_cleanup_error(self, software_id: str, error_message: str) -> bool:
if not error_message:
return False
markers = self._batch_manager.get_pre_batch_cleanup_ignore_error_markers(software_id)
if not markers:
return False
lowered = error_message.lower()
for marker in markers:
if marker.lower() in lowered:
return True
return False
def _is_ignorable_close_model_error(self, software_id: str, error_message: str) -> bool:
if not error_message:
return False
markers = self._batch_manager.get_close_model_ignore_error_markers(software_id)
if not markers:
return False
lowered = error_message.lower()
for marker in markers:
if marker.lower() in lowered:
return True
return False
@staticmethod
def _is_model_open_from_check_result(check_result: Dict[str, Any]) -> bool:
if not isinstance(check_result, dict):
return True
explicit_flags = [
"has_open_model",
"hasOpenModel",
"is_model_open",
"isModelOpen",
"opened",
"isOpened",
"project_opened",
"projectOpened",
]
for key in explicit_flags:
if key in check_result:
return bool(check_result.get(key))
non_empty_markers = [
"model_path",
"modelPath",
"file_path",
"filePath",
"current_model",
"currentModel",
"project_name",
"projectName",
"current_project",
"currentProject",
]
for key in non_empty_markers:
value = check_result.get(key)
if isinstance(value, str) and value.strip():
return True
if isinstance(value, dict) and value:
return True
# Unknown schema: default to close for safety.
return True
async def _log_event(
    self,
    operation: str,
    details: str,
    status: OperationStatus = OperationStatus.SUCCESS,
    extra_data: Optional[dict] = None,
):
    """Record an execution event through the batch manager, best-effort.

    Does nothing when the manager exposes no callable
    ``log_execution_event``, and swallows every failure raised while
    logging so observability problems never interrupt task execution.
    """
    emit = getattr(self._batch_manager, "log_execution_event", None)
    if not callable(emit):
        return
    try:
        await emit(
            operation=operation,
            details=details,
            status=status,
            extra_data=extra_data or {},
        )
    except Exception:
        # Logging must not break task execution.
        return
@staticmethod
def _is_successful_sync_response(plugin_response: dict) -> bool:
if not isinstance(plugin_response, dict):
return False
if plugin_response.get("error"):
return False
code_value = plugin_response.get("code")
try:
if code_value is not None and int(code_value) == 202:
# 202 usually means accepted/queued, not completed.
return False
except (TypeError, ValueError):
pass
status_value = plugin_response.get("status")
if isinstance(status_value, str) and status_value.lower() in {"accepted", "queued", "pending"}:
return False
if "success" in plugin_response:
return bool(plugin_response.get("success"))
if plugin_response.get("accepted") is True:
return False
if isinstance(status_value, str) and status_value.lower() in {"success", "ok", "completed", "done"}:
return True
try:
if code_value is not None and int(code_value) in {0, 200, 201}:
return True
except (TypeError, ValueError):
pass
if plugin_response.get("task_id") or plugin_response.get("taskId"):
return False
return False
@staticmethod
def _is_successful_open_response(plugin_response: dict) -> bool:
if not isinstance(plugin_response, dict):
return False
if plugin_response.get("error"):
return False
if "success" in plugin_response:
return bool(plugin_response.get("success"))
code_value = plugin_response.get("code")
try:
if code_value is not None and int(code_value) in {0, 200}:
return True
except (TypeError, ValueError):
pass
status_value = plugin_response.get("status")
if isinstance(status_value, str) and status_value.lower() in {"success", "ok"}:
return True
return False

View File

@ -1,4 +1,6 @@
"""FastAPI entrypoint."""
"""FastAPI entrypoint."""
import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
@ -10,6 +12,23 @@ from app.core.software_manager import software_manager
from app.core.websocket_manager import websocket_manager
def _configure_app_logging():
    """Attach a console handler to the ``app`` logger for runtime debugging.

    Idempotent: when any handler is already registered the logger is left
    untouched, so repeated imports do not duplicate console output.
    """
    app_logger = logging.getLogger("app")
    if app_logger.handlers:
        return
    console = logging.StreamHandler()
    console.setFormatter(
        logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
    )
    app_logger.addHandler(console)
    app_logger.setLevel(logging.INFO)
    # Keep app.* records from also reaching the root logger (double printing).
    app_logger.propagate = False


_configure_app_logging()
app = FastAPI(
title="CadHubManage API",
description="Backend service for managing CAD software and batch processing tasks.",

View File

@ -1,4 +1,4 @@
# CAD 批处理串行执行框架(设计与开发进度)
# CAD 批处理串行执行框架(设计与开发进度)
## 1. 文档目的
本文件用于同步两类信息:
@ -70,6 +70,21 @@
- `plugins.{software_id}.max_retries`
- `plugins.{software_id}.retry_backoff_sec`
- `plugins.{software_id}.callback_token`
- `plugins.{software_id}.auto_open_model_before_tasks`
- `plugins.{software_id}.auto_open_include_task_types`
- `plugins.{software_id}.auto_open_exclude_task_types`
- `plugins.{software_id}.auto_close_model_after_tasks`
- `plugins.{software_id}.auto_close_include_task_types`
- `plugins.{software_id}.auto_close_exclude_task_types`
- `plugins.{software_id}.inter_step_delay_sec`
- `plugins.{software_id}.between_items_delay_sec`
- `plugins.{software_id}.pre_batch_cleanup_enabled`
- `plugins.{software_id}.pre_batch_cleanup_task_type`
- `plugins.{software_id}.pre_batch_cleanup_task_params`
- `plugins.{software_id}.pre_batch_cleanup_ignore_error_markers`
- `plugins.{software_id}.close_model_ignore_error_markers`
- `plugins.{software_id}.pre_batch_check_open_enabled`
- `plugins.{software_id}.pre_batch_check_task_type`
- `plugins.{software_id}.tasks.{task_type}.path`
- `plugins.{software_id}.tasks.{task_type}.body_mode`
- `plugins.{software_id}.tasks.{task_type}.completion_mode`
@ -85,6 +100,7 @@
### 4.3 completion_mode 语义
- `callback`:下发后进入 `waiting_callback`,等待插件回调判定结果
- `sync`:以接口同步返回结果直接判定成功/失败,不等待回调
- `submit_only`:接口返回 2xx 即视为提交成功,条目直接置为成功(适合插件无回调且无任务查询接口场景)
---

View File

@ -18,6 +18,13 @@ plugins:
callback_timeout_sec: 60
callback_token: creo-callback-token
max_retries: 1
pre_batch_check_open_enabled: false
pre_batch_cleanup_enabled: true
pre_batch_cleanup_task_params:
force_close: true
pre_batch_cleanup_task_type: close_model
pre_batch_cleanup_ignore_error_markers: []
close_model_ignore_error_markers: []
request_timeout_sec: 10
retry_backoff_sec:
- 1
@ -33,15 +40,22 @@ plugins:
path: /api/model/open
shell_analysis:
body_mode: task_params_only
completion_mode: sync
path: /api/analysis/shell-analysis
shrinkwrap_shell:
body_mode: task_params_only
completion_mode: sync
path: /api/creo/shrinkwrap/shell
pdms:
base_url: http://localhost:9001
callback_timeout_sec: 60
callback_token: pdms-callback-token
max_retries: 1
pre_batch_check_open_enabled: false
pre_batch_cleanup_enabled: true
pre_batch_cleanup_task_type: close_project
pre_batch_cleanup_ignore_error_markers: []
close_model_ignore_error_markers: []
request_timeout_sec: 10
retry_backoff_sec:
- 1
@ -50,6 +64,7 @@ plugins:
tasks:
close_project:
body_mode: task_params_only
completion_mode: sync
path: /api/project/close
open_mdb:
body_mode: project_open
@ -64,10 +79,36 @@ plugins:
body_mode: task_params_only
path: /api/model/simplify
revit:
auto_close_exclude_task_types:
- open_model
- close_model
auto_close_include_task_types:
- shell_analyze
- shell_execute
auto_close_model_after_tasks: true
auto_open_exclude_task_types:
- open_model
- close_model
auto_open_include_task_types:
- shell_analyze
- shell_execute
auto_open_model_before_tasks: true
base_url: http://localhost:9000
callback_timeout_sec: 60
callback_token: revit-callback-token
between_items_delay_sec: 2
inter_step_delay_sec: 2
max_retries: 1
pre_batch_check_open_enabled: false
pre_batch_cleanup_enabled: true
pre_batch_cleanup_ignore_error_markers:
- NO_DOCUMENT_OPEN
- 没有打开的文档
pre_batch_cleanup_task_type: close_model
close_model_ignore_error_markers:
- NO_DOCUMENT_OPEN
- The active document may not be closed from the API.
- 没有打开的文档
request_timeout_sec: 10
retry_backoff_sec:
- 1
@ -78,14 +119,19 @@ plugins:
body_mode: empty
completion_mode: sync
path: /api/close
request_timeout_sec: 120
open_model:
body_mode: file_path
completion_mode: sync
path: /api/open
request_timeout_sec: 120
shell_analyze:
body_mode: task_params_only
completion_mode: callback
path: /api/shell/analyze
shell_execute:
body_mode: task_params_only
completion_mode: callback
path: /api/shell/execute
routing:
extension_to_software:

View File

@ -0,0 +1,61 @@
# Development helper: watch the backend's JSONL operation log for entries that
# are waiting for a Revit plugin callback, and auto-complete each one by
# POSTing a success callback — simulating the plugin for local testing.
param(
    [string]$BackendBaseUrl = "http://127.0.0.1:8000",
    [string]$LogFile = "",
    [string]$Token = "revit-callback-token",
    [int]$LookbackLines = 400
)

$ErrorActionPreference = "Stop"
# Log lines may contain non-ASCII (Chinese) text; force UTF-8 console output.
[Console]::OutputEncoding = [System.Text.Encoding]::UTF8

# Default to today's operation log file when none is supplied.
if (-not $LogFile) {
    $today = Get-Date -Format "yyyy-MM-dd"
    $LogFile = ".\logs\operation_logs\operation_$today.jsonl"
}
if (-not (Test-Path $LogFile)) {
    throw "Log file not found: $LogFile"
}

$callbackUrl = ($BackendBaseUrl.TrimEnd("/")) + "/api/v1/plugin-callbacks/task-result"
Write-Host "Watching: $LogFile"
Write-Host "Callback: $callbackUrl"

# Execution ids already answered, so each callback is sent at most once.
$sent = @{}
while ($true) {
    # Re-read the log tail each pass; $LookbackLines bounds the scan window.
    $lines = Get-Content -Path $LogFile -Tail $LookbackLines
    foreach ($line in $lines) {
        # Cheap substring filter before paying for JSON parsing.
        if ($line -notmatch '"operation":"batch_step_waiting_callback"') { continue }
        $obj = $null
        try {
            $obj = $line | ConvertFrom-Json
        } catch {
            # Skip partially written or otherwise malformed log lines.
            continue
        }
        $executionId = $obj.extra_data.execution_id
        if (-not $executionId) { continue }
        if ($sent.ContainsKey($executionId)) { continue }
        # Payload shape must match the backend's plugin-callback schema.
        $payload = @{
            execution_id = $executionId
            software_id = "revit"
            status = "success"
            error_message = $null
            result = @{ source = "auto_revit_callback_ps1" }
            finished_at = (Get-Date).ToUniversalTime().ToString("o")
            token = $Token
        } | ConvertTo-Json -Depth 5
        try {
            $resp = Invoke-RestMethod -Method Post -Uri $callbackUrl -ContentType "application/json" -Body $payload
            # Mark as sent only after a successful POST so failures retry next pass.
            $sent[$executionId] = $true
            Write-Host ("[{0}] callback sent execution_id={1} accepted={2}" -f (Get-Date -Format "HH:mm:ss"), $executionId, $resp.accepted)
        } catch {
            Write-Host ("[{0}] callback failed execution_id={1} error={2}" -f (Get-Date -Format "HH:mm:ss"), $executionId, $_.Exception.Message)
        }
    }
    Start-Sleep -Milliseconds 700
}

View File

@ -1,4 +1,4 @@
import asyncio
import asyncio
from copy import deepcopy
from datetime import datetime
@ -27,6 +27,9 @@ class FakePluginHttpClient:
behavior = payload.get("task_params", {}).get("behavior", "success")
attempt = payload.get("attempt", 0)
if task_type == "close_model" and behavior == "no_document_open":
return {"success": False, "error": "NO_DOCUMENT_OPEN", "code": 409, "message": "没有打开的文档"}
if behavior == "timeout_once_then_success" and attempt == 0:
# no callback; force timeout for attempt 0
return {"accepted": True}
@ -61,12 +64,24 @@ class FakePluginHttpClient:
@pytest.fixture(autouse=True)
def _configure_short_timeouts():
backup = deepcopy(software_config.load_config())
software_config._config.setdefault("plugins", {}).setdefault("creo", {})["callback_timeout_sec"] = 1
software_config._config.setdefault("plugins", {}).setdefault("creo", {})["retry_backoff_sec"] = [0, 0]
software_config._config.setdefault("plugins", {}).setdefault("creo", {})["max_retries"] = 1
software_config._config.setdefault("plugins", {}).setdefault("revit", {})["callback_timeout_sec"] = 1
software_config._config.setdefault("plugins", {}).setdefault("revit", {})["retry_backoff_sec"] = [0, 0]
software_config._config.setdefault("plugins", {}).setdefault("revit", {})["max_retries"] = 1
creo_plugin = software_config._config.setdefault("plugins", {}).setdefault("creo", {})
creo_plugin["callback_timeout_sec"] = 1
creo_plugin["retry_backoff_sec"] = [0, 0]
creo_plugin["max_retries"] = 1
creo_plugin["pre_batch_cleanup_enabled"] = False
revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {})
revit_plugin["callback_timeout_sec"] = 1
revit_plugin["retry_backoff_sec"] = [0, 0]
revit_plugin["max_retries"] = 1
revit_plugin["pre_batch_cleanup_enabled"] = False
revit_plugin["auto_open_model_before_tasks"] = False
revit_plugin["auto_close_model_after_tasks"] = False
revit_plugin["inter_step_delay_sec"] = 0
revit_plugin["between_items_delay_sec"] = 0
pdms_plugin = software_config._config.setdefault("plugins", {}).setdefault("pdms", {})
pdms_plugin["pre_batch_cleanup_enabled"] = False
yield
software_config._config = backup
@ -146,3 +161,85 @@ async def test_serial_executor_retry_after_timeout_then_success():
assert len(fake_client.calls) >= 2
finally:
await manager.stop()
@pytest.mark.asyncio
async def test_pre_batch_cleanup_ignores_no_document_open():
    """A pre-batch cleanup error matching an ignore marker must not fail the batch.

    The fake plugin answers close_model with a NO_DOCUMENT_OPEN error; since
    that marker is configured as ignorable, the batch should proceed and the
    actual shell_execute item should still run and succeed.
    """
    callback_registry = PluginCallbackRegistry()
    fake_client = FakePluginHttpClient(callback_registry)
    router = CadTaskRouter({".rvt": "revit"})
    # NOTE(review): mutates the shared software_config singleton in place;
    # restoration presumably relies on the autouse fixture — confirm it runs here.
    revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {})
    revit_plugin["pre_batch_cleanup_enabled"] = True
    revit_plugin["pre_batch_cleanup_task_type"] = "close_model"
    # "behavior" steers FakePluginHttpClient to return the NO_DOCUMENT_OPEN error.
    revit_plugin["pre_batch_cleanup_task_params"] = {"behavior": "no_document_open"}
    revit_plugin["pre_batch_cleanup_ignore_error_markers"] = ["NO_DOCUMENT_OPEN", "没有打开的文档"]
    revit_plugin["auto_open_model_before_tasks"] = False
    revit_plugin["auto_close_model_after_tasks"] = False
    revit_plugin.setdefault("tasks", {}).setdefault("close_model", {})["completion_mode"] = "sync"
    revit_plugin.setdefault("tasks", {}).setdefault("shell_execute", {})["completion_mode"] = "submit_only"
    manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry)
    await manager.start()
    request = BatchSubmitRequest(
        items=[BatchSubmitItem(model_path="a.rvt", task_type="shell_execute", task_params={})],
    )
    try:
        batch = await manager.create_batch(request, submitter_id="tester")
        final_batch = await _wait_batch_terminal(manager, batch.id)
        items = await manager.get_batch_items(batch.id)
        assert final_batch.status == BatchStatus.COMPLETED
        assert items[0].status.value == "succeeded"
        # Exactly two plugin calls: the ignored cleanup, then the real task.
        assert len(fake_client.calls) == 2
        assert fake_client.calls[0]["task_type"] == "close_model"
        assert fake_client.calls[1]["task_type"] == "shell_execute"
    finally:
        await manager.stop()
@pytest.mark.asyncio
async def test_auto_close_ignores_no_document_open():
    """An ignorable auto-close error after a task must not fail the item.

    shell_execute triggers the configured auto-close; the fake plugin answers
    that close_model with a NO_DOCUMENT_OPEN error, which is listed in
    close_model_ignore_error_markers, so the item must still end succeeded.
    """
    callback_registry = PluginCallbackRegistry()
    fake_client = FakePluginHttpClient(callback_registry)
    router = CadTaskRouter({".rvt": "revit"})
    # NOTE(review): mutates the shared software_config singleton in place;
    # restoration presumably relies on the autouse fixture — confirm it runs here.
    revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {})
    revit_plugin["pre_batch_cleanup_enabled"] = False
    revit_plugin["auto_open_model_before_tasks"] = False
    revit_plugin["auto_close_model_after_tasks"] = True
    revit_plugin["auto_close_include_task_types"] = ["shell_execute"]
    revit_plugin["close_model_ignore_error_markers"] = [
        "NO_DOCUMENT_OPEN",
        "The active document may not be closed from the API.",
    ]
    revit_plugin.setdefault("tasks", {}).setdefault("close_model", {})["completion_mode"] = "sync"
    revit_plugin.setdefault("tasks", {}).setdefault("shell_execute", {})["completion_mode"] = "submit_only"
    manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry)
    await manager.start()
    request = BatchSubmitRequest(
        items=[
            BatchSubmitItem(
                model_path="a.rvt",
                task_type="shell_execute",
                # close_model_params steer the fake plugin's close_model answer.
                task_params={"close_model_params": {"behavior": "no_document_open"}},
            )
        ],
    )
    try:
        batch = await manager.create_batch(request, submitter_id="tester")
        final_batch = await _wait_batch_terminal(manager, batch.id)
        items = await manager.get_batch_items(batch.id)
        assert final_batch.status == BatchStatus.COMPLETED
        assert items[0].status.value == "succeeded"
        # Exactly two plugin calls: the real task, then the ignored auto-close.
        assert len(fake_client.calls) == 2
        assert fake_client.calls[0]["task_type"] == "shell_execute"
        assert fake_client.calls[1]["task_type"] == "close_model"
    finally:
        await manager.stop()

View File

@ -1,6 +1,8 @@
### 环境
- conda activate websocket311
备注要先执行conda deactivate和deactivate退出虚拟环境
备注:
要先执行conda deactivate
deactivate退出虚拟环境
### 打包
- python scripts\build_exe.py --clean