import math
import os
import time

import cv2
import numpy as np
from rknn.api import RKNN


class PaddleOCRRKNN:
    """PaddleOCR text detection + recognition pipeline running on RKNN models."""

    def __init__(self, det_model_path, rec_model_path, target='rk3588'):
        """
        Initialize the RKNN inference runtimes for detection and recognition.

        Args:
            det_model_path: Path to the detection model (det.rknn).
            rec_model_path: Path to the recognition model (rec.rknn).
            target: Target platform ('rk3588', 'rk3566', 'rk3568', 'rv1103',
                'rv1106', 'simulator').

        Raises:
            RuntimeError: If either model fails to load or its runtime fails
                to initialize. Any runtime that was already created is
                released before the error propagates.
        """
        self.target = target

        self.det_rknn = self._load_model(det_model_path, 'detection', target)
        try:
            self.rec_rknn = self._load_model(rec_model_path, 'recognition', target)
        except Exception:
            # Do not leak the detection runtime when recognition setup fails.
            self.det_rknn.release()
            raise

        # Character set (adjust to match your model): index 0 is the CTC
        # blank, followed by the printable ASCII characters '!'..'~' and then
        # the code points 19968..40869 (Chinese characters).
        self.character = (
            ['blank']
            + [chr(i) for i in range(33, 127)]
            + [chr(i) for i in range(19968, 40870)]
        )

    @staticmethod
    def _load_model(model_path, name, target):
        """
        Load one RKNN model and initialize its runtime.

        Args:
            model_path: Path to the .rknn file.
            name: Lower-case label used in log messages ('detection' or
                'recognition').
            target: Target platform passed to ``init_runtime``.

        Returns:
            The initialized RKNN object.

        Raises:
            RuntimeError: If loading or runtime initialization returns a
                non-zero code. Raised (rather than calling ``exit``) so that
                callers can catch the failure and fall back.
        """
        rknn = RKNN(verbose=True)

        ret = rknn.load_rknn(model_path)
        if ret != 0:
            print(f'Load {name} RKNN model failed! Error code: {ret}')
            print('Please check if the model file exists and is valid')
            rknn.release()
            raise RuntimeError(
                f'Load {name} RKNN model failed! Error code: {ret}')

        # Select the runtime platform.
        print(f'Initializing {name} model on target: {target}')
        ret = rknn.init_runtime(
            target=target,
            device_id=None,
            perf_debug=False,
            eval_mem=False,
            async_mode=False,
            core_mask=RKNN.NPU_CORE_AUTO,  # let the runtime pick the NPU core
        )
        if ret != 0:
            print(f'Init {name} runtime failed! Error code: {ret}')
            print('Please check if you are running on the correct platform')
            if target == 'simulator':
                print('Note: Simulator mode requires different setup')
            rknn.release()
            raise RuntimeError(
                f'Init {name} runtime failed! Error code: {ret}')

        print(f'{name.capitalize()} model loaded successfully')
        return rknn

    def preprocess_det(self, img, input_shape=(640, 640)):
        """
        Letterbox an image for the detection model.

        The image is resized with its aspect ratio preserved, then centered
        on a gray (114) canvas of size ``input_shape``.

        Args:
            img: Input image as an (H, W, 3) array.
            input_shape: Target (height, width) of the model input.

        Returns:
            Tuple ``(padded_img, ratio, (top, left))`` where ``ratio`` is the
            scale factor applied and ``(top, left)`` is the padding offset.
        """
        h, w, _ = img.shape
        target_h, target_w = input_shape

        # Scale factor that keeps the aspect ratio and fits inside the target.
        ratio = min(target_h / h, target_w / w)

        # Clamp to 1 pixel: extreme aspect ratios would otherwise truncate a
        # dimension to 0, which cv2.resize rejects.
        new_h = max(1, int(h * ratio))
        new_w = max(1, int(w * ratio))

        resized_img = cv2.resize(img, (new_w, new_h))

        # Gray canvas of the target size.
        padded_img = np.full((target_h, target_w, 3), 114, dtype=np.uint8)

        # Center the resized image on the canvas.
        top = (target_h - new_h) // 2
        left = (target_w - new_w) // 2
        padded_img[top:top + new_h, left:left + new_w] = resized_img

        # NOTE: the image is returned as uint8 (the usual input for a model
        # quantized with uint8 inputs). If your model expects float input,
        # convert to float32 and apply mean/std normalization here instead.
        return padded_img, ratio, (top, left)

    def preprocess_rec(self, img, input_shape=(320, 48)):
        """
        Resize and pad a text crop for the recognition model.

        Args:
            img: Cropped text image as an (H, W, 3) array.
            input_shape: Target (width, height) of the model input. Note that
                width comes first.

        Returns:
            A uint8 image of shape (height, width, 3) with the resized crop in
            the top-left corner and black padding elsewhere.
        """
        target_w, target_h = input_shape  # width first
        h, w = img.shape[:2]

        # Scale factor that keeps the aspect ratio and fits inside the target.
        ratio = min(target_h / h, target_w / w)

        # Clamp to 1 pixel so very thin crops do not produce a zero dimension.
        new_h = max(1, int(h * ratio))
        new_w = max(1, int(w * ratio))

        resized_image = cv2.resize(img, (new_w, new_h))

        # Black canvas; the crop is left-aligned in the top-left corner.
        padded_image = np.zeros((target_h, target_w, 3), dtype=np.uint8)
        padded_image[:new_h, :new_w] = resized_image

        # NOTE: returned as uint8. If your model expects float input, convert
        # to float32 and apply mean/std normalization here instead.
        return padded_image

    def postprocess_det_boxes(self, dt_boxes, ratio, padding_info, ori_shape):
        """
        Map detected boxes from model-input space back to the original image.

        Args:
            dt_boxes: Array of shape (N, 4, 2) with (x, y) corner coordinates
                in the letterboxed input space.
            ratio: Scale factor returned by ``preprocess_det``.
            padding_info: ``(top, left)`` padding returned by
                ``preprocess_det``.
            ori_shape: ``(height, width)`` of the original image.

        Returns:
            A new int32 array of shape (N, 4, 2) clipped to the original image
            bounds, or None if ``dt_boxes`` is None or empty. The input array
            is not modified.
        """
        if dt_boxes is None or len(dt_boxes) == 0:
            return None

        ori_h, ori_w = ori_shape
        top, left = padding_info

        # Work on a float copy: the incoming boxes are int16, and dividing in
        # place would wrap around for coordinates beyond the int16 range.
        boxes = np.array(dt_boxes, dtype=np.float64)

        # Undo the padding and scaling, then clip to the original image.
        boxes[:, :, 0] = np.clip((boxes[:, :, 0] - left) / ratio, 0, ori_w)
        boxes[:, :, 1] = np.clip((boxes[:, :, 1] - top) / ratio, 0, ori_h)

        return boxes.astype(np.int32)

    def boxes_from_bitmap(self, pred, bitmap, dest_width, dest_height,
                          max_candidates=1000, box_thresh=0.6):
        """
        Extract text boxes from a binarized segmentation map.

        Args:
            pred: 2-D score map used to score each candidate box.
            bitmap: 2-D binary map whose contours are the candidates.
            dest_width: Width of the coordinate space for returned boxes.
            dest_height: Height of the coordinate space for returned boxes.
            max_candidates: Maximum number of contours to examine.
            box_thresh: Minimum mean score for a box to be kept.

        Returns:
            Tuple ``(boxes, scores)``: an int16 array of shape (N, 4, 2) and a
            list of N scores.
        """
        bitmap = bitmap.astype(np.uint8)
        height, width = bitmap.shape

        contours, _ = cv2.findContours(
            bitmap, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
        num_contours = min(len(contours), max_candidates)

        boxes = []
        scores = []
        for contour in contours[:num_contours]:
            points, sside = self.get_mini_boxes(contour)
            if sside < 5:
                continue

            points = np.array(points)
            score = self.box_score_fast(pred, points.reshape(-1, 2))
            if box_thresh > score:
                continue

            # Expand the box, then re-fit a minimum-area rectangle around it.
            box = self.unclip(points, 1.5).reshape(-1, 1, 2)
            box, sside = self.get_mini_boxes(box)
            if sside < 5 + 2:
                continue

            # Rescale from bitmap coordinates to the destination space.
            box = np.array(box)
            box[:, 0] = np.clip(box[:, 0] / width * dest_width, 0, dest_width)
            box[:, 1] = np.clip(box[:, 1] / height * dest_height, 0, dest_height)

            boxes.append(box.astype(np.int16))
            scores.append(score)

        return np.array(boxes), scores

    def get_mini_boxes(self, contour):
        """
        Compute the minimum-area bounding rectangle of a contour.

        Args:
            contour: Point set accepted by ``cv2.minAreaRect``.

        Returns:
            Tuple ``(box, short_side)`` where ``box`` is a list of four corner
            points and ``short_side`` is the rectangle's shorter side length.
            The corners are sorted by x; the two left-most are ordered by y to
            give the first and last entries, the two right-most give the
            second and third.
        """
        bounding_box = cv2.minAreaRect(contour)
        points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda p: p[0])

        # Left pair: the point with the smaller y comes first, the other last.
        if points[1][1] > points[0][1]:
            index_1, index_4 = 0, 1
        else:
            index_1, index_4 = 1, 0

        # Right pair: the point with the smaller y comes second.
        if points[3][1] > points[2][1]:
            index_2, index_3 = 2, 3
        else:
            index_2, index_3 = 3, 2

        box = [points[index_1], points[index_2],
               points[index_3], points[index_4]]
        return box, min(bounding_box[1])

    def box_score_fast(self, bitmap, _box):
        """
        Compute the mean of ``bitmap`` inside a polygon.

        Args:
            bitmap: 2-D score map.
            _box: Array of shape (K, 2) with polygon vertices; not modified.

        Returns:
            Mean value of ``bitmap`` over the pixels covered by the polygon.
        """
        h, w = bitmap.shape[:2]
        box = _box.copy()

        # Axis-aligned bounds of the polygon, clamped to the map.
        xmin = np.clip(np.floor(box[:, 0].min()).astype(int), 0, w - 1)
        xmax = np.clip(np.ceil(box[:, 0].max()).astype(int), 0, w - 1)
        ymin = np.clip(np.floor(box[:, 1].min()).astype(int), 0, h - 1)
        ymax = np.clip(np.ceil(box[:, 1].max()).astype(int), 0, h - 1)

        # Rasterize the polygon into a mask local to the bounding region.
        mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
        box[:, 0] = box[:, 0] - xmin
        box[:, 1] = box[:, 1] - ymin
        cv2.fillPoly(mask, box.reshape(1, -1, 2).astype(np.int32), 1)

        return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0]

    def unclip(self, box, unclip_ratio):
        """
        Expand a text box outward by an offset based on its area and perimeter.

        Requires the optional ``shapely`` and ``pyclipper`` packages. This is
        best-effort: if they are unavailable, or the geometry cannot be
        offset, the box is returned unchanged.

        Args:
            box: Array of shape (4, 2) with the box corners.
            unclip_ratio: Expansion factor.

        Returns:
            Array of polygon points for the expanded box, or ``box`` itself.
        """
        try:
            from shapely.geometry import Polygon
            import pyclipper
        except ImportError:
            # Optional dependencies missing: skip the expansion.
            return box

        try:
            poly = Polygon(box)
            distance = poly.area * unclip_ratio / poly.length
            offset = pyclipper.PyclipperOffset()
            offset.AddPath(box, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
            expanded = offset.Execute(distance)
        except Exception:
            # Deliberate best-effort (e.g. zero-length perimeter): keep the
            # original box rather than abort detection.
            return box

        if not expanded:
            return box
        return np.array(expanded[0])

    def decode_rec_result(self, preds_prob):
        """
        Greedy CTC decoding of the recognition output.

        Args:
            preds_prob: 2-D array of per-step class scores; argmax is taken
                along axis 1.

        Returns:
            Tuple ``(text, conf)`` where ``conf`` is the mean of the maximum
            scores of the emitted characters, or 0.0 if nothing was emitted.
        """
        preds_idx = np.argmax(preds_prob, axis=1)
        preds_prob_max = np.max(preds_prob, axis=1)

        last_idx = 0
        preds_text = []
        preds_conf = []
        for i, idx in enumerate(preds_idx):
            # Emit on a change of class, skipping the blank (index 0) and any
            # index outside the character table.
            if idx != last_idx and idx != 0 and idx < len(self.character):
                preds_text.append(self.character[idx])
                preds_conf.append(preds_prob_max[i])
            last_idx = idx

        text = ''.join(preds_text)
        conf = np.mean(preds_conf) if preds_conf else 0.0
        return text, conf

    def detect_text(self, image):
        """
        Run text detection on an image.

        Args:
            image: Input image as an (H, W, 3) array.

        Returns:
            Tuple ``(boxes, scores)``. ``boxes`` holds (N, 4, 2) corner
            coordinates in the original image space (an empty array when
            nothing is detected); ``scores`` is a list of N box scores.
        """
        ori_h, ori_w = image.shape[:2]

        det_img, ratio, padding_info = self.preprocess_det(image)
        det_h, det_w = det_img.shape[:2]

        start_time = time.time()
        outputs = self.det_rknn.inference(inputs=[det_img])
        det_time = time.time() - start_time
        print(f"Detection inference time: {det_time:.3f}s")

        # Use the first output and reduce it to a 2-D map.
        det_output = outputs[0]
        if len(det_output.shape) == 4:
            mask = det_output[0, 0, :, :]
        elif len(det_output.shape) == 3:
            mask = det_output[0, :, :]
        else:
            mask = det_output

        # Binarize the score map.
        threshold = 0.3
        bitmap = (mask > threshold).astype(np.uint8) * 255

        # Box coordinates are produced in the letterboxed input space.
        boxes, scores = self.boxes_from_bitmap(mask, bitmap, det_w, det_h)

        # Map the coordinates back to the original image.
        if len(boxes) > 0:
            boxes = self.postprocess_det_boxes(
                boxes, ratio, padding_info, (ori_h, ori_w))

        return boxes, scores

    def recognize_text(self, image):
        """
        Run text recognition on a cropped text image.

        Args:
            image: Cropped text image as an (H, W, 3) array.

        Returns:
            Tuple ``(text, conf)`` as produced by ``decode_rec_result``.
        """
        rec_img = self.preprocess_rec(image)

        start_time = time.time()
        outputs = self.rec_rknn.inference(inputs=[rec_img])
        rec_time = time.time() - start_time
        print(f"Recognition inference time: {rec_time:.3f}s")

        rec_output = outputs[0]
        # Drop the leading batch dimension when present.
        rec_result = rec_output[0] if len(rec_output.shape) == 3 else rec_output

        return self.decode_rec_result(rec_result)

    @staticmethod
    def _crop_size(points):
        """Return the (width, height) of the rectified crop for four corners."""
        width = int(max(np.linalg.norm(points[0] - points[1]),
                        np.linalg.norm(points[2] - points[3])))
        height = int(max(np.linalg.norm(points[0] - points[3]),
                         np.linalg.norm(points[1] - points[2])))
        return width, height

    def get_rotate_crop_image(self, img, points):
        """
        Crop and rectify the quadrilateral defined by four corner points.

        Args:
            img: Source image.
            points: float32 array of shape (4, 2) with the box corners.

        Returns:
            The perspective-corrected crop. Crops whose height is at least
            1.5 times their width are rotated by 90 degrees.
        """
        img_crop_width, img_crop_height = self._crop_size(points)
        pts_std = np.float32([[0, 0],
                              [img_crop_width, 0],
                              [img_crop_width, img_crop_height],
                              [0, img_crop_height]])
        M = cv2.getPerspectiveTransform(points, pts_std)
        dst_img = cv2.warpPerspective(
            img, M, (img_crop_width, img_crop_height),
            borderMode=cv2.BORDER_REPLICATE,
            flags=cv2.INTER_CUBIC)

        dst_img_height, dst_img_width = dst_img.shape[0:2]
        if dst_img_height * 1.0 / dst_img_width >= 1.5:
            dst_img = np.rot90(dst_img)
        return dst_img

    def ocr(self, image_path):
        """
        Run the full OCR pipeline (detection then recognition) on an image.

        Args:
            image_path: Path of the image file to process.

        Returns:
            List of dicts with keys 'text', 'confidence', 'box' and 'score',
            one per recognized box whose confidence exceeds 0.5. Empty when
            the image cannot be read or no text is detected.
        """
        image = cv2.imread(image_path)
        if image is None:
            print(f"Cannot read image: {image_path}")
            return []

        print(f"Processing image: {image.shape}")

        # 1. Text detection
        dt_boxes, scores = self.detect_text(image)

        if dt_boxes is None or len(dt_boxes) == 0:
            print("No text boxes detected")
            return []

        print(f"Detected {len(dt_boxes)} text boxes")

        # 2. Text recognition
        ocr_results = []
        for i, box in enumerate(dt_boxes):
            box_points = box.astype(np.float32)

            # A box collapsed to zero width or height (e.g. after clipping to
            # the image border) cannot be warped; skip it rather than abort
            # the whole image.
            crop_w, crop_h = self._crop_size(box_points)
            if crop_w < 1 or crop_h < 1:
                continue

            crop_img = self.get_rotate_crop_image(image, box_points)

            text, conf = self.recognize_text(crop_img)

            if conf > 0.5:  # confidence filter
                ocr_results.append({
                    'text': text,
                    'confidence': conf,
                    'box': box.tolist(),
                    'score': scores[i] if i < len(scores) else 0.0
                })
                print(f"Box {i}: {text} (conf: {conf:.3f})")

        return ocr_results

    def release(self):
        """Release the RKNN resources held by this instance."""
        if hasattr(self, 'det_rknn'):
            self.det_rknn.release()
        if hasattr(self, 'rec_rknn'):
            self.rec_rknn.release()


# Usage example
def main():
    """Run OCR on a sample image, print the results, and save a visualization."""
    print("Initializing PaddleOCR RKNN...")

    # Choose the target according to your hardware platform:
    # 'rk3588' - RK3588 (default)
    # 'rk3566' - RK3566
    # 'rk3568' - RK3568
    # 'rv1103' - RV1103
    # 'rv1106' - RV1106
    # 'simulator' - simulator mode (for development and debugging)
    try:
        ocr = PaddleOCRRKNN(
            '/home/admin-root/haotian/康达瑞贝斯机器狗/det_shape.rknn',
            '/home/admin-root/haotian/康达瑞贝斯机器狗/rec_shape.rknn',
            target='rk3588')
    except Exception as e:
        print(f"Failed to initialize RKNN models: {e}")
        print("Trying alternative initialization...")

        # Fall back to another configuration.
        try:
            ocr = PaddleOCRRKNN('det.rknn', 'rec.rknn', target='simulator')
        except Exception as e2:
            print(f"Alternative initialization also failed: {e2}")
            return

    try:
        image_path = '/home/admin-root/haotian/康达瑞贝斯机器狗/data_image/001读表图片/3aee64cc1f90d93a5a45979f7b17cb4b_frame_001460.jpg'

        # Make sure the image file exists before running OCR.
        if not os.path.exists(image_path):
            print(f"Image file not found: {image_path}")
            print("Please provide a valid image path")
            return

        results = ocr.ocr(image_path)

        print(f"\n=== OCR Results ({len(results)} items) ===")
        for i, result in enumerate(results):
            print(f"\n[{i+1}]")
            print(f"Text: {result['text']}")
            print(f"Confidence: {result['confidence']:.3f}")
            print(f"Detection Score: {result['score']:.3f}")
            print(f"Box: {result['box']}")

        # Visualize the results.
        if results:
            visualize_results(image_path, results)
        else:
            print("No text detected in the image")

    except Exception as e:
        print(f"OCR processing failed: {e}")

    finally:
        ocr.release()
        print("RKNN resources released")


def visualize_results(image_path, results):
    """
    Draw OCR boxes and labels on the image and save it to disk.

    Args:
        image_path: Path of the image the results were computed from.
        results: List of result dicts as returned by ``PaddleOCRRKNN.ocr``.
    """
    image = cv2.imread(image_path)
    if image is None:
        print(f"Cannot read image: {image_path}")
        return

    for i, result in enumerate(results):
        box = np.array(result['box'], dtype=np.int32)
        cv2.polylines(image, [box], True, (0, 255, 0), 2)

        # Label above the box: index plus the text, with an ellipsis only
        # when the text was actually truncated to 20 characters.
        text = result['text']
        label = text if len(text) <= 20 else f"{text[:20]}..."
        text_display = f"{i+1}: {label}"
        cv2.putText(image, text_display,
                    (int(box[0][0]), int(box[0][1]) - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)

    output_path = 'ocr_result_rknn.jpg'
    cv2.imwrite(output_path, image)
    print(f"Result image saved to: {output_path}")


if __name__ == "__main__":
    main()