EG/project/project_manager.py
2026-04-07 10:02:09 +08:00

3210 lines
138 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import json
import copy
import datetime
import subprocess
import shutil
import importlib.util
import py_compile
import uuid
from panda3d.core import Filename, ModelRoot, NodePath
from project.asset_database import AssetDatabase
from project.project_schema import (
ENGINE_NAME,
SCENE_DESCRIPTION_EXTENSION,
PROJECT_SCHEMA_VERSION,
ProjectLayout,
default_build_settings,
ensure_project_directories,
generate_guid,
normalize_path,
relative_project_path,
)
from project.scene_description import (
build_scene_cook_manifest,
build_scene_description_from_world,
build_runtime_manifest,
build_runtime_scene,
load_json,
normalize_scene_description,
save_json,
)
from project.webgl_packager import WebGLPackager
class ProjectManager:
def __init__(self, world):
self.world = world
self.current_project_path = None
self.project_config = None
self.current_scene_guid = None
self._asset_database = None
self.last_webgl_export_report = None
print("✓ 项目管理系统初始化完成")
def _get_repo_root(self):
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def _vector_differs(self, current_value, target_values, tolerance=1e-4):
try:
current_list = [float(current_value[i]) for i in range(3)]
except Exception:
try:
current_list = [float(current_value.x), float(current_value.y), float(current_value.z)]
except Exception:
current_list = []
if len(current_list) < 3:
return True
try:
target_list = [float(target_values[0]), float(target_values[1]), float(target_values[2])]
except Exception:
return True
return any(abs(current_list[i] - target_list[i]) > tolerance for i in range(3))
def _log_render_runtime_stats(self, label):
"""Print renderer/task stats to compare manual import vs project-open paths."""
world = getattr(self, "world", None)
if not world:
return
try:
win = getattr(world, "win", None)
num_regions = win.getNumDisplayRegions() if win else 0
region_lines = []
if win:
for index in range(num_regions):
try:
dr = win.getDisplayRegion(index)
camera = dr.getCamera()
camera_name = "None"
if camera and not camera.isEmpty():
camera_name = camera.getName()
region_lines.append(
f"{index}:{dr.getSort()}:{int(bool(dr.isActive()))}:{camera_name}"
)
except Exception:
continue
graphics_engine = getattr(world, "graphicsEngine", None)
num_windows = graphics_engine.getNumWindows() if graphics_engine else 0
render = getattr(world, "render", None)
camera_count = 0
special_camera_counts = {}
if render and not render.isEmpty():
for pattern in ("**/+Camera",):
try:
camera_count += render.find_all_matches(pattern).get_num_paths()
except Exception:
pass
for camera_name in (
"gizmo_overlay_cam",
"pick_camera",
"selection_outline_mask_camera",
):
try:
special_camera_counts[camera_name] = render.find_all_matches(
f"**/{camera_name}"
).get_num_paths()
except Exception:
special_camera_counts[camera_name] = 0
task_mgr = getattr(world, "taskMgr", None) or getattr(world, "task_mgr", None)
task_names = []
if task_mgr:
try:
for task in list(task_mgr.getTasks()):
try:
task_names.append(task.name)
except Exception:
continue
except Exception:
task_names = []
interesting_tasks = [
name for name in task_names
if (
"gizmo" in str(name).lower()
or "outline" in str(name).lower()
or "pick" in str(name).lower()
or "lui" in str(name).lower()
or "canvas" in str(name).lower()
)
]
print(
f"[RenderStats:{label}] regions={num_regions} windows={num_windows} "
f"render_cameras={camera_count} special={special_camera_counts} "
f"interesting_tasks={interesting_tasks}"
)
if region_lines:
print(f"[RenderStats:{label}] region_detail={' | '.join(region_lines)}")
except Exception:
pass
def get_project_scripts_dir(self, project_path=None):
    """Return the ``Assets/Scripts`` directory of a project.

    Args:
        project_path: Project root; falls back to the currently open
            project when omitted or empty.

    Returns:
        str: The scripts directory, or ``""`` when no project path is
        available.
    """
    raw_path = project_path or self.current_project_path or ""
    if not raw_path:
        # Must be checked BEFORE normalising: os.path.normpath("") returns
        # ".", which is truthy and would produce "./Assets/Scripts".
        return ""
    return os.path.join(os.path.normpath(raw_path), "Assets", "Scripts")
def get_project_layout(self, project_path=None):
    """Return a ``ProjectLayout`` for a project root.

    Args:
        project_path: Project root; falls back to the currently open
            project when omitted or empty.

    Returns:
        ProjectLayout | None: The layout, or None when no project path is
        available.
    """
    raw_path = project_path or self.current_project_path or ""
    if not raw_path:
        # Must be checked BEFORE normalising: os.path.normpath("") returns
        # ".", which would silently build a layout for the working directory.
        return None
    return ProjectLayout(os.path.normpath(raw_path))
def get_asset_database(self, project_path=None, *, reload=False):
    """Return the cached asset database for a project, creating it on demand.

    A new ``AssetDatabase`` is built when ``reload`` is set, when nothing is
    cached yet, or when the cached database belongs to a different project
    root.

    Args:
        project_path: Project root; falls back to the currently open
            project when omitted or empty.
        reload: Force construction of a fresh database.

    Returns:
        AssetDatabase | None: The database, or None when no project path is
        available.
    """
    raw_path = project_path or self.current_project_path or ""
    if not raw_path:
        # Must be checked BEFORE normalising: os.path.normpath("") returns
        # ".", which would create a database rooted at the working directory.
        return None
    project_path = os.path.normpath(raw_path)
    cached = self._asset_database
    if reload or cached is None or cached.layout.project_root != normalize_path(project_path):
        self._asset_database = AssetDatabase(project_path, world=self.world)
    return self._asset_database
def _sync_resource_manager_root(self, project_path=None):
resource_manager = getattr(self.world, "resource_manager", None)
if resource_manager and hasattr(resource_manager, "set_project_path"):
resource_manager.set_project_path(project_path or self.current_project_path)
def _build_scene_entry(self, scene_guid, scene_name):
    """Build the project-config record for a scene.

    Args:
        scene_guid: Identifier stored under ``guid``.
        scene_name: Name stored under ``name`` and used for the file name.

    Returns:
        dict: Entry with ``guid``, ``name``, ``path`` (under ``Scenes/``)
        and ``enabled_in_build`` set to True.
    """
    relative_path = f"Scenes/{scene_name}{SCENE_DESCRIPTION_EXTENSION}"
    return dict(
        guid=scene_guid,
        name=scene_name,
        path=relative_path,
        enabled_in_build=True,
    )
def _normalize_scene_entry_payload(self, scene_entry):
    """Return a cleaned copy of a scene entry with all required keys.

    A missing guid is generated, a missing name becomes ``"Main"``, and the
    path is rewritten to use forward slashes and the scene-description
    extension (a legacy ``.egscene`` suffix is swapped; any other suffix
    falls back to ``Scenes/<name><ext>``).

    Args:
        scene_entry: Raw entry mapping, or None.

    Returns:
        dict: Entry holding only ``guid``, ``name``, ``path`` and
        ``enabled_in_build``.
    """
    legacy_suffix = ".egscene"
    entry = dict(scene_entry or {})

    guid = str(entry.get("guid", "") or "").strip()
    if not guid:
        guid = generate_guid()
    name = str(entry.get("name", "") or "Main").strip() or "Main"

    fallback_path = f"Scenes/{name}{SCENE_DESCRIPTION_EXTENSION}"
    path = str(entry.get("path", "") or "").replace("\\", "/").strip()
    lowered_path = path.lower()
    if not path:
        path = fallback_path
    elif lowered_path.endswith(legacy_suffix):
        path = path[: -len(legacy_suffix)] + SCENE_DESCRIPTION_EXTENSION
    elif not lowered_path.endswith(SCENE_DESCRIPTION_EXTENSION):
        path = fallback_path

    return {
        "guid": guid,
        "name": name,
        "path": path,
        "enabled_in_build": bool(entry.get("enabled_in_build", True)),
    }
def _upgrade_scene_description_files(self, project_path, project_config):
changed = False
normalized_scenes = []
for raw_scene_entry in self._get_scene_entries(project_config):
original_entry = dict(raw_scene_entry or {})
normalized_entry = self._normalize_scene_entry_payload(original_entry)
original_rel = str(original_entry.get("path", "") or "").replace("\\", "/").strip()
normalized_rel = normalized_entry["path"]
if original_rel and original_rel != normalized_rel:
old_scene_file = os.path.join(project_path, original_rel.replace("/", os.sep))
new_scene_file = os.path.join(project_path, normalized_rel.replace("/", os.sep))
os.makedirs(os.path.dirname(new_scene_file), exist_ok=True)
if os.path.exists(old_scene_file) and not os.path.exists(new_scene_file):
shutil.move(old_scene_file, new_scene_file)
changed = True
if original_entry != normalized_entry:
changed = True
normalized_scenes.append(normalized_entry)
if normalized_scenes:
project_config["scenes"] = normalized_scenes
scene_file = str(project_config.get("scene_file", "") or "").replace("\\", "/").strip()
if scene_file.lower().endswith(".egscene"):
project_config["scene_file"] = scene_file[:-8] + SCENE_DESCRIPTION_EXTENSION
changed = True
elif not scene_file and normalized_scenes:
project_config["scene_file"] = normalized_scenes[0]["path"]
changed = True
return changed, project_config
def _get_scene_entries(self, project_config=None):
project_config = project_config or self.project_config or {}
scenes = project_config.get("scenes", [])
return list(scenes) if isinstance(scenes, list) else []
def _get_scene_entry(self, scene_guid=None, project_config=None):
project_config = project_config or self.project_config or {}
target_guid = scene_guid or self.current_scene_guid or project_config.get("last_open_scene_guid") or project_config.get("startup_scene_guid")
for scene_entry in self._get_scene_entries(project_config):
if scene_entry.get("guid") == target_guid:
return dict(scene_entry)
scenes = self._get_scene_entries(project_config)
return dict(scenes[0]) if scenes else {}
def _scene_paths(self, scene_entry=None, project_path=None):
layout = self.get_project_layout(project_path)
if not layout:
return {}
scene_entry = scene_entry or self._get_scene_entry(project_config=self.project_config)
if not scene_entry:
return {}
scene_rel = scene_entry.get("path", "")
scene_abs = os.path.join(layout.project_root, scene_rel.replace("/", os.sep)) if scene_rel else ""
scene_root, _ = os.path.splitext(scene_abs)
return {
"scene_file": scene_abs,
"scene_gui_file": f"{scene_root}_gui.json" if scene_root else "",
"scene_lui_file": f"{scene_root}_lui.json" if scene_root else "",
}
def _create_default_project_config(self, project_root, project_name):
    """Build the configuration for a brand-new project with one "Main" scene.

    Args:
        project_root: Project directory; stored normalized under ``path``.
        project_name: Project name stored under ``name``.

    Returns:
        dict: Fresh project configuration whose startup and last-open scene
        both point at the generated "Main" scene.
    """
    main_scene_name = "Main"
    main_scene_guid = generate_guid()
    now_text = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    config = {
        "schema_version": PROJECT_SCHEMA_VERSION,
        "name": project_name,
        "path": normalize_path(project_root),
        "created_at": now_text,
        "last_modified": now_text,
        "version": "2.0.0",
        "engine_name": ENGINE_NAME,
        "engine_version": "2.0.0",
        "scenes": [self._build_scene_entry(main_scene_guid, main_scene_name)],
        "scene_file": f"Scenes/{main_scene_name}{SCENE_DESCRIPTION_EXTENSION}",
        "startup_scene_guid": main_scene_guid,
        "last_open_scene_guid": main_scene_guid,
    }

    # Start from the schema defaults and enable the only scene for builds.
    build_settings = dict(default_build_settings())
    build_settings["enabled_scene_guids"] = [main_scene_guid]
    config["build_settings"] = build_settings
    return config
def _ensure_v2_project_defaults(self, project_path, project_config):
    """Return a copy of ``project_config`` with every v2 field filled in.

    Guarantees a schema version, engine info, at least one normalized scene
    entry, valid startup/last-open scene guids, and build settings that
    contain a normalized ``"default"`` profile.

    Differences from the previous implementation:
      * The normalized ACTIVE profile used to be written under the
        ``"default"`` key, so the real default profile was overwritten
        whenever another profile was active. Each profile is now normalized
        and stored under its own key.
      * ``build_settings`` and ``profiles`` are copied before modification,
        so the caller's nested dictionaries are no longer mutated.

    Args:
        project_path: Project root; stored normalized under ``path``.
        project_config: Raw configuration mapping, or None.

    Returns:
        dict: The completed configuration (a new top-level dict).
    """
    project_config = dict(project_config or {})
    project_config["schema_version"] = PROJECT_SCHEMA_VERSION
    project_config["path"] = normalize_path(project_path)
    project_config["engine_name"] = ENGINE_NAME
    project_config["engine_version"] = str(project_config.get("engine_version", "2.0.0") or "2.0.0")

    # --- scenes -----------------------------------------------------------
    scenes = self._get_scene_entries(project_config)
    if scenes:
        scenes = [self._normalize_scene_entry_payload(scene) for scene in scenes]
    else:
        scenes = [self._build_scene_entry(generate_guid(), "Main")]
    project_config["scenes"] = scenes

    startup_scene_guid = project_config.get("startup_scene_guid") or scenes[0]["guid"]
    last_open_scene_guid = project_config.get("last_open_scene_guid") or startup_scene_guid
    project_config["startup_scene_guid"] = startup_scene_guid
    project_config["last_open_scene_guid"] = last_open_scene_guid
    current_scene_entry = (
        self._get_scene_entry(scene_guid=last_open_scene_guid, project_config=project_config)
        or scenes[0]
    )
    project_config["scene_file"] = current_scene_entry.get("path", project_config.get("scene_file", ""))

    # --- build settings (copied so the caller's dict is left untouched) ----
    build_settings = dict(project_config.get("build_settings", {}) or {})
    enabled_scene_guids = list(build_settings.get("enabled_scene_guids", []) or [])
    if not enabled_scene_guids:
        enabled_scene_guids = [scene["guid"] for scene in scenes]
    build_settings["enabled_scene_guids"] = enabled_scene_guids
    build_settings.setdefault("output_format", "directory")
    build_settings.setdefault("windows_player", True)

    profiles = dict(build_settings.get("profiles", {}) or {})
    active_profile = str(build_settings.get("active_profile", "") or "default")

    def _normalized(profile, fallback_name):
        return self._normalize_build_profile(
            profile,
            scenes=scenes,
            project_config=project_config,
            default_enabled_scene_guids=enabled_scene_guids,
            fallback_profile_name=fallback_name,
            fallback_output_dir="Builds",
            fallback_output_format=build_settings.get("output_format", "directory"),
            fallback_windows_enabled=build_settings.get("windows_player", True),
        )

    # The "default" profile always exists; synthesize it when missing.
    default_profile = dict(profiles.get("default") or {})
    if not default_profile:
        default_profile = {
            "name": "Default",
            "target_platform": "windows",
            "output_format": build_settings.get("output_format", "directory"),
            "output_dir": "Builds",
            "windows": {
                "enabled": bool(build_settings.get("windows_player", True)),
                "exe_name": "",
                "windowed": True,
            },
            "runtime": {
                "startup_scene_guid": project_config.get("startup_scene_guid", ""),
                "enabled_scene_guids": enabled_scene_guids,
                "script_mode": "pyc",
            },
        }

    normalized_profiles = {"default": _normalized(default_profile, "Default")}
    for profile_key, profile_value in profiles.items():
        if profile_key != "default":
            normalized_profiles[profile_key] = profile_value
    # Normalize the active profile in its own slot rather than in "default".
    if active_profile != "default" and profiles.get(active_profile):
        normalized_profiles[active_profile] = _normalized(profiles[active_profile], active_profile)

    build_settings["active_profile"] = active_profile or "default"
    build_settings["profiles"] = normalized_profiles
    project_config["build_settings"] = build_settings
    return project_config
