From 0181569a5ee80e35cfc0fb472d35655996e1cf13 Mon Sep 17 00:00:00 2001 From: haotian <2421912570@qq.com> Date: Tue, 30 Sep 2025 18:07:52 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AF=B9websocket=E6=96=AD?= =?UTF-8?q?=E8=BF=9E=E7=9A=84=E6=94=AF=E6=8C=81,=20=E8=BF=98=E6=98=AF?= =?UTF-8?q?=E6=9C=89=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- face_rec.py | 74 +++++- main.py | 707 ---------------------------------------------------- 2 files changed, 68 insertions(+), 713 deletions(-) delete mode 100644 main.py diff --git a/face_rec.py b/face_rec.py index 8d17001..833e0d8 100644 --- a/face_rec.py +++ b/face_rec.py @@ -126,8 +126,10 @@ class FaceRecognitionSystem: # Linux系统 else: font_paths = [ + # "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf", "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc", + # "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", ] # 查找可用的字体 @@ -438,12 +440,23 @@ class FaceRecognitionSystem: try: await self.ws.send(json.dumps(message)) self.logger.debug(f"发送消息: {message}") + except websockets.exceptions.ConnectionClosed: + self.logger.warning("WebSocket连接已关闭,无法发送消息") + self.ws = None # 标记连接已断开 except Exception as e: - self.logger.error(f"发送WebSocket消息失败: {e}") + # 避免重复记录相同的错误 + error_msg = str(e) + if "no close frame received or sent" not in error_msg: + self.logger.error(f"发送WebSocket消息失败: {e}") + else: + self.logger.debug(f"WebSocket连接异常: {e}") + self.ws = None # 标记连接已断开 async def query_robot_status(self): """定期查询机器人状态""" interval = self.config['websocket']['status_interval'] + consecutive_errors = 0 # 连续错误计数 + max_consecutive_errors = 3 # 最大连续错误次数 while True: try: @@ -453,11 +466,26 @@ class FaceRecognitionSystem: "message": "" } await self.send_websocket_message(status_msg) + consecutive_errors = 0 # 重置错误计数 + else: + # WebSocket未连接,不需要频繁记录 + consecutive_errors += 1 + if consecutive_errors >= max_consecutive_errors: + self.logger.debug("WebSocket未连接,等待重连...") + consecutive_errors = 0 # 重置计数,避免频繁日志 await asyncio.sleep(interval) + except websockets.exceptions.ConnectionClosed: + self.logger.warning("WebSocket连接已关闭,停止状态查询") + self.ws = None + break except Exception as e: - self.logger.error(f"查询状态错误: {e}") + consecutive_errors += 1 + if consecutive_errors < max_consecutive_errors: + self.logger.error(f"查询状态错误: {e}") + elif consecutive_errors == max_consecutive_errors: + self.logger.error(f"连续{max_consecutive_errors}次查询失败,将减少错误日志输出") await asyncio.sleep(interval) async def handle_websocket_messages(self): @@ -486,13 +514,22 @@ class FaceRecognitionSystem: async def connect_websocket(self): """连接WebSocket""" reconnect_delay = self.config['websocket']['reconnect_delay'] + connection_errors = 0 # 连续连接错误计数 while True: try: self.logger.info(f"连接WebSocket: {self.ws_url}") - async with websockets.connect(self.ws_url) as ws: + + # 设置连接超时 + async with websockets.connect( + self.ws_url, + ping_interval=20, # 每20秒发送ping + ping_timeout=10, # ping超时10秒 + close_timeout=5 # 关闭超时5秒 + ) as ws: self.ws = ws self.logger.info("WebSocket连接成功") + connection_errors = 0 # 重置错误计数 # 同时运行状态查询和消息接收 await asyncio.gather( @@ -500,11 +537,35 @@ class FaceRecognitionSystem: self.handle_websocket_messages() ) - except Exception as e: - self.logger.error(f"WebSocket连接错误: {e}") + except websockets.exceptions.ConnectionClosed: + self.logger.warning("WebSocket连接已正常关闭") self.ws = None + except ConnectionRefusedError: + connection_errors += 1 + if connection_errors <= 3: + self.logger.error(f"WebSocket连接被拒绝: 无法连接到 {self.ws_url}") + elif connection_errors == 4: + self.logger.error(f"WebSocket连接持续失败,将减少错误日志输出") + else: + self.logger.debug(f"WebSocket连接失败 (第{connection_errors}次)") + self.ws = None + except Exception as e: + connection_errors += 1 + error_msg = str(e) + # 过滤掉常见的连接错误信息 + if "no close frame" in error_msg or "Connection closed" in error_msg: + self.logger.debug(f"WebSocket连接异常: {e}") + else: + if connection_errors <= 3: + self.logger.error(f"WebSocket连接错误: {e}") + elif connection_errors == 4: + self.logger.error(f"WebSocket连接持续失败,将减少错误日志输出") + self.ws = None + + # 显示重连信息 + if connection_errors <= 3: self.logger.info(f"{reconnect_delay}秒后重连...") - await asyncio.sleep(reconnect_delay) + await asyncio.sleep(reconnect_delay) def can_perform_detection(self) -> bool: """检查是否可以进行人脸检测""" @@ -632,6 +693,7 @@ class FaceRecognitionSystem: } } await self.send_websocket_message(reception_msg) + self.logger.info(f"发送消息: {reception_msg}") # 记录识别时间 self.recognition_history[person_id] = datetime.now() diff --git a/main.py b/main.py deleted file mode 100644 index 99a7ea6..0000000 --- a/main.py +++ /dev/null @@ -1,707 +0,0 @@ -import cv2 -import asyncio -import websockets -import json -import yaml -import logging -from logging.handlers import RotatingFileHandler -from datetime import datetime, timedelta -from typing import Optional, Dict, Any, Tuple -import numpy as np -from compreface import CompreFace -from compreface.service import RecognitionService, DetectionService -import time -from collections import defaultdict -from PIL import Image, ImageDraw, ImageFont - - -class FaceRecognitionSystem: - def __init__(self, config_path: str = "config.yaml"): - """初始化人脸识别系统""" - # 加载配置 - with open(config_path, 'r', encoding='utf-8') as f: - self.config = yaml.safe_load(f) - - # 设置日志 - self._setup_logging() - - # 初始化CompreFace - self._init_compreface() - - # 初始化摄像头 - self.camera = None - - # WebSocket连接 - self.ws = None - self.ws_url = self.config['websocket']['url'] - - # 状态变量 - self.robot_status = { - 'is_speaking': False, - 'is_thinking': False, - 'listening': False - } - - # 人脸检测状态 - self.frame_count = 0 - self.face_present_start = None - self.current_face_id = None - - # 识别记录(防止重复识别) - self.recognition_history = {} # {person_id: last_recognition_time} - - # 显示相关变量 - self.current_display_frame = None - self.last_detection_result = None # 最后的检测结果 - self.last_recognition_result = None # 最后的识别结果 - self.display_info = { - 'quality': 0, - 'face_detected': False, - 'face_box': None, - 'person_name': None, - 'person_role': None, - 'similarity': 0, - 'frame_count': 0, - 'fps': 0 - } - self.last_fps_time = time.time() - self.fps_counter = 0 - - # 加载中文字体 - self.font_path = self._get_chinese_font() - self.font_small = ImageFont.truetype(self.font_path, 20) - self.font_medium = ImageFont.truetype(self.font_path, 24) - self.font_large = ImageFont.truetype(self.font_path, 28) - - self.logger.info("人脸识别系统初始化完成") - - def _setup_logging(self): - """设置日志系统""" - log_config = self.config['logging'] - - self.logger = logging.getLogger('FaceRecognition') - self.logger.setLevel(getattr(logging, log_config['level'])) - - # 文件处理器 - file_handler = RotatingFileHandler( - log_config['file'], - maxBytes=log_config['max_bytes'], - backupCount=log_config['backup_count'] - ) - - # 控制台处理器 - console_handler = logging.StreamHandler() - - # 格式化 - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - ) - file_handler.setFormatter(formatter) - console_handler.setFormatter(formatter) - - self.logger.addHandler(file_handler) - self.logger.addHandler(console_handler) - - def _get_chinese_font(self) -> str: - """获取中文字体路径""" - import platform - import os - - system = platform.system() - - # Windows系统 - if system == "Windows": - font_paths = [ - "C:/Windows/Fonts/msyh.ttc", # 微软雅黑 - "C:/Windows/Fonts/simhei.ttf", # 黑体 - "C:/Windows/Fonts/simsun.ttc", # 宋体 - ] - # macOS系统 - elif system == "Darwin": - font_paths = [ - "/System/Library/Fonts/PingFang.ttc", # 苹方 - "/System/Library/Fonts/STHeiti Medium.ttc", # 黑体 - "/Library/Fonts/Arial Unicode.ttf", - ] - # Linux系统 - else: - font_paths = [ - "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", - "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc", - ] - - # 查找可用的字体 - for font_path in font_paths: - if os.path.exists(font_path): - self.logger.info(f"使用中文字体: {font_path}") - return font_path - - # 如果没找到,尝试使用配置文件中的字体路径 - if 'font_path' in self.config.get('display', {}): - custom_font = self.config['display']['font_path'] - if os.path.exists(custom_font): - self.logger.info(f"使用自定义字体: {custom_font}") - return custom_font - - # 默认使用一个基本字体(不支持中文,但至少不会报错) - self.logger.warning("未找到中文字体,将使用默认字体(可能无法显示中文)") - return "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf" - - def _init_compreface(self): - """初始化CompreFace SDK""" - cf_config = self.config['compreface'] - - # 创建CompreFace实例 - compre_face = CompreFace( - cf_config['host'], - cf_config['port'], - { - "limit": 0, - "det_prob_threshold": 0.8, - "prediction_count": 1 - } - ) - - # 初始化识别和检测服务 - self.recognition_service: RecognitionService = compre_face.init_face_recognition( - cf_config['recognition_api_key'] - ) - - self.detection_service: DetectionService = compre_face.init_face_detection( - cf_config['detection_api_key'] - ) - - self.logger.info("CompreFace服务初始化完成") - - def _init_camera(self): - """初始化摄像头""" - cam_config = self.config['camera'] - - self.camera = cv2.VideoCapture(cam_config['device_id']) - self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width']) - self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height']) - self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps']) - - if not self.camera.isOpened(): - raise RuntimeError("无法打开摄像头") - - self.logger.info("摄像头初始化完成") - - def assess_frame_quality(self, frame: np.ndarray) -> float: - """评估帧质量(使用Laplacian方差检测模糊度)""" - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() - return laplacian_var - - def detect_faces(self, frame: np.ndarray) -> Optional[Dict[str, Any]]: - """检测人脸""" - try: - # 将帧编码为JPEG - _, img_encoded = cv2.imencode('.jpg', frame) - - # 调用CompreFace检测API - result = self.detection_service.detect(img_encoded.tobytes()) - - if result and 'result' in result and len(result['result']) > 0: - faces = result['result'] - - # 过滤掉太小的人脸 - min_size = self.config['face_detection']['min_face_size'] - valid_faces = [ - face for face in faces - if face['box']['x_max'] - face['box']['x_min'] >= min_size - and face['box']['y_max'] - face['box']['y_min'] >= min_size - ] - - if valid_faces: - # 返回第一个(最大的)人脸 - return valid_faces[0] - - return None - - except Exception as e: - self.logger.error(f"人脸检测错误: {e}") - return None - - def recognize_face(self, frame: np.ndarray) -> Optional[Dict[str, Any]]: - """识别人脸""" - try: - # 将帧编码为JPEG - _, img_encoded = cv2.imencode('.jpg', frame) - - # 调用CompreFace识别API - result = self.recognition_service.recognize(img_encoded.tobytes()) - - if result and 'result' in result and len(result['result']) > 0: - faces = result['result'] - if len(faces[0]['subjects']) > 0: - # 返回第一个识别结果 - subject = faces[0]['subjects'][0] - return { - 'subject': subject['subject'], - 'similarity': subject['similarity'], - 'box': faces[0]['box'] - } - - return None - - except Exception as e: - self.logger.error(f"人脸识别错误: {e}") - return None - - def determine_role(self, person_id: str, similarity: float) -> Tuple[str, str]: - """根据相似度确定角色""" - role_config = self.config['role_mapping'] - - if similarity < role_config['stranger_threshold']: - return "未知", "陌生人" - else: - t = person_id.split("_") - name = t[0] - role = "员工" if len(t) == 1 else "访客" - return name, role - - def should_recognize(self, person_id: str) -> bool: - """检查是否应该识别(防止重复识别)""" - cooldown = self.config['face_recognition']['recognition_cooldown'] - - if person_id not in self.recognition_history: - return True - - last_time = self.recognition_history[person_id] - elapsed = (datetime.now() - last_time).total_seconds() - - return elapsed >= cooldown - - def cv2_add_chinese_text(self, img: np.ndarray, text: str, position: Tuple[int, int], - font: ImageFont.FreeTypeFont, text_color: Tuple[int, int, int]) -> np.ndarray: - """在OpenCV图像上添加中文文本""" - # 转换为PIL图像 - img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) - draw = ImageDraw.Draw(img_pil) - - # 绘制文本 - draw.text(position, text, font=font, fill=text_color) - - # 转换回OpenCV格式 - img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR) - return img - - def draw_info_on_frame(self, frame: np.ndarray) -> np.ndarray: - """在帧上绘制检测和识别信息""" - display_frame = frame.copy() - h, w = display_frame.shape[:2] - - # 绘制人脸框和识别结果 - if self.display_info['face_detected'] and self.display_info['face_box']: - box = self.display_info['face_box'] - x_min = int(box['x_min']) - y_min = int(box['y_min']) - x_max = int(box['x_max']) - y_max = int(box['y_max']) - - # 根据识别状态选择颜色 - if self.display_info['person_name']: - # 已识别 - 绿色 - color = (0, 255, 0) - thickness = 3 - else: - # 仅检测到 - 黄色 - color = (0, 255, 255) - thickness = 2 - - # 绘制人脸框 - cv2.rectangle(display_frame, (x_min, y_min), (x_max, y_max), color, thickness) - - # 绘制识别信息 - if self.display_info['person_name']: - name = self.display_info['person_name'] - role = self.display_info['person_role'] - similarity = self.display_info['similarity'] - - # 准备文本 - text_lines = [ - f"姓名: {name}", - f"角色: {role}", - f"相似度: {similarity:.2%}" - ] - - # 计算文本背景框 - line_height = 35 - padding = 10 - - # 计算所需的背景高度 - bg_height = len(text_lines) * line_height + padding * 2 - bg_y_start = max(0, y_min - bg_height - 10) - - # 绘制文本背景 - cv2.rectangle( - display_frame, - (x_min, bg_y_start), - (x_max, bg_y_start + bg_height), - (0, 0, 0), - -1 - ) - cv2.rectangle( - display_frame, - (x_min, bg_y_start), - (x_max, bg_y_start + bg_height), - color, - 2 - ) - - # 使用PIL绘制中文文本 - for i, text in enumerate(text_lines): - y_pos = bg_y_start + padding + i * line_height - display_frame = self.cv2_add_chinese_text( - display_frame, - text, - (x_min + padding, y_pos), - self.font_medium, - (255, 255, 255) - ) - - # 绘制状态信息面板 - panel_height = 200 - panel_bg = np.zeros((panel_height, w, 3), dtype=np.uint8) - panel_bg[:] = (40, 40, 40) - - # 状态信息 - y_offset = 30 - x_offset = 15 - line_spacing = 30 - - status_texts = [ - f"帧率: {self.display_info['fps']:.1f} FPS", - f"帧数: {self.display_info['frame_count']}", - f"质量: {self.display_info['quality']:.1f}", - f"检测到人脸: {'是' if self.display_info['face_detected'] else '否'}", - f"机器人说话: {'是' if self.robot_status['is_speaking'] else '否'}", - f"机器人思考: {'是' if self.robot_status['is_thinking'] else '否'}", - ] - - # 如果人脸持续出现,显示倒计时 - if self.face_present_start: - elapsed = (datetime.now() - self.face_present_start).total_seconds() - face_duration = self.config['face_detection']['face_present_duration'] - remaining = max(0, face_duration - elapsed) - status_texts.append(f"识别倒计时: {remaining:.1f}秒") - - # 使用PIL绘制中文状态文本 - for i, text in enumerate(status_texts): - panel_bg = self.cv2_add_chinese_text( - panel_bg, - text, - (x_offset, y_offset + i * line_spacing), - self.font_small, - (255, 255, 255) - ) - - # 将面板添加到画面底部 - display_frame = np.vstack([display_frame, panel_bg]) - - return display_frame - - def update_fps(self): - """更新FPS计算""" - self.fps_counter += 1 - current_time = time.time() - elapsed = current_time - self.last_fps_time - - if elapsed >= 1.0: - self.display_info['fps'] = self.fps_counter / elapsed - self.fps_counter = 0 - self.last_fps_time = current_time - - async def send_websocket_message(self, message: Dict[str, Any]): - """发送WebSocket消息""" - if self.ws: - try: - await self.ws.send(json.dumps(message)) - self.logger.debug(f"发送消息: {message}") - except Exception as e: - self.logger.error(f"发送WebSocket消息失败: {e}") - - async def query_robot_status(self): - """定期查询机器人状态""" - interval = self.config['websocket']['status_interval'] - - while True: - try: - if self.ws: - status_msg = { - "type": "get_status", - "message": "" - } - await self.send_websocket_message(status_msg) - - await asyncio.sleep(interval) - - except Exception as e: - self.logger.error(f"查询状态错误: {e}") - await asyncio.sleep(interval) - - async def handle_websocket_messages(self): - """处理WebSocket接收的消息""" - while True: - try: - if self.ws: - message = await self.ws.recv() - data = json.loads(message) - - if data.get('type') == 'status': - status = data.get('message', {}) - self.robot_status['is_speaking'] = status.get('is_speaking', False) - self.robot_status['is_thinking'] = status.get('is_thinking', False) - self.robot_status['listening'] = status.get('listening', False) - - self.logger.debug(f"机器人状态: {self.robot_status}") - - except websockets.exceptions.ConnectionClosed: - self.logger.warning("WebSocket连接已关闭") - break - except Exception as e: - self.logger.error(f"处理WebSocket消息错误: {e}") - await asyncio.sleep(0.1) - - async def connect_websocket(self): - """连接WebSocket""" - reconnect_delay = self.config['websocket']['reconnect_delay'] - - while True: - try: - self.logger.info(f"连接WebSocket: {self.ws_url}") - async with websockets.connect(self.ws_url) as ws: - self.ws = ws - self.logger.info("WebSocket连接成功") - - # 同时运行状态查询和消息接收 - await asyncio.gather( - self.query_robot_status(), - self.handle_websocket_messages() - ) - - except Exception as e: - self.logger.error(f"WebSocket连接错误: {e}") - self.ws = None - self.logger.info(f"{reconnect_delay}秒后重连...") - await asyncio.sleep(reconnect_delay) - - def can_perform_detection(self) -> bool: - """检查是否可以进行人脸检测""" - return not self.robot_status['is_speaking'] and not self.robot_status['is_thinking'] - - async def process_video_stream(self): - """处理视频流""" - self._init_camera() - - frame_interval = self.config['face_detection']['frame_interval'] - quality_threshold = self.config['face_detection']['quality_threshold'] - face_duration = self.config['face_detection']['face_present_duration'] - - self.logger.info("开始处理视频流") - - # 创建显示窗口并最大化 - window_name = 'Face Recognition System' - cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) - - # 设置窗口为全屏或最大化 - # 方法1: 全屏模式 - # cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) - - # 方法2: 最大化窗口(推荐) - cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_NORMAL) - # 获取屏幕分辨率并设置窗口大小 - try: - import screeninfo - screen = screeninfo.get_monitors()[0] - cv2.resizeWindow(window_name, screen.width, screen.height) - cv2.moveWindow(window_name, 0, 0) - except: - # 如果screeninfo不可用,尝试设置一个较大的窗口 - cv2.resizeWindow(window_name, 1920, 1080) - self.logger.warning("无法获取屏幕分辨率,使用默认窗口大小") - - try: - while True: - ret, frame = self.camera.read() - if not ret: - self.logger.warning("无法读取摄像头帧") - await asyncio.sleep(0.01) - continue - - self.frame_count += 1 - self.display_info['frame_count'] = self.frame_count - - # 更新FPS - self.update_fps() - - # 默认清空检测和识别结果 - should_detect = False - - # 检查是否到达检测间隔 - if self.frame_count % frame_interval == 0: - # 检查机器人状态 - if self.can_perform_detection(): - # 评估帧质量 - quality = self.assess_frame_quality(frame) - self.display_info['quality'] = quality - - if quality >= quality_threshold: - should_detect = True - else: - self.logger.debug(f"帧质量不足: {quality:.2f}") - else: - self.logger.debug("机器人正在说话或思考,跳过检测") - self.face_present_start = None - self.display_info['quality'] = self.assess_frame_quality(frame) - else: - self.display_info['quality'] = 0 - - # 执行人脸检测 - if should_detect: - face = self.detect_faces(frame) - - if face: - self.display_info['face_detected'] = True - self.display_info['face_box'] = face['box'] - - # 记录人脸出现时间 - if self.face_present_start is None: - self.face_present_start = datetime.now() - self.logger.info("检测到人脸,开始计时") - # 清空之前的识别结果 - self.display_info['person_name'] = None - self.display_info['person_role'] = None - self.display_info['similarity'] = 0 - - # 检查是否满足持续出现时长 - elapsed = (datetime.now() - self.face_present_start).total_seconds() - - if elapsed >= face_duration: - self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别") - - # 进行人脸识别 - recognition_result = self.recognize_face(frame) - - if recognition_result: - person_id = recognition_result['subject'] - similarity = recognition_result['similarity'] - - # 更新显示信息 - self.display_info['face_box'] = recognition_result['box'] - - # 检查是否应该识别(防止重复) - if self.should_recognize(person_id): - name, role = self.determine_role(person_id, similarity) - - # 更新显示信息 - self.display_info['person_name'] = name - self.display_info['person_role'] = role - self.display_info['similarity'] = similarity - - self.logger.info( - f"识别到: {name}, 相似度: {similarity:.6f}, 角色: {role}" - ) - - # 发送识别结果 - reception_msg = { - "type": "start_reception", - "message": { - "name": name, - "role": role - } - } - await self.send_websocket_message(reception_msg) - - # 记录识别时间 - self.recognition_history[person_id] = datetime.now() - else: - # 在冷却期内,继续显示上次的识别结果 - pass - - # 重置计时器 - self.face_present_start = None - else: - # 未识别到已知人脸 - if self.should_recognize("unknown"): - self.logger.info("检测到陌生人") - - # 更新显示信息 - self.display_info['person_name'] = "未知访客" - self.display_info['person_role'] = "陌生人" - self.display_info['similarity'] = 0 - - reception_msg = { - "type": "start_reception", - "message": { - "name": "未知访客", - "role": "陌生人" - } - } - await self.send_websocket_message(reception_msg) - - self.recognition_history["unknown"] = datetime.now() - - # 重置计时器 - self.face_present_start = None - else: - # 没有检测到人脸 - self.display_info['face_detected'] = False - self.display_info['face_box'] = None - - # 重置计时器和识别信息 - if self.face_present_start is not None: - self.logger.debug("人脸消失,重置计时器") - self.face_present_start = None - - # 清空识别信息(可选,如果想保留上次识别结果可以注释掉) - # self.display_info['person_name'] = None - # self.display_info['person_role'] = None - # self.display_info['similarity'] = 0 - - # 绘制信息并显示 - display_frame = self.draw_info_on_frame(frame) - cv2.imshow(window_name, display_frame) - - # 检查键盘输入 - key = cv2.waitKey(1) & 0xFF - if key == ord('q') or key == 27: # 'q' 或 ESC 退出 - self.logger.info("用户请求退出") - break - - await asyncio.sleep(0.001) - - except Exception as e: - self.logger.error(f"视频流处理错误: {e}") - finally: - if self.camera: - self.camera.release() - self.logger.info("摄像头已释放") - cv2.destroyAllWindows() - - async def run(self): - """运行系统""" - self.logger.info("启动人脸识别系统") - - # 同时运行WebSocket连接和视频处理 - await asyncio.gather( - self.connect_websocket(), - self.process_video_stream() - ) - - -def main(): - """主函数""" - system = FaceRecognitionSystem("config.yaml") - - try: - asyncio.run(system.run()) - except KeyboardInterrupt: - print("\n系统已停止") - except Exception as e: - print(f"系统错误: {e}") - - -if __name__ == "__main__": - main() \ No newline at end of file