#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 文件系统监控器 实时监测目录和文件的变化 """ import os from PyQt5.QtCore import QObject, QFileSystemWatcher, pyqtSignal, QTimer from PyQt5.QtWidgets import QMessageBox class FileWatcher(QObject): """文件系统监控器""" # 信号定义 directory_changed = pyqtSignal(str) # 目录变化信号 file_changed = pyqtSignal(str) # 文件变化信号 project_added = pyqtSignal(str) # 项目添加信号 project_removed = pyqtSignal(str) # 项目删除信号 def __init__(self, parent=None): super().__init__(parent) # 创建文件系统监控器 self.watcher = QFileSystemWatcher() # 连接信号 self.watcher.directoryChanged.connect(self.on_directory_changed) self.watcher.fileChanged.connect(self.on_file_changed) # 监控的路径列表 self.watched_directories = set() self.watched_files = set() # 目录内容缓存(用于检测具体变化) self.directory_contents = {} # 延迟处理定时器(避免频繁触发) self.change_timer = QTimer() self.change_timer.setSingleShot(True) self.change_timer.timeout.connect(self.process_pending_changes) self.pending_changes = set() def add_directory(self, directory_path): """ 添加目录监控 Args: directory_path (str): 要监控的目录路径 """ if not os.path.exists(directory_path): print(f"警告: 目录不存在 - {directory_path}") return False if not os.path.isdir(directory_path): print(f"警告: 路径不是目录 - {directory_path}") return False # 规范化路径 directory_path = os.path.abspath(directory_path) if directory_path not in self.watched_directories: # 添加到监控列表 self.watcher.addPath(directory_path) self.watched_directories.add(directory_path) # 缓存目录内容 self.cache_directory_contents(directory_path) print(f"开始监控目录: {directory_path}") return True else: print(f"目录已在监控中: {directory_path}") return False def remove_directory(self, directory_path): """ 移除目录监控 Args: directory_path (str): 要移除监控的目录路径 """ directory_path = os.path.abspath(directory_path) if directory_path in self.watched_directories: self.watcher.removePath(directory_path) self.watched_directories.remove(directory_path) # 清除缓存 if directory_path in self.directory_contents: del self.directory_contents[directory_path] print(f"停止监控目录: {directory_path}") return True else: print(f"目录未在监控中: {directory_path}") return False def add_file(self, file_path): """ 添加文件监控 Args: file_path (str): 要监控的文件路径 """ if not os.path.exists(file_path): print(f"警告: 文件不存在 - {file_path}") return False if not os.path.isfile(file_path): print(f"警告: 路径不是文件 - {file_path}") return False file_path = os.path.abspath(file_path) if file_path not in self.watched_files: self.watcher.addPath(file_path) self.watched_files.add(file_path) print(f"开始监控文件: {file_path}") return True else: print(f"文件已在监控中: {file_path}") return False def remove_file(self, file_path): """ 移除文件监控 Args: file_path (str): 要移除监控的文件路径 """ file_path = os.path.abspath(file_path) if file_path in self.watched_files: self.watcher.removePath(file_path) self.watched_files.remove(file_path) print(f"停止监控文件: {file_path}") return True else: print(f"文件未在监控中: {file_path}") return False def cache_directory_contents(self, directory_path): """缓存目录内容""" try: contents = set(os.listdir(directory_path)) self.directory_contents[directory_path] = contents except OSError as e: print(f"无法读取目录内容: {directory_path} - {e}") def on_directory_changed(self, directory_path): """目录变化处理""" print(f"检测到目录变化: {directory_path}") # 添加到待处理变化列表 self.pending_changes.add(directory_path) # 启动延迟处理定时器 self.change_timer.start(500) # 500ms延迟 def on_file_changed(self, file_path): """文件变化处理""" print(f"检测到文件变化: {file_path}") # 立即发出文件变化信号 self.file_changed.emit(file_path) def process_pending_changes(self): """处理待处理的目录变化""" for directory_path in self.pending_changes: self.analyze_directory_changes(directory_path) self.pending_changes.clear() def analyze_directory_changes(self, directory_path): """分析目录具体变化""" if not os.path.exists(directory_path): print(f"目录已被删除: {directory_path}") self.directory_changed.emit(directory_path) return try: # 获取当前目录内容 current_contents = set(os.listdir(directory_path)) # 获取之前缓存的内容 previous_contents = self.directory_contents.get(directory_path, set()) # 检测新增的项目 added_items = current_contents - previous_contents for item in added_items: item_path = os.path.join(directory_path, item) if os.path.isdir(item_path): print(f"检测到新增目录: {item_path}") self.project_added.emit(item_path) else: print(f"检测到新增文件: {item_path}") # 检测删除的项目 removed_items = previous_contents - current_contents for item in removed_items: item_path = os.path.join(directory_path, item) print(f"检测到删除项目: {item_path}") self.project_removed.emit(item_path) # 更新缓存 self.directory_contents[directory_path] = current_contents # 发出目录变化信号 if added_items or removed_items: self.directory_changed.emit(directory_path) except OSError as e: print(f"分析目录变化失败: {directory_path} - {e}") def get_watched_directories(self): """获取所有监控的目录""" return list(self.watched_directories) def get_watched_files(self): """获取所有监控的文件""" return list(self.watched_files) def clear_all_watches(self): """清除所有监控""" # 移除所有路径 for path in list(self.watched_directories): self.remove_directory(path) for path in list(self.watched_files): self.remove_file(path) print("已清除所有文件监控") def is_watching(self, path): """检查是否正在监控指定路径""" path = os.path.abspath(path) return path in self.watched_directories or path in self.watched_files