- Added configuration for file storage and software plugins in `software_config.yaml`. - Created core components for batch processing including `CadBatchManager`, `CadTaskRouter`, and `SerialBatchExecutor`. - Implemented plugin callback handling with `PluginCallbackRegistry` and HTTP client for task submission. - Developed API endpoint for receiving plugin callbacks in `plugin_callbacks.py`. - Enhanced data models for batch processing including `BatchJob`, `BatchItem`, and callback payloads. - Introduced WebSocket support for real-time updates on batch processing status. - Added comprehensive tests for routing, callback API, and serial executor behavior. - Documented the implementation plan and core execution rules in `cad-batch-plan.md`.
65 lines
2.4 KiB
Python
65 lines
2.4 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
from typing import Dict
|
|
|
|
from app.models.cad_batch import PluginCallbackPayload
|
|
|
|
|
|
class PluginCallbackRegistry:
|
|
"""Track callback waiters by execution_id and resolve futures on callback."""
|
|
|
|
def __init__(self):
|
|
self._waiters: Dict[str, asyncio.Future] = {}
|
|
self._pending_callbacks: Dict[str, PluginCallbackPayload] = {}
|
|
self._completed_callbacks: Dict[str, PluginCallbackPayload] = {}
|
|
self._lock = asyncio.Lock()
|
|
|
|
async def wait_for_callback(self, execution_id: str, timeout_sec: int) -> PluginCallbackPayload:
|
|
async with self._lock:
|
|
if execution_id in self._completed_callbacks:
|
|
return self._completed_callbacks[execution_id]
|
|
|
|
if execution_id in self._pending_callbacks:
|
|
payload = self._pending_callbacks.pop(execution_id)
|
|
self._completed_callbacks[execution_id] = payload
|
|
return payload
|
|
|
|
future = asyncio.get_running_loop().create_future()
|
|
self._waiters[execution_id] = future
|
|
|
|
try:
|
|
return await asyncio.wait_for(future, timeout=timeout_sec)
|
|
finally:
|
|
async with self._lock:
|
|
self._waiters.pop(execution_id, None)
|
|
|
|
async def handle_callback(self, payload: PluginCallbackPayload) -> bool:
|
|
"""Return True when callback is accepted first time, False on duplicate."""
|
|
async with self._lock:
|
|
if payload.execution_id in self._completed_callbacks:
|
|
return False
|
|
|
|
waiter = self._waiters.get(payload.execution_id)
|
|
if waiter and not waiter.done():
|
|
waiter.set_result(payload)
|
|
self._completed_callbacks[payload.execution_id] = payload
|
|
return True
|
|
|
|
# Callback arrived before waiter registration.
|
|
if payload.execution_id not in self._pending_callbacks:
|
|
self._pending_callbacks[payload.execution_id] = payload
|
|
return True
|
|
|
|
return False
|
|
|
|
async def clear(self):
|
|
async with self._lock:
|
|
for waiter in self._waiters.values():
|
|
if not waiter.done():
|
|
waiter.cancel()
|
|
self._waiters.clear()
|
|
self._pending_callbacks.clear()
|
|
self._completed_callbacks.clear()
|