2033 lines
82 KiB
Python
2033 lines
82 KiB
Python
import cv2
|
||
import asyncio
|
||
import websockets
|
||
import json
|
||
import yaml
|
||
import logging
|
||
from logging.handlers import RotatingFileHandler
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Dict, Any, Tuple
|
||
import numpy as np
|
||
import time
|
||
from collections import defaultdict
|
||
from PIL import Image, ImageDraw, ImageFont, ImageFilter
|
||
import subprocess
|
||
import os
|
||
import threading
|
||
import queue
|
||
from compreface import CompreFace
|
||
from compreface.service import RecognitionService, DetectionService
|
||
|
||
|
||
class FaceRecognitionSystem:
|
||
def __init__(self, config_path: str = "config.yaml"):
    """Initialize the face recognition system.

    Loads the YAML config, wires up logging, CompreFace, fonts and all
    runtime state. The camera, WebSocket and FFmpeg stream are opened
    later by their dedicated methods.

    Args:
        config_path: path to the YAML configuration file.
    """
    # Load configuration
    with open(config_path, 'r', encoding='utf-8') as f:
        self.config = yaml.safe_load(f)

    # Logging first, so every later step can report problems
    self._setup_logging()

    # CompreFace recognition/detection services
    self._init_compreface()

    # Camera handle (opened lazily / by the reconnect logic)
    self.camera = None

    # WebSocket connection state
    self.ws = None
    self.ws_url = self.config['websocket']['url']
    self.ws_connected = False  # connection status flag
    self.ws_reconnect_count = 0  # reconnect counter (throttles log noise)

    # Robot state mirrored from the WebSocket feed
    self.robot_status = {
        'is_speaking': False,
        'is_thinking': False,
        'listening': False,
        'role_name': ''
    }

    # Face detection state
    self.frame_count = 0
    self.face_present_start = None
    self.current_face_id = None

    # Recognition history (suppresses repeated recognition of the same person)
    self.recognition_history = {}  # {person_id: last_recognition_time}

    # Display-related state
    self.current_display_frame = None
    self.last_detection_result = None  # most recent detection result
    self.last_recognition_result = None  # most recent recognition result
    self.display_info = {
        'quality': 0,
        'face_detected': False,
        'face_box': None,
        'person_name': None,
        'person_role': None,
        'similarity': 0,
        'frame_count': 0,
        'fps': 0
    }
    self.last_fps_time = time.time()
    self.fps_counter = 0

    # CJK-capable fonts for on-frame text
    self.font_path = self._get_chinese_font()
    self.font_small = ImageFont.truetype(self.font_path, 20)
    self.font_medium = ImageFont.truetype(self.font_path, 24)
    self.font_large = ImageFont.truetype(self.font_path, 50)

    # SOTA QR Design Fonts
    self.font_qr_title = ImageFont.truetype(self.font_path, 56)
    self.font_qr_subtitle = ImageFont.truetype(self.font_path, 30)
    self.font_qr_body = ImageFont.truetype(self.font_path, 24)
    self.font_qr_step_title = ImageFont.truetype(self.font_path, 28)
    self.font_qr_badge = ImageFont.truetype(self.font_path, 30)

    self.qrcode_instruction_title = "访客预约流程"
    self.qrcode_instruction_subtitle = "Visitor Registration Process"
    self.qrcode_instruction_steps = [
        "第一步:扫码关注“康达新材”公众号。",
        "第二步:点击【关于我们】→【我是访客】进入“访客注册”界面,填写并上传相应信息并点击“提交”。",
        "第三步:将第二步信息提交完毕后在“访客预约”界面选择右下角的“+”号按钮,填写预约信息。",
        "第四步:请仔细阅读安全告知书,点击我知道了。",
        "第五步:填写被访人信息及来访事由等内容并提交。",
        "第六步:显示提交成功,并且手机、微信会收到预约相关短信通知。",
    ]

    # FFmpeg push-stream process state
    self.ffmpeg_process = None
    self.stream_enabled = self.config.get('stream', {}).get('enabled', False)
    self.stream_retry_count = 0  # streaming retry counter
    self.stream_max_retries = 5  # maximum retry attempts
    self.stream_last_retry_time = None  # time of the last retry
    self.stream_retry_cooldown = 10  # retry cooldown (seconds)

    # Asynchronous streaming queue and worker thread
    self.stream_frame_queue = queue.Queue(maxsize=10)  # bounded to cap memory use
    self.stream_thread = None
    self.stream_thread_running = False

    # Camera health tracking
    self.camera_failure_count = 0  # consecutive read failures
    self.camera_last_retry_time = None  # time of the last reconnect attempt
    self.camera_retry_cooldown = 3  # reconnect cooldown (seconds)
    self.camera_max_failures = 5  # failure count that triggers re-initialization

    # QR code display state
    self.qrcode_image_path = self.config.get('qrcode', {}).get('image_path', 'qrcode.png')
    self.qrcode_display_duration = self.config.get('qrcode', {}).get('display_duration', 10)
    self.qrcode_window_name = 'Visitor guidance QR code'
    self.qrcode_display_start_time = None  # when the QR window was opened
    self.qrcode_showing = False  # whether the QR window is currently shown

    # Exhibition video playback state
    self.video_path = self.config.get('video', {}).get('path', 'exhibition.mp4')
    self.video_window_name = 'Hello Video'
    self.video_showing = False
    self.video_capture = None
    self.video_start_time = None
    self.video_loop = self.config.get('video', {}).get('loop', True)  # loop playback
    self.last_robot_role = ''  # previous robot role (for change detection)

    # FFmpeg stderr drain thread
    self.stderr_thread = None
    self.stderr_thread_running = False

    # Fix: log completion only after every attribute above exists
    # (previously this fired before camera/QR/video state was set up).
    self.logger.info("人脸识别系统初始化完成")
def _force_cleanup_ffmpeg(self):
    """Force-release every FFmpeg-related resource (threads, queue, processes).

    Called before (re)starting the stream so that at most one FFmpeg
    push process can ever be alive. Fix: the three bare ``except:``
    clauses are narrowed so unexpected errors are no longer swallowed
    silently (and KeyboardInterrupt/SystemExit are no longer caught).
    """
    self.logger.info("=== 开始清理FFmpeg资源 ===")

    # 1. Stop the stderr drain thread
    if self.stderr_thread and self.stderr_thread.is_alive():
        self.logger.debug("停止stderr线程...")
        self.stderr_thread_running = False
        self.stderr_thread.join(timeout=1)

    # 2. Stop the streaming worker thread (a None sentinel wakes it up)
    if self.stream_thread and self.stream_thread.is_alive():
        self.logger.debug("停止推流线程...")
        self.stream_thread_running = False
        try:
            self.stream_frame_queue.put(None, timeout=0.5)
        except queue.Full:  # queue saturated — worker will still see the flag
            pass
        self.stream_thread.join(timeout=2)

    # 3. Drain the frame queue
    cleared = 0
    while not self.stream_frame_queue.empty():
        try:
            self.stream_frame_queue.get_nowait()
            cleared += 1
        except queue.Empty:  # raced with another consumer — nothing left
            break
    if cleared > 0:
        self.logger.debug(f"清空队列: {cleared}帧")

    # 4. Forcibly terminate our FFmpeg child process
    if self.ffmpeg_process:
        self.logger.warning("发现旧的FFmpeg进程,强制终止...")
        try:
            # Close stdin first so FFmpeg sees EOF
            if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
                try:
                    self.ffmpeg_process.stdin.close()
                except Exception:  # e.g. BrokenPipeError; must not abort cleanup
                    pass

            # Record the PID for the log messages
            pid = self.ffmpeg_process.pid
            self.logger.info(f"终止FFmpeg进程 PID={pid}")

            # Kill immediately (no graceful-shutdown wait)
            self.ffmpeg_process.kill()
            self.ffmpeg_process.wait(timeout=2)
            self.logger.info(f"✓ FFmpeg进程已终止 (PID={pid})")

        except Exception as e:
            self.logger.error(f"终止FFmpeg进程失败: {e}")
        finally:
            self.ffmpeg_process = None

    # 5. Belt-and-braces: kill any stray ffmpeg processes we may have spawned
    try:
        import psutil
        current_pid = os.getpid()
        killed_count = 0

        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                # Look for ffmpeg processes
                if proc.info['name'] and 'ffmpeg' in proc.info['name'].lower():
                    # Ours are identifiable by the rawvideo input argument
                    cmdline = proc.info.get('cmdline', [])
                    if cmdline and any('rawvideo' in str(arg) for arg in cmdline):
                        if proc.pid != current_pid:
                            self.logger.warning(f"发现残留FFmpeg进程 PID={proc.pid}, 正在终止...")
                            proc.kill()
                            proc.wait(timeout=2)
                            killed_count += 1
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        if killed_count > 0:
            self.logger.warning(f"✓ 清理了 {killed_count} 个残留FFmpeg进程")

    except ImportError:
        self.logger.debug("未安装psutil,跳过残留进程清理(建议安装: pip install psutil)")
    except Exception as e:
        self.logger.debug(f"清理残留进程时出错: {e}")

    # 6. Give the OS a moment to release pipes/sockets
    time.sleep(0.5)

    self.logger.info("=== FFmpeg资源清理完成 ===")
