""" 分析引擎 负责分析游戏状态和环境条件,为音乐系统提供决策支持 """ import uuid import math import time import numpy as np from typing import Dict, List, Any, Optional, Callable from collections import deque class AnalysisEngine: """ 分析引擎 负责分析游戏状态和环境条件,为音乐系统提供决策支持 """ def __init__(self, plugin): """ 初始化分析引擎 Args: plugin: 动态音乐系统插件实例 """ self.plugin = plugin self.analyzers: Dict[str, Dict[str, Any]] = {} self.analysis_results: Dict[str, Any] = {} self.history: Dict[str, deque] = {} self.stats = { 'analyzers_created': 0, 'analyses_performed': 0, 'analysis_time': 0.0, 'data_points_processed': 0, 'prediction_accuracy': 0.0 } self.buffer_size = plugin.buffer_size if plugin else 4096 self.analysis_interval = 1.0 # 默认分析间隔(秒) self.last_analysis_time = 0.0 self.last_update_time = 0.0 # 分析权重 self.weights = { 'combat': 1.0, 'exploration': 0.8, 'environment': 0.6, 'player_state': 0.7, 'narrative': 0.5 } # 历史数据长度 self.history_length = 100 # 预测参数 self.prediction_settings = { 'lookahead_time': 5.0, # 预测时间窗口 'confidence_threshold': 0.7, # 置信度阈值 'trend_sensitivity': 0.3, # 趋势敏感度 'prediction_smoothing': 0.1 # 预测平滑系数 } # 情绪分析参数 self.emotion_analysis = { 'valence_range': (-1.0, 1.0), # 情绪效价范围 'arousal_range': (0.0, 1.0), # 情绪唤醒度范围 'tension_range': (0.0, 1.0), # 紧张度范围 'complexity_range': (0.0, 1.0) # 复杂度范围 } # 初始化分析引擎 self._initialize_analysis_engine() def _initialize_analysis_engine(self): """初始化分析引擎""" # 初始化历史记录 history_keys = [ 'intensity', 'combat_level', 'exploration_level', 'environment_changes', 'player_emotion', 'narrative_pacing', 'music_intensity', 'player_health', 'player_stamina' ] for key in history_keys: self.history[key] = deque(maxlen=self.history_length) # 创建默认分析器 self._create_default_analyzers() print("✓ 分析引擎初始化完成") def _create_default_analyzers(self): """创建默认分析器""" # 战斗强度分析器 self.create_analyzer('combat_analyzer', { 'type': 'combat', 'callback': self._analyze_combat_state }) # 探索状态分析器 self.create_analyzer('exploration_analyzer', { 'type': 'exploration', 'callback': self._analyze_exploration_state }) # 环境条件分析器 self.create_analyzer('environment_analyzer', { 'type': 'environment', 'callback': self._analyze_environment_conditions }) # 玩家状态分析器 self.create_analyzer('player_analyzer', { 'type': 'player_state', 'callback': self._analyze_player_state }) # 叙事节奏分析器 self.create_analyzer('narrative_analyzer', { 'type': 'narrative', 'callback': self._analyze_narrative_pacing }) # 音乐反馈分析器 self.create_analyzer('music_feedback_analyzer', { 'type': 'music_feedback', 'callback': self._analyze_music_feedback }) def create_analyzer(self, analyzer_name: str, analyzer_params: Dict[str, Any]) -> bool: """ 创建分析器 Args: analyzer_name: 分析器名称 analyzer_params: 分析器参数 Returns: 是否创建成功 """ try: # 设置默认参数 default_params = { 'type': 'generic', 'enabled': True, 'interval': self.analysis_interval, 'weight': 1.0, 'history_size': 50, 'sensitivity': 1.0, 'smoothing': 0.2, 'thresholds': { 'low': 0.3, 'medium': 0.6, 'high': 0.8 } } # 合并参数 params = default_params.copy() params.update(analyzer_params) # 验证回调函数 if 'callback' not in params or not callable(params['callback']): print(f"✗ 分析器缺少有效的回调函数: {analyzer_name}") return False # 创建分析器 analyzer = { 'name': analyzer_name, 'params': params, 'state': 'created', 'last_analysis': 0.0, 'results': {}, 'history': deque(maxlen=params['history_size']), 'trend': 0.0, # 趋势值 'volatility': 0.0, # 波动性 'confidence': 1.0 # 置信度 } self.analyzers[analyzer_name] = analyzer self.stats['analyzers_created'] += 1 print(f"✓ 分析器创建成功: {analyzer_name}") return True except Exception as e: print(f"✗ 创建分析器失败: {e}") import traceback traceback.print_exc() return False def enable_analyzer(self, analyzer_name: str, enable: bool = True) -> bool: """ 启用/禁用分析器 Args: analyzer_name: 分析器名称 enable: 是否启用 Returns: 是否设置成功 """ try: if analyzer_name not in self.analyzers: print(f"✗ 分析器不存在: {analyzer_name}") return False self.analyzers[analyzer_name]['params']['enabled'] = enable state = "启用" if enable else "禁用" print(f"✓ 分析器已{state}: {analyzer_name}") return True except Exception as e: print(f"✗ 设置分析器状态失败: {e}") import traceback traceback.print_exc() return False def perform_analysis(self) -> Dict[str, Any]: """ 执行分析 Returns: 分析结果 """ try: analysis_start_time = time.time() current_time = time.time() # 收集所有分析器的结果 combined_results = {} total_weight = 0.0 weighted_intensity = 0.0 analyzer_contributions = {} for analyzer_name, analyzer in self.analyzers.items(): params = analyzer['params'] # 检查分析器是否启用 if not params.get('enabled', True): continue # 检查分析间隔 interval = params.get('interval', self.analysis_interval) if current_time - analyzer.get('last_analysis', 0.0) < interval: # 使用之前的分析结果 if analyzer['results']: result = analyzer['results'] else: continue else: # 执行新的分析 try: callback = params['callback'] result = callback() analyzer['results'] = result analyzer['last_analysis'] = current_time # 更新历史记录 analyzer['history'].append(result) # 计算趋势和波动性 self._update_analyzer_trends(analyzer) self.stats['analyses_performed'] += 1 except Exception as e: print(f"✗ 分析器 {analyzer_name} 执行失败: {e}") import traceback traceback.print_exc() continue # 合并结果 analyzer_type = params.get('type', 'generic') weight = params.get('weight', 1.0) * self.weights.get(analyzer_type, 1.0) # 应用灵敏度调整 sensitivity = params.get('sensitivity', 1.0) if isinstance(result, dict): adjusted_result = {} for key, value in result.items(): if isinstance(value, (int, float)): adjusted_result[key] = value * sensitivity else: adjusted_result[key] = value result = adjusted_result elif isinstance(result, (int, float)): result = result * sensitivity if isinstance(result, dict): for key, value in result.items(): if key not in combined_results: combined_results[key] = [] combined_results[key].append((value, weight)) else: # 处理单一值结果 if 'generic' not in combined_results: combined_results['generic'] = [] combined_results['generic'].append((result, weight)) # 计算加权强度 if 'intensity' in result: if isinstance(result['intensity'], (int, float)): weighted_intensity += result['intensity'] * weight total_weight += weight analyzer_contributions[analyzer_name] = result['intensity'] * weight # 更新分析器置信度 self._update_analyzer_confidence(analyzer, result) # 计算平均强度 if total_weight > 0: average_intensity = weighted_intensity / total_weight else: average_intensity = 0.0 # 应用平滑处理 smoothing = 0.1 if 'intensity' in self.analysis_results: previous_intensity = self.analysis_results['intensity'] average_intensity = previous_intensity * smoothing + average_intensity * (1 - smoothing) # 整合分析结果 final_results = { 'timestamp': current_time, 'intensity': average_intensity, 'raw_intensity': weighted_intensity / total_weight if total_weight > 0 else 0.0, 'details': combined_results, 'confidence': min(1.0, len(combined_results) / max(1, len(self.analyzers))), 'analyzer_contributions': analyzer_contributions, 'trends': self._get_overall_trends(), 'predictions': self._make_predictions(average_intensity) } # 更新历史记录 self.history['intensity'].append(average_intensity) if 'combat_level' in combined_results: combat_values = [r[0] for r in combined_results.get('combat_level', [])] if combat_values: self.history['combat_level'].append(np.mean(combat_values)) if 'exploration_level' in combined_results: exploration_values = [r[0] for r in combined_results.get('exploration_level', [])] if exploration_values: self.history['exploration_level'].append(np.mean(exploration_values)) # 存储结果 self.analysis_results = final_results # 更新性能统计 self.stats['analysis_time'] += (time.time() - analysis_start_time) self.stats['data_points_processed'] += len(combined_results) return final_results except Exception as e: print(f"✗ 执行分析失败: {e}") import traceback traceback.print_exc() return {} def _update_analyzer_trends(self, analyzer: Dict[str, Any]): """ 更新分析器趋势 Args: analyzer: 分析器数据 """ try: if len(analyzer['history']) < 2: return # 计算强度趋势 intensity_values = [] for result in analyzer['history']: if isinstance(result, dict) and 'intensity' in result: intensity_values.append(result['intensity']) if len(intensity_values) >= 2: # 线性回归计算趋势 x = np.arange(len(intensity_values)) y = np.array(intensity_values) if len(x) > 1 and np.std(x) > 0 and np.std(y) > 0: slope, _ = np.polyfit(x, y, 1) analyzer['trend'] = slope # 计算波动性 analyzer['volatility'] = np.std(y) except Exception as e: print(f"✗ 更新分析器趋势失败: {e}") def _update_analyzer_confidence(self, analyzer: Dict[str, Any], result: Any): """ 更新分析器置信度 Args: analyzer: 分析器数据 result: 分析结果 """ try: # 基于历史一致性更新置信度 if len(analyzer['history']) >= 3: recent_values = [] for hist_result in list(analyzer['history'])[-3:]: if isinstance(hist_result, dict) and 'intensity' in hist_result: recent_values.append(hist_result['intensity']) if len(recent_values) >= 2: # 计算变化率 changes = [abs(recent_values[i] - recent_values[i-1]) for i in range(1, len(recent_values))] avg_change = np.mean(changes) # 基于稳定性调整置信度 stability = 1.0 / (1.0 + avg_change * 10) analyzer['confidence'] = 0.7 * analyzer['confidence'] + 0.3 * stability # 确保置信度在合理范围内 analyzer['confidence'] = max(0.1, min(1.0, analyzer['confidence'])) except Exception as e: print(f"✗ 更新分析器置信度失败: {e}") def _get_overall_trends(self) -> Dict[str, float]: """ 获取整体趋势 Returns: 趋势字典 """ try: trends = {} # 计算整体强度趋势 if len(self.history['intensity']) >= 5: recent_values = list(self.history['intensity'])[-5:] x = np.arange(len(recent_values)) y = np.array(recent_values) if len(x) > 1 and np.std(x) > 0 and np.std(y) > 0: slope, _ = np.polyfit(x, y, 1) trends['intensity'] = slope # 计算战斗趋势 if len(self.history['combat_level']) >= 5: recent_values = list(self.history['combat_level'])[-5:] x = np.arange(len(recent_values)) y = np.array(recent_values) if len(x) > 1 and np.std(x) > 0 and np.std(y) > 0: slope, _ = np.polyfit(x, y, 1) trends['combat'] = slope # 计算探索趋势 if len(self.history['exploration_level']) >= 5: recent_values = list(self.history['exploration_level'])[-5:] x = np.arange(len(recent_values)) y = np.array(recent_values) if len(x) > 1 and np.std(x) > 0 and np.std(y) > 0: slope, _ = np.polyfit(x, y, 1) trends['exploration'] = slope return trends except Exception as e: print(f"✗ 获取整体趋势失败: {e}") return {} def _make_predictions(self, current_intensity: float) -> Dict[str, Any]: """ 做出预测 Args: current_intensity: 当前强度 Returns: 预测结果 """ try: predictions = {} lookahead_time = self.prediction_settings['lookahead_time'] # 基于趋势预测未来强度 overall_trend = self._get_overall_trends().get('intensity', 0.0) predicted_intensity = current_intensity + overall_trend * lookahead_time # 限制在合理范围内 predicted_intensity = max(0.0, min(1.0, predicted_intensity)) # 计算预测置信度 trend_magnitude = abs(overall_trend) confidence = min(1.0, trend_magnitude * 10) # 趋势越大,置信度越高 predictions['intensity'] = { 'value': predicted_intensity, 'confidence': confidence, 'time_horizon': lookahead_time } # 基于历史模式预测事件 predictions['events'] = self._predict_events() return predictions except Exception as e: print(f"✗ 做出预测失败: {e}") return {} def _predict_events(self) -> List[Dict[str, Any]]: """ 预测事件 Returns: 预测事件列表 """ try: events = [] # 检查战斗强度是否即将上升 if len(self.history['combat_level']) >= 10: recent_combat = list(self.history['combat_level'])[-10:] trend = np.polyfit(np.arange(len(recent_combat)), np.array(recent_combat), 1)[0] if trend > 0.1 and recent_combat[-1] > 0.3: # 上升趋势且当前强度较高 events.append({ 'type': 'combat_escalation', 'probability': min(1.0, trend * 5), 'time_to_event': max(1.0, 10 - recent_combat[-1] * 10) }) # 检查探索活动是否减少 if len(self.history['exploration_level']) >= 10: recent_exploration = list(self.history['exploration_level'])[-10:] trend = np.polyfit(np.arange(len(recent_exploration)), np.array(recent_exploration), 1)[0] if trend < -0.1 and recent_exploration[-1] < 0.5: # 下降趋势且当前强度较低 events.append({ 'type': 'exploration_decrease', 'probability': min(1.0, abs(trend) * 5), 'time_to_event': max(1.0, recent_exploration[-1] * 10) }) return events except Exception as e: print(f"✗ 预测事件失败: {e}") return [] def _analyze_combat_state(self) -> Dict[str, Any]: """ 分析战斗状态 Returns: 战斗状态分析结果 """ try: # 获取游戏状态 game_state = self.plugin.game_state if self.plugin else {} # 分析战斗相关指标 in_combat = game_state.get('in_combat', False) enemies_nearby = game_state.get('enemies_nearby', 0) player_health = game_state.get('player_health', 1.0) combat_intensity = game_state.get('combat_intensity', 0.0) player_damage = game_state.get('player_damage', 0.0) enemy_damage = game_state.get('enemy_damage', 0.0) combat_duration = game_state.get('combat_duration', 0.0) # 计算战斗强度 combat_level = 0.0 if in_combat: # 基于敌人数量和强度 enemy_factor = min(1.0, enemies_nearby / 10.0) # 基于玩家生命值 health_factor = 1.0 - player_health # 基于战斗强度 intensity_factor = combat_intensity # 基于伤害交换 damage_factor = min(1.0, (player_damage + enemy_damage) / 100.0) # 基于战斗持续时间 duration_factor = min(1.0, combat_duration / 60.0) combat_level = (enemy_factor + health_factor + intensity_factor + damage_factor + duration_factor) / 5.0 # 计算战斗紧张度 tension = 0.0 if in_combat: # 基于生命值差异 health_tension = 1.0 - player_health # 基于敌人数 enemy_tension = min(1.0, enemies_nearby / 5.0) # 综合紧张度 tension = (health_tension + enemy_tension) / 2.0 # 更新历史记录 self.history['combat_level'].append(combat_level) self.history['player_health'].append(player_health) return { 'intensity': combat_level, 'in_combat': in_combat, 'enemies_nearby': enemies_nearby, 'player_health': player_health, 'combat_intensity': combat_intensity, 'tension': tension, 'damage_exchange': player_damage + enemy_damage, 'duration': combat_duration } except Exception as e: print(f"✗ 分析战斗状态失败: {e}") import traceback traceback.print_exc() return { 'intensity': 0.0, 'in_combat': False, 'enemies_nearby': 0, 'player_health': 1.0, 'combat_intensity': 0.0, 'tension': 0.0, 'damage_exchange': 0.0, 'duration': 0.0 } def _analyze_exploration_state(self) -> Dict[str, Any]: """ 分析探索状态 Returns: 探索状态分析结果 """ try: # 获取游戏状态 game_state = self.plugin.game_state if self.plugin else {} # 分析探索相关指标 exploring = game_state.get('exploring', False) areas_discovered = game_state.get('areas_discovered', 0) exploration_progress = game_state.get('exploration_progress', 0.0) time_since_discovery = game_state.get('time_since_discovery', 0.0) movement_speed = game_state.get('movement_speed', 0.0) player_stealth = game_state.get('player_stealth', 0.0) secrets_found = game_state.get('secrets_found', 0) total_secrets = game_state.get('total_secrets', 1) # 计算探索强度 exploration_level = 0.0 if exploring: # 基于探索进度 progress_factor = exploration_progress # 基于发现区域数量 discovery_factor = min(1.0, areas_discovered / 20.0) # 基于发现时间(新发现时强度更高) time_factor = max(0.0, min(1.0, 1.0 - time_since_discovery / 60.0)) # 基于移动速度 speed_factor = min(1.0, movement_speed / 10.0) # 基于隐秘程度 stealth_factor = player_stealth # 基于秘密发现率 secrets_factor = min(1.0, secrets_found / max(1, total_secrets)) exploration_level = (progress_factor + discovery_factor + time_factor + speed_factor + stealth_factor + secrets_factor) / 6.0 # 计算探索好奇心 curiosity = 0.0 if exploring: # 基于未发现区域 undiscovered_factor = 1.0 - exploration_progress # 基于秘密发现率 secrets_undiscovered = max(0, total_secrets - secrets_found) secrets_factor = min(1.0, secrets_undiscovered / max(1, total_secrets)) # 综合好奇心 curiosity = (undiscovered_factor + secrets_factor) / 2.0 # 更新历史记录 self.history['exploration_level'].append(exploration_level) return { 'intensity': exploration_level, 'exploring': exploring, 'areas_discovered': areas_discovered, 'exploration_progress': exploration_progress, 'time_since_discovery': time_since_discovery, 'movement_speed': movement_speed, 'stealth': player_stealth, 'curiosity': curiosity, 'secrets_completion': secrets_found / max(1, total_secrets) } except Exception as e: print(f"✗ 分析探索状态失败: {e}") import traceback traceback.print_exc() return { 'intensity': 0.0, 'exploring': False, 'areas_discovered': 0, 'exploration_progress': 0.0, 'time_since_discovery': 0.0, 'movement_speed': 0.0, 'stealth': 0.0, 'curiosity': 0.0, 'secrets_completion': 0.0 } def _analyze_environment_conditions(self) -> Dict[str, Any]: """ 分析环境条件 Returns: 环境条件分析结果 """ try: # 获取环境条件 environment = self.plugin.environment_conditions if self.plugin else {} # 分析环境相关指标 weather = environment.get('weather', 'clear') time_of_day = environment.get('time_of_day', 'day') location_type = environment.get('location_type', 'outdoor') danger_level = environment.get('danger_level', 0.0) atmosphere = environment.get('atmosphere', 0.5) temperature = environment.get('temperature', 20.0) visibility = environment.get('visibility', 1.0) lighting = environment.get('lighting', 1.0) weather_intensity = environment.get('weather_intensity', 0.0) # 计算环境强度 environment_level = 0.0 # 天气影响 weather_factors = { 'clear': 0.2, 'cloudy': 0.3, 'rain': 0.5, 'storm': 0.8, 'fog': 0.6, 'snow': 0.4, 'windy': 0.4 } weather_factor = weather_factors.get(weather, 0.3) # 时间影响 time_factors = { 'day': 0.3, 'dusk': 0.5, 'night': 0.7, 'dawn': 0.4 } time_factor = time_factors.get(time_of_day, 0.3) # 地点类型影响 location_factors = { 'indoor': 0.4, 'outdoor': 0.6, 'underground': 0.8, 'underwater': 0.7, 'cave': 0.7, 'forest': 0.5, 'desert': 0.4 } location_factor = location_factors.get(location_type, 0.5) # 综合计算 environment_level = (weather_factor + time_factor + location_factor + danger_level + atmosphere + weather_intensity) / 6.0 # 计算环境紧张度 environmental_tension = (danger_level + (1.0 - visibility) + (1.0 - lighting)) / 3.0 # 计算环境氛围 environmental_mood = atmosphere # 更新历史记录 self.history['environment_changes'].append(environment_level) return { 'intensity': environment_level, 'weather': weather, 'time_of_day': time_of_day, 'location_type': location_type, 'danger_level': danger_level, 'atmosphere': atmosphere, 'temperature': temperature, 'visibility': visibility, 'lighting': lighting, 'weather_intensity': weather_intensity, 'tension': environmental_tension, 'mood': environmental_mood } except Exception as e: print(f"✗ 分析环境条件失败: {e}") import traceback traceback.print_exc() return { 'intensity': 0.3, 'weather': 'clear', 'time_of_day': 'day', 'location_type': 'outdoor', 'danger_level': 0.0, 'atmosphere': 0.5, 'temperature': 20.0, 'visibility': 1.0, 'lighting': 1.0, 'weather_intensity': 0.0, 'tension': 0.0, 'mood': 0.5 } def _analyze_player_state(self) -> Dict[str, Any]: """ 分析玩家状态 Returns: 玩家状态分析结果 """ try: # 获取玩家状态 player_state = self.plugin.player_state if self.plugin else {} # 分析玩家相关指标 health = player_state.get('health', 1.0) stamina = player_state.get('stamina', 1.0) experience = player_state.get('experience', 0) level = player_state.get('level', 1) emotional_state = player_state.get('emotional_state', 'neutral') skill_level = player_state.get('skill_level', 0.5) fatigue = player_state.get('fatigue', 0.0) hunger = player_state.get('hunger', 0.0) stress = player_state.get('stress', 0.0) focus = player_state.get('focus', 0.5) courage = player_state.get('courage', 0.5) items_collected = player_state.get('items_collected', 0) quests_completed = player_state.get('quests_completed', 0) # 计算玩家状态强度 # 基于健康和耐力 physical_factor = (health + stamina) / 2.0 # 基于经验和等级 progression_factor = min(1.0, (experience / 1000.0 + level / 50.0) / 2.0) # 基于技能水平 skill_factor = skill_level # 基于疲劳和饥饿 condition_factor = 1.0 - (fatigue + hunger) / 2.0 # 综合计算 player_level = (physical_factor + progression_factor + skill_factor + condition_factor) / 4.0 # 情感状态影响 emotion_factors = { 'excited': 0.8, 'happy': 0.6, 'neutral': 0.5, 'sad': 0.3, 'afraid': 0.7, 'angry': 0.9, 'calm': 0.4, 'curious': 0.6 } emotion_factor = emotion_factors.get(emotional_state, 0.5) # 计算玩家情绪效价(正负情绪) valence_map = { 'excited': 0.7, 'happy': 0.8, 'neutral': 0.0, 'sad': -0.6, 'afraid': -0.5, 'angry': -0.3, 'calm': 0.2, 'curious': 0.4 } valence = valence_map.get(emotional_state, 0.0) # 计算玩家唤醒度(兴奋程度) arousal_map = { 'excited': 0.9, 'happy': 0.6, 'neutral': 0.4, 'sad': 0.3, 'afraid': 0.8, 'angry': 0.7, 'calm': 0.2, 'curious': 0.5 } arousal = arousal_map.get(emotional_state, 0.4) # 计算玩家压力水平 stress_level = (stress + (1.0 - health) + fatigue) / 3.0 # 更新历史记录 self.history['player_emotion'].append(emotion_factor) self.history['player_health'].append(health) self.history['player_stamina'].append(stamina) return { 'intensity': player_level, 'health': health, 'stamina': stamina, 'experience': experience, 'level': level, 'emotional_state': emotional_state, 'skill_level': skill_level, 'fatigue': fatigue, 'hunger': hunger, 'stress': stress, 'focus': focus, 'courage': courage, 'items_collected': items_collected, 'quests_completed': quests_completed, 'valence': valence, # 情绪效价 'arousal': arousal, # 唤醒度 'stress_level': stress_level # 压力水平 } except Exception as e: print(f"✗ 分析玩家状态失败: {e}") import traceback traceback.print_exc() return { 'intensity': 0.5, 'health': 1.0, 'stamina': 1.0, 'experience': 0, 'level': 1, 'emotional_state': 'neutral', 'skill_level': 0.5, 'fatigue': 0.0, 'hunger': 0.0, 'stress': 0.0, 'focus': 0.5, 'courage': 0.5, 'items_collected': 0, 'quests_completed': 0, 'valence': 0.0, 'arousal': 0.4, 'stress_level': 0.0 } def _analyze_narrative_pacing(self) -> Dict[str, Any]: """ 分析叙事节奏 Returns: 叙事节奏分析结果 """ try: # 获取游戏状态 game_state = self.plugin.game_state if self.plugin else {} # 分析叙事相关指标 story_phase = game_state.get('story_phase', 'opening') quest_progress = game_state.get('quest_progress', 0.0) recent_events = game_state.get('recent_events', []) tension_level = game_state.get('tension_level', 0.0) player_choices = game_state.get('player_choices', 0) dialogue_intensity = game_state.get('dialogue_intensity', 0.0) cutscene_time = game_state.get('cutscene_time', 0.0) player_agency = game_state.get('player_agency', 0.5) narrative_complexity = game_state.get('narrative_complexity', 0.3) character_development = game_state.get('character_development', 0.0) plot_twists = game_state.get('plot_twists', 0) # 计算叙事强度 narrative_level = 0.0 # 故事阶段影响 phase_factors = { 'opening': 0.3, 'rising_action': 0.6, 'climax': 0.9, 'falling_action': 0.5, 'resolution': 0.2 } phase_factor = phase_factors.get(story_phase, 0.5) # 任务进度影响 progress_factor = quest_progress # 紧张程度影响 tension_factor = tension_level # 事件密度影响(最近事件数量) event_density = min(1.0, len(recent_events) / 10.0) # 对话强度影响 dialogue_factor = dialogue_intensity # 玩家选择影响 choice_factor = min(1.0, player_choices / 20.0) # 综合计算 narrative_level = (phase_factor + progress_factor + tension_factor + event_density + dialogue_factor + choice_factor) / 6.0 # 计算叙事复杂度 complexity = narrative_complexity # 计算角色发展强度 character_factor = character_development # 计算剧情转折强度 twist_factor = min(1.0, plot_twists / 5.0) # 更新历史记录 self.history['narrative_pacing'].append(narrative_level) return { 'intensity': narrative_level, 'story_phase': story_phase, 'quest_progress': quest_progress, 'recent_events_count': len(recent_events), 'tension_level': tension_level, 'player_choices': player_choices, 'dialogue_intensity': dialogue_intensity, 'cutscene_time': cutscene_time, 'player_agency': player_agency, 'complexity': complexity, 'character_development': character_development, 'plot_twists': plot_twists, 'event_density': event_density, 'choice_impact': choice_factor } except Exception as e: print(f"✗ 分析叙事节奏失败: {e}") import traceback traceback.print_exc() return { 'intensity': 0.5, 'story_phase': 'opening', 'quest_progress': 0.0, 'recent_events_count': 0, 'tension_level': 0.0, 'player_choices': 0, 'dialogue_intensity': 0.0, 'cutscene_time': 0.0, 'player_agency': 0.5, 'complexity': 0.3, 'character_development': 0.0, 'plot_twists': 0, 'event_density': 0.0, 'choice_impact': 0.0 } def _analyze_music_feedback(self) -> Dict[str, Any]: """ 分析音乐反馈 Returns: 音乐反馈分析结果 """ try: # 获取当前音乐状态 current_intensity = self.plugin.get_intensity() if self.plugin else 0.0 target_intensity = self.plugin.target_intensity if self.plugin else 0.0 # 分析音乐反馈相关指标 intensity_difference = abs(current_intensity - target_intensity) transition_in_progress = hasattr(self.plugin, 'transition_manager') and \ len(self.plugin.transition_manager.active_transitions) > 0 if self.plugin else False layer_count = len(self.plugin.music_manager.active_themes) if self.plugin and hasattr(self.plugin, 'music_manager') else 0 effect_count = len(self.plugin.effect_processor.active_effects) if self.plugin and hasattr(self.plugin, 'effect_processor') else 0 # 计算音乐反馈强度 feedback_level = 0.0 # 基于强度差异 difference_factor = intensity_difference # 基于过渡状态 transition_factor = 1.0 if transition_in_progress else 0.0 # 基于复杂度(层数和效果数) complexity_factor = min(1.0, (layer_count + effect_count) / 10.0) # 综合计算 feedback_level = (difference_factor + transition_factor + complexity_factor) / 3.0 # 计算音乐适应性 adaptation = 1.0 - intensity_difference # 更新历史记录 self.history['music_intensity'].append(current_intensity) return { 'intensity': feedback_level, 'intensity_difference': intensity_difference, 'transition_in_progress': transition_in_progress, 'layer_count': layer_count, 'effect_count': effect_count, 'adaptation': adaptation, 'complexity': complexity_factor } except Exception as e: print(f"✗ 分析音乐反馈失败: {e}") import traceback traceback.print_exc() return { 'intensity': 0.0, 'intensity_difference': 0.0, 'transition_in_progress': False, 'layer_count': 0, 'effect_count': 0, 'adaptation': 1.0, 'complexity': 0.0 } def update(self, dt: float): """ 更新分析引擎状态 Args: dt: 时间增量 """ update_start_time = time.time() current_time = time.time() # 定期执行分析 if current_time - self.last_analysis_time >= self.analysis_interval: self.perform_analysis() self.last_analysis_time = current_time self.last_update_time = current_time # 更新性能统计 self.stats['analysis_time'] += (time.time() - update_start_time) def cleanup(self): """清理所有资源""" self.analyzers.clear() self.analysis_results.clear() self.history.clear() self.stats = { 'analyzers_created': 0, 'analyses_performed': 0, 'analysis_time': 0.0, 'data_points_processed': 0, 'prediction_accuracy': 0.0 } print("✓ 分析引擎资源已清理") def get_stats(self) -> Dict[str, int]: """ 获取统计信息 Returns: 统计信息字典 """ return self.stats.copy() def get_analysis_results(self) -> Dict[str, Any]: """ 获取分析结果 Returns: 分析结果 """ return self.analysis_results.copy() def get_history(self, data_type: str = 'intensity') -> List[Any]: """ 获取历史数据 Args: data_type: 数据类型 Returns: 历史数据列表 """ if data_type in self.history: return list(self.history[data_type]) return [] def set_analysis_interval(self, interval: float): """ 设置分析间隔 Args: interval: 分析间隔(秒) """ self.analysis_interval = max(0.1, interval) print(f"✓ 分析间隔设置为: {self.analysis_interval:.2f}秒") def set_analyzer_weight(self, analyzer_type: str, weight: float): """ 设置分析器权重 Args: analyzer_type: 分析器类型 weight: 权重值 """ if analyzer_type in self.weights: self.weights[analyzer_type] = max(0.0, weight) print(f"✓ {analyzer_type}分析器权重设置为: {self.weights[analyzer_type]:.2f}") else: print(f"✗ 无效的分析器类型: {analyzer_type}") def get_intensity_trend(self) -> float: """ 获取强度趋势 Returns: 强度趋势值(正数表示上升,负数表示下降) """ try: if len(self.history['intensity']) >= 2: recent_values = list(self.history['intensity'])[-5:] # 最近5个值 if len(recent_values) >= 2: # 计算线性趋势 x = np.arange(len(recent_values)) y = np.array(recent_values) if len(x) > 1 and np.std(x) > 0 and np.std(y) > 0: slope = np.polyfit(x, y, 1)[0] # 线性拟合的斜率 return slope return 0.0 except Exception as e: print(f"✗ 计算强度趋势失败: {e}") return 0.0 def predict_next_intensity(self) -> float: """ 预测下一个强度值 Returns: 预测的强度值 """ try: trend = self.get_intensity_trend() current_intensity = self.analysis_results.get('intensity', 0.0) predicted_intensity = current_intensity + trend # 限制在0-1范围内 return max(0.0, min(1.0, predicted_intensity)) except Exception as e: print(f"✗ 预测强度值失败: {e}") return self.analysis_results.get('intensity', 0.0) def get_dominant_factor(self) -> str: """ 获取主导因素 Returns: 主导因素名称 """ if not self.analysis_results or 'details' not in self.analysis_results: return 'unknown' details = self.analysis_results['details'] max_intensity = 0.0 dominant_factor = 'unknown' for factor_type, values in details.items(): if values: # 计算平均强度 try: avg_intensity = sum(value[0].get('intensity', 0.0) * value[1] for value in values if isinstance(value[0], dict)) / len(values) if avg_intensity > max_intensity: max_intensity = avg_intensity dominant_factor = factor_type except Exception: pass # 忽略计算错误 return dominant_factor def create_custom_analyzer(self, analyzer_name: str, analysis_function: Callable[[], Dict[str, Any]]) -> bool: """ 创建自定义分析器 Args: analyzer_name: 分析器名称 analysis_function: 分析函数 Returns: 是否创建成功 """ try: analyzer_params = { 'type': 'custom', 'callback': analysis_function } return self.create_analyzer(analyzer_name, analyzer_params) except Exception as e: print(f"✗ 创建自定义分析器失败: {e}") import traceback traceback.print_exc() return False def get_analyzer_status(self, analyzer_name: str) -> Dict[str, Any]: """ 获取分析器状态 Args: analyzer_name: 分析器名称 Returns: 分析器状态字典 """ if analyzer_name in self.analyzers: analyzer = self.analyzers[analyzer_name] return { 'name': analyzer_name, 'state': analyzer['state'], 'type': analyzer['params']['type'], 'enabled': analyzer['params']['enabled'], 'last_analysis': analyzer.get('last_analysis', 0.0), 'results_count': len(analyzer['history']), 'trend': analyzer.get('trend', 0.0), 'confidence': analyzer.get('confidence', 1.0), 'volatility': analyzer.get('volatility', 0.0) } else: return { 'name': analyzer_name, 'state': 'unknown' } def set_prediction_settings(self, settings: Dict[str, Any]): """ 设置预测参数 Args: settings: 预测参数字典 """ self.prediction_settings.update(settings) print("✓ 预测参数已更新") def get_prediction_settings(self) -> Dict[str, Any]: """ 获取预测参数 Returns: 预测参数字典 """ return self.prediction_settings.copy() def set_emotion_analysis_ranges(self, ranges: Dict[str, tuple]): """ 设置情绪分析范围 Args: ranges: 范围字典 """ self.emotion_analysis.update(ranges) print("✓ 情绪分析范围已更新") def get_emotion_analysis_ranges(self) -> Dict[str, tuple]: """ 获取情绪分析范围 Returns: 范围字典 """ return self.emotion_analysis.copy() def get_analyzer_trend(self, analyzer_name: str) -> float: """ 获取分析器趋势 Args: analyzer_name: 分析器名称 Returns: 趋势值 """ if analyzer_name in self.analyzers: return self.analyzers[analyzer_name].get('trend', 0.0) return 0.0 def get_analyzer_confidence(self, analyzer_name: str) -> float: """ 获取分析器置信度 Args: analyzer_name: 分析器名称 Returns: 置信度值 """ if analyzer_name in self.analyzers: return self.analyzers[analyzer_name].get('confidence', 1.0) return 1.0 def get_overall_confidence(self) -> float: """ 获取整体置信度 Returns: 整体置信度 """ if not self.analyzers: return 1.0 total_confidence = sum(analyzer.get('confidence', 1.0) for analyzer in self.analyzers.values()) return total_confidence / len(self.analyzers) def force_analysis(self) -> Dict[str, Any]: """ 强制执行分析(忽略时间间隔) Returns: 分析结果 """ print("✓ 执行强制分析") return self.perform_analysis() def get_analyzer_history(self, analyzer_name: str) -> List[Dict[str, Any]]: """ 获取分析器历史数据 Args: analyzer_name: 分析器名称 Returns: 历史数据列表 """ if analyzer_name in self.analyzers: return list(self.analyzers[analyzer_name]['history']) return [] def clear_analyzer_history(self, analyzer_name: str): """ 清空分析器历史数据 Args: analyzer_name: 分析器名称 """ if analyzer_name in self.analyzers: self.analyzers[analyzer_name]['history'].clear() print(f"✓ 分析器 {analyzer_name} 历史数据已清空") def set_history_length(self, length: int): """ 设置历史数据长度 Args: length: 历史数据长度 """ self.history_length = max(10, length) # 更新所有历史队列的长度 for key in self.history: self.history[key] = deque(list(self.history[key])[-self.history_length:], maxlen=self.history_length) print(f"✓ 历史数据长度设置为: {self.history_length}") def get_dominant_emotion(self) -> str: """ 获取主导情绪 Returns: 主导情绪标签 """ try: player_state = self.plugin.player_state if self.plugin else {} emotional_state = player_state.get('emotional_state', 'neutral') return emotional_state except Exception as e: print(f"✗ 获取主导情绪失败: {e}") return 'neutral' def get_emotional_valence(self) -> float: """ 获取情绪效价(正负情绪) Returns: 情绪效价值 (-1.0 到 1.0) """ try: # 从玩家状态分析器获取效价 player_results = self.analysis_results.get('details', {}).get('player_state', []) if player_results: for result, _ in player_results: if isinstance(result, dict) and 'valence' in result: return result['valence'] return 0.0 except Exception as e: print(f"✗ 获取情绪效价失败: {e}") return 0.0 def get_emotional_arousal(self) -> float: """ 获取情绪唤醒度(兴奋程度) Returns: 情绪唤醒度值 (0.0 到 1.0) """ try: # 从玩家状态分析器获取唤醒度 player_results = self.analysis_results.get('details', {}).get('player_state', []) if player_results: for result, _ in player_results: if isinstance(result, dict) and 'arousal' in result: return result['arousal'] return 0.4 # 默认中等唤醒度 except Exception as e: print(f"✗ 获取情绪唤醒度失败: {e}") return 0.4 def get_narrative_tension(self) -> float: """ 获取叙事紧张度 Returns: 叙事紧张度值 (0.0 到 1.0) """ try: # 从叙事分析器获取紧张度 narrative_results = self.analysis_results.get('details', {}).get('narrative', []) if narrative_results: for result, _ in narrative_results: if isinstance(result, dict) and 'tension_level' in result: return result['tension_level'] return 0.0 except Exception as e: print(f"✗ 获取叙事紧张度失败: {e}") return 0.0 def get_environmental_mood(self) -> float: """ 获取环境氛围 Returns: 环境氛围值 (0.0 到 1.0) """ try: # 从环境分析器获取氛围 environment_results = self.analysis_results.get('details', {}).get('environment', []) if environment_results: for result, _ in environment_results: if isinstance(result, dict) and 'mood' in result: return result['mood'] return 0.5 # 默认中性氛围 except Exception as e: print(f"✗ 获取环境氛围失败: {e}") return 0.5 def get_combat_tension(self) -> float: """ 获取战斗紧张度 Returns: 战斗紧张度值 (0.0 到 1.0) """ try: # 从战斗分析器获取紧张度 combat_results = self.analysis_results.get('details', {}).get('combat', []) if combat_results: for result, _ in combat_results: if isinstance(result, dict) and 'tension' in result: return result['tension'] return 0.0 except Exception as e: print(f"✗ 获取战斗紧张度失败: {e}") return 0.0 def get_exploration_curiosity(self) -> float: """ 获取探索好奇心 Returns: 探索好奇心值 (0.0 到 1.0) """ try: # 从探索分析器获取好奇心 exploration_results = self.analysis_results.get('details', {}).get('exploration', []) if exploration_results: for result, _ in exploration_results: if isinstance(result, dict) and 'curiosity' in result: return result['curiosity'] return 0.0 except Exception as e: print(f"✗ 获取探索好奇心失败: {e}") return 0.0