MetaCore-startup/MetaCore/ui/file_watcher.py
2025-10-11 09:24:06 +08:00

238 lines
8.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件系统监控器
实时监测目录和文件的变化
"""
import os
from PyQt5.QtCore import QObject, QFileSystemWatcher, pyqtSignal, QTimer
from PyQt5.QtWidgets import QMessageBox
class FileWatcher(QObject):
"""文件系统监控器"""
# 信号定义
directory_changed = pyqtSignal(str) # 目录变化信号
file_changed = pyqtSignal(str) # 文件变化信号
project_added = pyqtSignal(str) # 项目添加信号
project_removed = pyqtSignal(str) # 项目删除信号
def __init__(self, parent=None):
super().__init__(parent)
# 创建文件系统监控器
self.watcher = QFileSystemWatcher()
# 连接信号
self.watcher.directoryChanged.connect(self.on_directory_changed)
self.watcher.fileChanged.connect(self.on_file_changed)
# 监控的路径列表
self.watched_directories = set()
self.watched_files = set()
# 目录内容缓存(用于检测具体变化)
self.directory_contents = {}
# 延迟处理定时器(避免频繁触发)
self.change_timer = QTimer()
self.change_timer.setSingleShot(True)
self.change_timer.timeout.connect(self.process_pending_changes)
self.pending_changes = set()
def add_directory(self, directory_path):
"""
添加目录监控
Args:
directory_path (str): 要监控的目录路径
"""
if not os.path.exists(directory_path):
print(f"警告: 目录不存在 - {directory_path}")
return False
if not os.path.isdir(directory_path):
print(f"警告: 路径不是目录 - {directory_path}")
return False
# 规范化路径
directory_path = os.path.abspath(directory_path)
if directory_path not in self.watched_directories:
# 添加到监控列表
self.watcher.addPath(directory_path)
self.watched_directories.add(directory_path)
# 缓存目录内容
self.cache_directory_contents(directory_path)
print(f"开始监控目录: {directory_path}")
return True
else:
print(f"目录已在监控中: {directory_path}")
return False
def remove_directory(self, directory_path):
"""
移除目录监控
Args:
directory_path (str): 要移除监控的目录路径
"""
directory_path = os.path.abspath(directory_path)
if directory_path in self.watched_directories:
self.watcher.removePath(directory_path)
self.watched_directories.remove(directory_path)
# 清除缓存
if directory_path in self.directory_contents:
del self.directory_contents[directory_path]
print(f"停止监控目录: {directory_path}")
return True
else:
print(f"目录未在监控中: {directory_path}")
return False
def add_file(self, file_path):
"""
添加文件监控
Args:
file_path (str): 要监控的文件路径
"""
if not os.path.exists(file_path):
print(f"警告: 文件不存在 - {file_path}")
return False
if not os.path.isfile(file_path):
print(f"警告: 路径不是文件 - {file_path}")
return False
file_path = os.path.abspath(file_path)
if file_path not in self.watched_files:
self.watcher.addPath(file_path)
self.watched_files.add(file_path)
print(f"开始监控文件: {file_path}")
return True
else:
print(f"文件已在监控中: {file_path}")
return False
def remove_file(self, file_path):
"""
移除文件监控
Args:
file_path (str): 要移除监控的文件路径
"""
file_path = os.path.abspath(file_path)
if file_path in self.watched_files:
self.watcher.removePath(file_path)
self.watched_files.remove(file_path)
print(f"停止监控文件: {file_path}")
return True
else:
print(f"文件未在监控中: {file_path}")
return False
def cache_directory_contents(self, directory_path):
"""缓存目录内容"""
try:
contents = set(os.listdir(directory_path))
self.directory_contents[directory_path] = contents
except OSError as e:
print(f"无法读取目录内容: {directory_path} - {e}")
def on_directory_changed(self, directory_path):
"""目录变化处理"""
print(f"检测到目录变化: {directory_path}")
# 添加到待处理变化列表
self.pending_changes.add(directory_path)
# 启动延迟处理定时器
self.change_timer.start(500) # 500ms延迟
def on_file_changed(self, file_path):
"""文件变化处理"""
print(f"检测到文件变化: {file_path}")
# 立即发出文件变化信号
self.file_changed.emit(file_path)
def process_pending_changes(self):
"""处理待处理的目录变化"""
for directory_path in self.pending_changes:
self.analyze_directory_changes(directory_path)
self.pending_changes.clear()
def analyze_directory_changes(self, directory_path):
"""分析目录具体变化"""
if not os.path.exists(directory_path):
print(f"目录已被删除: {directory_path}")
self.directory_changed.emit(directory_path)
return
try:
# 获取当前目录内容
current_contents = set(os.listdir(directory_path))
# 获取之前缓存的内容
previous_contents = self.directory_contents.get(directory_path, set())
# 检测新增的项目
added_items = current_contents - previous_contents
for item in added_items:
item_path = os.path.join(directory_path, item)
if os.path.isdir(item_path):
print(f"检测到新增目录: {item_path}")
self.project_added.emit(item_path)
else:
print(f"检测到新增文件: {item_path}")
# 检测删除的项目
removed_items = previous_contents - current_contents
for item in removed_items:
item_path = os.path.join(directory_path, item)
print(f"检测到删除项目: {item_path}")
self.project_removed.emit(item_path)
# 更新缓存
self.directory_contents[directory_path] = current_contents
# 发出目录变化信号
if added_items or removed_items:
self.directory_changed.emit(directory_path)
except OSError as e:
print(f"分析目录变化失败: {directory_path} - {e}")
def get_watched_directories(self):
"""获取所有监控的目录"""
return list(self.watched_directories)
def get_watched_files(self):
"""获取所有监控的文件"""
return list(self.watched_files)
def clear_all_watches(self):
"""清除所有监控"""
# 移除所有路径
for path in list(self.watched_directories):
self.remove_directory(path)
for path in list(self.watched_files):
self.remove_file(path)
print("已清除所有文件监控")
def is_watching(self, path):
"""检查是否正在监控指定路径"""
path = os.path.abspath(path)
return path in self.watched_directories or path in self.watched_files