robot_face_rec/face_rec.py

1992 lines
80 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import asyncio
import websockets
import json
import yaml
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
import numpy as np
import time
from collections import defaultdict
from PIL import Image, ImageDraw, ImageFont, ImageFilter
import subprocess
import os
import threading
import queue
from compreface import CompreFace
from compreface.service import RecognitionService, DetectionService
class FaceRecognitionSystem:
def __init__(self, config_path: str = "config.yaml"):
    """Initialize the face recognition system.

    Loads the YAML configuration, wires up logging and the CompreFace
    services, and prepares all runtime state for the camera, the
    WebSocket link, FFmpeg push-streaming, QR-code guidance display and
    exhibition-video playback.

    Args:
        config_path: Path to the YAML configuration file.
    """
    # Load configuration
    with open(config_path, 'r', encoding='utf-8') as f:
        self.config = yaml.safe_load(f)
    # Set up logging (creates self.logger, used by everything below)
    self._setup_logging()
    # Initialize CompreFace recognition/detection services
    self._init_compreface()
    # Camera handle; actually opened later by _init_camera
    self.camera = None
    # WebSocket connection state
    self.ws = None
    self.ws_url = self.config['websocket']['url']
    self.ws_connected = False      # connection status flag
    self.ws_reconnect_count = 0    # reconnect counter (throttles log noise)
    # Robot status mirrored from WebSocket messages.
    # Bug fix: 'is_asr_processing' is read by _draw_robot_status_on_video,
    # so the key must exist here to avoid a KeyError before the first
    # status update arrives.
    self.robot_status = {
        'is_speaking': False,
        'is_thinking': False,
        'is_asr_processing': False,
        'listening': False,
        'role_name': ''
    }
    # Face detection state
    self.frame_count = 0
    self.face_present_start = None
    self.current_face_id = None
    # Recognition history, used to suppress duplicate recognitions
    self.recognition_history = {}  # {person_id: last_recognition_time}
    # Display-related state
    self.current_display_frame = None
    self.last_detection_result = None    # most recent detection result
    self.last_recognition_result = None  # most recent recognition result
    self.display_info = {
        'quality': 0,
        'face_detected': False,
        'face_box': None,
        'person_name': None,
        'person_role': None,
        'similarity': 0,
        'frame_count': 0,
        'fps': 0
    }
    self.last_fps_time = time.time()
    self.fps_counter = 0
    # Load a CJK-capable font for PIL text rendering
    self.font_path = self._get_chinese_font()
    self.font_small = ImageFont.truetype(self.font_path, 20)
    self.font_medium = ImageFont.truetype(self.font_path, 24)
    self.font_large = ImageFont.truetype(self.font_path, 50)
    # Fonts for the QR-code guidance screen
    self.font_qr_title = ImageFont.truetype(self.font_path, 56)
    self.font_qr_subtitle = ImageFont.truetype(self.font_path, 30)
    self.font_qr_body = ImageFont.truetype(self.font_path, 24)
    self.font_qr_step_title = ImageFont.truetype(self.font_path, 28)
    self.font_qr_badge = ImageFont.truetype(self.font_path, 30)
    self.qrcode_instruction_title = "访客预约流程"
    self.qrcode_instruction_subtitle = "Visitor Registration Process"
    self.qrcode_instruction_steps = [
        "第一步:扫码关注“康达新材”公众号。",
        "第二步:点击【关于我们】→【我是访客】进入“访客注册”界面,填写并上传相应信息并点击“提交”。",
        "第三步:将第二步信息提交完毕后在“访客预约”界面选择右下角的“+”号按钮,填写预约信息。",
        "第四步:请仔细阅读安全告知书,点击我知道了。",
        "第五步:填写被访人信息及来访事由等内容并提交。",
        "第六步:显示提交成功,并且手机、微信会收到预约相关短信通知。",
    ]
    # FFmpeg push-stream process state
    self.ffmpeg_process = None
    self.stream_enabled = self.config.get('stream', {}).get('enabled', False)
    self.stream_retry_count = 0        # stream restart attempt counter
    self.stream_max_retries = 5        # maximum restart attempts
    self.stream_last_retry_time = None # time of the last restart attempt
    self.stream_retry_cooldown = 10    # cooldown between attempts (seconds)
    # Async streaming queue and worker thread; the bounded queue caps
    # memory usage when FFmpeg falls behind.
    self.stream_frame_queue = queue.Queue(maxsize=10)
    self.stream_thread = None
    self.stream_thread_running = False
    self.logger.info("人脸识别系统初始化完成")
    # Camera health tracking
    self.camera_failure_count = 0      # consecutive read failures
    self.camera_last_retry_time = None # time of the last reconnect attempt
    self.camera_retry_cooldown = 3     # cooldown between reconnects (seconds)
    self.camera_max_failures = 5       # failures before re-initialization
    # QR-code display state
    self.qrcode_image_path = self.config.get('qrcode', {}).get('image_path', 'qrcode.png')
    self.qrcode_display_duration = self.config.get('qrcode', {}).get('display_duration', 10)
    self.qrcode_window_name = 'Visitor guidance QR code'
    self.qrcode_display_start_time = None  # when the QR window was opened
    self.qrcode_showing = False            # whether the QR window is up
    # Exhibition video playback state
    self.video_path = self.config.get('video', {}).get('path', 'exhibition.mp4')
    self.video_window_name = 'Hello Video'
    self.video_showing = False
    self.video_capture = None
    self.video_start_time = None
    self.video_loop = self.config.get('video', {}).get('loop', True)  # loop playback
    self.last_robot_role = ''  # previous robot role, for change detection
    # FFmpeg stderr drain thread
    self.stderr_thread = None
    self.stderr_thread_running = False
def _force_cleanup_ffmpeg(self):
    """Forcibly release every FFmpeg-related resource.

    Stops the stderr-reader and frame-writer threads, drains the frame
    queue, kills the tracked FFmpeg process, and finally (best effort,
    requires psutil) kills any orphaned rawvideo FFmpeg processes so
    two encoders never push to the same URL at once.
    """
    self.logger.info("=== 开始清理FFmpeg资源 ===")
    # 1. Stop the stderr reader thread
    if self.stderr_thread and self.stderr_thread.is_alive():
        self.logger.debug("停止stderr线程...")
        self.stderr_thread_running = False
        self.stderr_thread.join(timeout=1)
    # 2. Stop the streaming thread (None is its stop sentinel)
    if self.stream_thread and self.stream_thread.is_alive():
        self.logger.debug("停止推流线程...")
        self.stream_thread_running = False
        try:
            self.stream_frame_queue.put(None, timeout=0.5)
        except:
            pass
        self.stream_thread.join(timeout=2)
    # 3. Drain any frames still queued
    cleared = 0
    while not self.stream_frame_queue.empty():
        try:
            self.stream_frame_queue.get_nowait()
            cleared += 1
        except:
            break
    if cleared > 0:
        self.logger.debug(f"清空队列: {cleared}")
    # 4. Kill the tracked FFmpeg process
    if self.ffmpeg_process:
        self.logger.warning("发现旧的FFmpeg进程强制终止...")
        try:
            # Close stdin first so FFmpeg sees EOF
            if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
                try:
                    self.ffmpeg_process.stdin.close()
                except:
                    pass
            pid = self.ffmpeg_process.pid
            self.logger.info(f"终止FFmpeg进程 PID={pid}")
            # Kill immediately; no graceful shutdown wanted here
            self.ffmpeg_process.kill()
            self.ffmpeg_process.wait(timeout=2)
            self.logger.info(f"✓ FFmpeg进程已终止 (PID={pid})")
        except Exception as e:
            self.logger.error(f"终止FFmpeg进程失败: {e}")
        finally:
            self.ffmpeg_process = None
    # 5. Extra safety net: kill leftover FFmpeg processes from earlier runs
    try:
        import psutil
        current_pid = os.getpid()
        killed_count = 0
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                # Look for ffmpeg processes
                if proc.info['name'] and 'ffmpeg' in proc.info['name'].lower():
                    # Only kill the ones we started: our pipeline always
                    # passes 'rawvideo' on the command line.
                    cmdline = proc.info.get('cmdline', [])
                    if cmdline and any('rawvideo' in str(arg) for arg in cmdline):
                        # NOTE(review): an ffmpeg child can never share our
                        # own PID, so this guard is always true — redundant
                        # but harmless.
                        if proc.pid != current_pid:
                            self.logger.warning(f"发现残留FFmpeg进程 PID={proc.pid}, 正在终止...")
                            proc.kill()
                            proc.wait(timeout=2)
                            killed_count += 1
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        if killed_count > 0:
            self.logger.warning(f"✓ 清理了 {killed_count} 个残留FFmpeg进程")
    except ImportError:
        self.logger.debug("未安装psutil跳过残留进程清理建议安装: pip install psutil")
    except Exception as e:
        self.logger.debug(f"清理残留进程时出错: {e}")
    # 6. Brief pause so the OS can release pipes/handles
    time.sleep(0.5)
    self.logger.info("=== FFmpeg资源清理完成 ===")
