- Added pre-batch cleanup functionality to SerialBatchExecutor, allowing for cleanup tasks before processing items. - Introduced new task execution phases and improved error handling for task submissions. - Implemented inter-step delays and between-items delays for better task management. - Updated logging to capture detailed events during batch processing. - Enhanced configuration options for plugins in software_config.yaml to support new features. - Added tests for pre-batch cleanup and auto-close scenarios to ensure robust handling of edge cases. - Created a PowerShell script for automated callback handling from Revit.
1191 lines
47 KiB
Python
1191 lines
47 KiB
Python
"""
WebSocket API routes.

Provides the WebSocket connection endpoint.
"""
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import uuid
|
||
|
||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query, HTTPException
|
||
from app.core.websocket_manager import websocket_manager, MessageType
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# WebSocket message type constants
|
||
class WSMessageType:
    """String constants for the ``type`` field of incoming client messages.

    The message dispatcher reads ``message.get("type")`` and compares it
    against these values to select a handler branch. The values are part of
    the wire protocol, so they must not be changed.
    """

    # Connection / service status
    PING = "ping"
    GET_STATUS = "get_status"

    # Software lifecycle control
    GET_SOFTWARE_LIST = "get_software_list"
    START_SOFTWARE = "start_software"
    STOP_SOFTWARE = "stop_software"
    RESTART_SOFTWARE = "restart_software"

    # Operation log recording
    LOG_OPERATION = "log_operation"

    # Log query message types
    QUERY_LOGS = "query_logs"
    GET_LOG_BY_ID = "get_log_by_id"
    GET_LOG_STATS = "get_log_stats"
    CLEANUP_LOGS = "cleanup_logs"
    GET_OPERATION_TYPES = "get_operation_types"

    # File management message types
    GET_FILE_LIST = "get_file_list"
    DOWNLOAD_FILE = "download_file"
    DOWNLOAD_BATCH = "download_batch"

    # File rename message types
    GET_RENAME_STRATEGIES = "get_rename_strategies"
    PREVIEW_RENAME = "preview_rename"
    RENAME_FILES = "rename_files"

    # Configuration management message types
    SET_BASE_PATH = "set_base_path"
    SET_FILE_EXTENSIONS = "set_file_extensions"
    GET_FILE_CONFIG = "get_file_config"

    # IFC -> STP conversion (single file and batch)
    CONVERT_IFC_TO_STP = "convert_ifc_to_stp"
    CONVERT_IFC_TO_STP_BATCH = "convert_ifc_to_stp_batch"

    # Batch task submission and status queries
    SUBMIT_BATCH_TASKS = "submit_batch_tasks"
    GET_BATCH_STATUS = "get_batch_status"
    GET_BATCH_ITEM_STATUS = "get_batch_item_status"