def _normalize_enabled_scene_guids(self, enabled_scene_guids, scenes):
valid_scene_guids = [str(scene.get("guid", "") or "") for scene in (scenes or []) if scene.get("guid")]
valid_scene_guid_set = set(valid_scene_guids)
normalized = []
for scene_guid in enabled_scene_guids or []:
normalized_guid = str(scene_guid or "").strip()
if normalized_guid and normalized_guid in valid_scene_guid_set and normalized_guid not in normalized:
normalized.append(normalized_guid)
return normalized or valid_scene_guids
def _normalize_build_profile(
self,
profile,
*,
scenes,
project_config,
default_enabled_scene_guids,
fallback_profile_name,
fallback_output_dir,
fallback_output_format,
fallback_windows_enabled,
):
profile = copy.deepcopy(profile or {})
normalized_output_dir = str(profile.get("output_dir", fallback_output_dir) or fallback_output_dir).strip() or fallback_output_dir
normalized_output_dir = normalized_output_dir.replace("\\", "/")
while normalized_output_dir.startswith("./"):
normalized_output_dir = normalized_output_dir[2:]
normalized_output_dir = normalized_output_dir.strip("/") or fallback_output_dir
profile["name"] = str(profile.get("name", fallback_profile_name) or fallback_profile_name)
profile["target_platform"] = str(profile.get("target_platform", "windows") or "windows")
profile["output_format"] = str(profile.get("output_format", fallback_output_format) or fallback_output_format)
profile["output_dir"] = normalized_output_dir
profile["windows"] = dict(profile.get("windows", {}) or {})
profile["windows"]["enabled"] = bool(profile["windows"].get("enabled", fallback_windows_enabled))
profile["windows"]["exe_name"] = str(profile["windows"].get("exe_name", "") or "").strip()
profile["windows"]["windowed"] = bool(profile["windows"].get("windowed", True))
profile["runtime"] = dict(profile.get("runtime", {}) or {})
normalized_enabled_scene_guids = self._normalize_enabled_scene_guids(
profile["runtime"].get("enabled_scene_guids", []) or default_enabled_scene_guids,
scenes,
)
startup_scene_guid = str(
profile["runtime"].get("startup_scene_guid", project_config.get("startup_scene_guid", "")) or project_config.get("startup_scene_guid", "")
).strip()
if startup_scene_guid not in normalized_enabled_scene_guids:
startup_scene_guid = normalized_enabled_scene_guids[0] if normalized_enabled_scene_guids else ""
profile["runtime"]["enabled_scene_guids"] = normalized_enabled_scene_guids
profile["runtime"]["startup_scene_guid"] = startup_scene_guid
profile["runtime"]["script_mode"] = str(profile["runtime"].get("script_mode", "pyc") or "pyc")
return profile
def _get_active_build_profile(self, project_config=None):
project_config = self._ensure_v2_project_defaults(
self.current_project_path or (project_config or {}).get("path") or "",
project_config or self.project_config or {},
)
build_settings = project_config.get("build_settings", {}) or {}
active_profile = str(build_settings.get("active_profile", "default") or "default")
profiles = build_settings.get("profiles", {}) or {}
profile = dict((profiles.get(active_profile) or profiles.get("default") or {}))
return active_profile, profile
def get_build_profile_options(self, project_config=None):
    """Summarize the active build profile in a flat options mapping.

    Args:
        project_config: Config to inspect; falls back to
            ``self.project_config``.

    Returns:
        dict: ``active_profile``, ``profile_names``, ``output_dir``,
        ``exe_name``, ``startup_scene_guid``, ``enabled_scene_guids`` and
        ``scenes``.
    """
    project_root = self.current_project_path or (project_config or {}).get("path") or ""
    completed = self._ensure_v2_project_defaults(
        project_root,
        project_config or self.project_config or {},
    )
    active_name, active_profile = self._get_active_build_profile(completed)
    active_profile = active_profile or {}

    build_settings = completed.get("build_settings", {}) or {}
    all_profiles = dict(build_settings.get("profiles", {}) or {})
    runtime = dict(active_profile.get("runtime", {}) or {})
    windows = dict(active_profile.get("windows", {}) or {})

    scene_entries = self._get_scene_entries(completed)
    enabled_guids = self._normalize_enabled_scene_guids(
        runtime.get("enabled_scene_guids", []) or [],
        scene_entries,
    )
    startup_guid = str(runtime.get("startup_scene_guid", completed.get("startup_scene_guid", "")) or "")
    if startup_guid not in enabled_guids:
        # Fall back to the first enabled scene when the stored one is invalid.
        startup_guid = enabled_guids[0] if enabled_guids else ""

    return {
        "active_profile": active_name,
        "profile_names": list(all_profiles.keys()) or ["default"],
        "output_dir": str(active_profile.get("output_dir", "Builds") or "Builds"),
        "exe_name": str(windows.get("exe_name", "") or ""),
        "startup_scene_guid": startup_guid,
        "enabled_scene_guids": enabled_guids,
        "scenes": scene_entries,
    }
def update_active_build_profile(self, *, output_dir=None, exe_name=None, startup_scene_guid=None, enabled_scene_guids=None, active_profile_name=None):
    """Apply edits to a build profile, make it active and persist the config.

    Only arguments that are not None are applied. When
    ``active_profile_name`` names a profile other than the current one, the
    edits start from that profile (or from ``"default"`` when it does not
    exist yet).

    Args:
        output_dir: New output directory.
        exe_name: New Windows executable name.
        startup_scene_guid: New startup scene; also stored on the project
            when non-empty.
        enabled_scene_guids: Scenes to include in the build.
        active_profile_name: Profile to edit and activate.

    Returns:
        bool: False when no project is open, True after the config has been
        written.
    """
    if not self.current_project_path:
        return False

    project_config = self._ensure_v2_project_defaults(self.current_project_path, self.project_config or {})
    current_name, base_profile = self._get_active_build_profile(project_config)
    build_settings = dict(project_config.get("build_settings", {}) or {})
    profiles = dict(build_settings.get("profiles", {}) or {})
    target_name = str(active_profile_name or current_name or "default").strip() or "default"
    scene_entries = self._get_scene_entries(project_config)

    if target_name != current_name:
        # Switching profiles: start from the target, else default, else current.
        base_profile = dict((profiles.get(target_name) or profiles.get("default") or base_profile or {}))

    profile = dict(base_profile or {})
    windows = dict(profile.get("windows", {}) or {})
    runtime = dict(profile.get("runtime", {}) or {})
    profile["windows"] = windows
    profile["runtime"] = runtime

    if output_dir is not None:
        profile["output_dir"] = str(output_dir or "Builds").strip() or "Builds"
    if exe_name is not None:
        windows["exe_name"] = str(exe_name or "").strip()
    if startup_scene_guid is not None:
        startup_scene_guid = str(startup_scene_guid or "").strip()
        runtime["startup_scene_guid"] = startup_scene_guid
        if startup_scene_guid:
            project_config["startup_scene_guid"] = startup_scene_guid
    if enabled_scene_guids is not None:
        cleaned_guids = [
            str(scene_guid).strip()
            for scene_guid in enabled_scene_guids
            if str(scene_guid).strip()
        ]
        runtime["enabled_scene_guids"] = cleaned_guids
        build_settings["enabled_scene_guids"] = cleaned_guids

    fallback_enabled = build_settings.get("enabled_scene_guids", []) or [
        scene.get("guid", "") for scene in scene_entries if scene.get("guid")
    ]
    profile = self._normalize_build_profile(
        profile,
        scenes=scene_entries,
        project_config=project_config,
        default_enabled_scene_guids=fallback_enabled,
        fallback_profile_name=target_name,
        fallback_output_dir="Builds",
        fallback_output_format=build_settings.get("output_format", "directory"),
        fallback_windows_enabled=build_settings.get("windows_player", True),
    )

    # Mirror the normalized runtime selection back onto the project level.
    normalized_runtime = profile.get("runtime", {})
    build_settings["enabled_scene_guids"] = list(normalized_runtime.get("enabled_scene_guids", []) or [])
    if profile.get("runtime", {}).get("startup_scene_guid"):
        project_config["startup_scene_guid"] = profile["runtime"]["startup_scene_guid"]

    profiles[target_name] = profile
    build_settings["active_profile"] = target_name
    build_settings["profiles"] = profiles
    project_config["build_settings"] = build_settings
    self.project_config = project_config
    self._write_project_config(self.current_project_path, project_config)
    return True
def create_build_profile(self, profile_name):
    """Create a build profile (cloned from the active one) and activate it.

    When a profile with that name already exists it is only made active.
    The configuration is written to disk in both cases.

    Args:
        profile_name: Name of the profile; surrounding whitespace is removed.

    Returns:
        bool: False when no project is open or the name is empty, otherwise
        True.
    """
    if not self.current_project_path:
        return False
    profile_name = str(profile_name or "").strip()
    if not profile_name:
        return False

    project_config = self._ensure_v2_project_defaults(self.current_project_path, self.project_config or {})
    _, active_profile = self._get_active_build_profile(project_config)
    build_settings = dict(project_config.get("build_settings", {}) or {})
    profiles = dict(build_settings.get("profiles", {}) or {})

    is_new_profile = profile_name not in profiles
    if is_new_profile:
        # New profiles start as an independent copy of the active one.
        clone = copy.deepcopy(active_profile or {})
        clone["name"] = profile_name
        profiles[profile_name] = clone

    build_settings["active_profile"] = profile_name
    if is_new_profile:
        build_settings["profiles"] = profiles

    project_config["build_settings"] = build_settings
    self.project_config = project_config
    self._write_project_config(self.current_project_path, project_config)
    return True
def _ensure_project_scripts_dir(self, project_path):
scripts_dir = self.get_project_scripts_dir(project_path)
if scripts_dir:
os.makedirs(scripts_dir, exist_ok=True)
return scripts_dir
def _sync_project_script_manager(self, project_path, reload_scripts=True):
script_manager = getattr(self.world, "script_manager", None)
if not script_manager:
return ""
scripts_dir = self._ensure_project_scripts_dir(project_path)
script_manager.set_scripts_directory(
scripts_dir,
create=True,
reload_scripts=reload_scripts,
)
return scripts_dir
def _restore_project_context(self, project_path, project_config, reload_scripts=True):
normalized_project_path = os.path.normpath(project_path) if project_path else None
self.current_project_path = normalized_project_path
self.project_config = project_config
self.current_scene_guid = (project_config or {}).get("last_open_scene_guid") if project_config else None
self._asset_database = None
script_manager = getattr(self.world, "script_manager", None)
if not script_manager:
self._sync_resource_manager_root(normalized_project_path)
return
if normalized_project_path:
self._sync_project_script_manager(normalized_project_path, reload_scripts=reload_scripts)
else:
script_manager.set_scripts_directory(
"scripts",
create=True,
reload_scripts=reload_scripts,
)
self.get_asset_database(normalized_project_path, reload=True) if normalized_project_path else None
self._sync_resource_manager_root(normalized_project_path)
def _sanitize_build_name(self, project_name):
invalid_chars = '<>:"/\\|?*'
safe_name = "".join("_" if char in invalid_chars else char for char in str(project_name or "").strip())
safe_name = safe_name.rstrip(". ")
return safe_name or f"{ENGINE_NAME}Project"
def _write_project_config(self, project_path, project_config):
config_file = os.path.join(project_path, "project.json")
with open(config_file, "w", encoding="utf-8") as f:
json.dump(project_config, f, ensure_ascii=False, indent=4)
def _copy_if_exists(self, source_path, target_path):
if not source_path or not os.path.exists(source_path):
return
os.makedirs(os.path.dirname(target_path), exist_ok=True)
if os.path.isdir(source_path):
if os.path.exists(target_path):
shutil.rmtree(target_path)
shutil.copytree(source_path, target_path)
else:
shutil.copy2(source_path, target_path)
def _create_legacy_backup(self, project_path):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_root = os.path.join(project_path, "LegacyBackup", timestamp)
os.makedirs(backup_root, exist_ok=True)
for name in ("project.json", "models", "textures", "scenes", "Resources", "scripts"):
source_path = os.path.join(project_path, name)
if not os.path.exists(source_path):
continue
self._copy_if_exists(source_path, os.path.join(backup_root, name))
print(f"✓ 已创建旧项目备份: {backup_root}")
return backup_root
def _capture_camera_state(self):
camera = getattr(self.world, "camera", None) or getattr(self.world, "cam", None)
if not camera or camera.isEmpty():
return {}
return {
"position": list(camera.getPos()),
"rotation": list(camera.getHpr()),
"camera_control_enabled": bool(getattr(self.world, "camera_control_enabled", True)),
}
def import_asset_into_project(self, source_path, preferred_subdir=""):
    """Import an external file through the project's asset database.

    Args:
        source_path: File to import.
        preferred_subdir: Forwarded to ``AssetDatabase.import_asset``.

    Returns:
        The result of ``import_asset``, or ``{}`` when there is no asset
        database or no source path.
    """
    database = self.get_asset_database()
    if database and source_path:
        return database.import_asset(source_path, preferred_subdir=preferred_subdir)
    return {}
def register_project_asset(self, asset_path):
    """Register an asset with the project's database without copying it.

    Args:
        asset_path: Path of the asset to register.

    Returns:
        The result of ``register_asset``, or ``{}`` when there is no asset
        database or no asset path.
    """
    database = self.get_asset_database()
    if database and asset_path:
        return database.register_asset(asset_path, copy_into_assets=False)
    return {}
def _retarget_live_scene_asset_paths(self):
asset_database = self.get_asset_database()
if not asset_database or not getattr(self.world, "render", None):
return
try:
for node in self.world.render.findAllMatches("**"):
if not node.hasTag("is_model_root"):
continue
source_path = ""
for tag_name in ("model_path", "saved_model_path", "original_path"):
if node.hasTag(tag_name) and node.getTag(tag_name):
source_path = node.getTag(tag_name)
break
if not source_path:
continue
asset_record = asset_database.register_asset(source_path, copy_into_assets=False)
if not asset_record:
continue
asset_abs_path = os.path.join(self.current_project_path, asset_record["asset_path"].replace("/", os.sep))
node.setTag("model_path", asset_abs_path)
node.setTag("saved_model_path", asset_abs_path)
node.setTag("asset_guid", asset_record.get("guid", ""))
node.setTag("asset_path", asset_record.get("asset_path", ""))
for gui_node in getattr(self.world, "gui_elements", []) or []:
for tag_name, preferred_subdir in (
("image_path", "UI"),
("gui_image_path", "UI"),
("video_path", "Video"),
):
if not gui_node or gui_node.isEmpty() or not gui_node.hasTag(tag_name):
continue
raw_value = gui_node.getTag(tag_name)
if not raw_value or raw_value.startswith(("http://", "https://")):
continue
asset_record = asset_database.register_asset(raw_value, preferred_subdir=preferred_subdir, copy_into_assets=False)
if not asset_record:
continue
asset_abs_path = os.path.join(self.current_project_path, asset_record["asset_path"].replace("/", os.sep))
gui_node.setTag(tag_name, asset_abs_path)
except Exception as e:
print(f"⚠️ 同步场景资产路径失败: {e}")
def _load_scene_entry_into_editor(self, scene_entry, project_path):
scene_description = self._load_scene_description(scene_entry, project_path)
if not scene_description:
print(f"⚠️ 场景描述不存在,已回退为空场景: {scene_entry.get('path', '')}")
self._clearCurrentScene()
return True
return self._load_scene_description_into_editor(scene_description, project_path, scene_entry)
def createScene(self, scene_name):
    """Add a new empty scene to the project and make it the current scene.

    The current editor scene is cleared, the scene is enabled for builds,
    the project config is written and the project is saved.

    Args:
        scene_name: Name of the new scene; surrounding whitespace is removed.

    Returns:
        False when no project is open, the name is empty, or a scene with
        that name already exists; otherwise the result of ``saveProject``.
    """
    if not (self.current_project_path and scene_name):
        return False
    scene_name = str(scene_name).strip()
    if not scene_name:
        return False

    project_config = dict(self.project_config or {})
    existing_names = [scene.get("name") for scene in self._get_scene_entries(project_config)]
    if scene_name in existing_names:
        print(f"错误: 场景已存在: {scene_name}")
        return False

    new_guid = generate_guid()
    new_entry = self._build_scene_entry(new_guid, scene_name)
    project_config.setdefault("scenes", []).append(new_entry)
    project_config["last_open_scene_guid"] = new_guid
    project_config["scene_file"] = new_entry["path"]

    # New scenes are included in builds by default.
    build_settings = project_config.get("build_settings", {}) or {}
    build_settings["enabled_scene_guids"] = [
        *(build_settings.get("enabled_scene_guids", []) or []),
        new_guid,
    ]
    project_config["build_settings"] = build_settings

    self._clearCurrentScene()
    self.project_config = project_config
    self.current_scene_guid = new_guid
    self._write_project_config(self.current_project_path, project_config)
    return self.saveProject()
