初始化仓库

This commit is contained in:
haotian 2025-09-30 09:52:13 +08:00
commit 5acf3288c2
3 changed files with 557 additions and 0 deletions

86
001保存摄像头流.py Normal file
View File

@ -0,0 +1,86 @@
import cv2
import datetime
import os
def save_rtsp_to_video(rtsp_url, output_dir='output', duration_minutes=10):
    """Record an RTSP stream to disk, split into fixed-length MP4 segments.

    Parameters:
        rtsp_url: RTSP stream address.
        output_dir: directory the video segments are written to (created if missing).
        duration_minutes: length of each video segment, in minutes.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Open the RTSP stream.
    cap = cv2.VideoCapture(rtsp_url)
    if not cap.isOpened():
        print("无法打开RTSP流")
        return

    # Stream properties; some cameras report 0 FPS, so fall back to a default.
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    if fps <= 0:
        fps = 25  # default frame rate
    frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                  int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))

    # Number of frames per segment (duration_minutes minutes each).
    max_frames = fps * 60 * duration_minutes
    frame_count = 0
    file_count = 1

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or 'avc1'
    out, output_file = _create_segment_writer(output_dir, file_count,
                                              fourcc, fps, frame_size)
    # Bug fix: the original never verified the writer opened, silently
    # producing an empty/corrupt file when the codec or path is bad.
    if not out.isOpened():
        print(f"无法创建视频文件: {output_file}")
        cap.release()
        return
    print(f"开始录制,保存到: {output_file}")

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("无法获取帧,可能流已断开")
                break
            out.write(frame)
            frame_count += 1
            # Roll over to a new file once the current segment is full.
            if frame_count >= max_frames:
                out.release()
                file_count += 1
                frame_count = 0
                out, output_file = _create_segment_writer(
                    output_dir, file_count, fourcc, fps, frame_size)
                if not out.isOpened():
                    print(f"无法创建视频文件: {output_file}")
                    break
                print(f"创建新文件: {output_file}")
    except KeyboardInterrupt:
        print("用户中断录制")
    finally:
        # Always release capture and writer, even on error/interrupt.
        cap.release()
        out.release()
        print("录制结束")


def _create_segment_writer(output_dir, file_count, fourcc, fps, frame_size):
    """Open a timestamped VideoWriter for segment *file_count*; return (writer, path)."""
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(output_dir,
                               f"video_{timestamp}_part{file_count}.mp4")
    return cv2.VideoWriter(output_file, fourcc, fps, frame_size), output_file
if __name__ == "__main__":
    # Example RTSP URL — replace with your actual stream address.
    stream_url = "rtsp://10.0.0.61/live/video6"
    # Start recording in 10-minute segments.
    save_rtsp_to_video(stream_url, duration_minutes=10)

47
config.yaml Normal file
View File

@ -0,0 +1,47 @@
# Face recognition system configuration file

# CompreFace API configuration
compreface:
  host: "http://10.0.0.202"
  port: 8000
  # api_key: "your_api_key_here"
  recognition_api_key: "a5924457-62c9-47dc-a6e7-15462c502d2c"
  detection_api_key: "070283a2-faa3-423b-9772-2cd48ecc5362"

# WebSocket configuration
websocket:
  url: "ws://10.0.0.61:3344"
  status_interval: 0.2  # robot-status polling interval (seconds)
  reconnect_delay: 5  # delay before reconnecting (seconds)

# Camera configuration
camera:
  device_id: 0  # camera device ID
  width: 1280
  height: 720
  fps: 30

# Face detection configuration
face_detection:
  frame_interval: 10  # detect once every N frames
  quality_threshold: 100  # image quality threshold (Laplacian variance)
  min_face_size: 80  # minimum face size (pixels)
  face_present_duration: 2.0  # face must be present this long (seconds) before recognition

# Face recognition configuration
face_recognition:
  similarity_threshold: 0.85  # similarity threshold (below this is treated as a stranger)
  recognition_cooldown: 10.0  # cooldown before re-recognizing the same person (seconds)

# Role mapping configuration
role_mapping:
  employee_threshold: 0.85  # employee recognition threshold
  visitor_threshold: 0.70  # visitor recognition threshold
  # anything below visitor_threshold is treated as a stranger

# Logging configuration
logging:
  level: "INFO"  # DEBUG, INFO, WARNING, ERROR
  file: "face_recognition.log"
  max_bytes: 10485760  # 10MB
  backup_count: 5

424
main.py Normal file
View File

@ -0,0 +1,424 @@
import cv2
import asyncio
import websockets
import json
import yaml
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import numpy as np
from compreface import CompreFace
from compreface.service import RecognitionService, DetectionService
import time
from collections import defaultdict
class FaceRecognitionSystem:
    """Camera-based face recognition front end.

    Reads frames from a local camera, detects and recognizes faces through a
    CompreFace server, and announces recognized people to a robot over a
    WebSocket connection. All tunables come from a YAML config file.
    """

    def __init__(self, config_path: str = "config.yaml"):
        """Load configuration and initialize logging and CompreFace services."""
        # Load YAML configuration (schema: see config.yaml).
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)
        # Set up logging first so later init steps can log.
        self._setup_logging()
        # Initialize CompreFace recognition/detection services.
        self._init_compreface()
        # Camera handle; opened lazily by _init_camera().
        self.camera = None
        # Active WebSocket connection (None while disconnected).
        self.ws = None
        self.ws_url = self.config['websocket']['url']
        # Last known robot state, updated by handle_websocket_messages().
        self.robot_status = {
            'is_speaking': False,
            'is_thinking': False,
            'listening': False
        }
        # Face-detection pacing state.
        self.frame_count = 0
        self.face_present_start = None  # time the current face first appeared
        self.current_face_id = None  # NOTE(review): assigned here but never read elsewhere in this class
        # Recognition history used to throttle repeat announcements:
        # {person_id: last_recognition_time}
        self.recognition_history = {}
        self.logger.info("人脸识别系统初始化完成")

    def _setup_logging(self) -> None:
        """Configure a rotating file handler plus console output."""
        log_config = self.config['logging']
        self.logger = logging.getLogger('FaceRecognition')
        self.logger.setLevel(getattr(logging, log_config['level']))
        # File handler with size-based rotation.
        file_handler = RotatingFileHandler(
            log_config['file'],
            maxBytes=log_config['max_bytes'],
            backupCount=log_config['backup_count']
        )
        # Console handler.
        console_handler = logging.StreamHandler()
        # Shared format for both handlers.
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    def _init_compreface(self) -> None:
        """Initialize the CompreFace SDK and its recognition/detection services."""
        cf_config = self.config['compreface']
        # Create the CompreFace client instance.
        # NOTE(review): config.yaml stores `port` as an integer — confirm the
        # SDK accepts an int here (its examples typically pass strings).
        compre_face = CompreFace(
            cf_config['host'],
            cf_config['port'],
            {
                "limit": 0,
                "det_prob_threshold": 0.8,
                "prediction_count": 1
            }
        )
        # Initialize the recognition and detection services.
        self.recognition_service: RecognitionService = compre_face.init_face_recognition(
            cf_config['recognition_api_key']
        )
        self.detection_service: DetectionService = compre_face.init_face_detection(
            cf_config['detection_api_key']
        )
        self.logger.info("CompreFace服务初始化完成")

    def _init_camera(self) -> None:
        """Open the configured camera device and apply resolution/FPS settings.

        Raises:
            RuntimeError: if the camera cannot be opened.
        """
        cam_config = self.config['camera']
        self.camera = cv2.VideoCapture(cam_config['device_id'])
        self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, cam_config['width'])
        self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, cam_config['height'])
        self.camera.set(cv2.CAP_PROP_FPS, cam_config['fps'])
        if not self.camera.isOpened():
            raise RuntimeError("无法打开摄像头")
        self.logger.info("摄像头初始化完成")

    def assess_frame_quality(self, frame: np.ndarray) -> float:
        """Return a sharpness score for *frame* (variance of the Laplacian).

        Higher values mean a sharper image; blurry frames score low.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        return laplacian_var

    def detect_faces(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
        """Detect faces in *frame* via CompreFace.

        Returns the first face dict whose bounding box is at least
        `face_detection.min_face_size` on both axes, or None on no face/error.
        """
        try:
            # Encode the frame as JPEG for the HTTP API.
            _, img_encoded = cv2.imencode('.jpg', frame)
            # Call the CompreFace detection API.
            result = self.detection_service.detect(img_encoded.tobytes())
            if result and 'result' in result and len(result['result']) > 0:
                faces = result['result']
                # Filter out faces that are too small.
                min_size = self.config['face_detection']['min_face_size']
                valid_faces = [
                    face for face in faces
                    if face['box']['x_max'] - face['box']['x_min'] >= min_size
                    and face['box']['y_max'] - face['box']['y_min'] >= min_size
                ]
                if valid_faces:
                    # Return the first face (original comment claims this is
                    # the largest — TODO confirm the API's ordering).
                    return valid_faces[0]
            return None
        except Exception as e:
            self.logger.error(f"人脸检测错误: {e}")
            return None

    def recognize_face(self, frame: np.ndarray) -> Optional[Dict[str, Any]]:
        """Recognize the face in *frame* via CompreFace.

        Returns {'subject': name, 'similarity': score} for the first match of
        the first face, or None when nothing matched or an error occurred.
        """
        try:
            # Encode the frame as JPEG for the HTTP API.
            _, img_encoded = cv2.imencode('.jpg', frame)
            # Call the CompreFace recognition API.
            result = self.recognition_service.recognize(img_encoded.tobytes())
            if result and 'result' in result and len(result['result']) > 0:
                faces = result['result']
                if len(faces[0]['subjects']) > 0:
                    # Take the top match for the first face only.
                    subject = faces[0]['subjects'][0]
                    return {
                        'subject': subject['subject'],
                        'similarity': subject['similarity']
                    }
            return None
        except Exception as e:
            self.logger.error(f"人脸识别错误: {e}")
            return None

    def determine_role(self, similarity: float) -> str:
        """Map a similarity score to a role label (employee/visitor/stranger).

        NOTE(review): thresholds come from `role_mapping`;
        `face_recognition.similarity_threshold` in the config is not used here.
        """
        role_config = self.config['role_mapping']
        if similarity >= role_config['employee_threshold']:
            return "员工"
        elif similarity >= role_config['visitor_threshold']:
            return "访客"
        else:
            return "陌生人"

    def should_recognize(self, person_id: str) -> bool:
        """Return True unless *person_id* was announced within the cooldown window."""
        cooldown = self.config['face_recognition']['recognition_cooldown']
        if person_id not in self.recognition_history:
            return True
        last_time = self.recognition_history[person_id]
        elapsed = (datetime.now() - last_time).total_seconds()
        return elapsed >= cooldown

    async def send_websocket_message(self, message: Dict[str, Any]):
        """Send *message* as JSON over the WebSocket; no-op while disconnected."""
        if self.ws:
            try:
                await self.ws.send(json.dumps(message))
                self.logger.debug(f"发送消息: {message}")
            except Exception as e:
                self.logger.error(f"发送WebSocket消息失败: {e}")

    async def query_robot_status(self):
        """Poll the robot's status forever at `websocket.status_interval` seconds."""
        interval = self.config['websocket']['status_interval']
        while True:
            try:
                if self.ws:
                    status_msg = {
                        "type": "get_status",
                        "message": ""
                    }
                    await self.send_websocket_message(status_msg)
                await asyncio.sleep(interval)
            except Exception as e:
                self.logger.error(f"查询状态错误: {e}")
                await asyncio.sleep(interval)

    async def handle_websocket_messages(self):
        """Receive messages forever, updating self.robot_status from 'status' frames.

        NOTE(review): when self.ws is None the loop body contains no await, so
        this coroutine would busy-spin and starve the event loop; in practice it
        is only started after the connection is established (connect_websocket).
        """
        while True:
            try:
                if self.ws:
                    message = await self.ws.recv()
                    data = json.loads(message)
                    if data.get('type') == 'status':
                        status = data.get('message', {})
                        self.robot_status['is_speaking'] = status.get('is_speaking', False)
                        self.robot_status['is_thinking'] = status.get('is_thinking', False)
                        self.robot_status['listening'] = status.get('listening', False)
                        self.logger.debug(f"机器人状态: {self.robot_status}")
            except websockets.exceptions.ConnectionClosed:
                self.logger.warning("WebSocket连接已关闭")
                break
            except Exception as e:
                self.logger.error(f"处理WebSocket消息错误: {e}")
                await asyncio.sleep(0.1)

    async def connect_websocket(self):
        """Maintain the WebSocket connection forever, reconnecting after failures."""
        reconnect_delay = self.config['websocket']['reconnect_delay']
        while True:
            try:
                self.logger.info(f"连接WebSocket: {self.ws_url}")
                async with websockets.connect(self.ws_url) as ws:
                    self.ws = ws
                    self.logger.info("WebSocket连接成功")
                    # Run the status poller and message receiver concurrently.
                    await asyncio.gather(
                        self.query_robot_status(),
                        self.handle_websocket_messages()
                    )
            except Exception as e:
                self.logger.error(f"WebSocket连接错误: {e}")
                self.ws = None
                self.logger.info(f"{reconnect_delay}秒后重连...")
                await asyncio.sleep(reconnect_delay)

    def can_perform_detection(self) -> bool:
        """Detection is allowed only while the robot is neither speaking nor thinking."""
        return not self.robot_status['is_speaking'] and not self.robot_status['is_thinking']

    async def process_video_stream(self):
        """Main camera loop: pace detection, require face persistence, then recognize.

        NOTE(review): camera.read() and the CompreFace HTTP calls are blocking
        calls inside a coroutine — they stall the event loop (including the
        WebSocket tasks) while they run; consider run_in_executor.
        """
        self._init_camera()
        frame_interval = self.config['face_detection']['frame_interval']
        quality_threshold = self.config['face_detection']['quality_threshold']
        face_duration = self.config['face_detection']['face_present_duration']
        self.logger.info("开始处理视频流")
        try:
            while True:
                ret, frame = self.camera.read()
                if not ret:
                    self.logger.warning("无法读取摄像头帧")
                    await asyncio.sleep(0.01)
                    continue
                self.frame_count += 1
                # Only run detection every `frame_interval` frames.
                if self.frame_count % frame_interval != 0:
                    await asyncio.sleep(0.01)
                    continue
                # Skip while the robot is busy speaking/thinking.
                if not self.can_perform_detection():
                    self.logger.debug("机器人正在说话或思考,跳过检测")
                    self.face_present_start = None
                    await asyncio.sleep(0.01)
                    continue
                # Drop blurry frames.
                quality = self.assess_frame_quality(frame)
                if quality < quality_threshold:
                    self.logger.debug(f"帧质量不足: {quality:.2f}")
                    await asyncio.sleep(0.01)
                    continue
                # Detect a face in this frame.
                face = self.detect_faces(frame)
                if face:
                    # Start the persistence timer on first appearance.
                    if self.face_present_start is None:
                        self.face_present_start = datetime.now()
                        self.logger.info("检测到人脸,开始计时")
                    # Only recognize after the face has persisted long enough.
                    elapsed = (datetime.now() - self.face_present_start).total_seconds()
                    if elapsed >= face_duration:
                        self.logger.info(f"人脸持续出现{elapsed:.2f}秒,开始识别")
                        # Run recognition on the current frame.
                        recognition_result = self.recognize_face(frame)
                        if recognition_result:
                            person_id = recognition_result['subject']
                            similarity = recognition_result['similarity']
                            # Throttle repeat announcements of the same person.
                            if self.should_recognize(person_id):
                                role = self.determine_role(similarity)
                                self.logger.info(
                                    f"识别到: {person_id}, 相似度: {similarity:.2f}, 角色: {role}"
                                )
                                # Announce the recognition result to the robot.
                                reception_msg = {
                                    "type": "start_reception",
                                    "message": {
                                        "name": person_id,
                                        "role": role
                                    }
                                }
                                await self.send_websocket_message(reception_msg)
                                # Record when this person was last announced.
                                self.recognition_history[person_id] = datetime.now()
                            # Reset the persistence timer.
                            self.face_present_start = None
                        else:
                            # No known face matched — treat as a stranger,
                            # throttled under the shared "unknown" key.
                            if self.should_recognize("unknown"):
                                self.logger.info("检测到陌生人")
                                reception_msg = {
                                    "type": "start_reception",
                                    "message": {
                                        "name": "未知访客",
                                        "role": "陌生人"
                                    }
                                }
                                await self.send_websocket_message(reception_msg)
                                self.recognition_history["unknown"] = datetime.now()
                            # Reset the persistence timer.
                            self.face_present_start = None
                else:
                    # No face in frame: reset the persistence timer.
                    if self.face_present_start is not None:
                        self.logger.debug("人脸消失,重置计时器")
                        self.face_present_start = None
                await asyncio.sleep(0.01)
        except Exception as e:
            self.logger.error(f"视频流处理错误: {e}")
        finally:
            if self.camera:
                self.camera.release()
                self.logger.info("摄像头已释放")

    async def run(self):
        """Run the WebSocket client and the video-processing loop concurrently."""
        self.logger.info("启动人脸识别系统")
        await asyncio.gather(
            self.connect_websocket(),
            self.process_video_stream()
        )
def main():
    """Entry point: build the system and run it until interrupted."""
    app = FaceRecognitionSystem("config.yaml")
    try:
        asyncio.run(app.run())
    except KeyboardInterrupt:
        print("\n系统已停止")
    except Exception as exc:  # top-level boundary: report and exit
        print(f"系统错误: {exc}")


if __name__ == "__main__":
    main()