162 lines
6.5 KiB
Python
162 lines
6.5 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
from typing import Any, Dict, Optional
|
|
from urllib import error, parse, request
|
|
|
|
from app.config import software_config
|
|
|
|
|
|
class PluginSubmitError(RuntimeError):
    """Error raised when a request to a CAD plugin cannot be completed.

    Used for missing plugin configuration, non-2xx HTTP responses and
    transport-level failures alike, so callers can catch a single type.
    """
|
|
|
|
|
|
class PluginHttpClient:
    """HTTP client for submitting tasks to CAD plugins.

    The client reads per-software settings from a config provider, which must
    expose ``get_plugin_config(software_id)`` and
    ``get_plugin_task_config(software_id, task_type)``. Both are expected to
    return a dict (or a falsy value when nothing is configured).

    Network calls use blocking ``urllib`` and are run in a worker thread via
    ``asyncio.to_thread`` so the public coroutines do not block the event loop.
    All failures are reported as ``PluginSubmitError``.
    """

    # Fallbacks used when neither the plugin nor the task config overrides them.
    _DEFAULT_SUBMIT_PATH = "/api/plugin/tasks"
    _DEFAULT_TIMEOUT_SEC = 10

    def __init__(self, config_provider=None):
        """Create a client.

        Args:
            config_provider: Object providing plugin configuration. When
                omitted (or ``None``), the module-level ``software_config``
                is used. It is looked up at construction time instead of
                being frozen into the signature at import time.
        """
        self._config_provider = software_config if config_provider is None else config_provider

    async def submit_task(self, software_id: str, task_type: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST a task to the plugin configured for ``software_id``.

        Args:
            software_id: Key used to look up the plugin configuration.
            task_type: Key used to look up task-specific overrides
                (``path``, ``request_timeout_sec``, ``body_mode``).
            payload: Task description. It is never modified; the request body
                is always a separate dict built from it.

        Returns:
            The decoded JSON response, ``{}`` for an empty body, or
            ``{"raw": <text>}`` when the body is not valid JSON.

        Raises:
            PluginSubmitError: If configuration is missing or the request fails.
        """
        plugin_config, base_url = self._load_plugin_config(software_id)
        task_config = self._config_provider.get_plugin_task_config(software_id, task_type) or {}

        # Task-level settings take precedence over plugin-level ones.
        timeout = int(
            task_config.get(
                "request_timeout_sec",
                plugin_config.get("request_timeout_sec", self._DEFAULT_TIMEOUT_SEC),
            )
        )
        submit_path = task_config.get(
            "path", plugin_config.get("submit_path", self._DEFAULT_SUBMIT_PATH)
        )
        body_mode = task_config.get("body_mode", "passthrough")

        url = self._join_url(base_url, submit_path)
        request_payload = self._build_payload(payload=payload, body_mode=body_mode)
        if software_id == "creo" and task_type == "open_model":
            self._normalize_creo_open_model(request_payload)

        try:
            return await asyncio.to_thread(self._post_json, url, request_payload, timeout)
        except PluginSubmitError:
            raise
        except Exception as exc:
            # Anything unexpected (decode errors, socket timeouts, ...) is
            # surfaced under the single error type callers handle.
            raise PluginSubmitError(str(exc)) from exc

    async def get_task_status(
        self,
        software_id: str,
        task_id: Optional[str] = None,
        status_url: Optional[str] = None,
        timeout_sec: int = 10,
    ) -> Dict[str, Any]:
        """GET the status of a previously submitted task.

        Args:
            software_id: Key used to look up the plugin configuration.
            task_id: Task identifier; polled at ``api/task/<task_id>`` under
                the plugin base URL. It is percent-encoded as a single path
                segment, so characters such as ``/``, ``?`` or spaces cannot
                alter the request target.
            status_url: Status location resolved against the plugin base URL.
                Takes precedence over ``task_id`` when both are given.
            timeout_sec: Request timeout in seconds.

        Returns:
            The decoded JSON response, ``{}`` for an empty body, or
            ``{"raw": <text>}`` when the body is not valid JSON.

        Raises:
            PluginSubmitError: If configuration is missing, neither
                ``task_id`` nor ``status_url`` is given, or the request fails.
        """
        _, base_url = self._load_plugin_config(software_id)

        if status_url:
            url = self._join_url(base_url, str(status_url))
        elif task_id:
            url = self._join_url(base_url, f"api/task/{parse.quote(str(task_id), safe='')}")
        else:
            raise PluginSubmitError("Either task_id or status_url must be provided for polling")

        try:
            return await asyncio.to_thread(self._get_json, url, int(timeout_sec))
        except PluginSubmitError:
            raise
        except Exception as exc:
            raise PluginSubmitError(str(exc)) from exc

    def _load_plugin_config(self, software_id: str):
        """Return ``(plugin_config, base_url)`` or raise if either is missing."""
        plugin_config = self._config_provider.get_plugin_config(software_id)
        if not plugin_config:
            raise PluginSubmitError(f"Plugin config for software '{software_id}' not found")

        base_url = plugin_config.get("base_url")
        if not base_url:
            raise PluginSubmitError(f"Plugin base_url for software '{software_id}' is missing")
        return plugin_config, base_url

    @staticmethod
    def _join_url(base_url: str, path: str) -> str:
        """Join ``path`` onto ``base_url`` with exactly one slash between them."""
        return parse.urljoin(base_url.rstrip("/") + "/", path.lstrip("/"))

    @staticmethod
    def _normalize_creo_open_model(request_payload: Dict[str, Any]) -> None:
        """Fill in the fields sent for Creo ``open_model`` (modifies in place).

        Only ever called on the dict produced by ``_build_payload``, which is
        always distinct from the caller's payload.
        """
        request_payload.setdefault("software_type", "creo")
        # Creo plugin expects snake_case file path keys.
        raw_path = request_payload.get("file_path") or request_payload.get("filePath")
        if isinstance(raw_path, str) and raw_path.strip():
            request_payload["file_path"] = raw_path
            request_payload.setdefault("dirname", os.path.dirname(raw_path))
            request_payload.setdefault("filename", os.path.basename(raw_path))

    @staticmethod
    def _build_payload(payload: Dict[str, Any], body_mode: str) -> Dict[str, Any]:
        """Build the request body for ``body_mode`` from ``payload``.

        Modes:
            ``file_path``: ``filePath`` (from ``model_path``) plus task params
                and callback fields.
            ``task_params_only`` / ``project_open``: task params plus callback
                fields.
            ``creo_close_model``: ``software_type`` and a strict boolean
                ``force_close``.
            ``empty``: an empty dict.
            ``passthrough`` or any unknown mode: a shallow copy of ``payload``.

        The returned dict is always a new object, so callers may modify it
        without affecting ``payload``.
        """
        raw_params = payload.get("task_params")
        task_params = raw_params if isinstance(raw_params, dict) else {}

        if body_mode == "empty":
            return {}
        if body_mode == "creo_close_model":
            return {
                "software_type": "creo",
                "force_close": task_params.get("force_close") is True,
            }

        if body_mode in ("file_path", "task_params_only", "project_open"):
            callback = {
                "execution_id": payload.get("execution_id"),
                "batch_id": payload.get("batch_id"),
                "item_id": payload.get("item_id"),
                "callback_url": payload.get("callback_url"),
                "attempt": payload.get("attempt"),
            }
            if body_mode == "file_path":
                return {"filePath": payload.get("model_path"), **task_params, **callback}
            return {**task_params, **callback}

        # "passthrough", and unknown modes for compatibility. Copy so that
        # later normalisation of the request body cannot leak into the
        # caller's dict.
        return dict(payload)

    @staticmethod
    def _post_json(url: str, payload: Dict[str, Any], timeout: int) -> Dict[str, Any]:
        """POST ``payload`` as UTF-8 JSON to ``url`` and decode the response."""
        raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        req = request.Request(
            url=url,
            data=raw,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        return PluginHttpClient._send_json(req, timeout)

    @staticmethod
    def _get_json(url: str, timeout: int) -> Dict[str, Any]:
        """GET ``url`` and decode the response."""
        req = request.Request(
            url=url,
            headers={"Accept": "application/json"},
            method="GET",
        )
        return PluginHttpClient._send_json(req, timeout)

    @staticmethod
    def _send_json(req: request.Request, timeout: int) -> Dict[str, Any]:
        """Send ``req`` and decode the response body.

        Returns ``{}`` for an empty body, the parsed JSON when the body is
        valid JSON, and ``{"raw": <text>}`` otherwise.

        Raises:
            PluginSubmitError: On an HTTP error status (the message includes
                the status code and response body) or a URL/transport error.
        """
        try:
            with request.urlopen(req, timeout=timeout) as resp:
                # ``length`` is None when the size is unknown, so only an
                # explicit zero skips the read.
                body = resp.read().decode("utf-8") if resp.length != 0 else ""
        except error.HTTPError as exc:
            # HTTPError must be handled before its base class URLError.
            detail = exc.read().decode("utf-8", errors="ignore") if exc.fp else ""
            raise PluginSubmitError(f"Plugin HTTP {exc.code}: {detail}") from exc
        except error.URLError as exc:
            raise PluginSubmitError(f"Plugin request failed: {exc.reason}") from exc

        if not body:
            return {}
        try:
            return json.loads(body)
        except json.JSONDecodeError:
            return {"raw": body}
|