CadHubManage/app/core/serial_batch_executor.py
sladro 08623bf4d6 feat: Enhance SerialBatchExecutor with pre-batch cleanup and task execution improvements
- Added pre-batch cleanup functionality to SerialBatchExecutor, allowing for cleanup tasks before processing items.
- Introduced new task execution phases and improved error handling for task submissions.
- Implemented inter-step delays and between-items delays for better task management.
- Updated logging to capture detailed events during batch processing.
- Enhanced configuration options for plugins in software_config.yaml to support new features.
- Added tests for pre-batch cleanup and auto-close scenarios to ensure robust handling of edge cases.
- Created a PowerShell script for automated callback handling from Revit.
2026-03-03 16:13:19 +08:00

844 lines
32 KiB
Python

from __future__ import annotations
import asyncio
import uuid
from typing import Any, Dict, Optional
from app.core.plugin_http_client import PluginSubmitError
from app.models.cad_batch import PluginResultStatus
from app.models.operation_log import OperationStatus
class SerialBatchExecutor:
    """Global single-worker FIFO executor for batch items."""

    def __init__(self, batch_manager, plugin_client, callback_registry):
        """Wire up collaborators; call start() to begin draining the queue.

        Args:
            batch_manager: Provides per-software configuration lookups and
                item state transitions (mark_item_* methods).
            plugin_client: HTTP client used to submit tasks to CAD plugins.
            callback_registry: Awaited for asynchronous plugin callbacks.
        """
        self._batch_manager = batch_manager
        self._plugin_client = plugin_client
        self._callback_registry = callback_registry
        # FIFO of item ids; a single worker task consumes it serially.
        self._queue: asyncio.Queue[str] = asyncio.Queue()
        self._worker_task: Optional[asyncio.Task] = None
        self._running = False
        # (batch_id, software_id) pairs whose pre-batch cleanup already ran
        # (or was decided to be unnecessary); cleared by stop().
        self._pre_batch_cleanup_done: set[tuple[str, str]] = set()
async def start(self):
if self._worker_task and not self._worker_task.done():
return
self._running = True
self._worker_task = asyncio.create_task(self._worker_loop())
async def stop(self):
self._running = False
if self._worker_task:
self._worker_task.cancel()
try:
await self._worker_task
except asyncio.CancelledError:
pass
self._worker_task = None
self._pre_batch_cleanup_done.clear()
    async def enqueue(self, item_id: str):
        """Append *item_id* to the FIFO queue; the worker processes it in order."""
        await self._queue.put(item_id)