def openScene(self, scene_guid):
    """Load a project scene into the editor and remember it as last opened.

    Args:
        scene_guid: Guid of the scene to open.

    Returns:
        bool: True when the scene was loaded and the config written; False
        when no project is open, no guid was given, no scene entry could be
        resolved, or loading failed.
    """
    if not (self.current_project_path and scene_guid):
        return False

    project_config = dict(self.project_config or {})
    entry = self._get_scene_entry(scene_guid=scene_guid, project_config=project_config)
    if not entry:
        return False

    loaded = self._load_scene_entry_into_editor(entry, self.current_project_path)
    if not loaded:
        return False

    opened_guid = entry["guid"]
    self.current_scene_guid = opened_guid
    project_config["last_open_scene_guid"] = opened_guid
    project_config["scene_file"] = entry["path"]
    self.project_config = project_config
    self._write_project_config(self.current_project_path, project_config)
    return True
# ==================== 项目生命周期管理 ====================
def createNewProject(self, project_path, project_name):
    """Create a new project on disk and make it the current project.

    The project is created in ``<project_path>/<project_name>``, which must
    not exist yet or must be an empty directory.

    Args:
        project_path: Parent directory of the new project.
        project_name: Project name, also used as the directory name.

    Returns:
        bool: True when the project was created and saved; False on invalid
        arguments, an unusable target directory, a failed initial save, or
        any exception.
    """
    if not (project_path and project_name):
        print("错误: 项目路径和名称不能为空")
        return False

    full_project_path = os.path.normpath(os.path.join(project_path, project_name))
    print(f"full_project_path: {full_project_path}")

    try:
        # Refuse to reuse anything that is not an empty directory.
        if os.path.exists(full_project_path):
            if not os.path.isdir(full_project_path):
                print("错误: 项目路径已存在且不是文件夹")
                return False
            if os.listdir(full_project_path):
                print("错误: 项目目录已存在且非空")
                return False

        layout = ProjectLayout(full_project_path)
        ensure_project_directories(layout)
        new_config = self._create_default_project_config(full_project_path, project_name)
        self._write_project_config(full_project_path, new_config)
        print(f"项目配置文件已创建: {layout.project_file}")

        # Switch the editor over to the freshly created project.
        self._clearCurrentScene()
        self.current_project_path = full_project_path
        self.project_config = new_config
        self.current_scene_guid = new_config.get("startup_scene_guid")
        self.get_asset_database(full_project_path, reload=True)
        self._sync_project_script_manager(full_project_path, reload_scripts=True)
        self._sync_resource_manager_root(full_project_path)

        saved = self.saveProject()
        if not saved:
            print("初始场景保存失败")
            return False

        print(f"项目 '{project_name}' 创建成功!")
        return True
    except Exception as e:
        print(f"创建项目失败: {e!s}")
        return False
