"""Scene description and runtime scene helpers for MetaCore project v2."""

from __future__ import annotations

import json
import os
from datetime import datetime

from project.project_schema import (
    ENGINE_NAME,
    MANIFEST_SCHEMA_VERSION,
    METASCENE_SCHEMA_VERSION,
    RUNTIME_SCENE_SCHEMA_VERSION,
    relative_project_path,
)

# (node tag, component key) pairs copied verbatim into a node's "light" component.
_LIGHT_TAG_FIELDS = (
    ("light_energy", "energy"),
    ("light_radius", "radius"),
    ("light_fov", "fov"),
    ("stored_energy", "stored_energy"),
)

# Node tags mirrored into the per-node "metadata" component when present.
_NODE_METADATA_TAGS = (
    "has_animations",
    "has_animations_checked",
    "can_create_actor_from_memory",
    "saved_has_animations",
    "saved_can_create_actor_from_memory",
)

# Node tags mirrored into the per-node "material_overrides" component when present.
_MATERIAL_OVERRIDE_TAGS = (
    "material_ambient",
    "material_diffuse",
    "material_specular",
    "material_emission",
    "material_shininess",
    "material_basecolor",
    "material_roughness",
    "material_metallic",
    "material_ior",
    "material_effect_metallic_enabled",
    "material_effect_default_texture_enabled",
    "material_effect_parallax_enabled",
    "material_render_effect_signature",
    "material_texture_diffuse",
    "material_texture_normal",
    "material_texture_ior",
    "material_texture_roughness",
    "material_texture_parallax",
    "material_texture_metallic",
    "material_texture_emission",
    "material_texture_ao",
    "material_texture_alpha",
    "material_texture_detail",
    "material_texture_gloss",
)

# Tags consulted, in priority order, to recover a node's imported key.
_IMPORTED_NODE_KEY_TAGS = (
    "imported_node_key",
    "source_model_node_key",
    "ssbo_tree_key",
    "tree_item_key",
)

# Tags consulted, in priority order, to find the source file of a model root.
_MODEL_PATH_TAGS = ("model_path", "saved_model_path", "original_path")

# Values of the "ssbo_managed" tag that count as enabled.
_SSBO_TRUE_VALUES = ("1", "true", "yes", "on")


def load_json(file_path: str, default_value):
    """Load JSON from ``file_path``; return ``default_value`` on any failure.

    The file is decoded as ``utf-8-sig`` so a leading BOM is accepted. A missing
    or empty path, a non-existent file, and any read/parse error all yield
    ``default_value``; read/parse errors are additionally reported on stdout.
    """
    if not file_path or not os.path.exists(file_path):
        return default_value
    try:
        with open(file_path, "r", encoding="utf-8-sig") as f:
            return json.load(f)
    except Exception as e:  # deliberate best-effort: callers rely on the default
        print(f"⚠️ 读取 JSON 失败 {file_path}: {e}")
        return default_value


def save_json(file_path: str, payload):
    """Write ``payload`` to ``file_path`` as indented UTF-8 JSON.

    Parent directories are created as needed. Non-ASCII text is written as-is
    (``ensure_ascii=False``).
    """
    parent_dir = os.path.dirname(file_path)
    # A bare file name has no directory part; os.makedirs("") would raise.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=4)


def _resolve_scene_components(payload: dict):
    """Return ``(scene_components, camera, gui_elements, lui)`` copied from ``payload``.

    Each value is taken from ``payload["scene_components"]`` first and falls
    back to the legacy top-level ``camera`` / ``gui`` / ``lui`` keys.
    """
    scene_components = dict(payload.get("scene_components", {}) or {})
    camera_component = dict(scene_components.get("camera", {}) or payload.get("camera", {}) or {})
    gui_component = dict(scene_components.get("gui", {}) or {})
    gui_elements = list(gui_component.get("elements", []) or payload.get("gui", []) or [])
    lui_component = dict(scene_components.get("lui", {}) or payload.get("lui", {}) or {})
    return scene_components, camera_component, gui_elements, lui_component


