EG/plugins/user/swarm_intelligence/learning_manager.py
2025-12-12 16:16:15 +08:00

417 lines
15 KiB
Python

"""
群体学习和适应管理器
实现群体的学习和适应能力
"""
import random
import math
from typing import List, Dict, Any
from panda3d.core import Vec3
class LearningManager:
"""
学习管理器
负责实现群体的学习和适应能力,包括经验积累、行为优化和环境适应
"""
def __init__(self, config):
self.config = config
# 学习参数
self.learning_rate = 0.1 # 学习率
self.exploration_rate = 0.3 # 探索率
self.memory_capacity = 1000 # 记忆容量
# 经验记忆
self.experience_memory = []
# 行为策略
self.behavior_policies = {}
# 环境模型
self.environment_model = {}
# 适应性参数
self.adaptation_factors = {
'speed': 1.0,
'cohesion': 1.0,
'separation': 1.0,
'alignment': 1.0
}
def add_experience(self, state: Dict, action: str, reward: float, next_state: Dict):
"""
添加经验到记忆中
:param state: 当前状态
:param action: 执行的动作
:param reward: 获得的奖励
:param next_state: 下一个状态
"""
experience = {
'state': state,
'action': action,
'reward': reward,
'next_state': next_state,
'timestamp': self._get_current_time()
}
self.experience_memory.append(experience)
# 限制记忆容量
if len(self.experience_memory) > self.memory_capacity:
self.experience_memory = self.experience_memory[-self.memory_capacity:]
def update_behavior_policies(self):
"""
更新行为策略
基于经验记忆优化行为策略
"""
if len(self.experience_memory) < 10:
return
# 简单的Q-learning更新
for experience in self.experience_memory[-100:]: # 只使用最近的经验
state = experience['state']
action = experience['action']
reward = experience['reward']
next_state = experience['next_state']
# 获取当前状态-动作对的Q值
state_key = self._state_to_key(state)
if state_key not in self.behavior_policies:
self.behavior_policies[state_key] = {}
if action not in self.behavior_policies[state_key]:
self.behavior_policies[state_key][action] = 0.0
# 获取下一个状态的最大Q值
next_state_key = self._state_to_key(next_state)
max_next_q = 0.0
if next_state_key in self.behavior_policies:
if self.behavior_policies[next_state_key]:
max_next_q = max(self.behavior_policies[next_state_key].values())
# 更新Q值
current_q = self.behavior_policies[state_key][action]
new_q = current_q + self.learning_rate * (reward + 0.9 * max_next_q - current_q)
self.behavior_policies[state_key][action] = new_q
def select_action(self, state: Dict, available_actions: List[str]) -> str:
"""
根据当前状态选择动作
:param state: 当前状态
:param available_actions: 可用动作列表
:return: 选择的动作
"""
if not available_actions:
return None
# epsilon-贪婪策略
if random.random() < self.exploration_rate:
# 探索:随机选择动作
return random.choice(available_actions)
else:
# 利用:选择最佳动作
state_key = self._state_to_key(state)
if state_key in self.behavior_policies:
# 选择Q值最高的动作
action_q_values = {
action: self.behavior_policies[state_key].get(action, 0.0)
for action in available_actions
}
return max(action_q_values, key=action_q_values.get)
else:
# 如果没有经验,随机选择
return random.choice(available_actions)
def adapt_to_environment(self, environment_state: Dict):
"""
根据环境状态调整行为参数
:param environment_state: 环境状态
"""
# 根据环境密度调整行为参数
if 'density' in environment_state:
density = environment_state['density']
# 高密度环境下增加分离权重,减少聚集权重
if density > 0.7:
self.adaptation_factors['separation'] = min(2.0, self.adaptation_factors['separation'] + 0.1)
self.adaptation_factors['cohesion'] = max(0.5, self.adaptation_factors['cohesion'] - 0.1)
# 低密度环境下增加聚集权重,减少分离权重
elif density < 0.3:
self.adaptation_factors['cohesion'] = min(2.0, self.adaptation_factors['cohesion'] + 0.1)
self.adaptation_factors['separation'] = max(0.5, self.adaptation_factors['separation'] - 0.1)
# 根据威胁水平调整速度
if 'threat_level' in environment_state:
threat_level = environment_state['threat_level']
# 高威胁环境下增加速度
if threat_level > 0.7:
self.adaptation_factors['speed'] = min(2.0, self.adaptation_factors['speed'] + 0.2)
# 低威胁环境下恢复正常速度
elif threat_level < 0.3:
self.adaptation_factors['speed'] = max(0.8, self.adaptation_factors['speed'] - 0.1)
# 根据资源丰富度调整行为
if 'resource_abundance' in environment_state:
resource_abundance = environment_state['resource_abundance']
# 资源丰富时减少移动,增加聚集
if resource_abundance > 0.7:
self.adaptation_factors['speed'] = max(0.7, self.adaptation_factors['speed'] - 0.1)
self.adaptation_factors['cohesion'] = min(1.5, self.adaptation_factors['cohesion'] + 0.1)
# 资源稀缺时增加移动,减少聚集
elif resource_abundance < 0.3:
self.adaptation_factors['speed'] = min(1.3, self.adaptation_factors['speed'] + 0.1)
self.adaptation_factors['cohesion'] = max(0.7, self.adaptation_factors['cohesion'] - 0.1)
def get_adapted_parameters(self) -> Dict[str, float]:
"""
获取适应后的参数
:return: 适应后的参数字典
"""
return self.adaptation_factors.copy()
def update_environment_model(self, observation: Dict):
"""
更新环境模型
:param observation: 环境观察
"""
# 简单的环境模型更新
for key, value in observation.items():
if key not in self.environment_model:
self.environment_model[key] = value
else:
# 平滑更新
self.environment_model[key] = 0.9 * self.environment_model[key] + 0.1 * value
def predict_environment_change(self, time_steps: int = 1) -> Dict[str, Any]:
"""
预测环境变化
:param time_steps: 预测的时间步数
:return: 预测的环境状态
"""
prediction = {}
for key, value in self.environment_model.items():
# 简单的线性预测
prediction[key] = value # 在没有趋势信息的情况下保持不变
return prediction
def calculate_reward(self, member: Dict, neighbors: List[Dict],
obstacles: List[Dict], environment_state: Dict) -> float:
"""
计算奖励值
:param member: 群体成员
:param neighbors: 邻居列表
:param obstacles: 障碍物列表
:param environment_state: 环境状态
:return: 奖励值
"""
reward = 0.0
# 奖励安全距离(避免碰撞)
safe_distance_reward = self._calculate_safe_distance_reward(member, neighbors, obstacles)
reward += safe_distance_reward
# 奖励群体凝聚力
cohesion_reward = self._calculate_cohesion_reward(member, neighbors)
reward += cohesion_reward
# 奖励环境适应性
adaptation_reward = self._calculate_adaptation_reward(member, environment_state)
reward += adaptation_reward
# 惩罚过度探索
exploration_penalty = self._calculate_exploration_penalty(member)
reward += exploration_penalty
return reward
def _calculate_safe_distance_reward(self, member: Dict, neighbors: List[Dict],
obstacles: List[Dict]) -> float:
"""
计算安全距离奖励
:param member: 群体成员
:param neighbors: 邻居列表
:param obstacles: 障碍物列表
:return: 安全距离奖励
"""
reward = 0.0
min_safe_distance = 2.0
# 检查与邻居的安全距离
for neighbor in neighbors:
distance = (member['position'] - neighbor['position']).length()
if distance < min_safe_distance:
reward -= (min_safe_distance - distance) * 2.0 # 惩罚过近距离
# 检查与障碍物的安全距离
for obstacle in obstacles:
distance = (member['position'] - obstacle['position']).length() - obstacle.get('radius', 1.0)
if distance < min_safe_distance:
reward -= (min_safe_distance - distance) * 3.0 # 惩罚过近距离
return reward
def _calculate_cohesion_reward(self, member: Dict, neighbors: List[Dict]) -> float:
"""
计算群体凝聚力奖励
:param member: 群体成员
:param neighbors: 邻居列表
:return: 凝聚力奖励
"""
if not neighbors:
return 0.0
# 计算与邻居的平均距离
total_distance = 0.0
for neighbor in neighbors:
distance = (member['position'] - neighbor['position']).length()
total_distance += distance
avg_distance = total_distance / len(neighbors)
# 理想距离范围
ideal_min_distance = 5.0
ideal_max_distance = 15.0
# 在理想范围内给予奖励
if ideal_min_distance <= avg_distance <= ideal_max_distance:
return 1.0
else:
# 距离理想范围越远,奖励越少
distance_to_ideal = min(
abs(avg_distance - ideal_min_distance),
abs(avg_distance - ideal_max_distance)
)
return max(0.0, 1.0 - distance_to_ideal * 0.1)
def _calculate_adaptation_reward(self, member: Dict, environment_state: Dict) -> float:
"""
计算环境适应性奖励
:param member: 群体成员
:param environment_state: 环境状态
:return: 适应性奖励
"""
reward = 0.0
# 根据环境条件给予奖励
if 'resource_abundance' in environment_state:
resource_abundance = environment_state['resource_abundance']
# 资源丰富时给予奖励
reward += resource_abundance * 0.5
if 'threat_level' in environment_state:
threat_level = environment_state['threat_level']
# 威胁低时给予奖励
reward += (1.0 - threat_level) * 0.3
return reward
def _calculate_exploration_penalty(self, member: Dict) -> float:
"""
计算探索惩罚
:param member: 群体成员
:return: 探索惩罚
"""
# 如果成员移动到边界附近,给予惩罚
bounds = {
'min_x': -50, 'max_x': 50,
'min_y': -50, 'max_y': 50,
'min_z': 0, 'max_z': 30
}
penalty = 0.0
buffer = 5.0
if (member['position'].x < bounds['min_x'] + buffer or
member['position'].x > bounds['max_x'] - buffer or
member['position'].y < bounds['min_y'] + buffer or
member['position'].y > bounds['max_y'] - buffer or
member['position'].z < bounds['min_z'] + buffer or
member['position'].z > bounds['max_z'] - buffer):
penalty -= 1.0
return penalty
def _state_to_key(self, state: Dict) -> str:
"""
将状态转换为键值
:param state: 状态字典
:return: 状态键值
"""
# 简单的键值生成方法
key_parts = []
for key in sorted(state.keys()):
if isinstance(state[key], (int, float)):
# 数值型状态,四舍五入到小数点后一位
key_parts.append(f"{key}:{state[key]:.1f}")
else:
key_parts.append(f"{key}:{state[key]}")
return "|".join(key_parts)
def _get_current_time(self):
"""
获取当前时间戳
"""
import time
return time.time()
def get_learning_stats(self) -> Dict:
"""
获取学习统计信息
:return: 学习统计信息
"""
return {
'experience_count': len(self.experience_memory),
'policy_count': len(self.behavior_policies),
'exploration_rate': self.exploration_rate,
'learning_rate': self.learning_rate,
'adaptation_factors': self.adaptation_factors.copy()
}
def adjust_learning_parameters(self, performance: float):
"""
根据性能调整学习参数
:param performance: 当前性能指标
"""
# 根据性能调整探索率
if performance > 0.8:
# 性能良好时减少探索
self.exploration_rate = max(0.1, self.exploration_rate - 0.05)
elif performance < 0.3:
# 性能较差时增加探索
self.exploration_rate = min(0.8, self.exploration_rate + 0.1)
# 根据性能调整学习率
if performance > 0.9:
# 性能很好时降低学习率以稳定行为
self.learning_rate = max(0.05, self.learning_rate * 0.9)
elif performance < 0.2:
# 性能很差时提高学习率以快速学习
self.learning_rate = min(0.5, self.learning_rate * 1.1)
def clear_memory(self):
"""
清空记忆
"""
self.experience_memory.clear()
self.behavior_policies.clear()
self.environment_model.clear()