async def _worker_loop(self):
while self._running:
item_id = await self._queue.get()
software_id = None
try:
software_id = await self._process_item(item_id)
finally:
self._queue.task_done()
if software_id:
between_items_delay = self._batch_manager.get_between_items_delay_sec(software_id)
if between_items_delay > 0:
await self._log_event(
operation="batch_between_items_delay",
details=f"Delay {between_items_delay}s before next queued item",
extra_data={"software_id": software_id, "delay_sec": between_items_delay},
)
await asyncio.sleep(between_items_delay)
    async def _process_item(self, item_id: str) -> Optional[str]:
        """Execute one batch item end-to-end, including retries.

        Per attempt the phases are: pre-batch cleanup (once per
        batch+software), optional open_model, the item's main task, and
        optional close_model.  A PluginSubmitError or unexpected exception
        fails the attempt; remaining retries are scheduled with a
        per-software backoff.

        Returns:
            The item's software_id (used by the worker loop for the
            between-items delay), or None when the item is missing or was
            never routed to a software.
        """
        item = await self._batch_manager.get_item(item_id)
        if not item:
            return None
        # Item may already be marked failed during routing.
        if item.software_id is None:
            return None
        # First attempt plus up to max_retries retries.
        for attempt in range(item.max_retries + 1):
            error_message = None
            # Tracks which phase failed so logs and retries can report it.
            current_phase = "main_task"
            open_result = None
            main_result = None
            close_result = None
            auto_open = self._batch_manager.should_auto_open_model(item.software_id, item.task_type)
            auto_close = self._batch_manager.should_auto_close_model(item.software_id, item.task_type)
            try:
                current_phase = "pre_batch_cleanup"
                await self._ensure_pre_batch_cleanup(item=item, item_id=item_id, attempt=attempt)
                if auto_open:
                    current_phase = "open_model"
                    open_result = await self._execute_task_step(
                        item=item,
                        item_id=item_id,
                        attempt=attempt,
                        task_type="open_model",
                        task_params=self._extract_open_task_params(item.task_params),
                        phase=current_phase,
                    )
                    await self._sleep_inter_step(item, phase="open_model_to_main")
                current_phase = item.task_type
                main_result = await self._execute_task_step(
                    item=item,
                    item_id=item_id,
                    attempt=attempt,
                    task_type=item.task_type,
                    task_params=item.task_params,
                    phase=current_phase,
                )
                if auto_close:
                    await self._sleep_inter_step(item, phase="main_to_close_model")
                    current_phase = "close_model"
                    try:
                        close_result = await self._execute_task_step(
                            item=item,
                            item_id=item_id,
                            attempt=attempt,
                            task_type="close_model",
                            task_params=self._extract_close_task_params(item.task_params),
                            phase=current_phase,
                        )
                    except PluginSubmitError as exc:
                        # A close failure matching a configured ignore marker
                        # must not fail an otherwise successful item.
                        error_message = str(exc)
                        if not self._is_ignorable_close_model_error(item.software_id, error_message):
                            raise
                        close_result = {"ignored_error": error_message}
                        await self._log_event(
                            operation="batch_close_model_ignored",
                            details=(
                                f"Close model ignored item={item.id} attempt={attempt}: "
                                f"{error_message}"
                            ),
                            extra_data={
                                "batch_id": item.batch_id,
                                "item_id": item.id,
                                "attempt": attempt,
                                "software_id": item.software_id,
                                "error_message": error_message,
                            },
                        )
                success_result = self._compose_success_result(
                    auto_open=auto_open,
                    auto_close=auto_close,
                    open_result=open_result,
                    main_result=main_result,
                    close_result=close_result,
                )
                await self._batch_manager.mark_item_succeeded(item_id, success_result)
                await self._log_event(
                    operation="batch_item_succeeded",
                    details=f"Item succeeded item={item.id} attempt={attempt}",
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "attempt": attempt,
                        "software_id": item.software_id,
                        "auto_open": auto_open,
                        "auto_close": auto_close,
                    },
                )
                return item.software_id
            except PluginSubmitError as exc:
                # Expected plugin-level failure: log and fall through to retry logic.
                error_message = str(exc)
                await self._log_event(
                    operation="batch_step_failed",
                    details=(
                        f"Step failed item={item.id} phase={current_phase} attempt={attempt}: "
                        f"{error_message}"
                    ),
                    status=OperationStatus.FAILED,
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "attempt": attempt,
                        "phase": current_phase,
                        "error_message": error_message,
                    },
                )
            except Exception as exc:
                # Non-plugin errors are still retried, but logged under a
                # separate operation name for triage.
                error_message = f"Unexpected execution error: {exc}"
                await self._log_event(
                    operation="batch_step_unexpected_error",
                    details=(
                        f"Unexpected error item={item.id} phase={current_phase} attempt={attempt}: "
                        f"{exc}"
                    ),
                    status=OperationStatus.FAILED,
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "attempt": attempt,
                        "phase": current_phase,
                        "error_message": str(exc),
                    },
                )
            if attempt < item.max_retries:
                await self._batch_manager.mark_item_retrying(item_id, error_message, attempt + 1)
                backoff = self._batch_manager.get_retry_backoff_sec(item.software_id, attempt)
                await self._log_event(
                    operation="batch_retry_scheduled",
                    details=(
                        f"Retry scheduled item={item.id} phase={current_phase} next_attempt={attempt + 1} "
                        f"backoff_sec={backoff}"
                    ),
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "attempt": attempt,
                        "next_attempt": attempt + 1,
                        "phase": current_phase,
                        "backoff_sec": backoff,
                        "error_message": error_message,
                    },
                )
                if backoff > 0:
                    await asyncio.sleep(backoff)
                continue
            # Retries exhausted: record the final failure and stop.
            await self._batch_manager.mark_item_failed(item_id, error_message)
            await self._log_event(
                operation="batch_item_failed_final",
                details=f"Item failed item={item.id} phase={current_phase}: {error_message}",
                status=OperationStatus.FAILED,
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "phase": current_phase,
                    "error_message": error_message,
                },
            )
            return item.software_id
        return item.software_id  # unreachable: every loop iteration returns or continues
    async def _execute_task_step(
        self,
        item,
        item_id: str,
        attempt: int,
        task_type: str,
        task_params: Dict[str, Any],
        phase: str,
    ) -> Dict[str, Any]:
        """Submit one task step to the plugin and resolve its outcome.

        Resolution follows the software's configured completion mode:
        "submit_only" (fire-and-forget), "sync" (judge the HTTP response),
        or callback (wait for the plugin to call back within a timeout).
        Revit shell_execute responses carrying a taskId/statusUrl are
        resolved by direct status polling instead.

        Returns:
            A dict describing the step result (shape depends on the mode).

        Raises:
            PluginSubmitError: on submission failure, sync/callback/polling
                failure, callback timeout, or a disallowed completion mode.
        """
        execution_id = str(uuid.uuid4())
        await self._batch_manager.mark_item_dispatching(item_id, execution_id, attempt)
        completion_mode = self._batch_manager.get_task_completion_mode(item.software_id, task_type)
        # open_model must be awaited; fire-and-forget would let the main
        # task run before the model is actually open.
        if phase == "open_model" and completion_mode == "submit_only":
            raise PluginSubmitError(
                "open_model completion_mode=submit_only is not allowed; use sync or callback"
            )
        callback_url = self._batch_manager.get_callback_endpoint_url()
        payload = {
            "execution_id": execution_id,
            "batch_id": item.batch_id,
            "item_id": item.id,
            "model_path": item.model_path,
            "task_type": task_type,
            "task_params": task_params,
            "callback_url": callback_url,
            "attempt": attempt,
        }
        await self._log_event(
            operation="batch_step_start",
            details=(
                f"Step start item={item.id} phase={phase} attempt={attempt} "
                f"task_type={task_type} completion_mode={completion_mode}"
            ),
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
                "software_id": item.software_id,
                "task_type": task_type,
                "model_path": item.model_path,
                "completion_mode": completion_mode,
            },
        )
        plugin_response = await self._plugin_client.submit_task(item.software_id, task_type, payload)
        await self._log_event(
            operation="batch_step_submitted",
            details=f"Step submitted item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
                "completion_mode": completion_mode,
                "plugin_response": plugin_response,
            },
        )
        # Revit shell_execute is async by taskId/statusUrl. Poll plugin task status directly.
        if item.software_id == "revit" and task_type == "shell_execute":
            polled_result = await self._poll_revit_shell_execute_result(
                item=item,
                attempt=attempt,
                phase=phase,
                plugin_response=plugin_response if isinstance(plugin_response, dict) else {},
                execution_id=execution_id,
            )
            # None means "not pollable": fall through to the configured mode.
            if polled_result is not None:
                return polled_result
        if completion_mode == "submit_only":
            # Fire-and-forget: the submission response itself is the result.
            submit_result = {"completion_mode": "submit_only"}
            if isinstance(plugin_response, dict):
                submit_result.update(plugin_response)
            else:
                submit_result["raw_response"] = str(plugin_response)
            await self._log_event(
                operation="batch_step_submit_only_accepted",
                details=f"Submit-only accepted item={item.id} phase={phase} execution_id={execution_id}",
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "execution_id": execution_id,
                    "attempt": attempt,
                    "phase": phase,
                },
            )
            return submit_result
        if completion_mode == "sync":
            # Synchronous: success is judged from the HTTP response body.
            # open_model uses a looser success heuristic than other tasks.
            if phase == "open_model":
                success_flag = self._is_successful_open_response(plugin_response)
            else:
                success_flag = self._is_successful_sync_response(plugin_response)
            if not success_flag:
                error_message = "Plugin sync task failed"
                if isinstance(plugin_response, dict):
                    error_message = plugin_response.get("error") or error_message
                raise PluginSubmitError(f"{phase} sync failed: {error_message}")
            await self._log_event(
                operation="batch_step_sync_succeeded",
                details=f"Sync succeeded item={item.id} phase={phase} execution_id={execution_id}",
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "execution_id": execution_id,
                    "attempt": attempt,
                    "phase": phase,
                },
            )
            return plugin_response if isinstance(plugin_response, dict) else {"raw_response": str(plugin_response)}
        # Callback mode: park the item and wait for the plugin to call back.
        await self._batch_manager.mark_item_waiting_callback(item_id, execution_id, plugin_response)
        await self._log_event(
            operation="batch_step_waiting_callback",
            details=f"Waiting callback item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
            },
        )
        timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id)
        try:
            callback_payload = await self._callback_registry.wait_for_callback(execution_id, timeout_sec)
        except asyncio.TimeoutError as exc:
            raise PluginSubmitError(f"{phase} callback timeout for execution_id={execution_id}") from exc
        if callback_payload.status != PluginResultStatus.SUCCESS:
            error_message = callback_payload.error_message or "Plugin returned failed status"
            raise PluginSubmitError(f"{phase} callback failed: {error_message}")
        await self._log_event(
            operation="batch_step_callback_succeeded",
            details=f"Callback succeeded item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
            },
        )
        return callback_payload.result if isinstance(callback_payload.result, dict) else {"result": callback_payload.result}
