""" 行为树和状态机插件完整示例 演示高级功能和复杂用法 """ import sys import os import time import random # 添加项目根目录到Python路径 sys.path.insert(0, '/home/hello/EG') def advanced_behavior_tree_example(): """ 高级行为树示例 展示复杂的行为树功能 """ print("=== 高级行为树示例 ===") # 导入插件管理器 from plugins.plugin_manager import PluginManager # 创建一个虚拟的世界对象 class MockWorld: def __init__(self): self.render = None self.accept = lambda event, handler: None self.ignore = lambda event: None self.taskMgr = None # 创建世界实例 world = MockWorld() # 创建插件管理器 plugin_manager = PluginManager(world) plugin_manager.enable_plugin_system() # 加载行为树插件 bt_plugin = plugin_manager.load_plugin("behavior_tree") if not bt_plugin or not plugin_manager.enable_plugin("behavior_tree"): print("❌ 无法加载或启用行为树插件") return try: # 导入行为树组件 from plugins.user.behavior_tree import ( SelectorNode, SequenceNode, ParallelNode, RepeaterNode, InverterNode, TimerNode, ConditionNode, ActionNode, WaitNode, ProbabilityNode, BTNodeConfig ) # 创建黑板 blackboard = bt_plugin.create_blackboard("advanced_example", max_history_size=500) # 设置初始数据 blackboard.set("player_health", 100) blackboard.set("enemy_detected", False) blackboard.set("ammo_count", 30) blackboard.set("cover_available", True) blackboard.set("target_distance", 15.0) print("创建复杂的行为树结构...") # 创建根节点 - 优先级选择器 root_config = BTNodeConfig(name="Root", priority=10) root = SelectorNode(root_config) # 1. 生存行为序列 survival_sequence = SequenceNode(BTNodeConfig(name="SurvivalSequence")) # 检查生命值 health_condition = ConditionNode( "HealthCheck", lambda bb: bb.get("player_health", 0) > 30 ) # 寻找掩护动作 find_cover_action = ActionNode( "FindCover", lambda bb: print("🛡️ 寻找掩护...") or (bb.set("cover_available", random.random() > 0.3) or True) ) survival_sequence.add_child(health_condition) survival_sequence.add_child(find_cover_action) # 2. 战斗行为序列 combat_sequence = SequenceNode(BTNodeConfig(name="CombatSequence")) # 检查敌人 enemy_condition = ConditionNode( "EnemyCheck", lambda bb: bb.get("enemy_detected", False) ) # 攻击并移动并行节点 attack_move_parallel = ParallelNode( BTNodeConfig(name="AttackMoveParallel"), success_threshold=2, failure_threshold=1 ) # 攻击序列 attack_sequence = SequenceNode(BTNodeConfig(name="AttackSequence")) # 检查弹药 ammo_condition = ConditionNode( "AmmoCheck", lambda bb: bb.get("ammo_count", 0) > 0 ) # 射击动作(带概率和超时) shoot_action = ProbabilityNode( BTNodeConfig(name="Shoot"), success_probability=0.8, action_func=lambda bb: ( print(f"🔫 射击! 剩余弹药: {bb.get('ammo_count', 0)}") or bb.set("ammo_count", bb.get("ammo_count", 0) - 1) or True ) ) # 重新装弹动作 reload_action = ActionNode( "Reload", lambda bb: ( print("🔄 重新装弹...") or time.sleep(0.1) or # 模拟装弹时间 bb.set("ammo_count", 30) or True ) ) # 装弹序列(带计时器) reload_sequence = SequenceNode(BTNodeConfig(name="ReloadSequence")) reload_timer = TimerNode(reload_action, BTNodeConfig(name="ReloadTimer"), timeout=2.0) reload_sequence.add_child(reload_timer) # 弹药管理选择器 ammo_selector = SelectorNode(BTNodeConfig(name="AmmoSelector")) ammo_selector.add_child(shoot_action) ammo_selector.add_child(reload_sequence) attack_sequence.add_child(ammo_condition) attack_sequence.add_child(ammo_selector) # 移动动作 move_action = ActionNode( "Move", lambda bb: print(f"🏃 移动中... 距离目标: {bb.get('target_distance', 0):.1f}m") or bb.set("target_distance", max(0, bb.get('target_distance', 0) - random.uniform(1, 3))) or True ) attack_move_parallel.add_child(attack_sequence) attack_move_parallel.add_child(move_action) combat_sequence.add_child(enemy_condition) combat_sequence.add_child(attack_move_parallel) # 3. 巡逻行为(带重复和等待) patrol_sequence = SequenceNode(BTNodeConfig(name="PatrolSequence")) # 巡逻动作(重复3次) patrol_action = ActionNode( "Patrol", lambda bb: print("🔍 巡逻中...") or True ) patrol_repeater = RepeaterNode(patrol_action, BTNodeConfig(name="PatrolRepeater"), count=3) # 等待 wait_action = WaitNode("PatrolWait", duration=1.0, random_variance=0.5) patrol_sequence.add_child(patrol_repeater) patrol_sequence.add_child(wait_action) # 组装根节点 root.add_child(survival_sequence) root.add_child(combat_sequence) root.add_child(patrol_sequence) # 创建行为树 behavior_tree = bt_plugin.create_behavior_tree("advanced_ai", root) behavior_tree.set_blackboard(blackboard) print("✅ 行为树创建成功") print(f"行为树统计: {behavior_tree.get_statistics()}") # 执行行为树多次 print("\n开始执行行为树...") for i in range(10): print(f"\n--- 第 {i+1} 次执行 ---") # 模拟环境变化 if i == 3: blackboard.set("enemy_detected", True) print("🚨 发现敌人!") elif i == 6: blackboard.set("player_health", 20) print("💔 受到伤害,生命值降低!") elif i == 8: blackboard.set("enemy_detected", False) print("✅ 威胁解除!") # 执行行为树 result = behavior_tree.run() print(f"执行结果: {result.value}") # 显示黑板状态 print(f"当前状态 - 生命值: {blackboard.get('player_health')}, " f"弹药: {blackboard.get('ammo_count')}, " f"敌人: {blackboard.get('enemy_detected')}") # 显示最终统计 print(f"\n最终统计: {behavior_tree.get_statistics()}") print(f"黑板统计: {blackboard.get_statistics()}") except Exception as e: print(f"❌ 高级行为树示例执行失败: {e}") import traceback traceback.print_exc() finally: # 清理 plugin_manager.disable_plugin("behavior_tree") plugin_manager.unload_plugin("behavior_tree") def advanced_state_machine_example(): """ 高级状态机示例 展示复杂的状态机功能 """ print("\n=== 高级状态机示例 ===") # 导入插件管理器 from plugins.plugin_manager import PluginManager # 创建一个虚拟的世界对象 class MockWorld: def __init__(self): self.render = None self.accept = lambda event, handler: None self.ignore = lambda event: None self.taskMgr = None # 创建世界实例 world = MockWorld() # 创建插件管理器 plugin_manager = PluginManager(world) plugin_manager.enable_plugin_system() # 加载状态机插件 sm_plugin = plugin_manager.load_plugin("state_machine") if not sm_plugin or not plugin_manager.enable_plugin("state_machine"): print("❌ 无法加载或启用状态机插件") return try: # 导入状态机组件 from plugins.user.state_machine import ( State, StateContext, StateTransition, HierarchicalState, StateType ) # 创建状态上下文 context = StateContext() context.set("player_health", 100) context.set("player_level", 1) context.set("experience", 0) context.set("game_time", 0) print("创建复杂的状态机结构...") # 定义基础状态 class IdleState(State): def __init__(self): super().__init__("Idle", StateType.ACTIVE) def enter(self, context): print("🎮 进入空闲状态") context.set("state", "idle") def execute(self, context): print("⏸️ 空闲中...") # 添加转换条件 if not self.transitions: # 转换到移动状态 move_condition = lambda ctx: ctx.get("want_move", False) move_action = lambda ctx: print("🏃 准备移动") self.add_transition_simple("Moving", move_condition, move_action) # 转换到战斗状态 combat_condition = lambda ctx: ctx.get("enemy_nearby", False) combat_action = lambda ctx: print("⚔️ 进入战斗") self.add_transition_simple("Combat", combat_condition, combat_action) # 检查可用转换 available = self.get_available_transitions(context) if available: return available[0].to_state return None def exit(self, context): print("🔚 退出空闲状态") class MovingState(HierarchicalState): def __init__(self): super().__init__("Moving", StateType.ACTIVE) # 添加子状态 self.add_sub_state(WalkingState()) self.add_sub_state(RunningState()) self.set_default_sub_state("Walking") def enter(self, context): print("🚗 进入移动状态") context.set("state", "moving") # 进入默认子状态 self.enter_sub_state(context, "Walking") def execute(self, context): print("➡️ 移动中...") # 执行子状态 if self.current_sub_state: self.current_sub_state.execute(context) # 添加转换条件 if not self.transitions: # 转换到空闲状态 idle_condition = lambda ctx: not ctx.get("want_move", True) idle_action = lambda ctx: print("⏸️ 停止移动") self.add_transition_simple("Idle", idle_condition, idle_action) # 转换到战斗状态 combat_condition = lambda ctx: ctx.get("enemy_nearby", False) combat_action = lambda ctx: print("⚔️ 移动中遭遇敌人") self.add_transition_simple("Combat", combat_condition, combat_action) # 检查可用转换 available = self.get_available_transitions(context) if available: return available[0].to_state return None def exit(self, context): print("🔚 退出移动状态") # 退出子状态 self.exit_sub_state(context) class WalkingState(State): def __init__(self): super().__init__("Walking", StateType.ACTIVE) def enter(self, context): print("🚶 进入步行状态") context.set("movement_type", "walking") def execute(self, context): print("🚶 步行中...") # 随机切换到跑步 if random.random() < 0.1: return "Running" return None def exit(self, context): print("🔚 退出步行状态") class RunningState(State): def __init__(self): super().__init__("Running", StateType.ACTIVE) def enter(self, context): print("🏃 进入跑步状态") context.set("movement_type", "running") def execute(self, context): print("🏃 跑步中...") # 随机切换到步行 if random.random() < 0.05: return "Walking" return None def exit(self, context): print("🔚 退出跑步状态") class CombatState(HierarchicalState): def __init__(self): super().__init__("Combat", StateType.ACTIVE) # 添加战斗子状态 self.add_sub_state(AttackingState()) self.add_sub_state(DefendingState()) self.add_sub_state(FleeingState()) self.set_default_sub_state("Attacking") def enter(self, context): print("⚔️ 进入战斗状态") context.set("state", "combat") context.set("combat_start_time", time.time()) # 进入默认子状态 self.enter_sub_state(context, "Attacking") def execute(self, context): print("⚔️ 战斗中...") # 执行子状态 if self.current_sub_state: result = self.current_sub_state.execute(context) if result: # 切换子状态 self.enter_sub_state(context, result) # 添加转换条件 if not self.transitions: # 转换到空闲状态 idle_condition = lambda ctx: not ctx.get("enemy_nearby", True) idle_action = lambda ctx: print("✅ 战斗结束") self.add_transition_simple("Idle", idle_condition, idle_action) # 检查可用转换 available = self.get_available_transitions(context) if available: return available[0].to_state return None def exit(self, context): print("🔚 退出战斗状态") combat_time = time.time() - context.get("combat_start_time", time.time()) print(f"战斗持续时间: {combat_time:.2f}秒") # 退出子状态 self.exit_sub_state(context) class AttackingState(State): def __init__(self): super().__init__("Attacking", StateType.ACTIVE) def enter(self, context): print("⚔️ 进入攻击状态") context.set("combat_action", "attacking") def execute(self, context): print("⚔️ 攻击中...") # 随机切换到防御或逃跑 rand = random.random() if rand < 0.05: return "Defending" elif rand < 0.1: return "Fleeing" return None def exit(self, context): print("🔚 退出攻击状态") class DefendingState(State): def __init__(self): super().__init__("Defending", StateType.ACTIVE) def enter(self, context): print("🛡️ 进入防御状态") context.set("combat_action", "defending") def execute(self, context): print("🛡️ 防御中...") # 随机切换到攻击 if random.random() < 0.2: return "Attacking" return None def exit(self, context): print("🔚 退出防御状态") class FleeingState(State): def __init__(self): super().__init__("Fleeing", StateType.ACTIVE) def enter(self, context): print("🏃 进入逃跑状态") context.set("combat_action", "fleeing") def execute(self, context): print("🏃 逃跑中...") # 随机切换到攻击 if random.random() < 0.3: return "Attacking" return None def exit(self, context): print("🔚 退出逃跑状态") # 创建状态机 state_machine = sm_plugin.create_state_machine("advanced_game_character") state_machine.set_context(context) # 添加状态 state_machine.add_state(IdleState()) state_machine.add_state(MovingState()) state_machine.add_state(CombatState()) # 设置初始状态 state_machine.change_state("Idle") state_machine.start() print("✅ 状态机创建成功") # 添加状态变更回调 def state_change_callback(old_state, new_state): old_name = old_state.name if old_state else "None" new_name = new_state.name if new_state else "None" print(f"🔄 状态变更: {old_name} -> {new_name}") state_machine.add_state_change_callback(state_change_callback) # 更新状态机多次 print("\n开始更新状态机...") for i in range(15): print(f"\n--- 第 {i+1} 次更新 ---") print(f"当前状态: {state_machine.get_current_state_name()}") # 模拟环境变化 if i == 3: context.set("want_move", True) print("🎮 角色想要移动") elif i == 5: context.set("enemy_nearby", True) print("🚨 附近出现敌人") elif i == 10: context.set("want_move", False) context.set("enemy_nearby", False) print("✅ 状态恢复正常") # 更新状态机 state_machine.update() # 显示上下文状态 print(f"上下文 - 移动: {context.get('want_move', False)}, " f"敌人: {context.get('enemy_nearby', False)}, " f"动作: {context.get('combat_action', 'none')}") # 显示最终统计 print(f"\n最终统计: {state_machine.get_statistics()}") except Exception as e: print(f"❌ 高级状态机示例执行失败: {e}") import traceback traceback.print_exc() finally: # 清理 plugin_manager.disable_plugin("state_machine") plugin_manager.unload_plugin("state_machine") def main(): """ 主函数 """ print("行为树和状态机插件完整示例") print("=" * 50) # 运行高级行为树示例 advanced_behavior_tree_example() # 运行高级状态机示例 advanced_state_machine_example() print("\n🎉 所有示例执行完成!") if __name__ == "__main__": main()