""" 模型拖拽服务 集中管理: 1. 系统外部文件拖入窗口(Windows WM_DROPFILES) 2. 资源管理器内部拖拽(移动资源、拖入场景、拖入场景树) 3. 模型导入后的命名冲突处理 """ from __future__ import annotations import ctypes import os from pathlib import Path from typing import List, Optional, Tuple from imgui_bundle import imgui try: from ctypes import wintypes except Exception: # pragma: no cover - non-Windows fallback wintypes = None class NativeFileDropMonitor: """Windows 文件拖拽桥接,收集外部文件拖入事件。""" WM_DROPFILES = 0x0233 WM_NCDESTROY = 0x0082 GWL_WNDPROC = -4 def __init__(self, world): self.world = world self.enabled = False self._hwnd = None self._window_proc = None self._old_wndproc = None self._set_wndproc = None self._user32 = None self._shell32 = None self._pending_events = [] self._hook_wndproc_ptr = None def start(self): """启动系统文件拖拽捕获。""" if self.enabled: return if os.environ.get("EG_DISABLE_NATIVE_DROP") == "1": print("DragDropMonitor: 已通过 EG_DISABLE_NATIVE_DROP 禁用原生拖拽") return if os.name != "nt" or wintypes is None: print("DragDropMonitor: 非 Windows 平台,未启用系统拖拽桥接") return try: win_handle = self.world.win.getWindowHandle() if self.world and self.world.win else None if not win_handle: return self._hwnd = int(win_handle.getIntHandle()) if not self._hwnd: return self._user32 = ctypes.windll.user32 self._shell32 = ctypes.windll.shell32 long_ptr_t = ctypes.c_ssize_t wparam_t = wintypes.WPARAM lparam_t = wintypes.LPARAM wndproc_t = ctypes.WINFUNCTYPE( long_ptr_t, wintypes.HWND, wintypes.UINT, wparam_t, lparam_t ) if ctypes.sizeof(ctypes.c_void_p) == 8: self._set_wndproc = self._user32.SetWindowLongPtrW get_wndproc = self._user32.GetWindowLongPtrW else: self._set_wndproc = self._user32.SetWindowLongW get_wndproc = self._user32.GetWindowLongW self._set_wndproc.argtypes = [wintypes.HWND, ctypes.c_int, long_ptr_t] self._set_wndproc.restype = long_ptr_t get_wndproc.argtypes = [wintypes.HWND, ctypes.c_int] get_wndproc.restype = long_ptr_t self._user32.CallWindowProcW.argtypes = [ ctypes.c_void_p, wintypes.HWND, wintypes.UINT, wparam_t, lparam_t ] self._user32.CallWindowProcW.restype = long_ptr_t self._user32.IsWindow.argtypes = [wintypes.HWND] self._user32.IsWindow.restype = wintypes.BOOL self._shell32.DragQueryFileW.argtypes = [wintypes.HANDLE, wintypes.UINT, wintypes.LPWSTR, wintypes.UINT] self._shell32.DragQueryFileW.restype = wintypes.UINT self._shell32.DragQueryPoint.argtypes = [wintypes.HANDLE, ctypes.POINTER(wintypes.POINT)] self._shell32.DragQueryPoint.restype = wintypes.BOOL self._shell32.DragFinish.argtypes = [wintypes.HANDLE] self._shell32.DragFinish.restype = None self._shell32.DragAcceptFiles.argtypes = [wintypes.HWND, wintypes.BOOL] self._shell32.DragAcceptFiles.restype = None self._old_wndproc = get_wndproc(wintypes.HWND(self._hwnd), self.GWL_WNDPROC) if not self._old_wndproc: raise RuntimeError("GetWindowLongPtrW returned null WndProc") def _window_proc(hwnd, msg, wparam, lparam): try: if msg == self.WM_DROPFILES: files, drop_pos = self._extract_drop_files(wparam) if files: self._pending_events.append((files, drop_pos)) return 0 if msg == self.WM_NCDESTROY: result = self._user32.CallWindowProcW( ctypes.c_void_p(self._old_wndproc), hwnd, msg, wparam, lparam ) self.enabled = False self._hwnd = None self._old_wndproc = None self._hook_wndproc_ptr = None return result except Exception as cb_err: print(f"[DragDropMonitor] 回调异常: {cb_err}") if not self._old_wndproc: return 0 return self._user32.CallWindowProcW( ctypes.c_void_p(self._old_wndproc), hwnd, msg, wparam, lparam ) self._window_proc = wndproc_t(_window_proc) self._hook_wndproc_ptr = ctypes.cast(self._window_proc, ctypes.c_void_p).value if not self._hook_wndproc_ptr: raise RuntimeError("无法获取 WndProc 回调指针") self._set_wndproc( wintypes.HWND(self._hwnd), self.GWL_WNDPROC, self._hook_wndproc_ptr, ) installed_wndproc = get_wndproc(wintypes.HWND(self._hwnd), self.GWL_WNDPROC) if int(installed_wndproc) != int(self._hook_wndproc_ptr): raise RuntimeError("WndProc 安装校验失败") self._shell32.DragAcceptFiles(wintypes.HWND(self._hwnd), True) self.enabled = True print("拖拽监控已启用") except Exception as e: print(f"拖拽监控启动失败: {e}") self.enabled = False def stop(self): """恢复窗口过程并停止拖拽捕获。""" if not self.enabled or os.name != "nt" or wintypes is None: return try: if self._shell32 and self._hwnd: self._shell32.DragAcceptFiles(wintypes.HWND(self._hwnd), False) if ( self._set_wndproc and self._hwnd and self._old_wndproc and self._user32 and self._user32.IsWindow(wintypes.HWND(self._hwnd)) ): self._set_wndproc( wintypes.HWND(self._hwnd), self.GWL_WNDPROC, self._old_wndproc, ) except Exception: pass finally: self.enabled = False self._window_proc = None self._hook_wndproc_ptr = None def _extract_drop_files(self, hdrop): """从 Win32 HDROP 中提取文件路径和投放点。""" files = [] drop_pos = None if not self._shell32 or wintypes is None: return files, drop_pos hdrop_handle = wintypes.HANDLE(int(hdrop)) if not hdrop_handle: return files, drop_pos try: file_count = self._shell32.DragQueryFileW(hdrop_handle, 0xFFFFFFFF, None, 0) for i in range(file_count): path_len = self._shell32.DragQueryFileW(hdrop_handle, i, None, 0) buffer = ctypes.create_unicode_buffer(path_len + 1) self._shell32.DragQueryFileW(hdrop_handle, i, buffer, path_len + 1) files.append(buffer.value) point = wintypes.POINT() if self._shell32.DragQueryPoint(hdrop_handle, ctypes.byref(point)): drop_pos = (int(point.x), int(point.y)) except Exception as e: print(f"[DragDropMonitor] 解析拖拽数据失败: {e}") finally: try: self._shell32.DragFinish(hdrop_handle) except Exception: pass return files, drop_pos def add_file_from_external(self, file_path): """手动压入外部文件(保留给其他集成调用)。""" if file_path: self._pending_events.append(([str(file_path)], None)) def pop_pending_events(self): """弹出并清空待处理的外部拖拽事件。""" events = list(self._pending_events) self._pending_events.clear() return events class ModelDragDropService: """模型拖拽业务入口。""" SUPPORTED_MODEL_EXTS = {".gltf", ".glb", ".fbx", ".bam", ".egg", ".obj"} def __init__(self, app): self.app = app def setup_drag_drop_support(self): """初始化系统外部拖拽支持。""" try: monitor = NativeFileDropMonitor(self.app) monitor.start() self.app.drag_drop_monitor = monitor except Exception as e: print(f"拖拽监控启动失败: {e}") def is_point_in_resource_manager(self, drop_pos): """判断投放坐标是否命中资源管理器窗口。""" if not drop_pos: return False rect = getattr(self.app, "_resource_manager_window_rect", None) if not rect: return False x, y = drop_pos rx, ry, rw, rh = rect return (rx <= x <= rx + rw) and (ry <= y <= ry + rh) def resolve_resource_drop_target_dir(self, drop_pos): """根据投放坐标解析资源管理器中的目标目录。""" rm = getattr(self.app, "resource_manager", None) default_dir = rm.current_path if rm else None targets = getattr(self.app, "_resource_drop_targets", None) or [] if not drop_pos or not targets: return default_dir x, y = drop_pos hits = [] for tx, ty, tw, th, path in targets: if tx <= x <= tx + tw and ty <= y <= ty + th: hits.append((tw * th, path)) if not hits: return default_dir hits.sort(key=lambda item: item[0]) target_path = Path(hits[0][1]) if target_path.exists() and target_path.is_dir(): return target_path return default_dir def process_external_drop_events(self): """处理外部拖入(系统文件拖拽)事件。""" monitor = getattr(self.app, "drag_drop_monitor", None) if not monitor: return events = monitor.pop_pending_events() if not events: return for file_paths, drop_pos in events: self.handle_external_drop(file_paths, drop_pos) def handle_external_drop(self, file_paths, drop_pos=None): """把外部拖入文件分发到资源管理器或场景。""" if not file_paths: return paths = [Path(p) for p in file_paths if p] if not paths: return if self.is_point_in_resource_manager(drop_pos): target_dir = self.resolve_resource_drop_target_dir(drop_pos) imported, errors = self.app.resource_manager.import_external_files(paths, target_dir=target_dir) if imported: target_label = self.app.resource_manager.get_relative_path(target_dir) if target_dir else "资源管理器" self.app.add_success_message(f"已导入 {len(imported)} 个外部资源到 {target_label}") if errors: self.app.add_warning_message(f"有 {len(errors)} 个资源导入失败,请查看控制台日志") for err in errors: print(f"[资源导入] {err}") return imported_models = [] for path in paths: if not path.exists() or not self._is_supported_model(path): continue try: model_node = self.app._import_model_with_menu_logic( str(path), select_model=False, set_origin=True, show_info_message=False, show_success_message=False, ) if not model_node: continue imported_models.append(model_node) except Exception as e: print(f"[外部拖拽] 导入模型失败 {path}: {e}") if imported_models: if getattr(self.app, "selection", None): try: self.app.selection.updateSelection(imported_models[-1]) except Exception as e: print(f"[外部拖拽] 更新选择失败: {e}") self.app.add_success_message(f"外部拖拽导入场景成功: {len(imported_models)} 个模型") else: self.app.add_info_message("将文件拖放到“资源管理器”窗口可导入到项目资源") def draw_drag_drop_interface(self): """每帧处理资源管理器内部拖拽的释放事件。""" rm = getattr(self.app, "resource_manager", None) if not rm: return if rm.is_dragging(): self.app.is_dragging = True self.app.dragged_files = rm.get_dragged_files() if self.app.is_dragging and self.app.dragged_files and imgui.is_mouse_released(0): mouse_pos = imgui.get_mouse_pos() self.handle_internal_drag_drop_completion((float(mouse_pos.x), float(mouse_pos.y))) def handle_internal_drag_drop_completion(self, mouse_pos: Optional[Tuple[float, float]] = None): """处理资源管理器内部拖拽落点。""" rm = self.app.resource_manager dragged_files = list(self.app.dragged_files) if self.app.dragged_files else rm.get_dragged_files() if not dragged_files: self._reset_drag_state() return drop_kind, target_dir, target_parent = self._resolve_drop_target(mouse_pos) if drop_kind == "resource_manager": moved, errors = rm.move_files_to_directory(dragged_files, target_dir) if moved: label = rm.get_relative_path(target_dir) if target_dir else "资源管理器" self.app.add_success_message(f"已移动 {len(moved)} 个资源到 {label}") if errors: self.app.add_warning_message(f"有 {len(errors)} 个资源移动失败,请查看控制台") for err in errors: print(f"[资源移动] {err}") self._reset_drag_state() return if drop_kind not in ("scene_view", "scene_tree"): self._reset_drag_state() return imported_models = [] for file_path in dragged_files: path = Path(file_path) if not self._is_supported_model(path): continue try: model_node = self.app._import_model_with_menu_logic( str(path), select_model=False, set_origin=True, show_info_message=False, show_success_message=False, ) if not model_node: continue if drop_kind == "scene_tree" and self._is_valid_drop_parent(target_parent): try: model_node.wrtReparentTo(target_parent) except Exception: model_node.reparentTo(target_parent) imported_models.append(model_node) except Exception as e: print(f"[拖拽导入] 导入失败 {path}: {e}") if imported_models and getattr(self.app, "selection", None): try: self.app.selection.updateSelection(imported_models[-1]) except Exception: pass if imported_models: self.app.add_success_message(f"拖拽导入成功: {len(imported_models)} 个模型") self._reset_drag_state() def add_dragged_file(self, file_path): """添加拖拽的文件。""" if file_path not in self.app.dragged_files: self.app.dragged_files.append(file_path) self.app.is_dragging = True self.app.show_drag_overlay = True print(f"检测到拖拽文件: {file_path}") def clear_dragged_files(self): """清空拖拽文件列表。""" self.app.dragged_files.clear() self.app.is_dragging = False self.app.show_drag_overlay = False def process_dragged_files(self): """处理拖拽的文件。""" if not self.app.dragged_files: return imported_count = 0 for file_path in self.app.dragged_files: if self.import_model_from_path(file_path): imported_count += 1 if imported_count > 0: self.app.add_success_message(f"成功导入 {imported_count} 个模型文件") else: self.app.add_error_message("没有成功导入任何文件") self.clear_dragged_files() def import_model_from_path(self, file_path): """从路径导入模型的内部方法。""" try: path = Path(file_path) if not path.exists(): self.app.add_error_message(f"文件不存在: {path}") return False if not self._is_supported_model(path): self.app.add_error_message(f"不支持的文件格式: {path.suffix.lower()}") return False model_node = self.app._import_model_with_menu_logic(str(path)) if not model_node: return False return True except Exception as e: self.app.add_error_message(f"导入模型时发生错误: {e}") return False @staticmethod def draw_drag_overlay(): """兼容接口:Unity 风格不再绘制拖拽遮罩。""" return @staticmethod def draw_drag_status(): """兼容接口:Unity 风格不再绘制拖拽状态窗。""" return def _postprocess_imported_model(self, model_node, set_origin=False): if getattr(self.app, "scene_manager", None): try: self.app.scene_manager.processMaterials(model_node) except Exception: pass if set_origin: try: model_node.setPos(0, 0, 0) except Exception: pass self._ensure_unique_name(model_node) def _resolve_drop_target(self, mouse_pos: Optional[Tuple[float, float]]): point = mouse_pos if point is None: m = imgui.get_mouse_pos() point = (float(m.x), float(m.y)) if self._point_in_rect(point, getattr(self.app, "_resource_manager_window_rect", None)): target_dir = self.resolve_resource_drop_target_dir(point) return "resource_manager", target_dir, None if self._point_in_rect(point, getattr(self.app, "_scene_tree_window_rect", None)): return "scene_tree", None, getattr(self.app, "_drag_scene_tree_hover_node", None) for rect_name in ( "_property_panel_window_rect", "_script_panel_window_rect", "_console_window_rect", "_toolbar_window_rect", ): if self._point_in_rect(point, getattr(self.app, rect_name, None)): return "ui_panel", None, None if imgui.get_io().want_capture_mouse: return "ui_panel", None, None return "scene_view", None, None @staticmethod def _point_in_rect(point, rect): if not point or not rect: return False x, y = point rx, ry, rw, rh = rect return rx <= x <= rx + rw and ry <= y <= ry + rh @classmethod def _is_supported_model(cls, path: Path): return path.suffix.lower() in cls.SUPPORTED_MODEL_EXTS @staticmethod def _is_valid_drop_parent(node): if not node: return False try: return not node.isEmpty() except Exception: return False @staticmethod def _ensure_unique_name(model_node): if not model_node or model_node.isEmpty(): return parent = model_node.getParent() if not parent or parent.isEmpty(): return current_name = model_node.getName() or "model" stem, suffix = os.path.splitext(current_name) if not stem: stem = current_name existing_names = set() try: siblings = parent.getChildren() for i in range(siblings.getNumPaths()): sibling = siblings.getPath(i) if sibling == model_node: continue existing_names.add(sibling.getName()) except Exception: return if current_name not in existing_names: return index = 1 while True: candidate = f"{stem}_{index}{suffix}" if suffix else f"{stem}_{index}" if candidate not in existing_names: model_node.setName(candidate) return index += 1 def _reset_drag_state(self): self.app.is_dragging = False self.app.dragged_files.clear() self.app.show_drag_overlay = False if getattr(self.app, "resource_manager", None): self.app.resource_manager.clear_drag() self.app._drag_scene_tree_hover_node = None