from __future__ import annotations import asyncio import json import os from typing import Any, Dict, Optional from urllib import error, parse, request from app.config import software_config class PluginSubmitError(RuntimeError): pass class PluginHttpClient: """HTTP client for submitting tasks to CAD plugins.""" def __init__(self, config_provider=software_config): self._config_provider = config_provider async def submit_task(self, software_id: str, task_type: str, payload: Dict[str, Any]) -> Dict[str, Any]: plugin_config = self._config_provider.get_plugin_config(software_id) if not plugin_config: raise PluginSubmitError(f"Plugin config for software '{software_id}' not found") base_url = plugin_config.get("base_url") submit_path = plugin_config.get("submit_path", "/api/plugin/tasks") task_config = self._config_provider.get_plugin_task_config(software_id, task_type) or {} timeout = int(task_config.get("request_timeout_sec", plugin_config.get("request_timeout_sec", 10))) submit_path = task_config.get("path", submit_path) body_mode = task_config.get("body_mode", "passthrough") if not base_url: raise PluginSubmitError(f"Plugin base_url for software '{software_id}' is missing") url = parse.urljoin(base_url.rstrip("/") + "/", submit_path.lstrip("/")) request_payload = self._build_payload(payload=payload, body_mode=body_mode) if software_id == "creo" and task_type == "open_model": request_payload.setdefault("software_type", "creo") # Creo plugin expects snake_case file path keys. raw_path = request_payload.get("file_path") or request_payload.get("filePath") if isinstance(raw_path, str) and raw_path.strip(): request_payload["file_path"] = raw_path request_payload.setdefault("dirname", os.path.dirname(raw_path)) request_payload.setdefault("filename", os.path.basename(raw_path)) try: return await asyncio.to_thread(self._post_json, url, request_payload, timeout) except PluginSubmitError: raise except Exception as exc: raise PluginSubmitError(str(exc)) from exc async def get_task_status( self, software_id: str, task_id: Optional[str] = None, status_url: Optional[str] = None, timeout_sec: int = 10, ) -> Dict[str, Any]: plugin_config = self._config_provider.get_plugin_config(software_id) if not plugin_config: raise PluginSubmitError(f"Plugin config for software '{software_id}' not found") base_url = plugin_config.get("base_url") if not base_url: raise PluginSubmitError(f"Plugin base_url for software '{software_id}' is missing") if status_url: url = parse.urljoin(base_url.rstrip("/") + "/", str(status_url).lstrip("/")) elif task_id: url = parse.urljoin(base_url.rstrip("/") + "/", f"api/task/{task_id}") else: raise PluginSubmitError("Either task_id or status_url must be provided for polling") try: return await asyncio.to_thread(self._get_json, url, int(timeout_sec)) except PluginSubmitError: raise except Exception as exc: raise PluginSubmitError(str(exc)) from exc @staticmethod def _build_payload(payload: Dict[str, Any], body_mode: str) -> Dict[str, Any]: task_params = payload.get("task_params", {}) if isinstance(payload.get("task_params"), dict) else {} model_path = payload.get("model_path") callback = { "execution_id": payload.get("execution_id"), "batch_id": payload.get("batch_id"), "item_id": payload.get("item_id"), "callback_url": payload.get("callback_url"), "attempt": payload.get("attempt"), } if body_mode == "file_path": return {"filePath": model_path, **task_params, **callback} if body_mode == "task_params_only": return {**task_params, **callback} if body_mode == "project_open": return {**task_params, **callback} if body_mode == "creo_close_model": return { "software_type": "creo", "force_close": task_params.get("force_close") is True, } if body_mode == "empty": return {} if body_mode == "passthrough": return payload # Unknown modes fallback to passthrough for compatibility. return payload @staticmethod def _post_json(url: str, payload: Dict[str, Any], timeout: int) -> Dict[str, Any]: raw = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = request.Request( url=url, data=raw, headers={"Content-Type": "application/json"}, method="POST", ) try: with request.urlopen(req, timeout=timeout) as resp: body = resp.read().decode("utf-8") if resp.length != 0 else "" if not body: return {} try: return json.loads(body) except json.JSONDecodeError: return {"raw": body} except error.HTTPError as exc: body = exc.read().decode("utf-8", errors="ignore") if exc.fp else "" raise PluginSubmitError(f"Plugin HTTP {exc.code}: {body}") from exc except error.URLError as exc: raise PluginSubmitError(f"Plugin request failed: {exc.reason}") from exc @staticmethod def _get_json(url: str, timeout: int) -> Dict[str, Any]: req = request.Request( url=url, headers={"Accept": "application/json"}, method="GET", ) try: with request.urlopen(req, timeout=timeout) as resp: body = resp.read().decode("utf-8") if resp.length != 0 else "" if not body: return {} try: return json.loads(body) except json.JSONDecodeError: return {"raw": body} except error.HTTPError as exc: body = exc.read().decode("utf-8", errors="ignore") if exc.fp else "" raise PluginSubmitError(f"Plugin HTTP {exc.code}: {body}") from exc except error.URLError as exc: raise PluginSubmitError(f"Plugin request failed: {exc.reason}") from exc