robot_face_rec/face_rec_本地测试版.py
2025-12-05 12:01:20 +08:00

1189 lines
46 KiB
Python

import cv2
import asyncio
import websockets
import json
import yaml
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
import numpy as np
from compreface import CompreFace
from compreface.service import RecognitionService, DetectionService
import time
from collections import defaultdict
from PIL import Image, ImageDraw, ImageFont
import subprocess
import os
class FaceRecognitionSystem:
    """Camera-driven face recognition system.

    Reads frames from a local camera, detects/recognizes faces through
    CompreFace, reports recognized visitors to a robot backend over WebSocket,
    optionally pushes the annotated video to an RTMP/RTSP server via FFmpeg,
    and pops up a QR code window for strangers.
    """

    def __init__(self, config_path: str = "config.yaml"):
        """Initialize the system from a YAML configuration file."""
        # Load configuration
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)
        # Set up logging (creates self.logger)
        self._setup_logging()
        # Initialize CompreFace recognition/detection services
        self._init_compreface()
        # Camera handle; opened lazily by _init_camera
        self.camera = None
        # WebSocket connection state
        self.ws = None
        self.ws_url = self.config['websocket']['url']
        self.ws_connected = False  # connection-alive flag
        self.ws_reconnect_count = 0  # reconnect attempts (used to throttle log output)
        # Robot state mirrored from incoming WebSocket status messages
        self.robot_status = {
            'is_speaking': False,
            'is_thinking': False,
            'listening': False,
            'role_name': ''
        }
        # Face-detection state
        self.frame_count = 0
        self.face_present_start = None  # when a face first appeared continuously
        self.current_face_id = None
        # Recognition history to suppress duplicate announcements
        self.recognition_history = {}  # {person_id: last_recognition_time}
        # Display-related state
        self.current_display_frame = None
        self.last_detection_result = None  # most recent detection result
        self.last_recognition_result = None  # most recent recognition result
        self.display_info = {
            'quality': 0,
            'face_detected': False,
            'face_box': None,
            'person_name': None,
            'person_role': None,
            'similarity': 0,
            'frame_count': 0,
            'fps': 0
        }
        self.last_fps_time = time.time()
        self.fps_counter = 0
        # Load a font able to render Chinese text (needed because OpenCV cannot)
        self.font_path = self._get_chinese_font()
        self.font_small = ImageFont.truetype(self.font_path, 20)
        self.font_medium = ImageFont.truetype(self.font_path, 24)
        self.font_large = ImageFont.truetype(self.font_path, 28)
        # FFmpeg streaming process state
        self.ffmpeg_process = None
        self.stream_enabled = self.config.get('stream', {}).get('enabled', False)
        self.stream_retry_count = 0  # stream restart attempts
        self.stream_max_retries = 5  # maximum restart attempts
        self.stream_last_retry_time = None  # timestamp of last restart attempt
        self.stream_retry_cooldown = 10  # restart cooldown (seconds)
        self.logger.info("人脸识别系统初始化完成")
        # Camera failure tracking
        self.camera_failure_count = 0  # consecutive read failures
        self.camera_last_retry_time = None  # timestamp of last reconnect attempt
        self.camera_retry_cooldown = 3  # reconnect cooldown (seconds)
        self.camera_max_failures = 5  # failures before forcing re-initialization
        # QR code shown to strangers
        self.qrcode_image_path = self.config.get('qrcode', {}).get('image_path', 'qrcode.png')
        self.qrcode_display_duration = self.config.get('qrcode', {}).get('display_duration', 10)
        self.qrcode_window_name = 'Stranger QR Code'
        self.qrcode_display_start_time = None  # when the QR window was opened
        self.qrcode_showing = False  # whether the QR window is currently shown
