"""Asset database and import cache helpers for MetaCore project v2.""" from __future__ import annotations import hashlib import json import os import shutil from datetime import datetime from panda3d.core import Filename from project.project_schema import ( ASSET_DB_SCHEMA_VERSION, ProjectLayout, detect_asset_type, generate_guid, get_asset_subdir_for_type, normalize_path, relative_project_path, ) class AssetDatabase: def __init__(self, project_root: str, world=None): self.layout = ProjectLayout(project_root) self.world = world self.data = { "schema_version": ASSET_DB_SCHEMA_VERSION, "assets": {}, "path_to_guid": {}, } self.ensure_database() def _get_all_meta_files(self) -> list[str]: meta_files = [] if not os.path.exists(self.layout.assets_root): return meta_files for root, _, files in os.walk(self.layout.assets_root): for file_name in files: if file_name.endswith(".meta"): meta_files.append(os.path.join(root, file_name)) return meta_files def _rebuild_path_index(self): rebuilt = {} for asset_guid, asset_record in (self.data.get("assets", {}) or {}).items(): asset_path = str((asset_record or {}).get("asset_path", "") or "").replace("\\", "/") if asset_path: rebuilt[asset_path] = asset_guid self.data["path_to_guid"] = rebuilt def _sync_assets_from_meta_scan(self): assets = self.data.setdefault("assets", {}) changed = False for meta_path in self._get_all_meta_files(): meta_payload = self._read_meta(meta_path) asset_guid = str(meta_payload.get("guid", "") or "").strip() if not asset_guid: continue asset_path = meta_path[:-5] if not os.path.exists(asset_path): continue relative_asset_path = relative_project_path(self.layout.project_root, asset_path) if not relative_asset_path: continue asset_type = str(meta_payload.get("asset_type") or detect_asset_type(asset_path)) source_hash = self._hash_file(asset_path) record = dict(assets.get(asset_guid, {}) or {}) previous_asset_path = str(record.get("asset_path", "") or "") record.update( { "guid": asset_guid, "asset_path": relative_asset_path, "asset_type": asset_type, "meta_path": relative_project_path(self.layout.project_root, meta_path), "source_hash": source_hash, "importer": str(meta_payload.get("importer") or f"{asset_type}_importer"), "import_settings": meta_payload.get("import_settings", {}) or {}, "dependency_guids": meta_payload.get("dependency_guids", []) or [], "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } ) if asset_type == "model": self._build_model_import_cache(record) assets[asset_guid] = record if previous_asset_path != relative_asset_path: changed = True self._rebuild_path_index() return changed def ensure_database(self): os.makedirs(self.layout.library_root, exist_ok=True) if os.path.exists(self.layout.asset_db_path): try: with open(self.layout.asset_db_path, "r", encoding="utf-8") as f: payload = json.load(f) if isinstance(payload, dict): self.data["schema_version"] = payload.get("schema_version", ASSET_DB_SCHEMA_VERSION) self.data["assets"] = payload.get("assets", {}) or {} self.data["path_to_guid"] = payload.get("path_to_guid", {}) or {} except Exception as e: print(f"⚠️ 读取 AssetDB 失败,已重建为空数据库: {e}") self._sync_assets_from_meta_scan() self.save() def save(self): os.makedirs(os.path.dirname(self.layout.asset_db_path), exist_ok=True) with open(self.layout.asset_db_path, "w", encoding="utf-8") as f: json.dump(self.data, f, ensure_ascii=False, indent=4) def get_asset(self, asset_guid: str) -> dict: record = dict(self.data.get("assets", {}).get(asset_guid, {}) or {}) if not record: self._sync_assets_from_meta_scan() record = dict(self.data.get("assets", {}).get(asset_guid, {}) or {}) if not record: return {} asset_path = record.get("asset_path", "") asset_abs_path = os.path.join(self.layout.project_root, asset_path.replace("/", os.sep)) if asset_path else "" if asset_abs_path and not os.path.exists(asset_abs_path): if self._sync_assets_from_meta_scan(): record = dict(self.data.get("assets", {}).get(asset_guid, {}) or {}) return record def find_by_relative_path(self, relative_path: str) -> dict: relative_path = str(relative_path or "").replace("\\", "/") asset_guid = self.data.get("path_to_guid", {}).get(relative_path) if not asset_guid: self._sync_assets_from_meta_scan() asset_guid = self.data.get("path_to_guid", {}).get(relative_path) if not asset_guid: return {} return self.get_asset(asset_guid) def find_by_absolute_path(self, asset_path: str) -> dict: relative_path = relative_project_path(self.layout.project_root, asset_path) if not relative_path: return {} return self.find_by_relative_path(relative_path) def _hash_file(self, file_path: str) -> str: file_path = normalize_path(file_path) digest = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def _read_meta(self, meta_path: str) -> dict: if not os.path.exists(meta_path): return {} try: with open(meta_path, "r", encoding="utf-8") as f: payload = json.load(f) if isinstance(payload, dict): return payload except Exception as e: print(f"⚠️ 读取资源 meta 失败 {meta_path}: {e}") return {} def _write_meta(self, meta_path: str, payload: dict): os.makedirs(os.path.dirname(meta_path), exist_ok=True) with open(meta_path, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=4) def _copy_into_assets(self, source_path: str, preferred_subdir: str = "") -> str: source_path = normalize_path(source_path) asset_type = detect_asset_type(source_path) subdir = preferred_subdir or get_asset_subdir_for_type(asset_type) target_dir = os.path.join(self.layout.assets_root, subdir) os.makedirs(target_dir, exist_ok=True) file_name = os.path.basename(source_path) target_path = os.path.join(target_dir, file_name) name_root, name_ext = os.path.splitext(file_name) index = 1 while os.path.exists(target_path) and self._hash_file(target_path) != self._hash_file(source_path): target_path = os.path.join(target_dir, f"{name_root}_{index}{name_ext}") index += 1 if not os.path.exists(target_path): shutil.copy2(source_path, target_path) return target_path def _build_model_import_cache(self, asset_record: dict): if not self.world or not getattr(self.world, "loader", None): return asset_path = os.path.join(self.layout.project_root, asset_record["asset_path"].replace("/", os.sep)) if not os.path.exists(asset_path): return cache_dir = self.layout.imported_asset_dir(asset_record["guid"]) os.makedirs(cache_dir, exist_ok=True) model_bam_path = os.path.join(cache_dir, "model.bam") hierarchy_path = os.path.join(cache_dir, "hierarchy.json") materials_path = os.path.join(cache_dir, "materials.json") import_info_path = os.path.join(cache_dir, "import_info.json") try: model_np = self.world.loader.loadModel(Filename.fromOsSpecific(asset_path)) except Exception as e: print(f"⚠️ 生成模型导入缓存失败 {asset_path}: {e}") return if not model_np or model_np.isEmpty(): return try: model_np.writeBamFile(Filename.fromOsSpecific(model_bam_path)) except Exception as e: print(f"⚠️ 写入模型缓存 BAM 失败 {asset_path}: {e}") return hierarchy = [] materials = [] def _walk(node, parent_key=""): try: node_name = node.getName() except Exception: node_name = "node" try: children = list(node.getChildren()) except Exception: children = [] node_key = parent_key hierarchy.append( { "key": node_key, "name": node_name, "child_count": len(children), } ) for index, child in enumerate(children): _walk(child, f"{node_key}/{index}" if node_key else str(index)) def _collect_materials(root_np): for geom_np in root_np.findAllMatches("**/+GeomNode"): try: geom_node = geom_np.node() except Exception: continue for geom_index in range(geom_node.getNumGeoms()): materials.append( { "node_name": geom_np.getName(), "geom_index": geom_index, "material_name": "", } ) _walk(model_np) _collect_materials(model_np) with open(hierarchy_path, "w", encoding="utf-8") as f: json.dump(hierarchy, f, ensure_ascii=False, indent=4) with open(materials_path, "w", encoding="utf-8") as f: json.dump(materials, f, ensure_ascii=False, indent=4) with open(import_info_path, "w", encoding="utf-8") as f: json.dump( { "asset_guid": asset_record["guid"], "asset_path": asset_record["asset_path"], "asset_type": asset_record["asset_type"], "source_hash": asset_record["source_hash"], "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), }, f, ensure_ascii=False, indent=4, ) asset_record["imported_cache"] = { "root": relative_project_path(self.layout.project_root, cache_dir), "model_bam": relative_project_path(self.layout.project_root, model_bam_path), "hierarchy": relative_project_path(self.layout.project_root, hierarchy_path), "materials": relative_project_path(self.layout.project_root, materials_path), "import_info": relative_project_path(self.layout.project_root, import_info_path), } def register_asset(self, asset_path: str, preferred_subdir: str = "", copy_into_assets: bool = False) -> dict: asset_path = normalize_path(asset_path) if not os.path.exists(asset_path): return {} if copy_into_assets or not relative_project_path(self.layout.project_root, asset_path).startswith("Assets/"): asset_path = self._copy_into_assets(asset_path, preferred_subdir=preferred_subdir) relative_asset_path = relative_project_path(self.layout.project_root, asset_path) if not relative_asset_path: return {} meta_path = self.layout.meta_path_for_asset(asset_path) meta_payload = self._read_meta(meta_path) asset_guid = str(meta_payload.get("guid") or generate_guid()) asset_type = str(meta_payload.get("asset_type") or detect_asset_type(asset_path)) source_hash = self._hash_file(asset_path) record = self.data.get("assets", {}).get(asset_guid, {}) or {} record.update( { "guid": asset_guid, "asset_path": relative_asset_path, "asset_type": asset_type, "meta_path": relative_project_path(self.layout.project_root, meta_path), "source_hash": source_hash, "importer": f"{asset_type}_importer", "import_settings": meta_payload.get("import_settings", {}) or {}, "dependency_guids": meta_payload.get("dependency_guids", []) or [], "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } ) if asset_type == "model": self._build_model_import_cache(record) meta_payload = { "guid": asset_guid, "asset_type": asset_type, "source_hash": source_hash, "importer": record["importer"], "import_settings": record["import_settings"], "dependency_guids": record["dependency_guids"], } self._write_meta(meta_path, meta_payload) self.data.setdefault("assets", {})[asset_guid] = record self._rebuild_path_index() self.save() return dict(record) def import_asset(self, source_path: str, preferred_subdir: str = "") -> dict: return self.register_asset(source_path, preferred_subdir=preferred_subdir, copy_into_assets=True) def ensure_project_assets_registered(self): self._sync_assets_from_meta_scan() for root, _, files in os.walk(self.layout.assets_root): for file_name in files: if file_name.endswith(".meta"): continue asset_path = os.path.join(root, file_name) self.register_asset(asset_path, copy_into_assets=False)