EG/plugins/user/swarm_intelligence/environment_manager.py
2025-12-12 16:16:15 +08:00

481 lines
19 KiB
Python

"""
环境感知和响应管理器
提供群体对环境的感知能力和响应机制
"""
from panda3d.core import Vec3
import random
import math
from typing import Dict, List, Tuple, Optional, Any
class EnvironmentSensor:
    """Simulated environment sensor carried by one swarm member.

    The sensor reports the environment features that lie within
    ``sensor_range`` of the sensing position. Reported distances are
    multiplied by a random factor drawn uniformly from
    ``[1 - sensor_noise, 1 + sensor_noise]``; reported directions of point
    features are exact unit vectors. Only the wind direction gets its own
    directional jitter (see ``_sense_weather``).
    """

    def __init__(self, sensor_range: float = 15.0, sensor_accuracy: float = 0.9):
        """Create a sensor.

        :param sensor_range: maximum distance at which a feature is sensed
        :param sensor_accuracy: accuracy in the range 0.0-1.0
        """
        self.sensor_range = sensor_range
        self.sensor_accuracy = sensor_accuracy
        # Noise is the complement of accuracy. It is clamped to [0, 1] so an
        # out-of-range accuracy can never produce a negative distance factor.
        self.sensor_noise = min(1.0, max(0.0, 1.0 - sensor_accuracy))

    def sense_environment(self, position: Vec3, environment_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return everything this sensor perceives from ``position``.

        :param position: sensing position
        :param environment_data: environment description; the keys read are
            'obstacles', 'swarms', 'predators', 'weather', 'resources' and
            'hazards', each of which may be absent
        :return: dict with the keys 'obstacles', 'other_swarms', 'predators',
            'weather', 'resources' and 'hazards'
        """
        # The entries are evaluated in this order, which also fixes the order
        # in which random numbers are consumed.
        sensed_data = {
            'obstacles': self._sense_obstacles(position, environment_data.get('obstacles', [])),
            'other_swarms': self._sense_other_swarms(position, environment_data.get('swarms', [])),
            'predators': self._sense_predators(position, environment_data.get('predators', [])),
            'weather': self._sense_weather(position, environment_data.get('weather', {})),
            'resources': self._sense_resources(position, environment_data.get('resources', [])),
            'hazards': self._sense_hazards(position, environment_data.get('hazards', [])),
        }
        return self._add_sensor_noise(sensed_data)

    def _noisy_distance(self, distance: float) -> float:
        """Scale ``distance`` by one random measurement-error factor."""
        error_factor = random.uniform(1 - self.sensor_noise, 1 + self.sensor_noise)
        return distance * error_factor

    def _sense_point_features(self, position: Vec3, features: List[Dict]) -> List[Dict]:
        """Sense features that are described by a single 'position' entry.

        Each in-range feature is returned as a shallow copy of its dict with
        two extra keys: 'distance' (noisy) and 'direction' (unit vector from
        ``position`` towards the feature). The input dicts are not modified.
        """
        sensed = []
        for feature in features:
            offset = feature['position'] - position
            distance = offset.length()
            if distance <= self.sensor_range:
                reading = feature.copy()
                reading['distance'] = self._noisy_distance(distance)
                reading['direction'] = offset.normalized()
                sensed.append(reading)
        return sensed

    def _sense_obstacles(self, position: Vec3, obstacles: List[Dict]) -> List[Dict]:
        """Sense the obstacles within range."""
        return self._sense_point_features(position, obstacles)

    def _sense_other_swarms(self, position: Vec3, swarms: List[Dict]) -> List[Dict]:
        """Sense other swarms whose centre lies within range.

        A swarm's centre is the mean of its members' positions. Swarms with
        no members are skipped. Each result holds 'id', 'type', 'distance',
        'direction' and 'size' (the member count).
        """
        sensed_swarms = []
        for swarm in swarms:
            members = swarm.get('members')
            if not members:
                continue
            center = Vec3(0, 0, 0)
            for member in members:
                center += member['position']
            center /= len(members)
            offset = center - position
            distance = offset.length()
            if distance <= self.sensor_range:
                sensed_swarms.append({
                    'id': swarm['id'],
                    'type': swarm['type'],
                    'distance': self._noisy_distance(distance),
                    'direction': offset.normalized(),
                    'size': len(members),
                })
        return sensed_swarms

    def _sense_predators(self, position: Vec3, predators: List[Dict]) -> List[Dict]:
        """Sense the predators within range.

        Unlike the other point features, a predator reading is a fresh dict
        with only 'position', 'distance', 'direction' and 'velocity'
        (a zero vector when the predator has no 'velocity' entry).
        """
        sensed_predators = []
        for predator in predators:
            offset = predator['position'] - position
            distance = offset.length()
            if distance <= self.sensor_range:
                sensed_predators.append({
                    'position': predator['position'],
                    'distance': self._noisy_distance(distance),
                    'direction': offset.normalized(),
                    'velocity': predator.get('velocity', Vec3(0, 0, 0)),
                })
        return sensed_predators

    def _sense_weather(self, position: Vec3, weather: Dict) -> Dict:
        """Return a locally perturbed copy of the weather.

        Weather is treated as the same everywhere, so ``position`` is not
        used. If a 'wind' entry with a 'direction' is present, the x and y
        components of that direction are jittered in proportion to
        ``sensor_noise`` and the result is re-normalised. The ``weather``
        argument and its nested 'wind' dict are left untouched.
        """
        sensed_weather = weather.copy()
        wind = sensed_weather.get('wind')
        if wind is not None and 'direction' in wind:
            true_direction = wind['direction']
            noise_x = random.uniform(-self.sensor_noise, self.sensor_noise) * true_direction.x
            noise_y = random.uniform(-self.sensor_noise, self.sensor_noise) * true_direction.y
            # The outer copy above is shallow, so the nested wind dict is
            # still the caller's object. Write the noisy direction into a
            # private copy; otherwise every sensing call would permanently
            # alter the shared wind and the error would accumulate.
            local_wind = dict(wind)
            local_wind['direction'] = Vec3(
                true_direction.x + noise_x,
                true_direction.y + noise_y,
                true_direction.z,
            ).normalized()
            sensed_weather['wind'] = local_wind
        return sensed_weather

    def _sense_resources(self, position: Vec3, resources: List[Dict]) -> List[Dict]:
        """Sense the resource points within range."""
        return self._sense_point_features(position, resources)

    def _sense_hazards(self, position: Vec3, hazards: List[Dict]) -> List[Dict]:
        """Sense the hazard zones within range (measured to their centre)."""
        return self._sense_point_features(position, hazards)

    def _add_sensor_noise(self, sensed_data: Dict) -> Dict:
        """Hook for additional noise; currently returns the data unchanged."""
        return sensed_data
class EnvironmentalResponseSystem:
    """Turns sensed environment data into a steering force for one member.

    ``config`` only needs a dict-style ``get(key, default)``. The keys read
    are 'obstacle_response_weight', 'predator_radius',
    'predator_response_weight', 'resource_seek_weight' and
    'hazard_avoid_weight'.
    """

    # Obstacles closer than this are steered away from.
    OBSTACLE_AVOID_RADIUS = 5.0
    # The nearest resource is only pursued when it is closer than this.
    RESOURCE_SEEK_RADIUS = 20.0
    # Other swarms closer than this trigger a response.
    SWARM_RESPONSE_RADIUS = 10.0
    # Swarms of the same kind attract only beyond this distance.
    SAME_SWARM_SPACING = 5.0

    def __init__(self, config):
        self.config = config

    def calculate_response(self, member: Dict, sensed_data: Dict,
                           current_velocity: Vec3, current_position: Vec3) -> Vec3:
        """Sum all environmental forces acting on a member.

        :param member: the member being steered (currently not consulted)
        :param sensed_data: sensed environment; missing categories are
            treated as empty
        :param current_velocity: the member's velocity
        :param current_position: the member's position
        :return: the combined response force
        """
        response_force = Vec3(0, 0, 0)
        response_force += self._avoid_obstacles(
            sensed_data.get('obstacles', []), current_position, current_velocity)
        response_force += self._respond_to_predators(
            sensed_data.get('predators', []), current_position)
        response_force += self._seek_resources(
            sensed_data.get('resources', []), current_position)
        response_force += self._avoid_hazards(
            sensed_data.get('hazards', []), current_position)
        response_force += self._adapt_to_weather(
            sensed_data.get('weather', {}), current_position, current_velocity)
        response_force += self._respond_to_other_swarms(
            sensed_data.get('other_swarms', []), current_position)
        return response_force

    def _avoid_obstacles(self, obstacles: List[Dict], position: Vec3, velocity: Vec3) -> Vec3:
        """Push away from nearby obstacles; closer obstacles push harder."""
        radius = self.OBSTACLE_AVOID_RADIUS
        avoidance_force = Vec3(0, 0, 0)
        for obstacle in obstacles:
            distance = obstacle['distance']
            if distance < radius:
                away = obstacle['direction'] * -1
                # Grows linearly from 0 at the radius to 1 at distance 0.
                magnitude = max(0, radius - distance) / radius
                avoidance_force += away * magnitude
        return avoidance_force * self.config.get("obstacle_response_weight", 2.0)

    def _respond_to_predators(self, predators: List[Dict], position: Vec3) -> Vec3:
        """Flee from predators inside the configured 'predator_radius'."""
        radius = self.config.get("predator_radius", 15.0)
        flee_force = Vec3(0, 0, 0)
        for predator in predators:
            distance = predator['distance']
            if distance < radius:
                away = predator['direction'] * -1
                # The closer the predator, the stronger the urge to flee.
                magnitude = max(0, (radius - distance) / radius)
                flee_force += away * magnitude
        return flee_force * self.config.get("predator_response_weight", 3.0)

    def _seek_resources(self, resources: List[Dict], position: Vec3) -> Vec3:
        """Steer towards the nearest sensed resource.

        Only the resource with the smallest sensed distance is considered.
        The pull is strongest when the resource is close and falls off
        linearly with distance, but never drops below 0.1 while the resource
        is within the seek radius.
        """
        if not resources:
            return Vec3(0, 0, 0)
        radius = self.RESOURCE_SEEK_RADIUS
        nearest = min(resources, key=lambda r: r['distance'])
        distance = nearest['distance']
        if distance >= radius:
            return Vec3(0, 0, 0)
        magnitude = max(0.1, 1.0 - distance / radius)
        seek_force = nearest['direction'] * magnitude
        return seek_force * self.config.get("resource_seek_weight", 0.5)

    def _avoid_hazards(self, hazards: List[Dict], position: Vec3) -> Vec3:
        """Push away from hazards within twice their radius.

        A hazard without a 'radius' entry is treated as having radius 5.0.
        """
        avoidance_force = Vec3(0, 0, 0)
        for hazard in hazards:
            distance = hazard['distance']
            danger_radius = hazard.get('radius', 5.0) * 2
            if distance < danger_radius:
                away = hazard['direction'] * -1
                magnitude = max(0, (danger_radius - distance) / danger_radius)
                avoidance_force += away * magnitude
        return avoidance_force * self.config.get("hazard_avoid_weight", 3.0)

    def _adapt_to_weather(self, weather: Dict, position: Vec3, velocity: Vec3) -> Vec3:
        """Return the wind force, scaled by how wind-sensitive the member is.

        ``position`` and ``velocity`` are accepted but not used. The scaling
        only differs from 1.0 when a ``member_type`` attribute has been set
        on this object; nothing in this class sets it.
        """
        if not weather or 'wind' not in weather:
            return Vec3(0, 0, 0)
        wind = weather['wind']
        wind_force = wind['direction'] * wind['strength']
        member_type = getattr(self, 'member_type', None)
        if member_type == "鸟类":
            # Birds are affected less by wind.
            adaptation_factor = 0.7
        elif member_type == "昆虫":
            # Insects are affected more by wind.
            adaptation_factor = 1.5
        else:
            adaptation_factor = 1.0
        return wind_force * adaptation_factor

    def _respond_to_other_swarms(self, other_swarms: List[Dict], position: Vec3) -> Vec3:
        """React to nearby swarms according to their 'type'.

        "predator" swarms repel (strength 2.0, stronger when closer), "same"
        swarms attract only beyond the spacing distance (strength up to 0.5),
        and any other type exerts a constant weak attraction of 0.1.
        """
        radius = self.SWARM_RESPONSE_RADIUS
        spacing = self.SAME_SWARM_SPACING
        response_force = Vec3(0, 0, 0)
        for swarm in other_swarms:
            distance = swarm['distance']
            if distance >= radius:
                continue
            direction = swarm['direction']
            swarm_type = swarm['type']
            if swarm_type == "predator":
                response_force += direction * -1 * max(0, (radius - distance) / radius) * 2.0
            elif swarm_type == "same":
                response_force += direction * max(0, (distance - spacing) / spacing) * 0.5
            else:
                response_force += direction * 0.1
        return response_force
class EnvironmentManager:
    """Owns the environment state and hands out per-member responses.

    ``environment_state`` holds the feature lists ('obstacles', 'swarms',
    'predators', 'resources', 'hazards'), the published 'weather' snapshot,
    'time_of_day' (hours) and 'season'. ``weather_effects`` is the master
    weather record from which the snapshot is derived.
    """

    HOURS_PER_DAY = 24.0
    # Resources below this amount slowly regenerate up to it.
    RESOURCE_REGEN_CAP = 100.0
    RESOURCE_REGEN_STEP = 0.01

    def __init__(self, config):
        self.config = config
        # Sensors keyed by the id they were registered under
        # (see ``_sensor_key`` for how lookups resolve).
        self.sensors = {}
        self.response_system = EnvironmentalResponseSystem(config)
        self.environment_state = {
            'obstacles': [],
            'swarms': [],
            'predators': [],
            'weather': {},
            'resources': [],
            'hazards': [],
            'time_of_day': 12.0,  # hours, 24-hour clock
            'season': 'spring'
        }
        self.weather_effects = {
            'wind': {'direction': Vec3(0, 1, 0), 'strength': 0.2},
            'temperature': 20.0,
            'precipitation': 0.0
        }

    def add_sensor(self, member_id: str, sensor_range: float = None, sensor_accuracy: float = None):
        """Register a sensor for a member, replacing any existing one.

        :param member_id: key to store the sensor under
        :param sensor_range: range; defaults to config 'sensor_range' (15.0)
        :param sensor_accuracy: accuracy; defaults to config
            'sensor_accuracy' (0.9)
        """
        if sensor_range is None:
            sensor_range = self.config.get("sensor_range", 15.0)
        if sensor_accuracy is None:
            sensor_accuracy = self.config.get("sensor_accuracy", 0.9)
        self.sensors[member_id] = EnvironmentSensor(sensor_range, sensor_accuracy)

    def update_environment_state(self, obstacles: List[Dict], swarms: List[Dict],
                                 predators: List[Dict], resources: List[Dict],
                                 hazards: List[Dict]):
        """Replace the feature lists and refresh the weather snapshot.

        The lists are stored by reference, not copied.
        """
        state = self.environment_state
        state['obstacles'] = obstacles
        state['swarms'] = swarms
        state['predators'] = predators
        state['resources'] = resources
        state['hazards'] = hazards
        self._update_weather_effects()

    def _update_weather_effects(self):
        """Recompute wind strength and temperature from the time of day.

        A sine with a 24-hour period drives both values: wind strength
        ranges over 0.2-0.5 and temperature over 5-25. The result is then
        published to ``environment_state['weather']``.
        """
        hour = self.environment_state['time_of_day']
        time_factor = math.sin(hour * math.pi / 12)
        wind = self.weather_effects['wind']
        wind['strength'] = 0.2 + abs(time_factor) * 0.3
        self.weather_effects['temperature'] = 15 + time_factor * 10
        # Publish a snapshot with its own wind dict so that consumers of the
        # environment state cannot alter the master weather record.
        snapshot = self.weather_effects.copy()
        snapshot['wind'] = wind.copy()
        self.environment_state['weather'] = snapshot

    def _sensor_key(self, member):
        """Return the key under which ``member``'s sensor is stored.

        NOTE(review): assumes a member that carries a string 'id' may have
        had a sensor registered under that id via ``add_sensor`` - confirm
        against callers. Without such a registration the object identity
        ``id(member)`` is used. Identity keys are never removed, and an
        identity can be reused after the member object is garbage-collected.
        """
        explicit_id = member.get('id') if isinstance(member, dict) else None
        if isinstance(explicit_id, str) and explicit_id in self.sensors:
            return explicit_id
        return id(member)

    def get_environmental_response(self, member: Dict, position: Vec3, velocity: Vec3) -> Vec3:
        """Sense the environment for ``member`` and return its response force.

        A sensor with default settings is created on first use if the member
        has none.
        """
        sensor_key = self._sensor_key(member)
        if sensor_key not in self.sensors:
            self.add_sensor(sensor_key)
        sensed_data = self.sensors[sensor_key].sense_environment(position, self.environment_state)
        return self.response_system.calculate_response(member, sensed_data, velocity, position)

    def advance_time(self, dt: float):
        """Advance the time of day by ``dt * 0.1`` hours, wrapping at 24.

        The modulo keeps the result in [0, 24) for any ``dt``, including
        steps longer than a day and negative steps.
        """
        hour = (self.environment_state['time_of_day'] + dt * 0.1) % self.HOURS_PER_DAY
        if hour >= self.HOURS_PER_DAY:
            # Float rounding of a tiny negative value can yield exactly 24.0.
            hour = 0.0
        self.environment_state['time_of_day'] = hour

    def add_resource(self, position: Vec3, resource_type: str = "food", amount: float = 100.0):
        """Append a resource point (radius 3.0) to the state and return it."""
        resource = {
            'position': position,
            'type': resource_type,
            'amount': amount,
            'radius': 3.0
        }
        self.environment_state['resources'].append(resource)
        return resource

    def add_hazard(self, position: Vec3, hazard_type: str = "toxic", radius: float = 5.0):
        """Append a hazard zone to the state and return it."""
        hazard = {
            'position': position,
            'type': hazard_type,
            'radius': radius
        }
        self.environment_state['hazards'].append(hazard)
        return hazard

    def get_environment_info(self) -> Dict:
        """Return a shallow copy of the environment state.

        The feature lists inside the copy are the live lists, not copies.
        """
        return self.environment_state.copy()

    def update_environment_dynamics(self):
        """Regenerate resources slightly and refresh the weather.

        A resource regenerates only while its amount is above 0 and below
        the regeneration cap; depleted resources stay depleted. Amounts that
        already exceed the cap are left alone rather than cut down to it.
        Resources without an 'amount' entry are skipped.
        """
        cap = self.RESOURCE_REGEN_CAP
        for resource in self.environment_state['resources']:
            amount = resource.get('amount', 0)
            if 0 < amount < cap:
                resource['amount'] = min(amount + self.RESOURCE_REGEN_STEP, cap)
        self._update_weather_effects()