EG/plugins/user/server_architecture/utils/server_utils.py
2025-12-12 16:16:15 +08:00

334 lines
9.0 KiB
Python

"""
服务器架构插件工具类模块
提供实用工具函数
"""
import time
import json
import hashlib
import base64
from typing import Dict, Any, List, Optional
class ServerUtils:
"""
服务器工具类
提供实用工具函数
"""
@staticmethod
def generate_unique_id() -> str:
"""
生成唯一ID
Returns:
唯一ID字符串
"""
return hashlib.sha256(f"{time.time()}_{id(object())}".encode()).hexdigest()[:16]
@staticmethod
def validate_json(data: str) -> bool:
"""
验证JSON数据
Args:
data: JSON字符串
Returns:
是否为有效JSON
"""
try:
json.loads(data)
return True
except:
return False
@staticmethod
def format_bytes(bytes_count: int) -> str:
"""
格式化字节数
Args:
bytes_count: 字节数
Returns:
格式化后的字符串
"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_count < 1024.0:
return f"{bytes_count:.2f} {unit}"
bytes_count /= 1024.0
return f"{bytes_count:.2f} PB"
@staticmethod
def calculate_percentage(part: float, total: float) -> float:
"""
计算百分比
Args:
part: 部分值
total: 总值
Returns:
百分比
"""
if total == 0:
return 0.0
return (part / total) * 100
@staticmethod
def sanitize_input(input_str: str) -> str:
"""
清理输入字符串
Args:
input_str: 输入字符串
Returns:
清理后的字符串
"""
# 移除潜在的危险字符
dangerous_chars = ['<', '>', '&', '"', "'", '\\', '/', '..']
sanitized = input_str
for char in dangerous_chars:
sanitized = sanitized.replace(char, '')
return sanitized.strip()
@staticmethod
def validate_ip_address(ip: str) -> bool:
"""
验证IP地址格式
Args:
ip: IP地址字符串
Returns:
是否为有效IP地址
"""
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
try:
num = int(part)
if num < 0 or num > 255:
return False
except:
return False
return True
@staticmethod
def mask_sensitive_data(data: Dict[str, Any], sensitive_keys: List[str]) -> Dict[str, Any]:
"""
遮蔽敏感数据
Args:
data: 数据字典
sensitive_keys: 敏感键列表
Returns:
遮蔽后的数据字典
"""
masked_data = data.copy()
for key in sensitive_keys:
if key in masked_data:
masked_data[key] = "***MASKED***"
return masked_data
@staticmethod
def calculate_uptime(start_time: float) -> str:
"""
计算运行时间
Args:
start_time: 开始时间戳
Returns:
格式化的运行时间字符串
"""
if start_time == 0:
return "0s"
uptime_seconds = time.time() - start_time
days = int(uptime_seconds // 86400)
hours = int((uptime_seconds % 86400) // 3600)
minutes = int((uptime_seconds % 3600) // 60)
seconds = int(uptime_seconds % 60)
parts = []
if days > 0:
parts.append(f"{days}d")
if hours > 0:
parts.append(f"{hours}h")
if minutes > 0:
parts.append(f"{minutes}m")
if seconds > 0 or not parts:
parts.append(f"{seconds}s")
return " ".join(parts)
@staticmethod
def encode_base64(data: str) -> str:
"""
Base64编码
Args:
data: 要编码的数据
Returns:
编码后的字符串
"""
return base64.b64encode(data.encode('utf-8')).decode('utf-8')
@staticmethod
def decode_base64(encoded_data: str) -> str:
"""
Base64解码
Args:
encoded_data: 要解码的数据
Returns:
解码后的字符串
"""
return base64.b64decode(encoded_data.encode('utf-8')).decode('utf-8')
@staticmethod
def get_system_info() -> Dict[str, Any]:
"""
获取系统信息
Returns:
系统信息字典
"""
try:
import platform
import psutil
# CPU信息
cpu_percent = psutil.cpu_percent(interval=1)
# 内存信息
memory = psutil.virtual_memory()
# 磁盘信息
disk = psutil.disk_usage('/')
return {
"platform": platform.system(),
"platform_version": platform.version(),
"architecture": platform.machine(),
"cpu_count": psutil.cpu_count(),
"cpu_percent": cpu_percent,
"memory_total": memory.total,
"memory_available": memory.available,
"memory_percent": memory.percent,
"disk_total": disk.total,
"disk_used": disk.used,
"disk_free": disk.free,
"disk_percent": (disk.used / disk.total) * 100 if disk.total > 0 else 0
}
except:
# 如果无法获取系统信息,返回基本数据
return {
"platform": "Unknown",
"platform_version": "Unknown",
"architecture": "Unknown",
"cpu_count": 0,
"cpu_percent": 0.0,
"memory_total": 0,
"memory_available": 0,
"memory_percent": 0.0,
"disk_total": 0,
"disk_used": 0,
"disk_free": 0,
"disk_percent": 0.0
}
@staticmethod
def create_response(success: bool, message: str, data: Any = None) -> Dict[str, Any]:
"""
创建标准响应
Args:
success: 是否成功
message: 消息
data: 数据
Returns:
标准响应字典
"""
response = {
"success": success,
"message": message,
"timestamp": time.time()
}
if data is not None:
response["data"] = data
return response
@staticmethod
def merge_dicts(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> Dict[str, Any]:
"""
合并两个字典(递归)
Args:
dict1: 第一个字典
dict2: 第二个字典
Returns:
合并后的字典
"""
result = dict1.copy()
for key, value in dict2.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = ServerUtils.merge_dicts(result[key], value)
else:
result[key] = value
return result
@staticmethod
def get_nested_value(data: Dict[str, Any], path: str, default: Any = None) -> Any:
"""
获取嵌套字典中的值
Args:
data: 数据字典
path: 路径(用.分隔)
default: 默认值
Returns:
获取到的值或默认值
"""
keys = path.split('.')
current = data
try:
for key in keys:
current = current[key]
return current
except (KeyError, TypeError):
return default
@staticmethod
def set_nested_value(data: Dict[str, Any], path: str, value: Any) -> None:
"""
设置嵌套字典中的值
Args:
data: 数据字典
path: 路径(用.分隔)
value: 要设置的值
"""
keys = path.split('.')
current = data
for key in keys[:-1]:
if key not in current:
current[key] = {}
current = current[key]
current[keys[-1]] = value