1090 lines
39 KiB
Python
1090 lines
39 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
脚本系统模块
|
||
|
||
负责脚本的创建、管理、挂载和运行:
|
||
- 脚本管理器:统一管理所有脚本
|
||
- 脚本组件:挂载到游戏对象的脚本实例
|
||
- 脚本引擎:执行脚本逻辑
|
||
- 脚本API:提供给脚本使用的API接口
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import importlib
|
||
import importlib.util
|
||
import traceback
|
||
import inspect
|
||
import time
|
||
from typing import Dict, List, Any, Optional, Callable
|
||
from abc import ABC, abstractmethod
|
||
from direct.task.TaskManagerGlobal import taskMgr
|
||
from panda3d.core import PythonTask
|
||
|
||
|
||
class ScriptBase(ABC):
|
||
"""脚本基类 - 所有用户脚本都应该继承此类"""
|
||
|
||
def __init__(self):
|
||
self.enabled = True
|
||
self.transform = None # 挂载的对象的transform
|
||
self.gameObject = None # 挂载的游戏对象
|
||
self.world = None # 引擎世界对象引用
|
||
self._script_id = None
|
||
|
||
@abstractmethod
|
||
def start(self):
|
||
"""脚本开始时调用(类似Unity的Start)"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def update(self, dt):
|
||
"""每帧更新时调用(类似Unity的Update)"""
|
||
pass
|
||
|
||
def on_destroy(self):
|
||
"""脚本销毁时调用(类似Unity的OnDestroy)"""
|
||
pass
|
||
|
||
def on_enable(self):
|
||
"""脚本启用时调用"""
|
||
pass
|
||
|
||
def on_disable(self):
|
||
"""脚本禁用时调用"""
|
||
pass
|
||
|
||
def on_collision_enter(self, other):
|
||
"""碰撞开始时调用"""
|
||
pass
|
||
|
||
def on_collision_exit(self, other):
|
||
"""碰撞结束时调用"""
|
||
pass
|
||
|
||
def log(self, message):
|
||
"""日志输出"""
|
||
print(f"[{self.__class__.__name__}] {message}")
|
||
|
||
|
||
class ScriptComponent:
|
||
"""脚本组件 - 挂载到游戏对象上的脚本实例"""
|
||
|
||
def __init__(self, script_instance: ScriptBase, game_object, script_manager, script_key: Optional[str] = None):
|
||
self.script_instance = script_instance
|
||
self.game_object = game_object
|
||
self.script_manager = script_manager
|
||
self.enabled = True
|
||
|
||
# 保存脚本名称,便于UI显示
|
||
self.script_name = script_instance.__class__.__name__
|
||
self.script_key = script_key or self.script_name
|
||
|
||
# 设置脚本实例的引用
|
||
script_instance.gameObject = game_object
|
||
script_instance.transform = game_object # Panda3D中NodePath就是transform
|
||
script_instance.world = script_manager.world
|
||
script_instance._script_id = id(self)
|
||
|
||
# 标记脚本已开始
|
||
self._started = False
|
||
|
||
def start(self):
|
||
"""启动脚本"""
|
||
if not self._started and self.enabled:
|
||
try:
|
||
self.script_instance.start()
|
||
self._started = True
|
||
except Exception as e:
|
||
print(f"脚本启动失败: {e}")
|
||
traceback.print_exc()
|
||
|
||
def update(self, dt):
|
||
"""更新脚本"""
|
||
if self.enabled and self._started:
|
||
try:
|
||
self.script_instance.update(dt)
|
||
except Exception as e:
|
||
print(f"脚本更新失败: {e}")
|
||
traceback.print_exc()
|
||
|
||
def destroy(self):
|
||
"""销毁脚本"""
|
||
try:
|
||
self.script_instance.on_destroy()
|
||
except Exception as e:
|
||
print(f"脚本销毁失败: {e}")
|
||
traceback.print_exc()
|
||
|
||
def set_enabled(self, enabled):
|
||
"""设置脚本启用状态"""
|
||
if self.enabled != enabled:
|
||
self.enabled = enabled
|
||
try:
|
||
if enabled:
|
||
self.script_instance.on_enable()
|
||
else:
|
||
self.script_instance.on_disable()
|
||
except Exception as e:
|
||
print(f"设置脚本状态失败: {e}")
|
||
traceback.print_exc()
|
||
|
||
|
||
class ScriptEngine:
|
||
"""脚本引擎 - 负责脚本的执行和生命周期管理"""
|
||
|
||
def __init__(self, world):
|
||
self.world = world
|
||
self.script_components: List[ScriptComponent] = []
|
||
self.update_task = None
|
||
|
||
def start_engine(self):
|
||
"""启动脚本引擎"""
|
||
if self.update_task is None:
|
||
self.update_task = taskMgr.add(self._update_scripts, "script_update")
|
||
print("✓ 脚本引擎已启动")
|
||
|
||
def stop_engine(self):
|
||
"""停止脚本引擎"""
|
||
if self.update_task:
|
||
taskMgr.remove(self.update_task)
|
||
self.update_task = None
|
||
print("✓ 脚本引擎已停止")
|
||
|
||
def add_script_component(self, script_component: ScriptComponent):
|
||
"""添加脚本组件"""
|
||
self.script_components.append(script_component)
|
||
# 如果引擎已运行,立即启动脚本
|
||
if self.update_task:
|
||
script_component.start()
|
||
|
||
def remove_script_component(self, script_component: ScriptComponent):
|
||
"""移除脚本组件"""
|
||
if script_component in self.script_components:
|
||
script_component.destroy()
|
||
self.script_components.remove(script_component)
|
||
|
||
def _update_scripts(self, task):
|
||
"""更新所有脚本(每帧调用)"""
|
||
from direct.showbase.ShowBaseGlobal import globalClock
|
||
dt = globalClock.getDt()
|
||
|
||
# 复制列表以避免迭代时修改
|
||
components_copy = self.script_components.copy()
|
||
|
||
for component in components_copy:
|
||
if component.enabled:
|
||
# 如果脚本还没开始,先调用start
|
||
if not component._started:
|
||
component.start()
|
||
# 然后调用update
|
||
component.update(dt)
|
||
|
||
return task.cont
|
||
|
||
|
||
class ScriptLoader:
|
||
"""脚本加载器 - 负责加载和重载脚本"""
|
||
|
||
def __init__(self, script_manager):
|
||
self.script_manager = script_manager
|
||
self.loaded_modules = {} # 模块名 -> 模块对象
|
||
self.script_classes = {} # 脚本名 -> 脚本类
|
||
self.file_mtimes = {} # 文件路径 -> 修改时间
|
||
|
||
def load_script_from_file(self, script_path: str) -> Optional[type]:
|
||
"""从文件加载脚本类"""
|
||
try:
|
||
if not os.path.exists(script_path):
|
||
print(f"脚本文件不存在: {script_path}")
|
||
return None
|
||
|
||
# 获取脚本名称(不包含扩展名)
|
||
script_name = os.path.splitext(os.path.basename(script_path))[0]
|
||
|
||
# 动态导入模块
|
||
spec = importlib.util.spec_from_file_location(script_name, script_path)
|
||
if spec is None:
|
||
print(f"无法创建模块规范: {script_path}")
|
||
return None
|
||
|
||
module = importlib.util.module_from_spec(spec)
|
||
|
||
# 如果模块已经加载过,先卸载
|
||
if script_name in self.loaded_modules:
|
||
self.unload_script(script_name)
|
||
|
||
# 执行模块
|
||
spec.loader.exec_module(module)
|
||
|
||
# 查找继承自ScriptBase的类
|
||
script_class = None
|
||
for name, obj in inspect.getmembers(module, inspect.isclass):
|
||
if issubclass(obj, ScriptBase) and obj != ScriptBase:
|
||
script_class = obj
|
||
break
|
||
|
||
if script_class is None:
|
||
print(f"脚本文件中没有找到继承自ScriptBase的类: {script_path}")
|
||
return None
|
||
|
||
# 保存模块和类信息
|
||
self.loaded_modules[script_name] = module
|
||
self.script_classes[script_name] = script_class
|
||
self.file_mtimes[script_path] = os.path.getmtime(script_path)
|
||
|
||
print(f"✓ 成功加载脚本: {script_name} 从 {script_path}")
|
||
return script_class
|
||
|
||
except Exception as e:
|
||
print(f"加载脚本失败 {script_path}: {e}")
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def unload_script(self, script_name: str):
|
||
"""卸载脚本"""
|
||
if script_name in self.loaded_modules:
|
||
# 移除所有使用此脚本的组件
|
||
components_to_remove = []
|
||
for component in self.script_manager.engine.script_components:
|
||
if self.script_manager._script_matches(component, script_name):
|
||
components_to_remove.append(component)
|
||
|
||
for component in components_to_remove:
|
||
self.script_manager.remove_script_from_object(component.game_object, script_name)
|
||
|
||
# 从sys.modules中移除
|
||
module = self.loaded_modules[script_name]
|
||
if module.__name__ in sys.modules:
|
||
del sys.modules[module.__name__]
|
||
|
||
# 清理引用
|
||
del self.loaded_modules[script_name]
|
||
if script_name in self.script_classes:
|
||
del self.script_classes[script_name]
|
||
|
||
print(f"✓ 脚本已卸载: {script_name}")
|
||
|
||
def clear(self, unload_components: bool = False):
|
||
"""清空当前加载的脚本缓存。"""
|
||
if unload_components:
|
||
for script_name in list(self.loaded_modules.keys()):
|
||
try:
|
||
self.unload_script(script_name)
|
||
except Exception as e:
|
||
print(f"卸载脚本失败 {script_name}: {e}")
|
||
else:
|
||
for module in list(self.loaded_modules.values()):
|
||
module_name = getattr(module, "__name__", "")
|
||
if module_name and module_name in sys.modules:
|
||
del sys.modules[module_name]
|
||
|
||
self.loaded_modules.clear()
|
||
self.script_classes.clear()
|
||
self.file_mtimes.clear()
|
||
|
||
def reload_script(self, script_path: str) -> Optional[type]:
|
||
"""重新加载脚本(热重载)"""
|
||
script_name = os.path.splitext(os.path.basename(script_path))[0]
|
||
print(f"重新加载脚本: {script_name}")
|
||
|
||
# 先卸载旧版本
|
||
if script_name in self.loaded_modules:
|
||
self.unload_script(script_name)
|
||
|
||
# 重新加载
|
||
return self.load_script_from_file(script_path)
|
||
|
||
def check_for_changes(self):
|
||
"""检查脚本文件是否有变化(用于热重载)"""
|
||
changed_scripts = []
|
||
|
||
for script_path, old_mtime in self.file_mtimes.items():
|
||
if os.path.exists(script_path):
|
||
current_mtime = os.path.getmtime(script_path)
|
||
if current_mtime > old_mtime:
|
||
changed_scripts.append(script_path)
|
||
|
||
# 重新加载变化的脚本
|
||
for script_path in changed_scripts:
|
||
self.reload_script(script_path)
|
||
|
||
return len(changed_scripts) > 0
|
||
|
||
def find_script_file(self, script_name: str) -> Optional[str]:
|
||
"""根据脚本名称查找脚本文件路径"""
|
||
# 首先检查已加载的脚本
|
||
if script_name in self.loaded_modules:
|
||
module = self.loaded_modules[script_name]
|
||
if hasattr(module, '__file__') and module.__file__:
|
||
return module.__file__
|
||
|
||
# 在已知的文件路径中查找
|
||
for file_path in self.file_mtimes.keys():
|
||
file_name = os.path.splitext(os.path.basename(file_path))[0]
|
||
if file_name == script_name:
|
||
return file_path
|
||
|
||
# 在脚本目录中查找
|
||
scripts_dir = self.script_manager.scripts_directory
|
||
if os.path.exists(scripts_dir):
|
||
for file_name in os.listdir(scripts_dir):
|
||
if file_name.endswith(('.py', '.pyc')):
|
||
base_name = os.path.splitext(file_name)[0]
|
||
if base_name == script_name:
|
||
return os.path.join(scripts_dir, file_name)
|
||
|
||
return None
|
||
|
||
|
||
class ScriptAPI:
|
||
"""脚本API - 提供给脚本使用的API接口"""
|
||
|
||
def __init__(self, world):
|
||
self.world = world
|
||
|
||
# ==================== 游戏对象操作 ====================
|
||
|
||
def find_object_by_name(self, name: str):
|
||
"""根据名称查找游戏对象"""
|
||
return self.world.render.find(name)
|
||
|
||
def create_object(self, name: str = "GameObject"):
|
||
"""创建游戏对象"""
|
||
obj = self.world.render.attachNewNode(name)
|
||
return obj
|
||
|
||
def destroy_object(self, obj):
|
||
"""销毁游戏对象"""
|
||
if obj:
|
||
obj.removeNode()
|
||
|
||
# ==================== 变换操作 ====================
|
||
|
||
def get_position(self, obj):
|
||
"""获取对象位置"""
|
||
return obj.getPos() if obj else None
|
||
|
||
def set_position(self, obj, x, y, z):
|
||
"""设置对象位置"""
|
||
if obj:
|
||
obj.setPos(x, y, z)
|
||
|
||
def get_rotation(self, obj):
|
||
"""获取对象旋转"""
|
||
return obj.getHpr() if obj else None
|
||
|
||
def set_rotation(self, obj, h, p, r):
|
||
"""设置对象旋转"""
|
||
if obj:
|
||
obj.setHpr(h, p, r)
|
||
|
||
def get_scale(self, obj):
|
||
"""获取对象缩放"""
|
||
return obj.getScale() if obj else None
|
||
|
||
def set_scale(self, obj, sx, sy, sz):
|
||
"""设置对象缩放"""
|
||
if obj:
|
||
obj.setScale(sx, sy, sz)
|
||
|
||
# ==================== 输入系统 ====================
|
||
|
||
def is_key_pressed(self, key):
|
||
"""检查按键是否被按下"""
|
||
# 这里需要集成到现有的输入系统
|
||
return False # 暂时返回False
|
||
|
||
# ==================== 时间系统 ====================
|
||
|
||
def get_time(self):
|
||
"""获取游戏时间"""
|
||
return time.time()
|
||
|
||
def get_delta_time(self):
|
||
"""获取帧间隔时间"""
|
||
from direct.showbase.ShowBaseGlobal import globalClock
|
||
return globalClock.getDt()
|
||
|
||
# ==================== 日志系统 ====================
|
||
|
||
def log(self, message):
|
||
"""输出日志"""
|
||
print(f"[ScriptAPI] {message}")
|
||
|
||
|
||
class ScriptManager:
|
||
"""脚本管理器 - 统一管理所有脚本功能"""
|
||
|
||
def __init__(self, world):
|
||
"""初始化脚本管理器
|
||
|
||
Args:
|
||
world: 主程序world对象引用
|
||
"""
|
||
self.world = world
|
||
|
||
# 初始化子系统
|
||
self.engine = ScriptEngine(world)
|
||
self.loader = ScriptLoader(self)
|
||
self.api = ScriptAPI(world)
|
||
|
||
# 脚本存储
|
||
self.object_scripts: Dict[Any, List[ScriptComponent]] = {} # 对象 -> 脚本组件列表
|
||
self.script_templates: Dict[str, type] = {} # 脚本名 -> 脚本类
|
||
|
||
# 脚本目录
|
||
self.scripts_directory = self._normalize_scripts_directory("scripts")
|
||
self._ensure_scripts_directory()
|
||
|
||
# 热重载监控
|
||
self.hot_reload_enabled = True
|
||
self.hot_reload_task = None
|
||
|
||
print("✓ 脚本管理系统初始化完成")
|
||
|
||
def _normalize_scripts_directory(self, directory: str) -> str:
|
||
directory = directory or "scripts"
|
||
return os.path.normpath(os.path.abspath(directory))
|
||
|
||
def get_project_path(self) -> Optional[str]:
|
||
project_manager = getattr(self.world, "project_manager", None)
|
||
project_path = getattr(project_manager, "current_project_path", None)
|
||
if project_path:
|
||
return os.path.normpath(project_path)
|
||
|
||
project_path = getattr(self.world, "project_path", None)
|
||
if project_path:
|
||
return os.path.normpath(project_path)
|
||
|
||
return None
|
||
|
||
def get_project_scripts_directory(self, project_path: Optional[str] = None) -> Optional[str]:
|
||
project_path = project_path or self.get_project_path()
|
||
if not project_path:
|
||
return None
|
||
return os.path.normpath(os.path.join(project_path, "Assets", "Scripts"))
|
||
|
||
def get_script_relative_path(self, script_path: str) -> str:
|
||
if not script_path:
|
||
return ""
|
||
|
||
script_path = os.path.normpath(os.path.abspath(script_path))
|
||
project_path = self.get_project_path()
|
||
if not project_path:
|
||
return ""
|
||
|
||
try:
|
||
relative_path = os.path.relpath(script_path, project_path)
|
||
except ValueError:
|
||
return ""
|
||
|
||
if relative_path.startswith(".."):
|
||
return ""
|
||
|
||
return relative_path.replace("\\", "/")
|
||
|
||
def resolve_script_path(self, script_info: Dict[str, Any]) -> str:
|
||
if not isinstance(script_info, dict):
|
||
return ""
|
||
|
||
project_path = self.get_project_path()
|
||
candidates = []
|
||
script_guid = str(script_info.get("script_guid", "") or "").strip()
|
||
|
||
if script_guid:
|
||
try:
|
||
project_manager = getattr(self.world, "project_manager", None)
|
||
asset_database = project_manager.get_asset_database() if project_manager and hasattr(project_manager, "get_asset_database") else None
|
||
asset_record = asset_database.get_asset(script_guid) if asset_database else {}
|
||
asset_path = str(asset_record.get("asset_path", "") or "")
|
||
if asset_path and project_path:
|
||
asset_abs_path = os.path.normpath(os.path.join(project_path, asset_path.replace("/", os.sep)))
|
||
candidates.append(asset_abs_path)
|
||
except Exception:
|
||
pass
|
||
|
||
for key in ("project_relative_path", "relative_path", "path", "file"):
|
||
raw_value = str(script_info.get(key, "") or "").strip()
|
||
if not raw_value:
|
||
continue
|
||
|
||
normalized_value = raw_value.replace("/", os.sep)
|
||
if os.path.isabs(normalized_value):
|
||
candidates.append(os.path.normpath(normalized_value))
|
||
continue
|
||
|
||
if project_path:
|
||
candidates.append(os.path.normpath(os.path.join(project_path, normalized_value)))
|
||
|
||
candidates.append(os.path.normpath(os.path.join(self.scripts_directory, normalized_value)))
|
||
|
||
lower_value = normalized_value.lower()
|
||
if not lower_value.endswith((".py", ".pyc")):
|
||
candidates.append(os.path.normpath(os.path.join(self.scripts_directory, f"{normalized_value}.py")))
|
||
candidates.append(os.path.normpath(os.path.join(self.scripts_directory, f"{normalized_value}.pyc")))
|
||
elif lower_value.endswith(".py"):
|
||
candidates.append(os.path.normpath(os.path.join(self.scripts_directory, f"{os.path.splitext(normalized_value)[0]}.pyc")))
|
||
elif lower_value.endswith(".pyc"):
|
||
candidates.append(os.path.normpath(os.path.join(self.scripts_directory, f"{os.path.splitext(normalized_value)[0]}.py")))
|
||
|
||
script_name = str(script_info.get("name", "") or "").strip()
|
||
if script_name:
|
||
candidates.append(os.path.normpath(os.path.join(self.scripts_directory, f"{script_name}.py")))
|
||
candidates.append(os.path.normpath(os.path.join(self.scripts_directory, f"{script_name}.pyc")))
|
||
|
||
seen = set()
|
||
for candidate in candidates:
|
||
if not candidate or candidate in seen:
|
||
continue
|
||
seen.add(candidate)
|
||
if os.path.exists(candidate):
|
||
return candidate
|
||
|
||
if script_name:
|
||
return self.loader.find_script_file(script_name) or ""
|
||
|
||
return ""
|
||
|
||
def build_script_reference(self, script_name: str, script_file: str = "") -> Dict[str, Any]:
|
||
reference = {"name": script_name}
|
||
|
||
resolved_script_path = script_file or self.loader.find_script_file(script_name) or ""
|
||
if resolved_script_path:
|
||
resolved_script_path = os.path.normpath(os.path.abspath(resolved_script_path))
|
||
relative_path = self.get_script_relative_path(resolved_script_path)
|
||
if relative_path:
|
||
reference["project_relative_path"] = relative_path
|
||
reference["file"] = relative_path
|
||
else:
|
||
reference["file"] = resolved_script_path
|
||
|
||
try:
|
||
project_manager = getattr(self.world, "project_manager", None)
|
||
if project_manager and hasattr(project_manager, "register_project_asset"):
|
||
asset_record = project_manager.register_project_asset(resolved_script_path)
|
||
if asset_record and asset_record.get("guid"):
|
||
reference["script_guid"] = asset_record["guid"]
|
||
except Exception:
|
||
pass
|
||
|
||
return reference
|
||
|
||
def set_scripts_directory(
|
||
self,
|
||
directory: str,
|
||
*,
|
||
create: bool = True,
|
||
reload_scripts: bool = True,
|
||
) -> str:
|
||
normalized_directory = self._normalize_scripts_directory(directory)
|
||
if normalized_directory == self.scripts_directory and (
|
||
os.path.exists(normalized_directory) or not create
|
||
):
|
||
return self.scripts_directory
|
||
|
||
self.scripts_directory = normalized_directory
|
||
if create:
|
||
self._ensure_scripts_directory()
|
||
|
||
self.loader.clear(unload_components=False)
|
||
self.script_templates.clear()
|
||
|
||
if reload_scripts and os.path.exists(self.scripts_directory):
|
||
self.load_all_scripts_from_directory()
|
||
|
||
print(f"✓ 当前脚本目录已切换到: {self.scripts_directory}")
|
||
return self.scripts_directory
|
||
|
||
def _ensure_scripts_directory(self):
|
||
"""确保脚本目录存在"""
|
||
if not os.path.exists(self.scripts_directory):
|
||
os.makedirs(self.scripts_directory)
|
||
|
||
# 创建示例脚本
|
||
self._create_example_script()
|
||
|
||
def _create_example_script(self):
|
||
"""创建示例脚本"""
|
||
example_script = '''#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
示例脚本 - 演示如何编写脚本
|
||
"""
|
||
|
||
from core.script_system import ScriptBase
|
||
|
||
class ExampleScript(ScriptBase):
|
||
"""示例脚本类"""
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.counter = 0
|
||
self.rotation_speed = 30.0 # 度/秒
|
||
|
||
def start(self):
|
||
"""脚本开始时调用"""
|
||
self.log("示例脚本开始运行!")
|
||
self.log(f"挂载到对象: {self.gameObject.getName()}")
|
||
|
||
def update(self, dt):
|
||
"""每帧更新"""
|
||
self.counter += 1
|
||
|
||
# 每60帧输出一次信息
|
||
if self.counter % 60 == 0:
|
||
self.log(f"运行了 {self.counter} 帧")
|
||
|
||
# 让对象旋转
|
||
if self.transform:
|
||
current_h = self.transform.getH()
|
||
new_h = current_h + self.rotation_speed * dt
|
||
self.transform.setH(new_h)
|
||
|
||
def on_destroy(self):
|
||
"""脚本销毁时调用"""
|
||
self.log("示例脚本被销毁")
|
||
|
||
def on_enable(self):
|
||
"""脚本启用时调用"""
|
||
self.log("示例脚本被启用")
|
||
|
||
def on_disable(self):
|
||
"""脚本禁用时调用"""
|
||
self.log("示例脚本被禁用")
|
||
'''
|
||
|
||
example_path = os.path.join(self.scripts_directory, "example_script.py")
|
||
with open(example_path, 'w', encoding='utf-8') as f:
|
||
f.write(example_script)
|
||
|
||
print(f"✓ 创建示例脚本: {example_path}")
|
||
|
||
# ==================== 脚本管理功能 ====================
|
||
|
||
def start_system(self):
|
||
"""启动脚本系统"""
|
||
self.engine.start_engine()
|
||
|
||
# 加载scripts目录中的所有现有脚本
|
||
self.load_all_scripts_from_directory()
|
||
|
||
if self.hot_reload_enabled:
|
||
self.start_hot_reload()
|
||
|
||
print("✓ 脚本系统已启动")
|
||
|
||
def stop_system(self):
|
||
"""停止脚本系统"""
|
||
self.engine.stop_engine()
|
||
self.stop_hot_reload()
|
||
print("✓ 脚本系统已停止")
|
||
|
||
def reset_scene_state(self):
|
||
"""Clear all mounted script components before loading/replacing a scene."""
|
||
try:
|
||
for component in list(self.engine.script_components):
|
||
try:
|
||
self.engine.remove_script_component(component)
|
||
except Exception as e:
|
||
print(f"移除脚本组件失败: {e}")
|
||
self.object_scripts.clear()
|
||
print("✓ 脚本场景状态已清空")
|
||
except Exception as e:
|
||
print(f"清空脚本场景状态失败: {e}")
|
||
|
||
def start_hot_reload(self):
|
||
"""启动热重载监控"""
|
||
if self.hot_reload_task is None:
|
||
self.hot_reload_task = taskMgr.add(self._check_hot_reload, "script_hot_reload")
|
||
print("✓ 脚本热重载监控已启动")
|
||
|
||
def stop_hot_reload(self):
|
||
"""停止热重载监控"""
|
||
if self.hot_reload_task:
|
||
taskMgr.remove(self.hot_reload_task)
|
||
self.hot_reload_task = None
|
||
print("✓ 脚本热重载监控已停止")
|
||
|
||
def _check_hot_reload(self, task):
|
||
"""检查热重载(每秒调用一次)"""
|
||
self.loader.check_for_changes()
|
||
task.delayTime = 1.0 # 1秒后再次调用
|
||
return task.again
|
||
|
||
# ==================== 脚本创建和加载 ====================
|
||
|
||
def create_script_file(self, script_name: str, template: str = "basic") -> str:
|
||
"""创建新的脚本文件"""
|
||
script_base_name = os.path.splitext(script_name.strip())[0]
|
||
script_path = os.path.join(self.scripts_directory, f"{script_base_name}.py")
|
||
|
||
if not os.path.exists(self.scripts_directory):
|
||
os.makedirs(self.scripts_directory)
|
||
|
||
if os.path.exists(script_path):
|
||
print(f"脚本文件已存在: {script_path}")
|
||
return script_path
|
||
|
||
# 根据模板创建脚本
|
||
if template == "basic":
|
||
script_content = self._get_basic_script_template(script_base_name)
|
||
elif template == "movement":
|
||
script_content = self._get_movement_script_template(script_base_name)
|
||
else:
|
||
script_content = self._get_basic_script_template(script_base_name)
|
||
|
||
with open(script_path, 'w', encoding='utf-8') as f:
|
||
f.write(script_content)
|
||
|
||
print(f"✓ 创建脚本文件: {script_path}")
|
||
return script_path
|
||
|
||
def _build_script_class_name(self, script_name: str) -> str:
|
||
normalized_parts = []
|
||
for raw_part in script_name.replace('-', '_').split('_'):
|
||
part = ''.join(ch for ch in raw_part if ch.isalnum())
|
||
if part:
|
||
normalized_parts.append(part.capitalize())
|
||
|
||
class_name = ''.join(normalized_parts) or "GeneratedScript"
|
||
if class_name[0].isdigit():
|
||
class_name = f"Script{class_name}"
|
||
return class_name
|
||
|
||
def _get_basic_script_template(self, script_name: str) -> str:
|
||
"""获取基础脚本模板"""
|
||
class_name = self._build_script_class_name(script_name)
|
||
|
||
return f'''#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
{script_name} - 自定义脚本
|
||
"""
|
||
|
||
from core.script_system import ScriptBase
|
||
|
||
class {class_name}(ScriptBase):
|
||
"""自定义脚本类"""
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
# 在这里初始化您的变量
|
||
|
||
def start(self):
|
||
"""脚本开始时调用"""
|
||
self.log("脚本开始运行!")
|
||
|
||
def update(self, dt):
|
||
"""每帧更新"""
|
||
# 在这里编写更新逻辑
|
||
pass
|
||
|
||
def on_destroy(self):
|
||
"""脚本销毁时调用"""
|
||
self.log("脚本被销毁")
|
||
'''
|
||
|
||
def _get_movement_script_template(self, script_name: str) -> str:
|
||
"""获取移动脚本模板"""
|
||
class_name = self._build_script_class_name(script_name)
|
||
|
||
return f'''#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
{script_name} - 移动脚本
|
||
"""
|
||
|
||
from core.script_system import ScriptBase
|
||
|
||
class {class_name}(ScriptBase):
|
||
"""移动脚本类"""
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.speed = 5.0 # 移动速度
|
||
self.direction = [1, 0, 0] # 移动方向
|
||
|
||
def start(self):
|
||
"""脚本开始时调用"""
|
||
self.log("移动脚本开始运行!")
|
||
|
||
def update(self, dt):
|
||
"""每帧更新"""
|
||
if self.transform:
|
||
# 计算移动偏移
|
||
offset_x = self.direction[0] * self.speed * dt
|
||
offset_y = self.direction[1] * self.speed * dt
|
||
offset_z = self.direction[2] * self.speed * dt
|
||
|
||
# 更新位置
|
||
current_pos = self.transform.getPos()
|
||
new_pos = (
|
||
current_pos.x + offset_x,
|
||
current_pos.y + offset_y,
|
||
current_pos.z + offset_z
|
||
)
|
||
self.transform.setPos(*new_pos)
|
||
|
||
def on_destroy(self):
|
||
"""脚本销毁时调用"""
|
||
self.log("移动脚本被销毁")
|
||
'''
|
||
|
||
def load_script_from_file(self, script_path: str) -> Optional[type]:
|
||
"""从文件加载脚本"""
|
||
return self.loader.load_script_from_file(script_path)
|
||
|
||
def load_all_scripts_from_directory(self, directory: str = None) -> List[str]:
|
||
"""从目录加载所有脚本"""
|
||
if directory is None:
|
||
directory = self.scripts_directory
|
||
else:
|
||
directory = self._normalize_scripts_directory(directory)
|
||
|
||
if not os.path.exists(directory):
|
||
print(f"脚本目录不存在: {directory}")
|
||
return []
|
||
|
||
loaded_scripts = []
|
||
seen_script_names = set()
|
||
for filename in sorted(os.listdir(directory)):
|
||
if filename.startswith('__') or not filename.endswith(('.py', '.pyc')):
|
||
continue
|
||
|
||
script_name = os.path.splitext(filename)[0]
|
||
if script_name in seen_script_names:
|
||
continue
|
||
|
||
preferred_path = os.path.join(directory, f"{script_name}.py")
|
||
script_path = preferred_path if os.path.exists(preferred_path) else os.path.join(directory, filename)
|
||
if not os.path.exists(script_path):
|
||
continue
|
||
|
||
script_class = self.load_script_from_file(script_path)
|
||
if script_class:
|
||
loaded_scripts.append(script_name)
|
||
seen_script_names.add(script_name)
|
||
|
||
print(f"✓ 从目录 {directory} 加载了 {len(loaded_scripts)} 个脚本")
|
||
return loaded_scripts
|
||
|
||
# ==================== 脚本挂载和管理 ====================
|
||
|
||
def add_script_to_object(self, game_object, script_name: str) -> Optional[ScriptComponent]:
|
||
"""为对象添加脚本"""
|
||
# 查找脚本类
|
||
script_class = self.loader.script_classes.get(script_name)
|
||
if script_class is None:
|
||
print(f"未找到脚本类: {script_name}")
|
||
return None
|
||
|
||
try:
|
||
# 创建脚本实例
|
||
script_instance = script_class()
|
||
|
||
# 创建脚本组件
|
||
script_component = ScriptComponent(script_instance, game_object, self, script_key=script_name)
|
||
|
||
# 添加到对象的脚本列表
|
||
if game_object not in self.object_scripts:
|
||
self.object_scripts[game_object] = []
|
||
self.object_scripts[game_object].append(script_component)
|
||
|
||
# 添加到脚本引擎
|
||
self.engine.add_script_component(script_component)
|
||
|
||
print(f"✓ 为对象 {game_object.getName()} 添加脚本: {script_name}")
|
||
return script_component
|
||
|
||
except Exception as e:
|
||
print(f"添加脚本失败: {e}")
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def remove_script_from_object(self, game_object, script_name: str) -> bool:
|
||
"""从游戏对象移除脚本"""
|
||
if game_object not in self.object_scripts:
|
||
return False
|
||
|
||
script_components = self.object_scripts[game_object]
|
||
removed = False
|
||
|
||
for component in script_components[:]: # 复制列表以避免修改时出错
|
||
if self._script_matches(component, script_name):
|
||
# 从引擎移除
|
||
self.engine.remove_script_component(component)
|
||
# 从对象脚本列表移除
|
||
script_components.remove(component)
|
||
removed = True
|
||
|
||
print(f"✓ 从对象 {game_object.getName()} 移除脚本: {script_name}")
|
||
|
||
|
||
if not script_components:
|
||
del self.object_scripts[game_object]
|
||
|
||
# 更新节点上保存的脚本信息标签
|
||
if removed:
|
||
self._update_node_script_tags_after_removal(game_object, script_name)
|
||
|
||
return removed
|
||
|
||
def _script_matches(self, component: ScriptComponent, script_identifier: str) -> bool:
|
||
return script_identifier in {
|
||
getattr(component, "script_key", None),
|
||
getattr(component, "script_name", None),
|
||
component.script_instance.__class__.__name__,
|
||
}
|
||
|
||
def _update_node_script_tags_after_removal(self, game_object, removed_script_name):
|
||
"""在移除脚本后更新节点标签"""
|
||
try:
|
||
# 获取对象上剩余的脚本
|
||
remaining_scripts = self.get_scripts_on_object(game_object)
|
||
|
||
if not remaining_scripts:
|
||
# 如果没有其他脚本,清除所有脚本标签
|
||
if game_object.hasTag("has_scripts"):
|
||
game_object.clearTag("has_scripts")
|
||
if game_object.hasTag("scripts_info"):
|
||
game_object.clearTag("scripts_info")
|
||
print(f"✓ 清除节点 {game_object.getName()} 的所有脚本标签")
|
||
else:
|
||
# 如果还有其他脚本,更新脚本信息标签
|
||
script_info_list = []
|
||
for script_component in remaining_scripts:
|
||
script_name = script_component.script_name
|
||
script_file = self.loader.find_script_file(script_name) or ""
|
||
script_info_list.append(self.build_script_reference(script_name, script_file))
|
||
|
||
import json
|
||
game_object.setTag("has_scripts", "true")
|
||
game_object.setTag("scripts_info", json.dumps(script_info_list, ensure_ascii=False))
|
||
print(f"✓ 更新节点 {game_object.getName()} 的脚本标签信息,剩余 {len(script_info_list)} 个脚本")
|
||
|
||
except Exception as e:
|
||
print(f"更新节点标签失败: {e}")
|
||
|
||
def get_scripts_on_object(self, game_object) -> List[ScriptComponent]:
|
||
"""获取对象上的所有脚本"""
|
||
return self.object_scripts.get(game_object, [])
|
||
|
||
def get_script_on_object(self, game_object, script_name: str) -> Optional[ScriptComponent]:
|
||
"""获取对象上的特定脚本"""
|
||
scripts = self.get_scripts_on_object(game_object)
|
||
for script in scripts:
|
||
if self._script_matches(script, script_name):
|
||
return script
|
||
return None
|
||
|
||
# ==================== 脚本信息查询 ====================
|
||
|
||
def get_available_scripts(self) -> List[str]:
|
||
"""获取所有可用的脚本名称"""
|
||
return list(self.loader.script_classes.keys())
|
||
|
||
def get_script_info(self, script_name: str) -> Optional[Dict[str, Any]]:
|
||
"""获取脚本信息"""
|
||
script_class = self.loader.script_classes.get(script_name)
|
||
if script_class is None:
|
||
return None
|
||
|
||
return {
|
||
"name": script_name,
|
||
"class": script_class,
|
||
"doc": script_class.__doc__,
|
||
"file": self.loader.find_script_file(script_name) or inspect.getsourcefile(script_class),
|
||
"methods": [method for method in dir(script_class) if not method.startswith('_')]
|
||
}
|
||
|
||
def reload_script(self, script_name: str) -> bool:
|
||
"""重新加载脚本"""
|
||
script_info = self.get_script_info(script_name)
|
||
if script_info and script_info["file"]:
|
||
return self.loader.reload_script(script_info["file"]) is not None
|
||
return False
|
||
|
||
def set_hot_reload_enabled(self, enabled: bool):
|
||
"""切换热重载并同步后台监控任务。"""
|
||
enabled = bool(enabled)
|
||
if self.hot_reload_enabled == enabled:
|
||
return
|
||
|
||
self.hot_reload_enabled = enabled
|
||
if enabled:
|
||
self.start_hot_reload()
|
||
else:
|
||
self.stop_hot_reload()
|
||
|
||
# ==================== 调试功能 ====================
|
||
|
||
def list_all_scripts(self):
|
||
"""列出所有脚本信息"""
|
||
print("\n=== 脚本系统状态 ===")
|
||
print(f"可用脚本数量: {len(self.loader.script_classes)}")
|
||
print(f"运行中的脚本组件数量: {len(self.engine.script_components)}")
|
||
print(f"有脚本的对象数量: {len(self.object_scripts)}")
|
||
|
||
if self.loader.script_classes:
|
||
print("\n可用脚本:")
|
||
for script_name in self.loader.script_classes:
|
||
print(f" - {script_name}")
|
||
|
||
if self.object_scripts:
|
||
print("\n对象脚本分布:")
|
||
for obj, scripts in self.object_scripts.items():
|
||
script_names = [s.script_instance.__class__.__name__ for s in scripts]
|
||
print(f" - {obj.getName()}: {script_names}")
|
||
|
||
print("==================\n")
|
||
|
||
def save_object_scripts(self,game_object,node_data:dict):
|
||
try:
|
||
if game_object in self.object_scripts:
|
||
scripts_data = []
|
||
for script_commponent in self.object_scripts[game_object]:
|
||
script_info = {
|
||
'script_name':script_commponent.script_name,
|
||
'enabled':script_commponent.enabled,
|
||
'script_class':script_commponent.script_instance.__class__.__name__
|
||
}
|
||
scripts_data.append(script_info)
|
||
if scripts_data:
|
||
node_data['scripts'] = scripts_data
|
||
print(f"✓ 保存了 {len(scripts_data)} 个脚本到对象 {game_object.getName()}")
|
||
except Exception as e:
|
||
print(f"保存对象脚本信息失败: {e}")
|
||
traceback.print_exc()
|
||
|
||
# def restore_object_scripts(self,game_object,node_data:dict):
|
||
# try:
|
||
# if 'scripts' in node_data:
|
||
# scripts_data = node_data['scripts']
|
||
# restored_count = 0
|
||
# for script_info in scripts_data:
|
||
# script_name = script_info.get('script_name')
|
||
# enabled = script_info.get('enabled',True)
|
||
#
|
||
# #检查脚本是否可用
|
||
# if script_name in self.loader.script_classes:
|
||
#为
|
||
|
||
# 添加全局便捷函数,让脚本更容易使用API
|
||
def get_script_api():
|
||
"""获取脚本API实例(需要在脚本管理器初始化后使用)"""
|
||
# 这个函数将在脚本系统集成到主系统后实现
|
||
return None
|
||
|
||
|
||
# 导出主要类
|
||
__all__ = [
|
||
'ScriptBase', 'ScriptComponent', 'ScriptEngine',
|
||
'ScriptLoader', 'ScriptAPI', 'ScriptManager'
|
||
]
|