#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 项目数据管理器模块 这个模块提供了完整的项目管理功能,包括: - 项目数据的创建、读取、更新、删除 (CRUD) - 项目文件结构的自动生成 - 项目验证和重名检查 - 项目数据的持久化存储 - 基于Qt信号的事件通知系统 主要类: - Project: 项目数据模型类 - ProjectManager: 项目管理器类,提供所有项目管理功能 作者: MetaCore Team 版本: 1.0.0 创建时间: 2024 """ # 标准库导入 import json # JSON数据序列化和反序列化 import os # 操作系统接口,用于文件和目录操作 import platform import stat import threading import time from datetime import datetime # 日期时间处理 from pathlib import Path # 现代路径处理 from typing import List, Dict, Optional # 类型提示 import glob import subprocess from PyQt5.QtWidgets import QDialog from ui.widget import UniversalMessageDialog # 第三方库导入 from PyQt5.QtCore import QObject, pyqtSignal, QFileSystemWatcher, QTimer # Qt核心对象和信号系统 class PathUtils: """跨平台路径处理工具类""" @staticmethod def normalize_path(path_str: str) -> Path: """规范化路径为Path对象""" if not path_str: return Path() return Path(path_str).resolve() @staticmethod def ensure_path_exists(path: Path, is_file: bool = False) -> bool: """确保路径存在,如果是文件则确保其父目录存在""" try: if is_file: path.parent.mkdir(parents=True, exist_ok=True) else: path.mkdir(parents=True, exist_ok=True) return True except Exception as e: print(f"创建路径失败 {path}: {e}") return False @staticmethod def safe_path_join(*parts) -> Path: """安全地连接路径部分""" if not parts: return Path() return Path(parts[0]).joinpath(*parts[1:]) @staticmethod def get_relative_path(path: Path, base: Path) -> Path: """获取相对路径""" try: return path.relative_to(base) except ValueError: return path @staticmethod def is_valid_path(path_str: str) -> bool: """检查路径字符串是否有效""" if not path_str: return False # 检查非法字符(跨平台) invalid_chars = ['<', '>', ':', '"', '|', '?', '*'] if platform.system().lower() == 'windows': # Windows额外的限制 invalid_chars.extend(['\\', '/']) return not any(char in path_str for char in invalid_chars) class PlatformUtils: """跨平台工具类""" @staticmethod def get_system_type() -> str: """获取系统类型""" return platform.system().lower() @staticmethod def get_executable_extension() -> str: """获取可执行文件扩展名""" return '.exe' if PlatformUtils.get_system_type() == 'windows' else '' @staticmethod def get_script_extension() -> str: """获取脚本文件扩展名""" system = PlatformUtils.get_system_type() if system == 'windows': return '.bat' else: return '.sh' @staticmethod def get_python_executable(venv_path: Path) -> Path: """获取虚拟环境中的Python可执行文件路径""" system = PlatformUtils.get_system_type() if system == 'windows': return venv_path / 'Scripts' / 'python.exe' else: return venv_path / 'bin' / 'python' @staticmethod def find_venv_path(project_path: Path) -> Optional[Path]: """ 查找项目中的虚拟环境路径 根据平台规定虚拟环境目录名称: - Windows: .venv - Ubuntu/Linux: venv - macOS: venv """ system = PlatformUtils.get_system_type() # 根据平台确定虚拟环境目录名 if system == 'windows': venv_name = '.venv' else: # Linux, macOS等 venv_name = 'venv' venv_path = project_path / venv_name # 检查虚拟环境是否存在 if venv_path.exists() and venv_path.is_dir(): # 验证是否是有效的虚拟环境目录 python_exec = PlatformUtils.get_python_executable(venv_path) if python_exec.exists(): print(f"找到虚拟环境: {venv_path}") return venv_path else: print(f"虚拟环境目录存在但无效(缺少Python解释器): {venv_path}") return None else: print(f"虚拟环境不存在: {venv_path} (平台: {system})") return None @staticmethod def get_pycharm_paths() -> List[Path]: """获取各平台PyCharm可能的安装路径""" system = PlatformUtils.get_system_type() home = Path.home() if system == "windows": return [ Path("pycharm.exe"), Path("pycharm64.exe"), *Path("C:/Program Files/JetBrains").glob("PyCharm*/bin/pycharm64.exe"), *Path("C:/Program Files (x86)/JetBrains").glob("PyCharm*/bin/pycharm.exe"), *home.glob("AppData/Local/JetBrains/Toolbox/apps/PyCharm-P/ch-*/bin/pycharm64.exe"), *home.glob("AppData/Local/JetBrains/Toolbox/apps/PyCharm-P/ch-*/bin/pycharm.exe") ] elif system == "darwin": # macOS return [ Path("pycharm"), *Path("/Applications").glob("PyCharm*.app/Contents/MacOS/pycharm"), Path("/Applications/PyCharm.app/Contents/MacOS/pycharm"), *home.glob("Applications/PyCharm*.app/Contents/MacOS/pycharm") ] else: # Linux return [ Path("pycharm"), Path("pycharm.sh"), *Path("/opt").glob("pycharm*/bin/pycharm.sh"), Path("/usr/local/bin/pycharm"), Path("/usr/bin/pycharm"), *home.glob(".local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-*/bin/pycharm.sh") ] @staticmethod def find_executable(paths: List[Path]) -> Optional[Path]: """在指定路径列表中查找可执行文件""" system = PlatformUtils.get_system_type() # 首先尝试直接调用(如果在PATH中) try: cmd = "pycharm" + PlatformUtils.get_executable_extension() subprocess.run([cmd, "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return Path(cmd) except (subprocess.CalledProcessError, FileNotFoundError): pass # 在指定路径中查找 for path in paths: if path.exists(): if system == "windows" or os.access(path, os.X_OK): return path return None class Project: """ 项目数据模型类 这个类表示一个项目的所有基本信息,包括项目的元数据、路径信息和描述。 它提供了项目数据的序列化和反序列化功能,用于数据持久化。 属性: id (int): 项目的唯一标识符 title (str): 项目名称 date (str): 项目创建日期,格式为 "YYYY-MM-DD HH:MM:SS" type (str): 项目类型,如 "empty", "web", "mobile" 等 image (str): 项目图标路径 path (Path): 项目选择的基础路径(用户选择的父目录) project_dir (Path): 实际项目目录的完整路径 description (str): 项目描述信息 status (str): 项目状态 """ def __init__(self, id: int, title: str, date: str, type: str, image: str, path: str = "", project_dir: str = "", description: str = "", status: str = "normal"): """ 初始化项目对象 Args: id (int): 项目唯一标识符 title (str): 项目名称 date (str): 创建日期字符串 type (str): 项目类型 image (str): 项目图标 path (str, optional): 基础路径. Defaults to "". project_dir (str, optional): 项目目录路径. Defaults to "". description (str, optional): 项目描述. Defaults to "". status (str, optional): 项目状态. Defaults to "normal". """ self.id = id self.title = title self.date = date self.type = type self.image = image # 使用PathUtils处理路径 self.path = PathUtils.normalize_path(path) if path else Path() self.project_dir = PathUtils.normalize_path(project_dir) if project_dir else Path() self.description = description self.status = status self.pycharm_process = None self.pycharm_ready = False def to_dict(self) -> Dict: """ 将项目对象转换为字典格式 Returns: Dict: 包含项目所有属性的字典 """ return { 'id': self.id, 'title': self.title, 'date': self.date, 'type': self.type, 'image': self.image, 'path': str(self.path), 'project_dir': str(self.project_dir), 'description': self.description, 'status': self.status } @classmethod def from_dict(cls, data: Dict) -> 'Project': """ 从字典创建项目对象 Args: data (Dict): 包含项目数据的字典 Returns: Project: 新创建的项目对象 """ return cls( id=data.get('id', 0), title=data.get('title', ''), date=data.get('date', ''), type=data.get('type', ''), image=data.get('image', ''), path=data.get('path', ''), project_dir=data.get('project_dir', ''), description=data.get('description', ''), status=data.get('status', 'normal') ) class ProjectManager(QObject): """ 项目管理器类 这是整个项目管理系统的核心类,继承自QObject以支持Qt信号系统。 它负责管理所有项目的生命周期,包括创建、读取、更新、删除操作, 以及项目数据的持久化存储和验证功能。 """ # ==================== 信号定义 ==================== projects_changed = pyqtSignal() # 项目列表发生变化时发出 project_added = pyqtSignal(Project) # 添加新项目时发出 project_removed = pyqtSignal(int) # 删除项目时发出 project_updated = pyqtSignal(Project) # 更新项目时发出 pycharm_started = pyqtSignal() # PyCharm启动完成信号 project_method_called = pyqtSignal(str) # 项目方法调用完成信号 check_pycharm_ready_signal = pyqtSignal(str) # 检查PyCharm准备信号 pycharm_check_complete = pyqtSignal(bool, str) # PyCharm检查完成信号 def __init__(self): """初始化项目管理器""" super().__init__() self.projects: List[Project] = [] self.data_file = PathUtils.safe_path_join("data", "projects.json") # 初始化文件系统监控器 self.file_watcher = QFileSystemWatcher() self.file_watcher.directoryChanged.connect(self.on_directory_changed) self.file_watcher.fileChanged.connect(self.on_file_changed) # 监控的项目路径映射 {路径字符串: 项目ID} self.watched_paths = {} self.load_projects() self.setup_project_monitoring() # 连接信号到槽 self.check_pycharm_ready_signal.connect(self._on_check_pycharm_ready) # 添加定时器用于轮询检查 self.pycharm_check_timer = QTimer() self.pycharm_check_timer.timeout.connect(self._check_pycharm_status) self.target_project_path = None # 添加缺失的属性 self.max_wait_time = 180 self.check_start_time = 0 self.current_opened_project = None self.project_run_started = False self.project_run_completed = False # ==================== 数据持久化方法 ==================== def load_projects(self): """从JSON文件加载项目数据""" try: if self.data_file.exists(): with open(self.data_file, 'r', encoding='utf-8') as f: data = json.load(f) self.projects = [Project.from_dict(item) for item in data] else: self.create_default_projects() except Exception as e: print(f"加载项目数据失败: {e}") self.create_default_projects() def save_projects(self): """将项目数据保存到JSON文件""" try: # 确保数据文件的目录存在 PathUtils.ensure_path_exists(self.data_file, is_file=True) with open(self.data_file, 'w', encoding='utf-8') as f: data = [project.to_dict() for project in self.projects] json.dump(data, f, ensure_ascii=False, indent=2) except Exception as e: print(f"保存项目数据失败: {e}") def create_default_projects(self): """创建默认项目数据""" default_projects = [ Project(1, "示例项目1", "2024-06-08 15:56:35", "empty", "📁", "", "", "这是一个示例空白项目"), Project(2, "示例项目2", "2023-01-10 12:09:04", "empty", "📁", "", "", "这是另一个示例空白项目"), Project(3, "我的第一个项目", "2024-06-07 06:57:46", "empty", "📁", "", "", "从这里开始您的第一个项目"), ] self.projects = default_projects # ==================== 项目查询方法 ==================== def get_all_projects(self) -> List[Project]: """获取所有项目列表""" return self.projects.copy() def get_project_by_id(self, project_id: int) -> Optional[Project]: """根据ID获取项目""" for project in self.projects: if project.id == project_id: return project return None def get_project_by_name(self, name: str) -> Optional[Project]: """根据项目名称获取特定项目""" for project in self.projects: if project.title == name: return project return None # ==================== 项目验证方法 ==================== def is_project_name_exists(self, name: str) -> bool: """检查项目名称是否已存在""" return self.get_project_by_name(name) is not None def is_project_directory_exists(self, path: Path, name: str) -> bool: """检查项目目录是否已存在于文件系统中""" project_dir = path / name return project_dir.exists() def is_project_path_exists(self, path: Path) -> bool: """检查项目路径是否已存在于项目列表中""" normalized_path = PathUtils.normalize_path(str(path)) for project in self.projects: if project.path and PathUtils.normalize_path(str(project.path)) == normalized_path: return True if project.project_dir and PathUtils.normalize_path(str(project.project_dir)) == normalized_path: return True return False def validate_project_creation(self, name: str, path: str) -> tuple[bool, str]: """验证项目创建的有效性""" if not name or not name.strip(): return False, "请输入项目名称" if not PathUtils.is_valid_path(name): return False, f"项目名称包含非法字符" path_obj = PathUtils.normalize_path(path) if self.is_project_directory_exists(path_obj, name): return False, f"目录已存在" return True, "" def validate_project_import(self, name: str, path: str) -> tuple[bool, str]: """验证项目导入的有效性""" if not name or not name.strip(): return False, "项目名称不能为空" if not PathUtils.is_valid_path(name): return False, f"项目名称包含非法字符" path_obj = PathUtils.normalize_path(path) if not path_obj.exists(): return False, f"文件夹不存在" if not path_obj.is_dir(): return False, f"选择的路径不是文件夹" if self.is_project_path_exists(path_obj): return False, f"该文件夹已被导入" return True, "" def validate_project_open(self, project_path: str) -> tuple[bool, str]: """验证项目是否可以打开""" if not project_path: return False, "项目路径不存在或无效" path_obj = PathUtils.normalize_path(project_path) if not path_obj.exists(): return False, "项目路径不存在或无效" if not path_obj.is_dir(): return False, "项目路径不是有效的文件夹" if not os.access(path_obj, os.R_OK): return False, "没有权限访问项目文件夹" try: if not any(path_obj.iterdir()): return False, "项目文件夹为空" except PermissionError: return False, "没有权限读取项目文件夹内容" return True, "" # ==================== 项目CRUD操作方法 ==================== def add_project(self, title: str, description: str, project_type: str, path: str) -> Project: """添加新项目""" new_id = max([p.id for p in self.projects], default=0) + 1 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") path_obj = PathUtils.normalize_path(path) project_dir = path_obj / title try: PathUtils.ensure_path_exists(project_dir) self._create_project_structure(project_dir, project_type, title, description) except Exception as e: raise Exception(f"创建项目目录失败: {str(e)}") # 创建项目图标路径 filename = f'{title}.png' full_path = project_dir / filename new_project = Project( id=new_id, title=title, date=current_time, type=project_type, image=str(full_path), path=str(path_obj), project_dir=str(project_dir), description=description ) self.projects.insert(0, new_project) self.add_project_to_watcher(new_project) self.project_added.emit(new_project) self.projects_changed.emit() self.save_projects() return new_project def import_project(self, title: str, description: str, project_type: str, project_dir: str) -> Project: """导入现有项目文件夹""" project_dir_path = PathUtils.normalize_path(project_dir) if not project_dir_path.exists(): raise Exception(f"项目目录不存在: {project_dir}") if not project_dir_path.is_dir(): raise Exception(f"路径不是文件夹: {project_dir}") new_id = max([p.id for p in self.projects], default=0) + 1 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") parent_path = project_dir_path.parent filename = f'{title}.png' full_path = project_dir_path / filename new_project = Project( id=new_id, title=title, date=current_time, type=project_type, image=str(full_path), path=str(parent_path), project_dir=str(project_dir_path), description=description ) self.projects.insert(0, new_project) self.add_project_to_watcher(new_project) self.project_added.emit(new_project) self.projects_changed.emit() self.save_projects() return new_project def _create_project_structure(self, project_dir: Path, project_type: str, title: str, description: str): """创建项目文件结构""" try: # 创建基本目录结构 directories = ['models', 'textures', 'scenes'] for directory in directories: dir_path = project_dir / directory PathUtils.ensure_path_exists(dir_path) # 创建项目配置文件 project_config = { "name": title, "path": str(project_dir), "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "last_modified": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "version": "1.0.0", "engine_version": "1.0.0" } config_file = project_dir / 'project.json' with open(config_file, 'w', encoding='utf-8') as f: json.dump(project_config, f, ensure_ascii=False, indent=2) self._create_type_specific_files(project_dir, project_type) except Exception as e: print(f"创建项目文件结构失败: {e}") def _create_type_specific_files(self, project_dir: Path, project_type: str): """根据项目类型创建特定文件""" try: if project_type == 'empty': # 创建配置目录和文件 config_dir = project_dir / 'config' PathUtils.ensure_path_exists(config_dir) config_content = { 'project_settings': { 'version': '1.0.0', 'build_target': 'development', 'dependencies': [] }, 'custom_settings': {} } config_file = config_dir / 'settings.json' with open(config_file, 'w', encoding='utf-8') as f: json.dump(config_content, f, ensure_ascii=False, indent=2) # 创建源代码目录和示例文件 src_dir = project_dir / 'src' PathUtils.ensure_path_exists(src_dir) src_file = src_dir / 'main.py' with open(src_file, 'w', encoding='utf-8') as f: f.write('''#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 主程序入口文件 """ def main(): """主函数""" print("Hello, MetaCore!") print("这是一个空白项目模板,您可以在此基础上开始开发。") if __name__ == "__main__": main() ''') except Exception as e: print(f"创建项目类型特定文件失败: {e}") def remove_project(self, project_id: int) -> bool: """删除项目""" for i, project in enumerate(self.projects): if project.id == project_id: self.remove_project_from_watcher(project) del self.projects[i] self.project_removed.emit(project_id) self.projects_changed.emit() self.save_projects() return True return False def update_project(self, project: Project): """更新项目""" for i, p in enumerate(self.projects): if p.id == project.id: self.projects[i] = project self.project_updated.emit(project) self.projects_changed.emit() self.save_projects() break def cleanup_old_preview_images(self, project_id: int): """清理指定项目的旧预览图""" try: preview_dir = Path.cwd() / 'MetaCore' / 'Resources' / 'ProjectPreviews' if not preview_dir.exists(): return # 查找该项目的所有预览图 pattern = f"*preview_{project_id}_*.png" old_previews = list(preview_dir.glob(pattern)) # 按修改时间排序,保留最新的一个,删除其他的 if len(old_previews) > 1: old_previews.sort(key=lambda x: x.stat().st_mtime, reverse=True) # 保留最新的,删除其他的 for old_preview in old_previews[1:]: try: old_preview.unlink() print(f"已删除旧预览图: {old_preview}") except Exception as e: print(f"删除旧预览图失败 {old_preview}: {e}") except Exception as e: print(f"清理旧预览图时发生错误: {e}") def get_projects_by_type(self, project_type: str) -> List[Project]: """根据类型获取项目""" if project_type in ["overview", "all"]: return self.get_all_projects() elif project_type in ["management", "resource_category", "resource_management", "system_settings"]: return [] else: return [p for p in self.projects if p.type == project_type] def search_projects(self, keyword: str) -> List[Project]: """搜索项目""" if not keyword: return self.get_all_projects() keyword = keyword.lower() return [p for p in self.projects if keyword in p.title.lower()] def get_project_types(self) -> List[str]: """获取所有项目类型""" types = set(p.type for p in self.projects) return sorted(list(types)) # ==================== 项目监控方法 ==================== def setup_project_monitoring(self): """设置项目监控""" for project in self.projects: self.add_project_to_watcher(project) # 添加定时检查机制 self.check_timer = QTimer() self.check_timer.timeout.connect(self.check_all_projects_existence) self.check_timer.start(3000) # 每3秒检查一次 def check_all_projects_existence(self): """检查所有项目是否仍然存在""" deleted_projects = [] restored_projects = [] for project in self.projects: if project.project_dir: project_path = PathUtils.normalize_path(str(project.project_dir)) if project.status == 'normal' and not project_path.exists(): deleted_projects.append(project) elif project.status == 'pending_delete' and project_path.exists(): restored_projects.append(project) if deleted_projects: self.handle_multiple_projects_deleted(deleted_projects) if restored_projects: self.handle_multiple_projects_restored(restored_projects) def handle_multiple_projects_deleted(self, deleted_projects): """处理多个项目目录被删除的情况""" from PyQt5.QtWidgets import QMessageBox if len(deleted_projects) == 1: self.handle_project_directory_deleted(deleted_projects[0]) else: project_names = [p.title for p in deleted_projects] project_list = "\n".join([f"• {name}" for name in project_names]) # reply = QMessageBox.question( # None, # "多个项目目录已删除", # f"检测到以下 {len(deleted_projects)} 个项目的目录已被删除:\n\n" # f"{project_list}\n\n" # f"是否要从项目列表中移除这些项目?", # QMessageBox.Yes | QMessageBox.No, # QMessageBox.Yes # ) reply = UniversalMessageDialog.show_info("多个项目目录已删除", f"检测到以下 {len(deleted_projects)} 个项目的目录已被删除:\n\n" f"{project_list}\n\n" f"是否要从项目列表中移除这些项目?", True, "确定", "取消") if reply == QDialog.Accepted: for project in deleted_projects: self.remove_project_from_watcher(project) self.remove_project(project.id) print(f"已自动移除 {len(deleted_projects)} 个被删除的项目") else: for project in deleted_projects: self.remove_project_from_watcher(project) project.status = 'pending_delete' self.project_updated.emit(project) self.projects_changed.emit() self.save_projects() print(f"已将 {len(deleted_projects)} 个项目标记为待删除状态") def handle_multiple_projects_restored(self, restored_projects): """处理多个项目目录恢复的情况""" for project in restored_projects: self.auto_restore_project(project) def auto_restore_project(self, project: Project): """自动恢复项目到正常状态""" if project.status == 'pending_delete' and project.project_dir: project_path = PathUtils.normalize_path(str(project.project_dir)) if project_path.exists(): project.status = 'normal' self.add_project_to_watcher(project) self.project_updated.emit(project) self.projects_changed.emit() self.save_projects() print(f"项目已自动恢复: {project.title}") return True return False def add_project_to_watcher(self, project: Project): """将项目添加到文件监控器""" if not project.project_dir: return project_path = PathUtils.normalize_path(str(project.project_dir)) if not project_path.exists(): return path_str = str(project_path) if path_str not in self.watched_paths: self.file_watcher.addPath(path_str) self.watched_paths[path_str] = project.id print(f"开始监控项目目录: {path_str}") def remove_project_from_watcher(self, project: Project): """从文件监控器中移除项目""" if not project.project_dir: return path_str = str(PathUtils.normalize_path(str(project.project_dir))) if path_str in self.watched_paths: self.file_watcher.removePath(path_str) del self.watched_paths[path_str] print(f"停止监控项目目录: {path_str}") def on_directory_changed(self, path: str): """目录变化处理""" print(f"检测到目录变化: {path}") if path in self.watched_paths: path_id = self.watched_paths[path] if isinstance(path_id, int): self.check_project_directory(path_id, path) def on_file_changed(self, path: str): """文件变化处理""" print(f"检测到文件变化: {path}") def check_project_directory(self, project_id: int, directory_path: str): """检查项目目录状态""" project = self.get_project_by_id(project_id) if not project: return project_path = PathUtils.normalize_path(str(project.project_dir)) if project.status == 'normal' and not project_path.exists(): print(f"项目目录已被删除: {project_path}") self.handle_project_directory_deleted(project) def handle_project_directory_deleted(self, project: Project): """处理项目目录被删除的情况""" from PyQt5.QtWidgets import QMessageBox if project.status != 'normal': return # reply = QMessageBox.question( # None, # "项目目录已删除", # f"检测到项目 \"{project.title}\" 的目录已被删除:\n{project.project_dir}\n\n" # f"是否要从项目列表中移除此项目?", # QMessageBox.Yes | QMessageBox.No, # QMessageBox.Yes # ) reply = UniversalMessageDialog.show_info("项目目录已删除", f"检测到项目 \"{project.title}\" 的目录已被删除:\n{project.project_dir}\n\n" f"是否要从项目列表中移除此项目?", True, "确定", "取消") if reply == QDialog.Accepted: self.remove_project_from_watcher(project) self.remove_project(project.id) print(f"已自动移除被删除的项目: {project.title}") else: self.remove_project_from_watcher(project) project.status = 'pending_delete' self.project_updated.emit(project) self.projects_changed.emit() self.save_projects() print(f"项目标记为待删除状态: {project.title}") def restore_project(self, project_id: int): """恢复项目到正常状态(手动恢复)""" project = self.get_project_by_id(project_id) if project and project.status == 'pending_delete': project_path = PathUtils.normalize_path(str(project.project_dir)) if project_path.exists(): return self.auto_restore_project(project) else: print(f"项目目录仍不存在,无法恢复: {project.title}") return False return False def confirm_delete_project(self, project_id: int): """确认删除项目(用于待删除状态的项目)""" project = self.get_project_by_id(project_id) if project and project.status == 'pending_delete': self.remove_project(project_id) print(f"已确认删除项目: {project.title}") return True return False # ==================== PyCharm集成方法 ==================== def run_python_in_venv(self, venv_path: str, script_path: str, args=None, cwd=None, timeout=30): """在指定虚拟环境中运行Python脚本""" try: venv_path_obj = PathUtils.normalize_path(venv_path) script_path_obj = PathUtils.normalize_path(script_path) # 获取虚拟环境中的Python解释器 python_executable = PlatformUtils.get_python_executable(venv_path_obj) # 构建命令 cmd = [str(python_executable), str(script_path_obj)] if args: # 将所有参数转换为字符串,处理 Path 对象 str_args = [str(arg) for arg in args] print("参数:", str_args) cmd.extend(str_args) # 设置环境变量 env = os.environ.copy() env['VIRTUAL_ENV'] = str(venv_path_obj) # 强制子进程使用 UTF-8 编码,避免 emoji 和特殊字符编码错误 env['PYTHONIOENCODING'] = 'utf-8' system = PlatformUtils.get_system_type() if system == 'windows': env['PATH'] = str(venv_path_obj / 'Scripts') + os.pathsep + env['PATH'] else: env['PATH'] = str(venv_path_obj / 'bin') + os.pathsep + env['PATH'] # 根据平台确定编码 # 注意:子进程会使用 UTF-8(通过 PYTHONIOENCODING),这里用于读取子进程输出 encoding = 'utf-8' # 统一使用 UTF-8 # 执行命令 process = subprocess.Popen( cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding=encoding, errors='replace', # 遇到无法解码的字符时用替换字符代替,避免崩溃 env=env ) return process, "", "" except Exception as e: return None, "", f"在虚拟环境中运行Python脚本时出错: {str(e)}" def run_project_command(self, project_path: str, target_project_path: str): """通过PyCharm命令行接口运行项目""" try: project_path_obj = PathUtils.normalize_path(project_path) print(f"Finding entry point for project: {project_path_obj}") entry_point = self.find_project_entry_point(project_path_obj) if not entry_point: print(f"No entry point found for project: {project_path_obj}") return False # 使用跨平台的虚拟环境查找方法 venv_path = PlatformUtils.find_venv_path(project_path_obj) if not venv_path: print(f"错误: 未找到虚拟环境") return False print(f"虚拟环境路径: {venv_path}") print(f"脚本路径: {entry_point}") if not entry_point.exists(): print(f"错误: 脚本文件不存在: {entry_point}") return False print(f'target_project_path:{target_project_path}') process, stdout, stderr = self.run_python_in_venv(str(venv_path), str(entry_point), [target_project_path]) try: stdout, stderr = process.communicate(timeout=2) if process.returncode == 0: print("运行成功,输出:", stdout) return True else: print("运行失败,错误:", stderr) return False except subprocess.TimeoutExpired: print("进程仍在运行(认为启动成功)") return True except subprocess.TimeoutExpired: print("PyCharm run command timeout - project may be running") return True except Exception as e: print(f"PyCharm run command failed: {e}") return False def launch_pycharm_and_open_project(self, project_path: str, target_project_path: str): """启动PyCharm,等待加载完成后调用项目方法打开指定项目""" try: if not self.start_pycharm(project_path): return False self.wait_for_pycharm_ready_and_run_project(project_path, target_project_path) return True except Exception as e: print(f"启动PyCharm并打开项目失败: {e}") return False def start_pycharm(self, project_path: str): """启动PyCharm并打开指定项目""" try: project_path_obj = PathUtils.normalize_path(project_path) if not project_path_obj.exists(): print(f"Project path does not exist: {project_path_obj}") return False self.current_opened_project = str(project_path_obj) # 获取PyCharm路径 pycharm_paths = PlatformUtils.get_pycharm_paths() pycharm_cmd = PlatformUtils.find_executable(pycharm_paths) if not pycharm_cmd: print("PyCharm not found. Please install PyCharm or add it to PATH.") return False # 构建启动命令 cmd = [str(pycharm_cmd), str(project_path_obj)] # 启动PyCharm进程 system = PlatformUtils.get_system_type() if system == "windows": self.pycharm_process = subprocess.Popen( cmd, shell=True, creationflags=subprocess.CREATE_NEW_CONSOLE ) else: self.pycharm_process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) print(f"Starting PyCharm with project: {project_path_obj}") self.pycharm_ready = False return True except Exception as e: print(f"Failed to start PyCharm: {e}") return False def wait_for_pycharm_ready_and_run_project(self, project_path: str, target_project_path: str): """等待PyCharm启动完成并运行项目""" print(f"Setting up PyCharm ready check and project run for: {project_path}") self.project_path = project_path self.target_project_path = target_project_path self.check_start_time = time.time() self.project_run_started = False self.project_run_completed = False self.pycharm_check_timer.start(3000) # 每3秒检查一次 print("PyCharm check timer started") def _check_pycharm_status(self): """定时检查PyCharm状态的槽函数""" try: elapsed_time = time.time() - self.check_start_time print(f"Checking PyCharm status... ({elapsed_time:.1f}s elapsed)") if elapsed_time > self.max_wait_time: print("Timeout waiting for PyCharm to be ready") self.pycharm_check_timer.stop() return if self.is_pycharm_ready(): print("PyCharm is ready") self.pycharm_ready = True self.pycharm_started.emit() if not self.project_run_started: print(f"Running project main function: {self.project_path}") if self.run_project_main_function(self.project_path): self.project_run_started = True self.pycharm_check_timer.stop() print("Project main function started, waiting for completion...") else: print("Failed to run project main function") self.pycharm_check_timer.stop() return else: print("PyCharm not ready yet, continuing to wait...") except Exception as e: print(f"Error in _check_pycharm_status: {e}") self.pycharm_check_timer.stop() def run_project_main_function(self, project_path: str): """运行项目主函数""" try: if self.run_project_via_pycharm_command(project_path): return True print("All methods to run project main function failed") return False except Exception as e: print(f"Error running project main function: {e}") return False def run_project_via_pycharm_command(self, project_path: str): """通过PyCharm命令行接口运行项目""" try: project_path_obj = PathUtils.normalize_path(project_path) entry_point = self.find_project_entry_point(project_path_obj) if not entry_point: print(f"No entry point found for project: {project_path_obj}") return False # 构建运行命令 run_cmd = [ "python3.10", str(entry_point), self.target_project_path ] result = subprocess.Popen(run_cmd) print(f'result:{result.returncode}') if result.returncode == 0: print(f"Successfully started project run: {project_path_obj}") return True except subprocess.TimeoutExpired: print("PyCharm run command timeout - project may be running") return True except Exception as e: print(f"PyCharm run command failed: {e}") return False def find_project_entry_point(self, project_path: Path) -> Optional[Path]: """查找项目的入口点文件""" entry_point_names = [ 'Start_Run.py', 'main.py', 'app.py', 'run.py', 'start.py', 'application.py', ] # 在项目根目录中查找 for entry_name in entry_point_names: entry_path = project_path / entry_name if entry_path.exists(): print(f"Found entry point: {entry_path}") return entry_path # 如果没有找到标准入口点,返回src/main.py default_entry = project_path / 'src' / 'main.py' print(f"Using default entry point: {default_entry}") return default_entry if default_entry.parent.exists() else None def find_project_point(self, project_path: str, entry_point_name: str) -> Optional[str]: """使用pathlib安全地查找项目的入口点文件""" try: root = PathUtils.normalize_path(project_path) except Exception: print(f"❌ 错误: 项目路径不存在或不是一个目录 -> '{project_path}'") return None print(f"🔍 正在搜索项目: {root}") # 定义要搜索的目录列表 search_relative_dirs = ['src', '.'] # 遍历搜索 for rel_dir in search_relative_dirs: search_dir = root / rel_dir if not search_dir.is_dir(): continue print(f" -> 正在检查目录: {search_dir}") entry_path = search_dir / entry_point_name if entry_path.is_file(): print(f"✅ 找到入口文件: {entry_path}") return str(entry_path) print(f"❌ 未能在项目中找到常见的入口文件。") return None def _on_check_pycharm_ready(self, target_project_path: str): """处理检查PyCharm准备信号的槽函数""" pass def is_pycharm_ready(self): """检查PyCharm是否准备就绪""" try: if self.pycharm_process and self.pycharm_process.poll() is not None: print("PyCharm process has terminated") return False if self.check_pycharm_responsive(): return True if self.is_pycharm_window_visible(): time.sleep(2) return True return False except Exception as e: print(f"Error checking PyCharm status: {e}") return False def check_pycharm_responsive(self): """检查PyCharm是否响应命令""" try: system = PlatformUtils.get_system_type() encoding = 'gbk' if system == 'windows' else 'utf-8' result = subprocess.run( ["pycharm", "--version"], capture_output=True, text=True, encoding=encoding, errors='replace', timeout=5 ) if result.returncode == 0: print("PyCharm is responsive") return True except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): pass return False def is_pycharm_window_visible(self): """检查PyCharm窗口是否可见(跨平台实现)""" try: system = PlatformUtils.get_system_type() if system == "windows": return self._check_pycharm_window_windows() elif system == "darwin": # macOS return self._check_pycharm_window_macos() else: # Linux return self._check_pycharm_window_linux() except Exception as e: print(f"Error checking PyCharm window: {e}") return True def _check_pycharm_window_windows(self): """检查Windows上的PyCharm窗口""" try: import win32gui import win32process def enum_windows_callback(hwnd, pid_list): _, window_pid = win32process.GetWindowThreadProcessId(hwnd) if window_pid in pid_list: window_title = win32gui.GetWindowText(hwnd) if "pycharm" in window_title.lower(): pid_list['found'] = True return True if self.pycharm_process: pycharm_pid = self.pycharm_process.pid pid_list = {'found': False, pycharm_pid: True} win32gui.EnumWindows(enum_windows_callback, pid_list) return pid_list['found'] return False except ImportError: return True except Exception: return True def _check_pycharm_window_macos(self): """检查macOS上的PyCharm窗口""" try: result = subprocess.run( ['osascript', '-e', 'tell application "System Events" to count windows of process "PyCharm"'], capture_output=True, text=True, encoding='utf-8', errors='replace' ) if result.returncode == 0 and result.stdout.strip().isdigit(): window_count = int(result.stdout.strip()) return window_count > 0 return True except Exception: return True def _check_pycharm_window_linux(self): """检查Linux上的PyCharm窗口""" try: result = subprocess.run( ['xdotool', 'search', '--name', 'PyCharm'], capture_output=True, text=True, encoding='utf-8', errors='replace' ) if result.returncode == 0 and result.stdout.strip(): return True return True except FileNotFoundError: return True except Exception: return True def stop_pycharm(self): """停止PyCharm进程""" try: if self.pycharm_process and self.pycharm_process.poll() is None: self.pycharm_process.terminate() self.pycharm_process.wait(timeout=10) print("PyCharm process terminated") except Exception as e: print(f"Error stopping PyCharm: {e}")