diff --git a/app/services/robot_sync_service.py b/app/services/robot_sync_service.py new file mode 100644 index 0000000..40debed --- /dev/null +++ b/app/services/robot_sync_service.py @@ -0,0 +1,147 @@ +import asyncio +from datetime import datetime +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, update +from app.models.models import Robot, RobotInfo, Group, GroupRobot +from app.util.kangda import Kangda +# from app.core.redis import redis_client + +class RobotSyncService: + def __init__(self): + self.kangda = Kangda() + + async def sync_robot_data(self, db: AsyncSession): + """同步机器人数据""" + try: + # 1. 获取登录信息 + login_info = self.kangda._login_front() + if not login_info: + raise Exception("登录失败") + + # 2. 获取分组信息 + duty_list = self.kangda._get_robot_group(login_info["tenantInfoId"], login_info["token"]) + if not duty_list: + raise Exception("获取分组信息失败") + + # 3. 更新分组信息 + for duty in duty_list: + # 更新或创建分组 + group = await self._get_or_create_group(db, duty) + + # 4. 获取机器人列表 + robot_list = self.kangda._get_robot_video_list(login_info["token"], duty["groupingId"]) + if not robot_list: + continue + + # 5. 更新机器人信息 + for robot_data in robot_list: + await self._update_robot_info(db, robot_data, group.groupingId) + + await db.commit() + return True + + except Exception as e: + print(f"同步机器人数据失败: {str(e)}") + await db.rollback() + return False + + async def _get_or_create_group(self, db: AsyncSession, duty_data: dict) -> Group: + """获取或创建分组""" + # 查询分组是否存在 + query = select(Group).where(Group.groupingId == duty_data["groupingId"]) + result = await db.execute(query) + group = result.scalar_one_or_none() + + if not group: + # 创建新分组 + group = Group( + groupingId=duty_data["groupingId"], + name=duty_data["name"], + tenantInfoId=duty_data["tenantInfoId"] + ) + db.add(group) + await db.flush() + + return group + + async def _update_robot_info(self, db: AsyncSession, robot_data: dict, grouping_id: str): + """更新机器人信息""" + # 1. 更新或创建机器人基本信息 + robot = await self._get_or_create_robot(db, robot_data) + + # 2. 更新机器人详细信息 + await self._update_robot_detail(db, robot.robotId, robot_data) + + # 3. 更新分组关系 + await self._update_group_robot_relation(db, robot.robotId, grouping_id) + + async def _get_or_create_robot(self, db: AsyncSession, robot_data: dict) -> Robot: + """获取或创建机器人""" + query = select(Robot).where(Robot.robotId == robot_data["robotId"]) + result = await db.execute(query) + robot = result.scalar_one_or_none() + + if not robot: + # 创建新机器人 + robot = Robot( + robotId=robot_data["robotId"], + number=robot_data.get("number", ""), + groupingId=robot_data.get("groupingId", ""), + onlineStatus=robot_data.get("onlineStatus", "2"), + status=robot_data.get("status", "0") + ) + db.add(robot) + await db.flush() + + return robot + + async def _update_robot_detail(self, db: AsyncSession, robot_id: str, robot_data: dict): + """更新机器人详细信息""" + # 查询是否存在详细信息 + query = select(RobotInfo).where(RobotInfo.robotId == robot_id) + result = await db.execute(query) + robot_info = result.scalar_one_or_none() + + if not robot_info: + # 创建新记录 + robot_info = RobotInfo(robotId=robot_id) + db.add(robot_info) + + # 更新字段 + for key, value in robot_data.items(): + if hasattr(robot_info, key): + setattr(robot_info, key, value) + + await db.flush() + + async def _update_group_robot_relation(self, db: AsyncSession, robot_id: str, grouping_id: str): + """更新分组和机器人的关系""" + # 查询关系是否存在 + query = select(GroupRobot).where( + GroupRobot.robotId == robot_id, + GroupRobot.groupingId == grouping_id + ) + result = await db.execute(query) + relation = result.scalar_one_or_none() + + if not relation: + # 创建新关系 + relation = GroupRobot( + robotId=robot_id, + groupingId=grouping_id + ) + db.add(relation) + await db.flush() + +# async def start(self): +# await self.sync_robot_data(AsyncSession) + + + +# async def run_robot_sync(): +# client = RobotSyncService() +# await client.start() + + +# 创建单例 +robot_sync_service = RobotSyncService() \ No newline at end of file diff --git a/app/services/scheduler.py b/app/services/scheduler.py new file mode 100644 index 0000000..4ce019d --- /dev/null +++ b/app/services/scheduler.py @@ -0,0 +1,34 @@ +import asyncio +from datetime import datetime +from app.core.database import async_session +from app.services.robot_sync_service import robot_sync_service + +class Scheduler: + def __init__(self): + self.is_running = False + + async def start(self): + """启动定时任务""" + if self.is_running: + return + + self.is_running = True + while self.is_running: + try: + # 同步机器人数据 + async with async_session() as session: + await robot_sync_service.sync_robot_data(session) + + # 等待5分钟 + await asyncio.sleep(600) + + except Exception as e: + print(f"定时任务执行失败: {str(e)}") + await asyncio.sleep(60) # 发生错误时等待1分钟后重试 + + def stop(self): + """停止定时任务""" + self.is_running = False + +# 创建单例 +scheduler = Scheduler() \ No newline at end of file diff --git a/run_sync_robot.py b/run_sync_robot.py new file mode 100644 index 0000000..57f3b65 --- /dev/null +++ b/run_sync_robot.py @@ -0,0 +1,8 @@ +import asyncio + +from app.services.scheduler import scheduler + + +if __name__ == "__main__": + asyncio.run(scheduler.start()) +