# kangda/app/services/event_sync_service.py
# (file-viewer header: 350 lines, 12 KiB, Python)

import asyncio
from datetime import datetime
from telnetlib import AYT
from tkinter import W
from typing import List, Dict, Any
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import async_session
from app.models.models import Event, Image, Temperature, Message, Robot
from app.util.kangda import Kangda
from app.util.baiduOcr import BadiduOcr
from app.util.status import EventType, etypeNameDict
class EventSyncService:
    """Pull events and robot status from Kangda and persist them locally.

    Event sync inserts new ``Event`` rows, fills in their detail fields,
    downloads and records their images, runs OCR on those images to store
    ``Temperature`` readings, and stores one alert ``Message`` per event.
    Robot sync keeps an in-memory cache of ``Robot`` objects so the database
    is only touched when a robot is new or its online status changes.
    """

    # Detail fields copied one-to-one from a detail payload onto an Event row.
    _EVENT_DETAIL_FIELDS = (
        "reportEventId",
        "fileType",
        "area",
        "position",
        "phoneAddress",
        "width",
        "height",
        "resolution",
        "originX",
        "originY",
    )

    def __init__(self):
        """Create the Kangda and OCR clients; the robot cache starts unloaded."""
        self.kangda = Kangda()
        self.ocr = BadiduOcr()
        # Robots keyed by robotId; None until first loaded from the database.
        self.robot_dict = None

    async def sync_events(self):
        """Fetch the event list, insert unseen events and sync their details."""
        print("开始同步事件数据...")

        event_list = self.kangda.get_event_list()
        if not event_list:
            print("获取事件列表失败")
            return

        new_events = await self._get_new_events(event_list)
        if not new_events:
            print("没有新事件需要同步")
            return

        print(f"发现 {len(new_events)} 个新事件")
        await self._update_event_details(new_events)
        print("事件同步完成")

    async def sync_robot_status(self, message: dict):
        """Apply one robot status message to the cache and the database.

        Args:
            message: Status payload. ``robotId`` and ``onlineStatus`` are
                always read; ``number`` and ``groupingId`` are read when the
                robot is not cached yet and has to be inserted.
        """
        # Load the cache lazily on the first message.
        if self.robot_dict is None:
            await self._get_robot_dict()

        robot_id = message["robotId"]
        robot = self.robot_dict.get(robot_id)

        if robot is None:
            # Unknown robot: insert it and cache a Robot object. Caching the
            # raw message dict here would break the attribute access below on
            # the next message, and a failed insert must stay uncached so the
            # next message retries it.
            new_robot = await self._add_robot(message)
            if new_robot is not None:
                self.robot_dict[robot_id] = new_robot
            return

        # Known robot: only hit the database when the status really changed.
        if robot.onlineStatus != message["onlineStatus"]:
            print("开始同步机器人状态")
            robot.onlineStatus = message["onlineStatus"]
            await self._update_robot_online_status(robot)

    async def _add_robot(self, message):
        """Insert a new robot row built from ``message``.

        Returns:
            A ``Robot`` carrying the inserted values, suitable for caching, or
            ``None`` when the insert failed (the error is printed, not raised).
        """
        try:
            fields = {
                "robotId": message["robotId"],
                "onlineStatus": message["onlineStatus"],
                "number": message["number"],
                "groupingId": message["groupingId"],
                "status": "0",
            }
            async with async_session() as session:
                session.add(Robot(**fields))
                await session.commit()
            # Hand back a separate object that was never attached to the
            # session, so its attributes stay readable after the session is
            # closed regardless of how the session factory expires objects.
            return Robot(**fields)
        except Exception as e:
            print(f"添加机器人失败: {e}")
            return None

    async def _update_robot_online_status(self, robot):
        """Write ``robot.onlineStatus`` and a fresh ``updateTime`` to the database.

        Errors are printed and swallowed (best effort).
        """
        try:
            async with async_session() as session:
                update_stmt = (
                    update(Robot)
                    .where(Robot.robotId == robot.robotId)
                    .values(
                        onlineStatus=robot.onlineStatus,
                        updateTime=datetime.now(),
                    )
                )
                result = await session.execute(update_stmt)
                # commit() always returns None, so it cannot tell us whether
                # anything was updated; the statement's row count can.
                # NOTE(review): what rowcount counts (matched vs. changed
                # rows) depends on the database driver.
                affected = result.rowcount
                await session.commit()
            if affected == 0:
                print("更新机器人状态未生效")
            else:
                print("更新机器人状态成功")
        except Exception as e:
            print(f"更新机器人在线状态失败: {e}")

    async def _get_robot_dict(self):
        """Load every ``Robot`` row into ``self.robot_dict`` keyed by ``robotId``."""
        async with async_session() as session:
            result = await session.execute(select(Robot))
            robots = result.scalars().all()
        self.robot_dict = {robot.robotId: robot for robot in robots}

    async def sync_event(self, eventId: str):
        """Sync a single event: insert it, then store images, OCR and message.

        A failure while processing the event is printed and rolled back; it is
        not raised to the caller.
        """
        print(f"开始同步事件: {eventId}")
        # get_event_list_detail takes a list of event dicts, so wrap the id.
        event_details = self.kangda.get_event_list_detail([{"eventId": eventId}]) or []

        async with async_session() as session:
            for detail in event_details:
                if not detail:
                    continue
                try:
                    # imgList may hold several URLs, which cannot go into the
                    # event row as a list: store them comma-joined. Build a
                    # copy so `detail` keeps the real list for _save_images.
                    event_fields = {**detail, "imgList": ",".join(detail["imgList"])}
                    session.add(Event(**event_fields))
                    await session.commit()

                    image_list = await self._save_images(session, detail)
                    await self._ocr_images(session, image_list)
                    await self._save_message(session, detail)
                except Exception as e:
                    # AsyncSession.rollback() is a coroutine; without `await`
                    # the rollback never runs.
                    await session.rollback()
                    print(f"事件: {eventId} 同步失败: {str(e)}")
        print(f"事件: {eventId} 同步完成")

    async def _save_message(self, session: "AsyncSession", detail: Dict[str, Any]):
        """Store an alert ``Message`` for the event, if it has an ``etypeName``.

        Names missing from ``etypeNameDict`` are stored with
        ``EventType.ERRORTYPE``.
        """
        etype_name = detail.get("etypeName")
        if not etype_name:
            return

        message = Message(eventId=detail.get("eventId"))
        if etype_name in etypeNameDict:
            message.eventType = etypeNameDict[etype_name]
        else:
            print("未知消息类型")
            message.eventType = EventType.ERRORTYPE.value

        session.add(message)
        await session.commit()

    async def _get_new_events(self, event_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Insert events whose ``eventId`` is not stored yet.

        Returns:
            The payloads of the events that were inserted.
        """
        new_events = []
        async with async_session() as session:
            for event_data in event_list:
                query = select(Event).where(Event.eventId == event_data["eventId"])
                existing_event = (await session.execute(query)).scalar_one_or_none()
                if existing_event:
                    continue
                session.add(Event(**event_data))
                new_events.append(event_data)
            await session.commit()
        return new_events

    async def _update_event_details(self, event_list: List[Dict[str, Any]]):
        """Fetch details for ``event_list`` and store them with images, OCR and messages."""
        event_details = self.kangda.get_event_list_detail(event_list)
        if not event_details:
            # Nothing fetched (None or empty): no need to open a session.
            return

        async with async_session() as session:
            for detail in event_details:
                if not detail:
                    continue
                await self._update_event(session, detail)
                image_list = await self._save_images(session, detail)
                await self._ocr_images(session, image_list)
                await self._save_message(session, detail)

    async def _update_event(self, session: "AsyncSession", detail: Dict[str, Any]):
        """Copy the detail fields onto the stored event; no-op if it is missing."""
        query = select(Event).where(Event.eventId == detail["eventId"])
        event = (await session.execute(query)).scalar_one_or_none()
        if not event:
            return

        # Fields absent from the payload are written as None.
        for field_name in self._EVENT_DETAIL_FIELDS:
            setattr(event, field_name, detail.get(field_name))
        await session.commit()

    async def _save_images(self, session: "AsyncSession", detail: Dict[str, Any]):
        """Download and record the event's images that are not stored yet.

        Returns:
            The newly created ``Image`` objects, or ``None`` when the event
            has no ``imgList``. Already stored images are not returned.
        """
        if not detail.get("imgList"):
            return None

        image_list = []
        for image_url in detail["imgList"]:
            query = select(Image).where(
                Image.eventId == detail["eventId"],
                Image.imageUrl == image_url,
            )
            existing_image = (await session.execute(query)).scalar_one_or_none()
            if existing_image:
                continue

            local_path = self.kangda._download_image(image_url, save_image=True)
            image = Image(
                eventId=detail["eventId"],
                imageUrl=image_url,
                localPath=local_path,
            )
            session.add(image)
            image_list.append(image)

        await session.commit()
        # refresh() is a coroutine and only works on rows that already exist,
        # so it must be awaited, and after the commit. It reloads the row's
        # attributes (including the generated id) for the OCR step.
        for image in image_list:
            await session.refresh(image)
        return image_list

    async def _ocr_images(self, session: "AsyncSession", image_list: "List[Image]"):
        """Run OCR on each image and store the recognised temperature values.

        Args:
            session: Session used to insert ``Temperature`` rows.
            image_list: Images returned by ``_save_images``; ``None`` or an
                empty list means there is nothing to do.

        Raises:
            Exception: Whatever the commit raises, re-raised after printing.
        """
        if not image_list:
            return

        # Read the attributes up front: the commits below may expire the ORM
        # objects (depending on the session's expire_on_commit setting), and
        # expired attributes cannot be lazily reloaded in async code.
        snapshots = [(img.eventId, img.imageId, img.localPath) for img in image_list]

        for event_id, image_id, local_path in snapshots:
            # Crop the image in place to remove the date/time overlay.
            self.kangda.crop_timestamp_with_cv2(local_path, local_path, 0, 0, 106, 115)
            result = self.ocr.image_inference(local_path)
            values, conf = self.ocr.parse_result(result)

            # Only images with one or two recognised values are stored.
            if not 0 < len(values) < 3:
                continue

            status = self.kangda.parse_value(values, conf)
            # NOTE(review): status value 4 is skipped; presumably it marks an
            # unusable reading. Confirm against Kangda.parse_value.
            if status.value == 4:
                continue

            session.add(
                Temperature(
                    eventId=event_id,
                    imageId=image_id,
                    temperature=",".join(values),
                    confidence=",".join(map(str, conf)),
                    status=status.value,
                )
            )
            # Abnormal-temperature messages are no longer created here; per
            # the original note, messages are now sent in one unified place.
            try:
                await session.commit()
            except Exception:
                print("插入Temperature表数据异常")
                raise
async def run_sync(interval_seconds: float = 6000):
    """Run the event sync loop forever.

    Each cycle calls ``EventSyncService.sync_events``. An exception raised by
    a cycle is printed and the loop carries on with the next cycle.

    Args:
        interval_seconds: Pause between two cycles, in seconds. The default
            keeps the previously hard-coded value of 6000 s (100 minutes).
            NOTE(review): the original comment said "wait 5 minutes" (300 s),
            which does not match 6000. Confirm the intended interval.
    """
    service = EventSyncService()
    while True:
        try:
            await service.sync_events()
        except Exception as e:
            print(f"同步过程出错: {str(e)}")
        await asyncio.sleep(interval_seconds)
async def run_sync_event(eventId: str):
    """Sync one event by id, printing (not raising) any error from the sync."""
    # The service is built outside the try block, so a constructor failure
    # still propagates to the caller.
    syncer = EventSyncService()
    try:
        await syncer.sync_event(eventId)
    except Exception as exc:
        print(f"同步过程出错: {exc!s}")
if __name__ == "__main__":
    # Manual entry point: sync one hard-coded event.
    # To run the periodic loop instead, use: asyncio.run(run_sync())
    entry_coro = run_sync_event("56163a3ce6e44230bd8f0110bcd33e6b")
    asyncio.run(entry_coro)