EG/plugins/user/haptic_feedback_system/utils/haptic_utils.py
2025-12-12 16:16:15 +08:00

624 lines
20 KiB
Python

"""
工具类模块
提供触觉反馈系统所需的实用工具函数
"""
import time
import math
from typing import Dict, Any, List, Optional, Union
import threading
import json
class HapticUtils:
"""
触觉工具类
提供触觉反馈系统所需的实用工具函数
"""
def __init__(self, plugin):
"""
初始化触觉工具类
Args:
plugin: 触觉反馈系统插件实例
"""
self.plugin = plugin
self.enabled = False
self.initialized = False
# 性能监控
self.performance_metrics = {
'update_times': [],
'effect_processing_times': [],
'device_response_times': [],
'memory_usage': 0
}
# 日志配置
self.log_config = {
'enable_logging': True,
'log_level': 'INFO',
'log_file': '/home/hello/EG/plugins/user/haptic_feedback_system/logs/haptic.log',
'max_log_size': 10 * 1024 * 1024, # 10MB
'log_format': '[{timestamp}] {level}: {message}'
}
# 数据转换配置
self.conversion_config = {
'intensity_curve': 'linear', # linear, exponential, logarithmic
'frequency_mapping': 'linear',
'duration_scaling': 'linear'
}
# 数学工具
self.math_utils = MathUtils()
# 缓存管理
self.cache_manager = CacheManager()
# 调试工具
self.debug_tools = DebugTools()
# 统计信息
self.stats = {
'functions_called': 0,
'data_processed': 0,
'cache_hits': 0,
'cache_misses': 0
}
print("✓ 触觉工具类已创建")
def initialize(self) -> bool:
"""
初始化触觉工具类
Returns:
是否初始化成功
"""
try:
# 创建日志目录
import os
log_dir = "/home/hello/EG/plugins/user/haptic_feedback_system/logs"
os.makedirs(log_dir, exist_ok=True)
self.initialized = True
print("✓ 触觉工具类初始化完成")
return True
except Exception as e:
print(f"✗ 触觉工具类初始化失败: {e}")
import traceback
traceback.print_exc()
return False
def enable(self) -> bool:
"""
启用触觉工具类
Returns:
是否启用成功
"""
try:
if not self.initialized:
print("✗ 触觉工具类未初始化")
return False
self.enabled = True
print("✓ 触觉工具类已启用")
return True
except Exception as e:
print(f"✗ 触觉工具类启用失败: {e}")
import traceback
traceback.print_exc()
return False
def disable(self):
"""禁用触觉工具类"""
try:
self.enabled = False
print("✓ 触觉工具类已禁用")
except Exception as e:
print(f"✗ 触觉工具类禁用失败: {e}")
import traceback
traceback.print_exc()
def finalize(self):
"""清理触觉工具类资源"""
try:
self.disable()
self.performance_metrics.clear()
self.cache_manager.clear()
self.initialized = False
print("✓ 触觉工具类资源已清理")
except Exception as e:
print(f"✗ 触觉工具类资源清理失败: {e}")
import traceback
traceback.print_exc()
def update(self, dt: float):
"""
更新触觉工具类状态
Args:
dt: 时间增量
"""
try:
if not self.enabled:
return
# 更新性能指标
self._update_performance_metrics(dt)
except Exception as e:
print(f"✗ 触觉工具类更新失败: {e}")
import traceback
traceback.print_exc()
def _update_performance_metrics(self, dt: float):
"""更新性能指标"""
try:
# 记录更新时间
self.performance_metrics['update_times'].append(dt)
# 保持历史记录大小
if len(self.performance_metrics['update_times']) > 1000:
self.performance_metrics['update_times'] = self.performance_metrics['update_times'][-1000:]
except Exception as e:
print(f"✗ 性能指标更新失败: {e}")
def log_message(self, message: str, level: str = 'INFO'):
"""
记录日志消息
Args:
message: 日志消息
level: 日志级别 (DEBUG, INFO, WARNING, ERROR)
"""
try:
if not self.log_config['enable_logging']:
return
# 检查日志级别
level_priority = {'DEBUG': 0, 'INFO': 1, 'WARNING': 2, 'ERROR': 3}
config_level = self.log_config['log_level']
if level_priority.get(level, 1) < level_priority.get(config_level, 1):
return
# 格式化日志消息
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
formatted_message = self.log_config['log_format'].format(
timestamp=timestamp,
level=level,
message=message
)
# 输出到控制台
print(formatted_message)
# 写入日志文件
try:
with open(self.log_config['log_file'], 'a', encoding='utf-8') as f:
f.write(formatted_message + '\n')
except Exception as e:
pass # 忽略文件写入错误
except Exception as e:
pass # 忽略日志记录错误
def measure_performance(self, func: callable, *args, **kwargs) -> tuple:
"""
测量函数性能
Args:
func: 要测量的函数
*args: 函数参数
**kwargs: 函数关键字参数
Returns:
(结果, 执行时间)
"""
try:
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
execution_time = end_time - start_time
# 记录到性能指标
metric_name = f"{func.__name__}_times"
if metric_name not in self.performance_metrics:
self.performance_metrics[metric_name] = []
self.performance_metrics[metric_name].append(execution_time)
# 保持历史记录大小
if len(self.performance_metrics[metric_name]) > 1000:
self.performance_metrics[metric_name] = self.performance_metrics[metric_name][-1000:]
self.stats['functions_called'] += 1
return (result, execution_time)
except Exception as e:
self.log_message(f"性能测量失败: {e}", 'ERROR')
raise
def convert_intensity(self, intensity: float, curve: str = None) -> float:
"""
转换强度值
Args:
intensity: 原始强度值 (0.0-1.0)
curve: 转换曲线类型
Returns:
转换后的强度值
"""
try:
curve_type = curve or self.conversion_config['intensity_curve']
intensity = max(0.0, min(1.0, intensity))
if curve_type == 'exponential':
# 指数曲线
return math.pow(intensity, 2)
elif curve_type == 'logarithmic':
# 对数曲线
return math.log(intensity + 1) / math.log(2)
else:
# 线性曲线
return intensity
except Exception as e:
self.log_message(f"强度转换失败: {e}", 'ERROR')
return intensity
def convert_frequency(self, frequency: float, min_freq: float = 1.0,
max_freq: float = 100.0, mapping: str = None) -> float:
"""
转换频率值
Args:
frequency: 原始频率值
min_freq: 最小频率
max_freq: 最大频率
mapping: 映射类型
Returns:
转换后的频率值
"""
try:
mapping_type = mapping or self.conversion_config['frequency_mapping']
frequency = max(0.0, frequency)
if mapping_type == 'logarithmic':
# 对数映射
log_min = math.log(max(min_freq, 1.0))
log_max = math.log(max(max_freq, 1.0))
log_freq = math.log(max(frequency, 1.0))
normalized = (log_freq - log_min) / (log_max - log_min)
return min_freq + normalized * (max_freq - min_freq)
else:
# 线性映射
return max(min_freq, min(max_freq, frequency))
except Exception as e:
self.log_message(f"频率转换失败: {e}", 'ERROR')
return frequency
def scale_duration(self, duration: float, scale_factor: float = 1.0,
scaling_type: str = None) -> float:
"""
缩放持续时间
Args:
duration: 原始持续时间
scale_factor: 缩放因子
scaling_type: 缩放类型
Returns:
缩放后的持续时间
"""
try:
scaling = scaling_type or self.conversion_config['duration_scaling']
duration = max(0.0, duration)
scale_factor = max(0.0, scale_factor)
if scaling == 'exponential':
return duration * math.pow(scale_factor, 2)
elif scaling == 'logarithmic':
return duration * math.log(scale_factor + 1)
else:
return duration * scale_factor
except Exception as e:
self.log_message(f"持续时间缩放失败: {e}", 'ERROR')
return duration
def interpolate_values(self, start_value: float, end_value: float,
progress: float, interpolation_type: str = 'linear') -> float:
"""
插值计算
Args:
start_value: 起始值
end_value: 结束值
progress: 进度 (0.0-1.0)
interpolation_type: 插值类型
Returns:
插值结果
"""
try:
progress = max(0.0, min(1.0, progress))
if interpolation_type == 'ease_in':
progress = math.pow(progress, 2)
elif interpolation_type == 'ease_out':
progress = 1.0 - math.pow(1.0 - progress, 2)
elif interpolation_type == 'ease_in_out':
progress = 0.5 * (1 - math.cos(progress * math.pi))
# linear 为默认情况,无需特殊处理
return start_value + (end_value - start_value) * progress
except Exception as e:
self.log_message(f"插值计算失败: {e}", 'ERROR')
return start_value
def calculate_distance(self, pos1: tuple, pos2: tuple) -> float:
"""
计算两点间距离
Args:
pos1: 第一个点 (x, y, z)
pos2: 第二个点 (x, y, z)
Returns:
距离值
"""
try:
dx = pos2[0] - pos1[0]
dy = pos2[1] - pos1[1]
dz = pos2[2] - pos1[2]
return math.sqrt(dx*dx + dy*dy + dz*dz)
except Exception as e:
self.log_message(f"距离计算失败: {e}", 'ERROR')
return 0.0
def normalize_vector(self, vector: tuple) -> tuple:
"""
归一化向量
Args:
vector: 输入向量 (x, y, z)
Returns:
归一化后的向量
"""
try:
magnitude = math.sqrt(vector[0]**2 + vector[1]**2 + vector[2]**2)
if magnitude > 0:
return (vector[0]/magnitude, vector[1]/magnitude, vector[2]/magnitude)
else:
return (0, 0, 0)
except Exception as e:
self.log_message(f"向量归一化失败: {e}", 'ERROR')
return (0, 0, 0)
def get_performance_stats(self) -> Dict[str, Any]:
"""
获取性能统计信息
Returns:
性能统计字典
"""
try:
stats = {}
for key, times in self.performance_metrics.items():
if times:
stats[key] = {
'count': len(times),
'average': sum(times) / len(times),
'min': min(times),
'max': max(times),
'total': sum(times)
}
return stats
except Exception as e:
self.log_message(f"性能统计获取失败: {e}", 'ERROR')
return {}
def get_system_info(self) -> Dict[str, Any]:
"""
获取系统信息
Returns:
系统信息字典
"""
try:
import platform
import psutil
return {
'platform': platform.system(),
'platform_version': platform.version(),
'processor': platform.processor(),
'cpu_count': psutil.cpu_count(),
'memory_total': psutil.virtual_memory().total,
'memory_available': psutil.virtual_memory().available,
'plugin_version': getattr(self.plugin, 'version', 'unknown')
}
except Exception as e:
self.log_message(f"系统信息获取失败: {e}", 'ERROR')
return {}
def get_stats(self) -> Dict[str, int]:
"""
获取统计信息
Returns:
统计信息字典
"""
return self.stats.copy()
def reset_stats(self):
"""重置统计信息"""
try:
self.stats = {
'functions_called': 0,
'data_processed': 0,
'cache_hits': 0,
'cache_misses': 0
}
print("✓ 工具类统计信息已重置")
except Exception as e:
self.log_message(f"统计信息重置失败: {e}", 'ERROR')
def set_log_config(self, config: Dict[str, Any]) -> bool:
"""
设置日志配置
Args:
config: 日志配置字典
Returns:
是否设置成功
"""
try:
self.log_config.update(config)
self.log_message(f"日志配置已更新: {self.log_config}", 'INFO')
return True
except Exception as e:
self.log_message(f"日志配置设置失败: {e}", 'ERROR')
return False
def set_conversion_config(self, config: Dict[str, Any]) -> bool:
"""
设置转换配置
Args:
config: 转换配置字典
Returns:
是否设置成功
"""
try:
self.conversion_config.update(config)
self.log_message(f"转换配置已更新: {self.conversion_config}", 'INFO')
return True
except Exception as e:
self.log_message(f"转换配置设置失败: {e}", 'ERROR')
return False
def get_cache_manager(self):
"""
获取缓存管理器
Returns:
缓存管理器实例
"""
return self.cache_manager
def get_debug_tools(self):
"""
获取调试工具
Returns:
调试工具实例
"""
return self.debug_tools
class MathUtils:
"""数学工具类"""
@staticmethod
def clamp(value: float, min_value: float, max_value: float) -> float:
"""限制值在指定范围内"""
return max(min_value, min(max_value, value))
@staticmethod
def lerp(start: float, end: float, t: float) -> float:
"""线性插值"""
return start + (end - start) * t
@staticmethod
def smoothstep(edge0: float, edge1: float, x: float) -> float:
"""平滑插值"""
t = MathUtils.clamp((x - edge0) / (edge1 - edge0), 0.0, 1.0)
return t * t * (3.0 - 2.0 * t)
class CacheManager:
"""缓存管理器"""
def __init__(self):
self.cache = {}
self.access_times = {}
self.max_size = 1000
def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
if key in self.cache:
self.access_times[key] = time.time()
return self.cache[key]
return None
def set(self, key: str, value: Any, ttl: float = 300.0):
"""设置缓存值"""
self.cache[key] = value
self.access_times[key] = time.time()
# 检查缓存大小
if len(self.cache) > self.max_size:
self._cleanup()
def _cleanup(self):
"""清理过期缓存"""
current_time = time.time()
expired_keys = [
key for key, access_time in self.access_times.items()
if current_time - access_time > 300.0
]
for key in expired_keys:
del self.cache[key]
del self.access_times[key]
def clear(self):
"""清空缓存"""
self.cache.clear()
self.access_times.clear()
class DebugTools:
"""调试工具类"""
def __init__(self):
self.debug_enabled = False
self.debug_data = {}
def enable_debug(self):
"""启用调试模式"""
self.debug_enabled = True
def disable_debug(self):
"""禁用调试模式"""
self.debug_enabled = False
def log_debug_data(self, key: str, data: Any):
"""记录调试数据"""
if self.debug_enabled:
self.debug_data[key] = data
def get_debug_data(self, key: str = None) -> Union[Dict[str, Any], Any]:
"""获取调试数据"""
if key:
return self.debug_data.get(key)
return self.debug_data.copy()
def clear_debug_data(self):
"""清空调试数据"""
self.debug_data.clear()