def _show_qrcode(self):
"""显示二维码图片"""
try:
if not os.path.exists(self.qrcode_image_path):
self.logger.error(f"二维码图片不存在: {self.qrcode_image_path}")
return False
# 读取二维码图片
qr_image = cv2.imread(self.qrcode_image_path)
if qr_image is None:
self.logger.error(f"无法读取二维码图片: {self.qrcode_image_path}")
return False
# 创建二维码窗口
cv2.namedWindow(self.qrcode_window_name, cv2.WINDOW_NORMAL)
# 设置窗口大小和位置(居中显示)
qr_height, qr_width = qr_image.shape[:2]
try:
import screeninfo
screen = screeninfo.get_monitors()[0]
# 计算居中位置
window_width = min(800, qr_width)
window_height = min(800, qr_height)
x_pos = (screen.width - window_width) // 2
y_pos = (screen.height - window_height) // 2
cv2.resizeWindow(self.qrcode_window_name, window_width, window_height)
cv2.moveWindow(self.qrcode_window_name, x_pos, y_pos)
except:
cv2.resizeWindow(self.qrcode_window_name, 800, 800)
# 显示二维码
cv2.imshow(self.qrcode_window_name, qr_image)
self.qrcode_showing = True
self.qrcode_display_start_time = time.time()
self.logger.info(f"显示二维码图片: {self.qrcode_image_path}")
return True
except Exception as e:
self.logger.error(f"显示二维码失败: {e}")
return False
def _close_qrcode(self):
"""关闭二维码窗口"""
try:
if self.qrcode_showing:
cv2.destroyWindow(self.qrcode_window_name)
self.qrcode_showing = False
self.qrcode_display_start_time = None
self.logger.info("关闭二维码窗口")
except Exception as e:
self.logger.debug(f"关闭二维码窗口时出错: {e}")
def _check_qrcode_timeout(self):
"""检查二维码是否应该关闭"""
if self.qrcode_showing and self.qrcode_display_start_time:
elapsed = time.time() - self.qrcode_display_start_time
if elapsed >= self.qrcode_display_duration:
self._close_qrcode()
return True
return False
def _try_reconnect_camera(self) -> bool:
"""尝试重新连接摄像头"""
current_time = time.time()
# 检查是否在冷却期内
if self.camera_last_retry_time:
elapsed = current_time - self.camera_last_retry_time
if elapsed < self.camera_retry_cooldown:
return False
self.camera_last_retry_time = current_time
self.logger.info("尝试重新连接摄像头...")
cam_config = self.config['camera']
# 释放旧的摄像头资源
if self.camera is not None:
try:
self.camera.release()
except:
pass
self.camera = None
# 尝试重新打开
self.camera = cv2.VideoCapture(cam_config['device_id'])
if self.camera.isOpened():
# 设置摄像头参数
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width'])
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height'])
self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps'])
# 验证是否能读取帧
ret, frame = self.camera.read()
if ret:
self.logger.info("✓ 摄像头重新连接成功")
self.camera_failure_count = 0
return True
else:
self.logger.warning("摄像头已打开但无法读取帧")
self.camera.release()
self.camera = None
else:
self.logger.warning("无法重新打开摄像头")
return False
def _setup_logging(self):
"""设置日志系统"""
log_config = self.config['logging']
self.logger = logging.getLogger('FaceRecognition')
self.logger.setLevel(getattr(logging, log_config['level']))
# 文件处理器
file_handler = RotatingFileHandler(
log_config['file'],
maxBytes=log_config['max_bytes'],
backupCount=log_config['backup_count']
)
# 控制台处理器
console_handler = logging.StreamHandler()
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def _get_chinese_font(self) -> str:
"""获取中文字体路径"""
import platform
import os
system = platform.system()
# Windows系统
if system == "Windows":
font_paths = [
"C:/Windows/Fonts/msyh.ttc", # 微软雅黑
"C:/Windows/Fonts/simhei.ttf", # 黑体
"C:/Windows/Fonts/simsun.ttc", # 宋体
]
# macOS系统
elif system == "Darwin":
font_paths = [
"/System/Library/Fonts/PingFang.ttc", # 苹方
"/System/Library/Fonts/STHeiti Medium.ttc", # 黑体
"/Library/Fonts/Arial Unicode.ttf",
]
# Linux系统
else:
font_paths = [
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
"/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
]
# 查找可用的字体
for font_path in font_paths:
if os.path.exists(font_path):
self.logger.info(f"使用中文字体: {font_path}")
return font_path
# 如果没找到,尝试使用配置文件中的字体路径
if 'font_path' in self.config.get('display', {}):
custom_font = self.config['display']['font_path']
if os.path.exists(custom_font):
self.logger.info(f"使用自定义字体: {custom_font}")
return custom_font
# 默认使用一个基本字体(不支持中文,但至少不会报错)
self.logger.warning("未找到中文字体,将使用默认字体(可能无法显示中文)")
return "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
    def _init_ffmpeg_stream(self):
        """Spawn an FFmpeg process that restreams raw BGR frames to RTMP/RTSP.

        Returns:
            bool: True when the process started (and survived a 0.5 s grace
            period), False when streaming is disabled, the retry cooldown is
            active, the URL scheme is unsupported, or FFmpeg failed to start.
        """
        if not self.stream_enabled:
            self.logger.info("流媒体推流功能未启用")
            return False
        # Respect the cooldown between restart attempts.
        if self.stream_last_retry_time:
            elapsed = time.time() - self.stream_last_retry_time
            if elapsed < self.stream_retry_cooldown:
                self.logger.debug(f"推流重试冷却中,还需等待 {self.stream_retry_cooldown - elapsed:.1f}")
                return False
        stream_config = self.config['stream']
        ffmpeg_config = stream_config['ffmpeg']
        stream_url = stream_config.get('rtmp_url') or stream_config.get('stream_url', '')
        # The pushed frame carries the status panel below the camera image, so
        # the input height is camera height + panel height. The 200 here must
        # match panel_height in draw_info_on_frame.
        cam_config = self.config['camera']
        panel_height = 200
        total_height = cam_config['height'] + panel_height
        # Pick the output muxer from the URL scheme.
        if stream_url.startswith('rtmp://'):
            output_format = 'flv'
            self.logger.debug("使用RTMP协议推流")
        elif stream_url.startswith('rtsp://'):
            output_format = 'rtsp'
            self.logger.debug("使用RTSP协议推流")
        else:
            self.logger.error(f"不支持的推流协议: {stream_url}")
            return False
        # Assemble the FFmpeg command: raw bgr24 frames on stdin, encoded out.
        ffmpeg_cmd = [
            'ffmpeg',
            '-y',  # overwrite output
            '-f', 'rawvideo',  # input demuxer
            '-pix_fmt', 'bgr24',  # OpenCV frames are BGR
            '-s', f"{cam_config['width']}x{total_height}",  # input resolution
            '-r', str(ffmpeg_config['fps']),  # input frame rate
            '-i', '-',  # read from stdin
            '-c:v', ffmpeg_config['video_codec'],  # video encoder
            '-pix_fmt', ffmpeg_config['pixel_format'],  # output pixel format
            '-tune', ffmpeg_config['tune'],  # encoder tuning preset
            '-b:v', ffmpeg_config['video_bitrate'],  # video bitrate
            '-r', str(ffmpeg_config['fps']),  # output frame rate
            '-g', str(ffmpeg_config['fps'] * 2),  # GOP size (2 seconds)
        ]
        # Drop audio entirely unless explicitly enabled.
        if not ffmpeg_config.get('audio', False):
            ffmpeg_cmd.append('-an')
        # RTMP-specific output arguments.
        if output_format == 'flv':
            ffmpeg_cmd.extend([
                '-f', 'flv',
                '-flvflags', 'no_duration_filesize',  # live stream needs no duration metadata
                stream_url
            ])
        # RTSP-specific output arguments.
        elif output_format == 'rtsp':
            ffmpeg_cmd.extend([
                '-f', 'rtsp',
                '-rtsp_transport', 'tcp',  # TCP transport
                stream_url
            ])
        try:
            self.stream_retry_count += 1
            self.stream_last_retry_time = time.time()
            self.logger.info(f"启动FFmpeg推流 (第{self.stream_retry_count}次尝试): {stream_url}")
            self.logger.debug(f"FFmpeg命令: {' '.join(ffmpeg_cmd)}")
            # NOTE(review): stderr is a PIPE but is only read after the process
            # exits; a very chatty FFmpeg could fill the pipe buffer and stall
            # the encoder — confirm log volume or redirect stderr.
            self.ffmpeg_process = subprocess.Popen(
                ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,  # discard stdout
                stderr=subprocess.PIPE,
                bufsize=10 ** 8
            )
            # Give FFmpeg a moment; if it died immediately, surface its errors.
            time.sleep(0.5)
            if self.ffmpeg_process.poll() is not None:
                # Process already exited: read and filter its error output.
                stderr_output = self.ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
                self.logger.error(f"FFmpeg进程启动失败:")
                # Only show lines that look like actual errors.
                for line in stderr_output.split('\n'):
                    if 'error' in line.lower() or 'failed' in line.lower() or 'connection' in line.lower():
                        self.logger.error(f" {line.strip()}")
                self.ffmpeg_process = None
                return False
            self.logger.info("FFmpeg推流进程启动成功")
            # Successful start: reset the retry counter.
            self.stream_retry_count = 0
            return True
        except FileNotFoundError:
            # FFmpeg binary missing: disable streaming for the rest of the run.
            self.logger.error("FFmpeg未安装或不在系统PATH中,推流功能将被禁用")
            self.stream_enabled = False
            self.ffmpeg_process = None
            return False
        except Exception as e:
            self.logger.error(f"启动FFmpeg推流失败: {e}")
            self.ffmpeg_process = None
            return False
