# kangda/app/services/event_sync_service.py
#
# (Repository viewer metadata: 232 lines, 7.8 KiB, Python)

import asyncio
from datetime import datetime
from typing import List, Dict, Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import async_session
from app.models.models import Event, Image, Temperature, Message
from app.util.kangda import Kangda
from app.util.baiduOcr import BadiduOcr
class EventSyncService:
    """Copy events from the Kangda client into the local database.

    For every event the service stores the event row, downloads the images
    listed in its detail record, runs OCR on each newly stored image and
    records the recognised values as ``Temperature`` rows (plus a ``Message``
    row whenever the parsed status is non-zero).

    NOTE(review): the ``Kangda`` and OCR helpers are invoked without ``await``
    and their results are used directly, so they run synchronously inside the
    coroutines here and hold the event loop while they work.
    """

    # Detail keys copied one-to-one onto the Event row by ``_update_event``.
    _DETAIL_FIELDS = (
        "reportEventId",
        "fileType",
        "area",
        "position",
        "phoneAddress",
        "width",
        "height",
        "resolution",
        "originX",
        "originY",
    )

    def __init__(self):
        """Create the Kangda client and the OCR client used by every sync."""
        self.kangda = Kangda()
        self.ocr = BadiduOcr()

    async def sync_events(self):
        """Fetch the event list, store unseen events and sync their details."""
        print("开始同步事件数据...")

        event_list = self.kangda.get_event_list()
        if not event_list:
            print("获取事件列表失败")
            return

        new_events = await self._get_new_events(event_list)
        if not new_events:
            print("没有新事件需要同步")
            return

        print(f"发现 {len(new_events)} 个新事件")
        await self._update_event_details(new_events)
        print("事件同步完成")

    async def sync_event(self, eventId: str):
        """Sync a single event identified by ``eventId``.

        Failures are reported with ``print`` and the session is rolled back;
        no exception raised while storing the event propagates to the caller.
        """
        print(f"开始同步事件: {eventId}")

        # get_event_list_detail expects a list of event dicts, so wrap the id.
        event_details = self.kangda.get_event_list_detail([{"eventId": eventId}])

        async with async_session() as session:
            # ``or []`` guards against a falsy (failed) detail response.
            for detail in event_details or []:
                if not detail:
                    continue
                try:
                    # NOTE(review): every key of ``detail`` is passed to
                    # Event(); confirm the model accepts all of them.
                    session.add(Event(**detail))
                    await session.commit()

                    image_list = await self._save_images(session, detail)
                    await self._ocr_images(session, image_list)
                except Exception as e:
                    # AsyncSession.rollback() is a coroutine; without the
                    # await the rollback would never actually run.
                    await session.rollback()
                    print(f"事件: {eventId} 同步失败: {str(e)}")

        print(f"事件: {eventId} 同步完成")

    async def _get_new_events(self, event_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Insert events that are not stored yet and return their source dicts.

        Args:
            event_list: Event dicts; each must contain an ``"eventId"`` key.

        Returns:
            The subset of ``event_list`` for which no ``Event`` row existed.
        """
        new_events = []
        async with async_session() as session:
            for event_data in event_list:
                query = select(Event).where(Event.eventId == event_data["eventId"])
                result = await session.execute(query)
                if result.scalar_one_or_none():
                    continue  # already stored

                session.add(Event(**event_data))
                new_events.append(event_data)

            await session.commit()
        return new_events

    async def _update_event_details(self, event_list: List[Dict[str, Any]]):
        """Fetch details for ``event_list`` and store them event by event.

        Each event is handled independently: if one fails, the session is
        rolled back, the failure is printed, and processing continues with
        the next event (the same handling ``sync_event`` uses).
        """
        event_details = self.kangda.get_event_list_detail(event_list)

        async with async_session() as session:
            # ``or []`` guards against a falsy (failed) detail response.
            for detail in event_details or []:
                if not detail:
                    continue
                try:
                    await self._update_event(session, detail)
                    image_list = await self._save_images(session, detail)
                    await self._ocr_images(session, image_list)
                except Exception as e:
                    # Return the session to a usable state before moving on.
                    await session.rollback()
                    print(f"事件: {detail.get('eventId')} 同步失败: {str(e)}")

    async def _update_event(self, session: AsyncSession, detail: Dict[str, Any]):
        """Copy the detail fields onto the stored event and commit.

        Does nothing when no ``Event`` row matches ``detail["eventId"]``.
        Keys missing from ``detail`` are written as ``None``.
        """
        query = select(Event).where(Event.eventId == detail["eventId"])
        result = await session.execute(query)
        event = result.scalar_one_or_none()
        if not event:
            return

        for field in self._DETAIL_FIELDS:
            setattr(event, field, detail.get(field))
        await session.commit()

    async def _save_images(self, session: AsyncSession, detail: Dict[str, Any]) -> List[Image]:
        """Download and store images of an event that are not stored yet.

        Args:
            session: Open database session.
            detail: Event detail dict; reads ``"eventId"`` and ``"imgList"``.

        Returns:
            The newly created ``Image`` rows, refreshed after the commit so
            their database-generated attributes are loaded. Empty when the
            event has no images or all of them were already stored.
        """
        image_urls = detail.get("imgList")
        if not image_urls:
            return []

        new_images: List[Image] = []
        for image_url in image_urls:
            query = select(Image).where(
                Image.eventId == detail["eventId"],
                Image.imageUrl == image_url,
            )
            result = await session.execute(query)
            if result.scalar_one_or_none():
                continue  # this URL is already recorded for the event

            local_path = self.kangda._download_image(image_url)
            image = Image(
                eventId=detail["eventId"],
                imageUrl=image_url,
                localPath=local_path,
            )
            session.add(image)
            new_images.append(image)

        if not new_images:
            return new_images

        await session.commit()
        # Refresh only after the rows exist in the database, and await it:
        # refresh() is a coroutine on AsyncSession. This reloads the
        # attributes (including the generated id) that _ocr_images reads.
        for image in new_images:
            await session.refresh(image)
        return new_images

    async def _ocr_images(self, session: AsyncSession, image_list: List[Image]):
        """Run OCR on each image and store the recognised values.

        For every image with at least one recognised value a ``Temperature``
        row is added; a ``Message`` row is added as well when the parsed
        status value is non-zero. All rows are committed together at the end.

        Args:
            session: Open database session.
            image_list: Images to process; ``None`` or empty means no work.
        """
        if not image_list:
            return

        for image in image_list:
            result = self.ocr.image_inference(image.localPath)
            values, conf = self.ocr.parse_result(result)
            if len(values) == 0:
                continue

            status = self.kangda.parse_value(values)
            session.add(
                Temperature(
                    eventId=image.eventId,
                    imageId=image.imageId,
                    temperature=",".join(values),
                    confidence=",".join(map(str, conf)),
                    status=status.value,
                )
            )
            # A non-zero status is reported as a message for the event.
            if status.value != 0:
                session.add(
                    Message(
                        eventId=image.eventId,
                        messageType=status.value,
                    )
                )

        # Commit once after the loop so the image attributes read above are
        # not expired by an intermediate commit.
        await session.commit()
async def run_sync(interval_seconds: float = 300):
    """Run the event sync in an endless loop.

    A failed round is reported with ``print`` and does not stop the loop.

    Args:
        interval_seconds: Pause between rounds. Defaults to 300 s (5 minutes),
            the interval the original comment documented; the previous
            hard-coded value of 6000 s was 100 minutes.
    """
    service = EventSyncService()
    while True:
        try:
            await service.sync_events()
        except Exception as e:
            print(f"同步过程出错: {str(e)}")

        # Wait before starting the next round.
        await asyncio.sleep(interval_seconds)
async def run_sync_event(eventId: str):
    """Sync one event and print any error instead of raising it."""
    syncer = EventSyncService()
    try:
        await syncer.sync_event(eventId)
    except Exception as exc:
        print(f"同步过程出错: {str(exc)}")
if __name__ == "__main__":
    # Manual entry point: syncs one hard-coded event.
    # For the periodic loop, call asyncio.run(run_sync()) instead.
    single_event_job = run_sync_event("477b88a72063465586957fe7126fbae5")
    asyncio.run(single_event_job)