470 lines
16 KiB
Python
470 lines
16 KiB
Python
from ultralytics import YOLO
|
||
# from rknn.api import RKNN
|
||
import cv2
|
||
import numpy as np
|
||
import onnxruntime as ort
|
||
import time
|
||
|
||
from app.config.config import yolov8_settings
|
||
|
||
|
||
|
||
|
||
class Yolov8Obj:
    """Wrapper around an Ultralytics ``YOLO`` model loaded from the configured path."""

    def __init__(self):
        """Load the model found at ``yolov8_settings.YOLOV8_MODEL_DIR``."""
        weights_path = yolov8_settings.YOLOV8_MODEL_DIR
        self.model = YOLO(weights_path)

    def detect(self, image_path):
        """
        Run prediction and unpack the boxes of the first result.

        Args:
            image_path: Source handed unchanged to ``self.model.predict``.

        Returns:
            A ``(cls, conf, coords)`` tuple holding the ``tolist()`` form of
            the first result's ``boxes.cls``, ``boxes.conf`` and ``boxes.xyxy``.
        """
        first_boxes = self.model.predict(image_path)[0].boxes
        return (
            first_boxes.cls.tolist(),
            first_boxes.conf.tolist(),
            first_boxes.xyxy.tolist(),
        )
|
||
|
||
|
||
class YOLOv8ONNX:
    """YOLOv8 detector that runs an ONNX model through ONNX Runtime."""

    # Side length used when the model declares a dynamic (non-integer) input axis.
    _FALLBACK_INPUT_SIZE = 640

    def __init__(self, model_path=yolov8_settings.YOLOV8_MODEL_ONNX_DIRS, conf_threshold=0.5, iou_threshold=0.4):
        """
        Create the inference session and read the model's input geometry.

        Args:
            model_path: Path of the ONNX model file.
            conf_threshold: Minimum class score for a detection to be kept.
            iou_threshold: IoU threshold handed to NMS.
        """
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold

        # Create the ONNX Runtime session.
        self.session = ort.InferenceSession(model_path)

        # Names of the first input and first output tensors.
        model_input = self.session.get_inputs()[0]
        self.input_name = model_input.name
        self.output_name = self.session.get_outputs()[0].name

        # The input shape is indexed as [batch, channels, height, width].
        # Dynamic axes are reported as strings/None, so fall back to a fixed
        # size instead of storing a value that cannot be used in arithmetic.
        input_shape = model_input.shape
        self.input_height = self._static_dim(input_shape[2])
        self.input_width = self._static_dim(input_shape[3])

    @classmethod
    def _static_dim(cls, dim):
        """Return ``dim`` if it is a positive integer, else the fallback size."""
        is_integer = isinstance(dim, (int, np.integer)) and not isinstance(dim, bool)
        if is_integer and dim > 0:
            return int(dim)
        return cls._FALLBACK_INPUT_SIZE

    def preprocess(self, image):
        """
        Letterbox an image into the model's input tensor.

        Args:
            image: Input image (BGR, height x width x 3).

        Returns:
            input_image: float32 array of shape [1, 3, input_height, input_width]
                with RGB channel order and values scaled to [0, 1].
            scale: Resize factor that was applied to the original image.
            pad_info: ``(pad_x, pad_y)`` offset of the resized image inside
                the padded canvas.
        """
        h, w = image.shape[:2]

        # Uniform scale so the whole image fits inside the model input.
        scale = min(self.input_height / h, self.input_width / w)
        # Keep at least one pixel per side; int() can round a very thin
        # image down to zero, which cv2.resize rejects.
        new_h = max(1, int(h * scale))
        new_w = max(1, int(w * scale))

        resized_image = cv2.resize(image, (new_w, new_h))

        # Centre the resized image on a grey (114) canvas.
        pad_x = (self.input_width - new_w) // 2
        pad_y = (self.input_height - new_h) // 2
        padded_image = np.full((self.input_height, self.input_width, 3), 114, dtype=np.uint8)
        padded_image[pad_y:pad_y + new_h, pad_x:pad_x + new_w] = resized_image

        # BGR -> RGB, HWC -> CHW, scale to [0, 1], then add the batch axis.
        input_image = padded_image[:, :, ::-1].transpose(2, 0, 1).astype(np.float32) / 255.0
        input_image = np.expand_dims(input_image, axis=0)

        return input_image, scale, (pad_x, pad_y)

    def postprocess(self, outputs, scale, pad_info, original_shape):
        """
        Decode raw model output into boxes in original-image coordinates.

        Args:
            outputs: Raw session outputs; ``outputs[0]`` is expected to be laid
                out as [batch, 4 + num_classes, num_boxes].
            scale: Resize factor returned by ``preprocess``.
            pad_info: ``(pad_x, pad_y)`` returned by ``preprocess``.
            original_shape: Shape of the original image (height, width, ...).

        Returns:
            boxes: Kept boxes as [[x1, y1, x2, y2], ...].
            scores: Confidence of each kept box.
            class_ids: Class index of each kept box.
            All three are empty lists when nothing is detected, numpy arrays
            otherwise.
        """
        predictions = outputs[0]

        # [batch, 4 + num_classes, num_boxes] -> [batch, num_boxes, 4 + num_classes]
        if len(predictions.shape) == 3:
            predictions = predictions.transpose(0, 2, 1)

        predictions = predictions[0]  # drop the batch axis

        boxes = predictions[:, :4]   # [x_center, y_center, width, height]
        scores = predictions[:, 4:]  # per-class scores

        # Best class and its score for every candidate box.
        class_ids = np.argmax(scores, axis=1)
        confidences = np.max(scores, axis=1)

        # Drop low-confidence candidates.
        valid_indices = confidences > self.conf_threshold
        valid_boxes = boxes[valid_indices]
        valid_confidences = confidences[valid_indices]
        valid_class_ids = class_ids[valid_indices]

        if len(valid_boxes) == 0:
            return [], [], []

        # Centre/size -> corner format.
        x_center, y_center = valid_boxes[:, 0], valid_boxes[:, 1]
        half_w, half_h = valid_boxes[:, 2] / 2, valid_boxes[:, 3] / 2
        converted_boxes = np.stack(
            [x_center - half_w, y_center - half_h, x_center + half_w, y_center + half_h],
            axis=1,
        )

        # Undo the letterbox: remove the padding, then the scaling.
        pad_x, pad_y = pad_info
        converted_boxes[:, [0, 2]] = (converted_boxes[:, [0, 2]] - pad_x) / scale
        converted_boxes[:, [1, 3]] = (converted_boxes[:, [1, 3]] - pad_y) / scale

        # Clamp to the original image.
        h, w = original_shape[:2]
        converted_boxes[:, [0, 2]] = np.clip(converted_boxes[:, [0, 2]], 0, w)
        converted_boxes[:, [1, 3]] = np.clip(converted_boxes[:, [1, 3]], 0, h)

        # cv2.dnn.NMSBoxes interprets every box as [x, y, width, height], so
        # hand it a converted copy rather than the corner-format boxes.
        nms_rects = converted_boxes.copy()
        nms_rects[:, 2] -= nms_rects[:, 0]
        nms_rects[:, 3] -= nms_rects[:, 1]

        indices = cv2.dnn.NMSBoxes(
            nms_rects.tolist(),
            valid_confidences.tolist(),
            self.conf_threshold,
            self.iou_threshold,
        )

        if len(indices) > 0:
            # The result may be a flat or an N x 1 index container depending
            # on the OpenCV version; normalise it to a flat array.
            indices = np.asarray(indices).flatten()
            return converted_boxes[indices], valid_confidences[indices], valid_class_ids[indices]

        return [], [], []

    def detect(self, image):
        """
        Run detection on a single image.

        Args:
            image: Input image (BGR).

        Returns:
            boxes: Detected boxes as [x1, y1, x2, y2].
            scores: Confidence of each box.
            class_ids: Class index of each box.
        """
        input_image, scale, pad_info = self.preprocess(image)
        outputs = self.session.run([self.output_name], {self.input_name: input_image})
        return self.postprocess(outputs, scale, pad_info, image.shape)