async def _poll_revit_shell_execute_result(
self,
item,
attempt: int,
phase: str,
plugin_response: Dict[str, Any],
execution_id: str,
) -> Optional[Dict[str, Any]]:
if not hasattr(self._plugin_client, "get_task_status"):
return None
data = plugin_response.get("data")
if not isinstance(data, dict):
data = {}
task_id = data.get("taskId") or data.get("task_id")
status_url = data.get("statusUrl") or data.get("status_url")
if not task_id and not status_url:
return None
timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id)
poll_interval_sec = 2.0
deadline = asyncio.get_event_loop().time() + timeout_sec
last_status = None
await self._log_event(
operation="batch_step_polling_started",
details=(
f"Polling started item={item.id} phase={phase} execution_id={execution_id} "
f"task_id={task_id or 'unknown'} timeout_sec={timeout_sec}"
),
extra_data={
"batch_id": item.batch_id,
"item_id": item.id,
"execution_id": execution_id,
"attempt": attempt,
"phase": phase,
"task_id": task_id,
"status_url": status_url,
"poll_interval_sec": poll_interval_sec,
"timeout_sec": timeout_sec,
},
)
while asyncio.get_event_loop().time() < deadline:
status_response = await self._plugin_client.get_task_status(
software_id=item.software_id,
task_id=str(task_id) if task_id else None,
status_url=str(status_url) if status_url else None,
)
status_data = status_response.get("data") if isinstance(status_response.get("data"), dict) else {}
status_value = (
status_data.get("status")
or status_response.get("status")
or ""
)
lowered = str(status_value).strip().lower()
if lowered and lowered != last_status:
last_status = lowered
await self._log_event(
operation="batch_step_polling_status",
details=(
f"Polling status item={item.id} phase={phase} execution_id={execution_id} "
f"status={status_value}"
),
extra_data={
"batch_id": item.batch_id,
"item_id": item.id,
"execution_id": execution_id,
"attempt": attempt,
"phase": phase,
"task_id": task_id,
"status": status_value,
},
)
if lowered in {"completed", "success", "succeeded", "done"}:
await self._log_event(
operation="batch_step_polling_succeeded",
details=(
f"Polling succeeded item={item.id} phase={phase} execution_id={execution_id} "
f"task_id={task_id}"
),
extra_data={
"batch_id": item.batch_id,
"item_id": item.id,
"execution_id": execution_id,
"attempt": attempt,
"phase": phase,
"task_id": task_id,
},
)
return status_response if isinstance(status_response, dict) else {"raw_response": str(status_response)}
if lowered in {"failed", "cancelled", "canceled", "error"}:
error_message = (
status_data.get("errorMessage")
or status_data.get("error_message")
or status_data.get("error")
or status_response.get("message")
or "Plugin task failed"
)
raise PluginSubmitError(f"{phase} polling failed: {error_message}")
await asyncio.sleep(poll_interval_sec)
raise PluginSubmitError(
f"{phase} polling timeout for task_id={task_id or 'unknown'} execution_id={execution_id}"
)
    async def _ensure_pre_batch_cleanup(self, item, item_id: str, attempt: int):
        """Run the configured pre-batch cleanup once per (batch, software).

        Optionally performs an "is a model open?" check first and skips the
        cleanup task when nothing is open.  Outcomes are memoized in
        self._pre_batch_cleanup_done so later items of the same batch and
        software do not repeat the work.

        Raises:
            PluginSubmitError: when cleanup/check is enabled but the task
                type is missing from config, or when a cleanup step fails
                with an error that is not in the configured ignore markers.
        """
        key = (item.batch_id, item.software_id)
        if key in self._pre_batch_cleanup_done:
            return
        if not self._batch_manager.should_run_pre_batch_cleanup(item.software_id):
            # Cleanup disabled for this software; remember the decision.
            self._pre_batch_cleanup_done.add(key)
            return
        await self._log_event(
            operation="batch_precheck_start",
            details=f"Precheck start batch={item.batch_id} software={item.software_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "software_id": item.software_id,
                "attempt": attempt,
            },
        )
        # Default: close/clean unconditionally unless a check says otherwise.
        should_close = True
        if self._batch_manager.should_check_open_before_pre_batch_cleanup(item.software_id):
            check_task_type = self._batch_manager.get_pre_batch_check_task_type(item.software_id)
            if not check_task_type:
                raise PluginSubmitError(
                    f"pre_batch_check_open_enabled is true but pre_batch_check_task_type is missing for {item.software_id}"
                )
            check_result = await self._execute_task_step(
                item=item,
                item_id=item_id,
                attempt=attempt,
                task_type=check_task_type,
                task_params={},
                phase="pre_batch_check_open",
            )
            should_close = self._is_model_open_from_check_result(check_result)
            await self._log_event(
                operation="batch_precheck_state_checked",
                details=(
                    f"Precheck state checked batch={item.batch_id} software={item.software_id} "
                    f"should_close={should_close}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "software_id": item.software_id,
                    "attempt": attempt,
                    "should_close": should_close,
                    "check_result": check_result,
                },
            )
        if not should_close:
            # Nothing is open: cleanup unnecessary, memoize and move on.
            self._pre_batch_cleanup_done.add(key)
            await self._log_event(
                operation="batch_precheck_skip_cleanup",
                details=f"Precheck skip cleanup batch={item.batch_id} software={item.software_id}",
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "software_id": item.software_id,
                    "attempt": attempt,
                },
            )
            return
        cleanup_task_type = self._batch_manager.get_pre_batch_cleanup_task_type(item.software_id)
        if not cleanup_task_type:
            raise PluginSubmitError(
                f"pre_batch_cleanup_enabled is true but pre_batch_cleanup_task_type is missing for {item.software_id}"
            )
        cleanup_params = self._batch_manager.get_pre_batch_cleanup_task_params(item.software_id)
        try:
            await self._execute_task_step(
                item=item,
                item_id=item_id,
                attempt=attempt,
                task_type=cleanup_task_type,
                task_params=cleanup_params,
                phase=f"pre_batch_cleanup_{cleanup_task_type}",
            )
            self._pre_batch_cleanup_done.add(key)
            await self._log_event(
                operation="batch_precheck_cleanup_succeeded",
                details=(
                    f"Precheck cleanup succeeded batch={item.batch_id} software={item.software_id} "
                    f"task_type={cleanup_task_type}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "software_id": item.software_id,
                    "attempt": attempt,
                    "cleanup_task_type": cleanup_task_type,
                },
            )
        except PluginSubmitError as exc:
            # Errors matching configured markers (e.g. "no document open")
            # count as success: there was nothing to clean up.
            error_message = str(exc)
            if not self._is_ignorable_pre_batch_cleanup_error(item.software_id, error_message):
                raise
            self._pre_batch_cleanup_done.add(key)
            await self._log_event(
                operation="batch_precheck_cleanup_ignored",
                details=(
                    f"Precheck cleanup ignored batch={item.batch_id} software={item.software_id} "
                    f"task_type={cleanup_task_type}: {error_message}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "software_id": item.software_id,
                    "attempt": attempt,
                    "cleanup_task_type": cleanup_task_type,
                    "error_message": error_message,
                },
            )