|
||
|
||
router = APIRouter()
|
||
|
||
|
||
@router.websocket("/connect")
async def websocket_endpoint(
    websocket: WebSocket,
    client_id: str = Query(None, description="客户端ID,如果不提供将自动生成"),
    user_id: str = Query(None, description="用户ID,用于用户认证")
):
    """
    WebSocket connection endpoint.

    Registers the socket with ``websocket_manager``, sends a welcome message,
    then receives text frames in a loop, parses each one as JSON and passes
    it to ``handle_client_message``. The connection is always unregistered
    from ``websocket_manager`` when the function exits.

    Connection parameters:
    - client_id: unique client identifier; generated (UUID4) when omitted
    - user_id: user ID, used for message push and permission control

    Example connection URL:
    ws://localhost:8000/api/v1/ws/connect?client_id=client123&user_id=user456
    """

    # Generate a client_id when the caller did not provide one.
    if not client_id:
        client_id = str(uuid.uuid4())

    try:
        # Establish the WebSocket connection.
        await websocket_manager.connect(websocket, client_id, user_id)

        # Send the welcome message.
        await websocket_manager.send_personal_message({
            "type": MessageType.INFO,
            "message": f"欢迎连接!客户端ID: {client_id}",
            "client_id": client_id,
            "user_id": user_id,
            "timestamp": websocket_manager._get_timestamp()
        }, client_id)

        # Keep the connection open and process incoming messages.
        while True:
            try:
                # Receive the next client message.
                data = await websocket.receive_text()

                # Parse the JSON payload.
                try:
                    message = json.loads(data)
                except json.JSONDecodeError:
                    message = None

                # The dispatcher calls message.get(...), so anything that is
                # not a JSON object (invalid JSON, arrays, scalars) is
                # answered with a format error instead of being dispatched;
                # the connection stays open.
                if not isinstance(message, dict):
                    await websocket_manager.send_personal_message({
                        "type": MessageType.ERROR,
                        "message": "消息格式错误,请发送有效的JSON",
                        "timestamp": websocket_manager._get_timestamp()
                    }, client_id)
                    continue

                await handle_client_message(message, client_id, user_id)

            except WebSocketDisconnect:
                logger.info("客户端 %s 主动断开连接", client_id)
                break
            except Exception as e:
                # logger.exception keeps the traceback for diagnosis.
                logger.exception("处理客户端 %s 消息时发生错误: %s", client_id, e)
                await websocket_manager.send_personal_message({
                    "type": MessageType.ERROR,
                    "message": f"服务器处理消息时发生错误: {str(e)}",
                    "timestamp": websocket_manager._get_timestamp()
                }, client_id)
                break

    except Exception as e:
        # Reached when connect / welcome fails, or when reporting an error
        # to the client itself raises.
        logger.exception("WebSocket连接发生错误: %s", e)
    finally:
        # Clean up the connection.
        websocket_manager.disconnect(client_id)
