添加cv2绘制实时显示摄像头画面功能

This commit is contained in:
haotian 2025-09-30 11:11:38 +08:00
parent 24a0e85718
commit 8ef2ba9130

366
main.py
View File

@ -6,7 +6,7 @@ import yaml
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, Tuple
import numpy as np
from compreface import CompreFace
from compreface.service import RecognitionService, DetectionService
@ -49,6 +49,23 @@ class FaceRecognitionSystem:
# 识别记录(防止重复识别)
self.recognition_history = {} # {person_id: last_recognition_time}
# 显示相关变量
self.current_display_frame = None
self.last_detection_result = None # 最后的检测结果
self.last_recognition_result = None # 最后的识别结果
self.display_info = {
'quality': 0,
'face_detected': False,
'face_box': None,
'person_name': None,
'person_role': None,
'similarity': 0,
'frame_count': 0,
'fps': 0
}
self.last_fps_time = time.time()
self.fps_counter = 0
self.logger.info("人脸识别系统初始化完成")
def _setup_logging(self):
@ -170,7 +187,8 @@ class FaceRecognitionSystem:
subject = faces[0]['subjects'][0]
return {
'subject': subject['subject'],
'similarity': subject['similarity']
'similarity': subject['similarity'],
'box': faces[0]['box']
}
return None
@ -179,14 +197,14 @@ class FaceRecognitionSystem:
self.logger.error(f"人脸识别错误: {e}")
return None
def determine_role(self, person_id: str, similarity: float) -> Tuple[str, str]:
    """Map a recognized subject id to a ``(name, role)`` pair.

    Subject ids are expected to follow ``"name"`` for employees and
    ``"name_tag"`` for visitors (underscore-separated). Matches whose
    similarity falls below the configured stranger threshold are rejected.

    Args:
        person_id: Subject identifier returned by face recognition.
        similarity: Match similarity score (0.0 - 1.0).

    Returns:
        ``(name, role)``; ``("未知", "陌生人")`` when ``similarity`` is below
        ``config['role_mapping']['stranger_threshold']``.
    """
    threshold = self.config['role_mapping']['stranger_threshold']
    if similarity < threshold:
        return "未知", "陌生人"
    parts = person_id.split("_")
    # A bare id (no underscore suffix) is an employee; otherwise a visitor.
    role = "员工" if len(parts) == 1 else "访客"
    return parts[0], role
@ -203,6 +221,139 @@ class FaceRecognitionSystem:
return elapsed >= cooldown
def draw_info_on_frame(self, frame: np.ndarray) -> np.ndarray:
    """Overlay detection/recognition info and a status panel onto a frame.

    Draws the face bounding box (green when a person was recognized,
    yellow when a face was merely detected), a text card above the box
    with name/role/similarity, and appends a dark 180px status panel
    (FPS, frame count, quality, robot state, recognition countdown)
    below the frame.

    Args:
        frame: BGR camera frame.

    Returns:
        A new image of height ``frame height + 180``; ``frame`` itself is
        not modified.
    """
    display_frame = frame.copy()
    w = display_frame.shape[1]

    # --- face box and recognition card ---
    if self.display_info['face_detected'] and self.display_info['face_box']:
        box = self.display_info['face_box']
        x_min, y_min = int(box['x_min']), int(box['y_min'])
        x_max, y_max = int(box['x_max']), int(box['y_max'])

        if self.display_info['person_name']:
            color, thickness = (0, 255, 0), 3      # recognized: green, thicker
        else:
            color, thickness = (0, 255, 255), 2    # detected only: yellow

        cv2.rectangle(display_frame, (x_min, y_min), (x_max, y_max), color, thickness)

        if self.display_info['person_name']:
            name = self.display_info['person_name']
            role = self.display_info['person_role']
            similarity = self.display_info['similarity']
            text_lines = [
                f"Name: {name}",
                f"Role: {role}",
                f"Similarity: {similarity:.2%}",
            ]

            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.6
            font_thickness = 2
            padding = 10
            line_height = 30

            # Card sits just above the face box, clamped to the top edge.
            bg_height = len(text_lines) * line_height + padding * 2
            bg_y_start = max(0, y_min - bg_height - 10)
            # Filled black background first, then a colored border on top.
            cv2.rectangle(display_frame, (x_min, bg_y_start),
                          (x_max, bg_y_start + bg_height), (0, 0, 0), -1)
            cv2.rectangle(display_frame, (x_min, bg_y_start),
                          (x_max, bg_y_start + bg_height), color, 2)

            for i, text in enumerate(text_lines):
                y_pos = bg_y_start + padding + (i + 1) * line_height - 5
                cv2.putText(display_frame, text, (x_min + padding, y_pos),
                            font, font_scale, (255, 255, 255), font_thickness)

    # --- status panel appended below the frame ---
    panel_height = 180
    panel_bg = np.zeros((panel_height, w, 3), dtype=np.uint8)
    panel_bg[:] = (40, 40, 40)  # dark gray background

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    font_thickness = 1
    text_color = (255, 255, 255)
    y_offset = 25
    x_offset = 10

    status_texts = [
        f"FPS: {self.display_info['fps']:.1f}",
        f"Frame: {self.display_info['frame_count']}",
        f"Quality: {self.display_info['quality']:.1f}",
        f"Face Detected: {'Yes' if self.display_info['face_detected'] else 'No'}",
        f"Robot Speaking: {'Yes' if self.robot_status['is_speaking'] else 'No'}",
        f"Robot Thinking: {'Yes' if self.robot_status['is_thinking'] else 'No'}",
    ]

    # Countdown until recognition fires while a face stays in view.
    if self.face_present_start:
        elapsed = (datetime.now() - self.face_present_start).total_seconds()
        face_duration = self.config['face_detection']['face_present_duration']
        remaining = max(0, face_duration - elapsed)
        status_texts.append(f"Recognition in: {remaining:.1f}s")

    for i, text in enumerate(status_texts):
        cv2.putText(panel_bg, text, (x_offset, y_offset + i * 25),
                    font, font_scale, text_color, font_thickness)

    # Stack the panel under the camera image.
    return np.vstack([display_frame, panel_bg])