def _migrate_node_v1_to_v2(raw_node) -> dict:
    """Return a copy of one node entry with its v2 ``components`` block filled in.

    Values already present under ``components`` win; otherwise the legacy
    flat keys on the node are used. The flat keys are kept in sync afterwards.
    """
    node = dict(raw_node or {})
    components = dict(node.get("components", {}) or {})
    metadata = dict(components.get("metadata", {}) or {})

    runtime_interactive = bool(
        metadata.get("runtime_interactive", node.get("runtime_interactive", False))
    )
    scripts = list(
        (components.get("scripts", {}) or {}).get("entries", []) or node.get("scripts", []) or []
    )

    model_component = dict(components.get("model", {}) or {})
    has_legacy_model_keys = (
        node.get("asset_guid") or node.get("asset_path") or node.get("imported_node_key")
    )
    if not model_component and has_legacy_model_keys:
        model_component = {
            "asset_guid": node.get("asset_guid", ""),
            "asset_path": node.get("asset_path", ""),
            "imported_node_key": node.get("imported_node_key", ""),
            "is_model_root": bool((node.get("tags", {}) or {}).get("is_model_root")),
        }

    metadata["runtime_interactive"] = runtime_interactive
    metadata.setdefault("node_class", node.get("node_class", ""))

    components["model"] = model_component
    components["scripts"] = {"entries": scripts} if scripts else {}
    components["metadata"] = metadata

    node["components"] = components
    node["runtime_interactive"] = runtime_interactive
    node["scripts"] = scripts
    if model_component:
        for key in ("asset_guid", "asset_path", "imported_node_key"):
            node[key] = str(model_component.get(key, "") or node.get(key, "") or "")
    return node


def _migrate_scene_description_v1_to_v2(payload: dict) -> dict:
    """Upgrade a v1 scene description to v2 and return the upgraded copy.

    Adds the ``scene_components`` block (keeping any extra keys already in it),
    mirrors it to the legacy top-level keys, and normalizes every node.
    """
    payload = dict(payload or {})
    scene_components, camera_component, gui_elements, lui_component = _resolve_scene_components(payload)
    scene_components["camera"] = camera_component
    scene_components["gui"] = {"elements": gui_elements}
    scene_components["lui"] = lui_component
    payload["scene_components"] = scene_components
    payload["camera"] = camera_component
    payload["gui"] = gui_elements
    payload["lui"] = lui_component
    payload["nodes"] = [
        _migrate_node_v1_to_v2(raw_node) for raw_node in list(payload.get("nodes", []) or [])
    ]
    payload["schema_version"] = 2
    return payload


def _coerce_schema_version(raw_version) -> int:
    """Parse a stored schema version; missing, malformed or < 1 values count as 1."""
    try:
        version = int(raw_version or 1)
    except (TypeError, ValueError):
        return 1
    return max(version, 1)


def _apply_schema_migrations(payload: dict, target_version: int, migrations: dict, label: str) -> dict:
    """Run ``migrations`` (``{from_version: func}``) until ``target_version`` is reached.

    The result is always stamped with ``target_version``. A migration message is
    printed only when the payload was older than the target; a payload that is
    newer than the target is reported as such instead of as "migrated".
    """
    payload = dict(payload or {})
    original_version = _coerce_schema_version(payload.get("schema_version", 1))
    version = original_version
    while version < target_version:
        migrate = migrations.get(version)
        if migrate is None:
            # No known step from this version; stop rather than loop forever.
            break
        payload = migrate(payload)
        version += 1
    payload["schema_version"] = target_version
    if original_version < target_version:
        print(f"ℹ️ {label}已从 schema v{original_version} 迁移到 v{target_version}")
    elif original_version > target_version:
        print(f"⚠️ {label} schema v{original_version} 高于当前支持的 v{target_version}，将按 v{target_version} 处理")
    return payload


def normalize_scene_description(payload: dict) -> dict:
    """Return a copy of ``payload`` upgraded to ``METASCENE_SCHEMA_VERSION``."""
    return _apply_schema_migrations(
        payload,
        METASCENE_SCHEMA_VERSION,
        {1: _migrate_scene_description_v1_to_v2},
        "场景描述",
    )


