MetaCore-startup/MetaCore/data/project_manager.py

1328 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
项目数据管理器模块
这个模块提供了完整的项目管理功能,包括:
- 项目数据的创建、读取、更新、删除 (CRUD)
- 项目文件结构的自动生成
- 项目验证和重名检查
- 项目数据的持久化存储
- 基于Qt信号的事件通知系统
主要类:
- Project: 项目数据模型类
- ProjectManager: 项目管理器类,提供所有项目管理功能
作者: MetaCore Team
版本: 1.0.0
创建时间: 2024
"""
# 标准库导入
import json # JSON数据序列化和反序列化
import os # 操作系统接口,用于文件和目录操作
import platform
import stat
import threading
import time
from datetime import datetime # 日期时间处理
from pathlib import Path # 现代路径处理
from typing import List, Dict, Optional # 类型提示
import glob
import subprocess
# 第三方库导入
from PyQt5.QtCore import QObject, pyqtSignal, QFileSystemWatcher, QTimer # Qt核心对象和信号系统
class PathUtils:
    """Cross-platform helpers for building, creating and validating paths."""

    # Characters rejected by ``is_valid_path`` on every platform.
    _INVALID_CHARS = ('<', '>', ':', '"', '|', '?', '*')
    # Path separators that are additionally rejected when running on Windows.
    _WINDOWS_ONLY_INVALID_CHARS = ('\\', '/')

    @staticmethod
    def normalize_path(path_str: str) -> Path:
        """
        Turn a path string into a resolved ``Path``.

        Args:
            path_str: Path text; may be empty.

        Returns:
            ``Path()`` for an empty/falsy input, otherwise
            ``Path(path_str).resolve()``.
        """
        if not path_str:
            return Path()
        return Path(path_str).resolve()

    @staticmethod
    def ensure_path_exists(path: Path, is_file: bool = False) -> bool:
        """
        Make sure a directory exists on disk.

        Args:
            path: Target path.
            is_file: When True, ``path`` names a file and only its parent
                directory is created; the file itself is not touched.

        Returns:
            True on success, False if the directory could not be created
            (the error is printed, never raised).
        """
        directory = path.parent if is_file else path
        try:
            directory.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            # Deliberately best-effort: callers rely on the boolean result.
            print(f"创建路径失败 {path}: {e}")
            return False
        return True

    @staticmethod
    def safe_path_join(*parts) -> Path:
        """
        Join path fragments into one ``Path``.

        Returns:
            ``Path()`` when called without arguments, otherwise the first
            part joined with all remaining parts.
        """
        if not parts:
            return Path()
        first, *rest = parts
        return Path(first).joinpath(*rest)

    @staticmethod
    def get_relative_path(path: Path, base: Path) -> Path:
        """
        Express ``path`` relative to ``base``.

        Returns:
            The relative path, or ``path`` unchanged when it is not located
            under ``base``.
        """
        try:
            return path.relative_to(base)
        except ValueError:
            return path

    @staticmethod
    def is_valid_path(path_str: str) -> bool:
        """
        Check a string for characters that are not allowed.

        The string must be non-empty, must not contain a NUL byte and must
        not contain any of ``< > : " | ? *``. On Windows the separators
        ``\\`` and ``/`` are rejected as well.

        Args:
            path_str: Text to validate.

        Returns:
            True when none of the forbidden characters are present.
        """
        if not path_str:
            return False
        # An embedded NUL makes later pathlib/os calls raise ValueError.
        if '\0' in path_str:
            return False
        forbidden = list(PathUtils._INVALID_CHARS)
        if platform.system().lower() == 'windows':
            forbidden.extend(PathUtils._WINDOWS_ONLY_INVALID_CHARS)
        return not any(char in path_str for char in forbidden)
class PlatformUtils:
    """Helpers that hide differences between Windows, macOS and Linux."""

    @staticmethod
    def get_system_type() -> str:
        """Return ``platform.system()`` in lower case (e.g. ``'windows'``)."""
        return platform.system().lower()

    @staticmethod
    def get_executable_extension() -> str:
        """Return ``'.exe'`` on Windows and an empty string elsewhere."""
        return '.exe' if PlatformUtils.get_system_type() == 'windows' else ''

    @staticmethod
    def get_script_extension() -> str:
        """Return ``'.bat'`` on Windows and ``'.sh'`` elsewhere."""
        return '.bat' if PlatformUtils.get_system_type() == 'windows' else '.sh'

    @staticmethod
    def get_python_executable(venv_path: Path) -> Path:
        """
        Build the interpreter path inside a virtual environment.

        Returns:
            ``<venv>/Scripts/python.exe`` on Windows, ``<venv>/bin/python``
            elsewhere. The file is not checked for existence.
        """
        if PlatformUtils.get_system_type() == 'windows':
            return venv_path / 'Scripts' / 'python.exe'
        return venv_path / 'bin' / 'python'

    @staticmethod
    def find_venv_path(project_path: Path) -> Optional[Path]:
        """
        Locate the virtual environment of a project.

        The directory name depends on the platform: ``.venv`` on Windows,
        ``venv`` on every other system.

        Args:
            project_path: Project root directory.

        Returns:
            The environment directory when it exists and contains a Python
            interpreter, otherwise None. The outcome is printed either way.
        """
        system = PlatformUtils.get_system_type()
        venv_name = '.venv' if system == 'windows' else 'venv'
        venv_path = project_path / venv_name

        if not (venv_path.exists() and venv_path.is_dir()):
            print(f"虚拟环境不存在: {venv_path} (平台: {system})")
            return None

        # A directory only counts as an environment if it has an interpreter.
        if PlatformUtils.get_python_executable(venv_path).exists():
            print(f"找到虚拟环境: {venv_path}")
            return venv_path

        print(f"虚拟环境目录存在但无效缺少Python解释器: {venv_path}")
        return None

    @staticmethod
    def get_pycharm_paths() -> List[Path]:
        """
        List candidate PyCharm launcher locations for the current platform.

        Bare names (``pycharm``, ``pycharm.sh`` ...) stand for commands to be
        looked up on PATH; the remaining entries come from globbing the usual
        installation directories and therefore only include existing files.
        """
        system = PlatformUtils.get_system_type()
        home = Path.home()

        if system == "windows":
            return [
                Path("pycharm.exe"),
                Path("pycharm64.exe"),
                *Path("C:/Program Files/JetBrains").glob("PyCharm*/bin/pycharm64.exe"),
                *Path("C:/Program Files (x86)/JetBrains").glob("PyCharm*/bin/pycharm.exe"),
                *home.glob("AppData/Local/JetBrains/Toolbox/apps/PyCharm-P/ch-*/bin/pycharm64.exe"),
                *home.glob("AppData/Local/JetBrains/Toolbox/apps/PyCharm-P/ch-*/bin/pycharm.exe"),
            ]

        if system == "darwin":  # macOS
            return [
                Path("pycharm"),
                *Path("/Applications").glob("PyCharm*.app/Contents/MacOS/pycharm"),
                Path("/Applications/PyCharm.app/Contents/MacOS/pycharm"),
                *home.glob("Applications/PyCharm*.app/Contents/MacOS/pycharm"),
            ]

        # Linux and everything else
        return [
            Path("pycharm"),
            Path("pycharm.sh"),
            *Path("/opt").glob("pycharm*/bin/pycharm.sh"),
            Path("/usr/local/bin/pycharm"),
            Path("/usr/bin/pycharm"),
            *home.glob(".local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-*/bin/pycharm.sh"),
        ]

    @staticmethod
    def find_executable(paths: List[Path]) -> Optional[Path]:
        """
        Return the first usable PyCharm launcher.

        The default command (``pycharm`` / ``pycharm.exe``) is tried first,
        then every entry of ``paths`` in order. Bare command names are
        resolved through PATH with ``shutil.which`` instead of being
        executed, so the lookup cannot hang or start the IDE. Other entries
        must be regular files and, outside Windows, executable.

        Args:
            paths: Candidate launcher locations.

        Returns:
            Path of the launcher, or None when nothing usable was found.
        """
        import shutil  # function-scope import: only this helper needs it

        on_windows = PlatformUtils.get_system_type() == "windows"
        default_command = Path("pycharm" + PlatformUtils.get_executable_extension())

        for candidate in [default_command, *paths]:
            # A single-component path is a command name: look it up on PATH.
            if len(candidate.parts) == 1:
                located = shutil.which(str(candidate))
                if located:
                    return Path(located)
            # is_file() rejects directories, which also pass an X_OK check.
            if candidate.is_file() and (on_windows or os.access(candidate, os.X_OK)):
                return candidate
        return None
class Project:
    """
    Data model for a single project.

    Holds the metadata, location and description of a project and converts
    to/from plain dictionaries for persistence.

    Attributes:
        id (int): Unique identifier.
        title (str): Project name.
        date (str): Creation date, formatted "YYYY-MM-DD HH:MM:SS".
        type (str): Project type, e.g. "empty", "web", "mobile".
        image (str): Project icon (path or symbol).
        path (Path): Base directory chosen by the user (parent directory);
            ``Path()`` when unset.
        project_dir (Path): Full path of the project directory; ``Path()``
            when unset.
        description (str): Free-text description.
        status (str): Project status, "normal" by default.
        pycharm_process: Always initialised to None here.
        pycharm_ready (bool): Always initialised to False here.
    """

    def __init__(self, id: int, title: str, date: str, type: str, image: str,
                 path: str = "", project_dir: str = "", description: str = "",
                 status: str = "normal"):
        """
        Create a project object.

        Args:
            id (int): Unique identifier.
            title (str): Project name.
            date (str): Creation date string.
            type (str): Project type.
            image (str): Project icon.
            path (str, optional): Base path. Defaults to "".
            project_dir (str, optional): Project directory. Defaults to "".
            description (str, optional): Description. Defaults to "".
            status (str, optional): Status. Defaults to "normal".
        """
        self.id = id
        self.title = title
        self.date = date
        self.type = type
        self.image = image
        # Non-empty locations are resolved; empty ones stay as Path().
        self.path = PathUtils.normalize_path(path) if path else Path()
        self.project_dir = PathUtils.normalize_path(project_dir) if project_dir else Path()
        self.description = description
        self.status = status
        self.pycharm_process = None
        self.pycharm_ready = False

    @staticmethod
    def _path_to_text(value) -> str:
        """Serialise a location; an unset ``Path()`` becomes ``''`` rather than ``'.'``."""
        text = str(value) if value else ''
        return '' if text == '.' else text

    @staticmethod
    def _text_to_path_arg(value) -> str:
        """
        Prepare a stored location for ``__init__``.

        Real locations are always stored resolved, so a bare ``'.'`` can only
        be an unset path written by an older version; it is mapped back to
        ``''`` instead of being resolved to the working directory.
        """
        text = str(value) if value else ''
        return '' if text == '.' else text

    def to_dict(self) -> Dict:
        """
        Convert the project into a dictionary.

        Returns:
            Dict: All persisted attributes; ``path`` and ``project_dir`` are
            strings and empty when unset.
        """
        return {
            'id': self.id,
            'title': self.title,
            'date': self.date,
            'type': self.type,
            'image': self.image,
            'path': self._path_to_text(self.path),
            'project_dir': self._path_to_text(self.project_dir),
            'description': self.description,
            'status': self.status
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'Project':
        """
        Build a project from a dictionary.

        Missing keys fall back to ``0`` for ``id``, ``'normal'`` for
        ``status`` and ``''`` for everything else.

        Args:
            data (Dict): Stored project data.

        Returns:
            Project: The newly created object.
        """
        return cls(
            id=data.get('id', 0),
            title=data.get('title', ''),
            date=data.get('date', ''),
            type=data.get('type', ''),
            image=data.get('image', ''),
            path=cls._text_to_path_arg(data.get('path', '')),
            project_dir=cls._text_to_path_arg(data.get('project_dir', '')),
            description=data.get('description', ''),
            status=data.get('status', 'normal')
        )
class ProjectManager(QObject):
    """
    Core class of the project management system.

    Inherits from QObject so it can emit Qt signals. It owns the in-memory
    project list, persists it to a JSON file, validates create/import/open
    requests, watches project directories for deletion and contains helpers
    for launching PyCharm and running a project's entry script.
    """

    # ==================== Signals ====================
    projects_changed = pyqtSignal()                  # the project list changed
    project_added = pyqtSignal(Project)              # a project was added
    project_removed = pyqtSignal(int)                # a project (by id) was removed
    project_updated = pyqtSignal(Project)            # a project was updated
    pycharm_started = pyqtSignal()                   # PyCharm finished starting
    project_method_called = pyqtSignal(str)          # a project method call finished
    check_pycharm_ready_signal = pyqtSignal(str)     # request a PyCharm readiness check
    pycharm_check_complete = pyqtSignal(bool, str)   # a PyCharm check finished

    def __init__(self):
        """Load the stored projects and start directory monitoring."""
        super().__init__()
        self.projects: List[Project] = []
        self.data_file = PathUtils.safe_path_join("data", "projects.json")

        # File system watcher for project directories.
        self.file_watcher = QFileSystemWatcher()
        self.file_watcher.directoryChanged.connect(self.on_directory_changed)
        self.file_watcher.fileChanged.connect(self.on_file_changed)
        # Watched directories: {path string: project id}
        self.watched_paths = {}

        # State read by the PyCharm helpers. Everything is initialised here so
        # that methods such as is_pycharm_ready() work before start_pycharm().
        self.pycharm_process = None
        self.pycharm_ready = False
        self.project_path = None
        self.target_project_path = None
        self.max_wait_time = 180
        self.check_start_time = 0
        self.current_opened_project = None
        self.project_run_started = False
        self.project_run_completed = False
        # True while a modal "directory deleted" question is on screen.
        self._deletion_prompt_open = False

        self.load_projects()
        self.setup_project_monitoring()

        self.check_pycharm_ready_signal.connect(self._on_check_pycharm_ready)
        # Timer used to poll PyCharm's start-up state.
        self.pycharm_check_timer = QTimer()
        self.pycharm_check_timer.timeout.connect(self._check_pycharm_status)

    @staticmethod
    def _has_location(path_value) -> bool:
        """
        Tell whether a project location is actually set.

        ``Path`` objects are always truthy, so ``if project.project_dir`` can
        not detect the unset value ``Path()`` (rendered as ``'.'``).
        """
        return bool(path_value) and str(path_value) not in ("", ".")

    # ==================== Persistence ====================
    def load_projects(self):
        """
        Load the project list from the JSON data file.

        Falls back to the built-in default projects when the file does not
        exist or cannot be read/parsed (the error is printed).
        """
        try:
            if not self.data_file.exists():
                self.create_default_projects()
                return
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.projects = [Project.from_dict(item) for item in data]
        except Exception as e:
            print(f"加载项目数据失败: {e}")
            self.create_default_projects()

    def save_projects(self):
        """
        Write the project list to the JSON data file.

        The data is serialised first and written to a temporary sibling file
        that then replaces the real file, so a failure cannot leave a
        truncated data file behind. Errors are printed, not raised.
        """
        try:
            payload = [project.to_dict() for project in self.projects]
            PathUtils.ensure_path_exists(self.data_file, is_file=True)
            temp_file = self.data_file.with_name(self.data_file.name + ".tmp")
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            os.replace(temp_file, self.data_file)
        except Exception as e:
            print(f"保存项目数据失败: {e}")

    def create_default_projects(self):
        """Replace the project list with three built-in sample projects."""
        self.projects = [
            Project(1, "示例项目1", "2024-06-08 15:56:35", "empty", "📁",
                    "", "", "这是一个示例空白项目"),
            Project(2, "示例项目2", "2023-01-10 12:09:04", "empty", "📁",
                    "", "", "这是另一个示例空白项目"),
            Project(3, "我的第一个项目", "2024-06-07 06:57:46", "empty", "📁",
                    "", "", "从这里开始您的第一个项目"),
        ]

    # ==================== Queries ====================
    def get_all_projects(self) -> List[Project]:
        """Return a shallow copy of the project list."""
        return self.projects.copy()

    def get_project_by_id(self, project_id: int) -> Optional[Project]:
        """Return the first project with the given id, or None."""
        return next((p for p in self.projects if p.id == project_id), None)

    def get_project_by_name(self, name: str) -> Optional[Project]:
        """Return the first project whose title equals ``name``, or None."""
        return next((p for p in self.projects if p.title == name), None)

    # ==================== Validation ====================
    def is_project_name_exists(self, name: str) -> bool:
        """Return True when a project with this title is already listed."""
        return self.get_project_by_name(name) is not None

    def is_project_directory_exists(self, path: Path, name: str) -> bool:
        """Return True when ``path / name`` already exists on disk."""
        return (path / name).exists()

    def is_project_path_exists(self, path: Path) -> bool:
        """
        Return True when ``path`` is already used by a listed project.

        Both the base path and the project directory of every project are
        compared after normalisation; unset locations are ignored.
        """
        wanted = PathUtils.normalize_path(str(path))
        for project in self.projects:
            for location in (project.path, project.project_dir):
                if (self._has_location(location)
                        and PathUtils.normalize_path(str(location)) == wanted):
                    return True
        return False

    def validate_project_creation(self, name: str, path: str) -> tuple[bool, str]:
        """
        Validate a request to create a new project.

        Returns:
            ``(True, "")`` when valid, otherwise ``(False, message)``.
        """
        if not name or not name.strip():
            return False, "请输入项目名称"
        if not PathUtils.is_valid_path(name):
            return False, "项目名称包含非法字符"
        if self.is_project_directory_exists(PathUtils.normalize_path(path), name):
            return False, "目录已存在"
        return True, ""

    def validate_project_import(self, name: str, path: str) -> tuple[bool, str]:
        """
        Validate a request to import an existing folder.

        Returns:
            ``(True, "")`` when valid, otherwise ``(False, message)``.
        """
        if not name or not name.strip():
            return False, "项目名称不能为空"
        if not PathUtils.is_valid_path(name):
            return False, "项目名称包含非法字符"
        folder = PathUtils.normalize_path(path)
        if not folder.exists():
            return False, "文件夹不存在"
        if not folder.is_dir():
            return False, "选择的路径不是文件夹"
        if self.is_project_path_exists(folder):
            return False, "该文件夹已被导入"
        return True, ""

    def validate_project_open(self, project_path: str) -> tuple[bool, str]:
        """
        Check whether a project folder can be opened.

        The folder must exist, be a directory, be readable and be non-empty.

        Returns:
            ``(True, "")`` when it can be opened, otherwise ``(False, message)``.
        """
        if not project_path:
            return False, "项目路径不存在或无效"
        folder = PathUtils.normalize_path(project_path)
        if not folder.exists():
            return False, "项目路径不存在或无效"
        if not folder.is_dir():
            return False, "项目路径不是有效的文件夹"
        if not os.access(folder, os.R_OK):
            return False, "没有权限访问项目文件夹"
        try:
            if not any(folder.iterdir()):
                return False, "项目文件夹为空"
        except PermissionError:
            return False, "没有权限读取项目文件夹内容"
        return True, ""

    # ==================== CRUD ====================
    def _next_project_id(self) -> int:
        """Return one more than the highest id in use (1 for an empty list)."""
        return max((p.id for p in self.projects), default=0) + 1

    def _register_project(self, project: Project) -> Project:
        """Put a new project first in the list, watch it, notify and save."""
        self.projects.insert(0, project)
        self.add_project_to_watcher(project)
        self.project_added.emit(project)
        self.projects_changed.emit()
        self.save_projects()
        return project

    def add_project(self, title: str, description: str, project_type: str, path: str) -> Project:
        """
        Create a new project directory under ``path`` and register it.

        Args:
            title: Project name; also the directory name.
            description: Project description.
            project_type: Project type, e.g. ``'empty'``.
            path: Parent directory in which the project folder is created.

        Returns:
            The newly registered project.

        Raises:
            Exception: If the project directory cannot be created.
        """
        base_dir = PathUtils.normalize_path(path)
        project_dir = base_dir / title

        # ensure_path_exists reports failure through its return value, so it
        # must be checked; otherwise a project without a directory is listed.
        if not PathUtils.ensure_path_exists(project_dir):
            raise Exception(f"创建项目目录失败: {project_dir}")
        try:
            self._create_project_structure(project_dir, project_type, title, description)
        except Exception as e:
            raise Exception(f"创建项目目录失败: {str(e)}") from e

        new_project = Project(
            id=self._next_project_id(),
            title=title,
            date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            type=project_type,
            image=str(project_dir / f'{title}.png'),  # icon path inside the project
            path=str(base_dir),
            project_dir=str(project_dir),
            description=description
        )
        return self._register_project(new_project)

    def import_project(self, title: str, description: str, project_type: str, project_dir: str) -> Project:
        """
        Register an existing folder as a project.

        Args:
            title: Project name.
            description: Project description.
            project_type: Project type.
            project_dir: Existing project folder.

        Returns:
            The newly registered project.

        Raises:
            Exception: If the folder does not exist or is not a directory.
        """
        folder = PathUtils.normalize_path(project_dir)
        if not folder.exists():
            raise Exception(f"项目目录不存在: {project_dir}")
        if not folder.is_dir():
            raise Exception(f"路径不是文件夹: {project_dir}")

        new_project = Project(
            id=self._next_project_id(),
            title=title,
            date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            type=project_type,
            image=str(folder / f'{title}.png'),
            path=str(folder.parent),
            project_dir=str(folder),
            description=description
        )
        return self._register_project(new_project)

    def _create_project_structure(self, project_dir: Path, project_type: str, title: str, description: str):
        """
        Create the standard folders and ``project.json`` for a new project.

        Best-effort: failures are printed and not raised. ``description`` is
        accepted for interface compatibility but not written anywhere.
        """
        try:
            for directory in ('models', 'textures', 'scenes'):
                PathUtils.ensure_path_exists(project_dir / directory)

            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            project_config = {
                "name": title,
                "path": str(project_dir),
                "created_at": timestamp,
                "last_modified": timestamp,
                "version": "1.0.0",
                "engine_version": "1.0.0"
            }
            with open(project_dir / 'project.json', 'w', encoding='utf-8') as f:
                json.dump(project_config, f, ensure_ascii=False, indent=2)

            self._create_type_specific_files(project_dir, project_type)
        except Exception as e:
            print(f"创建项目文件结构失败: {e}")

    def _create_type_specific_files(self, project_dir: Path, project_type: str):
        """
        Create the files that depend on the project type.

        Only the ``'empty'`` type creates anything: ``config/settings.json``
        and a sample ``src/main.py``. Failures are printed and not raised.
        """
        try:
            if project_type != 'empty':
                return

            # Configuration directory and settings file.
            config_dir = project_dir / 'config'
            PathUtils.ensure_path_exists(config_dir)
            config_content = {
                'project_settings': {
                    'version': '1.0.0',
                    'build_target': 'development',
                    'dependencies': []
                },
                'custom_settings': {}
            }
            with open(config_dir / 'settings.json', 'w', encoding='utf-8') as f:
                json.dump(config_content, f, ensure_ascii=False, indent=2)

            # Source directory with a runnable sample entry point.
            src_dir = project_dir / 'src'
            PathUtils.ensure_path_exists(src_dir)
            with open(src_dir / 'main.py', 'w', encoding='utf-8') as f:
                f.write('''#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
主程序入口文件
"""
def main():
    """主函数"""
    print("Hello, MetaCore!")
    print("这是一个空白项目模板,您可以在此基础上开始开发。")
