""" 群体学习和适应管理器 实现群体的学习和适应能力 """ import random import math from typing import List, Dict, Any from panda3d.core import Vec3 class LearningManager: """ 学习管理器 负责实现群体的学习和适应能力,包括经验积累、行为优化和环境适应 """ def __init__(self, config): self.config = config # 学习参数 self.learning_rate = 0.1 # 学习率 self.exploration_rate = 0.3 # 探索率 self.memory_capacity = 1000 # 记忆容量 # 经验记忆 self.experience_memory = [] # 行为策略 self.behavior_policies = {} # 环境模型 self.environment_model = {} # 适应性参数 self.adaptation_factors = { 'speed': 1.0, 'cohesion': 1.0, 'separation': 1.0, 'alignment': 1.0 } def add_experience(self, state: Dict, action: str, reward: float, next_state: Dict): """ 添加经验到记忆中 :param state: 当前状态 :param action: 执行的动作 :param reward: 获得的奖励 :param next_state: 下一个状态 """ experience = { 'state': state, 'action': action, 'reward': reward, 'next_state': next_state, 'timestamp': self._get_current_time() } self.experience_memory.append(experience) # 限制记忆容量 if len(self.experience_memory) > self.memory_capacity: self.experience_memory = self.experience_memory[-self.memory_capacity:] def update_behavior_policies(self): """ 更新行为策略 基于经验记忆优化行为策略 """ if len(self.experience_memory) < 10: return # 简单的Q-learning更新 for experience in self.experience_memory[-100:]: # 只使用最近的经验 state = experience['state'] action = experience['action'] reward = experience['reward'] next_state = experience['next_state'] # 获取当前状态-动作对的Q值 state_key = self._state_to_key(state) if state_key not in self.behavior_policies: self.behavior_policies[state_key] = {} if action not in self.behavior_policies[state_key]: self.behavior_policies[state_key][action] = 0.0 # 获取下一个状态的最大Q值 next_state_key = self._state_to_key(next_state) max_next_q = 0.0 if next_state_key in self.behavior_policies: if self.behavior_policies[next_state_key]: max_next_q = max(self.behavior_policies[next_state_key].values()) # 更新Q值 current_q = self.behavior_policies[state_key][action] new_q = current_q + self.learning_rate * (reward + 0.9 * max_next_q - current_q) self.behavior_policies[state_key][action] = new_q def select_action(self, state: Dict, available_actions: List[str]) -> str: """ 根据当前状态选择动作 :param state: 当前状态 :param available_actions: 可用动作列表 :return: 选择的动作 """ if not available_actions: return None # epsilon-贪婪策略 if random.random() < self.exploration_rate: # 探索:随机选择动作 return random.choice(available_actions) else: # 利用:选择最佳动作 state_key = self._state_to_key(state) if state_key in self.behavior_policies: # 选择Q值最高的动作 action_q_values = { action: self.behavior_policies[state_key].get(action, 0.0) for action in available_actions } return max(action_q_values, key=action_q_values.get) else: # 如果没有经验,随机选择 return random.choice(available_actions) def adapt_to_environment(self, environment_state: Dict): """ 根据环境状态调整行为参数 :param environment_state: 环境状态 """ # 根据环境密度调整行为参数 if 'density' in environment_state: density = environment_state['density'] # 高密度环境下增加分离权重,减少聚集权重 if density > 0.7: self.adaptation_factors['separation'] = min(2.0, self.adaptation_factors['separation'] + 0.1) self.adaptation_factors['cohesion'] = max(0.5, self.adaptation_factors['cohesion'] - 0.1) # 低密度环境下增加聚集权重,减少分离权重 elif density < 0.3: self.adaptation_factors['cohesion'] = min(2.0, self.adaptation_factors['cohesion'] + 0.1) self.adaptation_factors['separation'] = max(0.5, self.adaptation_factors['separation'] - 0.1) # 根据威胁水平调整速度 if 'threat_level' in environment_state: threat_level = environment_state['threat_level'] # 高威胁环境下增加速度 if threat_level > 0.7: self.adaptation_factors['speed'] = min(2.0, self.adaptation_factors['speed'] + 0.2) # 
            # Low threat: ease back toward normal speed.
            elif threat_level < 0.3:
                self.adaptation_factors['speed'] = max(
                    0.8, self.adaptation_factors['speed'] - 0.1)

        # Adjust behavior based on resource abundance.
        if 'resource_abundance' in environment_state:
            resource_abundance = environment_state['resource_abundance']

            # Abundant resources: move less, flock tighter.
            if resource_abundance > 0.7:
                self.adaptation_factors['speed'] = max(
                    0.7, self.adaptation_factors['speed'] - 0.1)
                self.adaptation_factors['cohesion'] = min(
                    1.5, self.adaptation_factors['cohesion'] + 0.1)
            # Scarce resources: move more, spread out.
            elif resource_abundance < 0.3:
                self.adaptation_factors['speed'] = min(
                    1.3, self.adaptation_factors['speed'] + 0.1)
                self.adaptation_factors['cohesion'] = max(
                    0.7, self.adaptation_factors['cohesion'] - 0.1)

    def get_adapted_parameters(self) -> Dict[str, float]:
        """
        Get the adapted parameters.

        :return: dictionary of adapted parameters
        """
        return self.adaptation_factors.copy()

    def update_environment_model(self, observation: Dict):
        """
        Update the environment model.

        :param observation: environment observation
        """
        # Simple environment-model update.
        for key, value in observation.items():
            if key not in self.environment_model:
                self.environment_model[key] = value
            else:
                # Exponential smoothing toward the new observation.
                self.environment_model[key] = (
                    0.9 * self.environment_model[key] + 0.1 * value)

    def predict_environment_change(self, time_steps: int = 1) -> Dict[str, Any]:
        """
        Predict how the environment will change.

        :param time_steps: number of time steps to predict ahead
        :return: predicted environment state
        """
        prediction = {}

        for key, value in self.environment_model.items():
            # Persistence forecast: with no trend information, assume the
            # environment stays the same regardless of time_steps.
            prediction[key] = value

        return prediction

    def calculate_reward(self, member: Dict, neighbors: List[Dict],
                         obstacles: List[Dict],
                         environment_state: Dict) -> float:
        """
        Calculate the reward value.

        :param member: swarm member
        :param neighbors: list of neighbors
        :param obstacles: list of obstacles
        :param environment_state: environment state
        :return: reward value
        """
        reward = 0.0

        # Reward keeping a safe distance (collision avoidance).
        safe_distance_reward = self._calculate_safe_distance_reward(
            member, neighbors, obstacles)
        reward += safe_distance_reward

        # Reward swarm cohesion.
        cohesion_reward = self._calculate_cohesion_reward(member, neighbors)
        reward += cohesion_reward

        # Reward environmental adaptation.
        adaptation_reward = self._calculate_adaptation_reward(
            member, environment_state)
        reward += adaptation_reward

        # Penalize excessive exploration.
        exploration_penalty = self._calculate_exploration_penalty(member)
        reward += exploration_penalty

        return reward

    def _calculate_safe_distance_reward(self, member: Dict,
                                        neighbors: List[Dict],
                                        obstacles: List[Dict]) -> float:
        """
        Calculate the safe-distance reward.

        :param member: swarm member
        :param neighbors: list of neighbors
        :param obstacles: list of obstacles
        :return: safe-distance reward
        """
        reward = 0.0
        min_safe_distance = 2.0

        # Check the safe distance to each neighbor.
        for neighbor in neighbors:
            distance = (member['position'] - neighbor['position']).length()
            if distance < min_safe_distance:
                # Penalize getting too close.
                reward -= (min_safe_distance - distance) * 2.0

        # Check the safe distance to each obstacle, measured from its surface.
        for obstacle in obstacles:
            distance = ((member['position'] - obstacle['position']).length()
                        - obstacle.get('radius', 1.0))
            if distance < min_safe_distance:
                # Penalize getting too close; obstacles weigh more than neighbors.
                reward -= (min_safe_distance - distance) * 3.0

        return reward

    def _calculate_cohesion_reward(self, member: Dict,
                                   neighbors: List[Dict]) -> float:
        """
        Calculate the swarm-cohesion reward.

        :param member: swarm member
        :param neighbors: list of neighbors
        :return: cohesion reward
        """
        if not neighbors:
            return 0.0

        # Average distance to the neighbors.
        total_distance = 0.0
        for neighbor in neighbors:
            distance = (member['position'] - neighbor['position']).length()
            total_distance += distance

        avg_distance = total_distance / len(neighbors)

        # Ideal distance range.
        ideal_min_distance = 5.0
        ideal_max_distance = 15.0

        # Full reward inside the ideal range.
        if ideal_min_distance <= avg_distance <= ideal_max_distance:
            return 1.0

        # Outside the range, the reward decays with distance from it.
        distance_to_ideal = min(
            abs(avg_distance - ideal_min_distance),
            abs(avg_distance - ideal_max_distance)
        )
        return max(0.0, 1.0 - distance_to_ideal * 0.1)
    def _calculate_adaptation_reward(self, member: Dict,
                                     environment_state: Dict) -> float:
        """
        Calculate the environmental-adaptation reward.

        :param member: swarm member
        :param environment_state: environment state
        :return: adaptation reward
        """
        reward = 0.0

        # Reward favorable environmental conditions.
        if 'resource_abundance' in environment_state:
            resource_abundance = environment_state['resource_abundance']
            # Reward abundant resources.
            reward += resource_abundance * 0.5

        if 'threat_level' in environment_state:
            threat_level = environment_state['threat_level']
            # Reward low threat levels.
            reward += (1.0 - threat_level) * 0.3

        return reward

    def _calculate_exploration_penalty(self, member: Dict) -> float:
        """
        Calculate the exploration penalty.

        :param member: swarm member
        :return: exploration penalty (zero or negative)
        """
        # Penalize members that move close to the world boundary.
        bounds = {
            'min_x': -50, 'max_x': 50,
            'min_y': -50, 'max_y': 50,
            'min_z': 0, 'max_z': 30
        }

        penalty = 0.0
        buffer = 5.0

        if (member['position'].x < bounds['min_x'] + buffer or
                member['position'].x > bounds['max_x'] - buffer or
                member['position'].y < bounds['min_y'] + buffer or
                member['position'].y > bounds['max_y'] - buffer or
                member['position'].z < bounds['min_z'] + buffer or
                member['position'].z > bounds['max_z'] - buffer):
            penalty -= 1.0

        return penalty

    def _state_to_key(self, state: Dict) -> str:
        """
        Convert a state into a lookup key.

        :param state: state dictionary
        :return: state key
        """
        # Simple key generation: sort the keys and discretize numeric values.
        key_parts = []
        for key in sorted(state.keys()):
            if isinstance(state[key], (int, float)):
                # Numeric values are rounded to one decimal place so that
                # nearby states share a key.
                key_parts.append(f"{key}:{state[key]:.1f}")
            else:
                key_parts.append(f"{key}:{state[key]}")

        return "|".join(key_parts)

    def _get_current_time(self):
        """Get the current timestamp."""
        return time.time()

    def get_learning_stats(self) -> Dict:
        """
        Get learning statistics.

        :return: learning statistics
        """
        return {
            'experience_count': len(self.experience_memory),
            'policy_count': len(self.behavior_policies),
            'exploration_rate': self.exploration_rate,
            'learning_rate': self.learning_rate,
            'adaptation_factors': self.adaptation_factors.copy()
        }

    def adjust_learning_parameters(self, performance: float):
        """
        Adjust the learning parameters based on performance.

        :param performance: current performance metric
        """
        # Adjust the exploration rate based on performance.
        if performance > 0.8:
            # Good performance: explore less.
            self.exploration_rate = max(0.1, self.exploration_rate - 0.05)
        elif performance < 0.3:
            # Poor performance: explore more.
            self.exploration_rate = min(0.8, self.exploration_rate + 0.1)

        # Adjust the learning rate based on performance.
        if performance > 0.9:
            # Very good performance: lower the learning rate to stabilize behavior.
            self.learning_rate = max(0.05, self.learning_rate * 0.9)
        elif performance < 0.2:
            # Very poor performance: raise the learning rate to learn faster.
            self.learning_rate = min(0.5, self.learning_rate * 1.1)

    def clear_memory(self):
        """Clear all learned state."""
        self.experience_memory.clear()
        self.behavior_policies.clear()
        self.environment_model.clear()
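
# --- Minimal usage sketch (illustrative only) -------------------------------
# One plausible way to drive LearningManager from a simulation loop:
# feed synthetic transitions, run the Q-learning update, then adapt the
# steering weights to an environment snapshot. The action vocabulary
# ('flee', 'forage', 'regroup'), the reward rule, and passing config=None
# are assumptions for this demo, not part of the class contract.
if __name__ == "__main__":
    manager = LearningManager(config=None)  # config is unused by this demo

    actions = ['flee', 'forage', 'regroup']

    # Generate a handful of synthetic transitions.
    for step in range(20):
        state = {'density': random.random(), 'threat_level': random.random()}
        action = manager.select_action(state, actions)
        # Hypothetical reward signal: fleeing pays off exactly when the
        # threat level is high.
        reward = 1.0 if (action == 'flee') == (state['threat_level'] > 0.5) else -0.5
        next_state = {'density': random.random(), 'threat_level': random.random()}
        manager.add_experience(state, action, reward, next_state)

    # Learn from the recorded experiences.
    manager.update_behavior_policies()

    # Adapt the steering weights to a crowded, dangerous environment.
    manager.adapt_to_environment({'density': 0.9, 'threat_level': 0.8})
    print("Adapted parameters:", manager.get_adapted_parameters())
    print("Learning stats:", manager.get_learning_stats())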