添加websocket断线重连机制

This commit is contained in:
haotian 2025-10-10 16:53:50 +08:00
parent 58a7562c3f
commit b78237af5b
2 changed files with 103 additions and 96 deletions

View File

@ -36,7 +36,7 @@ face_recognition:
# 角色映射配置
role_mapping:
stranger_threshold: 0.99 # 员工识别阈值
stranger_threshold: 0.98 # 员工识别阈值
# visitor_threshold: 0.70 # 访客识别阈值
# 低于visitor_threshold视为陌生人

View File

@ -36,6 +36,8 @@ class FaceRecognitionSystem:
# WebSocket连接
self.ws = None
self.ws_url = self.config['websocket']['url']
self.ws_connected = False # WebSocket连接状态标志
self.ws_reconnect_count = 0 # 重连次数计数(用于日志控制)
# 状态变量
self.robot_status = {
@ -170,13 +172,6 @@ class FaceRecognitionSystem:
self.logger.debug(f"推流重试冷却中,还需等待 {self.stream_retry_cooldown - elapsed:.1f}")
return False
# # 检查是否超过最大重试次数
# if self.stream_retry_count >= self.stream_max_retries:
# self.logger.error(f"推流失败次数已达到上限({self.stream_max_retries}次),推流功能已禁用")
# self.logger.error("请检查ZLMediaKit是否正常运行,以及推流地址是否正确")
# self.stream_enabled = False
# return False
stream_config = self.config['stream']
ffmpeg_config = stream_config['ffmpeg']
stream_url = stream_config.get('rtmp_url') or stream_config.get('stream_url', '')
@ -203,14 +198,12 @@ class FaceRecognitionSystem:
'ffmpeg',
'-y', # 覆盖输出文件
'-f', 'rawvideo', # 输入格式
# '-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24', # OpenCV使用BGR格式
'-s', f"{cam_config['width']}x{total_height}", # 输入分辨率
'-r', str(ffmpeg_config['fps']), # 输入帧率
'-i', '-', # 从stdin读取
'-c:v', ffmpeg_config['video_codec'], # 视频编码器
'-pix_fmt', ffmpeg_config['pixel_format'], # 输出像素格式
# '-preset', ffmpeg_config['preset'], # 编码预设
'-tune', ffmpeg_config['tune'], # 编码调优
'-b:v', ffmpeg_config['video_bitrate'], # 视频码率
'-r', str(ffmpeg_config['fps']), # 输出帧率
@ -584,11 +577,16 @@ class FaceRecognitionSystem:
x_offset = 15
line_spacing = 30
# WebSocket连接状态
ws_status = "已连接" if self.ws_connected else "未连接"
ws_color = (0, 255, 0) if self.ws_connected else (0, 0, 255)
status_texts = [
f"帧率: {self.display_info['fps']:.1f} FPS",
f"帧数: {self.display_info['frame_count']}",
f"质量: {self.display_info['quality']:.1f}",
f"检测到人脸: {'' if self.display_info['face_detected'] else ''}",
f"WebSocket: {ws_status}",
f"机器人说话: {'' if self.robot_status['is_speaking'] else ''}",
f"机器人思考: {'' if self.robot_status['is_thinking'] else ''}",
]
@ -602,13 +600,23 @@ class FaceRecognitionSystem:
# 使用PIL绘制中文状态文本
for i, text in enumerate(status_texts):
panel_bg = self.cv2_add_chinese_text(
panel_bg,
text,
(x_offset, y_offset + i * line_spacing),
self.font_small,
(255, 255, 255)
)
# WebSocket状态使用特殊颜色
if i == 4: # "WebSocket: " 这一行
panel_bg = self.cv2_add_chinese_text(
panel_bg,
text,
(x_offset, y_offset + i * line_spacing),
self.font_small,
ws_color
)
else:
panel_bg = self.cv2_add_chinese_text(
panel_bg,
text,
(x_offset, y_offset + i * line_spacing),
self.font_small,
(255, 255, 255)
)
# 将面板添加到画面底部
display_frame = np.vstack([display_frame, panel_bg])
@ -628,63 +636,46 @@ class FaceRecognitionSystem:
async def send_websocket_message(self, message: Dict[str, Any]):
"""发送WebSocket消息"""
if self.ws:
if self.ws and self.ws_connected:
try:
await self.ws.send(json.dumps(message))
self.logger.debug(f"发送消息: {message}")
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket连接已关闭,无法发送消息")
self.ws = None # 标记连接已断开
self.ws_connected = False
except Exception as e:
# 避免重复记录相同的错误
error_msg = str(e)
if "no close frame received or sent" not in error_msg:
self.logger.error(f"发送WebSocket消息失败: {e}")
else:
self.logger.debug(f"WebSocket连接异常: {e}")
self.ws = None # 标记连接已断开
self.logger.error(f"发送WebSocket消息失败: {e}")
self.ws_connected = False
else:
self.logger.debug("WebSocket未连接,消息发送失败")
async def query_robot_status(self):
"""定期查询机器人状态"""
interval = self.config['websocket']['status_interval']
consecutive_errors = 0 # 连续错误计数
max_consecutive_errors = 3 # 最大连续错误次数
while True:
try:
if self.ws:
try:
while self.ws_connected:
try:
status_msg = {
"type": "get_status",
"message": ""
}
await self.send_websocket_message(status_msg)
consecutive_errors = 0 # 重置错误计数
else:
# WebSocket未连接,不需要频繁记录
consecutive_errors += 1
if consecutive_errors >= max_consecutive_errors:
self.logger.debug("WebSocket未连接,等待重连...")
consecutive_errors = 0 # 重置计数,避免频繁日志
await asyncio.sleep(interval)
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket连接已关闭,停止状态查询")
self.ws = None
break
except Exception as e:
consecutive_errors += 1
if consecutive_errors < max_consecutive_errors:
await asyncio.sleep(interval)
except Exception as e:
self.logger.error(f"查询状态错误: {e}")
elif consecutive_errors == max_consecutive_errors:
self.logger.error(f"连续{max_consecutive_errors}次查询失败,将减少错误日志输出")
await asyncio.sleep(interval)
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"状态查询任务异常: {e}")
self.ws_connected = False
async def handle_websocket_messages(self):
"""处理WebSocket接收的消息"""
while True:
try:
if self.ws:
try:
while self.ws_connected:
try:
message = await self.ws.recv()
data = json.loads(message)
@ -695,22 +686,34 @@ class FaceRecognitionSystem:
self.robot_status['listening'] = status.get('listening', False)
self.logger.debug(f"机器人状态: {self.robot_status}")
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket连接已关闭")
break
except Exception as e:
self.logger.error(f"处理WebSocket消息错误: {e}")
await asyncio.sleep(0.1)
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket消息接收中断: 连接已关闭")
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"处理WebSocket消息错误: {e}")
self.ws_connected = False
break
except Exception as e:
self.logger.error(f"WebSocket消息处理任务异常: {e}")
self.ws_connected = False
async def connect_websocket(self):
"""连接WebSocket"""
"""连接WebSocket - 无限重连"""
reconnect_delay = self.config['websocket']['reconnect_delay']
connection_errors = 0 # 连续连接错误计数
while True:
try:
self.logger.info(f"连接WebSocket: {self.ws_url}")
self.ws_reconnect_count += 1
# 控制日志输出频率
if self.ws_reconnect_count <= 3:
self.logger.info(f"连接WebSocket: {self.ws_url} (第{self.ws_reconnect_count}次尝试)")
elif self.ws_reconnect_count % 10 == 0:
self.logger.info(f"持续尝试连接WebSocket (第{self.ws_reconnect_count}次)")
else:
self.logger.debug(f"尝试连接WebSocket (第{self.ws_reconnect_count}次)")
# 设置连接超时
async with websockets.connect(
@ -720,43 +723,56 @@ class FaceRecognitionSystem:
close_timeout=5 # 关闭超时5秒
) as ws:
self.ws = ws
self.logger.info("WebSocket连接成功")
connection_errors = 0 # 重置错误计数
self.ws_connected = True
self.ws_reconnect_count = 0 # 连接成功,重置计数
self.logger.info("✓ WebSocket连接成功")
# 同时运行状态查询和消息接收
await asyncio.gather(
self.query_robot_status(),
self.handle_websocket_messages()
)
try:
await asyncio.gather(
self.query_robot_status(),
self.handle_websocket_messages()
)
except Exception as e:
self.logger.warning(f"WebSocket任务组异常: {e}")
finally:
# 确保连接状态被重置
self.ws_connected = False
self.ws = None
self.logger.info("WebSocket连接已断开,准备重连...")
except websockets.exceptions.ConnectionClosed:
self.logger.warning("WebSocket连接已正常关闭")
self.ws_connected = False
self.ws = None
except ConnectionRefusedError:
connection_errors += 1
if connection_errors <= 3:
self.logger.error(f"WebSocket连接被拒绝: 无法连接到 {self.ws_url}")
elif connection_errors == 4:
self.logger.error(f"WebSocket连接持续失败,将减少错误日志输出")
else:
self.logger.debug(f"WebSocket连接失败 (第{connection_errors}次)")
if self.ws_reconnect_count <= 3:
self.logger.error(f"WebSocket连接被拒绝: {self.ws_url}")
elif self.ws_reconnect_count == 4:
self.logger.warning("WebSocket持续连接失败,将减少日志输出频率")
self.ws_connected = False
self.ws = None
except OSError as e:
if self.ws_reconnect_count <= 3:
self.logger.error(f"WebSocket网络错误: {e}")
self.ws_connected = False
self.ws = None
except Exception as e:
connection_errors += 1
error_msg = str(e)
# 过滤掉常见的连接错误信息
if "no close frame" in error_msg or "Connection closed" in error_msg:
self.logger.debug(f"WebSocket连接异常: {e}")
else:
if connection_errors <= 3:
# 过滤常见的连接错误
if "no close frame" not in error_msg and "Connection closed" not in error_msg:
if self.ws_reconnect_count <= 3:
self.logger.error(f"WebSocket连接错误: {e}")
elif connection_errors == 4:
self.logger.error(f"WebSocket连接持续失败,将减少错误日志输出")
elif self.ws_reconnect_count == 4:
self.logger.warning("WebSocket持续连接失败,将减少日志输出频率")
self.ws_connected = False
self.ws = None
# 显示重连信息
if connection_errors <= 3:
# 等待后重连
if self.ws_reconnect_count <= 3:
self.logger.info(f"{reconnect_delay}秒后重连...")
elif self.ws_reconnect_count % 10 == 0:
self.logger.info(f"{reconnect_delay}秒后继续尝试重连...")
await asyncio.sleep(reconnect_delay)
def can_perform_detection(self) -> bool:
@ -782,10 +798,6 @@ class FaceRecognitionSystem:
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
# 设置窗口为全屏或最大化
# 方法1: 全屏模式
# cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
# 方法2: 最大化窗口(推荐)
cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_NORMAL)
# 获取屏幕分辨率并设置窗口大小
try:
@ -930,11 +942,6 @@ class FaceRecognitionSystem:
if self.face_present_start is not None:
self.logger.debug("人脸消失,重置计时器")
self.face_present_start = None
# 清空识别信息(可选,如果想保留上次识别结果可以注释掉)
# self.display_info['person_name'] = None
# self.display_info['person_role'] = None
# self.display_info['similarity'] = 0
# 绘制信息并显示
display_frame = self.draw_info_on_frame(frame)