def _show_video(self):
    """Open the exhibition video file and create its playback window.

    Returns True when the window is ready (or playback was already in
    progress), False on any failure. Frames are pumped separately by
    ``_update_video()``.
    """
    try:
        if not os.path.exists(self.video_path):
            self.logger.error(f"视频文件不存在: {self.video_path}")
            return False

        # Already playing: no re-initialization needed.
        if self.video_showing and self.video_capture is not None:
            return True

        # Open the video file
        self.video_capture = cv2.VideoCapture(self.video_path)
        if not self.video_capture.isOpened():
            self.logger.error(f"无法打开视频文件: {self.video_path}")
            return False

        # Create the playback window
        cv2.namedWindow(self.video_window_name, cv2.WINDOW_NORMAL | cv2.WINDOW_GUI_NORMAL)

        # Native video geometry
        video_width = int(self.video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(self.video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        video_fps = self.video_capture.get(cv2.CAP_PROP_FPS)

        # Size and center the window on the primary monitor
        try:
            import screeninfo
            screen = screeninfo.get_monitors()[0]

            # Fit the video to the full screen while keeping aspect ratio
            fit = min(int(screen.width) / video_width,
                      int(screen.height) / video_height)
            new_width = int(video_width * fit)
            new_height = int(video_height * fit)

            # Centered placement
            x_pos = (screen.width - new_width) // 2
            y_pos = (screen.height - new_height) // 2

            cv2.resizeWindow(self.video_window_name, new_width, new_height)
            cv2.moveWindow(self.video_window_name, x_pos, y_pos)

            self.logger.info(f"视频显示尺寸: {new_width}x{new_height}, 位置: ({x_pos}, {y_pos})")

        except Exception as e:
            self.logger.warning(f"无法获取屏幕信息,使用默认尺寸: {e}")
            cv2.resizeWindow(self.video_window_name, 1280, 720)

        # Keep the window on top of everything else
        cv2.setWindowProperty(self.video_window_name, cv2.WND_PROP_TOPMOST, 1)

        self.video_showing = True
        self.video_start_time = time.time()
        self.logger.info(f"开始播放展厅讲解视频: {self.video_path} (FPS: {video_fps:.2f})")
        return True

    except Exception as e:
        self.logger.error(f"显示视频失败: {e}")
        return False
def _draw_robot_status_on_video(self, frame: np.ndarray) -> np.ndarray:
|
||
"""在视频帧上绘制机器人状态信息"""
|
||
try:
|
||
h, w = frame.shape[:2]
|
||
|
||
# 准备状态文本
|
||
status_texts = []
|
||
|
||
if self.robot_status['is_thinking']:
|
||
status_texts.append("🤔 正在思考...")
|
||
|
||
if self.robot_status['is_asr_processing']:
|
||
status_texts.append("👂 正在倾听...")
|
||
|
||
if self.robot_status['is_speaking']:
|
||
status_texts.append("👂 正在讲话...")
|
||
|
||
# 如果没有状态需要显示,直接返回原帧
|
||
if not status_texts:
|
||
return frame
|
||
|
||
# 转换为PIL图像以支持中文
|
||
frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
||
draw = ImageDraw.Draw(frame_pil)
|
||
|
||
# 计算文本位置(中间上方)
|
||
y_start = 50 # 距离顶部的距离
|
||
line_height = 45
|
||
|
||
for i, text in enumerate(status_texts):
|
||
# 获取文本尺寸(兼容旧版本Pillow)
|
||
try:
|
||
# Pillow >= 8.0.0
|
||
bbox = draw.textbbox((0, 0), text, font=self.font_large)
|
||
text_width = bbox[2] - bbox[0]
|
||
text_height = bbox[3] - bbox[1]
|
||
except AttributeError:
|
||
# Pillow < 8.0.0
|
||
text_width, text_height = draw.textsize(text, font=self.font_large)
|
||
|
||
# 计算居中位置
|
||
x = (w - text_width) // 2
|
||
y = y_start + i * line_height
|
||
|
||
# 绘制半透明背景
|
||
padding = 15
|
||
bg_x1 = x - padding
|
||
bg_y1 = y - padding
|
||
bg_x2 = x + text_width + padding
|
||
bg_y2 = y + text_height + padding
|
||
|
||
# 创建半透明背景
|
||
overlay = frame_pil.copy()
|
||
overlay_draw = ImageDraw.Draw(overlay)
|
||
overlay_draw.rectangle(
|
||
[bg_x1, bg_y1, bg_x2, bg_y2],
|
||
fill=(0, 0, 0, 180) # 黑色半透明
|
||
)
|
||
|
||
# 混合背景
|
||
frame_pil = Image.blend(frame_pil, overlay, 0.7)
|
||
draw = ImageDraw.Draw(frame_pil)
|
||
|
||
# 绘制边框
|
||
draw.rectangle(
|
||
[bg_x1, bg_y1, bg_x2, bg_y2],
|
||
outline=(0, 255, 255), # 黄色边框
|
||
width=3
|
||
)
|
||
|
||
# 绘制文本
|
||
draw.text((x, y), text, font=self.font_large, fill=(255, 255, 255))
|
||
|
||
# 转换回OpenCV格式
|
||
frame = cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)
|
||
return frame
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"在视频上绘制状态失败: {e}")
|
||
return frame
|
||
|
||
def _update_video(self):
    """Advance video playback by one frame; call once per main-loop tick."""
    if not self.video_showing or self.video_capture is None:
        return

    try:
        ok, raw_frame = self.video_capture.read()

        if not ok:
            # End of stream: rewind when looping, otherwise shut the window.
            if self.video_loop:
                self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
                self.logger.debug("视频循环播放")
            else:
                self.logger.info("视频播放完毕")
                self._close_video()
            return

        # Overlay the robot's live status, then present the frame.
        decorated = self._draw_robot_status_on_video(raw_frame)
        cv2.imshow(self.video_window_name, decorated)

    except Exception as e:
        self.logger.error(f"更新视频帧失败: {e}")
        self._close_video()
def _close_video(self):
    """Release the video capture and destroy the playback window."""
    try:
        if not self.video_showing:
            return

        # Release the capture handle before tearing down the window.
        if self.video_capture is not None:
            self.video_capture.release()
            self.video_capture = None

        cv2.destroyWindow(self.video_window_name)
        self.video_showing = False
        self.video_start_time = None
        self.logger.info("关闭视频窗口")
    except Exception as e:
        self.logger.debug(f"关闭视频窗口时出错: {e}")
def _check_role_and_display_video(self):
|
||
"""检查机器人角色并显示/隐藏视频"""
|
||
current_role = self.robot_status['role_name']
|
||
|
||
# 角色发生变化时的处理
|
||
if current_role != self.last_robot_role:
|
||
self.logger.info(f"机器人角色变化: {self.last_robot_role} -> {current_role}")
|
||
self.last_robot_role = current_role
|
||
|
||
# 如果切换到展厅讲解员,显示视频
|
||
if current_role == "展厅讲解员" or current_role == "初始角色":
|
||
self._show_video()
|
||
# 如果切换到其他角色,关闭视频
|
||
elif self.video_showing:
|
||
self._close_video()
|
||
|
||
# 如果是展厅讲解员且视频应该显示但未显示,尝试显示
|
||
if (current_role == "展厅讲解员" or current_role == "初始角色") and not self.video_showing:
|
||
self._show_video()
|
||
|
||
# 如果不是展厅讲解员但视频在显示,关闭视频
|
||
if current_role != "展厅讲解员" and current_role != "初始角色" and self.video_showing:
|
||
self._close_video()
|
||
|
||
def _wrap_text_for_width(self, text: str, font: ImageFont.FreeTypeFont, max_width: int, draw: ImageDraw.ImageDraw):
|
||
lines = []
|
||
current = ''
|
||
for char in text:
|
||
tentative = current + char
|
||
try:
|
||
width = draw.textlength(tentative, font=font)
|
||
except AttributeError:
|
||
width, _ = draw.textsize(tentative, font=font)
|
||
if width <= max_width:
|
||
current = tentative
|
||
else:
|
||
if current:
|
||
lines.append(current)
|
||
current = char
|
||
if current:
|
||
lines.append(current)
|
||
return lines
|
||
|
||
def _create_gradient_background(self, size: Tuple[int, int], start_color: Tuple[int, int, int], end_color: Tuple[int, int, int]) -> Image.Image:
    """Build a vertical linear gradient from start_color (top) to end_color (bottom).

    Fix: removed the dead ``Image.new('L', size)`` assignment that was
    immediately overwritten by ``Image.fromarray``.

    Args:
        size: (width, height) of the output image.
        start_color: RGB color at the top edge.
        end_color: RGB color at the bottom edge.

    Returns:
        An RGB PIL image of the requested size.
    """
    width, height = size
    # Vertical gradient: per-row alpha ramps 0 -> 255, so the composite
    # shows `base` (start_color) at the top and `top` (end_color) below.
    base = Image.new('RGB', size, start_color)
    top = Image.new('RGB', size, end_color)
    mask_data = np.tile(np.linspace(0, 255, height, dtype=np.uint8), (width, 1)).T
    mask = Image.fromarray(mask_data, 'L')
    return Image.composite(top, base, mask)
def _draw_rounded_rect(self, draw, box, radius, fill, outline=None, width=0):
|
||
# 兼容旧版本PIL,手动绘制圆角矩形
|
||
x1, y1, x2, y2 = box
|
||
|
||
# 绘制主体部分
|
||
draw.rectangle([x1+radius, y1, x2-radius, y2], fill=fill, outline=outline, width=width)
|
||
draw.rectangle([x1, y1+radius, x2, y2-radius], fill=fill, outline=outline, width=width)
|
||
|
||
# 绘制四个角的圆弧
|
||
if radius > 0:
|
||
# 左上角
|
||
draw.ellipse([x1, y1, x1+2*radius, y1+2*radius], fill=fill, outline=outline, width=width)
|
||
# 右上角
|
||
draw.ellipse([x2-2*radius, y1, x2, y1+2*radius], fill=fill, outline=outline, width=width)
|
||
# 左下角
|
||
draw.ellipse([x1, y2-2*radius, x1+2*radius, y2], fill=fill, outline=outline, width=width)
|
||
# 右下角
|
||
draw.ellipse([x2-2*radius, y2-2*radius, x2, y2], fill=fill, outline=outline, width=width)
|
||
|
||
def _draw_shadow(self, image, box, radius, offset=(0, 4), blur=10, shadow_color=(0,0,0,50)):
    """Composite a blurred rounded-rectangle drop shadow onto *image* in place.

    *image* must be RGBA. NOTE(review): uses ImageDraw.rounded_rectangle,
    which requires a fairly recent Pillow, while _draw_rounded_rect
    hand-rolls corners for old versions — confirm the deployed Pillow
    supports it.
    """
    x0, y0, x1, y1 = box
    dx, dy = offset

    # Shadow drawn on its own transparent layer, offset from the box.
    layer = Image.new('RGBA', image.size, (0,0,0,0))
    layer_draw = ImageDraw.Draw(layer)
    layer_draw.rounded_rectangle((x0 + dx, y0 + dy, x1 + dx, y1 + dy), radius=radius, fill=shadow_color)

    # Soften, then merge under/over the existing pixels.
    blurred = layer.filter(ImageFilter.GaussianBlur(blur))
    image.alpha_composite(blurred)
