This commit is contained in:
Rowland 2026-01-27 10:04:02 +08:00
parent ced0eedc7a
commit a167767890
10 changed files with 88 additions and 1577 deletions

121
IFLOW.md
View File

@ -6,18 +6,22 @@
核心架构围绕 `MyWorld` 类构建,该类继承自 `CoreWorld`并集成了各种管理器模块如选择系统、工具管理器、脚本管理器、GUI 管理器、场景管理器、项目管理器、地形管理器、碰撞管理器、视频管理器、资源管理器和 VR 管理器。
项目目前处于 "imgui" 分支,已全面转向基于 ImGui 的现代化用户界面,提供了完整的 dockable 界面系统,包括菜单栏、工具栏、场景树、资源管理器、属性面板和控制台等组件。
## 核心技术栈
- **核心引擎**: Panda3D 1.10.15
- **图形渲染**: 可选择普通渲染或 RenderPipeline 高级渲染管线
- **VR 支持**: OpenVR/SteamVR 2.2.0
- **用户界面**: PyQt5 5.15.9 + ImGui 集成
- **用户界面**: PyQt5 5.15.9 + ImGui 集成 (imgui_bundle)
- **3D 模型格式**: 支持 glTF, FBX (需转换), BAM, EGG, OBJ 等
- **脚本语言**: Python (内嵌脚本系统)
- **物理/碰撞**: Panda3D 内置碰撞系统
- **视频支持**: MovieTexture (MP4, AVI, MOV, MKV, WebM)
- **插件架构**: 动态插件加载系统
- **资源管理**: 文件浏览、拖拽、右键菜单集成
- **GUI 框架**: p3dimgui + imgui_bundle (支持 docking 功能)
- **拖拽系统**: 跨平台拖拽文件监控系统 (core/drag_drop/)
## 项目结构
@ -27,6 +31,7 @@
│ ├── world.py # CoreWorld 基础世界类
│ ├── vr/ # VR 子模块 (完整模块化结构)
│ │ ├── __init__.py
│ │ ├── README.md
│ │ ├── config/ # VR 配置管理
│ │ ├── interaction/ # VR 交互系统 (动作、抓取、摇杆、传送)
│ │ ├── performance/ # VR 性能监控
@ -34,7 +39,6 @@
│ │ ├── testing/ # VR 测试模式
│ │ ├── tracking/ # VR 设备跟踪
│ │ └── visualization/ # VR 可视化 (控制器模型)
│ ├── drag_drop/ # 拖拽文件监控系统
│ ├── vr_manager.py # VR 管理器主文件
│ ├── selection.py # 选择系统
│ ├── tool_manager.py # 工具管理器
@ -49,45 +53,77 @@
│ ├── resource_manager.py # 资源管理器 (ImGui版本)
│ ├── CustomMouseController.py # 自定义鼠标控制器
│ ├── maintenance_gui.py # 维护GUI系统
│ └── assembly_interaction.py # 装配交互系统
├── demo/ # 示例和测试文件 (包含大量测试和演示)
│ ├── assembly_interaction.py # 装配交互系统
│ ├── imgui_style_manager.py # ImGui样式管理器
│ └── [工具模型文件] # .fbx 格式的工具手柄模型
│ ├── RotationHandleFull.fbx
│ ├── RotationHandleQuarter.fbx
│ ├── TranslateArrowHandle.fbx
│ └── UniformScaleHandle.fbx
├── gui/ # GUI 相关模块
│ ├── __init__.py
│ └── gui_manager.py # GUI管理器
├── project/ # 项目管理模块
│ ├── __init__.py
│ └── project_manager.py
├── scene/ # 场景管理模块
│ ├── __init__.py
│ ├── scene_manager.py
│ └── util.py
├── scripts/ # 脚本文件目录
├── ui/ # UI 组件和主窗口
│ ├── widgets.py # 自定义UI组件
│ ├── property_panel.py # 属性面板
│ ├── interface_manager.py # 界面管理器
│ ├── main_window.py # 主窗口
│ ├── icon_manager.py # 图标管理器
│ └── maintenance_system.py # 维护系统UI
│ └── [各种脚本文件]
├── ui/ # UI 组件
│ ├── __init__.py
│ └── icon_manager.py # 图标管理器
├── plugins/ # 插件系统
│ ├── __init__.py
│ ├── plugin_manager.py
│ ├── plugin_interface_spec.py
│ ├── core/ # 核心插件
│ ├── third_party/ # 第三方插件
│ ├── user/ # 用户插件
│ ├── plugin_manager.py # 插件管理器
│ └── plugin_interface_spec.py # 插件接口规范
│ └── user/ # 用户插件
├── QPanda3D/ # Panda3D 与 PyQt 集成库
├── QMeta3D/ # Meta3D 集成库
├── RenderPipelineFile/ # RenderPipeline 高级渲染管线
├── Resources/ # 资源文件 (模型、纹理等)
│ ├── a/ # 资源子目录
│ ├── animations/
│ ├── c/
│ ├── icons/
│ ├── materials/
│ ├── models/
│ └── textures/
├── config/ # 配置文件
│ └── vr_settings.json # VR 配置文件
├── requirements/ # 依赖项文件
│ ├── requirements.txt # 主要依赖项
│ ├── conda-requirements.txt # Conda环境依赖
│ └── clean-requirements.txt # 精简依赖项
│ ├── clean-requirements.txt # 精简依赖项
│ └── DEPLOYMENT_README.md # 部署说明
├── vr_actions/ # VR动作配置
│ ├── actions.json
│ ├── bindings_index.json
│ ├── bindings_oculus.json
│ └── bindings_vive.json
├── icons/ # 图标资源
│ └── file_types/ # 文件类型图标
├── templates/ # 模板文件
│ └── main_template.py
├── tools/ # 工具脚本
│ └── open_source_rate.py
├── tex/ # 纹理资源
├── main.py # 程序入口点
├── demo.py # ImGui演示程序
├── gui_preview_window.py # GUI预览窗口
│ ├── empty_basecolor.png
│ ├── empty_normal.png
│ ├── empty_roughness.png
│ └── empty_specular.png
├── new/ # 新项目目录
│ └── project.json
├── test_project/ # 测试项目
│ └── project.json
├── main.py # 程序入口点 (集成ImGui界面)
├── Start_Run.py # 启动脚本
└── imgui.ini # ImGui配置文件
├── imgui.ini # ImGui配置文件
└── IFLOW.md # 项目文档
```
## 核心功能模块
@ -134,15 +170,25 @@
- `VRTestMode`: 提供不同的测试显示模式和功能开关。
- `VRPerformanceMonitor`: 性能监控和报告。
### 3. GUI 系统 (core/gui_manager.py, gui/)
### 3. GUI 系统 (core/gui_manager.py, gui/, core/imgui_style_manager.py)
- **GUIManager**: 管理 2D 和 3D GUI 元素的创建、编辑、删除。
- **ImGui 集成**: 完整的 ImGui 界面系统,支持 docking 功能。
- **功能**:
- 创建按钮、标签、输入框、2D/3D 图像、视频屏幕等。
- GUI 编辑模式,支持拖拽创建和属性编辑。
- 与场景树和属性面板集成。
- 独立的 GUI 预览窗口 (`gui_preview_window.py`)。
- ImGui 集成支持现代化的界面设计。
- **现代化界面**: 基于 imgui_bundle 的完整 docking 界面系统,包括:
- 菜单栏 (文件、编辑、创建、视图、工具、VR、帮助)
- 工具栏 (选择、移动、旋转、缩放工具)
- 场景树 (层级结构显示和管理)
- 资源管理器 (文件浏览、拖拽导入)
- 属性面板 (对象属性编辑)
- 控制台 (信息输出)
- 脚本管理面板
- **ImGui 样式管理**: `ImGuiStyleManager` 提供统一的界面风格管理。
- **中文字体支持**: 自动加载中文字体,适合中文环境开发。
- **快捷键支持**: 完整的键盘快捷键系统。
### 4. 场景与模型管理 (scene/scene_manager.py)
@ -259,37 +305,43 @@
### 入口点
- `main.py`: 主程序入口,创建 `MyWorld` 实例并启动 PyQt5 主窗口。
- `demo.py`: ImGui演示程序展示现代化界面功能。
- `main.py`: 主程序入口,创建 `MyWorld` 实例并启动基于 ImGui 的现代化界面。
- `Start_Run.py`: 启动脚本。
- `gui_preview_window.py`: GUI预览窗口用于独立测试GUI元素。
### 运行方式
1. 确保已安装所有依赖项(详见 requirements/requirements.txt
2. 运行 `python main.py` 启动主应用程序。
3. 运行 `python demo.py` 启动ImGui演示程序。
4. 如果连接了 VR 设备并安装了 SteamVR可以在应用内启用 VR 模式。
5. VR配置可通过 `config/vr_settings.json` 进行调整。
2. 运行 `python main.py` 启动主应用程序,将显示完整的 ImGui 界面。
3. 如果连接了 VR 设备并安装了 SteamVR可以通过菜单 "VR" -> "进入VR模式" 启用 VR 模式。
4. VR配置可通过 `config/vr_settings.json` 进行调整,或在界面中通过 "VR" -> "VR设置" 进行配置。
### 依赖项
项目提供完整的依赖项文件在 `requirements/` 目录:
- **requirements.txt**: 主要依赖项,包含:
- **requirements.txt**: 完整依赖项,包含:
- `Panda3D==1.10.15` (核心3D引擎)
- `PyQt5==5.15.9` (GUI框架)
- `PySide6>=6.8.1` (Qt6支持)
- `openvr==2.2.0` (VR支持)
- `p3dimgui` (ImGui集成)
- `QPanda3D==0.2.10` (Panda3D与PyQt5集成)
- `Pillow>=9.0.1` (图像处理)
- `python-dotenv>=1.0.1` (环境变量管理)
- `pyassimp>=5.2.5` (3D模型导入)
- 以及其他辅助库
- **conda-requirements.txt**: Conda环境依赖项
- **clean-requirements.txt**: 精简依赖项
- **clean-requirements.txt**: 精简依赖项,仅包含核心库
- **conda-requirements.txt**: Conda环境依赖项 (基础Python环境)
- **DEPLOYMENT_README.md**: 部署说明文档
安装命令:
```bash
# 安装完整依赖
pip install -r requirements/requirements.txt
# 或安装精简依赖
pip install -r requirements/clean-requirements.txt
```
## 开发约定
@ -306,4 +358,7 @@ pip install -r requirements/requirements.txt
- **国际化支持**: 内置中文字体支持,适合中文环境开发。
- **事件驱动**: 完善的事件处理系统支持键盘快捷键如F键聚焦、P键巡检
- **命令模式**: 使用命令系统实现撤销/重做功能。
- **演示系统**: demo/ 目录包含大量测试和演示文件,便于功能验证和学习。
- **拖拽系统**: 跨平台拖拽文件监控系统,支持从外部文件管理器拖拽模型到场景中。
- **ImGui Docking**: 完整的 docking 界面系统,支持窗口自由布局和停靠。
- **分支管理**: 当前开发在 "imgui" 分支,专注于基于 ImGui 的现代化界面开发。
- **工具集成**: 内置多种工具手柄模型支持专业的3D变换操作。

View File

@ -1,10 +0,0 @@
"""
跨平台拖拽检测模块
提供统一的拖拽检测接口支持WindowsLinux和macOS平台
"""
from .base_detector import BaseDragDetector
from .platform_detector import PlatformDragDetector
__all__ = ['BaseDragDetector', 'PlatformDragDetector']

View File

@ -1,120 +0,0 @@
"""
拖拽检测器基类
定义所有平台拖拽检测器的通用接口
"""
import os
import platform
from abc import ABC, abstractmethod
from typing import List, Optional, Callable
import threading
import queue
import time
class BaseDragDetector(ABC):
"""拖拽检测器基类"""
def __init__(self, supported_formats: List[str] = None):
"""
初始化拖拽检测器
Args:
supported_formats: 支持的文件格式列表 ['.gltf', '.glb', '.fbx']
"""
self.supported_formats = supported_formats or ['.gltf', '.glb', '.fbx', '.bam', '.egg', '.obj']
self.is_running = False
self.monitor_thread = None
self.file_queue = queue.Queue()
self.drop_callback: Optional[Callable[[List[str]], None]] = None
def set_drop_callback(self, callback: Callable[[List[str]], None]):
"""
设置文件拖拽回调函数
Args:
callback: 接收文件路径列表的回调函数
"""
self.drop_callback = callback
def start_monitoring(self):
"""开始监控拖拽事件"""
if not self.is_running:
self.is_running = True
self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.monitor_thread.start()
def stop_monitoring(self):
"""停止监控拖拽事件"""
self.is_running = False
if self.monitor_thread:
self.monitor_thread.join(timeout=1.0)
def get_dropped_files(self) -> List[str]:
"""获取拖拽的文件列表"""
files = []
while not self.file_queue.empty():
try:
files.append(self.file_queue.get_nowait())
except queue.Empty:
break
return files
def add_dropped_file(self, file_path: str):
"""
添加拖拽的文件路径
Args:
file_path: 拖拽的文件路径
"""
if self._is_supported_format(file_path):
self.file_queue.put(file_path)
if self.drop_callback:
self.drop_callback([file_path])
def _is_supported_format(self, file_path: str) -> bool:
"""检查文件格式是否支持"""
_, ext = os.path.splitext(file_path.lower())
return ext in self.supported_formats
@abstractmethod
def _monitor_loop(self):
"""监控循环,由子类实现"""
pass
@abstractmethod
def is_supported(self) -> bool:
"""检查当前平台是否支持此检测器"""
pass
class DragDetectorFactory:
"""拖拽检测器工厂类"""
@staticmethod
def create_detector(supported_formats: List[str] = None) -> BaseDragDetector:
"""
根据当前平台创建合适的拖拽检测器
Args:
supported_formats: 支持的文件格式列表
Returns:
适合当前平台的拖拽检测器实例
"""
system = platform.system().lower()
if system == 'windows':
from .windows_detector import WindowsDragDetector
return WindowsDragDetector(supported_formats)
elif system == 'linux':
from .linux_detector import LinuxDragDetector
return LinuxDragDetector(supported_formats)
elif system == 'darwin': # macOS
from .macos_detector import MacOSDragDetector
return MacOSDragDetector(supported_formats)
else:
# 降级到基础检测器
from .fallback_detector import FallbackDragDetector
return FallbackDragDetector(supported_formats)

View File

@ -1,83 +0,0 @@
"""
降级拖拽检测器
当平台特定检测器不可用时的降级方案
"""
import os
import time
import threading
from typing import List
from .base_detector import BaseDragDetector
class FallbackDragDetector(BaseDragDetector):
"""降级拖拽检测器"""
def __init__(self, supported_formats: List[str] = None):
"""
初始化降级检测器
Args:
supported_formats: 支持的文件格式列表
"""
super().__init__(supported_formats)
self._watch_directory = os.path.expanduser("~/Desktop") # 监控桌面
self._known_files = set()
self._scan_interval = 0.5 # 扫描间隔(秒)
# 初始化已知文件列表
self._scan_known_files()
def _scan_known_files(self):
"""扫描已知文件"""
self._known_files.clear()
if os.path.exists(self._watch_directory):
for filename in os.listdir(self._watch_directory):
if self._is_supported_format(filename):
filepath = os.path.join(self._watch_directory, filename)
self._known_files.add(filepath)
def _monitor_loop(self):
"""监控循环 - 通过文件系统变化检测"""
while self.is_running:
try:
if os.path.exists(self._watch_directory):
current_files = set()
for filename in os.listdir(self._watch_directory):
if self._is_supported_format(filename):
filepath = os.path.join(self._watch_directory, filename)
current_files.add(filepath)
# 检测新文件
new_files = current_files - self._known_files
for filepath in new_files:
# 检查文件是否最近创建1秒内
try:
file_time = os.path.getmtime(filepath)
if time.time() - file_time < 1.0:
self.add_dropped_file(filepath)
except OSError:
pass
self._known_files = current_files
except Exception as e:
print(f"降级检测器错误: {e}")
time.sleep(self._scan_interval)
def is_supported(self) -> bool:
"""降级检测器总是支持"""
return True
def set_watch_directory(self, directory: str):
"""
设置监控目录
Args:
directory: 要监控的目录路径
"""
if os.path.exists(directory):
self._watch_directory = directory
self._scan_known_files()

View File

@ -1,395 +0,0 @@
"""
Linux平台拖拽检测器
支持X11和Wayland窗口系统的拖拽检测
"""
import os
import sys
import time
import threading
import subprocess
from typing import List, Optional
from .base_detector import BaseDragDetector
class LinuxDragDetector(BaseDragDetector):
"""Linux平台拖拽检测器"""
def __init__(self, supported_formats: List[str] = None):
"""
初始化Linux拖拽检测器
Args:
supported_formats: 支持的文件格式列表
"""
super().__init__(supported_formats)
self._window_id = None
self._display = None
self._is_x11 = self._check_x11()
self._is_wayland = self._check_wayland()
self._xdg_active = self._check_xdg()
def _check_x11(self) -> bool:
"""检查是否使用X11"""
try:
# 检查DISPLAY环境变量
if os.environ.get('DISPLAY'):
# 尝试连接X11显示
result = subprocess.run(['xset', 'q'],
capture_output=True,
timeout=2)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return False
def _check_wayland(self) -> bool:
"""检查是否使用Wayland"""
return os.environ.get('WAYLAND_DISPLAY') is not None
def _check_xdg(self) -> bool:
"""检查XDG桌面环境是否可用"""
try:
# 检查是否安装了xdg-user-dir
result = subprocess.run(['which', 'xdg-user-dir'],
capture_output=True)
return result.returncode == 0
except FileNotFoundError:
return False
def _get_window_id(self) -> Optional[str]:
"""获取当前窗口ID"""
try:
# 使用xdotool获取活动窗口
result = subprocess.run(['xdotool', 'getactivewindow'],
capture_output=True,
text=True,
timeout=2)
if result.returncode == 0:
return result.stdout.strip()
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# 备用方法使用wmctrl
try:
result = subprocess.run(['wmctrl', '-l'],
capture_output=True,
text=True,
timeout=2)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
if lines:
# 获取第一个窗口的ID
return lines[0].split()[0]
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return None
def _monitor_x11_drag(self):
"""监控X11拖拽事件"""
try:
# 使用xev监控窗口事件
window_id = self._get_window_id()
if not window_id:
print("无法获取窗口ID切换到文件监控")
self._monitor_file_drag()
return
cmd = ['xev', '-id', window_id, '-event', 'pointer']
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True)
print(f"开始监控X11窗口 {window_id} 的拖拽事件")
while self.is_running:
line = process.stdout.readline()
if not line:
break
# 检测拖拽相关事件
if any(keyword in line for keyword in ['DND', 'Xdnd', 'ClientMessage', 'SelectionNotify']):
# 尝试获取拖拽的文件
self._extract_x11_drag_files()
except Exception as e:
print(f"X11拖拽监控错误: {e}")
print("切换到文件监控模式")
self._monitor_file_drag()
finally:
if 'process' in locals():
process.terminate()
def _extract_x11_drag_files(self):
"""从X11拖拽事件中提取文件路径"""
try:
# 使用xclip获取剪贴板内容可能包含拖拽的文件
result = subprocess.run(['xclip', '-selection', 'primary', '-o'],
capture_output=True,
text=True,
timeout=1)
if result.returncode == 0 and result.stdout.strip():
self._process_clipboard_content(result.stdout.strip())
except (subprocess.TimeoutExpired, FileNotFoundError):
# 尝试使用xsel作为备选
try:
result = subprocess.run(['xsel', '--primary', '--output'],
capture_output=True,
text=True,
timeout=1)
if result.returncode == 0 and result.stdout.strip():
self._process_clipboard_content(result.stdout.strip())
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
def _process_clipboard_content(self, content):
"""处理剪贴板内容,提取文件路径"""
# 检查是否包含文件路径
lines = content.split('\n')
for line in lines:
line = line.strip()
if line.startswith('file://'):
# URI格式
file_path = line[7:] # 移除 'file://' 前缀
file_path = file_path.replace('%20', ' ') # 解码空格
if self._is_supported_format(file_path):
self.add_dropped_file(file_path)
elif os.path.isabs(line) and self._is_supported_format(line):
# 绝对路径
self.add_dropped_file(line)
def _monitor_wayland_drag(self):
"""监控Wayland拖拽事件"""
try:
# 尝试使用wlrctl监控拖拽事件
if self._check_command_exists('wlrctl'):
self._monitor_wlrctl_drag()
else:
# 尝试使用wl-paste监控剪贴板
if self._check_command_exists('wl-paste'):
self._monitor_wayland_clipboard()
else:
print("Wayland拖拽监控工具不可用切换到文件监控")
self._monitor_file_drag()
except Exception as e:
print(f"Wayland拖拽监控错误: {e}")
print("切换到文件监控模式")
self._monitor_file_drag()
def _check_command_exists(self, command):
"""检查命令是否存在"""
try:
result = subprocess.run(['which', command],
capture_output=True,
timeout=1)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False
def _monitor_wlrctl_drag(self):
"""使用wlrctl监控拖拽事件"""
print("使用wlrctl监控Wayland拖拽事件")
try:
# wlrctl可能支持拖拽监控
process = subprocess.Popen(['wlrctl', 'drag'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
while self.is_running:
line = process.stdout.readline()
if not line:
break
# 解析wlrctl输出
if 'file://' in line or os.path.isabs(line.strip()):
self._process_drag_line(line.strip())
except Exception as e:
print(f"wlrctl监控失败: {e}")
raise
finally:
if 'process' in locals():
process.terminate()
def _monitor_wayland_clipboard(self):
"""监控Wayland剪贴板变化"""
print("使用wl-paste监控Wayland剪贴板")
last_clipboard = ""
while self.is_running:
try:
# 获取当前剪贴板内容
result = subprocess.run(['wl-paste', '--type', 'text/uri-list'],
capture_output=True,
text=True,
timeout=1)
if result.returncode == 0:
current_clipboard = result.stdout.strip()
if current_clipboard and current_clipboard != last_clipboard:
last_clipboard = current_clipboard
self._process_clipboard_content(current_clipboard)
except subprocess.TimeoutExpired:
pass
except Exception as e:
print(f"Wayland剪贴板监控错误: {e}")
time.sleep(0.5)
def _process_drag_line(self, line):
"""处理拖拽行内容"""
if line.startswith('file://'):
file_path = line[7:]
file_path = file_path.replace('%20', ' ')
if self._is_supported_format(file_path):
self.add_dropped_file(file_path)
elif os.path.isabs(line) and self._is_supported_format(line):
self.add_dropped_file(line)
def _get_user_directories(self):
"""获取用户常用目录"""
directories = []
# 获取桌面目录
desktop = os.path.expanduser("~/Desktop")
if os.path.exists(desktop):
directories.append(desktop)
# 获取下载目录
downloads = os.path.expanduser("~/Downloads")
if os.path.exists(downloads):
directories.append(downloads)
# 获取主目录
home = os.path.expanduser("~")
if os.path.exists(home):
directories.append(home)
# 获取临时目录
temp_dirs = ['/tmp', '/var/tmp']
for temp_dir in temp_dirs:
if os.path.exists(temp_dir):
directories.append(temp_dir)
return directories
def _monitor_file_drag(self):
"""通过文件系统变化监控拖拽"""
watch_dirs = self._get_user_directories()
known_files = {}
recent_files = {} # 用于跟踪最近添加的文件
# 初始化已知文件
for watch_dir in watch_dirs:
if os.path.exists(watch_dir):
for filename in os.listdir(watch_dir):
filepath = os.path.join(watch_dir, filename)
if self._is_supported_format(filepath):
try:
stat = os.stat(filepath)
known_files[filepath] = stat.st_mtime
except OSError:
pass
print(f"开始监控目录: {watch_dirs}")
print(f"已知文件数量: {len(known_files)}")
while self.is_running:
try:
current_time = time.time()
for watch_dir in watch_dirs:
if not os.path.exists(watch_dir):
continue
try:
# 获取当前目录中的文件
current_files = {}
for filename in os.listdir(watch_dir):
filepath = os.path.join(watch_dir, filename)
if self._is_supported_format(filepath):
try:
stat = os.stat(filepath)
current_files[filepath] = stat.st_mtime
# 检查是否是新文件最近5秒内创建
if filepath not in known_files:
time_diff = current_time - stat.st_mtime
if time_diff < 5.0:
# 检查文件大小是否稳定(避免检测到正在下载的文件)
if self._is_file_stable(filepath):
print(f"检测到新文件: {filepath}")
recent_files[filepath] = current_time
self.add_dropped_file(filepath)
# 检查已知文件是否被更新
elif filepath in known_files:
if stat.st_mtime > known_files[filepath]:
time_diff = current_time - stat.st_mtime
if time_diff < 5.0 and filepath not in recent_files:
print(f"检测到文件更新: {filepath}")
recent_files[filepath] = current_time
self.add_dropped_file(filepath)
except OSError:
continue
# 更新已知文件列表
known_files.update(current_files)
except PermissionError:
print(f"无权限访问目录: {watch_dir}")
continue
except Exception as e:
print(f"监控目录 {watch_dir} 时出错: {e}")
# 清理过期的最近文件记录超过30秒
expired_files = [path for path, timestamp in recent_files.items()
if current_time - timestamp > 30]
for path in expired_files:
del recent_files[path]
except Exception as e:
print(f"文件拖拽监控错误: {e}")
time.sleep(0.5)
def _is_file_stable(self, filepath, check_interval=0.5, max_checks=3):
"""检查文件大小是否稳定(避免检测到正在下载的文件)"""
try:
initial_size = os.path.getsize(filepath)
for i in range(max_checks):
time.sleep(check_interval)
current_size = os.path.getsize(filepath)
if current_size != initial_size:
return False
return True
except OSError:
return False
def _monitor_loop(self):
"""主监控循环"""
if self._is_x11:
try:
self._monitor_x11_drag()
except Exception as e:
print(f"X11监控失败切换到文件监控: {e}")
self._monitor_file_drag()
elif self._is_wayland:
try:
self._monitor_wayland_drag()
except Exception as e:
print(f"Wayland监控失败切换到文件监控: {e}")
self._monitor_file_drag()
else:
# 降级到文件监控
self._monitor_file_drag()
def is_supported(self) -> bool:
"""检查Linux平台是否支持"""
return self._is_x11 or self._is_wayland or self._xdg_active

View File

@ -1,105 +0,0 @@
"""
macOS平台拖拽检测器
使用macOS Cocoa API实现系统级拖拽检测
"""
import os
import sys
import time
import threading
from typing import List, Optional
from .base_detector import BaseDragDetector
class MacOSDragDetector(BaseDragDetector):
"""macOS平台拖拽检测器"""
def __init__(self, supported_formats: List[str] = None):
"""
初始化macOS拖拽检测器
Args:
supported_formats: 支持的文件格式列表
"""
super().__init__(supported_formats)
self._app_kit_available = self._check_app_kit()
self._cocoa_available = self._check_cocoa()
def _check_app_kit(self) -> bool:
"""检查AppKit是否可用"""
try:
import AppKit
return True
except ImportError:
return False
def _check_cocoa(self) -> bool:
"""检查Cocoa是否可用"""
try:
import Cocoa
return True
except ImportError:
return False
def _setup_cocoa_drag_drop(self) -> bool:
"""设置Cocoa拖拽接收"""
try:
if not self._app_kit_available:
return False
import AppKit
# 创建拖拽接收器
# 这里需要实现macOS的拖拽API调用
# 暂时返回False需要进一步实现
return False
except Exception as e:
print(f"设置Cocoa拖拽失败: {e}")
return False
def _monitor_file_drag(self):
"""通过文件系统变化监控拖拽"""
# 监控用户桌面和下载目录
desktop_path = os.path.expanduser("~/Desktop")
downloads_path = os.path.expanduser("~/Downloads")
watch_dirs = [desktop_path, downloads_path]
while self.is_running:
try:
current_time = time.time()
for watch_dir in watch_dirs:
if not os.path.exists(watch_dir):
continue
for filename in os.listdir(watch_dir):
filepath = os.path.join(watch_dir, filename)
try:
file_time = os.path.getmtime(filepath)
# 如果文件是最近1秒内创建的
if current_time - file_time < 1.0:
if self._is_supported_format(filepath):
self.add_dropped_file(filepath)
except OSError:
continue
except Exception as e:
print(f"文件拖拽监控错误: {e}")
time.sleep(0.5)
def _monitor_loop(self):
"""主监控循环"""
if self._app_kit_available and self._cocoa_available:
# 尝试设置真正的拖拽接收
if not self._setup_cocoa_drag_drop():
# 降级到文件系统监控
self._monitor_file_drag()
else:
# 降级到文件系统监控
self._monitor_file_drag()
def is_supported(self) -> bool:
"""检查macOS平台是否支持"""
return sys.platform.startswith('darwin')

View File

@ -1,66 +0,0 @@
"""
平台拖拽检测器
自动选择合适的平台特定拖拽检测器
"""
from .base_detector import BaseDragDetector, DragDetectorFactory
from typing import List
class PlatformDragDetector:
"""平台拖拽检测器包装类"""
def __init__(self, supported_formats: List[str] = None):
"""
初始化平台拖拽检测器
Args:
supported_formats: 支持的文件格式列表
"""
self.supported_formats = supported_formats or ['.gltf', '.glb', '.fbx', '.bam', '.egg', '.obj']
self.detector = DragDetectorFactory.create_detector(self.supported_formats)
self._is_monitoring = False
def set_drop_callback(self, callback):
"""设置拖拽回调函数"""
if self.detector:
self.detector.set_drop_callback(callback)
def start_monitoring(self):
"""开始监控"""
if self.detector and not self._is_monitoring:
self.detector.start_monitoring()
self._is_monitoring = True
def stop_monitoring(self):
"""停止监控"""
if self.detector and self._is_monitoring:
self.detector.stop_monitoring()
self._is_monitoring = False
def get_dropped_files(self):
"""获取拖拽的文件"""
if self.detector:
return self.detector.get_dropped_files()
return []
def is_supported(self) -> bool:
"""检查当前平台是否支持"""
return self.detector.is_supported() if self.detector else False
def add_dropped_file(self, file_path: str):
"""添加拖拽的文件路径"""
if self.detector:
self.detector.add_dropped_file(file_path)
def get_platform_info(self) -> dict:
"""获取平台信息"""
import platform
return {
'system': platform.system(),
'release': platform.release(),
'version': platform.version(),
'detector_type': type(self.detector).__name__,
'supported': self.is_supported()
}

View File

@ -1,233 +0,0 @@
#!/usr/bin/env python3
"""
简化的拖拽检测器
使用文件系统监控和剪贴板检测实现拖拽功能
"""
import os
import sys
import time
import subprocess
import threading
from typing import List, Optional, Callable
from collections import deque
import tempfile
class SimpleDragDetector:
"""简化的拖拽检测器"""
def __init__(self, supported_formats: List[str]):
self.supported_formats = supported_formats
self.is_running = False
self.drop_callback = None
self.dropped_files = deque()
self.monitor_thread = None
self.last_clipboard = ""
def set_drop_callback(self, callback: Callable[[List[str]], None]):
"""设置拖拽回调函数"""
self.drop_callback = callback
def start_monitoring(self):
"""开始监控"""
if self.is_running:
return
self.is_running = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print("✓ 简化拖拽监控已启动")
def stop_monitoring(self):
"""停止监控"""
self.is_running = False
if self.monitor_thread:
self.monitor_thread.join(timeout=1)
def get_dropped_files(self) -> List[str]:
"""获取拖拽的文件列表"""
files = list(self.dropped_files)
self.dropped_files.clear()
return files
def add_dropped_file(self, file_path: str):
"""添加拖拽的文件"""
if os.path.exists(file_path):
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext in self.supported_formats:
self.dropped_files.append(file_path)
print(f"检测到拖拽文件: {file_path}")
def _monitor_loop(self):
"""监控循环"""
watch_dirs = self._get_watch_directories()
known_files = set()
# 初始化已知文件
for watch_dir in watch_dirs:
self._scan_directory(watch_dir, known_files)
print(f"监控目录: {watch_dirs}")
print(f"已知文件数量: {len(known_files)}")
while self.is_running:
try:
current_time = time.time()
# 1. 检查剪贴板变化
self._check_clipboard()
# 2. 检查文件系统变化
for watch_dir in watch_dirs:
if not os.path.exists(watch_dir):
continue
try:
current_files = set()
for filename in os.listdir(watch_dir):
filepath = os.path.join(watch_dir, filename)
if self._is_supported_format(filepath):
try:
stat = os.stat(filepath)
current_files.add(filepath)
# 检查新文件
if filepath not in known_files:
time_diff = current_time - stat.st_mtime
if time_diff < 5.0: # 5秒内创建的文件
if self._is_file_stable(filepath):
print(f"检测到新文件: {filepath}")
self.add_dropped_file(filepath)
if self.drop_callback:
self.drop_callback([filepath])
except OSError:
continue
known_files.update(current_files)
except PermissionError:
continue
except Exception as e:
print(f"监控目录 {watch_dir} 时出错: {e}")
except Exception as e:
print(f"监控循环错误: {e}")
time.sleep(0.3) # 更频繁的检查
def _check_clipboard(self):
"""检查剪贴板变化"""
try:
# 检查剪贴板
result = subprocess.run(['xclip', '-selection', 'clipboard', '-o'],
capture_output=True, text=True, timeout=0.5)
if result.returncode == 0 and result.stdout.strip():
current_clipboard = result.stdout.strip()
if current_clipboard != self.last_clipboard:
self.last_clipboard = current_clipboard
files = self._parse_clipboard_content(current_clipboard)
if files:
print(f"剪贴板检测到文件: {files}")
for file_path in files:
self.add_dropped_file(file_path)
if self.drop_callback:
self.drop_callback(files)
except Exception:
# 尝试使用xsel
try:
result = subprocess.run(['xsel', '--clipboard', '--output'],
capture_output=True, text=True, timeout=0.5)
if result.returncode == 0 and result.stdout.strip():
current_clipboard = result.stdout.strip()
if current_clipboard != self.last_clipboard:
self.last_clipboard = current_clipboard
files = self._parse_clipboard_content(current_clipboard)
if files:
for file_path in files:
self.add_dropped_file(file_path)
if self.drop_callback:
self.drop_callback(files)
except Exception:
pass
def _parse_clipboard_content(self, content: str) -> List[str]:
"""解析剪贴板内容"""
files = []
for line in content.split('\n'):
line = line.strip()
if line.startswith('file://'):
file_path = line[7:]
file_path = file_path.replace('%20', ' ')
if os.path.exists(file_path):
files.append(file_path)
elif os.path.isabs(line) and os.path.exists(line):
files.append(line)
return files
def _get_watch_directories(self) -> List[str]:
"""获取监控目录"""
dirs = [
os.path.expanduser('~/Desktop'),
os.path.expanduser('~/Downloads'),
tempfile.gettempdir()
]
# 添加一些常见的拖拽目标目录
additional_dirs = [
os.path.expanduser('~/Documents'),
os.path.expanduser('~'),
'/tmp'
]
for dir_path in additional_dirs:
if os.path.exists(dir_path) and os.path.isdir(dir_path):
dirs.append(dir_path)
return [d for d in dirs if os.path.exists(d)]
def _scan_directory(self, directory: str, known_files: set):
"""扫描目录"""
try:
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if self._is_supported_format(filepath):
try:
known_files.add(filepath)
except OSError:
pass
except Exception:
pass
def _is_supported_format(self, file_path: str) -> bool:
"""检查文件格式是否支持"""
if not os.path.isfile(file_path):
return False
file_ext = os.path.splitext(file_path)[1].lower()
return file_ext in self.supported_formats
def _is_file_stable(self, file_path: str, wait_time: float = 0.5) -> bool:
"""检查文件是否稳定"""
try:
initial_size = os.path.getsize(file_path)
time.sleep(wait_time)
current_size = os.path.getsize(file_path)
return initial_size == current_size
except:
return False
def is_supported(self) -> bool:
"""检查是否支持"""
return True # 这个版本总是支持的
def get_platform_info(self) -> dict:
"""获取平台信息"""
return {
'system': 'Linux',
'detector_type': 'SimpleDragDetector',
'supported': True,
'method': 'File system + clipboard monitoring'
}

View File

@ -1,221 +0,0 @@
"""
Windows平台拖拽检测器
使用Windows API实现系统级拖拽检测
"""
import os
import sys
import time
import threading
from typing import List, Optional
from .base_detector import BaseDragDetector
class WindowsDragDetector(BaseDragDetector):
"""Windows平台拖拽检测器"""
def __init__(self, supported_formats: List[str] = None):
"""
初始化Windows拖拽检测器
Args:
supported_formats: 支持的文件格式列表
"""
super().__init__(supported_formats)
self._hwnd = None
self._old_win_proc = None
self._is_com_initialized = False
self._ole_initialized = False
def _initialize_com(self) -> bool:
"""初始化COM"""
try:
import ctypes
from ctypes import wintypes
# 初始化COM
ctypes.windll.ole32.CoInitializeEx(None, 0) # COINIT_APARTMENTTHREADED
self._is_com_initialized = True
return True
except Exception as e:
print(f"COM初始化失败: {e}")
return False
def _initialize_ole(self) -> bool:
"""初始化OLE"""
try:
import ctypes
from ctypes import wintypes
# 初始化OLE
ctypes.windll.ole32.OleInitialize(None)
self._ole_initialized = True
return True
except Exception as e:
print(f"OLE初始化失败: {e}")
return False
def _get_window_handle(self) -> Optional[int]:
"""获取Panda3D窗口句柄"""
try:
# 这里需要获取Panda3D窗口的句柄
# 可能需要通过Panda3D的API获取
# 暂时返回None需要进一步实现
return None
except Exception as e:
print(f"获取窗口句柄失败: {e}")
return None
def _setup_drag_drop(self) -> bool:
"""设置拖拽接收"""
try:
import ctypes
from ctypes import wintypes
hwnd = self._get_window_handle()
if not hwnd:
return False
# 注册拖拽接收窗口
# 这里需要实现Windows拖拽API的调用
# 暂时返回False需要进一步实现
return False
except Exception as e:
print(f"设置拖拽接收失败: {e}")
return False
def _monitor_clipboard_drag(self):
"""通过剪贴板监控拖拽"""
try:
import ctypes
from ctypes import wintypes
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32
# 监控剪贴板变化
while self.is_running:
# 检查剪贴板是否包含文件
if user32.OpenClipboard(None):
try:
# 检查是否是文件格式
format_id = 15 # CF_HDROP
if user32.IsClipboardFormatAvailable(format_id):
# 获取拖拽文件列表
h_drop = user32.GetClipboardData(format_id)
if h_drop:
files = self._parse_drop_files(h_drop)
for file_path in files:
self.add_dropped_file(file_path)
finally:
user32.CloseClipboard()
time.sleep(0.1)
except Exception as e:
print(f"剪贴板监控错误: {e}")
def _parse_drop_files(self, h_drop) -> List[str]:
"""解析拖拽文件列表"""
try:
import ctypes
from ctypes import wintypes
kernel32 = ctypes.windll.kernel32
shell32 = ctypes.windll.shell32
# 获取文件数量
file_count = shell32.DragQueryFileW(h_drop, 0xFFFFFFFF, None, 0)
files = []
# 获取每个文件路径
for i in range(file_count):
# 获取文件路径长度
length = shell32.DragQueryFileW(h_drop, i, None, 0)
if length > 0:
# 分配缓冲区
buffer = ctypes.create_unicode_buffer(length + 1)
# 获取文件路径
shell32.DragQueryFileW(h_drop, i, buffer, length + 1)
files.append(buffer.value)
return files
except Exception as e:
print(f"解析拖拽文件失败: {e}")
return []
def _monitor_file_drag(self):
"""通过文件系统变化监控拖拽"""
# 监控用户桌面和下载目录
import winreg
try:
# 获取桌面路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders")
desktop_path = winreg.QueryValueEx(key, "Desktop")[0]
downloads_path = winreg.QueryValueEx(key, "{374DE290-123F-4565-9164-39C4925E467B}")[0] # 下载文件夹
winreg.CloseKey(key)
except Exception:
# 降级到默认路径
desktop_path = os.path.expanduser("~/Desktop")
downloads_path = os.path.expanduser("~/Downloads")
watch_dirs = [desktop_path, downloads_path]
while self.is_running:
try:
current_time = time.time()
for watch_dir in watch_dirs:
if not os.path.exists(watch_dir):
continue
for filename in os.listdir(watch_dir):
filepath = os.path.join(watch_dir, filename)
try:
file_time = os.path.getmtime(filepath)
# 如果文件是最近1秒内创建的
if current_time - file_time < 1.0:
if self._is_supported_format(filepath):
self.add_dropped_file(filepath)
except OSError:
continue
except Exception as e:
print(f"文件拖拽监控错误: {e}")
time.sleep(0.5)
def _monitor_loop(self):
"""主监控循环"""
# 尝试初始化COM和OLE
com_ready = self._initialize_com()
ole_ready = self._initialize_ole()
if com_ready and ole_ready:
# 尝试设置真正的拖拽接收
if self._setup_drag_drop():
# 使用Windows API拖拽检测
pass
else:
# 降级到剪贴板监控
self._monitor_clipboard_drag()
else:
# 降级到文件系统监控
self._monitor_file_drag()
# 清理COM和OLE
try:
if self._ole_initialized:
import ctypes
ctypes.windll.ole32.OleUninitialize()
if self._is_com_initialized:
import ctypes
ctypes.windll.ole32.CoUninitialize()
except Exception:
pass
def is_supported(self) -> bool:
"""检查Windows平台是否支持"""
return sys.platform.startswith('win')

View File

@ -1,311 +0,0 @@
#!/usr/bin/env python3
"""
X11窗口拖拽事件接收器
使用X11的Xdnd协议实现真正的窗口拖拽功能
"""
import os
import sys
import time
import subprocess
import threading
from typing import List, Optional, Callable
from collections import deque
class X11DragReceiver:
"""X11窗口拖拽事件接收器"""
def __init__(self, supported_formats: List[str]):
self.supported_formats = supported_formats
self.window_id = None
self.is_running = False
self.drop_callback = None
self.dropped_files = deque()
self.monitor_thread = None
# X11相关
self.x11_display = os.environ.get('DISPLAY', ':0')
self.xdnd_aware = True # 标记窗口支持拖拽
def set_window(self, window_id: int):
"""设置要监控的窗口ID"""
self.window_id = window_id
print(f"设置拖拽监控窗口: {window_id}")
def set_drop_callback(self, callback: Callable[[List[str]], None]):
"""设置拖拽回调函数"""
self.drop_callback = callback
def start_monitoring(self):
"""开始监控拖拽事件"""
if self.is_running:
return
self.is_running = True
self.monitor_thread = threading.Thread(target=self._monitor_drag_events)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print("X11拖拽监控已启动")
def stop_monitoring(self):
"""停止监控"""
self.is_running = False
if self.monitor_thread:
self.monitor_thread.join(timeout=1)
def get_dropped_files(self) -> List[str]:
"""获取拖拽的文件列表"""
files = list(self.dropped_files)
self.dropped_files.clear()
return files
def add_dropped_file(self, file_path: str):
"""添加拖拽的文件"""
if os.path.exists(file_path):
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext in self.supported_formats:
self.dropped_files.append(file_path)
print(f"检测到拖拽文件: {file_path}")
def _monitor_drag_events(self):
"""监控拖拽事件"""
if not self.window_id:
print("窗口ID未设置无法监控拖拽事件")
return
try:
# 首先注册窗口为拖拽目标
self._register_drag_target()
# 使用xev监控窗口事件
cmd = ['xev', '-id', str(self.window_id), '-event', 'dnd']
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
print(f"开始监控窗口 {self.window_id} 的拖拽事件")
while self.is_running:
line = process.stdout.readline()
if not line:
break
# 解析拖拽事件
if 'XdndEnter' in line:
self._handle_drag_enter(line)
elif 'XdndPosition' in line:
self._handle_drag_position(line)
elif 'XdndDrop' in line:
self._handle_drag_drop(line)
elif 'XdndLeave' in line:
self._handle_drag_leave(line)
except Exception as e:
print(f"拖拽事件监控错误: {e}")
# 降级到文件监控
self._fallback_to_file_monitor()
finally:
if 'process' in locals():
process.terminate()
def _register_drag_target(self):
"""注册窗口为拖拽目标"""
try:
# 使用xprop设置窗口属性标记支持拖拽
cmd = ['xprop', '-id', str(self.window_id), '-f', '_NET_WM_WINDOW_TYPE', '32a',
'-set', '_NET_WM_WINDOW_TYPE', '_NET_WM_WINDOW_TYPE_NORMAL']
subprocess.run(cmd, capture_output=True, timeout=2)
# 设置XdndAware属性
cmd = ['xprop', '-id', str(self.window_id), '-f', 'XdndAware', '32c',
'-set', 'XdndAware', '5']
subprocess.run(cmd, capture_output=True, timeout=2)
print(f"窗口 {self.window_id} 已注册为拖拽目标")
except Exception as e:
print(f"注册拖拽目标失败: {e}")
def _handle_drag_enter(self, event_line: str):
"""处理拖拽进入事件"""
print("检测到拖拽进入")
def _handle_drag_position(self, event_line: str):
"""处理拖拽位置事件"""
# 可以在这里更新拖拽位置显示
pass
def _handle_drag_drop(self, event_line: str):
"""处理拖拽释放事件"""
print("检测到拖拽释放")
# 尝试获取拖拽的文件
files = self._extract_dropped_files()
if files:
for file_path in files:
self.add_dropped_file(file_path)
# 调用回调函数
if self.drop_callback:
self.drop_callback(list(self.dropped_files))
def _handle_drag_leave(self, event_line: str):
"""处理拖拽离开事件"""
print("拖拽离开窗口")
def _extract_dropped_files(self) -> List[str]:
"""提取拖拽的文件路径"""
files = []
try:
# 方法1尝试从剪贴板获取
result = subprocess.run(['xclip', '-selection', 'clipboard', '-o'],
capture_output=True, text=True, timeout=1)
if result.returncode == 0 and result.stdout.strip():
files.extend(self._parse_uri_list(result.stdout.strip()))
# 方法2尝试从主选择获取
result = subprocess.run(['xclip', '-selection', 'primary', '-o'],
capture_output=True, text=True, timeout=1)
if result.returncode == 0 and result.stdout.strip():
files.extend(self._parse_uri_list(result.stdout.strip()))
# 方法3使用xsel作为备选
if not files:
result = subprocess.run(['xsel', '--clipboard', '--output'],
capture_output=True, text=True, timeout=1)
if result.returncode == 0 and result.stdout.strip():
files.extend(self._parse_uri_list(result.stdout.strip()))
except Exception as e:
print(f"提取拖拽文件失败: {e}")
return files
def _parse_uri_list(self, uri_list: str) -> List[str]:
"""解析URI列表提取文件路径"""
files = []
for line in uri_list.split('\n'):
line = line.strip()
if not line or line.startswith('#'):
continue
if line.startswith('file://'):
# URI格式
file_path = line[7:] # 移除 'file://' 前缀
file_path = file_path.replace('%20', ' ') # 解码空格
file_path = file_path.strip()
# 移除可能的回车符
if file_path.endswith('\r'):
file_path = file_path[:-1]
if os.path.exists(file_path):
files.append(file_path)
elif os.path.isabs(line):
# 绝对路径
if os.path.exists(line):
files.append(line)
return files
def _fallback_to_file_monitor(self):
"""降级到文件监控"""
print("降级到文件监控模式")
# 监控桌面和下载目录
watch_dirs = [
os.path.expanduser('~/Desktop'),
os.path.expanduser('~/Downloads'),
'/tmp'
]
known_files = set()
# 初始化已知文件
for watch_dir in watch_dirs:
if os.path.exists(watch_dir):
for filename in os.listdir(watch_dir):
filepath = os.path.join(watch_dir, filename)
if self._is_supported_format(filepath):
try:
known_files.add(filepath)
except OSError:
pass
while self.is_running:
try:
current_time = time.time()
for watch_dir in watch_dirs:
if not os.path.exists(watch_dir):
continue
try:
for filename in os.listdir(watch_dir):
filepath = os.path.join(watch_dir, filename)
if self._is_supported_format(filepath):
try:
file_time = os.path.getmtime(filepath)
# 检查是否是新文件最近5秒内创建
if filepath not in known_files and current_time - file_time < 5.0:
print(f"检测到新文件: {filepath}")
self.add_dropped_file(filepath)
# 调用回调函数
if self.drop_callback:
self.drop_callback([filepath])
known_files.add(filepath)
except OSError:
continue
except PermissionError:
continue
except Exception as e:
print(f"监控目录 {watch_dir} 时出错: {e}")
except Exception as e:
print(f"文件监控错误: {e}")
time.sleep(0.5)
def _is_supported_format(self, file_path: str) -> bool:
"""检查文件格式是否支持"""
if not os.path.isfile(file_path):
return False
file_ext = os.path.splitext(file_path)[1].lower()
return file_ext in self.supported_formats
def is_supported(self) -> bool:
"""检查是否支持拖拽功能"""
# 检查X11环境
if not os.environ.get('DISPLAY'):
return False
# 检查必要工具
required_tools = ['xev', 'xprop', 'xclip']
for tool in required_tools:
try:
subprocess.run(['which', tool], capture_output=True, check=True)
except subprocess.CalledProcessError:
print(f"缺少必要工具: {tool}")
return False
return True
def get_platform_info(self) -> dict:
"""获取平台信息"""
return {
'system': 'Linux',
'detector_type': 'X11DragReceiver',
'supported': self.is_supported(),
'display': self.x11_display,
'window_id': self.window_id
}