robot_face_rec/face_rec.py

726 lines
29 KiB
Python

import cv2
import asyncio
import websockets
import json
import yaml
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
import numpy as np
from compreface import CompreFace
from compreface.service import RecognitionService, DetectionService
import time
from collections import defaultdict
from PIL import Image, ImageDraw, ImageFont
class FaceRecognitionSystem:
def __init__(self, config_path: str = "config.yaml"):
"""初始化人脸识别系统"""
# 加载配置
with open(config_path, 'r', encoding='utf-8') as f:
self.config = yaml.safe_load(f)
# 设置日志
self._setup_logging()
# 初始化CompreFace
self._init_compreface()
# 初始化摄像头
self.camera = None
# WebSocket连接
self.ws = None
self.ws_url = self.config['websocket']['url']
# 状态变量
self.robot_status = {
'is_speaking': False,
'is_thinking': False,
'listening': False
}
# 人脸检测状态
self.frame_count = 0
self.face_present_start = None
self.current_face_id = None
# 识别记录(防止重复识别)
self.recognition_history = {} # {person_id: last_recognition_time}
# 显示相关变量
self.current_display_frame = None
self.last_detection_result = None # 最后的检测结果
self.last_recognition_result = None # 最后的识别结果
self.display_info = {
'quality': 0,
'face_detected': False,
'face_box': None,
'person_name': None,
'person_role': None,
'similarity': 0,
'frame_count': 0,
'fps': 0
}
self.last_fps_time = time.time()
self.fps_counter = 0
# 加载中文字体
self.font_path = self._get_chinese_font()
self.font_small = ImageFont.truetype(self.font_path, 20)
self.font_medium = ImageFont.truetype(self.font_path, 24)
self.font_large = ImageFont.truetype(self.font_path, 28)
self.logger.info("人脸识别系统初始化完成")
def _setup_logging(self):
"""设置日志系统"""
log_config = self.config['logging']
self.logger = logging.getLogger('FaceRecognition')
self.logger.setLevel(getattr(logging, log_config['level']))
# 文件处理器
file_handler = RotatingFileHandler(
log_config['file'],
maxBytes=log_config['max_bytes'],
backupCount=log_config['backup_count']
)
# 控制台处理器
console_handler = logging.StreamHandler()
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def _get_chinese_font(self) -> str:
"""获取中文字体路径"""
import platform
import os
system = platform.system()
# Windows系统
if system == "Windows":
font_paths = [
"C:/Windows/Fonts/msyh.ttc", # 微软雅黑
"C:/Windows/Fonts/simhei.ttf", # 黑体
"C:/Windows/Fonts/simsun.ttc", # 宋体
]
# macOS系统
elif system == "Darwin":
font_paths = [
"/System/Library/Fonts/PingFang.ttc", # 苹方
"/System/Library/Fonts/STHeiti Medium.ttc", # 黑体
"/Library/Fonts/Arial Unicode.ttf",
]
# Linux系统
else:
font_paths = [
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
"/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
]
# 查找可用的字体
for font_path in font_paths:
if os.path.exists(font_path):
self.logger.info(f"使用中文字体: {font_path}")
return font_path
# 如果没找到,尝试使用配置文件中的字体路径
if 'font_path' in self.config.get('display', {}):
custom_font = self.config['display']['font_path']
if os.path.exists(custom_font):
self.logger.info(f"使用自定义字体: {custom_font}")
return custom_font
# 默认使用一个基本字体(不支持中文,但至少不会报错)
self.logger.warning("未找到中文字体,将使用默认字体(可能无法显示中文)")
return "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
def _init_compreface(self):
"""初始化CompreFace SDK"""
cf_config = self.config['compreface']
# 创建CompreFace实例
compre_face = CompreFace(
cf_config['host'],
cf_config['port'],
{
"limit": 0,
"det_prob_threshold": 0.8,
"prediction_count": 1
}
)
# 初始化识别和检测服务
self.recognition_service: RecognitionService = compre_face.init_face_recognition(
cf_config['recognition_api_key']
)
self.detection_service: DetectionService = compre_face.init_face_detection(
cf_config['detection_api_key']
)
self.logger.info("CompreFace服务初始化完成")
def _init_camera(self):
"""初始化摄像头"""
cam_config = self.config['camera']
retry_interval = cam_config.get('retry_interval', 3) # 默认3秒重试
attempt = 0
while True:
attempt += 1
self.logger.info(f"正在尝试打开摄像头 (第{attempt}次尝试)...")
self.camera = cv2.VideoCapture(cam_config['device_id'])
if self.camera.isOpened():
# 设置摄像头参数
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width'])
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height'])
self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps'])
# 验证是否能读取帧
ret, frame = self.camera.read()
if ret:
self.logger.info(f"摄像头初始化成功 (设备ID: {cam_config['device_id']})")
return
else:
self.logger.warning("摄像头已打开但无法读取帧")
self.camera.release()
else:
self.logger.warning(f"无法打开摄像头设备 {cam_config['device_id']}")
# 等待后重试
self.logger.info(f"{retry_interval}秒后重试...")
time.sleep(retry_interval)
def assess_frame_quality(self, frame: np.ndarray) -> float:
"""评估帧质量(使用Laplacian方差检测模糊度)"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
return laplacian_var
def detect_faces(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
"""检测人脸"""
try:
# 将帧编码为JPEG
_, img_encoded = cv2.imencode('.jpg', frame)
# 调用CompreFace检测API
result = self.detection_service.detect(img_encoded.tobytes())
if result and 'result' in result and len(result['result']) > 0:
faces = result['result']
# 过滤掉太小的人脸
min_size = self.config['face_detection']['min_face_size']
valid_faces = [
face for face in faces
if face['box']['x_max'] - face['box']['x_min'] >= min_size
and face['box']['y_max'] - face['box']['y_min'] >= min_size
]
if valid_faces:
# 返回第一个(最大的)人脸
return valid_faces[0]
return None
except Exception as e:
self.logger.error(f"人脸检测错误: {e}")
return None
def recognize_face(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
"""识别人脸"""
try:
# 将帧编码为JPEG
_, img_encoded = cv2.imencode('.jpg', frame)
# 调用CompreFace识别API
result = self.recognition_service.recognize(img_encoded.tobytes())
if result and 'result' in result and len(result['result']) > 0:
faces = result['result']
if len(faces[0]['subjects']) > 0:
# 返回第一个识别结果
subject = faces[0]['subjects'][0]
return {
'subject': subject['subject'],
'similarity': subject['similarity'],
'box': faces[0]['box']
}
return None
except Exception as e:
self.logger.error(f"人脸识别错误: {e}")
return None
def determine_role(self, person_id: str, similarity: float) -> Tuple[str, str]:
"""根据相似度确定角色"""
role_config = self.config['role_mapping']
if similarity < role_config['stranger_threshold']:
return "未知", "陌生人"
else:
t = person_id.split("_")
name = t[0]
role = "员工" if len(t) == 1 else "访客"
return name, role
def should_recognize(self, person_id: str) -> bool:
"""检查是否应该识别(防止重复识别)"""
cooldown = self.config['face_recognition']['recognition_cooldown']
if person_id not in self.recognition_history:
return True
last_time = self.recognition_history[person_id]
elapsed = (datetime.now() - last_time).total_seconds()
return elapsed >= cooldown
def cv2_add_chinese_text(self, img: np.ndarray, text: str, position: Tuple[int, int],
font: ImageFont.FreeTypeFont, text_color: Tuple[int, int, int]) -> np.ndarray:
"""在OpenCV图像上添加中文文本"""
# 转换为PIL图像
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
# 绘制文本
draw.text(position, text, font=font, fill=text_color)
# 转换回OpenCV格式
img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return img
def draw_info_on_frame(self, frame: np.ndarray) -> np.ndarray:
"""在帧上绘制检测和识别信息"""
display_frame = frame.copy()
h, w = display_frame.shape[:2]
# 绘制人脸框和识别结果
if self.display_info['face_detected'] and self.display_info['face_box']:
box = self.display_info['face_box']
x_min = int(box['x_min'])
y_min = int(box['y_min'])
x_max = int(box['x_max'])
y_max = int(box['y_max'])
# 根据识别状态选择颜色
if self.display_info['person_name']:
# 已识别 - 绿色
color = (0, 255, 0)
thickness = 3
else:
# 仅检测到 - 黄色
color = (0, 255, 255)
thickness = 2
# 绘制人脸框
cv2.rectangle(display_frame, (x_min, y_min), (x_max, y_max), color, thickness)
# 绘制识别信息
if self.display_info['person_name']:
name = self.display_info['person_name']
role = self.display_info['person_role']
similarity = self.display_info['similarity']
# 准备文本
text_lines = [
f"姓名: {name}",
f"角色: {role}",
f"相似度: {similarity:.2%}"
]
# 计算文本背景框
line_height = 35
padding = 10
# 计算所需的背景高度
bg_height = len(text_lines) * line_height + padding * 2
bg_y_start = max(0, y_min - bg_height - 10)
# 绘制文本背景
cv2.rectangle(
display_frame,
(x_min, bg_y_start),
(x_max, bg_y_start + bg_height),
(0, 0, 0),
-1
)
cv2.rectangle(
display_frame,
(x_min, bg_y_start),
(x_max, bg_y_start + bg_height),
color,
2
)
# 使用PIL绘制中文文本
for i, text in enumerate(text_lines):
y_pos = bg_y_start + padding + i * line_height
display_frame = self.cv2_add_chinese_text(
display_frame,
text,
(x_min + padding, y_pos),
self.font_medium,
(255, 255, 255)
)
# 绘制状态信息面板
panel_height = 200
panel_bg = np.zeros((panel_height, w, 3), dtype=np.uint8)
panel_bg[:] = (40, 40, 40)
# 状态信息
y_offset = 30
x_offset = 15
line_spacing = 30
status_texts = [
f"帧率: {self.display_info['fps']:.1f} FPS",
f"帧数: {self.display_info['frame_count']}",
f"质量: {self.display_info['quality']:.1f}",
f"检测到人脸: {'' if self.display_info['face_detected'] else ''}",
f"机器人说话: {'' if self.robot_status['is_speaking'] else ''}",
f"机器人思考: {'' if self.robot_status['is_thinking'] else ''}",
]
# 如果人脸持续出现,显示倒计时
if self.face_present_start:
elapsed = (datetime.now() - self.face_present_start).total_seconds()
face_duration = self.config['face_detection']['face_present_duration']
remaining = max(0, face_duration - elapsed)
status_texts.append(f"识别倒计时: {remaining:.1f}")
# 使用PIL绘制中文状态文本
for i, text in enumerate(status_texts):
panel_bg = self.cv2_add_chinese_text(
panel_bg,
text,
(x_offset, y_offset + i * line_spacing),
self.font_small,
(255, 255, 255)
)
# 将面板添加到画面底部
display_frame = np.vstack([display_frame, panel_bg])
return display_frame
def update_fps(self):
"""更新FPS计算"""
self.fps_counter += 1
current_time = time.time()
elapsed = current_time - self.last_fps_time
if elapsed >= 1.0:
self.display_info['fps'] = self.fps_counter / elapsed
self.fps_counter = 0
self.last_fps_time = current_time
async def send_websocket_message(self, message: Dict[str, Any]):
"""发送WebSocket消息"""
if self.ws:
try:
await self.ws.send(json.dumps(message))
self.logger.debug(f"发送消息: {message}")
except Exception as e:
self.logger.error(f"发送WebSocket消息失败: {e}")
async def query_robot_status(self):
"""定期查询机器人状态"""
interval = self.config['websocket']['status_interval']
while True:
try:
if self.ws:
status_msg = {
"type": "get_status",
"message": ""
}
await self.send_websocket_message(status_msg)
await asyncio.sleep(interval)
except Exception as e:
self.logger.error(f"查询状态错误: {e}")
await asyncio.sleep(interval)
async def handle_websocket_messages(self):
"""处理WebSocket接收的消息"""
while True:
try:
if self.ws:
message = await self.ws.recv()
data = json.loads(message)
if data.get('type') == 'status':
status = data.get('message', {})
self.robot_status['is_speaking'] = status.get('is_speaking', False)
self.robot_status['is_thinking'] = status.get('is_thinking', False)
self.robot_status['listening'] = status.get('listening', False)
self.logger.debug(f"机器人状态: {self.robot_status}")
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket连接已关闭")
break
except Exception as e:
self.logger.error(f"处理WebSocket消息错误: {e}")
await asyncio.sleep(0.1)
async def connect_websocket(self):
"""连接WebSocket"""
reconnect_delay = self.config['websocket']['reconnect_delay']
while True:
try:
self.logger.info(f"连接WebSocket: {self.ws_url}")
async with websockets.connect(self.ws_url) as ws:
self.ws = ws
self.logger.info("WebSocket连接成功")
# 同时运行状态查询和消息接收
await asyncio.gather(
self.query_robot_status(),
self.handle_websocket_messages()
)
except Exception as e:
self.logger.error(f"WebSocket连接错误: {e}")
self.ws = None
self.logger.info(f"{reconnect_delay}秒后重连...")
await asyncio.sleep(reconnect_delay)
def can_perform_detection(self) -> bool:
"""检查是否可以进行人脸检测"""
return not self.robot_status['is_speaking'] and not self.robot_status['is_thinking']
async def process_video_stream(self):
"""处理视频流"""
self._init_camera()
frame_interval = self.config['face_detection']['frame_interval']
quality_threshold = self.config['face_detection']['quality_threshold']
face_duration = self.config['face_detection']['face_present_duration']
self.logger.info("开始处理视频流")
# 创建显示窗口并最大化
window_name = 'Face Recognition System'
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
# 设置窗口为全屏或最大化
# 方法1: 全屏模式
# cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
# 方法2: 最大化窗口(推荐)
cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_NORMAL)
# 获取屏幕分辨率并设置窗口大小
try:
import screeninfo
screen = screeninfo.get_monitors()[0]
cv2.resizeWindow(window_name, screen.width, screen.height)
cv2.moveWindow(window_name, 0, 0)
except:
# 如果screeninfo不可用,尝试设置一个较大的窗口
cv2.resizeWindow(window_name, 1920, 1080)
self.logger.warning("无法获取屏幕分辨率,使用默认窗口大小")
try:
while True:
ret, frame = self.camera.read()
if not ret:
self.logger.warning("无法读取摄像头帧")
await asyncio.sleep(0.01)
continue
self.frame_count += 1
self.display_info['frame_count'] = self.frame_count
# 更新FPS
self.update_fps()
# 默认清空检测和识别结果
should_detect = False
# 检查是否到达检测间隔
if self.frame_count % frame_interval == 0:
# 检查机器人状态
if self.can_perform_detection():
# 评估帧质量
quality = self.assess_frame_quality(frame)
self.display_info['quality'] = quality
if quality >= quality_threshold:
should_detect = True
else:
self.logger.debug(f"帧质量不足: {quality:.2f}")
else:
self.logger.debug("机器人正在说话或思考,跳过检测")
self.face_present_start = None
self.display_info['quality'] = self.assess_frame_quality(frame)
else:
self.display_info['quality'] = 0
# 执行人脸检测
if should_detect:
face = self.detect_faces(frame)
if face:
self.display_info['face_detected'] = True
self.display_info['face_box'] = face['box']
# 记录人脸出现时间
if self.face_present_start is None:
self.face_present_start = datetime.now()
self.logger.info("检测到人脸,开始计时")
# 清空之前的识别结果
self.display_info['person_name'] = None
self.display_info['person_role'] = None
self.display_info['similarity'] = 0
# 检查是否满足持续出现时长
elapsed = (datetime.now() - self.face_present_start).total_seconds()
if elapsed >= face_duration:
self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别")
# 进行人脸识别
recognition_result = self.recognize_face(frame)
if recognition_result:
person_id = recognition_result['subject']
similarity = recognition_result['similarity']
# 更新显示信息
self.display_info['face_box'] = recognition_result['box']
# 检查是否应该识别(防止重复)
if self.should_recognize(person_id):
name, role = self.determine_role(person_id, similarity)
# 更新显示信息
self.display_info['person_name'] = name
self.display_info['person_role'] = role
self.display_info['similarity'] = similarity
self.logger.info(
f"识别到: {name}, 相似度: {similarity:.6f}, 角色: {role}"
)
# 发送识别结果
reception_msg = {
"type": "start_reception",
"message": {
"name": name,
"role": role
}
}
await self.send_websocket_message(reception_msg)
# 记录识别时间
self.recognition_history[person_id] = datetime.now()
else:
# 在冷却期内,继续显示上次的识别结果
pass
# 重置计时器
self.face_present_start = None
else:
# 未识别到已知人脸
if self.should_recognize("unknown"):
self.logger.info("检测到陌生人")
# 更新显示信息
self.display_info['person_name'] = "未知访客"
self.display_info['person_role'] = "陌生人"
self.display_info['similarity'] = 0
reception_msg = {
"type": "start_reception",
"message": {
"name": "未知访客",
"role": "陌生人"
}
}
await self.send_websocket_message(reception_msg)
self.recognition_history["unknown"] = datetime.now()
# 重置计时器
self.face_present_start = None
else:
# 没有检测到人脸
self.display_info['face_detected'] = False
self.display_info['face_box'] = None
# 重置计时器和识别信息
if self.face_present_start is not None:
self.logger.debug("人脸消失,重置计时器")
self.face_present_start = None
# 清空识别信息(可选,如果想保留上次识别结果可以注释掉)
# self.display_info['person_name'] = None
# self.display_info['person_role'] = None
# self.display_info['similarity'] = 0
# 绘制信息并显示
display_frame = self.draw_info_on_frame(frame)
cv2.imshow(window_name, display_frame)
# 检查键盘输入
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or key == 27: # 'q' 或 ESC 退出
self.logger.info("用户请求退出")
break
await asyncio.sleep(0.001)
except Exception as e:
self.logger.error(f"视频流处理错误: {e}")
finally:
if self.camera:
self.camera.release()
self.logger.info("摄像头已释放")
cv2.destroyAllWindows()
async def run(self):
"""运行系统"""
self.logger.info("启动人脸识别系统")
# 同时运行WebSocket连接和视频处理
await asyncio.gather(
self.connect_websocket(),
self.process_video_stream()
)
def main():
"""主函数"""
system = FaceRecognitionSystem("config.yaml")
try:
asyncio.run(system.run())
except KeyboardInterrupt:
print("\n系统已停止")
except Exception as e:
print(f"系统错误: {e}")
if __name__ == "__main__":
main()