""" 文件重命名管理器 提供批量重命名策略和执行功能 """ from pathlib import Path from typing import List, Dict, Optional, Callable from datetime import datetime import re import logging logger = logging.getLogger(__name__) class RenameStrategy: """重命名策略基类""" def __init__(self, name: str, description: str): self.name = name self.description = description def apply(self, filename: str, index: int, params: dict) -> str: """ 应用重命名策略 Args: filename: 原文件名 index: 文件索引(从0开始) params: 策略参数 Returns: 新文件名 """ raise NotImplementedError class AddPrefixStrategy(RenameStrategy): """添加前缀策略""" def __init__(self): super().__init__( name="add_prefix", description="在文件名前添加前缀" ) def apply(self, filename: str, index: int, params: dict) -> str: prefix = params.get("prefix", "") return f"{prefix}{filename}" class AddSuffixStrategy(RenameStrategy): """添加后缀策略(在扩展名前)""" def __init__(self): super().__init__( name="add_suffix", description="在扩展名前添加后缀" ) def apply(self, filename: str, index: int, params: dict) -> str: suffix = params.get("suffix", "") # 分离文件名和扩展名(包括版本号) name_part, ext_part = self._split_name_ext(filename) return f"{name_part}{suffix}{ext_part}" @staticmethod def _split_name_ext(filename: str) -> tuple: """分离文件名和扩展名(保留版本号)""" # 处理如 file.prt.1 这样的文件 parts = filename.split('.') if len(parts) > 1: # 检查最后一部分是否为数字(版本号) if parts[-1].isdigit() and len(parts) > 2: # 有版本号:file.prt.1 -> file, .prt.1 name = parts[0] ext = '.' + '.'.join(parts[1:]) else: # 无版本号:file.prt -> file, .prt name = '.'.join(parts[:-1]) ext = '.' + parts[-1] else: name = filename ext = '' return name, ext class SequenceStrategy(RenameStrategy): """序号重命名策略""" def __init__(self): super().__init__( name="sequence", description="使用序号重命名文件" ) def apply(self, filename: str, index: int, params: dict) -> str: base_name = params.get("base_name", "file") start_number = params.get("start_number", 1) digits = params.get("digits", 3) separator = params.get("separator", "_") # 保留扩展名 _, ext_part = AddSuffixStrategy._split_name_ext(filename) # 生成序号 number = start_number + index number_str = str(number).zfill(digits) return f"{base_name}{separator}{number_str}{ext_part}" class ReplaceTextStrategy(RenameStrategy): """文本替换策略""" def __init__(self): super().__init__( name="replace_text", description="替换文件名中的文本" ) def apply(self, filename: str, index: int, params: dict) -> str: search_text = params.get("search", "") replace_text = params.get("replace", "") case_sensitive = params.get("case_sensitive", True) if not search_text: return filename if case_sensitive: return filename.replace(search_text, replace_text) else: # 不区分大小写的替换 pattern = re.compile(re.escape(search_text), re.IGNORECASE) return pattern.sub(replace_text, filename) class AddDateTimeStrategy(RenameStrategy): """添加日期时间策略""" def __init__(self): super().__init__( name="add_datetime", description="添加日期时间戳" ) def apply(self, filename: str, index: int, params: dict) -> str: format_str = params.get("format", "%Y%m%d_%H%M%S") position = params.get("position", "suffix") # prefix or suffix separator = params.get("separator", "_") timestamp = datetime.now().strftime(format_str) name_part, ext_part = AddSuffixStrategy._split_name_ext(filename) if position == "prefix": return f"{timestamp}{separator}{name_part}{ext_part}" else: return f"{name_part}{separator}{timestamp}{ext_part}" class ChangeCaseStrategy(RenameStrategy): """大小写转换策略""" def __init__(self): super().__init__( name="change_case", description="转换文件名大小写" ) def apply(self, filename: str, index: int, params: dict) -> str: case_type = params.get("case_type", "lower") # lower, upper, title name_part, ext_part = AddSuffixStrategy._split_name_ext(filename) if case_type == "lower": name_part = name_part.lower() elif case_type == "upper": name_part = name_part.upper() elif case_type == "title": name_part = name_part.title() return f"{name_part}{ext_part}" class RenameManager: """重命名管理器""" def __init__(self): self.strategies: Dict[str, RenameStrategy] = {} self._register_default_strategies() def _register_default_strategies(self): """注册默认策略""" strategies = [ AddPrefixStrategy(), AddSuffixStrategy(), SequenceStrategy(), ReplaceTextStrategy(), AddDateTimeStrategy(), ChangeCaseStrategy() ] for strategy in strategies: self.strategies[strategy.name] = strategy def get_available_strategies(self) -> List[Dict[str, str]]: """获取可用的重命名策略列表""" return [ { "name": strategy.name, "description": strategy.description } for strategy in self.strategies.values() ] def preview_rename( self, filenames: List[str], strategy_name: str, params: dict ) -> List[Dict[str, str]]: """ 预览重命名结果 Args: filenames: 文件名列表 strategy_name: 策略名称 params: 策略参数 Returns: 预览结果列表,包含原文件名和新文件名 """ if strategy_name not in self.strategies: raise ValueError(f"未知的重命名策略: {strategy_name}") strategy = self.strategies[strategy_name] results = [] for index, filename in enumerate(filenames): try: new_name = strategy.apply(filename, index, params) results.append({ "original": filename, "new": new_name, "success": True }) except Exception as e: results.append({ "original": filename, "new": filename, "success": False, "error": str(e) }) return results def execute_rename( self, base_path: Path, file_paths: List[str], strategy_name: str, params: dict ) -> Dict[str, any]: """ 执行批量重命名 Args: base_path: 基础路径 file_paths: 相对文件路径列表 strategy_name: 策略名称 params: 策略参数 Returns: 执行结果 """ if strategy_name not in self.strategies: raise ValueError(f"未知的重命名策略: {strategy_name}") strategy = self.strategies[strategy_name] results = { "success_count": 0, "failed_count": 0, "results": [] } for index, file_path in enumerate(file_paths): try: # 获取完整路径 full_path = base_path / file_path # 安全检查 full_path = full_path.resolve() base_path_resolved = base_path.resolve() if not str(full_path).startswith(str(base_path_resolved)): raise ValueError("访问被拒绝") if not full_path.exists(): raise FileNotFoundError("文件不存在") # 生成新文件名 original_filename = full_path.name new_filename = strategy.apply(original_filename, index, params) # 处理文件名冲突 new_path = full_path.parent / new_filename new_path = self._resolve_conflict(new_path) # 执行重命名 full_path.rename(new_path) results["results"].append({ "original_path": file_path, "original_name": original_filename, "new_name": new_path.name, "new_path": str(new_path.relative_to(base_path_resolved)).replace('\\', '/'), "success": True }) results["success_count"] += 1 logger.info(f"重命名成功: {original_filename} -> {new_path.name}") except Exception as e: results["results"].append({ "original_path": file_path, "original_name": Path(file_path).name, "success": False, "error": str(e) }) results["failed_count"] += 1 logger.error(f"重命名失败: {file_path}, 错误: {str(e)}") return results def _resolve_conflict(self, path: Path) -> Path: """ 解决文件名冲突 Args: path: 目标路径 Returns: 解决冲突后的路径 """ if not path.exists(): return path # 分离文件名和扩展名 name_part, ext_part = AddSuffixStrategy._split_name_ext(path.name) parent = path.parent # 添加数字后缀 counter = 1 while True: new_name = f"{name_part}_{counter}{ext_part}" new_path = parent / new_name if not new_path.exists(): return new_path counter += 1 # 防止无限循环 if counter > 1000: raise ValueError("无法解决文件名冲突") # 全局实例 rename_manager = RenameManager()