581 lines
21 KiB
Python
581 lines
21 KiB
Python
"""
|
||
模型拖拽服务
|
||
|
||
集中管理:
|
||
1. 系统外部文件拖入窗口(Windows WM_DROPFILES)
|
||
2. 资源管理器内部拖拽(移动资源、拖入场景、拖入场景树)
|
||
3. 模型导入后的命名冲突处理
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import ctypes
|
||
import os
|
||
from pathlib import Path
|
||
from typing import List, Optional, Tuple
|
||
|
||
from imgui_bundle import imgui
|
||
|
||
try:
|
||
from ctypes import wintypes
|
||
except Exception: # pragma: no cover - non-Windows fallback
|
||
wintypes = None
|
||
|
||
|
||
class NativeFileDropMonitor:
|
||
"""Windows 文件拖拽桥接,收集外部文件拖入事件。"""
|
||
|
||
WM_DROPFILES = 0x0233
|
||
WM_NCDESTROY = 0x0082
|
||
GWL_WNDPROC = -4
|
||
|
||
def __init__(self, world):
|
||
self.world = world
|
||
self.enabled = False
|
||
self._hwnd = None
|
||
self._window_proc = None
|
||
self._old_wndproc = None
|
||
self._set_wndproc = None
|
||
self._user32 = None
|
||
self._shell32 = None
|
||
self._pending_events = []
|
||
self._hook_wndproc_ptr = None
|
||
|
||
def start(self):
|
||
"""启动系统文件拖拽捕获。"""
|
||
if self.enabled:
|
||
return
|
||
|
||
if os.environ.get("EG_DISABLE_NATIVE_DROP") == "1":
|
||
print("DragDropMonitor: 已通过 EG_DISABLE_NATIVE_DROP 禁用原生拖拽")
|
||
return
|
||
|
||
if os.name != "nt" or wintypes is None:
|
||
print("DragDropMonitor: 非 Windows 平台,未启用系统拖拽桥接")
|
||
return
|
||
|
||
try:
|
||
win_handle = self.world.win.getWindowHandle() if self.world and self.world.win else None
|
||
if not win_handle:
|
||
return
|
||
|
||
self._hwnd = int(win_handle.getIntHandle())
|
||
if not self._hwnd:
|
||
return
|
||
|
||
self._user32 = ctypes.windll.user32
|
||
self._shell32 = ctypes.windll.shell32
|
||
|
||
long_ptr_t = ctypes.c_ssize_t
|
||
wparam_t = wintypes.WPARAM
|
||
lparam_t = wintypes.LPARAM
|
||
wndproc_t = ctypes.WINFUNCTYPE(
|
||
long_ptr_t, wintypes.HWND, wintypes.UINT, wparam_t, lparam_t
|
||
)
|
||
|
||
if ctypes.sizeof(ctypes.c_void_p) == 8:
|
||
self._set_wndproc = self._user32.SetWindowLongPtrW
|
||
get_wndproc = self._user32.GetWindowLongPtrW
|
||
else:
|
||
self._set_wndproc = self._user32.SetWindowLongW
|
||
get_wndproc = self._user32.GetWindowLongW
|
||
|
||
self._set_wndproc.argtypes = [wintypes.HWND, ctypes.c_int, long_ptr_t]
|
||
self._set_wndproc.restype = long_ptr_t
|
||
get_wndproc.argtypes = [wintypes.HWND, ctypes.c_int]
|
||
get_wndproc.restype = long_ptr_t
|
||
|
||
self._user32.CallWindowProcW.argtypes = [
|
||
ctypes.c_void_p, wintypes.HWND, wintypes.UINT, wparam_t, lparam_t
|
||
]
|
||
self._user32.CallWindowProcW.restype = long_ptr_t
|
||
self._user32.IsWindow.argtypes = [wintypes.HWND]
|
||
self._user32.IsWindow.restype = wintypes.BOOL
|
||
|
||
self._shell32.DragQueryFileW.argtypes = [wintypes.HANDLE, wintypes.UINT, wintypes.LPWSTR, wintypes.UINT]
|
||
self._shell32.DragQueryFileW.restype = wintypes.UINT
|
||
self._shell32.DragQueryPoint.argtypes = [wintypes.HANDLE, ctypes.POINTER(wintypes.POINT)]
|
||
self._shell32.DragQueryPoint.restype = wintypes.BOOL
|
||
self._shell32.DragFinish.argtypes = [wintypes.HANDLE]
|
||
self._shell32.DragFinish.restype = None
|
||
self._shell32.DragAcceptFiles.argtypes = [wintypes.HWND, wintypes.BOOL]
|
||
self._shell32.DragAcceptFiles.restype = None
|
||
|
||
self._old_wndproc = get_wndproc(wintypes.HWND(self._hwnd), self.GWL_WNDPROC)
|
||
if not self._old_wndproc:
|
||
raise RuntimeError("GetWindowLongPtrW returned null WndProc")
|
||
|
||
def _window_proc(hwnd, msg, wparam, lparam):
|
||
try:
|
||
if msg == self.WM_DROPFILES:
|
||
files, drop_pos = self._extract_drop_files(wparam)
|
||
if files:
|
||
self._pending_events.append((files, drop_pos))
|
||
return 0
|
||
if msg == self.WM_NCDESTROY:
|
||
result = self._user32.CallWindowProcW(
|
||
ctypes.c_void_p(self._old_wndproc), hwnd, msg, wparam, lparam
|
||
)
|
||
self.enabled = False
|
||
self._hwnd = None
|
||
self._old_wndproc = None
|
||
self._hook_wndproc_ptr = None
|
||
return result
|
||
except Exception as cb_err:
|
||
print(f"[DragDropMonitor] 回调异常: {cb_err}")
|
||
|
||
if not self._old_wndproc:
|
||
return 0
|
||
return self._user32.CallWindowProcW(
|
||
ctypes.c_void_p(self._old_wndproc), hwnd, msg, wparam, lparam
|
||
)
|
||
|
||
self._window_proc = wndproc_t(_window_proc)
|
||
self._hook_wndproc_ptr = ctypes.cast(self._window_proc, ctypes.c_void_p).value
|
||
if not self._hook_wndproc_ptr:
|
||
raise RuntimeError("无法获取 WndProc 回调指针")
|
||
|
||
self._set_wndproc(
|
||
wintypes.HWND(self._hwnd),
|
||
self.GWL_WNDPROC,
|
||
self._hook_wndproc_ptr,
|
||
)
|
||
installed_wndproc = get_wndproc(wintypes.HWND(self._hwnd), self.GWL_WNDPROC)
|
||
if int(installed_wndproc) != int(self._hook_wndproc_ptr):
|
||
raise RuntimeError("WndProc 安装校验失败")
|
||
|
||
self._shell32.DragAcceptFiles(wintypes.HWND(self._hwnd), True)
|
||
self.enabled = True
|
||
print("拖拽监控已启用")
|
||
except Exception as e:
|
||
print(f"拖拽监控启动失败: {e}")
|
||
self.enabled = False
|
||
|
||
def stop(self):
|
||
"""恢复窗口过程并停止拖拽捕获。"""
|
||
if not self.enabled or os.name != "nt" or wintypes is None:
|
||
return
|
||
|
||
try:
|
||
if self._shell32 and self._hwnd:
|
||
self._shell32.DragAcceptFiles(wintypes.HWND(self._hwnd), False)
|
||
if (
|
||
self._set_wndproc
|
||
and self._hwnd
|
||
and self._old_wndproc
|
||
and self._user32
|
||
and self._user32.IsWindow(wintypes.HWND(self._hwnd))
|
||
):
|
||
self._set_wndproc(
|
||
wintypes.HWND(self._hwnd),
|
||
self.GWL_WNDPROC,
|
||
self._old_wndproc,
|
||
)
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
self.enabled = False
|
||
self._window_proc = None
|
||
self._hook_wndproc_ptr = None
|
||
|
||
def _extract_drop_files(self, hdrop):
|
||
"""从 Win32 HDROP 中提取文件路径和投放点。"""
|
||
files = []
|
||
drop_pos = None
|
||
if not self._shell32 or wintypes is None:
|
||
return files, drop_pos
|
||
|
||
hdrop_handle = wintypes.HANDLE(int(hdrop))
|
||
if not hdrop_handle:
|
||
return files, drop_pos
|
||
|
||
try:
|
||
file_count = self._shell32.DragQueryFileW(hdrop_handle, 0xFFFFFFFF, None, 0)
|
||
for i in range(file_count):
|
||
path_len = self._shell32.DragQueryFileW(hdrop_handle, i, None, 0)
|
||
buffer = ctypes.create_unicode_buffer(path_len + 1)
|
||
self._shell32.DragQueryFileW(hdrop_handle, i, buffer, path_len + 1)
|
||
files.append(buffer.value)
|
||
|
||
point = wintypes.POINT()
|
||
if self._shell32.DragQueryPoint(hdrop_handle, ctypes.byref(point)):
|
||
drop_pos = (int(point.x), int(point.y))
|
||
except Exception as e:
|
||
print(f"[DragDropMonitor] 解析拖拽数据失败: {e}")
|
||
finally:
|
||
try:
|
||
self._shell32.DragFinish(hdrop_handle)
|
||
except Exception:
|
||
pass
|
||
|
||
return files, drop_pos
|
||
|
||
def add_file_from_external(self, file_path):
|
||
"""手动压入外部文件(保留给其他集成调用)。"""
|
||
if file_path:
|
||
self._pending_events.append(([str(file_path)], None))
|
||
|
||
def pop_pending_events(self):
|
||
"""弹出并清空待处理的外部拖拽事件。"""
|
||
events = list(self._pending_events)
|
||
self._pending_events.clear()
|
||
return events
|
||
|
||
|
||
class ModelDragDropService:
|
||
"""模型拖拽业务入口。"""
|
||
|
||
SUPPORTED_MODEL_EXTS = {".gltf", ".glb", ".fbx", ".bam", ".egg", ".obj"}
|
||
|
||
def __init__(self, app):
|
||
self.app = app
|
||
|
||
def setup_drag_drop_support(self):
|
||
"""初始化系统外部拖拽支持。"""
|
||
try:
|
||
monitor = NativeFileDropMonitor(self.app)
|
||
monitor.start()
|
||
self.app.drag_drop_monitor = monitor
|
||
except Exception as e:
|
||
print(f"拖拽监控启动失败: {e}")
|
||
|
||
def is_point_in_resource_manager(self, drop_pos):
|
||
"""判断投放坐标是否命中资源管理器窗口。"""
|
||
if not drop_pos:
|
||
return False
|
||
rect = getattr(self.app, "_resource_manager_window_rect", None)
|
||
if not rect:
|
||
return False
|
||
|
||
x, y = drop_pos
|
||
rx, ry, rw, rh = rect
|
||
return (rx <= x <= rx + rw) and (ry <= y <= ry + rh)
|
||
|
||
def resolve_resource_drop_target_dir(self, drop_pos):
|
||
"""根据投放坐标解析资源管理器中的目标目录。"""
|
||
rm = getattr(self.app, "resource_manager", None)
|
||
default_dir = rm.current_path if rm else None
|
||
targets = getattr(self.app, "_resource_drop_targets", None) or []
|
||
if not drop_pos or not targets:
|
||
return default_dir
|
||
|
||
x, y = drop_pos
|
||
hits = []
|
||
for tx, ty, tw, th, path in targets:
|
||
if tx <= x <= tx + tw and ty <= y <= ty + th:
|
||
hits.append((tw * th, path))
|
||
|
||
if not hits:
|
||
return default_dir
|
||
|
||
hits.sort(key=lambda item: item[0])
|
||
target_path = Path(hits[0][1])
|
||
if target_path.exists() and target_path.is_dir():
|
||
return target_path
|
||
return default_dir
|
||
|
||
def process_external_drop_events(self):
|
||
"""处理外部拖入(系统文件拖拽)事件。"""
|
||
monitor = getattr(self.app, "drag_drop_monitor", None)
|
||
if not monitor:
|
||
return
|
||
|
||
events = monitor.pop_pending_events()
|
||
if not events:
|
||
return
|
||
|
||
for file_paths, drop_pos in events:
|
||
self.handle_external_drop(file_paths, drop_pos)
|
||
|
||
def handle_external_drop(self, file_paths, drop_pos=None):
|
||
"""把外部拖入文件分发到资源管理器或场景。"""
|
||
if not file_paths:
|
||
return
|
||
|
||
paths = [Path(p) for p in file_paths if p]
|
||
if not paths:
|
||
return
|
||
|
||
if self.is_point_in_resource_manager(drop_pos):
|
||
target_dir = self.resolve_resource_drop_target_dir(drop_pos)
|
||
imported, errors = self.app.resource_manager.import_external_files(paths, target_dir=target_dir)
|
||
if imported:
|
||
target_label = self.app.resource_manager.get_relative_path(target_dir) if target_dir else "资源管理器"
|
||
self.app.add_success_message(f"已导入 {len(imported)} 个外部资源到 {target_label}")
|
||
if errors:
|
||
self.app.add_warning_message(f"有 {len(errors)} 个资源导入失败,请查看控制台日志")
|
||
for err in errors:
|
||
print(f"[资源导入] {err}")
|
||
return
|
||
|
||
imported_models = []
|
||
for path in paths:
|
||
if not path.exists() or not self._is_supported_model(path):
|
||
continue
|
||
try:
|
||
model_node = self.app._import_model_with_menu_logic(
|
||
str(path),
|
||
select_model=False,
|
||
set_origin=True,
|
||
show_info_message=False,
|
||
show_success_message=False,
|
||
)
|
||
if not model_node:
|
||
continue
|
||
imported_models.append(model_node)
|
||
except Exception as e:
|
||
print(f"[外部拖拽] 导入模型失败 {path}: {e}")
|
||
|
||
if imported_models:
|
||
if getattr(self.app, "selection", None):
|
||
try:
|
||
self.app.selection.updateSelection(imported_models[-1])
|
||
except Exception as e:
|
||
print(f"[外部拖拽] 更新选择失败: {e}")
|
||
self.app.add_success_message(f"外部拖拽导入场景成功: {len(imported_models)} 个模型")
|
||
else:
|
||
self.app.add_info_message("将文件拖放到“资源管理器”窗口可导入到项目资源")
|
||
|
||
def draw_drag_drop_interface(self):
|
||
"""每帧处理资源管理器内部拖拽的释放事件。"""
|
||
rm = getattr(self.app, "resource_manager", None)
|
||
if not rm:
|
||
return
|
||
|
||
if rm.is_dragging():
|
||
self.app.is_dragging = True
|
||
self.app.dragged_files = rm.get_dragged_files()
|
||
|
||
if self.app.is_dragging and self.app.dragged_files and imgui.is_mouse_released(0):
|
||
mouse_pos = imgui.get_mouse_pos()
|
||
self.handle_internal_drag_drop_completion((float(mouse_pos.x), float(mouse_pos.y)))
|
||
|
||
def handle_internal_drag_drop_completion(self, mouse_pos: Optional[Tuple[float, float]] = None):
|
||
"""处理资源管理器内部拖拽落点。"""
|
||
rm = self.app.resource_manager
|
||
dragged_files = list(self.app.dragged_files) if self.app.dragged_files else rm.get_dragged_files()
|
||
if not dragged_files:
|
||
self._reset_drag_state()
|
||
return
|
||
|
||
drop_kind, target_dir, target_parent = self._resolve_drop_target(mouse_pos)
|
||
|
||
if drop_kind == "resource_manager":
|
||
moved, errors = rm.move_files_to_directory(dragged_files, target_dir)
|
||
if moved:
|
||
label = rm.get_relative_path(target_dir) if target_dir else "资源管理器"
|
||
self.app.add_success_message(f"已移动 {len(moved)} 个资源到 {label}")
|
||
if errors:
|
||
self.app.add_warning_message(f"有 {len(errors)} 个资源移动失败,请查看控制台")
|
||
for err in errors:
|
||
print(f"[资源移动] {err}")
|
||
self._reset_drag_state()
|
||
return
|
||
|
||
if drop_kind not in ("scene_view", "scene_tree"):
|
||
self._reset_drag_state()
|
||
return
|
||
|
||
imported_models = []
|
||
for file_path in dragged_files:
|
||
path = Path(file_path)
|
||
if not self._is_supported_model(path):
|
||
continue
|
||
try:
|
||
model_node = self.app._import_model_with_menu_logic(
|
||
str(path),
|
||
select_model=False,
|
||
set_origin=True,
|
||
show_info_message=False,
|
||
show_success_message=False,
|
||
)
|
||
if not model_node:
|
||
continue
|
||
|
||
if drop_kind == "scene_tree" and self._is_valid_drop_parent(target_parent):
|
||
try:
|
||
model_node.wrtReparentTo(target_parent)
|
||
except Exception:
|
||
model_node.reparentTo(target_parent)
|
||
|
||
imported_models.append(model_node)
|
||
except Exception as e:
|
||
print(f"[拖拽导入] 导入失败 {path}: {e}")
|
||
|
||
if imported_models and getattr(self.app, "selection", None):
|
||
try:
|
||
self.app.selection.updateSelection(imported_models[-1])
|
||
except Exception:
|
||
pass
|
||
|
||
if imported_models:
|
||
self.app.add_success_message(f"拖拽导入成功: {len(imported_models)} 个模型")
|
||
|
||
self._reset_drag_state()
|
||
|
||
def add_dragged_file(self, file_path):
|
||
"""添加拖拽的文件。"""
|
||
if file_path not in self.app.dragged_files:
|
||
self.app.dragged_files.append(file_path)
|
||
self.app.is_dragging = True
|
||
self.app.show_drag_overlay = True
|
||
print(f"检测到拖拽文件: {file_path}")
|
||
|
||
def clear_dragged_files(self):
|
||
"""清空拖拽文件列表。"""
|
||
self.app.dragged_files.clear()
|
||
self.app.is_dragging = False
|
||
self.app.show_drag_overlay = False
|
||
|
||
def process_dragged_files(self):
|
||
"""处理拖拽的文件。"""
|
||
if not self.app.dragged_files:
|
||
return
|
||
|
||
imported_count = 0
|
||
for file_path in self.app.dragged_files:
|
||
if self.import_model_from_path(file_path):
|
||
imported_count += 1
|
||
|
||
if imported_count > 0:
|
||
self.app.add_success_message(f"成功导入 {imported_count} 个模型文件")
|
||
else:
|
||
self.app.add_error_message("没有成功导入任何文件")
|
||
|
||
self.clear_dragged_files()
|
||
|
||
def import_model_from_path(self, file_path):
|
||
"""从路径导入模型的内部方法。"""
|
||
try:
|
||
path = Path(file_path)
|
||
if not path.exists():
|
||
self.app.add_error_message(f"文件不存在: {path}")
|
||
return False
|
||
|
||
if not self._is_supported_model(path):
|
||
self.app.add_error_message(f"不支持的文件格式: {path.suffix.lower()}")
|
||
return False
|
||
|
||
model_node = self.app._import_model_with_menu_logic(str(path))
|
||
if not model_node:
|
||
return False
|
||
|
||
return True
|
||
except Exception as e:
|
||
self.app.add_error_message(f"导入模型时发生错误: {e}")
|
||
return False
|
||
|
||
@staticmethod
|
||
def draw_drag_overlay():
|
||
"""兼容接口:Unity 风格不再绘制拖拽遮罩。"""
|
||
return
|
||
|
||
@staticmethod
|
||
def draw_drag_status():
|
||
"""兼容接口:Unity 风格不再绘制拖拽状态窗。"""
|
||
return
|
||
|
||
def _postprocess_imported_model(self, model_node, set_origin=False):
|
||
if getattr(self.app, "scene_manager", None):
|
||
try:
|
||
self.app.scene_manager.processMaterials(model_node)
|
||
except Exception:
|
||
pass
|
||
if set_origin:
|
||
try:
|
||
model_node.setPos(0, 0, 0)
|
||
except Exception:
|
||
pass
|
||
self._ensure_unique_name(model_node)
|
||
|
||
def _resolve_drop_target(self, mouse_pos: Optional[Tuple[float, float]]):
|
||
point = mouse_pos
|
||
if point is None:
|
||
m = imgui.get_mouse_pos()
|
||
point = (float(m.x), float(m.y))
|
||
|
||
if self._point_in_rect(point, getattr(self.app, "_resource_manager_window_rect", None)):
|
||
target_dir = self.resolve_resource_drop_target_dir(point)
|
||
return "resource_manager", target_dir, None
|
||
|
||
if self._point_in_rect(point, getattr(self.app, "_scene_tree_window_rect", None)):
|
||
return "scene_tree", None, getattr(self.app, "_drag_scene_tree_hover_node", None)
|
||
|
||
for rect_name in (
|
||
"_property_panel_window_rect",
|
||
"_script_panel_window_rect",
|
||
"_console_window_rect",
|
||
"_toolbar_window_rect",
|
||
):
|
||
if self._point_in_rect(point, getattr(self.app, rect_name, None)):
|
||
return "ui_panel", None, None
|
||
|
||
if imgui.get_io().want_capture_mouse:
|
||
return "ui_panel", None, None
|
||
|
||
return "scene_view", None, None
|
||
|
||
@staticmethod
|
||
def _point_in_rect(point, rect):
|
||
if not point or not rect:
|
||
return False
|
||
x, y = point
|
||
rx, ry, rw, rh = rect
|
||
return rx <= x <= rx + rw and ry <= y <= ry + rh
|
||
|
||
@classmethod
|
||
def _is_supported_model(cls, path: Path):
|
||
return path.suffix.lower() in cls.SUPPORTED_MODEL_EXTS
|
||
|
||
@staticmethod
|
||
def _is_valid_drop_parent(node):
|
||
if not node:
|
||
return False
|
||
try:
|
||
return not node.isEmpty()
|
||
except Exception:
|
||
return False
|
||
|
||
@staticmethod
|
||
def _ensure_unique_name(model_node):
|
||
if not model_node or model_node.isEmpty():
|
||
return
|
||
parent = model_node.getParent()
|
||
if not parent or parent.isEmpty():
|
||
return
|
||
|
||
current_name = model_node.getName() or "model"
|
||
stem, suffix = os.path.splitext(current_name)
|
||
if not stem:
|
||
stem = current_name
|
||
|
||
existing_names = set()
|
||
try:
|
||
siblings = parent.getChildren()
|
||
for i in range(siblings.getNumPaths()):
|
||
sibling = siblings.getPath(i)
|
||
if sibling == model_node:
|
||
continue
|
||
existing_names.add(sibling.getName())
|
||
except Exception:
|
||
return
|
||
|
||
if current_name not in existing_names:
|
||
return
|
||
|
||
index = 1
|
||
while True:
|
||
candidate = f"{stem}_{index}{suffix}" if suffix else f"{stem}_{index}"
|
||
if candidate not in existing_names:
|
||
model_node.setName(candidate)
|
||
return
|
||
index += 1
|
||
|
||
def _reset_drag_state(self):
|
||
self.app.is_dragging = False
|
||
self.app.dragged_files.clear()
|
||
self.app.show_drag_overlay = False
|
||
if getattr(self.app, "resource_manager", None):
|
||
self.app.resource_manager.clear_drag()
|
||
self.app._drag_scene_tree_hover_node = None
|