516 lines
18 KiB
Python
516 lines
18 KiB
Python
import cv2
|
||
import numpy as np
|
||
from rknn.api import RKNN
|
||
import math
|
||
import time
|
||
|
||
class PaddleOCRRKNN:
|
||
def __init__(self, det_model_path, rec_model_path, target='rk3588'):
|
||
"""
|
||
初始化RKNN推理器
|
||
|
||
Args:
|
||
det_model_path: 检测模型路径 (det.rknn)
|
||
rec_model_path: 识别模型路径 (rec.rknn)
|
||
target: 目标平台 ('rk3588', 'rk3566', 'rk3568', 'rv1103', 'rv1106', 'simulator')
|
||
"""
|
||
self.target = target
|
||
|
||
# 初始化检测模型
|
||
self.det_rknn = RKNN(verbose=True)
|
||
ret = self.det_rknn.load_rknn(det_model_path)
|
||
if ret != 0:
|
||
print(f'Load detection RKNN model failed! Error code: {ret}')
|
||
print('Please check if the model file exists and is valid')
|
||
exit(ret)
|
||
|
||
# 指定运行时平台
|
||
print(f'Initializing detection model on target: {target}')
|
||
ret = self.det_rknn.init_runtime(
|
||
target=target,
|
||
device_id=None,
|
||
perf_debug=False,
|
||
eval_mem=False,
|
||
async_mode=False,
|
||
core_mask=RKNN.NPU_CORE_AUTO # 自动选择NPU核心
|
||
)
|
||
if ret != 0:
|
||
print(f'Init detection runtime failed! Error code: {ret}')
|
||
print('Please check if you are running on the correct platform')
|
||
if target == 'simulator':
|
||
print('Note: Simulator mode requires different setup')
|
||
exit(ret)
|
||
print('Detection model loaded successfully')
|
||
|
||
# 初始化识别模型
|
||
self.rec_rknn = RKNN(verbose=True)
|
||
ret = self.rec_rknn.load_rknn(rec_model_path)
|
||
if ret != 0:
|
||
print(f'Load recognition RKNN model failed! Error code: {ret}')
|
||
print('Please check if the model file exists and is valid')
|
||
exit(ret)
|
||
|
||
# 指定运行时平台
|
||
print(f'Initializing recognition model on target: {target}')
|
||
ret = self.rec_rknn.init_runtime(
|
||
target=target,
|
||
device_id=None,
|
||
perf_debug=False,
|
||
eval_mem=False,
|
||
async_mode=False,
|
||
core_mask=RKNN.NPU_CORE_AUTO # 自动选择NPU核心
|
||
)
|
||
if ret != 0:
|
||
print(f'Init recognition runtime failed! Error code: {ret}')
|
||
print('Please check if you are running on the correct platform')
|
||
exit(ret)
|
||
print('Recognition model loaded successfully')
|
||
|
||
# 字符集(根据您的模型调整)
|
||
self.character = ['blank', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+',
|
||
',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8',
|
||
'9', ':', ';', '<', '=', '>', '?', '@', 'A', 'B', 'C', 'D', 'E',
|
||
'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R',
|
||
'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', '\\', ']', '^', '_',
|
||
'`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
|
||
'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y',
|
||
'z', '{', '|', '}', '~'] + [chr(i) for i in range(19968, 40870)] # 中文字符
|
||
|
||
def preprocess_det(self, img, input_shape=(640, 640)):
|
||
"""
|
||
检测模型的图像预处理 - 适配RKNN
|
||
"""
|
||
h, w, _ = img.shape
|
||
target_h, target_w = input_shape
|
||
|
||
# 计算缩放比例 - 保持宽高比
|
||
ratio_h = target_h / h
|
||
ratio_w = target_w / w
|
||
ratio = min(ratio_h, ratio_w)
|
||
|
||
# 计算缩放后的尺寸
|
||
new_h = int(h * ratio)
|
||
new_w = int(w * ratio)
|
||
|
||
# 调整图像大小
|
||
resized_img = cv2.resize(img, (new_w, new_h))
|
||
|
||
# 创建目标尺寸的图像,用灰色填充
|
||
padded_img = np.ones((target_h, target_w, 3), dtype=np.uint8) * 114
|
||
|
||
# 计算居中位置
|
||
top = (target_h - new_h) // 2
|
||
left = (target_w - new_w) // 2
|
||
|
||
# 将缩放后的图像放到居中位置
|
||
padded_img[top:top+new_h, left:left+new_w] = resized_img
|
||
|
||
# RKNN通常需要uint8输入(如果量化时使用了uint8)
|
||
# 如果您的模型使用float输入,请取消下面的注释并注释掉return语句
|
||
# padded_img = padded_img.astype(np.float32)
|
||
# padded_img = (padded_img / 255.0 - [0.485, 0.456, 0.406]) / [0.229, 0.224, 0.225]
|
||
|
||
return padded_img, ratio, (top, left)
|
||
|
||
def preprocess_rec(self, img, input_shape=(320, 48)):
|
||
"""
|
||
识别模型的图像预处理 - 适配RKNN
|
||
"""
|
||
target_w, target_h = input_shape # 注意:宽度在前
|
||
|
||
h, w = img.shape[:2]
|
||
|
||
# 计算缩放比例,保持宽高比
|
||
ratio_h = target_h / h
|
||
ratio_w = target_w / w
|
||
ratio = min(ratio_h, ratio_w)
|
||
|
||
# 计算缩放后的尺寸
|
||
new_h = int(h * ratio)
|
||
new_w = int(w * ratio)
|
||
|
||
# 调整图像大小
|
||
resized_image = cv2.resize(img, (new_w, new_h))
|
||
|
||
# 创建目标尺寸的图像,用黑色填充
|
||
padded_image = np.zeros((target_h, target_w, 3), dtype=np.uint8)
|
||
|
||
# 将缩放后的图像放到左上角(识别模型通常左对齐)
|
||
padded_image[:new_h, :new_w] = resized_image
|
||
|
||
# RKNN通常需要uint8输入(如果量化时使用了uint8)
|
||
# 如果您的模型使用float输入,请取消下面的注释并注释掉return语句
|
||
# padded_image = padded_image.astype(np.float32)
|
||
# padded_image = (padded_image / 255.0 - [0.485, 0.456, 0.406]) / [0.229, 0.224, 0.225]
|
||
|
||
return padded_image
|
||
|
||
def postprocess_det_boxes(self, dt_boxes, ratio, padding_info, ori_shape):
|
||
"""
|
||
检测结果后处理 - 适配固定输入形状
|
||
"""
|
||
if dt_boxes is None or len(dt_boxes) == 0:
|
||
return None
|
||
|
||
ori_h, ori_w = ori_shape
|
||
top, left = padding_info
|
||
|
||
# 将坐标从模型输出空间转换回原图空间
|
||
dt_boxes[:, :, 0] = (dt_boxes[:, :, 0] - left) / ratio
|
||
dt_boxes[:, :, 1] = (dt_boxes[:, :, 1] - top) / ratio
|
||
|
||
# 裁剪到原图范围内
|
||
dt_boxes[:, :, 0] = np.clip(dt_boxes[:, :, 0], 0, ori_w)
|
||
dt_boxes[:, :, 1] = np.clip(dt_boxes[:, :, 1], 0, ori_h)
|
||
|
||
return dt_boxes
|
||
|
||
def boxes_from_bitmap(self, pred, bitmap, dest_width, dest_height, max_candidates=1000, box_thresh=0.6):
|
||
"""
|
||
从位图中提取文本框
|
||
"""
|
||
bitmap = bitmap.astype(np.uint8)
|
||
height, width = bitmap.shape
|
||
|
||
# 查找轮廓
|
||
contours, _ = cv2.findContours(bitmap, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
|
||
|
||
num_contours = min(len(contours), max_candidates)
|
||
boxes = []
|
||
scores = []
|
||
|
||
for i in range(num_contours):
|
||
contour = contours[i]
|
||
points, sside = self.get_mini_boxes(contour)
|
||
if sside < 5:
|
||
continue
|
||
|
||
points = np.array(points)
|
||
score = self.box_score_fast(pred, points.reshape(-1, 2))
|
||
if box_thresh > score:
|
||
continue
|
||
|
||
# 扩展box
|
||
box = self.unclip(points, 1.5).reshape(-1, 1, 2)
|
||
box, sside = self.get_mini_boxes(box)
|
||
if sside < 5 + 2:
|
||
continue
|
||
|
||
box = np.array(box)
|
||
box[:, 0] = np.clip(box[:, 0] / width * dest_width, 0, dest_width)
|
||
box[:, 1] = np.clip(box[:, 1] / height * dest_height, 0, dest_height)
|
||
|
||
boxes.append(box.astype(np.int16))
|
||
scores.append(score)
|
||
|
||
return np.array(boxes), scores
|
||
|
||
def get_mini_boxes(self, contour):
|
||
"""获取最小外接矩形"""
|
||
bounding_box = cv2.minAreaRect(contour)
|
||
points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])
|
||
|
||
index_1, index_2, index_3, index_4 = 0, 1, 2, 3
|
||
if points[1][1] > points[0][1]:
|
||
index_1 = 0
|
||
index_4 = 1
|
||
else:
|
||
index_1 = 1
|
||
index_4 = 0
|
||
|
||
if points[3][1] > points[2][1]:
|
||
index_2 = 2
|
||
index_3 = 3
|
||
else:
|
||
index_2 = 3
|
||
index_3 = 2
|
||
|
||
box = [points[index_1], points[index_2], points[index_3], points[index_4]]
|
||
return box, min(bounding_box[1])
|
||
|
||
def box_score_fast(self, bitmap, _box):
|
||
"""快速计算box得分"""
|
||
h, w = bitmap.shape[:2]
|
||
box = _box.copy()
|
||
xmin = np.clip(np.floor(box[:, 0].min()).astype(int), 0, w - 1)
|
||
xmax = np.clip(np.ceil(box[:, 0].max()).astype(int), 0, w - 1)
|
||
ymin = np.clip(np.floor(box[:, 1].min()).astype(int), 0, h - 1)
|
||
ymax = np.clip(np.ceil(box[:, 1].max()).astype(int), 0, h - 1)
|
||
|
||
mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
|
||
box[:, 0] = box[:, 0] - xmin
|
||
box[:, 1] = box[:, 1] - ymin
|
||
cv2.fillPoly(mask, box.reshape(1, -1, 2).astype(np.int32), 1)
|
||
|
||
return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0]
|
||
|
||
def unclip(self, box, unclip_ratio):
|
||
"""扩展文本框"""
|
||
try:
|
||
from shapely.geometry import Polygon
|
||
import pyclipper
|
||
|
||
poly = Polygon(box)
|
||
distance = poly.area * unclip_ratio / poly.length
|
||
|
||
offset = pyclipper.PyclipperOffset()
|
||
offset.AddPath(box, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
|
||
expanded = offset.Execute(distance)
|
||
|
||
if len(expanded) == 0:
|
||
return box
|
||
else:
|
||
return np.array(expanded[0])
|
||
except:
|
||
# 如果shapely/pyclipper不可用,使用简单的膨胀
|
||
return box
|
||
|
||
def decode_rec_result(self, preds_prob):
|
||
"""
|
||
解码识别结果
|
||
"""
|
||
preds_idx = np.argmax(preds_prob, axis=1)
|
||
preds_prob_max = np.max(preds_prob, axis=1)
|
||
|
||
# CTC解码
|
||
last_idx = 0
|
||
preds_text = []
|
||
preds_conf = []
|
||
|
||
for i, idx in enumerate(preds_idx):
|
||
if idx != last_idx and idx != 0: # 0是blank
|
||
if idx < len(self.character):
|
||
preds_text.append(self.character[idx])
|
||
preds_conf.append(preds_prob_max[i])
|
||
last_idx = idx
|
||
|
||
text = ''.join(preds_text)
|
||
conf = np.mean(preds_conf) if preds_conf else 0.0
|
||
|
||
return text, conf
|
||
|
||
def detect_text(self, image):
|
||
"""
|
||
文本检测 - RKNN版本
|
||
"""
|
||
ori_h, ori_w = image.shape[:2]
|
||
|
||
# 预处理
|
||
det_img, ratio, padding_info = self.preprocess_det(image)
|
||
|
||
# RKNN推理
|
||
start_time = time.time()
|
||
outputs = self.det_rknn.inference(inputs=[det_img])
|
||
det_time = time.time() - start_time
|
||
print(f"Detection inference time: {det_time:.3f}s")
|
||
|
||
# 获取输出 (通常是第一个输出)
|
||
det_output = outputs[0]
|
||
|
||
# 确保输出形状正确
|
||
if len(det_output.shape) == 4:
|
||
mask = det_output[0, 0, :, :]
|
||
else:
|
||
mask = det_output[0, :, :] if len(det_output.shape) == 3 else det_output
|
||
|
||
# 后处理
|
||
threshold = 0.3
|
||
bitmap = (mask > threshold).astype(np.uint8) * 255
|
||
|
||
# 从位图中提取文本框(坐标是在640x640空间中的)
|
||
boxes, scores = self.boxes_from_bitmap(mask, bitmap, 640, 640)
|
||
|
||
# 将坐标转换回原图空间
|
||
if len(boxes) > 0:
|
||
boxes = self.postprocess_det_boxes(boxes, ratio, padding_info, (ori_h, ori_w))
|
||
|
||
return boxes, scores
|
||
|
||
def recognize_text(self, image):
|
||
"""
|
||
文本识别 - RKNN版本
|
||
"""
|
||
# 预处理
|
||
rec_img = self.preprocess_rec(image)
|
||
|
||
# RKNN推理
|
||
start_time = time.time()
|
||
outputs = self.rec_rknn.inference(inputs=[rec_img])
|
||
rec_time = time.time() - start_time
|
||
print(f"Recognition inference time: {rec_time:.3f}s")
|
||
|
||
# 获取输出
|
||
rec_output = outputs[0]
|
||
|
||
# 确保输出维度正确
|
||
if len(rec_output.shape) == 3:
|
||
rec_result = rec_output[0] # 移除batch维度
|
||
else:
|
||
rec_result = rec_output
|
||
|
||
# 解码
|
||
text, conf = self.decode_rec_result(rec_result)
|
||
|
||
return text, conf
|
||
|
||
def get_rotate_crop_image(self, img, points):
|
||
"""
|
||
根据四个点坐标裁剪并矫正图像
|
||
"""
|
||
img_crop_width = int(
|
||
max(
|
||
np.linalg.norm(points[0] - points[1]),
|
||
np.linalg.norm(points[2] - points[3])))
|
||
img_crop_height = int(
|
||
max(
|
||
np.linalg.norm(points[0] - points[3]),
|
||
np.linalg.norm(points[1] - points[2])))
|
||
pts_std = np.float32([[0, 0], [img_crop_width, 0],
|
||
[img_crop_width, img_crop_height],
|
||
[0, img_crop_height]])
|
||
M = cv2.getPerspectiveTransform(points, pts_std)
|
||
dst_img = cv2.warpPerspective(
|
||
img,
|
||
M, (img_crop_width, img_crop_height),
|
||
borderMode=cv2.BORDER_REPLICATE,
|
||
flags=cv2.INTER_CUBIC)
|
||
dst_img_height, dst_img_width = dst_img.shape[0:2]
|
||
if dst_img_height * 1.0 / dst_img_width >= 1.5:
|
||
dst_img = np.rot90(dst_img)
|
||
return dst_img
|
||
|
||
def ocr(self, image_path):
|
||
"""
|
||
完整的OCR流程 - RKNN版本
|
||
"""
|
||
# 读取图像
|
||
image = cv2.imread(image_path)
|
||
if image is None:
|
||
print(f"Cannot read image: {image_path}")
|
||
return []
|
||
|
||
print(f"Processing image: {image.shape}")
|
||
|
||
# 1. 文本检测
|
||
dt_boxes, scores = self.detect_text(image)
|
||
|
||
if dt_boxes is None or len(dt_boxes) == 0:
|
||
print("No text boxes detected")
|
||
return []
|
||
|
||
print(f"Detected {len(dt_boxes)} text boxes")
|
||
|
||
# 2. 文本识别
|
||
ocr_results = []
|
||
for i, box in enumerate(dt_boxes):
|
||
# 裁剪文本区域
|
||
box_points = box.astype(np.float32)
|
||
crop_img = self.get_rotate_crop_image(image, box_points)
|
||
|
||
# 识别文本
|
||
text, conf = self.recognize_text(crop_img)
|
||
|
||
if conf > 0.5: # 置信度过滤
|
||
ocr_results.append({
|
||
'text': text,
|
||
'confidence': conf,
|
||
'box': box.tolist(),
|
||
'score': scores[i] if i < len(scores) else 0.0
|
||
})
|
||
print(f"Box {i}: {text} (conf: {conf:.3f})")
|
||
|
||
return ocr_results
|
||
|
||
def release(self):
|
||
"""
|
||
释放RKNN资源
|
||
"""
|
||
if hasattr(self, 'det_rknn'):
|
||
self.det_rknn.release()
|
||
if hasattr(self, 'rec_rknn'):
|
||
self.rec_rknn.release()
|
||
|
||
|
||
# 使用示例
|
||
def main():
|
||
# 初始化OCR - 可以指定不同的target平台
|
||
print("Initializing PaddleOCR RKNN...")
|
||
|
||
# 根据您的硬件平台选择:
|
||
# 'rk3588' - RK3588 (默认)
|
||
# 'rk3566' - RK3566
|
||
# 'rk3568' - RK3568
|
||
# 'rv1103' - RV1103
|
||
# 'rv1106' - RV1106
|
||
# 'simulator' - 仿真器模式(用于开发调试)
|
||
|
||
try:
|
||
ocr = PaddleOCRRKNN('/home/admin-root/haotian/康达瑞贝斯机器狗/det_shape.rknn', '/home/admin-root/haotian/康达瑞贝斯机器狗/rec_shape.rknn', target='rk3588')
|
||
except Exception as e:
|
||
print(f"Failed to initialize RKNN models: {e}")
|
||
print("Trying alternative initialization...")
|
||
# 如果失败,尝试其他选项
|
||
try:
|
||
ocr = PaddleOCRRKNN('det.rknn', 'rec.rknn', target='simulator')
|
||
except Exception as e2:
|
||
print(f"Alternative initialization also failed: {e2}")
|
||
return
|
||
|
||
try:
|
||
# 执行OCR
|
||
image_path = '/home/admin-root/haotian/康达瑞贝斯机器狗/data_image/001读表图片/3aee64cc1f90d93a5a45979f7b17cb4b_frame_001460.jpg'
|
||
|
||
# 检查图像文件是否存在
|
||
import os
|
||
if not os.path.exists(image_path):
|
||
print(f"Image file not found: {image_path}")
|
||
print("Please provide a valid image path")
|
||
return
|
||
|
||
results = ocr.ocr(image_path)
|
||
|
||
# 打印结果
|
||
print(f"\n=== OCR Results ({len(results)} items) ===")
|
||
for i, result in enumerate(results):
|
||
print(f"\n[{i+1}]")
|
||
print(f"Text: {result['text']}")
|
||
print(f"Confidence: {result['confidence']:.3f}")
|
||
print(f"Detection Score: {result['score']:.3f}")
|
||
print(f"Box: {result['box']}")
|
||
|
||
# 可视化结果
|
||
if results:
|
||
visualize_results(image_path, results)
|
||
else:
|
||
print("No text detected in the image")
|
||
|
||
except Exception as e:
|
||
print(f"OCR processing failed: {e}")
|
||
finally:
|
||
# 释放资源
|
||
ocr.release()
|
||
print("RKNN resources released")
|
||
|
||
def visualize_results(image_path, results):
|
||
"""
|
||
可视化OCR结果
|
||
"""
|
||
image = cv2.imread(image_path)
|
||
|
||
for i, result in enumerate(results):
|
||
box = np.array(result['box'], dtype=np.int32)
|
||
cv2.polylines(image, [box], True, (0, 255, 0), 2)
|
||
|
||
# 在框上方显示文本和序号
|
||
text_display = f"{i+1}: {result['text'][:20]}..."
|
||
cv2.putText(image, text_display,
|
||
(box[0][0], box[0][1] - 10),
|
||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
|
||
|
||
# 保存结果图像
|
||
output_path = 'ocr_result_rknn.jpg'
|
||
cv2.imwrite(output_path, image)
|
||
print(f"Result image saved to: {output_path}")
|
||
|
||
if __name__ == "__main__":
|
||
main() |