- Added pre-batch cleanup functionality to SerialBatchExecutor, allowing cleanup tasks to run before items are processed.
- Introduced new task execution phases and improved error handling for task submissions.
- Implemented inter-step delays and between-items delays for better task management.
- Updated logging to capture detailed events during batch processing.
- Enhanced configuration options for plugins in software_config.yaml to support new features.
- Added tests for pre-batch cleanup and auto-close scenarios to ensure robust handling of edge cases.
- Created a PowerShell script for automated callback handling from Revit.
844 lines
32 KiB
Python
844 lines
32 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import uuid
|
|
from typing import Any, Dict, Optional
|
|
|
|
from app.core.plugin_http_client import PluginSubmitError
|
|
from app.models.cad_batch import PluginResultStatus
|
|
from app.models.operation_log import OperationStatus
|
|
|
|
|
|
class SerialBatchExecutor:
|
|
"""Global single-worker FIFO executor for batch items."""
|
|
|
|
def __init__(self, batch_manager, plugin_client, callback_registry):
|
|
self._batch_manager = batch_manager
|
|
self._plugin_client = plugin_client
|
|
self._callback_registry = callback_registry
|
|
|
|
self._queue: asyncio.Queue[str] = asyncio.Queue()
|
|
self._worker_task: Optional[asyncio.Task] = None
|
|
self._running = False
|
|
self._pre_batch_cleanup_done: set[tuple[str, str]] = set()
|
|
|
|
async def start(self):
|
|
if self._worker_task and not self._worker_task.done():
|
|
return
|
|
|
|
self._running = True
|
|
self._worker_task = asyncio.create_task(self._worker_loop())
|
|
|
|
async def stop(self):
|
|
self._running = False
|
|
|
|
if self._worker_task:
|
|
self._worker_task.cancel()
|
|
try:
|
|
await self._worker_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
self._worker_task = None
|
|
self._pre_batch_cleanup_done.clear()
|
|
|
|
async def enqueue(self, item_id: str):
|
|
await self._queue.put(item_id)
|
|
|
|
async def _worker_loop(self):
|
|
while self._running:
|
|
item_id = await self._queue.get()
|
|
software_id = None
|
|
try:
|
|
software_id = await self._process_item(item_id)
|
|
finally:
|
|
self._queue.task_done()
|
|
|
|
if software_id:
|
|
between_items_delay = self._batch_manager.get_between_items_delay_sec(software_id)
|
|
if between_items_delay > 0:
|
|
await self._log_event(
|
|
operation="batch_between_items_delay",
|
|
details=f"Delay {between_items_delay}s before next queued item",
|
|
extra_data={"software_id": software_id, "delay_sec": between_items_delay},
|
|
)
|
|
await asyncio.sleep(between_items_delay)
|
|
|
|
async def _process_item(self, item_id: str) -> Optional[str]:
    """Run one queued item through its step sequence, retrying on failure.

    Each attempt runs, in order: the pre-batch cleanup check, an optional
    ``open_model`` step, the item's own task, and an optional
    ``close_model`` step. A failed attempt is retried from the start until
    ``item.max_retries`` retries have been used, after which the item is
    marked failed.

    Args:
        item_id: Identifier used to look the item up in the batch manager.

    Returns:
        ``None`` when the item cannot be found or has no ``software_id``;
        otherwise ``item.software_id``, both after success and after the
        final failure.
    """
    item = await self._batch_manager.get_item(item_id)
    if not item:
        return None

    # Item may already be marked failed during routing.
    if item.software_id is None:
        return None

    # attempt 0 is the first try; attempts 1..max_retries are retries.
    for attempt in range(item.max_retries + 1):
        error_message = None
        # current_phase tracks the step in progress so that failure logs can
        # name the phase that raised.
        current_phase = "main_task"
        open_result = None
        main_result = None
        close_result = None
        auto_open = self._batch_manager.should_auto_open_model(item.software_id, item.task_type)
        auto_close = self._batch_manager.should_auto_close_model(item.software_id, item.task_type)

        try:
            current_phase = "pre_batch_cleanup"
            await self._ensure_pre_batch_cleanup(item=item, item_id=item_id, attempt=attempt)

            if auto_open:
                current_phase = "open_model"
                open_result = await self._execute_task_step(
                    item=item,
                    item_id=item_id,
                    attempt=attempt,
                    task_type="open_model",
                    task_params=self._extract_open_task_params(item.task_params),
                    phase=current_phase,
                )
                await self._sleep_inter_step(item, phase="open_model_to_main")

            current_phase = item.task_type
            main_result = await self._execute_task_step(
                item=item,
                item_id=item_id,
                attempt=attempt,
                task_type=item.task_type,
                task_params=item.task_params,
                phase=current_phase,
            )

            if auto_close:
                await self._sleep_inter_step(item, phase="main_to_close_model")
                current_phase = "close_model"
                try:
                    close_result = await self._execute_task_step(
                        item=item,
                        item_id=item_id,
                        attempt=attempt,
                        task_type="close_model",
                        task_params=self._extract_close_task_params(item.task_params),
                        phase=current_phase,
                    )
                except PluginSubmitError as exc:
                    # A close failure matching a configured ignore marker
                    # does not fail the item; anything else is re-raised to
                    # the outer handler below.
                    error_message = str(exc)
                    if not self._is_ignorable_close_model_error(item.software_id, error_message):
                        raise
                    close_result = {"ignored_error": error_message}
                    await self._log_event(
                        operation="batch_close_model_ignored",
                        details=(
                            f"Close model ignored item={item.id} attempt={attempt}: "
                            f"{error_message}"
                        ),
                        extra_data={
                            "batch_id": item.batch_id,
                            "item_id": item.id,
                            "attempt": attempt,
                            "software_id": item.software_id,
                            "error_message": error_message,
                        },
                    )

            success_result = self._compose_success_result(
                auto_open=auto_open,
                auto_close=auto_close,
                open_result=open_result,
                main_result=main_result,
                close_result=close_result,
            )
            await self._batch_manager.mark_item_succeeded(item_id, success_result)
            await self._log_event(
                operation="batch_item_succeeded",
                details=f"Item succeeded item={item.id} attempt={attempt}",
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "software_id": item.software_id,
                    "auto_open": auto_open,
                    "auto_close": auto_close,
                },
            )
            return item.software_id
        except PluginSubmitError as exc:
            # Step-level failure: record it, then fall through to the
            # retry / final-failure handling after the try statement.
            error_message = str(exc)
            await self._log_event(
                operation="batch_step_failed",
                details=(
                    f"Step failed item={item.id} phase={current_phase} attempt={attempt}: "
                    f"{error_message}"
                ),
                status=OperationStatus.FAILED,
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "phase": current_phase,
                    "error_message": error_message,
                },
            )
        except Exception as exc:
            # Any other error is handled like a step failure, but logged
            # under a distinct operation name.
            error_message = f"Unexpected execution error: {exc}"
            await self._log_event(
                operation="batch_step_unexpected_error",
                details=(
                    f"Unexpected error item={item.id} phase={current_phase} attempt={attempt}: "
                    f"{exc}"
                ),
                status=OperationStatus.FAILED,
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "phase": current_phase,
                    "error_message": str(exc),
                },
            )

        # Reached only when this attempt failed.
        if attempt < item.max_retries:
            await self._batch_manager.mark_item_retrying(item_id, error_message, attempt + 1)
            backoff = self._batch_manager.get_retry_backoff_sec(item.software_id, attempt)
            await self._log_event(
                operation="batch_retry_scheduled",
                details=(
                    f"Retry scheduled item={item.id} phase={current_phase} next_attempt={attempt + 1} "
                    f"backoff_sec={backoff}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "next_attempt": attempt + 1,
                    "phase": current_phase,
                    "backoff_sec": backoff,
                    "error_message": error_message,
                },
            )
            if backoff > 0:
                await asyncio.sleep(backoff)
            continue

        # Retries exhausted: record the final failure.
        await self._batch_manager.mark_item_failed(item_id, error_message)
        await self._log_event(
            operation="batch_item_failed_final",
            details=f"Item failed item={item.id} phase={current_phase}: {error_message}",
            status=OperationStatus.FAILED,
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "attempt": attempt,
                "phase": current_phase,
                "error_message": error_message,
            },
        )
        return item.software_id

    # Every loop iteration returns or continues, so this line is reached
    # only if the loop body never ran (negative max_retries).
    return item.software_id
