This commit is contained in:
haotian 2025-01-13 11:07:23 +08:00
parent 336ed3dadc
commit 2a26c0b973
30 changed files with 365 additions and 23 deletions

0
__init__.py Normal file
View File

Binary file not shown.

View File

@ -17,6 +17,11 @@
├── utils/
│ └── 006_utils.py # 工具函数
├── tests/ # 测试各个程序
│ ├── __init__.py
│ ├── 008_test_person_detector.py
│ ├── 009_test_distance_estimator.py
│ └── 010_test_api_server.py
├── 007_main.py # 主程序
└── requirements.txt # 项目依赖

47
main.py Normal file
View File

@ -0,0 +1,47 @@
import yaml
import uvicorn
from src. import RTSPCamera
from src.person_detector import PersonDetector
from src.distance_estimator import DistanceEstimator
from src.api_server import DistanceAPI, app
def load_config():
with open('config/config.yaml', 'r') as f:
return yaml.safe_load(f)
def main():
config = load_config()
# 初始化摄像头
camera = RTSPCamera(
config['camera']['rtsp_url'],
config['camera']['fps']
)
camera.start()
# 初始化检测器
detector = PersonDetector(
config['model']['person_detection']['model_name'],
config['model']['person_detection']['confidence_threshold']
)
# 初始化距离估算器
estimator = DistanceEstimator(
config['model']['distance_estimation']['focal_length'],
config['model']['distance_estimation']['sensor_height'],
config['model']['distance_estimation']['average_person_height']
)
# 初始化API
distance_api = DistanceAPI(camera, detector, estimator)
app.distance_api = distance_api
# 启动API服务器
uvicorn.run(
app,
host=config['api']['host'],
port=config['api']['port']
)
if __name__ == "__main__":
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 96 KiB

View File

@ -1,23 +0,0 @@
from ultralytics import YOLO
import numpy as np
class PersonDetector:
def __init__(self, model_path, conf_threshold=0.5):
self.model = YOLO(model_path)
self.conf_threshold = conf_threshold
def detect(self, frame):
results = self.model(frame, classes=0) # 只检测人类
persons = []
for result in results:
boxes = result.boxes
for box in boxes:
if box.conf > self.conf_threshold:
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
persons.append({
'bbox': (int(x1), int(y1), int(x2), int(y2)),
'confidence': float(box.conf)
})
return persons

0
src/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

51
src/person_detector.py Normal file
View File

@ -0,0 +1,51 @@
from ultralytics import YOLO
import numpy as np
from ..utils.utils import VisualizationUtils
class PersonDetector:
def __init__(self, model_path, conf_threshold=0.5):
self.model = YOLO(model_path)
self.conf_threshold = conf_threshold
self.vis_utils = VisualizationUtils()
def detect(self, frame, save_visualization=False, save_path="output"):
"""
检测图像中的人物并可选择保存可视化结果
Args:
frame: 输入图像
save_visualization: 是否保存可视化结果
save_path: 可视化结果保存路径
Returns:
persons: 检测到的人物列表
vis_path: 如果save_visualization为True返回保存的文件路径否则为None
"""
results = self.model(frame, classes=0) # 只检测人类
persons = []
for result in results:
boxes = result.boxes
for box in boxes:
if box.conf > self.conf_threshold:
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
persons.append({
'bbox': (int(x1), int(y1), int(x2), int(y2)),
'confidence': float(box.conf)
})
vis_path = None
if save_visualization and persons:
# 绘制检测结果
vis_image = self.vis_utils.draw_detection_results(
frame,
persons,
show_confidence=True
)
# 保存结果
vis_path = self.vis_utils.save_detection_image(
vis_image,
save_path
)
return persons, vis_path

0
tests/__init__.py Normal file
View File

Binary file not shown.

40
tests/test_api_server.py Normal file
View File

@ -0,0 +1,40 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
class Distance(BaseModel):
distance_mm: float
confidence: float
detection_image_path: Optional[str] = None
class DistanceAPI:
def __init__(self, camera, detector, estimator):
self.camera = camera
self.detector = detector
self.estimator = estimator
async def get_distances(self) -> List[Distance]:
frame = self.camera.get_frame()
if frame is None:
raise HTTPException(status_code=500, detail="No frame available")
# 检测人物并保存可视化结果
persons, vis_path = self.detector.detect(
frame,
save_visualization=True,
save_path="output/detections"
)
distances = []
for person in persons:
bbox = person['bbox']
person_height = bbox[3] - bbox[1] # y2 - y1
distance = self.estimator.estimate_distance(person_height, frame.shape[0])
distances.append(Distance(
distance_mm=distance,
confidence=person['confidence'],
detection_image_path=vis_path
))
return distances

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

BIN
tests/test_data/person.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

View File

