238 lines
8.1 KiB
Python
238 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
文件系统监控器
|
|
实时监测目录和文件的变化
|
|
"""
|
|
|
|
import os
|
|
from PyQt5.QtCore import QObject, QFileSystemWatcher, pyqtSignal, QTimer
|
|
from PyQt5.QtWidgets import QMessageBox
|
|
|
|
class FileWatcher(QObject):
|
|
"""文件系统监控器"""
|
|
|
|
# 信号定义
|
|
directory_changed = pyqtSignal(str) # 目录变化信号
|
|
file_changed = pyqtSignal(str) # 文件变化信号
|
|
project_added = pyqtSignal(str) # 项目添加信号
|
|
project_removed = pyqtSignal(str) # 项目删除信号
|
|
|
|
def __init__(self, parent=None):
|
|
super().__init__(parent)
|
|
|
|
# 创建文件系统监控器
|
|
self.watcher = QFileSystemWatcher()
|
|
|
|
# 连接信号
|
|
self.watcher.directoryChanged.connect(self.on_directory_changed)
|
|
self.watcher.fileChanged.connect(self.on_file_changed)
|
|
|
|
# 监控的路径列表
|
|
self.watched_directories = set()
|
|
self.watched_files = set()
|
|
|
|
# 目录内容缓存(用于检测具体变化)
|
|
self.directory_contents = {}
|
|
|
|
# 延迟处理定时器(避免频繁触发)
|
|
self.change_timer = QTimer()
|
|
self.change_timer.setSingleShot(True)
|
|
self.change_timer.timeout.connect(self.process_pending_changes)
|
|
self.pending_changes = set()
|
|
|
|
def add_directory(self, directory_path):
|
|
"""
|
|
添加目录监控
|
|
|
|
Args:
|
|
directory_path (str): 要监控的目录路径
|
|
"""
|
|
if not os.path.exists(directory_path):
|
|
print(f"警告: 目录不存在 - {directory_path}")
|
|
return False
|
|
|
|
if not os.path.isdir(directory_path):
|
|
print(f"警告: 路径不是目录 - {directory_path}")
|
|
return False
|
|
|
|
# 规范化路径
|
|
directory_path = os.path.abspath(directory_path)
|
|
|
|
if directory_path not in self.watched_directories:
|
|
# 添加到监控列表
|
|
self.watcher.addPath(directory_path)
|
|
self.watched_directories.add(directory_path)
|
|
|
|
# 缓存目录内容
|
|
self.cache_directory_contents(directory_path)
|
|
|
|
print(f"开始监控目录: {directory_path}")
|
|
return True
|
|
else:
|
|
print(f"目录已在监控中: {directory_path}")
|
|
return False
|
|
|
|
def remove_directory(self, directory_path):
|
|
"""
|
|
移除目录监控
|
|
|
|
Args:
|
|
directory_path (str): 要移除监控的目录路径
|
|
"""
|
|
directory_path = os.path.abspath(directory_path)
|
|
|
|
if directory_path in self.watched_directories:
|
|
self.watcher.removePath(directory_path)
|
|
self.watched_directories.remove(directory_path)
|
|
|
|
# 清除缓存
|
|
if directory_path in self.directory_contents:
|
|
del self.directory_contents[directory_path]
|
|
|
|
print(f"停止监控目录: {directory_path}")
|
|
return True
|
|
else:
|
|
print(f"目录未在监控中: {directory_path}")
|
|
return False
|
|
|
|
def add_file(self, file_path):
|
|
"""
|
|
添加文件监控
|
|
|
|
Args:
|
|
file_path (str): 要监控的文件路径
|
|
"""
|
|
if not os.path.exists(file_path):
|
|
print(f"警告: 文件不存在 - {file_path}")
|
|
return False
|
|
|
|
if not os.path.isfile(file_path):
|
|
print(f"警告: 路径不是文件 - {file_path}")
|
|
return False
|
|
|
|
file_path = os.path.abspath(file_path)
|
|
|
|
if file_path not in self.watched_files:
|
|
self.watcher.addPath(file_path)
|
|
self.watched_files.add(file_path)
|
|
print(f"开始监控文件: {file_path}")
|
|
return True
|
|
else:
|
|
print(f"文件已在监控中: {file_path}")
|
|
return False
|
|
|
|
def remove_file(self, file_path):
|
|
"""
|
|
移除文件监控
|
|
|
|
Args:
|
|
file_path (str): 要移除监控的文件路径
|
|
"""
|
|
file_path = os.path.abspath(file_path)
|
|
|
|
if file_path in self.watched_files:
|
|
self.watcher.removePath(file_path)
|
|
self.watched_files.remove(file_path)
|
|
print(f"停止监控文件: {file_path}")
|
|
return True
|
|
else:
|
|
print(f"文件未在监控中: {file_path}")
|
|
return False
|
|
|
|
def cache_directory_contents(self, directory_path):
|
|
"""缓存目录内容"""
|
|
try:
|
|
contents = set(os.listdir(directory_path))
|
|
self.directory_contents[directory_path] = contents
|
|
except OSError as e:
|
|
print(f"无法读取目录内容: {directory_path} - {e}")
|
|
|
|
def on_directory_changed(self, directory_path):
|
|
"""目录变化处理"""
|
|
print(f"检测到目录变化: {directory_path}")
|
|
|
|
# 添加到待处理变化列表
|
|
self.pending_changes.add(directory_path)
|
|
|
|
# 启动延迟处理定时器
|
|
self.change_timer.start(500) # 500ms延迟
|
|
|
|
def on_file_changed(self, file_path):
|
|
"""文件变化处理"""
|
|
print(f"检测到文件变化: {file_path}")
|
|
|
|
# 立即发出文件变化信号
|
|
self.file_changed.emit(file_path)
|
|
|
|
def process_pending_changes(self):
|
|
"""处理待处理的目录变化"""
|
|
for directory_path in self.pending_changes:
|
|
self.analyze_directory_changes(directory_path)
|
|
|
|
self.pending_changes.clear()
|
|
|
|
def analyze_directory_changes(self, directory_path):
|
|
"""分析目录具体变化"""
|
|
if not os.path.exists(directory_path):
|
|
print(f"目录已被删除: {directory_path}")
|
|
self.directory_changed.emit(directory_path)
|
|
return
|
|
|
|
try:
|
|
# 获取当前目录内容
|
|
current_contents = set(os.listdir(directory_path))
|
|
|
|
# 获取之前缓存的内容
|
|
previous_contents = self.directory_contents.get(directory_path, set())
|
|
|
|
# 检测新增的项目
|
|
added_items = current_contents - previous_contents
|
|
for item in added_items:
|
|
item_path = os.path.join(directory_path, item)
|
|
if os.path.isdir(item_path):
|
|
print(f"检测到新增目录: {item_path}")
|
|
self.project_added.emit(item_path)
|
|
else:
|
|
print(f"检测到新增文件: {item_path}")
|
|
|
|
# 检测删除的项目
|
|
removed_items = previous_contents - current_contents
|
|
for item in removed_items:
|
|
item_path = os.path.join(directory_path, item)
|
|
print(f"检测到删除项目: {item_path}")
|
|
self.project_removed.emit(item_path)
|
|
|
|
# 更新缓存
|
|
self.directory_contents[directory_path] = current_contents
|
|
|
|
# 发出目录变化信号
|
|
if added_items or removed_items:
|
|
self.directory_changed.emit(directory_path)
|
|
|
|
except OSError as e:
|
|
print(f"分析目录变化失败: {directory_path} - {e}")
|
|
|
|
def get_watched_directories(self):
|
|
"""获取所有监控的目录"""
|
|
return list(self.watched_directories)
|
|
|
|
def get_watched_files(self):
|
|
"""获取所有监控的文件"""
|
|
return list(self.watched_files)
|
|
|
|
def clear_all_watches(self):
|
|
"""清除所有监控"""
|
|
# 移除所有路径
|
|
for path in list(self.watched_directories):
|
|
self.remove_directory(path)
|
|
|
|
for path in list(self.watched_files):
|
|
self.remove_file(path)
|
|
|
|
print("已清除所有文件监控")
|
|
|
|
def is_watching(self, path):
|
|
"""检查是否正在监控指定路径"""
|
|
path = os.path.abspath(path)
|
|
return path in self.watched_directories or path in self.watched_files |