# EG/core/resource_manager.py
# 2026-03-19 10:16:03 +08:00

# 549 lines
# 20 KiB
# Python
# Raw Permalink Blame History

# This file contains ambiguous Unicode characters

# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Resource manager module - ImGui version.

Provides file browsing, icon display, context menus and related features.
"""
import os
import shutil
import subprocess
import platform
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Set
import time
class ResourceManager:
    """State and file operations behind the ImGui resource browser.

    The class keeps track of the directory being browsed, the navigation
    history, the current selection, drag state and a per-directory snapshot
    used to detect on-disk changes.  It performs no rendering itself.
    """

    # Sub-directories created under every ``Assets`` root.
    DEFAULT_ASSET_SUBDIRS = (
        "Models",
        "Textures",
        "Audio",
        "Video",
        "UI",
        "Scripts",
    )

    # File extensions that may be dropped into the scene as 3D models.
    SCENE_MODEL_EXTENSIONS = frozenset(
        {'.gltf', '.glb', '.fbx', '.bam', '.egg', '.obj'}
    )

    def __init__(self, world):
        """Create the manager and point it at the default asset root.

        Args:
            world: Owning application object.  Only its optional
                ``project_manager.current_project_path`` attribute is read.
        """
        self.world = world
        self.project_root = Path(__file__).resolve().parent.parent
        self.project_path: Optional[Path] = None

        # Directory currently shown; starts at the unified Assets layout.
        self.current_path = self._get_default_root_path()

        # Back/forward navigation history.
        self.navigation_history: List[Path] = [self.current_path]
        self.history_index = 0

        # Selection state.
        self.selected_files: Set[Path] = set()
        self.focused_file: Optional[Path] = None

        # Context-menu state.
        self.show_context_menu = False
        self.context_menu_file: Optional[Path] = None
        self.context_menu_position = (0, 0)

        # Search and filtering.
        self.search_filter = ""
        self.show_hidden_files = False

        # File-system polling.
        self.last_refresh_time = 0
        self.refresh_interval = 1.0  # seconds
        self.auto_refresh_enabled = True
        # Directory path (str) -> last seen snapshot, used to detect changes.
        self.last_directory_content: Dict[str, dict] = {}

        # Drag-and-drop state.
        self.dragged_files: List[Path] = []

        # Directories shown expanded in the tree view.
        self.expanded_directories: Set[Path] = set()

        # Extension -> icon name mapping.
        self._init_icon_map()

    # ------------------------------------------------------------------
    # Root / project handling
    # ------------------------------------------------------------------
    def _get_default_root_path(self) -> Path:
        """Return the directory the browser should open by default.

        Preference order for the ``Assets`` root: the world's current project,
        then ``self.project_path``, then the engine's own ``project_root``.
        If that root cannot be created or does not exist, fall back to a
        legacy ``Resources`` directory and finally to ``project_root``.
        """
        project_manager = getattr(self.world, "project_manager", None)
        current_project_path = getattr(project_manager, "current_project_path", None)
        if current_project_path:
            assets_root = Path(current_project_path) / "Assets"
        elif self.project_path:
            assets_root = self.project_path / "Assets"
        else:
            assets_root = self.project_root / "Assets"

        self._ensure_assets_structure(assets_root)
        if assets_root.exists():
            return assets_root.resolve()

        legacy_resources = self.project_root / "Resources"
        if legacy_resources.exists():
            return legacy_resources.resolve()
        return self.project_root.resolve()

    def _ensure_assets_structure(self, assets_root: Path) -> None:
        """Create ``assets_root`` and the default sub-directories if missing."""
        try:
            assets_root.mkdir(parents=True, exist_ok=True)
            for subdir in self.DEFAULT_ASSET_SUBDIRS:
                (assets_root / subdir).mkdir(parents=True, exist_ok=True)
        except OSError:
            # Deliberately best-effort: the caller falls back to other roots
            # when the Assets directory could not be created.
            pass

    def set_project_path(self, project_path: Optional[str]) -> None:
        """Switch to another project and reset navigation and selection.

        Args:
            project_path: Project directory; a falsy value clears the project.
        """
        self.project_path = Path(project_path).resolve() if project_path else None
        self.current_path = self._get_default_root_path()
        self.navigation_history = [self.current_path]
        self.history_index = 0
        self.selected_files.clear()
        self.focused_file = None

    # ------------------------------------------------------------------
    # Icons
    # ------------------------------------------------------------------
    def _init_icon_map(self) -> None:
        """Build the mapping from lower-case file extension to icon name."""
        self.icon_map = {
            # 3D model files
            '.fbx': 'model',
            '.obj': 'model',
            '.gltf': 'model',
            '.glb': 'model',
            '.bam': 'model',
            '.egg': 'model',  # accepted by can_drag_to_scene, so give it the model icon
            # Image files
            '.jpg': 'image',
            '.jpeg': 'image',
            '.png': 'image',
            '.bmp': 'image',
            '.tga': 'image',
            '.tif': 'image',
            '.tiff': 'image',
            '.hdr': 'image',
            '.exr': 'image',
            # Audio files
            '.mp3': 'audio',
            '.wav': 'audio',
            '.ogg': 'audio',
            '.flac': 'audio',
            # Video files
            '.mp4': 'video',
            '.avi': 'video',
            '.mov': 'video',
            '.mkv': 'video',
            # Documents
            '.txt': 'document',
            '.md': 'document',
            '.pdf': 'document',
            '.doc': 'document',
            '.docx': 'document',
            '.rtf': 'document',
            # Source code
            '.py': 'python',
            '.js': 'code',
            '.ts': 'code',
            '.cpp': 'code',
            '.c': 'code',
            '.h': 'code',
            '.java': 'code',
            '.cs': 'code',
            '.html': 'code',
            '.css': 'code',
            '.xml': 'code',
            '.json': 'config',
            '.yaml': 'config',
            '.yml': 'config',
            # Configuration files
            '.ini': 'config',
            '.cfg': 'config',
            '.conf': 'config',
            '.toml': 'config',
            # Archives
            '.zip': 'archive',
            '.rar': 'archive',
            '.7z': 'archive',
            '.tar': 'archive',
            '.gz': 'archive',
            # Fonts
            '.ttf': 'font',
            '.otf': 'font',
            '.woff': 'font',
            '.woff2': 'font',
            # Folders (open and closed share one icon)
            'folder': 'folder',
            'folder_open': 'folder',
            # Fallback for unknown extensions
            'default': 'file',
        }

    def get_file_icon(self, filename: str, is_folder: bool = False) -> str:
        """Return the icon name for ``filename`` (case-insensitive extension)."""
        if is_folder:
            return self.icon_map['folder']
        ext = Path(filename).suffix.lower()
        return self.icon_map.get(ext, self.icon_map['default'])

    # ------------------------------------------------------------------
    # Directory listing and navigation
    # ------------------------------------------------------------------
    def get_directory_contents(self, path: Path) -> Tuple[List[Path], List[Path]]:
        """List ``path`` as ``(directories, files)``, each sorted by name.

        Entries whose name starts with ``.`` are skipped unless
        ``show_hidden_files`` is set.  A missing, unreadable or non-directory
        path yields two empty lists; if the listing fails part-way, the
        entries collected so far are returned.
        """
        dirs: List[Path] = []
        files: List[Path] = []
        try:
            if not path.is_dir():
                return [], []
            for item in path.iterdir():
                if not self.show_hidden_files and item.name.startswith('.'):
                    continue
                if item.is_dir():
                    dirs.append(item)
                else:
                    files.append(item)
        except OSError:
            # Permission problems or the directory vanishing while listing.
            pass

        # Sort outside the try so a partial listing is still ordered.
        dirs.sort(key=lambda entry: entry.name.lower())
        files.sort(key=lambda entry: entry.name.lower())
        return dirs, files

    def navigate_to(self, path: Path) -> None:
        """Open ``path`` if it is an existing directory and record it in history.

        Any "forward" history is discarded.  Navigating to the directory that
        is already current does not add a duplicate history entry.  The
        selection is cleared in either case.
        """
        if not (path.exists() and path.is_dir()):
            return

        target = path.resolve()
        self.current_path = target

        # Drop entries after the current position, like a web browser does.
        self.navigation_history = self.navigation_history[:self.history_index + 1]
        if not self.navigation_history or self.navigation_history[-1] != target:
            self.navigation_history.append(target)
        self.history_index = len(self.navigation_history) - 1

        self.selected_files.clear()
        self.focused_file = None

    def _go_to_history_index(self, index: int) -> None:
        """Make the history entry at ``index`` current and clear selection."""
        self.history_index = index
        self.current_path = self.navigation_history[index]
        self.selected_files.clear()
        self.focused_file = None

    def navigate_back(self) -> None:
        """Go to the previous directory in the history, if any."""
        if self.history_index > 0:
            self._go_to_history_index(self.history_index - 1)

    def navigate_forward(self) -> None:
        """Go to the next directory in the history, if any."""
        if self.history_index < len(self.navigation_history) - 1:
            self._go_to_history_index(self.history_index + 1)

    def navigate_up(self) -> None:
        """Open the parent of the current directory."""
        parent = self.current_path.parent
        if parent != self.current_path:  # the filesystem root is its own parent
            self.navigate_to(parent)

    def navigate_to_selected(self) -> None:
        """Open the focused entry if it is a directory."""
        if self.focused_file and self.focused_file.is_dir():
            self.navigate_to(self.focused_file)

    def open_file(self, file_path: Path) -> None:
        """Open ``file_path`` with the operating system's default application.

        Failures are reported on stdout and otherwise ignored.
        """
        try:
            system = platform.system()
            if system == "Windows":
                os.startfile(str(file_path))
            elif system == "Darwin":  # macOS
                subprocess.run(["open", str(file_path)], check=True)
            else:  # Linux and other desktops
                subprocess.run(["xdg-open", str(file_path)], check=True)
        except (OSError, subprocess.SubprocessError) as e:
            print(f"无法打开文件 {file_path}: {e}")

    def get_file_size_string(self, path: Path) -> str:
        """Return a human-readable size such as ``"2.0 KB"``.

        Directories and paths that cannot be stat-ed yield an empty string.
        """
        try:
            if path.is_dir():
                return ""
            size = path.stat().st_size
        except OSError:
            return ""

        for unit in ('B', 'KB', 'MB', 'GB'):
            if size < 1024.0:
                return f"{size:.1f} {unit}"
            size /= 1024.0
        return f"{size:.1f} TB"

    def should_show_file(self, path: Path) -> bool:
        """Return True if ``path`` matches the case-insensitive search filter."""
        if not self.search_filter:
            return True
        return self.search_filter.lower() in path.name.lower()

    def toggle_directory_expansion(self, path: Path) -> None:
        """Flip the expanded/collapsed state of ``path`` in the tree view."""
        if path in self.expanded_directories:
            self.expanded_directories.remove(path)
        else:
            self.expanded_directories.add(path)

    def is_directory_expanded(self, path: Path) -> bool:
        """Return True if ``path`` is currently expanded."""
        return path in self.expanded_directories

    # ------------------------------------------------------------------
    # Change detection
    # ------------------------------------------------------------------
    def refresh_if_needed(self) -> bool:
        """Poll the current directory at most once per ``refresh_interval``.

        Returns:
            True if auto-refresh is enabled, the interval has elapsed and the
            directory content differs from the cached snapshot.
        """
        if not self.auto_refresh_enabled:
            return False

        now = time.time()
        if now - self.last_refresh_time <= self.refresh_interval:
            return False

        self.last_refresh_time = now
        return self._has_directory_changed()

    @staticmethod
    def _safe_mtime(entry: Path) -> Optional[float]:
        """Return the entry's mtime, or None if it cannot be stat-ed."""
        try:
            return entry.stat().st_mtime
        except OSError:
            # E.g. a dangling symlink or an entry deleted after listing.
            return None

    def _has_directory_changed(self) -> bool:
        """Compare the current directory with its cached snapshot.

        The snapshot maps entry names to modification times.  The first call
        for a directory only stores the snapshot and reports no change.
        """
        try:
            dirs, files = self.get_directory_contents(self.current_path)
            snapshot = {
                'dirs': {d.name: self._safe_mtime(d) for d in dirs},
                'files': {f.name: self._safe_mtime(f) for f in files},
            }
        except OSError as e:
            print(f"检查目录变化时出错: {e}")
            return False

        cache_key = str(self.current_path)
        previous = self.last_directory_content.get(cache_key)
        self.last_directory_content[cache_key] = snapshot
        # No previous snapshot means first visit: nothing to compare against.
        return previous is not None and previous != snapshot

    def force_refresh(self) -> None:
        """Forget the current directory's snapshot and reset the poll timer."""
        self.last_directory_content.pop(str(self.current_path), None)
        self.last_refresh_time = 0

    def set_auto_refresh(self, enabled: bool) -> None:
        """Enable or disable polling; enabling also forces a refresh."""
        self.auto_refresh_enabled = enabled
        if enabled:
            self.force_refresh()

    # ------------------------------------------------------------------
    # Path helpers
    # ------------------------------------------------------------------
    def get_project_root(self) -> Path:
        """Find the nearest ancestor directory holding ``main.py`` or ``.git``.

        The filesystem root itself is not inspected.  Falls back to the
        directory two levels above this file.
        """
        here = Path(__file__).resolve()
        current = here.parent
        while current.parent != current:
            if (current / "main.py").exists() or (current / ".git").exists():
                return current
            current = current.parent
        return here.parent.parent

    def get_relative_path(self, path: Path) -> str:
        """Return ``path`` relative to ``project_root``, or unchanged if outside."""
        try:
            return str(path.relative_to(self.project_root))
        except ValueError:
            return str(path)

    @staticmethod
    def _resolve_quietly(path: Path) -> Path:
        """Resolve ``path``; return it unchanged if resolution fails."""
        try:
            return path.resolve()
        except (OSError, RuntimeError):
            # RuntimeError: symlink loops on older Python versions.
            return path

    def _normalize_target_dir(self, target_dir: Optional[Path]) -> Path:
        """Return a usable destination directory.

        Uses ``target_dir`` when it is an existing directory, otherwise the
        current directory, otherwise ``project_root``.
        """
        candidate = Path(target_dir) if target_dir else self.current_path
        if candidate.exists() and candidate.is_dir():
            return candidate.resolve()
        if self.current_path.exists() and self.current_path.is_dir():
            return self.current_path.resolve()
        return self.project_root.resolve()

    def _get_unique_destination_path(self, target_dir: Path, source_name: str) -> Path:
        """Return a non-existing path in ``target_dir`` for ``source_name``.

        When the plain name is taken, ``_1``, ``_2``, ... is inserted before
        the suffix until a free name is found.
        """
        source_path = Path(source_name)
        stem = source_path.stem or source_path.name or "item"
        suffix = source_path.suffix

        candidate = target_dir / source_path.name
        index = 1
        while candidate.exists():
            candidate = target_dir / f"{stem}_{index}{suffix}"
            index += 1
        return candidate

    # ------------------------------------------------------------------
    # Import / move
    # ------------------------------------------------------------------
    def import_external_files(
        self, file_paths: List[Path], target_dir: Optional[Path] = None
    ) -> Tuple[List[Path], List[str]]:
        """Copy external files or directories into the resource tree.

        Name clashes are resolved by numbering the copy.  For ``.fbx`` files a
        sibling ``<stem>.fbm`` directory is copied as well when the
        destination does not already contain one.

        Args:
            file_paths: Sources to copy.
            target_dir: Destination directory; see ``_normalize_target_dir``
                for the fallback used when it is missing or invalid.

        Returns:
            ``(imported, errors)``: destination paths of successful copies
            and human-readable messages for the failures.
        """
        destination_root = self._normalize_target_dir(target_dir)
        destination_root.mkdir(parents=True, exist_ok=True)

        imported: List[Path] = []
        errors: List[str] = []
        for path in file_paths:
            src = Path(path)
            if not src.exists():
                errors.append(f"文件不存在: {src}")
                continue

            if src.is_dir():
                # Copying a directory into itself or one of its descendants
                # would make copytree recurse into the tree it is creating.
                src_resolved = self._resolve_quietly(src)
                if (src_resolved == destination_root
                        or src_resolved in destination_root.parents):
                    errors.append(f"不能将目录导入到其子目录中: {src}")
                    continue

            dst = self._get_unique_destination_path(destination_root, src.name)
            try:
                if src.is_dir():
                    shutil.copytree(src, dst)
                else:
                    shutil.copy2(src, dst)
                    if src.suffix.lower() == '.fbx':
                        # Companion directory next to the FBX file.
                        fbm_src = src.with_name(src.stem + '.fbm')
                        if fbm_src.exists() and fbm_src.is_dir():
                            fbm_dst = destination_root / fbm_src.name
                            if not fbm_dst.exists():
                                shutil.copytree(fbm_src, fbm_dst)
            except OSError as e:  # includes shutil.Error
                errors.append(f"导入失败 {src}: {e}")
                continue
            imported.append(dst)

        if imported:
            self.force_refresh()
        return imported, errors

    def move_files_to_directory(
        self, file_paths: List[Path], target_dir: Optional[Path]
    ) -> Tuple[List[Path], List[str]]:
        """Move files or directories inside the resource tree.

        Sources that already live in the destination, or that are the
        destination itself, are skipped silently.  A directory cannot be moved
        into one of its own descendants.  After a successful move the
        selection and focus are re-pointed at the new locations.

        Args:
            file_paths: Sources to move.
            target_dir: Destination directory; see ``_normalize_target_dir``
                for the fallback used when it is missing or invalid.

        Returns:
            ``(moved, errors)``: new paths of moved entries and
            human-readable messages for the failures.
        """
        destination_root = self._normalize_target_dir(target_dir)
        destination_root.mkdir(parents=True, exist_ok=True)
        destination_resolved = destination_root.resolve()

        moved: List[Path] = []
        errors: List[str] = []
        relocated: Dict[Path, Path] = {}  # resolved old path -> resolved new path

        for path in file_paths:
            src = Path(path)
            if not src.exists():
                errors.append(f"源文件不存在: {src}")
                continue

            src_resolved = self._resolve_quietly(src)
            if src_resolved == destination_resolved:
                continue
            if src_resolved.parent == destination_resolved:
                continue
            # ``parents`` works on every supported Python version, unlike
            # ``Path.is_relative_to`` (added in 3.9).
            if src.is_dir() and src_resolved in destination_resolved.parents:
                errors.append(f"不能将目录移动到其子目录中: {src}")
                continue

            dst = self._get_unique_destination_path(destination_root, src.name)
            try:
                shutil.move(str(src), str(dst))
            except OSError as e:  # includes shutil.Error
                errors.append(f"移动失败 {src}: {e}")
                continue
            moved.append(dst)
            relocated[src_resolved] = self._resolve_quietly(dst)

        if moved:
            self._retarget_selection(relocated)
            self.force_refresh()
        return moved, errors

    def _retarget_selection(self, relocated: Dict[Path, Path]) -> None:
        """Point selection and focus at the new locations of moved entries.

        Selected paths that were not moved are kept only if they still exist.
        """
        updated_selection: Set[Path] = set()
        for selected in self.selected_files:
            selected_resolved = self._resolve_quietly(selected)
            if selected_resolved in relocated:
                updated_selection.add(relocated[selected_resolved])
            elif selected.exists():
                updated_selection.add(selected)
        self.selected_files = updated_selection

        if self.focused_file:
            focused_resolved = self._resolve_quietly(self.focused_file)
            if focused_resolved in relocated:
                self.focused_file = relocated[focused_resolved]

    # ------------------------------------------------------------------
    # Drag and drop
    # ------------------------------------------------------------------
    def start_drag(self, files: List[Path]) -> None:
        """Begin dragging ``files``; paths that do not exist are dropped."""
        candidates = (Path(f) for f in files)
        self.dragged_files = [p for p in candidates if p.exists()]

    def clear_drag(self) -> None:
        """End the current drag operation."""
        self.dragged_files.clear()

    def is_dragging(self) -> bool:
        """Return True while at least one file is being dragged."""
        return bool(self.dragged_files)

    def get_dragged_files(self) -> List[Path]:
        """Return a copy of the list of files being dragged."""
        return self.dragged_files.copy()

    def can_drag_to_scene(self) -> bool:
        """Return True if any dragged file is a supported 3D model."""
        return any(
            file_path.suffix.lower() in self.SCENE_MODEL_EXTENSIONS
            for file_path in self.dragged_files
        )