def update_fps(self):
    """Refresh the displayed FPS value roughly once per second.

    Counts processed frames and, once at least one second has elapsed
    since the last refresh, stores the averaged rate in
    ``self.display_info['fps']`` and restarts the measurement window.
    """
    self.fps_counter += 1
    now = time.time()
    window = now - self.last_fps_time
    if window >= 1.0:
        # Average frame count over the elapsed window, then reset.
        self.display_info['fps'] = self.fps_counter / window
        self.fps_counter = 0
        self.last_fps_time = now
async def send_websocket_message(self, message: Dict[str, Any]):
"""发送WebSocket消息"""
if self.ws:
@ -238,7 +389,6 @@ class FaceRecognitionSystem:
if self.ws:
message = await self.ws.recv()
data = json.loads(message)
# print("recv: data: ", data)
if data.get('type') == 'status':
status = data.get('message', {})
@ -292,6 +442,9 @@ class FaceRecognitionSystem:
self.logger.info("开始处理视频流")
# 创建显示窗口
cv2.namedWindow('Face Recognition System', cv2.WINDOW_NORMAL)
try:
while True:
ret, frame = self.camera.read()
@ -301,96 +454,146 @@ class FaceRecognitionSystem:
continue
self.frame_count += 1
self.display_info['frame_count'] = self.frame_count
# 更新FPS
self.update_fps()
# 默认清空检测和识别结果
should_detect = False
# 检查是否到达检测间隔
if self.frame_count % frame_interval != 0:
await asyncio.sleep(0.01)
continue
# 检查机器人状态
if not self.can_perform_detection():
self.logger.debug("机器人正在说话或思考,跳过检测")
self.face_present_start = None
await asyncio.sleep(0.01)
continue
# 评估帧质量
quality = self.assess_frame_quality(frame)
if quality < quality_threshold:
self.logger.debug(f"帧质量不足: {quality:.2f}")
await asyncio.sleep(0.01)
continue
# 检测人脸
face = self.detect_faces(frame)
if face:
# 记录人脸出现时间
if self.face_present_start is None:
self.face_present_start = datetime.now()
self.logger.info("检测到人脸,开始计时")
# 检查是否满足持续出现时长
elapsed = (datetime.now() - self.face_present_start).total_seconds()
if elapsed >= face_duration:
self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别")
if self.frame_count % frame_interval == 0:
# 检查机器人状态
if self.can_perform_detection():
# 评估帧质量
quality = self.assess_frame_quality(frame)
self.display_info['quality'] = quality
# 进行人脸识别
recognition_result = self.recognize_face(frame)
if quality >= quality_threshold:
should_detect = True
else:
self.logger.debug(f"帧质量不足: {quality:.2f}")
else:
self.logger.debug("机器人正在说话或思考,跳过检测")
self.face_present_start = None
self.display_info['quality'] = self.assess_frame_quality(frame)
else:
self.display_info['quality'] = 0
# 执行人脸检测
if should_detect:
face = self.detect_faces(frame)
if face:
self.display_info['face_detected'] = True
self.display_info['face_box'] = face['box']
if recognition_result:
person_id = recognition_result['subject']
similarity = recognition_result['similarity']
# 记录人脸出现时间
if self.face_present_start is None:
self.face_present_start = datetime.now()
self.logger.info("检测到人脸,开始计时")
# 清空之前的识别结果
self.display_info['person_name'] = None
self.display_info['person_role'] = None
self.display_info['similarity'] = 0
# 检查是否满足持续出现时长
elapsed = (datetime.now() - self.face_present_start).total_seconds()
if elapsed >= face_duration:
self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别")
# 检查是否应该识别(防止重复)
if self.should_recognize(person_id):
role = self.determine_role(person_id,similarity)
# 进行人脸识别
recognition_result = self.recognize_face(frame)
if recognition_result:
person_id = recognition_result['subject']
similarity = recognition_result['similarity']
self.logger.info(
f"识别到: {person_id}, 相似度: {similarity:.6f}, 角色: {role}"
)
# 更新显示信息
self.display_info['face_box'] = recognition_result['box']
# 发送识别结果
reception_msg = {
"type": "start_reception",
"message": {
"name": person_id,
"role": role
# 检查是否应该识别(防止重复)
if self.should_recognize(person_id):
name, role = self.determine_role(person_id, similarity)
# 更新显示信息
self.display_info['person_name'] = name
self.display_info['person_role'] = role
self.display_info['similarity'] = similarity
self.logger.info(
f"识别到: {name}, 相似度: {similarity:.6f}, 角色: {role}"
)
# 发送识别结果
reception_msg = {
"type": "start_reception",
"message": {
"name": name,
"role": role
}
}
}
await self.send_websocket_message(reception_msg)
# 记录识别时间
self.recognition_history[person_id] = datetime.now()
await self.send_websocket_message(reception_msg)
# 记录识别时间
self.recognition_history[person_id] = datetime.now()
else:
# 在冷却期内,继续显示上次的识别结果
pass
# 重置计时器
self.face_present_start = None
else:
# 未识别到已知人脸
if self.should_recognize("unknown"):
self.logger.info("检测到陌生人")
reception_msg = {
"type": "start_reception",
"message": {
"name": "未知访客",
"role": "陌生人"
else:
# 未识别到已知人脸
if self.should_recognize("unknown"):
self.logger.info("检测到陌生人")
# 更新显示信息
self.display_info['person_name'] = "未知访客"
self.display_info['person_role'] = "陌生人"
self.display_info['similarity'] = 0
reception_msg = {
"type": "start_reception",
"message": {
"name": "未知访客",
"role": "陌生人"
}
}
}
await self.send_websocket_message(reception_msg)
await self.send_websocket_message(reception_msg)
self.recognition_history["unknown"] = datetime.now()
self.recognition_history["unknown"] = datetime.now()
# 重置计时器
self.face_present_start = None
else:
# 没有检测到人脸,重置计时器
if self.face_present_start is not None:
self.logger.debug("人脸消失,重置计时器")
self.face_present_start = None
# 重置计时器
self.face_present_start = None
else:
# 没有检测到人脸
self.display_info['face_detected'] = False
self.display_info['face_box'] = None
# 重置计时器和识别信息
if self.face_present_start is not None:
self.logger.debug("人脸消失,重置计时器")
self.face_present_start = None
# 清空识别信息(可选,如果想保留上次识别结果可以注释掉)
# self.display_info['person_name'] = None
# self.display_info['person_role'] = None
# self.display_info['similarity'] = 0
await asyncio.sleep(0.01)
# 绘制信息并显示
display_frame = self.draw_info_on_frame(frame)
cv2.imshow('Face Recognition System', display_frame)
# 检查键盘输入
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or key == 27: # 'q' 或 ESC 退出
self.logger.info("用户请求退出")
break
await asyncio.sleep(0.001)
except Exception as e:
self.logger.error(f"视频流处理错误: {e}")
@ -398,6 +601,7 @@ class FaceRecognitionSystem:
if self.camera:
self.camera.release()
self.logger.info("摄像头已释放")
cv2.destroyAllWindows()
async def run(self):
"""运行系统"""