EG/project/project_manager.py
2026-03-18 18:34:44 +08:00

1949 lines
88 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import json
import copy
import datetime
import subprocess
import shutil
import importlib.util
import py_compile
import uuid
from panda3d.core import Filename, ModelRoot, NodePath
from project.asset_database import AssetDatabase
from project.project_schema import (
PROJECT_SCHEMA_VERSION,
ProjectLayout,
default_build_settings,
ensure_project_directories,
generate_guid,
normalize_path,
relative_project_path,
)
from project.scene_description import (
build_scene_cook_manifest,
build_runtime_manifest,
build_runtime_scene,
build_scene_description_from_cache,
load_json,
normalize_scene_description,
save_json,
)
class ProjectManager:
def __init__(self, world):
self.world = world
self.current_project_path = None
self.project_config = None
self.current_scene_guid = None
self._asset_database = None
print("✓ 项目管理系统初始化完成")
def _get_repo_root(self):
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def get_project_scripts_dir(self, project_path=None):
project_path = os.path.normpath(project_path or self.current_project_path or "")
if not project_path:
return ""
return os.path.join(project_path, "Assets", "Scripts")
def get_project_layout(self, project_path=None):
project_path = os.path.normpath(project_path or self.current_project_path or "")
if not project_path:
return None
return ProjectLayout(project_path)
def get_asset_database(self, project_path=None, *, reload=False):
project_path = os.path.normpath(project_path or self.current_project_path or "")
if not project_path:
return None
if reload or self._asset_database is None or self._asset_database.layout.project_root != normalize_path(project_path):
self._asset_database = AssetDatabase(project_path, world=self.world)
return self._asset_database
def _sync_resource_manager_root(self, project_path=None):
resource_manager = getattr(self.world, "resource_manager", None)
if resource_manager and hasattr(resource_manager, "set_project_path"):
resource_manager.set_project_path(project_path or self.current_project_path)
def _build_scene_entry(self, scene_guid, scene_name):
return {
"guid": scene_guid,
"name": scene_name,
"path": f"Scenes/{scene_name}.egscene",
"cache_path": f"Library/SceneCache/{scene_guid}/scene.bam",
"enabled_in_build": True,
}
def _get_scene_entries(self, project_config=None):
project_config = project_config or self.project_config or {}
scenes = project_config.get("scenes", [])
return list(scenes) if isinstance(scenes, list) else []
def _get_scene_entry(self, scene_guid=None, project_config=None):
project_config = project_config or self.project_config or {}
target_guid = scene_guid or self.current_scene_guid or project_config.get("last_open_scene_guid") or project_config.get("startup_scene_guid")
for scene_entry in self._get_scene_entries(project_config):
if scene_entry.get("guid") == target_guid:
return dict(scene_entry)
scenes = self._get_scene_entries(project_config)
return dict(scenes[0]) if scenes else {}
def _scene_paths(self, scene_entry=None, project_path=None):
layout = self.get_project_layout(project_path)
if not layout:
return {}
scene_entry = scene_entry or self._get_scene_entry(project_config=self.project_config)
if not scene_entry:
return {}
scene_rel = scene_entry.get("path", "")
cache_rel = scene_entry.get("cache_path", "")
scene_abs = os.path.join(layout.project_root, scene_rel.replace("/", os.sep)) if scene_rel else ""
cache_abs = os.path.join(layout.project_root, cache_rel.replace("/", os.sep)) if cache_rel else ""
return {
"scene_file": scene_abs,
"cache_file": cache_abs,
"cache_gui_file": cache_abs.replace(".bam", "_gui.json") if cache_abs else "",
"cache_lui_file": cache_abs.replace(".bam", "_lui.json") if cache_abs else "",
}
def _create_default_project_config(self, project_root, project_name):
    """Build the configuration dict for a brand-new project.

    The project starts with a single scene named ``Main`` that is both the
    startup scene and the last-open scene, and is the only scene enabled in
    the build settings.

    Args:
        project_root: Project directory; stored normalized under ``path``.
        project_name: Project name stored under ``name``.

    Returns:
        dict: New project configuration.
    """
    scene_guid = generate_guid()
    main_scene = self._build_scene_entry(scene_guid, "Main")
    now_text = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    build_settings = dict(default_build_settings())
    build_settings["enabled_scene_guids"] = [scene_guid]

    config = {
        "schema_version": PROJECT_SCHEMA_VERSION,
        "name": project_name,
        "path": normalize_path(project_root),
        "created_at": now_text,
        "last_modified": now_text,
    }
    config["version"] = config["engine_version"] = "2.0.0"
    config["scenes"] = [main_scene]
    config["scene_file"] = main_scene["path"]
    config["startup_scene_guid"] = config["last_open_scene_guid"] = scene_guid
    config["build_settings"] = build_settings
    return config
def _ensure_v2_project_defaults(self, project_path, project_config):
    """Return a copy of ``project_config`` with all v2 fields filled in.

    Stamps the schema version and normalized path, guarantees at least one
    scene, resolves the startup / last-open scene GUIDs and ``scene_file``,
    and makes sure ``build_settings`` carries ``enabled_scene_guids``,
    ``output_format``, ``windows_player``, ``active_profile`` and a
    normalized ``default`` profile.

    Each profile is normalized under its own key. Previously the normalized
    *active* profile was always written to ``profiles["default"]``, which
    silently overwrote the real default profile whenever another profile
    was active.

    Args:
        project_path: Project root stored (normalized) under ``path``.
        project_config: Existing configuration; may be ``None`` or empty.

    Returns:
        dict: New configuration dict. The top-level dict, the scene list,
        ``build_settings`` and ``profiles`` are fresh containers, so the
        caller's nested settings are no longer modified in place.
    """
    project_config = dict(project_config or {})
    project_config["schema_version"] = PROJECT_SCHEMA_VERSION
    project_config["path"] = normalize_path(project_path)

    scenes = self._get_scene_entries(project_config)
    if not scenes:
        scenes = [self._build_scene_entry(generate_guid(), "Main")]
    project_config["scenes"] = scenes

    startup_scene_guid = project_config.get("startup_scene_guid") or scenes[0]["guid"]
    last_open_scene_guid = project_config.get("last_open_scene_guid") or startup_scene_guid
    project_config["startup_scene_guid"] = startup_scene_guid
    project_config["last_open_scene_guid"] = last_open_scene_guid
    current_scene_entry = (
        self._get_scene_entry(scene_guid=last_open_scene_guid, project_config=project_config)
        or scenes[0]
    )
    project_config["scene_file"] = current_scene_entry.get("path", project_config.get("scene_file", ""))

    # Copy before editing so the caller's build_settings dict stays untouched.
    build_settings = dict(project_config.get("build_settings", {}) or {})
    enabled_scene_guids = list(build_settings.get("enabled_scene_guids", []) or [])
    if not enabled_scene_guids:
        enabled_scene_guids = [scene["guid"] for scene in scenes]
    build_settings["enabled_scene_guids"] = enabled_scene_guids
    build_settings.setdefault("output_format", "directory")
    build_settings.setdefault("windows_player", True)

    profiles = dict(build_settings.get("profiles", {}) or {})
    active_profile = str(build_settings.get("active_profile", "") or "default")

    def normalize(raw_profile, fallback_name):
        return self._normalize_build_profile(
            raw_profile,
            scenes=scenes,
            project_config=project_config,
            default_enabled_scene_guids=enabled_scene_guids,
            fallback_profile_name=fallback_name,
            fallback_output_dir="Builds",
            fallback_output_format=build_settings.get("output_format", "directory"),
            fallback_windows_enabled=build_settings.get("windows_player", True),
        )

    # Prefer the stored default profile. Only when it is missing is it seeded
    # from the active profile, and failing that from the flat build settings.
    default_source = dict(profiles.get("default") or profiles.get(active_profile) or {})
    if not default_source:
        default_source = {
            "name": "Default",
            "target_platform": "windows",
            "output_format": build_settings.get("output_format", "directory"),
            "output_dir": "Builds",
            "windows": {
                "enabled": bool(build_settings.get("windows_player", True)),
                "exe_name": "",
                "windowed": True,
            },
            "runtime": {
                "startup_scene_guid": project_config.get("startup_scene_guid", ""),
                "enabled_scene_guids": enabled_scene_guids,
                "script_mode": "pyc",
            },
        }

    normalized_profiles = {"default": normalize(default_source, "Default")}
    for profile_name, raw_profile in profiles.items():
        if profile_name == "default":
            continue
        if profile_name == active_profile and raw_profile:
            # The active profile is the one handed to callers; validate it too.
            normalized_profiles[profile_name] = normalize(raw_profile, profile_name)
        else:
            normalized_profiles[profile_name] = raw_profile

    build_settings["active_profile"] = active_profile
    build_settings["profiles"] = normalized_profiles
    project_config["build_settings"] = build_settings
    return project_config