def _migrate_runtime_scene_v1_to_v2(payload: dict) -> dict:
    """Upgrade a v1 runtime scene to v2 and return the upgraded copy.

    ``scene_components`` is rebuilt with exactly the ``camera`` / ``gui`` /
    ``lui`` keys, and nodes go through the same v1-to-v2 node normalization as
    scene descriptions.
    """
    payload = dict(payload or {})
    _, camera_component, gui_elements, lui_component = _resolve_scene_components(payload)
    payload["scene_components"] = {
        "camera": camera_component,
        "gui": {"elements": gui_elements},
        "lui": lui_component,
    }
    payload["camera"] = camera_component
    payload["gui"] = gui_elements
    payload["lui"] = lui_component
    # Call the node migration directly: this step is pinned to v1 -> v2 and must
    # not emit the scene-description migration message.
    payload["nodes"] = [
        _migrate_node_v1_to_v2(raw_node) for raw_node in list(payload.get("nodes", []) or [])
    ]
    payload["schema_version"] = 2
    return payload


def normalize_runtime_scene(payload: dict) -> dict:
    """Return a copy of ``payload`` upgraded to ``RUNTIME_SCENE_SCHEMA_VERSION``."""
    return _apply_schema_migrations(
        payload,
        RUNTIME_SCENE_SCHEMA_VERSION,
        {1: _migrate_runtime_scene_v1_to_v2},
        "运行时场景",
    )


def _node_path_id(parent_id: str, index: int) -> str:
    """Build a slash-separated node id from the parent id and the child index."""
    return f"{parent_id}/{index}" if parent_id else str(index)


def _tag_is_true(node, tag_name: str) -> bool:
    """Return True when ``node`` carries ``tag_name`` with the value "true" (any case)."""
    return bool(node.hasTag(tag_name) and node.getTag(tag_name).lower() == "true")


def _load_node_scripts(node) -> list:
    """Parse the node's ``scripts_info`` tag into a list of dict entries.

    A missing tag, invalid JSON, or a non-list payload yields ``[]``; entries
    that are not dicts are dropped so later ``.get`` calls cannot fail.
    """
    if not node.hasTag("scripts_info"):
        return []
    try:
        parsed = json.loads(node.getTag("scripts_info"))
    except (TypeError, ValueError, RecursionError):
        return []
    if not isinstance(parsed, list):
        return []
    return [entry for entry in parsed if isinstance(entry, dict)]


def _collect_light_component(node):
    """Return the node's light settings from its tags, or ``{}`` without ``light_type``."""
    if not node.hasTag("light_type"):
        return {}
    component = {"type": node.getTag("light_type")}
    for tag_name, output_name in _LIGHT_TAG_FIELDS:
        if node.hasTag(tag_name):
            component[output_name] = node.getTag(tag_name)
    return component


def _collect_model_component(node, asset_guid: str, asset_path: str, imported_node_key: str) -> dict:
    """Return the "model" component, or ``{}`` for nodes with no asset that are not model roots."""
    is_model_root = bool(node.hasTag("is_model_root"))
    if not asset_guid and not is_model_root:
        return {}
    return {
        "asset_guid": asset_guid,
        "asset_path": asset_path,
        "imported_node_key": imported_node_key,
        "is_model_root": is_model_root,
    }


def _collect_script_component(scripts: list[dict]) -> dict:
    """Wrap ``scripts`` as ``{"entries": [...]}``; an empty list gives ``{}``."""
    return {"entries": list(scripts or [])} if scripts else {}


def _collect_node_metadata_component(node, runtime_interactive: bool) -> dict:
    """Return the "metadata" component: node class, interactivity flag and animation tags."""
    metadata = {
        "node_class": node.node().getClassType().getName() if node.node() else "",
        "runtime_interactive": runtime_interactive,
    }
    for tag_name in _NODE_METADATA_TAGS:
        if node.hasTag(tag_name):
            metadata[tag_name] = node.getTag(tag_name)
    return metadata


def _collect_material_override_component(node):
    """Return every ``material_*`` override tag present on the node as a dict."""
    return {
        tag_name: node.getTag(tag_name)
        for tag_name in _MATERIAL_OVERRIDE_TAGS
        if node.hasTag(tag_name)
    }