async def _sleep_inter_step(self, item, phase: str):
delay = self._batch_manager.get_inter_step_delay_sec(item.software_id)
if delay <= 0:
return
await self._log_event(
operation="batch_inter_step_delay",
details=f"Delay {delay}s between steps ({phase})",
extra_data={
"batch_id": item.batch_id,
"item_id": item.id,
"software_id": item.software_id,
"phase": phase,
"delay_sec": delay,
},
)
await asyncio.sleep(delay)
@staticmethod
def _compose_success_result(
auto_open: bool,
auto_close: bool,
open_result: Optional[Dict[str, Any]],
main_result: Optional[Dict[str, Any]],
close_result: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
if not auto_open and not auto_close:
return main_result or {}
result: Dict[str, Any] = {"main_result": main_result or {}}
if auto_open:
result["open_model_result"] = open_result or {}
if auto_close:
result["close_model_result"] = close_result or {}
return result
@staticmethod
def _extract_open_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(task_params, dict):
return {}
open_params = task_params.get("open_model_params")
if isinstance(open_params, dict):
return open_params
open_params = task_params.get("open_params")
if isinstance(open_params, dict):
return open_params
return {}
@staticmethod
def _extract_close_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(task_params, dict):
return {}
close_params = task_params.get("close_model_params")
if isinstance(close_params, dict):
return close_params
close_params = task_params.get("close_params")
if isinstance(close_params, dict):
return close_params
return {}
def _is_ignorable_pre_batch_cleanup_error(self, software_id: str, error_message: str) -> bool:
if not error_message:
return False
markers = self._batch_manager.get_pre_batch_cleanup_ignore_error_markers(software_id)
if not markers:
return False
lowered = error_message.lower()
for marker in markers:
if marker.lower() in lowered:
return True
return False
def _is_ignorable_close_model_error(self, software_id: str, error_message: str) -> bool:
if not error_message:
return False
markers = self._batch_manager.get_close_model_ignore_error_markers(software_id)
if not markers:
return False
lowered = error_message.lower()
for marker in markers:
if marker.lower() in lowered:
return True
return False
@staticmethod
def _is_model_open_from_check_result(check_result: Dict[str, Any]) -> bool:
if not isinstance(check_result, dict):
return True
explicit_flags = [
"has_open_model",
"hasOpenModel",
"is_model_open",
"isModelOpen",
"opened",
"isOpened",
"project_opened",
"projectOpened",
]
for key in explicit_flags:
if key in check_result:
return bool(check_result.get(key))
non_empty_markers = [
"model_path",
"modelPath",
"file_path",
"filePath",
"current_model",
"currentModel",
"project_name",
"projectName",
"current_project",
"currentProject",
]
for key in non_empty_markers:
value = check_result.get(key)
if isinstance(value, str) and value.strip():
return True
if isinstance(value, dict) and value:
return True
# Unknown schema: default to close for safety.
return True
async def _log_event(
self,
operation: str,
details: str,
status: OperationStatus = OperationStatus.SUCCESS,
extra_data: Optional[dict] = None,
):
log_method = getattr(self._batch_manager, "log_execution_event", None)
if not callable(log_method):
return
try:
await log_method(
operation=operation,
details=details,
status=status,
extra_data=extra_data or {},
)
except Exception:
# Logging must not break task execution.
return
@staticmethod
def _is_successful_sync_response(plugin_response: dict) -> bool:
if not isinstance(plugin_response, dict):
return False
if plugin_response.get("error"):
return False
code_value = plugin_response.get("code")
try:
if code_value is not None and int(code_value) == 202:
# 202 usually means accepted/queued, not completed.
return False
except (TypeError, ValueError):
pass
status_value = plugin_response.get("status")
if isinstance(status_value, str) and status_value.lower() in {"accepted", "queued", "pending"}:
return False
if "success" in plugin_response:
return bool(plugin_response.get("success"))
if plugin_response.get("accepted") is True:
return False
if isinstance(status_value, str) and status_value.lower() in {"success", "ok", "completed", "done"}:
return True
try:
if code_value is not None and int(code_value) in {0, 200, 201}:
return True
except (TypeError, ValueError):
pass
if plugin_response.get("task_id") or plugin_response.get("taskId"):
return False
return False
@staticmethod
def _is_successful_open_response(plugin_response: dict) -> bool:
if not isinstance(plugin_response, dict):
return False
if plugin_response.get("error"):
return False
if "success" in plugin_response:
return bool(plugin_response.get("success"))
code_value = plugin_response.get("code")
try:
if code_value is not None and int(code_value) in {0, 200}:
return True
except (TypeError, ValueError):
pass
status_value = plugin_response.get("status")
if isinstance(status_value, str) and status_value.lower() in {"success", "ok"}:
return True
return False