|
||
|
||
|
||
async def handle_client_message(message: dict, client_id: str, user_id: str):
|
||
"""
|
||
处理客户端发送的消息
|
||
|
||
支持的消息类型:
|
||
- ping: 心跳检测
|
||
- get_status: 获取服务状态
|
||
- get_software_list: 获取软件列表
|
||
- start_software: 启动软件
|
||
- stop_software: 停止软件
|
||
- restart_software: 重启软件
|
||
- log_operation: 记录用户操作日志
|
||
- query_logs: 查询操作日志
|
||
- get_log_by_id: 根据ID获取日志
|
||
- get_log_stats: 获取日志统计信息
|
||
- cleanup_logs: 清理过期日志
|
||
- get_operation_types: 获取操作类型列表
|
||
- get_file_list: 获取CAD文件列表
|
||
- download_file: 获取单个文件下载URL
|
||
- download_batch: 获取批量文件下载URL
|
||
- get_rename_strategies: 获取可用的重命名策略列表
|
||
- preview_rename: 预览重命名结果
|
||
- rename_files: 执行批量重命名
|
||
- get_file_config: 获取文件配置(基础路径和扩展名)
|
||
- set_base_path: 设置CAD文件基础路径
|
||
- set_file_extensions: 设置文件扩展名配置
|
||
- convert_ifc_to_stp: IFC 转 STP(单文件)
|
||
- convert_ifc_to_stp_batch: IFC 转 STP(批量)
|
||
"""
|
||
from app.core.software_manager import software_manager
|
||
from app.core.log_manager import log_manager
|
||
from app.models.operation_log import ActionType, OperationStatus
|
||
|
||
message_type = message.get("type")
|
||
logger.info(
|
||
"WS recv: client_id=%s user_id=%s type=%s keys=%s",
|
||
client_id,
|
||
user_id,
|
||
message_type,
|
||
sorted(list(message.keys())),
|
||
)
|
||
|
||
if message_type == WSMessageType.PING:
|
||
# 心跳响应
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.HEARTBEAT,
|
||
"message": "pong",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_STATUS:
|
||
# 获取服务状态
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "服务状态正常",
|
||
"data": {
|
||
"active_connections": websocket_manager.get_active_connections_count(),
|
||
"connected_users": websocket_manager.get_connected_users()
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_SOFTWARE_LIST:
|
||
# 获取软件列表
|
||
try:
|
||
software_list = await software_manager.get_software_list()
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.SOFTWARE_LIST_UPDATE,
|
||
"data": {"software_list": software_list},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"获取软件列表失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.START_SOFTWARE:
|
||
# 启动软件
|
||
software_id = message.get("software_id")
|
||
if not software_id:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: software_id",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
task = await software_manager.start_software(software_id)
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": f"软件 {software_id} 启动任务已创建",
|
||
"data": {
|
||
"task_id": task.id,
|
||
"software_id": software_id,
|
||
"status": task.status.value
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"启动软件失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.STOP_SOFTWARE:
|
||
# 停止软件
|
||
software_id = message.get("software_id")
|
||
if not software_id:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: software_id",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
task = await software_manager.stop_software(software_id)
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": f"软件 {software_id} 停止任务已创建",
|
||
"data": {
|
||
"task_id": task.id,
|
||
"software_id": software_id,
|
||
"status": task.status.value
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"停止软件失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.RESTART_SOFTWARE:
|
||
# 重启软件
|
||
software_id = message.get("software_id")
|
||
if not software_id:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: software_id",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
task = await software_manager.restart_software(software_id)
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": f"软件 {software_id} 重启任务已创建",
|
||
"data": {
|
||
"task_id": task.id,
|
||
"software_id": software_id,
|
||
"status": task.status.value
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"重启软件失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.LOG_OPERATION:
|
||
# 记录用户操作日志
|
||
operation = message.get("operation")
|
||
details = message.get("details", "")
|
||
|
||
if not operation:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: operation",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
# 提取日志参数
|
||
log_params = {
|
||
"action_type": ActionType(message.get("action_type")) if message.get("action_type") else None,
|
||
"target_object": message.get("target_object"),
|
||
"software_version": message.get("software_version"),
|
||
"status": OperationStatus(message.get("status", "success")),
|
||
"duration": message.get("duration"),
|
||
"operation_category": message.get("operation_category"),
|
||
"extra_data": {
|
||
"file_path": message.get("file_path"),
|
||
"file_size": message.get("file_size"),
|
||
"batch_count": message.get("batch_count"),
|
||
"export_format": message.get("export_format"),
|
||
**{k: v for k, v in message.items() if k.startswith("custom_")}
|
||
}
|
||
}
|
||
|
||
# 清理空值
|
||
log_params = {k: v for k, v in log_params.items() if v is not None}
|
||
if log_params.get("extra_data"):
|
||
log_params["extra_data"] = {k: v for k, v in log_params["extra_data"].items() if v is not None}
|
||
if not log_params["extra_data"]: # 如果extra_data为空,删除它
|
||
del log_params["extra_data"]
|
||
|
||
# 记录日志
|
||
log_id = await log_manager.log_user_operation(
|
||
operation=operation,
|
||
details=details,
|
||
user_id=user_id or "anonymous",
|
||
client_id=client_id,
|
||
**log_params
|
||
)
|
||
|
||
# 响应成功
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.LOG_RECORDED,
|
||
"message": "操作日志已记录",
|
||
"data": {
|
||
"log_id": log_id,
|
||
"operation": operation
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"记录日志失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.QUERY_LOGS:
|
||
# 查询操作日志
|
||
try:
|
||
from app.models.operation_log import LogFilter, LogType, LogLevel
|
||
from datetime import datetime
|
||
|
||
# 构建查询过滤器
|
||
filter_params = LogFilter(
|
||
log_type=LogType(message.get("log_type")) if message.get("log_type") else None,
|
||
operation=message.get("operation"),
|
||
user_id=message.get("user_id_filter"), # 避免与当前user_id冲突
|
||
client_id=message.get("client_id_filter"),
|
||
level=LogLevel(message.get("level")) if message.get("level") else None,
|
||
start_time=datetime.fromisoformat(message.get("start_time")) if message.get("start_time") else None,
|
||
end_time=datetime.fromisoformat(message.get("end_time")) if message.get("end_time") else None,
|
||
limit=message.get("limit", 100),
|
||
offset=message.get("offset", 0)
|
||
)
|
||
|
||
logs = await log_manager.query_logs(filter_params)
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "日志查询成功",
|
||
"data": {
|
||
"logs": [log.dict() for log in logs],
|
||
"total": len(logs),
|
||
"limit": filter_params.limit,
|
||
"offset": filter_params.offset
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"查询日志失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_LOG_BY_ID:
|
||
# 根据ID获取日志
|
||
log_id = message.get("log_id")
|
||
if not log_id:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: log_id",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
log = await log_manager.get_log_by_id(log_id)
|
||
if not log:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "日志不存在",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "获取日志成功",
|
||
"data": {"log": log.dict()},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"获取日志失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_LOG_STATS:
|
||
# 获取日志统计信息
|
||
try:
|
||
from datetime import datetime, timedelta
|
||
from app.models.operation_log import LogFilter, LogType, LogLevel
|
||
|
||
now = datetime.now()
|
||
start_time = now - timedelta(hours=24)
|
||
|
||
# 查询不同类型的日志数量
|
||
system_filter = LogFilter(
|
||
log_type=LogType.SYSTEM_OPERATION,
|
||
start_time=start_time,
|
||
limit=1000
|
||
)
|
||
system_logs = await log_manager.query_logs(system_filter)
|
||
|
||
user_filter = LogFilter(
|
||
log_type=LogType.USER_OPERATION,
|
||
start_time=start_time,
|
||
limit=1000
|
||
)
|
||
user_logs = await log_manager.query_logs(user_filter)
|
||
|
||
error_filter = LogFilter(
|
||
level=LogLevel.ERROR,
|
||
start_time=start_time,
|
||
limit=1000
|
||
)
|
||
error_logs = await log_manager.query_logs(error_filter)
|
||
|
||
stats = {
|
||
"period": "24小时",
|
||
"system_operations": len(system_logs),
|
||
"user_operations": len(user_logs),
|
||
"error_logs": len(error_logs),
|
||
"total_logs": len(system_logs) + len(user_logs),
|
||
"timestamp": now.isoformat()
|
||
}
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "获取统计信息成功",
|
||
"data": {"stats": stats},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"获取统计信息失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.CLEANUP_LOGS:
|
||
# 清理过期日志
|
||
try:
|
||
await log_manager.cleanup_old_logs()
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "过期日志清理完成",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"清理日志失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_OPERATION_TYPES:
|
||
# 获取操作类型列表
|
||
try:
|
||
from app.models.operation_log import LogFilter
|
||
|
||
# 查询最近的日志以获取操作类型
|
||
filter_params = LogFilter(limit=1000)
|
||
recent_logs = await log_manager.query_logs(filter_params)
|
||
|
||
# 统计操作类型
|
||
operations = set()
|
||
categories = set()
|
||
|
||
for log in recent_logs:
|
||
operations.add(log.operation)
|
||
if log.operation_category:
|
||
categories.add(log.operation_category)
|
||
|
||
result = {
|
||
"operations": sorted(list(operations)),
|
||
"categories": sorted(list(categories)),
|
||
"total_operations": len(operations),
|
||
"total_categories": len(categories)
|
||
}
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "获取操作类型成功",
|
||
"data": result,
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"获取操作类型失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_FILE_LIST:
|
||
# 获取CAD文件列表
|
||
try:
|
||
from app.api.v1.files import get_cad_files_path, scan_cad_files
|
||
|
||
base_path = get_cad_files_path()
|
||
files = scan_cad_files(base_path)
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "获取文件列表成功",
|
||
"data": {
|
||
"base_path": str(base_path),
|
||
"total_count": len(files),
|
||
"files": files
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except HTTPException as e:
|
||
detail = str(e.detail) if e.detail is not None else "未知错误"
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"获取文件列表失败: {detail}",
|
||
"data": {
|
||
"error": detail,
|
||
"detail": detail,
|
||
"status_code": e.status_code
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
error_text = str(e) if str(e).strip() else "未知错误"
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"获取文件列表失败: {error_text}",
|
||
"data": {
|
||
"error": error_text,
|
||
"detail": error_text
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.DOWNLOAD_FILE:
|
||
# 获取单个文件下载URL
|
||
file_path = message.get("file_path")
|
||
if not file_path:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: file_path",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.api.v1.files import get_cad_files_path, is_cad_file
|
||
from pathlib import Path
|
||
|
||
base_path = get_cad_files_path()
|
||
full_path = base_path / file_path
|
||
|
||
# 安全检查
|
||
full_path = full_path.resolve()
|
||
base_path = base_path.resolve()
|
||
|
||
if not str(full_path).startswith(str(base_path)):
|
||
raise ValueError("访问被拒绝")
|
||
|
||
if not full_path.exists():
|
||
raise FileNotFoundError("文件不存在")
|
||
|
||
if not is_cad_file(full_path.name):
|
||
raise ValueError("只能下载CAD文件")
|
||
|
||
# 返回下载URL(使用HTTP接口)
|
||
download_url = f"/api/v1/files/download/{file_path}"
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "文件下载链接已生成",
|
||
"data": {
|
||
"file_path": file_path,
|
||
"filename": full_path.name,
|
||
"download_url": download_url,
|
||
"file_size": full_path.stat().st_size
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"生成下载链接失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.DOWNLOAD_BATCH:
|
||
# 获取批量文件下载URL
|
||
file_paths = message.get("file_paths")
|
||
if not file_paths or not isinstance(file_paths, list):
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: file_paths (必须是数组)",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
# 返回批量下载URL(使用HTTP接口)
|
||
download_url = "/api/v1/files/download/batch"
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "批量下载链接已生成",
|
||
"data": {
|
||
"file_count": len(file_paths),
|
||
"file_paths": file_paths,
|
||
"download_url": download_url,
|
||
"method": "POST",
|
||
"note": "请使用POST方法,将file_paths作为JSON数组发送到此URL"
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"生成批量下载链接失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_RENAME_STRATEGIES:
|
||
# 获取可用的重命名策略列表
|
||
try:
|
||
from app.core.rename_manager import rename_manager
|
||
|
||
strategies = rename_manager.get_available_strategies()
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "获取重命名策略成功",
|
||
"data": {
|
||
"strategies": strategies,
|
||
"total_count": len(strategies)
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"获取重命名策略失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.PREVIEW_RENAME:
|
||
# 预览重命名结果
|
||
file_paths = message.get("file_paths")
|
||
strategy_name = message.get("strategy")
|
||
params = message.get("params", {})
|
||
|
||
if not file_paths or not isinstance(file_paths, list):
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: file_paths (必须是数组)",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
if not strategy_name:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: strategy",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.core.rename_manager import rename_manager
|
||
from pathlib import Path
|
||
|
||
# 提取文件名
|
||
filenames = [Path(fp).name for fp in file_paths]
|
||
|
||
# 预览重命名
|
||
preview_results = rename_manager.preview_rename(
|
||
filenames=filenames,
|
||
strategy_name=strategy_name,
|
||
params=params
|
||
)
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "重命名预览生成成功",
|
||
"data": {
|
||
"strategy": strategy_name,
|
||
"params": params,
|
||
"preview": preview_results,
|
||
"total_count": len(preview_results)
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"预览重命名失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.RENAME_FILES:
|
||
# 执行批量重命名
|
||
file_paths = message.get("file_paths")
|
||
strategy_name = message.get("strategy")
|
||
params = message.get("params", {})
|
||
|
||
if not file_paths or not isinstance(file_paths, list):
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: file_paths (必须是数组)",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
if not strategy_name:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: strategy",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.core.rename_manager import rename_manager
|
||
from app.api.v1.files import get_cad_files_path
|
||
|
||
base_path = get_cad_files_path()
|
||
|
||
# 执行重命名
|
||
results = rename_manager.execute_rename(
|
||
base_path=base_path,
|
||
file_paths=file_paths,
|
||
strategy_name=strategy_name,
|
||
params=params
|
||
)
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": f"批量重命名完成: 成功 {results['success_count']} 个, 失败 {results['failed_count']} 个",
|
||
"data": {
|
||
"strategy": strategy_name,
|
||
"params": params,
|
||
"success_count": results["success_count"],
|
||
"failed_count": results["failed_count"],
|
||
"results": results["results"]
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"批量重命名失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_FILE_CONFIG:
|
||
# 获取文件配置(基础路径和扩展名)
|
||
try:
|
||
from app.config import software_config
|
||
|
||
base_path = software_config.get_cad_files_path()
|
||
file_extensions = software_config.get_file_extensions()
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "获取文件配置成功",
|
||
"data": {
|
||
"base_path": base_path,
|
||
"file_extensions": file_extensions
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"获取文件配置失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.SET_BASE_PATH:
|
||
# 设置CAD文件基础路径
|
||
base_path = message.get("base_path")
|
||
|
||
if not base_path:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: base_path",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.config import software_config
|
||
from pathlib import Path
|
||
|
||
# 验证路径是否存在
|
||
path = Path(base_path)
|
||
if not path.exists():
|
||
raise ValueError(f"路径不存在: {base_path}")
|
||
|
||
if not path.is_dir():
|
||
raise ValueError(f"路径不是有效的目录: {base_path}")
|
||
|
||
# 保存到配置
|
||
software_config.set_cad_files_path(base_path)
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "基础路径设置成功",
|
||
"data": {
|
||
"base_path": base_path
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"设置基础路径失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.SET_FILE_EXTENSIONS:
|
||
# 设置文件扩展名配置
|
||
file_extensions = message.get("file_extensions")
|
||
|
||
if not file_extensions or not isinstance(file_extensions, dict):
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: file_extensions (必须是字典对象)",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.config import software_config
|
||
|
||
# 验证扩展名格式
|
||
for software, extensions in file_extensions.items():
|
||
if not isinstance(extensions, list):
|
||
raise ValueError(f"软件 {software} 的扩展名必须是数组")
|
||
|
||
for ext in extensions:
|
||
if not isinstance(ext, str) or not ext.startswith('.'):
|
||
raise ValueError(f"扩展名格式错误: {ext},必须以'.'开头")
|
||
|
||
# 保存到配置
|
||
software_config.set_file_extensions(file_extensions)
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "文件扩展名配置已更新",
|
||
"data": {
|
||
"file_extensions": file_extensions
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"设置文件扩展名失败: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.SUBMIT_BATCH_TASKS:
|
||
items = message.get("items")
|
||
batch_name = message.get("batch_name")
|
||
metadata = message.get("metadata", {})
|
||
logger.info(
|
||
"WS submit_batch_tasks: client_id=%s user_id=%s batch_name=%s item_count=%s",
|
||
client_id,
|
||
user_id,
|
||
batch_name,
|
||
len(items) if isinstance(items, list) else "invalid",
|
||
)
|
||
|
||
if not items or not isinstance(items, list):
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "missing required parameter: items",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.core.cad_batch_manager import cad_batch_manager
|
||
from app.models.cad_batch import BatchSubmitRequest
|
||
|
||
request = BatchSubmitRequest(
|
||
items=items,
|
||
batch_name=batch_name,
|
||
metadata=metadata if isinstance(metadata, dict) else {},
|
||
)
|
||
batch = await cad_batch_manager.create_batch(
|
||
request=request,
|
||
submitter_id=user_id or client_id,
|
||
)
|
||
batch_items = await cad_batch_manager.get_batch_items(batch.id)
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": f"batch created: {batch.id}",
|
||
"data": {
|
||
"batch": batch.model_dump(mode="json"),
|
||
"items": [item.model_dump(mode="json") for item in batch_items]
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
except Exception as e:
|
||
logger.exception("WS submit_batch_tasks failed: client_id=%s error=%s", client_id, e)
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"failed to submit batch tasks: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_BATCH_STATUS:
|
||
batch_id = message.get("batch_id")
|
||
if not batch_id:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "missing required parameter: batch_id",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.core.cad_batch_manager import cad_batch_manager
|
||
|
||
batch = await cad_batch_manager.get_batch(batch_id)
|
||
if not batch:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"batch not found: {batch_id}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
items = await cad_batch_manager.get_batch_items(batch_id)
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "batch status fetched",
|
||
"data": {
|
||
"batch": batch.model_dump(mode="json"),
|
||
"items": [item.model_dump(mode="json") for item in items]
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"failed to fetch batch status: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.GET_BATCH_ITEM_STATUS:
|
||
item_id = message.get("item_id")
|
||
if not item_id:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "missing required parameter: item_id",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.core.cad_batch_manager import cad_batch_manager
|
||
|
||
item = await cad_batch_manager.get_item(item_id)
|
||
if not item:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"batch item not found: {item_id}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "batch item status fetched",
|
||
"data": {"item": item.model_dump(mode="json")},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"failed to fetch batch item status: {str(e)}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.CONVERT_IFC_TO_STP:
|
||
ifc_path = message.get("ifc_path")
|
||
stp_path = message.get("stp_path")
|
||
|
||
if not ifc_path or not stp_path:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: ifc_path 或 stp_path",
|
||
"data": {
|
||
"ifc_path": ifc_path,
|
||
"stp_path": stp_path
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
try:
|
||
from app.core.ifc2stp_converter import Ifc2StpConverter
|
||
|
||
converter = Ifc2StpConverter()
|
||
result = await asyncio.to_thread(converter.convert, ifc_path, stp_path)
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.INFO,
|
||
"message": "IFC 转 STP 成功",
|
||
"data": result,
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
except Exception as e:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"IFC 转 STP 失败: {str(e)}",
|
||
"data": {
|
||
"ifc_path": ifc_path,
|
||
"stp_path": stp_path
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
elif message_type == WSMessageType.CONVERT_IFC_TO_STP_BATCH:
|
||
items = message.get("items")
|
||
continue_on_error = bool(message.get("continue_on_error", True))
|
||
|
||
if not isinstance(items, list) or len(items) == 0:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "缺少参数: items(非空数组)",
|
||
"data": {
|
||
"items": items
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
normalized_items = []
|
||
invalid_items = []
|
||
for index, item in enumerate(items):
|
||
if not isinstance(item, dict):
|
||
invalid_items.append({
|
||
"index": index,
|
||
"error": "item 必须是对象",
|
||
"item": item
|
||
})
|
||
continue
|
||
|
||
ifc_path = item.get("ifc_path")
|
||
stp_path = item.get("stp_path")
|
||
if not ifc_path or not stp_path:
|
||
invalid_items.append({
|
||
"index": index,
|
||
"error": "缺少 ifc_path 或 stp_path",
|
||
"item": item
|
||
})
|
||
continue
|
||
|
||
normalized_items.append({
|
||
"index": index,
|
||
"ifc_path": ifc_path,
|
||
"stp_path": stp_path
|
||
})
|
||
|
||
if invalid_items:
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": "批量参数校验失败",
|
||
"data": {
|
||
"invalid_count": len(invalid_items),
|
||
"invalid_items": invalid_items
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
return
|
||
|
||
from app.core.ifc2stp_converter import Ifc2StpConverter
|
||
|
||
converter = Ifc2StpConverter()
|
||
results = []
|
||
success_count = 0
|
||
failed_count = 0
|
||
|
||
for item in normalized_items:
|
||
ifc_path = item["ifc_path"]
|
||
stp_path = item["stp_path"]
|
||
try:
|
||
result = await asyncio.to_thread(converter.convert, ifc_path, stp_path)
|
||
success_count += 1
|
||
results.append({
|
||
"index": item["index"],
|
||
"status": "success",
|
||
"ifc_path": ifc_path,
|
||
"stp_path": stp_path,
|
||
"result": result
|
||
})
|
||
except Exception as e:
|
||
failed_count += 1
|
||
results.append({
|
||
"index": item["index"],
|
||
"status": "failed",
|
||
"ifc_path": ifc_path,
|
||
"stp_path": stp_path,
|
||
"error": str(e)
|
||
})
|
||
if not continue_on_error:
|
||
break
|
||
|
||
response_type = MessageType.INFO if failed_count == 0 else MessageType.ERROR
|
||
await websocket_manager.send_personal_message({
|
||
"type": response_type,
|
||
"message": "IFC 批量转 STP 完成" if failed_count == 0 else "IFC 批量转 STP 完成(部分失败)",
|
||
"data": {
|
||
"total_count": len(results),
|
||
"requested_count": len(normalized_items),
|
||
"success_count": success_count,
|
||
"failed_count": failed_count,
|
||
"continue_on_error": continue_on_error,
|
||
"results": results
|
||
},
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|
||
|
||
else:
|
||
# 未知消息类型
|
||
logger.warning("WS unknown message type: client_id=%s type=%s", client_id, message_type)
|
||
await websocket_manager.send_personal_message({
|
||
"type": MessageType.ERROR,
|
||
"message": f"未知的消息类型: {message_type}",
|
||
"timestamp": websocket_manager._get_timestamp()
|
||
}, client_id)
|