EG/core/drag_drop/simple_detector.py
2026-01-21 09:59:13 +08:00

234 lines
8.9 KiB
Python

#!/usr/bin/env python3
"""
简化的拖拽检测器
使用文件系统监控和剪贴板检测实现拖拽功能
"""
import os
import sys
import time
import subprocess
import threading
from typing import List, Optional, Callable
from collections import deque
import tempfile
class SimpleDragDetector:
    """Polling-based stand-in for native drag-and-drop detection.

    A background thread repeatedly (a) reads the clipboard through the
    external ``xclip``/``xsel`` tools and (b) lists a fixed set of watch
    directories.  Paths found either way are queued in ``dropped_files`` and
    handed to the optional drop callback, provided their extension is in
    ``supported_formats``.
    """

    # Seconds slept between two passes of the monitor loop.
    POLL_INTERVAL = 0.3
    # A previously unseen file is reported only if its mtime is younger than
    # this many seconds; older files are treated as pre-existing.
    NEW_FILE_WINDOW = 5.0
    # Timeout (seconds) for each external clipboard command.
    CLIPBOARD_TIMEOUT = 0.5
    # Clipboard readers, tried in order until one exits successfully.
    CLIPBOARD_COMMANDS = (
        ('xclip', '-selection', 'clipboard', '-o'),
        ('xsel', '--clipboard', '--output'),
    )

    def __init__(self, supported_formats: List[str]):
        """Create a detector.

        Args:
            supported_formats: Accepted file extensions.  They are compared
                against the lower-cased ``os.path.splitext`` suffix, so
                entries are expected to look like ``".png"``.
        """
        self.supported_formats = supported_formats
        self.is_running = False
        self.drop_callback = None
        self.dropped_files = deque()
        self.monitor_thread = None
        self.last_clipboard = ""

    def set_drop_callback(self, callback: Callable[[List[str]], None]):
        """Register the function called with each batch of detected paths."""
        self.drop_callback = callback

    def start_monitoring(self):
        """Start the background monitor thread; no-op if already running."""
        if self.is_running:
            return
        self.is_running = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, daemon=True
        )
        self.monitor_thread.start()
        print("✓ 简化拖拽监控已启动")

    def stop_monitoring(self):
        """Ask the monitor loop to exit and wait up to one second for it."""
        self.is_running = False
        thread = self.monitor_thread
        # A drop callback runs on the monitor thread itself; joining the
        # current thread raises RuntimeError, so skip the join in that case.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=1)

    def get_dropped_files(self) -> List[str]:
        """Return all queued paths and remove them from the queue.

        The queue is drained with ``popleft`` rather than ``list()`` followed
        by ``clear()``: the monitor thread may append concurrently, and the
        two-step form can drop items added in between (or fail while
        iterating a deque that is being mutated).
        """
        files = []
        while True:
            try:
                files.append(self.dropped_files.popleft())
            except IndexError:
                return files

    def add_dropped_file(self, file_path: str) -> bool:
        """Queue ``file_path`` if it exists and has a supported extension.

        Returns:
            True if the path was queued, False if it was rejected.
        """
        if not os.path.exists(file_path):
            return False
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_formats:
            return False
        self.dropped_files.append(file_path)
        print(f"检测到拖拽文件: {file_path}")
        return True

    def _monitor_loop(self):
        """Thread body: poll clipboard and watch directories until stopped."""
        watch_dirs = self._get_watch_directories()
        known_files = set()
        # Everything present at start-up is pre-existing, not a drop.
        for watch_dir in watch_dirs:
            self._scan_directory(watch_dir, known_files)
        print(f"监控目录: {watch_dirs}")
        print(f"已知文件数量: {len(known_files)}")
        while self.is_running:
            try:
                self._check_clipboard()
                for watch_dir in watch_dirs:
                    self._poll_directory(watch_dir, known_files)
            except Exception as e:
                # Top-level boundary of the worker thread: log and keep going.
                print(f"监控循环错误: {e}")
            time.sleep(self.POLL_INTERVAL)

    def _poll_directory(self, watch_dir: str, known_files: set):
        """Report supported files that newly appeared in ``watch_dir``.

        A file is added to ``known_files`` once it has been reported or once
        it is judged too old to be a fresh drop.  A recent file whose size is
        still changing is deliberately left unknown so that it is examined
        again on the next pass instead of being lost.
        """
        if not os.path.exists(watch_dir):
            return
        now = time.time()
        try:
            for filename in os.listdir(watch_dir):
                filepath = os.path.join(watch_dir, filename)
                if filepath in known_files:
                    continue
                if not self._is_supported_format(filepath):
                    continue
                try:
                    mtime = os.stat(filepath).st_mtime
                except OSError:
                    continue
                if now - mtime >= self.NEW_FILE_WINDOW:
                    # Not modified recently: treat as pre-existing.
                    known_files.add(filepath)
                    continue
                if not self._is_file_stable(filepath):
                    # Still being written; retry on the next pass.
                    continue
                # Mark as known before notifying so a failing callback
                # cannot cause the same file to be reported repeatedly.
                known_files.add(filepath)
                print(f"检测到新文件: {filepath}")
                if self.add_dropped_file(filepath) and self.drop_callback:
                    self.drop_callback([filepath])
        except PermissionError:
            return
        except Exception as e:
            print(f"监控目录 {watch_dir} 时出错: {e}")

    def _check_clipboard(self):
        """Read the clipboard and dispatch any file paths it newly contains."""
        self._handle_clipboard_text(self._read_clipboard())

    def _read_clipboard(self) -> str:
        """Return the stripped clipboard text, or "" if it cannot be read.

        Each command in ``CLIPBOARD_COMMANDS`` is tried in order; a missing
        tool, a timeout, a non-zero exit status or undecodable output moves
        on to the next one.
        """
        for command in self.CLIPBOARD_COMMANDS:
            try:
                result = subprocess.run(
                    list(command),
                    capture_output=True,
                    text=True,
                    timeout=self.CLIPBOARD_TIMEOUT,
                )
            except (OSError, subprocess.SubprocessError, ValueError):
                # ValueError covers UnicodeDecodeError from text=True.
                continue
            if result.returncode == 0:
                return result.stdout.strip()
        return ""

    def _handle_clipboard_text(self, content: str):
        """Queue and report supported files named in new clipboard text.

        Empty text and text equal to the previously handled value are
        ignored.  Only paths accepted by ``add_dropped_file`` are passed to
        the drop callback, so the callback sees exactly what was queued.
        Errors raised by the callback are logged, not propagated.
        """
        if not content or content == self.last_clipboard:
            return
        self.last_clipboard = content
        files = self._parse_clipboard_content(content)
        if not files:
            return
        print(f"剪贴板检测到文件: {files}")
        accepted = [path for path in files if self.add_dropped_file(path)]
        if accepted and self.drop_callback:
            try:
                self.drop_callback(accepted)
            except Exception as e:
                print(f"剪贴板回调出错: {e}")

    def _parse_clipboard_content(self, content: str) -> List[str]:
        """Extract existing file paths from clipboard text.

        Each line is interpreted either as a ``file://`` URI (percent-decoded,
        with an optional host component dropped) or as an absolute path.
        Lines that match neither form, or that name a non-existent path, are
        skipped.
        """
        from urllib.parse import unquote

        prefix = 'file://'
        files = []
        for line in content.split('\n'):
            line = line.strip()
            if line.startswith(prefix):
                rest = line[len(prefix):]
                if not rest.startswith('/'):
                    # "file://host/path": discard the host part.
                    _, slash, tail = rest.partition('/')
                    rest = slash + tail
                candidate = unquote(rest)
            elif os.path.isabs(line):
                candidate = line
            else:
                continue
            if candidate and os.path.exists(candidate):
                files.append(candidate)
        return files

    def _get_watch_directories(self) -> List[str]:
        """Return the existing directories to poll, without duplicates.

        Candidates that resolve to the same real path (typically
        ``tempfile.gettempdir()`` and ``/tmp``) are listed once, keeping the
        first spelling encountered.
        """
        candidates = [
            os.path.expanduser('~/Desktop'),
            os.path.expanduser('~/Downloads'),
            tempfile.gettempdir(),
            os.path.expanduser('~/Documents'),
            os.path.expanduser('~'),
            '/tmp',
        ]
        seen = set()
        dirs = []
        for candidate in candidates:
            if not os.path.isdir(candidate):
                continue
            resolved = os.path.realpath(candidate)
            if resolved in seen:
                continue
            seen.add(resolved)
            dirs.append(candidate)
        return dirs

    def _scan_directory(self, directory: str, known_files: set):
        """Add every supported file currently in ``directory`` to ``known_files``.

        An unreadable or missing directory is silently skipped.
        """
        try:
            names = os.listdir(directory)
        except OSError:
            return
        for filename in names:
            filepath = os.path.join(directory, filename)
            if self._is_supported_format(filepath):
                known_files.add(filepath)

    def _is_supported_format(self, file_path: str) -> bool:
        """Return True if ``file_path`` is a regular file with a supported extension."""
        if not os.path.isfile(file_path):
            return False
        file_ext = os.path.splitext(file_path)[1].lower()
        return file_ext in self.supported_formats

    def _is_file_stable(self, file_path: str, wait_time: float = 0.5) -> bool:
        """Return True if the file size is unchanged after ``wait_time`` seconds.

        Returns False if the file cannot be inspected (for example because
        it disappeared in the meantime).
        """
        try:
            initial_size = os.path.getsize(file_path)
            time.sleep(wait_time)
            return os.path.getsize(file_path) == initial_size
        except OSError:
            return False

    def is_supported(self) -> bool:
        """Return True; this detector reports itself as always available."""
        return True

    def get_platform_info(self) -> dict:
        """Return a static description of this detector."""
        return {
            'system': 'Linux',
            'detector_type': 'SimpleDragDetector',
            'supported': True,
            'method': 'File system + clipboard monitoring'
        }