"""
Analysis engine.

Analyzes game state and environment conditions to provide decision support
for the music system.
"""
|
|
|
|
import uuid
|
|
import math
|
|
import time
|
|
import numpy as np
|
|
from typing import Dict, List, Any, Optional, Callable
|
|
from collections import deque
|
|
|
|
class AnalysisEngine:
|
|
"""
|
|
分析引擎
|
|
负责分析游戏状态和环境条件,为音乐系统提供决策支持
|
|
"""
|
|
|
|
def __init__(self, plugin):
    """
    Build the engine's bookkeeping state and register the default analyzers.

    Args:
        plugin: Dynamic music system plugin instance. When it is falsy the
            buffer size falls back to 4096.
    """
    self.plugin = plugin

    # Registered analyzers by name, the latest merged result, and the
    # per-series history buffers (filled in by the initialization step below).
    self.analyzers: Dict[str, Dict[str, Any]] = dict()
    self.analysis_results: Dict[str, Any] = dict()
    self.history: Dict[str, deque] = dict()

    # Running counters.
    self.stats = dict(
        analyzers_created=0,
        analyses_performed=0,
        analysis_time=0.0,
        data_points_processed=0,
        prediction_accuracy=0.0,
    )

    if plugin:
        self.buffer_size = plugin.buffer_size
    else:
        self.buffer_size = 4096
    self.analysis_interval = 1.0  # default analysis interval, in seconds
    self.last_analysis_time = 0.0
    self.last_update_time = 0.0

    # Per-category analysis weights.
    self.weights = dict(
        combat=1.0,
        exploration=0.8,
        environment=0.6,
        player_state=0.7,
        narrative=0.5,
    )

    # Maximum number of samples kept per history series.
    self.history_length = 100

    # Prediction parameters.
    self.prediction_settings = dict(
        lookahead_time=5.0,        # prediction time window
        confidence_threshold=0.7,  # confidence threshold
        trend_sensitivity=0.3,     # trend sensitivity
        prediction_smoothing=0.1,  # prediction smoothing coefficient
    )

    # Emotion analysis parameters.
    self.emotion_analysis = dict(
        valence_range=(-1.0, 1.0),    # emotional valence range
        arousal_range=(0.0, 1.0),     # emotional arousal range
        tension_range=(0.0, 1.0),     # tension range
        complexity_range=(0.0, 1.0),  # complexity range
    )

    # Must run last: it relies on the attributes assigned above.
    self._initialize_analysis_engine()
|
|
|
|
def _initialize_analysis_engine(self):
    """Create the bounded history buffers, then register the default analyzers."""
    tracked_series = (
        'intensity', 'combat_level', 'exploration_level',
        'environment_changes', 'player_emotion', 'narrative_pacing',
        'music_intensity', 'player_health', 'player_stamina',
    )

    # One bounded deque per series, so old samples fall off automatically.
    self.history.update(
        (series, deque(maxlen=self.history_length)) for series in tracked_series
    )

    self._create_default_analyzers()
    print("✓ 分析引擎初始化完成")
|
|
|
|
def _create_default_analyzers(self):
    """Register the six built-in analyzers, one per analysis category."""
    # (analyzer name, analyzer type, name of the bound callback method)
    builtin_specs = (
        ('combat_analyzer', 'combat', '_analyze_combat_state'),
        ('exploration_analyzer', 'exploration', '_analyze_exploration_state'),
        ('environment_analyzer', 'environment', '_analyze_environment_conditions'),
        ('player_analyzer', 'player_state', '_analyze_player_state'),
        ('narrative_analyzer', 'narrative', '_analyze_narrative_pacing'),
        ('music_feedback_analyzer', 'music_feedback', '_analyze_music_feedback'),
    )

    for analyzer_name, analyzer_type, callback_attr in builtin_specs:
        self.create_analyzer(analyzer_name, {
            'type': analyzer_type,
            'callback': getattr(self, callback_attr),
        })