|
||
|
||
|
||
class YOLOv8RKNN:
    """YOLOv8 detector backed by an RKNN model (runtime initialised for ``rk3588``)."""

    def __init__(self, model_path, input_size=(640, 640)):
        """
        Load the RKNN model and initialise its runtime.

        Args:
            model_path: Path of the ``.rknn`` model file.
            input_size: Model input size as ``(width, height)``.

        Raises:
            ImportError: If ``rknn.api`` cannot be imported.
            RuntimeError: If the model cannot be loaded or the runtime cannot
                be initialised.
        """
        # Imported here rather than at module level so the other detectors in
        # this module stay importable on machines without the RKNN toolkit.
        from rknn.api import RKNN

        self.model_path = model_path
        self.input_size = input_size
        self.rknn = RKNN()

        # Class names; replace these placeholders with the real labels.
        self.class_names = ['class1', 'class2']

        # Fail fast: a detector whose model did not load cannot run inference.
        if not self.load_model():
            self.rknn.release()
            raise RuntimeError(f"Failed to initialise RKNN model: {model_path}")

    def load_model(self):
        """
        Load the RKNN model file and initialise the runtime.

        Returns:
            True on success, False if either step reports a non-zero status.
        """
        print("Loading RKNN model...")
        ret = self.rknn.load_rknn(self.model_path)
        if ret != 0:
            print("Load RKNN model failed!")
            return False

        print("Init RKNN runtime...")
        ret = self.rknn.init_runtime(target='rk3588', device_id=None, perf_debug=False, eval_mem=False)
        if ret != 0:
            print("Init RKNN runtime failed!")
            return False

        print("RKNN model loaded successfully!")
        return True

    def preprocess(self, image):
        """
        Letterbox an image to the model input size.

        The original size, scale and padding offsets are stored on the
        instance because ``postprocess`` needs them to map boxes back.

        Args:
            image: Input image (height x width x 3).

        Returns:
            uint8 image of shape (input height, input width, 3), padded with 114.
        """
        # NOTE(review): unlike YOLOv8ONNX.preprocess, no BGR -> RGB conversion
        # and no batch axis are applied here -- confirm this matches how the
        # RKNN model was converted.
        self.orig_height, self.orig_width = image.shape[:2]

        # Uniform scale that keeps the aspect ratio.
        scale = min(self.input_size[0] / self.orig_width, self.input_size[1] / self.orig_height)
        # Keep at least one pixel per side; int() can round a very thin
        # image down to zero, which cv2.resize rejects.
        new_width = max(1, int(self.orig_width * scale))
        new_height = max(1, int(self.orig_height * scale))

        resized = cv2.resize(image, (new_width, new_height))

        # Grey canvas with the resized image centred on it.
        input_image = np.full((self.input_size[1], self.input_size[0], 3), 114, dtype=np.uint8)
        y_offset = (self.input_size[1] - new_height) // 2
        x_offset = (self.input_size[0] - new_width) // 2
        input_image[y_offset:y_offset + new_height, x_offset:x_offset + new_width] = resized

        # Remember the transform for postprocess().
        self.scale = scale
        self.x_offset = x_offset
        self.y_offset = y_offset

        return input_image

    def postprocess(self, outputs, conf_threshold=0.5, nms_threshold=0.4):
        """
        Decode raw output, map boxes back to the original image and run NMS.

        Must be called after ``preprocess``, which stores the scale, offsets
        and original size used here.

        Args:
            outputs: Raw inference outputs; ``outputs[0][0]`` is expected to be
                laid out as [4 + num_classes, num_boxes] with rows
                ``x, y, w, h, class scores...``.
            conf_threshold: Minimum class score for a box to be kept.
            nms_threshold: IoU threshold handed to NMS.

        Returns:
            ``(boxes, scores, class_ids)`` as numpy arrays, boxes being
            [x1, y1, x2, y2]; three empty arrays when nothing is detected.
        """
        # [4 + num_classes, num_boxes] -> [num_boxes, 4 + num_classes]
        predictions = np.asarray(outputs[0][0]).transpose()

        # One score column per configured class name (two by default).
        class_scores = predictions[:, 4:4 + len(self.class_names)]
        class_ids = np.argmax(class_scores, axis=1)
        confidences = np.max(class_scores, axis=1)

        keep = confidences >= conf_threshold
        if not np.any(keep):
            return np.array([]), np.array([]), np.array([])

        kept = predictions[keep]
        scores = confidences[keep]
        class_ids = class_ids[keep]

        # Centre/size -> corner format.
        half_w, half_h = kept[:, 2] / 2, kept[:, 3] / 2
        boxes = np.stack(
            [kept[:, 0] - half_w, kept[:, 1] - half_h, kept[:, 0] + half_w, kept[:, 1] + half_h],
            axis=1,
        )

        # Undo the letterbox, then clamp to the original image.
        boxes[:, [0, 2]] = (boxes[:, [0, 2]] - self.x_offset) / self.scale
        boxes[:, [1, 3]] = (boxes[:, [1, 3]] - self.y_offset) / self.scale
        boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, self.orig_width)
        boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, self.orig_height)

        # cv2.dnn.NMSBoxes interprets every box as [x, y, width, height], so
        # hand it a converted copy rather than the corner-format boxes.
        nms_rects = boxes.copy()
        nms_rects[:, 2] -= nms_rects[:, 0]
        nms_rects[:, 3] -= nms_rects[:, 1]

        indices = cv2.dnn.NMSBoxes(nms_rects.tolist(), scores.tolist(), conf_threshold, nms_threshold)

        if len(indices) > 0:
            # The result may be a flat or an N x 1 index container depending
            # on the OpenCV version; normalise it to a flat array.
            indices = np.asarray(indices).flatten()
            return boxes[indices], scores[indices], class_ids[indices]

        return np.array([]), np.array([]), np.array([])

    def detect(self, image, conf_threshold=0.5, nms_threshold=0.4):
        """
        Run detection on one image.

        Args:
            image: Input image.
            conf_threshold: Minimum class score for a box to be kept.
            nms_threshold: IoU threshold handed to NMS.

        Returns:
            ``(boxes, scores, class_ids, inference_time)`` where
            ``inference_time`` is the duration in seconds of the inference
            call alone.
        """
        input_image = self.preprocess(image)

        start_time = time.perf_counter()
        outputs = self.rknn.inference(inputs=[input_image])
        inference_time = time.perf_counter() - start_time

        boxes, scores, class_ids = self.postprocess(outputs, conf_threshold, nms_threshold)

        return boxes, scores, class_ids, inference_time

    def draw_detections(self, image, boxes, scores, class_ids):
        """
        Draw boxes and ``name: score`` labels onto ``image`` in place.

        Args:
            image: Image to draw on; it is modified and returned.
            boxes: Boxes as [x1, y1, x2, y2].
            scores: Confidence of each box.
            class_ids: Index into ``self.class_names`` for each box.

        Returns:
            The same ``image`` object.
        """
        for box, score, class_id in zip(boxes, scores, class_ids):
            x1, y1, x2, y2 = map(int, box)
            class_id = int(class_id)

            # Bounding box.
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Filled label background just above the box, then the text.
            label = f"{self.class_names[class_id]}: {score:.2f}"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
            cv2.rectangle(image, (x1, y1 - label_size[1] - 10),
                          (x1 + label_size[0], y1), (0, 255, 0), -1)
            cv2.putText(image, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2)

        return image

    def release(self):
        """Release the RKNN handle if one is held."""
        if self.rknn:
            self.rknn.release()
