648 lines
21 KiB
Python
648 lines
21 KiB
Python
"""
|
||
加密管理模块
|
||
负责处理网络数据的加密和解密
|
||
"""
|
||
|
||
import time
|
||
from typing import Dict, Any, Optional
|
||
import hashlib
|
||
import hmac
|
||
import secrets
|
||
from cryptography.fernet import Fernet
|
||
from cryptography.hazmat.primitives import hashes
|
||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||
import base64
|
||
|
||
class EncryptionManager:
|
||
"""
|
||
加密管理器
|
||
负责处理网络数据的加密和解密
|
||
"""
|
||
|
||
def __init__(self, plugin):
|
||
"""
|
||
初始化加密管理器
|
||
|
||
Args:
|
||
plugin: 网络同步插件实例
|
||
"""
|
||
self.plugin = plugin
|
||
self.enabled = False
|
||
self.initialized = False
|
||
|
||
# 加密配置
|
||
self.encryption_config = {
|
||
"enable_encryption": False,
|
||
"encryption_algorithm": "AES", # Fernet使用AES 128加密
|
||
"key_derivation_function": "PBKDF2",
|
||
"salt_length": 16,
|
||
"iterations": 100000,
|
||
"enable_message_authentication": True,
|
||
"enable_session_keys": True,
|
||
"session_key_lifetime": 3600, # 1小时
|
||
"enable_key_rotation": True,
|
||
"key_rotation_interval": 86400 # 24小时
|
||
}
|
||
|
||
# 加密密钥
|
||
self.master_key = None
|
||
self.session_key = None
|
||
self.session_key_timestamp = 0.0
|
||
self.previous_key = None
|
||
|
||
# 加密器实例
|
||
self.cipher = None
|
||
self.previous_cipher = None
|
||
|
||
# 加密统计
|
||
self.encryption_stats = {
|
||
"messages_encrypted": 0,
|
||
"messages_decrypted": 0,
|
||
"encryption_errors": 0,
|
||
"decryption_errors": 0,
|
||
"key_rotations": 0,
|
||
"auth_failures": 0,
|
||
"bytes_encrypted": 0,
|
||
"bytes_decrypted": 0
|
||
}
|
||
|
||
# 回调函数
|
||
self.encryption_callbacks = {
|
||
"encryption_enabled": [],
|
||
"key_rotated": [],
|
||
"auth_failed": [],
|
||
"encryption_error": []
|
||
}
|
||
|
||
# 时间戳记录
|
||
self.last_key_rotation = 0.0
|
||
self.last_encryption_operation = 0.0
|
||
|
||
print("✓ 加密管理器已创建")
|
||
|
||
def initialize(self) -> bool:
|
||
"""
|
||
初始化加密管理器
|
||
|
||
Returns:
|
||
是否初始化成功
|
||
"""
|
||
try:
|
||
# 生成主密钥
|
||
self._generate_master_key()
|
||
|
||
self.initialized = True
|
||
print("✓ 加密管理器初始化完成")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"✗ 加密管理器初始化失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def enable(self) -> bool:
|
||
"""
|
||
启用加密管理器
|
||
|
||
Returns:
|
||
是否启用成功
|
||
"""
|
||
try:
|
||
if not self.initialized:
|
||
print("✗ 加密管理器未初始化")
|
||
return False
|
||
|
||
self.enabled = True
|
||
|
||
# 生成会话密钥
|
||
if self.encryption_config["enable_session_keys"]:
|
||
self._generate_session_key()
|
||
|
||
# 触发加密启用回调
|
||
self._trigger_encryption_callback("encryption_enabled", {
|
||
"timestamp": time.time()
|
||
})
|
||
|
||
print("✓ 加密管理器已启用")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"✗ 加密管理器启用失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def disable(self):
|
||
"""禁用加密管理器"""
|
||
try:
|
||
self.enabled = False
|
||
self.cipher = None
|
||
self.session_key = None
|
||
print("✓ 加密管理器已禁用")
|
||
|
||
except Exception as e:
|
||
print(f"✗ 加密管理器禁用失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def finalize(self):
|
||
"""清理加密管理器资源"""
|
||
try:
|
||
self.disable()
|
||
self.encryption_callbacks.clear()
|
||
self.master_key = None
|
||
self.previous_key = None
|
||
self.initialized = False
|
||
print("✓ 加密管理器资源已清理")
|
||
|
||
except Exception as e:
|
||
print(f"✗ 加密管理器资源清理失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def update(self, dt: float):
|
||
"""
|
||
更新加密管理器状态
|
||
|
||
Args:
|
||
dt: 时间增量
|
||
"""
|
||
try:
|
||
if not self.enabled:
|
||
return
|
||
|
||
current_time = time.time()
|
||
self.last_encryption_operation = current_time
|
||
|
||
# 检查密钥轮换
|
||
if (self.encryption_config["enable_key_rotation"] and
|
||
current_time - self.last_key_rotation > self.encryption_config["key_rotation_interval"]):
|
||
self._rotate_keys()
|
||
|
||
# 检查会话密钥过期
|
||
if (self.encryption_config["enable_session_keys"] and
|
||
current_time - self.session_key_timestamp > self.encryption_config["session_key_lifetime"]):
|
||
self._generate_session_key()
|
||
|
||
except Exception as e:
|
||
print(f"✗ 加密管理器更新失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def _generate_master_key(self):
|
||
"""生成主密钥"""
|
||
try:
|
||
# 生成随机主密钥
|
||
self.master_key = Fernet.generate_key()
|
||
print("✓ 主密钥已生成")
|
||
except Exception as e:
|
||
print(f"✗ 主密钥生成失败: {e}")
|
||
|
||
def _generate_session_key(self):
|
||
"""生成会话密钥"""
|
||
try:
|
||
if not self.master_key:
|
||
self._generate_master_key()
|
||
|
||
# 生成新的会话密钥
|
||
self.session_key = Fernet.generate_key()
|
||
self.session_key_timestamp = time.time()
|
||
self.cipher = Fernet(self.session_key)
|
||
|
||
print("✓ 会话密钥已生成")
|
||
except Exception as e:
|
||
print(f"✗ 会话密钥生成失败: {e}")
|
||
|
||
def _rotate_keys(self):
|
||
"""轮换密钥"""
|
||
try:
|
||
# 保存当前密钥作为前一个密钥
|
||
self.previous_key = self.session_key
|
||
self.previous_cipher = self.cipher
|
||
|
||
# 生成新密钥
|
||
self._generate_session_key()
|
||
|
||
self.last_key_rotation = time.time()
|
||
self.encryption_stats["key_rotations"] += 1
|
||
|
||
# 触发密钥轮换回调
|
||
self._trigger_encryption_callback("key_rotated", {
|
||
"timestamp": self.last_key_rotation
|
||
})
|
||
|
||
print("✓ 密钥已轮换")
|
||
except Exception as e:
|
||
print(f"✗ 密钥轮换失败: {e}")
|
||
|
||
def encrypt_data(self, data: bytes, add_auth: bool = None) -> Optional[bytes]:
|
||
"""
|
||
加密数据
|
||
|
||
Args:
|
||
data: 要加密的数据
|
||
add_auth: 是否添加认证(None表示使用配置)
|
||
|
||
Returns:
|
||
加密后的数据或None
|
||
"""
|
||
try:
|
||
if not self.enabled:
|
||
print("✗ 加密管理器未启用")
|
||
return data # 不加密直接返回
|
||
|
||
if add_auth is None:
|
||
add_auth = self.encryption_config["enable_message_authentication"]
|
||
|
||
self.encryption_stats["messages_encrypted"] += 1
|
||
self.encryption_stats["bytes_encrypted"] += len(data)
|
||
|
||
# 使用Fernet加密
|
||
if self.cipher:
|
||
encrypted_data = self.cipher.encrypt(data)
|
||
else:
|
||
# 如果没有会话密钥,使用主密钥
|
||
cipher = Fernet(self.master_key)
|
||
encrypted_data = cipher.encrypt(data)
|
||
|
||
# 添加认证信息
|
||
if add_auth:
|
||
# 使用HMAC添加消息认证码
|
||
salt = secrets.token_bytes(self.encryption_config["salt_length"])
|
||
mac = hmac.new(self.session_key or self.master_key,
|
||
encrypted_data + salt,
|
||
hashlib.sha256).digest()
|
||
encrypted_data = salt + mac + encrypted_data
|
||
|
||
return encrypted_data
|
||
|
||
except Exception as e:
|
||
print(f"✗ 数据加密失败: {e}")
|
||
self.encryption_stats["encryption_errors"] += 1
|
||
|
||
# 触发加密错误回调
|
||
self._trigger_encryption_callback("encryption_error", {
|
||
"error": str(e),
|
||
"data_size": len(data) if data else 0
|
||
})
|
||
|
||
return None
|
||
|
||
def decrypt_data(self, encrypted_data: bytes, verify_auth: bool = None) -> Optional[bytes]:
|
||
"""
|
||
解密数据
|
||
|
||
Args:
|
||
encrypted_data: 要解密的数据
|
||
verify_auth: 是否验证认证(None表示使用配置)
|
||
|
||
Returns:
|
||
解密后的数据或None
|
||
"""
|
||
try:
|
||
if not self.enabled:
|
||
print("✗ 加密管理器未启用")
|
||
return encrypted_data # 不解密直接返回
|
||
|
||
if verify_auth is None:
|
||
verify_auth = self.encryption_config["enable_message_authentication"]
|
||
|
||
original_size = len(encrypted_data)
|
||
self.encryption_stats["messages_decrypted"] += 1
|
||
|
||
# 验证认证信息
|
||
data_to_decrypt = encrypted_data
|
||
if verify_auth:
|
||
try:
|
||
salt_length = self.encryption_config["salt_length"]
|
||
mac_length = 32 # SHA256 HMAC长度
|
||
|
||
if len(encrypted_data) < salt_length + mac_length:
|
||
raise ValueError("数据太短,无法包含认证信息")
|
||
|
||
salt = encrypted_data[:salt_length]
|
||
expected_mac = encrypted_data[salt_length:salt_length + mac_length]
|
||
data_to_decrypt = encrypted_data[salt_length + mac_length:]
|
||
|
||
# 验证MAC
|
||
actual_mac = hmac.new(self.session_key or self.master_key,
|
||
data_to_decrypt + salt,
|
||
hashlib.sha256).digest()
|
||
|
||
if not hmac.compare_digest(expected_mac, actual_mac):
|
||
raise ValueError("消息认证失败")
|
||
|
||
except Exception as e:
|
||
print(f"✗ 消息认证失败: {e}")
|
||
self.encryption_stats["auth_failures"] += 1
|
||
|
||
# 触发认证失败回调
|
||
self._trigger_encryption_callback("auth_failed", {
|
||
"error": str(e)
|
||
})
|
||
|
||
return None
|
||
|
||
# 使用Fernet解密
|
||
try:
|
||
if self.cipher:
|
||
decrypted_data = self.cipher.decrypt(data_to_decrypt)
|
||
else:
|
||
# 尝试使用主密钥解密
|
||
cipher = Fernet(self.master_key)
|
||
decrypted_data = cipher.decrypt(data_to_decrypt)
|
||
|
||
self.encryption_stats["bytes_decrypted"] += len(decrypted_data)
|
||
return decrypted_data
|
||
|
||
except Exception as e:
|
||
# 如果当前密钥失败,尝试使用前一个密钥(密钥轮换期间)
|
||
if self.previous_cipher and self.encryption_config["enable_key_rotation"]:
|
||
try:
|
||
decrypted_data = self.previous_cipher.decrypt(data_to_decrypt)
|
||
self.encryption_stats["bytes_decrypted"] += len(decrypted_data)
|
||
return decrypted_data
|
||
except:
|
||
pass
|
||
|
||
raise e # 重新抛出原始异常
|
||
|
||
except Exception as e:
|
||
print(f"✗ 数据解密失败: {e}")
|
||
self.encryption_stats["decryption_errors"] += 1
|
||
|
||
# 触发加密错误回调
|
||
self._trigger_encryption_callback("encryption_error", {
|
||
"error": str(e),
|
||
"data_size": original_size
|
||
})
|
||
|
||
return None
|
||
|
||
def hash_password(self, password: str, salt: bytes = None) -> Dict[str, Any]:
|
||
"""
|
||
哈希密码
|
||
|
||
Args:
|
||
password: 密码
|
||
salt: 盐值(None表示自动生成)
|
||
|
||
Returns:
|
||
包含哈希值和盐值的字典
|
||
"""
|
||
try:
|
||
if salt is None:
|
||
salt = secrets.token_bytes(16)
|
||
|
||
# 使用PBKDF2进行密码哈希
|
||
kdf = PBKDF2HMAC(
|
||
algorithm=hashes.SHA256(),
|
||
length=32,
|
||
salt=salt,
|
||
iterations=self.encryption_config["iterations"]
|
||
)
|
||
|
||
password_bytes = password.encode('utf-8')
|
||
hashed_password = base64.urlsafe_b64encode(kdf.derive(password_bytes))
|
||
|
||
return {
|
||
"hash": hashed_password.decode('utf-8'),
|
||
"salt": base64.urlsafe_b64encode(salt).decode('utf-8')
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"✗ 密码哈希失败: {e}")
|
||
return {}
|
||
|
||
def verify_password(self, password: str, hashed_password: str, salt: str) -> bool:
|
||
"""
|
||
验证密码
|
||
|
||
Args:
|
||
password: 密码
|
||
hashed_password: 哈希值
|
||
salt: 盐值
|
||
|
||
Returns:
|
||
密码是否正确
|
||
"""
|
||
try:
|
||
# 解码盐值和哈希值
|
||
salt_bytes = base64.urlsafe_b64decode(salt.encode('utf-8'))
|
||
stored_hash = base64.urlsafe_b64decode(hashed_password.encode('utf-8'))
|
||
|
||
# 使用相同的参数进行哈希
|
||
kdf = PBKDF2HMAC(
|
||
algorithm=hashes.SHA256(),
|
||
length=32,
|
||
salt=salt_bytes,
|
||
iterations=self.encryption_config["iterations"]
|
||
)
|
||
|
||
password_bytes = password.encode('utf-8')
|
||
|
||
# 验证密码
|
||
try:
|
||
kdf.verify(password_bytes, stored_hash)
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"✗ 密码验证失败: {e}")
|
||
return False
|
||
|
||
def generate_token(self, data: str, lifetime: int = 3600) -> str:
|
||
"""
|
||
生成令牌
|
||
|
||
Args:
|
||
data: 令牌数据
|
||
lifetime: 生存时间(秒)
|
||
|
||
Returns:
|
||
生成的令牌
|
||
"""
|
||
try:
|
||
# 创建包含数据和过期时间的令牌
|
||
token_data = {
|
||
"data": data,
|
||
"expires": time.time() + lifetime
|
||
}
|
||
|
||
# 序列化数据
|
||
token_string = str(token_data)
|
||
|
||
# 加密令牌
|
||
token_bytes = token_string.encode('utf-8')
|
||
encrypted_token = self.encrypt_data(token_bytes)
|
||
|
||
if encrypted_token:
|
||
# Base64编码以便传输
|
||
return base64.urlsafe_b64encode(encrypted_token).decode('utf-8')
|
||
else:
|
||
return ""
|
||
|
||
except Exception as e:
|
||
print(f"✗ 令牌生成失败: {e}")
|
||
return ""
|
||
|
||
def verify_token(self, token: str) -> Optional[str]:
|
||
"""
|
||
验证令牌
|
||
|
||
Args:
|
||
token: 令牌
|
||
|
||
Returns:
|
||
令牌数据或None
|
||
"""
|
||
try:
|
||
# Base64解码
|
||
token_bytes = base64.urlsafe_b64decode(token.encode('utf-8'))
|
||
|
||
# 解密令牌
|
||
decrypted_data = self.decrypt_data(token_bytes)
|
||
|
||
if decrypted_data:
|
||
# 反序列化数据
|
||
token_string = decrypted_data.decode('utf-8')
|
||
token_data = eval(token_string) # 注意:实际应用中应使用安全的反序列化方法
|
||
|
||
# 检查过期时间
|
||
if token_data.get("expires", 0) > time.time():
|
||
return token_data.get("data")
|
||
else:
|
||
print("✗ 令牌已过期")
|
||
return None
|
||
else:
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"✗ 令牌验证失败: {e}")
|
||
return None
|
||
|
||
def get_encryption_stats(self) -> Dict[str, Any]:
|
||
"""
|
||
获取加密统计信息
|
||
|
||
Returns:
|
||
加密统计字典
|
||
"""
|
||
return self.encryption_stats.copy()
|
||
|
||
def reset_encryption_stats(self):
|
||
"""重置加密统计信息"""
|
||
try:
|
||
self.encryption_stats = {
|
||
"messages_encrypted": 0,
|
||
"messages_decrypted": 0,
|
||
"encryption_errors": 0,
|
||
"decryption_errors": 0,
|
||
"key_rotations": 0,
|
||
"auth_failures": 0,
|
||
"bytes_encrypted": 0,
|
||
"bytes_decrypted": 0
|
||
}
|
||
print("✓ 加密统计信息已重置")
|
||
except Exception as e:
|
||
print(f"✗ 加密统计信息重置失败: {e}")
|
||
|
||
def set_encryption_config(self, config: Dict[str, Any]) -> bool:
|
||
"""
|
||
设置加密配置
|
||
|
||
Args:
|
||
config: 加密配置字典
|
||
|
||
Returns:
|
||
是否设置成功
|
||
"""
|
||
try:
|
||
old_enabled = self.encryption_config.get("enable_encryption", False)
|
||
new_enabled = config.get("enable_encryption", False)
|
||
|
||
self.encryption_config.update(config)
|
||
print(f"✓ 加密配置已更新: {self.encryption_config}")
|
||
|
||
# 如果启用了加密,确保密钥已生成
|
||
if new_enabled and not old_enabled:
|
||
if not self.master_key:
|
||
self._generate_master_key()
|
||
if self.encryption_config["enable_session_keys"]:
|
||
self._generate_session_key()
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(f"✗ 加密配置设置失败: {e}")
|
||
return False
|
||
|
||
def get_encryption_config(self) -> Dict[str, Any]:
|
||
"""
|
||
获取加密配置
|
||
|
||
Returns:
|
||
加密配置字典
|
||
"""
|
||
return self.encryption_config.copy()
|
||
|
||
def is_encryption_enabled(self) -> bool:
|
||
"""
|
||
检查加密是否启用
|
||
|
||
Returns:
|
||
加密是否启用
|
||
"""
|
||
return self.enabled and self.encryption_config.get("enable_encryption", False)
|
||
|
||
def _trigger_encryption_callback(self, callback_type: str, data: Dict[str, Any]):
|
||
"""
|
||
触发加密回调
|
||
|
||
Args:
|
||
callback_type: 回调类型
|
||
data: 回调数据
|
||
"""
|
||
try:
|
||
if callback_type in self.encryption_callbacks:
|
||
for callback in self.encryption_callbacks[callback_type]:
|
||
try:
|
||
callback(data)
|
||
except Exception as e:
|
||
print(f"✗ 加密回调执行失败: {e}")
|
||
except Exception as e:
|
||
print(f"✗ 加密回调触发失败: {e}")
|
||
|
||
def register_encryption_callback(self, callback_type: str, callback: callable):
|
||
"""
|
||
注册加密回调
|
||
|
||
Args:
|
||
callback_type: 回调类型
|
||
callback: 回调函数
|
||
"""
|
||
try:
|
||
if callback_type in self.encryption_callbacks:
|
||
self.encryption_callbacks[callback_type].append(callback)
|
||
print(f"✓ 加密回调已注册: {callback_type}")
|
||
else:
|
||
print(f"✗ 无效的回调类型: {callback_type}")
|
||
except Exception as e:
|
||
print(f"✗ 加密回调注册失败: {e}")
|
||
|
||
def unregister_encryption_callback(self, callback_type: str, callback: callable):
|
||
"""
|
||
注销加密回调
|
||
|
||
Args:
|
||
callback_type: 回调类型
|
||
callback: 回调函数
|
||
"""
|
||
try:
|
||
if callback_type in self.encryption_callbacks:
|
||
if callback in self.encryption_callbacks[callback_type]:
|
||
self.encryption_callbacks[callback_type].remove(callback)
|
||
print(f"✓ 加密回调已注销: {callback_type}")
|
||
except Exception as e:
|
||
print(f"✗ 加密回调注销失败: {e}") |