EG/plugins/user/dynamic_music_system/effects/effect_processor.py
2025-12-12 16:16:15 +08:00

941 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
效果处理器
负责为动态音乐添加各种音频效果
"""
import uuid
import math
import time
import numpy as np
from typing import Dict, List, Any, Optional
class EffectProcessor:
"""
效果处理器
负责为动态音乐添加各种音频效果
"""
def __init__(self, plugin):
"""
初始化效果处理器
Args:
plugin: 动态音乐系统插件实例
"""
self.plugin = plugin
self.effects: Dict[str, Dict[str, Any]] = {}
self.active_effects: Dict[str, Dict[str, Any]] = {}
self.effect_chains: Dict[str, List[str]] = {}
self.stats = {
'effects_created': 0,
'effects_active': 0,
'effects_processed': 0,
'processing_time': 0.0,
'memory_usage': 0
}
self.sample_rate = 44100
self.buffer_size = plugin.buffer_size if plugin else 4096
# 效果参数范围
self.effect_ranges = {
'reverb': {'room_size': (0.0, 1.0), 'damping': (0.0, 1.0), 'wet_level': (0.0, 1.0)},
'chorus': {'rate': (0.1, 5.0), 'depth': (0.0, 1.0), 'mix': (0.0, 1.0)},
'delay': {'time': (0.01, 2.0), 'feedback': (0.0, 1.0), 'mix': (0.0, 1.0)},
'filter': {'frequency': (20.0, 20000.0), 'q': (0.1, 10.0), 'gain': (-24.0, 24.0)},
'distortion': {'drive': (0.0, 1.0), 'tone': (0.0, 1.0), 'mix': (0.0, 1.0)},
'phaser': {'rate': (0.1, 10.0), 'depth': (0.0, 1.0), 'feedback': (-0.9, 0.9)},
'flanger': {'rate': (0.1, 10.0), 'depth': (0.0, 0.005), 'feedback': (-0.9, 0.9)},
'compressor': {'threshold': (-60.0, 0.0), 'ratio': (1.0, 20.0), 'attack': (0.001, 0.1)},
'eq': {'frequency': (20.0, 20000.0), 'gain': (-24.0, 24.0), 'q': (0.1, 10.0)}
}
# 初始化效果处理器
self._initialize_effect_processor()
def _initialize_effect_processor(self):
"""初始化效果处理器"""
print("✓ 效果处理器初始化完成")
def create_effect(self, effect_name: str, effect_type: str, effect_params: Dict[str, Any]) -> bool:
"""
创建音频效果
Args:
effect_name: 效果名称
effect_type: 效果类型
effect_params: 效果参数
Returns:
是否创建成功
"""
try:
# 验证效果类型
valid_effects = [
'reverb', 'chorus', 'delay', 'filter', 'distortion',
'phaser', 'flanger', 'compressor', 'eq', 'limiter',
'tremolo', 'vibrato', 'pitch_shift', 'stereo_widen'
]
if effect_type not in valid_effects:
print(f"✗ 无效的效果类型: {effect_type}")
return False
# 设置默认参数
default_params = self._get_default_effect_params(effect_type)
# 合并参数
params = default_params.copy()
params.update(effect_params)
# 验证参数范围
if not self._validate_effect_params(effect_type, params):
print(f"✗ 效果参数超出有效范围: {effect_type}")
return False
# 创建效果
effect = {
'name': effect_name,
'type': effect_type,
'params': params,
'state': 'created',
'bypass': False,
'wet_dry_mix': params.get('mix', 1.0),
'created_time': time.time()
}
self.effects[effect_name] = effect
self.stats['effects_created'] += 1
print(f"✓ 音频效果创建成功: {effect_name} ({effect_type})")
return True
except Exception as e:
print(f"✗ 创建音频效果失败: {e}")
return False
def _get_default_effect_params(self, effect_type: str) -> Dict[str, Any]:
"""
获取默认效果参数
Args:
effect_type: 效果类型
Returns:
默认参数字典
"""
defaults = {
'reverb': {
'room_size': 0.5,
'damping': 0.5,
'wet_level': 0.3,
'dry_level': 0.7,
'width': 1.0,
'freeze_mode': 0.0
},
'chorus': {
'rate': 1.0,
'depth': 0.5,
'feedback': 0.3,
'mix': 0.5,
'delay': 0.01
},
'delay': {
'time': 0.3,
'feedback': 0.4,
'mix': 0.5,
'sync': False,
'sync_note': '1/4'
},
'filter': {
'type': 'lowpass',
'frequency': 1000.0,
'q': 1.0,
'gain': 0.0
},
'distortion': {
'drive': 0.5,
'tone': 0.5,
'mix': 0.5
},
'phaser': {
'rate': 1.0,
'depth': 0.5,
'feedback': 0.3,
'mix': 0.5
},
'flanger': {
'rate': 1.0,
'depth': 0.002,
'feedback': 0.3,
'mix': 0.5
},
'compressor': {
'threshold': -20.0,
'ratio': 4.0,
'attack': 0.01,
'release': 0.1,
'gain': 0.0,
'knee': 5.0
},
'eq': {
'type': 'peaking',
'frequency': 1000.0,
'gain': 0.0,
'q': 1.0
},
'limiter': {
'threshold': -0.1,
'release': 0.05,
'mix': 1.0
},
'tremolo': {
'rate': 5.0,
'depth': 0.5,
'mix': 1.0,
'waveform': 'sine'
},
'vibrato': {
'rate': 5.0,
'depth': 0.5,
'mix': 1.0,
'delay': 0.01
},
'pitch_shift': {
'semitones': 0.0,
'mix': 1.0,
'formant_preserve': True
},
'stereo_widen': {
'width': 1.0,
'mix': 1.0
}
}
return defaults.get(effect_type, {})
def _validate_effect_params(self, effect_type: str, params: Dict[str, Any]) -> bool:
"""
验证效果参数范围
Args:
effect_type: 效果类型
params: 参数字典
Returns:
是否验证通过
"""
if effect_type not in self.effect_ranges:
return True # 没有定义范围的效果默认通过验证
ranges = self.effect_ranges[effect_type]
for param_name, (min_val, max_val) in ranges.items():
if param_name in params:
param_value = params[param_name]
if not (min_val <= param_value <= max_val):
print(f"✗ 参数 {param_name} 超出范围 [{min_val}, {max_val}]: {param_value}")
return False
return True
def activate_effect(self, effect_name: str) -> bool:
"""
激活音频效果
Args:
effect_name: 效果名称
Returns:
是否激活成功
"""
try:
if effect_name not in self.effects:
print(f"✗ 音频效果不存在: {effect_name}")
return False
self.active_effects[effect_name] = self.effects[effect_name]
self.stats['effects_active'] = len(self.active_effects)
print(f"✓ 音频效果已激活: {effect_name}")
return True
except Exception as e:
print(f"✗ 激活音频效果失败: {e}")
return False
def deactivate_effect(self, effect_name: str) -> bool:
"""
停用音频效果
Args:
effect_name: 效果名称
Returns:
是否停用成功
"""
try:
if effect_name not in self.active_effects:
print(f"⚠ 音频效果未激活: {effect_name}")
return True
del self.active_effects[effect_name]
self.stats['effects_active'] = len(self.active_effects)
print(f"✓ 音频效果已停用: {effect_name}")
return True
except Exception as e:
print(f"✗ 停用音频效果失败: {e}")
return False
def apply_effects(self, audio_data: Any, chain_name: str = 'default') -> Any:
"""
应用效果链到音频数据
Args:
audio_data: 输入音频数据
chain_name: 效果链名称
Returns:
处理后的音频数据
"""
try:
process_start_time = time.time()
# 获取效果链
effect_chain = self.effect_chains.get(chain_name, [])
if not effect_chain:
# 如果没有指定效果链,使用所有激活的效果
effect_chain = list(self.active_effects.keys())
# 依次应用效果
processed_data = audio_data
effects_applied = 0
for effect_name in effect_chain:
if effect_name in self.active_effects:
effect = self.active_effects[effect_name]
if not effect.get('bypass', False):
processed_data = self._apply_single_effect(processed_data, effect)
effects_applied += 1
# 更新统计信息
self.stats['effects_processed'] += effects_applied
self.stats['processing_time'] += (time.time() - process_start_time)
return processed_data
except Exception as e:
print(f"✗ 应用效果链失败: {e}")
return audio_data
def _apply_single_effect(self, audio_data: Any, effect: Dict[str, Any]) -> Any:
"""
应用单个效果到音频数据
Args:
audio_data: 输入音频数据
effect: 效果数据
Returns:
处理后的音频数据
"""
try:
effect_type = effect['type']
params = effect['params']
wet_dry_mix = effect.get('wet_dry_mix', 1.0)
# 根据效果类型应用处理
if effect_type == 'reverb':
processed_data = self._apply_reverb(audio_data, params)
elif effect_type == 'chorus':
processed_data = self._apply_chorus(audio_data, params)
elif effect_type == 'delay':
processed_data = self._apply_delay(audio_data, params)
elif effect_type == 'filter':
processed_data = self._apply_filter(audio_data, params)
elif effect_type == 'distortion':
processed_data = self._apply_distortion(audio_data, params)
elif effect_type == 'phaser':
processed_data = self._apply_phaser(audio_data, params)
elif effect_type == 'flanger':
processed_data = self._apply_flanger(audio_data, params)
elif effect_type == 'compressor':
processed_data = self._apply_compressor(audio_data, params)
elif effect_type == 'eq':
processed_data = self._apply_eq(audio_data, params)
else:
# 未知效果类型,不处理
processed_data = audio_data
# 应用湿/干混合
if wet_dry_mix < 1.0:
processed_data = audio_data * (1.0 - wet_dry_mix) + processed_data * wet_dry_mix
return processed_data
except Exception as e:
print(f"✗ 应用单个效果失败: {e}")
return audio_data
def _apply_reverb(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用混响效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
# 简化的混响实现
# 实际应用中会使用更复杂的算法
room_size = params.get('room_size', 0.5)
damping = params.get('damping', 0.5)
wet_level = params.get('wet_level', 0.3)
# 模拟混响效果
reverb_amount = wet_level * room_size
damp_factor = 1.0 - damping * 0.5
if isinstance(audio_data, np.ndarray):
# 对NumPy数组应用简化效果
processed = audio_data.copy()
# 添加延迟和衰减效果
for i in range(1, min(100, len(processed))):
processed[i:] += processed[:-i] * reverb_amount * damp_factor * (0.95 ** i)
return np.clip(processed, -1.0, 1.0)
else:
return audio_data
def _apply_chorus(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用合唱效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
# 简化的合唱实现
rate = params.get('rate', 1.0)
depth = params.get('depth', 0.5)
if isinstance(audio_data, np.ndarray):
processed = audio_data.copy()
# 添加基于正弦波的调制
t = np.arange(len(processed)) / self.sample_rate
modulation = np.sin(2 * np.pi * rate * t) * depth
# 简化的延迟效果
delay_samples = (modulation * self.sample_rate * 0.01).astype(int)
for i in range(len(processed)):
delay_idx = max(0, i - abs(delay_samples[i]))
processed[i] += processed[delay_idx] * 0.3
return np.clip(processed, -1.0, 1.0)
else:
return audio_data
def _apply_delay(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用延迟效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
delay_time = params.get('time', 0.3)
feedback = params.get('feedback', 0.4)
if isinstance(audio_data, np.ndarray):
processed = audio_data.copy()
delay_samples = int(delay_time * self.sample_rate)
if delay_samples > 0:
for i in range(delay_samples, len(processed)):
processed[i] += processed[i - delay_samples] * feedback
return np.clip(processed, -1.0, 1.0)
else:
return audio_data
def _apply_filter(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用滤波器效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
filter_type = params.get('type', 'lowpass')
frequency = params.get('frequency', 1000.0)
if isinstance(audio_data, np.ndarray):
processed = audio_data.copy()
# 简化的滤波器实现
if filter_type == 'lowpass':
# 简单的低通滤波
rc = 1.0 / (2 * np.pi * frequency)
dt = 1.0 / self.sample_rate
alpha = dt / (rc + dt)
for i in range(1, len(processed)):
processed[i] = alpha * processed[i] + (1 - alpha) * processed[i-1]
elif filter_type == 'highpass':
# 简单的高通滤波
rc = 1.0 / (2 * np.pi * frequency)
dt = 1.0 / self.sample_rate
alpha = rc / (rc + dt)
for i in range(1, len(processed)):
processed[i] = alpha * (processed[i-1] + processed[i] - processed[i-1])
return processed
else:
return audio_data
def _apply_distortion(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用失真效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
drive = params.get('drive', 0.5)
if isinstance(audio_data, np.ndarray):
processed = audio_data.copy()
# 应用饱和失真
drive_factor = 1.0 + drive * 10.0
processed = np.tanh(processed * drive_factor)
return processed
else:
return audio_data
def _apply_phaser(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用移相器效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
rate = params.get('rate', 1.0)
depth = params.get('depth', 0.5)
if isinstance(audio_data, np.ndarray):
processed = audio_data.copy()
# 简化的移相器实现
t = np.arange(len(processed)) / self.sample_rate
lfo = np.sin(2 * np.pi * rate * t) * depth
# 应用相位调制
processed += processed * lfo * 0.5
return np.clip(processed, -1.0, 1.0)
else:
return audio_data
def _apply_flanger(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用镶边效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
rate = params.get('rate', 1.0)
depth = params.get('depth', 0.002)
if isinstance(audio_data, np.ndarray):
processed = audio_data.copy()
# 简化的镶边实现
t = np.arange(len(processed)) / self.sample_rate
delay = np.sin(2 * np.pi * rate * t) * depth
delay_samples = (delay * self.sample_rate).astype(int)
for i in range(len(processed)):
delay_idx = max(0, i - abs(delay_samples[i]))
processed[i] += processed[delay_idx] * 0.7
return np.clip(processed, -1.0, 1.0)
else:
return audio_data
def _apply_compressor(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用压缩器效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
threshold = params.get('threshold', -20.0)
ratio = params.get('ratio', 4.0)
if isinstance(audio_data, np.ndarray):
processed = audio_data.copy()
# 简化的压缩器实现
threshold_linear = 10 ** (threshold / 20.0)
# 对超过阈值的信号进行压缩
abs_signal = np.abs(processed)
compressed = np.where(
abs_signal > threshold_linear,
np.sign(processed) * (threshold_linear + (abs_signal - threshold_linear) / ratio),
processed
)
return compressed
else:
return audio_data
def _apply_eq(self, audio_data: Any, params: Dict[str, Any]) -> Any:
"""
应用均衡器效果
Args:
audio_data: 输入音频数据
params: 效果参数
Returns:
处理后的音频数据
"""
frequency = params.get('frequency', 1000.0)
gain = params.get('gain', 0.0)
if isinstance(audio_data, np.ndarray):
processed = audio_data.copy()
# 简化的均衡器实现
if gain != 0.0:
gain_factor = 10 ** (gain / 20.0)
# 根据频率调整增益(简化实现)
if frequency < 500:
# 低频增强/衰减
processed *= 0.8 + 0.2 * gain_factor
elif frequency < 2000:
# 中频增强/衰减
processed *= gain_factor
else:
# 高频增强/衰减
processed *= 1.0 + 0.1 * (gain_factor - 1.0)
return processed
else:
return audio_data
def create_effect_chain(self, chain_name: str, effect_names: List[str]) -> bool:
"""
创建效果链
Args:
chain_name: 效果链名称
effect_names: 效果名称列表
Returns:
是否创建成功
"""
try:
# 验证效果是否存在
for effect_name in effect_names:
if effect_name not in self.effects:
print(f"✗ 效果不存在: {effect_name}")
return False
# 创建效果链
self.effect_chains[chain_name] = effect_names.copy()
print(f"✓ 效果链创建成功: {chain_name}")
return True
except Exception as e:
print(f"✗ 创建效果链失败: {e}")
return False
def update(self, dt: float):
"""
更新效果处理器状态
Args:
dt: 时间增量
"""
# 这里可以更新任何需要定期更新的状态
# 例如LFO相位更新、动态参数调整等
pass
def cleanup(self):
"""清理所有资源"""
self.effects.clear()
self.active_effects.clear()
self.effect_chains.clear()
self.stats = {
'effects_created': 0,
'effects_active': 0,
'effects_processed': 0,
'processing_time': 0.0,
'memory_usage': 0
}
print("✓ 效果处理器资源已清理")
def get_stats(self) -> Dict[str, int]:
"""
获取统计信息
Returns:
统计信息字典
"""
return self.stats.copy()
def get_active_effects(self) -> List[str]:
"""
获取激活的效果列表
Returns:
激活的效果名称列表
"""
return list(self.active_effects.keys())
def get_effect_status(self, effect_name: str) -> Dict[str, Any]:
"""
获取效果状态
Args:
effect_name: 效果名称
Returns:
效果状态字典
"""
if effect_name in self.active_effects:
effect = self.active_effects[effect_name]
return {
'name': effect_name,
'type': effect['type'],
'state': 'active',
'bypass': effect.get('bypass', False),
'wet_dry_mix': effect.get('wet_dry_mix', 1.0)
}
elif effect_name in self.effects:
effect = self.effects[effect_name]
return {
'name': effect_name,
'type': effect['type'],
'state': 'inactive',
'bypass': effect.get('bypass', False),
'wet_dry_mix': effect.get('wet_dry_mix', 1.0)
}
else:
return {
'name': effect_name,
'state': 'unknown'
}
def bypass_effect(self, effect_name: str, bypass: bool = True) -> bool:
"""
旁路/启用效果
Args:
effect_name: 效果名称
bypass: 是否旁路
Returns:
是否设置成功
"""
try:
if effect_name not in self.effects:
print(f"✗ 效果不存在: {effect_name}")
return False
self.effects[effect_name]['bypass'] = bypass
if effect_name in self.active_effects:
self.active_effects[effect_name]['bypass'] = bypass
state = "旁路" if bypass else "启用"
print(f"✓ 效果已{state}: {effect_name}")
return True
except Exception as e:
print(f"✗ 设置效果旁路状态失败: {e}")
return False
def set_effect_parameter(self, effect_name: str, parameter: str, value: Any) -> bool:
"""
设置效果参数
Args:
effect_name: 效果名称
parameter: 参数名称
value: 参数值
Returns:
是否设置成功
"""
try:
if effect_name not in self.effects:
print(f"✗ 效果不存在: {effect_name}")
return False
# 验证参数范围
effect = self.effects[effect_name]
effect_type = effect['type']
if effect_type in self.effect_ranges and parameter in self.effect_ranges[effect_type]:
min_val, max_val = self.effect_ranges[effect_type][parameter]
if not (min_val <= value <= max_val):
print(f"✗ 参数 {parameter} 超出范围 [{min_val}, {max_val}]: {value}")
return False
# 设置参数
self.effects[effect_name]['params'][parameter] = value
if effect_name in self.active_effects:
self.active_effects[effect_name]['params'][parameter] = value
print(f"✓ 效果参数已设置: {effect_name}.{parameter} = {value}")
return True
except Exception as e:
print(f"✗ 设置效果参数失败: {e}")
return False
def get_effect_parameters(self, effect_name: str) -> Dict[str, Any]:
"""
获取效果参数
Args:
effect_name: 效果名称
Returns:
参数字典
"""
if effect_name in self.effects:
return self.effects[effect_name]['params'].copy()
return {}
def set_wet_dry_mix(self, effect_name: str, mix: float) -> bool:
"""
设置湿/干混合比例
Args:
effect_name: 效果名称
mix: 混合比例 (0.0-1.0)
Returns:
是否设置成功
"""
mix = max(0.0, min(1.0, mix))
return self.set_effect_parameter(effect_name, 'mix', mix)
def set_sample_rate(self, sample_rate: int):
"""
设置采样率
Args:
sample_rate: 采样率 (Hz)
"""
self.sample_rate = sample_rate
print(f"✓ 效果处理器采样率设置为: {self.sample_rate} Hz")
def create_modulation_effect(self, effect_name: str, effect_type: str,
mod_rate: float, mod_depth: float) -> bool:
"""
创建调制效果
Args:
effect_name: 效果名称
effect_type: 效果类型 (chorus, flanger, phaser, tremolo, vibrato)
mod_rate: 调制速率 (Hz)
mod_depth: 调制深度 (0.0-1.0)
Returns:
是否创建成功
"""
try:
# 验证调制效果类型
modulation_effects = ['chorus', 'flanger', 'phaser', 'tremolo', 'vibrato']
if effect_type not in modulation_effects:
print(f"✗ 无效的调制效果类型: {effect_type}")
return False
# 创建效果参数
effect_params = {
'rate': mod_rate,
'depth': mod_depth,
'mix': 0.5
}
return self.create_effect(effect_name, effect_type, effect_params)
except Exception as e:
print(f"✗ 创建调制效果失败: {e}")
return False
def create_dynamic_effect(self, effect_name: str, effect_type: str,
intensity_callback: callable) -> bool:
"""
创建动态效果(参数随游戏状态变化)
Args:
effect_name: 效果名称
effect_type: 效果类型
intensity_callback: 强度回调函数
Returns:
是否创建成功
"""
try:
# 创建效果
if self.create_effect(effect_name, effect_type, {}):
# 添加动态参数支持
self.effects[effect_name]['dynamic'] = True
self.effects[effect_name]['intensity_callback'] = intensity_callback
return True
return False
except Exception as e:
print(f"✗ 创建动态效果失败: {e}")
return False
def update_dynamic_effects(self):
"""
更新动态效果参数
"""
try:
for effect_name, effect in self.active_effects.items():
if effect.get('dynamic', False) and 'intensity_callback' in effect:
try:
# 获取当前强度
intensity = effect['intensity_callback']()
# 根据强度调整效果参数
effect_type = effect['type']
params = effect['params']
if effect_type == 'reverb':
params['room_size'] = intensity
params['wet_level'] = intensity * 0.5
elif effect_type == 'chorus':
params['rate'] = 0.5 + intensity * 4.5
params['depth'] = intensity
elif effect_type == 'delay':
params['time'] = 0.1 + intensity * 0.9
params['feedback'] = intensity * 0.8
# 更新效果参数
self.active_effects[effect_name]['params'] = params
except Exception as e:
print(f"✗ 更新动态效果参数失败: {effect_name} - {e}")
except Exception as e:
print(f"✗ 更新动态效果失败: {e}")