def _resolve_imported_node_key(node, fallback_key: str) -> str:
    """Return the first non-blank key tag on the node.

    When none is set, model roots get ``""`` and every other node gets
    ``fallback_key``.
    """
    for tag_name in _IMPORTED_NODE_KEY_TAGS:
        if node.hasTag(tag_name):
            tag_value = str(node.getTag(tag_name) or "").strip()
            if tag_value:
                return tag_value
    return "" if node.hasTag("is_model_root") else fallback_key


def _should_serialize_child_nodes(node, asset_guid: str, scripts: list[dict], runtime_interactive: bool, light_component: dict, material_component: dict) -> bool:
    """Return True when the node itself carries state worth serializing.

    That is: an asset, scripts, the interactive flag, a light, material
    overrides, a dirty transform/material tag, or a ``user_visible`` tag that
    is not "true".
    """
    if not node:
        return False
    if asset_guid or scripts or runtime_interactive or light_component or material_component:
        return True
    if _tag_is_true(node, "scene_transform_dirty") or _tag_is_true(node, "scene_material_dirty"):
        return True
    return bool(node.hasTag("user_visible") and node.getTag("user_visible").lower() != "true")


def _build_scene_components(camera_state: dict, gui_elements: list, lui_snapshot: dict) -> dict:
    """Return a fresh ``scene_components`` block built from copies of the inputs."""
    return {
        "camera": dict(camera_state or {}),
        "gui": {"elements": list(gui_elements or [])},
        "lui": dict(lui_snapshot or {}),
    }