|
|
|
|
def create_analyzer(self, analyzer_name: str, analyzer_params: Dict[str, Any]) -> bool:
    """
    Register an analyzer under ``analyzer_name``.

    Caller-supplied parameters are layered over the built-in defaults, and a
    callable ``'callback'`` entry is required. An analyzer already stored
    under the same name is replaced.

    Args:
        analyzer_name: Name to register the analyzer under.
        analyzer_params: Parameter overrides; must provide ``'callback'``.

    Returns:
        True when the analyzer was stored; False when the callback is missing
        or not callable, or when any error occurs.
    """
    try:
        # Defaults first, caller overrides on top.
        params = dict(
            type='generic',
            enabled=True,
            interval=self.analysis_interval,
            weight=1.0,
            history_size=50,
            sensitivity=1.0,
            smoothing=0.2,
            thresholds=dict(low=0.3, medium=0.6, high=0.8),
        )
        params.update(analyzer_params)

        # A missing callback reads as None, which is not callable either.
        if not callable(params.get('callback')):
            print(f"✗ 分析器缺少有效的回调函数: {analyzer_name}")
            return False

        self.analyzers[analyzer_name] = dict(
            name=analyzer_name,
            params=params,
            state='created',
            last_analysis=0.0,
            results={},
            history=deque(maxlen=params['history_size']),
            trend=0.0,       # trend value
            volatility=0.0,  # volatility
            confidence=1.0,  # confidence
        )
        self.stats['analyzers_created'] += 1

        print(f"✓ 分析器创建成功: {analyzer_name}")
        return True

    except Exception as e:
        print(f"✗ 创建分析器失败: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
def enable_analyzer(self, analyzer_name: str, enable: bool = True) -> bool:
    """
    Switch a registered analyzer on or off.

    Args:
        analyzer_name: Name of the analyzer to change.
        enable: True to enable the analyzer, False to disable it.

    Returns:
        True when the flag was updated; False when the analyzer is unknown
        or an error occurs.
    """
    try:
        if analyzer_name in self.analyzers:
            self.analyzers[analyzer_name]['params']['enabled'] = enable
            if enable:
                state = "启用"
            else:
                state = "禁用"
            print(f"✓ 分析器已{state}: {analyzer_name}")
            return True

        print(f"✗ 分析器不存在: {analyzer_name}")
        return False

    except Exception as e:
        print(f"✗ 设置分析器状态失败: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
def perform_analysis(self) -> Dict[str, Any]:
    """
    Run every enabled analyzer and merge the outputs into one result.

    An analyzer whose interval has not elapsed contributes its cached result
    (or nothing when it has none yet). Otherwise its callback is invoked and
    its history, trend and the ``analyses_performed`` counter are updated.
    A callback that raises is reported and skipped.

    Numeric values are scaled by the analyzer's ``sensitivity``; booleans are
    left untouched so that flags stay flags. Dict results are merged key by
    key into ``details``; any other result is collected under ``'generic'``.
    Only dict results carrying a numeric ``'intensity'`` feed the weighted
    average, which is then blended with the previous overall intensity.

    NOTE(review): ``'confidence'`` is the number of distinct detail keys
    divided by the number of registered analyzers, capped at 1.0. It looks
    like it was meant to be the share of analyzers that contributed; confirm.

    Returns:
        Dict with the keys ``timestamp``, ``intensity``, ``raw_intensity``,
        ``details``, ``confidence``, ``analyzer_contributions``, ``trends``
        and ``predictions``; an empty dict if the analysis fails unexpectedly.
    """
    try:
        analysis_start_time = time.time()
        current_time = time.time()

        combined_results: Dict[str, List[Any]] = {}
        analyzer_contributions: Dict[str, Any] = {}
        total_weight = 0.0
        weighted_intensity = 0.0

        for analyzer_name, analyzer in self.analyzers.items():
            params = analyzer['params']

            if not params.get('enabled', True):
                continue

            interval = params.get('interval', self.analysis_interval)
            if current_time - analyzer.get('last_analysis', 0.0) < interval:
                # Not due yet: reuse the cached result, or skip if there is none.
                if not analyzer['results']:
                    continue
                result = analyzer['results']
            else:
                try:
                    result = params['callback']()
                    analyzer['results'] = result
                    analyzer['last_analysis'] = current_time
                    analyzer['history'].append(result)
                    self._update_analyzer_trends(analyzer)
                    self.stats['analyses_performed'] += 1
                except Exception as e:
                    print(f"✗ 分析器 {analyzer_name} 执行失败: {e}")
                    import traceback
                    traceback.print_exc()
                    continue

            analyzer_type = params.get('type', 'generic')
            weight = params.get('weight', 1.0) * self.weights.get(analyzer_type, 1.0)
            sensitivity = params.get('sensitivity', 1.0)

            if isinstance(result, dict):
                # bool is a subclass of int; exclude it so flags are not
                # turned into scaled floats.
                result = {
                    key: (value * sensitivity
                          if isinstance(value, (int, float)) and not isinstance(value, bool)
                          else value)
                    for key, value in result.items()
                }
                for key, value in result.items():
                    combined_results.setdefault(key, []).append((value, weight))

                # Only dict results can carry an intensity. Testing
                # `'intensity' in result` on a scalar would raise TypeError
                # and abort the whole analysis.
                intensity = result.get('intensity')
                if isinstance(intensity, (int, float)):
                    weighted_intensity += intensity * weight
                    total_weight += weight
                    analyzer_contributions[analyzer_name] = intensity * weight
            else:
                if isinstance(result, (int, float)) and not isinstance(result, bool):
                    result = result * sensitivity
                # Single-value results are collected under a shared key.
                combined_results.setdefault('generic', []).append((result, weight))

            self._update_analyzer_confidence(analyzer, result)

        raw_intensity = weighted_intensity / total_weight if total_weight > 0 else 0.0

        # Blend with the previous overall intensity (10% old, 90% new).
        average_intensity = raw_intensity
        smoothing = 0.1
        if 'intensity' in self.analysis_results:
            previous_intensity = self.analysis_results['intensity']
            average_intensity = previous_intensity * smoothing + average_intensity * (1 - smoothing)

        final_results = {
            'timestamp': current_time,
            'intensity': average_intensity,
            'raw_intensity': raw_intensity,
            'details': combined_results,
            'confidence': min(1.0, len(combined_results) / max(1, len(self.analyzers))),
            'analyzer_contributions': analyzer_contributions,
            'trends': self._get_overall_trends(),
            'predictions': self._make_predictions(average_intensity)
        }

        # Record this pass in the engine-level history.
        self.history['intensity'].append(average_intensity)
        for history_key in ('combat_level', 'exploration_level'):
            if history_key in combined_results:
                values = [entry[0] for entry in combined_results.get(history_key, [])]
                if values:
                    self.history[history_key].append(np.mean(values))

        self.analysis_results = final_results

        self.stats['analysis_time'] += (time.time() - analysis_start_time)
        self.stats['data_points_processed'] += len(combined_results)

        return final_results

    except Exception as e:
        print(f"✗ 执行分析失败: {e}")
        import traceback
        traceback.print_exc()
        return {}
|
|
|
|
def _update_analyzer_trends(self, analyzer: Dict[str, Any]):
    """
    Refresh an analyzer's ``'trend'`` and ``'volatility'`` from its history.

    Only history entries that are dicts with an ``'intensity'`` key are
    considered, and nothing changes until at least two such samples exist.
    The trend is the slope of a least-squares line through the samples, and
    the volatility is their standard deviation. A perfectly flat series has
    a trend of 0.0, so a stale slope from an earlier window is not kept.

    Args:
        analyzer: Analyzer record; ``'history'`` is read, ``'trend'`` and
            ``'volatility'`` are updated in place.
    """
    try:
        if len(analyzer['history']) < 2:
            return

        intensity_values = [
            entry['intensity']
            for entry in analyzer['history']
            if isinstance(entry, dict) and 'intensity' in entry
        ]
        if len(intensity_values) < 2:
            return

        y = np.array(intensity_values)
        volatility = np.std(y)

        if volatility > 0:
            x = np.arange(len(intensity_values))
            slope, _ = np.polyfit(x, y, 1)
            analyzer['trend'] = slope
        else:
            # Constant series: the slope is zero by definition.
            analyzer['trend'] = 0.0

        analyzer['volatility'] = volatility

    except Exception as e:
        print(f"✗ 更新分析器趋势失败: {e}")
|
|
|
|
def _update_analyzer_confidence(self, analyzer: Dict[str, Any], result: Any):
    """
    Blend the analyzer's confidence toward the stability of its recent output.

    Stability is derived from the mean absolute change between the intensity
    values of the last three history entries: the smaller the change, the
    closer stability is to 1.0. The confidence is finally clamped to
    [0.1, 1.0].

    Args:
        analyzer: Analyzer record; ``'history'`` is read and ``'confidence'``
            is updated in place.
        result: Latest analysis result (not used by the current logic).
    """
    try:
        past_results = analyzer['history']
        if len(past_results) >= 3:
            recent_values = [
                entry['intensity']
                for entry in list(past_results)[-3:]
                if isinstance(entry, dict) and 'intensity' in entry
            ]

            if len(recent_values) >= 2:
                # Step sizes between consecutive samples.
                changes = [
                    abs(later - earlier)
                    for earlier, later in zip(recent_values, recent_values[1:])
                ]
                avg_change = np.mean(changes)

                # 70% previous confidence, 30% current stability.
                stability = 1.0 / (1.0 + avg_change * 10)
                analyzer['confidence'] = 0.7 * analyzer['confidence'] + 0.3 * stability

        # Keep the confidence within a sane range.
        analyzer['confidence'] = max(0.1, min(1.0, analyzer['confidence']))

    except Exception as e:
        print(f"✗ 更新分析器置信度失败: {e}")
|
|
|
|
def _get_overall_trends(self) -> Dict[str, float]:
    """
    Compute short-term slopes for the engine-level history series.

    For each of the intensity, combat and exploration series, a line is
    fitted through the last five samples. A series with fewer than five
    samples, or with no variation in that window, is left out of the result.

    Returns:
        Dict mapping ``'intensity'``, ``'combat'`` and/or ``'exploration'``
        to the fitted slope; an empty dict if an error occurs.
    """
    try:
        trends = {}

        # (history series, key used in the returned dict)
        tracked = (
            ('intensity', 'intensity'),
            ('combat_level', 'combat'),
            ('exploration_level', 'exploration'),
        )

        for history_key, trend_name in tracked:
            series = self.history[history_key]
            if len(series) < 5:
                continue

            recent_values = list(series)[-5:]
            x = np.arange(len(recent_values))
            y = np.array(recent_values)
            if len(x) > 1 and np.std(x) > 0 and np.std(y) > 0:
                trends[trend_name] = np.polyfit(x, y, 1)[0]

        return trends

    except Exception as e:
        print(f"✗ 获取整体趋势失败: {e}")
        return {}
|
|
|
|
def _make_predictions(self, current_intensity: float) -> Dict[str, Any]:
    """
    Extrapolate the intensity over the look-ahead window and predict events.

    The predicted intensity is the current value plus the overall intensity
    trend times the look-ahead time, clamped to [0, 1]. The prediction
    confidence grows with the magnitude of the trend (ten times its absolute
    value, capped at 1.0).

    Args:
        current_intensity: Current overall intensity.

    Returns:
        Dict with an ``'intensity'`` forecast (``value``, ``confidence``,
        ``time_horizon``) and a list of predicted ``'events'``; an empty
        dict if an error occurs.
    """
    try:
        lookahead_time = self.prediction_settings['lookahead_time']

        # Linear extrapolation along the current trend.
        overall_trend = self._get_overall_trends().get('intensity', 0.0)
        projected = current_intensity + overall_trend * lookahead_time
        predicted_intensity = max(0.0, min(1.0, projected))

        # The steeper the trend, the higher the confidence.
        confidence = min(1.0, abs(overall_trend) * 10)

        return {
            'intensity': {
                'value': predicted_intensity,
                'confidence': confidence,
                'time_horizon': lookahead_time
            },
            'events': self._predict_events(),
        }

    except Exception as e:
        print(f"✗ 做出预测失败: {e}")
        return {}
|
|
|
|
def _predict_events(self) -> List[Dict[str, Any]]:
    """
    Predict upcoming events from the combat and exploration histories.

    Each series needs at least ten samples. A combat escalation is reported
    when the fitted slope of the last ten combat samples exceeds 0.1 and the
    latest sample is above 0.3. An exploration decrease is reported when the
    exploration slope is below -0.1 and the latest sample is below 0.5.

    Returns:
        List of event dicts (``type``, ``probability``, ``time_to_event``);
        an empty list if nothing is predicted or an error occurs.
    """
    try:
        events = []

        # Is combat intensity about to rise?
        combat_series = self.history['combat_level']
        if len(combat_series) >= 10:
            recent_combat = list(combat_series)[-10:]
            sample_index = np.arange(len(recent_combat))
            trend = np.polyfit(sample_index, np.array(recent_combat), 1)[0]

            # Rising trend with an already elevated level.
            if trend > 0.1 and recent_combat[-1] > 0.3:
                escalation = {
                    'type': 'combat_escalation',
                    'probability': min(1.0, trend * 5),
                    'time_to_event': max(1.0, 10 - recent_combat[-1] * 10)
                }
                events.append(escalation)

        # Is exploration activity winding down?
        exploration_series = self.history['exploration_level']
        if len(exploration_series) >= 10:
            recent_exploration = list(exploration_series)[-10:]
            sample_index = np.arange(len(recent_exploration))
            trend = np.polyfit(sample_index, np.array(recent_exploration), 1)[0]

            # Falling trend with an already low level.
            if trend < -0.1 and recent_exploration[-1] < 0.5:
                slowdown = {
                    'type': 'exploration_decrease',
                    'probability': min(1.0, abs(trend) * 5),
                    'time_to_event': max(1.0, recent_exploration[-1] * 10)
                }
                events.append(slowdown)

        return events

    except Exception as e:
        print(f"✗ 预测事件失败: {e}")
        return []
|
|
|
|
def _analyze_combat_state(self) -> Dict[str, Any]:
    """
    Score the current combat situation from the plugin's game state.

    Outside of combat both the combat level and the tension are 0.0. In
    combat, the level is the mean of five factors (nearby enemies, missing
    health, reported combat intensity, damage exchanged and fight duration)
    and the tension is the mean of missing health and enemy pressure. Each
    call also appends the combat level and the player health to the engine
    history.

    Returns:
        Combat analysis dict; a neutral "no combat" dict if anything fails.
    """
    try:
        game_state = self.plugin.game_state if self.plugin else {}
        read = game_state.get

        # Combat-related indicators, with peaceful defaults.
        in_combat = read('in_combat', False)
        enemies_nearby = read('enemies_nearby', 0)
        player_health = read('player_health', 1.0)
        combat_intensity = read('combat_intensity', 0.0)
        player_damage = read('player_damage', 0.0)
        enemy_damage = read('enemy_damage', 0.0)
        combat_duration = read('combat_duration', 0.0)

        combat_level = 0.0
        tension = 0.0
        if in_combat:
            enemy_factor = min(1.0, enemies_nearby / 10.0)   # enemy count
            health_factor = 1.0 - player_health              # missing health
            intensity_factor = combat_intensity              # reported intensity
            damage_factor = min(1.0, (player_damage + enemy_damage) / 100.0)  # damage exchanged
            duration_factor = min(1.0, combat_duration / 60.0)                # fight length
            combat_level = (enemy_factor + health_factor + intensity_factor
                            + damage_factor + duration_factor) / 5.0

            # Tension: missing health combined with enemy pressure.
            health_tension = 1.0 - player_health
            enemy_tension = min(1.0, enemies_nearby / 5.0)
            tension = (health_tension + enemy_tension) / 2.0

        self.history['combat_level'].append(combat_level)
        self.history['player_health'].append(player_health)

        return dict(
            intensity=combat_level,
            in_combat=in_combat,
            enemies_nearby=enemies_nearby,
            player_health=player_health,
            combat_intensity=combat_intensity,
            tension=tension,
            damage_exchange=player_damage + enemy_damage,
            duration=combat_duration,
        )

    except Exception as e:
        print(f"✗ 分析战斗状态失败: {e}")
        import traceback
        traceback.print_exc()
        return dict(
            intensity=0.0,
            in_combat=False,
            enemies_nearby=0,
            player_health=1.0,
            combat_intensity=0.0,
            tension=0.0,
            damage_exchange=0.0,
            duration=0.0,
        )
|
|
|
|
def _analyze_exploration_state(self) -> Dict[str, Any]:
    """
    Score the current exploration activity from the plugin's game state.

    While the player is not exploring, both the exploration level and the
    curiosity are 0.0. Otherwise the level is the mean of six factors
    (progress, areas discovered, recency of the last discovery, movement
    speed, stealth and share of secrets found) and curiosity is the mean of
    the unexplored share and the share of secrets still hidden. Each call
    appends the exploration level to the engine history.

    Returns:
        Exploration analysis dict; an all-idle dict if anything fails.
    """
    try:
        game_state = self.plugin.game_state if self.plugin else {}
        read = game_state.get

        # Exploration-related indicators.
        exploring = read('exploring', False)
        areas_discovered = read('areas_discovered', 0)
        exploration_progress = read('exploration_progress', 0.0)
        time_since_discovery = read('time_since_discovery', 0.0)
        movement_speed = read('movement_speed', 0.0)
        player_stealth = read('player_stealth', 0.0)
        secrets_found = read('secrets_found', 0)
        total_secrets = read('total_secrets', 1)

        exploration_level = 0.0
        curiosity = 0.0
        if exploring:
            progress_factor = exploration_progress
            discovery_factor = min(1.0, areas_discovered / 20.0)
            # A fresh discovery scores higher; it decays to zero after 60.
            time_factor = max(0.0, min(1.0, 1.0 - time_since_discovery / 60.0))
            speed_factor = min(1.0, movement_speed / 10.0)
            stealth_factor = player_stealth
            found_ratio = min(1.0, secrets_found / max(1, total_secrets))

            exploration_level = (progress_factor + discovery_factor + time_factor +
                                 speed_factor + stealth_factor + found_ratio) / 6.0

            # Curiosity: how much is still left to find.
            undiscovered_factor = 1.0 - exploration_progress
            secrets_undiscovered = max(0, total_secrets - secrets_found)
            hidden_ratio = min(1.0, secrets_undiscovered / max(1, total_secrets))
            curiosity = (undiscovered_factor + hidden_ratio) / 2.0

        self.history['exploration_level'].append(exploration_level)

        return dict(
            intensity=exploration_level,
            exploring=exploring,
            areas_discovered=areas_discovered,
            exploration_progress=exploration_progress,
            time_since_discovery=time_since_discovery,
            movement_speed=movement_speed,
            stealth=player_stealth,
            curiosity=curiosity,
            secrets_completion=secrets_found / max(1, total_secrets),
        )

    except Exception as e:
        print(f"✗ 分析探索状态失败: {e}")
        import traceback
        traceback.print_exc()
        return dict(
            intensity=0.0,
            exploring=False,
            areas_discovered=0,
            exploration_progress=0.0,
            time_since_discovery=0.0,
            movement_speed=0.0,
            stealth=0.0,
            curiosity=0.0,
            secrets_completion=0.0,
        )
|
|
|
|
def _analyze_environment_conditions(self) -> Dict[str, Any]:
    """
    Score the current environment from the plugin's environment conditions.

    The environment level is the mean of six values: table-driven factors
    for weather, time of day and location type, plus the reported danger
    level, atmosphere and weather intensity. Environmental tension is the
    mean of danger, poor visibility and poor lighting. Each call appends the
    environment level to the engine history.

    Returns:
        Environment analysis dict; a calm clear-day dict if anything fails.
    """
    try:
        environment = self.plugin.environment_conditions if self.plugin else {}
        read = environment.get

        # Environment-related indicators.
        weather = read('weather', 'clear')
        time_of_day = read('time_of_day', 'day')
        location_type = read('location_type', 'outdoor')
        danger_level = read('danger_level', 0.0)
        atmosphere = read('atmosphere', 0.5)
        temperature = read('temperature', 20.0)
        visibility = read('visibility', 1.0)
        lighting = read('lighting', 1.0)
        weather_intensity = read('weather_intensity', 0.0)

        # Weather contribution (unknown weather scores 0.3).
        weather_factor = dict(
            clear=0.2, cloudy=0.3, rain=0.5, storm=0.8,
            fog=0.6, snow=0.4, windy=0.4,
        ).get(weather, 0.3)

        # Time-of-day contribution (unknown time scores 0.3).
        time_factor = dict(
            day=0.3, dusk=0.5, night=0.7, dawn=0.4,
        ).get(time_of_day, 0.3)

        # Location contribution (unknown location scores 0.5).
        location_factor = dict(
            indoor=0.4, outdoor=0.6, underground=0.8, underwater=0.7,
            cave=0.7, forest=0.5, desert=0.4,
        ).get(location_type, 0.5)

        environment_level = (weather_factor + time_factor + location_factor +
                             danger_level + atmosphere + weather_intensity) / 6.0

        # Danger plus how hard it is to see.
        environmental_tension = (danger_level + (1.0 - visibility) + (1.0 - lighting)) / 3.0

        # The mood is reported straight from the atmosphere value.
        environmental_mood = atmosphere

        self.history['environment_changes'].append(environment_level)

        return dict(
            intensity=environment_level,
            weather=weather,
            time_of_day=time_of_day,
            location_type=location_type,
            danger_level=danger_level,
            atmosphere=atmosphere,
            temperature=temperature,
            visibility=visibility,
            lighting=lighting,
            weather_intensity=weather_intensity,
            tension=environmental_tension,
            mood=environmental_mood,
        )

    except Exception as e:
        print(f"✗ 分析环境条件失败: {e}")
        import traceback
        traceback.print_exc()
        return dict(
            intensity=0.3,
            weather='clear',
            time_of_day='day',
            location_type='outdoor',
            danger_level=0.0,
            atmosphere=0.5,
            temperature=20.0,
            visibility=1.0,
            lighting=1.0,
            weather_intensity=0.0,
            tension=0.0,
            mood=0.5,
        )
|
|
|
|
def _analyze_player_state(self) -> Dict[str, Any]:
    """
    Score the player's condition from the plugin's player state.

    The player level is the mean of four factors: physical condition (health
    and stamina), progression (experience and level), skill, and overall
    condition (the inverse of fatigue and hunger). The emotional state is
    mapped to a valence and an arousal value, and a stress level is derived
    from stress, missing health and fatigue. Each call appends the emotion
    factor, health and stamina to the engine history.

    Returns:
        Player analysis dict; a healthy, neutral dict if anything fails.
    """
    try:
        player_state = self.plugin.player_state if self.plugin else {}
        read = player_state.get

        # Player-related indicators.
        health = read('health', 1.0)
        stamina = read('stamina', 1.0)
        experience = read('experience', 0)
        level = read('level', 1)
        emotional_state = read('emotional_state', 'neutral')
        skill_level = read('skill_level', 0.5)
        fatigue = read('fatigue', 0.0)
        hunger = read('hunger', 0.0)
        stress = read('stress', 0.0)
        focus = read('focus', 0.5)
        courage = read('courage', 0.5)
        items_collected = read('items_collected', 0)
        quests_completed = read('quests_completed', 0)

        physical_factor = (health + stamina) / 2.0
        progression_factor = min(1.0, (experience / 1000.0 + level / 50.0) / 2.0)
        skill_factor = skill_level
        condition_factor = 1.0 - (fatigue + hunger) / 2.0

        player_level = (physical_factor + progression_factor + skill_factor + condition_factor) / 4.0

        # emotional state -> (emotion factor, valence, arousal)
        emotion_profiles = {
            'excited': (0.8, 0.7, 0.9),
            'happy': (0.6, 0.8, 0.6),
            'neutral': (0.5, 0.0, 0.4),
            'sad': (0.3, -0.6, 0.3),
            'afraid': (0.7, -0.5, 0.8),
            'angry': (0.9, -0.3, 0.7),
            'calm': (0.4, 0.2, 0.2),
            'curious': (0.6, 0.4, 0.5),
        }
        # Unknown states fall back to the neutral profile.
        emotion_factor, valence, arousal = emotion_profiles.get(
            emotional_state, (0.5, 0.0, 0.4)
        )

        stress_level = (stress + (1.0 - health) + fatigue) / 3.0

        self.history['player_emotion'].append(emotion_factor)
        self.history['player_health'].append(health)
        self.history['player_stamina'].append(stamina)

        return dict(
            intensity=player_level,
            health=health,
            stamina=stamina,
            experience=experience,
            level=level,
            emotional_state=emotional_state,
            skill_level=skill_level,
            fatigue=fatigue,
            hunger=hunger,
            stress=stress,
            focus=focus,
            courage=courage,
            items_collected=items_collected,
            quests_completed=quests_completed,
            valence=valence,            # emotional valence
            arousal=arousal,            # arousal
            stress_level=stress_level,  # stress level
        )

    except Exception as e:
        print(f"✗ 分析玩家状态失败: {e}")
        import traceback
        traceback.print_exc()
        return dict(
            intensity=0.5,
            health=1.0,
            stamina=1.0,
            experience=0,
            level=1,
            emotional_state='neutral',
            skill_level=0.5,
            fatigue=0.0,
            hunger=0.0,
            stress=0.0,
            focus=0.5,
            courage=0.5,
            items_collected=0,
            quests_completed=0,
            valence=0.0,
            arousal=0.4,
            stress_level=0.0,
        )
|
|
|
|
def _analyze_narrative_pacing(self) -> Dict[str, Any]:
    """
    Score the narrative pacing from the plugin's game state.

    The narrative level is the mean of six factors: a table-driven story
    phase factor, quest progress, tension level, recent event density,
    dialogue intensity and the number of player choices. Each call appends
    the narrative level to the engine history.

    Returns:
        Narrative analysis dict; a mid-intensity "opening" dict if anything
        fails.
    """
    try:
        game_state = self.plugin.game_state if self.plugin else {}
        read = game_state.get

        # Narrative-related indicators.
        story_phase = read('story_phase', 'opening')
        quest_progress = read('quest_progress', 0.0)
        recent_events = read('recent_events', [])
        tension_level = read('tension_level', 0.0)
        player_choices = read('player_choices', 0)
        dialogue_intensity = read('dialogue_intensity', 0.0)
        cutscene_time = read('cutscene_time', 0.0)
        player_agency = read('player_agency', 0.5)
        narrative_complexity = read('narrative_complexity', 0.3)
        character_development = read('character_development', 0.0)
        plot_twists = read('plot_twists', 0)

        # Story phase contribution (unknown phases score 0.5).
        phase_factor = dict(
            opening=0.3,
            rising_action=0.6,
            climax=0.9,
            falling_action=0.5,
            resolution=0.2,
        ).get(story_phase, 0.5)

        progress_factor = quest_progress
        tension_factor = tension_level
        event_density = min(1.0, len(recent_events) / 10.0)  # recent event count
        dialogue_factor = dialogue_intensity
        choice_factor = min(1.0, player_choices / 20.0)

        narrative_level = (phase_factor + progress_factor + tension_factor +
                           event_density + dialogue_factor + choice_factor) / 6.0

        complexity = narrative_complexity

        # Computed but not part of the returned dict or the narrative level.
        character_factor = character_development
        twist_factor = min(1.0, plot_twists / 5.0)

        self.history['narrative_pacing'].append(narrative_level)

        return dict(
            intensity=narrative_level,
            story_phase=story_phase,
            quest_progress=quest_progress,
            recent_events_count=len(recent_events),
            tension_level=tension_level,
            player_choices=player_choices,
            dialogue_intensity=dialogue_intensity,
            cutscene_time=cutscene_time,
            player_agency=player_agency,
            complexity=complexity,
            character_development=character_development,
            plot_twists=plot_twists,
            event_density=event_density,
            choice_impact=choice_factor,
        )

    except Exception as e:
        print(f"✗ 分析叙事节奏失败: {e}")
        import traceback
        traceback.print_exc()
        return dict(
            intensity=0.5,
            story_phase='opening',
            quest_progress=0.0,
            recent_events_count=0,
            tension_level=0.0,
            player_choices=0,
            dialogue_intensity=0.0,
            cutscene_time=0.0,
            player_agency=0.5,
            complexity=0.3,
            character_development=0.0,
            plot_twists=0,
            event_density=0.0,
            choice_impact=0.0,
        )
|
|
|
|
def _analyze_music_feedback(self) -> Dict[str, Any]:
    """
    Score how far the playing music is from where it should be.

    The feedback level is the mean of three factors: the gap between the
    plugin's current and target intensity, whether any transition is active,
    and a complexity factor based on the number of active themes and effects.
    Adaptation is one minus the intensity gap. Each call appends the current
    music intensity to the engine history. Without a plugin, all inputs
    default to zero / inactive.

    Returns:
        Music feedback analysis dict; a fully-adapted dict if anything fails.
    """
    try:
        plugin = self.plugin

        # Current music state.
        if plugin:
            current_intensity = plugin.get_intensity()
            target_intensity = plugin.target_intensity
        else:
            current_intensity = 0.0
            target_intensity = 0.0

        intensity_difference = abs(current_intensity - target_intensity)

        # Optional sub-systems are only consulted when the plugin has them.
        if plugin:
            transition_in_progress = (
                hasattr(plugin, 'transition_manager')
                and len(plugin.transition_manager.active_transitions) > 0
            )
        else:
            transition_in_progress = False

        if plugin and hasattr(plugin, 'music_manager'):
            layer_count = len(plugin.music_manager.active_themes)
        else:
            layer_count = 0

        if plugin and hasattr(plugin, 'effect_processor'):
            effect_count = len(plugin.effect_processor.active_effects)
        else:
            effect_count = 0

        difference_factor = intensity_difference
        transition_factor = 1.0 if transition_in_progress else 0.0
        # Complexity: number of layers plus effects, saturating at ten.
        complexity_factor = min(1.0, (layer_count + effect_count) / 10.0)

        feedback_level = (difference_factor + transition_factor + complexity_factor) / 3.0

        adaptation = 1.0 - intensity_difference

        self.history['music_intensity'].append(current_intensity)

        return dict(
            intensity=feedback_level,
            intensity_difference=intensity_difference,
            transition_in_progress=transition_in_progress,
            layer_count=layer_count,
            effect_count=effect_count,
            adaptation=adaptation,
            complexity=complexity_factor,
        )

    except Exception as e:
        print(f"✗ 分析音乐反馈失败: {e}")
        import traceback
        traceback.print_exc()
        return dict(
            intensity=0.0,
            intensity_difference=0.0,
            transition_in_progress=False,
            layer_count=0,
            effect_count=0,
            adaptation=1.0,
            complexity=0.0,
        )
|
|
|
|
def update(self, dt: float):
    """
    Advance the engine: run a full analysis whenever the interval has elapsed.

    ``perform_analysis`` adds its own run time to ``stats['analysis_time']``.
    To avoid counting that span twice, this method only adds the part of its
    elapsed time that the analysis pass did not already report.

    Args:
        dt: Time increment since the previous update. Not used: scheduling
            is based on the wall clock (``time.time()``).
    """
    update_start_time = time.time()
    current_time = time.time()

    # Time that perform_analysis() has already added to the stats itself.
    self_reported = 0.0

    if current_time - self.last_analysis_time >= self.analysis_interval:
        reported_before = self.stats['analysis_time']
        self.perform_analysis()
        self_reported = self.stats['analysis_time'] - reported_before
        self.last_analysis_time = current_time

    self.last_update_time = current_time

    # Only the remainder is added, so the analysis time is counted once.
    elapsed = time.time() - update_start_time
    self.stats['analysis_time'] += max(0.0, elapsed - self_reported)
|
|
|
|
def cleanup(self):
    """Drop all analyzers, results and history, and reset the counters."""
    # Empty the containers in place, so existing references see them empty.
    for container in (self.analyzers, self.analysis_results, self.history):
        container.clear()

    self.stats = dict(
        analyzers_created=0,
        analyses_performed=0,
        analysis_time=0.0,
        data_points_processed=0,
        prediction_accuracy=0.0,
    )

    print("✓ 分析引擎资源已清理")
|
|
|
|
def get_stats(self) -> Dict[str, int]:
    """
    Return a snapshot of the engine's counters.

    Returns:
        Shallow copy of the statistics dict; changing it does not affect the
        engine.
    """
    snapshot = dict(self.stats)
    return snapshot
|
|
|
|
def get_analysis_results(self) -> Dict[str, Any]:
    """
    Return the most recent merged analysis result.

    Returns:
        Shallow copy of the stored result dict; nested values are shared
        with the engine.
    """
    latest = dict(self.analysis_results)
    return latest
|
|
|
|
def get_history(self, data_type: str = 'intensity') -> List[Any]:
    """
    Return the recorded samples of one history series.

    Args:
        data_type: Name of the history series.

    Returns:
        The samples as a new list, oldest first; an empty list when the
        series does not exist.
    """
    return list(self.history[data_type]) if data_type in self.history else []
|
|
|
|
def set_analysis_interval(self, interval: float):
    """
    Change how often the engine runs a full analysis.

    Args:
        interval: Analysis interval in seconds; values below 0.1 are raised
            to 0.1.
    """
    # Enforce the 0.1 second floor.
    if interval > 0.1:
        self.analysis_interval = interval
    else:
        self.analysis_interval = 0.1
    print(f"✓ 分析间隔设置为: {self.analysis_interval:.2f}秒")
|
|
|
|
def set_analyzer_weight(self, analyzer_type: str, weight: float):
    """
    Set the merge weight for one analyzer type.

    A type is accepted when it already has an entry in the weights table or
    when at least one registered analyzer uses it. The second case covers
    types such as ``'music_feedback'`` and ``'custom'``, which have no
    initial entry in the weights table even though analyzers of those types
    can be registered.

    Args:
        analyzer_type: Analyzer type to weight (e.g. ``'combat'``).
        weight: New weight; negative values are raised to 0.0.
    """
    type_is_known = analyzer_type in self.weights or any(
        analyzer.get('params', {}).get('type') == analyzer_type
        for analyzer in self.analyzers.values()
    )

    if type_is_known:
        self.weights[analyzer_type] = max(0.0, weight)
        print(f"✓ {analyzer_type}分析器权重设置为: {self.weights[analyzer_type]:.2f}")
    else:
        print(f"✗ 无效的分析器类型: {analyzer_type}")
|
|
|
|
def get_intensity_trend(self) -> float:
    """
    Estimate the direction in which the overall intensity is moving.

    A line is fitted through up to the five most recent intensity samples,
    and its slope is returned.

    Returns:
        The slope (positive means rising, negative means falling); 0.0 when
        there are fewer than two samples, the samples do not vary, or an
        error occurs.
    """
    try:
        recent_values = list(self.history['intensity'])[-5:]  # last five samples
        if len(recent_values) < 2:
            return 0.0

        x = np.arange(len(recent_values))
        y = np.array(recent_values)
        if not (len(x) > 1 and np.std(x) > 0 and np.std(y) > 0):
            return 0.0

        # Slope of the least-squares line.
        return np.polyfit(x, y, 1)[0]

    except Exception as e:
        print(f"✗ 计算强度趋势失败: {e}")
        return 0.0
|
|
|
|
def predict_next_intensity(self) -> float:
    """
    Predict the next intensity value.

    Adds the value of ``get_intensity_trend()`` to the current
    ``analysis_results['intensity']`` (0.0 when absent) and clamps the sum
    to the 0-1 range.

    Returns:
        The clamped prediction; if the computation raises, the current
        intensity from ``analysis_results`` (default 0.0) is returned.
    """
    try:
        slope = self.get_intensity_trend()
        baseline = self.analysis_results.get('intensity', 0.0)
        projected = baseline + slope

        # Clamp to the 0-1 range: cap at 1.0 first, then floor at 0.0.
        capped = min(1.0, projected)
        return max(0.0, capped)
    except Exception as e:
        print(f"✗ 预测强度值失败: {e}")
        return self.analysis_results.get('intensity', 0.0)
|
|
|
|
def get_dominant_factor(self) -> str:
    """
    Return the name of the factor with the highest weighted intensity.

    For every factor in ``analysis_results['details']`` the score is the
    sum of ``entry[0]['intensity'] * entry[1]`` over the entries whose
    first element is a dict, divided by the total number of entries.

    Returns:
        The factor name with the largest score above 0.0, or 'unknown'
        when there are no results, no 'details', or no positive score.
    """
    results = self.analysis_results
    if not results or 'details' not in results:
        return 'unknown'

    best_name = 'unknown'
    best_score = 0.0

    for name, entries in results['details'].items():
        if not entries:
            continue
        try:
            weighted = [
                entry[0].get('intensity', 0.0) * entry[1]
                for entry in entries
                if isinstance(entry[0], dict)
            ]
            # The denominator counts every entry, including non-dict ones.
            score = sum(weighted) / len(entries)
            if score > best_score:
                best_score = score
                best_name = name
        except Exception:
            # Best effort: a malformed entry drops this whole factor.
            continue

    return best_name
|
|
|
|
def create_custom_analyzer(self, analyzer_name: str,
                           analysis_function: Callable[[], Dict[str, Any]]) -> bool:
    """
    Register a custom analyzer backed by a callback.

    Wraps ``analysis_function`` in a parameter dict of type 'custom' and
    delegates to ``self.create_analyzer``.

    Args:
        analyzer_name: Name to register the analyzer under.
        analysis_function: Callback stored under the 'callback' key.

    Returns:
        Whatever ``create_analyzer`` returns, or False if it raised (the
        traceback is printed in that case).
    """
    try:
        custom_params = dict(type='custom', callback=analysis_function)
        return self.create_analyzer(analyzer_name, custom_params)
    except Exception as e:
        import traceback

        print(f"✗ 创建自定义分析器失败: {e}")
        traceback.print_exc()
        return False
|
|
|
|
def get_analyzer_status(self, analyzer_name: str) -> Dict[str, Any]:
    """
    Describe the current status of one analyzer.

    Args:
        analyzer_name: Key of the analyzer in ``self.analyzers``.

    Returns:
        For a registered analyzer: its name, state, type, enabled flag,
        last analysis time, number of history entries, trend, confidence
        and volatility (the optional fields fall back to 0.0 / 1.0 / 0.0).
        For an unregistered name: only 'name' and a 'state' of 'unknown'.
    """
    if analyzer_name not in self.analyzers:
        return {'name': analyzer_name, 'state': 'unknown'}

    entry = self.analyzers[analyzer_name]
    status = {'name': analyzer_name}
    status['state'] = entry['state']
    status['type'] = entry['params']['type']
    status['enabled'] = entry['params']['enabled']
    status['last_analysis'] = entry.get('last_analysis', 0.0)
    status['results_count'] = len(entry['history'])
    status['trend'] = entry.get('trend', 0.0)
    status['confidence'] = entry.get('confidence', 1.0)
    status['volatility'] = entry.get('volatility', 0.0)
    return status
|
|
|
|
def set_prediction_settings(self, settings: Dict[str, Any]):
    """
    Merge new values into the prediction settings.

    Args:
        settings: Entries to add to or overwrite in
            ``self.prediction_settings``; keys not mentioned are kept.
            No validation is performed on keys or values.
    """
    current = self.prediction_settings
    current.update(settings)
    print("✓ 预测参数已更新")
|
|
|
|
def get_prediction_settings(self) -> Dict[str, Any]:
    """
    Return a snapshot of the prediction settings.

    Returns:
        A shallow copy of ``self.prediction_settings``.
    """
    snapshot = self.prediction_settings.copy()
    return snapshot
|
|
|
|
def set_emotion_analysis_ranges(self, ranges: Dict[str, tuple]):
    """
    Merge new entries into the emotion-analysis ranges.

    Args:
        ranges: Entries to add to or overwrite in
            ``self.emotion_analysis``; keys not mentioned are kept.
            No validation is performed on keys or values.
    """
    current = self.emotion_analysis
    current.update(ranges)
    print("✓ 情绪分析范围已更新")
|
|
|
|
def get_emotion_analysis_ranges(self) -> Dict[str, tuple]:
    """
    Return a snapshot of the emotion-analysis ranges.

    Returns:
        A shallow copy of ``self.emotion_analysis``.
    """
    snapshot = self.emotion_analysis.copy()
    return snapshot
|
|
|
|
def get_analyzer_trend(self, analyzer_name: str) -> float:
    """
    Return the stored trend of one analyzer.

    Args:
        analyzer_name: Key of the analyzer in ``self.analyzers``.

    Returns:
        The analyzer's 'trend' entry, or 0.0 when the analyzer is not
        registered or has no trend recorded.
    """
    if analyzer_name not in self.analyzers:
        return 0.0
    entry = self.analyzers[analyzer_name]
    return entry.get('trend', 0.0)
|
|
|
|
def get_analyzer_confidence(self, analyzer_name: str) -> float:
    """
    Return the stored confidence of one analyzer.

    Args:
        analyzer_name: Key of the analyzer in ``self.analyzers``.

    Returns:
        The analyzer's 'confidence' entry, or 1.0 when the analyzer is not
        registered or has no confidence recorded.
    """
    if analyzer_name not in self.analyzers:
        return 1.0
    entry = self.analyzers[analyzer_name]
    return entry.get('confidence', 1.0)
|
|
|
|
def get_overall_confidence(self) -> float:
    """
    Return the mean confidence across all registered analyzers.

    Analyzers without a 'confidence' entry count as 1.0.

    Returns:
        The arithmetic mean of the confidences, or 1.0 when no analyzers
        are registered.
    """
    registered = self.analyzers
    if not registered:
        return 1.0

    confidences = [entry.get('confidence', 1.0) for entry in registered.values()]
    return sum(confidences) / len(registered)
|
|
|
|
def force_analysis(self) -> Dict[str, Any]:
    """
    Run an analysis immediately, without consulting the analysis interval.

    Returns:
        The value returned by ``self.perform_analysis()``.
    """
    print("✓ 执行强制分析")
    outcome = self.perform_analysis()
    return outcome
|
|
|
|
def get_analyzer_history(self, analyzer_name: str) -> List[Dict[str, Any]]:
    """
    Return the recorded history of one analyzer.

    Args:
        analyzer_name: Key of the analyzer in ``self.analyzers``.

    Returns:
        A new list with the analyzer's 'history' entries in stored order,
        or an empty list when the analyzer is not registered.
    """
    if analyzer_name not in self.analyzers:
        return []
    recorded = self.analyzers[analyzer_name]['history']
    return list(recorded)
|
|
|
|
def clear_analyzer_history(self, analyzer_name: str):
    """
    Empty the recorded history of one analyzer.

    Args:
        analyzer_name: Key of the analyzer in ``self.analyzers``; an
            unregistered name is ignored without any message.
    """
    if analyzer_name not in self.analyzers:
        return

    recorded = self.analyzers[analyzer_name]['history']
    recorded.clear()  # emptied in place, the container object is kept
    print(f"✓ 分析器 {analyzer_name} 历史数据已清空")
|
|
|
|
def set_history_length(self, length: int):
    """
    Change how many samples each history series retains.

    Args:
        length: Requested capacity; values below 10 are raised to 10.

    Every series in ``self.history`` is rebuilt with the new capacity and
    keeps its most recent samples. Only ``self.history`` is resized.
    """
    self.history_length = max(10, length)
    capacity = self.history_length

    # A bounded deque built from an iterable keeps only the last `capacity`
    # items, i.e. the newest samples survive.
    for key, series in self.history.items():
        self.history[key] = deque(series, maxlen=capacity)

    print(f"✓ 历史数据长度设置为: {self.history_length}")
|
|
|
|
def get_dominant_emotion(self) -> str:
    """
    Return the player's current emotional-state label.

    Reads 'emotional_state' from ``self.plugin.player_state``.

    Returns:
        The stored label, or 'neutral' when there is no plugin, no such
        entry, or reading the state raises.
    """
    try:
        if self.plugin:
            state = self.plugin.player_state
        else:
            state = {}
        return state.get('emotional_state', 'neutral')
    except Exception as e:
        print(f"✗ 获取主导情绪失败: {e}")
        return 'neutral'
|
|
|
|
def get_emotional_valence(self) -> float:
    """
    Return the emotional valence reported by the player-state analysis.

    Scans ``analysis_results['details']['player_state']``, unpacking each
    entry as a pair, and returns the 'valence' of the first pair whose
    first element is a dict containing that key. The value is returned as
    stored; no clamping is applied.

    Returns:
        The stored valence, or 0.0 when none is found or the lookup raises.
    """
    fallback = 0.0
    try:
        entries = self.analysis_results.get('details', {}).get('player_state', [])
        if not entries:
            return fallback
        # The second element of each pair is ignored here.
        matches = (
            result['valence']
            for result, _ in entries
            if isinstance(result, dict) and 'valence' in result
        )
        return next(matches, fallback)
    except Exception as e:
        print(f"✗ 获取情绪效价失败: {e}")
        return fallback
|
|
|
|
def get_emotional_arousal(self) -> float:
    """
    Return the emotional arousal reported by the player-state analysis.

    Scans ``analysis_results['details']['player_state']``, unpacking each
    entry as a pair, and returns the 'arousal' of the first pair whose
    first element is a dict containing that key. The value is returned as
    stored; no clamping is applied.

    Returns:
        The stored arousal, or the default of 0.4 when none is found or
        the lookup raises.
    """
    fallback = 0.4  # default used when no arousal value is available
    try:
        entries = self.analysis_results.get('details', {}).get('player_state', [])
        if not entries:
            return fallback
        # The second element of each pair is ignored here.
        matches = (
            result['arousal']
            for result, _ in entries
            if isinstance(result, dict) and 'arousal' in result
        )
        return next(matches, fallback)
    except Exception as e:
        print(f"✗ 获取情绪唤醒度失败: {e}")
        return fallback
|
|
|
|
def get_narrative_tension(self) -> float:
    """
    Return the tension level reported by the narrative analysis.

    Scans ``analysis_results['details']['narrative']``, unpacking each
    entry as a pair, and returns the 'tension_level' of the first pair
    whose first element is a dict containing that key. The value is
    returned as stored; no clamping is applied.

    Returns:
        The stored tension level, or 0.0 when none is found or the lookup
        raises.
    """
    fallback = 0.0
    try:
        entries = self.analysis_results.get('details', {}).get('narrative', [])
        if not entries:
            return fallback
        # The second element of each pair is ignored here.
        matches = (
            result['tension_level']
            for result, _ in entries
            if isinstance(result, dict) and 'tension_level' in result
        )
        return next(matches, fallback)
    except Exception as e:
        print(f"✗ 获取叙事紧张度失败: {e}")
        return fallback
|
|
|
|
def get_environmental_mood(self) -> float:
    """
    Return the mood value reported by the environment analysis.

    Scans ``analysis_results['details']['environment']``, unpacking each
    entry as a pair, and returns the 'mood' of the first pair whose first
    element is a dict containing that key. The value is returned as
    stored; no clamping is applied.

    Returns:
        The stored mood, or the default of 0.5 when none is found or the
        lookup raises.
    """
    fallback = 0.5  # default used when no mood value is available
    try:
        entries = self.analysis_results.get('details', {}).get('environment', [])
        if not entries:
            return fallback
        # The second element of each pair is ignored here.
        matches = (
            result['mood']
            for result, _ in entries
            if isinstance(result, dict) and 'mood' in result
        )
        return next(matches, fallback)
    except Exception as e:
        print(f"✗ 获取环境氛围失败: {e}")
        return fallback
|
|
|
|
def get_combat_tension(self) -> float:
    """
    Return the tension value reported by the combat analysis.

    Scans ``analysis_results['details']['combat']``, unpacking each entry
    as a pair, and returns the 'tension' of the first pair whose first
    element is a dict containing that key. The value is returned as
    stored; no clamping is applied.

    Returns:
        The stored tension, or 0.0 when none is found or the lookup raises.
    """
    fallback = 0.0
    try:
        entries = self.analysis_results.get('details', {}).get('combat', [])
        if not entries:
            return fallback
        # The second element of each pair is ignored here.
        matches = (
            result['tension']
            for result, _ in entries
            if isinstance(result, dict) and 'tension' in result
        )
        return next(matches, fallback)
    except Exception as e:
        print(f"✗ 获取战斗紧张度失败: {e}")
        return fallback
|
|
|
|
def get_exploration_curiosity(self) -> float:
    """
    Return the curiosity value reported by the exploration analysis.

    Scans ``analysis_results['details']['exploration']``, unpacking each
    entry as a pair, and returns the 'curiosity' of the first pair whose
    first element is a dict containing that key. The value is returned as
    stored; no clamping is applied.

    Returns:
        The stored curiosity, or 0.0 when none is found or the lookup
        raises.
    """
    fallback = 0.0
    try:
        entries = self.analysis_results.get('details', {}).get('exploration', [])
        if not entries:
            return fallback
        # The second element of each pair is ignored here.
        matches = (
            result['curiosity']
            for result, _ in entries
            if isinstance(result, dict) and 'curiosity' in result
        )
        return next(matches, fallback)
    except Exception as e:
        print(f"✗ 获取探索好奇心失败: {e}")
        return fallback