def _show_video(self):
    """Open and configure the exhibition-video playback window.

    Opens the configured video file, creates a topmost OpenCV window
    sized to fill the primary monitor (preserving aspect ratio and
    centering; falls back to 1280x720 when screeninfo is unavailable)
    and marks playback as started. Frames are actually rendered by
    _update_video. Returns True on success, False otherwise.
    """
    try:
        if not os.path.exists(self.video_path):
            self.logger.error(f"视频文件不存在: {self.video_path}")
            return False
        # Already playing — nothing to initialize
        if self.video_showing and self.video_capture is not None:
            return True
        # Open the video file
        self.video_capture = cv2.VideoCapture(self.video_path)
        if not self.video_capture.isOpened():
            self.logger.error(f"无法打开视频文件: {self.video_path}")
            return False
        # Create the playback window
        cv2.namedWindow(self.video_window_name, cv2.WINDOW_NORMAL | cv2.WINDOW_GUI_NORMAL)
        # Native video dimensions
        video_width = int(self.video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(self.video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        video_fps = self.video_capture.get(cv2.CAP_PROP_FPS)
        # Size and position the window on the primary monitor
        try:
            import screeninfo
            screen = screeninfo.get_monitors()[0]
            # Target the full screen size
            target_width = int(screen.width)
            target_height = int(screen.height)
            # Scale to fit while keeping the aspect ratio
            scale_width = target_width / video_width
            scale_height = target_height / video_height
            scale = min(scale_width, scale_height)
            new_width = int(video_width * scale)
            new_height = int(video_height * scale)
            # Center on screen
            x_pos = (screen.width - new_width) // 2
            y_pos = (screen.height - new_height) // 2
            # Apply size and position
            cv2.resizeWindow(self.video_window_name, new_width, new_height)
            cv2.moveWindow(self.video_window_name, x_pos, y_pos)
            self.logger.info(f"视频显示尺寸: {new_width}x{new_height}, 位置: ({x_pos}, {y_pos})")
        except Exception as e:
            self.logger.warning(f"无法获取屏幕信息,使用默认尺寸: {e}")
            cv2.resizeWindow(self.video_window_name, 1280, 720)
        # Keep the window above others
        cv2.setWindowProperty(self.video_window_name, cv2.WND_PROP_TOPMOST, 1)
        self.video_showing = True
        self.video_start_time = time.time()
        self.logger.info(f"开始播放展厅讲解视频: {self.video_path} (FPS: {video_fps:.2f})")
        return True
    except Exception as e:
        self.logger.error(f"显示视频失败: {e}")
        return False
def _draw_robot_status_on_video(self, frame: np.ndarray) -> np.ndarray:
    """Overlay the robot's live status (thinking / listening / speaking) on a video frame.

    Uses PIL so the CJK status text renders correctly, drawing each
    active status line centered near the top with a semi-transparent
    black backdrop and a cyan border. Returns the frame unchanged when
    no status is active or on any rendering error.

    Args:
        frame: BGR video frame.

    Returns:
        BGR frame with the status overlay applied (or the input frame).
    """
    try:
        h, w = frame.shape[:2]
        # Collect the status lines to display.
        # Bug fix: read the flags with .get() — 'is_asr_processing' was
        # not seeded in __init__, so a plain [] lookup raised KeyError
        # before the first WebSocket status update arrived.
        status_texts = []
        if self.robot_status.get('is_thinking'):
            status_texts.append("🤔 正在思考...")
        if self.robot_status.get('is_asr_processing'):
            status_texts.append("👂 正在倾听...")
        if self.robot_status.get('is_speaking'):
            status_texts.append("👂 正在讲话...")
        # Nothing to show — return the frame untouched
        if not status_texts:
            return frame
        # Switch to PIL so Chinese text renders correctly
        frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(frame_pil)
        y_start = 50       # distance from the top edge
        line_height = 45
        for i, text in enumerate(status_texts):
            # Measure the text (API differs across Pillow versions)
            try:
                # Pillow >= 8.0.0
                bbox = draw.textbbox((0, 0), text, font=self.font_large)
                text_width = bbox[2] - bbox[0]
                text_height = bbox[3] - bbox[1]
            except AttributeError:
                # Pillow < 8.0.0
                text_width, text_height = draw.textsize(text, font=self.font_large)
            # Center horizontally
            x = (w - text_width) // 2
            y = y_start + i * line_height
            # Backdrop rectangle bounds
            padding = 15
            bg_x1 = x - padding
            bg_y1 = y - padding
            bg_x2 = x + text_width + padding
            bg_y2 = y + text_height + padding
            # Semi-transparent black backdrop, blended at 70%
            overlay = frame_pil.copy()
            overlay_draw = ImageDraw.Draw(overlay)
            overlay_draw.rectangle(
                [bg_x1, bg_y1, bg_x2, bg_y2],
                fill=(0, 0, 0, 180)
            )
            frame_pil = Image.blend(frame_pil, overlay, 0.7)
            draw = ImageDraw.Draw(frame_pil)
            # Cyan border around the backdrop
            draw.rectangle(
                [bg_x1, bg_y1, bg_x2, bg_y2],
                outline=(0, 255, 255),
                width=3
            )
            # Status text itself
            draw.text((x, y), text, font=self.font_large, fill=(255, 255, 255))
        # Back to OpenCV BGR
        frame = cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)
        return frame
    except Exception as e:
        self.logger.error(f"在视频上绘制状态失败: {e}")
        return frame
def _update_video(self):
    """Advance the exhibition video by one frame; called from the main loop.

    On end-of-stream either rewinds to the start (loop mode) or closes
    the window. Any error tears the video window down.
    """
    if not self.video_showing or self.video_capture is None:
        return
    try:
        ok, frame = self.video_capture.read()
        if not ok:
            # End of stream reached
            if self.video_loop:
                # Rewind to the first frame for seamless looping
                self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
                self.logger.debug("视频循环播放")
            else:
                self.logger.info("视频播放完毕")
                self._close_video()
            return
        # Overlay the robot status and present the frame
        annotated = self._draw_robot_status_on_video(frame)
        cv2.imshow(self.video_window_name, annotated)
    except Exception as e:
        self.logger.error(f"更新视频帧失败: {e}")
        self._close_video()
def _close_video(self):
"""关闭视频窗口"""
try:
if self.video_showing:
if self.video_capture is not None:
self.video_capture.release()
self.video_capture = None
cv2.destroyWindow(self.video_window_name)
self.video_showing = False
self.video_start_time = None
self.logger.info("关闭视频窗口")
except Exception as e:
self.logger.debug(f"关闭视频窗口时出错: {e}")
def _check_role_and_display_video(self):
"""检查机器人角色并显示/隐藏视频"""
current_role = self.robot_status['role_name']
# 角色发生变化时的处理
if current_role != self.last_robot_role:
self.logger.info(f"机器人角色变化: {self.last_robot_role} -> {current_role}")
self.last_robot_role = current_role
# 如果切换到展厅讲解员,显示视频
if current_role == "展厅讲解员" or current_role == "初始角色":
self._show_video()
# 如果切换到其他角色,关闭视频
elif self.video_showing:
self._close_video()
# 如果是展厅讲解员且视频应该显示但未显示,尝试显示
if (current_role == "展厅讲解员" or current_role == "初始角色") and not self.video_showing:
self._show_video()
# 如果不是展厅讲解员但视频在显示,关闭视频
if current_role != "展厅讲解员" and current_role != "初始角色" and self.video_showing:
self._close_video()
def _wrap_text_for_width(self, text: str, font: ImageFont.FreeTypeFont, max_width: int, draw: ImageDraw.ImageDraw):
    """Greedily break *text* into lines no wider than *max_width* pixels.

    Measures character by character with draw.textlength (falling back
    to the legacy draw.textsize on older Pillow), so CJK text without
    spaces wraps correctly.
    """
    def measure(candidate):
        try:
            return draw.textlength(candidate, font=font)
        except AttributeError:
            return draw.textsize(candidate, font=font)[0]

    wrapped = []
    line = ''
    for ch in text:
        attempt = line + ch
        if measure(attempt) <= max_width:
            line = attempt
            continue
        if line:
            wrapped.append(line)
        line = ch
    if line:
        wrapped.append(line)
    return wrapped
def _create_gradient_background(self, size: Tuple[int, int], start_color: Tuple[int, int, int], end_color: Tuple[int, int, int]) -> Image.Image:
    """Build a vertical linear gradient from start_color (top) to end_color (bottom).

    A grayscale ramp is used as the compositing mask: 0 at the top keeps
    start_color, 255 at the bottom blends fully into end_color.

    Args:
        size: (width, height) of the output image.
        start_color: RGB color at the top edge.
        end_color: RGB color at the bottom edge.

    Returns:
        An RGB PIL image of the requested size.
    """
    width, height = size
    base = Image.new('RGB', size, start_color)
    top = Image.new('RGB', size, end_color)
    # Column ramp 0..255 tiled across the width, transposed to (H, W).
    # (Fix: removed a dead `mask = Image.new('L', size)` that was
    # immediately overwritten by the fromarray result.)
    mask_data = np.tile(np.linspace(0, 255, height, dtype=np.uint8), (width, 1)).T
    mask = Image.fromarray(mask_data, 'L')
    return Image.composite(top, base, mask)
def _draw_rounded_rect(self, draw, box, radius, fill, outline=None, width=0):
    """Thin wrapper over ImageDraw.rounded_rectangle (requires Pillow >= 8.2)."""
    draw.rounded_rectangle(box, radius=radius, fill=fill, outline=outline, width=width)
def _draw_shadow(self, image, box, radius, offset=(0, 4), blur=10, shadow_color=(0,0,0,50)):
    """Paint a blurred drop shadow for a rounded rectangle onto *image* (RGBA, in place)."""
    x0, y0, x1, y1 = box
    dx, dy = offset
    # Draw the silhouette on its own transparent layer so blurring
    # cannot smear the artwork underneath.
    layer = Image.new('RGBA', image.size, (0, 0, 0, 0))
    ImageDraw.Draw(layer).rounded_rectangle(
        (x0 + dx, y0 + dy, x1 + dx, y1 + dy),
        radius=radius,
        fill=shadow_color
    )
    blurred = layer.filter(ImageFilter.GaussianBlur(blur))
    # Merge the shadow below subsequent drawing
    image.alpha_composite(blurred)
def _build_qrcode_instruction_canvas(self, qr_image: np.ndarray, canvas_size: Tuple[int, int]) -> Optional[np.ndarray]:
    """Render the full-screen visitor-guidance canvas around the QR code.

    Left panel: a card holding the title, the QR code and a scan hint.
    Right panel: the registration steps laid out as a two-column grid of
    numbered cards. Returns a BGR image of *canvas_size*, or None on any
    failure (callers then fall back to showing the raw QR code).
    """
    try:
        canvas_width, canvas_height = canvas_size
        if canvas_width <= 0 or canvas_height <= 0:
            return None
        # Palette
        PRIMARY_COLOR = (23, 43, 77)      # dark blue for headings
        SECONDARY_COLOR = (94, 108, 132)  # grey for secondary text
        ACCENT_COLOR = (0, 82, 204)       # bright blue for highlights/numbers
        # 1. Background
        bg = self._create_gradient_background((canvas_width, canvas_height), (245, 247, 250), (223, 225, 230)).convert("RGBA")
        draw = ImageDraw.Draw(bg, "RGBA")
        # Layout constants
        MARGIN = 50
        GUTTER = 40
        # Left panel (QR code) — roughly 35% of the width
        left_width = int((canvas_width - 2 * MARGIN - GUTTER) * 0.35)
        # Keep the QR code legible on narrow screens
        left_width = max(left_width, 400)
        right_width = canvas_width - 2 * MARGIN - GUTTER - left_width
        left_box = (MARGIN, MARGIN, MARGIN + left_width, canvas_height - MARGIN)
        right_box = (MARGIN + left_width + GUTTER, MARGIN, canvas_width - MARGIN, canvas_height - MARGIN)
        # --- Left panel ---
        # Shadow, then the white card on top of it
        self._draw_shadow(bg, left_box, radius=30, offset=(0, 10), blur=20, shadow_color=(0, 0, 0, 30))
        self._draw_rounded_rect(draw, left_box, radius=30, fill=(255, 255, 255, 255))
        cx = (left_box[0] + left_box[2]) // 2
        cy_top = left_box[1] + 120
        # Title
        text = "访客登记"
        try:
            w = draw.textlength(text, font=self.font_qr_title)
        except AttributeError:
            w, _ = draw.textsize(text, font=self.font_qr_title)
        draw.text((cx - w/2, cy_top), text, font=self.font_qr_title, fill=PRIMARY_COLOR)
        # Subtitle
        text = "Visitor Registration"
        try:
            w = draw.textlength(text, font=self.font_qr_subtitle)
        except AttributeError:
            w, _ = draw.textsize(text, font=self.font_qr_subtitle)
        draw.text((cx - w/2, cy_top + 70), text, font=self.font_qr_subtitle, fill=SECONDARY_COLOR)
        # QR code, centered under the titles
        qr_size = min(left_width - 100, 500)
        qr_y = cy_top + 180
        qr_pil = Image.fromarray(cv2.cvtColor(qr_image, cv2.COLOR_BGR2RGB))
        qr_pil = qr_pil.resize((qr_size, qr_size), Image.LANCZOS)
        bg.paste(qr_pil, (cx - qr_size//2, qr_y))
        # Scan hint inside a pill-shaped background
        hint_y = qr_y + qr_size + 50
        hint_text = "请使用微信扫码登记"
        try:
            w = draw.textlength(hint_text, font=self.font_qr_step_title)
        except AttributeError:
            w, _ = draw.textsize(hint_text, font=self.font_qr_step_title)
        pill_padding = 20
        pill_box = (cx - w/2 - pill_padding, hint_y - pill_padding, cx + w/2 + pill_padding, hint_y + 40 + pill_padding)
        self._draw_rounded_rect(draw, pill_box, radius=30, fill=(240, 242, 245), outline=None)
        draw.text((cx - w/2, hint_y), hint_text, font=self.font_qr_step_title, fill=ACCENT_COLOR)
        # --- Right panel ---
        # Compact title area
        rt_y = MARGIN + 20
        draw.text((right_box[0], rt_y), self.qrcode_instruction_title, font=self.font_qr_title, fill=PRIMARY_COLOR)
        draw.text((right_box[0], rt_y + 60), self.qrcode_instruction_subtitle, font=self.font_qr_subtitle, fill=SECONDARY_COLOR)
        # Separator line
        sep_y = rt_y + 110
        draw.line((right_box[0], sep_y, right_box[2], sep_y), fill=(200, 200, 200), width=2)
        # Grid configuration
        grid_y_start = sep_y + 40
        grid_width = right_width
        cols = 2
        col_gap = 30
        row_gap = 30
        col_width = (grid_width - (cols - 1) * col_gap) // cols
        # Pre-wrap all step texts so every card gets a uniform height
        max_lines = 0
        processed_steps = []
        padding = 30
        badge_size = 50
        text_left_margin = badge_size + 20
        # Width available for text inside one card
        text_max_width = col_width - padding * 2 - text_left_margin
        for step in self.qrcode_instruction_steps:
            # Strip the "第N步" prefix (up to the first colon) — the
            # numbered badges already convey step order.
            # Bug fix: the previous code called step.split("", 1); an
            # empty separator makes str.split raise ValueError, so the
            # whole canvas always failed over to the raw-QR fallback.
            clean_step = step
            for sep in ("：", ":"):
                if sep in step:
                    clean_step = step.split(sep, 1)[-1]
                    break
            lines = self._wrap_text_for_width(clean_step, self.font_qr_body, text_max_width, draw)
            processed_steps.append(lines)
            max_lines = max(max_lines, len(lines))
        # Card height = padding + tallest text block (or badge) + padding
        line_height = self.font_qr_body.size + 10
        card_content_height = max(badge_size, max_lines * line_height)
        uniform_card_height = int(padding * 2 + card_content_height)
        # Draw the grid of step cards
        for idx, lines in enumerate(processed_steps):
            row = idx // cols
            col = idx % cols
            x = right_box[0] + col * (col_width + col_gap)
            y = grid_y_start + row * (uniform_card_height + row_gap)
            # Stop rather than draw past the bottom edge
            if y + uniform_card_height > canvas_height:
                break
            card_box = (x, y, x + col_width, y + uniform_card_height)
            # Card shadow, then body
            self._draw_shadow(bg, card_box, radius=20, offset=(0, 4), blur=12, shadow_color=(0, 0, 0, 15))
            self._draw_rounded_rect(draw, card_box, radius=20, fill=(255, 255, 255))
            # Numbered badge
            bx = x + padding
            by = y + padding
            draw.ellipse((bx, by, bx + badge_size, by + badge_size), fill=ACCENT_COLOR)
            num_text = str(idx + 1)
            try:
                nw = draw.textlength(num_text, font=self.font_qr_badge)
            except AttributeError:
                nw, _ = draw.textsize(num_text, font=self.font_qr_badge)
            # Center the number within the badge circle
            draw.text((bx + (badge_size - nw)/2, by + (badge_size - self.font_qr_badge.size)/2 - 2),
                      num_text, font=self.font_qr_badge, fill=(255, 255, 255))
            # Step text, line by line
            tx = x + padding + text_left_margin
            ty = y + padding
            for line in lines:
                draw.text((tx, ty), line, font=self.font_qr_body, fill=PRIMARY_COLOR)
                ty += line_height
        return cv2.cvtColor(np.array(bg.convert("RGB")), cv2.COLOR_RGB2BGR)
    except Exception as e:
        self.logger.error(f"渲染二维码指引面板失败: {e}")
        return None
def _show_qrcode(self):
    """Display the visitor-registration QR code full-screen.

    Loads the QR image, renders the rich guidance canvas (falling back
    to a plain scaled QR code if rendering fails), and shows it in a
    topmost full-screen window. _check_qrcode_timeout closes it after
    the configured duration. Returns True on success, False otherwise.
    """
    try:
        if not os.path.exists(self.qrcode_image_path):
            self.logger.error(f"二维码图片不存在: {self.qrcode_image_path}")
            return False
        # Load the QR code image
        qr_image = cv2.imread(self.qrcode_image_path)
        if qr_image is None:
            self.logger.error(f"无法读取二维码图片: {self.qrcode_image_path}")
            return False
        # Create the display window
        cv2.namedWindow(self.qrcode_window_name, cv2.WINDOW_NORMAL | cv2.WINDOW_GUI_NORMAL)
        qr_height, qr_width = qr_image.shape[:2]
        # Canvas defaults to the primary monitor's full resolution
        try:
            import screeninfo
            screen = screeninfo.get_monitors()[0]
            canvas_width = int(screen.width)
            canvas_height = int(screen.height)
            x_pos = 0
            y_pos = 0
        except Exception as e:
            self.logger.warning(f"无法获取屏幕信息,使用默认尺寸: {e}")
            canvas_width = max(qr_width * 2, 1920)
            canvas_height = max(qr_height, 1080)
            x_pos = 0
            y_pos = 0
        rendered = self._build_qrcode_instruction_canvas(qr_image, (canvas_width, canvas_height))
        if rendered is None:
            # Fallback: show the raw QR code scaled to fit the canvas
            scale_width = canvas_width / qr_width
            scale_height = canvas_height / qr_height
            scale = min(scale_width, scale_height)
            new_width = int(qr_width * scale)
            new_height = int(qr_height * scale)
            rendered = cv2.resize(qr_image, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
            canvas_width, canvas_height = new_width, new_height
        cv2.resizeWindow(self.qrcode_window_name, canvas_width, canvas_height)
        cv2.moveWindow(self.qrcode_window_name, x_pos, y_pos)
        # Present full-screen
        cv2.setWindowProperty(self.qrcode_window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
        cv2.imshow(self.qrcode_window_name, rendered)
        # Keep it above other windows
        cv2.setWindowProperty(self.qrcode_window_name, cv2.WND_PROP_TOPMOST, 1)
        self.qrcode_showing = True
        self.qrcode_display_start_time = time.time()
        self.logger.info(f"显示二维码图片: {self.qrcode_image_path}")
        return True
    except Exception as e:
        self.logger.error(f"显示二维码失败: {e}")
        return False
def _close_qrcode(self):
"""关闭二维码窗口"""
try:
if self.qrcode_showing:
cv2.destroyWindow(self.qrcode_window_name)
self.qrcode_showing = False
self.qrcode_display_start_time = None
self.logger.info("关闭二维码窗口")
except Exception as e:
self.logger.debug(f"关闭二维码窗口时出错: {e}")
def _check_qrcode_timeout(self):
"""检查二维码是否应该关闭"""
if self.qrcode_showing and self.qrcode_display_start_time:
elapsed = time.time() - self.qrcode_display_start_time
if elapsed >= self.qrcode_display_duration:
self._close_qrcode()
return True
return False
def _try_reconnect_camera(self) -> bool:
"""尝试重新连接摄像头"""
current_time = time.time()
# 检查是否在冷却期内
if self.camera_last_retry_time:
elapsed = current_time - self.camera_last_retry_time
if elapsed < self.camera_retry_cooldown:
return False
self.camera_last_retry_time = current_time
self.logger.info("尝试重新连接摄像头...")
cam_config = self.config['camera']
# 释放旧的摄像头资源
if self.camera is not None:
try:
self.camera.release()
except:
pass
self.camera = None
# 尝试重新打开
self.camera = cv2.VideoCapture(cam_config['device_id'])
if self.camera.isOpened():
# 设置摄像头参数
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width'])
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height'])
self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps'])
# 验证是否能读取帧
ret, frame = self.camera.read()
if ret:
self.logger.info("✓ 摄像头重新连接成功")
self.camera_failure_count = 0
return True
else:
self.logger.warning("摄像头已打开但无法读取帧")
self.camera.release()
self.camera = None
else:
self.logger.warning("无法重新打开摄像头")
return False
def _setup_logging(self):
"""设置日志系统"""
log_config = self.config['logging']
self.logger = logging.getLogger('FaceRecognition')
self.logger.setLevel(getattr(logging, log_config['level']))
# 文件处理器
file_handler = RotatingFileHandler(
log_config['file'],
maxBytes=log_config['max_bytes'],
backupCount=log_config['backup_count']
)
# 控制台处理器
console_handler = logging.StreamHandler()
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def _get_chinese_font(self) -> str:
"""获取中文字体路径"""
import platform
import os
system = platform.system()
# Windows系统
if system == "Windows":
font_paths = [
"C:/Windows/Fonts/msyh.ttc", # 微软雅黑
"C:/Windows/Fonts/simhei.ttf", # 黑体
"C:/Windows/Fonts/simsun.ttc", # 宋体
]
# macOS系统
elif system == "Darwin":
font_paths = [
"/System/Library/Fonts/PingFang.ttc", # 苹方
"/System/Library/Fonts/STHeiti Medium.ttc", # 黑体
"/Library/Fonts/Arial Unicode.ttf",
]
# Linux系统
else:
font_paths = [
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
"/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
]
# 查找可用的字体
for font_path in font_paths:
if os.path.exists(font_path):
self.logger.info(f"使用中文字体: {font_path}")
return font_path
# 如果没找到,尝试使用配置文件中的字体路径
if 'font_path' in self.config.get('display', {}):
custom_font = self.config['display']['font_path']
if os.path.exists(custom_font):
self.logger.info(f"使用自定义字体: {custom_font}")
return custom_font
# 默认使用一个基本字体(不支持中文,但至少不会报错)
self.logger.warning("未找到中文字体,将使用默认字体(可能无法显示中文)")
return "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
def _init_ffmpeg_stream(self):
    """Start (or restart) the FFmpeg push-stream subprocess.

    Tears down any previous FFmpeg state first so only one encoder ever
    pushes to the URL, honors a retry cooldown, builds the FFmpeg
    command for RTMP (flv) or RTSP output from the camera/stream
    config, launches the process reading raw BGR frames on stdin, and
    spins up the stderr-reader and frame-writer threads. Returns True
    when the stream is up, False otherwise.
    """
    if not self.stream_enabled:
        self.logger.info("流媒体推流功能未启用")
        return False
    # Critical: force-clean any previous pipeline before starting anew
    self._force_cleanup_ffmpeg()
    # Respect the retry cooldown
    if self.stream_last_retry_time:
        elapsed = time.time() - self.stream_last_retry_time
        if elapsed < self.stream_retry_cooldown:
            self.logger.debug(f"推流重试冷却中,还需等待 {self.stream_retry_cooldown - elapsed:.1f}")
            return False
    stream_config = self.config['stream']
    ffmpeg_config = stream_config['ffmpeg']
    stream_url = stream_config.get('rtmp_url') or stream_config.get('stream_url', '')
    cam_config = self.config['camera']
    # Streamed frame = camera frame plus a 200px info panel below it
    panel_height = 200
    total_height = cam_config['height'] + panel_height
    # Pick the container format from the URL scheme
    if stream_url.startswith('rtmp://'):
        output_format = 'flv'
    elif stream_url.startswith('rtsp://'):
        output_format = 'rtsp'
    else:
        self.logger.error(f"不支持的推流协议: {stream_url}")
        return False
    # Build the FFmpeg command (raw BGR frames piped to stdin)
    ffmpeg_cmd = [
        'ffmpeg',
        '-y',
        '-f', 'rawvideo',
        '-pix_fmt', 'bgr24',
        '-s', f"{cam_config['width']}x{total_height}",
        '-r', str(ffmpeg_config['fps']),
        '-i', '-',
        '-c:v', ffmpeg_config['video_codec'],
        '-pix_fmt', ffmpeg_config['pixel_format'],
        '-preset', 'ultrafast',
        '-tune', ffmpeg_config['tune'],
        '-b:v', ffmpeg_config['video_bitrate'],
        '-maxrate', ffmpeg_config['video_bitrate'],
        '-bufsize', '1M',  # small buffer so latency cannot accumulate
        '-r', str(ffmpeg_config['fps']),
        '-g', str(ffmpeg_config['fps'] * 2),
        '-threads', '2',
        '-flush_packets', '1',  # flush packets immediately
    ]
    if not ffmpeg_config.get('audio', False):
        ffmpeg_cmd.append('-an')
    # RTMP-specific flags
    if output_format == 'flv':
        ffmpeg_cmd.extend([
            '-f', 'flv',
            '-flvflags', 'no_duration_filesize',
            # The short 5s '-timeout' previously caused disconnects after
            # ~7 minutes; rely on the reconnect flags below instead.
            # '-timeout', '30000000',  # 30s timeout (microseconds)
            '-reconnect', '1',
            '-reconnect_streamed', '1',
            '-reconnect_delay_max', '5',  # max reconnect backoff (seconds)
            '-reconnect_at_eof', '1',     # reconnect on EOF
            stream_url
        ])
    elif output_format == 'rtsp':
        ffmpeg_cmd.extend([
            '-f', 'rtsp',
            '-rtsp_transport', 'tcp',
            stream_url
        ])
    try:
        self.stream_retry_count += 1
        self.stream_last_retry_time = time.time()
        self.logger.info(f"启动FFmpeg推流 (第{self.stream_retry_count}次尝试): {stream_url}")
        # stderr stays a PIPE for diagnostics; a dedicated reader thread
        # drains it so the buffer can never fill up and block FFmpeg.
        self.ffmpeg_process = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            bufsize=512 * 1024  # 512KB pipe buffer
        )
        # Start draining stderr right away
        self._start_stderr_reader()
        # Give FFmpeg a moment, then verify it is still alive
        time.sleep(0.5)
        if self.ffmpeg_process.poll() is not None:
            stderr_output = self.ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
            self.logger.error(f"FFmpeg进程启动失败:")
            for line in stderr_output.split('\n'):
                if 'error' in line.lower() or 'failed' in line.lower():
                    self.logger.error(f" {line.strip()}")
            self.ffmpeg_process = None
            return False
        self.logger.info("FFmpeg推流进程启动成功")
        # Start the frame-writer thread
        self._start_stream_thread()
        # NOTE(review): stream_retry_count was incremented above, so this
        # condition is always true; the counter is then reset to zero.
        if self.stream_retry_count > 0:
            self.logger.info(f"推流恢复成功 (之前重试了{self.stream_retry_count}次)")
            self.stream_retry_count = 0
        return True
    except FileNotFoundError:
        self.logger.error("FFmpeg未安装或不在系统PATH中")
        self.stream_enabled = False
        return False
    except Exception as e:
        self.logger.error(f"启动FFmpeg推流失败: {e}")
        return False
def _stderr_reader(self):
    """Continuously drain FFmpeg's stderr so the pipe buffer cannot fill and block the encoder.

    Collects lines containing error/failed/warning and emits a
    summarized log entry at most every 10 seconds (first three lines at
    debug level). Exits when the thread flag clears, the process handle
    goes away, or stderr hits EOF.
    """
    last_log_time = time.time()
    error_lines = []
    while self.stderr_thread_running and self.ffmpeg_process:
        try:
            if self.ffmpeg_process.stderr:
                line = self.ffmpeg_process.stderr.readline()
                if not line:
                    break
                line_str = line.decode('utf-8', errors='ignore').strip()
                # Keep only the lines that look important
                if any(keyword in line_str.lower() for keyword in ['error', 'failed', 'warning']):
                    error_lines.append(line_str)
            # Summarize at most once every 10 seconds
            current_time = time.time()
            if current_time - last_log_time >= 10:
                if error_lines:
                    self.logger.warning(f"FFmpeg stderr最近10秒: {len(error_lines)}条消息")
                    # Only surface the first three
                    for err in error_lines[:3]:
                        self.logger.debug(f" {err}")
                    error_lines.clear()
                last_log_time = current_time
        except Exception as e:
            self.logger.debug(f"读取stderr错误: {e}")
            break
    self.logger.debug("stderr读取线程已停止")
def _start_stderr_reader(self):
"""启动stderr读取线程 - 关键修复:防止管道阻塞"""
if self.stderr_thread and self.stderr_thread.is_alive():
return
self.stderr_thread_running = True
self.stderr_thread = threading.Thread(target=self._stderr_reader, daemon=True)
self.stderr_thread.start()
self.logger.debug("✓ stderr读取线程已启动")
def _start_stream_thread(self):
"""启动推流线程"""
if self.stream_thread is not None and self.stream_thread.is_alive():
self.logger.debug("推流线程已在运行")
return
self.stream_thread_running = True
self.stream_thread = threading.Thread(target=self._stream_worker, daemon=True)
self.stream_thread.start()
self.logger.info("✓ 推流线程已启动")
def _stream_worker(self):
    """Consume frames from stream_frame_queue and write them to FFmpeg's stdin.

    Runs in a dedicated thread; a None frame is the stop sentinel.
    Tracks dropped frames and consecutive failures (a dead FFmpeg
    process is flagged once the streak reaches 30), flushes stdin every
    30 frames to keep the pipe shallow, and logs a throughput summary
    every 10 seconds.
    """
    dropped_frames = 0
    total_frames = 0
    last_log_time = time.time()
    consecutive_failures = 0
    # NOTE(review): currently unused — writes rely on the small pipe
    # buffer plus the periodic flush instead of a real timeout.
    write_timeout = 1.0
    while self.stream_thread_running:
        try:
            # Wait briefly for a frame so the stop flag stays responsive
            try:
                frame = self.stream_frame_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            if frame is None:  # stop sentinel
                break
            total_frames += 1
            # Is the FFmpeg process still alive?
            if self.ffmpeg_process is None or self.ffmpeg_process.poll() is not None:
                dropped_frames += 1
                consecutive_failures += 1
                if consecutive_failures >= 30:
                    # Log the restart request only once per streak
                    if consecutive_failures == 30:
                        self.logger.error("FFmpeg进程异常需要重启")
                    self.ffmpeg_process = None
                # Cap the counter so it cannot grow unbounded
                if consecutive_failures > 100:
                    consecutive_failures = 30
                continue
            try:
                frame_bytes = frame.tobytes()
                # Write only while stdin is still usable
                if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
                    self.ffmpeg_process.stdin.write(frame_bytes)
                    # Periodic flush keeps the pipe buffer from piling up
                    if total_frames % 30 == 0:
                        self.ffmpeg_process.stdin.flush()
                    consecutive_failures = 0
                else:
                    dropped_frames += 1
                    consecutive_failures += 1
            except BrokenPipeError:
                self.logger.warning("推流管道断开")
                self.ffmpeg_process = None
                dropped_frames += 1
                consecutive_failures += 1
            except IOError as e:
                if e.errno == 32:  # EPIPE / broken pipe
                    self.logger.warning("推流管道异常")
                    self.ffmpeg_process = None
                dropped_frames += 1
                consecutive_failures += 1
            except Exception as e:
                self.logger.debug(f"写入帧失败: {e}")
                dropped_frames += 1
                consecutive_failures += 1
            # Periodic throughput summary
            current_time = time.time()
            if current_time - last_log_time >= 10:
                if total_frames > 0:
                    drop_rate = (dropped_frames / total_frames) * 100
                    process_status = "运行中" if (
                        self.ffmpeg_process and self.ffmpeg_process.poll() is None) else "已停止"
                    self.logger.info(
                        f"推流统计: 总帧={total_frames}, 丢帧={dropped_frames}({drop_rate:.1f}%), "
                        f"队列={self.stream_frame_queue.qsize()}, 状态={process_status}"
                    )
                    dropped_frames = 0
                    total_frames = 0
                last_log_time = current_time
        except Exception as e:
            self.logger.error(f"推流线程错误: {e}")
            time.sleep(0.1)
    self.logger.info("推流线程已停止")
def _push_frame_to_stream(self, frame: np.ndarray):
"""推送帧到流媒体服务器 - 非阻塞方式"""
if not self.stream_enabled or self.ffmpeg_process is None:
return
try:
# 非阻塞方式放入队列
try:
self.stream_frame_queue.put_nowait(frame)
except queue.Full:
# 队列满时丢弃最旧的帧
try:
self.stream_frame_queue.get_nowait()
self.stream_frame_queue.put_nowait(frame)
except:
pass # 丢帧,继续处理
except Exception as e:
self.logger.debug(f"推送帧到队列失败: {e}")
def _close_ffmpeg_stream(self):
    """Shut down the streaming pipeline (threads, queue, FFmpeg process) via the forced cleanup."""
    self.logger.info("关闭推流系统...")
    self._force_cleanup_ffmpeg()
    self.logger.info("推流系统已关闭")
def _init_compreface(self):
    """Create the CompreFace client and bind the recognition and detection services."""
    cf_config = self.config['compreface']
    # Shared detection options for both services
    options = {
        "limit": 0,                 # no cap on faces per image
        "det_prob_threshold": 0.8,  # minimum detection confidence
        "prediction_count": 1       # best match only
    }
    compre_face = CompreFace(cf_config['host'], cf_config['port'], options)
    # Bind the two service endpoints with their API keys
    self.recognition_service: RecognitionService = compre_face.init_face_recognition(
        cf_config['recognition_api_key']
    )
    self.detection_service: DetectionService = compre_face.init_face_detection(
        cf_config['detection_api_key']
    )
    self.logger.info("CompreFace服务初始化完成")
def _init_camera(self):
    """Open the configured camera device, retrying forever until a frame can be read.

    Blocks the caller; each failed attempt waits retry_interval seconds
    (config key 'camera.retry_interval', default 3) before trying
    again. Returns True once the camera actually delivers a frame.
    """
    cam_config = self.config['camera']
    retry_interval = cam_config.get('retry_interval', 3)
    attempt = 0
    while True:
        attempt += 1
        self.logger.info(f"正在尝试打开摄像头 (第{attempt}次尝试)...")
        # Drop any previous handle before reopening the device
        if self.camera is not None:
            try:
                self.camera.release()
            except:
                pass
            self.camera = None
        self.camera = cv2.VideoCapture(cam_config['device_id'])
        if not self.camera.isOpened():
            self.logger.warning(f"无法打开摄像头设备 {cam_config['device_id']}")
        else:
            # Apply the configured capture parameters
            self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width'])
            self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height'])
            self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps'])
            # A device can report open yet still fail to deliver frames
            ok, _ = self.camera.read()
            if ok:
                self.logger.info(f"摄像头初始化成功 (设备ID: {cam_config['device_id']})")
                # Reset the health counters
                self.camera_failure_count = 0
                self.camera_last_retry_time = None
                return True
            self.logger.warning("摄像头已打开但无法读取帧")
            self.camera.release()
            self.camera = None
        # Wait before the next attempt
        self.logger.info(f"{retry_interval}秒后重试...")
        time.sleep(retry_interval)
def assess_frame_quality(self, frame: np.ndarray) -> float:
    """Score frame sharpness via the variance of the Laplacian (low values indicate blur)."""
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    return cv2.Laplacian(grayscale, cv2.CV_64F).var()
def detect_faces(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
"""检测人脸"""
try:
# 将帧编码为JPEG
_, img_encoded = cv2.imencode('.jpg', frame)
# 调用CompreFace检测API启用pose插件
result = self.detection_service.detect(
img_encoded.tobytes(),
face_plugins="pose"
)
if result and 'result' in result and len(result['result']) > 0:
faces = result['result']
# 过滤掉太小的人脸
min_size = self.config['face_detection']['min_face_size']
valid_faces = []
# 获取角度阈值
max_yaw = self.config['face_detection'].get('max_yaw', 30.0)
max_pitch = self.config['face_detection'].get('max_pitch', 30.0)
for face in faces:
# 检查人脸尺寸
face_width = face['box']['x_max'] - face['box']['x_min']
face_height = face['box']['y_max'] - face['box']['y_min']
if face_width >= min_size and face_height >= min_size:
# 检查人脸角度
pose = face.get('pose', {})
yaw = pose.get('yaw', 0.0)
pitch = pose.get('pitch', 0.0)
# 判断是否为正脸
if abs(yaw) <= max_yaw and abs(pitch) <= max_pitch:
valid_faces.append(face)
self.logger.debug(f"人脸角度检查通过: yaw={yaw:.1f}°, pitch={pitch:.1f}°")
else:
self.logger.debug(f"人脸角度超出范围,跳过: yaw={yaw:.1f}°(阈值±{max_yaw}°), pitch={pitch:.1f}°(阈值±{max_pitch}°)")
if valid_faces:
# 返回第一个(最大的)人脸
return valid_faces[0]
return None
except Exception as e:
self.logger.error(f"人脸检测错误: {e}")
return None
def recognize_face(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
"""识别人脸"""
try:
# 将帧编码为JPEG
_, img_encoded = cv2.imencode('.jpg', frame)
# 调用CompreFace识别API
result = self.recognition_service.recognize(img_encoded.tobytes())
if result and 'result' in result and len(result['result']) > 0:
faces = result['result']
if len(faces[0]['subjects']) > 0:
# 返回第一个识别结果
subject = faces[0]['subjects'][0]
return {
'subject': subject['subject'],
'similarity': subject['similarity'],
'box': faces[0]['box']
}
return None
except Exception as e:
self.logger.error(f"人脸识别错误: {e}")
return None
def determine_role(self, person_id: str, similarity: float) -> Tuple[str, str]:
"""根据相似度确定角色"""
role_config = self.config['role_mapping']
if similarity < role_config['stranger_threshold']:
return "未知", "陌生人"
else:
t = person_id.split("_")
name = t[0]
role = "员工" if len(t) == 1 else "访客"
return name, role
def should_recognize(self, person_id: str) -> bool:
"""检查是否应该识别(防止重复识别)"""
cooldown = self.config['face_recognition']['recognition_cooldown']
if person_id not in self.recognition_history:
return True
last_time = self.recognition_history[person_id]
elapsed = (datetime.now() - last_time).total_seconds()
return elapsed >= cooldown
def cv2_add_chinese_text(self, img: np.ndarray, text: str, position: Tuple[int, int],
font: ImageFont.FreeTypeFont, text_color: Tuple[int, int, int]) -> np.ndarray:
"""在OpenCV图像上添加中文文本"""
# 转换为PIL图像
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
# 绘制文本
draw.text(position, text, font=font, fill=text_color)
# 转换回OpenCV格式
img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return img
def draw_info_on_frame(self, frame: np.ndarray) -> np.ndarray:
"""在帧上绘制检测和识别信息"""
display_frame = frame.copy()
h, w = display_frame.shape[:2]
# 绘制人脸框和识别结果
if self.display_info['face_detected'] and self.display_info['face_box']:
box = self.display_info['face_box']
x_min = int(box['x_min'])
y_min = int(box['y_min'])
x_max = int(box['x_max'])
y_max = int(box['y_max'])
# 根据识别状态选择颜色
if self.display_info['person_name']:
# 已识别 - 绿色
color = (0, 255, 0)
thickness = 3
else:
# 仅检测到 - 黄色
color = (0, 255, 255)
thickness = 2
# 绘制人脸框
cv2.rectangle(display_frame, (x_min, y_min), (x_max, y_max), color, thickness)
# 绘制识别信息
if self.display_info['person_name']:
name = self.display_info['person_name']
role = self.display_info['person_role']
similarity = self.display_info['similarity']
# 准备文本
text_lines = [
f"姓名: {name}",
f"角色: {role}",
f"相似度: {similarity:.2%}"
]
# 计算文本背景框
line_height = 35
padding = 10
# 计算所需的背景高度
bg_height = len(text_lines) * line_height + padding * 2
bg_y_start = max(0, y_min - bg_height - 10)
# 绘制文本背景
cv2.rectangle(
display_frame,
(x_min, bg_y_start),
(x_max, bg_y_start + bg_height),
(0, 0, 0),
-1
)
cv2.rectangle(
display_frame,
(x_min, bg_y_start),
(x_max, bg_y_start + bg_height),
color,
2
)
# 使用PIL绘制中文文本
for i, text in enumerate(text_lines):
y_pos = bg_y_start + padding + i * line_height
display_frame = self.cv2_add_chinese_text(
display_frame,
text,
(x_min + padding, y_pos),
self.font_medium,
(255, 255, 255)
)
# 绘制状态信息面板
panel_height = 200
panel_bg = np.zeros((panel_height, w, 3), dtype=np.uint8)
panel_bg[:] = (40, 40, 40)
# 状态信息
y_offset = 30
x_offset = 15
line_spacing = 30
# WebSocket连接状态
ws_status = "已连接" if self.ws_connected else "未连接"
ws_color = (0, 255, 0) if self.ws_connected else (0, 0, 255)
status_texts = [
# f"帧率: {self.display_info['fps']:.1f} FPS",
# f"帧数: {self.display_info['frame_count']}",
f"质量: {self.display_info['quality']:.1f}",
# f"检测到人脸: {'是' if self.display_info['face_detected'] else '否'}",
f"WebSocket: {ws_status}",
f"机器人说话: {'' if self.robot_status['is_speaking'] else ''}",
f"机器人思考: {'' if self.robot_status['is_thinking'] else ''}",
f"机器人角色: {self.robot_status['role_name']}"
]
# 如果人脸持续出现,显示倒计时
if self.face_present_start:
elapsed = (datetime.now() - self.face_present_start).total_seconds()
face_duration = self.config['face_detection']['face_present_duration']
remaining = max(0, face_duration - elapsed)
status_texts.append(f"识别倒计时: {remaining:.1f}")
# 使用PIL绘制中文状态文本
for i, text in enumerate(status_texts):
# WebSocket状态使用特殊颜色
if i == 4: # "WebSocket: " 这一行
panel_bg = self.cv2_add_chinese_text(
panel_bg,
text,
(x_offset, y_offset + i * line_spacing),
self.font_small,
ws_color
)
else:
panel_bg = self.cv2_add_chinese_text(
panel_bg,
text,
(x_offset, y_offset + i * line_spacing),
self.font_small,
(255, 255, 255)
)
# 将面板添加到画面底部
display_frame = np.vstack([display_frame, panel_bg])
return display_frame
def update_fps(self):
"""更新FPS计算"""
self.fps_counter += 1
current_time = time.time()
elapsed = current_time - self.last_fps_time
if elapsed >= 1.0:
self.display_info['fps'] = self.fps_counter / elapsed
self.fps_counter = 0
self.last_fps_time = current_time
async def send_websocket_message(self, message: Dict[str, Any]):
"""发送WebSocket消息"""
if self.ws and self.ws_connected:
try:
await self.ws.send(json.dumps(message))
self.logger.debug(f"发送消息: {message}")
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket连接已关闭,无法发送消息")
self.ws_connected = False
except Exception as e:
self.logger.error(f"发送WebSocket消息失败: {e}")
self.ws_connected = False
else:
self.logger.debug("WebSocket未连接,消息发送失败")
async def query_robot_status(self):
"""定期查询机器人状态"""
interval = self.config['websocket']['status_interval']
try:
while self.ws_connected:
try:
status_msg = {
"type": "get_status",
"message": ""
}
await self.send_websocket_message(status_msg)
await asyncio.sleep(interval)
except Exception as e:
self.logger.error(f"查询状态错误: {e}")
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"状态查询任务异常: {e}")
self.ws_connected = False
async def handle_websocket_messages(self):
"""处理WebSocket接收的消息"""
try:
while self.ws_connected:
try:
message = await self.ws.recv()
data = json.loads(message)
if data.get('type') == 'status':
status = data.get('message', {})
self.robot_status['is_speaking'] = status.get('is_speaking', False)
self.robot_status['is_thinking'] = status.get('is_thinking', False)
self.robot_status['listening'] = status.get('listening', False)
self.robot_status['is_asr_processing'] = status.get('is_asr_processing', False)
self.robot_status['role_name'] = status.get('role_name', '访客引导者')
self.logger.debug(f"机器人状态: {self.robot_status}")
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket消息接收中断: 连接已关闭")
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"处理WebSocket消息错误: {e}")
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"WebSocket消息处理任务异常: {e}")
self.ws_connected = False
    async def connect_websocket(self):
        """Maintain the WebSocket connection forever, reconnecting on failure.

        While connected, runs the status-polling and message-handling
        coroutines concurrently; on any disconnect or error, clears the
        connection state and retries after ``reconnect_delay`` seconds.
        Log volume is throttled after the first few consecutive failures.
        """
        reconnect_delay = self.config['websocket']['reconnect_delay']
        while True:
            try:
                self.ws_reconnect_count += 1
                # Throttle log output: full info for the first 3 attempts,
                # one info line every 10th attempt, debug otherwise.
                if self.ws_reconnect_count <= 3:
                    self.logger.info(f"连接WebSocket: {self.ws_url} (第{self.ws_reconnect_count}次尝试)")
                elif self.ws_reconnect_count % 10 == 0:
                    self.logger.info(f"持续尝试连接WebSocket (第{self.ws_reconnect_count}次)")
                else:
                    self.logger.debug(f"尝试连接WebSocket (第{self.ws_reconnect_count}次)")
                # Connect with keepalive pings and bounded close time.
                async with websockets.connect(
                    self.ws_url,
                    ping_interval=20,  # send a ping every 20 seconds
                    ping_timeout=10,   # ping timeout: 10 seconds
                    close_timeout=5    # close timeout: 5 seconds
                ) as ws:
                    self.ws = ws
                    self.ws_connected = True
                    self.ws_reconnect_count = 0  # connected: reset the attempt counter
                    self.logger.info("✓ WebSocket连接成功")
                    # Run status polling and message receiving concurrently;
                    # when either fails the gather unwinds and we reconnect.
                    try:
                        await asyncio.gather(
                            self.query_robot_status(),
                            self.handle_websocket_messages()
                        )
                    except Exception as e:
                        self.logger.warning(f"WebSocket任务组异常: {e}")
                    finally:
                        # Always reset the connection state before reconnecting.
                        self.ws_connected = False
                        self.ws = None
                        self.logger.info("WebSocket连接已断开,准备重连...")
            except websockets.exceptions.ConnectionClosed:
                self.logger.warning("WebSocket连接已正常关闭")
                self.ws_connected = False
                self.ws = None
            except ConnectionRefusedError:
                if self.ws_reconnect_count <= 3:
                    self.logger.error(f"WebSocket连接被拒绝: {self.ws_url}")
                elif self.ws_reconnect_count == 4:
                    self.logger.warning("WebSocket持续连接失败,将减少日志输出频率")
                self.ws_connected = False
                self.ws = None
            except OSError as e:
                if self.ws_reconnect_count <= 3:
                    self.logger.error(f"WebSocket网络错误: {e}")
                self.ws_connected = False
                self.ws = None
            except Exception as e:
                error_msg = str(e)
                # Filter out the common benign teardown errors before logging.
                if "no close frame" not in error_msg and "Connection closed" not in error_msg:
                    if self.ws_reconnect_count <= 3:
                        self.logger.error(f"WebSocket连接错误: {e}")
                    elif self.ws_reconnect_count == 4:
                        self.logger.warning("WebSocket持续连接失败,将减少日志输出频率")
                self.ws_connected = False
                self.ws = None
            # Wait before the next reconnect attempt (log throttled as above).
            if self.ws_reconnect_count <= 3:
                self.logger.info(f"{reconnect_delay}秒后重连...")
            elif self.ws_reconnect_count % 10 == 0:
                self.logger.info(f"{reconnect_delay}秒后继续尝试重连...")
            await asyncio.sleep(reconnect_delay)
def can_perform_detection(self) -> bool:
"""检查是否可以进行人脸检测"""
return not self.robot_status['is_speaking'] and not self.robot_status['is_thinking'] and self.robot_status[
'role_name'] == "访客引导者"
async def process_video_stream(self):
"""处理视频流"""
self._init_camera()
# 初始化FFmpeg推流
if self.stream_enabled:
self._init_ffmpeg_stream()
frame_interval = self.config['face_detection']['frame_interval']
quality_threshold = self.config['face_detection']['quality_threshold']
face_duration = self.config['face_detection']['face_present_duration']
self.logger.info("开始处理视频流")
# 创建显示窗口并最大化
window_name = 'Hello'
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL | cv2.WINDOW_GUI_NORMAL)
# 设置窗口为全屏或最大化
cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_NORMAL)
try:
import screeninfo
screen = screeninfo.get_monitors()[0]
cv2.resizeWindow(window_name, screen.width, screen.height)
cv2.moveWindow(window_name, 0, 0)
except:
cv2.resizeWindow(window_name, 1920, 1080)
self.logger.warning("无法获取屏幕分辨率,使用默认窗口大小")
# 添加推流健康检查计数器
stream_check_counter = 0
stream_check_interval = 150 # 每150帧约5秒检查一次
# 添加人脸消失计数器
no_face_counter = 0
no_face_threshold = 30 # 连续30帧约1秒没有检测到人脸才清空
try:
while True:
# 检查机器人角色并显示/隐藏视频
self._check_role_and_display_video()
# 如果视频正在播放,更新视频帧
if self.video_showing:
self._update_video()
# 检查摄像头是否可用
if self.camera is None or not self.camera.isOpened():
self.logger.warning("摄像头未打开,尝试重新连接...")
if not self._try_reconnect_camera():
await asyncio.sleep(0.1)
continue
ret, frame = self.camera.read()
if not ret:
self.camera_failure_count += 1
if self.camera_failure_count <= 3:
self.logger.warning(f"无法读取摄像头帧 (连续失败{self.camera_failure_count}次)")
elif self.camera_failure_count == 4:
self.logger.warning("摄像头持续读取失败,将减少日志输出")
if self.camera_failure_count >= self.camera_max_failures:
self.logger.error(f"摄像头连续失败{self.camera_failure_count}次,尝试重新初始化...")
if self._try_reconnect_camera():
self.logger.info("摄像头重新初始化成功,继续处理")
continue
else:
self.camera_failure_count = 0
await asyncio.sleep(0.1)
continue
# 成功读取帧,重置失败计数
if self.camera_failure_count > 0:
self.logger.info(f"摄像头恢复正常 (之前连续失败{self.camera_failure_count}次)")
self.camera_failure_count = 0
self.frame_count += 1
# 默认清空检测和识别结果
should_detect = False
# 检查是否到达检测间隔
if self.frame_count % frame_interval == 0:
if self.can_perform_detection():
quality = self.assess_frame_quality(frame)
self.display_info['quality'] = quality
if quality >= quality_threshold:
should_detect = True
else:
self.logger.debug(f"帧质量不足: {quality:.2f}")
else:
self.logger.debug("机器人正在说话或思考,跳过检测")
self.face_present_start = None
self.display_info['quality'] = self.assess_frame_quality(frame)
else:
self.display_info['quality'] = 0
# 显示二维码时不进行人脸识别。
if self.qrcode_showing:
should_detect = False
# 执行人脸检测
if should_detect:
face = self.detect_faces(frame)
if face:
# 检测到人脸,重置消失计数器
no_face_counter = 0
self.display_info['face_detected'] = True
self.display_info['face_box'] = face['box']
# 记录人脸出现时间
if self.face_present_start is None:
self.face_present_start = datetime.now()
self.logger.info("检测到人脸,开始计时")
# 清空之前的识别结果
self.display_info['person_name'] = None
self.display_info['person_role'] = None
self.display_info['similarity'] = 0
# 检查是否满足持续出现时长
elapsed = (datetime.now() - self.face_present_start).total_seconds()
if elapsed >= face_duration:
self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别")
# 进行人脸识别
recognition_result = self.recognize_face(frame)
if recognition_result:
person_id = recognition_result['subject']
similarity = recognition_result['similarity']
# 更新显示信息
self.display_info['face_box'] = recognition_result['box']
# 检查是否应该识别(防止重复)
if self.should_recognize(person_id):
name, role = self.determine_role(person_id, similarity)
# 更新显示信息
self.display_info['person_name'] = name
self.display_info['person_role'] = role
self.display_info['similarity'] = similarity
self.logger.info(
f"识别到: {name}, 相似度: {similarity:.6f}, 角色: {role}"
)
# 如果是陌生人,显示二维码
if role == "陌生人":
self._show_qrcode()
# 发送识别结果
reception_msg = {
"type": "start_reception",
"message": {
"name": name,
"role": role
}
}
await self.send_websocket_message(reception_msg)
# 记录识别时间
self.recognition_history[person_id] = datetime.now()
# 重置计时器
self.face_present_start = None
else:
# 未识别到已知人脸
if self.should_recognize("unknown"):
self.logger.info("检测到陌生人")
# 更新显示信息
self.display_info['person_name'] = "未知访客"
self.display_info['person_role'] = "陌生人"
self.display_info['similarity'] = 0
# 显示二维码
self._show_qrcode()
reception_msg = {
"type": "start_reception",
"message": {
"name": "未知访客",
"role": "陌生人"
}
}
await self.send_websocket_message(reception_msg)
self.recognition_history["unknown"] = datetime.now()
# 重置计时器
self.face_present_start = None
else:
# 没有检测到人脸
no_face_counter += 1
# 连续一段时间没检测到人脸,清空显示信息
if no_face_counter >= no_face_threshold:
if self.display_info['face_detected'] or self.display_info['person_name']:
self.logger.info("人脸消失,清空识别信息")
self.display_info['face_detected'] = False
self.display_info['face_box'] = None
self.display_info['person_name'] = None
self.display_info['person_role'] = None
self.display_info['similarity'] = 0
# 重置计时器
if self.face_present_start is not None:
self.logger.debug("人脸消失,重置计时器")
self.face_present_start = None
# 绘制信息并显示
display_frame = self.draw_info_on_frame(frame)
cv2.imshow(window_name, display_frame)
# 检查二维码显示是否超时
self._check_qrcode_timeout()
# 推送到流媒体服务器
if self.stream_enabled:
stream_check_counter += 1
# 定期健康检查
if stream_check_counter >= stream_check_interval:
stream_check_counter = 0
# 检查FFmpeg进程状态
ffmpeg_alive = self.ffmpeg_process is not None and self.ffmpeg_process.poll() is None
thread_alive = self.stream_thread is not None and self.stream_thread.is_alive()
# 如果FFmpeg进程死了或线程停了尝试重启
if not ffmpeg_alive or not thread_alive:
current_time = time.time()
# 检查是否可以重试
can_retry = (
self.stream_retry_count < self.stream_max_retries and
(self.stream_last_retry_time is None or
current_time - self.stream_last_retry_time >= self.stream_retry_cooldown)
)
if can_retry:
self.logger.warning(
f"检测到推流异常 (FFmpeg存活={ffmpeg_alive}, 线程存活={thread_alive}),尝试重启...")
# 先清理旧的
if self.ffmpeg_process:
try:
if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
self.ffmpeg_process.stdin.close()
self.ffmpeg_process.kill()
self.ffmpeg_process.wait(timeout=2)
except:
pass
self.ffmpeg_process = None
# 停止旧线程
if self.stream_thread and self.stream_thread.is_alive():
self.stream_thread_running = False
try:
self.stream_frame_queue.put(None, timeout=1)
except:
pass
self.stream_thread.join(timeout=2)
# 清空队列
while not self.stream_frame_queue.empty():
try:
self.stream_frame_queue.get_nowait()
except:
break
# 重新初始化
success = self._init_ffmpeg_stream()
if success:
self.logger.info("✓ 推流重启成功")
else:
self.logger.error("✗ 推流重启失败")
elif self.stream_retry_count >= self.stream_max_retries:
if stream_check_counter % 600 == 0: # 每600帧约20秒提示一次
self.logger.warning(f"推流已达到最大重试次数({self.stream_max_retries}),已停止尝试")
# 推送帧(非阻塞)
if self.ffmpeg_process and self.ffmpeg_process.poll() is None:
self._push_frame_to_stream(display_frame)
# 检查键盘输入
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or key == 27: # 'q' 或 ESC 退出
self.logger.info("用户请求退出")
break
await asyncio.sleep(0.001)
except Exception as e:
self.logger.error(f"视频流处理错误: {e}")
finally:
# 清理资源
if self.camera:
self.camera.release()
self.logger.info("摄像头已释放")
# 关闭二维码窗口
self._close_qrcode()
# 关闭视频窗口
self._close_video()
# 关闭FFmpeg推流
if self.stream_enabled:
self._close_ffmpeg_stream()
cv2.destroyAllWindows()
async def run(self):
"""运行系统"""
self.logger.info("启动人脸识别系统")
# 同时运行WebSocket连接和视频处理
await asyncio.gather(
self.connect_websocket(),
self.process_video_stream()
)
def main():
    """Entry point: build the system from ``config.yaml`` and run it.

    Blocks in the asyncio loop until the user interrupts (Ctrl-C) or a
    fatal error escapes; both cases are reported to stdout.
    """
    system = FaceRecognitionSystem("config.yaml")
    try:
        asyncio.run(system.run())
    except KeyboardInterrupt:
        print("\n系统已停止")
    except Exception as e:
        print(f"系统错误: {e}")


if __name__ == "__main__":
    main()