kangda/test_db.py

211 lines
6.5 KiB
Python

import asyncio
import uuid
from datetime import datetime
from sqlalchemy import select
from app.core.database import async_session, Base, engine
from app.models.models import Event, Image, Temperature, ProcessLog
from sqlalchemy.orm import selectinload
# 开发环境清空并创建数据..
async def init_db():
"""初始化数据库"""
async with engine.begin() as conn:
# 删除已注册的表
await conn.run_sync(Base.metadata.drop_all)
# 创建表
await conn.run_sync(Base.metadata.create_all)
async def test_create():
"""测试创建操作"""
print("\n=== 测试创建操作 ===")
# 创建事件
event = Event(
eventId=str(uuid.uuid4())[:30],
number="TEST001",
name="测试事件",
etype="TEST",
etypeName="测试类型",
insDate=datetime.now(),
insDateShow=datetime.now()
)
print("测试用例Id", event.eventId)
async with async_session() as session:
session.add(event)
await session.commit()
await session.refresh(event)
print(f"创建事件成功: {event.eventId}")
# 创建图片
image = Image(
eventId=event.eventId,
imageUrl="http://example.com/test.jpg",
localPath="/tmp/test.jpg"
)
session.add(image)
await session.commit()
await session.refresh(image)
print(f"创建图片成功: {image.imageId}")
# 创建温度记录
temperature = Temperature(
eventId=event.eventId,
imageId=image.imageId,
temperature="25.5",
confidence="0.95"
)
session.add(temperature)
await session.commit()
await session.refresh(temperature)
print(f"创建温度记录成功: {temperature.tempId}")
# 创建处理日志
process_log = ProcessLog(
eventId=event.eventId,
processStatus=1,
errorMessage="处理成功"
)
session.add(process_log)
await session.commit()
await session.refresh(process_log)
print(f"创建处理日志成功: {process_log.logId}")
return event.eventId
async def test_read(event_id):
"""测试读取操作"""
print("\n=== 测试读取操作 ===")
async with async_session() as session:
# 查询事件, 这括号可以没有, 完全是为了书写方便, 在一句话中能换行而不用加\.
query = (
select(Event).where(Event.eventId == event_id).options(
selectinload(Event.images),
selectinload(Event.temperatures),
selectinload(Event.process_logs)
)
)
result = await session.execute(query)
event = result.scalar_one_or_none()
print(f"查询事件: {event.eventId if event else 'Not Found'}")
if event:
# 查询关联的图片
print(f"关联图片数量: {len(event.images)}")
for image in event.images:
print(f"图片ID: {image.imageId}, URL: {image.imageUrl}")
# 查询关联的温度记录
print(f"关联温度记录数量: {len(event.temperatures)}")
for temp in event.temperatures:
print(f"温度ID: {temp.tempId}, 温度值: {temp.temperature}")
# 查询关联的处理日志
print(f"关联处理日志数量: {len(event.process_logs)}")
for log in event.process_logs:
print(f"日志ID: {log.logId}, 状态: {log.processStatus}")
async def test_update(event_id):
"""测试更新操作"""
print("\n=== 测试更新操作 ===")
async with async_session() as session:
# 更新事件
query = select(Event).where(Event.eventId == event_id).options(
selectinload(Event.images),
selectinload(Event.temperatures),
selectinload(Event.process_logs)
)
result = await session.execute(query)
event = result.scalar_one_or_none()
if event:
event.name = "更新后的测试事件"
event.etypeName = "更新后的测试类型"
await session.commit()
print(f"更新事件成功: {event.eventId}")
# 更新温度记录
for temp in event.temperatures:
temp.temperature = "26.5"
temp.confidence = "0.98"
await session.commit()
print("更新温度记录成功")
async def test_delete(event_id):
"""测试删除操作"""
print("\n=== 测试删除操作 ===")
async with async_session() as session:
# 删除事件(级联删除关联数据)
query = select(Event).where(Event.eventId == event_id)
result = await session.execute(query)
event = result.scalar_one_or_none()
if event:
await session.delete(event)
await session.commit()
print(f"删除事件成功: {event_id}")
async def test_transaction():
"""测试事务操作"""
print("\n=== 测试事务操作 ===")
async with async_session() as session:
try:
# 开始事务
event = Event(
eventId=str(uuid.uuid4()),
number="TEST002",
name="事务测试事件",
etype="TEST",
etypeName="测试类型",
insDate=datetime.now(),
insDateShow=datetime.now()
)
session.add(event)
await session.flush()
# 模拟错误
raise Exception("模拟事务错误")
await session.commit()
except Exception as e:
await session.rollback()
print(f"事务回滚成功: {str(e)}")
async def main():
"""主测试函数"""
print("开始数据库测试...")
# 初始化数据库
# await init_db()
print("数据库初始化完成")
# 测试创建
event_id = await test_create()
# 测试读取
await test_read(event_id)
# 测试更新
await test_update(event_id)
# 测试读取更新后的数据
await test_read(event_id)
# 测试事务
await test_transaction()
# 测试删除
await test_delete(event_id)
print("\n数据库测试完成")
if __name__ == "__main__":
asyncio.run(main())