def openProject(self, project_path):
    """Open an existing project and load its last-opened scene.

    On failure after the manager has switched to the new project, the
    previous project context is restored. Failures that happen earlier
    (missing folder, unreadable or invalid ``project.json``, upgrade errors)
    leave the currently open project untouched. The previous implementation
    looked the saved state up through ``locals()`` and, when it had not been
    captured yet, "restored" to no project at all.

    Args:
        project_path: Directory containing ``project.json``.

    Returns:
        bool: True when the project was opened, False otherwise.
    """
    # Capture the rollback target before anything can fail.
    previous_project_path = self.current_project_path
    previous_project_config = self.project_config
    context_switched = False
    try:
        try:
            self.world._scene_tree_epoch = int(getattr(self.world, "_scene_tree_epoch", 0) or 0) + 1
        except Exception:
            self.world._scene_tree_epoch = 1

        if not project_path:
            print("错误: 项目路径不能为空")
            return False
        project_path = normalize_path(project_path)
        config_file = os.path.join(project_path, "project.json")
        if not os.path.exists(config_file):
            print("错误: 选择的不是有效的项目文件夹!")
            return False
        with open(config_file, "r", encoding="utf-8") as f:
            project_config = json.load(f)

        ensure_project_directories(ProjectLayout(project_path))
        project_config = self._ensure_v2_project_defaults(project_path, project_config)
        _, project_config = self._upgrade_scene_description_files(project_path, project_config)

        # From here on the manager points at the new project, so any failure
        # must roll back to the previous one.
        context_switched = True
        self.current_project_path = project_path
        self.project_config = project_config
        self.current_scene_guid = project_config.get("last_open_scene_guid") or project_config.get("startup_scene_guid")
        self.get_asset_database(project_path, reload=True)
        self._sync_project_script_manager(self.current_project_path, reload_scripts=True)
        self._sync_resource_manager_root(project_path)

        scene_entry = self._get_scene_entry(project_config=project_config)
        if scene_entry:
            if not self._load_scene_entry_into_editor(scene_entry, project_path):
                self._restore_project_context(
                    previous_project_path,
                    previous_project_config,
                    reload_scripts=True,
                )
                return False

        project_config["last_modified"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        project_config["last_open_scene_guid"] = self.current_scene_guid or project_config.get("startup_scene_guid")
        current_scene_entry = self._get_scene_entry(project_config=project_config)
        if current_scene_entry:
            project_config["scene_file"] = current_scene_entry.get("path", project_config.get("scene_file", ""))
        self._write_project_config(project_path, project_config)
        self.current_project_path = project_path
        self.project_config = project_config
        print("项目加载成功!")
        return True
    except Exception as e:
        if context_switched:
            self._restore_project_context(
                previous_project_path,
                previous_project_config,
                reload_scripts=True,
            )
        print(f"加载项目时发生错误:{str(e)}")
        import traceback
        traceback.print_exc()
        return False
def openProjectForPath(self, project_path):
    """Open a project by path; thin alias for ``openProject``.

    Args:
        project_path: Directory of the project to open.

    Returns:
        bool: Whatever ``openProject`` returns.
    """
    opened = self.openProject(project_path)
    return opened
def _capture_scene_sidecar_snapshots(self, scene_paths):
gui_snapshot = self._capture_gui_snapshot_from_world()
lui_snapshot = self._capture_lui_snapshot_from_world()
if not gui_snapshot:
gui_snapshot = load_json(scene_paths.get("scene_gui_file", ""), [])
if not lui_snapshot:
lui_snapshot = load_json(scene_paths.get("scene_lui_file", ""), {})
return gui_snapshot, lui_snapshot
def _capture_gui_snapshot_from_world(self):
scene_manager = getattr(self.world, "scene_manager", None)
collect_fn = getattr(scene_manager, "_collectGUIElementInfo", None) if scene_manager else None
if not callable(collect_fn):
return []
gui_snapshot = []
for gui_node in list(getattr(self.world, "gui_elements", []) or []):
if not gui_node or gui_node.isEmpty():
continue
try:
gui_info = collect_fn(gui_node)
except Exception:
gui_info = None
if gui_info:
gui_snapshot.append(gui_info)
return gui_snapshot
def _capture_lui_snapshot_from_world(self):
lui_manager = getattr(self.world, "lui_manager", None)
capture_fn = getattr(lui_manager, "_capture_lui_structure_snapshot", None) if lui_manager else None
if not callable(capture_fn):
return {}
try:
return dict(capture_fn() or {})
except Exception:
return {}
def _write_scene_sidecars(self, scene_paths, gui_snapshot, lui_snapshot):
gui_file = scene_paths.get("scene_gui_file", "")
lui_file = scene_paths.get("scene_lui_file", "")
if gui_file:
save_json(gui_file, list(gui_snapshot or []))
if lui_file:
save_json(lui_file, dict(lui_snapshot or {}))
def _cleanup_legacy_scene_artifacts(self, scene_entry, project_path):
scene_paths = self._scene_paths(scene_entry, project_path=project_path)
scene_file = scene_paths.get("scene_file", "")
legacy_candidates = [
os.path.join(project_path, "scenes", "scene.bam"),
os.path.join(project_path, "scenes", "scene_gui.json"),
os.path.join(project_path, "scenes", "scene_lui.json"),
]
if scene_file:
legacy_candidates.append(f"{os.path.splitext(scene_file)[0]}.bam")
removed_files = []
for legacy_path in legacy_candidates:
if not legacy_path:
continue
normalized_path = os.path.normpath(legacy_path)
if not os.path.exists(normalized_path):
continue
try:
os.remove(normalized_path)
removed_files.append(normalized_path)
except OSError:
continue
if removed_files:
print(f"✓ 已清理旧场景缓存文件: {len(removed_files)}")
def _collect_live_scene_root_nodes(self):
    """Collect the live root nodes of the current scene.

    The result lists model roots first, followed by the entries of
    ``scene_manager.Spotlight`` and ``scene_manager.Pointlight``. Model roots
    are gathered, in this order, from the children of
    ``ssbo_editor.source_model_root``, from ``scene_manager.models`` and from
    the direct children of ``world.render``.

    Side effects: the optional ``ssbo_editor`` snapshot hooks are invoked
    before gathering, and ``scene_manager.models`` is overwritten with the
    gathered model roots.

    Returns:
        list: Root nodes, or an empty list when the world has no
        ``scene_manager``.
    """
    scene_manager = getattr(self.world, "scene_manager", None)
    if not scene_manager:
        return []

    def _node_is_valid(node):
        # True for a truthy node that reports itself as non-empty; both the
        # camelCase and the snake_case spelling of the check are tried.
        if not node:
            return False
        try:
            return not node.isEmpty()
        except Exception:
            try:
                return not node.is_empty()
            except Exception:
                return False

    ssbo_editor = getattr(self.world, "ssbo_editor", None)
    source_model_root = getattr(ssbo_editor, "source_model_root", None) if ssbo_editor else None
    runtime_model = getattr(ssbo_editor, "model", None) if ssbo_editor else None

    def _is_source_tree_node(node):
        # Delegates to ssbo_editor.is_source_tree_node; any failure counts as "no".
        if not _node_is_valid(node):
            return False
        if not ssbo_editor or not hasattr(ssbo_editor, "is_source_tree_node"):
            return False
        try:
            return bool(ssbo_editor.is_source_tree_node(node))
        except Exception:
            return False

    def _is_model_root_candidate(node):
        # A candidate carries one of the model/asset tags and is neither a
        # light, a GUI element, nor the SSBO runtime model itself.
        if not _node_is_valid(node):
            return False
        if runtime_model and node == runtime_model:
            return False
        try:
            if node.hasTag("light_type"):
                return False
            if node.hasTag("gui_type") or node.hasTag("is_gui_element"):
                return False
            if node.hasTag("is_model_root") or node.hasTag("asset_guid"):
                return True
            if node.hasTag("model_path") or node.hasTag("saved_model_path") or node.hasTag("original_path"):
                return True
        except Exception:
            return False
        return False

    def _node_has_live_parent(node):
        # True when the node has a non-empty parent.
        if not _node_is_valid(node):
            return False
        try:
            if not node.hasParent():
                return False
            parent = node.getParent()
            return bool(parent and not parent.isEmpty())
        except Exception:
            try:
                if not node.has_parent():
                    return False
                parent = node.get_parent()
                return bool(parent and not parent.is_empty())
            except Exception:
                return False

    def _is_attached_source_child(node):
        # True when the node is a direct child of source_model_root.
        if not _node_is_valid(node) or not _node_is_valid(source_model_root):
            return False
        try:
            return node.getParent() == source_model_root
        except Exception:
            try:
                return node.get_parent() == source_model_root
            except Exception:
                return False

    def _append_unique_model_root(target_list, candidate, include_ssbo_source, allow_untyped_model=False):
        # Append candidate to target_list unless it is filtered out or an
        # equal node (compared with ==) is already present.
        if not _node_is_valid(candidate):
            return
        if runtime_model and candidate == runtime_model:
            return
        is_source_node = _is_source_tree_node(candidate)
        if is_source_node and not include_ssbo_source:
            return
        if is_source_node:
            # Source nodes must still be attached under source_model_root.
            # Detached stale nodes in scene_manager.models should never be saved.
            if not _is_attached_source_child(candidate):
                return
        else:
            # Non-source nodes must still be part of a live scene graph branch.
            if not _node_has_live_parent(candidate):
                return
        if not is_source_node:
            if not _is_model_root_candidate(candidate):
                if not allow_untyped_model:
                    return
                # Untyped nodes are tolerated, but never lights or GUI elements.
                try:
                    if candidate.hasTag("light_type"):
                        return
                    if candidate.hasTag("gui_type") or candidate.hasTag("is_gui_element"):
                        return
                except Exception:
                    return
        for existing in target_list:
            try:
                if existing == candidate:
                    return
            except Exception:
                continue
        target_list.append(candidate)

    def _gather_model_roots(include_ssbo_source=True):
        # Gather model roots from the SSBO source root, scene_manager.models
        # and the direct children of render, in that order.
        model_nodes = []
        if include_ssbo_source and _node_is_valid(source_model_root):
            snapshot_fn = getattr(ssbo_editor, "_snapshot_top_level_transforms_to_source_root", None)
            snapshot_material_fn = getattr(ssbo_editor, "_snapshot_runtime_materials_to_source_root", None)
            # Optional ssbo_editor hooks run before the source children are read.
            # NOTE(review): presumably they copy runtime state back onto the
            # source tree; confirm against the ssbo_editor implementation.
            if callable(snapshot_fn):
                try:
                    snapshot_fn()
                except Exception:
                    pass
            if callable(snapshot_material_fn):
                try:
                    snapshot_material_fn()
                except Exception:
                    pass
            try:
                source_children = list(source_model_root.getChildren())
            except Exception:
                source_children = []
            for child in source_children:
                _append_unique_model_root(
                    model_nodes,
                    child,
                    include_ssbo_source=True,
                    allow_untyped_model=False,
                )
        # Only entries already registered in scene_manager.models may be untyped.
        for node in list(getattr(scene_manager, "models", []) or []):
            _append_unique_model_root(
                model_nodes,
                node,
                include_ssbo_source=include_ssbo_source,
                allow_untyped_model=True,
            )
        render = getattr(self.world, "render", None)
        if _node_is_valid(render):
            try:
                render_children = list(render.getChildren())
            except Exception:
                render_children = []
            for child in render_children:
                _append_unique_model_root(
                    model_nodes,
                    child,
                    include_ssbo_source=include_ssbo_source,
                    allow_untyped_model=False,
                )
        return model_nodes

    root_nodes = []
    # NOTE(review): `seen` holds id() of the Python objects, so it only
    # de-duplicates identical wrapper objects; equality-based de-duplication of
    # model roots already happened in _append_unique_model_root.
    seen = set()
    model_root_nodes = []
    auxiliary_root_nodes = []
    model_nodes = _gather_model_roots(include_ssbo_source=True)
    for node in model_nodes:
        if not _node_is_valid(node):
            continue
        node_key = id(node)
        if node_key in seen:
            continue
        seen.add(node_key)
        model_root_nodes.append(node)
    # Keep scene_manager.models in sync with current live roots so the next
    # save/open cycle does not accidentally reuse detached stale references.
    try:
        if hasattr(scene_manager, "models"):
            scene_manager.models = list(model_root_nodes)
    except Exception:
        pass
    # Light collections are appended after the model roots.
    for collection_name in ("Spotlight", "Pointlight"):
        for node in list(getattr(scene_manager, collection_name, []) or []):
            if not node or node.isEmpty():
                continue
            node_key = id(node)
            if node_key in seen:
                continue
            seen.add(node_key)
            auxiliary_root_nodes.append(node)
    root_nodes.extend(model_root_nodes)
    root_nodes.extend(auxiliary_root_nodes)
    return root_nodes
def _write_scene_description_from_live_scene(self, scene_entry, project_path):
scene_paths = self._scene_paths(scene_entry, project_path=project_path)
asset_database = self.get_asset_database(project_path, reload=True)
if asset_database:
asset_database.ensure_project_assets_registered()
gui_snapshot, lui_snapshot = self._capture_scene_sidecar_snapshots(scene_paths)
root_nodes = self._collect_live_scene_root_nodes()
if not root_nodes:
raise RuntimeError("当前场景没有可保存的根节点")
scene_description = build_scene_description_from_world(
asset_database=asset_database,
project_root=project_path,
scene_guid=scene_entry["guid"],
scene_name=scene_entry["name"],
scene_file_rel=scene_entry["path"],
root_nodes=root_nodes,
cache_bam_path="",
cache_gui_path=scene_paths["scene_gui_file"],
cache_lui_path=scene_paths["scene_lui_file"],
gui_elements=gui_snapshot,
lui_snapshot=lui_snapshot,
camera_state=self._capture_camera_state(),
)
scene_file_path = os.path.join(project_path, scene_entry["path"].replace("/", os.sep))
save_json(scene_file_path, scene_description)
self._write_scene_sidecars(scene_paths, gui_snapshot, lui_snapshot)
try:
print(
f"[SceneSave] roots={len(root_nodes)} nodes={len(scene_description.get('nodes', []) or [])} "
f"assets={len(scene_description.get('referenced_asset_guids', []) or [])} "
f"scripts={len(scene_description.get('referenced_script_guids', []) or [])}"
)
except Exception:
pass
return scene_description
def _load_scene_description_into_editor(self, scene_description, project_path, scene_entry=None):
asset_database = self.get_asset_database(project_path, reload=True)
if not asset_database:
return False
self._clearCurrentScene()
self._log_render_runtime_stats("after_clear")
try:
print(
f"[SceneOpen] scene_nodes={len(scene_description.get('nodes', []) or [])} "
f"assets={len(scene_description.get('referenced_asset_guids', []) or [])}"
)
except Exception:
pass
ssbo_loaded = self._load_scene_description_via_ssbo(scene_description, project_path, asset_database)
use_manual_asset_import = (
not ssbo_loaded
and callable(getattr(self.world, "_import_model_with_menu_logic", None))
)
scene_root, keep_nodes, built_nodes = self._build_scene_root_from_description(
scene_description,
project_path,
asset_database,
include_mode="all",
skip_asset_nodes=(ssbo_loaded or use_manual_asset_import),
return_node_map=True,
)
manually_imported_models = []
if use_manual_asset_import:
manually_imported_models = self._load_scene_assets_via_manual_import(
scene_description,
project_path,
asset_database,
scene_root,
keep_nodes,
built_nodes,
)
scene_manager = getattr(self.world, "scene_manager", None)
if scene_manager:
scene_manager.models = []
scene_manager.Spotlight = []
scene_manager.Pointlight = []
built_model_nodes = []
built_spot_lights = []
built_point_lights = []
try:
root_children = list(scene_root.getChildren())
except Exception:
root_children = []
manual_model_keys = set()
for manual_model in manually_imported_models:
try:
manual_model_keys.add(id(manual_model))
except Exception:
continue
try:
ssbo_editor = getattr(self.world, "ssbo_editor", None)
runtime_model = getattr(ssbo_editor, "model", None) if ssbo_editor else None
runtime_desc = 0
if runtime_model and not runtime_model.isEmpty():
try:
runtime_desc = runtime_model.find_all_matches("**").get_num_paths()
except Exception:
runtime_desc = 0
print(
f"[SceneOpen] ssbo_loaded={ssbo_loaded} keep_nodes={len(keep_nodes)} "
f"scene_root_children={len(root_children)} runtime_desc={runtime_desc}"
)
except Exception:
pass
for child in root_children:
child.reparentTo(self.world.render)
if child.hasTag("is_model_root") or child.hasTag("asset_guid"):
built_model_nodes.append(child)
try:
if scene_manager and hasattr(scene_manager, "setupCollision") and id(child) not in manual_model_keys:
scene_manager.setupCollision(child)
if scene_manager and hasattr(scene_manager, "_processModelAnimations") and id(child) not in manual_model_keys:
scene_manager._processModelAnimations(child)
except Exception:
pass
elif child.hasTag("light_type"):
light_type = child.getTag("light_type")
if light_type == "spot_light":
built_spot_lights.append(child)
elif light_type == "point_light":
built_point_lights.append(child)
if scene_manager:
if not ssbo_loaded:
merged_models = []
for candidate in list(manually_imported_models) + built_model_nodes:
if not candidate or candidate.isEmpty():
continue
if any(existing == candidate for existing in merged_models):
continue
merged_models.append(candidate)
scene_manager.models = merged_models
else:
merged_models = list(getattr(scene_manager, "models", []) or [])
for child in built_model_nodes:
if not child or child.isEmpty():
continue
if child not in merged_models:
merged_models.append(child)
scene_manager.models = merged_models
scene_manager.Spotlight = built_spot_lights
scene_manager.Pointlight = built_point_lights
update_tree_fn = getattr(scene_manager, "updateSceneTree", None)
if callable(update_tree_fn):
try:
update_tree_fn()
except Exception:
pass
try:
if not scene_root.isEmpty():
scene_root.removeNode()
except Exception:
pass
try:
render_desc = self.world.render.find_all_matches("**").get_num_paths()
print(
f"[SceneOpen] render_desc={render_desc} built_models={len(built_model_nodes)} "
f"spot={len(built_spot_lights)} point={len(built_point_lights)}"
)
except Exception:
pass
try:
controller = getattr(ssbo_editor, "controller", None) if ssbo_loaded else None
if controller and hasattr(controller, "get_runtime_structure_stats"):
print(f"[SceneOpen] runtime_structure={controller.get_runtime_structure_stats()}")
except Exception:
pass
try:
if ssbo_loaded and ssbo_editor and hasattr(ssbo_editor, "get_source_tree_stats"):
print(f"[SceneOpen] source_tree={ssbo_editor.get_source_tree_stats()}")
except Exception:
pass
self._log_render_runtime_stats("after_scene_rebuild")
scene_components = dict(scene_description.get("scene_components", {}) or {})
camera_state = dict(scene_components.get("camera", {}) or scene_description.get("camera", {}) or {})
camera = getattr(self.world, "camera", None) or getattr(self.world, "cam", None)
if camera and not camera.isEmpty():
try:
position = list(camera_state.get("position", []) or [])
rotation = list(camera_state.get("rotation", []) or [])
if len(position) >= 3:
camera.setPos(float(position[0]), float(position[1]), float(position[2]))
if len(rotation) >= 3:
camera.setHpr(float(rotation[0]), float(rotation[1]), float(rotation[2]))
if "camera_control_enabled" in camera_state:
self.world.camera_control_enabled = bool(camera_state.get("camera_control_enabled", True))
except Exception:
pass
gui_snapshot = list((scene_components.get("gui", {}) or {}).get("elements", []) or scene_description.get("gui", []) or [])
lui_snapshot = dict(scene_components.get("lui", {}) or scene_description.get("lui", {}) or {})
scene_paths = self._scene_paths(scene_entry or {}, project_path=project_path) if scene_entry else {}
self._write_scene_sidecars(scene_paths, gui_snapshot, lui_snapshot)
try:
load_lui_fn = getattr(scene_manager, "_load_lui_snapshot_file", None) if scene_manager else None
if callable(load_lui_fn) and scene_paths.get("scene_file"):
temp_stub = os.path.splitext(scene_paths["scene_file"])[0] + ".bam"
load_lui_fn(temp_stub)
except Exception:
pass
self._log_render_runtime_stats("after_lui_restore")
return bool(ssbo_loaded or built_model_nodes or built_spot_lights or built_point_lights or scene_components)
def _iter_top_level_scene_asset_nodes(self, scene_description):
nodes = list(scene_description.get("nodes", []) or [])
node_lookup = {
str(node.get("node_id", "") or ""): dict(node)
for node in nodes
if node.get("node_id") is not None
}
top_level_nodes = []
for node_id, node in node_lookup.items():
asset_guid = str(node.get("asset_guid", "") or (node.get("components", {}) or {}).get("model", {}).get("asset_guid", "") or "").strip()
if not asset_guid:
continue
parent_id = node.get("parent_id")
has_asset_parent = False
while parent_id:
parent_node = node_lookup.get(parent_id)
if not parent_node:
break
parent_asset_guid = str(parent_node.get("asset_guid", "") or (parent_node.get("components", {}) or {}).get("model", {}).get("asset_guid", "") or "").strip()
if parent_asset_guid:
has_asset_parent = True
break
parent_id = parent_node.get("parent_id")
if not has_asset_parent:
top_level_nodes.append(node)
return top_level_nodes, node_lookup
def _load_scene_assets_via_manual_import(
self,
scene_description,
project_path,
asset_database,
scene_root,
keep_nodes,
built_nodes,
):
import_with_menu = getattr(self.world, "_import_model_with_menu_logic", None)
if not callable(import_with_menu):
return []
top_level_nodes, node_lookup = self._iter_top_level_scene_asset_nodes(scene_description)
imported_models = []
for node in top_level_nodes:
components = dict(node.get("components", {}) or {})
model_component = dict(components.get("model", {}) or {})
imported_node_key = str(
model_component.get("imported_node_key", "")
or node.get("imported_node_key", "")
or ""
).strip()
if imported_node_key:
return []
asset_guid = str(
model_component.get("asset_guid", "")
or node.get("asset_guid", "")
or ""
).strip()
if not asset_guid:
continue
asset_record = asset_database.get_asset(asset_guid)
if not asset_record:
return []
asset_rel = str(
asset_record.get("asset_path", "")
or model_component.get("asset_path", "")
or node.get("asset_path", "")
or ""
).strip()
if not asset_rel:
return []
asset_abs = os.path.join(project_path, asset_rel.replace("/", os.sep))
if not os.path.exists(asset_abs):
return []
imported_np = import_with_menu(
asset_abs,
select_model=False,
set_origin=False,
show_info_message=False,
show_success_message=False,
)
if not imported_np or imported_np.isEmpty():
return []
parent_np = scene_root
parent_id = self._resolve_scene_description_parent(node, keep_nodes, node_lookup)
if parent_id in built_nodes:
parent_np = built_nodes[parent_id]
imported_np.reparentTo(parent_np)
self._apply_scene_description_state_to_subtree(
imported_np,
node,
project_path,
node_lookup,
apply_material_state=True,
prune_missing_children=False,
)
imported_models.append(imported_np)
return imported_models
def _apply_scene_description_state_to_node(self, target_np, node, project_path, apply_material_state=True):
if not target_np or target_np.isEmpty() or not isinstance(node, dict):
return
node_name = str(node.get("name", "") or target_np.getName() or "Node")
target_np.setName(node_name)
transform = dict(node.get("transform", {}) or {})
position = list(transform.get("position", [0, 0, 0]) or [0, 0, 0])
rotation = list(transform.get("rotation", [0, 0, 0]) or [0, 0, 0])
scale = list(transform.get("scale", [1, 1, 1]) or [1, 1, 1])
try:
current_pos = target_np.getPos()
except Exception:
current_pos = None
try:
current_hpr = target_np.getHpr()
except Exception:
current_hpr = None
try:
current_scale = target_np.getScale()
except Exception:
current_scale = None
if len(position) >= 3 and self._vector_differs(current_pos, position):
target_np.setPos(float(position[0]), float(position[1]), float(position[2]))
if len(rotation) >= 3 and self._vector_differs(current_hpr, rotation):
target_np.setHpr(float(rotation[0]), float(rotation[1]), float(rotation[2]))
if len(scale) >= 3 and self._vector_differs(current_scale, scale):
target_np.setScale(float(scale[0]), float(scale[1]), float(scale[2]))
visibility = dict(node.get("visibility", {}) or {})
user_visible = bool(visibility.get("user_visible", True))
target_np.setPythonTag("user_visible", user_visible)
target_np.setTag("user_visible", "true" if user_visible else "false")
if user_visible:
target_np.show()
else:
target_np.hide()
for tag_name, tag_value in dict(node.get("tags", {}) or {}).items():
if tag_value is None:
continue
target_np.setTag(str(tag_name), str(tag_value))
components = dict(node.get("components", {}) or {})
model_component = dict(components.get("model", {}) or {})
scripts_component = dict(components.get("scripts", {}) or {})
metadata_component = dict(components.get("metadata", {}) or {})
asset_guid = str(model_component.get("asset_guid", "") or node.get("asset_guid", "") or "").strip()
asset_path = str(model_component.get("asset_path", "") or node.get("asset_path", "") or "").strip()
imported_node_key = str(model_component.get("imported_node_key", "") or node.get("imported_node_key", "") or "").strip()
if asset_guid:
target_np.setTag("asset_guid", asset_guid)
if asset_path:
asset_abs_path = os.path.join(project_path, asset_path.replace("/", os.sep))
target_np.setTag("asset_path", asset_path)
target_np.setTag("model_path", asset_abs_path)
target_np.setTag("saved_model_path", asset_abs_path)
target_np.setTag("original_path", asset_abs_path)
target_np.setTag("file", os.path.basename(asset_abs_path))
if imported_node_key:
target_np.setTag("imported_node_key", imported_node_key)
if asset_guid:
target_np.setTag("is_model_root", "1")
target_np.setTag("is_scene_element", "1")
runtime_interactive = bool(metadata_component.get("runtime_interactive", node.get("runtime_interactive", False)))
if runtime_interactive:
target_np.setTag("runtime_interactive", "true")
for tag_name in (
"has_animations",
"has_animations_checked",
"can_create_actor_from_memory",
"saved_has_animations",
"saved_can_create_actor_from_memory",
):
tag_value = str(
metadata_component.get(tag_name, (node.get("tags", {}) or {}).get(tag_name, ""))
or ""
).strip()
if tag_value:
target_np.setTag(tag_name, tag_value)
if asset_path:
asset_abs_path = os.path.join(project_path, asset_path.replace("/", os.sep))
if not target_np.hasTag("saved_model_path"):
target_np.setTag("saved_model_path", asset_abs_path)
scripts = list(scripts_component.get("entries", []) or node.get("scripts", []) or [])
if scripts:
target_np.setTag("has_scripts", "true")
target_np.setTag("scripts_info", json.dumps(scripts, ensure_ascii=False))
if apply_material_state and self._should_restore_saved_material_state(node):
self._apply_saved_material_tags_to_node(target_np)
def _should_restore_saved_material_state(self, node):
if not isinstance(node, dict):
return False
tags = dict(node.get("tags", {}) or {})
if str(tags.get("scene_material_dirty", "") or "").strip().lower() == "true":
return True
return False
def _apply_saved_material_tags_to_node(self, target_np):
"""Rebuild runtime material state from serialized material_* tags."""
if not target_np or target_np.isEmpty():
return
property_helpers = getattr(self.world, "property_helpers", None)
if not property_helpers:
return
material_tag_names = (
"material_basecolor",
"material_emission",
"material_roughness",
"material_metallic",
"material_ior",
)
if not any(target_np.hasTag(tag_name) for tag_name in material_tag_names):
return
try:
from panda3d.core import Vec4
except Exception:
Vec4 = None
def _parse_vec4(raw_value):
try:
cleaned = str(raw_value or "").strip()
for prefix in ("LVecBase4f", "Vec4", "LColor"):
cleaned = cleaned.replace(prefix, "")
cleaned = cleaned.strip("() ")
values = [float(part.strip()) for part in cleaned.split(",") if part.strip()]
if len(values) == 3:
values.append(1.0)
if len(values) >= 4:
return tuple(values[:4])
except Exception:
pass
return None
def _parse_float(tag_name):
try:
return float(target_np.getTag(tag_name))
except Exception:
return None
ensure_material_fn = getattr(property_helpers, "_ensure_material_for_node", None)
sync_runtime_fn = getattr(property_helpers, "_sync_material_node_runtime", None)
set_base_color_fn = getattr(property_helpers, "_set_material_base_color", None)
if not callable(ensure_material_fn):
return
try:
material = ensure_material_fn(target_np)
except Exception:
material = None
if material is None:
return
base_color = _parse_vec4(target_np.getTag("material_basecolor")) if target_np.hasTag("material_basecolor") else None
if base_color is not None and callable(set_base_color_fn):
try:
set_base_color_fn(material, base_color)
except Exception:
pass
emission = _parse_vec4(target_np.getTag("material_emission")) if target_np.hasTag("material_emission") else None
if emission is not None and Vec4 is not None and hasattr(material, "set_emission"):
try:
material.set_emission(Vec4(*emission))
except Exception:
pass
roughness = _parse_float("material_roughness")
if roughness is not None and hasattr(material, "set_roughness"):
try:
material.set_roughness(roughness)
except Exception:
pass
metallic = _parse_float("material_metallic")
if metallic is not None and hasattr(material, "set_metallic"):
try:
material.set_metallic(metallic)
except Exception:
pass
ior = _parse_float("material_ior")
if ior is not None and hasattr(material, "set_refractive_index"):
try:
material.set_refractive_index(ior)
except Exception:
pass
if callable(sync_runtime_fn):
try:
sync_runtime_fn(target_np, material, refresh_ssbo_runtime=False)
except Exception:
pass
def _apply_scene_description_state_to_subtree(
    self,
    target_np,
    node,
    project_path,
    node_lookup,
    apply_material_state=True,
    prune_missing_children=False,
):
    """Apply saved state to one imported node and, recursively, its serialized descendants.

    Each serialized child entry is matched to a runtime child of
    ``target_np`` by, in order of preference: its imported node key, its
    name, and finally the numeric suffix of its ``node_id`` used as a
    positional index. A runtime child is matched at most once.

    Args:
        target_np: Runtime node that receives the state of ``node``.
        node: Serialized node entry (dict).
        project_path: Project root, forwarded to the per-node state
            application.
        node_lookup: Mapping of all serialized nodes; scanned for entries
            whose ``parent_id`` equals this node's ``node_id``.
        apply_material_state: Forwarded to the per-node state application.
        prune_missing_children: When True, runtime children that were not
            matched to any serialized child are detached.
    """
    if not target_np or target_np.isEmpty() or not isinstance(node, dict):
        return
    self._apply_scene_description_state_to_node(
        target_np,
        node,
        project_path,
        apply_material_state=apply_material_state,
    )
    node_id = str(node.get("node_id", "") or "").strip()
    if not node_id:
        return
    # Serialized children are the entries whose parent_id names this node.
    child_nodes = []
    for candidate in list((node_lookup or {}).values()):
        if str(candidate.get("parent_id", "") or "").strip() != node_id:
            continue
        child_nodes.append(candidate)

    def _node_index(candidate):
        # Numeric suffix after the last "/" of node_id; 0 when it is not an int.
        candidate_id = str(candidate.get("node_id", "") or "")
        try:
            return int(candidate_id.rsplit("/", 1)[-1])
        except Exception:
            return 0

    child_nodes.sort(key=_node_index)
    # Runtime children, excluding the collision and selection-outline helper
    # nodes recognised by their name prefix.
    runtime_children = []
    try:
        for child in target_np.getChildren():
            if not child or child.isEmpty():
                continue
            child_name = ""
            try:
                child_name = str(child.getName() or "")
            except Exception:
                child_name = ""
            if child_name.startswith("modelCollision_"):
                continue
            if child_name.startswith("selectionOutline"):
                continue
            runtime_children.append(child)
    except Exception:
        runtime_children = []

    def _entry_imported_key(entry):
        # imported_node_key of a serialized entry (model component first).
        if not isinstance(entry, dict):
            return ""
        components = dict(entry.get("components", {}) or {})
        model_component = dict(components.get("model", {}) or {})
        return str(
            model_component.get("imported_node_key", "")
            or entry.get("imported_node_key", "")
            or ""
        ).strip()

    def _runtime_imported_key(runtime_np):
        # First non-empty value among the key-carrying tags of a runtime node.
        if not runtime_np or runtime_np.isEmpty():
            return ""
        for tag_name in ("imported_node_key", "source_model_node_key", "ssbo_tree_key", "tree_item_key"):
            try:
                if runtime_np.hasTag(tag_name):
                    tag_value = str(runtime_np.getTag(tag_name) or "").strip()
                    if tag_value:
                        return tag_value
            except Exception:
                continue
        return ""

    # Index runtime children by key (first occurrence wins) and by name
    # (all occurrences, in child order).
    runtime_children_by_key = {}
    runtime_children_by_name = {}
    for runtime_index, runtime_child in enumerate(runtime_children):
        key = _runtime_imported_key(runtime_child)
        if key and key not in runtime_children_by_key:
            runtime_children_by_key[key] = (runtime_index, runtime_child)
        try:
            runtime_name = str(runtime_child.getName() or "").strip()
        except Exception:
            runtime_name = ""
        if runtime_name:
            runtime_children_by_name.setdefault(runtime_name, []).append((runtime_index, runtime_child))
    used_runtime_indices = set()
    for child_entry in child_nodes:
        target_runtime_child = None
        target_runtime_index = -1
        # 1) Match by imported node key.
        imported_key = _entry_imported_key(child_entry)
        if imported_key:
            keyed_entry = runtime_children_by_key.get(imported_key)
            if keyed_entry:
                keyed_index, keyed_child = keyed_entry
                if keyed_index not in used_runtime_indices:
                    target_runtime_index = keyed_index
                    target_runtime_child = keyed_child
        # 2) Match by name, taking the first runtime child not yet used.
        if target_runtime_child is None:
            entry_name = str(child_entry.get("name", "") or "").strip()
            for named_index, named_child in runtime_children_by_name.get(entry_name, []):
                if named_index in used_runtime_indices:
                    continue
                target_runtime_index = named_index
                target_runtime_child = named_child
                break
        # 3) Fall back to the positional index taken from the node_id suffix.
        if target_runtime_child is None:
            child_index = _node_index(child_entry)
            if child_index < 0 or child_index >= len(runtime_children):
                continue
            if child_index in used_runtime_indices:
                continue
            target_runtime_index = child_index
            target_runtime_child = runtime_children[child_index]
        if target_runtime_child is None:
            continue
        used_runtime_indices.add(target_runtime_index)
        self._apply_scene_description_state_to_subtree(
            target_runtime_child,
            child_entry,
            project_path,
            node_lookup,
            apply_material_state=apply_material_state,
            prune_missing_children=prune_missing_children,
        )
    if prune_missing_children:
        # Detach runtime children that no serialized child entry claimed.
        for runtime_index, runtime_child in enumerate(runtime_children):
            if runtime_index in used_runtime_indices:
                continue
            if not runtime_child or runtime_child.isEmpty():
                continue
            try:
                runtime_child.detachNode()
            except Exception:
                try:
                    runtime_child.detach_node()
                except Exception:
                    continue
def _load_scene_description_via_ssbo(self, scene_description, project_path, asset_database):
    """Restore the top-level asset nodes of a scene through the SSBO editor.

    Only runs when ``world.use_ssbo_mouse_picking`` is truthy and the world
    has an ``ssbo_editor``. Nodes whose saved tags or metadata mark them as
    animated are first tried through ``scene_manager.importModel``; all other
    nodes (and animated ones whose import yields nothing) go through
    ``ssbo_editor.load_model``. Afterwards ``scene_manager.models`` is rebuilt
    and the Spotlight/Pointlight lists are reset to empty.

    Args:
        scene_description: Scene description mapping.
        project_path: Project root used to resolve asset paths.
        asset_database: Object providing ``get_asset(guid)``.

    Returns:
        bool: True when at least one asset was loaded, otherwise False.
    """
    if not getattr(self.world, "use_ssbo_mouse_picking", False):
        return False
    ssbo_editor = getattr(self.world, "ssbo_editor", None)
    if not ssbo_editor:
        return False
    refresh_runtime_fn = getattr(ssbo_editor, "refresh_runtime_from_source", None)
    top_level_asset_nodes, node_lookup = self._iter_top_level_scene_asset_nodes(scene_description)
    if not top_level_asset_nodes:
        return False
    # Keep only nodes whose asset resolves to an existing file on disk.
    candidate_nodes = []
    for node in top_level_asset_nodes:
        components = dict(node.get("components", {}) or {})
        model_component = dict(components.get("model", {}) or {})
        asset_guid = str(model_component.get("asset_guid", "") or node.get("asset_guid", "") or "").strip()
        if not asset_guid:
            continue
        asset_record = asset_database.get_asset(asset_guid)
        if not asset_record:
            continue
        asset_rel = str(asset_record.get("asset_path", "") or model_component.get("asset_path", "") or node.get("asset_path", "") or "").strip()
        if not asset_rel:
            continue
        asset_abs = os.path.join(project_path, asset_rel.replace("/", os.sep))
        if not os.path.exists(asset_abs):
            continue
        candidate_nodes.append((node, components, model_component, asset_abs))
    loaded_any = False
    total_candidates = len(candidate_nodes)
    for index, (node, components, model_component, asset_abs) in enumerate(candidate_nodes):
        has_more_assets = index < (total_candidates - 1)
        # A node counts as animated when any saved animation flag is "true".
        has_saved_animation = False
        try:
            metadata_component = dict(components.get("metadata", {}) or {})
            tags = dict(node.get("tags", {}) or {})
            has_saved_animation = (
                str(tags.get("has_animations", "")).lower() == "true"
                or str(tags.get("saved_has_animations", "")).lower() == "true"
                or str(metadata_component.get("has_animations", "")).lower() == "true"
                or str(metadata_component.get("saved_has_animations", "")).lower() == "true"
                or str(metadata_component.get("can_create_actor_from_memory", "")).lower() == "true"
                or str(metadata_component.get("saved_can_create_actor_from_memory", "")).lower() == "true"
            )
        except Exception:
            has_saved_animation = False
        if has_saved_animation:
            scene_manager = getattr(self.world, "scene_manager", None)
            if scene_manager and hasattr(scene_manager, "importModel"):
                try:
                    imported_np = scene_manager.importModel(asset_abs)
                except Exception:
                    imported_np = None
                if imported_np and not imported_np.isEmpty():
                    self._apply_scene_description_state_to_node(imported_np, node, project_path)
                    loaded_any = True
                    continue
                # Otherwise fall through to the SSBO load below.
        try:
            imported_root = ssbo_editor.load_model(
                asset_abs,
                keep_source_model=False,
                append=loaded_any,
                scene_package_import=False,
                rebuild_runtime=False,
            )
        except Exception as e:
            print(f"⚠️ SSBO 恢复场景模型失败 {asset_abs}: {e}")
            continue
        target_root = imported_root if imported_root and not imported_root.isEmpty() else None
        if target_root is None:
            continue
        # Unmatched runtime children are pruned only for nodes tagged as
        # ssbo_managed ("1", "true", "yes" or "on").
        self._apply_scene_description_state_to_subtree(
            target_root,
            node,
            project_path,
            node_lookup,
            apply_material_state=False,
            prune_missing_children=(
                str((node.get("tags", {}) or {}).get("ssbo_managed", "")).strip().lower()
                in ("1", "true", "yes", "on")
            ),
        )
        # Only rebuild between imports, or once after the whole batch below.
        # Otherwise reopening a project pays one extra full SSBO rebuild per model.
        if has_more_assets and callable(refresh_runtime_fn):
            try:
                refresh_runtime_fn(preserve_selection=False)
            except Exception:
                pass
        loaded_any = True
    if not loaded_any:
        return False
    # Final rebuild after the whole batch.
    if callable(refresh_runtime_fn):
        try:
            refresh_runtime_fn(preserve_selection=False)
        except Exception:
            pass
    force_static_idle_fn = getattr(ssbo_editor, "force_static_chunk_idle_state", None)
    if callable(force_static_idle_fn):
        try:
            force_static_idle_fn()
        except Exception:
            pass
    scene_manager = getattr(self.world, "scene_manager", None)
    if scene_manager:
        source_root = getattr(ssbo_editor, "source_model_root", None)
        merged_models = []

        def _append_model(candidate):
            # Append a valid, non-empty node unless an equal one is present.
            if not candidate:
                return
            try:
                if candidate.isEmpty():
                    return
            except Exception:
                try:
                    if candidate.is_empty():
                        return
                except Exception:
                    return
            # NOTE(review): this check has no effect (both branches pass).
            try:
                if ssbo_editor and hasattr(ssbo_editor, "is_source_tree_node") and ssbo_editor.is_source_tree_node(candidate):
                    pass
            except Exception:
                pass
            for existing in merged_models:
                try:
                    if existing == candidate:
                        return
                except Exception:
                    continue
            merged_models.append(candidate)

        # Previously registered models, except SSBO source-tree nodes; those
        # are re-added from source_root's children below.
        for candidate in list(getattr(scene_manager, "models", []) or []):
            try:
                if ssbo_editor and hasattr(ssbo_editor, "is_source_tree_node") and ssbo_editor.is_source_tree_node(candidate):
                    continue
            except Exception:
                pass
            _append_model(candidate)
        if source_root and not source_root.isEmpty():
            try:
                for child in list(source_root.getChildren()):
                    _append_model(child)
            except Exception:
                pass
        # Tagged model roots directly under render, excluding the SSBO
        # runtime model and source-tree nodes.
        render = getattr(self.world, "render", None)
        if render and not render.isEmpty():
            try:
                for child in list(render.getChildren()):
                    if child == getattr(ssbo_editor, "model", None):
                        continue
                    try:
                        is_model_root = (
                            child.hasTag("is_model_root")
                            or child.hasTag("asset_guid")
                            or child.hasTag("model_path")
                            or child.hasTag("saved_model_path")
                        )
                    except Exception:
                        is_model_root = False
                    if not is_model_root:
                        continue
                    try:
                        if ssbo_editor and hasattr(ssbo_editor, "is_source_tree_node") and ssbo_editor.is_source_tree_node(child):
                            continue
                    except Exception:
                        pass
                    _append_model(child)
            except Exception:
                pass
        scene_manager.models = merged_models
        scene_manager.Spotlight = []
        scene_manager.Pointlight = []
        update_tree_fn = getattr(scene_manager, "updateSceneTree", None)
        if callable(update_tree_fn):
            try:
                update_tree_fn()
            except Exception:
                pass
    return True
def _should_keep_scene_description_node(self, node_id, node, node_lookup):
if not node_id:
return False
if node.get("asset_guid"):
return True
components = node.get("components", {}) or {}
if components.get("light"):
return True
if node.get("scripts"):
return True
tags = node.get("tags", {}) or {}
if any(key in tags for key in ("is_scene_element", "runtime_interactive", "element_type", "has_scripts")):
return True
parent_id = node.get("parent_id")
while parent_id:
parent_node = node_lookup.get(parent_id)
if not parent_node:
break
if parent_node.get("asset_guid"):
return False
parent_id = parent_node.get("parent_id")
return True
def _resolve_scene_description_parent(self, node, keep_nodes, node_lookup):
parent_id = node.get("parent_id")
while parent_id:
if parent_id in keep_nodes:
return parent_id
parent_node = node_lookup.get(parent_id)
parent_id = parent_node.get("parent_id") if parent_node else None
return None
def _instantiate_scene_description_node(self, *, node, parent_np, project_path, asset_database):
# Rebuild one scene-description node as a NodePath under ``parent_np`` and return it.
# All arguments are keyword-only. ``node`` is the node's dict from the scene
# description; its transform, visibility, tags, script entries and light
# settings are written onto the resulting NodePath.
node_name = str(node.get("name", "") or "Node")
tags = dict(node.get("tags", {}) or {})
components = dict(node.get("components", {}) or {})
model_component = dict(components.get("model", {}) or {})
scripts_component = dict(components.get("scripts", {}) or {})
metadata_component = dict(components.get("metadata", {}) or {})
# Values in the "model" component win; the node's top-level fields are the fallback.
asset_guid = str(model_component.get("asset_guid", "") or node.get("asset_guid", "") or "")
imported_node_key = str(model_component.get("imported_node_key", "") or node.get("imported_node_key", "") or "").strip()
# Asset-backed nodes are loaded through the asset database; when nothing can
# be loaded, an empty placeholder node with the same name is attached instead.
if asset_guid:
loaded_np = self._load_asset_node_for_scene_description(
asset_guid,
project_path,
asset_database,
imported_node_key=imported_node_key,
node_name=node_name,
)
if not loaded_np or loaded_np.isEmpty():
rebuilt_np = parent_np.attachNewNode(node_name)
else:
rebuilt_np = loaded_np
rebuilt_np.reparentTo(parent_np)
rebuilt_np.setName(node_name)
else:
rebuilt_np = parent_np.attachNewNode(node_name)
# Each transform component is applied only when at least three values are present.
transform = node.get("transform", {}) or {}
position = list(transform.get("position", [0, 0, 0]) or [0, 0, 0])
rotation = list(transform.get("rotation", [0, 0, 0]) or [0, 0, 0])
scale = list(transform.get("scale", [1, 1, 1]) or [1, 1, 1])
if len(position) >= 3:
rebuilt_np.setPos(float(position[0]), float(position[1]), float(position[2]))
if len(rotation) >= 3:
rebuilt_np.setHpr(float(rotation[0]), float(rotation[1]), float(rotation[2]))
if len(scale) >= 3:
rebuilt_np.setScale(float(scale[0]), float(scale[1]), float(scale[2]))
# Visibility defaults to visible and is stored both as a Python tag and as a string tag.
visibility = node.get("visibility", {}) or {}
user_visible = bool(visibility.get("user_visible", True))
rebuilt_np.setPythonTag("user_visible", user_visible)
rebuilt_np.setTag("user_visible", "true" if user_visible else "false")
if user_visible:
rebuilt_np.show()
else:
rebuilt_np.hide()
# Saved tags are copied as strings; tags whose value is None are skipped.
for tag_name, tag_value in tags.items():
if tag_value is None:
continue
rebuilt_np.setTag(str(tag_name), str(tag_value))
# The "metadata" component wins over the node's top-level runtime_interactive flag.
runtime_interactive = bool(metadata_component.get("runtime_interactive", node.get("runtime_interactive", False)))
if runtime_interactive:
rebuilt_np.setTag("runtime_interactive", "true")
if asset_guid:
rebuilt_np.setTag("asset_guid", asset_guid)
asset_path = str(model_component.get("asset_path", "") or node.get("asset_path", "") or "")
if asset_path:
# The stored asset path uses "/" separators and is joined onto project_path
# to form the absolute path written to the model_path tags.
asset_abs_path = os.path.join(project_path, asset_path.replace("/", os.sep))
rebuilt_np.setTag("asset_path", asset_path)
rebuilt_np.setTag("model_path", asset_abs_path)
rebuilt_np.setTag("saved_model_path", asset_abs_path)
if imported_node_key:
rebuilt_np.setTag("imported_node_key", imported_node_key)
# Script entries come from the "scripts" component, falling back to node["scripts"],
# and are serialized as JSON into the scripts_info tag.
scripts = list(scripts_component.get("entries", []) or node.get("scripts", []) or [])
if scripts:
rebuilt_np.setTag("has_scripts", "true")
rebuilt_np.setTag("scripts_info", json.dumps(scripts, ensure_ascii=False))
# Light settings are stored as string tags; only keys present in the component are written.
light_component = components.get("light", {}) or {}
if light_component.get("type"):
rebuilt_np.setTag("light_type", str(light_component.get("type")))
for input_name, tag_name in (
("energy", "light_energy"),
("radius", "light_radius"),
("fov", "light_fov"),
("stored_energy", "stored_energy"),
):
if input_name in light_component:
rebuilt_np.setTag(tag_name, str(light_component[input_name]))
return rebuilt_np
def _clone_imported_subnode(self, loaded_np, imported_node_key, node_name):
imported_node_key = str(imported_node_key or "").strip().strip("/")
if not imported_node_key:
return loaded_np
target_np = loaded_np
try:
for part in imported_node_key.split("/"):
if part == "":
continue
child_index = int(part)
children = list(target_np.getChildren())
if child_index < 0 or child_index >= len(children):
return loaded_np
target_np = children[child_index]
except Exception:
return loaded_np
try:
clone_root = NodePath(ModelRoot(node_name or target_np.getName() or "ImportedNode"))
cloned_child = target_np.copyTo(clone_root)
cloned_child.wrtReparentTo(clone_root)
cloned_child.setName(node_name or target_np.getName())
return cloned_child
except Exception:
return target_np
def _load_imported_hierarchy_index(self, asset_record, project_path):
imported_cache = asset_record.get("imported_cache", {}) or {}
hierarchy_rel = imported_cache.get("hierarchy", "")
if not hierarchy_rel:
return {}
hierarchy_path = os.path.join(project_path, hierarchy_rel.replace("/", os.sep))
if not os.path.exists(hierarchy_path):
return {}
try:
with open(hierarchy_path, "r", encoding="utf-8") as f:
payload = json.load(f)
index = {}
for item in payload if isinstance(payload, list) else []:
key = str((item or {}).get("key", "") or "").strip()
if key:
index[key] = dict(item or {})
return index
except Exception as e:
print(f"⚠️ 读取导入层级缓存失败: {hierarchy_path} ({e})")
return {}
def _find_subnode_by_name(self, loaded_np, expected_name):
expected_name = str(expected_name or "").strip()
if not expected_name:
return None
for candidate in loaded_np.findAllMatches("**"):
try:
if candidate.getName() == expected_name:
return candidate
except Exception:
continue
return None
def _load_asset_node_for_scene_description(self, asset_guid, project_path, asset_database, imported_node_key="", node_name=""):
asset_record = asset_database.get_asset(asset_guid)
if not asset_record:
return None
imported_cache = asset_record.get("imported_cache", {}) or {}
model_bam_rel = imported_cache.get("model_bam", "")
source_path_rel = asset_record.get("asset_path", "")
hierarchy_index = self._load_imported_hierarchy_index(asset_record, project_path)
candidate_paths = []
if model_bam_rel:
candidate_paths.append(os.path.join(project_path, model_bam_rel.replace("/", os.sep)))
if source_path_rel:
candidate_paths.append(os.path.join(project_path, source_path_rel.replace("/", os.sep)))
for candidate_path in candidate_paths:
if not candidate_path or not os.path.exists(candidate_path):
continue
try:
loaded_np = self.world.loader.loadModel(Filename.fromOsSpecific(candidate_path))
if loaded_np and not loaded_np.isEmpty():
if imported_node_key and candidate_path.endswith(".bam"):
expected_item = hierarchy_index.get(imported_node_key, {}) if hierarchy_index else {}
expected_name = str(expected_item.get("name", "") or node_name or "").strip()
cloned_np = self._clone_imported_subnode(loaded_np, imported_node_key, node_name)
cloned_name = ""
try:
cloned_name = cloned_np.getName()
except Exception:
cloned_name = ""
if expected_name and cloned_name and cloned_name != expected_name and cloned_name != node_name:
fallback_np = self._find_subnode_by_name(loaded_np, expected_name)
if fallback_np is not None and not fallback_np.isEmpty():
try:
clone_root = NodePath(ModelRoot(node_name or expected_name or "ImportedNode"))
fixed_np = fallback_np.copyTo(clone_root)
fixed_np.wrtReparentTo(clone_root)
fixed_np.setName(node_name or expected_name)
return fixed_np
except Exception:
pass
return cloned_np
return loaded_np
except Exception as e:
print(f"⚠️ 加载场景描述资产失败 {candidate_path}: {e}")
return None
def _is_scene_description_node_interactive(self, node):
if not isinstance(node, dict):
return False
if bool(node.get("runtime_interactive", False)):
return True
scripts = list(node.get("scripts", []) or [])
if scripts:
return True
components = dict(node.get("components", {}) or {})
scripts_component = dict(components.get("scripts", {}) or {})
if list(scripts_component.get("entries", []) or []):
return True
tags = dict(node.get("tags", {}) or {})
if str(tags.get("runtime_interactive", "")).lower() == "true":
return True
if str(tags.get("has_scripts", "")).lower() == "true":
return True
return False
def _node_belongs_to_asset_hierarchy(self, node, node_lookup):
if not isinstance(node, dict):
return False
asset_guid = str(node.get("asset_guid", "") or (node.get("components", {}) or {}).get("model", {}).get("asset_guid", "") or "").strip()
if asset_guid:
return True
parent_id = node.get("parent_id")
while parent_id:
parent_node = node_lookup.get(parent_id)
if not parent_node:
break
parent_asset_guid = str(parent_node.get("asset_guid", "") or (parent_node.get("components", {}) or {}).get("model", {}).get("asset_guid", "") or "").strip()
if parent_asset_guid:
return True
parent_id = parent_node.get("parent_id")
return False
def _build_scene_root_from_description(self, scene_description, project_path, asset_database, include_mode="all", skip_asset_nodes=False, return_node_map=False):
# Build a detached "SceneRoot" NodePath from the nodes of ``scene_description``.
# include_mode: "interactive" keeps only interactive nodes, "static" keeps only
# non-interactive nodes; any other value applies no interactivity filter.
# Returns (scene_root, keep_nodes), or (scene_root, keep_nodes, built_nodes)
# when return_node_map is true. keep_nodes maps node id -> node dict and
# built_nodes maps node id -> rebuilt NodePath.
nodes = list(scene_description.get("nodes", []) or [])
# Index nodes by their stringified node_id; nodes whose node_id is None are ignored.
node_lookup = {
str(node.get("node_id", "") or ""): dict(node)
for node in nodes
if node.get("node_id") is not None
}
keep_nodes = {}
for node_id, node in node_lookup.items():
if not self._should_keep_scene_description_node(node_id, node, node_lookup):
continue
# With skip_asset_nodes, nodes that belong to an asset hierarchy are dropped.
if skip_asset_nodes:
if self._node_belongs_to_asset_hierarchy(node, node_lookup):
continue
imported_node_key = str(
node.get("imported_node_key", "")
or ((node.get("components", {}) or {}).get("model", {}) or {}).get("imported_node_key", "")
or ""
).strip()
if imported_node_key:
continue
if not self._is_scene_description_node_interactive(node):
components = dict(node.get("components", {}) or {})
tags = dict(node.get("tags", {}) or {})
has_light = bool(components.get("light"))
has_gui_tag = (
str(tags.get("is_gui_element", "")).lower() == "true"
or str(tags.get("gui_type", "")).strip() != ""
)
has_scene_element_tag = str(tags.get("is_scene_element", "")).lower() == "true"
# Non-interactive nodes pass this filter only as lights, GUI elements or scene elements.
if not has_light and not has_gui_tag and not has_scene_element_tag:
continue
is_interactive = self._is_scene_description_node_interactive(node)
if include_mode == "interactive" and not is_interactive:
continue
if include_mode == "static" and is_interactive:
continue
keep_nodes[node_id] = node
scene_root = NodePath(ModelRoot("SceneRoot"))
built_nodes = {}
# Nodes are instantiated in keep_nodes order. A node whose resolved parent has
# no entry in built_nodes at that point is attached directly to scene_root.
for node_id, node in keep_nodes.items():
parent_id = self._resolve_scene_description_parent(node, keep_nodes, node_lookup)
parent_np = built_nodes.get(parent_id, scene_root)
rebuilt_np = self._instantiate_scene_description_node(
node=node,
parent_np=parent_np,
project_path=project_path,
asset_database=asset_database,
)
if rebuilt_np:
built_nodes[node_id] = rebuilt_np
if return_node_map:
return scene_root, keep_nodes, built_nodes
return scene_root, keep_nodes
def saveProject(self):
    """Save the current project: the live scene description and the project config.

    Returns True on success. Returns False (after printing a message) when no
    project is open, the project has no scene entry to save, or any step raises.
    """
    try:
        if not self.current_project_path:
            print("错误: 请先创建或打开一个项目！")
            return False

        # Push runtime material edits back to the source scene before saving;
        # a failure here is reported but does not abort the save.
        editor = getattr(self.world, "ssbo_editor", None)
        sync_materials = None
        if editor:
            sync_materials = getattr(editor, "_snapshot_runtime_materials_to_source_root", None)
        if callable(sync_materials):
            try:
                sync_materials()
            except Exception as e:
                print(f"⚠️ 保存前同步 SSBO 材质到源场景失败: {e}")

        project_path = self.current_project_path
        ensure_project_directories(ProjectLayout(project_path))
        config = self._ensure_v2_project_defaults(project_path, dict(self.project_config or {}))
        _, config = self._upgrade_scene_description_files(project_path, config)

        scene_entry = self._get_scene_entry(project_config=config)
        if not scene_entry:
            print("错误: 当前项目没有可保存的场景")
            return False

        self._retarget_live_scene_asset_paths()
        self._write_scene_description_from_live_scene(scene_entry, project_path)

        scene_guid = scene_entry["guid"]
        config["last_modified"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        config["last_open_scene_guid"] = scene_guid
        config["scene_file"] = scene_entry.get("path", config.get("scene_file", ""))

        # The saved scene is always part of the enabled build scenes.
        build_settings = config.get("build_settings", {}) or {}
        enabled_guids = list(build_settings.get("enabled_scene_guids", []) or [])
        if scene_guid not in enabled_guids:
            enabled_guids.append(scene_guid)
        build_settings["enabled_scene_guids"] = enabled_guids
        config["build_settings"] = build_settings

        self._write_project_config(project_path, config)
        self.project_config = config
        self.current_scene_guid = scene_guid
        self._cleanup_legacy_scene_artifacts(scene_entry, project_path)
        print("项目保存成功！")
        return True
    except Exception as e:
        print(f"保存项目时发生错误：{str(e)}")
        return False
# ==================== 项目打包功能 ====================
def _load_scene_description(self, scene_entry, project_path):
    """Load and normalize the scene description file named by ``scene_entry["path"]``.

    The path is project-relative with "/" separators. ``load_json`` is called
    with ``{}`` as its second argument and the result is passed through
    ``normalize_scene_description``.
    """
    relative_scene_path = scene_entry["path"].replace("/", os.sep)
    raw_description = load_json(os.path.join(project_path, relative_scene_path), {})
    return normalize_scene_description(raw_description)
def _resolve_enabled_scene_entries(self, project_config):
_, active_profile = self._get_active_build_profile(project_config)
runtime_settings = dict((active_profile or {}).get("runtime", {}) or {})
enabled_guids = set((runtime_settings.get("enabled_scene_guids", []) or (project_config.get("build_settings") or {}).get("enabled_scene_guids", []) or []))
scene_entries = self._get_scene_entries(project_config)
if not enabled_guids:
return scene_entries
return [scene for scene in scene_entries if scene.get("guid") in enabled_guids]
def _copy_asset_record_to_runtime(self, asset_record, data_root, project_path):
if not asset_record:
return
asset_guid = asset_record.get("guid", "")
if not asset_guid:
return
source_root = os.path.join(project_path, asset_record.get("asset_path", "").replace("/", os.sep))
target_root = os.path.join(data_root, "assets", asset_guid)
os.makedirs(target_root, exist_ok=True)
if os.path.exists(source_root) and os.path.isfile(source_root):
shutil.copy2(source_root, os.path.join(target_root, os.path.basename(source_root)))
imported_cache = asset_record.get("imported_cache", {}) or {}
imported_root_rel = imported_cache.get("root", "")
if imported_root_rel:
imported_root = os.path.join(project_path, imported_root_rel.replace("/", os.sep))
if os.path.exists(imported_root):
runtime_imported_root = os.path.join(target_root, "imported")
if os.path.exists(runtime_imported_root):
shutil.rmtree(runtime_imported_root)
shutil.copytree(imported_root, runtime_imported_root)
def _stage_runtime_data(self, project_path, project_config, data_root):
# Stage the data the packaged runtime reads under ``data_root`` and return the
# runtime manifest built by build_runtime_manifest(). Written outputs:
#   <data_root>/Assets/...                   copy of the project's assets tree
#   <data_root>/scenes/<guid>.runtime.json   one runtime scene per enabled scene
#   <data_root>/assets/<guid>/...            files of every referenced asset record
#   <data_root>/manifest.json and <data_root>/cook_manifest.json
# Each scene's build cache directory is also refreshed with the runtime scene,
# the scene description and the cook manifest.
layout = self.get_project_layout(project_path)
asset_database = self.get_asset_database(project_path, reload=True)
asset_database.ensure_project_assets_registered()
assets_runtime_root = os.path.join(data_root, "Assets")
# Mirror the assets tree, pruning every directory named "Scripts" from the
# walk and skipping *.meta files.
if os.path.exists(layout.assets_root):
for root, dirs, files in os.walk(layout.assets_root):
rel_root = os.path.relpath(root, layout.assets_root)
normalized_rel_root = "" if rel_root == "." else rel_root
dirs[:] = [name for name in dirs if name != "Scripts"]
target_root = assets_runtime_root if not normalized_rel_root else os.path.join(assets_runtime_root, normalized_rel_root)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
if file_name.endswith(".meta"):
continue
shutil.copy2(os.path.join(root, file_name), os.path.join(target_root, file_name))
runtime_scenes = []
referenced_asset_guids = set()
cook_manifests = []
# Rewrites GUI image/video references to their packaged location
# "assets/<guid>/<basename>" and records the asset as referenced.
# Empty values and http(s) URLs are left untouched.
def _rewrite_gui_media_paths(scene_description):
gui_items = list(scene_description.get("gui", []) or [])
for gui_info in gui_items:
for key_name, preferred_subdir in (
("image_path", "UI"),
("bg_image_path", "UI"),
("video_path", "Video"),
):
raw_value = str(gui_info.get(key_name, "") or "").strip()
if not raw_value or raw_value.startswith(("http://", "https://")):
continue
resolved_path = raw_value
# Relative values are resolved against the project directory.
if not os.path.isabs(resolved_path):
resolved_path = os.path.join(project_path, raw_value.replace("/", os.sep))
if not os.path.exists(resolved_path):
continue
asset_record = asset_database.register_asset(resolved_path, preferred_subdir=preferred_subdir, copy_into_assets=False)
if asset_record:
referenced_asset_guids.add(asset_record.get("guid", ""))
runtime_file = f"assets/{asset_record['guid']}/{os.path.basename(resolved_path)}"
gui_info[key_name] = runtime_file.replace("\\", "/")
scene_description["gui"] = gui_items
return scene_description
for scene_entry in self._resolve_enabled_scene_entries(project_config):
scene_description = self._load_scene_description(scene_entry, project_path)
if not scene_description:
continue
# The helper receives a shallow copy (dict(...)) of the loaded description.
scene_description = _rewrite_gui_media_paths(dict(scene_description))
# The per-scene build cache directory is recreated from scratch.
build_cache_dir = layout.build_cache_dir(scene_entry["guid"])
if os.path.exists(build_cache_dir):
shutil.rmtree(build_cache_dir)
os.makedirs(build_cache_dir, exist_ok=True)
runtime_scene = build_runtime_scene(scene_description)
cook_manifest = build_scene_cook_manifest(scene_description, runtime_scene)
runtime_scenes.append(runtime_scene)
cook_manifests.append(cook_manifest)
save_json(
os.path.join(data_root, "scenes", f"{scene_entry['guid']}.runtime.json"),
runtime_scene,
)
save_json(
os.path.join(build_cache_dir, "runtime_scene.json"),
runtime_scene,
)
save_json(
os.path.join(build_cache_dir, "scene_description.json"),
scene_description,
)
save_json(
os.path.join(build_cache_dir, "cook_manifest.json"),
cook_manifest,
)
# Script GUIDs go into the same set, so referenced scripts are looked up and
# copied through the same asset-record path below.
referenced_asset_guids.update(scene_description.get("referenced_asset_guids", []) or [])
referenced_asset_guids.update(scene_description.get("referenced_script_guids", []) or [])
asset_records = []
# GUIDs are processed in sorted order; empty GUIDs and GUIDs unknown to the
# asset database are skipped.
for asset_guid in sorted(guid for guid in referenced_asset_guids if guid):
asset_record = asset_database.get_asset(asset_guid)
if not asset_record:
continue
asset_records.append(asset_record)
self._copy_asset_record_to_runtime(asset_record, data_root, project_path)
manifest = build_runtime_manifest(project_config, runtime_scenes, asset_records)
save_json(os.path.join(data_root, "manifest.json"), manifest)
save_json(
os.path.join(data_root, "cook_manifest.json"),
{
"schema_version": 1,
"project_name": project_config.get("name", f"{ENGINE_NAME}Project"),
"scene_guids": [scene.get("scene_guid", "") for scene in runtime_scenes if scene.get("scene_guid")],
"scenes": cook_manifests,
},
)
# Print the cook statistics taken from the manifest's "cook_summary" entry.
cook_summary = dict(manifest.get("cook_summary", {}) or {})
print(
"✓ Cook 构建统计: "
f"场景 {cook_summary.get('scene_count', 0)} 个, "
f"资源 {cook_summary.get('asset_count', 0)} 个, "
f"交互节点 {cook_summary.get('interactive_node_count', 0)} 个, "
f"静态节点 {cook_summary.get('static_node_count', 0)}"
)
return manifest
def buildWebGLPackage(self, output_dir):
    """Export the current project as a static WebGL site directory.

    The selection is cleared and the project is saved first. Returns True when
    the packager reports status "success" or "partial", otherwise False
    (including when no project is open, no output directory is given, saving
    fails, or any step raises). The packager's report is kept in
    ``self.last_webgl_export_report``.
    """
    try:
        if not self.current_project_path:
            print("错误: 请先创建或打开一个项目！")
            return False
        if not output_dir:
            print("错误: 请指定 WebGL 打包输出目录！")
            return False

        project_path = normalize_path(self.current_project_path)
        ensure_project_directories(ProjectLayout(project_path))

        selection = getattr(self.world, "selection", None)
        if selection:
            selection.clearSelection()
            print("已取消场景中的物体选中状态")

        if not self.saveProject():
            print("错误: WebGL 打包前保存场景失败！")
            return False

        target_dir = normalize_path(output_dir)
        report = WebGLPackager(self.world).package(project_path, target_dir)
        self.last_webgl_export_report = report

        status = str(report.get("status", "failed") or "failed")
        report_root = str(report.get("output_dir", "") or "")
        report_path = os.path.join(report_root, "reports", "export_report.json")

        if status not in ("success", "partial"):
            print("WebGL打包失败")
            if report_path:
                print(f"报告路径: {report_path}")
            return False

        print(f"WebGL打包完成: {status}")
        print(f"输出目录: {report.get('output_dir', '')}")
        print(f"报告路径: {report_path}")
        return True
    except Exception as exc:
        print(f"WebGL打包过程出错: {exc}")
        return False
def buildPackage(self, build_dir):
    """Package the current project into a standalone runtime program.

    Steps: upgrade the project config, save the project, stage runtime data
    into a hidden staging directory under ``build_dir``, run the PyInstaller
    build, then validate the output and write the build report and summary.

    Args:
        build_dir: Directory that receives the ``<project_name>`` output folder.

    Returns:
        A dict describing the build (``success``, ``project_name``,
        ``output_dir``, ``exe_path``, ``manifest``, ``cook_summary``,
        ``validation`` and, when written, ``report_path`` / ``summary_path``)
        on success; False on any failure.
    """
    try:
        if not self.current_project_path:
            print("错误: 请先创建或打开一个项目！")
            return False

        project_path = normalize_path(self.current_project_path)
        ensure_project_directories(ProjectLayout(project_path))
        self.project_config = self._ensure_v2_project_defaults(project_path, self.project_config or {})
        _, self.project_config = self._upgrade_scene_description_files(project_path, self.project_config)
        _, active_profile = self._get_active_build_profile(self.project_config)
        profile_runtime = dict((active_profile or {}).get("runtime", {}) or {})
        profile_windows = dict((active_profile or {}).get("windows", {}) or {})

        # A startup scene set in the build profile overrides the project-level value.
        configured_startup_scene = str(profile_runtime.get("startup_scene_guid", "") or "").strip()
        if configured_startup_scene:
            self.project_config["startup_scene_guid"] = configured_startup_scene

        project_display_name = (
            (self.project_config or {}).get("name")
            or os.path.basename(project_path)
            or f"{ENGINE_NAME}Project"
        )
        configured_exe_name = str(profile_windows.get("exe_name", "") or "").strip()
        project_name = self._sanitize_build_name(configured_exe_name or project_display_name)

        if not build_dir:
            print("错误: 请指定打包输出目录！")
            return False

        if hasattr(self.world, "selection") and self.world.selection:
            self.world.selection.clearSelection()
            print("已取消场景中的物体选中状态")

        self._sync_project_script_manager(project_path, reload_scripts=True)
        if not self.saveProject():
            print("错误: 保存场景失败！")
            return False

        output_root = normalize_path(build_dir)
        staging_root = os.path.join(output_root, f".{project_name}_staging")
        source_root = os.path.join(staging_root, "source")
        data_root = os.path.join(staging_root, "data")
        dist_dir = os.path.join(output_root, project_name)

        # Start from a clean staging area and remove any previous build output.
        if os.path.exists(staging_root):
            shutil.rmtree(staging_root)
        os.makedirs(source_root, exist_ok=True)
        os.makedirs(data_root, exist_ok=True)
        if os.path.exists(dist_dir):
            shutil.rmtree(dist_dir)

        manifest = self._stage_runtime_data(project_path, self.project_config or {}, data_root)
        if not manifest.get("scenes"):
            print("错误: 当前项目没有可打包的场景")
            return False

        self._copyScriptsToBuild(data_root, project_path)
        self._copyRenderPipelineRuntime(os.path.join(data_root, "pipeline"))

        runtime_entry = self._createProjectRuntimeEntry(source_root, project_display_name)
        if not runtime_entry:
            print("错误: 生成项目运行时入口失败")
            return False

        success = self._executeProjectBuild(
            source_root=source_root,
            data_root=data_root,
            build_root=output_root,
            project_name=project_name,
        )
        if not success:
            return False

        if os.path.exists(staging_root):
            shutil.rmtree(staging_root, ignore_errors=True)

        # PyInstaller appends ".exe" only on Windows; on other hosts the
        # launcher has no suffix, so the reported path must follow the host.
        exe_suffix = ".exe" if os.name == "nt" else ""
        exe_path = os.path.join(dist_dir, f"{project_name}{exe_suffix}")
        print(f"打包完成！项目运行程序: {exe_path}")

        build_result = {
            "success": True,
            "project_name": project_name,
            "output_dir": dist_dir,
            "exe_path": exe_path,
            "manifest": manifest,
            "cook_summary": dict(manifest.get("cook_summary", {}) or {}),
        }
        build_result["validation"] = self._validate_build_output(dist_dir, build_result)

        report_path = self._write_build_report(dist_dir, build_result)
        if report_path:
            build_result["report_path"] = report_path
            print(f"✓ 构建报告已生成: {report_path}")

        summary_path = self._write_build_summary(dist_dir, build_result)
        if summary_path:
            build_result["summary_path"] = summary_path
            print(f"✓ 构建摘要已生成: {summary_path}")

        return build_result
    except Exception as e:
        print(f"打包过程出错：{str(e)}")
        return False
def _merge_folder_contents(self, source_folder, destination_folder):
"""将 source_folder 的内容合并到 destination_folder。"""
if not source_folder or not os.path.exists(source_folder):
return False
os.makedirs(destination_folder, exist_ok=True)
for root, _, files in os.walk(source_folder):
rel_root = os.path.relpath(root, source_folder)
target_root = destination_folder if rel_root == "." else os.path.join(destination_folder, rel_root)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
source_file = os.path.join(root, file_name)
target_file = os.path.join(target_root, file_name)
shutil.copy2(source_file, target_file)
return True
def _collect_output_file_stats(self, root_path):
stats = {
"file_count": 0,
"dir_count": 0,
"total_size_bytes": 0,
}
if not root_path or not os.path.exists(root_path):
return stats
for root, dirs, files in os.walk(root_path):
stats["dir_count"] += len(dirs)
stats["file_count"] += len(files)
for file_name in files:
file_path = os.path.join(root, file_name)
try:
stats["total_size_bytes"] += os.path.getsize(file_path)
except OSError:
continue
return stats
def _validate_build_output(self, output_dir, build_result):
validation = {
"ok": True,
"checks": [],
}
if not output_dir or not isinstance(build_result, dict):
validation["ok"] = False
validation["checks"].append({"name": "build_result", "ok": False, "detail": "missing output_dir or build_result"})
return validation
def add_check(name, path_value, required=True):
exists = bool(path_value) and os.path.exists(path_value)
validation["checks"].append(
{
"name": name,
"path": path_value,
"ok": exists if required else True,
"required": required,
}
)
if required and not exists:
validation["ok"] = False
exe_path = str(build_result.get("exe_path", "") or "")
add_check("exe", exe_path)
data_root = os.path.join(output_dir, "_internal", "data")
if not os.path.exists(data_root):
data_root = os.path.join(output_dir, "data")
add_check("data_root", data_root)
manifest_path = os.path.join(data_root, "manifest.json") if data_root else ""
add_check("manifest", manifest_path)
manifest = dict(build_result.get("manifest", {}) or {})
scenes = list(manifest.get("scenes", []) or [])
for scene_entry in scenes:
runtime_rel = str(scene_entry.get("runtime_path", "") or "")
runtime_abs = os.path.join(data_root, runtime_rel.replace("/", os.sep)) if runtime_rel else ""
scene_guid = str(scene_entry.get("guid", "") or "")
add_check(f"runtime_scene:{scene_guid}", runtime_abs)
add_check("scripts_dir", os.path.join(data_root, "scripts"))
add_check("pipeline_dir", os.path.join(data_root, "pipeline"))
legacy_scene_bams = []
for root, _, files in os.walk(output_dir):
for file_name in files:
if file_name.lower() != "scene.bam":
continue
legacy_scene_bams.append(os.path.join(root, file_name))
validation["checks"].append(
{
"name": "no_scene_bam_in_scene_pipeline",
"path": ", ".join(legacy_scene_bams),
"ok": not legacy_scene_bams,
"required": True,
"detail": "scene.bam 仅允许作为模型资源/模型缓存存在,不允许进入项目场景与运行时场景链路",
}
)
if legacy_scene_bams:
validation["ok"] = False
return validation
def _write_build_report(self, output_dir, build_result):
if not output_dir or not isinstance(build_result, dict):
return ""
report_payload = {
"schema_version": 1,
"generated_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"engine_name": ENGINE_NAME,
"scene_format": SCENE_DESCRIPTION_EXTENSION.lstrip("."),
"project_name": build_result.get("project_name", ""),
"exe_path": build_result.get("exe_path", ""),
"output_dir": build_result.get("output_dir", ""),
"cook_summary": dict(build_result.get("cook_summary", {}) or {}),
"output_stats": self._collect_output_file_stats(output_dir),
"validation": self._validate_build_output(output_dir, build_result),
}
report_path = os.path.join(output_dir, "build_report.json")
save_json(report_path, report_payload)
return report_path
def _write_build_summary(self, output_dir, build_result):
if not output_dir or not isinstance(build_result, dict):
return ""
cook_summary = dict(build_result.get("cook_summary", {}) or {})
validation = dict(build_result.get("validation", {}) or {})
lines = [
f"Project: {build_result.get('project_name', '')}",
f"Output: {build_result.get('output_dir', '')}",
f"EXE: {build_result.get('exe_path', '')}",
f"Validation: {'OK' if validation.get('ok', False) else 'FAILED'}",
f"Scenes: {cook_summary.get('scene_count', 0)}",
f"Assets: {cook_summary.get('asset_count', 0)}",
f"Interactive Nodes: {cook_summary.get('interactive_node_count', 0)}",
f"Static Nodes: {cook_summary.get('static_node_count', 0)}",
f"Interactive Models: {cook_summary.get('interactive_model_count', 0)}",
f"Static Models: {cook_summary.get('static_model_count', 0)}",
"",
"Checks:",
]
for check in list(validation.get("checks", []) or []):
status = "OK" if check.get("ok") else "MISSING"
lines.append(f"- {status}: {check.get('name', '')} -> {check.get('path', '')}")
summary_path = os.path.join(output_dir, "build_summary.txt")
with open(summary_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
return summary_path
def _load_runtime_template(self):
template_path = os.path.join(self._get_repo_root(), "templates", "main_template.py")
with open(template_path, "r", encoding="utf-8") as f:
return f.read()
def _createProjectRuntimeEntry(self, source_root, project_name):
try:
template_content = self._load_runtime_template()
safe_project_name = project_name.replace("\\", "\\\\").replace('"', '\\"')
runtime_source = template_content.replace("__EG_PROJECT_NAME__", safe_project_name)
runtime_entry = os.path.join(source_root, "main.py")
with open(runtime_entry, "w", encoding="utf-8") as f:
f.write(runtime_source)
return runtime_entry
except Exception as e:
print(f"创建项目运行时入口失败: {e}")
return ""
def _executeProjectBuild(self, source_root, data_root, build_root, project_name):
    """Run PyInstaller to turn the staged runtime entry into a distributable.

    Args:
        source_root: Directory containing the generated ``main.py``.
        data_root: Staged runtime data, bundled as ``data`` when it exists.
        build_root: PyInstaller ``--distpath``; the output lands in
            ``<build_root>/<project_name>``.
        project_name: Name passed to PyInstaller's ``--name``.

    Returns:
        True when PyInstaller exits with code 0; False when PyInstaller is not
        available or the build fails (the tail of its output is printed).
    """
    repo_root = self._get_repo_root()
    work_root = os.path.join(build_root, f".{project_name}_work")
    spec_root = os.path.join(build_root, f".{project_name}_spec")
    for directory in (work_root, spec_root):
        if os.path.exists(directory):
            shutil.rmtree(directory)
        os.makedirs(directory, exist_ok=True)

    pyinstaller_runner = None
    pyinstaller_candidates = [
        [sys.executable, "-m", "PyInstaller"],
        [sys.executable, "-m", "pyinstaller"],
        ["pyinstaller"],
    ]
    for candidate in pyinstaller_candidates:
        try:
            pyinstaller_probe = subprocess.run(
                [*candidate, "--version"],
                capture_output=True,
                text=True,
                check=False,
            )
        except OSError:
            # A launcher that cannot be started (for example a bare
            # ``pyinstaller`` missing from PATH raises FileNotFoundError)
            # simply means this candidate is unavailable.
            continue
        if pyinstaller_probe.returncode == 0:
            pyinstaller_runner = candidate
            break
    if not pyinstaller_runner:
        print("错误: 当前环境未安装 PyInstaller无法执行项目打包。")
        return False

    # Separator used inside the --add-binary / --add-data arguments below.
    sep = ";" if os.name == "nt" else ":"
    command = [
        *pyinstaller_runner,
        "--noconfirm",
        "--clean",
        "--windowed",
        "--name",
        project_name,
        "--distpath",
        build_root,
        "--workpath",
        work_root,
        "--specpath",
        spec_root,
    ]
    for search_path in (source_root, repo_root, os.path.join(repo_root, "RenderPipelineFile")):
        command.extend(["--paths", search_path])
    for excluded_module in ("imgui_bundle", "p3dimgui"):
        command.extend(["--exclude-module", excluded_module])
    for hidden_module in (
        "core",
        "core.script_system",
        "core.CustomMouseController",
        "ssbo_component.runtime_importer",
        "ssbo_component.ssbo_controller",
        "rpcore",
    ):
        command.extend(["--hidden-import", hidden_module])
    for collected_package in ("rpcore", "rpplugins", "rplibs"):
        command.extend(["--collect-submodules", collected_package])

    # Bundle whichever of these Panda3D display binaries exist on this host.
    panda_binary_dir = self._get_panda3d_binary_dir()
    panda_display_binaries = [
        "libpandagl.dll",
        "libp3windisplay.dll",
        "libpandadx9.dll",
        "libp3tinydisplay.dll",
        "cgGL.dll",
        "cgD3D9.dll",
        "d3dx9_43.dll",
    ]
    if panda_binary_dir:
        for binary_name in panda_display_binaries:
            binary_path = os.path.join(panda_binary_dir, binary_name)
            if os.path.exists(binary_path):
                command.extend(["--add-binary", f"{binary_path}{sep}."])

    if os.path.exists(data_root):
        command.extend(["--add-data", f"{data_root}{sep}data"])
    command.append(os.path.join(source_root, "main.py"))

    result = subprocess.run(
        command,
        cwd=source_root,
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        print("项目打包失败。")
        output = (result.stdout or "") + "\n" + (result.stderr or "")
        # Only the last 4000 characters of the combined output are shown.
        print(output[-4000:])
        return False

    for directory in (work_root, spec_root):
        if os.path.exists(directory):
            shutil.rmtree(directory, ignore_errors=True)
    print(f"✓ 项目已打包到: {os.path.join(build_root, project_name)}")
    return True
def _get_panda3d_binary_dir(self):
try:
panda3d_spec = importlib.util.find_spec("panda3d.core")
if panda3d_spec and panda3d_spec.origin:
panda_package_dir = os.path.dirname(os.path.dirname(os.path.abspath(panda3d_spec.origin)))
candidate_dirs = [
os.path.join(panda_package_dir, "bin"),
os.path.join(os.path.dirname(panda_package_dir), "bin"),
]
for candidate_dir in candidate_dirs:
if os.path.isdir(candidate_dir):
return candidate_dir
except Exception as e:
print(f"⚠️ 获取 Panda3D 二进制目录失败: {e}")
return ""
def _copyScriptsToBuild(self, build_dir, project_path):
"""将项目脚本编译后复制到构建目录,避免直接暴露源码。"""
try:
scripts_dest = os.path.join(build_dir, "scripts")
scripts_src = self.get_project_scripts_dir(project_path)
if os.path.exists(scripts_dest):
shutil.rmtree(scripts_dest)
os.makedirs(scripts_dest, exist_ok=True)
if os.path.exists(scripts_src):
compiled_count = 0
fallback_count = 0
for root, _, files in os.walk(scripts_src):
rel_root = os.path.relpath(root, scripts_src)
target_root = scripts_dest if rel_root == "." else os.path.join(scripts_dest, rel_root)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
if file_name.startswith("__"):
continue
source_file = os.path.join(root, file_name)
if file_name.endswith(".py"):
compiled_file = os.path.join(
target_root,
f"{os.path.splitext(file_name)[0]}.pyc",
)
try:
py_compile.compile(source_file, cfile=compiled_file, doraise=True)
compiled_count += 1
except py_compile.PyCompileError as e:
fallback_file = os.path.join(target_root, file_name)
shutil.copy2(source_file, fallback_file)
fallback_count += 1
print(f"⚠️ 编译脚本失败,已回退复制源码: {source_file} -> {fallback_file} ({e})")
elif not file_name.endswith((".pyc", ".pyo", ".log")):
shutil.copy2(source_file, os.path.join(target_root, file_name))
print(f"✓ Scripts已导出到 build/scripts编译 {compiled_count} 个脚本")
if fallback_count:
print(f"⚠️ {fallback_count} 个脚本因编译失败保留为 .py 源码")
else:
print("⚠️ 项目中没有 Assets/Scripts 目录")
except Exception as e:
print(f"⚠️ 复制脚本文件时出错: {str(e)}")
def _copyRenderPipelineRuntime(self, build_dir):
"""复制运行时需要的 RenderPipeline 资源,并将源码编译为 pyc。"""
try:
source_root = os.path.join(self._get_repo_root(), "RenderPipelineFile")
target_root = build_dir
if os.path.exists(target_root):
shutil.rmtree(target_root)
if not os.path.exists(source_root):
print("⚠️ RenderPipelineFile 文件夹未找到")
return False
blocked_dirs = {"__pycache__", ".git", ".vscode", "samples", "toolkit"}
blocked_root_files = {"setup.py", "start_daytime_editor.py", "start_plugin_configurator.py"}
blocked_relative_dirs = {
os.path.normpath(os.path.join("rplibs", "yaml", "yaml_py2")),
}
for root, dirs, files in os.walk(source_root):
relative_root = os.path.relpath(root, source_root)
normalized_relative_root = os.path.normpath(relative_root)
dirs[:] = [
name for name in dirs
if name not in blocked_dirs
and os.path.normpath(os.path.join(normalized_relative_root, name)) not in blocked_relative_dirs
]
target_dir = target_root if relative_root == "." else os.path.join(target_root, relative_root)
os.makedirs(target_dir, exist_ok=True)
for file_name in files:
if file_name.endswith((".pyc", ".pyo", ".log")):
continue
if relative_root == "." and file_name in blocked_root_files:
continue
source_file = os.path.join(root, file_name)
if file_name.endswith(".py"):
compiled_file = os.path.join(
target_dir,
f"{os.path.splitext(file_name)[0]}.pyc",
)
try:
py_compile.compile(source_file, cfile=compiled_file, doraise=True)
except py_compile.PyCompileError as e:
print(f"⚠️ RenderPipeline 脚本编译失败,保留源码: {source_file} ({e})")
shutil.copy2(source_file, os.path.join(target_dir, file_name))
else:
shutil.copy2(source_file, os.path.join(target_dir, file_name))
print("✓ RenderPipeline 运行时资源已导出")
return True
except Exception as e:
print(f"⚠️ 导出 RenderPipeline 运行时资源失败: {e}")
return False
# ==================== 辅助方法 ====================
def _clearCurrentScene(self):
"""清空当前场景"""
try:
selection = getattr(self.world, "selection", None)
if selection and hasattr(selection, "clearSelection"):
try:
selection.clearSelection()
except Exception:
pass
script_manager = getattr(self.world, "script_manager", None)
if script_manager and hasattr(script_manager, "reset_scene_state"):
try:
script_manager.reset_scene_state()
except Exception:
pass
ssbo_editor = getattr(self.world, "ssbo_editor", None)
if ssbo_editor and hasattr(ssbo_editor, "reset_scene_state"):
try:
ssbo_editor.reset_scene_state()
except Exception:
pass
scene_manager = getattr(self.world, "scene_manager", None)
if scene_manager:
tree_widget = scene_manager._get_tree_widget() if hasattr(scene_manager, "_get_tree_widget") else None
for model in list(getattr(scene_manager, "models", []) or []):
try:
if tree_widget:
try:
tree_widget.delete_item(model)
except Exception:
pass
if model and not model.isEmpty():
model.removeNode()
except Exception:
pass
scene_manager.models = []
for collection_name in ("Spotlight", "Pointlight"):
collection = list(getattr(scene_manager, collection_name, []) or [])
for light_node in collection:
try:
if tree_widget:
try:
tree_widget.delete_item(light_node)
except Exception:
pass
if light_node and not light_node.isEmpty():
light_node.removeNode()
except Exception:
pass
setattr(scene_manager, collection_name, [])
terrain_manager = getattr(self.world, "terrain_manager", None)
terrains = list(getattr(terrain_manager, "terrains", []) or []) if terrain_manager else []
for terrain in terrains:
try:
if tree_widget:
try:
tree_widget.delete_item(terrain)
except Exception:
pass
if terrain and not terrain.isEmpty():
terrain.removeNode()
except Exception:
pass
cleanup_aux_fn = getattr(scene_manager, "_cleanupAuxiliaryNodes", None)
if callable(cleanup_aux_fn):
try:
cleanup_aux_fn()
except Exception:
pass
cleanup_render_fn = getattr(scene_manager, "_cleanup_untracked_render_children", None)
if callable(cleanup_render_fn):
try:
cleanup_render_fn()
except Exception:
pass
update_tree_fn = getattr(scene_manager, "updateSceneTree", None)
if callable(update_tree_fn):
try:
update_tree_fn()
except Exception:
pass
lui_manager = getattr(self.world, "lui_manager", None)
if lui_manager:
clear_lui_fn = getattr(lui_manager, "_clear_lui_structure_runtime", None)
if callable(clear_lui_fn):
try:
clear_lui_fn()
except Exception:
pass
print("✓ 当前场景已清空")
except Exception as e:
print(f"清空场景失败: {str(e)}")
def updateWindowTitle(self, parent_window, project_name):
    """Update the window title (kept only so legacy callers keep working).

    Intentionally a no-op: this manager no longer handles any UI, so both
    arguments are accepted and ignored.
    """
    return None