diff --git a/main.py b/main.py index daeb085..b64bfb3 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,7 @@ import yaml import logging from logging.handlers import RotatingFileHandler from datetime import datetime, timedelta -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Tuple import numpy as np from compreface import CompreFace from compreface.service import RecognitionService, DetectionService @@ -49,6 +49,23 @@ class FaceRecognitionSystem: # 识别记录(防止重复识别) self.recognition_history = {} # {person_id: last_recognition_time} + # 显示相关变量 + self.current_display_frame = None + self.last_detection_result = None # 最后的检测结果 + self.last_recognition_result = None # 最后的识别结果 + self.display_info = { + 'quality': 0, + 'face_detected': False, + 'face_box': None, + 'person_name': None, + 'person_role': None, + 'similarity': 0, + 'frame_count': 0, + 'fps': 0 + } + self.last_fps_time = time.time() + self.fps_counter = 0 + self.logger.info("人脸识别系统初始化完成") def _setup_logging(self): @@ -170,7 +187,8 @@ class FaceRecognitionSystem: subject = faces[0]['subjects'][0] return { 'subject': subject['subject'], - 'similarity': subject['similarity'] + 'similarity': subject['similarity'], + 'box': faces[0]['box'] } return None @@ -179,14 +197,14 @@ class FaceRecognitionSystem: self.logger.error(f"人脸识别错误: {e}") return None - def determine_role(self, persion_id: str,similarity: float) -> str: + def determine_role(self, person_id: str, similarity: float) -> Tuple[str, str]: """根据相似度确定角色""" role_config = self.config['role_mapping'] if similarity < role_config['stranger_threshold']: return "未知", "陌生人" else: - t = persion_id.split("_") + t = person_id.split("_") name = t[0] role = "员工" if len(t) == 1 else "访客" return name, role @@ -203,6 +221,139 @@ class FaceRecognitionSystem: return elapsed >= cooldown + def draw_info_on_frame(self, frame: np.ndarray) -> np.ndarray: + """在帧上绘制检测和识别信息""" + display_frame = frame.copy() + h, w = display_frame.shape[:2] + + # 绘制人脸框和识别结果 + if self.display_info['face_detected'] and self.display_info['face_box']: + box = self.display_info['face_box'] + x_min = int(box['x_min']) + y_min = int(box['y_min']) + x_max = int(box['x_max']) + y_max = int(box['y_max']) + + # 根据识别状态选择颜色 + if self.display_info['person_name']: + # 已识别 - 绿色 + color = (0, 255, 0) + thickness = 3 + else: + # 仅检测到 - 黄色 + color = (0, 255, 255) + thickness = 2 + + # 绘制人脸框 + cv2.rectangle(display_frame, (x_min, y_min), (x_max, y_max), color, thickness) + + # 绘制识别信息 + if self.display_info['person_name']: + name = self.display_info['person_name'] + role = self.display_info['person_role'] + similarity = self.display_info['similarity'] + + # 准备文本 + text_lines = [ + f"Name: {name}", + f"Role: {role}", + f"Similarity: {similarity:.2%}" + ] + + # 计算文本背景框 + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 0.6 + font_thickness = 2 + padding = 10 + line_height = 30 + + # 绘制文本背景 + bg_height = len(text_lines) * line_height + padding * 2 + bg_y_start = max(0, y_min - bg_height - 10) + cv2.rectangle( + display_frame, + (x_min, bg_y_start), + (x_max, bg_y_start + bg_height), + (0, 0, 0), + -1 + ) + cv2.rectangle( + display_frame, + (x_min, bg_y_start), + (x_max, bg_y_start + bg_height), + color, + 2 + ) + + # 绘制文本 + for i, text in enumerate(text_lines): + y_pos = bg_y_start + padding + (i + 1) * line_height - 5 + cv2.putText( + display_frame, + text, + (x_min + padding, y_pos), + font, + font_scale, + (255, 255, 255), + font_thickness + ) + + # 绘制状态信息面板 + panel_height = 180 + panel_bg = np.zeros((panel_height, w, 3), dtype=np.uint8) + panel_bg[:] = (40, 40, 40) + + # 状态信息 + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 0.5 + font_thickness = 1 + text_color = (255, 255, 255) + y_offset = 25 + x_offset = 10 + + status_texts = [ + f"FPS: {self.display_info['fps']:.1f}", + f"Frame: {self.display_info['frame_count']}", + f"Quality: {self.display_info['quality']:.1f}", + f"Face Detected: {'Yes' if self.display_info['face_detected'] else 'No'}", + f"Robot Speaking: {'Yes' if self.robot_status['is_speaking'] else 'No'}", + f"Robot Thinking: {'Yes' if self.robot_status['is_thinking'] else 'No'}", + ] + + # 如果人脸持续出现,显示倒计时 + if self.face_present_start: + elapsed = (datetime.now() - self.face_present_start).total_seconds() + face_duration = self.config['face_detection']['face_present_duration'] + remaining = max(0, face_duration - elapsed) + status_texts.append(f"Recognition in: {remaining:.1f}s") + + for i, text in enumerate(status_texts): + cv2.putText( + panel_bg, + text, + (x_offset, y_offset + i * 25), + font, + font_scale, + text_color, + font_thickness + ) + + # 将面板添加到画面底部 + display_frame = np.vstack([display_frame, panel_bg]) + + return display_frame + + def update_fps(self): + """更新FPS计算""" + self.fps_counter += 1 + current_time = time.time() + elapsed = current_time - self.last_fps_time + + if elapsed >= 1.0: + self.display_info['fps'] = self.fps_counter / elapsed + self.fps_counter = 0 + self.last_fps_time = current_time + async def send_websocket_message(self, message: Dict[str, Any]): """发送WebSocket消息""" if self.ws: @@ -238,7 +389,6 @@ class FaceRecognitionSystem: if self.ws: message = await self.ws.recv() data = json.loads(message) - # print("recv: data: ", data) if data.get('type') == 'status': status = data.get('message', {}) @@ -292,6 +442,9 @@ class FaceRecognitionSystem: self.logger.info("开始处理视频流") + # 创建显示窗口 + cv2.namedWindow('Face Recognition System', cv2.WINDOW_NORMAL) + try: while True: ret, frame = self.camera.read() @@ -301,96 +454,146 @@ class FaceRecognitionSystem: continue self.frame_count += 1 + self.display_info['frame_count'] = self.frame_count + + # 更新FPS + self.update_fps() + + # 默认清空检测和识别结果 + should_detect = False # 检查是否到达检测间隔 - if self.frame_count % frame_interval != 0: - await asyncio.sleep(0.01) - continue - - # 检查机器人状态 - if not self.can_perform_detection(): - self.logger.debug("机器人正在说话或思考,跳过检测") - self.face_present_start = None - await asyncio.sleep(0.01) - continue - - # 评估帧质量 - quality = self.assess_frame_quality(frame) - if quality < quality_threshold: - self.logger.debug(f"帧质量不足: {quality:.2f}") - await asyncio.sleep(0.01) - continue - - # 检测人脸 - face = self.detect_faces(frame) - - if face: - # 记录人脸出现时间 - if self.face_present_start is None: - self.face_present_start = datetime.now() - self.logger.info("检测到人脸,开始计时") - - # 检查是否满足持续出现时长 - elapsed = (datetime.now() - self.face_present_start).total_seconds() - - if elapsed >= face_duration: - self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别") + if self.frame_count % frame_interval == 0: + # 检查机器人状态 + if self.can_perform_detection(): + # 评估帧质量 + quality = self.assess_frame_quality(frame) + self.display_info['quality'] = quality - # 进行人脸识别 - recognition_result = self.recognize_face(frame) + if quality >= quality_threshold: + should_detect = True + else: + self.logger.debug(f"帧质量不足: {quality:.2f}") + else: + self.logger.debug("机器人正在说话或思考,跳过检测") + self.face_present_start = None + self.display_info['quality'] = self.assess_frame_quality(frame) + else: + self.display_info['quality'] = 0 + + # 执行人脸检测 + if should_detect: + face = self.detect_faces(frame) + + if face: + self.display_info['face_detected'] = True + self.display_info['face_box'] = face['box'] - if recognition_result: - person_id = recognition_result['subject'] - similarity = recognition_result['similarity'] + # 记录人脸出现时间 + if self.face_present_start is None: + self.face_present_start = datetime.now() + self.logger.info("检测到人脸,开始计时") + # 清空之前的识别结果 + self.display_info['person_name'] = None + self.display_info['person_role'] = None + self.display_info['similarity'] = 0 + + # 检查是否满足持续出现时长 + elapsed = (datetime.now() - self.face_present_start).total_seconds() + + if elapsed >= face_duration: + self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别") - # 检查是否应该识别(防止重复) - if self.should_recognize(person_id): - role = self.determine_role(person_id,similarity) + # 进行人脸识别 + recognition_result = self.recognize_face(frame) + + if recognition_result: + person_id = recognition_result['subject'] + similarity = recognition_result['similarity'] - self.logger.info( - f"识别到: {person_id}, 相似度: {similarity:.6f}, 角色: {role}" - ) + # 更新显示信息 + self.display_info['face_box'] = recognition_result['box'] - # 发送识别结果 - reception_msg = { - "type": "start_reception", - "message": { - "name": person_id, - "role": role + # 检查是否应该识别(防止重复) + if self.should_recognize(person_id): + name, role = self.determine_role(person_id, similarity) + + # 更新显示信息 + self.display_info['person_name'] = name + self.display_info['person_role'] = role + self.display_info['similarity'] = similarity + + self.logger.info( + f"识别到: {name}, 相似度: {similarity:.6f}, 角色: {role}" + ) + + # 发送识别结果 + reception_msg = { + "type": "start_reception", + "message": { + "name": name, + "role": role + } } - } - await self.send_websocket_message(reception_msg) - - # 记录识别时间 - self.recognition_history[person_id] = datetime.now() + await self.send_websocket_message(reception_msg) + + # 记录识别时间 + self.recognition_history[person_id] = datetime.now() + else: + # 在冷却期内,继续显示上次的识别结果 + pass # 重置计时器 self.face_present_start = None - else: - # 未识别到已知人脸 - if self.should_recognize("unknown"): - self.logger.info("检测到陌生人") - - reception_msg = { - "type": "start_reception", - "message": { - "name": "未知访客", - "role": "陌生人" + else: + # 未识别到已知人脸 + if self.should_recognize("unknown"): + self.logger.info("检测到陌生人") + + # 更新显示信息 + self.display_info['person_name'] = "未知访客" + self.display_info['person_role'] = "陌生人" + self.display_info['similarity'] = 0 + + reception_msg = { + "type": "start_reception", + "message": { + "name": "未知访客", + "role": "陌生人" + } } - } - await self.send_websocket_message(reception_msg) + await self.send_websocket_message(reception_msg) + + self.recognition_history["unknown"] = datetime.now() - self.recognition_history["unknown"] = datetime.now() - - # 重置计时器 - self.face_present_start = None - else: - # 没有检测到人脸,重置计时器 - if self.face_present_start is not None: - self.logger.debug("人脸消失,重置计时器") - self.face_present_start = None + # 重置计时器 + self.face_present_start = None + else: + # 没有检测到人脸 + self.display_info['face_detected'] = False + self.display_info['face_box'] = None + + # 重置计时器和识别信息 + if self.face_present_start is not None: + self.logger.debug("人脸消失,重置计时器") + self.face_present_start = None + + # 清空识别信息(可选,如果想保留上次识别结果可以注释掉) + # self.display_info['person_name'] = None + # self.display_info['person_role'] = None + # self.display_info['similarity'] = 0 - await asyncio.sleep(0.01) + # 绘制信息并显示 + display_frame = self.draw_info_on_frame(frame) + cv2.imshow('Face Recognition System', display_frame) + + # 检查键盘输入 + key = cv2.waitKey(1) & 0xFF + if key == ord('q') or key == 27: # 'q' 或 ESC 退出 + self.logger.info("用户请求退出") + break + + await asyncio.sleep(0.001) except Exception as e: self.logger.error(f"视频流处理错误: {e}") @@ -398,6 +601,7 @@ class FaceRecognitionSystem: if self.camera: self.camera.release() self.logger.info("摄像头已释放") + cv2.destroyAllWindows() async def run(self): """运行系统"""