def _normalize_enabled_scene_guids(self, enabled_scene_guids, scenes):
valid_scene_guids = [str(scene.get("guid", "") or "") for scene in (scenes or []) if scene.get("guid")]
valid_scene_guid_set = set(valid_scene_guids)
normalized = []
for scene_guid in enabled_scene_guids or []:
normalized_guid = str(scene_guid or "").strip()
if normalized_guid and normalized_guid in valid_scene_guid_set and normalized_guid not in normalized:
normalized.append(normalized_guid)
return normalized or valid_scene_guids
def _normalize_build_profile(
self,
profile,
*,
scenes,
project_config,
default_enabled_scene_guids,
fallback_profile_name,
fallback_output_dir,
fallback_output_format,
fallback_windows_enabled,
):
profile = copy.deepcopy(profile or {})
normalized_output_dir = str(profile.get("output_dir", fallback_output_dir) or fallback_output_dir).strip() or fallback_output_dir
normalized_output_dir = normalized_output_dir.replace("\\", "/")
while normalized_output_dir.startswith("./"):
normalized_output_dir = normalized_output_dir[2:]
normalized_output_dir = normalized_output_dir.strip("/") or fallback_output_dir
profile["name"] = str(profile.get("name", fallback_profile_name) or fallback_profile_name)
profile["target_platform"] = str(profile.get("target_platform", "windows") or "windows")
profile["output_format"] = str(profile.get("output_format", fallback_output_format) or fallback_output_format)
profile["output_dir"] = normalized_output_dir
profile["windows"] = dict(profile.get("windows", {}) or {})
profile["windows"]["enabled"] = bool(profile["windows"].get("enabled", fallback_windows_enabled))
profile["windows"]["exe_name"] = str(profile["windows"].get("exe_name", "") or "").strip()
profile["windows"]["windowed"] = bool(profile["windows"].get("windowed", True))
profile["runtime"] = dict(profile.get("runtime", {}) or {})
normalized_enabled_scene_guids = self._normalize_enabled_scene_guids(
profile["runtime"].get("enabled_scene_guids", []) or default_enabled_scene_guids,
scenes,
)
startup_scene_guid = str(
profile["runtime"].get("startup_scene_guid", project_config.get("startup_scene_guid", "")) or project_config.get("startup_scene_guid", "")
).strip()
if startup_scene_guid not in normalized_enabled_scene_guids:
startup_scene_guid = normalized_enabled_scene_guids[0] if normalized_enabled_scene_guids else ""
profile["runtime"]["enabled_scene_guids"] = normalized_enabled_scene_guids
profile["runtime"]["startup_scene_guid"] = startup_scene_guid
profile["runtime"]["script_mode"] = str(profile["runtime"].get("script_mode", "pyc") or "pyc")
return profile
def _get_active_build_profile(self, project_config=None):
project_config = self._ensure_v2_project_defaults(
self.current_project_path or (project_config or {}).get("path") or "",
project_config or self.project_config or {},
)
build_settings = project_config.get("build_settings", {}) or {}
active_profile = str(build_settings.get("active_profile", "default") or "default")
profiles = build_settings.get("profiles", {}) or {}
profile = dict((profiles.get(active_profile) or profiles.get("default") or {}))
return active_profile, profile
def get_build_profile_options(self, project_config=None):
project_config = self._ensure_v2_project_defaults(
self.current_project_path or (project_config or {}).get("path") or "",
project_config or self.project_config or {},
)
active_profile_name, profile = self._get_active_build_profile(project_config)
build_settings = project_config.get("build_settings", {}) or {}
profiles = dict(build_settings.get("profiles", {}) or {})
runtime_settings = dict((profile or {}).get("runtime", {}) or {})
windows_settings = dict((profile or {}).get("windows", {}) or {})
scene_entries = self._get_scene_entries(project_config)
enabled_scene_guids = self._normalize_enabled_scene_guids(runtime_settings.get("enabled_scene_guids", []) or [], scene_entries)
startup_scene_guid = str(runtime_settings.get("startup_scene_guid", project_config.get("startup_scene_guid", "")) or "")
if startup_scene_guid not in enabled_scene_guids:
startup_scene_guid = enabled_scene_guids[0] if enabled_scene_guids else ""
return {
"active_profile": active_profile_name,
"profile_names": list(profiles.keys()) or ["default"],
"output_dir": str((profile or {}).get("output_dir", "Builds") or "Builds"),
"exe_name": str(windows_settings.get("exe_name", "") or ""),
"startup_scene_guid": startup_scene_guid,
"enabled_scene_guids": enabled_scene_guids,
"scenes": scene_entries,
}
def update_active_build_profile(self, *, output_dir=None, exe_name=None, startup_scene_guid=None, enabled_scene_guids=None, active_profile_name=None):
if not self.current_project_path:
return False
project_config = self._ensure_v2_project_defaults(self.current_project_path, self.project_config or {})
current_profile_name, active_profile = self._get_active_build_profile(project_config)
build_settings = dict(project_config.get("build_settings", {}) or {})
profiles = dict(build_settings.get("profiles", {}) or {})
target_profile_name = str(active_profile_name or current_profile_name or "default").strip() or "default"
scene_entries = self._get_scene_entries(project_config)
if target_profile_name != current_profile_name:
active_profile = dict((profiles.get(target_profile_name) or profiles.get("default") or active_profile or {}))
profile = dict(active_profile or {})
profile["windows"] = dict(profile.get("windows", {}) or {})
profile["runtime"] = dict(profile.get("runtime", {}) or {})
if output_dir is not None:
profile["output_dir"] = str(output_dir or "Builds").strip() or "Builds"
if exe_name is not None:
profile["windows"]["exe_name"] = str(exe_name or "").strip()
if startup_scene_guid is not None:
startup_scene_guid = str(startup_scene_guid or "").strip()
profile["runtime"]["startup_scene_guid"] = startup_scene_guid
if startup_scene_guid:
project_config["startup_scene_guid"] = startup_scene_guid
if enabled_scene_guids is not None:
normalized_scene_guids = [str(scene_guid).strip() for scene_guid in enabled_scene_guids if str(scene_guid).strip()]
profile["runtime"]["enabled_scene_guids"] = normalized_scene_guids
build_settings["enabled_scene_guids"] = normalized_scene_guids
profile = self._normalize_build_profile(
profile,
scenes=scene_entries,
project_config=project_config,
default_enabled_scene_guids=build_settings.get("enabled_scene_guids", []) or [scene.get("guid", "") for scene in scene_entries if scene.get("guid")],
fallback_profile_name=target_profile_name,
fallback_output_dir="Builds",
fallback_output_format=build_settings.get("output_format", "directory"),
fallback_windows_enabled=build_settings.get("windows_player", True),
)
build_settings["enabled_scene_guids"] = list(profile.get("runtime", {}).get("enabled_scene_guids", []) or [])
if profile.get("runtime", {}).get("startup_scene_guid"):
project_config["startup_scene_guid"] = profile["runtime"]["startup_scene_guid"]
profiles[target_profile_name] = profile
build_settings["active_profile"] = target_profile_name
build_settings["profiles"] = profiles
project_config["build_settings"] = build_settings
self.project_config = project_config
self._write_project_config(self.current_project_path, project_config)
return True
def create_build_profile(self, profile_name):
if not self.current_project_path:
return False
profile_name = str(profile_name or "").strip()
if not profile_name:
return False
project_config = self._ensure_v2_project_defaults(self.current_project_path, self.project_config or {})
active_profile_name, active_profile = self._get_active_build_profile(project_config)
build_settings = dict(project_config.get("build_settings", {}) or {})
profiles = dict(build_settings.get("profiles", {}) or {})
if profile_name in profiles:
build_settings["active_profile"] = profile_name
project_config["build_settings"] = build_settings
self.project_config = project_config
self._write_project_config(self.current_project_path, project_config)
return True
profiles[profile_name] = copy.deepcopy(active_profile or {})
profiles[profile_name]["name"] = profile_name
build_settings["active_profile"] = profile_name
build_settings["profiles"] = profiles
project_config["build_settings"] = build_settings
self.project_config = project_config
self._write_project_config(self.current_project_path, project_config)
return True
def _ensure_project_scripts_dir(self, project_path):
scripts_dir = self.get_project_scripts_dir(project_path)
if scripts_dir:
os.makedirs(scripts_dir, exist_ok=True)
return scripts_dir
def _sync_project_script_manager(self, project_path, reload_scripts=True):
script_manager = getattr(self.world, "script_manager", None)
if not script_manager:
return ""
scripts_dir = self._ensure_project_scripts_dir(project_path)
script_manager.set_scripts_directory(
scripts_dir,
create=True,
reload_scripts=reload_scripts,
)
return scripts_dir
def _restore_project_context(self, project_path, project_config, reload_scripts=True):
normalized_project_path = os.path.normpath(project_path) if project_path else None
self.current_project_path = normalized_project_path
self.project_config = project_config
self.current_scene_guid = (project_config or {}).get("last_open_scene_guid") if project_config else None
self._asset_database = None
script_manager = getattr(self.world, "script_manager", None)
if not script_manager:
self._sync_resource_manager_root(normalized_project_path)
return
if normalized_project_path:
self._sync_project_script_manager(normalized_project_path, reload_scripts=reload_scripts)
else:
script_manager.set_scripts_directory(
"scripts",
create=True,
reload_scripts=reload_scripts,
)
self.get_asset_database(normalized_project_path, reload=True) if normalized_project_path else None
self._sync_resource_manager_root(normalized_project_path)
def _sanitize_build_name(self, project_name):
invalid_chars = '<>:"/\\|?*'
safe_name = "".join("_" if char in invalid_chars else char for char in str(project_name or "").strip())
safe_name = safe_name.rstrip(". ")
return safe_name or "EGProject"
def _write_project_config(self, project_path, project_config):
config_file = os.path.join(project_path, "project.json")
with open(config_file, "w", encoding="utf-8") as f:
json.dump(project_config, f, ensure_ascii=False, indent=4)
def _copy_if_exists(self, source_path, target_path):
if not source_path or not os.path.exists(source_path):
return
os.makedirs(os.path.dirname(target_path), exist_ok=True)
if os.path.isdir(source_path):
if os.path.exists(target_path):
shutil.rmtree(target_path)
shutil.copytree(source_path, target_path)
else:
shutil.copy2(source_path, target_path)
def _create_legacy_backup(self, project_path):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_root = os.path.join(project_path, "LegacyBackup", timestamp)
os.makedirs(backup_root, exist_ok=True)
for name in ("project.json", "models", "textures", "scenes", "Resources", "scripts"):
source_path = os.path.join(project_path, name)
if not os.path.exists(source_path):
continue
self._copy_if_exists(source_path, os.path.join(backup_root, name))
print(f"✓ 已创建旧项目备份: {backup_root}")
return backup_root
def _capture_camera_state(self):
camera = getattr(self.world, "camera", None) or getattr(self.world, "cam", None)
if not camera or camera.isEmpty():
return {}
return {
"position": list(camera.getPos()),
"rotation": list(camera.getHpr()),
"camera_control_enabled": bool(getattr(self.world, "camera_control_enabled", True)),
}
def import_asset_into_project(self, source_path, preferred_subdir=""):
asset_database = self.get_asset_database()
if not asset_database or not source_path:
return {}
return asset_database.import_asset(source_path, preferred_subdir=preferred_subdir)
def register_project_asset(self, asset_path):
asset_database = self.get_asset_database()
if not asset_database or not asset_path:
return {}
return asset_database.register_asset(asset_path, copy_into_assets=False)
def _migrate_legacy_project_if_needed(self, project_path, project_config):
    """Upgrade a pre-v2 project on disk and return its configuration.

    When ``schema_version`` is already current, the configuration is
    returned unchanged. Otherwise the method backs up the legacy files,
    creates the v2 directory layout, imports every file from the legacy
    ``models`` / ``textures`` / ``Resources`` / ``scripts`` folders into the
    asset database, copies ``scenes/scene.bam`` and its ``_gui.json`` /
    ``_lui.json`` sidecars (each when present) into the scene cache of a
    new ``Main`` scene, writes a scene description when a cache exists, and
    saves the new ``project.json``.

    Args:
        project_path: Root of the project to inspect.
        project_config: Configuration loaded from ``project.json``.

    Returns:
        dict: ``project_config`` unchanged, or the migrated configuration.
    """
    project_path = normalize_path(project_path)
    if int(project_config.get("schema_version", 1) or 1) >= PROJECT_SCHEMA_VERSION:
        return project_config

    print(f"开始迁移旧项目到 v{PROJECT_SCHEMA_VERSION}: {project_path}")
    self._create_legacy_backup(project_path)
    layout = ProjectLayout(project_path)
    ensure_project_directories(layout)
    asset_database = AssetDatabase(project_path, world=self.world)

    for legacy_dir in ("models", "textures", "Resources", "scripts"):
        source_root = os.path.join(project_path, legacy_dir)
        if not os.path.exists(source_root):
            continue
        for root, _, files in os.walk(source_root):
            # Skip cache directories and meaningless intermediate files.
            if os.path.basename(root) == "__pycache__":
                continue
            for file_name in files:
                if file_name.endswith((".meta", ".pyc", ".pyo")):
                    continue
                asset_database.import_asset(os.path.join(root, file_name))

    scene_guid = generate_guid()
    scene_name = "Main"
    scene_entry = self._build_scene_entry(scene_guid, scene_name)
    scene_paths = self._scene_paths(scene_entry, project_path=project_path)
    os.makedirs(os.path.dirname(scene_paths["cache_file"]), exist_ok=True)

    legacy_scene_file = os.path.join(project_path, "scenes", "scene.bam")
    if os.path.exists(legacy_scene_file):
        shutil.copy2(legacy_scene_file, scene_paths["cache_file"])
    # Build sidecar names from the file stem; str.replace(".bam", ...) on the
    # full path would also rewrite ".bam" inside the project directory name
    # and then fail to find the legacy sidecars.
    legacy_stem = os.path.splitext(legacy_scene_file)[0]
    for suffix, target_key in (("_gui.json", "cache_gui_file"), ("_lui.json", "cache_lui_file")):
        legacy_sidecar = legacy_stem + suffix
        if not os.path.exists(legacy_sidecar):
            continue
        sidecar_target = scene_paths[target_key]
        sidecar_dir = os.path.dirname(sidecar_target)
        if sidecar_dir:
            os.makedirs(sidecar_dir, exist_ok=True)
        shutil.copy2(legacy_sidecar, sidecar_target)

    migrated_config = self._create_default_project_config(
        project_path,
        project_config.get("name") or os.path.basename(project_path),
    )
    migrated_config["created_at"] = (
        project_config.get("created_at") or project_config.get("created") or migrated_config["created_at"]
    )
    migrated_config["last_modified"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Replace the freshly generated default scene with the migrated one.
    migrated_config["scenes"] = [scene_entry]
    migrated_config["startup_scene_guid"] = scene_guid
    migrated_config["last_open_scene_guid"] = scene_guid

    if os.path.exists(scene_paths["cache_file"]):
        scene_description = build_scene_description_from_cache(
            world=self.world,
            asset_database=asset_database,
            project_root=project_path,
            scene_guid=scene_guid,
            scene_name=scene_name,
            scene_file_rel=scene_entry["path"],
            cache_bam_path=scene_paths["cache_file"],
            cache_gui_path=scene_paths["cache_gui_file"],
            cache_lui_path=scene_paths["cache_lui_file"],
            camera_state=self._capture_camera_state(),
        )
        save_json(os.path.join(project_path, scene_entry["path"].replace("/", os.sep)), scene_description)

    self._write_project_config(project_path, migrated_config)
    print(f"✓ 旧项目迁移完成: {project_path}")
    return migrated_config
def _retarget_live_scene_asset_paths(self):
asset_database = self.get_asset_database()
if not asset_database or not getattr(self.world, "render", None):
return
try:
for node in self.world.render.findAllMatches("**"):
if not node.hasTag("is_model_root"):
continue
source_path = ""
for tag_name in ("model_path", "saved_model_path", "original_path"):
if node.hasTag(tag_name) and node.getTag(tag_name):
source_path = node.getTag(tag_name)
break
if not source_path:
continue
asset_record = asset_database.register_asset(source_path, copy_into_assets=False)
if not asset_record:
continue
asset_abs_path = os.path.join(self.current_project_path, asset_record["asset_path"].replace("/", os.sep))
node.setTag("model_path", asset_abs_path)
node.setTag("saved_model_path", asset_abs_path)
node.setTag("asset_guid", asset_record.get("guid", ""))
node.setTag("asset_path", asset_record.get("asset_path", ""))
for gui_node in getattr(self.world, "gui_elements", []) or []:
for tag_name, preferred_subdir in (
("image_path", "UI"),
("gui_image_path", "UI"),
("video_path", "Video"),
):
if not gui_node or gui_node.isEmpty() or not gui_node.hasTag(tag_name):
continue
raw_value = gui_node.getTag(tag_name)
if not raw_value or raw_value.startswith(("http://", "https://")):
continue
asset_record = asset_database.register_asset(raw_value, preferred_subdir=preferred_subdir, copy_into_assets=False)
if not asset_record:
continue
asset_abs_path = os.path.join(self.current_project_path, asset_record["asset_path"].replace("/", os.sep))
gui_node.setTag(tag_name, asset_abs_path)
except Exception as e:
print(f"⚠️ 同步场景资产路径失败: {e}")
def _load_scene_entry_into_editor(self, scene_entry, project_path):
scene_paths = self._scene_paths(scene_entry, project_path=project_path)
cache_file = scene_paths.get("cache_file", "")
if (not cache_file or not os.path.exists(cache_file)) and self._rebuild_scene_cache_from_description(scene_entry, project_path):
cache_file = scene_paths.get("cache_file", "")
if cache_file and os.path.exists(cache_file):
return self.world.scene_manager.loadScene(cache_file)
print(f"⚠️ 场景缓存不存在,已回退为空场景: {cache_file}")
self._clearCurrentScene()
return True
def createScene(self, scene_name):
if not self.current_project_path or not scene_name:
return False
scene_name = str(scene_name).strip()
if not scene_name:
return False
project_config = dict(self.project_config or {})
if any(scene.get("name") == scene_name for scene in self._get_scene_entries(project_config)):
print(f"错误: 场景已存在: {scene_name}")
return False
scene_guid = generate_guid()
scene_entry = self._build_scene_entry(scene_guid, scene_name)
project_config.setdefault("scenes", []).append(scene_entry)
project_config["last_open_scene_guid"] = scene_guid
project_config["scene_file"] = scene_entry["path"]
build_settings = project_config.get("build_settings", {}) or {}
enabled_scene_guids = list(build_settings.get("enabled_scene_guids", []) or [])
enabled_scene_guids.append(scene_guid)
build_settings["enabled_scene_guids"] = enabled_scene_guids
project_config["build_settings"] = build_settings
self._clearCurrentScene()
self.project_config = project_config
self.current_scene_guid = scene_guid
self._write_project_config(self.current_project_path, project_config)
return self.saveProject()
def openScene(self, scene_guid):
if not self.current_project_path or not scene_guid:
return False
project_config = dict(self.project_config or {})
scene_entry = self._get_scene_entry(scene_guid=scene_guid, project_config=project_config)
if not scene_entry:
return False
if not self._load_scene_entry_into_editor(scene_entry, self.current_project_path):
return False
self.current_scene_guid = scene_entry["guid"]
project_config["last_open_scene_guid"] = scene_entry["guid"]
project_config["scene_file"] = scene_entry["path"]
self.project_config = project_config
self._write_project_config(self.current_project_path, project_config)
return True
# ==================== Project lifecycle management ====================
def createNewProject(self, project_path, project_name):
"""创建新项目
Args:
project_path: 项目路径
project_name: 项目名称
Returns:
bool: 创建是否成功
"""
if not project_path or not project_name:
print("错误: 项目路径和名称不能为空")
return False
full_project_path = os.path.normpath(os.path.join(project_path, project_name))
print(f"full_project_path: {full_project_path}")
try:
if os.path.exists(full_project_path):
print("错误: 项目目录已存在")
return False
layout = ProjectLayout(full_project_path)
ensure_project_directories(layout)
project_config = self._create_default_project_config(full_project_path, project_name)
self._write_project_config(full_project_path, project_config)
print(f"项目配置文件已创建: {layout.project_file}")
self._clearCurrentScene()
self.current_project_path = full_project_path
self.project_config = project_config
self.current_scene_guid = project_config.get("startup_scene_guid")
self.get_asset_database(full_project_path, reload=True)
self._sync_project_script_manager(full_project_path, reload_scripts=True)
self._sync_resource_manager_root(full_project_path)
if not self.saveProject():
print("初始场景保存失败")
return False
print(f"项目 '{project_name}' 创建成功!")
return True
except Exception as e:
print(f"创建项目失败: {str(e)}")
return False
def openProject(self, project_path):
"""打开项目
Args:
project_path: 项目路径
Returns:
bool: 打开是否成功
"""
# print(f"\n[DEBUG] ===== 开始打开项目: {project_path} =====")
try:
if not project_path:
print("错误: 项目路径不能为空")
return False
project_path = normalize_path(project_path)
config_file = os.path.join(project_path, "project.json")
if not os.path.exists(config_file):
print("错误: 选择的不是有效的项目文件夹!")
return False
with open(config_file, "r", encoding="utf-8") as f:
project_config = json.load(f)
previous_project_path = self.current_project_path
previous_project_config = self.project_config
project_config = self._migrate_legacy_project_if_needed(project_path, project_config)
project_config = self._ensure_v2_project_defaults(project_path, project_config)
self.current_project_path = project_path
self.project_config = project_config
self.current_scene_guid = project_config.get("last_open_scene_guid") or project_config.get("startup_scene_guid")
self.get_asset_database(project_path, reload=True)
self._sync_project_script_manager(self.current_project_path, reload_scripts=True)
self._sync_resource_manager_root(project_path)
scene_entry = self._get_scene_entry(project_config=project_config)
if scene_entry:
if not self._load_scene_entry_into_editor(scene_entry, project_path):
self._restore_project_context(
previous_project_path,
previous_project_config,
reload_scripts=True,
)
return False
project_config["last_modified"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
project_config["last_open_scene_guid"] = self.current_scene_guid or project_config.get("startup_scene_guid")
current_scene_entry = self._get_scene_entry(project_config=project_config)
if current_scene_entry:
project_config["scene_file"] = current_scene_entry.get("path", project_config.get("scene_file", ""))
self._write_project_config(project_path, project_config)
self.current_project_path = project_path
self.project_config = project_config
print("项目加载成功!")
return True
except Exception as e:
self._restore_project_context(
locals().get("previous_project_path"),
locals().get("previous_project_config"),
reload_scripts=True,
)
# print(f"[DEBUG] ===== 项目打开失败 =====")
print(f"加载项目时发生错误:{str(e)}")
import traceback
traceback.print_exc()
return False
def openProjectForPath(self, project_path):
"""通过路径打开项目
Args:
project_path: 项目路径
Returns:
bool: 打开是否成功
"""
return self.openProject(project_path)
def _write_scene_description_from_cache(self, scene_entry, project_path):
    """Build a scene description from the scene cache and save it.

    The asset database is reloaded and asked to register the project's
    assets before the description is built from the cache files and the
    current camera state.

    Args:
        scene_entry: Scene record providing ``guid``, ``name`` and ``path``.
        project_path: Root of the project that owns the scene.

    Returns:
        The scene description that was written to the scene file.
    """
    paths = self._scene_paths(scene_entry, project_path=project_path)
    database = self.get_asset_database(project_path, reload=True)
    if database:
        database.ensure_project_assets_registered()

    build_arguments = {
        "world": self.world,
        "asset_database": database,
        "project_root": project_path,
        "scene_guid": scene_entry["guid"],
        "scene_name": scene_entry["name"],
        "scene_file_rel": scene_entry["path"],
        "cache_bam_path": paths["cache_file"],
        "cache_gui_path": paths["cache_gui_file"],
        "cache_lui_path": paths["cache_lui_file"],
        "camera_state": self._capture_camera_state(),
    }
    description = build_scene_description_from_cache(**build_arguments)

    # Scene paths are stored with "/" separators regardless of platform.
    target_file = os.path.join(project_path, scene_entry["path"].replace("/", os.sep))
    save_json(target_file, description)
    return description
def _rebuild_scene_cache_from_description(self, scene_entry, project_path):
scene_description = self._load_scene_description(scene_entry, project_path)
if not scene_description:
return False
scene_paths = self._scene_paths(scene_entry, project_path=project_path)
cache_file = scene_paths.get("cache_file", "")
if not cache_file:
return False
asset_database = self.get_asset_database(project_path, reload=True)
if not asset_database:
return False
try:
scene_root, _ = self._build_scene_root_from_description(
scene_description,
project_path,
asset_database,
include_mode="all",
)
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
self._write_scene_root_bam(scene_root, cache_file)
scene_components = dict(scene_description.get("scene_components", {}) or {})
gui_component = dict(scene_components.get("gui", {}) or {})
lui_component = dict(scene_components.get("lui", {}) or {})
save_json(scene_paths["cache_gui_file"], gui_component.get("elements", []) or scene_description.get("gui", []) or [])
save_json(scene_paths["cache_lui_file"], lui_component or scene_description.get("lui", {}) or {})
print(f"✓ 已从 egscene 重建场景缓存: {cache_file}")
return True
except Exception as e:
print(f"⚠️ 从 egscene 重建场景缓存失败: {e}")
import traceback
traceback.print_exc()
return False
def _should_keep_scene_description_node(self, node_id, node, node_lookup):
if not node_id:
return False
if node.get("asset_guid"):
return True
components = node.get("components", {}) or {}
if components.get("light"):
return True
if node.get("scripts"):
return True
tags = node.get("tags", {}) or {}
if any(key in tags for key in ("is_scene_element", "runtime_interactive", "element_type", "has_scripts")):
return True
parent_id = node.get("parent_id")
while parent_id:
parent_node = node_lookup.get(parent_id)
if not parent_node:
break
if parent_node.get("asset_guid"):
return False
parent_id = parent_node.get("parent_id")
return True
def _resolve_scene_description_parent(self, node, keep_nodes, node_lookup):
parent_id = node.get("parent_id")
while parent_id:
if parent_id in keep_nodes:
return parent_id
parent_node = node_lookup.get(parent_id)
parent_id = parent_node.get("parent_id") if parent_node else None
return None
def _instantiate_scene_description_node(self, *, node, parent_np, project_path, asset_database):
node_name = str(node.get("name", "") or "Node")
tags = dict(node.get("tags", {}) or {})
components = dict(node.get("components", {}) or {})
model_component = dict(components.get("model", {}) or {})
scripts_component = dict(components.get("scripts", {}) or {})
metadata_component = dict(components.get("metadata", {}) or {})
asset_guid = str(model_component.get("asset_guid", "") or node.get("asset_guid", "") or "")
imported_node_key = str(model_component.get("imported_node_key", "") or node.get("imported_node_key", "") or "").strip()
if asset_guid:
loaded_np = self._load_asset_node_for_scene_description(
asset_guid,
project_path,
asset_database,
imported_node_key=imported_node_key,
node_name=node_name,
)
if not loaded_np or loaded_np.isEmpty():
rebuilt_np = parent_np.attachNewNode(node_name)
else:
rebuilt_np = loaded_np
rebuilt_np.reparentTo(parent_np)
rebuilt_np.setName(node_name)
else:
rebuilt_np = parent_np.attachNewNode(node_name)
transform = node.get("transform", {}) or {}
position = list(transform.get("position", [0, 0, 0]) or [0, 0, 0])
rotation = list(transform.get("rotation", [0, 0, 0]) or [0, 0, 0])
scale = list(transform.get("scale", [1, 1, 1]) or [1, 1, 1])
if len(position) >= 3:
rebuilt_np.setPos(float(position[0]), float(position[1]), float(position[2]))
if len(rotation) >= 3:
rebuilt_np.setHpr(float(rotation[0]), float(rotation[1]), float(rotation[2]))
if len(scale) >= 3:
rebuilt_np.setScale(float(scale[0]), float(scale[1]), float(scale[2]))
visibility = node.get("visibility", {}) or {}
user_visible = bool(visibility.get("user_visible", True))
rebuilt_np.setPythonTag("user_visible", user_visible)
rebuilt_np.setTag("user_visible", "true" if user_visible else "false")
if user_visible:
rebuilt_np.show()
else:
rebuilt_np.hide()
for tag_name, tag_value in tags.items():
if tag_value is None:
continue
rebuilt_np.setTag(str(tag_name), str(tag_value))
runtime_interactive = bool(metadata_component.get("runtime_interactive", node.get("runtime_interactive", False)))
if runtime_interactive:
rebuilt_np.setTag("runtime_interactive", "true")
if asset_guid:
rebuilt_np.setTag("asset_guid", asset_guid)
asset_path = str(model_component.get("asset_path", "") or node.get("asset_path", "") or "")
if asset_path:
asset_abs_path = os.path.join(project_path, asset_path.replace("/", os.sep))
rebuilt_np.setTag("asset_path", asset_path)
rebuilt_np.setTag("model_path", asset_abs_path)
rebuilt_np.setTag("saved_model_path", asset_abs_path)
if imported_node_key:
rebuilt_np.setTag("imported_node_key", imported_node_key)
scripts = list(scripts_component.get("entries", []) or node.get("scripts", []) or [])
if scripts:
rebuilt_np.setTag("has_scripts", "true")
rebuilt_np.setTag("scripts_info", json.dumps(scripts, ensure_ascii=False))
light_component = components.get("light", {}) or {}
if light_component.get("type"):
rebuilt_np.setTag("light_type", str(light_component.get("type")))
for input_name, tag_name in (
("energy", "light_energy"),
("radius", "light_radius"),
("fov", "light_fov"),
("stored_energy", "stored_energy"),
):
if input_name in light_component:
rebuilt_np.setTag(tag_name, str(light_component[input_name]))
return rebuilt_np
def _clone_imported_subnode(self, loaded_np, imported_node_key, node_name):
imported_node_key = str(imported_node_key or "").strip().strip("/")
if not imported_node_key:
return loaded_np
target_np = loaded_np
try:
for part in imported_node_key.split("/"):
if part == "":
continue
child_index = int(part)
children = list(target_np.getChildren())
if child_index < 0 or child_index >= len(children):
return loaded_np
target_np = children[child_index]
except Exception:
return loaded_np
try:
clone_root = NodePath(ModelRoot(node_name or target_np.getName() or "ImportedNode"))
cloned_child = target_np.copyTo(clone_root)
cloned_child.wrtReparentTo(clone_root)
cloned_child.setName(node_name or target_np.getName())
return cloned_child
except Exception:
return target_np
def _load_imported_hierarchy_index(self, asset_record, project_path):
imported_cache = asset_record.get("imported_cache", {}) or {}
hierarchy_rel = imported_cache.get("hierarchy", "")
if not hierarchy_rel:
return {}
hierarchy_path = os.path.join(project_path, hierarchy_rel.replace("/", os.sep))
if not os.path.exists(hierarchy_path):
return {}
try:
with open(hierarchy_path, "r", encoding="utf-8") as f:
payload = json.load(f)
index = {}
for item in payload if isinstance(payload, list) else []:
key = str((item or {}).get("key", "") or "").strip()
if key:
index[key] = dict(item or {})
return index
except Exception as e:
print(f"⚠️ 读取导入层级缓存失败: {hierarchy_path} ({e})")
return {}
def _find_subnode_by_name(self, loaded_np, expected_name):
expected_name = str(expected_name or "").strip()
if not expected_name:
return None
for candidate in loaded_np.findAllMatches("**"):
try:
if candidate.getName() == expected_name:
return candidate
except Exception:
continue
return None
def _load_asset_node_for_scene_description(self, asset_guid, project_path, asset_database, imported_node_key="", node_name=""):
asset_record = asset_database.get_asset(asset_guid)
if not asset_record:
return None
imported_cache = asset_record.get("imported_cache", {}) or {}
model_bam_rel = imported_cache.get("model_bam", "")
source_path_rel = asset_record.get("asset_path", "")
hierarchy_index = self._load_imported_hierarchy_index(asset_record, project_path)
candidate_paths = []
if model_bam_rel:
candidate_paths.append(os.path.join(project_path, model_bam_rel.replace("/", os.sep)))
if source_path_rel:
candidate_paths.append(os.path.join(project_path, source_path_rel.replace("/", os.sep)))
for candidate_path in candidate_paths:
if not candidate_path or not os.path.exists(candidate_path):
continue
try:
loaded_np = self.world.loader.loadModel(Filename.fromOsSpecific(candidate_path))
if loaded_np and not loaded_np.isEmpty():
if imported_node_key and candidate_path.endswith(".bam"):
expected_item = hierarchy_index.get(imported_node_key, {}) if hierarchy_index else {}
expected_name = str(expected_item.get("name", "") or node_name or "").strip()
cloned_np = self._clone_imported_subnode(loaded_np, imported_node_key, node_name)
cloned_name = ""
try:
cloned_name = cloned_np.getName()
except Exception:
cloned_name = ""
if expected_name and cloned_name and cloned_name != expected_name and cloned_name != node_name:
fallback_np = self._find_subnode_by_name(loaded_np, expected_name)
if fallback_np is not None and not fallback_np.isEmpty():
try:
clone_root = NodePath(ModelRoot(node_name or expected_name or "ImportedNode"))
fixed_np = fallback_np.copyTo(clone_root)
fixed_np.wrtReparentTo(clone_root)
fixed_np.setName(node_name or expected_name)
return fixed_np
except Exception:
pass
return cloned_np
return loaded_np
except Exception as e:
print(f"⚠️ 加载场景描述资产失败 {candidate_path}: {e}")
return None
def _is_scene_description_node_interactive(self, node):
if not isinstance(node, dict):
return False
if bool(node.get("runtime_interactive", False)):
return True
scripts = list(node.get("scripts", []) or [])
if scripts:
return True
components = dict(node.get("components", {}) or {})
scripts_component = dict(components.get("scripts", {}) or {})
if list(scripts_component.get("entries", []) or []):
return True
tags = dict(node.get("tags", {}) or {})
if str(tags.get("runtime_interactive", "")).lower() == "true":
return True
if str(tags.get("has_scripts", "")).lower() == "true":
return True
return False
def _build_scene_root_from_description(self, scene_description, project_path, asset_database, include_mode="all"):
    """Rebuild a scene graph from a scene description.

    Args:
        scene_description: Dict with a ``nodes`` list of node dictionaries.
        project_path: Project root used to resolve asset files.
        asset_database: Asset database used to look up referenced assets.
        include_mode: ``"all"`` keeps every eligible node, ``"interactive"``
            keeps only interactive nodes, ``"static"`` only non-interactive ones.

    Returns:
        ``(scene_root, keep_nodes)``: the new root node path and the mapping of
        node id to node dict for the nodes that were selected.
    """
    nodes = list(scene_description.get("nodes", []) or [])
    node_lookup = {
        str(node.get("node_id", "") or ""): dict(node)
        for node in nodes
        if node.get("node_id") is not None
    }

    keep_nodes = {}
    for node_id, node in node_lookup.items():
        if not self._should_keep_scene_description_node(node_id, node, node_lookup):
            continue
        is_interactive = self._is_scene_description_node_interactive(node)
        if include_mode == "interactive" and not is_interactive:
            continue
        if include_mode == "static" and is_interactive:
            continue
        keep_nodes[node_id] = node

    scene_root = NodePath(ModelRoot("SceneRoot"))
    built_nodes = {}
    # Ids whose instantiation has been scheduled. Each node is instantiated
    # once, and the set also stops a cyclic parent chain.
    scheduled = set()
    for node_id in keep_nodes:
        # Collect this node and any not-yet-built kept ancestors, then build
        # them parent-first. Without this, a child listed before its parent
        # would be attached to the scene root instead of its parent.
        pending = []
        cursor = node_id
        while cursor is not None and cursor not in scheduled:
            scheduled.add(cursor)
            parent_id = self._resolve_scene_description_parent(keep_nodes[cursor], keep_nodes, node_lookup)
            pending.append((cursor, parent_id))
            cursor = parent_id

        for pending_id, parent_id in reversed(pending):
            parent_np = built_nodes.get(parent_id, scene_root)
            rebuilt_np = self._instantiate_scene_description_node(
                node=keep_nodes[pending_id],
                parent_np=parent_np,
                project_path=project_path,
                asset_database=asset_database,
            )
            if rebuilt_np:
                built_nodes[pending_id] = rebuilt_np

    return scene_root, keep_nodes
def _write_scene_root_bam(self, scene_root, output_file):
    """Write ``scene_root`` to ``output_file`` as a BAM file, then release the node.

    The node is removed from the scene graph even when writing fails.

    Raises:
        OSError: If the output directory cannot be created or the BAM writer
            reports failure.
    """
    output_dir = os.path.dirname(output_file)
    try:
        # A bare file name has no directory part; makedirs("") would raise.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        written = scene_root.writeBamFile(Filename.fromOsSpecific(output_file))
    finally:
        scene_root.removeNode()
    # writeBamFile signals failure through its boolean result rather than an
    # exception; surface it so callers do not report a cache that was never
    # written.
    if written is False:
        raise OSError(f"failed to write BAM file: {output_file}")
def _optimize_static_scene_root(self, scene_root):
if scene_root is None or scene_root.isEmpty():
return
try:
for node_path in scene_root.findAllMatches("**"):
try:
if node_path.hasTag("runtime_interactive"):
node_path.clearTag("runtime_interactive")
if node_path.hasTag("has_scripts"):
node_path.clearTag("has_scripts")
if node_path.hasTag("scripts_info"):
node_path.clearTag("scripts_info")
except Exception:
continue
scene_root.clearModelNodes()
scene_root.flattenStrong()
print("✓ 静态场景已执行构建期合并优化")
except Exception as e:
print(f"⚠️ 静态场景构建期优化失败,继续使用未优化结果: {e}")
def _write_scene_partition_cache(self, scene_description, project_path, asset_database, build_cache_dir, data_scene_dir):
    """Build and write the interactive and static partitions of a scene.

    Each partition is rebuilt from ``scene_description``, written as a BAM
    file into ``build_cache_dir`` and copied into ``data_scene_dir``. The
    static partition is optimized before writing.

    Returns:
        Dict keyed by ``"interactive"`` / ``"static"``; each value holds the
        sorted ``node_ids``, the staged file's ``relative_path`` (computed
        against the parent directory of ``data_scene_dir``) and the
        ``build_cache_file`` path (computed against ``project_path``).
    """
    partitions = (
        ("interactive", "interactive_scene.bam"),
        ("static", "static_scene.bam"),
    )
    staging_parent = os.path.dirname(data_scene_dir)
    results = {}
    for mode, bam_name in partitions:
        partition_root, kept = self._build_scene_root_from_description(
            scene_description,
            project_path,
            asset_database,
            include_mode=mode,
        )
        if mode == "static":
            self._optimize_static_scene_root(partition_root)

        cache_copy = os.path.join(build_cache_dir, bam_name)
        staged_copy = os.path.join(data_scene_dir, bam_name)
        self._write_scene_root_bam(partition_root, cache_copy)
        shutil.copy2(cache_copy, staged_copy)

        results[mode] = {
            "node_ids": sorted(kept.keys()),
            "relative_path": relative_project_path(staging_parent, staged_copy),
            "build_cache_file": relative_project_path(project_path, cache_copy),
        }
    return results
def saveProject(self):
    """Save the current scene and update the project configuration.

    Writes the scene cache atomically, refreshes the scene description,
    records the scene as last-opened and build-enabled, and persists the
    project config.

    Returns:
        True on success; False if no project/scene is available, the scene
        could not be saved, or an exception occurred.
    """
    try:
        project_path = self.current_project_path
        if not project_path:
            print("错误: 请先创建或打开一个项目!")
            return False

        config = dict(self.project_config or {})
        scene_entry = self._get_scene_entry(project_config=config)
        if not scene_entry:
            print("错误: 当前项目没有可保存的场景")
            return False

        scene_paths = self._scene_paths(scene_entry, project_path=project_path)
        cache_file = scene_paths.get("cache_file", "")
        if not cache_file:
            print("错误: 场景缓存路径无效")
            return False

        self._retarget_live_scene_asset_paths()
        if not self._save_scene_atomically(cache_file, project_path):
            print("错误: 保存场景失败!")
            return False

        self._write_scene_description_from_cache(scene_entry, project_path)

        config["last_modified"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        scene_guid = scene_entry["guid"]
        config["last_open_scene_guid"] = scene_guid
        config["scene_file"] = scene_entry.get("path", config.get("scene_file", ""))

        # Make sure the saved scene takes part in builds.
        settings = config.get("build_settings", {}) or {}
        enabled = list(settings.get("enabled_scene_guids", []) or [])
        if scene_guid not in enabled:
            enabled.append(scene_guid)
        settings["enabled_scene_guids"] = enabled
        config["build_settings"] = settings

        self._write_project_config(project_path, config)
        self.project_config = config
        self.current_scene_guid = scene_guid
        print("项目保存成功!")
        return True
    except Exception as e:
        print(f"保存项目时发生错误:{str(e)}")
        return False
def _save_scene_atomically(self, scene_file, project_path):
"""先保存到临时文件,成功后再原子替换正式场景文件。"""
scene_file = os.path.normpath(scene_file)
scene_dir = os.path.dirname(scene_file)
scene_name, scene_ext = os.path.splitext(os.path.basename(scene_file))
temp_scene_file = os.path.join(scene_dir, f"{scene_name}.tmp{scene_ext}")
final_gui_info_file = scene_file.replace('.bam', '_gui.json')
temp_gui_info_file = temp_scene_file.replace('.bam', '_gui.json')
final_lui_info_file = scene_file.replace('.bam', '_lui.json')
temp_lui_info_file = temp_scene_file.replace('.bam', '_lui.json')
for temp_path in (temp_scene_file, temp_gui_info_file, temp_lui_info_file):
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except OSError:
pass
try:
if not self.world.scene_manager.saveScene(temp_scene_file, project_path):
return False
os.replace(temp_scene_file, scene_file)
if os.path.exists(temp_gui_info_file):
os.replace(temp_gui_info_file, final_gui_info_file)
if os.path.exists(temp_lui_info_file):
os.replace(temp_lui_info_file, final_lui_info_file)
return True
except Exception as e:
print(f"原子保存场景失败: {e}")
return False
finally:
for temp_path in (temp_scene_file, temp_gui_info_file, temp_lui_info_file):
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except OSError:
pass
# ==================== 项目打包功能 ====================
def _load_scene_description(self, scene_entry, project_path):
    """Load and normalize the scene description file referenced by ``scene_entry``.

    ``scene_entry["path"]`` is a "/"-separated path relative to ``project_path``.
    """
    relative_file = scene_entry["path"].replace("/", os.sep)
    raw_description = load_json(os.path.join(project_path, relative_file), {})
    return normalize_scene_description(raw_description)
def _resolve_enabled_scene_entries(self, project_config):
_, active_profile = self._get_active_build_profile(project_config)
runtime_settings = dict((active_profile or {}).get("runtime", {}) or {})
enabled_guids = set((runtime_settings.get("enabled_scene_guids", []) or (project_config.get("build_settings") or {}).get("enabled_scene_guids", []) or []))
scene_entries = self._get_scene_entries(project_config)
if not enabled_guids:
return scene_entries
return [scene for scene in scene_entries if scene.get("guid") in enabled_guids]
def _copy_asset_record_to_runtime(self, asset_record, data_root, project_path):
if not asset_record:
return
asset_guid = asset_record.get("guid", "")
if not asset_guid:
return
source_root = os.path.join(project_path, asset_record.get("asset_path", "").replace("/", os.sep))
target_root = os.path.join(data_root, "assets", asset_guid)
os.makedirs(target_root, exist_ok=True)
if os.path.exists(source_root) and os.path.isfile(source_root):
shutil.copy2(source_root, os.path.join(target_root, os.path.basename(source_root)))
imported_cache = asset_record.get("imported_cache", {}) or {}
imported_root_rel = imported_cache.get("root", "")
if imported_root_rel:
imported_root = os.path.join(project_path, imported_root_rel.replace("/", os.sep))
if os.path.exists(imported_root):
runtime_imported_root = os.path.join(target_root, "imported")
if os.path.exists(runtime_imported_root):
shutil.rmtree(runtime_imported_root)
shutil.copytree(imported_root, runtime_imported_root)
def _stage_runtime_data(self, project_path, project_config, data_root):
    """Cook the project's runtime data into ``data_root``.

    Steps, in order:
      1. Mirror the project's assets folder into ``<data_root>/Assets``,
         skipping ``Scripts`` directories and ``.meta`` files.
      2. For every enabled scene: rewrite GUI media paths to runtime asset
         paths, copy the cached scene BAM, write the interactive/static
         partitions, and save the runtime scene, cook manifest and scene
         description JSON files.
      3. Copy every referenced asset into the runtime asset folder.
      4. Write the top-level ``manifest.json`` and ``cook_manifest.json``.

    Returns:
        The runtime manifest produced by ``build_runtime_manifest``.
    """
    layout = self.get_project_layout(project_path)
    asset_database = self.get_asset_database(project_path, reload=True)
    asset_database.ensure_project_assets_registered()

    # 1. Mirror the assets folder.
    runtime_assets_dir = os.path.join(data_root, "Assets")
    if os.path.exists(layout.assets_root):
        for current_dir, subdirs, file_names in os.walk(layout.assets_root):
            relative_dir = os.path.relpath(current_dir, layout.assets_root)
            if relative_dir == ".":
                relative_dir = ""
            # Prune in place so os.walk never descends into Scripts folders.
            subdirs[:] = [name for name in subdirs if name != "Scripts"]
            if relative_dir:
                destination_dir = os.path.join(runtime_assets_dir, relative_dir)
            else:
                destination_dir = runtime_assets_dir
            os.makedirs(destination_dir, exist_ok=True)
            for file_name in file_names:
                if not file_name.endswith(".meta"):
                    shutil.copy2(
                        os.path.join(current_dir, file_name),
                        os.path.join(destination_dir, file_name),
                    )

    runtime_scenes = []
    referenced_asset_guids = set()
    cook_manifests = []

    def _rewrite_gui_media_paths(scene_description):
        # Point GUI image/video references at their runtime asset location
        # and remember the assets so they get copied later.
        media_fields = (
            ("image_path", "UI"),
            ("bg_image_path", "UI"),
            ("video_path", "Video"),
        )
        gui_items = list(scene_description.get("gui", []) or [])
        for gui_info in gui_items:
            for key_name, preferred_subdir in media_fields:
                raw_value = str(gui_info.get(key_name, "") or "").strip()
                if not raw_value or raw_value.startswith(("http://", "https://")):
                    continue
                if os.path.isabs(raw_value):
                    resolved_path = raw_value
                else:
                    resolved_path = os.path.join(project_path, raw_value.replace("/", os.sep))
                if not os.path.exists(resolved_path):
                    continue
                asset_record = asset_database.register_asset(
                    resolved_path,
                    preferred_subdir=preferred_subdir,
                    copy_into_assets=False,
                )
                if not asset_record:
                    continue
                referenced_asset_guids.add(asset_record.get("guid", ""))
                runtime_file = f"assets/{asset_record['guid']}/{os.path.basename(resolved_path)}"
                gui_info[key_name] = runtime_file.replace("\\", "/")
        scene_description["gui"] = gui_items
        return scene_description

    # 2. Cook every enabled scene.
    for scene_entry in self._resolve_enabled_scene_entries(project_config):
        scene_description = self._load_scene_description(scene_entry, project_path)
        if not scene_description:
            continue
        scene_description = _rewrite_gui_media_paths(dict(scene_description))

        scene_guid = scene_entry["guid"]
        build_cache_dir = layout.build_cache_dir(scene_guid)
        if os.path.exists(build_cache_dir):
            shutil.rmtree(build_cache_dir)
        os.makedirs(build_cache_dir, exist_ok=True)

        cooked_scene_dir = os.path.join(data_root, "scene_cache", scene_guid)
        os.makedirs(cooked_scene_dir, exist_ok=True)

        source_cache_file = os.path.join(project_path, scene_entry["cache_path"].replace("/", os.sep))
        cooked_scene_file = os.path.join(cooked_scene_dir, "scene.bam")
        build_cache_scene_file = os.path.join(build_cache_dir, "scene.bam")
        if os.path.exists(source_cache_file):
            shutil.copy2(source_cache_file, cooked_scene_file)
            shutil.copy2(source_cache_file, build_cache_scene_file)

        partition_info = self._write_scene_partition_cache(
            scene_description,
            project_path,
            asset_database,
            build_cache_dir,
            cooked_scene_dir,
        )
        interactive_part = partition_info.get("interactive", {})
        static_part = partition_info.get("static", {})
        cooked_relpath = relative_project_path(data_root, cooked_scene_file)

        runtime_scene = build_runtime_scene(
            scene_description,
            cooked_scene_relpath=cooked_relpath,
        )
        runtime_scene["cook"]["interactive_scene"] = interactive_part.get("relative_path", "")
        runtime_scene["cook"]["static_scene"] = static_part.get("relative_path", "")

        cook_manifest = build_scene_cook_manifest(
            scene_description,
            runtime_scene,
            cooked_scene_relpath=cooked_relpath,
        )
        cook_manifest["interactive_scene"] = interactive_part.get("relative_path", "")
        cook_manifest["static_scene"] = static_part.get("relative_path", "")
        cook_manifest["build_cache"] = {
            "interactive_scene": interactive_part.get("build_cache_file", ""),
            "static_scene": static_part.get("build_cache_file", ""),
            "full_scene": relative_project_path(project_path, build_cache_scene_file),
        }

        runtime_scenes.append(runtime_scene)
        cook_manifests.append(cook_manifest)

        json_outputs = (
            (os.path.join(data_root, "scenes", f"{scene_guid}.runtime.json"), runtime_scene),
            (os.path.join(data_root, "scene_cache", scene_guid, "cook_manifest.json"), cook_manifest),
            (os.path.join(build_cache_dir, "runtime_scene.json"), runtime_scene),
            (os.path.join(build_cache_dir, "scene_description.json"), scene_description),
            (os.path.join(build_cache_dir, "cook_manifest.json"), cook_manifest),
        )
        for json_path, payload in json_outputs:
            save_json(json_path, payload)

        referenced_asset_guids.update(scene_description.get("referenced_asset_guids", []) or [])
        referenced_asset_guids.update(scene_description.get("referenced_script_guids", []) or [])

    # 3. Copy referenced assets (sorted for a deterministic order).
    asset_records = []
    for asset_guid in sorted(guid for guid in referenced_asset_guids if guid):
        asset_record = asset_database.get_asset(asset_guid)
        if not asset_record:
            continue
        asset_records.append(asset_record)
        self._copy_asset_record_to_runtime(asset_record, data_root, project_path)

    # 4. Top-level manifests.
    manifest = build_runtime_manifest(project_config, runtime_scenes, asset_records)
    save_json(os.path.join(data_root, "manifest.json"), manifest)
    save_json(
        os.path.join(data_root, "cook_manifest.json"),
        {
            "schema_version": 1,
            "project_name": project_config.get("name", "EGProject"),
            "scene_guids": [scene.get("scene_guid", "") for scene in runtime_scenes if scene.get("scene_guid")],
            "scenes": cook_manifests,
        },
    )

    cook_summary = dict(manifest.get("cook_summary", {}) or {})
    print(
        "✓ Cook 构建统计: "
        f"场景 {cook_summary.get('scene_count', 0)} 个, "
        f"资源 {cook_summary.get('asset_count', 0)} 个, "
        f"交互节点 {cook_summary.get('interactive_node_count', 0)} 个, "
        f"静态节点 {cook_summary.get('static_node_count', 0)}"
    )
    return manifest
def buildPackage(self, build_dir):
    """Package the current project into a runnable application.

    Saves the project, stages the runtime data, scripts and render pipeline
    into a temporary staging folder under ``build_dir``, runs the project
    build, then validates the output and writes a build report and summary.

    Args:
        build_dir: Output directory; the application is written to
            ``<build_dir>/<sanitized project name>``. An existing directory of
            that name is deleted first.

    Returns:
        A dict describing the build (``success``, ``project_name``,
        ``output_dir``, ``exe_path``, ``manifest``, ``cook_summary``,
        ``validation`` and, when written, ``report_path`` / ``summary_path``),
        or False if any step fails.
    """
    try:
        if not self.current_project_path:
            print("错误: 请先创建或打开一个项目!")
            return False

        project_path = normalize_path(self.current_project_path)
        self.project_config = self._ensure_v2_project_defaults(project_path, self.project_config or {})
        _, active_profile = self._get_active_build_profile(self.project_config)
        profile_runtime = dict((active_profile or {}).get("runtime", {}) or {})
        profile_windows = dict((active_profile or {}).get("windows", {}) or {})

        configured_startup_scene = str(profile_runtime.get("startup_scene_guid", "") or "").strip()
        if configured_startup_scene:
            self.project_config["startup_scene_guid"] = configured_startup_scene

        project_display_name = (
            (self.project_config or {}).get("name")
            or os.path.basename(project_path)
            or "EGProject"
        )
        configured_exe_name = str(profile_windows.get("exe_name", "") or "").strip()
        project_name = self._sanitize_build_name(configured_exe_name or project_display_name)

        if not build_dir:
            print("错误: 请指定打包输出目录!")
            return False

        if hasattr(self.world, "selection") and self.world.selection:
            self.world.selection.clearSelection()
            print("已取消场景中的物体选中状态")

        self._sync_project_script_manager(project_path, reload_scripts=True)
        if not self.saveProject():
            print("错误: 保存场景失败!")
            return False

        output_root = normalize_path(build_dir)
        staging_root = os.path.join(output_root, f".{project_name}_staging")
        source_root = os.path.join(staging_root, "source")
        data_root = os.path.join(staging_root, "data")
        dist_dir = os.path.join(output_root, project_name)

        # Start from a clean staging area and a clean output folder.
        if os.path.exists(staging_root):
            shutil.rmtree(staging_root)
        os.makedirs(source_root, exist_ok=True)
        os.makedirs(data_root, exist_ok=True)
        if os.path.exists(dist_dir):
            shutil.rmtree(dist_dir)

        manifest = self._stage_runtime_data(project_path, self.project_config or {}, data_root)
        if not manifest.get("scenes"):
            print("错误: 当前项目没有可打包的场景")
            return False

        self._copyScriptsToBuild(data_root, project_path)
        self._copyRenderPipelineRuntime(os.path.join(data_root, "pipeline"))

        runtime_entry = self._createProjectRuntimeEntry(source_root, project_display_name)
        if not runtime_entry:
            print("错误: 生成项目运行时入口失败")
            return False

        success = self._executeProjectBuild(
            source_root=source_root,
            data_root=data_root,
            build_root=output_root,
            project_name=project_name,
        )
        if not success:
            return False

        if os.path.exists(staging_root):
            shutil.rmtree(staging_root, ignore_errors=True)

        # The ".exe" suffix only exists on Windows; on other platforms the
        # launcher is the bare name, and a hard-coded suffix would make the
        # reported path and the "exe" validation check wrong.
        exe_file_name = f"{project_name}.exe" if os.name == "nt" else project_name
        exe_path = os.path.join(dist_dir, exe_file_name)
        print(f"打包完成!项目运行程序: {exe_path}")

        build_result = {
            "success": True,
            "project_name": project_name,
            "output_dir": dist_dir,
            "exe_path": exe_path,
            "manifest": manifest,
            "cook_summary": dict(manifest.get("cook_summary", {}) or {}),
        }
        build_result["validation"] = self._validate_build_output(dist_dir, build_result)

        report_path = self._write_build_report(dist_dir, build_result)
        if report_path:
            build_result["report_path"] = report_path
            print(f"✓ 构建报告已生成: {report_path}")

        summary_path = self._write_build_summary(dist_dir, build_result)
        if summary_path:
            build_result["summary_path"] = summary_path
            print(f"✓ 构建摘要已生成: {summary_path}")

        return build_result
    except Exception as e:
        print(f"打包过程出错:{str(e)}")
        return False
def _merge_folder_contents(self, source_folder, destination_folder):
"""将 source_folder 的内容合并到 destination_folder。"""
if not source_folder or not os.path.exists(source_folder):
return False
os.makedirs(destination_folder, exist_ok=True)
for root, _, files in os.walk(source_folder):
rel_root = os.path.relpath(root, source_folder)
target_root = destination_folder if rel_root == "." else os.path.join(destination_folder, rel_root)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
source_file = os.path.join(root, file_name)
target_file = os.path.join(target_root, file_name)
shutil.copy2(source_file, target_file)
return True
def _collect_output_file_stats(self, root_path):
stats = {
"file_count": 0,
"dir_count": 0,
"total_size_bytes": 0,
}
if not root_path or not os.path.exists(root_path):
return stats
for root, dirs, files in os.walk(root_path):
stats["dir_count"] += len(dirs)
stats["file_count"] += len(files)
for file_name in files:
file_path = os.path.join(root, file_name)
try:
stats["total_size_bytes"] += os.path.getsize(file_path)
except OSError:
continue
return stats
def _validate_build_output(self, output_dir, build_result):
validation = {
"ok": True,
"checks": [],
}
if not output_dir or not isinstance(build_result, dict):
validation["ok"] = False
validation["checks"].append({"name": "build_result", "ok": False, "detail": "missing output_dir or build_result"})
return validation
def add_check(name, path_value, required=True):
exists = bool(path_value) and os.path.exists(path_value)
validation["checks"].append(
{
"name": name,
"path": path_value,
"ok": exists if required else True,
"required": required,
}
)
if required and not exists:
validation["ok"] = False
exe_path = str(build_result.get("exe_path", "") or "")
add_check("exe", exe_path)
data_root = os.path.join(output_dir, "_internal", "data")
if not os.path.exists(data_root):
data_root = os.path.join(output_dir, "data")
add_check("data_root", data_root)
manifest_path = os.path.join(data_root, "manifest.json") if data_root else ""
add_check("manifest", manifest_path)
manifest = dict(build_result.get("manifest", {}) or {})
scenes = list(manifest.get("scenes", []) or [])
for scene_entry in scenes:
runtime_rel = str(scene_entry.get("runtime_path", "") or "")
runtime_abs = os.path.join(data_root, runtime_rel.replace("/", os.sep)) if runtime_rel else ""
scene_guid = str(scene_entry.get("guid", "") or "")
add_check(f"runtime_scene:{scene_guid}", runtime_abs)
runtime_payload = load_json(runtime_abs, {})
cook_info = dict((runtime_payload or {}).get("cook", {}) or {})
static_rel = str(cook_info.get("static_scene", "") or "")
interactive_rel = str(cook_info.get("interactive_scene", "") or "")
if static_rel:
add_check(f"static_scene:{scene_guid}", os.path.join(data_root, static_rel.replace("/", os.sep)))
if interactive_rel:
add_check(f"interactive_scene:{scene_guid}", os.path.join(data_root, interactive_rel.replace("/", os.sep)))
add_check("scripts_dir", os.path.join(data_root, "scripts"))
add_check("pipeline_dir", os.path.join(data_root, "pipeline"))
return validation
def _write_build_report(self, output_dir, build_result):
if not output_dir or not isinstance(build_result, dict):
return ""
report_payload = {
"schema_version": 1,
"generated_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"project_name": build_result.get("project_name", ""),
"exe_path": build_result.get("exe_path", ""),
"output_dir": build_result.get("output_dir", ""),
"cook_summary": dict(build_result.get("cook_summary", {}) or {}),
"output_stats": self._collect_output_file_stats(output_dir),
"validation": self._validate_build_output(output_dir, build_result),
}
report_path = os.path.join(output_dir, "build_report.json")
save_json(report_path, report_payload)
return report_path
def _write_build_summary(self, output_dir, build_result):
if not output_dir or not isinstance(build_result, dict):
return ""
cook_summary = dict(build_result.get("cook_summary", {}) or {})
validation = dict(build_result.get("validation", {}) or {})
lines = [
f"Project: {build_result.get('project_name', '')}",
f"Output: {build_result.get('output_dir', '')}",
f"EXE: {build_result.get('exe_path', '')}",
f"Validation: {'OK' if validation.get('ok', False) else 'FAILED'}",
f"Scenes: {cook_summary.get('scene_count', 0)}",
f"Assets: {cook_summary.get('asset_count', 0)}",
f"Interactive Nodes: {cook_summary.get('interactive_node_count', 0)}",
f"Static Nodes: {cook_summary.get('static_node_count', 0)}",
f"Interactive Models: {cook_summary.get('interactive_model_count', 0)}",
f"Static Models: {cook_summary.get('static_model_count', 0)}",
"",
"Checks:",
]
for check in list(validation.get("checks", []) or []):
status = "OK" if check.get("ok") else "MISSING"
lines.append(f"- {status}: {check.get('name', '')} -> {check.get('path', '')}")
summary_path = os.path.join(output_dir, "build_summary.txt")
with open(summary_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
return summary_path
def _load_runtime_template(self):
template_path = os.path.join(self._get_repo_root(), "templates", "main_template.py")
with open(template_path, "r", encoding="utf-8") as f:
return f.read()
def _createProjectRuntimeEntry(self, source_root, project_name):
try:
template_content = self._load_runtime_template()
safe_project_name = project_name.replace("\\", "\\\\").replace('"', '\\"')
runtime_source = template_content.replace("__EG_PROJECT_NAME__", safe_project_name)
runtime_entry = os.path.join(source_root, "main.py")
with open(runtime_entry, "w", encoding="utf-8") as f:
f.write(runtime_source)
return runtime_entry
except Exception as e:
print(f"创建项目运行时入口失败: {e}")
return ""
def _executeProjectBuild(self, source_root, data_root, build_root, project_name):
repo_root = self._get_repo_root()
work_root = os.path.join(build_root, f".{project_name}_work")
spec_root = os.path.join(build_root, f".{project_name}_spec")
for directory in (work_root, spec_root):
if os.path.exists(directory):
shutil.rmtree(directory)
os.makedirs(directory, exist_ok=True)
pyinstaller_runner = None
pyinstaller_candidates = [
[sys.executable, "-m", "PyInstaller"],
[sys.executable, "-m", "pyinstaller"],
["pyinstaller"],
]
for candidate in pyinstaller_candidates:
pyinstaller_probe = subprocess.run(
[*candidate, "--version"],
capture_output=True,
text=True,
check=False,
)
if pyinstaller_probe.returncode == 0:
pyinstaller_runner = candidate
break
if not pyinstaller_runner:
print("错误: 当前环境未安装 PyInstaller无法执行项目打包。")
return False
sep = ";" if os.name == "nt" else ":"
command = [
*pyinstaller_runner,
"--noconfirm",
"--clean",
"--windowed",
"--name",
project_name,
"--distpath",
build_root,
"--workpath",
work_root,
"--specpath",
spec_root,
"--paths",
source_root,
"--paths",
repo_root,
"--paths",
os.path.join(repo_root, "RenderPipelineFile"),
"--exclude-module",
"imgui_bundle",
"--exclude-module",
"p3dimgui",
"--hidden-import",
"core",
"--hidden-import",
"core.script_system",
"--hidden-import",
"core.CustomMouseController",
"--hidden-import",
"ssbo_component.runtime_importer",
"--hidden-import",
"ssbo_component.ssbo_controller",
"--hidden-import",
"rpcore",
"--collect-submodules",
"rpcore",
"--collect-submodules",
"rpplugins",
"--collect-submodules",
"rplibs",
]
panda_binary_dir = self._get_panda3d_binary_dir()
panda_display_binaries = [
"libpandagl.dll",
"libp3windisplay.dll",
"libpandadx9.dll",
"libp3tinydisplay.dll",
"cgGL.dll",
"cgD3D9.dll",
"d3dx9_43.dll",
]
if panda_binary_dir:
for binary_name in panda_display_binaries:
binary_path = os.path.join(panda_binary_dir, binary_name)
if os.path.exists(binary_path):
command.extend(["--add-binary", f"{binary_path}{sep}."])
if os.path.exists(data_root):
command.extend(["--add-data", f"{data_root}{sep}data"])
command.append(os.path.join(source_root, "main.py"))
result = subprocess.run(
command,
cwd=source_root,
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
print("项目打包失败。")
output = (result.stdout or "") + "\n" + (result.stderr or "")
print(output[-4000:])
return False
for directory in (work_root, spec_root):
if os.path.exists(directory):
shutil.rmtree(directory, ignore_errors=True)
print(f"✓ 项目已打包到: {os.path.join(build_root, project_name)}")
return True
def _get_panda3d_binary_dir(self):
try:
panda3d_spec = importlib.util.find_spec("panda3d.core")
if panda3d_spec and panda3d_spec.origin:
panda_package_dir = os.path.dirname(os.path.dirname(os.path.abspath(panda3d_spec.origin)))
candidate_dirs = [
os.path.join(panda_package_dir, "bin"),
os.path.join(os.path.dirname(panda_package_dir), "bin"),
]
for candidate_dir in candidate_dirs:
if os.path.isdir(candidate_dir):
return candidate_dir
except Exception as e:
print(f"⚠️ 获取 Panda3D 二进制目录失败: {e}")
return ""
def _copyScriptsToBuild(self, build_dir, project_path):
"""将项目脚本编译后复制到构建目录,避免直接暴露源码。"""
try:
scripts_dest = os.path.join(build_dir, "scripts")
scripts_src = self.get_project_scripts_dir(project_path)
if os.path.exists(scripts_dest):
shutil.rmtree(scripts_dest)
os.makedirs(scripts_dest, exist_ok=True)
if os.path.exists(scripts_src):
compiled_count = 0
fallback_count = 0
for root, _, files in os.walk(scripts_src):
rel_root = os.path.relpath(root, scripts_src)
target_root = scripts_dest if rel_root == "." else os.path.join(scripts_dest, rel_root)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
if file_name.startswith("__"):
continue
source_file = os.path.join(root, file_name)
if file_name.endswith(".py"):
compiled_file = os.path.join(
target_root,
f"{os.path.splitext(file_name)[0]}.pyc",
)
try:
py_compile.compile(source_file, cfile=compiled_file, doraise=True)
compiled_count += 1
except py_compile.PyCompileError as e:
fallback_file = os.path.join(target_root, file_name)
shutil.copy2(source_file, fallback_file)
fallback_count += 1
print(f"⚠️ 编译脚本失败,已回退复制源码: {source_file} -> {fallback_file} ({e})")
elif not file_name.endswith((".pyc", ".pyo", ".log")):
shutil.copy2(source_file, os.path.join(target_root, file_name))
print(f"✓ Scripts已导出到 build/scripts编译 {compiled_count} 个脚本")
if fallback_count:
print(f"⚠️ {fallback_count} 个脚本因编译失败保留为 .py 源码")
else:
print("⚠️ 项目中没有 Assets/Scripts 目录")
except Exception as e:
print(f"⚠️ 复制脚本文件时出错: {str(e)}")
def _copyRenderPipelineRuntime(self, build_dir):
"""复制运行时需要的 RenderPipeline 资源,并将源码编译为 pyc。"""
try:
source_root = os.path.join(self._get_repo_root(), "RenderPipelineFile")
target_root = build_dir
if os.path.exists(target_root):
shutil.rmtree(target_root)
if not os.path.exists(source_root):
print("⚠️ RenderPipelineFile 文件夹未找到")
return False
blocked_dirs = {"__pycache__", ".git", ".vscode", "samples", "toolkit"}
blocked_root_files = {"setup.py", "start_daytime_editor.py", "start_plugin_configurator.py"}
blocked_relative_dirs = {
os.path.normpath(os.path.join("rplibs", "yaml", "yaml_py2")),
}
for root, dirs, files in os.walk(source_root):
relative_root = os.path.relpath(root, source_root)
normalized_relative_root = os.path.normpath(relative_root)
dirs[:] = [
name for name in dirs
if name not in blocked_dirs
and os.path.normpath(os.path.join(normalized_relative_root, name)) not in blocked_relative_dirs
]
target_dir = target_root if relative_root == "." else os.path.join(target_root, relative_root)
os.makedirs(target_dir, exist_ok=True)
for file_name in files:
if file_name.endswith((".pyc", ".pyo", ".log")):
continue
if relative_root == "." and file_name in blocked_root_files:
continue
source_file = os.path.join(root, file_name)
if file_name.endswith(".py"):
compiled_file = os.path.join(
target_dir,
f"{os.path.splitext(file_name)[0]}.pyc",
)
try:
py_compile.compile(source_file, cfile=compiled_file, doraise=True)
except py_compile.PyCompileError as e:
print(f"⚠️ RenderPipeline 脚本编译失败,保留源码: {source_file} ({e})")
shutil.copy2(source_file, os.path.join(target_dir, file_name))
else:
shutil.copy2(source_file, os.path.join(target_dir, file_name))
print("✓ RenderPipeline 运行时资源已导出")
return True
except Exception as e:
print(f"⚠️ 导出 RenderPipeline 运行时资源失败: {e}")
return False
# ==================== 辅助方法 ====================
def _clearCurrentScene(self):
"""清空当前场景"""
try:
if hasattr(self.world, 'scene_manager') and self.world.scene_manager:
self.world.scene_manager.clearScene()
print("✓ 当前场景已清空")
except Exception as e:
print(f"清空场景失败: {str(e)}")
def updateWindowTitle(self, parent_window, project_name):
    """Do nothing; kept only so older callers keep working.

    This class no longer handles UI, so both arguments are ignored.
    """
    return None