def _build_scene_description_payload(
    *,
    asset_database,
    project_root: str,
    scene_guid: str,
    scene_name: str,
    scene_file_rel: str,
    root_nodes,
    cache_bam_path: str = "",
    cache_gui_path: str = "",
    cache_lui_path: str = "",
    gui_elements: list | None = None,
    lui_snapshot: dict | None = None,
    camera_state: dict | None = None,
):
    """Walk ``root_nodes`` and return the scene description dict.

    A node is emitted when it, or any descendant, carries serializable state
    (see ``_should_serialize_child_nodes``). Below a model root whose
    ``ssbo_managed`` tag is enabled, every node is emitted. Model and script
    files found on the way are registered with ``asset_database`` and their
    GUIDs are collected into the ``referenced_*_guids`` lists.
    """
    gui_elements = list(gui_elements or [])
    lui_snapshot = dict(lui_snapshot or {})
    camera_state = dict(camera_state or {})
    node_entries = []
    referenced_asset_guids = set()
    referenced_script_guids = set()
    # cache key -> (node, verdict). The node is stored to keep it alive.
    subtree_serializable_cache = {}

    def _cache_key(node):
        # getChildren() hands out fresh wrapper objects, so id() alone never
        # matches for the same scene node. Prefer the engine's per-node key.
        get_key = getattr(node, "getKey", None)
        if callable(get_key):
            try:
                return ("key", get_key())
            except Exception:
                pass
        # id() is only safe because the cache entry pins the node, which stops
        # CPython from reusing the id for another object.
        return ("id", id(node))

    def _children_of(node) -> list:
        try:
            return list(node.getChildren())
        except Exception:
            return []

    def _node_is_serializable(node) -> bool:
        if not node or node.isEmpty():
            return False
        cache_key = _cache_key(node)
        cached = subtree_serializable_cache.get(cache_key)
        if cached is not None:
            return cached[1]
        # A model root counts as having an asset even before it is resolved.
        asset_guid = "root" if node.hasTag("is_model_root") else ""
        result = _should_serialize_child_nodes(
            node,
            asset_guid,
            _load_node_scripts(node),
            _tag_is_true(node, "runtime_interactive"),
            _collect_light_component(node),
            _collect_material_override_component(node),
        )
        if not result:
            result = any(
                _node_is_serializable(child)
                for child in _children_of(node)
                if child and not child.isEmpty()
            )
        subtree_serializable_cache[cache_key] = (node, result)
        return result

    def _is_ssbo_managed_model_root(node) -> bool:
        try:
            return bool(
                node.hasTag("is_model_root")
                and node.hasTag("ssbo_managed")
                and node.getTag("ssbo_managed").strip().lower() in _SSBO_TRUE_VALUES
            )
        except Exception:
            return False

    def _resolve_model_asset(node):
        # Only model roots reference an asset; the first path tag that
        # registers successfully wins.
        if not node.hasTag("is_model_root"):
            return "", ""
        for tag_name in _MODEL_PATH_TAGS:
            if not node.hasTag(tag_name):
                continue
            candidate = node.getTag(tag_name)
            if not candidate:
                continue
            asset_record = asset_database.register_asset(candidate, copy_into_assets=False)
            if asset_record:
                guid = asset_record.get("guid", "")
                referenced_asset_guids.add(guid)
                return guid, asset_record.get("asset_path", "")
        return "", ""

    def _resolve_script_assets(scripts: list) -> None:
        # Rewrites each entry in place so it points at the registered asset.
        for script_info in scripts:
            script_guid = str(script_info.get("script_guid", "") or "").strip()
            asset_record = asset_database.get_asset(script_guid) if script_guid else {}
            if not asset_record:
                project_relative_path = str(script_info.get("project_relative_path", "") or "")
                if project_relative_path:
                    script_abs_path = os.path.join(
                        project_root, project_relative_path.replace("/", os.sep)
                    )
                    asset_record = asset_database.register_asset(
                        script_abs_path, copy_into_assets=False
                    )
            if not asset_record:
                continue
            resolved_guid = asset_record.get("guid", "")
            if resolved_guid:
                script_info["script_guid"] = resolved_guid
                referenced_script_guids.add(resolved_guid)
            asset_path_for_script = str(asset_record.get("asset_path", "") or "")
            if asset_path_for_script:
                script_info["project_relative_path"] = asset_path_for_script
                script_info["file"] = asset_path_for_script

    def walk_nodes(children, parent_id="", force_include=False):
        # The index counts skipped children too, so node ids reflect the
        # node's real position among its siblings.
        for index, child in enumerate(list(children or [])):
            if not child or child.isEmpty():
                continue
            node_id = _node_path_id(parent_id, index)
            child_force_include = bool(force_include) or _is_ssbo_managed_model_root(child)
            # Force-included nodes are emitted regardless of their own
            # serializable status.
            if not child_force_include and not _node_is_serializable(child):
                continue

            imported_node_key = _resolve_imported_node_key(child, node_id)
            runtime_interactive = _tag_is_true(child, "runtime_interactive")
            scripts = _load_node_scripts(child)
            asset_guid, asset_path = _resolve_model_asset(child)
            _resolve_script_assets(scripts)
            light_component = _collect_light_component(child)
            material_component = _collect_material_override_component(child)

            node_entries.append(
                {
                    "node_id": node_id,
                    "parent_id": parent_id or None,
                    "name": child.getName(),
                    "imported_node_key": imported_node_key,
                    "asset_guid": asset_guid,
                    "asset_path": asset_path,
                    "node_class": child.node().getClassType().getName() if child.node() else "",
                    "transform": {
                        "position": [child.getX(), child.getY(), child.getZ()],
                        "rotation": [child.getH(), child.getP(), child.getR()],
                        "scale": [child.getSx(), child.getSy(), child.getSz()],
                    },
                    "visibility": {
                        "user_visible": (
                            child.getTag("user_visible").lower() == "true"
                            if child.hasTag("user_visible")
                            else True
                        )
                    },
                    "runtime_interactive": runtime_interactive,
                    "scripts": scripts,
                    "components": {
                        "model": _collect_model_component(child, asset_guid, asset_path, imported_node_key),
                        "scripts": _collect_script_component(scripts),
                        "light": light_component,
                        "material_overrides": material_component,
                        "metadata": _collect_node_metadata_component(child, runtime_interactive),
                    },
                    "tags": {
                        tag_name: child.getTag(tag_name)
                        for tag_name in child.getTagKeys()
                    },
                }
            )

            # Under force_include we always recurse so the whole subtree is captured.
            child_nodes = _children_of(child)
            if (
                child_force_include
                or _should_serialize_child_nodes(
                    child,
                    asset_guid,
                    scripts,
                    runtime_interactive,
                    light_component,
                    material_component,
                )
                or any(
                    _node_is_serializable(grandchild)
                    for grandchild in child_nodes
                    if grandchild and not grandchild.isEmpty()
                )
            ):
                walk_nodes(child_nodes, node_id, force_include=child_force_include)

    walk_nodes(root_nodes)

    return {
        "schema_version": METASCENE_SCHEMA_VERSION,
        "scene_guid": scene_guid,
        "scene_name": scene_name,
        "scene_file": scene_file_rel,
        "saved_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "cache": {
            "scene_bam": relative_project_path(project_root, cache_bam_path),
            "gui_json": relative_project_path(project_root, cache_gui_path),
            "lui_json": relative_project_path(project_root, cache_lui_path),
        },
        "referenced_asset_guids": sorted(guid for guid in referenced_asset_guids if guid),
        "referenced_script_guids": sorted(guid for guid in referenced_script_guids if guid),
        "scene_components": _build_scene_components(camera_state, gui_elements, lui_snapshot),
        "camera": camera_state,
        "gui": gui_elements,
        "lui": lui_snapshot,
        "nodes": node_entries,
    }


