# EG/project/asset_database.py
# 2026-03-19 11:06:17 +08:00
#
# 355 lines
# 14 KiB
# Python

"""Asset database and import cache helpers for MetaCore project v2."""
from __future__ import annotations
import hashlib
import json
import os
import shutil
from datetime import datetime
from panda3d.core import Filename
from project.project_schema import (
ASSET_DB_SCHEMA_VERSION,
ProjectLayout,
detect_asset_type,
generate_guid,
get_asset_subdir_for_type,
normalize_path,
relative_project_path,
)
class AssetDatabase:
    """GUID-keyed registry of project assets persisted as a JSON file.

    The in-memory state lives in ``self.data`` with three keys:

    * ``schema_version`` - version tag written into the database file.
    * ``assets`` - mapping of asset GUID to its record dict.
    * ``path_to_guid`` - reverse index from project-relative asset path
      (forward slashes) to GUID, derived from ``assets``.

    Each asset is accompanied by a sidecar ``<asset>.meta`` JSON file that
    stores its GUID, type, importer and import settings. Records are
    reconstructed from those sidecar files whenever a lookup misses.
    """

    def __init__(self, project_root: str, world=None):
        """Open (or create) the asset database for ``project_root``.

        Args:
            project_root: Root directory of the project.
            world: Optional object exposing ``loader.loadModel``. When it is
                missing, model import caches are not generated.
        """
        self.layout = ProjectLayout(project_root)
        self.world = world
        self.data = {
            "schema_version": ASSET_DB_SCHEMA_VERSION,
            "assets": {},
            "path_to_guid": {},
        }
        self.ensure_database()

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _timestamp() -> str:
        """Return the current local time formatted for ``updated_at`` fields."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _dump_json(file_path: str, payload) -> None:
        """Write ``payload`` to ``file_path`` as indented UTF-8 JSON."""
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=4)

    def _absolute_path(self, relative_path: str) -> str:
        """Turn a project-relative, slash-separated path into an OS path."""
        return os.path.join(self.layout.project_root, relative_path.replace("/", os.sep))

    def _record_copy(self, asset_guid: str) -> dict:
        """Return a shallow copy of the stored record, or ``{}`` if unknown."""
        return dict((self.data.get("assets") or {}).get(asset_guid) or {})

    def _model_cache_is_current(self, record: dict, previous_hash) -> bool:
        """Tell whether the model import cache of ``record`` can be reused.

        The cache is reusable only when the source hash did not change and
        every path listed under ``imported_cache`` still exists on disk.
        """
        if not previous_hash or previous_hash != record.get("source_hash"):
            return False
        cached = record.get("imported_cache")
        if not isinstance(cached, dict) or not cached:
            return False
        return all(
            bool(path) and os.path.exists(self._absolute_path(str(path)))
            for path in cached.values()
        )

    # ------------------------------------------------------------------
    # Meta scanning and indexing
    # ------------------------------------------------------------------
    def _get_all_meta_files(self) -> list[str]:
        """Return the paths of every ``*.meta`` file below the assets root."""
        assets_root = self.layout.assets_root
        if not os.path.exists(assets_root):
            return []
        meta_files = []
        for root, _, files in os.walk(assets_root):
            meta_files.extend(
                os.path.join(root, file_name)
                for file_name in files
                if file_name.endswith(".meta")
            )
        return meta_files

    def _rebuild_path_index(self):
        """Regenerate ``path_to_guid`` from the current asset records."""
        rebuilt = {}
        for asset_guid, asset_record in (self.data.get("assets", {}) or {}).items():
            asset_path = str((asset_record or {}).get("asset_path", "") or "").replace("\\", "/")
            if asset_path:
                rebuilt[asset_path] = asset_guid
        self.data["path_to_guid"] = rebuilt

    def _sync_assets_from_meta_scan(self) -> bool:
        """Refresh asset records from the ``.meta`` files found on disk.

        Meta files without a GUID, or whose asset file is missing, are
        skipped. The path index is rebuilt afterwards. Nothing is saved.

        Returns:
            ``True`` when at least one asset is new or changed its path.
        """
        assets = self.data.setdefault("assets", {})
        project_root = self.layout.project_root
        changed = False
        for meta_path in self._get_all_meta_files():
            meta_payload = self._read_meta(meta_path)
            asset_guid = str(meta_payload.get("guid", "") or "").strip()
            if not asset_guid:
                continue
            asset_path = meta_path[: -len(".meta")]
            # isfile (not exists): a directory that happens to have a sibling
            # .meta cannot be hashed and would abort the whole scan.
            if not os.path.isfile(asset_path):
                continue
            relative_asset_path = relative_project_path(project_root, asset_path)
            if not relative_asset_path:
                continue
            asset_type = str(meta_payload.get("asset_type") or detect_asset_type(asset_path))
            source_hash = self._hash_file(asset_path)
            record = dict(assets.get(asset_guid, {}) or {})
            previous_asset_path = str(record.get("asset_path", "") or "")
            previous_hash = record.get("source_hash")
            record.update(
                {
                    "guid": asset_guid,
                    "asset_path": relative_asset_path,
                    "asset_type": asset_type,
                    "meta_path": relative_project_path(project_root, meta_path),
                    "source_hash": source_hash,
                    "importer": str(meta_payload.get("importer") or f"{asset_type}_importer"),
                    "import_settings": meta_payload.get("import_settings", {}) or {},
                    "dependency_guids": meta_payload.get("dependency_guids", []) or [],
                    "updated_at": self._timestamp(),
                }
            )
            # Scans run on every lookup miss, so avoid reloading models whose
            # source and cache files are untouched.
            if asset_type == "model" and not self._model_cache_is_current(record, previous_hash):
                self._build_model_import_cache(record)
            assets[asset_guid] = record
            if previous_asset_path != relative_asset_path:
                changed = True
        self._rebuild_path_index()
        return changed

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def ensure_database(self):
        """Load the database file if present, resync from metas, and save.

        An unreadable or malformed database file is reported and ignored;
        sections of the payload that are not dicts are replaced by empty ones.
        """
        os.makedirs(self.layout.library_root, exist_ok=True)
        db_path = self.layout.asset_db_path
        if os.path.exists(db_path):
            try:
                with open(db_path, "r", encoding="utf-8") as f:
                    payload = json.load(f)
            except (OSError, ValueError) as e:
                print(f"⚠️ 读取 AssetDB 失败,已重建为空数据库: {e}")
            else:
                if isinstance(payload, dict):
                    assets = payload.get("assets")
                    path_index = payload.get("path_to_guid")
                    self.data["schema_version"] = payload.get("schema_version", ASSET_DB_SCHEMA_VERSION)
                    self.data["assets"] = assets if isinstance(assets, dict) else {}
                    self.data["path_to_guid"] = path_index if isinstance(path_index, dict) else {}
        self._sync_assets_from_meta_scan()
        self.save()

    def save(self):
        """Write ``self.data`` to the database file.

        The payload is written to a temporary sibling file first and then
        moved over the target, so an interrupted write cannot leave a
        truncated database behind.
        """
        db_path = self.layout.asset_db_path
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        tmp_path = f"{db_path}.tmp"
        try:
            self._dump_json(tmp_path, self.data)
            os.replace(tmp_path, db_path)
        except Exception:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------
    def get_asset(self, asset_guid: str) -> dict:
        """Return a copy of the record for ``asset_guid`` (``{}`` if unknown).

        A meta rescan is attempted when the GUID is unknown, and again when
        the recorded asset file no longer exists (it may have been moved).
        """
        record = self._record_copy(asset_guid)
        if not record:
            self._sync_assets_from_meta_scan()
            record = self._record_copy(asset_guid)
        if not record:
            return {}
        asset_path = record.get("asset_path", "")
        if asset_path and not os.path.exists(self._absolute_path(asset_path)):
            if self._sync_assets_from_meta_scan():
                record = self._record_copy(asset_guid)
        return record

    def find_by_relative_path(self, relative_path: str) -> dict:
        """Return the record whose project-relative path is ``relative_path``.

        Backslashes are normalised to forward slashes. A meta rescan is
        attempted on an index miss. Returns ``{}`` when nothing matches.
        """
        relative_path = str(relative_path or "").replace("\\", "/")
        asset_guid = self.data.get("path_to_guid", {}).get(relative_path)
        if not asset_guid:
            self._sync_assets_from_meta_scan()
            asset_guid = self.data.get("path_to_guid", {}).get(relative_path)
        if not asset_guid:
            return {}
        return self.get_asset(asset_guid)

    def find_by_absolute_path(self, asset_path: str) -> dict:
        """Return the record for an absolute path, or ``{}`` if not resolvable."""
        relative_path = relative_project_path(self.layout.project_root, asset_path)
        if not relative_path:
            return {}
        return self.find_by_relative_path(relative_path)

    # ------------------------------------------------------------------
    # File helpers
    # ------------------------------------------------------------------
    def _hash_file(self, file_path: str) -> str:
        """Return the SHA-256 hex digest of ``file_path``, read in 1 MiB chunks."""
        file_path = normalize_path(file_path)
        digest = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def _read_meta(self, meta_path: str) -> dict:
        """Load a ``.meta`` file; return ``{}`` if missing, invalid or not a dict."""
        if not os.path.exists(meta_path):
            return {}
        try:
            with open(meta_path, "r", encoding="utf-8") as f:
                payload = json.load(f)
        except (OSError, ValueError) as e:
            print(f"⚠️ 读取资源 meta 失败 {meta_path}: {e}")
            return {}
        return payload if isinstance(payload, dict) else {}

    def _write_meta(self, meta_path: str, payload: dict):
        """Write ``payload`` to ``meta_path``, creating parent directories."""
        parent_dir = os.path.dirname(meta_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self._dump_json(meta_path, payload)

    def _copy_into_assets(self, source_path: str, preferred_subdir: str = "") -> str:
        """Copy ``source_path`` below the assets root and return the new path.

        The destination sub-directory is ``preferred_subdir`` or, when empty,
        the default one for the detected asset type. If a file with the same
        name and identical content already exists it is reused; if the content
        differs a numeric suffix (``name_1.ext``, ``name_2.ext``, ...) is tried.
        """
        source_path = normalize_path(source_path)
        asset_type = detect_asset_type(source_path)
        subdir = preferred_subdir or get_asset_subdir_for_type(asset_type)
        target_dir = os.path.join(self.layout.assets_root, subdir)
        os.makedirs(target_dir, exist_ok=True)
        file_name = os.path.basename(source_path)
        name_root, name_ext = os.path.splitext(file_name)
        target_path = os.path.join(target_dir, file_name)
        source_hash = None  # computed lazily, and only once, on a name collision
        index = 1
        while os.path.exists(target_path):
            if source_hash is None:
                source_hash = self._hash_file(source_path)
            if self._hash_file(target_path) == source_hash:
                break
            target_path = os.path.join(target_dir, f"{name_root}_{index}{name_ext}")
            index += 1
        if not os.path.exists(target_path):
            shutil.copy2(source_path, target_path)
        return target_path

    # ------------------------------------------------------------------
    # Model import cache
    # ------------------------------------------------------------------
    def _build_model_import_cache(self, asset_record: dict):
        """Generate the import cache for a model and attach it to the record.

        Loads the model through ``self.world.loader``, writes ``model.bam``,
        ``hierarchy.json``, ``materials.json`` and ``import_info.json`` into
        the asset's imported-cache directory, and stores their
        project-relative paths under ``asset_record["imported_cache"]``.
        Does nothing when no loader is available, the asset file is missing,
        or loading / BAM export fails (failures are printed).
        """
        if not self.world or not getattr(self.world, "loader", None):
            return
        asset_path = self._absolute_path(asset_record["asset_path"])
        if not os.path.exists(asset_path):
            return
        cache_dir = self.layout.imported_asset_dir(asset_record["guid"])
        os.makedirs(cache_dir, exist_ok=True)
        model_bam_path = os.path.join(cache_dir, "model.bam")
        hierarchy_path = os.path.join(cache_dir, "hierarchy.json")
        materials_path = os.path.join(cache_dir, "materials.json")
        import_info_path = os.path.join(cache_dir, "import_info.json")

        # The engine may raise arbitrary exception types; cache generation is
        # best-effort, so report and bail out instead of propagating.
        try:
            model_np = self.world.loader.loadModel(Filename.fromOsSpecific(asset_path))
        except Exception as e:
            print(f"⚠️ 生成模型导入缓存失败 {asset_path}: {e}")
            return
        if not model_np or model_np.isEmpty():
            return
        try:
            model_np.writeBamFile(Filename.fromOsSpecific(model_bam_path))
        except Exception as e:
            print(f"⚠️ 写入模型缓存 BAM 失败 {asset_path}: {e}")
            return

        hierarchy = []
        materials = []

        def _walk(node, node_key=""):
            # Keys are child-index paths: "" for the root, "0", "0/2", ...
            try:
                node_name = node.getName()
            except Exception:
                node_name = "node"
            try:
                children = list(node.getChildren())
            except Exception:
                children = []
            hierarchy.append(
                {
                    "key": node_key,
                    "name": node_name,
                    "child_count": len(children),
                }
            )
            for index, child in enumerate(children):
                _walk(child, f"{node_key}/{index}" if node_key else str(index))

        def _collect_materials(root_np):
            # One entry per geom; material_name is written as an empty string.
            for geom_np in root_np.findAllMatches("**/+GeomNode"):
                try:
                    geom_node = geom_np.node()
                except Exception:
                    continue
                for geom_index in range(geom_node.getNumGeoms()):
                    materials.append(
                        {
                            "node_name": geom_np.getName(),
                            "geom_index": geom_index,
                            "material_name": "",
                        }
                    )

        _walk(model_np)
        _collect_materials(model_np)

        self._dump_json(hierarchy_path, hierarchy)
        self._dump_json(materials_path, materials)
        self._dump_json(
            import_info_path,
            {
                "asset_guid": asset_record["guid"],
                "asset_path": asset_record["asset_path"],
                "asset_type": asset_record["asset_type"],
                "source_hash": asset_record["source_hash"],
                "generated_at": self._timestamp(),
            },
        )

        project_root = self.layout.project_root
        asset_record["imported_cache"] = {
            "root": relative_project_path(project_root, cache_dir),
            "model_bam": relative_project_path(project_root, model_bam_path),
            "hierarchy": relative_project_path(project_root, hierarchy_path),
            "materials": relative_project_path(project_root, materials_path),
            "import_info": relative_project_path(project_root, import_info_path),
        }

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------
    def _register_asset_record(self, asset_path: str, preferred_subdir: str, copy_into_assets: bool) -> dict:
        """Create or update the record and ``.meta`` for one asset file.

        This does not rebuild the path index and does not save the database;
        callers do that once they are done. Returns the stored record, or
        ``{}`` when the path is not a regular file or cannot be expressed
        relative to the project.
        """
        asset_path = normalize_path(asset_path)
        # isfile (not exists): directories cannot be hashed or copied.
        if not os.path.isfile(asset_path):
            return {}
        project_root = self.layout.project_root
        # Other call sites treat the helper's result as possibly falsy, so
        # coerce before calling a str method on it.
        current_relative = str(relative_project_path(project_root, asset_path) or "")
        if copy_into_assets or not current_relative.startswith("Assets/"):
            asset_path = self._copy_into_assets(asset_path, preferred_subdir=preferred_subdir)
        relative_asset_path = relative_project_path(project_root, asset_path)
        if not relative_asset_path:
            return {}

        meta_path = self.layout.meta_path_for_asset(asset_path)
        meta_payload = self._read_meta(meta_path)
        asset_guid = str(meta_payload.get("guid") or generate_guid())
        asset_type = str(meta_payload.get("asset_type") or detect_asset_type(asset_path))
        source_hash = self._hash_file(asset_path)

        assets = self.data.setdefault("assets", {})
        # Work on a copy so a failure below cannot leave a half-updated record.
        record = dict(assets.get(asset_guid) or {})
        record.update(
            {
                "guid": asset_guid,
                "asset_path": relative_asset_path,
                "asset_type": asset_type,
                "meta_path": relative_project_path(project_root, meta_path),
                "source_hash": source_hash,
                # Keep an importer already chosen in the .meta, exactly as the
                # meta scan does, instead of resetting it to the default.
                "importer": str(meta_payload.get("importer") or f"{asset_type}_importer"),
                "import_settings": meta_payload.get("import_settings", {}) or {},
                "dependency_guids": meta_payload.get("dependency_guids", []) or [],
                "updated_at": self._timestamp(),
            }
        )
        if asset_type == "model":
            self._build_model_import_cache(record)
        self._write_meta(
            meta_path,
            {
                "guid": asset_guid,
                "asset_type": asset_type,
                "source_hash": source_hash,
                "importer": record["importer"],
                "import_settings": record["import_settings"],
                "dependency_guids": record["dependency_guids"],
            },
        )
        assets[asset_guid] = record
        return record

    def register_asset(self, asset_path: str, preferred_subdir: str = "", copy_into_assets: bool = False) -> dict:
        """Register ``asset_path`` and persist the database.

        Args:
            asset_path: Path of the asset file.
            preferred_subdir: Sub-directory of the assets root to copy into;
                empty selects the default for the detected asset type.
            copy_into_assets: Force a copy into the assets folder. Files whose
                project-relative path does not start with ``Assets/`` are
                always copied.

        Returns:
            A copy of the stored record, or ``{}`` when the file does not
            exist or cannot be located inside the project.
        """
        record = self._register_asset_record(asset_path, preferred_subdir, copy_into_assets)
        if not record:
            return {}
        self._rebuild_path_index()
        self.save()
        return dict(record)

    def import_asset(self, source_path: str, preferred_subdir: str = "") -> dict:
        """Copy an external file into the assets folder and register it."""
        return self.register_asset(source_path, preferred_subdir=preferred_subdir, copy_into_assets=True)

    def ensure_project_assets_registered(self):
        """Register every non-``.meta`` file below the assets root.

        The index rebuild and database write happen once for the whole batch
        rather than once per file; the ``finally`` keeps whatever was
        registered before an unexpected error.
        """
        self._sync_assets_from_meta_scan()
        registered_any = False
        try:
            for root, _, files in os.walk(self.layout.assets_root):
                for file_name in files:
                    if file_name.endswith(".meta"):
                        continue
                    asset_path = os.path.join(root, file_name)
                    if self._register_asset_record(asset_path, "", False):
                        registered_any = True
        finally:
            if registered_any:
                self._rebuild_path_index()
                self.save()