From 08623bf4d6005b2e185295bf27139f9549b4d0aa Mon Sep 17 00:00:00 2001 From: sladro Date: Tue, 3 Mar 2026 16:13:19 +0800 Subject: [PATCH] feat: Enhance SerialBatchExecutor with pre-batch cleanup and task execution improvements - Added pre-batch cleanup functionality to SerialBatchExecutor, allowing for cleanup tasks before processing items. - Introduced new task execution phases and improved error handling for task submissions. - Implemented inter-step delays and between-items delays for better task management. - Updated logging to capture detailed events during batch processing. - Enhanced configuration options for plugins in software_config.yaml to support new features. - Added tests for pre-batch cleanup and auto-close scenarios to ensure robust handling of edge cases. - Created a PowerShell script for automated callback handling from Revit. --- app/api/v1/websocket.py | 16 + app/core/cad_batch_manager.py | 132 ++++- app/core/plugin_http_client.py | 54 +- app/core/serial_batch_executor.py | 813 ++++++++++++++++++++++++++-- app/main.py | 21 +- cad-batch-plan.md | 18 +- configs/software_config.yaml | 46 ++ scripts/auto_revit_callback.ps1 | 61 +++ tests/test_serial_batch_executor.py | 111 +++- 命令.md | 4 +- 10 files changed, 1218 insertions(+), 58 deletions(-) create mode 100644 scripts/auto_revit_callback.ps1 diff --git a/app/api/v1/websocket.py b/app/api/v1/websocket.py index 275fe62..15942ea 100644 --- a/app/api/v1/websocket.py +++ b/app/api/v1/websocket.py @@ -152,6 +152,13 @@ async def handle_client_message(message: dict, client_id: str, user_id: str): from app.models.operation_log import ActionType, OperationStatus message_type = message.get("type") + logger.info( + "WS recv: client_id=%s user_id=%s type=%s keys=%s", + client_id, + user_id, + message_type, + sorted(list(message.keys())), + ) if message_type == WSMessageType.PING: # 心跳响应 @@ -912,6 +919,13 @@ async def handle_client_message(message: dict, client_id: str, user_id: str): items = message.get("items") batch_name = message.get("batch_name") metadata = message.get("metadata", {}) + logger.info( + "WS submit_batch_tasks: client_id=%s user_id=%s batch_name=%s item_count=%s", + client_id, + user_id, + batch_name, + len(items) if isinstance(items, list) else "invalid", + ) if not items or not isinstance(items, list): await websocket_manager.send_personal_message({ @@ -946,6 +960,7 @@ async def handle_client_message(message: dict, client_id: str, user_id: str): "timestamp": websocket_manager._get_timestamp() }, client_id) except Exception as e: + logger.exception("WS submit_batch_tasks failed: client_id=%s error=%s", client_id, e) await websocket_manager.send_personal_message({ "type": MessageType.ERROR, "message": f"failed to submit batch tasks: {str(e)}", @@ -1167,6 +1182,7 @@ async def handle_client_message(message: dict, client_id: str, user_id: str): else: # 未知消息类型 + logger.warning("WS unknown message type: client_id=%s type=%s", client_id, message_type) await websocket_manager.send_personal_message({ "type": MessageType.ERROR, "message": f"未知的消息类型: {message_type}", diff --git a/app/core/cad_batch_manager.py b/app/core/cad_batch_manager.py index 3639db2..ac40cf6 100644 --- a/app/core/cad_batch_manager.py +++ b/app/core/cad_batch_manager.py @@ -1,6 +1,7 @@ -from __future__ import annotations +from __future__ import annotations import asyncio +import logging import uuid from datetime import datetime from typing import Dict, List, Optional, Set @@ -21,6 +22,9 @@ from app.models.cad_batch import ( from app.models.operation_log import OperationStatus +logger = logging.getLogger(__name__) + + TERMINAL_BATCH_STATUSES: Set[BatchStatus] = { BatchStatus.COMPLETED, BatchStatus.COMPLETED_WITH_ERRORS, @@ -178,10 +182,127 @@ class CadBatchManager: def get_task_completion_mode(self, software_id: str, task_type: str) -> str: task_cfg = software_config.get_plugin_task_config(software_id, task_type) or {} mode = task_cfg.get("completion_mode", "callback") - if mode not in {"callback", "sync"}: + if mode not in {"callback", "sync", "submit_only"}: return "callback" return mode + def should_auto_open_model(self, software_id: str, task_type: str) -> bool: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + enabled = bool(plugin_cfg.get("auto_open_model_before_tasks", False)) + if not enabled: + return False + + exclude = plugin_cfg.get("auto_open_exclude_task_types", ["open_model", "close_model"]) + if isinstance(exclude, list) and task_type in exclude: + return False + + include = plugin_cfg.get("auto_open_include_task_types") + if isinstance(include, list) and include: + return task_type in include + + return True + + def should_auto_close_model(self, software_id: str, task_type: str) -> bool: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + enabled = bool(plugin_cfg.get("auto_close_model_after_tasks", False)) + if not enabled: + return False + + exclude = plugin_cfg.get("auto_close_exclude_task_types", ["open_model", "close_model"]) + if isinstance(exclude, list) and task_type in exclude: + return False + + include = plugin_cfg.get("auto_close_include_task_types") + if isinstance(include, list) and include: + return task_type in include + + return True + + def get_inter_step_delay_sec(self, software_id: str) -> float: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + value = plugin_cfg.get("inter_step_delay_sec", 0) + try: + delay = float(value) + except (TypeError, ValueError): + return 0 + return max(0.0, delay) + + def get_between_items_delay_sec(self, software_id: str) -> float: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + value = plugin_cfg.get("between_items_delay_sec", 0) + try: + delay = float(value) + except (TypeError, ValueError): + return 0 + return max(0.0, delay) + + def should_run_pre_batch_cleanup(self, software_id: str) -> bool: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + return bool(plugin_cfg.get("pre_batch_cleanup_enabled", False)) + + def get_pre_batch_cleanup_task_type(self, software_id: str) -> Optional[str]: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + task_type = plugin_cfg.get("pre_batch_cleanup_task_type") + if isinstance(task_type, str) and task_type.strip(): + return task_type.strip() + return None + + def get_pre_batch_cleanup_task_params(self, software_id: str) -> Dict[str, object]: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + params = plugin_cfg.get("pre_batch_cleanup_task_params") + return params if isinstance(params, dict) else {} + + def get_pre_batch_cleanup_ignore_error_markers(self, software_id: str) -> List[str]: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + markers = plugin_cfg.get("pre_batch_cleanup_ignore_error_markers") + if not isinstance(markers, list): + return [] + normalized: List[str] = [] + for marker in markers: + if isinstance(marker, str): + value = marker.strip() + if value: + normalized.append(value) + return normalized + + def get_close_model_ignore_error_markers(self, software_id: str) -> List[str]: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + markers = plugin_cfg.get("close_model_ignore_error_markers") + if not isinstance(markers, list): + return [] + normalized: List[str] = [] + for marker in markers: + if isinstance(marker, str): + value = marker.strip() + if value: + normalized.append(value) + return normalized + + def should_check_open_before_pre_batch_cleanup(self, software_id: str) -> bool: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + return bool(plugin_cfg.get("pre_batch_check_open_enabled", False)) + + def get_pre_batch_check_task_type(self, software_id: str) -> Optional[str]: + plugin_cfg = software_config.get_plugin_config(software_id) or {} + task_type = plugin_cfg.get("pre_batch_check_task_type") + if isinstance(task_type, str) and task_type.strip(): + return task_type.strip() + return None + + async def log_execution_event( + self, + operation: str, + details: str, + status: OperationStatus = OperationStatus.SUCCESS, + extra_data: Optional[dict] = None, + ): + await self._log_system_operation( + operation=operation, + details=details, + status=status, + extra_data=extra_data or {}, + ) + def validate_callback_token(self, software_id: str, token: Optional[str]) -> bool: plugin_cfg = software_config.get_plugin_config(software_id) or {} expected = plugin_cfg.get("callback_token") @@ -375,6 +496,13 @@ class CadBatchManager: ) async def _log_system_operation(self, operation: str, details: str, status=OperationStatus.SUCCESS, **kwargs): + extra_data = kwargs.get("extra_data") + prefix = "[batch]" + if status == OperationStatus.FAILED: + logger.warning("%s %s | %s | extra=%s", prefix, operation, details, extra_data or {}) + else: + logger.info("%s %s | %s | extra=%s", prefix, operation, details, extra_data or {}) + if self._log_manager: await self._log_manager.log_system_operation( operation=operation, diff --git a/app/core/plugin_http_client.py b/app/core/plugin_http_client.py index 8c9d37a..f3b6077 100644 --- a/app/core/plugin_http_client.py +++ b/app/core/plugin_http_client.py @@ -25,8 +25,8 @@ class PluginHttpClient: base_url = plugin_config.get("base_url") submit_path = plugin_config.get("submit_path", "/api/plugin/tasks") - timeout = int(plugin_config.get("request_timeout_sec", 10)) task_config = self._config_provider.get_plugin_task_config(software_id, task_type) or {} + timeout = int(task_config.get("request_timeout_sec", plugin_config.get("request_timeout_sec", 10))) submit_path = task_config.get("path", submit_path) body_mode = task_config.get("body_mode", "passthrough") @@ -43,6 +43,35 @@ class PluginHttpClient: except Exception as exc: raise PluginSubmitError(str(exc)) from exc + async def get_task_status( + self, + software_id: str, + task_id: Optional[str] = None, + status_url: Optional[str] = None, + timeout_sec: int = 10, + ) -> Dict[str, Any]: + plugin_config = self._config_provider.get_plugin_config(software_id) + if not plugin_config: + raise PluginSubmitError(f"Plugin config for software '{software_id}' not found") + + base_url = plugin_config.get("base_url") + if not base_url: + raise PluginSubmitError(f"Plugin base_url for software '{software_id}' is missing") + + if status_url: + url = parse.urljoin(base_url.rstrip("/") + "/", str(status_url).lstrip("/")) + elif task_id: + url = parse.urljoin(base_url.rstrip("/") + "/", f"api/task/{task_id}") + else: + raise PluginSubmitError("Either task_id or status_url must be provided for polling") + + try: + return await asyncio.to_thread(self._get_json, url, int(timeout_sec)) + except PluginSubmitError: + raise + except Exception as exc: + raise PluginSubmitError(str(exc)) from exc + @staticmethod def _build_payload(payload: Dict[str, Any], body_mode: str) -> Dict[str, Any]: task_params = payload.get("task_params", {}) if isinstance(payload.get("task_params"), dict) else {} @@ -98,3 +127,26 @@ class PluginHttpClient: raise PluginSubmitError(f"Plugin HTTP {exc.code}: {body}") from exc except error.URLError as exc: raise PluginSubmitError(f"Plugin request failed: {exc.reason}") from exc + + @staticmethod + def _get_json(url: str, timeout: int) -> Dict[str, Any]: + req = request.Request( + url=url, + headers={"Accept": "application/json"}, + method="GET", + ) + + try: + with request.urlopen(req, timeout=timeout) as resp: + body = resp.read().decode("utf-8") if resp.length != 0 else "" + if not body: + return {} + try: + return json.loads(body) + except json.JSONDecodeError: + return {"raw": body} + except error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="ignore") if exc.fp else "" + raise PluginSubmitError(f"Plugin HTTP {exc.code}: {body}") from exc + except error.URLError as exc: + raise PluginSubmitError(f"Plugin request failed: {exc.reason}") from exc diff --git a/app/core/serial_batch_executor.py b/app/core/serial_batch_executor.py index bc1fc8d..c341da1 100644 --- a/app/core/serial_batch_executor.py +++ b/app/core/serial_batch_executor.py @@ -1,11 +1,12 @@ -from __future__ import annotations +from __future__ import annotations import asyncio import uuid -from typing import Optional +from typing import Any, Dict, Optional from app.core.plugin_http_client import PluginSubmitError from app.models.cad_batch import PluginResultStatus +from app.models.operation_log import OperationStatus class SerialBatchExecutor: @@ -19,6 +20,7 @@ class SerialBatchExecutor: self._queue: asyncio.Queue[str] = asyncio.Queue() self._worker_task: Optional[asyncio.Task] = None self._running = False + self._pre_batch_cleanup_done: set[tuple[str, str]] = set() async def start(self): if self._worker_task and not self._worker_task.done(): @@ -37,6 +39,7 @@ class SerialBatchExecutor: except asyncio.CancelledError: pass self._worker_task = None + self._pre_batch_cleanup_done.clear() async def enqueue(self, item_id: str): await self._queue.put(item_id) @@ -44,77 +47,797 @@ class SerialBatchExecutor: async def _worker_loop(self): while self._running: item_id = await self._queue.get() + software_id = None try: - await self._process_item(item_id) + software_id = await self._process_item(item_id) finally: self._queue.task_done() - async def _process_item(self, item_id: str): + if software_id: + between_items_delay = self._batch_manager.get_between_items_delay_sec(software_id) + if between_items_delay > 0: + await self._log_event( + operation="batch_between_items_delay", + details=f"Delay {between_items_delay}s before next queued item", + extra_data={"software_id": software_id, "delay_sec": between_items_delay}, + ) + await asyncio.sleep(between_items_delay) + + async def _process_item(self, item_id: str) -> Optional[str]: item = await self._batch_manager.get_item(item_id) if not item: - return + return None # Item may already be marked failed during routing. if item.software_id is None: - return + return None for attempt in range(item.max_retries + 1): - execution_id = str(uuid.uuid4()) - await self._batch_manager.mark_item_dispatching(item_id, execution_id, attempt) - - callback_url = self._batch_manager.get_callback_endpoint_url() - payload = { - "execution_id": execution_id, - "batch_id": item.batch_id, - "item_id": item.id, - "model_path": item.model_path, - "task_type": item.task_type, - "task_params": item.task_params, - "callback_url": callback_url, - "attempt": attempt, - } - error_message = None + current_phase = "main_task" + open_result = None + main_result = None + close_result = None + auto_open = self._batch_manager.should_auto_open_model(item.software_id, item.task_type) + auto_close = self._batch_manager.should_auto_close_model(item.software_id, item.task_type) try: - plugin_response = await self._plugin_client.submit_task( - item.software_id, - item.task_type, - payload, + current_phase = "pre_batch_cleanup" + await self._ensure_pre_batch_cleanup(item=item, item_id=item_id, attempt=attempt) + + if auto_open: + current_phase = "open_model" + open_result = await self._execute_task_step( + item=item, + item_id=item_id, + attempt=attempt, + task_type="open_model", + task_params=self._extract_open_task_params(item.task_params), + phase=current_phase, + ) + await self._sleep_inter_step(item, phase="open_model_to_main") + + current_phase = item.task_type + main_result = await self._execute_task_step( + item=item, + item_id=item_id, + attempt=attempt, + task_type=item.task_type, + task_params=item.task_params, + phase=current_phase, ) - completion_mode = self._batch_manager.get_task_completion_mode(item.software_id, item.task_type) - if completion_mode == "sync": - success_flag = bool(plugin_response.get("success")) - if success_flag: - await self._batch_manager.mark_item_succeeded(item_id, plugin_response) - return - error_message = plugin_response.get("error") or "Plugin sync task failed" - raise PluginSubmitError(error_message) + if auto_close: + await self._sleep_inter_step(item, phase="main_to_close_model") + current_phase = "close_model" + try: + close_result = await self._execute_task_step( + item=item, + item_id=item_id, + attempt=attempt, + task_type="close_model", + task_params=self._extract_close_task_params(item.task_params), + phase=current_phase, + ) + except PluginSubmitError as exc: + error_message = str(exc) + if not self._is_ignorable_close_model_error(item.software_id, error_message): + raise + close_result = {"ignored_error": error_message} + await self._log_event( + operation="batch_close_model_ignored", + details=( + f"Close model ignored item={item.id} attempt={attempt}: " + f"{error_message}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "attempt": attempt, + "software_id": item.software_id, + "error_message": error_message, + }, + ) - await self._batch_manager.mark_item_waiting_callback(item_id, execution_id, plugin_response) - - timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id) - callback_payload = await self._callback_registry.wait_for_callback(execution_id, timeout_sec) - - if callback_payload.status == PluginResultStatus.SUCCESS: - await self._batch_manager.mark_item_succeeded(item_id, callback_payload.result) - return - - error_message = callback_payload.error_message or "Plugin returned failed status" - except asyncio.TimeoutError: - error_message = f"Callback timeout for execution_id={execution_id}" + success_result = self._compose_success_result( + auto_open=auto_open, + auto_close=auto_close, + open_result=open_result, + main_result=main_result, + close_result=close_result, + ) + await self._batch_manager.mark_item_succeeded(item_id, success_result) + await self._log_event( + operation="batch_item_succeeded", + details=f"Item succeeded item={item.id} attempt={attempt}", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "attempt": attempt, + "software_id": item.software_id, + "auto_open": auto_open, + "auto_close": auto_close, + }, + ) + return item.software_id except PluginSubmitError as exc: error_message = str(exc) + await self._log_event( + operation="batch_step_failed", + details=( + f"Step failed item={item.id} phase={current_phase} attempt={attempt}: " + f"{error_message}" + ), + status=OperationStatus.FAILED, + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "attempt": attempt, + "phase": current_phase, + "error_message": error_message, + }, + ) except Exception as exc: error_message = f"Unexpected execution error: {exc}" + await self._log_event( + operation="batch_step_unexpected_error", + details=( + f"Unexpected error item={item.id} phase={current_phase} attempt={attempt}: " + f"{exc}" + ), + status=OperationStatus.FAILED, + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "attempt": attempt, + "phase": current_phase, + "error_message": str(exc), + }, + ) if attempt < item.max_retries: await self._batch_manager.mark_item_retrying(item_id, error_message, attempt + 1) backoff = self._batch_manager.get_retry_backoff_sec(item.software_id, attempt) + await self._log_event( + operation="batch_retry_scheduled", + details=( + f"Retry scheduled item={item.id} phase={current_phase} next_attempt={attempt + 1} " + f"backoff_sec={backoff}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "attempt": attempt, + "next_attempt": attempt + 1, + "phase": current_phase, + "backoff_sec": backoff, + "error_message": error_message, + }, + ) if backoff > 0: await asyncio.sleep(backoff) continue await self._batch_manager.mark_item_failed(item_id, error_message) + await self._log_event( + operation="batch_item_failed_final", + details=f"Item failed item={item.id} phase={current_phase}: {error_message}", + status=OperationStatus.FAILED, + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "attempt": attempt, + "phase": current_phase, + "error_message": error_message, + }, + ) + return item.software_id + + return item.software_id + + async def _execute_task_step( + self, + item, + item_id: str, + attempt: int, + task_type: str, + task_params: Dict[str, Any], + phase: str, + ) -> Dict[str, Any]: + execution_id = str(uuid.uuid4()) + await self._batch_manager.mark_item_dispatching(item_id, execution_id, attempt) + + completion_mode = self._batch_manager.get_task_completion_mode(item.software_id, task_type) + if phase == "open_model" and completion_mode == "submit_only": + raise PluginSubmitError( + "open_model completion_mode=submit_only is not allowed; use sync or callback" + ) + + callback_url = self._batch_manager.get_callback_endpoint_url() + payload = { + "execution_id": execution_id, + "batch_id": item.batch_id, + "item_id": item.id, + "model_path": item.model_path, + "task_type": task_type, + "task_params": task_params, + "callback_url": callback_url, + "attempt": attempt, + } + + await self._log_event( + operation="batch_step_start", + details=( + f"Step start item={item.id} phase={phase} attempt={attempt} " + f"task_type={task_type} completion_mode={completion_mode}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + "software_id": item.software_id, + "task_type": task_type, + "model_path": item.model_path, + "completion_mode": completion_mode, + }, + ) + + plugin_response = await self._plugin_client.submit_task(item.software_id, task_type, payload) + await self._log_event( + operation="batch_step_submitted", + details=f"Step submitted item={item.id} phase={phase} execution_id={execution_id}", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + "completion_mode": completion_mode, + "plugin_response": plugin_response, + }, + ) + + # Revit shell_execute is async by taskId/statusUrl. Poll plugin task status directly. + if item.software_id == "revit" and task_type == "shell_execute": + polled_result = await self._poll_revit_shell_execute_result( + item=item, + attempt=attempt, + phase=phase, + plugin_response=plugin_response if isinstance(plugin_response, dict) else {}, + execution_id=execution_id, + ) + if polled_result is not None: + return polled_result + + if completion_mode == "submit_only": + submit_result = {"completion_mode": "submit_only"} + if isinstance(plugin_response, dict): + submit_result.update(plugin_response) + else: + submit_result["raw_response"] = str(plugin_response) + + await self._log_event( + operation="batch_step_submit_only_accepted", + details=f"Submit-only accepted item={item.id} phase={phase} execution_id={execution_id}", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + }, + ) + return submit_result + + if completion_mode == "sync": + if phase == "open_model": + success_flag = self._is_successful_open_response(plugin_response) + else: + success_flag = self._is_successful_sync_response(plugin_response) + if not success_flag: + error_message = "Plugin sync task failed" + if isinstance(plugin_response, dict): + error_message = plugin_response.get("error") or error_message + raise PluginSubmitError(f"{phase} sync failed: {error_message}") + + await self._log_event( + operation="batch_step_sync_succeeded", + details=f"Sync succeeded item={item.id} phase={phase} execution_id={execution_id}", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + }, + ) + return plugin_response if isinstance(plugin_response, dict) else {"raw_response": str(plugin_response)} + + await self._batch_manager.mark_item_waiting_callback(item_id, execution_id, plugin_response) + await self._log_event( + operation="batch_step_waiting_callback", + details=f"Waiting callback item={item.id} phase={phase} execution_id={execution_id}", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + }, + ) + + timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id) + try: + callback_payload = await self._callback_registry.wait_for_callback(execution_id, timeout_sec) + except asyncio.TimeoutError as exc: + raise PluginSubmitError(f"{phase} callback timeout for execution_id={execution_id}") from exc + + if callback_payload.status != PluginResultStatus.SUCCESS: + error_message = callback_payload.error_message or "Plugin returned failed status" + raise PluginSubmitError(f"{phase} callback failed: {error_message}") + + await self._log_event( + operation="batch_step_callback_succeeded", + details=f"Callback succeeded item={item.id} phase={phase} execution_id={execution_id}", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + }, + ) + return callback_payload.result if isinstance(callback_payload.result, dict) else {"result": callback_payload.result} + + async def _poll_revit_shell_execute_result( + self, + item, + attempt: int, + phase: str, + plugin_response: Dict[str, Any], + execution_id: str, + ) -> Optional[Dict[str, Any]]: + if not hasattr(self._plugin_client, "get_task_status"): + return None + + data = plugin_response.get("data") + if not isinstance(data, dict): + data = {} + task_id = data.get("taskId") or data.get("task_id") + status_url = data.get("statusUrl") or data.get("status_url") + if not task_id and not status_url: + return None + + timeout_sec = self._batch_manager.get_callback_timeout_sec(item.software_id) + poll_interval_sec = 2.0 + deadline = asyncio.get_event_loop().time() + timeout_sec + last_status = None + + await self._log_event( + operation="batch_step_polling_started", + details=( + f"Polling started item={item.id} phase={phase} execution_id={execution_id} " + f"task_id={task_id or 'unknown'} timeout_sec={timeout_sec}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + "task_id": task_id, + "status_url": status_url, + "poll_interval_sec": poll_interval_sec, + "timeout_sec": timeout_sec, + }, + ) + + while asyncio.get_event_loop().time() < deadline: + status_response = await self._plugin_client.get_task_status( + software_id=item.software_id, + task_id=str(task_id) if task_id else None, + status_url=str(status_url) if status_url else None, + ) + status_data = status_response.get("data") if isinstance(status_response.get("data"), dict) else {} + status_value = ( + status_data.get("status") + or status_response.get("status") + or "" + ) + lowered = str(status_value).strip().lower() + + if lowered and lowered != last_status: + last_status = lowered + await self._log_event( + operation="batch_step_polling_status", + details=( + f"Polling status item={item.id} phase={phase} execution_id={execution_id} " + f"status={status_value}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + "task_id": task_id, + "status": status_value, + }, + ) + + if lowered in {"completed", "success", "succeeded", "done"}: + await self._log_event( + operation="batch_step_polling_succeeded", + details=( + f"Polling succeeded item={item.id} phase={phase} execution_id={execution_id} " + f"task_id={task_id}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "execution_id": execution_id, + "attempt": attempt, + "phase": phase, + "task_id": task_id, + }, + ) + return status_response if isinstance(status_response, dict) else {"raw_response": str(status_response)} + + if lowered in {"failed", "cancelled", "canceled", "error"}: + error_message = ( + status_data.get("errorMessage") + or status_data.get("error_message") + or status_data.get("error") + or status_response.get("message") + or "Plugin task failed" + ) + raise PluginSubmitError(f"{phase} polling failed: {error_message}") + + await asyncio.sleep(poll_interval_sec) + + raise PluginSubmitError( + f"{phase} polling timeout for task_id={task_id or 'unknown'} execution_id={execution_id}" + ) + + async def _ensure_pre_batch_cleanup(self, item, item_id: str, attempt: int): + key = (item.batch_id, item.software_id) + if key in self._pre_batch_cleanup_done: return + + if not self._batch_manager.should_run_pre_batch_cleanup(item.software_id): + self._pre_batch_cleanup_done.add(key) + return + + await self._log_event( + operation="batch_precheck_start", + details=f"Precheck start batch={item.batch_id} software={item.software_id}", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "software_id": item.software_id, + "attempt": attempt, + }, + ) + + should_close = True + if self._batch_manager.should_check_open_before_pre_batch_cleanup(item.software_id): + check_task_type = self._batch_manager.get_pre_batch_check_task_type(item.software_id) + if not check_task_type: + raise PluginSubmitError( + f"pre_batch_check_open_enabled is true but pre_batch_check_task_type is missing for {item.software_id}" + ) + + check_result = await self._execute_task_step( + item=item, + item_id=item_id, + attempt=attempt, + task_type=check_task_type, + task_params={}, + phase="pre_batch_check_open", + ) + should_close = self._is_model_open_from_check_result(check_result) + await self._log_event( + operation="batch_precheck_state_checked", + details=( + f"Precheck state checked batch={item.batch_id} software={item.software_id} " + f"should_close={should_close}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "software_id": item.software_id, + "attempt": attempt, + "should_close": should_close, + "check_result": check_result, + }, + ) + + if not should_close: + self._pre_batch_cleanup_done.add(key) + await self._log_event( + operation="batch_precheck_skip_cleanup", + details=f"Precheck skip cleanup batch={item.batch_id} software={item.software_id}", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "software_id": item.software_id, + "attempt": attempt, + }, + ) + return + + cleanup_task_type = self._batch_manager.get_pre_batch_cleanup_task_type(item.software_id) + if not cleanup_task_type: + raise PluginSubmitError( + f"pre_batch_cleanup_enabled is true but pre_batch_cleanup_task_type is missing for {item.software_id}" + ) + + cleanup_params = self._batch_manager.get_pre_batch_cleanup_task_params(item.software_id) + try: + await self._execute_task_step( + item=item, + item_id=item_id, + attempt=attempt, + task_type=cleanup_task_type, + task_params=cleanup_params, + phase=f"pre_batch_cleanup_{cleanup_task_type}", + ) + self._pre_batch_cleanup_done.add(key) + await self._log_event( + operation="batch_precheck_cleanup_succeeded", + details=( + f"Precheck cleanup succeeded batch={item.batch_id} software={item.software_id} " + f"task_type={cleanup_task_type}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "software_id": item.software_id, + "attempt": attempt, + "cleanup_task_type": cleanup_task_type, + }, + ) + except PluginSubmitError as exc: + error_message = str(exc) + if not self._is_ignorable_pre_batch_cleanup_error(item.software_id, error_message): + raise + + self._pre_batch_cleanup_done.add(key) + await self._log_event( + operation="batch_precheck_cleanup_ignored", + details=( + f"Precheck cleanup ignored batch={item.batch_id} software={item.software_id} " + f"task_type={cleanup_task_type}: {error_message}" + ), + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "software_id": item.software_id, + "attempt": attempt, + "cleanup_task_type": cleanup_task_type, + "error_message": error_message, + }, + ) + + async def _sleep_inter_step(self, item, phase: str): + delay = self._batch_manager.get_inter_step_delay_sec(item.software_id) + if delay <= 0: + return + + await self._log_event( + operation="batch_inter_step_delay", + details=f"Delay {delay}s between steps ({phase})", + extra_data={ + "batch_id": item.batch_id, + "item_id": item.id, + "software_id": item.software_id, + "phase": phase, + "delay_sec": delay, + }, + ) + await asyncio.sleep(delay) + + @staticmethod + def _compose_success_result( + auto_open: bool, + auto_close: bool, + open_result: Optional[Dict[str, Any]], + main_result: Optional[Dict[str, Any]], + close_result: Optional[Dict[str, Any]], + ) -> Dict[str, Any]: + if not auto_open and not auto_close: + return main_result or {} + + result: Dict[str, Any] = {"main_result": main_result or {}} + if auto_open: + result["open_model_result"] = open_result or {} + if auto_close: + result["close_model_result"] = close_result or {} + return result + + @staticmethod + def _extract_open_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]: + if not isinstance(task_params, dict): + return {} + + open_params = task_params.get("open_model_params") + if isinstance(open_params, dict): + return open_params + + open_params = task_params.get("open_params") + if isinstance(open_params, dict): + return open_params + + return {} + + @staticmethod + def _extract_close_task_params(task_params: Dict[str, Any]) -> Dict[str, Any]: + if not isinstance(task_params, dict): + return {} + + close_params = task_params.get("close_model_params") + if isinstance(close_params, dict): + return close_params + + close_params = task_params.get("close_params") + if isinstance(close_params, dict): + return close_params + + return {} + + def _is_ignorable_pre_batch_cleanup_error(self, software_id: str, error_message: str) -> bool: + if not error_message: + return False + + markers = self._batch_manager.get_pre_batch_cleanup_ignore_error_markers(software_id) + if not markers: + return False + + lowered = error_message.lower() + for marker in markers: + if marker.lower() in lowered: + return True + return False + + def _is_ignorable_close_model_error(self, software_id: str, error_message: str) -> bool: + if not error_message: + return False + + markers = self._batch_manager.get_close_model_ignore_error_markers(software_id) + if not markers: + return False + + lowered = error_message.lower() + for marker in markers: + if marker.lower() in lowered: + return True + return False + + @staticmethod + def _is_model_open_from_check_result(check_result: Dict[str, Any]) -> bool: + if not isinstance(check_result, dict): + return True + + explicit_flags = [ + "has_open_model", + "hasOpenModel", + "is_model_open", + "isModelOpen", + "opened", + "isOpened", + "project_opened", + "projectOpened", + ] + for key in explicit_flags: + if key in check_result: + return bool(check_result.get(key)) + + non_empty_markers = [ + "model_path", + "modelPath", + "file_path", + "filePath", + "current_model", + "currentModel", + "project_name", + "projectName", + "current_project", + "currentProject", + ] + for key in non_empty_markers: + value = check_result.get(key) + if isinstance(value, str) and value.strip(): + return True + if isinstance(value, dict) and value: + return True + + # Unknown schema: default to close for safety. + return True + + async def _log_event( + self, + operation: str, + details: str, + status: OperationStatus = OperationStatus.SUCCESS, + extra_data: Optional[dict] = None, + ): + log_method = getattr(self._batch_manager, "log_execution_event", None) + if not callable(log_method): + return + + try: + await log_method( + operation=operation, + details=details, + status=status, + extra_data=extra_data or {}, + ) + except Exception: + # Logging must not break task execution. + return + + @staticmethod + def _is_successful_sync_response(plugin_response: dict) -> bool: + if not isinstance(plugin_response, dict): + return False + + if plugin_response.get("error"): + return False + + code_value = plugin_response.get("code") + try: + if code_value is not None and int(code_value) == 202: + # 202 usually means accepted/queued, not completed. + return False + except (TypeError, ValueError): + pass + + status_value = plugin_response.get("status") + if isinstance(status_value, str) and status_value.lower() in {"accepted", "queued", "pending"}: + return False + + if "success" in plugin_response: + return bool(plugin_response.get("success")) + + if plugin_response.get("accepted") is True: + return False + + if isinstance(status_value, str) and status_value.lower() in {"success", "ok", "completed", "done"}: + return True + + try: + if code_value is not None and int(code_value) in {0, 200, 201}: + return True + except (TypeError, ValueError): + pass + + if plugin_response.get("task_id") or plugin_response.get("taskId"): + return False + + return False + + @staticmethod + def _is_successful_open_response(plugin_response: dict) -> bool: + if not isinstance(plugin_response, dict): + return False + + if plugin_response.get("error"): + return False + + if "success" in plugin_response: + return bool(plugin_response.get("success")) + + code_value = plugin_response.get("code") + try: + if code_value is not None and int(code_value) in {0, 200}: + return True + except (TypeError, ValueError): + pass + + status_value = plugin_response.get("status") + if isinstance(status_value, str) and status_value.lower() in {"success", "ok"}: + return True + + return False diff --git a/app/main.py b/app/main.py index ab37985..57ff95d 100644 --- a/app/main.py +++ b/app/main.py @@ -1,4 +1,6 @@ -"""FastAPI entrypoint.""" +"""FastAPI entrypoint.""" + +import logging from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware @@ -10,6 +12,23 @@ from app.core.software_manager import software_manager from app.core.websocket_manager import websocket_manager + + +def _configure_app_logging(): + """Ensure app.* logs are visible in console during runtime debugging.""" + app_logger = logging.getLogger("app") + if app_logger.handlers: + return + + handler = logging.StreamHandler() + formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") + handler.setFormatter(formatter) + app_logger.addHandler(handler) + app_logger.setLevel(logging.INFO) + app_logger.propagate = False + + +_configure_app_logging() app = FastAPI( title="CadHubManage API", description="Backend service for managing CAD software and batch processing tasks.", diff --git a/cad-batch-plan.md b/cad-batch-plan.md index a9c3726..1bed9b9 100644 --- a/cad-batch-plan.md +++ b/cad-batch-plan.md @@ -1,4 +1,4 @@ -# CAD 批处理串行执行框架(设计与开发进度) +# CAD 批处理串行执行框架(设计与开发进度) ## 1. 文档目的 本文件用于同步两类信息: @@ -70,6 +70,21 @@ - `plugins.{software_id}.max_retries` - `plugins.{software_id}.retry_backoff_sec` - `plugins.{software_id}.callback_token` +- `plugins.{software_id}.auto_open_model_before_tasks` +- `plugins.{software_id}.auto_open_include_task_types` +- `plugins.{software_id}.auto_open_exclude_task_types` +- `plugins.{software_id}.auto_close_model_after_tasks` +- `plugins.{software_id}.auto_close_include_task_types` +- `plugins.{software_id}.auto_close_exclude_task_types` +- `plugins.{software_id}.inter_step_delay_sec` +- `plugins.{software_id}.between_items_delay_sec` +- `plugins.{software_id}.pre_batch_cleanup_enabled` +- `plugins.{software_id}.pre_batch_cleanup_task_type` +- `plugins.{software_id}.pre_batch_cleanup_task_params` +- `plugins.{software_id}.pre_batch_cleanup_ignore_error_markers` +- `plugins.{software_id}.close_model_ignore_error_markers` +- `plugins.{software_id}.pre_batch_check_open_enabled` +- `plugins.{software_id}.pre_batch_check_task_type` - `plugins.{software_id}.tasks.{task_type}.path` - `plugins.{software_id}.tasks.{task_type}.body_mode` - `plugins.{software_id}.tasks.{task_type}.completion_mode` @@ -85,6 +100,7 @@ ### 4.3 completion_mode 语义 - `callback`:下发后进入 `waiting_callback`,等待插件回调判定结果 - `sync`:以接口同步返回结果直接判定成功/失败,不等待回调 +- `submit_only`:接口返回 2xx 即视为提交成功,条目直接置为成功(适合插件无回调且无任务查询接口场景) --- diff --git a/configs/software_config.yaml b/configs/software_config.yaml index e84ca5a..52ad76c 100644 --- a/configs/software_config.yaml +++ b/configs/software_config.yaml @@ -18,6 +18,13 @@ plugins: callback_timeout_sec: 60 callback_token: creo-callback-token max_retries: 1 + pre_batch_check_open_enabled: false + pre_batch_cleanup_enabled: true + pre_batch_cleanup_task_params: + force_close: true + pre_batch_cleanup_task_type: close_model + pre_batch_cleanup_ignore_error_markers: [] + close_model_ignore_error_markers: [] request_timeout_sec: 10 retry_backoff_sec: - 1 @@ -33,15 +40,22 @@ plugins: path: /api/model/open shell_analysis: body_mode: task_params_only + completion_mode: sync path: /api/analysis/shell-analysis shrinkwrap_shell: body_mode: task_params_only + completion_mode: sync path: /api/creo/shrinkwrap/shell pdms: base_url: http://localhost:9001 callback_timeout_sec: 60 callback_token: pdms-callback-token max_retries: 1 + pre_batch_check_open_enabled: false + pre_batch_cleanup_enabled: true + pre_batch_cleanup_task_type: close_project + pre_batch_cleanup_ignore_error_markers: [] + close_model_ignore_error_markers: [] request_timeout_sec: 10 retry_backoff_sec: - 1 @@ -50,6 +64,7 @@ plugins: tasks: close_project: body_mode: task_params_only + completion_mode: sync path: /api/project/close open_mdb: body_mode: project_open @@ -64,10 +79,36 @@ plugins: body_mode: task_params_only path: /api/model/simplify revit: + auto_close_exclude_task_types: + - open_model + - close_model + auto_close_include_task_types: + - shell_analyze + - shell_execute + auto_close_model_after_tasks: true + auto_open_exclude_task_types: + - open_model + - close_model + auto_open_include_task_types: + - shell_analyze + - shell_execute + auto_open_model_before_tasks: true base_url: http://localhost:9000 callback_timeout_sec: 60 callback_token: revit-callback-token + between_items_delay_sec: 2 + inter_step_delay_sec: 2 max_retries: 1 + pre_batch_check_open_enabled: false + pre_batch_cleanup_enabled: true + pre_batch_cleanup_ignore_error_markers: + - NO_DOCUMENT_OPEN + - 没有打开的文档 + pre_batch_cleanup_task_type: close_model + close_model_ignore_error_markers: + - NO_DOCUMENT_OPEN + - The active document may not be closed from the API. + - 没有打开的文档 request_timeout_sec: 10 retry_backoff_sec: - 1 @@ -78,14 +119,19 @@ plugins: body_mode: empty completion_mode: sync path: /api/close + request_timeout_sec: 120 open_model: body_mode: file_path + completion_mode: sync path: /api/open + request_timeout_sec: 120 shell_analyze: body_mode: task_params_only + completion_mode: callback path: /api/shell/analyze shell_execute: body_mode: task_params_only + completion_mode: callback path: /api/shell/execute routing: extension_to_software: diff --git a/scripts/auto_revit_callback.ps1 b/scripts/auto_revit_callback.ps1 new file mode 100644 index 0000000..44551d4 --- /dev/null +++ b/scripts/auto_revit_callback.ps1 @@ -0,0 +1,61 @@ +param( + [string]$BackendBaseUrl = "http://127.0.0.1:8000", + [string]$LogFile = "", + [string]$Token = "revit-callback-token", + [int]$LookbackLines = 400 +) + +$ErrorActionPreference = "Stop" +[Console]::OutputEncoding = [System.Text.Encoding]::UTF8 + +if (-not $LogFile) { + $today = Get-Date -Format "yyyy-MM-dd" + $LogFile = ".\logs\operation_logs\operation_$today.jsonl" +} + +if (-not (Test-Path $LogFile)) { + throw "Log file not found: $LogFile" +} + +$callbackUrl = ($BackendBaseUrl.TrimEnd("/")) + "/api/v1/plugin-callbacks/task-result" +Write-Host "Watching: $LogFile" +Write-Host "Callback: $callbackUrl" + +$sent = @{} + +while ($true) { + $lines = Get-Content -Path $LogFile -Tail $LookbackLines + foreach ($line in $lines) { + if ($line -notmatch '"operation":"batch_step_waiting_callback"') { continue } + $obj = $null + try { + $obj = $line | ConvertFrom-Json + } catch { + continue + } + + $executionId = $obj.extra_data.execution_id + if (-not $executionId) { continue } + if ($sent.ContainsKey($executionId)) { continue } + + $payload = @{ + execution_id = $executionId + software_id = "revit" + status = "success" + error_message = $null + result = @{ source = "auto_revit_callback_ps1" } + finished_at = (Get-Date).ToUniversalTime().ToString("o") + token = $Token + } | ConvertTo-Json -Depth 5 + + try { + $resp = Invoke-RestMethod -Method Post -Uri $callbackUrl -ContentType "application/json" -Body $payload + $sent[$executionId] = $true + Write-Host ("[{0}] callback sent execution_id={1} accepted={2}" -f (Get-Date -Format "HH:mm:ss"), $executionId, $resp.accepted) + } catch { + Write-Host ("[{0}] callback failed execution_id={1} error={2}" -f (Get-Date -Format "HH:mm:ss"), $executionId, $_.Exception.Message) + } + } + + Start-Sleep -Milliseconds 700 +} diff --git a/tests/test_serial_batch_executor.py b/tests/test_serial_batch_executor.py index 88f0a28..7702970 100644 --- a/tests/test_serial_batch_executor.py +++ b/tests/test_serial_batch_executor.py @@ -1,4 +1,4 @@ -import asyncio +import asyncio from copy import deepcopy from datetime import datetime @@ -27,6 +27,9 @@ class FakePluginHttpClient: behavior = payload.get("task_params", {}).get("behavior", "success") attempt = payload.get("attempt", 0) + if task_type == "close_model" and behavior == "no_document_open": + return {"success": False, "error": "NO_DOCUMENT_OPEN", "code": 409, "message": "没有打开的文档"} + if behavior == "timeout_once_then_success" and attempt == 0: # no callback; force timeout for attempt 0 return {"accepted": True} @@ -61,12 +64,24 @@ class FakePluginHttpClient: @pytest.fixture(autouse=True) def _configure_short_timeouts(): backup = deepcopy(software_config.load_config()) - software_config._config.setdefault("plugins", {}).setdefault("creo", {})["callback_timeout_sec"] = 1 - software_config._config.setdefault("plugins", {}).setdefault("creo", {})["retry_backoff_sec"] = [0, 0] - software_config._config.setdefault("plugins", {}).setdefault("creo", {})["max_retries"] = 1 - software_config._config.setdefault("plugins", {}).setdefault("revit", {})["callback_timeout_sec"] = 1 - software_config._config.setdefault("plugins", {}).setdefault("revit", {})["retry_backoff_sec"] = [0, 0] - software_config._config.setdefault("plugins", {}).setdefault("revit", {})["max_retries"] = 1 + creo_plugin = software_config._config.setdefault("plugins", {}).setdefault("creo", {}) + creo_plugin["callback_timeout_sec"] = 1 + creo_plugin["retry_backoff_sec"] = [0, 0] + creo_plugin["max_retries"] = 1 + creo_plugin["pre_batch_cleanup_enabled"] = False + + revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {}) + revit_plugin["callback_timeout_sec"] = 1 + revit_plugin["retry_backoff_sec"] = [0, 0] + revit_plugin["max_retries"] = 1 + revit_plugin["pre_batch_cleanup_enabled"] = False + revit_plugin["auto_open_model_before_tasks"] = False + revit_plugin["auto_close_model_after_tasks"] = False + revit_plugin["inter_step_delay_sec"] = 0 + revit_plugin["between_items_delay_sec"] = 0 + + pdms_plugin = software_config._config.setdefault("plugins", {}).setdefault("pdms", {}) + pdms_plugin["pre_batch_cleanup_enabled"] = False yield software_config._config = backup @@ -146,3 +161,85 @@ async def test_serial_executor_retry_after_timeout_then_success(): assert len(fake_client.calls) >= 2 finally: await manager.stop() + + +@pytest.mark.asyncio +async def test_pre_batch_cleanup_ignores_no_document_open(): + callback_registry = PluginCallbackRegistry() + fake_client = FakePluginHttpClient(callback_registry) + router = CadTaskRouter({".rvt": "revit"}) + + revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {}) + revit_plugin["pre_batch_cleanup_enabled"] = True + revit_plugin["pre_batch_cleanup_task_type"] = "close_model" + revit_plugin["pre_batch_cleanup_task_params"] = {"behavior": "no_document_open"} + revit_plugin["pre_batch_cleanup_ignore_error_markers"] = ["NO_DOCUMENT_OPEN", "没有打开的文档"] + revit_plugin["auto_open_model_before_tasks"] = False + revit_plugin["auto_close_model_after_tasks"] = False + revit_plugin.setdefault("tasks", {}).setdefault("close_model", {})["completion_mode"] = "sync" + revit_plugin.setdefault("tasks", {}).setdefault("shell_execute", {})["completion_mode"] = "submit_only" + + manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry) + await manager.start() + + request = BatchSubmitRequest( + items=[BatchSubmitItem(model_path="a.rvt", task_type="shell_execute", task_params={})], + ) + + try: + batch = await manager.create_batch(request, submitter_id="tester") + final_batch = await _wait_batch_terminal(manager, batch.id) + items = await manager.get_batch_items(batch.id) + + assert final_batch.status == BatchStatus.COMPLETED + assert items[0].status.value == "succeeded" + assert len(fake_client.calls) == 2 + assert fake_client.calls[0]["task_type"] == "close_model" + assert fake_client.calls[1]["task_type"] == "shell_execute" + finally: + await manager.stop() + + +@pytest.mark.asyncio +async def test_auto_close_ignores_no_document_open(): + callback_registry = PluginCallbackRegistry() + fake_client = FakePluginHttpClient(callback_registry) + router = CadTaskRouter({".rvt": "revit"}) + + revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {}) + revit_plugin["pre_batch_cleanup_enabled"] = False + revit_plugin["auto_open_model_before_tasks"] = False + revit_plugin["auto_close_model_after_tasks"] = True + revit_plugin["auto_close_include_task_types"] = ["shell_execute"] + revit_plugin["close_model_ignore_error_markers"] = [ + "NO_DOCUMENT_OPEN", + "The active document may not be closed from the API.", + ] + revit_plugin.setdefault("tasks", {}).setdefault("close_model", {})["completion_mode"] = "sync" + revit_plugin.setdefault("tasks", {}).setdefault("shell_execute", {})["completion_mode"] = "submit_only" + + manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry) + await manager.start() + + request = BatchSubmitRequest( + items=[ + BatchSubmitItem( + model_path="a.rvt", + task_type="shell_execute", + task_params={"close_model_params": {"behavior": "no_document_open"}}, + ) + ], + ) + + try: + batch = await manager.create_batch(request, submitter_id="tester") + final_batch = await _wait_batch_terminal(manager, batch.id) + items = await manager.get_batch_items(batch.id) + + assert final_batch.status == BatchStatus.COMPLETED + assert items[0].status.value == "succeeded" + assert len(fake_client.calls) == 2 + assert fake_client.calls[0]["task_type"] == "shell_execute" + assert fake_client.calls[1]["task_type"] == "close_model" + finally: + await manager.stop() diff --git a/命令.md b/命令.md index be62f59..e3edb3d 100644 --- a/命令.md +++ b/命令.md @@ -1,6 +1,8 @@ ### 环境 - conda activate websocket311 -备注:要先执行conda deactivate和deactivate退出虚虚拟环境 +备注: +要先执行conda deactivate +deactivate,退出虚虚拟环境 ### 打包 - python scripts\build_exe.py --clean