#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 脚本系统模块 负责脚本的创建、管理、挂载和运行: - 脚本管理器:统一管理所有脚本 - 脚本组件:挂载到游戏对象的脚本实例 - 脚本引擎:执行脚本逻辑 - 脚本API:提供给脚本使用的API接口 """ import os import sys import importlib import importlib.util import traceback import inspect import time from typing import Dict, List, Any, Optional, Callable from abc import ABC, abstractmethod from direct.task.TaskManagerGlobal import taskMgr from panda3d.core import PythonTask class ScriptBase(ABC): """脚本基类 - 所有用户脚本都应该继承此类""" def __init__(self): self.enabled = True self.transform = None # 挂载的对象的transform self.gameObject = None # 挂载的游戏对象 self.world = None # 引擎世界对象引用 self._script_id = None @abstractmethod def start(self): """脚本开始时调用(类似Unity的Start)""" pass @abstractmethod def update(self, dt): """每帧更新时调用(类似Unity的Update)""" pass def on_destroy(self): """脚本销毁时调用(类似Unity的OnDestroy)""" pass def on_enable(self): """脚本启用时调用""" pass def on_disable(self): """脚本禁用时调用""" pass def on_collision_enter(self, other): """碰撞开始时调用""" pass def on_collision_exit(self, other): """碰撞结束时调用""" pass def log(self, message): """日志输出""" print(f"[{self.__class__.__name__}] {message}") class ScriptComponent: """脚本组件 - 挂载到游戏对象上的脚本实例""" def __init__(self, script_instance: ScriptBase, game_object, script_manager): self.script_instance = script_instance self.game_object = game_object self.script_manager = script_manager self.enabled = True # 保存脚本名称,便于UI显示 self.script_name = script_instance.__class__.__name__ # 设置脚本实例的引用 script_instance.gameObject = game_object script_instance.transform = game_object # Panda3D中NodePath就是transform script_instance.world = script_manager.world script_instance._script_id = id(self) # 标记脚本已开始 self._started = False def start(self): """启动脚本""" if not self._started and self.enabled: try: self.script_instance.start() self._started = True except Exception as e: print(f"脚本启动失败: {e}") traceback.print_exc() def update(self, dt): """更新脚本""" if self.enabled and self._started: try: self.script_instance.update(dt) except Exception as e: print(f"脚本更新失败: {e}") traceback.print_exc() def destroy(self): """销毁脚本""" try: self.script_instance.on_destroy() except Exception as e: print(f"脚本销毁失败: {e}") traceback.print_exc() def set_enabled(self, enabled): """设置脚本启用状态""" if self.enabled != enabled: self.enabled = enabled try: if enabled: self.script_instance.on_enable() else: self.script_instance.on_disable() except Exception as e: print(f"设置脚本状态失败: {e}") traceback.print_exc() class ScriptEngine: """脚本引擎 - 负责脚本的执行和生命周期管理""" def __init__(self, world): self.world = world self.script_components: List[ScriptComponent] = [] self.update_task = None def start_engine(self): """启动脚本引擎""" if self.update_task is None: self.update_task = taskMgr.add(self._update_scripts, "script_update") print("✓ 脚本引擎已启动") def stop_engine(self): """停止脚本引擎""" if self.update_task: taskMgr.remove(self.update_task) self.update_task = None print("✓ 脚本引擎已停止") def add_script_component(self, script_component: ScriptComponent): """添加脚本组件""" self.script_components.append(script_component) # 如果引擎已运行,立即启动脚本 if self.update_task: script_component.start() def remove_script_component(self, script_component: ScriptComponent): """移除脚本组件""" if script_component in self.script_components: script_component.destroy() self.script_components.remove(script_component) def _update_scripts(self, task): """更新所有脚本(每帧调用)""" from direct.showbase.ShowBaseGlobal import globalClock dt = globalClock.getDt() # 复制列表以避免迭代时修改 components_copy = self.script_components.copy() for component in components_copy: if component.enabled: # 如果脚本还没开始,先调用start if not component._started: component.start() # 然后调用update component.update(dt) return task.cont class ScriptLoader: """脚本加载器 - 负责加载和重载脚本""" def __init__(self, script_manager): self.script_manager = script_manager self.loaded_modules = {} # 模块名 -> 模块对象 self.script_classes = {} # 脚本名 -> 脚本类 self.file_mtimes = {} # 文件路径 -> 修改时间 def load_script_from_file(self, script_path: str) -> Optional[type]: """从文件加载脚本类""" try: if not os.path.exists(script_path): print(f"脚本文件不存在: {script_path}") return None # 获取脚本名称(不包含扩展名) script_name = os.path.splitext(os.path.basename(script_path))[0] # 动态导入模块 spec = importlib.util.spec_from_file_location(script_name, script_path) if spec is None: print(f"无法创建模块规范: {script_path}") return None module = importlib.util.module_from_spec(spec) # 如果模块已经加载过,先卸载 if script_name in self.loaded_modules: self.unload_script(script_name) # 执行模块 spec.loader.exec_module(module) # 查找继承自ScriptBase的类 script_class = None for name, obj in inspect.getmembers(module, inspect.isclass): if issubclass(obj, ScriptBase) and obj != ScriptBase: script_class = obj break if script_class is None: print(f"脚本文件中没有找到继承自ScriptBase的类: {script_path}") return None # 保存模块和类信息 self.loaded_modules[script_name] = module self.script_classes[script_name] = script_class self.file_mtimes[script_path] = os.path.getmtime(script_path) print(f"✓ 成功加载脚本: {script_name} 从 {script_path}") return script_class except Exception as e: print(f"加载脚本失败 {script_path}: {e}") traceback.print_exc() return None def unload_script(self, script_name: str): """卸载脚本""" if script_name in self.loaded_modules: # 移除所有使用此脚本的组件 components_to_remove = [] for component in self.script_manager.engine.script_components: if component.script_instance.__class__.__name__ == script_name: components_to_remove.append(component) for component in components_to_remove: self.script_manager.remove_script_from_object(component.game_object, script_name) # 从sys.modules中移除 module = self.loaded_modules[script_name] if module.__name__ in sys.modules: del sys.modules[module.__name__] # 清理引用 del self.loaded_modules[script_name] if script_name in self.script_classes: del self.script_classes[script_name] print(f"✓ 脚本已卸载: {script_name}") def reload_script(self, script_path: str) -> Optional[type]: """重新加载脚本(热重载)""" script_name = os.path.splitext(os.path.basename(script_path))[0] print(f"重新加载脚本: {script_name}") # 先卸载旧版本 if script_name in self.loaded_modules: self.unload_script(script_name) # 重新加载 return self.load_script_from_file(script_path) def check_for_changes(self): """检查脚本文件是否有变化(用于热重载)""" changed_scripts = [] for script_path, old_mtime in self.file_mtimes.items(): if os.path.exists(script_path): current_mtime = os.path.getmtime(script_path) if current_mtime > old_mtime: changed_scripts.append(script_path) # 重新加载变化的脚本 for script_path in changed_scripts: self.reload_script(script_path) return len(changed_scripts) > 0 def find_script_file(self, script_name: str) -> Optional[str]: """根据脚本名称查找脚本文件路径""" # 首先检查已加载的脚本 if script_name in self.loaded_modules: module = self.loaded_modules[script_name] if hasattr(module, '__file__') and module.__file__: return module.__file__ # 在已知的文件路径中查找 for file_path in self.file_mtimes.keys(): file_name = os.path.splitext(os.path.basename(file_path))[0] if file_name == script_name: return file_path # 在脚本目录中查找 scripts_dir = self.script_manager.scripts_directory if os.path.exists(scripts_dir): for file_name in os.listdir(scripts_dir): if file_name.endswith('.py'): base_name = os.path.splitext(file_name)[0] if base_name == script_name: return os.path.join(scripts_dir, file_name) return None class ScriptAPI: """脚本API - 提供给脚本使用的API接口""" def __init__(self, world): self.world = world # ==================== 游戏对象操作 ==================== def find_object_by_name(self, name: str): """根据名称查找游戏对象""" return self.world.render.find(name) def create_object(self, name: str = "GameObject"): """创建游戏对象""" obj = self.world.render.attachNewNode(name) return obj def destroy_object(self, obj): """销毁游戏对象""" if obj: obj.removeNode() # ==================== 变换操作 ==================== def get_position(self, obj): """获取对象位置""" return obj.getPos() if obj else None def set_position(self, obj, x, y, z): """设置对象位置""" if obj: obj.setPos(x, y, z) def get_rotation(self, obj): """获取对象旋转""" return obj.getHpr() if obj else None def set_rotation(self, obj, h, p, r): """设置对象旋转""" if obj: obj.setHpr(h, p, r) def get_scale(self, obj): """获取对象缩放""" return obj.getScale() if obj else None def set_scale(self, obj, sx, sy, sz): """设置对象缩放""" if obj: obj.setScale(sx, sy, sz) # ==================== 输入系统 ==================== def is_key_pressed(self, key): """检查按键是否被按下""" # 这里需要集成到现有的输入系统 return False # 暂时返回False # ==================== 时间系统 ==================== def get_time(self): """获取游戏时间""" return time.time() def get_delta_time(self): """获取帧间隔时间""" from direct.showbase.ShowBaseGlobal import globalClock return globalClock.getDt() # ==================== 日志系统 ==================== def log(self, message): """输出日志""" print(f"[ScriptAPI] {message}") class ScriptManager: """脚本管理器 - 统一管理所有脚本功能""" def __init__(self, world): """初始化脚本管理器 Args: world: 主程序world对象引用 """ self.world = world # 初始化子系统 self.engine = ScriptEngine(world) self.loader = ScriptLoader(self) self.api = ScriptAPI(world) # 脚本存储 self.object_scripts: Dict[Any, List[ScriptComponent]] = {} # 对象 -> 脚本组件列表 self.script_templates: Dict[str, type] = {} # 脚本名 -> 脚本类 # 脚本目录 self.scripts_directory = "scripts" self._ensure_scripts_directory() # 热重载监控 self.hot_reload_enabled = True self.hot_reload_task = None print("✓ 脚本管理系统初始化完成") def _ensure_scripts_directory(self): """确保脚本目录存在""" if not os.path.exists(self.scripts_directory): os.makedirs(self.scripts_directory) # 创建示例脚本 self._create_example_script() def _create_example_script(self): """创建示例脚本""" example_script = '''#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 示例脚本 - 演示如何编写脚本 """ from core.script_system import ScriptBase class ExampleScript(ScriptBase): """示例脚本类""" def __init__(self): super().__init__() self.counter = 0 self.rotation_speed = 30.0 # 度/秒 def start(self): """脚本开始时调用""" self.log("示例脚本开始运行!") self.log(f"挂载到对象: {self.gameObject.getName()}") def update(self, dt): """每帧更新""" self.counter += 1 # 每60帧输出一次信息 if self.counter % 60 == 0: self.log(f"运行了 {self.counter} 帧") # 让对象旋转 if self.transform: current_h = self.transform.getH() new_h = current_h + self.rotation_speed * dt self.transform.setH(new_h) def on_destroy(self): """脚本销毁时调用""" self.log("示例脚本被销毁") def on_enable(self): """脚本启用时调用""" self.log("示例脚本被启用") def on_disable(self): """脚本禁用时调用""" self.log("示例脚本被禁用") ''' example_path = os.path.join(self.scripts_directory, "example_script.py") with open(example_path, 'w', encoding='utf-8') as f: f.write(example_script) print(f"✓ 创建示例脚本: {example_path}") # ==================== 脚本管理功能 ==================== def start_system(self): """启动脚本系统""" self.engine.start_engine() # 加载scripts目录中的所有现有脚本 self.load_all_scripts_from_directory() if self.hot_reload_enabled: self.start_hot_reload() print("✓ 脚本系统已启动") def stop_system(self): """停止脚本系统""" self.engine.stop_engine() self.stop_hot_reload() print("✓ 脚本系统已停止") def start_hot_reload(self): """启动热重载监控""" if self.hot_reload_task is None: self.hot_reload_task = taskMgr.add(self._check_hot_reload, "script_hot_reload") print("✓ 脚本热重载监控已启动") def stop_hot_reload(self): """停止热重载监控""" if self.hot_reload_task: taskMgr.remove(self.hot_reload_task) self.hot_reload_task = None print("✓ 脚本热重载监控已停止") def _check_hot_reload(self, task): """检查热重载(每秒调用一次)""" self.loader.check_for_changes() task.delayTime = 1.0 # 1秒后再次调用 return task.again # ==================== 脚本创建和加载 ==================== def create_script_file(self, script_name: str, template: str = "basic") -> str: """创建新的脚本文件""" script_path = os.path.join(self.scripts_directory, f"{script_name}.py") if os.path.exists(script_path): print(f"脚本文件已存在: {script_path}") return script_path # 根据模板创建脚本 if template == "basic": script_content = self._get_basic_script_template(script_name) elif template == "movement": script_content = self._get_movement_script_template(script_name) else: script_content = self._get_basic_script_template(script_name) with open(script_path, 'w', encoding='utf-8') as f: f.write(script_content) print(f"✓ 创建脚本文件: {script_path}") return script_path def _get_basic_script_template(self, script_name: str) -> str: """获取基础脚本模板""" class_name = ''.join(word.capitalize() for word in script_name.split('_')) return f'''#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ {script_name} - 自定义脚本 """ from core.script_system import ScriptBase class {class_name}(ScriptBase): """自定义脚本类""" def __init__(self): super().__init__() # 在这里初始化您的变量 def start(self): """脚本开始时调用""" self.log("脚本开始运行!") def update(self, dt): """每帧更新""" # 在这里编写更新逻辑 pass def on_destroy(self): """脚本销毁时调用""" self.log("脚本被销毁") ''' def _get_movement_script_template(self, script_name: str) -> str: """获取移动脚本模板""" class_name = ''.join(word.capitalize() for word in script_name.split('_')) return f'''#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ {script_name} - 移动脚本 """ from core.script_system import ScriptBase class {class_name}(ScriptBase): """移动脚本类""" def __init__(self): super().__init__() self.speed = 5.0 # 移动速度 self.direction = [1, 0, 0] # 移动方向 def start(self): """脚本开始时调用""" self.log("移动脚本开始运行!") def update(self, dt): """每帧更新""" if self.transform: # 计算移动偏移 offset_x = self.direction[0] * self.speed * dt offset_y = self.direction[1] * self.speed * dt offset_z = self.direction[2] * self.speed * dt # 更新位置 current_pos = self.transform.getPos() new_pos = ( current_pos.x + offset_x, current_pos.y + offset_y, current_pos.z + offset_z ) self.transform.setPos(*new_pos) def on_destroy(self): """脚本销毁时调用""" self.log("移动脚本被销毁") ''' def load_script_from_file(self, script_path: str) -> Optional[type]: """从文件加载脚本""" return self.loader.load_script_from_file(script_path) def load_all_scripts_from_directory(self, directory: str = None) -> List[str]: """从目录加载所有脚本""" if directory is None: directory = self.scripts_directory if not os.path.exists(directory): print(f"脚本目录不存在: {directory}") return [] loaded_scripts = [] for filename in os.listdir(directory): if filename.endswith('.py') and not filename.startswith('__'): script_path = os.path.join(directory, filename) script_class = self.load_script_from_file(script_path) if script_class: script_name = os.path.splitext(filename)[0] loaded_scripts.append(script_name) print(f"✓ 从目录 {directory} 加载了 {len(loaded_scripts)} 个脚本") return loaded_scripts # ==================== 脚本挂载和管理 ==================== def add_script_to_object(self, game_object, script_name: str) -> Optional[ScriptComponent]: """为对象添加脚本""" # 查找脚本类 script_class = self.loader.script_classes.get(script_name) if script_class is None: print(f"未找到脚本类: {script_name}") return None try: # 创建脚本实例 script_instance = script_class() # 创建脚本组件 script_component = ScriptComponent(script_instance, game_object, self) # 添加到对象的脚本列表 if game_object not in self.object_scripts: self.object_scripts[game_object] = [] self.object_scripts[game_object].append(script_component) # 添加到脚本引擎 self.engine.add_script_component(script_component) print(f"✓ 为对象 {game_object.getName()} 添加脚本: {script_name}") return script_component except Exception as e: print(f"添加脚本失败: {e}") traceback.print_exc() return None def remove_script_from_object(self, game_object, script_name: str) -> bool: """从游戏对象移除脚本""" if game_object not in self.object_scripts: return False script_components = self.object_scripts[game_object] removed = False for component in script_components[:]: # 复制列表以避免修改时出错 if component.script_instance.__class__.__name__ == script_name: # 从引擎移除 self.engine.remove_script_component(component) # 从对象脚本列表移除 script_components.remove(component) removed = True print(f"✓ 从对象 {game_object.getName()} 移除脚本: {script_name}") if not script_components: del self.object_scripts[game_object] # 更新节点上保存的脚本信息标签 if removed: self._update_node_script_tags_after_removal(game_object, script_name) return removed def _update_node_script_tags_after_removal(self, game_object, removed_script_name): """在移除脚本后更新节点标签""" try: # 获取对象上剩余的脚本 remaining_scripts = self.get_scripts_on_object(game_object) if not remaining_scripts: # 如果没有其他脚本,清除所有脚本标签 if game_object.hasTag("has_scripts"): game_object.clearTag("has_scripts") if game_object.hasTag("scripts_info"): game_object.clearTag("scripts_info") print(f"✓ 清除节点 {game_object.getName()} 的所有脚本标签") else: # 如果还有其他脚本,更新脚本信息标签 script_info_list = [] for script_component in remaining_scripts: script_name = script_component.script_name script_class = script_component.script_instance.__class__ script_file = self.loader.find_script_file(script_name) or "" script_info_list.append({ "name": script_name, "file": script_file }) import json game_object.setTag("has_scripts", "true") game_object.setTag("scripts_info", json.dumps(script_info_list, ensure_ascii=False)) print(f"✓ 更新节点 {game_object.getName()} 的脚本标签信息,剩余 {len(script_info_list)} 个脚本") except Exception as e: print(f"更新节点标签失败: {e}") def get_scripts_on_object(self, game_object) -> List[ScriptComponent]: """获取对象上的所有脚本""" return self.object_scripts.get(game_object, []) def get_script_on_object(self, game_object, script_name: str) -> Optional[ScriptComponent]: """获取对象上的特定脚本""" scripts = self.get_scripts_on_object(game_object) for script in scripts: if script.script_instance.__class__.__name__ == script_name: return script return None # ==================== 脚本信息查询 ==================== def get_available_scripts(self) -> List[str]: """获取所有可用的脚本名称""" return list(self.loader.script_classes.keys()) def get_script_info(self, script_name: str) -> Optional[Dict[str, Any]]: """获取脚本信息""" script_class = self.loader.script_classes.get(script_name) if script_class is None: return None return { "name": script_name, "class": script_class, "doc": script_class.__doc__, "file": inspect.getfile(script_class) if hasattr(script_class, '__file__') else None, "methods": [method for method in dir(script_class) if not method.startswith('_')] } def reload_script(self, script_name: str) -> bool: """重新加载脚本""" script_info = self.get_script_info(script_name) if script_info and script_info["file"]: return self.loader.reload_script(script_info["file"]) is not None return False # ==================== 调试功能 ==================== def list_all_scripts(self): """列出所有脚本信息""" print("\n=== 脚本系统状态 ===") print(f"可用脚本数量: {len(self.loader.script_classes)}") print(f"运行中的脚本组件数量: {len(self.engine.script_components)}") print(f"有脚本的对象数量: {len(self.object_scripts)}") if self.loader.script_classes: print("\n可用脚本:") for script_name in self.loader.script_classes: print(f" - {script_name}") if self.object_scripts: print("\n对象脚本分布:") for obj, scripts in self.object_scripts.items(): script_names = [s.script_instance.__class__.__name__ for s in scripts] print(f" - {obj.getName()}: {script_names}") print("==================\n") def save_object_scripts(self,game_object,node_data:dict): try: if game_object in self.object_scripts: scripts_data = [] for script_commponent in self.object_scripts[game_object]: script_info = { 'script_name':script_commponent.script_name, 'enabled':script_commponent.enabled, 'script_class':script_commponent.script_instance.__class__.__name__ } scripts_data.append(script_info) if scripts_data: node_data['scripts'] = scripts_data print(f"✓ 保存了 {len(scripts_data)} 个脚本到对象 {game_object.getName()}") except Exception as e: print(f"保存对象脚本信息失败: {e}") traceback.print_exc() # def restore_object_scripts(self,game_object,node_data:dict): # try: # if 'scripts' in node_data: # scripts_data = node_data['scripts'] # restored_count = 0 # for script_info in scripts_data: # script_name = script_info.get('script_name') # enabled = script_info.get('enabled',True) # # #检查脚本是否可用 # if script_name in self.loader.script_classes: #为 # 添加全局便捷函数,让脚本更容易使用API def get_script_api(): """获取脚本API实例(需要在脚本管理器初始化后使用)""" # 这个函数将在脚本系统集成到主系统后实现 return None # 导出主要类 __all__ = [ 'ScriptBase', 'ScriptComponent', 'ScriptEngine', 'ScriptLoader', 'ScriptAPI', 'ScriptManager' ]