def _push_frame_to_stream(self, frame: np.ndarray):
"""推送帧到流媒体服务器"""
if not self.stream_enabled or self.ffmpeg_process is None:
return
try:
# 检查FFmpeg进程是否仍在运行
if self.ffmpeg_process.poll() is not None:
# 进程已退出,读取错误信息
if self.stream_retry_count < self.stream_max_retries:
stderr_output = self.ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
if stderr_output:
self.logger.error(f"FFmpeg进程异常退出: {stderr_output[:300]}")
else:
self.logger.warning("FFmpeg进程已退出")
return
# 写入帧数据到FFmpeg的stdin
self.ffmpeg_process.stdin.write(frame.tobytes())
self.ffmpeg_process.stdin.flush() # 立即刷新缓冲区
except BrokenPipeError:
if self.stream_retry_count < self.stream_max_retries:
self.logger.warning("FFmpeg推流管道断开")
self.ffmpeg_process = None
except Exception as e:
if self.stream_retry_count < self.stream_max_retries:
self.logger.error(f"推送帧到流媒体失败: {e}")
self.ffmpeg_process = None
def _close_ffmpeg_stream(self):
"""关闭FFmpeg推流"""
if self.ffmpeg_process:
try:
self.logger.info("正在关闭FFmpeg推流...")
# 先关闭stdin
if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
try:
self.ffmpeg_process.stdin.close()
except:
pass
# 等待进程结束
try:
self.ffmpeg_process.wait(timeout=3)
self.logger.info("FFmpeg推流已关闭")
except subprocess.TimeoutExpired:
self.logger.warning("FFmpeg进程未响应,强制终止")
self.ffmpeg_process.kill()
self.ffmpeg_process.wait()
except Exception as e:
self.logger.debug(f"关闭FFmpeg推流时出现异常: {e}")
try:
self.ffmpeg_process.kill()
except:
pass
finally:
self.ffmpeg_process = None
def _init_compreface(self):
"""初始化CompreFace SDK"""
cf_config = self.config['compreface']
# 创建CompreFace实例
compre_face = CompreFace(
cf_config['host'],
cf_config['port'],
{
"limit": 0,
"det_prob_threshold": 0.8,
"prediction_count": 1
}
)
# 初始化识别和检测服务
self.recognition_service: RecognitionService = compre_face.init_face_recognition(
cf_config['recognition_api_key']
)
self.detection_service: DetectionService = compre_face.init_face_detection(
cf_config['detection_api_key']
)
self.logger.info("CompreFace服务初始化完成")
def _init_camera(self):
"""初始化摄像头"""
cam_config = self.config['camera']
retry_interval = cam_config.get('retry_interval', 3)
attempt = 0
while True:
attempt += 1
self.logger.info(f"正在尝试打开摄像头 (第{attempt}次尝试)...")
# 确保先释放之前的摄像头资源
if self.camera is not None:
try:
self.camera.release()
except:
pass
self.camera = None
self.camera = cv2.VideoCapture(cam_config['device_id'])
if self.camera.isOpened():
# 设置摄像头参数
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width'])
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height'])
self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps'])
# 验证是否能读取帧
ret, frame = self.camera.read()
if ret:
self.logger.info(f"摄像头初始化成功 (设备ID: {cam_config['device_id']})")
# 重置失败计数
self.camera_failure_count = 0
self.camera_last_retry_time = None
return True
else:
self.logger.warning("摄像头已打开但无法读取帧")
self.camera.release()
self.camera = None
else:
self.logger.warning(f"无法打开摄像头设备 {cam_config['device_id']}")
# 等待后重试
self.logger.info(f"{retry_interval}秒后重试...")
time.sleep(retry_interval)
def assess_frame_quality(self, frame: np.ndarray) -> float:
"""评估帧质量(使用Laplacian方差检测模糊度)"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
return laplacian_var
def detect_faces(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
"""检测人脸"""
try:
# 将帧编码为JPEG
_, img_encoded = cv2.imencode('.jpg', frame)
# 调用CompreFace检测API
result = self.detection_service.detect(img_encoded.tobytes())
if result and 'result' in result and len(result['result']) > 0:
faces = result['result']
# 过滤掉太小的人脸
min_size = self.config['face_detection']['min_face_size']
valid_faces = [
face for face in faces
if face['box']['x_max'] - face['box']['x_min'] >= min_size
and face['box']['y_max'] - face['box']['y_min'] >= min_size
]
if valid_faces:
# 返回第一个(最大的)人脸
return valid_faces[0]
return None
except Exception as e:
self.logger.error(f"人脸检测错误: {e}")
return None
def recognize_face(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
"""识别人脸"""
try:
# 将帧编码为JPEG
_, img_encoded = cv2.imencode('.jpg', frame)
# 调用CompreFace识别API
result = self.recognition_service.recognize(img_encoded.tobytes())
if result and 'result' in result and len(result['result']) > 0:
faces = result['result']
if len(faces[0]['subjects']) > 0:
# 返回第一个识别结果
subject = faces[0]['subjects'][0]
return {
'subject': subject['subject'],
'similarity': subject['similarity'],
'box': faces[0]['box']
}
return None
except Exception as e:
self.logger.error(f"人脸识别错误: {e}")
return None
def determine_role(self, person_id: str, similarity: float) -> Tuple[str, str]:
"""根据相似度确定角色"""
role_config = self.config['role_mapping']
if similarity < role_config['stranger_threshold']:
return "未知", "陌生人"
else:
t = person_id.split("_")
name = t[0]
role = "员工" if len(t) == 1 else "访客"
return name, role
def should_recognize(self, person_id: str) -> bool:
"""检查是否应该识别(防止重复识别)"""
cooldown = self.config['face_recognition']['recognition_cooldown']
if person_id not in self.recognition_history:
return True
last_time = self.recognition_history[person_id]
elapsed = (datetime.now() - last_time).total_seconds()
return elapsed >= cooldown
def cv2_add_chinese_text(self, img: np.ndarray, text: str, position: Tuple[int, int],
font: ImageFont.FreeTypeFont, text_color: Tuple[int, int, int]) -> np.ndarray:
"""在OpenCV图像上添加中文文本"""
# 转换为PIL图像
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
# 绘制文本
draw.text(position, text, font=font, fill=text_color)
# 转换回OpenCV格式
img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return img
    def draw_info_on_frame(self, frame: np.ndarray) -> np.ndarray:
        """Return a copy of the frame annotated with detection/recognition info.

        Draws the face bounding box (green when recognized, yellow when only
        detected), a label block with name/role/similarity, and appends a
        status panel below the image. The returned image is therefore taller
        than the input by panel_height (200 px) — _init_ffmpeg_stream sizes
        the outgoing stream with the same constant.
        """
        display_frame = frame.copy()
        h, w = display_frame.shape[:2]
        # Face box and recognition labels.
        if self.display_info['face_detected'] and self.display_info['face_box']:
            box = self.display_info['face_box']
            x_min = int(box['x_min'])
            y_min = int(box['y_min'])
            x_max = int(box['x_max'])
            y_max = int(box['y_max'])
            # Color depends on whether a person was recognized.
            if self.display_info['person_name']:
                # Recognized — green
                color = (0, 255, 0)
                thickness = 3
            else:
                # Only detected — yellow
                color = (0, 255, 255)
                thickness = 2
            # Draw the face bounding box.
            cv2.rectangle(display_frame, (x_min, y_min), (x_max, y_max), color, thickness)
            # Draw the recognition labels above the box.
            if self.display_info['person_name']:
                name = self.display_info['person_name']
                role = self.display_info['person_role']
                similarity = self.display_info['similarity']
                # Label lines to render.
                text_lines = [
                    f"姓名: {name}",
                    f"角色: {role}",
                    f"相似度: {similarity:.2%}"
                ]
                # Background box geometry for the labels.
                line_height = 35
                padding = 10
                # Required background height; clamp to the top of the frame.
                bg_height = len(text_lines) * line_height + padding * 2
                bg_y_start = max(0, y_min - bg_height - 10)
                # Filled black background...
                cv2.rectangle(
                    display_frame,
                    (x_min, bg_y_start),
                    (x_max, bg_y_start + bg_height),
                    (0, 0, 0),
                    -1
                )
                # ...with a colored border matching the face box.
                cv2.rectangle(
                    display_frame,
                    (x_min, bg_y_start),
                    (x_max, bg_y_start + bg_height),
                    color,
                    2
                )
                # PIL is used because OpenCV cannot render Chinese text.
                for i, text in enumerate(text_lines):
                    y_pos = bg_y_start + padding + i * line_height
                    display_frame = self.cv2_add_chinese_text(
                        display_frame,
                        text,
                        (x_min + padding, y_pos),
                        self.font_medium,
                        (255, 255, 255)
                    )
        # Status panel rendered on a dark-gray strip below the frame.
        panel_height = 200
        panel_bg = np.zeros((panel_height, w, 3), dtype=np.uint8)
        panel_bg[:] = (40, 40, 40)
        # Text layout within the panel.
        y_offset = 30
        x_offset = 15
        line_spacing = 30
        # WebSocket connection indicator (green when connected, red otherwise).
        ws_status = "已连接" if self.ws_connected else "未连接"
        ws_color = (0, 255, 0) if self.ws_connected else (0, 0, 255)
        # NOTE(review): several yes/no fields below render '' for both branches;
        # the indicator glyphs appear to have been lost — confirm intended text.
        status_texts = [
            f"帧率: {self.display_info['fps']:.1f} FPS",
            f"帧数: {self.display_info['frame_count']}",
            f"质量: {self.display_info['quality']:.1f}",
            f"检测到人脸: {'' if self.display_info['face_detected'] else ''}",
            f"WebSocket: {ws_status}",
            f"机器人说话: {'' if self.robot_status['is_speaking'] else ''}",
            f"机器人思考: {'' if self.robot_status['is_thinking'] else ''}",
            f"机器人角色: {self.robot_status['role_name']}"
        ]
        # While a face is continuously present, show the recognition countdown.
        if self.face_present_start:
            elapsed = (datetime.now() - self.face_present_start).total_seconds()
            face_duration = self.config['face_detection']['face_present_duration']
            remaining = max(0, face_duration - elapsed)
            status_texts.append(f"识别倒计时: {remaining:.1f}")
        # Render the status lines with PIL (Chinese text).
        for i, text in enumerate(status_texts):
            # The WebSocket line (index 4) gets its status color.
            if i == 4:  # the "WebSocket: ..." line
                panel_bg = self.cv2_add_chinese_text(
                    panel_bg,
                    text,
                    (x_offset, y_offset + i * line_spacing),
                    self.font_small,
                    ws_color
                )
            else:
                panel_bg = self.cv2_add_chinese_text(
                    panel_bg,
                    text,
                    (x_offset, y_offset + i * line_spacing),
                    self.font_small,
                    (255, 255, 255)
                )
        # Stack the panel below the camera frame.
        display_frame = np.vstack([display_frame, panel_bg])
        return display_frame
def update_fps(self):
"""更新FPS计算"""
self.fps_counter += 1
current_time = time.time()
elapsed = current_time - self.last_fps_time
if elapsed >= 1.0:
self.display_info['fps'] = self.fps_counter / elapsed
self.fps_counter = 0
self.last_fps_time = current_time
async def send_websocket_message(self, message: Dict[str, Any]):
"""发送WebSocket消息"""
if self.ws and self.ws_connected:
try:
await self.ws.send(json.dumps(message))
self.logger.debug(f"发送消息: {message}")
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket连接已关闭,无法发送消息")
self.ws_connected = False
except Exception as e:
self.logger.error(f"发送WebSocket消息失败: {e}")
self.ws_connected = False
else:
self.logger.debug("WebSocket未连接,消息发送失败")
async def query_robot_status(self):
"""定期查询机器人状态"""
interval = self.config['websocket']['status_interval']
try:
while self.ws_connected:
try:
status_msg = {
"type": "get_status",
"message": ""
}
await self.send_websocket_message(status_msg)
await asyncio.sleep(interval)
except Exception as e:
self.logger.error(f"查询状态错误: {e}")
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"状态查询任务异常: {e}")
self.ws_connected = False
async def handle_websocket_messages(self):
"""处理WebSocket接收的消息"""
try:
while self.ws_connected:
try:
message = await self.ws.recv()
data = json.loads(message)
if data.get('type') == 'status':
status = data.get('message', {})
self.robot_status['is_speaking'] = status.get('is_speaking', False)
self.robot_status['is_thinking'] = status.get('is_thinking', False)
self.robot_status['listening'] = status.get('listening', False)
self.robot_status['role_name'] = status.get('role_name', '访客引导者')
self.logger.debug(f"机器人状态: {self.robot_status}")
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket消息接收中断: 连接已关闭")
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"处理WebSocket消息错误: {e}")
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"WebSocket消息处理任务异常: {e}")
self.ws_connected = False
    async def connect_websocket(self):
        """Maintain the WebSocket connection forever, reconnecting on failure.

        Runs status polling and message handling concurrently while connected,
        and throttles reconnect log output after repeated failures.
        """
        reconnect_delay = self.config['websocket']['reconnect_delay']
        while True:
            try:
                self.ws_reconnect_count += 1
                # Throttle log output as the failure count grows.
                if self.ws_reconnect_count <= 3:
                    self.logger.info(f"连接WebSocket: {self.ws_url} (第{self.ws_reconnect_count}次尝试)")
                elif self.ws_reconnect_count % 10 == 0:
                    self.logger.info(f"持续尝试连接WebSocket (第{self.ws_reconnect_count}次)")
                else:
                    self.logger.debug(f"尝试连接WebSocket (第{self.ws_reconnect_count}次)")
                # Connect with keepalive pings so dead links are detected.
                async with websockets.connect(
                    self.ws_url,
                    ping_interval=20,  # ping every 20 s
                    ping_timeout=10,  # ping timeout 10 s
                    close_timeout=5  # close handshake timeout 5 s
                ) as ws:
                    self.ws = ws
                    self.ws_connected = True
                    self.ws_reconnect_count = 0  # connected: reset the counter
                    self.logger.info("✓ WebSocket连接成功")
                    # Run status polling and message reception side by side.
                    try:
                        await asyncio.gather(
                            self.query_robot_status(),
                            self.handle_websocket_messages()
                        )
                    except Exception as e:
                        self.logger.warning(f"WebSocket任务组异常: {e}")
                    finally:
                        # Always mark the connection as down before reconnecting.
                        self.ws_connected = False
                        self.ws = None
                        self.logger.info("WebSocket连接已断开,准备重连...")
            except websockets.exceptions.ConnectionClosed:
                self.logger.warning("WebSocket连接已正常关闭")
                self.ws_connected = False
                self.ws = None
            except ConnectionRefusedError:
                if self.ws_reconnect_count <= 3:
                    self.logger.error(f"WebSocket连接被拒绝: {self.ws_url}")
                elif self.ws_reconnect_count == 4:
                    self.logger.warning("WebSocket持续连接失败,将减少日志输出频率")
                self.ws_connected = False
                self.ws = None
            except OSError as e:
                if self.ws_reconnect_count <= 3:
                    self.logger.error(f"WebSocket网络错误: {e}")
                self.ws_connected = False
                self.ws = None
            except Exception as e:
                error_msg = str(e)
                # Filter out routine disconnect noise.
                if "no close frame" not in error_msg and "Connection closed" not in error_msg:
                    if self.ws_reconnect_count <= 3:
                        self.logger.error(f"WebSocket连接错误: {e}")
                    elif self.ws_reconnect_count == 4:
                        self.logger.warning("WebSocket持续连接失败,将减少日志输出频率")
                self.ws_connected = False
                self.ws = None
            # Wait before the next reconnect attempt (log output throttled).
            if self.ws_reconnect_count <= 3:
                self.logger.info(f"{reconnect_delay}秒后重连...")
            elif self.ws_reconnect_count % 10 == 0:
                self.logger.info(f"{reconnect_delay}秒后继续尝试重连...")
            await asyncio.sleep(reconnect_delay)
def can_perform_detection(self) -> bool:
"""检查是否可以进行人脸检测"""
return not self.robot_status['is_speaking'] and not self.robot_status['is_thinking'] and self.robot_status[
'role_name'] == "访客引导者"
    async def process_video_stream(self):
        """Main capture/detect/recognize/display loop.

        Reads frames continuously, runs detection (gated by frame interval,
        quality, and robot state), recognizes a face once it has been present
        for the configured duration, announces the visitor over WebSocket,
        shows a QR code to strangers, renders the annotated frame, and
        optionally pushes it to the FFmpeg stream. Exits on 'q' or ESC.
        """
        self._init_camera()
        # Start the FFmpeg stream if enabled.
        if self.stream_enabled:
            self._init_ffmpeg_stream()
        frame_interval = self.config['face_detection']['frame_interval']
        quality_threshold = self.config['face_detection']['quality_threshold']
        face_duration = self.config['face_detection']['face_present_duration']
        self.logger.info("开始处理视频流")
        # Create the display window and maximize it.
        window_name = 'Face Recognition System'
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
        cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_NORMAL)
        try:
            import screeninfo
            screen = screeninfo.get_monitors()[0]
            cv2.resizeWindow(window_name, screen.width, screen.height)
            cv2.moveWindow(window_name, 0, 0)
        except:
            # NOTE(review): bare except — also catches SystemExit/KeyboardInterrupt.
            cv2.resizeWindow(window_name, 1920, 1080)
            self.logger.warning("无法获取屏幕分辨率,使用默认窗口大小")
        try:
            while True:
                # Reconnect the camera when the handle is missing or closed.
                if self.camera is None or not self.camera.isOpened():
                    self.logger.warning("摄像头未打开,尝试重新连接...")
                    if not self._try_reconnect_camera():
                        await asyncio.sleep(0.1)
                        continue
                ret, frame = self.camera.read()
                if not ret:
                    self.camera_failure_count += 1
                    # Throttle warnings as failures accumulate.
                    if self.camera_failure_count <= 3:
                        self.logger.warning(f"无法读取摄像头帧 (连续失败{self.camera_failure_count}次)")
                    elif self.camera_failure_count == 4:
                        self.logger.warning("摄像头持续读取失败,将减少日志输出")
                    # Past the threshold: force a camera re-initialization.
                    if self.camera_failure_count >= self.camera_max_failures:
                        self.logger.error(f"摄像头连续失败{self.camera_failure_count}次,尝试重新初始化...")
                        if self._try_reconnect_camera():
                            self.logger.info("摄像头重新初始化成功,继续处理")
                            continue
                        else:
                            # Reset the counter to avoid hammering retries.
                            self.camera_failure_count = 0
                    await asyncio.sleep(0.1)
                    continue
                # Frame read OK: clear the failure counter.
                if self.camera_failure_count > 0:
                    self.logger.info(f"摄像头恢复正常 (之前连续失败{self.camera_failure_count}次)")
                    self.camera_failure_count = 0
                self.frame_count += 1
                self.display_info['frame_count'] = self.frame_count
                # Update the FPS estimate.
                self.update_fps()
                # Decide whether to run detection on this frame.
                should_detect = False
                # On interval frames, gate detection on robot state and quality.
                if self.frame_count % frame_interval == 0:
                    if self.can_perform_detection():
                        # Only sharp-enough frames are worth detecting on.
                        quality = self.assess_frame_quality(frame)
                        self.display_info['quality'] = quality
                        if quality >= quality_threshold:
                            should_detect = True
                        else:
                            self.logger.debug(f"帧质量不足: {quality:.2f}")
                    else:
                        self.logger.debug("机器人正在说话或思考,跳过检测")
                        # self.face_present_start = None
                        self.display_info['quality'] = self.assess_frame_quality(frame)
                else:
                    self.display_info['quality'] = 0
                    # NOTE(review): this forces detection on every NON-interval
                    # frame, bypassing the robot-state and quality gates — looks
                    # like a local-test override (file is the 本地测试版); confirm
                    # whether production should set should_detect = False here.
                    should_detect = True
                # Run face detection.
                if should_detect:
                    face = self.detect_faces(frame)
                    if face:
                        self.display_info['face_detected'] = True
                        self.display_info['face_box'] = face['box']
                        # First sighting: start the presence timer.
                        if self.face_present_start is None:
                            self.face_present_start = datetime.now()
                            self.logger.info("检测到人脸,开始计时")
                            # Clear any previous recognition result.
                            self.display_info['person_name'] = None
                            self.display_info['person_role'] = None
                            self.display_info['similarity'] = 0
                        # Recognize once the face has been present long enough.
                        elapsed = (datetime.now() - self.face_present_start).total_seconds()
                        if elapsed >= face_duration:
                            self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别")
                            # Run face recognition.
                            recognition_result = self.recognize_face(frame)
                            if recognition_result:
                                person_id = recognition_result['subject']
                                similarity = recognition_result['similarity']
                                # Prefer the recognition box for display.
                                self.display_info['face_box'] = recognition_result['box']
                                # Skip people still inside their cooldown.
                                if self.should_recognize(person_id):
                                    name, role = self.determine_role(person_id, similarity)
                                    self.display_info['person_name'] = name
                                    self.display_info['person_role'] = role
                                    self.display_info['similarity'] = similarity
                                    self.logger.info(
                                        f"识别到: {name}, 相似度: {similarity:.6f}, 角色: {role}"
                                    )
                                    # Strangers get the QR code window.
                                    if role == "陌生人":
                                        self._show_qrcode()
                                    # Announce the visitor to the robot.
                                    reception_msg = {
                                        "type": "start_reception",
                                        "message": {
                                            "name": name,
                                            "role": role
                                        }
                                    }
                                    await self.send_websocket_message(reception_msg)
                                    # Record the recognition time for the cooldown.
                                    self.recognition_history[person_id] = datetime.now()
                                # Restart the presence timer either way.
                                self.face_present_start = None
                            else:
                                # No known subject matched: treat as a stranger.
                                if self.should_recognize("unknown"):
                                    self.logger.info("检测到陌生人")
                                    self.display_info['person_name'] = "未知访客"
                                    self.display_info['person_role'] = "陌生人"
                                    self.display_info['similarity'] = 0
                                    # Show the QR code for strangers.
                                    self._show_qrcode()
                                    reception_msg = {
                                        "type": "start_reception",
                                        "message": {
                                            "name": "未知访客",
                                            "role": "陌生人"
                                        }
                                    }
                                    await self.send_websocket_message(reception_msg)
                                    self.recognition_history["unknown"] = datetime.now()
                                # Restart the presence timer.
                                self.face_present_start = None
                    else:
                        # No face in this frame: clear detection state.
                        self.display_info['face_detected'] = False
                        self.display_info['face_box'] = None
                        # Reset the presence timer when the face disappears.
                        if self.face_present_start is not None:
                            self.logger.debug("人脸消失,重置计时器")
                            self.face_present_start = None
                # Render the annotated frame and show it.
                display_frame = self.draw_info_on_frame(frame)
                cv2.imshow(window_name, display_frame)
                # Close the QR window once its display duration has elapsed.
                self._check_qrcode_timeout()
                # Push to the streaming server, restarting FFmpeg if it died.
                if self.stream_enabled:
                    if self.ffmpeg_process is None or self.ffmpeg_process.poll() is not None:
                        if self.stream_retry_count < self.stream_max_retries:
                            success = self._init_ffmpeg_stream()
                            if not success:
                                pass
                    # Only push while the process is alive.
                    if self.ffmpeg_process and self.ffmpeg_process.poll() is None:
                        self._push_frame_to_stream(display_frame)
                # Keyboard handling: 'q' or ESC exits.
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q') or key == 27:
                    self.logger.info("用户请求退出")
                    break
                # Yield to the event loop so WebSocket tasks can run.
                await asyncio.sleep(0.001)
        except Exception as e:
            self.logger.error(f"视频流处理错误: {e}")
        finally:
            # Release every resource on the way out.
            if self.camera:
                self.camera.release()
                self.logger.info("摄像头已释放")
            self._close_qrcode()
            if self.stream_enabled:
                self._close_ffmpeg_stream()
            cv2.destroyAllWindows()
async def run(self):
"""运行系统"""
self.logger.info("启动人脸识别系统")
# 同时运行WebSocket连接和视频处理
await asyncio.gather(
self.connect_websocket(),
self.process_video_stream()
)
def main():
    """Entry point: build the system from config.yaml and run until interrupted."""
    system = FaceRecognitionSystem("config.yaml")
    try:
        asyncio.run(system.run())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the system.
        print("\n系统已停止")
    except Exception as e:
        print(f"系统错误: {e}")


if __name__ == "__main__":
    main()