CadHubManage/app/api/v1/websocket.py
sladro 08623bf4d6 feat: Enhance SerialBatchExecutor with pre-batch cleanup and task execution improvements
- Added pre-batch cleanup functionality to SerialBatchExecutor, allowing for cleanup tasks before processing items.
- Introduced new task execution phases and improved error handling for task submissions.
- Implemented inter-step delays and between-items delays for better task management.
- Updated logging to capture detailed events during batch processing.
- Enhanced configuration options for plugins in software_config.yaml to support new features.
- Added tests for pre-batch cleanup and auto-close scenarios to ensure robust handling of edge cases.
- Created a PowerShell script for automated callback handling from Revit.
2026-03-03 16:13:19 +08:00

1191 lines
47 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WebSocket API路由
提供WebSocket连接端点
"""
import asyncio
import json
import logging
import uuid
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query, HTTPException
from app.core.websocket_manager import websocket_manager, MessageType
logger = logging.getLogger(__name__)
# WebSocket消息类型常量
class WSMessageType:
PING = "ping"
GET_STATUS = "get_status"
GET_SOFTWARE_LIST = "get_software_list"
START_SOFTWARE = "start_software"
STOP_SOFTWARE = "stop_software"
RESTART_SOFTWARE = "restart_software"
LOG_OPERATION = "log_operation"
# 新增日志查询相关消息类型
QUERY_LOGS = "query_logs"
GET_LOG_BY_ID = "get_log_by_id"
GET_LOG_STATS = "get_log_stats"
CLEANUP_LOGS = "cleanup_logs"
GET_OPERATION_TYPES = "get_operation_types"
# 文件管理相关消息类型
GET_FILE_LIST = "get_file_list"
DOWNLOAD_FILE = "download_file"
DOWNLOAD_BATCH = "download_batch"
# 文件重命名相关消息类型
GET_RENAME_STRATEGIES = "get_rename_strategies"
PREVIEW_RENAME = "preview_rename"
RENAME_FILES = "rename_files"
# 配置管理相关消息类型
SET_BASE_PATH = "set_base_path"
SET_FILE_EXTENSIONS = "set_file_extensions"
GET_FILE_CONFIG = "get_file_config"
CONVERT_IFC_TO_STP = "convert_ifc_to_stp"
CONVERT_IFC_TO_STP_BATCH = "convert_ifc_to_stp_batch"
SUBMIT_BATCH_TASKS = "submit_batch_tasks"
GET_BATCH_STATUS = "get_batch_status"
GET_BATCH_ITEM_STATUS = "get_batch_item_status"
router = APIRouter()
@router.websocket("/connect")
async def websocket_endpoint(
websocket: WebSocket,
client_id: str = Query(None, description="客户端ID如果不提供将自动生成"),
user_id: str = Query(None, description="用户ID用于用户认证")
):
"""
WebSocket连接端点
连接参数:
- client_id: 客户端唯一标识符,如果不提供将自动生成
- user_id: 用户ID用于消息推送和权限控制
连接URL示例:
ws://localhost:8000/api/v1/ws/connect?client_id=client123&user_id=user456
"""
# 如果没有提供client_id自动生成一个
if not client_id:
client_id = str(uuid.uuid4())
try:
# 建立WebSocket连接
await websocket_manager.connect(websocket, client_id, user_id)
# 发送欢迎消息
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": f"欢迎连接客户端ID: {client_id}",
"client_id": client_id,
"user_id": user_id,
"timestamp": websocket_manager._get_timestamp()
}, client_id)
# 保持连接并处理消息
while True:
try:
# 接收客户端消息
data = await websocket.receive_text()
# 解析JSON消息
try:
message = json.loads(data)
await handle_client_message(message, client_id, user_id)
except json.JSONDecodeError:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "消息格式错误请发送有效的JSON",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except WebSocketDisconnect:
logger.info(f"客户端 {client_id} 主动断开连接")
break
except Exception as e:
logger.error(f"处理客户端 {client_id} 消息时发生错误: {e}")
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"服务器处理消息时发生错误: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
break
except Exception as e:
logger.error(f"WebSocket连接发生错误: {e}")
finally:
# 清理连接
websocket_manager.disconnect(client_id)
async def handle_client_message(message: dict, client_id: str, user_id: str):
"""
处理客户端发送的消息
支持的消息类型:
- ping: 心跳检测
- get_status: 获取服务状态
- get_software_list: 获取软件列表
- start_software: 启动软件
- stop_software: 停止软件
- restart_software: 重启软件
- log_operation: 记录用户操作日志
- query_logs: 查询操作日志
- get_log_by_id: 根据ID获取日志
- get_log_stats: 获取日志统计信息
- cleanup_logs: 清理过期日志
- get_operation_types: 获取操作类型列表
- get_file_list: 获取CAD文件列表
- download_file: 获取单个文件下载URL
- download_batch: 获取批量文件下载URL
- get_rename_strategies: 获取可用的重命名策略列表
- preview_rename: 预览重命名结果
- rename_files: 执行批量重命名
- get_file_config: 获取文件配置(基础路径和扩展名)
- set_base_path: 设置CAD文件基础路径
- set_file_extensions: 设置文件扩展名配置
- convert_ifc_to_stp: IFC 转 STP单文件
- convert_ifc_to_stp_batch: IFC 转 STP批量
"""
from app.core.software_manager import software_manager
from app.core.log_manager import log_manager
from app.models.operation_log import ActionType, OperationStatus
message_type = message.get("type")
logger.info(
"WS recv: client_id=%s user_id=%s type=%s keys=%s",
client_id,
user_id,
message_type,
sorted(list(message.keys())),
)
if message_type == WSMessageType.PING:
# 心跳响应
await websocket_manager.send_personal_message({
"type": MessageType.HEARTBEAT,
"message": "pong",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_STATUS:
# 获取服务状态
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "服务状态正常",
"data": {
"active_connections": websocket_manager.get_active_connections_count(),
"connected_users": websocket_manager.get_connected_users()
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_SOFTWARE_LIST:
# 获取软件列表
try:
software_list = await software_manager.get_software_list()
await websocket_manager.send_personal_message({
"type": MessageType.SOFTWARE_LIST_UPDATE,
"data": {"software_list": software_list},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"获取软件列表失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.START_SOFTWARE:
# 启动软件
software_id = message.get("software_id")
if not software_id:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: software_id",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
task = await software_manager.start_software(software_id)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": f"软件 {software_id} 启动任务已创建",
"data": {
"task_id": task.id,
"software_id": software_id,
"status": task.status.value
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"启动软件失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.STOP_SOFTWARE:
# 停止软件
software_id = message.get("software_id")
if not software_id:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: software_id",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
task = await software_manager.stop_software(software_id)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": f"软件 {software_id} 停止任务已创建",
"data": {
"task_id": task.id,
"software_id": software_id,
"status": task.status.value
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"停止软件失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.RESTART_SOFTWARE:
# 重启软件
software_id = message.get("software_id")
if not software_id:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: software_id",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
task = await software_manager.restart_software(software_id)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": f"软件 {software_id} 重启任务已创建",
"data": {
"task_id": task.id,
"software_id": software_id,
"status": task.status.value
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"重启软件失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.LOG_OPERATION:
# 记录用户操作日志
operation = message.get("operation")
details = message.get("details", "")
if not operation:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: operation",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
# 提取日志参数
log_params = {
"action_type": ActionType(message.get("action_type")) if message.get("action_type") else None,
"target_object": message.get("target_object"),
"software_version": message.get("software_version"),
"status": OperationStatus(message.get("status", "success")),
"duration": message.get("duration"),
"operation_category": message.get("operation_category"),
"extra_data": {
"file_path": message.get("file_path"),
"file_size": message.get("file_size"),
"batch_count": message.get("batch_count"),
"export_format": message.get("export_format"),
**{k: v for k, v in message.items() if k.startswith("custom_")}
}
}
# 清理空值
log_params = {k: v for k, v in log_params.items() if v is not None}
if log_params.get("extra_data"):
log_params["extra_data"] = {k: v for k, v in log_params["extra_data"].items() if v is not None}
if not log_params["extra_data"]: # 如果extra_data为空删除它
del log_params["extra_data"]
# 记录日志
log_id = await log_manager.log_user_operation(
operation=operation,
details=details,
user_id=user_id or "anonymous",
client_id=client_id,
**log_params
)
# 响应成功
await websocket_manager.send_personal_message({
"type": MessageType.LOG_RECORDED,
"message": "操作日志已记录",
"data": {
"log_id": log_id,
"operation": operation
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"记录日志失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.QUERY_LOGS:
# 查询操作日志
try:
from app.models.operation_log import LogFilter, LogType, LogLevel
from datetime import datetime
# 构建查询过滤器
filter_params = LogFilter(
log_type=LogType(message.get("log_type")) if message.get("log_type") else None,
operation=message.get("operation"),
user_id=message.get("user_id_filter"), # 避免与当前user_id冲突
client_id=message.get("client_id_filter"),
level=LogLevel(message.get("level")) if message.get("level") else None,
start_time=datetime.fromisoformat(message.get("start_time")) if message.get("start_time") else None,
end_time=datetime.fromisoformat(message.get("end_time")) if message.get("end_time") else None,
limit=message.get("limit", 100),
offset=message.get("offset", 0)
)
logs = await log_manager.query_logs(filter_params)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "日志查询成功",
"data": {
"logs": [log.dict() for log in logs],
"total": len(logs),
"limit": filter_params.limit,
"offset": filter_params.offset
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"查询日志失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_LOG_BY_ID:
# 根据ID获取日志
log_id = message.get("log_id")
if not log_id:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: log_id",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
log = await log_manager.get_log_by_id(log_id)
if not log:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "日志不存在",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "获取日志成功",
"data": {"log": log.dict()},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"获取日志失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_LOG_STATS:
# 获取日志统计信息
try:
from datetime import datetime, timedelta
from app.models.operation_log import LogFilter, LogType, LogLevel
now = datetime.now()
start_time = now - timedelta(hours=24)
# 查询不同类型的日志数量
system_filter = LogFilter(
log_type=LogType.SYSTEM_OPERATION,
start_time=start_time,
limit=1000
)
system_logs = await log_manager.query_logs(system_filter)
user_filter = LogFilter(
log_type=LogType.USER_OPERATION,
start_time=start_time,
limit=1000
)
user_logs = await log_manager.query_logs(user_filter)
error_filter = LogFilter(
level=LogLevel.ERROR,
start_time=start_time,
limit=1000
)
error_logs = await log_manager.query_logs(error_filter)
stats = {
"period": "24小时",
"system_operations": len(system_logs),
"user_operations": len(user_logs),
"error_logs": len(error_logs),
"total_logs": len(system_logs) + len(user_logs),
"timestamp": now.isoformat()
}
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "获取统计信息成功",
"data": {"stats": stats},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"获取统计信息失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.CLEANUP_LOGS:
# 清理过期日志
try:
await log_manager.cleanup_old_logs()
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "过期日志清理完成",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"清理日志失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_OPERATION_TYPES:
# 获取操作类型列表
try:
from app.models.operation_log import LogFilter
# 查询最近的日志以获取操作类型
filter_params = LogFilter(limit=1000)
recent_logs = await log_manager.query_logs(filter_params)
# 统计操作类型
operations = set()
categories = set()
for log in recent_logs:
operations.add(log.operation)
if log.operation_category:
categories.add(log.operation_category)
result = {
"operations": sorted(list(operations)),
"categories": sorted(list(categories)),
"total_operations": len(operations),
"total_categories": len(categories)
}
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "获取操作类型成功",
"data": result,
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"获取操作类型失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_FILE_LIST:
# 获取CAD文件列表
try:
from app.api.v1.files import get_cad_files_path, scan_cad_files
base_path = get_cad_files_path()
files = scan_cad_files(base_path)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "获取文件列表成功",
"data": {
"base_path": str(base_path),
"total_count": len(files),
"files": files
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except HTTPException as e:
detail = str(e.detail) if e.detail is not None else "未知错误"
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"获取文件列表失败: {detail}",
"data": {
"error": detail,
"detail": detail,
"status_code": e.status_code
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
error_text = str(e) if str(e).strip() else "未知错误"
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"获取文件列表失败: {error_text}",
"data": {
"error": error_text,
"detail": error_text
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.DOWNLOAD_FILE:
# 获取单个文件下载URL
file_path = message.get("file_path")
if not file_path:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: file_path",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.api.v1.files import get_cad_files_path, is_cad_file
from pathlib import Path
base_path = get_cad_files_path()
full_path = base_path / file_path
# 安全检查
full_path = full_path.resolve()
base_path = base_path.resolve()
if not str(full_path).startswith(str(base_path)):
raise ValueError("访问被拒绝")
if not full_path.exists():
raise FileNotFoundError("文件不存在")
if not is_cad_file(full_path.name):
raise ValueError("只能下载CAD文件")
# 返回下载URL使用HTTP接口
download_url = f"/api/v1/files/download/{file_path}"
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "文件下载链接已生成",
"data": {
"file_path": file_path,
"filename": full_path.name,
"download_url": download_url,
"file_size": full_path.stat().st_size
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"生成下载链接失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.DOWNLOAD_BATCH:
# 获取批量文件下载URL
file_paths = message.get("file_paths")
if not file_paths or not isinstance(file_paths, list):
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: file_paths (必须是数组)",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
# 返回批量下载URL使用HTTP接口
download_url = "/api/v1/files/download/batch"
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "批量下载链接已生成",
"data": {
"file_count": len(file_paths),
"file_paths": file_paths,
"download_url": download_url,
"method": "POST",
"note": "请使用POST方法将file_paths作为JSON数组发送到此URL"
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"生成批量下载链接失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_RENAME_STRATEGIES:
# 获取可用的重命名策略列表
try:
from app.core.rename_manager import rename_manager
strategies = rename_manager.get_available_strategies()
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "获取重命名策略成功",
"data": {
"strategies": strategies,
"total_count": len(strategies)
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"获取重命名策略失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.PREVIEW_RENAME:
# 预览重命名结果
file_paths = message.get("file_paths")
strategy_name = message.get("strategy")
params = message.get("params", {})
if not file_paths or not isinstance(file_paths, list):
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: file_paths (必须是数组)",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
if not strategy_name:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: strategy",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.core.rename_manager import rename_manager
from pathlib import Path
# 提取文件名
filenames = [Path(fp).name for fp in file_paths]
# 预览重命名
preview_results = rename_manager.preview_rename(
filenames=filenames,
strategy_name=strategy_name,
params=params
)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "重命名预览生成成功",
"data": {
"strategy": strategy_name,
"params": params,
"preview": preview_results,
"total_count": len(preview_results)
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"预览重命名失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.RENAME_FILES:
# 执行批量重命名
file_paths = message.get("file_paths")
strategy_name = message.get("strategy")
params = message.get("params", {})
if not file_paths or not isinstance(file_paths, list):
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: file_paths (必须是数组)",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
if not strategy_name:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: strategy",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.core.rename_manager import rename_manager
from app.api.v1.files import get_cad_files_path
base_path = get_cad_files_path()
# 执行重命名
results = rename_manager.execute_rename(
base_path=base_path,
file_paths=file_paths,
strategy_name=strategy_name,
params=params
)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": f"批量重命名完成: 成功 {results['success_count']} 个, 失败 {results['failed_count']}",
"data": {
"strategy": strategy_name,
"params": params,
"success_count": results["success_count"],
"failed_count": results["failed_count"],
"results": results["results"]
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"批量重命名失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_FILE_CONFIG:
# 获取文件配置(基础路径和扩展名)
try:
from app.config import software_config
base_path = software_config.get_cad_files_path()
file_extensions = software_config.get_file_extensions()
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "获取文件配置成功",
"data": {
"base_path": base_path,
"file_extensions": file_extensions
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"获取文件配置失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.SET_BASE_PATH:
# 设置CAD文件基础路径
base_path = message.get("base_path")
if not base_path:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: base_path",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.config import software_config
from pathlib import Path
# 验证路径是否存在
path = Path(base_path)
if not path.exists():
raise ValueError(f"路径不存在: {base_path}")
if not path.is_dir():
raise ValueError(f"路径不是有效的目录: {base_path}")
# 保存到配置
software_config.set_cad_files_path(base_path)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "基础路径设置成功",
"data": {
"base_path": base_path
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"设置基础路径失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.SET_FILE_EXTENSIONS:
# 设置文件扩展名配置
file_extensions = message.get("file_extensions")
if not file_extensions or not isinstance(file_extensions, dict):
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: file_extensions (必须是字典对象)",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.config import software_config
# 验证扩展名格式
for software, extensions in file_extensions.items():
if not isinstance(extensions, list):
raise ValueError(f"软件 {software} 的扩展名必须是数组")
for ext in extensions:
if not isinstance(ext, str) or not ext.startswith('.'):
raise ValueError(f"扩展名格式错误: {ext},必须以'.'开头")
# 保存到配置
software_config.set_file_extensions(file_extensions)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "文件扩展名配置已更新",
"data": {
"file_extensions": file_extensions
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"设置文件扩展名失败: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.SUBMIT_BATCH_TASKS:
items = message.get("items")
batch_name = message.get("batch_name")
metadata = message.get("metadata", {})
logger.info(
"WS submit_batch_tasks: client_id=%s user_id=%s batch_name=%s item_count=%s",
client_id,
user_id,
batch_name,
len(items) if isinstance(items, list) else "invalid",
)
if not items or not isinstance(items, list):
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "missing required parameter: items",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.core.cad_batch_manager import cad_batch_manager
from app.models.cad_batch import BatchSubmitRequest
request = BatchSubmitRequest(
items=items,
batch_name=batch_name,
metadata=metadata if isinstance(metadata, dict) else {},
)
batch = await cad_batch_manager.create_batch(
request=request,
submitter_id=user_id or client_id,
)
batch_items = await cad_batch_manager.get_batch_items(batch.id)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": f"batch created: {batch.id}",
"data": {
"batch": batch.model_dump(mode="json"),
"items": [item.model_dump(mode="json") for item in batch_items]
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
logger.exception("WS submit_batch_tasks failed: client_id=%s error=%s", client_id, e)
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"failed to submit batch tasks: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_BATCH_STATUS:
batch_id = message.get("batch_id")
if not batch_id:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "missing required parameter: batch_id",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.core.cad_batch_manager import cad_batch_manager
batch = await cad_batch_manager.get_batch(batch_id)
if not batch:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"batch not found: {batch_id}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
items = await cad_batch_manager.get_batch_items(batch_id)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "batch status fetched",
"data": {
"batch": batch.model_dump(mode="json"),
"items": [item.model_dump(mode="json") for item in items]
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"failed to fetch batch status: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.GET_BATCH_ITEM_STATUS:
item_id = message.get("item_id")
if not item_id:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "missing required parameter: item_id",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.core.cad_batch_manager import cad_batch_manager
item = await cad_batch_manager.get_item(item_id)
if not item:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"batch item not found: {item_id}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "batch item status fetched",
"data": {"item": item.model_dump(mode="json")},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"failed to fetch batch item status: {str(e)}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.CONVERT_IFC_TO_STP:
ifc_path = message.get("ifc_path")
stp_path = message.get("stp_path")
if not ifc_path or not stp_path:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: ifc_path 或 stp_path",
"data": {
"ifc_path": ifc_path,
"stp_path": stp_path
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
try:
from app.core.ifc2stp_converter import Ifc2StpConverter
converter = Ifc2StpConverter()
result = await asyncio.to_thread(converter.convert, ifc_path, stp_path)
await websocket_manager.send_personal_message({
"type": MessageType.INFO,
"message": "IFC 转 STP 成功",
"data": result,
"timestamp": websocket_manager._get_timestamp()
}, client_id)
except Exception as e:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"IFC 转 STP 失败: {str(e)}",
"data": {
"ifc_path": ifc_path,
"stp_path": stp_path
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
elif message_type == WSMessageType.CONVERT_IFC_TO_STP_BATCH:
items = message.get("items")
continue_on_error = bool(message.get("continue_on_error", True))
if not isinstance(items, list) or len(items) == 0:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "缺少参数: items非空数组",
"data": {
"items": items
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
normalized_items = []
invalid_items = []
for index, item in enumerate(items):
if not isinstance(item, dict):
invalid_items.append({
"index": index,
"error": "item 必须是对象",
"item": item
})
continue
ifc_path = item.get("ifc_path")
stp_path = item.get("stp_path")
if not ifc_path or not stp_path:
invalid_items.append({
"index": index,
"error": "缺少 ifc_path 或 stp_path",
"item": item
})
continue
normalized_items.append({
"index": index,
"ifc_path": ifc_path,
"stp_path": stp_path
})
if invalid_items:
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": "批量参数校验失败",
"data": {
"invalid_count": len(invalid_items),
"invalid_items": invalid_items
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
return
from app.core.ifc2stp_converter import Ifc2StpConverter
converter = Ifc2StpConverter()
results = []
success_count = 0
failed_count = 0
for item in normalized_items:
ifc_path = item["ifc_path"]
stp_path = item["stp_path"]
try:
result = await asyncio.to_thread(converter.convert, ifc_path, stp_path)
success_count += 1
results.append({
"index": item["index"],
"status": "success",
"ifc_path": ifc_path,
"stp_path": stp_path,
"result": result
})
except Exception as e:
failed_count += 1
results.append({
"index": item["index"],
"status": "failed",
"ifc_path": ifc_path,
"stp_path": stp_path,
"error": str(e)
})
if not continue_on_error:
break
response_type = MessageType.INFO if failed_count == 0 else MessageType.ERROR
await websocket_manager.send_personal_message({
"type": response_type,
"message": "IFC 批量转 STP 完成" if failed_count == 0 else "IFC 批量转 STP 完成(部分失败)",
"data": {
"total_count": len(results),
"requested_count": len(normalized_items),
"success_count": success_count,
"failed_count": failed_count,
"continue_on_error": continue_on_error,
"results": results
},
"timestamp": websocket_manager._get_timestamp()
}, client_id)
else:
# 未知消息类型
logger.warning("WS unknown message type: client_id=%s type=%s", client_id, message_type)
await websocket_manager.send_personal_message({
"type": MessageType.ERROR,
"message": f"未知的消息类型: {message_type}",
"timestamp": websocket_manager._get_timestamp()
}, client_id)