EG/core/model_drag_drop.py

581 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
模型拖拽服务
集中管理:
1. 系统外部文件拖入窗口Windows WM_DROPFILES
2. 资源管理器内部拖拽(移动资源、拖入场景、拖入场景树)
3. 模型导入后的命名冲突处理
"""
from __future__ import annotations
import ctypes
import os
from pathlib import Path
from typing import List, Optional, Tuple
from imgui_bundle import imgui
try:
from ctypes import wintypes
except Exception: # pragma: no cover - non-Windows fallback
wintypes = None
class NativeFileDropMonitor:
"""Windows 文件拖拽桥接,收集外部文件拖入事件。"""
WM_DROPFILES = 0x0233
WM_NCDESTROY = 0x0082
GWL_WNDPROC = -4
def __init__(self, world):
self.world = world
self.enabled = False
self._hwnd = None
self._window_proc = None
self._old_wndproc = None
self._set_wndproc = None
self._user32 = None
self._shell32 = None
self._pending_events = []
self._hook_wndproc_ptr = None
def start(self):
"""启动系统文件拖拽捕获。"""
if self.enabled:
return
if os.environ.get("EG_DISABLE_NATIVE_DROP") == "1":
print("DragDropMonitor: 已通过 EG_DISABLE_NATIVE_DROP 禁用原生拖拽")
return
if os.name != "nt" or wintypes is None:
print("DragDropMonitor: 非 Windows 平台,未启用系统拖拽桥接")
return
try:
win_handle = self.world.win.getWindowHandle() if self.world and self.world.win else None
if not win_handle:
return
self._hwnd = int(win_handle.getIntHandle())
if not self._hwnd:
return
self._user32 = ctypes.windll.user32
self._shell32 = ctypes.windll.shell32
long_ptr_t = ctypes.c_ssize_t
wparam_t = wintypes.WPARAM
lparam_t = wintypes.LPARAM
wndproc_t = ctypes.WINFUNCTYPE(
long_ptr_t, wintypes.HWND, wintypes.UINT, wparam_t, lparam_t
)
if ctypes.sizeof(ctypes.c_void_p) == 8:
self._set_wndproc = self._user32.SetWindowLongPtrW
get_wndproc = self._user32.GetWindowLongPtrW
else:
self._set_wndproc = self._user32.SetWindowLongW
get_wndproc = self._user32.GetWindowLongW
self._set_wndproc.argtypes = [wintypes.HWND, ctypes.c_int, long_ptr_t]
self._set_wndproc.restype = long_ptr_t
get_wndproc.argtypes = [wintypes.HWND, ctypes.c_int]
get_wndproc.restype = long_ptr_t
self._user32.CallWindowProcW.argtypes = [
ctypes.c_void_p, wintypes.HWND, wintypes.UINT, wparam_t, lparam_t
]
self._user32.CallWindowProcW.restype = long_ptr_t
self._user32.IsWindow.argtypes = [wintypes.HWND]
self._user32.IsWindow.restype = wintypes.BOOL
self._shell32.DragQueryFileW.argtypes = [wintypes.HANDLE, wintypes.UINT, wintypes.LPWSTR, wintypes.UINT]
self._shell32.DragQueryFileW.restype = wintypes.UINT
self._shell32.DragQueryPoint.argtypes = [wintypes.HANDLE, ctypes.POINTER(wintypes.POINT)]
self._shell32.DragQueryPoint.restype = wintypes.BOOL
self._shell32.DragFinish.argtypes = [wintypes.HANDLE]
self._shell32.DragFinish.restype = None
self._shell32.DragAcceptFiles.argtypes = [wintypes.HWND, wintypes.BOOL]
self._shell32.DragAcceptFiles.restype = None
self._old_wndproc = get_wndproc(wintypes.HWND(self._hwnd), self.GWL_WNDPROC)
if not self._old_wndproc:
raise RuntimeError("GetWindowLongPtrW returned null WndProc")
def _window_proc(hwnd, msg, wparam, lparam):
try:
if msg == self.WM_DROPFILES:
files, drop_pos = self._extract_drop_files(wparam)
if files:
self._pending_events.append((files, drop_pos))
return 0
if msg == self.WM_NCDESTROY:
result = self._user32.CallWindowProcW(
ctypes.c_void_p(self._old_wndproc), hwnd, msg, wparam, lparam
)
self.enabled = False
self._hwnd = None
self._old_wndproc = None
self._hook_wndproc_ptr = None
return result
except Exception as cb_err:
print(f"[DragDropMonitor] 回调异常: {cb_err}")
if not self._old_wndproc:
return 0
return self._user32.CallWindowProcW(
ctypes.c_void_p(self._old_wndproc), hwnd, msg, wparam, lparam
)
self._window_proc = wndproc_t(_window_proc)
self._hook_wndproc_ptr = ctypes.cast(self._window_proc, ctypes.c_void_p).value
if not self._hook_wndproc_ptr:
raise RuntimeError("无法获取 WndProc 回调指针")
self._set_wndproc(
wintypes.HWND(self._hwnd),
self.GWL_WNDPROC,
self._hook_wndproc_ptr,
)
installed_wndproc = get_wndproc(wintypes.HWND(self._hwnd), self.GWL_WNDPROC)
if int(installed_wndproc) != int(self._hook_wndproc_ptr):
raise RuntimeError("WndProc 安装校验失败")
self._shell32.DragAcceptFiles(wintypes.HWND(self._hwnd), True)
self.enabled = True
print("拖拽监控已启用")
except Exception as e:
print(f"拖拽监控启动失败: {e}")
self.enabled = False
def stop(self):
"""恢复窗口过程并停止拖拽捕获。"""
if not self.enabled or os.name != "nt" or wintypes is None:
return
try:
if self._shell32 and self._hwnd:
self._shell32.DragAcceptFiles(wintypes.HWND(self._hwnd), False)
if (
self._set_wndproc
and self._hwnd
and self._old_wndproc
and self._user32
and self._user32.IsWindow(wintypes.HWND(self._hwnd))
):
self._set_wndproc(
wintypes.HWND(self._hwnd),
self.GWL_WNDPROC,
self._old_wndproc,
)
except Exception:
pass
finally:
self.enabled = False
self._window_proc = None
self._hook_wndproc_ptr = None
def _extract_drop_files(self, hdrop):
"""从 Win32 HDROP 中提取文件路径和投放点。"""
files = []
drop_pos = None
if not self._shell32 or wintypes is None:
return files, drop_pos
hdrop_handle = wintypes.HANDLE(int(hdrop))
if not hdrop_handle:
return files, drop_pos
try:
file_count = self._shell32.DragQueryFileW(hdrop_handle, 0xFFFFFFFF, None, 0)
for i in range(file_count):
path_len = self._shell32.DragQueryFileW(hdrop_handle, i, None, 0)
buffer = ctypes.create_unicode_buffer(path_len + 1)
self._shell32.DragQueryFileW(hdrop_handle, i, buffer, path_len + 1)
files.append(buffer.value)
point = wintypes.POINT()
if self._shell32.DragQueryPoint(hdrop_handle, ctypes.byref(point)):
drop_pos = (int(point.x), int(point.y))
except Exception as e:
print(f"[DragDropMonitor] 解析拖拽数据失败: {e}")
finally:
try:
self._shell32.DragFinish(hdrop_handle)
except Exception:
pass
return files, drop_pos
def add_file_from_external(self, file_path):
"""手动压入外部文件(保留给其他集成调用)。"""
if file_path:
self._pending_events.append(([str(file_path)], None))
def pop_pending_events(self):
"""弹出并清空待处理的外部拖拽事件。"""
events = list(self._pending_events)
self._pending_events.clear()
return events
class ModelDragDropService:
"""模型拖拽业务入口。"""
SUPPORTED_MODEL_EXTS = {".gltf", ".glb", ".fbx", ".bam", ".egg", ".obj"}
def __init__(self, app):
self.app = app
def setup_drag_drop_support(self):
"""初始化系统外部拖拽支持。"""
try:
monitor = NativeFileDropMonitor(self.app)
monitor.start()
self.app.drag_drop_monitor = monitor
except Exception as e:
print(f"拖拽监控启动失败: {e}")
def is_point_in_resource_manager(self, drop_pos):
"""判断投放坐标是否命中资源管理器窗口。"""
if not drop_pos:
return False
rect = getattr(self.app, "_resource_manager_window_rect", None)
if not rect:
return False
x, y = drop_pos
rx, ry, rw, rh = rect
return (rx <= x <= rx + rw) and (ry <= y <= ry + rh)
def resolve_resource_drop_target_dir(self, drop_pos):
"""根据投放坐标解析资源管理器中的目标目录。"""
rm = getattr(self.app, "resource_manager", None)
default_dir = rm.current_path if rm else None
targets = getattr(self.app, "_resource_drop_targets", None) or []
if not drop_pos or not targets:
return default_dir
x, y = drop_pos
hits = []
for tx, ty, tw, th, path in targets:
if tx <= x <= tx + tw and ty <= y <= ty + th:
hits.append((tw * th, path))
if not hits:
return default_dir
hits.sort(key=lambda item: item[0])
target_path = Path(hits[0][1])
if target_path.exists() and target_path.is_dir():
return target_path
return default_dir
def process_external_drop_events(self):
"""处理外部拖入(系统文件拖拽)事件。"""
monitor = getattr(self.app, "drag_drop_monitor", None)
if not monitor:
return
events = monitor.pop_pending_events()
if not events:
return
for file_paths, drop_pos in events:
self.handle_external_drop(file_paths, drop_pos)
def handle_external_drop(self, file_paths, drop_pos=None):
"""把外部拖入文件分发到资源管理器或场景。"""
if not file_paths:
return
paths = [Path(p) for p in file_paths if p]
if not paths:
return
if self.is_point_in_resource_manager(drop_pos):
target_dir = self.resolve_resource_drop_target_dir(drop_pos)
imported, errors = self.app.resource_manager.import_external_files(paths, target_dir=target_dir)
if imported:
target_label = self.app.resource_manager.get_relative_path(target_dir) if target_dir else "资源管理器"
self.app.add_success_message(f"已导入 {len(imported)} 个外部资源到 {target_label}")
if errors:
self.app.add_warning_message(f"{len(errors)} 个资源导入失败,请查看控制台日志")
for err in errors:
print(f"[资源导入] {err}")
return
imported_models = []
for path in paths:
if not path.exists() or not self._is_supported_model(path):
continue
try:
model_node = self.app._import_model_with_menu_logic(
str(path),
select_model=False,
set_origin=True,
show_info_message=False,
show_success_message=False,
)
if not model_node:
continue
imported_models.append(model_node)
except Exception as e:
print(f"[外部拖拽] 导入模型失败 {path}: {e}")
if imported_models:
if getattr(self.app, "selection", None):
try:
self.app.selection.updateSelection(imported_models[-1])
except Exception as e:
print(f"[外部拖拽] 更新选择失败: {e}")
self.app.add_success_message(f"外部拖拽导入场景成功: {len(imported_models)} 个模型")
else:
self.app.add_info_message("将文件拖放到“资源管理器”窗口可导入到项目资源")
def draw_drag_drop_interface(self):
"""每帧处理资源管理器内部拖拽的释放事件。"""
rm = getattr(self.app, "resource_manager", None)
if not rm:
return
if rm.is_dragging():
self.app.is_dragging = True
self.app.dragged_files = rm.get_dragged_files()
if self.app.is_dragging and self.app.dragged_files and imgui.is_mouse_released(0):
mouse_pos = imgui.get_mouse_pos()
self.handle_internal_drag_drop_completion((float(mouse_pos.x), float(mouse_pos.y)))
def handle_internal_drag_drop_completion(self, mouse_pos: Optional[Tuple[float, float]] = None):
"""处理资源管理器内部拖拽落点。"""
rm = self.app.resource_manager
dragged_files = list(self.app.dragged_files) if self.app.dragged_files else rm.get_dragged_files()
if not dragged_files:
self._reset_drag_state()
return
drop_kind, target_dir, target_parent = self._resolve_drop_target(mouse_pos)
if drop_kind == "resource_manager":
moved, errors = rm.move_files_to_directory(dragged_files, target_dir)
if moved:
label = rm.get_relative_path(target_dir) if target_dir else "资源管理器"
self.app.add_success_message(f"已移动 {len(moved)} 个资源到 {label}")
if errors:
self.app.add_warning_message(f"{len(errors)} 个资源移动失败,请查看控制台")
for err in errors:
print(f"[资源移动] {err}")
self._reset_drag_state()
return
if drop_kind not in ("scene_view", "scene_tree"):
self._reset_drag_state()
return
imported_models = []
for file_path in dragged_files:
path = Path(file_path)
if not self._is_supported_model(path):
continue
try:
model_node = self.app._import_model_with_menu_logic(
str(path),
select_model=False,
set_origin=True,
show_info_message=False,
show_success_message=False,
)
if not model_node:
continue
if drop_kind == "scene_tree" and self._is_valid_drop_parent(target_parent):
try:
model_node.wrtReparentTo(target_parent)
except Exception:
model_node.reparentTo(target_parent)
imported_models.append(model_node)
except Exception as e:
print(f"[拖拽导入] 导入失败 {path}: {e}")
if imported_models and getattr(self.app, "selection", None):
try:
self.app.selection.updateSelection(imported_models[-1])
except Exception:
pass
if imported_models:
self.app.add_success_message(f"拖拽导入成功: {len(imported_models)} 个模型")
self._reset_drag_state()
def add_dragged_file(self, file_path):
"""添加拖拽的文件。"""
if file_path not in self.app.dragged_files:
self.app.dragged_files.append(file_path)
self.app.is_dragging = True
self.app.show_drag_overlay = True
print(f"检测到拖拽文件: {file_path}")
def clear_dragged_files(self):
"""清空拖拽文件列表。"""
self.app.dragged_files.clear()
self.app.is_dragging = False
self.app.show_drag_overlay = False
def process_dragged_files(self):
"""处理拖拽的文件。"""
if not self.app.dragged_files:
return
imported_count = 0
for file_path in self.app.dragged_files:
if self.import_model_from_path(file_path):
imported_count += 1
if imported_count > 0:
self.app.add_success_message(f"成功导入 {imported_count} 个模型文件")
else:
self.app.add_error_message("没有成功导入任何文件")
self.clear_dragged_files()
def import_model_from_path(self, file_path):
"""从路径导入模型的内部方法。"""
try:
path = Path(file_path)
if not path.exists():
self.app.add_error_message(f"文件不存在: {path}")
return False
if not self._is_supported_model(path):
self.app.add_error_message(f"不支持的文件格式: {path.suffix.lower()}")
return False
model_node = self.app._import_model_with_menu_logic(str(path))
if not model_node:
return False
return True
except Exception as e:
self.app.add_error_message(f"导入模型时发生错误: {e}")
return False
@staticmethod
def draw_drag_overlay():
"""兼容接口Unity 风格不再绘制拖拽遮罩。"""
return
@staticmethod
def draw_drag_status():
"""兼容接口Unity 风格不再绘制拖拽状态窗。"""
return
def _postprocess_imported_model(self, model_node, set_origin=False):
if getattr(self.app, "scene_manager", None):
try:
self.app.scene_manager.processMaterials(model_node)
except Exception:
pass
if set_origin:
try:
model_node.setPos(0, 0, 0)
except Exception:
pass
self._ensure_unique_name(model_node)
def _resolve_drop_target(self, mouse_pos: Optional[Tuple[float, float]]):
point = mouse_pos
if point is None:
m = imgui.get_mouse_pos()
point = (float(m.x), float(m.y))
if self._point_in_rect(point, getattr(self.app, "_resource_manager_window_rect", None)):
target_dir = self.resolve_resource_drop_target_dir(point)
return "resource_manager", target_dir, None
if self._point_in_rect(point, getattr(self.app, "_scene_tree_window_rect", None)):
return "scene_tree", None, getattr(self.app, "_drag_scene_tree_hover_node", None)
for rect_name in (
"_property_panel_window_rect",
"_script_panel_window_rect",
"_console_window_rect",
"_toolbar_window_rect",
):
if self._point_in_rect(point, getattr(self.app, rect_name, None)):
return "ui_panel", None, None
if imgui.get_io().want_capture_mouse:
return "ui_panel", None, None
return "scene_view", None, None
@staticmethod
def _point_in_rect(point, rect):
if not point or not rect:
return False
x, y = point
rx, ry, rw, rh = rect
return rx <= x <= rx + rw and ry <= y <= ry + rh
@classmethod
def _is_supported_model(cls, path: Path):
return path.suffix.lower() in cls.SUPPORTED_MODEL_EXTS
@staticmethod
def _is_valid_drop_parent(node):
if not node:
return False
try:
return not node.isEmpty()
except Exception:
return False
@staticmethod
def _ensure_unique_name(model_node):
if not model_node or model_node.isEmpty():
return
parent = model_node.getParent()
if not parent or parent.isEmpty():
return
current_name = model_node.getName() or "model"
stem, suffix = os.path.splitext(current_name)
if not stem:
stem = current_name
existing_names = set()
try:
siblings = parent.getChildren()
for i in range(siblings.getNumPaths()):
sibling = siblings.getPath(i)
if sibling == model_node:
continue
existing_names.add(sibling.getName())
except Exception:
return
if current_name not in existing_names:
return
index = 1
while True:
candidate = f"{stem}_{index}{suffix}" if suffix else f"{stem}_{index}"
if candidate not in existing_names:
model_node.setName(candidate)
return
index += 1
def _reset_drag_state(self):
self.app.is_dragging = False
self.app.dragged_files.clear()
self.app.show_drag_overlay = False
if getattr(self.app, "resource_manager", None):
self.app.resource_manager.clear_drag()
self.app._drag_scene_tree_hover_node = None