196 lines
6.4 KiB
Python
196 lines
6.4 KiB
Python
from typing import List, Optional
|
||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
import base64
|
||
import io
|
||
from PIL import Image
|
||
import os
|
||
import uuid
|
||
|
||
|
||
from app.core.database import get_db
|
||
from app.services.imageServices import ImageService
|
||
from app.schemas.image import ImageBase
|
||
from app.schemas.ocr import ImageBase64Request
|
||
from app.util.responseHttp import ResponseUtil
|
||
from app.util.baiduOCR import BaiduOCR, BaiduOCRONNX
|
||
from app.util.yolov8Obj import Yolov8Obj
|
||
|
||
# from app.crud.event import event
|
||
# from app.schemas.event import EventList, EventDetail, EventUpdate, EventQuery, TestEvent
|
||
|
||
baiduOCR = BaiduOCR()
|
||
baiduOcrOnnx = BaiduOCRONNX()
|
||
yolov8Obj = Yolov8Obj()
|
||
|
||
router = APIRouter(prefix="/api/v1", tags=["ocr"])
|
||
|
||
@router.get("/hello")
|
||
async def get_hello():
|
||
|
||
return {"data":"hello"}
|
||
|
||
@router.get("/test_select")
|
||
async def test_select(
|
||
db: AsyncSession = Depends(get_db),
|
||
image_query: ImageBase = None
|
||
):
|
||
"""
|
||
测试查询
|
||
"""
|
||
result = await ImageService.get_image_list(db,image_query)
|
||
return ResponseUtil.success(msg="success", data=result)
|
||
|
||
|
||
@router.get("/test_ocr")
|
||
async def test_ocr(
|
||
# image_path: str = None
|
||
):
|
||
"""
|
||
测试OCR
|
||
"""
|
||
|
||
image_path = '/home/admin-root/haotian/康达瑞贝斯机器狗/data_image/001读表图片/2c7cc83019e7388a7041101da92c9829_frame_000000.jpg'
|
||
|
||
|
||
result = baiduOCR.ocr(image_path)
|
||
# print(result)
|
||
return ResponseUtil.success(msg="success", data=result)
|
||
# return ResponseUtil.success(msg="success")
|
||
|
||
|
||
@router.post("/ocr_from_base64")
|
||
async def ocr_from_base64(request: ImageBase64Request):
|
||
"""
|
||
从base64图片数据进行OCR识别
|
||
"""
|
||
try:
|
||
# 移除base64数据的前缀(如果有)
|
||
image_base64 = request.image_base64
|
||
if image_base64.startswith('data:image'):
|
||
# 格式如: data:image/jpeg;base64,/9j/4AAQSkZJRgABAQAAAQ...
|
||
image_base64 = image_base64.split(',')[1]
|
||
|
||
# 解码base64数据
|
||
image_data = base64.b64decode(image_base64)
|
||
|
||
# 将字节数据转换为PIL Image对象
|
||
image = Image.open(io.BytesIO(image_data))
|
||
|
||
# 创建临时文件路径
|
||
temp_dir = "./tmp/ocr_images"
|
||
os.makedirs(temp_dir, exist_ok=True)
|
||
temp_filename = f"{uuid.uuid4()}.{request.image_type or 'jpg'}"
|
||
temp_path = os.path.join(temp_dir, temp_filename)
|
||
|
||
# 保存图片到临时文件
|
||
image.save(temp_path)
|
||
|
||
# 使用PaddleOCR进行识别
|
||
result = baiduOCR.ocr(temp_path)
|
||
|
||
# 删除临时文件
|
||
os.remove(temp_path)
|
||
|
||
return ResponseUtil.success(msg="OCR识别成功", data=result)
|
||
except Exception as e:
|
||
return ResponseUtil.error(msg=f"OCR识别失败: {str(e)}", data=None)
|
||
|
||
|
||
@router.post("/ocr_onnx_from_base64")
|
||
async def ocr_from_base64(request: ImageBase64Request):
|
||
"""
|
||
从base64图片数据进行OCR识别
|
||
"""
|
||
try:
|
||
# 移除base64数据的前缀(如果有)
|
||
image_base64 = request.image_base64
|
||
if image_base64.startswith('data:image'):
|
||
# 格式如: data:image/jpeg;base64,/9j/4AAQSkZJRgABAQAAAQ...
|
||
image_base64 = image_base64.split(',')[1]
|
||
|
||
# 解码base64数据
|
||
image_data = base64.b64decode(image_base64)
|
||
|
||
# 将字节数据转换为PIL Image对象
|
||
image = Image.open(io.BytesIO(image_data))
|
||
|
||
# 创建临时文件路径
|
||
temp_dir = "./tmp/ocr_images"
|
||
os.makedirs(temp_dir, exist_ok=True)
|
||
temp_filename = f"{uuid.uuid4()}.{request.image_type or 'jpg'}"
|
||
temp_path = os.path.join(temp_dir, temp_filename)
|
||
|
||
# 保存图片到临时文件
|
||
image.save(temp_path)
|
||
|
||
# 使用PaddleOCR进行识别
|
||
result = baiduOcrOnnx.ocr(temp_path)
|
||
print(result)
|
||
|
||
# 删除临时文件
|
||
# os.remove(temp_path)
|
||
|
||
# return ResponseUtil.success(msg="OCR识别成功", data=[result['text'], result['confidence']])
|
||
return ResponseUtil.success(msg="OCR识别成功", data=result)
|
||
except Exception as e:
|
||
return ResponseUtil.error(msg=f"OCR识别失败: {str(e)}", data=None)
|
||
|
||
|
||
@router.post("/detect_from_base64_0")
|
||
async def ocr_from_base64(request: ImageBase64Request):
|
||
""" 从吧色图64图片进行目标检测, 检测是否侵占消防区域"""
|
||
|
||
try:
|
||
image_base64 = request.image_base64
|
||
if image_base64.startswith('data:image'):
|
||
image_base64 = image_base64.split(',')[1]
|
||
image_data = base64.b64decode(image_base64)
|
||
|
||
image = Image.open(io.BytesIO(image_data))
|
||
|
||
temp_dir = "./tmp/detect_images"
|
||
os.makedirs(temp_dir, exist_ok=True)
|
||
temp_filename = f"{uuid.uuid4()}.{request.image_type or 'jpg'}"
|
||
temp_path = os.path.join(temp_dir, temp_filename)
|
||
|
||
image.save(temp_path)
|
||
|
||
cls, conf, coords = yolov8Obj.detect(temp_path)
|
||
|
||
os.remove(temp_path)
|
||
|
||
return ResponseUtil.success(msg="侵占消防区域目标检测成功,是否有遮挡", data=(len(cls)==0))
|
||
|
||
except Exception as e:
|
||
return ResponseUtil.error(msg=f"检测是否侵占消防区域失败: {str(e)}", data=None)
|
||
|
||
|
||
@router.post("/detect_from_base64_1")
|
||
async def ocr_from_base64(request: ImageBase64Request):
|
||
""" 从吧色图64图片进行目标检测, 检测是否存在灭火器"""
|
||
|
||
try:
|
||
image_base64 = request.image_base64
|
||
if image_base64.startswith('data:image'):
|
||
image_base64 = image_base64.split(',')[1]
|
||
image_data = base64.b64decode(image_base64)
|
||
|
||
image = Image.open(io.BytesIO(image_data))
|
||
|
||
temp_dir = "./tmp/detect_images"
|
||
os.makedirs(temp_dir, exist_ok=True)
|
||
temp_filename = f"{uuid.uuid4()}.{request.image_type or 'jpg'}"
|
||
temp_path = os.path.join(temp_dir, temp_filename)
|
||
|
||
image.save(temp_path)
|
||
|
||
cls, conf, coords = yolov8Obj.detect(temp_path)
|
||
|
||
os.remove(temp_path)
|
||
|
||
return ResponseUtil.success(msg="灭火器目标检测成功,是否存在灭火器", data=(len(cls)!=0 and 0 in cls))
|
||
# return ResponseUtil.success(msg="目标检测成功", data=cls)
|
||
|
||
except Exception as e:
|
||
return ResponseUtil.error(msg=f"检测是否存在灭火器失败: {str(e)}", data=None) |