|
|
|
|
async def _execute_task_step(
    self,
    item,
    item_id: str,
    attempt: int,
    task_type: str,
    task_params: Dict[str, Any],
    phase: str,
) -> Dict[str, Any]:
    """Submit one task to the plugin and wait for its outcome.

    How completion is detected depends on the completion mode the batch
    manager reports for ``(item.software_id, task_type)``:

    * ``"submit_only"``: the plugin response is returned right after
      submission.
    * ``"sync"``: the plugin response itself is inspected for success.
    * any other value: the method waits for a callback keyed by the
      generated execution id.

    Independently of the mode, a ``shell_execute`` task for software id
    ``"revit"`` is first offered to the status-polling helper; a non-``None``
    polled result is returned directly.

    Args:
        item: Batch item record; ``id``, ``batch_id``, ``software_id`` and
            ``model_path`` are read from it.
        item_id: Identifier passed to the batch manager state updates.
        attempt: Zero-based attempt number, forwarded to the plugin.
        task_type: Task name submitted to the plugin.
        task_params: Parameters forwarded under ``"task_params"``.
        phase: Label used in log events and error messages.

    Returns:
        A dict describing the step result; non-dict plugin or callback
        results are wrapped under ``"raw_response"`` / ``"result"``.

    Raises:
        PluginSubmitError: For ``open_model`` configured as ``submit_only``,
            a failed sync response, a callback timeout, or a callback whose
            status is not ``SUCCESS``.
    """
    # A fresh execution id per step; the callback wait below is keyed by it.
    execution_id = str(uuid.uuid4())
    await self._batch_manager.mark_item_dispatching(item_id, execution_id, attempt)

    completion_mode = self._batch_manager.get_task_completion_mode(item.software_id, task_type)
    if phase == "open_model" and completion_mode == "submit_only":
        raise PluginSubmitError(
            "open_model completion_mode=submit_only is not allowed; use sync or callback"
        )

    callback_url = self._batch_manager.get_callback_endpoint_url()
    payload = {
        "execution_id": execution_id,
        "batch_id": item.batch_id,
        "item_id": item.id,
        "model_path": item.model_path,
        "task_type": task_type,
        "task_params": task_params,
        "callback_url": callback_url,
        "attempt": attempt,
    }

    await self._log_event(
        operation="batch_step_start",
        details=(
            f"Step start item={item.id} phase={phase} attempt={attempt} "
            f"task_type={task_type} completion_mode={completion_mode}"
        ),
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
            "software_id": item.software_id,
            "task_type": task_type,
            "model_path": item.model_path,
            "completion_mode": completion_mode,
        },
    )

    plugin_response = await self._plugin_client.submit_task(item.software_id, task_type, payload)
    await self._log_event(
        operation="batch_step_submitted",
        details=f"Step submitted item={item.id} phase={phase} execution_id={execution_id}",
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
            "completion_mode": completion_mode,
            "plugin_response": plugin_response,
        },
    )

    # Revit shell_execute is async by taskId/statusUrl. Poll plugin task status directly.
    if item.software_id == "revit" and task_type == "shell_execute":
        polled_result = await self._poll_revit_shell_execute_result(
            item=item,
            attempt=attempt,
            phase=phase,
            plugin_response=plugin_response if isinstance(plugin_response, dict) else {},
            execution_id=execution_id,
        )
        # None means polling was not applicable; fall through to the
        # configured completion mode.
        if polled_result is not None:
            return polled_result

    if completion_mode == "submit_only":
        submit_result = {"completion_mode": "submit_only"}
        if isinstance(plugin_response, dict):
            submit_result.update(plugin_response)
        else:
            submit_result["raw_response"] = str(plugin_response)

        await self._log_event(
            operation="batch_step_submit_only_accepted",
            details=f"Submit-only accepted item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
            },
        )
        return submit_result

    if completion_mode == "sync":
        # open_model responses are judged by a separate predicate from
        # every other sync task.
        if phase == "open_model":
            success_flag = self._is_successful_open_response(plugin_response)
        else:
            success_flag = self._is_successful_sync_response(plugin_response)
        if not success_flag:
            error_message = "Plugin sync task failed"
            if isinstance(plugin_response, dict):
                error_message = plugin_response.get("error") or error_message
            raise PluginSubmitError(f"{phase} sync failed: {error_message}")

        await self._log_event(
            operation="batch_step_sync_succeeded",
            details=f"Sync succeeded item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
            },
        )
        return plugin_response if isinstance(plugin_response, dict) else {"raw_response": str(plugin_response)}

    # Remaining modes: wait for the plugin to call back with the result.
    await self._batch_manager.mark_item_waiting_callback(item_id, execution_id, plugin_response)
    await self._log_event(
        operation="batch_step_waiting_callback",
        details=f"Waiting callback item={item.id} phase={phase} execution_id={execution_id}",
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
        },
    )

    timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id)
    try:
        callback_payload = await self._callback_registry.wait_for_callback(execution_id, timeout_sec)
    except asyncio.TimeoutError as exc:
        # Converted so the caller's retry handling treats a timeout like
        # any other step failure.
        raise PluginSubmitError(f"{phase} callback timeout for execution_id={execution_id}") from exc

    if callback_payload.status != PluginResultStatus.SUCCESS:
        error_message = callback_payload.error_message or "Plugin returned failed status"
        raise PluginSubmitError(f"{phase} callback failed: {error_message}")

    await self._log_event(
        operation="batch_step_callback_succeeded",
        details=f"Callback succeeded item={item.id} phase={phase} execution_id={execution_id}",
        extra_data={
            "batch_id": item.batch_id,
            "item_id": item.id,
            "execution_id": execution_id,
            "attempt": attempt,
            "phase": phase,
        },
    )
    return callback_payload.result if isinstance(callback_payload.result, dict) else {"result": callback_payload.result}
