"""WebGL project packager for EG editor (Three.js static scene export).""" from __future__ import annotations import datetime import json import os import re import shutil import stat import subprocess import tempfile from pathlib import Path from typing import Any, Dict, List, Optional, Tuple class WebGLPackager: """Export current EG scene into a static WebGL package directory.""" BASIS_MATRIX_ROW_MAJOR = [ 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, -1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ] ENERGY_TO_INTENSITY_SCALE = 0.001 def __init__(self, world): self.world = world self.scene_manager = getattr(world, "scene_manager", None) self._project_path = "" self._output_root = "" self._assets_model_dir = "" self._assets_texture_dir = "" self._copied_source_to_uri: Dict[str, str] = {} self._name_counter: Dict[str, int] = {} self._node_id_by_pointer: Dict[int, str] = {} self.report: Dict[str, Any] = { "status": "failed", "warnings": [], "missing_assets": [], "unsupported_assets": [], "converted_assets": [], "copied_assets": [], "output_dir": "", } def package(self, project_path: str, output_dir: str) -> Dict[str, Any]: """Export project as WebGL static site and return export report.""" self._project_path = os.path.normpath(project_path) project_name = os.path.basename(self._project_path.rstrip(os.sep)) or "project" self._output_root = os.path.normpath(os.path.join(output_dir, f"{project_name}_webgl")) self._assets_model_dir = os.path.join(self._output_root, "assets", "models") self._assets_texture_dir = os.path.join(self._output_root, "assets", "textures") self.report["output_dir"] = self._output_root try: if not os.path.isdir(self._project_path): self._fail(f"项目路径不存在: {self._project_path}") return self.report self._prepare_output_dir() self._copy_templates() scene_manifest = self._build_scene_manifest(project_name) self._write_json( os.path.join(self._output_root, "scene", "scene_webgl.json"), scene_manifest, ) self._write_preview_scripts() status = "success" if self.report["missing_assets"] or self.report["unsupported_assets"]: status = "partial" self.report["status"] = status except Exception as exc: self._fail(f"WebGL打包失败: {exc}") finally: self._write_json( os.path.join(self._output_root, "reports", "export_report.json"), self.report, ) return self.report def _prepare_output_dir(self) -> None: if os.path.isdir(self._output_root): shutil.rmtree(self._output_root) os.makedirs(os.path.join(self._output_root, "js"), exist_ok=True) os.makedirs(os.path.join(self._output_root, "vendor"), exist_ok=True) os.makedirs(self._assets_model_dir, exist_ok=True) os.makedirs(self._assets_texture_dir, exist_ok=True) os.makedirs(os.path.join(self._output_root, "scene"), exist_ok=True) os.makedirs(os.path.join(self._output_root, "reports"), exist_ok=True) def _copy_templates(self) -> None: repo_root = Path(__file__).resolve().parent.parent template_root = repo_root / "templates" / "webgl" if not template_root.exists(): raise FileNotFoundError(f"模板目录不存在: {template_root}") file_mapping = { "index.html": "index.html", "style.css": "style.css", "viewer.js": os.path.join("js", "viewer.js"), } for src_name, dst_rel in file_mapping.items(): src = template_root / src_name dst = Path(self._output_root) / dst_rel if not src.exists(): raise FileNotFoundError(f"模板文件不存在: {src}") dst.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(str(src), str(dst)) vendor_src = template_root / "vendor" vendor_dst = Path(self._output_root) / "vendor" if vendor_src.exists(): for entry in vendor_src.iterdir(): if entry.is_file(): shutil.copy2(str(entry), str(vendor_dst / entry.name)) self._try_resolve_vendor_files(vendor_dst) # Placeholder marker warning placeholder_file = vendor_dst / "three.module.min.js" if placeholder_file.exists(): content = placeholder_file.read_text(encoding="utf-8", errors="ignore") if "EG_VENDOR_PLACEHOLDER" in content: self.report["warnings"].append( "当前 vendor 为占位文件,请替换为官方 three.module.min.js / OrbitControls.js / GLTFLoader.js 后再预览。" ) def _try_resolve_vendor_files(self, vendor_dst: Path) -> None: """Try to replace template placeholders with system-installed Three.js modules.""" lookup_roots = [ Path("/usr/share/javascript/three"), Path("/usr/share/nodejs/three"), Path("/usr/lib/node_modules/three"), Path.home() / ".local/lib/node_modules/three", ] targets = { "three.module.min.js": [ "build/three.module.min.js", "build/three.module.js", ], "OrbitControls.js": [ "examples/jsm/controls/OrbitControls.js", ], "GLTFLoader.js": [ "examples/jsm/loaders/GLTFLoader.js", ], } for dst_name, rel_candidates in targets.items(): dst_path = vendor_dst / dst_name if not dst_path.exists(): continue try: content = dst_path.read_text(encoding="utf-8", errors="ignore") except Exception: content = "" # Only replace placeholders. if "EG_VENDOR_PLACEHOLDER" not in content: continue found_source = None for root in lookup_roots: if not root.exists(): continue for rel in rel_candidates: candidate = root / rel if candidate.exists() and candidate.is_file(): found_source = candidate break if found_source: break if found_source: shutil.copy2(str(found_source), str(dst_path)) def _build_scene_manifest(self, project_name: str) -> Dict[str, Any]: render = getattr(self.world, "render", None) if not render: raise RuntimeError("world.render 不可用") export_nodes: List[Any] = [] model_nodes = self._collect_model_nodes() spot_nodes = self._collect_valid_nodes(getattr(self.scene_manager, "Spotlight", [])) point_nodes = self._collect_valid_nodes(getattr(self.scene_manager, "Pointlight", [])) ground_node = self._get_default_ground_node() export_nodes.extend(model_nodes) export_nodes.extend(spot_nodes) export_nodes.extend(point_nodes) if ground_node is not None: export_nodes.append(ground_node) # Unique by pointer uniq: Dict[int, Any] = {} for node in export_nodes: uniq[id(node)] = node export_nodes = list(uniq.values()) for index, node in enumerate(export_nodes, start=1): self._node_id_by_pointer[id(node)] = f"node_{index:04d}" nodes_json: List[Dict[str, Any]] = [] for node in model_nodes: entry = self._build_model_node_entry(node) if entry: nodes_json.append(entry) for node in point_nodes: entry = self._build_light_node_entry(node, kind="point_light") if entry: nodes_json.append(entry) for node in spot_nodes: entry = self._build_light_node_entry(node, kind="spot_light") if entry: nodes_json.append(entry) if ground_node is not None: entry = self._build_ground_node_entry(ground_node) if entry: nodes_json.append(entry) manifest = { "meta": { "format_version": "1.0", "project_name": project_name, "exported_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), }, "coordinate": { "source": "panda3d_zup", "target": "threejs_yup", "matrix_convention": "panda_row_vector_row_major", "basis_matrix": self.BASIS_MATRIX_ROW_MAJOR, }, "camera": self._build_camera_entry(), "environment": self._build_environment_entry(), "nodes": nodes_json, } return manifest def _collect_model_nodes(self) -> List[Any]: if not self.scene_manager: return [] all_models = self._collect_valid_nodes(getattr(self.scene_manager, "models", [])) light_ptrs = {id(n) for n in self._collect_valid_nodes(getattr(self.scene_manager, "Spotlight", []))} light_ptrs |= {id(n) for n in self._collect_valid_nodes(getattr(self.scene_manager, "Pointlight", []))} models: List[Any] = [] for node in all_models: if id(node) in light_ptrs: continue if node.hasTag("light_type"): continue if node.getName() in {"render", "camera", "cam"}: continue models.append(node) return models @staticmethod def _collect_valid_nodes(nodes: List[Any]) -> List[Any]: valid = [] for node in nodes or []: if not node: continue try: if node.isEmpty(): continue except Exception: continue valid.append(node) return valid def _get_default_ground_node(self): ground = getattr(self.world, "ground", None) if not ground: return None try: if ground.isEmpty(): return None return ground except Exception: return None def _build_camera_entry(self) -> Dict[str, Any]: render = getattr(self.world, "render", None) cam = getattr(self.world, "cam", None) or getattr(self.world, "camera", None) default = { "matrix_local_row_major": [ 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, -50.0, 20.0, 1.0, ], "fov_deg": 80.0, "near": 0.1, "far": 10000.0, } if not cam or not render: return default try: cam_mat = cam.getMat(render) fov = 80.0 near = 0.1 far = 10000.0 lens = cam.node().getLens() if cam.node() else None if lens: try: fov = float(lens.getFov()[0]) except Exception: pass try: near = float(lens.getNear()) except Exception: pass try: far = float(lens.getFar()) except Exception: pass return { "matrix_local_row_major": self._mat4_to_row_major_list(cam_mat), "fov_deg": fov, "near": near, "far": far, } except Exception: return default def _build_environment_entry(self) -> Dict[str, Any]: ambient = getattr(self.world, "ambient_light", None) directional = getattr(self.world, "directional_light", None) ambient_color = [0.2, 0.2, 0.2] directional_color = [0.8, 0.8, 0.8] directional_dir = [0.0, 0.0, -1.0] if ambient and not ambient.isEmpty(): try: c = ambient.node().getColor() ambient_color = [float(c[0]), float(c[1]), float(c[2])] except Exception: pass if directional and not directional.isEmpty(): try: c = directional.node().getColor() directional_color = [float(c[0]), float(c[1]), float(c[2])] except Exception: pass try: q = directional.getQuat(getattr(self.world, "render", directional.getParent())) fwd = q.getForward() directional_dir = [float(fwd[0]), float(fwd[1]), float(fwd[2])] except Exception: pass return { "include_default_ground": True, "ambient_light": { "color": ambient_color, "intensity": 1.0, }, "directional_light": { "color": directional_color, "intensity": 1.0, "direction": directional_dir, }, } def _build_model_node_entry(self, node) -> Optional[Dict[str, Any]]: node_id = self._node_id_by_pointer.get(id(node)) if not node_id: return None parent_id, mat = self._get_parent_and_matrix(node) node_name = node.getName() or node_id model_source, source_tag = self._resolve_model_source(node) if not model_source: self.report["missing_assets"].append( { "node": node_name, "reason": "model_path_not_found", "tags_checked": ["model_path", "saved_model_path", "original_path", "file"], } ) return None model_uri = self._prepare_model_asset(model_source, node_name) if not model_uri: self.report["unsupported_assets"].append( { "node": node_name, "source": model_source, "reason": "model_conversion_failed", } ) return None material_override = self._extract_material_override(node) textures = self._collect_and_copy_texture_overrides(node, model_source) entry: Dict[str, Any] = { "id": node_id, "name": node_name, "kind": "model", "parent_id": parent_id, "matrix_local_row_major": self._mat4_to_row_major_list(mat), "model": {"uri": model_uri}, "material_override": material_override, "source_model_tag": source_tag, } if textures: entry["texture_overrides"] = textures return entry def _build_light_node_entry(self, node, kind: str) -> Optional[Dict[str, Any]]: node_id = self._node_id_by_pointer.get(id(node)) if not node_id: return None parent_id, mat = self._get_parent_and_matrix(node) node_name = node.getName() or node_id light_obj = node.getPythonTag("rp_light_object") if node.hasPythonTag("rp_light_object") else None color = [1.0, 1.0, 1.0] energy = self._safe_float(node.getTag("light_energy"), 5000.0) if node.hasTag("light_energy") else 5000.0 radius = self._safe_float(node.getTag("light_radius"), 30.0) if node.hasTag("light_radius") else 30.0 spot_fov = self._safe_float(node.getTag("light_fov"), 45.0) if node.hasTag("light_fov") else 45.0 inner_ratio = 0.4 if light_obj is not None: try: c = getattr(light_obj, "color", None) if c is not None: color = [float(c[0]), float(c[1]), float(c[2])] except Exception: pass try: energy = float(getattr(light_obj, "energy", energy)) except Exception: pass try: radius = float(getattr(light_obj, "radius", radius)) except Exception: pass try: if hasattr(light_obj, "fov"): spot_fov = float(getattr(light_obj, "fov", spot_fov)) except Exception: pass try: if hasattr(light_obj, "inner_radius"): inner_ratio = float(getattr(light_obj, "inner_radius", inner_ratio)) except Exception: pass intensity = max(0.0, energy * self.ENERGY_TO_INTENSITY_SCALE) return { "id": node_id, "name": node_name, "kind": kind, "parent_id": parent_id, "matrix_local_row_major": self._mat4_to_row_major_list(mat), "light": { "color": color, "intensity": intensity, "range": max(0.0, radius), "spot_angle_deg": max(1.0, spot_fov), "inner_cone_ratio": max(0.0, min(1.0, inner_ratio)), }, } def _build_ground_node_entry(self, node) -> Optional[Dict[str, Any]]: node_id = self._node_id_by_pointer.get(id(node)) if not node_id: return None parent_id, mat = self._get_parent_and_matrix(node) node_name = node.getName() or "ground" color = [0.8, 0.8, 0.8, 1.0] roughness = 1.0 metallic = 0.0 material = None try: material = node.getMaterial() except Exception: material = None if material: try: if material.hasBaseColor(): c = material.getBaseColor() color = [float(c[0]), float(c[1]), float(c[2]), float(c[3])] except Exception: pass try: roughness = float(material.getRoughness()) except Exception: pass try: metallic = float(material.getMetallic()) except Exception: pass return { "id": node_id, "name": node_name, "kind": "ground", "parent_id": parent_id, "matrix_local_row_major": self._mat4_to_row_major_list(mat), "ground": { "width": 100.0, "height": 100.0, }, "material_override": { "base_color": [color[0], color[1], color[2], 1.0], "roughness": roughness, "metallic": metallic, "opacity": 1.0, }, } def _resolve_model_source(self, node) -> Tuple[Optional[str], str]: tags = ["model_path", "saved_model_path", "original_path", "file"] for tag in tags: if not node.hasTag(tag): continue value = (node.getTag(tag) or "").strip() if not value: continue resolved = self._resolve_asset_path(value) if resolved: return resolved, tag return None, "" def _resolve_asset_path(self, candidate: str) -> Optional[str]: candidate = (candidate or "").strip() if not candidate: return None if candidate.startswith(("http://", "https://")): return None # Absolute path: use directly. if os.path.isabs(candidate): abs_candidate = os.path.normpath(candidate) if os.path.exists(abs_candidate): return abs_candidate return None # Relative path resolution policy. search_roots = [ self._project_path, os.path.join(self._project_path, "models"), os.path.join(self._project_path, "Resources"), os.path.join(self._project_path, "scenes", "resources"), os.getcwd(), ] for root in search_roots: full = os.path.normpath(os.path.join(root, candidate)) if os.path.exists(full): return full return None def _prepare_model_asset(self, source_path: str, node_name: str) -> Optional[str]: source_path = os.path.normpath(source_path) ext = os.path.splitext(source_path)[1].lower() if ext in {".gltf", ".glb"}: return self._copy_asset_to_models(source_path) with tempfile.TemporaryDirectory(prefix="eg_webgl_conv_") as temp_dir: conversion_source = source_path if ext == ".bam": temp_egg = os.path.join(temp_dir, self._sanitize_filename(Path(source_path).stem) + ".egg") if not self._run_tool_command(["bam2egg", source_path, temp_egg], timeout=120): return None temp_obj = os.path.join(temp_dir, self._sanitize_filename(Path(source_path).stem) + ".obj") if not self._run_tool_command(["egg2obj", temp_egg, temp_obj], timeout=120): return None conversion_source = temp_obj elif ext == ".egg": temp_obj = os.path.join(temp_dir, self._sanitize_filename(Path(source_path).stem) + ".obj") if not self._run_tool_command(["egg2obj", source_path, temp_obj], timeout=120): return None conversion_source = temp_obj target_filename = self._unique_filename(node_name or Path(source_path).stem, ".glb") target_abs = os.path.join(self._assets_model_dir, target_filename) converter = self._convert_to_glb(conversion_source, target_abs) if not converter: return None self.report["converted_assets"].append( { "source": source_path, "converted_from": conversion_source, "target": os.path.relpath(target_abs, self._output_root).replace("\\", "/"), "converter": converter, } ) return os.path.relpath(target_abs, self._output_root).replace("\\", "/") def _convert_to_glb(self, source_path: str, target_path: str) -> str: scene_manager = self.scene_manager if scene_manager is None: return "" conversion_order = [ "_convertWithBlender", "_convertWithFBX2glTF", "_convertWithAssimp", ] for method_name in conversion_order: method = getattr(scene_manager, method_name, None) if not callable(method): continue try: ok = method(source_path, target_path, None) except TypeError: ok = method(source_path, target_path) except Exception: ok = False if ok and os.path.exists(target_path): return method_name return "" def _run_tool_command(self, args: List[str], timeout: int) -> bool: executable = shutil.which(args[0]) if not executable: self.report["warnings"].append(f"缺少转换工具: {args[0]}") return False try: result = subprocess.run( args, check=False, capture_output=True, text=True, timeout=timeout, ) except Exception as exc: self.report["warnings"].append(f"执行命令失败: {' '.join(args)} ({exc})") return False if result.returncode != 0: stderr = (result.stderr or "").strip() stdout = (result.stdout or "").strip() detail = stderr or stdout or f"exit={result.returncode}" self.report["warnings"].append(f"命令失败: {' '.join(args)} -> {detail}") return False return True def _copy_asset_to_models(self, source_path: str) -> str: source_path = os.path.normpath(source_path) if source_path in self._copied_source_to_uri: return self._copied_source_to_uri[source_path] ext = os.path.splitext(source_path)[1].lower() or ".bin" safe_name = self._unique_filename(Path(source_path).stem, ext) target_abs = os.path.join(self._assets_model_dir, safe_name) shutil.copy2(source_path, target_abs) rel_uri = os.path.relpath(target_abs, self._output_root).replace("\\", "/") self._copied_source_to_uri[source_path] = rel_uri self.report["copied_assets"].append( { "source": source_path, "target": rel_uri, "type": "model", } ) return rel_uri def _collect_and_copy_texture_overrides(self, node, model_source: str) -> List[Dict[str, Any]]: textures: List[Dict[str, Any]] = [] texture_pairs = self._extract_texture_stage_and_paths(node) if not texture_pairs: return textures model_dir = os.path.dirname(model_source) for stage_name, tex_path in texture_pairs: abs_path = self._resolve_texture_path(tex_path, model_dir) if not abs_path: continue rel_uri = self._copy_asset_to_textures(abs_path) if rel_uri: textures.append({"stage": stage_name, "uri": rel_uri}) return textures def _extract_texture_stage_and_paths(self, node) -> List[Tuple[str, str]]: pairs: List[Tuple[str, str]] = [] seen: set = set() nodes_to_scan = [node] try: geom_paths = node.findAllMatches("**/+GeomNode") for i in range(geom_paths.getNumPaths()): nodes_to_scan.append(geom_paths.getPath(i)) except Exception: pass for np in nodes_to_scan: try: stages = np.findAllTextureStages() except Exception: continue try: stage_count = stages.getNumTextureStages() except Exception: stage_count = 0 for idx in range(stage_count): try: stage = stages.getTextureStage(idx) texture = np.getTexture(stage) except Exception: continue if not texture: continue tex_path = "" try: if texture.hasFullpath(): tex_path = texture.getFullpath().toOsSpecific() except Exception: tex_path = "" if not tex_path: continue stage_name = stage.getName() if stage else f"stage_{idx}" key = (stage_name, tex_path) if key in seen: continue seen.add(key) pairs.append(key) return pairs def _resolve_texture_path(self, path_hint: str, model_dir: str) -> Optional[str]: path_hint = (path_hint or "").strip() if not path_hint: return None if os.path.isabs(path_hint) and os.path.exists(path_hint): return os.path.normpath(path_hint) search_roots = [ model_dir, self._project_path, os.path.join(self._project_path, "scenes", "resources"), os.getcwd(), ] for root in search_roots: full = os.path.normpath(os.path.join(root, path_hint)) if os.path.exists(full): return full return None def _copy_asset_to_textures(self, source_path: str) -> Optional[str]: source_path = os.path.normpath(source_path) cache_key = f"texture::{source_path}" if cache_key in self._copied_source_to_uri: return self._copied_source_to_uri[cache_key] ext = os.path.splitext(source_path)[1].lower() or ".png" safe_name = self._unique_filename(Path(source_path).stem, ext) target_abs = os.path.join(self._assets_texture_dir, safe_name) try: shutil.copy2(source_path, target_abs) except Exception: return None rel_uri = os.path.relpath(target_abs, self._output_root).replace("\\", "/") self._copied_source_to_uri[cache_key] = rel_uri self.report["copied_assets"].append( { "source": source_path, "target": rel_uri, "type": "texture", } ) return rel_uri def _extract_material_override(self, node) -> Dict[str, Any]: base_color = [1.0, 1.0, 1.0, 1.0] roughness = 0.5 metallic = 0.0 material = None try: material = node.getMaterial() except Exception: material = None if material: try: if material.hasBaseColor(): c = material.getBaseColor() base_color = [float(c[0]), float(c[1]), float(c[2]), float(c[3])] except Exception: pass try: roughness = float(material.getRoughness()) except Exception: pass try: metallic = float(material.getMetallic()) except Exception: pass else: try: c = node.getColor() base_color = [float(c[0]), float(c[1]), float(c[2]), float(c[3])] except Exception: pass opacity = max(0.0, min(1.0, float(base_color[3]))) if opacity >= 0.999: opacity = 1.0 elif opacity <= 0.001: opacity = 0.0 return { "base_color": base_color, "roughness": roughness, "metallic": metallic, "opacity": opacity, } def _get_parent_and_matrix(self, node) -> Tuple[Optional[str], Any]: render = getattr(self.world, "render", None) parent_id = None try: parent = node.getParent() except Exception: parent = None if parent and not parent.isEmpty() and id(parent) in self._node_id_by_pointer: parent_id = self._node_id_by_pointer[id(parent)] try: return parent_id, node.getMat(parent) except Exception: pass if render: try: return None, node.getMat(render) except Exception: pass try: return None, node.getMat() except Exception: return None, node.getTransform().getMat() @staticmethod def _mat4_to_row_major_list(mat4_obj) -> List[float]: try: values = [] for r in range(4): for c in range(4): values.append(float(mat4_obj.getCell(r, c))) return values except Exception: return [ 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, ] @staticmethod def _safe_float(value: Any, default: float) -> float: try: return float(value) except Exception: return float(default) def _unique_filename(self, stem: str, suffix: str) -> str: safe_stem = self._sanitize_filename(stem) or "asset" key = f"{safe_stem}{suffix.lower()}" index = self._name_counter.get(key, 0) self._name_counter[key] = index + 1 if index == 0: return f"{safe_stem}{suffix}" return f"{safe_stem}_{index:03d}{suffix}" @staticmethod def _sanitize_filename(name: str) -> str: normalized = re.sub(r"[^A-Za-z0-9._-]+", "_", str(name or "")).strip("._") return normalized or "asset" def _write_preview_scripts(self) -> None: sh_path = os.path.join(self._output_root, "run_preview.sh") bat_path = os.path.join(self._output_root, "run_preview.bat") sh_content = "#!/usr/bin/env bash\npython3 -m http.server 8000\n" bat_content = "@echo off\npython -m http.server 8000\n" with open(sh_path, "w", encoding="utf-8") as f: f.write(sh_content) with open(bat_path, "w", encoding="utf-8") as f: f.write(bat_content) current_mode = os.stat(sh_path).st_mode os.chmod(sh_path, current_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) @staticmethod def _write_json(path: str, payload: Dict[str, Any]) -> None: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) def _fail(self, message: str) -> None: self.report["status"] = "failed" self.report["warnings"].append(message)