def build_scene_description_from_world(
    *,
    asset_database,
    project_root: str,
    scene_guid: str,
    scene_name: str,
    scene_file_rel: str,
    root_nodes,
    cache_bam_path: str = "",
    cache_gui_path: str = "",
    cache_lui_path: str = "",
    gui_elements: list | None = None,
    lui_snapshot: dict | None = None,
    camera_state: dict | None = None,
):
    """Public entry point; forwards every argument to ``_build_scene_description_payload``."""
    return _build_scene_description_payload(
        asset_database=asset_database,
        project_root=project_root,
        scene_guid=scene_guid,
        scene_name=scene_name,
        scene_file_rel=scene_file_rel,
        root_nodes=root_nodes,
        cache_bam_path=cache_bam_path,
        cache_gui_path=cache_gui_path,
        cache_lui_path=cache_lui_path,
        gui_elements=gui_elements,
        lui_snapshot=lui_snapshot,
        camera_state=camera_state,
    )


def _node_flag_is_true(metadata: dict, tags: dict, primary_key: str, saved_key: str) -> bool:
    """Return whether the first flag found reads "true".

    Lookup order is: metadata primary, metadata saved, tags primary, tags
    saved. The first key that is present decides, even if its value is empty.
    """
    for source in (metadata, tags):
        for key in (primary_key, saved_key):
            if key in source:
                return str(source[key] or "").lower() == "true"
    return False


def build_runtime_scene(scene_description: dict):
    """Build the runtime scene dict from a scene description.

    Nodes with an ``asset_guid`` are classified as interactive when they have
    animations, can create an actor from memory, are flagged
    ``runtime_interactive``, or carry scripts; all other asset nodes are static.
    Nodes without an ``asset_guid`` are not classified. The node list itself is
    passed through unchanged.
    """
    scene_description = dict(scene_description or {})
    nodes = list(scene_description.get("nodes", []) or [])
    _, camera_component, gui_elements, lui_component = _resolve_scene_components(scene_description)

    interactive_model_names = []
    interactive_node_ids = []
    static_model_names = []
    static_node_ids = []

    for node in nodes:
        if not node.get("asset_guid"):
            continue
        metadata_component = dict((node.get("components", {}) or {}).get("metadata", {}) or {})
        tags = node.get("tags", {}) or {}
        is_interactive = (
            _node_flag_is_true(metadata_component, tags, "has_animations", "saved_has_animations")
            or _node_flag_is_true(
                metadata_component,
                tags,
                "can_create_actor_from_memory",
                "saved_can_create_actor_from_memory",
            )
            or bool(node.get("runtime_interactive"))
            or bool(node.get("scripts"))
        )
        if is_interactive:
            interactive_node_ids.append(node.get("node_id", ""))
            interactive_model_names.append(node.get("name", ""))
        else:
            static_node_ids.append(node.get("node_id", ""))
            static_model_names.append(node.get("name", ""))

    unique_interactive_names = sorted({name for name in interactive_model_names if name})
    return {
        "schema_version": RUNTIME_SCENE_SCHEMA_VERSION,
        "scene_guid": scene_description.get("scene_guid", ""),
        "scene_name": scene_description.get("scene_name", ""),
        "cook": {
            "interactive_node_ids": sorted({node_id for node_id in interactive_node_ids if node_id}),
            "interactive_model_names": list(unique_interactive_names),
            "static_node_ids": sorted({node_id for node_id in static_node_ids if node_id}),
            "static_model_names": sorted({name for name in static_model_names if name}),
        },
        "interactive_model_names": list(unique_interactive_names),
        "scene_components": {
            "camera": camera_component,
            "gui": {"elements": gui_elements},
            "lui": lui_component,
        },
        "camera": camera_component,
        "gui": gui_elements,
        "lui": lui_component,
        "nodes": nodes,
        "referenced_asset_guids": scene_description.get("referenced_asset_guids", []) or [],
        "referenced_script_guids": scene_description.get("referenced_script_guids", []) or [],
    }


