EG/plugins/user/swarm_intelligence/examples/learning_demo.py
2025-12-12 16:16:15 +08:00

317 lines
11 KiB
Python

"""
群体学习能力示例
演示群体的学习和适应能力
"""
from panda3d.core import Vec3
import random
def create_learning_demo(swarm_manager):
"""
创建学习能力演示
群体学习如何在复杂环境中生存和适应
"""
print("创建学习能力演示...")
# 清理现有群体
swarm_manager.cleanup()
swarm_manager.swarms.clear()
swarm_manager.swarm_id_counter = 0
# 创建学习型群体
print("创建学习型群体...")
learning_swarm = swarm_manager.create_example_swarm(
swarm_type="鸟类",
member_count=40,
position_offset=Vec3(0, 0, 10)
)
# 设置学习型群体为紫色
for member in learning_swarm['members']:
if 'node' in member:
member['node'].setColor(0.5, 0, 0.5, 1) # 紫色
# 创建复杂环境
print("创建复杂环境...")
# 添加多个障碍物
obstacles = [
(Vec3(-15, -15, 8), 4.0), # 大障碍物
(Vec3(15, 15, 12), 3.0), # 中等障碍物
(Vec3(-10, 10, 6), 2.0), # 小障碍物
(Vec3(10, -10, 10), 3.5), # 中等障碍物
(Vec3(0, 0, 15), 2.5), # 中心障碍物
]
for position, radius in obstacles:
swarm_manager.create_obstacle(position, radius)
# 添加移动障碍物
moving_obstacle = {
'position': Vec3(-20, 0, 10),
'radius': 3.0,
'velocity': Vec3(2, 1, 0),
'bounds': {'min_x': -25, 'max_x': 25, 'min_y': -25, 'max_y': 25}
}
if not hasattr(swarm_manager, 'moving_obstacles'):
swarm_manager.moving_obstacles = []
swarm_manager.moving_obstacles.append(moving_obstacle)
# 添加资源点
resources = [
(Vec3(-25, -25, 5), 2.0), # 资源点1
(Vec3(25, 25, 8), 2.5), # 资源点2
(Vec3(-25, 25, 6), 1.5), # 资源点3
]
for position, radius in resources:
swarm_manager.create_obstacle(position, radius)
# 配置学习参数
print("配置学习参数...")
swarm_manager.config.set("learning_enabled", True)
swarm_manager.config.set("learning_rate", 0.1)
swarm_manager.config.set("exploration_rate", 0.3)
swarm_manager.config.set("memory_capacity", 500)
# 调整Boids参数以促进学习
swarm_manager.config.set("cohesion_weight", 0.8)
swarm_manager.config.set("separation_weight", 1.2)
swarm_manager.config.set("alignment_weight", 0.6)
swarm_manager.config.set("max_speed", 6.0)
swarm_manager.config.set("perception_radius", 12.0)
# 启用避障和边界限制
swarm_manager.config.set("obstacle_avoidance_enabled", True)
swarm_manager.config.set("boundary_enabled", True)
swarm_manager.config.set("boundary_weight", 2.0)
print("学习能力演示创建完成")
print("观察群体如何通过学习适应复杂环境")
print("群体将学会避开障碍物并寻找资源")
def create_adaptation_demo(swarm_manager):
"""
创建环境适应演示
群体学习适应不同的环境条件
"""
print("创建环境适应演示...")
# 清理现有群体
swarm_manager.cleanup()
swarm_manager.swarms.clear()
swarm_manager.swarm_id_counter = 0
# 创建适应型群体
print("创建适应型群体...")
adaptive_swarm = swarm_manager.create_example_swarm(
swarm_type="鱼类",
member_count=60,
position_offset=Vec3(0, 0, 8)
)
# 设置适应型群体为蓝色
for member in adaptive_swarm['members']:
if 'node' in member:
member['node'].setColor(0, 0, 1, 1) # 蓝色
# 创建变化的环境
print("创建变化环境...")
# 添加多个区域
# 安全区
safe_zone = swarm_manager.create_obstacle(Vec3(0, 0, 5), 8.0)
# 危险区
danger_zone = swarm_manager.create_obstacle(Vec3(15, 15, 10), 5.0)
# 资源丰富区
resource_zone = swarm_manager.create_obstacle(Vec3(-15, -15, 6), 6.0)
# 配置学习参数
print("配置适应参数...")
swarm_manager.config.set("learning_enabled", True)
swarm_manager.config.set("learning_rate", 0.15)
swarm_manager.config.set("exploration_rate", 0.25)
swarm_manager.config.set("memory_capacity", 800)
# 设置环境变化
swarm_manager.environment_changes = {
'time_points': [0, 60, 120, 180], # 时间点(秒)
'conditions': [
{'threat_level': 0.2, 'resource_abundance': 0.8}, # 安全且资源丰富
{'threat_level': 0.7, 'resource_abundance': 0.3}, # 危险且资源稀缺
{'threat_level': 0.4, 'resource_abundance': 0.6}, # 中等条件
{'threat_level': 0.1, 'resource_abundance': 0.9}, # 安全且资源丰富
]
}
# 调整参数以促进适应
swarm_manager.config.set("cohesion_weight", 1.0)
swarm_manager.config.set("separation_weight", 1.3)
swarm_manager.config.set("alignment_weight", 0.7)
swarm_manager.config.set("max_speed", 5.0)
swarm_manager.config.set("perception_radius", 10.0)
# 启用相关行为
swarm_manager.config.set("obstacle_avoidance_enabled", True)
swarm_manager.config.set("boundary_enabled", True)
swarm_manager.config.set("wander_enabled", True)
swarm_manager.config.set("wander_weight", 0.5)
print("环境适应演示创建完成")
print("观察群体如何适应变化的环境条件")
print("群体将学会在不同条件下调整行为策略")
def create_multi_task_learning_demo(swarm_manager):
"""
创建多任务学习演示
群体学习执行多个任务
"""
print("创建多任务学习演示...")
# 清理现有群体
swarm_manager.cleanup()
swarm_manager.swarms.clear()
swarm_manager.swarm_id_counter = 0
# 创建多任务群体
print("创建多任务群体...")
multi_task_swarm = swarm_manager.create_example_swarm(
swarm_type="昆虫",
member_count=100,
position_offset=Vec3(0, 0, 3)
)
# 设置多任务群体为绿色
for member in multi_task_swarm['members']:
if 'node' in member:
member['node'].setColor(0, 1, 0, 1) # 绿色
# 创建多个任务区域
print("创建任务区域...")
# 搜寻任务区域
search_area = swarm_manager.create_obstacle(Vec3(-20, 0, 4), 5.0)
# 运输任务区域
transport_area = swarm_manager.create_obstacle(Vec3(20, 0, 4), 4.0)
# 建造任务区域
build_area = swarm_manager.create_obstacle(Vec3(0, 20, 5), 6.0)
# 防御任务区域
defense_area = swarm_manager.create_obstacle(Vec3(0, -20, 3), 5.0)
# 配置学习参数
print("配置多任务学习参数...")
swarm_manager.config.set("learning_enabled", True)
swarm_manager.config.set("learning_rate", 0.12)
swarm_manager.config.set("exploration_rate", 0.35)
swarm_manager.config.set("memory_capacity", 1000)
# 设置任务轮换
swarm_manager.task_rotation = {
'time_intervals': 45, # 每45秒切换任务
'tasks': ['search', 'transport', 'build', 'defense'],
'current_task_index': 0
}
# 调整参数以适应多任务
swarm_manager.config.set("cohesion_weight", 0.9)
swarm_manager.config.set("separation_weight", 1.1)
swarm_manager.config.set("alignment_weight", 0.8)
swarm_manager.config.set("max_speed", 7.0)
swarm_manager.config.set("perception_radius", 15.0)
# 启用相关行为
swarm_manager.config.set("obstacle_avoidance_enabled", True)
swarm_manager.config.set("boundary_enabled", True)
swarm_manager.config.set("wander_enabled", True)
swarm_manager.config.set("wander_weight", 0.7)
swarm_manager.config.set("path_following_enabled", True)
print("多任务学习演示创建完成")
print("观察群体如何学习执行不同的任务")
print("群体将学会在不同任务间切换并优化策略")
def update_dynamic_environment(swarm_manager, elapsed_time):
"""
更新动态环境
"""
# 更新移动障碍物
if hasattr(swarm_manager, 'moving_obstacles'):
for obstacle in swarm_manager.moving_obstacles:
# 移动障碍物
obstacle['position'] += obstacle['velocity'] * 0.1 # 时间步长
# 边界反弹
bounds = obstacle['bounds']
if obstacle['position'].x < bounds['min_x'] or obstacle['position'].x > bounds['max_x']:
obstacle['velocity'].x *= -1
if obstacle['position'].y < bounds['min_y'] or obstacle['position'].y > bounds['max_y']:
obstacle['velocity'].y *= -1
# 保持在边界内
obstacle['position'].x = max(bounds['min_x'], min(bounds['max_x'], obstacle['position'].x))
obstacle['position'].y = max(bounds['min_y'], min(bounds['max_y'], obstacle['position'].y))
# 更新环境条件变化
if hasattr(swarm_manager, 'environment_changes'):
time_points = swarm_manager.environment_changes['time_points']
conditions = swarm_manager.environment_changes['conditions']
# 找到当前时间点
current_index = 0
for i, time_point in enumerate(time_points):
if elapsed_time >= time_point:
current_index = i
else:
break
if current_index < len(conditions):
condition = conditions[current_index]
# 这里可以更新环境可视化或其他相关参数
print(f"环境条件更新: 威胁等级 {condition['threat_level']}, 资源丰富度 {condition['resource_abundance']}")
# 更新任务轮换
if hasattr(swarm_manager, 'task_rotation'):
interval = swarm_manager.task_rotation['time_intervals']
tasks = swarm_manager.task_rotation['tasks']
# 计算当前任务索引
current_task_index = int(elapsed_time // interval) % len(tasks)
if current_task_index != swarm_manager.task_rotation['current_task_index']:
swarm_manager.task_rotation['current_task_index'] = current_task_index
print(f"任务切换到: {tasks[current_task_index]}")
def run_learning_demo(swarm_manager, demo_type="basic"):
"""
运行学习能力演示
:param swarm_manager: 群体管理器
:param demo_type: 演示类型 ("basic", "adaptation", "multi_task")
"""
print("=== 群体学习能力演示 ===")
if demo_type == "basic":
create_learning_demo(swarm_manager)
elif demo_type == "adaptation":
create_adaptation_demo(swarm_manager)
elif demo_type == "multi_task":
create_multi_task_learning_demo(swarm_manager)
else:
print(f"未知的演示类型: {demo_type}")
return
# 启用学习功能
swarm_manager.config.set("learning_enabled", True)
print(f"\n{demo_type} 学习演示已启动")
print("使用学习控制面板可以调整学习参数")
print("观察群体如何通过学习改善行为表现")
return update_dynamic_environment