658 lines
22 KiB
Python
658 lines
22 KiB
Python
"""
|
|
协议处理器模块
|
|
处理各种通信协议和消息格式
|
|
"""
|
|
|
|
import time
|
|
import json
|
|
import zlib
|
|
import base64
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
class ProtocolHandler:
|
|
"""
|
|
协议处理器
|
|
处理各种通信协议和消息格式
|
|
"""
|
|
|
|
def __init__(self, plugin):
|
|
"""
|
|
初始化协议处理器
|
|
|
|
Args:
|
|
plugin: 实时通信插件实例
|
|
"""
|
|
self.plugin = plugin
|
|
self.enabled = False
|
|
self.initialized = False
|
|
|
|
# 协议配置
|
|
self.protocol_config = {
|
|
"supported_protocols": ["websocket", "json", "binary"],
|
|
"default_protocol": "json",
|
|
"enable_compression": True,
|
|
"compression_threshold": 1024, # 1KB
|
|
"enable_encryption": False,
|
|
"enable_message_validation": True,
|
|
"max_message_size": 65536, # 64KB
|
|
"enable_fragmentation": True,
|
|
"fragment_size": 8192, # 8KB
|
|
"enable_checksum": True
|
|
}
|
|
|
|
# 协议状态
|
|
self.protocol_state = {
|
|
"active_protocols": [],
|
|
"compressed_messages": 0,
|
|
"encrypted_messages": 0,
|
|
"fragmented_messages": 0
|
|
}
|
|
|
|
# 协议统计
|
|
self.protocol_stats = {
|
|
"messages_encoded": 0,
|
|
"messages_decoded": 0,
|
|
"bytes_encoded": 0,
|
|
"bytes_decoded": 0,
|
|
"compression_savings": 0,
|
|
"protocol_errors": 0
|
|
}
|
|
|
|
# 协议处理器映射
|
|
self.protocol_handlers = {
|
|
"json": {
|
|
"encode": self._encode_json_message,
|
|
"decode": self._decode_json_message
|
|
},
|
|
"binary": {
|
|
"encode": self._encode_binary_message,
|
|
"decode": self._decode_binary_message
|
|
}
|
|
}
|
|
|
|
# 回调函数
|
|
self.protocol_callbacks = {
|
|
"message_encoded": [],
|
|
"message_decoded": [],
|
|
"compression_applied": [],
|
|
"encryption_applied": [],
|
|
"protocol_error": []
|
|
}
|
|
|
|
# 时间戳记录
|
|
self.last_compression = 0.0
|
|
self.last_encryption = 0.0
|
|
|
|
print("✓ 协议处理器已创建")
|
|
|
|
def initialize(self) -> bool:
|
|
"""
|
|
初始化协议处理器
|
|
|
|
Returns:
|
|
是否初始化成功
|
|
"""
|
|
try:
|
|
print("正在初始化协议处理器...")
|
|
|
|
self.initialized = True
|
|
print("✓ 协议处理器初始化完成")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"✗ 协议处理器初始化失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
import traceback
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
def enable(self) -> bool:
|
|
"""
|
|
启用协议处理器
|
|
|
|
Returns:
|
|
是否启用成功
|
|
"""
|
|
try:
|
|
if not self.initialized:
|
|
print("✗ 协议处理器未初始化")
|
|
return False
|
|
|
|
self.enabled = True
|
|
print("✓ 协议处理器已启用")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"✗ 协议处理器启用失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
import traceback
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
def disable(self):
|
|
"""禁用协议处理器"""
|
|
try:
|
|
self.enabled = False
|
|
print("✓ 协议处理器已禁用")
|
|
|
|
except Exception as e:
|
|
print(f"✗ 协议处理器禁用失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
def finalize(self):
|
|
"""清理协议处理器资源"""
|
|
try:
|
|
# 禁用协议处理器
|
|
if self.enabled:
|
|
self.disable()
|
|
|
|
# 清理回调
|
|
self.protocol_callbacks.clear()
|
|
|
|
self.initialized = False
|
|
print("✓ 协议处理器资源已清理")
|
|
|
|
except Exception as e:
|
|
print(f"✗ 协议处理器资源清理失败: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
def update(self, dt: float):
|
|
"""
|
|
更新协议处理器状态
|
|
|
|
Args:
|
|
dt: 时间增量(秒)
|
|
"""
|
|
try:
|
|
if not self.enabled:
|
|
return
|
|
|
|
# 协议处理器不需要频繁更新
|
|
pass
|
|
|
|
except Exception as e:
|
|
print(f"✗ 协议处理器更新失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
def encode_message(self, message: Dict[str, Any], protocol: str = None) -> Optional[bytes]:
|
|
"""
|
|
编码消息
|
|
|
|
Args:
|
|
message: 消息数据
|
|
protocol: 协议类型
|
|
|
|
Returns:
|
|
编码后的消息或None
|
|
"""
|
|
try:
|
|
if not self.enabled:
|
|
return None
|
|
|
|
# 使用默认协议或指定协议
|
|
if protocol is None:
|
|
protocol = self.protocol_config["default_protocol"]
|
|
|
|
# 验证协议支持
|
|
if protocol not in self.protocol_handlers:
|
|
print(f"✗ 不支持的协议: {protocol}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
# 验证消息大小
|
|
if self.protocol_config["enable_message_validation"]:
|
|
message_size = len(json.dumps(message, ensure_ascii=False).encode('utf-8'))
|
|
if message_size > self.protocol_config["max_message_size"]:
|
|
print(f"✗ 消息过大: {message_size} bytes")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
# 获取协议处理器
|
|
encoder = self.protocol_handlers[protocol]["encode"]
|
|
|
|
# 编码消息
|
|
encoded_message = encoder(message)
|
|
if encoded_message is None:
|
|
return None
|
|
|
|
# 更新统计
|
|
self.protocol_stats["messages_encoded"] += 1
|
|
self.protocol_stats["bytes_encoded"] += len(encoded_message)
|
|
|
|
# 触发消息编码回调
|
|
self._trigger_protocol_callback("message_encoded", {
|
|
"protocol": protocol,
|
|
"message_size": len(encoded_message),
|
|
"timestamp": time.time()
|
|
})
|
|
|
|
return encoded_message
|
|
|
|
except Exception as e:
|
|
print(f"✗ 消息编码失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
def decode_message(self, encoded_message: bytes, protocol: str = None) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
解码消息
|
|
|
|
Args:
|
|
encoded_message: 编码后的消息
|
|
protocol: 协议类型
|
|
|
|
Returns:
|
|
解码后的消息或None
|
|
"""
|
|
try:
|
|
if not self.enabled:
|
|
return None
|
|
|
|
# 使用默认协议或指定协议
|
|
if protocol is None:
|
|
protocol = self.protocol_config["default_protocol"]
|
|
|
|
# 验证协议支持
|
|
if protocol not in self.protocol_handlers:
|
|
print(f"✗ 不支持的协议: {protocol}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
# 获取协议处理器
|
|
decoder = self.protocol_handlers[protocol]["decode"]
|
|
|
|
# 解码消息
|
|
decoded_message = decoder(encoded_message)
|
|
if decoded_message is None:
|
|
return None
|
|
|
|
# 更新统计
|
|
self.protocol_stats["messages_decoded"] += 1
|
|
self.protocol_stats["bytes_decoded"] += len(encoded_message)
|
|
|
|
# 触发消息解码回调
|
|
self._trigger_protocol_callback("message_decoded", {
|
|
"protocol": protocol,
|
|
"message_size": len(encoded_message),
|
|
"timestamp": time.time()
|
|
})
|
|
|
|
return decoded_message
|
|
|
|
except Exception as e:
|
|
print(f"✗ 消息解码失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
def _encode_json_message(self, message: Dict[str, Any]) -> Optional[bytes]:
|
|
"""
|
|
JSON消息编码
|
|
|
|
Args:
|
|
message: 消息数据
|
|
|
|
Returns:
|
|
编码后的消息或None
|
|
"""
|
|
try:
|
|
# 序列化为JSON
|
|
json_string = json.dumps(message, ensure_ascii=False)
|
|
encoded_message = json_string.encode('utf-8')
|
|
|
|
# 检查是否需要压缩
|
|
if (self.protocol_config["enable_compression"] and
|
|
len(encoded_message) > self.protocol_config["compression_threshold"]):
|
|
compressed_message = zlib.compress(encoded_message)
|
|
if len(compressed_message) < len(encoded_message):
|
|
# 使用压缩版本
|
|
encoded_message = compressed_message
|
|
self.protocol_state["compressed_messages"] += 1
|
|
self.protocol_stats["compression_savings"] += len(encoded_message) - len(compressed_message)
|
|
|
|
# 触发压缩应用回调
|
|
self._trigger_protocol_callback("compression_applied", {
|
|
"original_size": len(json_string.encode('utf-8')),
|
|
"compressed_size": len(compressed_message),
|
|
"timestamp": time.time()
|
|
})
|
|
|
|
return encoded_message
|
|
|
|
except Exception as e:
|
|
print(f"✗ JSON消息编码失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
def _decode_json_message(self, encoded_message: bytes) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
JSON消息解码
|
|
|
|
Args:
|
|
encoded_message: 编码后的消息
|
|
|
|
Returns:
|
|
解码后的消息或None
|
|
"""
|
|
try:
|
|
# 尝试解压缩
|
|
try:
|
|
decompressed_message = zlib.decompress(encoded_message)
|
|
encoded_message = decompressed_message
|
|
except zlib.error:
|
|
# 不是压缩数据,使用原始数据
|
|
pass
|
|
|
|
# 解码为JSON
|
|
json_string = encoded_message.decode('utf-8')
|
|
decoded_message = json.loads(json_string)
|
|
|
|
return decoded_message
|
|
|
|
except Exception as e:
|
|
print(f"✗ JSON消息解码失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
def _encode_binary_message(self, message: Dict[str, Any]) -> Optional[bytes]:
|
|
"""
|
|
二进制消息编码
|
|
|
|
Args:
|
|
message: 消息数据
|
|
|
|
Returns:
|
|
编码后的消息或None
|
|
"""
|
|
try:
|
|
# 将消息转换为二进制格式(简化实现)
|
|
json_string = json.dumps(message, ensure_ascii=False)
|
|
binary_message = json_string.encode('utf-8')
|
|
|
|
# 添加简单的头部信息
|
|
header = len(binary_message).to_bytes(4, byteorder='big')
|
|
encoded_message = header + binary_message
|
|
|
|
# 检查是否需要压缩
|
|
if (self.protocol_config["enable_compression"] and
|
|
len(encoded_message) > self.protocol_config["compression_threshold"]):
|
|
compressed_message = zlib.compress(encoded_message)
|
|
if len(compressed_message) < len(encoded_message):
|
|
# 使用压缩版本
|
|
encoded_message = compressed_message
|
|
self.protocol_state["compressed_messages"] += 1
|
|
self.protocol_stats["compression_savings"] += len(encoded_message) - len(compressed_message)
|
|
|
|
# 触发压缩应用回调
|
|
self._trigger_protocol_callback("compression_applied", {
|
|
"original_size": len(header + binary_message),
|
|
"compressed_size": len(compressed_message),
|
|
"timestamp": time.time()
|
|
})
|
|
|
|
return encoded_message
|
|
|
|
except Exception as e:
|
|
print(f"✗ 二进制消息编码失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
def _decode_binary_message(self, encoded_message: bytes) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
二进制消息解码
|
|
|
|
Args:
|
|
encoded_message: 编码后的消息
|
|
|
|
Returns:
|
|
解码后的消息或None
|
|
"""
|
|
try:
|
|
# 尝试解压缩
|
|
try:
|
|
decompressed_message = zlib.decompress(encoded_message)
|
|
encoded_message = decompressed_message
|
|
except zlib.error:
|
|
# 不是压缩数据,使用原始数据
|
|
pass
|
|
|
|
# 解析头部信息
|
|
if len(encoded_message) < 4:
|
|
raise ValueError("消息太短")
|
|
|
|
message_length = int.from_bytes(encoded_message[:4], byteorder='big')
|
|
binary_message = encoded_message[4:4 + message_length]
|
|
|
|
# 解码为JSON
|
|
json_string = binary_message.decode('utf-8')
|
|
decoded_message = json.loads(json_string)
|
|
|
|
return decoded_message
|
|
|
|
except Exception as e:
|
|
print(f"✗ 二进制消息解码失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
def fragment_message(self, message: bytes, fragment_size: int = None) -> List[bytes]:
|
|
"""
|
|
分片消息
|
|
|
|
Args:
|
|
message: 消息数据
|
|
fragment_size: 分片大小
|
|
|
|
Returns:
|
|
分片列表
|
|
"""
|
|
try:
|
|
if not self.protocol_config["enable_fragmentation"]:
|
|
return [message]
|
|
|
|
if fragment_size is None:
|
|
fragment_size = self.protocol_config["fragment_size"]
|
|
|
|
fragments = []
|
|
for i in range(0, len(message), fragment_size):
|
|
fragment = message[i:i + fragment_size]
|
|
fragments.append(fragment)
|
|
|
|
self.protocol_state["fragmented_messages"] += 1
|
|
|
|
return fragments
|
|
|
|
except Exception as e:
|
|
print(f"✗ 消息分片失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return [message]
|
|
|
|
def reassemble_message(self, fragments: List[bytes]) -> Optional[bytes]:
|
|
"""
|
|
重组消息
|
|
|
|
Args:
|
|
fragments: 分片列表
|
|
|
|
Returns:
|
|
重组后的消息或None
|
|
"""
|
|
try:
|
|
if not fragments:
|
|
return None
|
|
|
|
reassembled_message = b''.join(fragments)
|
|
return reassembled_message
|
|
|
|
except Exception as e:
|
|
print(f"✗ 消息重组失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return None
|
|
|
|
def add_protocol_handler(self, protocol: str, encode_func: callable, decode_func: callable) -> bool:
|
|
"""
|
|
添加协议处理器
|
|
|
|
Args:
|
|
protocol: 协议名称
|
|
encode_func: 编码函数
|
|
decode_func: 解码函数
|
|
|
|
Returns:
|
|
是否添加成功
|
|
"""
|
|
try:
|
|
self.protocol_handlers[protocol] = {
|
|
"encode": encode_func,
|
|
"decode": decode_func
|
|
}
|
|
|
|
if protocol not in self.protocol_config["supported_protocols"]:
|
|
self.protocol_config["supported_protocols"].append(protocol)
|
|
|
|
print(f"✓ 协议处理器已添加: {protocol}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"✗ 协议处理器添加失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return False
|
|
|
|
def remove_protocol_handler(self, protocol: str) -> bool:
|
|
"""
|
|
移除协议处理器
|
|
|
|
Args:
|
|
protocol: 协议名称
|
|
|
|
Returns:
|
|
是否移除成功
|
|
"""
|
|
try:
|
|
if protocol in self.protocol_handlers:
|
|
del self.protocol_handlers[protocol]
|
|
|
|
if protocol in self.protocol_config["supported_protocols"]:
|
|
self.protocol_config["supported_protocols"].remove(protocol)
|
|
|
|
print(f"✓ 协议处理器已移除: {protocol}")
|
|
return True
|
|
else:
|
|
print(f"✗ 协议处理器不存在: {protocol}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"✗ 协议处理器移除失败: {e}")
|
|
self.protocol_stats["protocol_errors"] += 1
|
|
return False
|
|
|
|
def get_protocol_stats(self) -> Dict[str, Any]:
|
|
"""
|
|
获取协议统计信息
|
|
|
|
Returns:
|
|
协议统计字典
|
|
"""
|
|
return {
|
|
"state": self.protocol_state.copy(),
|
|
"stats": self.protocol_stats.copy(),
|
|
"config": self.protocol_config.copy()
|
|
}
|
|
|
|
def reset_stats(self):
|
|
"""重置协议统计信息"""
|
|
try:
|
|
self.protocol_stats = {
|
|
"messages_encoded": 0,
|
|
"messages_decoded": 0,
|
|
"bytes_encoded": 0,
|
|
"bytes_decoded": 0,
|
|
"compression_savings": 0,
|
|
"protocol_errors": 0
|
|
}
|
|
print("✓ 协议统计信息已重置")
|
|
except Exception as e:
|
|
print(f"✗ 协议统计信息重置失败: {e}")
|
|
|
|
def set_protocol_config(self, config: Dict[str, Any]) -> bool:
|
|
"""
|
|
设置协议配置
|
|
|
|
Args:
|
|
config: 协议配置字典
|
|
|
|
Returns:
|
|
是否设置成功
|
|
"""
|
|
try:
|
|
self.protocol_config.update(config)
|
|
print(f"✓ 协议配置已更新: {self.protocol_config}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"✗ 协议配置设置失败: {e}")
|
|
return False
|
|
|
|
def get_protocol_config(self) -> Dict[str, Any]:
|
|
"""
|
|
获取协议配置
|
|
|
|
Returns:
|
|
协议配置字典
|
|
"""
|
|
return self.protocol_config.copy()
|
|
|
|
def _trigger_protocol_callback(self, callback_type: str, data: Dict[str, Any]):
|
|
"""
|
|
触发协议回调
|
|
|
|
Args:
|
|
callback_type: 回调类型
|
|
data: 回调数据
|
|
"""
|
|
try:
|
|
if callback_type in self.protocol_callbacks:
|
|
for callback in self.protocol_callbacks[callback_type]:
|
|
try:
|
|
callback(data)
|
|
except Exception as e:
|
|
print(f"✗ 协议回调执行失败: {callback_type} - {e}")
|
|
except Exception as e:
|
|
print(f"✗ 协议回调触发失败: {e}")
|
|
|
|
def register_protocol_callback(self, callback_type: str, callback: callable):
|
|
"""
|
|
注册协议回调
|
|
|
|
Args:
|
|
callback_type: 回调类型
|
|
callback: 回调函数
|
|
"""
|
|
try:
|
|
if callback_type in self.protocol_callbacks:
|
|
self.protocol_callbacks[callback_type].append(callback)
|
|
print(f"✓ 协议回调已注册: {callback_type}")
|
|
else:
|
|
print(f"✗ 无效的回调类型: {callback_type}")
|
|
except Exception as e:
|
|
print(f"✗ 协议回调注册失败: {e}")
|
|
|
|
def unregister_protocol_callback(self, callback_type: str, callback: callable):
|
|
"""
|
|
注销协议回调
|
|
|
|
Args:
|
|
callback_type: 回调类型
|
|
callback: 回调函数
|
|
"""
|
|
try:
|
|
if callback_type in self.protocol_callbacks:
|
|
if callback in self.protocol_callbacks[callback_type]:
|
|
self.protocol_callbacks[callback_type].remove(callback)
|
|
print(f"✓ 协议回调已注销: {callback_type}")
|
|
else:
|
|
print(f"✗ 无效的回调类型: {callback_type}")
|
|
except Exception as e:
|
|
print(f"✗ 协议回调注销失败: {e}") |