def build_scene_cook_manifest(scene_description: dict, runtime_scene: dict):
    """Summarize one cooked scene: identity, node count, cook lists, references and cache."""
    scene_description = dict(scene_description or {})
    runtime_cook = dict((runtime_scene or {}).get("cook", {}) or {})
    return {
        "schema_version": 1,
        "scene_guid": scene_description.get("scene_guid", ""),
        "scene_name": scene_description.get("scene_name", ""),
        "source_scene_file": scene_description.get("scene_file", ""),
        "node_count": len(scene_description.get("nodes", []) or []),
        "interactive_node_ids": list(runtime_cook.get("interactive_node_ids", []) or []),
        "interactive_model_names": list(runtime_cook.get("interactive_model_names", []) or []),
        "static_node_ids": list(runtime_cook.get("static_node_ids", []) or []),
        "static_model_names": list(runtime_cook.get("static_model_names", []) or []),
        "referenced_asset_guids": list(scene_description.get("referenced_asset_guids", []) or []),
        "referenced_script_guids": list(scene_description.get("referenced_script_guids", []) or []),
        "scene_components": dict(scene_description.get("scene_components", {}) or {}),
        "cache": dict(scene_description.get("cache", {}) or {}),
        "saved_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


def build_runtime_manifest(project_config: dict, runtime_scenes: list[dict], asset_records: list[dict]):
    """Build the project-level runtime manifest.

    The startup scene falls back to the first scene when the configured GUID is
    not among ``runtime_scenes``. Falsy asset records are skipped, and
    ``cook_summary["asset_count"]`` counts only the assets actually listed.
    """
    project_config = dict(project_config or {})
    runtime_scenes = list(runtime_scenes or [])

    scene_guids = [scene.get("scene_guid", "") for scene in runtime_scenes]
    startup_scene_guid = project_config.get("startup_scene_guid", "")
    if startup_scene_guid not in scene_guids:
        startup_scene_guid = scene_guids[0] if scene_guids else ""

    cook_blocks = [dict(scene.get("cook", {}) or {}) for scene in runtime_scenes]

    def _total(cook_key: str) -> int:
        return sum(len(list(cook.get(cook_key, []) or [])) for cook in cook_blocks)

    assets = [
        {
            "guid": asset.get("guid", ""),
            "asset_type": asset.get("asset_type", ""),
            "asset_path": asset.get("asset_path", ""),
            "imported_cache": asset.get("imported_cache", {}) or {},
        }
        for asset in (asset_records or [])
        if asset
    ]

    return {
        "schema_version": MANIFEST_SCHEMA_VERSION,
        "project_name": project_config.get("name", f"{ENGINE_NAME}Project"),
        "startup_scene_guid": startup_scene_guid,
        "scene_guids": scene_guids,
        "scenes": [
            {
                "guid": scene.get("scene_guid", ""),
                "name": scene.get("scene_name", ""),
                "runtime_path": f"scenes/{scene.get('scene_guid', '')}.runtime.json",
            }
            for scene in runtime_scenes
        ],
        "assets": assets,
        "cook_summary": {
            "scene_count": len(runtime_scenes),
            # Count what is listed, not the raw input that may hold empty records.
            "asset_count": len(assets),
            "interactive_node_count": _total("interactive_node_ids"),
            "static_node_count": _total("static_node_ids"),
            "interactive_model_count": _total("interactive_model_names"),
            "static_model_count": _total("static_model_names"),
        },
        "build_settings": project_config.get("build_settings", {}) or {},
    }