def _build_qrcode_instruction_canvas(self, qr_image: np.ndarray, canvas_size: Tuple[int, int]) -> Optional[np.ndarray]:
    """Render the full-screen visitor-guidance panel.

    Layout: a white card on the left holding the QR code with a title and
    scan hint, and a 2-column grid of numbered step cards on the right
    built from ``self.qrcode_instruction_steps``.

    Args:
        qr_image: the QR code as a BGR OpenCV array.
        canvas_size: (width, height) of the target canvas in pixels.

    Returns:
        The rendered canvas as a BGR array, or None on any failure
        (the caller then falls back to showing the raw QR image).
    """
    try:
        canvas_width, canvas_height = canvas_size
        if canvas_width <= 0 or canvas_height <= 0:
            return None

        # Colors
        PRIMARY_COLOR = (23, 43, 77)  # Dark Blue for headings
        SECONDARY_COLOR = (94, 108, 132)  # Grey for secondary text
        ACCENT_COLOR = (0, 82, 204)  # Bright Blue for highlights/numbers

        # 1. Background (light grey vertical gradient, RGBA for shadow compositing)
        bg = self._create_gradient_background((canvas_width, canvas_height), (245, 247, 250), (223, 225, 230)).convert("RGBA")
        draw = ImageDraw.Draw(bg, "RGBA")

        # Layout Constants
        MARGIN = 50
        GUTTER = 40

        # Left Panel (QR Code) - 35% width approx
        left_width = int((canvas_width - 2 * MARGIN - GUTTER) * 0.35)
        # Ensure minimum width for QR code
        left_width = max(left_width, 400)
        right_width = canvas_width - 2 * MARGIN - GUTTER - left_width

        left_box = (MARGIN, MARGIN, MARGIN + left_width, canvas_height - MARGIN)
        right_box = (MARGIN + left_width + GUTTER, MARGIN, canvas_width - MARGIN, canvas_height - MARGIN)

        # --- Draw Left Panel ---
        # Shadow under the card
        self._draw_shadow(bg, left_box, radius=30, offset=(0, 10), blur=20, shadow_color=(0,0,0,30))
        # Card body
        self._draw_rounded_rect(draw, left_box, radius=30, fill=(255, 255, 255, 255))

        # Left content is centered horizontally in the card
        cx = (left_box[0] + left_box[2]) // 2
        cy_top = left_box[1] + 120

        # Title ("Visitor Registration" in Chinese)
        text = "访客登记"
        try:
            w = draw.textlength(text, font=self.font_qr_title)
        except:  # Pillow < 8 has no textlength
            w, _ = draw.textsize(text, font=self.font_qr_title)
        draw.text((cx - w/2, cy_top), text, font=self.font_qr_title, fill=PRIMARY_COLOR)

        # Subtitle
        text = "Visitor Registration"
        try:
            w = draw.textlength(text, font=self.font_qr_subtitle)
        except:  # Pillow < 8 has no textlength
            w, _ = draw.textsize(text, font=self.font_qr_subtitle)
        draw.text((cx - w/2, cy_top + 70), text, font=self.font_qr_subtitle, fill=SECONDARY_COLOR)

        # QR Code (square, capped at 500px)
        qr_size = min(left_width - 100, 500)
        qr_y = cy_top + 180

        # Resize the QR image and paste it centered
        qr_pil = Image.fromarray(cv2.cvtColor(qr_image, cv2.COLOR_BGR2RGB))
        qr_pil = qr_pil.resize((qr_size, qr_size), Image.LANCZOS)
        bg.paste(qr_pil, (cx - qr_size//2, qr_y))

        # Scan hint ("please scan with WeChat") under the QR code
        hint_y = qr_y + qr_size + 50
        hint_text = "请使用微信扫码登记"
        try:
            w = draw.textlength(hint_text, font=self.font_qr_step_title)
        except:  # Pillow < 8 has no textlength
            w, _ = draw.textsize(hint_text, font=self.font_qr_step_title)

        # Pill-shaped background behind the hint
        pill_padding = 20
        pill_box = (cx - w/2 - pill_padding, hint_y - pill_padding, cx + w/2 + pill_padding, hint_y + 40 + pill_padding)
        self._draw_rounded_rect(draw, pill_box, radius=30, fill=(240, 242, 245), outline=None)
        draw.text((cx - w/2, hint_y), hint_text, font=self.font_qr_step_title, fill=ACCENT_COLOR)

        # --- Draw Right Panel ---

        # Right title area - compacted
        rt_y = MARGIN + 20
        draw.text((right_box[0], rt_y), self.qrcode_instruction_title, font=self.font_qr_title, fill=PRIMARY_COLOR)
        draw.text((right_box[0], rt_y + 60), self.qrcode_instruction_subtitle, font=self.font_qr_subtitle, fill=SECONDARY_COLOR)

        # Separator line under the titles
        sep_y = rt_y + 110
        draw.line((right_box[0], sep_y, right_box[2], sep_y), fill=(200, 200, 200), width=2)

        # Grid configuration for the step cards
        grid_y_start = sep_y + 40
        grid_width = right_width
        cols = 2
        col_gap = 30
        row_gap = 30

        col_width = (grid_width - (cols - 1) * col_gap) // cols

        # Pre-calculate text layout so every card gets a uniform height
        max_lines = 0
        processed_steps = []

        padding = 30
        badge_size = 50
        text_left_margin = badge_size + 20

        # Width available for wrapped text inside a card
        text_max_width = col_width - padding * 2 - text_left_margin

        for i, step in enumerate(self.qrcode_instruction_steps):
            # Remove "第一步:" etc prefix if present to make it cleaner, we have badges
            clean_step = step.split(":", 1)[-1] if ":" in step else step

            lines = self._wrap_text_for_width(clean_step, self.font_qr_body, text_max_width, draw)
            processed_steps.append(lines)
            max_lines = max(max_lines, len(lines))

        # Card height = padding top + tallest content + padding bottom
        line_height = self.font_qr_body.size + 10
        card_content_height = max(badge_size, max_lines * line_height)
        uniform_card_height = int(padding * 2 + card_content_height)

        # Draw the grid of step cards
        for idx, lines in enumerate(processed_steps):
            row = idx // cols
            col = idx % cols

            x = right_box[0] + col * (col_width + col_gap)
            y = grid_y_start + row * (uniform_card_height + row_gap)

            # Stop before drawing outside the canvas
            if y + uniform_card_height > canvas_height:
                break

            card_box = (x, y, x + col_width, y + uniform_card_height)

            # Card shadow
            self._draw_shadow(bg, card_box, radius=20, offset=(0, 4), blur=12, shadow_color=(0,0,0,15))
            # Card body
            self._draw_rounded_rect(draw, card_box, radius=20, fill=(255, 255, 255))

            # Badge (step number circle)
            bx = x + padding
            by = y + padding
            draw.ellipse((bx, by, bx + badge_size, by + badge_size), fill=ACCENT_COLOR)

            num_text = str(idx + 1)
            try:
                nw = draw.textlength(num_text, font=self.font_qr_badge)
            except:  # Pillow < 8 has no textlength
                nw, _ = draw.textsize(num_text, font=self.font_qr_badge)
            # Center the number inside the badge
            draw.text((bx + (badge_size - nw)/2, by + (badge_size - self.font_qr_badge.size)/2 - 2),
                      num_text, font=self.font_qr_badge, fill=(255, 255, 255))

            # Wrapped step text to the right of the badge
            tx = x + padding + text_left_margin
            ty = y + padding

            for line in lines:
                draw.text((tx, ty), line, font=self.font_qr_body, fill=PRIMARY_COLOR)
                ty += line_height

        # Back to OpenCV BGR for imshow
        return cv2.cvtColor(np.array(bg.convert("RGB")), cv2.COLOR_RGB2BGR)
    except Exception as e:
        self.logger.error(f"渲染二维码指引面板失败: {e}")
        return None
def _show_qrcode(self):
    """Render and display the full-screen visitor QR guidance window.

    Returns True on success, False when the image is missing/unreadable
    or any display step fails.
    """
    try:
        if not os.path.exists(self.qrcode_image_path):
            self.logger.error(f"二维码图片不存在: {self.qrcode_image_path}")
            return False

        # Load the QR code image from disk
        qr_image = cv2.imread(self.qrcode_image_path)
        if qr_image is None:
            self.logger.error(f"无法读取二维码图片: {self.qrcode_image_path}")
            return False

        # Create the window first; it is sized and positioned below.
        cv2.namedWindow(self.qrcode_window_name, cv2.WINDOW_NORMAL | cv2.WINDOW_GUI_NORMAL)

        qr_height, qr_width = qr_image.shape[:2]
        # Oversize the canvas slightly and offset the window negatively
        # so it covers the screen edge-to-edge.
        x_pos = -20
        y_pos = -20
        try:
            import screeninfo
            screen = screeninfo.get_monitors()[0]
            canvas_width = int(screen.width + 40)
            canvas_height = int(screen.height + 40)
        except Exception as e:
            self.logger.warning(f"无法获取屏幕信息,使用默认尺寸: {e}")
            canvas_width = max(qr_width * 2, 1960)
            canvas_height = max(qr_height, 1120)

        rendered = self._build_qrcode_instruction_canvas(qr_image, (canvas_width, canvas_height))
        if rendered is None:
            # Fallback: scale the raw QR image to fit the canvas.
            fit = min(canvas_width / qr_width, canvas_height / qr_height)
            new_width = int(qr_width * fit)
            new_height = int(qr_height * fit)
            rendered = cv2.resize(qr_image, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
            canvas_width, canvas_height = new_width, new_height

        cv2.resizeWindow(self.qrcode_window_name, canvas_width, canvas_height)
        cv2.moveWindow(self.qrcode_window_name, x_pos, y_pos)

        # Fullscreen, show the content, then pin the window on top.
        cv2.setWindowProperty(self.qrcode_window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
        cv2.imshow(self.qrcode_window_name, rendered)
        cv2.setWindowProperty(self.qrcode_window_name, cv2.WND_PROP_TOPMOST, 1)

        self.qrcode_showing = True
        self.qrcode_display_start_time = time.time()
        self.logger.info(f"显示二维码图片: {self.qrcode_image_path}")
        return True

    except Exception as e:
        self.logger.error(f"显示二维码失败: {e}")
        return False
def _close_qrcode(self):
    """Tear down the QR guidance window and reset its display state."""
    try:
        if not self.qrcode_showing:
            return
        cv2.destroyWindow(self.qrcode_window_name)
        self.qrcode_showing = False
        self.qrcode_display_start_time = None
        self.logger.info("关闭二维码窗口")
    except Exception as e:
        self.logger.debug(f"关闭二维码窗口时出错: {e}")
def _check_qrcode_timeout(self):
|
||
"""检查二维码是否应该关闭"""
|
||
if self.qrcode_showing and self.qrcode_display_start_time:
|
||
elapsed = time.time() - self.qrcode_display_start_time
|
||
if elapsed >= self.qrcode_display_duration:
|
||
self._close_qrcode()
|
||
return True
|
||
return False
|
||
|
||
def _try_reconnect_camera(self) -> bool:
    """Attempt to reopen the camera after read failures.

    Rate-limited by ``camera_retry_cooldown``. On success the failure
    counter is reset. Fix: the bare ``except:`` around ``release()`` is
    narrowed to ``except Exception`` so KeyboardInterrupt/SystemExit are
    no longer swallowed.

    Returns:
        True iff the device was reopened AND a frame could be read.
    """
    current_time = time.time()

    # Still inside the cooldown window — skip this attempt
    if self.camera_last_retry_time:
        elapsed = current_time - self.camera_last_retry_time
        if elapsed < self.camera_retry_cooldown:
            return False

    self.camera_last_retry_time = current_time
    self.logger.info("尝试重新连接摄像头...")

    cam_config = self.config['camera']

    # Release the old device handle first
    if self.camera is not None:
        try:
            self.camera.release()
        except Exception:
            # A failing release must not abort the reconnect attempt
            pass
        self.camera = None

    # Try to reopen the device
    self.camera = cv2.VideoCapture(cam_config['device_id'])

    if self.camera.isOpened():
        # Apply the configured capture parameters
        self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width'])
        self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height'])
        self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps'])

        # Verify we can actually pull a frame
        ret, frame = self.camera.read()
        if ret:
            self.logger.info("✓ 摄像头重新连接成功")
            self.camera_failure_count = 0
            return True
        else:
            self.logger.warning("摄像头已打开但无法读取帧")
            self.camera.release()
            self.camera = None
    else:
        self.logger.warning("无法重新打开摄像头")

    return False
def _setup_logging(self):
|
||
"""设置日志系统"""
|
||
log_config = self.config['logging']
|
||
|
||
self.logger = logging.getLogger('FaceRecognition')
|
||
self.logger.setLevel(getattr(logging, log_config['level']))
|
||
|
||
# 文件处理器
|
||
file_handler = RotatingFileHandler(
|
||
log_config['file'],
|
||
maxBytes=log_config['max_bytes'],
|
||
backupCount=log_config['backup_count']
|
||
)
|
||
|
||
# 控制台处理器
|
||
console_handler = logging.StreamHandler()
|
||
|
||
# 格式化
|
||
formatter = logging.Formatter(
|
||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
file_handler.setFormatter(formatter)
|
||
console_handler.setFormatter(formatter)
|
||
|
||
self.logger.addHandler(file_handler)
|
||
self.logger.addHandler(console_handler)
|
||
|
||
def _get_chinese_font(self) -> str:
|
||
"""获取中文字体路径"""
|
||
import platform
|
||
import os
|
||
|
||
system = platform.system()
|
||
|
||
# Windows系统
|
||
if system == "Windows":
|
||
font_paths = [
|
||
"C:/Windows/Fonts/msyh.ttc", # 微软雅黑
|
||
"C:/Windows/Fonts/simhei.ttf", # 黑体
|
||
"C:/Windows/Fonts/simsun.ttc", # 宋体
|
||
]
|
||
# macOS系统
|
||
elif system == "Darwin":
|
||
font_paths = [
|
||
"/System/Library/Fonts/PingFang.ttc", # 苹方
|
||
"/System/Library/Fonts/STHeiti Medium.ttc", # 黑体
|
||
"/Library/Fonts/Arial Unicode.ttf",
|
||
]
|
||
# Linux系统
|
||
else:
|
||
font_paths = [
|
||
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
|
||
"/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
|
||
]
|
||
|
||
# 查找可用的字体
|
||
for font_path in font_paths:
|
||
if os.path.exists(font_path):
|
||
self.logger.info(f"使用中文字体: {font_path}")
|
||
return font_path
|
||
|
||
# 如果没找到,尝试使用配置文件中的字体路径
|
||
if 'font_path' in self.config.get('display', {}):
|
||
custom_font = self.config['display']['font_path']
|
||
if os.path.exists(custom_font):
|
||
self.logger.info(f"使用自定义字体: {custom_font}")
|
||
return custom_font
|
||
|
||
# 默认使用一个基本字体(不支持中文,但至少不会报错)
|
||
self.logger.warning("未找到中文字体,将使用默认字体(可能无法显示中文)")
|
||
return "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
|
||
|
||
def _init_ffmpeg_stream(self):
    """Start the FFmpeg push-stream subprocess.

    Force-cleans any previous FFmpeg resources, honours the retry
    cooldown, builds a latency-oriented command line for RTMP (flv) or
    RTSP output, spawns the process, and starts both the stderr-drain
    thread and the frame-push worker thread.

    Returns:
        True when the process started and the worker threads are up;
        False when streaming is disabled, the cooldown is active, the
        URL scheme is unsupported, or the spawn failed.
    """
    if not self.stream_enabled:
        self.logger.info("流媒体推流功能未启用")
        return False

    # Critical: force-clean all old push processes first so we never
    # end up streaming from more than one FFmpeg instance.
    self._force_cleanup_ffmpeg()

    # Honour the retry cooldown window
    if self.stream_last_retry_time:
        elapsed = time.time() - self.stream_last_retry_time
        if elapsed < self.stream_retry_cooldown:
            self.logger.debug(f"推流重试冷却中,还需等待 {self.stream_retry_cooldown - elapsed:.1f} 秒")
            return False

    stream_config = self.config['stream']
    ffmpeg_config = stream_config['ffmpeg']
    stream_url = stream_config.get('rtmp_url') or stream_config.get('stream_url', '')

    cam_config = self.config['camera']
    # Extra rows for the info panel appended below the camera frame
    panel_height = 200
    total_height = cam_config['height'] + panel_height

    # Pick the container format from the URL scheme
    if stream_url.startswith('rtmp://'):
        output_format = 'flv'
    elif stream_url.startswith('rtsp://'):
        output_format = 'rtsp'
    else:
        self.logger.error(f"不支持的推流协议: {stream_url}")
        return False

    # Build the FFmpeg command (raw BGR frames on stdin, low-latency encode)
    ffmpeg_cmd = [
        'ffmpeg',
        '-y',
        '-f', 'rawvideo',
        '-pix_fmt', 'bgr24',
        '-s', f"{cam_config['width']}x{total_height}",
        '-r', str(ffmpeg_config['fps']),
        '-i', '-',
        '-c:v', ffmpeg_config['video_codec'],
        '-pix_fmt', ffmpeg_config['pixel_format'],
        '-preset', 'ultrafast',
        '-tune', ffmpeg_config['tune'],
        '-b:v', ffmpeg_config['video_bitrate'],
        '-maxrate', ffmpeg_config['video_bitrate'],
        '-bufsize', '1M',  # small buffer to avoid latency build-up
        '-r', str(ffmpeg_config['fps']),
        '-g', str(ffmpeg_config['fps'] * 2),
        '-threads', '2',
        '-flush_packets', '1',  # flush packets immediately
    ]

    if not ffmpeg_config.get('audio', False):
        ffmpeg_cmd.append('-an')

    # RTMP-specific output options
    if output_format == 'flv':
        ffmpeg_cmd.extend([
            '-f', 'flv',
            '-flvflags', 'no_duration_filesize',
            # Fix: the old 5-second '-timeout' caused disconnects after
            # ~7 minutes; the option is left disabled (the 30s value is
            # kept below for reference).
            # '-timeout', '30000000',  # 30s timeout (microseconds)
            '-reconnect', '1',
            '-reconnect_streamed', '1',
            '-reconnect_delay_max', '5',  # longer reconnect back-off
            '-reconnect_at_eof', '1',  # reconnect on EOF
            stream_url
        ])
    elif output_format == 'rtsp':
        ffmpeg_cmd.extend([
            '-f', 'rtsp',
            '-rtsp_transport', 'tcp',
            stream_url
        ])

    try:
        self.stream_retry_count += 1
        self.stream_last_retry_time = time.time()

        self.logger.info(f"启动FFmpeg推流 (第{self.stream_retry_count}次尝试): {stream_url}")

        # stderr stays a PIPE for diagnostics; a dedicated reader thread
        # (started below) drains it so the pipe buffer can never fill up
        # and block FFmpeg.
        self.ffmpeg_process = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,  # kept as PIPE for error diagnostics
            bufsize=512 * 1024  # 512KB buffer
        )

        # Start the stderr drain thread (prevents pipe-buffer deadlock)
        self._start_stderr_reader()

        # Short grace period, then verify the process is still alive
        time.sleep(0.5)
        if self.ffmpeg_process.poll() is not None:
            stderr_output = self.ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
            self.logger.error(f"FFmpeg进程启动失败:")
            for line in stderr_output.split('\n'):
                if 'error' in line.lower() or 'failed' in line.lower():
                    self.logger.error(f"  {line.strip()}")
            self.ffmpeg_process = None
            return False

        self.logger.info("FFmpeg推流进程启动成功")

        # Start the frame-push worker thread
        self._start_stream_thread()

        # NOTE(review): stream_retry_count was incremented above, so this
        # condition is always true here and the "recovered" message is
        # also logged on a first-time success — confirm intent.
        if self.stream_retry_count > 0:
            self.logger.info(f"推流恢复成功 (之前重试了{self.stream_retry_count}次)")
            self.stream_retry_count = 0

        return True

    except FileNotFoundError:
        self.logger.error("FFmpeg未安装或不在系统PATH中")
        self.stream_enabled = False
        return False
    except Exception as e:
        self.logger.error(f"启动FFmpeg推流失败: {e}")
        return False
    def _stderr_reader(self):
        """Drain FFmpeg's stderr pipe so its OS buffer never fills and stalls FFmpeg.

        Runs in a daemon thread (started by _start_stderr_reader). Instead of
        logging every line, collects lines containing error/failed/warning and
        emits a summarized log entry at most once every 10 seconds.
        """
        last_log_time = time.time()
        error_lines = []  # keyword-matched stderr lines since the last summary

        while self.stderr_thread_running and self.ffmpeg_process:
            try:
                if self.ffmpeg_process.stderr:
                    line = self.ffmpeg_process.stderr.readline()
                    if not line:
                        # EOF: FFmpeg closed stderr (process exiting) - stop reading.
                        break

                    line_str = line.decode('utf-8', errors='ignore').strip()

                    # Keep only lines that look like real problems.
                    if any(keyword in line_str.lower() for keyword in ['error', 'failed', 'warning']):
                        error_lines.append(line_str)

                    # Aggregate output: one summary per 10-second window.
                    current_time = time.time()
                    if current_time - last_log_time >= 10:
                        if error_lines:
                            self.logger.warning(f"FFmpeg stderr最近10秒: {len(error_lines)}条消息")
                            # Log only the first three lines to keep output short.
                            for err in error_lines[:3]:
                                self.logger.debug(f"  {err}")
                            error_lines.clear()
                        last_log_time = current_time

            except Exception as e:
                self.logger.debug(f"读取stderr错误: {e}")
                break

        self.logger.debug("stderr读取线程已停止")
|
||
def _start_stderr_reader(self):
|
||
"""启动stderr读取线程 - 关键修复:防止管道阻塞"""
|
||
if self.stderr_thread and self.stderr_thread.is_alive():
|
||
return
|
||
|
||
self.stderr_thread_running = True
|
||
self.stderr_thread = threading.Thread(target=self._stderr_reader, daemon=True)
|
||
self.stderr_thread.start()
|
||
self.logger.debug("✓ stderr读取线程已启动")
|
||
|
||
def _start_stream_thread(self):
|
||
"""启动推流线程"""
|
||
if self.stream_thread is not None and self.stream_thread.is_alive():
|
||
self.logger.debug("推流线程已在运行")
|
||
return
|
||
|
||
self.stream_thread_running = True
|
||
self.stream_thread = threading.Thread(target=self._stream_worker, daemon=True)
|
||
self.stream_thread.start()
|
||
self.logger.info("✓ 推流线程已启动")
|
||
|
||
    def _stream_worker(self):
        """Streaming worker thread: pull frames off the queue, write them to FFmpeg stdin.

        Tolerates FFmpeg restarts: broken pipes are detected, the process handle
        is cleared (so the health check in process_video_stream can relaunch it),
        and frames are counted as dropped meanwhile. Emits a throughput/drop-rate
        summary roughly every 10 seconds.
        """
        dropped_frames = 0
        total_frames = 0
        last_log_time = time.time()
        consecutive_failures = 0
        write_timeout = 1.0  # intended write timeout (seconds); not directly enforced here

        while self.stream_thread_running:
            try:
                # Fetch the next frame; short timeout keeps the loop responsive to shutdown.
                try:
                    frame = self.stream_frame_queue.get(timeout=0.1)
                except queue.Empty:
                    continue

                if frame is None:  # sentinel pushed on shutdown
                    break

                total_frames += 1

                # FFmpeg missing or exited: count the drop and wait for a restart.
                if self.ffmpeg_process is None or self.ffmpeg_process.poll() is not None:
                    dropped_frames += 1
                    consecutive_failures += 1

                    if consecutive_failures >= 30:
                        if consecutive_failures == 30:
                            # Log once per outage, not every frame.
                            self.logger.error("FFmpeg进程异常,需要重启")
                        self.ffmpeg_process = None
                    if consecutive_failures > 100:
                        # Cap the counter so the one-shot error above can fire again later.
                        consecutive_failures = 30
                    continue

                try:
                    frame_bytes = frame.tobytes()

                    # Write only while stdin is open; rely on a small buffer plus
                    # periodic flushes rather than a hard write timeout.
                    if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
                        self.ffmpeg_process.stdin.write(frame_bytes)
                        # Flush every 30 frames to avoid buffer build-up.
                        if total_frames % 30 == 0:
                            self.ffmpeg_process.stdin.flush()

                        consecutive_failures = 0
                    else:
                        dropped_frames += 1
                        consecutive_failures += 1

                except BrokenPipeError:
                    self.logger.warning("推流管道断开")
                    self.ffmpeg_process = None
                    dropped_frames += 1
                    consecutive_failures += 1
                except IOError as e:
                    if e.errno == 32:  # EPIPE: broken pipe
                        self.logger.warning("推流管道异常")
                        self.ffmpeg_process = None
                    dropped_frames += 1
                    consecutive_failures += 1
                except Exception as e:
                    self.logger.debug(f"写入帧失败: {e}")
                    dropped_frames += 1
                    consecutive_failures += 1

                # Periodic statistics summary.
                current_time = time.time()
                if current_time - last_log_time >= 10:
                    if total_frames > 0:
                        drop_rate = (dropped_frames / total_frames) * 100
                        process_status = "运行中" if (
                            self.ffmpeg_process and self.ffmpeg_process.poll() is None) else "已停止"
                        self.logger.info(
                            f"推流统计: 总帧={total_frames}, 丢帧={dropped_frames}({drop_rate:.1f}%), "
                            f"队列={self.stream_frame_queue.qsize()}, 状态={process_status}"
                        )
                        dropped_frames = 0
                        total_frames = 0
                    last_log_time = current_time

            except Exception as e:
                self.logger.error(f"推流线程错误: {e}")
                time.sleep(0.1)

        self.logger.info("推流线程已停止")
|
||
def _push_frame_to_stream(self, frame: np.ndarray):
|
||
"""推送帧到流媒体服务器 - 非阻塞方式"""
|
||
if not self.stream_enabled or self.ffmpeg_process is None:
|
||
return
|
||
|
||
try:
|
||
# 非阻塞方式放入队列
|
||
try:
|
||
self.stream_frame_queue.put_nowait(frame)
|
||
except queue.Full:
|
||
# 队列满时丢弃最旧的帧
|
||
try:
|
||
self.stream_frame_queue.get_nowait()
|
||
self.stream_frame_queue.put_nowait(frame)
|
||
except:
|
||
pass # 丢帧,继续处理
|
||
|
||
except Exception as e:
|
||
self.logger.debug(f"推送帧到队列失败: {e}")
|
||
|
||
    def _close_ffmpeg_stream(self):
        """Shut down the streaming pipeline by delegating to the forced-cleanup path."""
        self.logger.info("关闭推流系统...")
        # _force_cleanup_ffmpeg handles process termination, thread stop and queue drain.
        self._force_cleanup_ffmpeg()
        self.logger.info("推流系统已关闭")
|
||
def _init_compreface(self):
|
||
"""初始化CompreFace SDK"""
|
||
cf_config = self.config['compreface']
|
||
|
||
# 创建CompreFace实例
|
||
compre_face = CompreFace(
|
||
cf_config['host'],
|
||
cf_config['port'],
|
||
{
|
||
"limit": 0,
|
||
"det_prob_threshold": 0.8,
|
||
"prediction_count": 1
|
||
}
|
||
)
|
||
|
||
# 初始化识别和检测服务
|
||
self.recognition_service: RecognitionService = compre_face.init_face_recognition(
|
||
cf_config['recognition_api_key']
|
||
)
|
||
|
||
self.detection_service: DetectionService = compre_face.init_face_detection(
|
||
cf_config['detection_api_key']
|
||
)
|
||
|
||
self.logger.info("CompreFace服务初始化完成")
|
||
|
||
    def _init_camera(self):
        """Open and configure the capture device, retrying forever until it works.

        Blocks until the camera both opens AND delivers a test frame; resets the
        read-failure bookkeeping used by the main loop on success.

        Returns:
            True once the camera is initialized (never returns otherwise).
        """
        cam_config = self.config['camera']
        retry_interval = cam_config.get('retry_interval', 3)  # seconds between attempts

        attempt = 0
        while True:
            attempt += 1
            self.logger.info(f"正在尝试打开摄像头 (第{attempt}次尝试)...")

            # Release any previously-held device handle before re-opening.
            if self.camera is not None:
                try:
                    self.camera.release()
                except:
                    pass
                self.camera = None

            self.camera = cv2.VideoCapture(cam_config['device_id'])

            if self.camera.isOpened():
                # Apply the configured capture parameters.
                self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width'])
                self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height'])
                self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps'])

                # Verify the device actually delivers frames, not merely opens.
                ret, frame = self.camera.read()
                if ret:
                    self.logger.info(f"摄像头初始化成功 (设备ID: {cam_config['device_id']})")
                    # Reset the failure counters consumed by process_video_stream.
                    self.camera_failure_count = 0
                    self.camera_last_retry_time = None
                    return True
                else:
                    self.logger.warning("摄像头已打开但无法读取帧")
                    self.camera.release()
                    self.camera = None
            else:
                self.logger.warning(f"无法打开摄像头设备 {cam_config['device_id']}")

            # Back off before the next attempt.
            self.logger.info(f"{retry_interval}秒后重试...")
            time.sleep(retry_interval)
|
||
def assess_frame_quality(self, frame: np.ndarray) -> float:
|
||
"""评估帧质量(使用Laplacian方差检测模糊度)"""
|
||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
|
||
return laplacian_var
|
||
|
||
def detect_faces(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
|
||
"""检测人脸"""
|
||
try:
|
||
import requests
|
||
|
||
# 将帧编码为JPEG
|
||
_, img_encoded = cv2.imencode('.jpg', frame)
|
||
|
||
# 构建CompreFace REST API URL
|
||
api_url = f"{self.config['compreface']['host']}:{self.config['compreface']['port']}/api/v1/detection/detect"
|
||
|
||
# 准备请求参数
|
||
params = {
|
||
'face_plugins': 'pose' # 启用pose插件
|
||
}
|
||
|
||
# 准备请求头
|
||
headers = {
|
||
'x-api-key': self.config['compreface']['detection_api_key']
|
||
}
|
||
|
||
# 发送请求
|
||
files = {
|
||
'file': ('image.jpg', img_encoded.tobytes(), 'image/jpeg')
|
||
}
|
||
|
||
response = requests.post(api_url, headers=headers, files=files, params=params)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
|
||
if result and 'result' in result and len(result['result']) > 0:
|
||
faces = result['result']
|
||
|
||
# 过滤掉太小的人脸
|
||
min_size = self.config['face_detection']['min_face_size']
|
||
valid_faces = []
|
||
|
||
# 获取角度阈值
|
||
max_yaw = self.config['face_detection'].get('max_yaw', 30.0)
|
||
max_pitch = self.config['face_detection'].get('max_pitch', 30.0)
|
||
|
||
for face in faces:
|
||
# 检查人脸尺寸
|
||
face_width = face['box']['x_max'] - face['box']['x_min']
|
||
face_height = face['box']['y_max'] - face['box']['y_min']
|
||
|
||
if face_width >= min_size and face_height >= min_size:
|
||
# 检查人脸角度
|
||
pose = face.get('pose', {})
|
||
yaw = pose.get('yaw', 0.0)
|
||
pitch = pose.get('pitch', 0.0)
|
||
|
||
# 记录pose数据
|
||
self.logger.debug(f"检测到人脸数据: {list(face.keys())}")
|
||
if 'pose' in face:
|
||
self.logger.debug(f"Pose数据: yaw={yaw:.1f}°, pitch={pitch:.1f}°, roll={pose.get('roll', 0.0):.1f}°")
|
||
|
||
# 判断是否为正脸
|
||
if abs(yaw) <= max_yaw and abs(pitch) <= max_pitch:
|
||
valid_faces.append(face)
|
||
self.logger.debug(f"人脸角度检查通过: yaw={yaw:.1f}°, pitch={pitch:.1f}°")
|
||
else:
|
||
self.logger.debug(f"人脸角度超出范围,跳过: yaw={yaw:.1f}°(阈值±{max_yaw}°), pitch={pitch:.1f}°(阈值±{max_pitch}°)")
|
||
|
||
if valid_faces:
|
||
# 返回第一个(最大的)人脸
|
||
return valid_faces[0]
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"人脸检测错误: {e}")
|
||
return None
|
||
|
||
def recognize_face(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
|
||
"""识别人脸"""
|
||
try:
|
||
# 将帧编码为JPEG
|
||
_, img_encoded = cv2.imencode('.jpg', frame)
|
||
|
||
# 调用CompreFace识别API
|
||
result = self.recognition_service.recognize(img_encoded.tobytes())
|
||
|
||
if result and 'result' in result and len(result['result']) > 0:
|
||
faces = result['result']
|
||
if len(faces[0]['subjects']) > 0:
|
||
# 返回第一个识别结果
|
||
subject = faces[0]['subjects'][0]
|
||
return {
|
||
'subject': subject['subject'],
|
||
'similarity': subject['similarity'],
|
||
'box': faces[0]['box']
|
||
}
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"人脸识别错误: {e}")
|
||
return None
|
||
|
||
def determine_role(self, person_id: str, similarity: float) -> Tuple[str, str]:
|
||
"""根据相似度确定角色"""
|
||
role_config = self.config['role_mapping']
|
||
|
||
if similarity < role_config['stranger_threshold']:
|
||
return "未知", "陌生人"
|
||
else:
|
||
t = person_id.split("_")
|
||
name = t[0]
|
||
role = "员工" if len(t) == 1 else "访客"
|
||
return name, role
|
||
|
||
def should_recognize(self, person_id: str) -> bool:
|
||
"""检查是否应该识别(防止重复识别)"""
|
||
cooldown = self.config['face_recognition']['recognition_cooldown']
|
||
|
||
if person_id not in self.recognition_history:
|
||
return True
|
||
|
||
last_time = self.recognition_history[person_id]
|
||
elapsed = (datetime.now() - last_time).total_seconds()
|
||
|
||
return elapsed >= cooldown
|
||
|
||
def cv2_add_chinese_text(self, img: np.ndarray, text: str, position: Tuple[int, int],
|
||
font: ImageFont.FreeTypeFont, text_color: Tuple[int, int, int]) -> np.ndarray:
|
||
"""在OpenCV图像上添加中文文本"""
|
||
# 转换为PIL图像
|
||
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
|
||
draw = ImageDraw.Draw(img_pil)
|
||
|
||
# 绘制文本
|
||
draw.text(position, text, font=font, fill=text_color)
|
||
|
||
# 转换回OpenCV格式
|
||
img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
|
||
return img
|
||
|
||
def draw_info_on_frame(self, frame: np.ndarray) -> np.ndarray:
|
||
"""在帧上绘制检测和识别信息"""
|
||
display_frame = frame.copy()
|
||
h, w = display_frame.shape[:2]
|
||
|
||
# 绘制人脸框和识别结果
|
||
if self.display_info['face_detected'] and self.display_info['face_box']:
|
||
box = self.display_info['face_box']
|
||
x_min = int(box['x_min'])
|
||
y_min = int(box['y_min'])
|
||
x_max = int(box['x_max'])
|
||
y_max = int(box['y_max'])
|
||
|
||
# 根据识别状态选择颜色
|
||
if self.display_info['person_name']:
|
||
# 已识别 - 绿色
|
||
color = (0, 255, 0)
|
||
thickness = 3
|
||
else:
|
||
# 仅检测到 - 黄色
|
||
color = (0, 255, 255)
|
||
thickness = 2
|
||
|
||
# 绘制人脸框
|
||
cv2.rectangle(display_frame, (x_min, y_min), (x_max, y_max), color, thickness)
|
||
|
||
# 绘制识别信息
|
||
if self.display_info['person_name']:
|
||
name = self.display_info['person_name']
|
||
role = self.display_info['person_role']
|
||
similarity = self.display_info['similarity']
|
||
|
||
# 准备文本
|
||
text_lines = [
|
||
f"姓名: {name}",
|
||
f"角色: {role}",
|
||
f"相似度: {similarity:.2%}"
|
||
]
|
||
|
||
# 计算文本背景框
|
||
line_height = 35
|
||
padding = 10
|
||
|
||
# 计算所需的背景高度
|
||
bg_height = len(text_lines) * line_height + padding * 2
|
||
bg_y_start = max(0, y_min - bg_height - 10)
|
||
|
||
# 绘制文本背景
|
||
cv2.rectangle(
|
||
display_frame,
|
||
(x_min, bg_y_start),
|
||
(x_max, bg_y_start + bg_height),
|
||
(0, 0, 0),
|
||
-1
|
||
)
|
||
cv2.rectangle(
|
||
display_frame,
|
||
(x_min, bg_y_start),
|
||
(x_max, bg_y_start + bg_height),
|
||
color,
|
||
2
|
||
)
|
||
|
||
# 使用PIL绘制中文文本
|
||
for i, text in enumerate(text_lines):
|
||
y_pos = bg_y_start + padding + i * line_height
|
||
display_frame = self.cv2_add_chinese_text(
|
||
display_frame,
|
||
text,
|
||
(x_min + padding, y_pos),
|
||
self.font_medium,
|
||
(255, 255, 255)
|
||
)
|
||
|
||
# 绘制状态信息面板
|
||
panel_height = 200
|
||
panel_bg = np.zeros((panel_height, w, 3), dtype=np.uint8)
|
||
panel_bg[:] = (40, 40, 40)
|
||
|
||
# 状态信息
|
||
y_offset = 30
|
||
x_offset = 15
|
||
line_spacing = 30
|
||
|
||
# WebSocket连接状态
|
||
ws_status = "已连接" if self.ws_connected else "未连接"
|
||
ws_color = (0, 255, 0) if self.ws_connected else (0, 0, 255)
|
||
|
||
status_texts = [
|
||
# f"帧率: {self.display_info['fps']:.1f} FPS",
|
||
# f"帧数: {self.display_info['frame_count']}",
|
||
f"质量: {self.display_info['quality']:.1f}",
|
||
# f"检测到人脸: {'是' if self.display_info['face_detected'] else '否'}",
|
||
f"WebSocket: {ws_status}",
|
||
f"机器人说话: {'是' if self.robot_status['is_speaking'] else '否'}",
|
||
f"机器人思考: {'是' if self.robot_status['is_thinking'] else '否'}",
|
||
f"机器人角色: {self.robot_status['role_name']}"
|
||
]
|
||
|
||
# 如果人脸持续出现,显示倒计时
|
||
if self.face_present_start:
|
||
elapsed = (datetime.now() - self.face_present_start).total_seconds()
|
||
face_duration = self.config['face_detection']['face_present_duration']
|
||
remaining = max(0, face_duration - elapsed)
|
||
status_texts.append(f"识别倒计时: {remaining:.1f}秒")
|
||
|
||
# 使用PIL绘制中文状态文本
|
||
for i, text in enumerate(status_texts):
|
||
# WebSocket状态使用特殊颜色
|
||
if i == 4: # "WebSocket: " 这一行
|
||
panel_bg = self.cv2_add_chinese_text(
|
||
panel_bg,
|
||
text,
|
||
(x_offset, y_offset + i * line_spacing),
|
||
self.font_small,
|
||
ws_color
|
||
)
|
||
else:
|
||
panel_bg = self.cv2_add_chinese_text(
|
||
panel_bg,
|
||
text,
|
||
(x_offset, y_offset + i * line_spacing),
|
||
self.font_small,
|
||
(255, 255, 255)
|
||
)
|
||
|
||
# 将面板添加到画面底部
|
||
display_frame = np.vstack([display_frame, panel_bg])
|
||
|
||
return display_frame
|
||
|
||
def update_fps(self):
|
||
"""更新FPS计算"""
|
||
self.fps_counter += 1
|
||
current_time = time.time()
|
||
elapsed = current_time - self.last_fps_time
|
||
|
||
if elapsed >= 1.0:
|
||
self.display_info['fps'] = self.fps_counter / elapsed
|
||
self.fps_counter = 0
|
||
self.last_fps_time = current_time
|
||
|
||
async def send_websocket_message(self, message: Dict[str, Any]):
|
||
"""发送WebSocket消息"""
|
||
if self.ws and self.ws_connected:
|
||
try:
|
||
await self.ws.send(json.dumps(message))
|
||
self.logger.debug(f"发送消息: {message}")
|
||
except websockets.exceptions.ConnectionClosed:
|
||
self.logger.warning("WebSocket连接已关闭,无法发送消息")
|
||
self.ws_connected = False
|
||
except Exception as e:
|
||
self.logger.error(f"发送WebSocket消息失败: {e}")
|
||
self.ws_connected = False
|
||
else:
|
||
self.logger.debug("WebSocket未连接,消息发送失败")
|
||
|
||
async def query_robot_status(self):
|
||
"""定期查询机器人状态"""
|
||
interval = self.config['websocket']['status_interval']
|
||
|
||
try:
|
||
while self.ws_connected:
|
||
try:
|
||
status_msg = {
|
||
"type": "get_status",
|
||
"message": ""
|
||
}
|
||
await self.send_websocket_message(status_msg)
|
||
await asyncio.sleep(interval)
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"查询状态错误: {e}")
|
||
self.ws_connected = False
|
||
break
|
||
except Exception as e:
|
||
self.logger.error(f"状态查询任务异常: {e}")
|
||
self.ws_connected = False
|
||
|
||
    async def handle_websocket_messages(self):
        """Receive loop: parse incoming WebSocket messages and mirror robot status.

        Only messages of type 'status' are handled; their payload updates
        self.robot_status (speaking/thinking/listening/ASR flags and role name).
        Any receive error marks the connection as down so connect_websocket
        can reconnect.
        """
        try:
            while self.ws_connected:
                try:
                    message = await self.ws.recv()
                    data = json.loads(message)

                    if data.get('type') == 'status':
                        status = data.get('message', {})
                        self.robot_status['is_speaking'] = status.get('is_speaking', False)
                        self.robot_status['is_thinking'] = status.get('is_thinking', False)
                        self.robot_status['listening'] = status.get('listening', False)
                        self.robot_status['is_asr_processing'] = status.get('is_asr_processing', False)
                        # Default role is the visitor guide; detection is gated on it.
                        self.robot_status['role_name'] = status.get('role_name', '访客引导者')

                        self.logger.debug(f"机器人状态: {self.robot_status}")

                except websockets.exceptions.ConnectionClosed:
                    self.logger.warning("WebSocket消息接收中断: 连接已关闭")
                    self.ws_connected = False
                    break
                except Exception as e:
                    self.logger.error(f"处理WebSocket消息错误: {e}")
                    self.ws_connected = False
                    break
        except Exception as e:
            self.logger.error(f"WebSocket消息处理任务异常: {e}")
            self.ws_connected = False
|
||
    async def connect_websocket(self):
        """Maintain the WebSocket connection forever, reconnecting on any failure.

        Once connected, runs the status-poll and message-receive coroutines
        concurrently until either fails; then resets the connection state and
        retries after the configured delay. Log verbosity is throttled after
        repeated failed attempts.
        """
        reconnect_delay = self.config['websocket']['reconnect_delay']

        while True:
            try:
                self.ws_reconnect_count += 1

                # Throttle connection-attempt logging: info for the first few
                # attempts, then only every 10th attempt.
                if self.ws_reconnect_count <= 3:
                    self.logger.info(f"连接WebSocket: {self.ws_url} (第{self.ws_reconnect_count}次尝试)")
                elif self.ws_reconnect_count % 10 == 0:
                    self.logger.info(f"持续尝试连接WebSocket (第{self.ws_reconnect_count}次)")
                else:
                    self.logger.debug(f"尝试连接WebSocket (第{self.ws_reconnect_count}次)")

                # Keep-alive pings detect dead links; timeouts bound hangs.
                async with websockets.connect(
                        self.ws_url,
                        ping_interval=20,   # send a ping every 20 s
                        ping_timeout=10,    # consider the link dead after 10 s
                        close_timeout=5     # bound the close handshake
                ) as ws:
                    self.ws = ws
                    self.ws_connected = True
                    self.ws_reconnect_count = 0  # successful connect resets the counter
                    self.logger.info("✓ WebSocket连接成功")

                    # Run polling and receiving together; either one failing
                    # tears the pair down.
                    try:
                        await asyncio.gather(
                            self.query_robot_status(),
                            self.handle_websocket_messages()
                        )
                    except Exception as e:
                        self.logger.warning(f"WebSocket任务组异常: {e}")
                    finally:
                        # Always reset connection state before reconnecting.
                        self.ws_connected = False
                        self.ws = None
                        self.logger.info("WebSocket连接已断开,准备重连...")

            except websockets.exceptions.ConnectionClosed:
                self.logger.warning("WebSocket连接已正常关闭")
                self.ws_connected = False
                self.ws = None
            except ConnectionRefusedError:
                if self.ws_reconnect_count <= 3:
                    self.logger.error(f"WebSocket连接被拒绝: {self.ws_url}")
                elif self.ws_reconnect_count == 4:
                    self.logger.warning("WebSocket持续连接失败,将减少日志输出频率")
                self.ws_connected = False
                self.ws = None
            except OSError as e:
                if self.ws_reconnect_count <= 3:
                    self.logger.error(f"WebSocket网络错误: {e}")
                self.ws_connected = False
                self.ws = None
            except Exception as e:
                error_msg = str(e)
                # Suppress the routine close-related error strings.
                if "no close frame" not in error_msg and "Connection closed" not in error_msg:
                    if self.ws_reconnect_count <= 3:
                        self.logger.error(f"WebSocket连接错误: {e}")
                    elif self.ws_reconnect_count == 4:
                        self.logger.warning("WebSocket持续连接失败,将减少日志输出频率")
                self.ws_connected = False
                self.ws = None

            # Back off before the next attempt (logging throttled as above).
            if self.ws_reconnect_count <= 3:
                self.logger.info(f"{reconnect_delay}秒后重连...")
            elif self.ws_reconnect_count % 10 == 0:
                self.logger.info(f"{reconnect_delay}秒后继续尝试重连...")
            await asyncio.sleep(reconnect_delay)
|
||
def can_perform_detection(self) -> bool:
|
||
"""检查是否可以进行人脸检测"""
|
||
return not self.robot_status['is_speaking'] and not self.robot_status['is_thinking'] and self.robot_status[
|
||
'role_name'] == "访客引导者"
|
||
|
||
    async def process_video_stream(self):
        """Main video loop: capture, quality-gate, detect, recognize, display, stream.

        Responsibilities per iteration:
        - keep the camera alive (reconnect on read failures),
        - run face detection every `frame_interval` frames when the robot is idle
          and frame sharpness passes `quality_threshold`,
        - after a face has been continuously present for `face_present_duration`
          seconds, recognize it and notify the robot over WebSocket,
        - render overlays, show the window, and push frames to the RTMP/RTSP
          stream with a periodic health check that restarts FFmpeg if needed.

        Runs until the user presses 'q'/ESC or an unrecoverable error occurs;
        all resources are released in the finally block.
        """
        self._init_camera()

        # Start the FFmpeg push stream if configured.
        if self.stream_enabled:
            self._init_ffmpeg_stream()

        frame_interval = self.config['face_detection']['frame_interval']
        quality_threshold = self.config['face_detection']['quality_threshold']
        face_duration = self.config['face_detection']['face_present_duration']

        self.logger.info("开始处理视频流")

        # Create the display window and maximize it.
        window_name = 'Hello'
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL | cv2.WINDOW_GUI_NORMAL)

        # Size the window to the primary monitor when screeninfo is available.
        cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_NORMAL)
        try:
            import screeninfo
            screen = screeninfo.get_monitors()[0]
            cv2.resizeWindow(window_name, screen.width, screen.height)
            cv2.moveWindow(window_name, 0, 0)
        except:
            cv2.resizeWindow(window_name, 1920, 1080)
            self.logger.warning("无法获取屏幕分辨率,使用默认窗口大小")

        # Stream health-check cadence: every 150 frames (~5 s).
        stream_check_counter = 0
        stream_check_interval = 150

        # Clear stale overlay info only after the face has been gone for
        # 30 consecutive frames (~1 s) to avoid flicker.
        no_face_counter = 0
        no_face_threshold = 30

        try:
            while True:
                # Show/hide the promo video depending on the robot's role.
                self._check_role_and_display_video()

                # Advance the promo video if it is currently playing.
                if self.video_showing:
                    self._update_video()

                # Camera watchdog: reconnect if the device dropped.
                if self.camera is None or not self.camera.isOpened():
                    self.logger.warning("摄像头未打开,尝试重新连接...")
                    if not self._try_reconnect_camera():
                        await asyncio.sleep(0.1)
                        continue

                ret, frame = self.camera.read()

                if not ret:
                    self.camera_failure_count += 1

                    # Throttled failure logging.
                    if self.camera_failure_count <= 3:
                        self.logger.warning(f"无法读取摄像头帧 (连续失败{self.camera_failure_count}次)")
                    elif self.camera_failure_count == 4:
                        self.logger.warning("摄像头持续读取失败,将减少日志输出")

                    # Too many consecutive failures: reinitialize the device.
                    if self.camera_failure_count >= self.camera_max_failures:
                        self.logger.error(f"摄像头连续失败{self.camera_failure_count}次,尝试重新初始化...")
                        if self._try_reconnect_camera():
                            self.logger.info("摄像头重新初始化成功,继续处理")
                            continue
                        else:
                            self.camera_failure_count = 0

                    await asyncio.sleep(0.1)
                    continue

                # Frame read OK: reset the failure counter.
                if self.camera_failure_count > 0:
                    self.logger.info(f"摄像头恢复正常 (之前连续失败{self.camera_failure_count}次)")
                    self.camera_failure_count = 0

                self.frame_count += 1

                # Decide whether this frame gets a detection pass.
                should_detect = False

                if self.frame_count % frame_interval == 0:
                    if self.can_perform_detection():
                        quality = self.assess_frame_quality(frame)
                        self.display_info['quality'] = quality

                        # Skip detection on blurry frames.
                        if quality >= quality_threshold:
                            should_detect = True
                        else:
                            self.logger.debug(f"帧质量不足: {quality:.2f}")
                    else:
                        # Robot busy (speaking/thinking) or wrong role: suspend
                        # detection and reset the presence timer.
                        self.logger.debug("机器人正在说话或思考,跳过检测")
                        self.face_present_start = None
                        self.display_info['quality'] = self.assess_frame_quality(frame)
                else:
                    self.display_info['quality'] = 0

                # Never detect while the QR code overlay is shown.
                if self.qrcode_showing:
                    should_detect = False

                # --- detection / recognition ---------------------------------
                if should_detect:
                    face = self.detect_faces(frame)

                    if face:
                        # Face present: reset the disappearance counter.
                        no_face_counter = 0

                        self.display_info['face_detected'] = True
                        self.display_info['face_box'] = face['box']

                        # First sighting: start the presence timer and clear any
                        # stale recognition labels.
                        if self.face_present_start is None:
                            self.face_present_start = datetime.now()
                            self.logger.info("检测到人脸,开始计时")
                            self.display_info['person_name'] = None
                            self.display_info['person_role'] = None
                            self.display_info['similarity'] = 0

                        # Recognize once the face has been present long enough.
                        elapsed = (datetime.now() - self.face_present_start).total_seconds()

                        if elapsed >= face_duration:
                            self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别")

                            recognition_result = self.recognize_face(frame)

                            if recognition_result:
                                person_id = recognition_result['subject']
                                similarity = recognition_result['similarity']

                                self.display_info['face_box'] = recognition_result['box']

                                # Cooldown gate: avoid greeting the same person repeatedly.
                                if self.should_recognize(person_id):
                                    name, role = self.determine_role(person_id, similarity)

                                    self.display_info['person_name'] = name
                                    self.display_info['person_role'] = role
                                    self.display_info['similarity'] = similarity

                                    self.logger.info(
                                        f"识别到: {name}, 相似度: {similarity:.6f}, 角色: {role}"
                                    )

                                    # Strangers get the registration QR code.
                                    if role == "陌生人":
                                        self._show_qrcode()

                                    # Tell the robot to start the reception flow.
                                    reception_msg = {
                                        "type": "start_reception",
                                        "message": {
                                            "name": name,
                                            "role": role
                                        }
                                    }
                                    await self.send_websocket_message(reception_msg)

                                    # Record when this person was last recognized.
                                    self.recognition_history[person_id] = datetime.now()

                                    # Restart the presence timer.
                                    self.face_present_start = None
                            else:
                                # No known subject matched: treat as a stranger
                                # (still subject to the shared "unknown" cooldown).
                                if self.should_recognize("unknown"):
                                    self.logger.info("检测到陌生人")

                                    self.display_info['person_name'] = "未知访客"
                                    self.display_info['person_role'] = "陌生人"
                                    self.display_info['similarity'] = 0

                                    self._show_qrcode()

                                    reception_msg = {
                                        "type": "start_reception",
                                        "message": {
                                            "name": "未知访客",
                                            "role": "陌生人"
                                        }
                                    }
                                    await self.send_websocket_message(reception_msg)

                                    self.recognition_history["unknown"] = datetime.now()

                                    # Restart the presence timer.
                                    self.face_present_start = None
                    else:
                        # No face in this detection pass.
                        no_face_counter += 1

                        # Clear the overlay only after a sustained absence.
                        if no_face_counter >= no_face_threshold:
                            if self.display_info['face_detected'] or self.display_info['person_name']:
                                self.logger.info("人脸消失,清空识别信息")
                                self.display_info['face_detected'] = False
                                self.display_info['face_box'] = None
                                self.display_info['person_name'] = None
                                self.display_info['person_role'] = None
                                self.display_info['similarity'] = 0

                            # Reset the presence timer.
                            if self.face_present_start is not None:
                                self.logger.debug("人脸消失,重置计时器")
                                self.face_present_start = None

                # --- render and display --------------------------------------
                display_frame = self.draw_info_on_frame(frame)
                cv2.imshow(window_name, display_frame)

                # Hide the QR code overlay if it has been up too long.
                self._check_qrcode_timeout()

                # --- streaming ------------------------------------------------
                if self.stream_enabled:
                    stream_check_counter += 1

                    # Periodic health check of the FFmpeg process + worker thread.
                    if stream_check_counter >= stream_check_interval:
                        stream_check_counter = 0

                        ffmpeg_alive = self.ffmpeg_process is not None and self.ffmpeg_process.poll() is None
                        thread_alive = self.stream_thread is not None and self.stream_thread.is_alive()

                        # Restart the pipeline when either piece has died.
                        if not ffmpeg_alive or not thread_alive:
                            current_time = time.time()

                            # Respect the retry budget and cooldown.
                            can_retry = (
                                self.stream_retry_count < self.stream_max_retries and
                                (self.stream_last_retry_time is None or
                                 current_time - self.stream_last_retry_time >= self.stream_retry_cooldown)
                            )

                            if can_retry:
                                self.logger.warning(
                                    f"检测到推流异常 (FFmpeg存活={ffmpeg_alive}, 线程存活={thread_alive}),尝试重启...")

                                # Tear down the old FFmpeg process.
                                if self.ffmpeg_process:
                                    try:
                                        if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
                                            self.ffmpeg_process.stdin.close()
                                        self.ffmpeg_process.kill()
                                        self.ffmpeg_process.wait(timeout=2)
                                    except:
                                        pass
                                    self.ffmpeg_process = None

                                # Stop the old worker thread (None is its sentinel).
                                if self.stream_thread and self.stream_thread.is_alive():
                                    self.stream_thread_running = False
                                    try:
                                        self.stream_frame_queue.put(None, timeout=1)
                                    except:
                                        pass
                                    self.stream_thread.join(timeout=2)

                                # Drain any stale frames.
                                while not self.stream_frame_queue.empty():
                                    try:
                                        self.stream_frame_queue.get_nowait()
                                    except:
                                        break

                                # Bring the pipeline back up.
                                success = self._init_ffmpeg_stream()
                                if success:
                                    self.logger.info("✓ 推流重启成功")
                                else:
                                    self.logger.error("✗ 推流重启失败")
                            elif self.stream_retry_count >= self.stream_max_retries:
                                if stream_check_counter % 600 == 0:  # remind every ~20 s
                                    self.logger.warning(f"推流已达到最大重试次数({self.stream_max_retries}),已停止尝试")

                    # Push the composited frame (non-blocking).
                    if self.ffmpeg_process and self.ffmpeg_process.poll() is None:
                        self._push_frame_to_stream(display_frame)

                # Keyboard: 'q' or ESC exits.
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q') or key == 27:
                    self.logger.info("用户请求退出")
                    break

                # Yield to the event loop (keeps the WebSocket tasks running).
                await asyncio.sleep(0.001)

        except Exception as e:
            self.logger.error(f"视频流处理错误: {e}")
        finally:
            # Release all display / capture / streaming resources.
            if self.camera:
                self.camera.release()
                self.logger.info("摄像头已释放")

            self._close_qrcode()

            self._close_video()

            if self.stream_enabled:
                self._close_ffmpeg_stream()

            cv2.destroyAllWindows()
|
||
async def run(self):
|
||
"""运行系统"""
|
||
self.logger.info("启动人脸识别系统")
|
||
|
||
# 同时运行WebSocket连接和视频处理
|
||
await asyncio.gather(
|
||
self.connect_websocket(),
|
||
self.process_video_stream()
|
||
)
|
||
|
||
|
||
def main():
    """Entry point: build the system from config.yaml and run it until interrupted."""
    system = FaceRecognitionSystem("config.yaml")

    try:
        asyncio.run(system.run())
    except KeyboardInterrupt:
        # Ctrl-C: exit quietly.
        print("\n系统已停止")
    except Exception as e:
        print(f"系统错误: {e}")
|
||
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()