344 lines
12 KiB
Python
344 lines
12 KiB
Python
import asyncio
|
|
from datetime import datetime
|
|
from telnetlib import AYT
|
|
from tkinter import W
|
|
from typing import List, Dict, Any
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from app.core.database import async_session
|
|
from app.models.models import Event, Image, Temperature, Message, Robot
|
|
from app.util.kangda import Kangda
|
|
from app.util.baiduOcr import BadiduOcr
|
|
from app.util.status import EventType, etypeNameDict
|
|
|
|
|
|
class EventSyncService:
    """Synchronise remote events and robot state into the local database.

    The service pulls event lists and event details from the ``Kangda``
    client, stores them as ``Event`` rows, downloads the referenced images
    (``Image`` rows), runs OCR on them (``Temperature`` rows) and records an
    alert ``Message`` per event. It also keeps an in-memory cache of robots
    so that robot status messages only hit the database when something
    actually changed.

    NOTE(review): the ``Kangda`` and OCR calls are invoked without ``await``;
    if they perform blocking network I/O they will stall the event loop for
    their duration -- confirm whether that is acceptable.
    """

    # Event columns that are overwritten from the detail payload.
    _DETAIL_FIELDS = (
        "reportEventId",
        "fileType",
        "area",
        "position",
        "phoneAddress",
        "width",
        "height",
        "resolution",
        "originX",
        "originY",
    )

    def __init__(self):
        self.kangda = Kangda()
        self.ocr = BadiduOcr()
        # robotId -> Robot cache; stays None until first loaded from the DB.
        self.robot_dict = None

    async def sync_events(self):
        """Fetch the remote event list and store every event not seen before."""
        print("开始同步事件数据...")

        event_list = self.kangda.get_event_list()
        if not event_list:
            print("获取事件列表失败")
            return

        new_events = await self._get_new_events(event_list)
        if not new_events:
            print("没有新事件需要同步")
            return

        print(f"发现 {len(new_events)} 个新事件")

        await self._update_event_details(new_events)

        print("事件同步完成")

    async def sync_robot_status(self, message: dict):
        """Apply one robot status message to the cache and, if needed, the DB.

        Args:
            message: Status payload. ``robotId`` and ``onlineStatus`` are
                always read; ``number`` and ``groupingId`` are additionally
                read when the robot is not known yet.
        """
        # Load the cache lazily so the database is only queried once.
        if self.robot_dict is None:
            await self._get_robot_dict()

        robot_id = message["robotId"]
        online_status = message["onlineStatus"]
        cached = self.robot_dict.get(robot_id)

        if cached is None:
            # Unknown robot: insert it and cache an object (not the raw
            # message dict) because the cache is read via attribute access.
            robot = await self._add_robot(message)
            if robot is not None:
                self.robot_dict[robot_id] = robot
            return

        # Nothing changed: skip the database round trip entirely.
        if cached.onlineStatus == online_status:
            return

        print("开始同步机器人状态")
        previous_status = cached.onlineStatus
        cached.onlineStatus = online_status
        if not await self._update_robot_online_status(cached):
            # Keep cache and database in agreement so the next message for
            # this robot retries the write instead of being skipped.
            cached.onlineStatus = previous_status

    async def _add_robot(self, message):
        """Insert a new robot row built from ``message``.

        Returns:
            A ``Robot`` carrying the inserted values, or ``None`` when the
            insert failed (the failure is printed, not raised).
        """
        try:
            fields = {
                "robotId": message["robotId"],
                "onlineStatus": message["onlineStatus"],
                "number": message["number"],
                "groupingId": message["groupingId"],
                "status": "0",
            }
            async with async_session() as session:
                session.add(Robot(**fields))
                await session.commit()
        except Exception as e:
            print(f"添加机器人失败: {e}")
            return None

        # Return a separate transient instance: the persisted one may have
        # its attributes expired by the commit and is detached once the
        # session closes, which would make later attribute reads fail.
        return Robot(**fields)

    async def _update_robot_online_status(self, robot):
        """Write ``robot.onlineStatus`` and the current time to the robot's row.

        Returns:
            ``True`` when the UPDATE was committed, ``False`` when it raised
            (the failure is printed, not raised).
        """
        try:
            async with async_session() as session:
                update_stmt = (
                    update(Robot)
                    .where(Robot.robotId == robot.robotId)
                    .values(
                        onlineStatus=robot.onlineStatus,
                        updateTime=datetime.now(),
                    )
                )
                result = await session.execute(update_stmt)
                affected_rows = result.rowcount
                await session.commit()
        except Exception as e:
            print(f"更新机器人在线状态失败: {e}")
            return False

        # commit() always returns None, so the number of matched rows is the
        # only signal for whether the update actually touched anything.
        if affected_rows == 0:
            print("更新机器人状态未生效")
        else:
            print("更新机器人状态成功")
        return True

    async def _get_robot_dict(self):
        """Load every robot from the database into ``self.robot_dict``."""
        async with async_session() as session:
            result = await session.execute(select(Robot))
            self.robot_dict = {
                robot.robotId: robot for robot in result.scalars().all()
            }

    async def sync_event(self, eventId: str):
        """Fetch one event by id and store it with its images, OCR data and message.

        Failures are printed and rolled back rather than raised.
        """
        print(f"开始同步事件: {eventId}")

        # Wrapped in a one-element list to match the parameter format of
        # get_event_list_detail.
        event_details = self.kangda.get_event_list_detail([{"eventId": eventId}]) or []

        succeeded = True
        async with async_session() as session:
            for detail in event_details:
                if not detail:
                    continue
                try:
                    # The event table stores the image URLs as one
                    # comma-joined string; work on a copy so ``detail`` keeps
                    # the original list for the image download step.
                    event_fields = dict(detail)
                    img_list = event_fields.get("imgList")
                    if isinstance(img_list, (list, tuple)):
                        event_fields["imgList"] = ",".join(img_list)

                    session.add(Event(**event_fields))
                    await session.commit()

                    await self._store_event_artifacts(session, detail)
                except Exception as e:
                    # rollback() is a coroutine on AsyncSession and must be
                    # awaited, otherwise the session stays in a failed state.
                    await session.rollback()
                    succeeded = False
                    print(f"事件: {eventId} 同步失败: {str(e)}")

        if succeeded:
            print(f"事件: {eventId} 同步完成")

    async def _store_event_artifacts(self, session: "AsyncSession", detail: Dict[str, Any]):
        """Persist the images, OCR temperatures and alert message of one event."""
        image_list = await self._save_images(session, detail)
        await self._ocr_images(session, image_list)
        await self._save_message(session, detail)

    async def _save_message(self, session: "AsyncSession", detail: Dict[str, Any]):
        """Store an alert message for the event, if it carries an ``etypeName``.

        Names missing from ``etypeNameDict`` are stored with the
        ``EventType.ERRORTYPE`` value.
        """
        etype_name = detail.get("etypeName")
        if not etype_name:
            return

        if etype_name in etypeNameDict:
            event_type = etypeNameDict[etype_name]
        else:
            print("未知消息类型")
            event_type = EventType.ERRORTYPE.value

        message = Message(eventId=detail.get("eventId"))
        message.eventType = event_type

        session.add(message)
        await session.commit()

    async def _get_new_events(self, event_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Insert the events that are not stored yet and return their payloads.

        Args:
            event_list: Event payloads; each must contain ``eventId`` and is
                passed as keyword arguments to ``Event``.

        Returns:
            The subset of ``event_list`` that was newly inserted.
        """
        if not event_list:
            return []

        new_events = []

        async with async_session() as session:
            for event_data in event_list:
                query = select(Event).where(Event.eventId == event_data["eventId"])
                result = await session.execute(query)
                if result.scalar_one_or_none() is not None:
                    continue

                session.add(Event(**event_data))
                new_events.append(event_data)

            await session.commit()

        return new_events

    async def _update_event_details(self, event_list: List[Dict[str, Any]]):
        """Fetch details for ``event_list`` and store them event by event.

        A failure on one event is printed and rolled back, then processing
        continues with the next one: the bare event rows already exist at
        this point, so events skipped by an aborted batch would not be picked
        up again by a later run.
        """
        if not event_list:
            return

        event_details = self.kangda.get_event_list_detail(event_list) or []

        async with async_session() as session:
            for detail in event_details:
                if not detail:
                    continue
                try:
                    await self._update_event(session, detail)
                    await self._store_event_artifacts(session, detail)
                except Exception as e:
                    await session.rollback()
                    print(f"事件: {detail.get('eventId')} 同步失败: {str(e)}")

    async def _update_event(self, session: "AsyncSession", detail: Dict[str, Any]):
        """Copy the detail fields onto the stored event with the same ``eventId``.

        Does nothing when no such event exists. Fields absent from ``detail``
        are set to ``None``.
        """
        query = select(Event).where(Event.eventId == detail["eventId"])
        result = await session.execute(query)
        event = result.scalar_one_or_none()
        if event is None:
            return

        for field in self._DETAIL_FIELDS:
            setattr(event, field, detail.get(field))

        await session.commit()

    async def _save_images(self, session: "AsyncSession", detail: Dict[str, Any]):
        """Download and store the event's images that are not stored yet.

        Returns:
            The list of newly created ``Image`` objects, or ``None`` when the
            event has no ``imgList``.
        """
        image_urls = detail.get("imgList")
        if not image_urls:
            return None

        new_images = []

        for image_url in image_urls:
            query = select(Image).where(
                Image.eventId == detail["eventId"],
                Image.imageUrl == image_url,
            )
            result = await session.execute(query)
            if result.scalar_one_or_none() is not None:
                continue

            local_path = self.kangda._download_image(image_url, save_image=True)

            image = Image(
                eventId=detail["eventId"],
                imageUrl=image_url,
                localPath=local_path,
            )
            session.add(image)
            new_images.append(image)

        await session.commit()

        # Reload the rows after the commit so generated values (imageId) and
        # any attributes expired by the commit are available to the OCR step
        # without triggering implicit lazy loads.
        for image in new_images:
            await session.refresh(image)

        return new_images

    async def _ocr_images(self, session: "AsyncSession", image_list: List[Any]):
        """Run OCR on each image and store the recognised temperature values.

        Args:
            session: Session used to insert the ``Temperature`` rows.
            image_list: Image objects as returned by ``_save_images``;
                ``None`` or an empty list is a no-op.

        Raises:
            Exception: Whatever the commit raises, after printing a notice.
        """
        if not image_list:
            return

        for image in image_list:
            result = self.ocr.image_inference(image.localPath)
            values, conf = self.ocr.parse_result(result)

            if len(values) == 0:
                continue

            status = self.kangda.parse_value(values)

            session.add(
                Temperature(
                    eventId=image.eventId,
                    imageId=image.imageId,
                    temperature=",".join(values),
                    confidence=",".join(map(str, conf)),
                    status=status.value,
                )
            )
            # Alert messages are not created here any more; they are sent
            # uniformly through _save_message.

        try:
            await session.commit()
        except Exception:
            print("插入Temperature表数据异常")
            raise
|
|
|
|
|
|
|
|
async def run_sync(interval_seconds: float = 6000):
    """Run the event synchronisation in an endless loop.

    Each pass calls ``EventSyncService.sync_events``; an exception raised by
    a pass is printed and the loop carries on with the next pass.

    Args:
        interval_seconds: Pause between two passes, in seconds. Defaults to
            6000 (100 minutes), the value this loop has always slept for.

    NOTE(review): the original inline comment said "wait 5 minutes", which
    would be 300 seconds. The effective value is kept so behaviour does not
    change -- confirm which interval is intended.
    """
    service = EventSyncService()
    while True:
        try:
            await service.sync_events()
        except Exception as e:
            print(f"同步过程出错: {str(e)}")

        await asyncio.sleep(interval_seconds)
|
|
|
|
async def run_sync_event(eventId: str):
    """Synchronise a single event by id.

    An exception raised by the synchronisation is printed instead of being
    propagated to the caller.
    """
    syncer = EventSyncService()
    try:
        await syncer.sync_event(eventId)
    except Exception as e:
        print(f"同步过程出错: {str(e)}")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual one-off run for a single event. Call asyncio.run(run_sync())
    # here instead to start the periodic synchronisation loop.
    target_event_id = "477b88a72063465586957fe7126fbae5"
    asyncio.run(run_sync_event(target_event_id))