TellmeStpToGlb/services/task_manager.py
root 5102bd57e8 feat: 服务化改造STP到GLB转换器
- 🚀 新增FastAPI Web服务支持
-  实现异步任务处理和并发转换
- 📊 添加实时进度追踪(0-100%)
- 🏗️ 重构为模块化架构:core/api/services/utils
- 🔧 完整的任务管理系统和状态追踪
- 📖 自动生成API文档(Swagger/ReDoc)
- 🔄 保持CLI模式100%向后兼容
- 🛡️ 增强错误处理和文件验证
- 📝 更新完整文档(README/CLAUDE.md)

技术栈: FastAPI + uvicorn + pydantic + asyncio
API端点: /health, /api/v1/convert, /api/v1/status, /api/v1/tasks

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-02 09:44:43 +08:00

142 lines
4.6 KiB
Python

#!/usr/bin/env python3
"""
任务管理服务
负责任务的创建、跟踪和状态管理
"""
import asyncio
import uuid
from datetime import datetime
from typing import Dict, Optional
from core.models import ConvertTask, TaskStatus, ConvertOptions
from core.converter import StpToGlbConverter
class TaskManager:
"""任务管理器"""
def __init__(self):
self.tasks: Dict[str, ConvertTask] = {}
self.running_tasks: Dict[str, asyncio.Task] = {}
def create_task(self, input_path: str, output_path: str, options: ConvertOptions) -> ConvertTask:
"""创建新任务"""
task_id = str(uuid.uuid4())
task = ConvertTask(
task_id=task_id,
input_path=input_path,
output_path=output_path,
options=options
)
self.tasks[task_id] = task
return task
def get_task(self, task_id: str) -> Optional[ConvertTask]:
"""获取任务"""
return self.tasks.get(task_id)
def get_all_tasks(self) -> Dict[str, ConvertTask]:
"""获取所有任务"""
return self.tasks.copy()
async def process_task(self, task_id: str) -> None:
"""处理转换任务"""
task = self.tasks.get(task_id)
if not task:
return
# 更新任务状态
task.status = TaskStatus.PROCESSING
task.started_at = datetime.now()
task.message = "开始处理转换任务"
converter = StpToGlbConverter()
# 设置进度回调
def progress_callback(progress: int, message: str):
task.progress = progress
task.message = message
converter.set_progress_callback(progress_callback)
try:
# 执行转换
await asyncio.get_event_loop().run_in_executor(
None,
converter.convert,
task.input_path,
task.output_path,
task.options.auto_scale,
task.options.auto_center
)
# 标记完成
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.progress = 100
task.message = "转换完成"
except Exception as e:
# 标记失败
task.status = TaskStatus.FAILED
task.completed_at = datetime.now()
task.error_message = str(e)
task.message = f"转换失败: {str(e)}"
finally:
# 清理运行任务记录
if task_id in self.running_tasks:
del self.running_tasks[task_id]
async def start_task(self, task_id: str) -> bool:
"""启动任务处理"""
if task_id not in self.tasks:
return False
if task_id in self.running_tasks:
return False # 任务已在运行
# 创建异步任务
asyncio_task = asyncio.create_task(self.process_task(task_id))
self.running_tasks[task_id] = asyncio_task
return True
def cancel_task(self, task_id: str) -> bool:
"""取消任务"""
if task_id in self.running_tasks:
self.running_tasks[task_id].cancel()
del self.running_tasks[task_id]
task = self.tasks.get(task_id)
if task:
task.status = TaskStatus.FAILED
task.error_message = "任务被取消"
task.message = "任务已取消"
task.completed_at = datetime.now()
return True
return False
def cleanup_completed_tasks(self, max_tasks: int = 100) -> None:
"""清理已完成的任务,保持任务数量在限制内"""
if len(self.tasks) <= max_tasks:
return
# 按完成时间排序,保留最新的任务
completed_tasks = [
(task_id, task) for task_id, task in self.tasks.items()
if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED] and task.completed_at
]
if len(completed_tasks) > max_tasks // 2:
# 删除最老的已完成任务
completed_tasks.sort(key=lambda x: x[1].completed_at)
to_remove = completed_tasks[:len(completed_tasks) - max_tasks // 2]
for task_id, _ in to_remove:
del self.tasks[task_id]
# 全局任务管理器实例
task_manager = TaskManager()