@ -0,0 +1,54 @@
import pytest
import numpy as np
from ..src.distance_estimator import DistanceEstimator
@pytest.fixture
def estimator():
return DistanceEstimator(
focal_length=1000, # mm
sensor_height=5.6, # mm
avg_person_height=1700 # mm
)
def test_distance_estimator_initialization(estimator):
assert estimator.focal_length == 1000
assert estimator.sensor_height == 5.6
assert estimator.avg_person_height == 1700
def test_distance_estimation(estimator):
# 测试不同像素高度的情况
test_cases = [
# (person_height_pixels, frame_height, expected_distance_range)
(400, 720, (2000, 5000)), # 近距离
(200, 720, (4000, 10000)), # 中等距离
(100, 720, (8000, 20000)) # 远距离
]
for height_px, frame_height, (min_dist, max_dist) in test_cases:
distance = estimator.estimate_distance(height_px, frame_height)
assert min_dist <= distance <= max_dist, \
f"Distance {distance}mm not in expected range [{min_dist}, {max_dist}]"
def test_distance_estimation_edge_cases(estimator):
# 测试边界情况
with pytest.raises(ZeroDivisionError):
estimator.estimate_distance(0, 720)
# 测试极小值
distance = estimator.estimate_distance(1, 720)
assert distance > 0
# 测试当人物高度等于图像高度的情况
distance = estimator.estimate_distance(720, 720)
assert distance > 0
def test_distance_estimation_linearity(estimator):
# 测试距离估算的线性关系
height1, height2 = 200, 100
frame_height = 720
distance1 = estimator.estimate_distance(height1, frame_height)
distance2 = estimator.estimate_distance(height2, frame_height)
# 当像素高度减半时,距离应该翻倍
assert abs(distance2 / distance1 - 2.0) < 0.1

View File

@ -0,0 +1,51 @@
import pytest
import numpy as np
import cv2
# import os
# import sys
from ..src.person_detector import PersonDetector
# from src.person_detector import PersonDetector
@pytest.fixture
def detector():
# 使用YOLOv8n模型进行测试
return PersonDetector("yolov8n.pt", conf_threshold=0.5)
@pytest.fixture
def sample_image():
# 创建一个测试用的图像,包含一个人的图像
img_path = "tests/test_data/person.jpg" # 请准备一张包含人物的测试图片
return cv2.imread(img_path)
def test_person_detector_initialization(detector):
assert detector.model is not None
assert detector.conf_threshold == 0.5
def test_person_detection(detector, sample_image):
persons = detector.detect(sample_image, True)
# 验证检测结果
assert isinstance(persons, list)
if len(persons) > 0:
person = persons[0]
assert 'bbox' in person
assert 'confidence' in person
# 验证边界框格式
bbox = person['bbox']
assert len(bbox) == 4
assert all(isinstance(x, int) for x in bbox)
assert bbox[0] < bbox[2] # x1 < x2
assert bbox[1] < bbox[3] # y1 < y2
# 验证置信度
print("人物置信度", person['confidence'])
assert 0 <= person['confidence'] <= 1
def test_person_detector_empty_image(detector):
# 测试空图像
empty_image = np.zeros((100, 100, 3), dtype=np.uint8)
persons = detector.detect(empty_image)
assert isinstance(persons, list)
assert len(persons) == 0

Binary file not shown.

117
utils/utils.py Normal file
View File

@ -0,0 +1,117 @@
import cv2
import numpy as np
from datetime import datetime
import os
class VisualizationUtils:
@staticmethod
def draw_detection_results(
image: np.ndarray,
persons: list,
distances: list = None,
save_path: str = "output",
show_confidence: bool = True
) -> np.ndarray:
"""
在图像上绘制检测结果和距离信息
Args:
image: 原始图像
persons: 检测到的人物列表每个元素包含bbox和confidence
distances: 对应的距离列表可选
save_path: 保存路径
show_confidence: 是否显示置信度
Returns:
绘制了检测结果的图像
"""
# 创建图像副本
vis_image = image.copy()
# 定义颜色和字体
BOX_COLOR = (0, 255, 0) # 绿色边框
TEXT_COLOR = (255, 255, 255) # 白色文字
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 0.6
THICKNESS = 2
# 遍历每个检测结果
for idx, person in enumerate(persons):
bbox = person['bbox']
conf = person['confidence']
# 绘制边界框
cv2.rectangle(
vis_image,
(bbox[0], bbox[1]),
(bbox[2], bbox[3]),
BOX_COLOR,
THICKNESS
)
# 准备显示文本
text_items = []
if show_confidence:
text_items.append(f"Conf: {conf:.2f}")
if distances and idx < len(distances):
dist_meters = distances[idx] / 1000 # 转换为米
text_items.append(f"Dist: {dist_meters:.2f}m")
# 绘制文本
text = " | ".join(text_items)
if text:
# 获取文本大小
(text_width, text_height), _ = cv2.getTextSize(
text, FONT, FONT_SCALE, THICKNESS
)
# 绘制文本背景
cv2.rectangle(
vis_image,
(bbox[0], bbox[1] - text_height - 10),
(bbox[0] + text_width + 10, bbox[1]),
BOX_COLOR,
-1 # 填充矩形
)
# 绘制文本
cv2.putText(
vis_image,
text,
(bbox[0] + 5, bbox[1] - 5),
FONT,
FONT_SCALE,
TEXT_COLOR,
THICKNESS
)
return vis_image
@staticmethod
def save_detection_image(
image: np.ndarray,
save_path: str = "output"
) -> str:
"""
保存检测结果图像
Args:
image: 要保存的图像
save_path: 保存路径
Returns:
保存的文件路径
"""
# 确保输出目录存在
os.makedirs(save_path, exist_ok=True)
# 生成文件名(使用时间戳)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"detection_{timestamp}.jpg"
filepath = os.path.join(save_path, filename)
# 保存图像
cv2.imwrite(filepath, image)
return filepath

BIN
yolov8n.pt Normal file

Binary file not shown.