|
||
|
||
def main():
    """
    Interactive demo for the RKNN detector.

    Prompts for a mode: "1" runs detection on one image path read from stdin
    and writes the annotated result to ``xxxxxxx.jpg``; "2" runs detection on
    frames from camera 0 until ``q`` is pressed. The detector is released on
    exit, including when an error occurs.
    """
    model_path = "/home/orangepi/Desktop/康达机器狗/model_rknn/yolov8_20250820.rknn"
    detector = YOLOv8RKNN(model_path)

    def test_image(image_path):
        """Detect objects in a single image file and save the annotated copy."""
        image = cv2.imread(image_path)
        if image is None:
            print(f"Cannot load image: {image_path}")
            return

        boxes, scores, class_ids, inference_time = detector.detect(image)

        print(f"Inference time: {inference_time*1000:.2f}ms")
        print(f"Detected {len(boxes)} objects")

        result_image = detector.draw_detections(image, boxes, scores, class_ids)

        cv2.imwrite("xxxxxxx.jpg", result_image)

    def test_camera():
        """Run detection on live frames from the default camera."""
        cap = cv2.VideoCapture(0)  # default camera
        if not cap.isOpened():
            print("Cannot open camera")
            return

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                boxes, scores, class_ids, inference_time = detector.detect(frame)
                result_frame = detector.draw_detections(frame, boxes, scores, class_ids)

                # FPS is derived from the inference time alone.
                fps = 1.0 / inference_time if inference_time > 0 else 0
                cv2.putText(result_frame, f"FPS: {fps:.1f}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

                cv2.imshow("Real-time Detection", result_frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Free the camera and windows even if detection raises mid-loop.
            cap.release()
            cv2.destroyAllWindows()

    try:
        mode = input("选择模式 (1: 图片检测, 2: 摄像头实时检测): ")

        if mode == "1":
            image_path = input("输入图片路径: ")
            test_image(image_path)
        elif mode == "2":
            test_camera()
        else:
            print("无效选择")
    finally:
        # Always release the detector, including on errors or Ctrl-C.
        detector.release()
|
||
|
||
def draw_detections(image, boxes, scores, class_ids, class_names=None):
    """
    Render detections onto a copy of an image.

    Args:
        image: Input image; it is copied, not modified.
        boxes: Boxes as [x1, y1, x2, y2].
        scores: Confidence of each box.
        class_ids: Class index of each box.
        class_names: Optional list of class names; when missing, or when an
            index is past its end, the label falls back to ``Class <id>``.

    Returns:
        The annotated copy of ``image``.
    """
    canvas = image.copy()
    box_color = (0, 255, 0)
    text_color = (0, 0, 0)

    for box, score, class_id in zip(boxes, scores, class_ids):
        left, top, right, bottom = (int(value) for value in box)

        # Outline of the detection.
        cv2.rectangle(canvas, (left, top), (right, bottom), box_color, 2)

        # Use the class name when one is available for this index.
        label = (
            f"{class_names[class_id]}: {score:.2f}"
            if class_names and class_id < len(class_names)
            else f"Class {class_id}: {score:.2f}"
        )

        # Filled strip just above the box, sized to the label text.
        text_w, text_h = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
        strip_top_left = (left, top - text_h - 10)
        strip_bottom_right = (left + text_w, top)
        cv2.rectangle(canvas, strip_top_left, strip_bottom_right, box_color, -1)

        # Label text on top of the strip.
        cv2.putText(canvas, label, (left, top - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color, 2)

    return canvas