kangda/app/crud/event.py

331 lines
11 KiB
Python

from typing import List, Optional, Dict, Any
from sqlalchemy import select, and_, or_, update, bindparam
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.crud.base import CRUDBase
from app.models.models import Event, Image, Temperature, Message
from app.schemas.event import EventUpdate, EventQuery, BackStageEvent, BackStageEventDto, BackStageEventDetail, EditTemperatureDto, OcrAlertMessage, OcrAlertMessageDto
from pydantic import VERSION as PYDANTIC_VERSION
from datetime import datetime
class CRUDEvent(CRUDBase[Event, EventUpdate, EventUpdate]):
    """Async query and update helpers for events and their related rows.

    The methods read and write the ``Event``, ``Image``, ``Temperature`` and
    ``Message`` models through the ``AsyncSession`` passed in by the caller.
    Write methods commit on success and roll the session back on failure.
    """

    async def get_by_id(self, db: AsyncSession, *, event_id: str) -> Optional[Event]:
        """Return the event whose ``eventId`` equals ``event_id``, or ``None``.

        ``Event.images`` and ``Event.temperatures`` are loaded with
        ``selectinload`` so they can be read after the query returns.
        """
        query = (
            select(Event)
            .options(
                selectinload(Event.images),
                selectinload(Event.temperatures),
            )
            .where(Event.eventId == event_id)
        )
        result = await db.execute(query)
        return result.scalar_one_or_none()

    async def get_event_detail(
        self,
        db: AsyncSession,
        *,
        eventId: str
    ) -> Optional[BackStageEventDetail]:
        """Return the first joined event/image/temperature row for ``eventId``.

        ``Image`` and ``Temperature`` are outer-joined on ``eventId``, so an
        event with no image or temperature still yields a row (with ``None``
        in the missing columns). Returns ``None`` when no event matches.

        NOTE(review): the value returned is the raw row mapping produced by
        ``mappings().first()``, not a constructed ``BackStageEventDetail``;
        confirm that callers validate or convert it.
        """
        stmt = (
            select(
                Event.eventId,
                Event.number,
                Event.name,
                Image.imageUrl,
                Image.localPath,
                Temperature.temperature,
                Temperature.confidence,
                Temperature.status,
                Temperature.createTime,
            )
            .select_from(Event)
            .outerjoin(Image, Image.eventId == Event.eventId)
            .outerjoin(Temperature, Event.eventId == Temperature.eventId)
            .where(Event.eventId == eventId)
        )
        result = await db.execute(stmt)
        return result.mappings().first()

    async def get_multi_backstage_events(
        self,
        db: AsyncSession,
        *,
        query: BackStageEventDto
    ) -> List[BackStageEvent]:
        """Return a page of back-office event rows matching ``query``.

        Filters are applied only for the fields of ``query`` that are set:
        ``eventId``, ``number``, ``name`` and ``status`` must be non-empty,
        ``start_time`` / ``end_time`` must not be ``None``. Rows are driven
        from ``Temperature``, ordered by ``Temperature.createTime``
        descending, and paged with ``query.skip`` / ``query.limit``.
        """
        conditions = []
        # Empty strings are treated the same as "not provided".
        if query.eventId:
            conditions.append(Event.eventId == query.eventId)
        if query.number:
            conditions.append(Event.number == query.number)
        if query.name:
            conditions.append(Event.name == query.name)
        if query.status:
            conditions.append(Temperature.status == query.status)
        if query.start_time is not None:
            conditions.append(Temperature.createTime >= query.start_time)
        if query.end_time is not None:
            conditions.append(Temperature.createTime <= query.end_time)

        stmt = (
            select(
                Event.eventId,
                Event.number,
                Event.name,
                Image.imageUrl,
                Image.localPath,
                Temperature.temperature,
                Temperature.confidence,
                Temperature.createTime,
                Temperature.status,
            )
            .select_from(Temperature)
            .outerjoin(Event, Event.eventId == Temperature.eventId)
            .outerjoin(Image, Image.imageId == Temperature.imageId)
            .where(*conditions)
            .order_by(Temperature.createTime.desc())
            .offset(query.skip)
            .limit(query.limit)
        )
        results = await db.execute(stmt)
        return [BackStageEvent(**row) for row in results.mappings().all()]

    async def get_multi_with_query(
        self,
        db: AsyncSession,
        *,
        query: EventQuery
    ) -> List[Event]:
        """Return a page of events matching the filters set on ``query``.

        Truthy ``start_time`` / ``end_time`` bound ``Event.insDate``; truthy
        ``etypeName`` / ``area`` are matched for equality. ``Event.images``
        and ``Event.temperatures`` are loaded with ``selectinload``.

        NOTE(review): no ORDER BY is applied before ``offset``/``limit``, so
        page contents depend on whatever order the database returns.
        """
        conditions = []
        if query.start_time:
            conditions.append(Event.insDate >= query.start_time)
        if query.end_time:
            conditions.append(Event.insDate <= query.end_time)
        if query.etypeName:
            conditions.append(Event.etypeName == query.etypeName)
        if query.area:
            conditions.append(Event.area == query.area)

        query_stmt = select(Event).options(
            selectinload(Event.images),
            selectinload(Event.temperatures),
        )
        if conditions:
            query_stmt = query_stmt.where(and_(*conditions))
        query_stmt = query_stmt.offset(query.skip).limit(query.limit)

        result = await db.execute(query_stmt)
        return result.scalars().all()

    async def update_event(
        self,
        db: AsyncSession,
        *,
        event_id: str,
        obj_in: EditTemperatureDto
    ) -> bool:
        """Update the ``Temperature`` rows of an event from ``obj_in``.

        Every field of ``EditTemperatureDto`` whose value on ``obj_in`` is not
        ``None`` is written to all ``Temperature`` rows whose ``eventId``
        equals ``event_id``.

        Returns:
            ``True`` when the update was executed and committed, ``False``
            when it failed (the session is rolled back in that case).
        """
        # pydantic v2 exposes the field map as ``model_fields``; v1 uses
        # ``__fields__``.
        if PYDANTIC_VERSION.startswith("2."):
            field_names = list(EditTemperatureDto.model_fields.keys())
        else:
            field_names = list(EditTemperatureDto.__fields__.keys())

        dict_values = {}
        for key in field_names:
            value = getattr(obj_in, key)
            if value is not None:
                dict_values[key] = value

        try:
            update_stmt = (
                update(Temperature)
                .where(Temperature.eventId == event_id)
                .values(**dict_values)
            )
            await db.execute(update_stmt)
            await db.commit()
            return True
        except Exception as e:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation and interpreter exit still propagate. Roll back so
            # the session is usable again, as the other write methods do.
            print(f"更新事件信息失败: {e}")
            await db.rollback()
            return False

    async def delete_event(
        self,
        db: AsyncSession,
        *,
        event_id: str
    ) -> Optional[Event]:
        """Delete the event with ``event_id`` and return the deleted object.

        Returns ``None`` without touching the database when no such event
        exists.
        """
        event = await self.get_by_id(db, event_id=event_id)
        if not event:
            return None
        await db.delete(event)
        await db.commit()
        return event

    async def get_messages(
        self,
        db: AsyncSession,
        skip: int,
        limit: int
    ) -> Optional[List[OcrAlertMessage]]:
        """Return a page of alert messages joined with event details.

        Each ``Message`` is outer-joined to ``Event``, ``Image`` and
        ``Temperature`` on ``eventId`` and every resulting row is turned into
        an ``OcrAlertMessage``.

        NOTE(review): no ORDER BY is applied before ``offset``/``limit``, so
        page contents depend on whatever order the database returns.
        """
        query_stmt = (
            select(
                Message.messageId,
                Message.eventId,
                Message.messageType,
                Message.eventType,
                Message.handle,
                Message.remark,
                Message.createTime,
                Event.number,
                Event.name,
                Image.imageUrl,
                Image.localPath,
                Temperature.temperature,
            )
            .select_from(Message)
            .outerjoin(Event, Message.eventId == Event.eventId)
            .outerjoin(Image, Message.eventId == Image.eventId)
            .outerjoin(Temperature, Message.eventId == Temperature.eventId)
            .offset(skip)
            .limit(limit)
        )
        messages = await db.execute(query_stmt)
        return [OcrAlertMessage(**row) for row in messages.mappings().all()]

    async def handle_ocr_alerts(
        self,
        db: AsyncSession,
        messageIdList: List[int]
    ) -> bool:
        """Set ``handle`` to ``"1"`` on the messages listed in ``messageIdList``.

        ``updateTime`` is set to the current local time on the same rows.

        Args:
            db: Database session.
            messageIdList: ``messageId`` values of the messages to update.

        Returns:
            ``True`` when the update was committed, ``False`` when it failed
            (the session is rolled back in that case).
        """
        try:
            update_stmt = (
                update(Message)
                .where(Message.messageId.in_(messageIdList))
                .values(
                    handle="1",
                    updateTime=datetime.now(),
                )
            )
            await db.execute(
                update_stmt,
                execution_options={"synchronize_session": False},
            )
            await db.commit()
            return True
        except Exception as e:
            print(f"更新OCR告警消息失败: {str(e)}")
            await db.rollback()
            return False

    async def handle_ocr_alerts_get(
        self,
        db: AsyncSession
    ) -> bool:
        """Set ``handle`` to ``"1"`` on every message whose ``handle`` is ``"0"``.

        ``updateTime`` is set to the current local time on the same rows.

        Returns:
            ``True`` when the update was committed, ``False`` when it failed
            (the session is rolled back in that case).
        """
        try:
            update_stmt = (
                update(Message)
                .where(Message.handle == "0")
                .values(
                    handle="1",
                    updateTime=datetime.now(),
                )
            )
            await db.execute(
                update_stmt,
                execution_options={"synchronize_session": False},
            )
            await db.commit()
            return True
        except Exception as e:
            print(f"更新OCR告警消息失败: {str(e)}")
            await db.rollback()
            return False

    async def handle_ocr_alert(
        self,
        db: AsyncSession,
        ocrAlertMessageDto: OcrAlertMessageDto
    ) -> bool:
        """Store the remark from ``ocrAlertMessageDto`` on a single message.

        The message is selected by ``ocrAlertMessageDto.messageId``; its
        ``remark`` is overwritten and ``updateTime`` is set to the current
        local time.

        Returns:
            ``True`` when the update was committed, ``False`` when it failed
            (the session is rolled back in that case).
        """
        try:
            update_stmt = (
                update(Message)
                .where(Message.messageId == ocrAlertMessageDto.messageId)
                .values(
                    remark=ocrAlertMessageDto.remark,
                    updateTime=datetime.now(),
                )
            )
            await db.execute(update_stmt)
            await db.commit()
            return True
        except Exception as e:
            print(f"处理单个消息失败: {e}")
            await db.rollback()
            return False

    async def get_alert_detail(
        self,
        db: AsyncSession,
        messageId: int
    ) -> Optional[OcrAlertMessage]:
        """Return the first joined detail row for the message ``messageId``.

        The message is outer-joined to ``Event``, ``Image`` and
        ``Temperature`` on ``eventId``. Returns ``None`` when no message
        matches or when the query raises (the error is printed).

        NOTE(review): the value returned is the raw row mapping produced by
        ``mappings().first()``, not a constructed ``OcrAlertMessage``;
        confirm that callers validate or convert it.
        """
        try:
            select_stmt = (
                select(
                    Message.messageId,
                    Message.eventId,
                    Message.messageType,
                    Message.eventType,
                    Message.handle,
                    Message.remark,
                    Message.createTime,
                    Event.number,
                    Event.name,
                    Image.imageUrl,
                    Image.localPath,
                    Temperature.temperature,
                )
                .select_from(Message)
                .outerjoin(Event, Message.eventId == Event.eventId)
                .outerjoin(Image, Message.eventId == Image.eventId)
                .outerjoin(Temperature, Message.eventId == Temperature.eventId)
                .where(Message.messageId == messageId)
            )
            result = await db.execute(select_stmt)
            return result.mappings().first()
        except Exception as e:
            print(f"获取告警详情失败: {e}")
            return None
# Module-level CRUDEvent instance constructed with the Event model.
event = CRUDEvent(Event)