from __future__ import annotations

import asyncio
import uuid
from typing import Any, Dict, Iterable, Optional

from app.core.plugin_http_client import PluginSubmitError
from app.models.cad_batch import PluginResultStatus
from app.models.operation_log import OperationStatus


class SerialBatchExecutor:
    """Global single-worker FIFO executor for batch items.

    Item ids are queued with :meth:`enqueue` and processed strictly one at a
    time by a single background task started with :meth:`start`. Each item runs
    a sequence of plugin steps (optional pre-batch check/cleanup, optional
    ``open_model``, the item's main task, optional ``close_model``) and is
    retried up to ``item.max_retries`` extra times when a step fails.
    """

    def __init__(self, batch_manager, plugin_client, callback_registry):
        """Wire the executor to its collaborators.

        Args:
            batch_manager: Source of item records, per-software configuration
                (delays, completion modes, retry/backoff, cleanup settings), item
                state transitions and, optionally, ``log_execution_event``.
            plugin_client: Submits tasks to the CAD plugin (``submit_task``) and,
                optionally, polls task status (``get_task_status``).
            callback_registry: Waits for plugin callbacks by execution id
                (``wait_for_callback``).
        """
        self._batch_manager = batch_manager
        self._plugin_client = plugin_client
        self._callback_registry = callback_registry
        self._queue: asyncio.Queue[str] = asyncio.Queue()
        self._worker_task: Optional[asyncio.Task] = None
        self._running = False
        # (batch_id, software_id) pairs whose pre-batch cleanup already ran or was
        # deliberately skipped. NOTE(review): entries are only removed by stop(),
        # so this grows by one entry per (batch, software) for the process lifetime.
        self._pre_batch_cleanup_done: set[tuple[str, str]] = set()

    async def start(self):
        """Start the background worker; no-op if one is already running."""
        if self._worker_task and not self._worker_task.done():
            return
        self._running = True
        self._worker_task = asyncio.create_task(self._worker_loop())

    async def stop(self):
        """Cancel the worker, wait for it to exit and forget pre-batch cleanup state."""
        self._running = False
        if self._worker_task:
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass
            self._worker_task = None
        self._pre_batch_cleanup_done.clear()

    async def enqueue(self, item_id: str):
        """Append an item id to the FIFO queue consumed by the worker."""
        await self._queue.put(item_id)

    async def _worker_loop(self):
        """Consume queued item ids one at a time until the executor is stopped.

        Exceptions raised while handling a single item are logged and swallowed so
        that one bad item (or a transient batch-manager failure) cannot terminate
        the only worker and stall every item queued behind it. Cancellation from
        ``stop()`` still propagates because ``asyncio.CancelledError`` is not an
        ``Exception`` subclass on Python 3.8+.
        """
        while self._running:
            item_id = await self._queue.get()
            software_id = None
            try:
                software_id = await self._process_item(item_id)
            except Exception as exc:
                await self._log_worker_error(item_id, "process_item", exc)
            finally:
                self._queue.task_done()
            if not software_id:
                continue
            try:
                between_items_delay = self._batch_manager.get_between_items_delay_sec(software_id)
            except Exception as exc:
                await self._log_worker_error(item_id, "between_items_delay", exc)
                continue
            if between_items_delay > 0:
                await self._log_event(
                    operation="batch_between_items_delay",
                    details=f"Delay {between_items_delay}s before next queued item",
                    extra_data={"software_id": software_id, "delay_sec": between_items_delay},
                )
                await asyncio.sleep(between_items_delay)

    async def _log_worker_error(self, item_id: str, stage: str, exc: Exception):
        """Record an exception that escaped per-item handling in the worker loop."""
        await self._log_event(
            operation="batch_worker_error",
            details=f"Worker error item={item_id} stage={stage}: {exc}",
            status=OperationStatus.FAILED,
            extra_data={"item_id": item_id, "stage": stage, "error_message": str(exc)},
        )

    async def _process_item(self, item_id: str) -> Optional[str]:
        """Run one queued item through all of its steps, retrying on failure.

        Returns:
            ``item.software_id`` once the item has been marked succeeded or finally
            failed, or ``None`` when the item does not exist or has no software id.
            The worker uses a truthy return value to apply the between-items delay.

        Raises:
            Whatever the batch manager raises from calls made outside the per-attempt
            ``try`` (``get_item``, ``should_auto_*``, ``mark_item_retrying``,
            ``get_retry_backoff_sec``, ``mark_item_failed``).
        """
        item = await self._batch_manager.get_item(item_id)
        if not item:
            return None
        # Item may already be marked failed during routing.
        if item.software_id is None:
            return None

        for attempt in range(item.max_retries + 1):
            error_message = None
            current_phase = "main_task"
            open_result = None
            main_result = None
            close_result = None
            auto_open = self._batch_manager.should_auto_open_model(item.software_id, item.task_type)
            auto_close = self._batch_manager.should_auto_close_model(item.software_id, item.task_type)
            try:
                current_phase = "pre_batch_cleanup"
                await self._ensure_pre_batch_cleanup(item=item, item_id=item_id, attempt=attempt)

                if auto_open:
                    current_phase = "open_model"
                    open_result = await self._execute_task_step(
                        item=item,
                        item_id=item_id,
                        attempt=attempt,
                        task_type="open_model",
                        task_params=self._extract_open_task_params(item.task_params),
                        phase=current_phase,
                    )
                    await self._sleep_inter_step(item, phase="open_model_to_main")

                current_phase = item.task_type
                main_result = await self._execute_task_step(
                    item=item,
                    item_id=item_id,
                    attempt=attempt,
                    task_type=item.task_type,
                    task_params=item.task_params,
                    phase=current_phase,
                )

                if auto_close:
                    await self._sleep_inter_step(item, phase="main_to_close_model")
                    current_phase = "close_model"
                    try:
                        close_result = await self._execute_task_step(
                            item=item,
                            item_id=item_id,
                            attempt=attempt,
                            task_type="close_model",
                            task_params=self._extract_close_task_params(item.task_params),
                            phase=current_phase,
                        )
                    except PluginSubmitError as exc:
                        # Some close failures are configured as harmless (matched by
                        # marker substrings); those do not fail the item.
                        error_message = str(exc)
                        if not self._is_ignorable_close_model_error(item.software_id, error_message):
                            raise
                        close_result = {"ignored_error": error_message}
                        await self._log_event(
                            operation="batch_close_model_ignored",
                            details=(
                                f"Close model ignored item={item.id} attempt={attempt}: "
                                f"{error_message}"
                            ),
                            extra_data={
                                "batch_id": item.batch_id,
                                "item_id": item.id,
                                "attempt": attempt,
                                "software_id": item.software_id,
                                "error_message": error_message,
                            },
                        )

                success_result = self._compose_success_result(
                    auto_open=auto_open,
                    auto_close=auto_close,
                    open_result=open_result,
                    main_result=main_result,
                    close_result=close_result,
                )
                await self._batch_manager.mark_item_succeeded(item_id, success_result)
                await self._log_event(
                    operation="batch_item_succeeded",
                    details=f"Item succeeded item={item.id} attempt={attempt}",
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "attempt": attempt,
                        "software_id": item.software_id,
                        "auto_open": auto_open,
                        "auto_close": auto_close,
                    },
                )
                return item.software_id
            except PluginSubmitError as exc:
                error_message = str(exc)
                await self._log_event(
                    operation="batch_step_failed",
                    details=(
                        f"Step failed item={item.id} phase={current_phase} attempt={attempt}: "
                        f"{error_message}"
                    ),
                    status=OperationStatus.FAILED,
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "attempt": attempt,
                        "phase": current_phase,
                        "error_message": error_message,
                    },
                )
            except Exception as exc:
                error_message = f"Unexpected execution error: {exc}"
                await self._log_event(
                    operation="batch_step_unexpected_error",
                    details=(
                        f"Unexpected error item={item.id} phase={current_phase} attempt={attempt}: "
                        f"{exc}"
                    ),
                    status=OperationStatus.FAILED,
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "attempt": attempt,
                        "phase": current_phase,
                        "error_message": str(exc),
                    },
                )

            if attempt < item.max_retries:
                await self._batch_manager.mark_item_retrying(item_id, error_message, attempt + 1)
                backoff = self._batch_manager.get_retry_backoff_sec(item.software_id, attempt)
                await self._log_event(
                    operation="batch_retry_scheduled",
                    details=(
                        f"Retry scheduled item={item.id} phase={current_phase} next_attempt={attempt + 1} "
                        f"backoff_sec={backoff}"
                    ),
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "attempt": attempt,
                        "next_attempt": attempt + 1,
                        "phase": current_phase,
                        "backoff_sec": backoff,
                        "error_message": error_message,
                    },
                )
                if backoff > 0:
                    await asyncio.sleep(backoff)
                continue

            await self._batch_manager.mark_item_failed(item_id, error_message)
            await self._log_event(
                operation="batch_item_failed_final",
                details=f"Item failed item={item.id} phase={current_phase}: {error_message}",
                status=OperationStatus.FAILED,
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "attempt": attempt,
                    "phase": current_phase,
                    "error_message": error_message,
                },
            )
            return item.software_id
        # Only reached when range(item.max_retries + 1) is empty (max_retries < 0).
        return item.software_id

    async def _execute_task_step(
        self,
        item,
        item_id: str,
        attempt: int,
        task_type: str,
        task_params: Dict[str, Any],
        phase: str,
    ) -> Dict[str, Any]:
        """Dispatch one plugin task and wait for it according to its completion mode.

        A fresh ``execution_id`` is generated per step and the item is marked as
        dispatching. The completion mode comes from the batch manager:

        * ``submit_only`` - the submit response itself is the result
          (rejected for the ``open_model`` phase).
        * ``sync`` - the submit response must already indicate success.
        * anything else - the item is marked waiting and the step blocks on the
          callback registry until a callback arrives or the timeout expires.

        Revit ``shell_execute`` tasks are first polled via the plugin's task-status
        API when the submit response exposes a task id or status URL.

        Returns:
            The step result as a dict (non-dict payloads are wrapped).

        Raises:
            PluginSubmitError: on rejection, failure, or timeout of the step.
        """
        execution_id = str(uuid.uuid4())
        await self._batch_manager.mark_item_dispatching(item_id, execution_id, attempt)
        completion_mode = self._batch_manager.get_task_completion_mode(item.software_id, task_type)
        if phase == "open_model" and completion_mode == "submit_only":
            raise PluginSubmitError(
                "open_model completion_mode=submit_only is not allowed; use sync or callback"
            )

        callback_url = self._batch_manager.get_callback_endpoint_url()
        payload = {
            "execution_id": execution_id,
            "batch_id": item.batch_id,
            "item_id": item.id,
            "model_path": item.model_path,
            "task_type": task_type,
            "task_params": task_params,
            "callback_url": callback_url,
            "attempt": attempt,
        }
        await self._log_event(
            operation="batch_step_start",
            details=(
                f"Step start item={item.id} phase={phase} attempt={attempt} "
                f"task_type={task_type} completion_mode={completion_mode}"
            ),
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
                "software_id": item.software_id,
                "task_type": task_type,
                "model_path": item.model_path,
                "completion_mode": completion_mode,
            },
        )

        plugin_response = await self._plugin_client.submit_task(item.software_id, task_type, payload)
        await self._log_event(
            operation="batch_step_submitted",
            details=f"Step submitted item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
                "completion_mode": completion_mode,
                "plugin_response": plugin_response,
            },
        )

        # Revit shell_execute is async by taskId/statusUrl. Poll plugin task status directly.
        if item.software_id == "revit" and task_type == "shell_execute":
            polled_result = await self._poll_revit_shell_execute_result(
                item=item,
                attempt=attempt,
                phase=phase,
                plugin_response=plugin_response if isinstance(plugin_response, dict) else {},
                execution_id=execution_id,
            )
            if polled_result is not None:
                return polled_result

        if completion_mode == "submit_only":
            submit_result = {"completion_mode": "submit_only"}
            if isinstance(plugin_response, dict):
                submit_result.update(plugin_response)
            else:
                submit_result["raw_response"] = str(plugin_response)
            await self._log_event(
                operation="batch_step_submit_only_accepted",
                details=f"Submit-only accepted item={item.id} phase={phase} execution_id={execution_id}",
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "execution_id": execution_id,
                    "attempt": attempt,
                    "phase": phase,
                },
            )
            return submit_result

        if completion_mode == "sync":
            # open_model uses a looser success check than other sync steps.
            if phase == "open_model":
                success_flag = self._is_successful_open_response(plugin_response)
            else:
                success_flag = self._is_successful_sync_response(plugin_response)
            if not success_flag:
                error_message = "Plugin sync task failed"
                if isinstance(plugin_response, dict):
                    error_message = plugin_response.get("error") or error_message
                raise PluginSubmitError(f"{phase} sync failed: {error_message}")
            await self._log_event(
                operation="batch_step_sync_succeeded",
                details=f"Sync succeeded item={item.id} phase={phase} execution_id={execution_id}",
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "execution_id": execution_id,
                    "attempt": attempt,
                    "phase": phase,
                },
            )
            return plugin_response if isinstance(plugin_response, dict) else {"raw_response": str(plugin_response)}

        # Callback mode: block until the plugin reports back for this execution_id.
        await self._batch_manager.mark_item_waiting_callback(item_id, execution_id, plugin_response)
        await self._log_event(
            operation="batch_step_waiting_callback",
            details=f"Waiting callback item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
            },
        )
        timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id)
        try:
            callback_payload = await self._callback_registry.wait_for_callback(execution_id, timeout_sec)
        except asyncio.TimeoutError as exc:
            raise PluginSubmitError(f"{phase} callback timeout for execution_id={execution_id}") from exc
        if callback_payload.status != PluginResultStatus.SUCCESS:
            error_message = callback_payload.error_message or "Plugin returned failed status"
            raise PluginSubmitError(f"{phase} callback failed: {error_message}")
        await self._log_event(
            operation="batch_step_callback_succeeded",
            details=f"Callback succeeded item={item.id} phase={phase} execution_id={execution_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
            },
        )
        return callback_payload.result if isinstance(callback_payload.result, dict) else {"result": callback_payload.result}

    async def _poll_revit_shell_execute_result(
        self,
        item,
        attempt: int,
        phase: str,
        plugin_response: Dict[str, Any],
        execution_id: str,
    ) -> Optional[Dict[str, Any]]:
        """Poll the plugin's task-status API until the task finishes or times out.

        Returns:
            ``None`` when polling is not possible (client has no ``get_task_status``
            or the submit response carries neither ``taskId``/``task_id`` nor
            ``statusUrl``/``status_url`` under ``data``), letting the caller fall
            back to its normal completion mode; otherwise the final status response.

        Raises:
            PluginSubmitError: when the task reports a failure status or the
                callback timeout elapses first.
        """
        if not hasattr(self._plugin_client, "get_task_status"):
            return None
        data = plugin_response.get("data")
        if not isinstance(data, dict):
            data = {}
        task_id = data.get("taskId") or data.get("task_id")
        status_url = data.get("statusUrl") or data.get("status_url")
        if not task_id and not status_url:
            return None

        loop = asyncio.get_running_loop()
        timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id)
        poll_interval_sec = 2.0
        deadline = loop.time() + timeout_sec
        last_status = None
        await self._log_event(
            operation="batch_step_polling_started",
            details=(
                f"Polling started item={item.id} phase={phase} execution_id={execution_id} "
                f"task_id={task_id or 'unknown'} timeout_sec={timeout_sec}"
            ),
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "execution_id": execution_id,
                "attempt": attempt,
                "phase": phase,
                "task_id": task_id,
                "status_url": status_url,
                "poll_interval_sec": poll_interval_sec,
                "timeout_sec": timeout_sec,
            },
        )

        while loop.time() < deadline:
            status_response = await self._plugin_client.get_task_status(
                software_id=item.software_id,
                task_id=str(task_id) if task_id else None,
                status_url=str(status_url) if status_url else None,
            )
            if not isinstance(status_response, dict):
                # A non-dict reply carries no readable status; treat it like an
                # empty status (keep polling) instead of crashing on .get() below.
                status_response = {"raw_response": str(status_response)}
            status_data = status_response.get("data")
            if not isinstance(status_data, dict):
                status_data = {}
            status_value = status_data.get("status") or status_response.get("status") or ""
            lowered = str(status_value).strip().lower()

            # Log only status transitions, not every poll.
            if lowered and lowered != last_status:
                last_status = lowered
                await self._log_event(
                    operation="batch_step_polling_status",
                    details=(
                        f"Polling status item={item.id} phase={phase} execution_id={execution_id} "
                        f"status={status_value}"
                    ),
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "execution_id": execution_id,
                        "attempt": attempt,
                        "phase": phase,
                        "task_id": task_id,
                        "status": status_value,
                    },
                )

            if lowered in {"completed", "success", "succeeded", "done"}:
                await self._log_event(
                    operation="batch_step_polling_succeeded",
                    details=(
                        f"Polling succeeded item={item.id} phase={phase} execution_id={execution_id} "
                        f"task_id={task_id}"
                    ),
                    extra_data={
                        "batch_id": item.batch_id,
                        "item_id": item.id,
                        "execution_id": execution_id,
                        "attempt": attempt,
                        "phase": phase,
                        "task_id": task_id,
                    },
                )
                return status_response

            if lowered in {"failed", "cancelled", "canceled", "error"}:
                error_message = (
                    status_data.get("errorMessage")
                    or status_data.get("error_message")
                    or status_data.get("error")
                    or status_response.get("message")
                    or "Plugin task failed"
                )
                raise PluginSubmitError(f"{phase} polling failed: {error_message}")

            # Never sleep past the deadline; the loop condition then ends polling.
            remaining = deadline - loop.time()
            await asyncio.sleep(min(poll_interval_sec, max(remaining, 0.0)))

        raise PluginSubmitError(
            f"{phase} polling timeout for task_id={task_id or 'unknown'} execution_id={execution_id}"
        )

    async def _ensure_pre_batch_cleanup(self, item, item_id: str, attempt: int):
        """Run the configured pre-batch cleanup once per (batch_id, software_id).

        Optionally runs a check task first and skips cleanup when the check says no
        model is open. Completion (or a deliberate skip / ignorable error) is
        remembered so later items of the same batch do not repeat it.

        Raises:
            PluginSubmitError: when enabled cleanup/check settings lack a task type,
                or when the check/cleanup step fails with a non-ignorable error.
        """
        key = (item.batch_id, item.software_id)
        if key in self._pre_batch_cleanup_done:
            return
        if not self._batch_manager.should_run_pre_batch_cleanup(item.software_id):
            self._pre_batch_cleanup_done.add(key)
            return

        await self._log_event(
            operation="batch_precheck_start",
            details=f"Precheck start batch={item.batch_id} software={item.software_id}",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "software_id": item.software_id,
                "attempt": attempt,
            },
        )

        should_close = True
        if self._batch_manager.should_check_open_before_pre_batch_cleanup(item.software_id):
            check_task_type = self._batch_manager.get_pre_batch_check_task_type(item.software_id)
            if not check_task_type:
                raise PluginSubmitError(
                    f"pre_batch_check_open_enabled is true but pre_batch_check_task_type is missing for {item.software_id}"
                )
            check_result = await self._execute_task_step(
                item=item,
                item_id=item_id,
                attempt=attempt,
                task_type=check_task_type,
                task_params={},
                phase="pre_batch_check_open",
            )
            should_close = self._is_model_open_from_check_result(check_result)
            await self._log_event(
                operation="batch_precheck_state_checked",
                details=(
                    f"Precheck state checked batch={item.batch_id} software={item.software_id} "
                    f"should_close={should_close}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "software_id": item.software_id,
                    "attempt": attempt,
                    "should_close": should_close,
                    "check_result": check_result,
                },
            )

        if not should_close:
            self._pre_batch_cleanup_done.add(key)
            await self._log_event(
                operation="batch_precheck_skip_cleanup",
                details=f"Precheck skip cleanup batch={item.batch_id} software={item.software_id}",
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "software_id": item.software_id,
                    "attempt": attempt,
                },
            )
            return

        cleanup_task_type = self._batch_manager.get_pre_batch_cleanup_task_type(item.software_id)
        if not cleanup_task_type:
            raise PluginSubmitError(
                f"pre_batch_cleanup_enabled is true but pre_batch_cleanup_task_type is missing for {item.software_id}"
            )
        cleanup_params = self._batch_manager.get_pre_batch_cleanup_task_params(item.software_id)
        try:
            await self._execute_task_step(
                item=item,
                item_id=item_id,
                attempt=attempt,
                task_type=cleanup_task_type,
                task_params=cleanup_params,
                phase=f"pre_batch_cleanup_{cleanup_task_type}",
            )
            self._pre_batch_cleanup_done.add(key)
            await self._log_event(
                operation="batch_precheck_cleanup_succeeded",
                details=(
                    f"Precheck cleanup succeeded batch={item.batch_id} software={item.software_id} "
                    f"task_type={cleanup_task_type}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "software_id": item.software_id,
                    "attempt": attempt,
                    "cleanup_task_type": cleanup_task_type,
                },
            )
        except PluginSubmitError as exc:
            error_message = str(exc)
            if not self._is_ignorable_pre_batch_cleanup_error(item.software_id, error_message):
                raise
            self._pre_batch_cleanup_done.add(key)
            await self._log_event(
                operation="batch_precheck_cleanup_ignored",
                details=(
                    f"Precheck cleanup ignored batch={item.batch_id} software={item.software_id} "
                    f"task_type={cleanup_task_type}: {error_message}"
                ),
                extra_data={
                    "batch_id": item.batch_id,
                    "item_id": item.id,
                    "software_id": item.software_id,
                    "attempt": attempt,
                    "cleanup_task_type": cleanup_task_type,
                    "error_message": error_message,
                },
            )

    async def _sleep_inter_step(self, item, phase: str):
        """Sleep the configured per-software delay between two steps of one item."""
        delay = self._batch_manager.get_inter_step_delay_sec(item.software_id)
        if delay <= 0:
            return
        await self._log_event(
            operation="batch_inter_step_delay",
            details=f"Delay {delay}s between steps ({phase})",
            extra_data={
                "batch_id": item.batch_id,
                "item_id": item.id,
                "software_id": item.software_id,
                "phase": phase,
                "delay_sec": delay,
            },
        )
        await asyncio.sleep(delay)

    @staticmethod
    def _compose_success_result(
        auto_open: bool,
        auto_close: bool,
        open_result: Optional[Dict[str, Any]],
        main_result: Optional[Dict[str, Any]],
        close_result: Optional[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Build the stored success payload.

        Without auto open/close the main result is returned as-is; otherwise it is
        nested under ``main_result`` next to the open/close step results.
        """
        if not auto_open and not auto_close:
            return main_result or {}
        result: Dict[str, Any] = {"main_result": main_result or {}}
        if auto_open:
            result["open_model_result"] = open_result or {}
        if auto_close:
            result["close_model_result"] = close_result or {}
        return result

    @staticmethod
    def _first_dict_param(task_params: Dict[str, Any], keys: tuple) -> Dict[str, Any]:
        """Return the first dict-valued entry of ``task_params`` among ``keys``, else ``{}``."""
        if not isinstance(task_params, dict):
            return {}
        for key in keys:
            value = task_params.get(key)
            if isinstance(value, dict):
                return value
        return {}

    @staticmethod
    def _extract_open_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]:
        """Params for the auto ``open_model`` step (``open_model_params`` or ``open_params``)."""
        return SerialBatchExecutor._first_dict_param(task_params, ("open_model_params", "open_params"))

    @staticmethod
    def _extract_close_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]:
        """Params for the auto ``close_model`` step (``close_model_params`` or ``close_params``)."""
        return SerialBatchExecutor._first_dict_param(task_params, ("close_model_params", "close_params"))

    @staticmethod
    def _error_matches_markers(error_message: str, markers: Optional[Iterable[Any]]) -> bool:
        """Case-insensitive substring match of ``error_message`` against ``markers``.

        Empty/falsy markers are skipped: an empty string would otherwise match
        (and thereby silently ignore) every error.
        """
        if not error_message or not markers:
            return False
        lowered = error_message.lower()
        return any(marker and str(marker).lower() in lowered for marker in markers)

    def _is_ignorable_pre_batch_cleanup_error(self, software_id: str, error_message: str) -> bool:
        """True when a pre-batch cleanup error matches a configured ignore marker."""
        if not error_message:
            return False
        markers = self._batch_manager.get_pre_batch_cleanup_ignore_error_markers(software_id)
        return self._error_matches_markers(error_message, markers)

    def _is_ignorable_close_model_error(self, software_id: str, error_message: str) -> bool:
        """True when a close_model error matches a configured ignore marker."""
        if not error_message:
            return False
        markers = self._batch_manager.get_close_model_ignore_error_markers(software_id)
        return self._error_matches_markers(error_message, markers)

    @staticmethod
    def _is_model_open_from_check_result(check_result: Dict[str, Any]) -> bool:
        """Decide from a pre-batch check result whether a model may be open.

        The first explicit boolean-like flag present decides; in every other case
        the answer is ``True`` so cleanup runs (safe default).
        """
        if not isinstance(check_result, dict):
            return True
        explicit_flags = [
            "has_open_model",
            "hasOpenModel",
            "is_model_open",
            "isModelOpen",
            "opened",
            "isOpened",
            "project_opened",
            "projectOpened",
        ]
        for key in explicit_flags:
            if key in check_result:
                return bool(check_result.get(key))
        # NOTE(review): this marker scan cannot change the outcome, since the
        # fallthrough below also returns True. Kept to document the expected
        # schema; confirm whether "markers present but empty" should return False.
        non_empty_markers = [
            "model_path",
            "modelPath",
            "file_path",
            "filePath",
            "current_model",
            "currentModel",
            "project_name",
            "projectName",
            "current_project",
            "currentProject",
        ]
        for key in non_empty_markers:
            value = check_result.get(key)
            if isinstance(value, str) and value.strip():
                return True
            if isinstance(value, dict) and value:
                return True
        # Unknown schema: default to close for safety.
        return True

    async def _log_event(
        self,
        operation: str,
        details: str,
        status: OperationStatus = OperationStatus.SUCCESS,
        extra_data: Optional[dict] = None,
    ):
        """Best-effort forward of an execution event to ``batch_manager.log_execution_event``.

        Silently does nothing when the batch manager has no such method, and
        swallows any error it raises.
        """
        log_method = getattr(self._batch_manager, "log_execution_event", None)
        if not callable(log_method):
            return
        try:
            await log_method(
                operation=operation,
                details=details,
                status=status,
                extra_data=extra_data or {},
            )
        except Exception:
            # Logging must not break task execution.
            return

    @staticmethod
    def _coerce_int(value: Any) -> Optional[int]:
        """``int(value)``, or ``None`` when value is None or not int-convertible."""
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _is_successful_sync_response(plugin_response: dict) -> bool:
        """Whether a sync-mode submit response reports a *completed* success.

        Accepted/queued/pending markers (including HTTP-like code 202) count as
        not completed. Any unrecognized shape is treated as failure.
        """
        if not isinstance(plugin_response, dict):
            return False
        if plugin_response.get("error"):
            return False
        code = SerialBatchExecutor._coerce_int(plugin_response.get("code"))
        if code == 202:
            # 202 usually means accepted/queued, not completed.
            return False
        status_value = plugin_response.get("status")
        status = status_value.lower() if isinstance(status_value, str) else None
        if status in {"accepted", "queued", "pending"}:
            return False
        if "success" in plugin_response:
            return bool(plugin_response.get("success"))
        if plugin_response.get("accepted") is True:
            return False
        if status in {"success", "ok", "completed", "done"}:
            return True
        return code in {0, 200, 201}

    @staticmethod
    def _is_successful_open_response(plugin_response: dict) -> bool:
        """Whether a sync-mode ``open_model`` response reports success.

        Checked in order: ``error`` (fail), an explicit ``success`` key, ``code`` 0/200,
        then ``status`` "success"/"ok" (case-insensitive).
        """
        if not isinstance(plugin_response, dict):
            return False
        if plugin_response.get("error"):
            return False
        if "success" in plugin_response:
            return bool(plugin_response.get("success"))
        if SerialBatchExecutor._coerce_int(plugin_response.get("code")) in {0, 200}:
            return True
        status_value = plugin_response.get("status")
        return isinstance(status_value, str) and status_value.lower() in {"success", "ok"}