if __name__ == "__main__":
    main()
''')
        except Exception as e:
            print(f"创建项目类型特定文件失败: {e}")

    def remove_project(self, project_id: int) -> bool:
        """
        Remove a project from the list (files on disk are not touched).

        Returns:
            True when a project with this id was found and removed.
        """
        for index, project in enumerate(self.projects):
            if project.id == project_id:
                self.remove_project_from_watcher(project)
                del self.projects[index]
                self.project_removed.emit(project_id)
                self.projects_changed.emit()
                self.save_projects()
                return True
        return False

    def update_project(self, project: Project):
        """Replace the stored project that has the same id, then notify and save."""
        for index, existing in enumerate(self.projects):
            if existing.id == project.id:
                self.projects[index] = project
                self.project_updated.emit(project)
                self.projects_changed.emit()
                self.save_projects()
                break

    def cleanup_old_preview_images(self, project_id: int):
        """
        Delete all but the newest preview image of a project.

        Looks in ``<cwd>/MetaCore/Resources/ProjectPreviews`` for files
        matching ``*preview_<id>_*.png``. Errors are printed, not raised.
        """
        try:
            preview_dir = Path.cwd() / 'MetaCore' / 'Resources' / 'ProjectPreviews'
            if not preview_dir.exists():
                return

            previews = list(preview_dir.glob(f"*preview_{project_id}_*.png"))
            if len(previews) <= 1:
                return

            # Newest first; everything after the first entry is stale.
            previews.sort(key=lambda item: item.stat().st_mtime, reverse=True)
            for old_preview in previews[1:]:
                try:
                    old_preview.unlink()
                    print(f"已删除旧预览图: {old_preview}")
                except Exception as e:
                    print(f"删除旧预览图失败 {old_preview}: {e}")
        except Exception as e:
            print(f"清理旧预览图时发生错误: {e}")

    def get_projects_by_type(self, project_type: str) -> List[Project]:
        """
        Return the projects of one type.

        ``"overview"`` and ``"all"`` return every project; the navigation
        categories ``"management"``, ``"resource_category"``,
        ``"resource_management"`` and ``"system_settings"`` return an empty
        list.
        """
        if project_type in ("overview", "all"):
            return self.get_all_projects()
        if project_type in ("management", "resource_category",
                            "resource_management", "system_settings"):
            return []
        return [p for p in self.projects if p.type == project_type]

    def search_projects(self, keyword: str) -> List[Project]:
        """Return projects whose title contains ``keyword`` (case-insensitive)."""
        if not keyword:
            return self.get_all_projects()
        needle = keyword.lower()
        return [p for p in self.projects if needle in p.title.lower()]

    def get_project_types(self) -> List[str]:
        """Return the distinct project types in sorted order."""
        return sorted({p.type for p in self.projects})

    # ==================== Monitoring ====================
    def setup_project_monitoring(self):
        """Watch every known project directory and start the periodic check."""
        for project in self.projects:
            self.add_project_to_watcher(project)

        # Periodic fallback in addition to the watcher signals.
        self.check_timer = QTimer()
        self.check_timer.timeout.connect(self.check_all_projects_existence)
        self.check_timer.start(3000)  # every 3 seconds

    def check_all_projects_existence(self):
        """
        Detect project directories that disappeared or came back.

        Skipped while a deletion question is on screen: the modal dialog runs
        a nested event loop, so the timer keeps firing and would otherwise
        open another dialog for the same project every three seconds.
        """
        if self._deletion_prompt_open:
            return

        deleted_projects = []
        restored_projects = []
        for project in self.projects:
            if not self._has_location(project.project_dir):
                continue
            on_disk = PathUtils.normalize_path(str(project.project_dir)).exists()
            if project.status == 'normal' and not on_disk:
                deleted_projects.append(project)
            elif project.status == 'pending_delete' and on_disk:
                restored_projects.append(project)

        if deleted_projects:
            self.handle_multiple_projects_deleted(deleted_projects)
        if restored_projects:
            self.handle_multiple_projects_restored(restored_projects)

    def _confirm_removal(self, title: str, text: str) -> bool:
        """Ask a modal Yes/No question (default Yes); True means Yes."""
        from PyQt5.QtWidgets import QMessageBox
        self._deletion_prompt_open = True
        try:
            reply = QMessageBox.question(
                None,
                title,
                text,
                QMessageBox.Yes | QMessageBox.No,
                QMessageBox.Yes
            )
        finally:
            self._deletion_prompt_open = False
        return reply == QMessageBox.Yes

    def handle_multiple_projects_deleted(self, deleted_projects):
        """
        React to one or more vanished project directories.

        A single project is delegated to ``handle_project_directory_deleted``.
        For several projects the user is asked once; "Yes" removes them from
        the list, "No" marks them as ``pending_delete``.
        """
        if len(deleted_projects) == 1:
            self.handle_project_directory_deleted(deleted_projects[0])
            return

        project_list = "\n".join([f"{p.title}" for p in deleted_projects])
        remove_all = self._confirm_removal(
            "多个项目目录已删除",
            f"检测到以下 {len(deleted_projects)} 个项目的目录已被删除:\n\n"
            f"{project_list}\n\n"
            f"是否要从项目列表中移除这些项目?"
        )
        if remove_all:
            for project in deleted_projects:
                self.remove_project_from_watcher(project)
                self.remove_project(project.id)
            print(f"已自动移除 {len(deleted_projects)} 个被删除的项目")
        else:
            for project in deleted_projects:
                self.remove_project_from_watcher(project)
                project.status = 'pending_delete'
                self.project_updated.emit(project)
            self.projects_changed.emit()
            self.save_projects()
            print(f"已将 {len(deleted_projects)} 个项目标记为待删除状态")

    def handle_multiple_projects_restored(self, restored_projects):
        """Restore every project whose directory reappeared."""
        for project in restored_projects:
            self.auto_restore_project(project)

    def auto_restore_project(self, project: Project):
        """
        Set a ``pending_delete`` project back to ``normal`` if its directory exists.

        Returns:
            True when the project was restored, otherwise False.
        """
        if project.status != 'pending_delete' or not self._has_location(project.project_dir):
            return False
        if not PathUtils.normalize_path(str(project.project_dir)).exists():
            return False

        project.status = 'normal'
        self.add_project_to_watcher(project)
        self.project_updated.emit(project)
        self.projects_changed.emit()
        self.save_projects()
        print(f"项目已自动恢复: {project.title}")
        return True

    def add_project_to_watcher(self, project: Project):
        """Start watching a project's directory (no-op if unset, missing or already watched)."""
        if not self._has_location(project.project_dir):
            return
        project_path = PathUtils.normalize_path(str(project.project_dir))
        if not project_path.exists():
            return
        path_str = str(project_path)
        if path_str not in self.watched_paths:
            self.file_watcher.addPath(path_str)
            self.watched_paths[path_str] = project.id
            print(f"开始监控项目目录: {path_str}")

    def remove_project_from_watcher(self, project: Project):
        """Stop watching a project's directory (no-op if it is not watched)."""
        if not self._has_location(project.project_dir):
            return
        path_str = str(PathUtils.normalize_path(str(project.project_dir)))
        if path_str in self.watched_paths:
            self.file_watcher.removePath(path_str)
            del self.watched_paths[path_str]
            print(f"停止监控项目目录: {path_str}")

    def on_directory_changed(self, path: str):
        """Slot for ``directoryChanged``: re-check the project that owns ``path``."""
        print(f"检测到目录变化: {path}")
        project_id = self.watched_paths.get(path)
        if isinstance(project_id, int):
            self.check_project_directory(project_id, path)

    def on_file_changed(self, path: str):
        """Slot for ``fileChanged``: only logs the change."""
        print(f"检测到文件变化: {path}")

    def check_project_directory(self, project_id: int, directory_path: str):
        """
        Handle the case that a watched project's directory no longer exists.

        ``directory_path`` is accepted for the caller's convenience; the
        check uses the project's own ``project_dir``.
        """
        project = self.get_project_by_id(project_id)
        if not project or not self._has_location(project.project_dir):
            return
        project_path = PathUtils.normalize_path(str(project.project_dir))
        if project.status == 'normal' and not project_path.exists():
            print(f"项目目录已被删除: {project_path}")
            self.handle_project_directory_deleted(project)

    def handle_project_directory_deleted(self, project: Project):
        """
        Ask the user what to do with a project whose directory vanished.

        "Yes" removes the project from the list, "No" marks it as
        ``pending_delete``. Nothing happens for projects that are not in
        ``normal`` state or while another deletion question is open (the
        periodic check will pick the project up afterwards).
        """
        if project.status != 'normal' or self._deletion_prompt_open:
            return

        remove_it = self._confirm_removal(
            "项目目录已删除",
            f"检测到项目 \"{project.title}\" 的目录已被删除:\n{project.project_dir}\n\n"
            f"是否要从项目列表中移除此项目?"
        )
        self.remove_project_from_watcher(project)
        if remove_it:
            self.remove_project(project.id)
            print(f"已自动移除被删除的项目: {project.title}")
        else:
            project.status = 'pending_delete'
            self.project_updated.emit(project)
            self.projects_changed.emit()
            self.save_projects()
            print(f"项目标记为待删除状态: {project.title}")

    def restore_project(self, project_id: int):
        """
        Manually restore a ``pending_delete`` project.

        Returns:
            True when the project was restored, False otherwise (including
            when its directory is still missing).
        """
        project = self.get_project_by_id(project_id)
        if not project or project.status != 'pending_delete':
            return False
        if PathUtils.normalize_path(str(project.project_dir)).exists():
            return self.auto_restore_project(project)
        print(f"项目目录仍不存在,无法恢复: {project.title}")
        return False

    def confirm_delete_project(self, project_id: int):
        """
        Finally remove a project that is in ``pending_delete`` state.

        Returns:
            True when the project was removed, otherwise False.
        """
        project = self.get_project_by_id(project_id)
        if project and project.status == 'pending_delete':
            self.remove_project(project_id)
            print(f"已确认删除项目: {project.title}")
            return True
        return False

    # ==================== PyCharm integration ====================
    def run_python_in_venv(self, venv_path: str, script_path: str, args=None, cwd=None, timeout=30):
        """
        Start a Python script with the interpreter of a virtual environment.

        The child gets ``VIRTUAL_ENV`` set, the environment's binary
        directory prepended to ``PATH`` and ``PYTHONIOENCODING=utf-8``.
        Its stdout/stderr are pipes opened in UTF-8 text mode.

        Args:
            venv_path: Virtual environment directory.
            script_path: Script to run.
            args: Optional extra arguments; each is converted with ``str``.
            cwd: Optional working directory for the child.
            timeout: Currently unused; kept for interface compatibility.

        Returns:
            ``(process, "", "")`` with the running ``Popen`` object, or
            ``(None, "", error_message)`` when the process could not start.
        """
        try:
            venv_dir = PathUtils.normalize_path(venv_path)
            script = PathUtils.normalize_path(script_path)
            interpreter = PlatformUtils.get_python_executable(venv_dir)

            cmd = [str(interpreter), str(script)]
            if args:
                # Convert everything to text so Path objects are accepted.
                str_args = [str(arg) for arg in args]
                print("参数:", str_args)
                cmd.extend(str_args)

            env = os.environ.copy()
            env['VIRTUAL_ENV'] = str(venv_dir)
            # Force UTF-8 in the child so emoji/special characters do not
            # cause encoding errors.
            env['PYTHONIOENCODING'] = 'utf-8'
            bin_dir = 'Scripts' if PlatformUtils.get_system_type() == 'windows' else 'bin'
            env['PATH'] = str(venv_dir / bin_dir) + os.pathsep + env.get('PATH', '')

            process = subprocess.Popen(
                cmd,
                cwd=cwd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding='utf-8',   # matches PYTHONIOENCODING set above
                errors='replace',   # undecodable bytes must not crash the reader
                env=env
            )
            return process, "", ""
        except Exception as e:
            return None, "", f"在虚拟环境中运行Python脚本时出错: {str(e)}"

    def run_project_command(self, project_path: str, target_project_path: str):
        """
        Run a project's entry script inside the project's virtual environment.

        The script receives ``target_project_path`` as its only argument.
        The call waits up to two seconds: a process that is still running
        after that counts as successfully started.

        Returns:
            True on success (exit code 0 or still running), otherwise False.
        """
        try:
            project_root = PathUtils.normalize_path(project_path)
            print(f"Finding entry point for project: {project_root}")
            entry_point = self.find_project_entry_point(project_root)
            if not entry_point:
                print(f"No entry point found for project: {project_root}")
                return False

            venv_path = PlatformUtils.find_venv_path(project_root)
            if not venv_path:
                print("错误: 未找到虚拟环境")
                return False
            print(f"虚拟环境路径: {venv_path}")
            print(f"脚本路径: {entry_point}")
            if not entry_point.exists():
                print(f"错误: 脚本文件不存在: {entry_point}")
                return False

            print(f'target_project_path:{target_project_path}')
            process, stdout, stderr = self.run_python_in_venv(
                str(venv_path), str(entry_point), [target_project_path])
            if process is None:
                # run_python_in_venv reports start-up failures via stderr text.
                print("运行失败,错误:", stderr)
                return False

            try:
                stdout, stderr = process.communicate(timeout=2)
            except subprocess.TimeoutExpired:
                print("进程仍在运行(认为启动成功)")
                return True

            if process.returncode == 0:
                print("运行成功,输出:", stdout)
                return True
            print("运行失败,错误:", stderr)
            return False
        except Exception as e:
            print(f"PyCharm run command failed: {e}")
            return False

    def launch_pycharm_and_open_project(self, project_path: str, target_project_path: str):
        """
        Start PyCharm and schedule the project run for when it is ready.

        Returns:
            True when PyCharm was started and the readiness polling was set
            up, otherwise False.
        """
        try:
            if not self.start_pycharm(project_path):
                return False
            self.wait_for_pycharm_ready_and_run_project(project_path, target_project_path)
            return True
        except Exception as e:
            print(f"启动PyCharm并打开项目失败: {e}")
            return False

    def start_pycharm(self, project_path: str):
        """
        Launch PyCharm with the given project directory.

        Returns:
            True when the process was spawned, False when the path does not
            exist, PyCharm was not found or spawning failed.
        """
        try:
            project_root = PathUtils.normalize_path(project_path)
            if not project_root.exists():
                print(f"Project path does not exist: {project_root}")
                return False
            self.current_opened_project = str(project_root)

            pycharm_cmd = PlatformUtils.find_executable(PlatformUtils.get_pycharm_paths())
            if not pycharm_cmd:
                print("PyCharm not found. Please install PyCharm or add it to PATH.")
                return False

            cmd = [str(pycharm_cmd), str(project_root)]
            if PlatformUtils.get_system_type() == "windows":
                self.pycharm_process = subprocess.Popen(
                    cmd,
                    shell=True,
                    creationflags=subprocess.CREATE_NEW_CONSOLE
                )
            else:
                self.pycharm_process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL
                )
            print(f"Starting PyCharm with project: {project_root}")
            self.pycharm_ready = False
            return True
        except Exception as e:
            print(f"Failed to start PyCharm: {e}")
            return False

    def wait_for_pycharm_ready_and_run_project(self, project_path: str, target_project_path: str):
        """Store the run parameters and start polling PyCharm every 3 seconds."""
        print(f"Setting up PyCharm ready check and project run for: {project_path}")
        self.project_path = project_path
        self.target_project_path = target_project_path
        self.check_start_time = time.time()
        self.project_run_started = False
        self.project_run_completed = False
        self.pycharm_check_timer.start(3000)  # every 3 seconds
        print("PyCharm check timer started")

    def _check_pycharm_status(self):
        """
        Timer slot: poll PyCharm and start the project once it is ready.

        Polling stops when ``max_wait_time`` seconds have elapsed, after the
        project run was attempted (successfully or not) or on any error.
        """
        try:
            elapsed_time = time.time() - self.check_start_time
            print(f"Checking PyCharm status... ({elapsed_time:.1f}s elapsed)")

            if elapsed_time > self.max_wait_time:
                print("Timeout waiting for PyCharm to be ready")
                self.pycharm_check_timer.stop()
                return

            if not self.is_pycharm_ready():
                print("PyCharm not ready yet, continuing to wait...")
                return

            print("PyCharm is ready")
            self.pycharm_ready = True
            self.pycharm_started.emit()

            if not self.project_run_started:
                print(f"Running project main function: {self.project_path}")
                if self.run_project_main_function(self.project_path):
                    self.project_run_started = True
                    print("Project main function started, waiting for completion...")
                else:
                    print("Failed to run project main function")
                self.pycharm_check_timer.stop()
        except Exception as e:
            print(f"Error in _check_pycharm_status: {e}")
            self.pycharm_check_timer.stop()

    def run_project_main_function(self, project_path: str):
        """Run the project's entry script; True when it was started."""
        try:
            if self.run_project_via_pycharm_command(project_path):
                return True
            print("All methods to run project main function failed")
            return False
        except Exception as e:
            print(f"Error running project main function: {e}")
            return False

    def run_project_via_pycharm_command(self, project_path: str):
        """
        Start the project's entry script with ``python3.10``.

        The script receives ``self.target_project_path`` as its argument.
        ``Popen`` returns immediately, so ``returncode`` is still None for a
        running process; a process that is running, or that already exited
        with code 0, counts as started.

        Returns:
            True when the script was started, otherwise False.
        """
        try:
            project_root = PathUtils.normalize_path(project_path)
            entry_point = self.find_project_entry_point(project_root)
            if not entry_point:
                print(f"No entry point found for project: {project_root}")
                return False
            if not self.target_project_path:
                print("PyCharm run command failed: target project path is not set")
                return False

            run_cmd = [
                "python3.10",
                str(entry_point),
                str(self.target_project_path)
            ]
            process = subprocess.Popen(run_cmd)
            exit_code = process.poll()  # None while the child is still running
            print(f'result:{exit_code}')
            if exit_code is None or exit_code == 0:
                print(f"Successfully started project run: {project_root}")
                return True
            print(f"PyCharm run command failed: exit code {exit_code}")
            return False
        except Exception as e:
            print(f"PyCharm run command failed: {e}")
            return False

    def find_project_entry_point(self, project_path: Path) -> Optional[Path]:
        """
        Locate the script used to start a project.

        Well-known file names are tried in the project root first, then
        ``src/main.py``.

        Returns:
            Path of an existing entry script, or None when there is none.
        """
        entry_point_names = [
            'Start_Run.py',
            'main.py',
            'app.py',
            'run.py',
            'start.py',
            'application.py',
        ]
        for entry_name in entry_point_names:
            entry_path = project_path / entry_name
            if entry_path.is_file():
                print(f"Found entry point: {entry_path}")
                return entry_path

        # Fallback; only returned when the file really exists so that callers
        # never try to run a missing script.
        default_entry = project_path / 'src' / 'main.py'
        if default_entry.is_file():
            print(f"Using default entry point: {default_entry}")
            return default_entry
        return None

    def find_project_point(self, project_path: str, entry_point_name: str) -> Optional[str]:
        """
        Search ``src`` and the project root for a named entry file.

        Args:
            project_path: Project root directory.
            entry_point_name: File name to look for.

        Returns:
            The file path as a string, or None when the root is not a
            directory or the file is not found.
        """
        try:
            root = PathUtils.normalize_path(project_path)
            root_ok = bool(project_path) and root.is_dir()
        except Exception:
            root_ok = False
        if not root_ok:
            print(f"❌ 错误: 项目路径不存在或不是一个目录 -> '{project_path}'")
            return None

        print(f"🔍 正在搜索项目: {root}")
        for rel_dir in ('src', '.'):
            search_dir = root / rel_dir
            if not search_dir.is_dir():
                continue
            print(f" -> 正在检查目录: {search_dir}")
            entry_path = search_dir / entry_point_name
            if entry_path.is_file():
                print(f"✅ 找到入口文件: {entry_path}")
                return str(entry_path)

        print("❌ 未能在项目中找到常见的入口文件。")
        return None

    def _on_check_pycharm_ready(self, target_project_path: str):
        """Slot for ``check_pycharm_ready_signal``; intentionally does nothing."""
        pass

    def is_pycharm_ready(self):
        """
        Report whether PyCharm looks ready.

        Returns False when the launched process has exited. Otherwise True
        when the command line responds or a PyCharm window is visible.
        """
        try:
            if self.pycharm_process and self.pycharm_process.poll() is not None:
                print("PyCharm process has terminated")
                return False
            if self.check_pycharm_responsive():
                return True
            if self.is_pycharm_window_visible():
                # NOTE(review): this blocks the calling thread for 2 seconds;
                # when called from the timer slot that is the GUI thread.
                time.sleep(2)
                return True
            return False
        except Exception as e:
            print(f"Error checking PyCharm status: {e}")
            return False

    def check_pycharm_responsive(self):
        """Return True when ``pycharm --version`` exits with code 0 within 5 seconds."""
        try:
            encoding = 'gbk' if PlatformUtils.get_system_type() == 'windows' else 'utf-8'
            result = subprocess.run(
                ["pycharm", "--version"],
                capture_output=True,
                text=True,
                encoding=encoding,
                errors='replace',
                timeout=5
            )
            if result.returncode == 0:
                print("PyCharm is responsive")
                return True
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError):
            # Not on PATH, not executable or too slow: treat as not responsive.
            pass
        return False

    def is_pycharm_window_visible(self):
        """
        Check for a visible PyCharm window using a platform-specific probe.

        Returns True when the probe itself fails, so a missing tool never
        blocks the start-up sequence.
        """
        try:
            system = PlatformUtils.get_system_type()
            if system == "windows":
                return self._check_pycharm_window_windows()
            if system == "darwin":  # macOS
                return self._check_pycharm_window_macos()
            return self._check_pycharm_window_linux()
        except Exception as e:
            print(f"Error checking PyCharm window: {e}")
            return True

    def _check_pycharm_window_windows(self):
        """
        Windows probe: look for a window of the launched process whose title
        contains "pycharm".

        Returns True when pywin32 is missing or the probe fails, and False
        when no process was launched.
        """
        try:
            import win32gui
            import win32process

            def enum_windows_callback(hwnd, pid_list):
                _, window_pid = win32process.GetWindowThreadProcessId(hwnd)
                if window_pid in pid_list:
                    window_title = win32gui.GetWindowText(hwnd)
                    if "pycharm" in window_title.lower():
                        pid_list['found'] = True
                return True  # keep enumerating

            if not self.pycharm_process:
                return False
            pid_list = {'found': False, self.pycharm_process.pid: True}
            win32gui.EnumWindows(enum_windows_callback, pid_list)
            return pid_list['found']
        except ImportError:
            return True
        except Exception:
            return True

    def _check_pycharm_window_macos(self):
        """
        macOS probe: count the windows of the "PyCharm" process via osascript.

        Returns True when the count is positive, and also when the probe
        fails or gives no usable number.
        """
        try:
            result = subprocess.run(
                ['osascript', '-e',
                 'tell application "System Events" to count windows of process "PyCharm"'],
                capture_output=True,
                text=True,
                encoding='utf-8',
                errors='replace'
            )
            count_text = result.stdout.strip()
            if result.returncode == 0 and count_text.isdigit():
                return int(count_text) > 0
            return True
        except Exception:
            return True

    def _check_pycharm_window_linux(self):
        """
        Linux probe: query ``xdotool`` for a window named "PyCharm".

        The result is True in every case (window found, not found, xdotool
        missing or failing); the call is informational only.
        """
        try:
            subprocess.run(
                ['xdotool', 'search', '--name', 'PyCharm'],
                capture_output=True,
                text=True,
                encoding='utf-8',
                errors='replace'
            )
            return True
        except FileNotFoundError:
            return True
        except Exception:
            return True

    def stop_pycharm(self):
        """
        Terminate the launched PyCharm process if it is still running.

        Waits up to 10 seconds after ``terminate()`` and kills the process
        if it has not exited by then. Errors are printed, not raised.
        """
        try:
            process = self.pycharm_process
            if process and process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait()
                print("PyCharm process terminated")
        except Exception as e:
            print(f"Error stopping PyCharm: {e}")