kangda/app/services/robot_sync_service.py

147 lines
5.2 KiB
Python

import asyncio
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from app.models.models import Robot, RobotInfo, Group, GroupRobot
from app.util.kangda import Kangda
# from app.core.redis import redis_client
class RobotSyncService:
    """Copy robot groups, robots and their details from the Kangda client into the database.

    One call to :meth:`sync_robot_data` performs a full pass: log in, list the
    groups, list the robots of every group, and upsert the matching ``Group``,
    ``Robot``, ``RobotInfo`` and ``GroupRobot`` rows.
    """

    def __init__(self):
        # Client used for every remote call made during a sync.
        self.kangda = Kangda()

    async def _call_blocking(self, func, *args):
        """Run a synchronous callable in the default executor and await its result.

        The Kangda client methods are plain (non-awaitable) calls. Invoking them
        directly inside a coroutine would stall the event loop for as long as
        each call takes, so they are pushed to a worker thread instead. Calls
        are awaited one at a time, so the client is never used concurrently.

        Args:
            func: The synchronous callable to run.
            *args: Positional arguments forwarded to ``func``.

        Returns:
            Whatever ``func(*args)`` returns.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def sync_robot_data(self, db: AsyncSession) -> bool:
        """Synchronise groups, robots, robot details and group membership.

        All writes go through ``db`` and are committed once at the very end, so
        a failure at any step rolls back the whole pass.

        Args:
            db: Session used for every read and write of this pass.

        Returns:
            ``True`` when the pass was committed; ``False`` when any step
            failed (the error is printed and the session is rolled back).
        """
        try:
            # 1. Log in; a falsy result is treated as a failed login.
            login_info = await self._call_blocking(self.kangda._login_front)
            if not login_info:
                raise Exception("登录失败")
            tenant_id = login_info["tenantInfoId"]
            token = login_info["token"]

            # 2. Fetch the groups of the tenant; an empty result aborts the pass.
            duty_list = await self._call_blocking(
                self.kangda._get_robot_group, tenant_id, token
            )
            if not duty_list:
                raise Exception("获取分组信息失败")

            # 3. Upsert every group, then the robots it contains.
            for duty in duty_list:
                group = await self._get_or_create_group(db, duty)

                # 4. Fetch the robots of this group; a group without robots is skipped.
                robot_list = await self._call_blocking(
                    self.kangda._get_robot_video_list, token, duty["groupingId"]
                )
                if not robot_list:
                    continue

                # 5. Upsert each robot, its details and its group membership.
                for robot_data in robot_list:
                    await self._update_robot_info(db, robot_data, group.groupingId)

            await db.commit()
            return True
        except Exception as e:
            # Top-level boundary of the sync: report, undo pending writes, signal failure.
            print(f"同步机器人数据失败: {str(e)}")
            await db.rollback()
            return False

    async def _get_or_create_group(self, db: AsyncSession, duty_data: dict) -> Group:
        """Return the ``Group`` matching ``duty_data["groupingId"]``, creating it if missing.

        An existing group is returned unchanged; its fields are not refreshed
        from ``duty_data``.

        Args:
            db: Session used for the lookup and the insert.
            duty_data: Group payload; must contain ``groupingId`` and, when the
                group has to be created, ``name`` and ``tenantInfoId``.

        Returns:
            The existing or newly flushed ``Group``.
        """
        query = select(Group).where(Group.groupingId == duty_data["groupingId"])
        result = await db.execute(query)
        group = result.scalar_one_or_none()
        if not group:
            group = Group(
                groupingId=duty_data["groupingId"],
                name=duty_data["name"],
                tenantInfoId=duty_data["tenantInfoId"],
            )
            db.add(group)
            await db.flush()
        return group

    async def _update_robot_info(self, db: AsyncSession, robot_data: dict, grouping_id: str):
        """Upsert one robot: base row, detail row and group membership.

        Args:
            db: Session used for all three steps.
            robot_data: Robot payload as returned by the Kangda client.
            grouping_id: Identifier of the group the robot was listed under.
        """
        # 1. Base robot row (created on first sight).
        robot = await self._get_or_create_robot(db, robot_data)
        # 2. Detail row, refreshed from the payload.
        await self._update_robot_detail(db, robot.robotId, robot_data)
        # 3. Group <-> robot link.
        await self._update_group_robot_relation(db, robot.robotId, grouping_id)

    async def _get_or_create_robot(self, db: AsyncSession, robot_data: dict) -> Robot:
        """Return the ``Robot`` matching ``robot_data["robotId"]``, creating it if missing.

        An existing robot is returned unchanged; its fields are not refreshed
        from ``robot_data``.

        Args:
            db: Session used for the lookup and the insert.
            robot_data: Robot payload; must contain ``robotId``.

        Returns:
            The existing or newly flushed ``Robot``.
        """
        query = select(Robot).where(Robot.robotId == robot_data["robotId"])
        result = await db.execute(query)
        robot = result.scalar_one_or_none()
        if not robot:
            # The .get() fallbacks apply when the payload omits a field.
            robot = Robot(
                robotId=robot_data["robotId"],
                number=robot_data.get("number", ""),
                groupingId=robot_data.get("groupingId", ""),
                onlineStatus=robot_data.get("onlineStatus", "2"),
                status=robot_data.get("status", "0"),
            )
            db.add(robot)
            await db.flush()
        return robot

    async def _update_robot_detail(self, db: AsyncSession, robot_id: str, robot_data: dict):
        """Create the ``RobotInfo`` row for ``robot_id`` if needed and refresh it from the payload.

        Args:
            db: Session used for the lookup, the insert and the flush.
            robot_id: Identifier of the robot whose detail row is updated.
            robot_data: Robot payload; every key that names an existing
                attribute of the row is copied onto it.
        """
        query = select(RobotInfo).where(RobotInfo.robotId == robot_id)
        result = await db.execute(query)
        robot_info = result.scalar_one_or_none()
        if not robot_info:
            robot_info = RobotInfo(robotId=robot_id)
            db.add(robot_info)

        # NOTE(review): hasattr() is true for any attribute of the instance, not
        # only mapped columns (e.g. methods or a primary key) - confirm payload
        # keys cannot collide with attributes that must not be overwritten.
        for key, value in robot_data.items():
            if hasattr(robot_info, key):
                setattr(robot_info, key, value)
        await db.flush()

    async def _update_group_robot_relation(self, db: AsyncSession, robot_id: str, grouping_id: str):
        """Ensure a ``GroupRobot`` row links ``robot_id`` to ``grouping_id``.

        Only adds a missing link; links to other groups are left untouched.

        Args:
            db: Session used for the lookup and the insert.
            robot_id: Identifier of the robot.
            grouping_id: Identifier of the group.
        """
        query = select(GroupRobot).where(
            GroupRobot.robotId == robot_id,
            GroupRobot.groupingId == grouping_id,
        )
        result = await db.execute(query)
        relation = result.scalar_one_or_none()
        if not relation:
            relation = GroupRobot(
                robotId=robot_id,
                groupingId=grouping_id,
            )
            db.add(relation)
            await db.flush()
# async def start(self):
# await self.sync_robot_data(AsyncSession)
# async def run_robot_sync():
# client = RobotSyncService()
# await client.start()
# Module-level singleton: importing this module constructs the service
# (and, through RobotSyncService.__init__, its Kangda client).
robot_sync_service = RobotSyncService()