EG/plugins/user/haptic_feedback_system/audio/audio_haptics.py
2025-12-12 16:16:15 +08:00

703 lines
24 KiB
Python

"""
音频触觉反馈模块
负责将音频特征转换为触觉效果
"""
import time
from typing import Dict, Any, List, Optional
import math
class AudioHaptics:
"""
音频触觉反馈
负责将音频特征转换为触觉效果
"""
def __init__(self, plugin):
"""
初始化音频触觉反馈
Args:
plugin: 触觉反馈系统插件实例
"""
self.plugin = plugin
self.enabled = False
self.initialized = False
# 音频分析配置
self.audio_config = {
'enable_audio_haptics': True,
'analysis_rate': 30.0, # Hz
'frequency_bands': 8,
'min_frequency': 20.0,
'max_frequency': 20000.0,
'intensity_scale': 1.0,
'frequency_scale': 1.0,
'smoothing_factor': 0.3
}
# 频带映射配置
self.band_mapping = {
'sub_bass': {'range': (20, 60), 'haptic_freq': 20.0, 'intensity': 1.0},
'bass': {'range': (60, 250), 'haptic_freq': 30.0, 'intensity': 0.9},
'low_mids': {'range': (250, 500), 'haptic_freq': 40.0, 'intensity': 0.7},
'mids': {'range': (500, 2000), 'haptic_freq': 50.0, 'intensity': 0.5},
'high_mids': {'range': (2000, 4000), 'haptic_freq': 60.0, 'intensity': 0.3},
'presence': {'range': (4000, 6000), 'haptic_freq': 70.0, 'intensity': 0.2},
'brilliance': {'range': (6000, 12000), 'haptic_freq': 80.0, 'intensity': 0.1},
'air': {'range': (12000, 20000), 'haptic_freq': 90.0, 'intensity': 0.05}
}
# 节拍检测配置
self.beat_config = {
'enable_beat_detection': True,
'min_bpm': 60,
'max_bpm': 180,
'sensitivity': 0.7,
'history_size': 30
}
# 音频特征配置
self.feature_config = {
'enable_volume_haptics': True,
'enable_spectral_haptics': True,
'enable_temporal_haptics': True,
'enable_rhythm_haptics': True
}
# 状态跟踪
self.audio_state = {
'last_analysis_time': 0.0,
'current_features': {},
'beat_history': [],
'intensity_history': []
}
# 设备映射
self.device_mapping = {
'low_frequencies': 'body', # 低频到身体设备
'mid_frequencies': 'hands', # 中频到手部设备
'high_frequencies': 'fingers' # 高频到手指设备
}
# 统计信息
self.stats = {
'audio_frames_analyzed': 0,
'haptic_effects_generated': 0,
'beats_detected': 0,
'spectral_events': 0
}
# 回调函数
self.audio_callbacks = {
'audio_analyzed': [],
'beat_detected': [],
'feature_extracted': [],
'haptic_effect_generated': []
}
print("✓ 音频触觉反馈已创建")
def initialize(self) -> bool:
"""
初始化音频触觉反馈
Returns:
是否初始化成功
"""
try:
self.initialized = True
print("✓ 音频触觉反馈初始化完成")
return True
except Exception as e:
print(f"✗ 音频触觉反馈初始化失败: {e}")
import traceback
traceback.print_exc()
return False
def enable(self) -> bool:
"""
启用音频触觉反馈
Returns:
是否启用成功
"""
try:
if not self.initialized:
print("✗ 音频触觉反馈未初始化")
return False
self.enabled = True
print("✓ 音频触觉反馈已启用")
return True
except Exception as e:
print(f"✗ 音频触觉反馈启用失败: {e}")
import traceback
traceback.print_exc()
return False
def disable(self):
"""禁用音频触觉反馈"""
try:
self.enabled = False
print("✓ 音频触觉反馈已禁用")
except Exception as e:
print(f"✗ 音频触觉反馈禁用失败: {e}")
import traceback
traceback.print_exc()
def finalize(self):
"""清理音频触觉反馈资源"""
try:
self.disable()
self.audio_state.clear()
self.audio_callbacks.clear()
self.initialized = False
print("✓ 音频触觉反馈资源已清理")
except Exception as e:
print(f"✗ 音频触觉反馈资源清理失败: {e}")
import traceback
traceback.print_exc()
def update(self, dt: float):
"""
更新音频触觉反馈状态
Args:
dt: 时间增量
"""
try:
if not self.enabled:
return
# 检查是否需要进行音频分析
current_time = time.time()
analysis_interval = 1.0 / self.audio_config['analysis_rate']
if current_time - self.audio_state['last_analysis_time'] >= analysis_interval:
# 模拟音频分析
self._analyze_audio_frame()
self.audio_state['last_analysis_time'] = current_time
except Exception as e:
print(f"✗ 音频触觉反馈更新失败: {e}")
import traceback
traceback.print_exc()
def _analyze_audio_frame(self):
"""分析音频帧并生成触觉效果"""
try:
if not self.enabled or not self.audio_config['enable_audio_haptics']:
return
# 模拟音频特征提取
audio_features = self._extract_audio_features()
# 更新当前特征
self.audio_state['current_features'] = audio_features
# 触发音频分析回调
self._trigger_audio_callback('audio_analyzed', {
'features': audio_features,
'timestamp': time.time()
})
# 处理音频特征并生成触觉效果
self._process_audio_features(audio_features)
# 检测节拍
if self.beat_config['enable_beat_detection']:
self._detect_beat(audio_features)
self.stats['audio_frames_analyzed'] += 1
except Exception as e:
print(f"✗ 音频帧分析失败: {e}")
def _extract_audio_features(self) -> Dict[str, Any]:
"""
提取音频特征
Returns:
音频特征字典
"""
try:
# 模拟音频特征提取
features = {
'timestamp': time.time(),
'rms': 0.5 + 0.5 * math.sin(time.time()), # 模拟音量
'spectral_centroid': 5000.0, # 模拟频谱质心
'spectral_rolloff': 10000.0, # 模拟频谱滚降点
'zero_crossing_rate': 0.1, # 模拟过零率
'temporal_flux': 0.3, # 模拟时间通量
'bpm': 120.0 # 模拟节拍
}
# 生成频带能量(模拟)
band_energies = []
for i in range(self.audio_config['frequency_bands']):
energy = 0.3 + 0.7 * math.sin(time.time() + i * 0.5)
band_energies.append(max(0.0, min(1.0, energy)))
features['band_energies'] = band_energies
# 触发特征提取回调
self._trigger_audio_callback('feature_extracted', {
'features': features
})
return features
except Exception as e:
print(f"✗ 音频特征提取失败: {e}")
return {}
def _process_audio_features(self, features: Dict[str, Any]):
"""
处理音频特征并生成触觉效果
Args:
features: 音频特征字典
"""
try:
if not features:
return
# 处理频带能量
if self.feature_config['enable_spectral_haptics']:
self._process_spectral_features(features)
# 处理音量
if self.feature_config['enable_volume_haptics']:
self._process_volume_features(features)
# 处理时间特征
if self.feature_config['enable_temporal_haptics']:
self._process_temporal_features(features)
except Exception as e:
print(f"✗ 音频特征处理失败: {e}")
def _process_spectral_features(self, features: Dict[str, Any]):
"""
处理频谱特征并生成触觉效果
Args:
features: 音频特征字典
"""
try:
band_energies = features.get('band_energies', [])
for i, energy in enumerate(band_energies):
if energy > 0.1: # 能量阈值
# 获取频带信息
band_names = list(self.band_mapping.keys())
if i < len(band_names):
band_name = band_names[i]
band_info = self.band_mapping[band_name]
# 计算触觉效果参数
intensity = energy * band_info['intensity'] * self.audio_config['intensity_scale']
frequency = band_info['haptic_freq'] * self.audio_config['frequency_scale']
duration = 0.05 + energy * 0.1 # 基于能量的持续时间
# 确定目标设备
device_type = self._map_frequency_to_device(band_info['haptic_freq'])
device_id = self._get_device_id_by_type(device_type)
# 生成触觉效果
self._generate_haptic_effect(
effect_type='periodic',
device_id=device_id,
intensity=intensity,
frequency=frequency,
duration=duration,
parameters={
'source': 'spectral',
'band': band_name,
'energy': energy
}
)
self.stats['spectral_events'] += 1
except Exception as e:
print(f"✗ 频谱特征处理失败: {e}")
def _process_volume_features(self, features: Dict[str, Any]):
"""
处理音量特征并生成触觉效果
Args:
features: 音频特征字典
"""
try:
rms = features.get('rms', 0.0)
if rms > 0.05: # 音量阈值
# 计算触觉效果参数
intensity = rms * self.audio_config['intensity_scale']
frequency = 20.0 + rms * 30.0 # 基于音量的频率
duration = 0.02 # 短持续时间以保持响应性
# 生成触觉效果
self._generate_haptic_effect(
effect_type='periodic',
device_id=None, # 所有设备
intensity=intensity,
frequency=frequency,
duration=duration,
parameters={
'source': 'volume',
'rms': rms
}
)
except Exception as e:
print(f"✗ 音量特征处理失败: {e}")
def _process_temporal_features(self, features: Dict[str, Any]):
"""
处理时间特征并生成触觉效果
Args:
features: 音频特征字典
"""
try:
temporal_flux = features.get('temporal_flux', 0.0)
if temporal_flux > 0.2: # 变化阈值
# 计算触觉效果参数
intensity = temporal_flux * self.audio_config['intensity_scale']
frequency = 40.0 + temporal_flux * 40.0
duration = 0.03 + temporal_flux * 0.07
# 生成触觉效果
self._generate_haptic_effect(
effect_type='noise',
device_id=None,
intensity=intensity,
frequency=frequency,
duration=duration,
parameters={
'source': 'temporal',
'flux': temporal_flux
}
)
except Exception as e:
print(f"✗ 时间特征处理失败: {e}")
def _detect_beat(self, features: Dict[str, Any]):
"""
检测节拍并生成触觉效果
Args:
features: 音频特征字典
"""
try:
if not self.beat_config['enable_beat_detection']:
return
# 添加到节拍历史
timestamp = features.get('timestamp', time.time())
rms = features.get('rms', 0.0)
self.audio_state['beat_history'].append({
'timestamp': timestamp,
'energy': rms
})
# 保持历史记录大小
if len(self.audio_state['beat_history']) > self.beat_config['history_size']:
self.audio_state['beat_history'].pop(0)
# 检测节拍(简化实现)
if len(self.audio_state['beat_history']) > 2:
current_energy = self.audio_state['beat_history'][-1]['energy']
previous_energy = self.audio_state['beat_history'][-2]['energy']
# 简单的能量峰值检测
if current_energy > previous_energy * (1.0 + self.beat_config['sensitivity']):
# 生成节拍触觉效果
self._generate_haptic_effect(
effect_type='impulse',
device_id=None,
intensity=current_energy * self.audio_config['intensity_scale'],
frequency=60.0,
duration=0.05,
parameters={
'source': 'beat',
'energy': current_energy
}
)
# 触发节拍检测回调
self._trigger_audio_callback('beat_detected', {
'timestamp': timestamp,
'energy': current_energy
})
self.stats['beats_detected'] += 1
except Exception as e:
print(f"✗ 节拍检测失败: {e}")
def _map_frequency_to_device(self, frequency: float) -> str:
"""
将频率映射到设备类型
Args:
frequency: 频率值
Returns:
设备类型
"""
try:
if frequency < 40.0:
return 'body'
elif frequency < 80.0:
return 'hands'
else:
return 'fingers'
except Exception as e:
print(f"✗ 频率到设备映射失败: {e}")
return 'hands'
def _get_device_id_by_type(self, device_type: str) -> Optional[int]:
"""
根据设备类型获取设备ID
Args:
device_type: 设备类型
Returns:
设备ID或None
"""
try:
if not self.plugin.device_manager:
return None
# 查找匹配类型的设备
devices = self.plugin.device_manager.get_devices_by_type(device_type)
if devices:
return devices[0]['id'] # 返回第一个匹配的设备
return None
except Exception as e:
print(f"✗ 设备ID获取失败: {e}")
return None
def _generate_haptic_effect(self, effect_type: str, device_id: Optional[int],
intensity: float, frequency: float, duration: float,
parameters: Dict[str, Any]) -> bool:
"""
生成触觉效果
Args:
effect_type: 效果类型
device_id: 设备ID
intensity: 强度
frequency: 频率
duration: 持续时间
parameters: 参数
Returns:
是否生成成功
"""
try:
if not self.plugin.haptic_manager:
return False
# 限制参数范围
intensity = max(0.0, min(1.0, intensity))
frequency = max(1.0, min(100.0, frequency))
duration = max(0.01, min(2.0, duration))
# 生成触觉效果
effect_id = self.plugin.haptic_manager.play_effect(
effect_type=effect_type,
device_id=device_id,
intensity=intensity,
duration=duration,
frequency=frequency,
parameters=parameters
)
if effect_id >= 0:
# 触发效果生成回调
self._trigger_audio_callback('haptic_effect_generated', {
'effect_id': effect_id,
'effect_type': effect_type,
'intensity': intensity,
'frequency': frequency,
'duration': duration,
'parameters': parameters
})
self.stats['haptic_effects_generated'] += 1
return True
return False
except Exception as e:
print(f"✗ 触觉效果生成失败: {e}")
return False
def _trigger_audio_callback(self, callback_type: str, data: Dict[str, Any]):
"""
触发音频回调
Args:
callback_type: 回调类型
data: 回调数据
"""
try:
if callback_type in self.audio_callbacks:
for callback in self.audio_callbacks[callback_type]:
try:
callback(data)
except Exception as e:
print(f"✗ 音频回调执行失败: {e}")
except Exception as e:
print(f"✗ 音频回调触发失败: {e}")
def register_audio_callback(self, callback_type: str, callback: callable):
"""
注册音频回调
Args:
callback_type: 回调类型
callback: 回调函数
"""
try:
if callback_type in self.audio_callbacks:
self.audio_callbacks[callback_type].append(callback)
print(f"✓ 音频回调已注册: {callback_type}")
else:
print(f"✗ 无效的回调类型: {callback_type}")
except Exception as e:
print(f"✗ 音频回调注册失败: {e}")
def unregister_audio_callback(self, callback_type: str, callback: callable):
"""
注销音频回调
Args:
callback_type: 回调类型
callback: 回调函数
"""
try:
if callback_type in self.audio_callbacks:
if callback in self.audio_callbacks[callback_type]:
self.audio_callbacks[callback_type].remove(callback)
print(f"✓ 音频回调已注销: {callback_type}")
except Exception as e:
print(f"✗ 音频回调注销失败: {e}")
def set_audio_config(self, config: Dict[str, Any]) -> bool:
"""
设置音频配置
Args:
config: 配置字典
Returns:
是否设置成功
"""
try:
self.audio_config.update(config)
print(f"✓ 音频配置已更新: {self.audio_config}")
return True
except Exception as e:
print(f"✗ 音频配置设置失败: {e}")
return False
def set_band_mapping(self, mapping: Dict[str, Dict[str, Any]]) -> bool:
"""
设置频带映射
Args:
mapping: 映射字典
Returns:
是否设置成功
"""
try:
self.band_mapping.update(mapping)
print(f"✓ 频带映射已更新")
return True
except Exception as e:
print(f"✗ 频带映射设置失败: {e}")
return False
def set_beat_config(self, config: Dict[str, Any]) -> bool:
"""
设置节拍检测配置
Args:
config: 配置字典
Returns:
是否设置成功
"""
try:
self.beat_config.update(config)
print(f"✓ 节拍检测配置已更新: {self.beat_config}")
return True
except Exception as e:
print(f"✗ 节拍检测配置设置失败: {e}")
return False
def get_audio_config(self) -> Dict[str, Any]:
"""
获取音频配置
Returns:
配置字典
"""
return self.audio_config.copy()
def get_band_mapping(self) -> Dict[str, Dict[str, Any]]:
"""
获取频带映射
Returns:
映射字典
"""
return self.band_mapping.copy()
def get_beat_config(self) -> Dict[str, Any]:
"""
获取节拍检测配置
Returns:
配置字典
"""
return self.beat_config.copy()
def get_stats(self) -> Dict[str, int]:
"""
获取统计信息
Returns:
统计信息字典
"""
return self.stats.copy()
def reset_stats(self):
"""重置统计信息"""
try:
self.stats = {
'audio_frames_analyzed': 0,
'haptic_effects_generated': 0,
'beats_detected': 0,
'spectral_events': 0
}
print("✓ 音频触觉反馈统计信息已重置")
except Exception as e:
print(f"✗ 音频触觉反馈统计信息重置失败: {e}")