832 lines
32 KiB
Python
832 lines
32 KiB
Python
"""
|
||
消息路由系统模块
|
||
负责处理客户端和服务器之间的消息传递
|
||
"""
|
||
|
||
import time
|
||
import threading
|
||
import json
|
||
import uuid
|
||
from typing import Dict, Any, List, Optional, Callable
|
||
|
||
class MessageRouter:
|
||
"""
|
||
消息路由器
|
||
负责处理客户端和服务器之间的消息传递
|
||
"""
|
||
|
||
def __init__(self, plugin):
|
||
"""
|
||
初始化消息路由器
|
||
|
||
Args:
|
||
plugin: 服务器架构插件实例
|
||
"""
|
||
self.plugin = plugin
|
||
self.enabled = False
|
||
self.initialized = False
|
||
|
||
# 消息配置
|
||
self.message_config = {
|
||
"enable_compression": True,
|
||
"enable_encryption": False,
|
||
"max_message_size": 65536,
|
||
"enable_message_queue": True,
|
||
"max_queue_size": 10000,
|
||
"enable_broadcast": True,
|
||
"enable_message_filtering": True,
|
||
"enable_message_logging": True,
|
||
"log_level": "INFO"
|
||
}
|
||
|
||
# 消息处理器映射
|
||
self.message_handlers = {}
|
||
|
||
# 消息队列
|
||
self.message_queue = []
|
||
self.message_queue_lock = threading.RLock()
|
||
|
||
# 消息统计
|
||
self.message_stats = {
|
||
"messages_received": 0,
|
||
"messages_sent": 0,
|
||
"messages_routed": 0,
|
||
"messages_dropped": 0,
|
||
"broadcast_messages": 0,
|
||
"handler_errors": 0,
|
||
"bytes_received": 0,
|
||
"bytes_sent": 0,
|
||
"errors": 0
|
||
}
|
||
|
||
# 回调函数
|
||
self.message_callbacks = {
|
||
"message_received": [],
|
||
"message_sent": [],
|
||
"message_routed": [],
|
||
"message_error": [],
|
||
"broadcast_sent": []
|
||
}
|
||
|
||
# 时间戳记录
|
||
self.last_message_time = 0.0
|
||
self.last_queue_process = 0.0
|
||
|
||
print("✓ 消息路由器已创建")
|
||
|
||
def initialize(self) -> bool:
|
||
"""
|
||
初始化消息路由器
|
||
|
||
Returns:
|
||
是否初始化成功
|
||
"""
|
||
try:
|
||
print("正在初始化消息路由器...")
|
||
|
||
# 注册默认消息处理器
|
||
self._register_default_handlers()
|
||
|
||
self.initialized = True
|
||
print("✓ 消息路由器初始化完成")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息路由器初始化失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def _register_default_handlers(self):
|
||
"""注册默认消息处理器"""
|
||
try:
|
||
self.register_message_handler("ping", self._handle_ping)
|
||
self.register_message_handler("pong", self._handle_pong)
|
||
self.register_message_handler("heartbeat", self._handle_heartbeat)
|
||
self.register_message_handler("auth", self._handle_auth)
|
||
self.register_message_handler("chat", self._handle_chat)
|
||
self.register_message_handler("join_room", self._handle_join_room)
|
||
self.register_message_handler("leave_room", self._handle_leave_room)
|
||
self.register_message_handler("sync_data", self._handle_sync_data)
|
||
|
||
print("✓ 默认消息处理器已注册")
|
||
except Exception as e:
|
||
print(f"✗ 默认消息处理器注册失败: {e}")
|
||
|
||
def enable(self) -> bool:
|
||
"""
|
||
启用消息路由器
|
||
|
||
Returns:
|
||
是否启用成功
|
||
"""
|
||
try:
|
||
if not self.initialized:
|
||
print("✗ 消息路由器未初始化")
|
||
return False
|
||
|
||
self.enabled = True
|
||
print("✓ 消息路由器已启用")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息路由器启用失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def disable(self):
|
||
"""禁用消息路由器"""
|
||
try:
|
||
self.enabled = False
|
||
|
||
# 清空消息队列
|
||
with self.message_queue_lock:
|
||
self.message_queue.clear()
|
||
|
||
print("✓ 消息路由器已禁用")
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息路由器禁用失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def finalize(self):
|
||
"""清理消息路由器资源"""
|
||
try:
|
||
# 禁用路由器
|
||
if self.enabled:
|
||
self.disable()
|
||
|
||
# 清理回调和处理器
|
||
self.message_callbacks.clear()
|
||
self.message_handlers.clear()
|
||
|
||
self.initialized = False
|
||
print("✓ 消息路由器资源已清理")
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息路由器资源清理失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def update(self, dt: float):
|
||
"""
|
||
更新消息路由器状态
|
||
|
||
Args:
|
||
dt: 时间增量(秒)
|
||
"""
|
||
try:
|
||
if not self.enabled:
|
||
return
|
||
|
||
current_time = time.time()
|
||
self.last_message_time = current_time
|
||
|
||
# 处理消息队列
|
||
if self.message_config["enable_message_queue"]:
|
||
self._process_message_queue()
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息路由器更新失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def _process_message_queue(self):
|
||
"""处理消息队列"""
|
||
try:
|
||
current_time = time.time()
|
||
if current_time - self.last_queue_process < 0.01: # 限制处理频率
|
||
return
|
||
|
||
processed_messages = 0
|
||
max_messages_per_update = 100 # 每次更新最多处理100条消息
|
||
|
||
with self.message_queue_lock:
|
||
while self.message_queue and processed_messages < max_messages_per_update:
|
||
try:
|
||
message_data = self.message_queue.pop(0)
|
||
self._route_message_internal(
|
||
message_data["message"],
|
||
message_data["client_address"],
|
||
message_data["client_socket"],
|
||
message_data["client_id"]
|
||
)
|
||
processed_messages += 1
|
||
except Exception as e:
|
||
print(f"✗ 队列消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
processed_messages += 1 # 避免无限循环
|
||
|
||
self.last_queue_process = current_time
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息队列处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def route_message(self, message: bytes, client_address: tuple, client_socket, client_id: str = None):
|
||
"""
|
||
路由消息
|
||
|
||
Args:
|
||
message: 消息数据
|
||
client_address: 客户端地址
|
||
client_socket: 客户端套接字
|
||
client_id: 客户端ID(可选)
|
||
"""
|
||
try:
|
||
if not self.enabled:
|
||
return
|
||
|
||
self.message_stats["messages_received"] += 1
|
||
self.message_stats["bytes_received"] += len(message)
|
||
|
||
# 触发消息接收回调
|
||
self._trigger_message_callback("message_received", {
|
||
"message_size": len(message),
|
||
"client_address": client_address,
|
||
"client_id": client_id,
|
||
"timestamp": time.time()
|
||
})
|
||
|
||
# 检查消息大小
|
||
if len(message) > self.message_config["max_message_size"]:
|
||
print(f"✗ 消息过大,已丢弃: {len(message)} bytes")
|
||
self.message_stats["messages_dropped"] += 1
|
||
return
|
||
|
||
# 如果启用队列,将消息添加到队列中
|
||
if self.message_config["enable_message_queue"]:
|
||
with self.message_queue_lock:
|
||
if len(self.message_queue) < self.message_config["max_queue_size"]:
|
||
self.message_queue.append({
|
||
"message": message,
|
||
"client_address": client_address,
|
||
"client_socket": client_socket,
|
||
"client_id": client_id
|
||
})
|
||
else:
|
||
print("✗ 消息队列已满,丢弃消息")
|
||
self.message_stats["messages_dropped"] += 1
|
||
else:
|
||
# 直接处理消息
|
||
self._route_message_internal(message, client_address, client_socket, client_id)
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息路由失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def _route_message_internal(self, message: bytes, client_address: tuple, client_socket, client_id: str = None):
|
||
"""
|
||
内部消息路由处理
|
||
|
||
Args:
|
||
message: 消息数据
|
||
client_address: 客户端地址
|
||
client_socket: 客户端套接字
|
||
client_id: 客户端ID(可选)
|
||
"""
|
||
try:
|
||
# 解析消息
|
||
try:
|
||
message_str = message.decode('utf-8')
|
||
message_data = json.loads(message_str)
|
||
except Exception as e:
|
||
print(f"✗ 消息解析失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
return
|
||
|
||
# 获取消息类型
|
||
message_type = message_data.get("type", "unknown")
|
||
|
||
# 检查消息过滤
|
||
if self.message_config["enable_message_filtering"]:
|
||
if not self._is_message_allowed(message_type, client_id):
|
||
print(f"✗ 消息被过滤: {message_type}")
|
||
return
|
||
|
||
# 查找消息处理器
|
||
handler = self.message_handlers.get(message_type)
|
||
if handler:
|
||
try:
|
||
# 调用消息处理器
|
||
handler(message_data, client_address, client_socket, client_id)
|
||
self.message_stats["messages_routed"] += 1
|
||
|
||
# 触发消息路由回调
|
||
self._trigger_message_callback("message_routed", {
|
||
"message_type": message_type,
|
||
"client_address": client_address,
|
||
"client_id": client_id,
|
||
"timestamp": time.time()
|
||
})
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息处理器执行失败: {message_type} - {e}")
|
||
self.message_stats["handler_errors"] += 1
|
||
|
||
# 触发消息错误回调
|
||
self._trigger_message_callback("message_error", {
|
||
"message_type": message_type,
|
||
"error": str(e),
|
||
"client_address": client_address,
|
||
"client_id": client_id,
|
||
"timestamp": time.time()
|
||
})
|
||
else:
|
||
print(f"✗ 未找到消息处理器: {message_type}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
except Exception as e:
|
||
print(f"✗ 内部消息路由处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def _is_message_allowed(self, message_type: str, client_id: str = None) -> bool:
|
||
"""
|
||
检查消息是否被允许
|
||
|
||
Args:
|
||
message_type: 消息类型
|
||
client_id: 客户端ID(可选)
|
||
|
||
Returns:
|
||
消息是否被允许
|
||
"""
|
||
try:
|
||
# 检查客户端是否已认证(某些消息类型不需要认证)
|
||
if message_type not in ["auth", "ping", "pong"]:
|
||
if not client_id or not self.plugin.client_manager.is_client_authenticated(client_id):
|
||
return False
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息允许检查失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
return False
|
||
|
||
def register_message_handler(self, message_type: str, handler: Callable) -> bool:
|
||
"""
|
||
注册消息处理器
|
||
|
||
Args:
|
||
message_type: 消息类型
|
||
handler: 处理器函数
|
||
|
||
Returns:
|
||
是否注册成功
|
||
"""
|
||
try:
|
||
self.message_handlers[message_type] = handler
|
||
print(f"✓ 消息处理器已注册: {message_type}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"✗ 消息处理器注册失败: {e}")
|
||
return False
|
||
|
||
def unregister_message_handler(self, message_type: str) -> bool:
|
||
"""
|
||
注销消息处理器
|
||
|
||
Args:
|
||
message_type: 消息类型
|
||
|
||
Returns:
|
||
是否注销成功
|
||
"""
|
||
try:
|
||
if message_type in self.message_handlers:
|
||
del self.message_handlers[message_type]
|
||
print(f"✓ 消息处理器已注销: {message_type}")
|
||
return True
|
||
else:
|
||
print(f"✗ 消息处理器不存在: {message_type}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"✗ 消息处理器注销失败: {e}")
|
||
return False
|
||
|
||
def send_message(self, message_data: Dict[str, Any], client_socket, client_id: str = None) -> bool:
|
||
"""
|
||
发送消息到客户端
|
||
|
||
Args:
|
||
message_data: 消息数据
|
||
client_socket: 客户端套接字
|
||
client_id: 客户端ID(可选)
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
try:
|
||
if not self.enabled:
|
||
return False
|
||
|
||
# 序列化消息
|
||
try:
|
||
message_str = json.dumps(message_data, ensure_ascii=False)
|
||
message_bytes = message_str.encode('utf-8')
|
||
except Exception as e:
|
||
print(f"✗ 消息序列化失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
return False
|
||
|
||
# 检查消息大小
|
||
if len(message_bytes) > self.message_config["max_message_size"]:
|
||
print(f"✗ 消息过大,无法发送: {len(message_bytes)} bytes")
|
||
self.message_stats["messages_dropped"] += 1
|
||
return False
|
||
|
||
# 发送消息
|
||
try:
|
||
client_socket.send(message_bytes)
|
||
except Exception as e:
|
||
print(f"✗ 消息发送失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
return False
|
||
|
||
self.message_stats["messages_sent"] += 1
|
||
self.message_stats["bytes_sent"] += len(message_bytes)
|
||
|
||
# 更新客户端统计
|
||
if client_id:
|
||
client_data = self.plugin.client_manager.get_client(client_id)
|
||
if client_data:
|
||
client_data["messages_sent"] += 1
|
||
client_data["bytes_sent"] += len(message_bytes)
|
||
|
||
# 触发消息发送回调
|
||
self._trigger_message_callback("message_sent", {
|
||
"message_type": message_data.get("type", "unknown"),
|
||
"message_size": len(message_bytes),
|
||
"client_id": client_id,
|
||
"timestamp": time.time()
|
||
})
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"✗ 发送消息失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
return False
|
||
|
||
def broadcast_message(self, message_data: Dict[str, Any], exclude_client_id: str = None) -> bool:
|
||
"""
|
||
广播消息到所有客户端
|
||
|
||
Args:
|
||
message_data: 消息数据
|
||
exclude_client_id: 要排除的客户端ID(可选)
|
||
|
||
Returns:
|
||
是否广播成功
|
||
"""
|
||
try:
|
||
if not self.enabled or not self.message_config["enable_broadcast"]:
|
||
return False
|
||
|
||
# 序列化消息
|
||
try:
|
||
message_str = json.dumps(message_data, ensure_ascii=False)
|
||
message_bytes = message_str.encode('utf-8')
|
||
except Exception as e:
|
||
print(f"✗ 广播消息序列化失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
return False
|
||
|
||
# 检查消息大小
|
||
if len(message_bytes) > self.message_config["max_message_size"]:
|
||
print(f"✗ 广播消息过大,无法发送: {len(message_bytes)} bytes")
|
||
self.message_stats["messages_dropped"] += 1
|
||
return False
|
||
|
||
# 获取所有客户端
|
||
clients = self.plugin.client_manager.get_all_clients()
|
||
success_count = 0
|
||
|
||
# 发送消息到每个客户端
|
||
for client_id, client_data in clients.items():
|
||
# 跳过排除的客户端
|
||
if client_id == exclude_client_id:
|
||
continue
|
||
|
||
try:
|
||
client_data["socket"].send(message_bytes)
|
||
client_data["messages_sent"] += 1
|
||
client_data["bytes_sent"] += len(message_bytes)
|
||
success_count += 1
|
||
except Exception as e:
|
||
print(f"✗ 广播消息发送失败: {client_id} - {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
self.message_stats["broadcast_messages"] += 1
|
||
self.message_stats["messages_sent"] += success_count
|
||
self.message_stats["bytes_sent"] += len(message_bytes) * success_count
|
||
|
||
# 触发广播发送回调
|
||
self._trigger_message_callback("broadcast_sent", {
|
||
"message_type": message_data.get("type", "unknown"),
|
||
"message_size": len(message_bytes),
|
||
"recipient_count": success_count,
|
||
"timestamp": time.time()
|
||
})
|
||
|
||
return success_count > 0
|
||
|
||
except Exception as e:
|
||
print(f"✗ 广播消息失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
return False
|
||
|
||
# 默认消息处理器
|
||
def _handle_ping(self, message_data: Dict[str, Any], client_address: tuple, client_socket, client_id: str = None):
|
||
"""处理ping消息"""
|
||
try:
|
||
# 回复pong消息
|
||
pong_message = {
|
||
"type": "pong",
|
||
"timestamp": time.time(),
|
||
"ping_id": message_data.get("ping_id", str(uuid.uuid4()))
|
||
}
|
||
|
||
self.send_message(pong_message, client_socket, client_id)
|
||
|
||
# 更新客户端心跳
|
||
if client_id:
|
||
self.plugin.client_manager.update_client_heartbeat(client_id)
|
||
|
||
except Exception as e:
|
||
print(f"✗ ping消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def _handle_pong(self, message_data: Dict[str, Any], client_address: tuple, client_socket, client_id: str = None):
|
||
"""处理pong消息"""
|
||
try:
|
||
# 可以在这里处理延迟计算等逻辑
|
||
pass
|
||
except Exception as e:
|
||
print(f"✗ pong消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def _handle_heartbeat(self, message_data: Dict[str, Any], client_address: tuple, client_socket, client_id: str = None):
|
||
"""处理心跳消息"""
|
||
try:
|
||
# 更新客户端心跳
|
||
if client_id:
|
||
self.plugin.client_manager.update_client_heartbeat(client_id)
|
||
except Exception as e:
|
||
print(f"✗ 心跳消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def _handle_auth(self, message_data: Dict[str, Any], client_address: tuple, client_socket, client_id: str = None):
|
||
"""处理认证消息"""
|
||
try:
|
||
if not client_id:
|
||
print("✗ 认证消息缺少客户端ID")
|
||
return
|
||
|
||
# 使用客户端管理器进行认证
|
||
if self.plugin.client_manager:
|
||
auth_success = self.plugin.client_manager.authenticate_client(client_id, message_data)
|
||
if auth_success:
|
||
# 发送认证成功消息
|
||
success_message = {
|
||
"type": "auth_result",
|
||
"success": True,
|
||
"message": "认证成功"
|
||
}
|
||
self.send_message(success_message, client_socket, client_id)
|
||
else:
|
||
# 发送认证失败消息
|
||
fail_message = {
|
||
"type": "auth_result",
|
||
"success": False,
|
||
"message": "认证失败"
|
||
}
|
||
self.send_message(fail_message, client_socket, client_id)
|
||
except Exception as e:
|
||
print(f"✗ 认证消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def _handle_chat(self, message_data: Dict[str, Any], client_address: tuple, client_socket, client_id: str = None):
|
||
"""处理聊天消息"""
|
||
try:
|
||
# 广播聊天消息到所有客户端
|
||
chat_message = {
|
||
"type": "chat",
|
||
"sender": message_data.get("sender", "unknown"),
|
||
"message": message_data.get("message", ""),
|
||
"timestamp": time.time()
|
||
}
|
||
|
||
self.broadcast_message(chat_message, exclude_client_id=client_id)
|
||
except Exception as e:
|
||
print(f"✗ 聊天消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def _handle_join_room(self, message_data: Dict[str, Any], client_address: tuple, client_socket, client_id: str = None):
|
||
"""处理加入房间消息"""
|
||
try:
|
||
room_id = message_data.get("room_id")
|
||
if not room_id:
|
||
print("✗ 加入房间消息缺少房间ID")
|
||
return
|
||
|
||
# 使用房间管理器处理加入房间请求
|
||
if self.plugin.room_manager:
|
||
join_success = self.plugin.room_manager.add_client_to_room(room_id, client_id)
|
||
if join_success:
|
||
# 发送加入成功消息
|
||
success_message = {
|
||
"type": "join_room_result",
|
||
"success": True,
|
||
"room_id": room_id,
|
||
"message": "成功加入房间"
|
||
}
|
||
self.send_message(success_message, client_socket, client_id)
|
||
|
||
# 通知房间内其他客户端
|
||
notification = {
|
||
"type": "client_joined_room",
|
||
"client_id": client_id,
|
||
"room_id": room_id
|
||
}
|
||
self.plugin.room_manager.broadcast_to_room(room_id, notification, exclude_client_id=client_id)
|
||
else:
|
||
# 发送加入失败消息
|
||
fail_message = {
|
||
"type": "join_room_result",
|
||
"success": False,
|
||
"room_id": room_id,
|
||
"message": "加入房间失败"
|
||
}
|
||
self.send_message(fail_message, client_socket, client_id)
|
||
except Exception as e:
|
||
print(f"✗ 加入房间消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def _handle_leave_room(self, message_data: Dict[str, Any], client_address: tuple, client_socket, client_id: str = None):
|
||
"""处理离开房间消息"""
|
||
try:
|
||
room_id = message_data.get("room_id")
|
||
if not room_id:
|
||
print("✗ 离开房间消息缺少房间ID")
|
||
return
|
||
|
||
# 使用房间管理器处理离开房间请求
|
||
if self.plugin.room_manager:
|
||
leave_success = self.plugin.room_manager.remove_client_from_room(room_id, client_id)
|
||
if leave_success:
|
||
# 发送离开成功消息
|
||
success_message = {
|
||
"type": "leave_room_result",
|
||
"success": True,
|
||
"room_id": room_id,
|
||
"message": "成功离开房间"
|
||
}
|
||
self.send_message(success_message, client_socket, client_id)
|
||
|
||
# 通知房间内其他客户端
|
||
notification = {
|
||
"type": "client_left_room",
|
||
"client_id": client_id,
|
||
"room_id": room_id
|
||
}
|
||
self.plugin.room_manager.broadcast_to_room(room_id, notification, exclude_client_id=client_id)
|
||
except Exception as e:
|
||
print(f"✗ 离开房间消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def _handle_sync_data(self, message_data: Dict[str, Any], client_address: tuple, client_socket, client_id: str = None):
|
||
"""处理同步数据消息"""
|
||
try:
|
||
# 将同步数据广播到房间内的其他客户端
|
||
room_id = message_data.get("room_id")
|
||
if room_id and self.plugin.room_manager:
|
||
# 移除房间ID,避免重复处理
|
||
sync_data = message_data.copy()
|
||
sync_data.pop("room_id", None)
|
||
|
||
self.plugin.room_manager.broadcast_to_room(room_id, sync_data, exclude_client_id=client_id)
|
||
except Exception as e:
|
||
print(f"✗ 同步数据消息处理失败: {e}")
|
||
self.message_stats["errors"] += 1
|
||
|
||
def get_message_stats(self) -> Dict[str, Any]:
|
||
"""
|
||
获取消息统计信息
|
||
|
||
Returns:
|
||
消息统计字典
|
||
"""
|
||
return {
|
||
"stats": self.message_stats.copy(),
|
||
"config": self.message_config.copy(),
|
||
"handlers_count": len(self.message_handlers),
|
||
"queue_size": len(self.message_queue)
|
||
}
|
||
|
||
def reset_stats(self):
|
||
"""重置消息统计信息"""
|
||
try:
|
||
self.message_stats = {
|
||
"messages_received": 0,
|
||
"messages_sent": 0,
|
||
"messages_routed": 0,
|
||
"messages_dropped": 0,
|
||
"broadcast_messages": 0,
|
||
"handler_errors": 0,
|
||
"bytes_received": 0,
|
||
"bytes_sent": 0,
|
||
"errors": 0
|
||
}
|
||
print("✓ 消息统计信息已重置")
|
||
except Exception as e:
|
||
print(f"✗ 消息统计信息重置失败: {e}")
|
||
|
||
def set_message_config(self, config: Dict[str, Any]) -> bool:
|
||
"""
|
||
设置消息配置
|
||
|
||
Args:
|
||
config: 消息配置字典
|
||
|
||
Returns:
|
||
是否设置成功
|
||
"""
|
||
try:
|
||
self.message_config.update(config)
|
||
print(f"✓ 消息配置已更新: {self.message_config}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"✗ 消息配置设置失败: {e}")
|
||
return False
|
||
|
||
def get_message_config(self) -> Dict[str, Any]:
|
||
"""
|
||
获取消息配置
|
||
|
||
Returns:
|
||
消息配置字典
|
||
"""
|
||
return self.message_config.copy()
|
||
|
||
def _trigger_message_callback(self, callback_type: str, data: Dict[str, Any]):
|
||
"""
|
||
触发消息回调
|
||
|
||
Args:
|
||
callback_type: 回调类型
|
||
data: 回调数据
|
||
"""
|
||
try:
|
||
if callback_type in self.message_callbacks:
|
||
for callback in self.message_callbacks[callback_type]:
|
||
try:
|
||
callback(data)
|
||
except Exception as e:
|
||
print(f"✗ 消息回调执行失败: {callback_type} - {e}")
|
||
except Exception as e:
|
||
print(f"✗ 消息回调触发失败: {e}")
|
||
|
||
def register_message_callback(self, callback_type: str, callback: callable):
|
||
"""
|
||
注册消息回调
|
||
|
||
Args:
|
||
callback_type: 回调类型
|
||
callback: 回调函数
|
||
"""
|
||
try:
|
||
if callback_type in self.message_callbacks:
|
||
self.message_callbacks[callback_type].append(callback)
|
||
print(f"✓ 消息回调已注册: {callback_type}")
|
||
else:
|
||
print(f"✗ 无效的回调类型: {callback_type}")
|
||
except Exception as e:
|
||
print(f"✗ 消息回调注册失败: {e}")
|
||
|
||
def unregister_message_callback(self, callback_type: str, callback: callable):
|
||
"""
|
||
注销消息回调
|
||
|
||
Args:
|
||
callback_type: 回调类型
|
||
callback: 回调函数
|
||
"""
|
||
try:
|
||
if callback_type in self.message_callbacks:
|
||
if callback in self.message_callbacks[callback_type]:
|
||
self.message_callbacks[callback_type].remove(callback)
|
||
print(f"✓ 消息回调已注销: {callback_type}")
|
||
else:
|
||
print(f"✗ 无效的回调类型: {callback_type}")
|
||
except Exception as e:
|
||
print(f"✗ 消息回调注销失败: {e}") |