from typing import List, Optional, Dict, Any from sqlalchemy import select, and_, or_, update, bindparam, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.crud.base import CRUDBase from app.models.models import Event, Image, RobotInfo, Temperature, Message, Robot, Task from app.schemas.event import AllMessageDto, EventUpdate, EventQuery, BackStageEvent, BackStageEventDto, BackStageEventDetail, EditTemperatureDto, OcrAlertMessage, OcrAlertMessageDto, GetRobotDto, CommonAlertMessage, GetAllMessageDto from pydantic import VERSION as PYDANTIC_VERSION from datetime import datetime, timedelta, timezone # from app.util import kangda class CRUDEvent(CRUDBase[Event, EventUpdate, EventUpdate]): async def get_by_id(self, db: AsyncSession, *, event_id: str) -> Optional[Event]: """根据ID获取事件""" query = ( select(Event) .options( selectinload(Event.images), selectinload(Event.temperatures) ) .where(Event.eventId == event_id) ) result = await db.execute(query) return result.scalar_one_or_none() async def get_event_detail( self, db: AsyncSession, *, eventId: str ) -> Optional[BackStageEventDetail]: stmt = (select(Event.eventId, Event.number, Event.name,Image.imageUrl, Image.localPath, Temperature.temperature,Temperature.confidence, Temperature.status, Temperature.createTime). select_from(Event). outerjoin(Image, Image.eventId == Event.eventId). outerjoin(Temperature, Event.eventId==Temperature.eventId). where(Event.eventId==eventId) ) result = await db.execute(stmt) result = result.mappings().first() return result async def get_multi_backstage_events( self, db: AsyncSession, *, query: BackStageEventDto ) -> List[BackStageEvent]: """后台根据条件获取事件列表""" conditions = [] if query.eventId is not None and len(query.eventId) > 0: conditions.append(Event.eventId==query.eventId) if query.number is not None and len(query.number) > 0: conditions.append(Event.number == query.number) if query.name is not None and len(query.name) > 0: conditions.append(Event.name == query.name) if query.status is not None and len(query.status) > 0: conditions.append(Temperature.status == query.status) if query.start_time is not None: conditions.append(Temperature.createTime >= query.start_time) if query.end_time is not None: conditions.append(Temperature.createTime <= query.end_time) stmt = (select(Event.eventId, Event.number, Event.name, Image.imageUrl, Image.localPath, Temperature.temperature, Temperature.confidence, Temperature.createTime, Temperature.status) .select_from(Temperature) .outerjoin(Event, Event.eventId==Temperature.eventId) .outerjoin(Image, Image.imageId == Temperature.imageId) .where(*conditions) .order_by(Temperature.createTime.desc()) .offset(query.skip) .limit(query.limit)) results = await db.execute(stmt) results = results.mappings().all() back = list() for result in results: # if result["temperature"] is not None: back.append(BackStageEvent(**result)) return back async def get_multi_with_query( self, db: AsyncSession, *, query: EventQuery ) -> List[Event]: """根据查询条件获取事件列表""" conditions = [] if query.start_time: conditions.append(Event.insDate >= query.start_time) if query.end_time: conditions.append(Event.insDate <= query.end_time) if query.etypeName: conditions.append(Event.etypeName == query.etypeName) if query.area: conditions.append(Event.area == query.area) query_stmt = ( select(Event) .options( selectinload(Event.images), selectinload(Event.temperatures) ) ) if conditions: query_stmt = query_stmt.where(and_(*conditions)) query_stmt = query_stmt.offset(query.skip).limit(query.limit) result = await db.execute(query_stmt) return result.scalars().all() async def update_event( self, db: AsyncSession, *, event_id: str, obj_in: EditTemperatureDto ) -> Optional[Event]: """更新事件信息""" dict_values = dict() if PYDANTIC_VERSION.startswith("2."): field_names = list(EditTemperatureDto.model_fields.keys()) else: field_names = list(EditTemperatureDto.__fields__.keys()) for key in field_names: value = getattr(obj_in, key) if value is not None: dict_values[key] = value try: update_stmt = ( update(Temperature).where(Temperature.eventId == event_id).values(**dict_values) ) await db.execute(update_stmt) await db.commit() return True except: return False # event = await self.get_by_id(db, event_id=event_id) # if not event: # return None # update_data = obj_in.model_dump() # for field, value in update_data.items(): # setattr(event, field, value) # # db.add(event) # await db.commit() # await db.refresh(event) # return event async def delete_event( self, db: AsyncSession, *, event_id: str ) -> Optional[Event]: """删除事件""" event = await self.get_by_id(db, event_id=event_id) if not event: return None await db.delete(event) await db.commit() return event async def get_messages( self, db: AsyncSession, skip:int, limit:int, condition: list = [] ) -> Optional[List[OcrAlertMessage]]: query_stmt = ( select(Message.messageId,Message.eventId ,Message.messageType, Message.eventType, Message.handle, Message.remark, Message.createTime, Event.number,Event.name, # Image.imageUrl, Image.localPath, Temperature.temperature) .select_from(Message) .outerjoin(Event, Message.eventId == Event.eventId) # .outerjoin(Image, Message.eventId == Image.eventId) .outerjoin(Temperature, Message.eventId == Temperature.eventId) .where(*condition) .order_by(Message.createTime.desc()) .offset(skip) .limit(limit) ) messages = await db.execute(query_stmt) messages = messages.mappings().all() back = list() for message in messages: # 获取图片URL列表 query_image_stmt = ( select(Image.imageUrl) .where(Image.eventId == message.get("eventId")) ) images = await db.execute(query_image_stmt) images = images.mappings().all() # 将RowMapping转换为字典 message_dict = dict(message) # 将图片URL列表转换为普通列表 image_urls = [dict(img)["imageUrl"] for img in images] message_dict["imageUrl"] = image_urls ocrAlertMessage = OcrAlertMessage(**message_dict) back.append(ocrAlertMessage) return back async def get_messages_front( self, db: AsyncSession, skip:int, limit:int, condition: list = [] ) -> Optional[List[OcrAlertMessage]]: query_stmt = ( select(Message.messageId,Message.eventId ,Message.messageType, Message.eventType, Message.handle, Message.remark, Message.createTime, Event.number,Event.name, # Image.imageUrl, Image.localPath, RobotInfo.flvPtz, RobotInfo.flvTherm, RobotInfo.flvThermLight, Temperature.temperature) .select_from(Message) .outerjoin(Event, Message.eventId == Event.eventId) # .outerjoin(Image, Message.eventId == Image.eventId) .outerjoin(RobotInfo, RobotInfo.number == Event.number) .outerjoin(Temperature, Message.eventId == Temperature.eventId) .where(*condition) .order_by(Message.createTime.desc()) .offset(skip) .limit(limit) ) messages = await db.execute(query_stmt) messages = messages.mappings().all() back = list() for message in messages: # 获取图片URL列表 query_image_stmt = ( select(Image.imageUrl) .where(Image.eventId == message.get("eventId")) ) images = await db.execute(query_image_stmt) images = images.mappings().all() # 将RowMapping转换为字典 message_dict = dict(message) # 将图片URL列表转换为普通列表 image_urls = [dict(img)["imageUrl"] for img in images] message_dict["imageUrl"] = image_urls commonAlertMessage = CommonAlertMessage(**message_dict) back.append(commonAlertMessage) return back async def get_all_alert_message( self, db: AsyncSession, number: str, offset: int, limit: int ): """ 获取机器人所有的告警 """ query_stmt = ( select(Message.messageId, Message.handle, Message.remark, Message.eventId,Message.createTime,Event.number, Event.name, Event.etypeName) .select_from(Message) .outerjoin(Event, Message.eventId==Event.eventId) .where(Event.number == number) .order_by(Message.createTime.desc()) .offset(offset) .limit(limit) ) results = await db.execute(query_stmt) results = results.mappings().all() data = list() for result in results: t = AllMessageDto(**result) query_image_stmt = ( select(Image.imageUrl) .where(Image.eventId == result.eventId) ) image_list = await db.execute(query_image_stmt) image_list = image_list.scalars().all() t.imageList = image_list t.imagePreview = image_list[0] data.append(t) return data async def handle_ocr_alerts( self, db:AsyncSession, messageIdList: List[int] ): """处理OCR告警消息 Args: db: 数据库会话 messageIdList: 要处理的messageId列表 Returns: bool: 更新是否成功 """ try: # 获取所有需要更新的消息ID # 构建更新语句 update_stmt = ( update(Message) .where(Message.messageId.in_(messageIdList)) .values( handle="1", updateTime=datetime.now() ) ) # 准备更新数据 # 执行更新 await db.execute( update_stmt, # update_data, execution_options={"synchronize_session": False} ) await db.commit() return True except Exception as e: print(f"更新OCR告警消息失败: {str(e)}") await db.rollback() return False async def handle_ocr_alerts_get( self, db: AsyncSession, number: Optional[str] = None ): try: if number and len(number) > 0: update_stmt = ( update(Message) .values( handle="1", updateTime=datetime.now() ) .where(Message.eventId == Event.eventId) # JOIN 条件放在 where() .where(Event.number == number) # 过滤条件 .where(Message.handle == "0") # 过滤条件 ) print(number, "number") else: update_stmt = ( update(Message) .where(Message.handle == "0") .values( handle = "1", updateTime=datetime.now() ) ) # 执行更新 await db.execute( update_stmt, execution_options={"synchronize_session": False} ) await db.commit() return True except Exception as e: print(f"更新告警消息失败: {str(e)}") await db.rollback() return False async def handle_ocr_alert( self, db: AsyncSession, ocrAlertMessageDto: OcrAlertMessageDto, ): try: if ocrAlertMessageDto.number and len(ocrAlertMessageDto.number) > 0: update_stmt = ( update(Message) .where(Message.eventId == Event.eventId) .where(Event.number == ocrAlertMessageDto.number) .where(Message.messageId == ocrAlertMessageDto.messageId) .values( handle = "1", remark = ocrAlertMessageDto.remark, updateTime=datetime.now() ) ) else: update_stmt = ( update(Message) .where(Message.messageId == ocrAlertMessageDto.messageId) .values( handle = "1", remark = ocrAlertMessageDto.remark, updateTime=datetime.now() ) ) await db.execute(update_stmt) await db.commit() return True except Exception as e: print(f"处理单个消息失败: {e}") await db.rollback() return False async def get_alert_detail( self, db: AsyncSession, messageId: int ) ->Optional[OcrAlertMessage]: try: select_stmt = ( select(Message.messageId,Message.eventId ,Message.messageType, Message.eventType, Message.handle, Message.remark, Message.createTime, Event.number,Event.name, Image.imageUrl, Image.localPath, Temperature.temperature) .select_from(Message) .outerjoin(Event, Message.eventId == Event.eventId) .outerjoin(Image, Message.eventId == Image.eventId) .outerjoin(Temperature, Message.eventId == Temperature.eventId) .where(Message.messageId == messageId) ) result = await db.execute(select_stmt) message = result.mappings().first() if message: # 获取图片URL列表 query_image_stmt = ( select(Image.imageUrl) .where(Image.eventId == message.get("eventId")) ) images = await db.execute(query_image_stmt) images = images.mappings().all() # 将RowMapping转换为字典 message_dict = dict(message) # 添加图片URL列表 message_dict["imageUrl"] = images return OcrAlertMessage(**message_dict) return None except Exception as e: print(f"获取告警详情失败: {e}") return None async def get_alert_count( self, db: AsyncSession, handle: str, number: Optional[str] = None ): try: query_stmt = ( select(func.count(Message.messageId)) .select_from(Message) .outerjoin(Event, Message.eventId == Event.eventId) .where(and_(Message.handle == handle, (Event.number==number) if number else True)) ) result = await db.execute(query_stmt) return result.scalar() except Exception as e: print(f"{e}") async def get_robot_count( self, db: AsyncSession, *, # 关键字参数分隔符,代表后面的参数一定要用关键字指定 status: list ): try: query_stmt = ( select(func.count(Robot.robotId)) .where(*status) ) result = await db.execute(query_stmt) return result.scalar() except Exception as e: print(f"{e}") async def get_etype_name_list( self, db: AsyncSession ): query_stmt = ( select(Event.etypeName) .group_by(Event.etypeName) ) results = await db.execute(query_stmt) return results.scalars().all() async def get_robot_task( self, db: AsyncSession, robotId: Optional[str] = None ): try: query_stmt = ( select(Task).where(Task.robotId == robotId) ) result = await db.execute(query_stmt) result = result.mappings().all() # print(result) return [Task(**t) for t in result] except Exception as e: print(f"{e}") async def get_messages_st( self, db: AsyncSession, day: int ): try: now = datetime.now(timezone.utc) start_time = now - timedelta(days=day) query_stmt = ( select( Message.eventType, func.count(Message.eventType).label("count")) .where(and_(start_time<=Message.createTime, Message.createTime <= now)) .group_by(Message.eventType) ) result = await db.execute(query_stmt) return [ {"eventType": row.eventType, "count": row.count} for row in result ] except Exception as e: print(f"{e}") # async def get_robot_list( # self, # db: AsyncSession, # get_robot_dto: GetRobotDto # ): # # 前端登录 # d = kangda._login_front() # duty_list = kangda._get_robot_group(d["tenantInfoId"], d["token"]) event = CRUDEvent(Event)