kangda/app/crud/base.py

81 lines
2.5 KiB
Python

from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from app.core.database import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD对象与SQLAlchemy模型类一起使用
:param model: SQLAlchemy模型类
"""
self.model = model
# 根据id获取对象
async def get(self, db: AsyncSession, id: Any) -> Optional[ModelType]:
"""
通过ID获取对象
"""
query = select(self.model).where(self.model.id == id)
result = await db.execute(query)
return result.scalar_one_or_none()
# 分页查询
async def get_multi(
self, db: AsyncSession, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
"""
获取多个对象
"""
query = select(self.model).offset(skip).limit(limit)
result = await db.execute(query)
return result.scalars().all()
async def create(self, db: AsyncSession, *, obj_in: CreateSchemaType) -> ModelType:
"""
创建对象
"""
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def update(
self,
db: AsyncSession,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
"""
更新对象
"""
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
async def remove(self, db: AsyncSession, *, id: Any) -> ModelType:
"""
删除对象
"""
obj = await self.get(db=db, id=id)
await db.delete(obj)
await db.commit()
return obj