import asyncio import uuid from datetime import datetime from sqlalchemy import select from app.core.database import async_session, Base, engine from app.models.models import Event, Image, Temperature, ProcessLog from sqlalchemy.orm import selectinload # 开发环境清空并创建数据.. async def init_db(): """初始化数据库""" async with engine.begin() as conn: # 删除已注册的表 # await conn.run_sync(Base.metadata.drop_all) # 创建表 await conn.run_sync(Base.metadata.create_all) async def test_create(): """测试创建操作""" print("\n=== 测试创建操作 ===") # 创建事件 event = Event( eventId=str(uuid.uuid4())[:30], number="TEST001", name="测试事件", etype="TEST", etypeName="测试类型", insDate=datetime.now(), insDateShow=datetime.now() ) print("测试用例Id", event.eventId) async with async_session() as session: session.add(event) await session.commit() await session.refresh(event) print(f"创建事件成功: {event.eventId}") # 创建图片 image = Image( eventId=event.eventId, imageUrl="http://example.com/test.jpg", localPath="/tmp/test.jpg" ) session.add(image) await session.commit() await session.refresh(image) print(f"创建图片成功: {image.imageId}") # 创建温度记录 temperature = Temperature( eventId=event.eventId, imageId=image.imageId, temperature="25.5", confidence="0.95" ) session.add(temperature) await session.commit() await session.refresh(temperature) print(f"创建温度记录成功: {temperature.tempId}") # 创建处理日志 process_log = ProcessLog( eventId=event.eventId, processStatus=1, errorMessage="处理成功" ) session.add(process_log) await session.commit() await session.refresh(process_log) print(f"创建处理日志成功: {process_log.logId}") return event.eventId async def test_read(event_id): """测试读取操作""" print("\n=== 测试读取操作 ===") async with async_session() as session: # 查询事件, 这括号可以没有, 完全是为了书写方便, 在一句话中能换行而不用加\. query = ( select(Event).where(Event.eventId == event_id).options( selectinload(Event.images), selectinload(Event.temperatures), selectinload(Event.process_logs) ) ) result = await session.execute(query) event = result.scalar_one_or_none() print(f"查询事件: {event.eventId if event else 'Not Found'}") if event: # 查询关联的图片 print(f"关联图片数量: {len(event.images)}") for image in event.images: print(f"图片ID: {image.imageId}, URL: {image.imageUrl}") # 查询关联的温度记录 print(f"关联温度记录数量: {len(event.temperatures)}") for temp in event.temperatures: print(f"温度ID: {temp.tempId}, 温度值: {temp.temperature}") # 查询关联的处理日志 print(f"关联处理日志数量: {len(event.process_logs)}") for log in event.process_logs: print(f"日志ID: {log.logId}, 状态: {log.processStatus}") async def test_update(event_id): """测试更新操作""" print("\n=== 测试更新操作 ===") async with async_session() as session: # 更新事件 query = select(Event).where(Event.eventId == event_id).options( selectinload(Event.images), selectinload(Event.temperatures), selectinload(Event.process_logs) ) result = await session.execute(query) event = result.scalar_one_or_none() if event: event.name = "更新后的测试事件" event.etypeName = "更新后的测试类型" await session.commit() print(f"更新事件成功: {event.eventId}") # 更新温度记录 for temp in event.temperatures: temp.temperature = "26.5" temp.confidence = "0.98" await session.commit() print("更新温度记录成功") async def test_delete(event_id): """测试删除操作""" print("\n=== 测试删除操作 ===") async with async_session() as session: # 删除事件(级联删除关联数据) query = select(Event).where(Event.eventId == event_id) result = await session.execute(query) event = result.scalar_one_or_none() if event: await session.delete(event) await session.commit() print(f"删除事件成功: {event_id}") async def test_transaction(): """测试事务操作""" print("\n=== 测试事务操作 ===") async with async_session() as session: try: # 开始事务 event = Event( eventId=str(uuid.uuid4()), number="TEST002", name="事务测试事件", etype="TEST", etypeName="测试类型", insDate=datetime.now(), insDateShow=datetime.now() ) session.add(event) await session.flush() # 模拟错误 raise Exception("模拟事务错误") await session.commit() except Exception as e: await session.rollback() print(f"事务回滚成功: {str(e)}") async def main(): """主测试函数""" print("开始数据库测试...") # 初始化数据库 await init_db() print("数据库初始化完成") # # 测试创建 # event_id = await test_create() # # 测试读取 # await test_read(event_id) # # 测试更新 # await test_update(event_id) # # 测试读取更新后的数据 # await test_read(event_id) # # 测试事务 # await test_transaction() # # 测试删除 # await test_delete(event_id) # print("\n数据库测试完成") if __name__ == "__main__": asyncio.run(main())