|
|
|
|
async def _poll_revit_shell_execute_result(
|
|
self,
|
|
item,
|
|
attempt: int,
|
|
phase: str,
|
|
plugin_response: Dict[str, Any],
|
|
execution_id: str,
|
|
) -> Optional[Dict[str, Any]]:
|
|
if not hasattr(self._plugin_client, "get_task_status"):
|
|
return None
|
|
|
|
data = plugin_response.get("data")
|
|
if not isinstance(data, dict):
|
|
data = {}
|
|
task_id = data.get("taskId") or data.get("task_id")
|
|
status_url = data.get("statusUrl") or data.get("status_url")
|
|
if not task_id and not status_url:
|
|
return None
|
|
|
|
timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id)
|
|
poll_interval_sec = 2.0
|
|
deadline = asyncio.get_event_loop().time() + timeout_sec
|
|
last_status = None
|
|
|
|
await self._log_event(
|
|
operation="batch_step_polling_started",
|
|
details=(
|
|
f"Polling started item={item.id} phase={phase} execution_id={execution_id} "
|
|
f"task_id={task_id or 'unknown'} timeout_sec={timeout_sec}"
|
|
),
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"execution_id": execution_id,
|
|
"attempt": attempt,
|
|
"phase": phase,
|
|
"task_id": task_id,
|
|
"status_url": status_url,
|
|
"poll_interval_sec": poll_interval_sec,
|
|
"timeout_sec": timeout_sec,
|
|
},
|
|
)
|
|
|
|
while asyncio.get_event_loop().time() < deadline:
|
|
status_response = await self._plugin_client.get_task_status(
|
|
software_id=item.software_id,
|
|
task_id=str(task_id) if task_id else None,
|
|
status_url=str(status_url) if status_url else None,
|
|
)
|
|
status_data = status_response.get("data") if isinstance(status_response.get("data"), dict) else {}
|
|
status_value = (
|
|
status_data.get("status")
|
|
or status_response.get("status")
|
|
or ""
|
|
)
|
|
lowered = str(status_value).strip().lower()
|
|
|
|
if lowered and lowered != last_status:
|
|
last_status = lowered
|
|
await self._log_event(
|
|
operation="batch_step_polling_status",
|
|
details=(
|
|
f"Polling status item={item.id} phase={phase} execution_id={execution_id} "
|
|
f"status={status_value}"
|
|
),
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"execution_id": execution_id,
|
|
"attempt": attempt,
|
|
"phase": phase,
|
|
"task_id": task_id,
|
|
"status": status_value,
|
|
},
|
|
)
|
|
|
|
if lowered in {"completed", "success", "succeeded", "done"}:
|
|
await self._log_event(
|
|
operation="batch_step_polling_succeeded",
|
|
details=(
|
|
f"Polling succeeded item={item.id} phase={phase} execution_id={execution_id} "
|
|
f"task_id={task_id}"
|
|
),
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"execution_id": execution_id,
|
|
"attempt": attempt,
|
|
"phase": phase,
|
|
"task_id": task_id,
|
|
},
|
|
)
|
|
return status_response if isinstance(status_response, dict) else {"raw_response": str(status_response)}
|
|
|
|
if lowered in {"failed", "cancelled", "canceled", "error"}:
|
|
error_message = (
|
|
status_data.get("errorMessage")
|
|
or status_data.get("error_message")
|
|
or status_data.get("error")
|
|
or status_response.get("message")
|
|
or "Plugin task failed"
|
|
)
|
|
raise PluginSubmitError(f"{phase} polling failed: {error_message}")
|
|
|
|
await asyncio.sleep(poll_interval_sec)
|
|
|
|
raise PluginSubmitError(
|
|
f"{phase} polling timeout for task_id={task_id or 'unknown'} execution_id={execution_id}"
|
|
)
|
|
|
|
async def _ensure_pre_batch_cleanup(self, item, item_id: str, attempt: int):
|
|
key = (item.batch_id, item.software_id)
|
|
if key in self._pre_batch_cleanup_done:
|
|
return
|
|
|
|
if not self._batch_manager.should_run_pre_batch_cleanup(item.software_id):
|
|
self._pre_batch_cleanup_done.add(key)
|
|
return
|
|
|
|
await self._log_event(
|
|
operation="batch_precheck_start",
|
|
details=f"Precheck start batch={item.batch_id} software={item.software_id}",
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"software_id": item.software_id,
|
|
"attempt": attempt,
|
|
},
|
|
)
|
|
|
|
should_close = True
|
|
if self._batch_manager.should_check_open_before_pre_batch_cleanup(item.software_id):
|
|
check_task_type = self._batch_manager.get_pre_batch_check_task_type(item.software_id)
|
|
if not check_task_type:
|
|
raise PluginSubmitError(
|
|
f"pre_batch_check_open_enabled is true but pre_batch_check_task_type is missing for {item.software_id}"
|
|
)
|
|
|
|
check_result = await self._execute_task_step(
|
|
item=item,
|
|
item_id=item_id,
|
|
attempt=attempt,
|
|
task_type=check_task_type,
|
|
task_params={},
|
|
phase="pre_batch_check_open",
|
|
)
|
|
should_close = self._is_model_open_from_check_result(check_result)
|
|
await self._log_event(
|
|
operation="batch_precheck_state_checked",
|
|
details=(
|
|
f"Precheck state checked batch={item.batch_id} software={item.software_id} "
|
|
f"should_close={should_close}"
|
|
),
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"software_id": item.software_id,
|
|
"attempt": attempt,
|
|
"should_close": should_close,
|
|
"check_result": check_result,
|
|
},
|
|
)
|
|
|
|
if not should_close:
|
|
self._pre_batch_cleanup_done.add(key)
|
|
await self._log_event(
|
|
operation="batch_precheck_skip_cleanup",
|
|
details=f"Precheck skip cleanup batch={item.batch_id} software={item.software_id}",
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"software_id": item.software_id,
|
|
"attempt": attempt,
|
|
},
|
|
)
|
|
return
|
|
|
|
cleanup_task_type = self._batch_manager.get_pre_batch_cleanup_task_type(item.software_id)
|
|
if not cleanup_task_type:
|
|
raise PluginSubmitError(
|
|
f"pre_batch_cleanup_enabled is true but pre_batch_cleanup_task_type is missing for {item.software_id}"
|
|
)
|
|
|
|
cleanup_params = self._batch_manager.get_pre_batch_cleanup_task_params(item.software_id)
|
|
try:
|
|
await self._execute_task_step(
|
|
item=item,
|
|
item_id=item_id,
|
|
attempt=attempt,
|
|
task_type=cleanup_task_type,
|
|
task_params=cleanup_params,
|
|
phase=f"pre_batch_cleanup_{cleanup_task_type}",
|
|
)
|
|
self._pre_batch_cleanup_done.add(key)
|
|
await self._log_event(
|
|
operation="batch_precheck_cleanup_succeeded",
|
|
details=(
|
|
f"Precheck cleanup succeeded batch={item.batch_id} software={item.software_id} "
|
|
f"task_type={cleanup_task_type}"
|
|
),
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"software_id": item.software_id,
|
|
"attempt": attempt,
|
|
"cleanup_task_type": cleanup_task_type,
|
|
},
|
|
)
|
|
except PluginSubmitError as exc:
|
|
error_message = str(exc)
|
|
if not self._is_ignorable_pre_batch_cleanup_error(item.software_id, error_message):
|
|
raise
|
|
|
|
self._pre_batch_cleanup_done.add(key)
|
|
await self._log_event(
|
|
operation="batch_precheck_cleanup_ignored",
|
|
details=(
|
|
f"Precheck cleanup ignored batch={item.batch_id} software={item.software_id} "
|
|
f"task_type={cleanup_task_type}: {error_message}"
|
|
),
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"software_id": item.software_id,
|
|
"attempt": attempt,
|
|
"cleanup_task_type": cleanup_task_type,
|
|
"error_message": error_message,
|
|
},
|
|
)
|
|
|
|
async def _sleep_inter_step(self, item, phase: str):
|
|
delay = self._batch_manager.get_inter_step_delay_sec(item.software_id)
|
|
if delay <= 0:
|
|
return
|
|
|
|
await self._log_event(
|
|
operation="batch_inter_step_delay",
|
|
details=f"Delay {delay}s between steps ({phase})",
|
|
extra_data={
|
|
"batch_id": item.batch_id,
|
|
"item_id": item.id,
|
|
"software_id": item.software_id,
|
|
"phase": phase,
|
|
"delay_sec": delay,
|
|
},
|
|
)
|
|
await asyncio.sleep(delay)
|
|
|
|
@staticmethod
|
|
def _compose_success_result(
|
|
auto_open: bool,
|
|
auto_close: bool,
|
|
open_result: Optional[Dict[str, Any]],
|
|
main_result: Optional[Dict[str, Any]],
|
|
close_result: Optional[Dict[str, Any]],
|
|
) -> Dict[str, Any]:
|
|
if not auto_open and not auto_close:
|
|
return main_result or {}
|
|
|
|
result: Dict[str, Any] = {"main_result": main_result or {}}
|
|
if auto_open:
|
|
result["open_model_result"] = open_result or {}
|
|
if auto_close:
|
|
result["close_model_result"] = close_result or {}
|
|
return result
|
|
|
|
@staticmethod
|
|
def _extract_open_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]:
|
|
if not isinstance(task_params, dict):
|
|
return {}
|
|
|
|
open_params = task_params.get("open_model_params")
|
|
if isinstance(open_params, dict):
|
|
return open_params
|
|
|
|
open_params = task_params.get("open_params")
|
|
if isinstance(open_params, dict):
|
|
return open_params
|
|
|
|
return {}
|
|
|
|
@staticmethod
|
|
def _extract_close_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]:
|
|
if not isinstance(task_params, dict):
|
|
return {}
|
|
|
|
close_params = task_params.get("close_model_params")
|
|
if isinstance(close_params, dict):
|
|
return close_params
|
|
|
|
close_params = task_params.get("close_params")
|
|
if isinstance(close_params, dict):
|
|
return close_params
|
|
|
|
return {}
|
|
|
|
def _is_ignorable_pre_batch_cleanup_error(self, software_id: str, error_message: str) -> bool:
|
|
if not error_message:
|
|
return False
|
|
|
|
markers = self._batch_manager.get_pre_batch_cleanup_ignore_error_markers(software_id)
|
|
if not markers:
|
|
return False
|
|
|
|
lowered = error_message.lower()
|
|
for marker in markers:
|
|
if marker.lower() in lowered:
|
|
return True
|
|
return False
|
|
|
|
def _is_ignorable_close_model_error(self, software_id: str, error_message: str) -> bool:
|
|
if not error_message:
|
|
return False
|
|
|
|
markers = self._batch_manager.get_close_model_ignore_error_markers(software_id)
|
|
if not markers:
|
|
return False
|
|
|
|
lowered = error_message.lower()
|
|
for marker in markers:
|
|
if marker.lower() in lowered:
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def _is_model_open_from_check_result(check_result: Dict[str, Any]) -> bool:
|
|
if not isinstance(check_result, dict):
|
|
return True
|
|
|
|
explicit_flags = [
|
|
"has_open_model",
|
|
"hasOpenModel",
|
|
"is_model_open",
|
|
"isModelOpen",
|
|
"opened",
|
|
"isOpened",
|
|
"project_opened",
|
|
"projectOpened",
|
|
]
|
|
for key in explicit_flags:
|
|
if key in check_result:
|
|
return bool(check_result.get(key))
|
|
|
|
non_empty_markers = [
|
|
"model_path",
|
|
"modelPath",
|
|
"file_path",
|
|
"filePath",
|
|
"current_model",
|
|
"currentModel",
|
|
"project_name",
|
|
"projectName",
|
|
"current_project",
|
|
"currentProject",
|
|
]
|
|
for key in non_empty_markers:
|
|
value = check_result.get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
return True
|
|
if isinstance(value, dict) and value:
|
|
return True
|
|
|
|
# Unknown schema: default to close for safety.
|
|
return True
|
|
|
|
async def _log_event(
|
|
self,
|
|
operation: str,
|
|
details: str,
|
|
status: OperationStatus = OperationStatus.SUCCESS,
|
|
extra_data: Optional[dict] = None,
|
|
):
|
|
log_method = getattr(self._batch_manager, "log_execution_event", None)
|
|
if not callable(log_method):
|
|
return
|
|
|
|
try:
|
|
await log_method(
|
|
operation=operation,
|
|
details=details,
|
|
status=status,
|
|
extra_data=extra_data or {},
|
|
)
|
|
except Exception:
|
|
# Logging must not break task execution.
|
|
return
|
|
|
|
@staticmethod
|
|
def _is_successful_sync_response(plugin_response: dict) -> bool:
|
|
if not isinstance(plugin_response, dict):
|
|
return False
|
|
|
|
if plugin_response.get("error"):
|
|
return False
|
|
|
|
code_value = plugin_response.get("code")
|
|
try:
|
|
if code_value is not None and int(code_value) == 202:
|
|
# 202 usually means accepted/queued, not completed.
|
|
return False
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
status_value = plugin_response.get("status")
|
|
if isinstance(status_value, str) and status_value.lower() in {"accepted", "queued", "pending"}:
|
|
return False
|
|
|
|
if "success" in plugin_response:
|
|
return bool(plugin_response.get("success"))
|
|
|
|
if plugin_response.get("accepted") is True:
|
|
return False
|
|
|
|
if isinstance(status_value, str) and status_value.lower() in {"success", "ok", "completed", "done"}:
|
|
return True
|
|
|
|
try:
|
|
if code_value is not None and int(code_value) in {0, 200, 201}:
|
|
return True
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
if plugin_response.get("task_id") or plugin_response.get("taskId"):
|
|
return False
|
|
|
|
return False
|
|
|
|
@staticmethod
|
|
def _is_successful_open_response(plugin_response: dict) -> bool:
|
|
if not isinstance(plugin_response, dict):
|
|
return False
|
|
|
|
if plugin_response.get("error"):
|
|
return False
|
|
|
|
if "success" in plugin_response:
|
|
return bool(plugin_response.get("success"))
|
|
|
|
code_value = plugin_response.get("code")
|
|
try:
|
|
if code_value is not None and int(code_value) in {0, 200}:
|
|
return True
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
status_value = plugin_response.get("status")
|
|
if isinstance(status_value, str) and status_value.lower() in {"success", "ok"}:
|
|
return True
|
|
|
|
return False
|