CadHubManage/tests/test_serial_batch_executor.py

324 lines
13 KiB
Python

import asyncio
from copy import deepcopy
from datetime import datetime
import pytest
from app.config import software_config
from app.core.cad_batch_manager import CadBatchManager
from app.core.cad_task_router import CadTaskRouter
from app.core.plugin_callback_registry import PluginCallbackRegistry
from app.models.cad_batch import (
BatchStatus,
BatchSubmitItem,
BatchSubmitRequest,
PluginCallbackPayload,
PluginResultStatus,
)
class FakePluginHttpClient:
def __init__(self, callback_registry: PluginCallbackRegistry):
self._registry = callback_registry
self.calls = []
async def submit_task(self, software_id: str, task_type: str, payload: dict) -> dict:
self.calls.append(payload)
behavior = payload.get("task_params", {}).get("behavior", "success")
attempt = payload.get("attempt", 0)
if task_type == "open_model":
return {"success": True, "code": 200}
if task_type == "close_model" and behavior == "no_current_model_loaded":
return {"success": False, "error": "No current model loaded", "code": 500}
if task_type == "close_model" and behavior == "no_document_open":
return {"success": False, "error": "NO_DOCUMENT_OPEN", "code": 409, "message": "没有打开的文档"}
if behavior == "timeout_once_then_success" and attempt == 0:
# no callback; force timeout for attempt 0
return {"accepted": True}
if behavior == "always_fail":
status = PluginResultStatus.FAILED
result = {}
error_message = "simulated plugin failure"
else:
status = PluginResultStatus.SUCCESS
result = {"exported": True, "model": payload.get("model_path")}
error_message = None
async def _emit_callback():
await asyncio.sleep(0.01)
await self._registry.handle_callback(
PluginCallbackPayload(
execution_id=payload["execution_id"],
software_id=software_id,
status=status,
error_message=error_message,
result=result,
finished_at=datetime.now(),
token="ignored-in-registry",
)
)
asyncio.create_task(_emit_callback())
return {"accepted": True}
@pytest.fixture(autouse=True)
def _configure_short_timeouts():
backup = deepcopy(software_config.load_config())
creo_plugin = software_config._config.setdefault("plugins", {}).setdefault("creo", {})
creo_plugin["callback_timeout_sec"] = 1
creo_plugin["retry_backoff_sec"] = [0, 0]
creo_plugin["max_retries"] = 1
creo_plugin["pre_batch_cleanup_enabled"] = False
revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {})
revit_plugin["callback_timeout_sec"] = 1
revit_plugin["retry_backoff_sec"] = [0, 0]
revit_plugin["max_retries"] = 1
revit_plugin["pre_batch_cleanup_enabled"] = False
revit_plugin["auto_open_model_before_tasks"] = False
revit_plugin["auto_close_model_after_tasks"] = False
revit_plugin["inter_step_delay_sec"] = 0
revit_plugin["between_items_delay_sec"] = 0
pdms_plugin = software_config._config.setdefault("plugins", {}).setdefault("pdms", {})
pdms_plugin["pre_batch_cleanup_enabled"] = False
yield
software_config._config = backup
async def _wait_batch_terminal(manager: CadBatchManager, batch_id: str, timeout_sec: float = 8.0):
start = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start < timeout_sec:
batch = await manager.get_batch(batch_id)
if batch and batch.status in {BatchStatus.COMPLETED, BatchStatus.COMPLETED_WITH_ERRORS, BatchStatus.FAILED}:
return batch
await asyncio.sleep(0.02)
raise TimeoutError(f"batch {batch_id} did not finish within {timeout_sec}s")
@pytest.mark.asyncio
async def test_serial_executor_fifo_and_continue_after_failure():
callback_registry = PluginCallbackRegistry()
fake_client = FakePluginHttpClient(callback_registry)
router = CadTaskRouter({".prt": "creo", ".rvt": "revit"})
manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry)
await manager.start()
request = BatchSubmitRequest(
items=[
BatchSubmitItem(model_path="a.prt", task_type="export", task_params={"behavior": "success"}),
BatchSubmitItem(model_path="b.prt", task_type="export", task_params={"behavior": "always_fail"}),
BatchSubmitItem(model_path="c.rvt", task_type="export", task_params={"behavior": "success"}),
],
batch_name="fifo-case",
)
try:
batch = await manager.create_batch(request, submitter_id="tester")
final_batch = await _wait_batch_terminal(manager, batch.id)
items = await manager.get_batch_items(batch.id)
assert final_batch.status == BatchStatus.COMPLETED_WITH_ERRORS
assert [item.status.value for item in items] == ["succeeded", "failed", "succeeded"]
call_models = [call["model_path"] for call in fake_client.calls]
assert call_models[0] == "a.prt"
assert call_models[-1] == "c.rvt"
assert max(i for i, path in enumerate(call_models) if path == "b.prt") < min(
i for i, path in enumerate(call_models) if path == "c.rvt"
)
finally:
await manager.stop()
@pytest.mark.asyncio
async def test_serial_executor_retry_after_timeout_then_success():
callback_registry = PluginCallbackRegistry()
fake_client = FakePluginHttpClient(callback_registry)
router = CadTaskRouter({".prt": "creo"})
manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry)
await manager.start()
request = BatchSubmitRequest(
items=[
BatchSubmitItem(
model_path="timeout_once.prt",
task_type="export",
task_params={"behavior": "timeout_once_then_success"},
)
]
)
try:
batch = await manager.create_batch(request, submitter_id="tester")
final_batch = await _wait_batch_terminal(manager, batch.id)
items = await manager.get_batch_items(batch.id)
assert final_batch.status == BatchStatus.COMPLETED
assert items[0].status.value == "succeeded"
assert len(fake_client.calls) >= 2
finally:
await manager.stop()
@pytest.mark.asyncio
async def test_pre_batch_cleanup_ignores_no_document_open():
callback_registry = PluginCallbackRegistry()
fake_client = FakePluginHttpClient(callback_registry)
router = CadTaskRouter({".rvt": "revit"})
revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {})
revit_plugin["pre_batch_cleanup_enabled"] = True
revit_plugin["pre_batch_cleanup_task_type"] = "close_model"
revit_plugin["pre_batch_cleanup_task_params"] = {"behavior": "no_document_open"}
revit_plugin["pre_batch_cleanup_ignore_error_markers"] = ["NO_DOCUMENT_OPEN", "没有打开的文档"]
revit_plugin["auto_open_model_before_tasks"] = False
revit_plugin["auto_close_model_after_tasks"] = False
revit_plugin.setdefault("tasks", {}).setdefault("close_model", {})["completion_mode"] = "sync"
revit_plugin.setdefault("tasks", {}).setdefault("shell_execute", {})["completion_mode"] = "submit_only"
manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry)
await manager.start()
request = BatchSubmitRequest(
items=[BatchSubmitItem(model_path="a.rvt", task_type="shell_execute", task_params={})],
)
try:
batch = await manager.create_batch(request, submitter_id="tester")
final_batch = await _wait_batch_terminal(manager, batch.id)
items = await manager.get_batch_items(batch.id)
assert final_batch.status == BatchStatus.COMPLETED
assert items[0].status.value == "succeeded"
assert len(fake_client.calls) == 2
assert fake_client.calls[0]["task_type"] == "close_model"
assert fake_client.calls[1]["task_type"] == "shell_execute"
finally:
await manager.stop()
@pytest.mark.asyncio
async def test_creo_pre_batch_cleanup_ignores_no_current_model_loaded():
callback_registry = PluginCallbackRegistry()
fake_client = FakePluginHttpClient(callback_registry)
router = CadTaskRouter({".asm": "creo"})
creo_plugin = software_config._config.setdefault("plugins", {}).setdefault("creo", {})
creo_plugin["pre_batch_cleanup_enabled"] = True
creo_plugin["pre_batch_cleanup_task_type"] = "close_model"
creo_plugin["pre_batch_cleanup_task_params"] = {"behavior": "no_current_model_loaded"}
creo_plugin["pre_batch_cleanup_ignore_error_markers"] = ["No current model loaded"]
creo_plugin["auto_open_model_before_tasks"] = False
creo_plugin["auto_close_model_after_tasks"] = False
creo_plugin.setdefault("tasks", {}).setdefault("close_model", {})["completion_mode"] = "sync"
creo_plugin.setdefault("tasks", {}).setdefault("shrinkwrap_shell", {})["completion_mode"] = "submit_only"
manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry)
await manager.start()
request = BatchSubmitRequest(
items=[BatchSubmitItem(model_path="a.asm", task_type="shrinkwrap_shell", task_params={})],
)
try:
batch = await manager.create_batch(request, submitter_id="tester")
final_batch = await _wait_batch_terminal(manager, batch.id)
items = await manager.get_batch_items(batch.id)
assert final_batch.status == BatchStatus.COMPLETED
assert items[0].status.value == "succeeded"
assert len(fake_client.calls) == 2
assert fake_client.calls[0]["task_type"] == "close_model"
assert fake_client.calls[1]["task_type"] == "shrinkwrap_shell"
finally:
await manager.stop()
@pytest.mark.asyncio
async def test_creo_auto_open_runs_before_creo_shrinkwrap():
callback_registry = PluginCallbackRegistry()
fake_client = FakePluginHttpClient(callback_registry)
router = CadTaskRouter({".asm": "creo"})
creo_plugin = software_config._config.setdefault("plugins", {}).setdefault("creo", {})
creo_plugin["pre_batch_cleanup_enabled"] = False
creo_plugin["auto_open_model_before_tasks"] = True
creo_plugin["auto_open_include_task_types"] = ["creo_shrinkwrap"]
creo_plugin["auto_close_model_after_tasks"] = False
creo_plugin.setdefault("tasks", {}).setdefault("open_model", {})["completion_mode"] = "sync"
creo_plugin.setdefault("tasks", {}).setdefault("creo_shrinkwrap", {})["completion_mode"] = "submit_only"
manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry)
await manager.start()
request = BatchSubmitRequest(
items=[BatchSubmitItem(model_path="a.asm", task_type="creo_shrinkwrap", task_params={})],
)
try:
batch = await manager.create_batch(request, submitter_id="tester")
final_batch = await _wait_batch_terminal(manager, batch.id)
items = await manager.get_batch_items(batch.id)
assert final_batch.status == BatchStatus.COMPLETED
assert items[0].status.value == "succeeded"
assert len(fake_client.calls) == 2
assert fake_client.calls[0]["task_type"] == "open_model"
assert fake_client.calls[1]["task_type"] == "creo_shrinkwrap"
finally:
await manager.stop()
@pytest.mark.asyncio
async def test_auto_close_ignores_no_document_open():
callback_registry = PluginCallbackRegistry()
fake_client = FakePluginHttpClient(callback_registry)
router = CadTaskRouter({".rvt": "revit"})
revit_plugin = software_config._config.setdefault("plugins", {}).setdefault("revit", {})
revit_plugin["pre_batch_cleanup_enabled"] = False
revit_plugin["auto_open_model_before_tasks"] = False
revit_plugin["auto_close_model_after_tasks"] = True
revit_plugin["auto_close_include_task_types"] = ["shell_execute"]
revit_plugin["close_model_ignore_error_markers"] = [
"NO_DOCUMENT_OPEN",
"The active document may not be closed from the API.",
]
revit_plugin.setdefault("tasks", {}).setdefault("close_model", {})["completion_mode"] = "sync"
revit_plugin.setdefault("tasks", {}).setdefault("shell_execute", {})["completion_mode"] = "submit_only"
manager = CadBatchManager(task_router=router, plugin_client=fake_client, callback_registry=callback_registry)
await manager.start()
request = BatchSubmitRequest(
items=[
BatchSubmitItem(
model_path="a.rvt",
task_type="shell_execute",
task_params={"close_model_params": {"behavior": "no_document_open"}},
)
],
)
try:
batch = await manager.create_batch(request, submitter_id="tester")
final_batch = await _wait_batch_terminal(manager, batch.id)
items = await manager.get_batch_items(batch.id)
assert final_batch.status == BatchStatus.COMPLETED
assert items[0].status.value == "succeeded"
assert len(fake_client.calls) == 2
assert fake_client.calls[0]["task_type"] == "shell_execute"
assert fake_client.calls[1]["task_type"] == "close_model"
finally:
await manager.stop()