CadHubManage/app/core/rename_manager.py

367 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
文件重命名管理器
提供批量重命名策略和执行功能
"""
from pathlib import Path
from typing import List, Dict, Optional, Callable
from datetime import datetime
import re
import logging
logger = logging.getLogger(__name__)
class RenameStrategy:
"""重命名策略基类"""
def __init__(self, name: str, description: str):
self.name = name
self.description = description
def apply(self, filename: str, index: int, params: dict) -> str:
"""
应用重命名策略
Args:
filename: 原文件名
index: 文件索引从0开始
params: 策略参数
Returns:
新文件名
"""
raise NotImplementedError
class AddPrefixStrategy(RenameStrategy):
"""添加前缀策略"""
def __init__(self):
super().__init__(
name="add_prefix",
description="在文件名前添加前缀"
)
def apply(self, filename: str, index: int, params: dict) -> str:
prefix = params.get("prefix", "")
return f"{prefix}{filename}"
class AddSuffixStrategy(RenameStrategy):
"""添加后缀策略(在扩展名前)"""
def __init__(self):
super().__init__(
name="add_suffix",
description="在扩展名前添加后缀"
)
def apply(self, filename: str, index: int, params: dict) -> str:
suffix = params.get("suffix", "")
# 分离文件名和扩展名(包括版本号)
name_part, ext_part = self._split_name_ext(filename)
return f"{name_part}{suffix}{ext_part}"
@staticmethod
def _split_name_ext(filename: str) -> tuple:
"""分离文件名和扩展名(保留版本号)"""
# 处理如 file.prt.1 这样的文件
parts = filename.split('.')
if len(parts) > 1:
# 检查最后一部分是否为数字(版本号)
if parts[-1].isdigit() and len(parts) > 2:
# 有版本号file.prt.1 -> file, .prt.1
name = parts[0]
ext = '.' + '.'.join(parts[1:])
else:
# 无版本号file.prt -> file, .prt
name = '.'.join(parts[:-1])
ext = '.' + parts[-1]
else:
name = filename
ext = ''
return name, ext
class SequenceStrategy(RenameStrategy):
"""序号重命名策略"""
def __init__(self):
super().__init__(
name="sequence",
description="使用序号重命名文件"
)
def apply(self, filename: str, index: int, params: dict) -> str:
base_name = params.get("base_name", "file")
start_number = params.get("start_number", 1)
digits = params.get("digits", 3)
separator = params.get("separator", "_")
# 保留扩展名
_, ext_part = AddSuffixStrategy._split_name_ext(filename)
# 生成序号
number = start_number + index
number_str = str(number).zfill(digits)
return f"{base_name}{separator}{number_str}{ext_part}"
class ReplaceTextStrategy(RenameStrategy):
"""文本替换策略"""
def __init__(self):
super().__init__(
name="replace_text",
description="替换文件名中的文本"
)
def apply(self, filename: str, index: int, params: dict) -> str:
search_text = params.get("search", "")
replace_text = params.get("replace", "")
case_sensitive = params.get("case_sensitive", True)
if not search_text:
return filename
if case_sensitive:
return filename.replace(search_text, replace_text)
else:
# 不区分大小写的替换
pattern = re.compile(re.escape(search_text), re.IGNORECASE)
return pattern.sub(replace_text, filename)
class AddDateTimeStrategy(RenameStrategy):
"""添加日期时间策略"""
def __init__(self):
super().__init__(
name="add_datetime",
description="添加日期时间戳"
)
def apply(self, filename: str, index: int, params: dict) -> str:
format_str = params.get("format", "%Y%m%d_%H%M%S")
position = params.get("position", "suffix") # prefix or suffix
separator = params.get("separator", "_")
timestamp = datetime.now().strftime(format_str)
name_part, ext_part = AddSuffixStrategy._split_name_ext(filename)
if position == "prefix":
return f"{timestamp}{separator}{name_part}{ext_part}"
else:
return f"{name_part}{separator}{timestamp}{ext_part}"
class ChangeCaseStrategy(RenameStrategy):
"""大小写转换策略"""
def __init__(self):
super().__init__(
name="change_case",
description="转换文件名大小写"
)
def apply(self, filename: str, index: int, params: dict) -> str:
case_type = params.get("case_type", "lower") # lower, upper, title
name_part, ext_part = AddSuffixStrategy._split_name_ext(filename)
if case_type == "lower":
name_part = name_part.lower()
elif case_type == "upper":
name_part = name_part.upper()
elif case_type == "title":
name_part = name_part.title()
return f"{name_part}{ext_part}"
class RenameManager:
"""重命名管理器"""
def __init__(self):
self.strategies: Dict[str, RenameStrategy] = {}
self._register_default_strategies()
def _register_default_strategies(self):
"""注册默认策略"""
strategies = [
AddPrefixStrategy(),
AddSuffixStrategy(),
SequenceStrategy(),
ReplaceTextStrategy(),
AddDateTimeStrategy(),
ChangeCaseStrategy()
]
for strategy in strategies:
self.strategies[strategy.name] = strategy
def get_available_strategies(self) -> List[Dict[str, str]]:
"""获取可用的重命名策略列表"""
return [
{
"name": strategy.name,
"description": strategy.description
}
for strategy in self.strategies.values()
]
def preview_rename(
self,
filenames: List[str],
strategy_name: str,
params: dict
) -> List[Dict[str, str]]:
"""
预览重命名结果
Args:
filenames: 文件名列表
strategy_name: 策略名称
params: 策略参数
Returns:
预览结果列表,包含原文件名和新文件名
"""
if strategy_name not in self.strategies:
raise ValueError(f"未知的重命名策略: {strategy_name}")
strategy = self.strategies[strategy_name]
results = []
for index, filename in enumerate(filenames):
try:
new_name = strategy.apply(filename, index, params)
results.append({
"original": filename,
"new": new_name,
"success": True
})
except Exception as e:
results.append({
"original": filename,
"new": filename,
"success": False,
"error": str(e)
})
return results
def execute_rename(
self,
base_path: Path,
file_paths: List[str],
strategy_name: str,
params: dict
) -> Dict[str, any]:
"""
执行批量重命名
Args:
base_path: 基础路径
file_paths: 相对文件路径列表
strategy_name: 策略名称
params: 策略参数
Returns:
执行结果
"""
if strategy_name not in self.strategies:
raise ValueError(f"未知的重命名策略: {strategy_name}")
strategy = self.strategies[strategy_name]
results = {
"success_count": 0,
"failed_count": 0,
"results": []
}
for index, file_path in enumerate(file_paths):
try:
# 获取完整路径
full_path = base_path / file_path
# 安全检查
full_path = full_path.resolve()
base_path_resolved = base_path.resolve()
if not str(full_path).startswith(str(base_path_resolved)):
raise ValueError("访问被拒绝")
if not full_path.exists():
raise FileNotFoundError("文件不存在")
# 生成新文件名
original_filename = full_path.name
new_filename = strategy.apply(original_filename, index, params)
# 处理文件名冲突
new_path = full_path.parent / new_filename
new_path = self._resolve_conflict(new_path)
# 执行重命名
full_path.rename(new_path)
results["results"].append({
"original_path": file_path,
"original_name": original_filename,
"new_name": new_path.name,
"new_path": str(new_path.relative_to(base_path_resolved)).replace('\\', '/'),
"success": True
})
results["success_count"] += 1
logger.info(f"重命名成功: {original_filename} -> {new_path.name}")
except Exception as e:
results["results"].append({
"original_path": file_path,
"original_name": Path(file_path).name,
"success": False,
"error": str(e)
})
results["failed_count"] += 1
logger.error(f"重命名失败: {file_path}, 错误: {str(e)}")
return results
def _resolve_conflict(self, path: Path) -> Path:
"""
解决文件名冲突
Args:
path: 目标路径
Returns:
解决冲突后的路径
"""
if not path.exists():
return path
# 分离文件名和扩展名
name_part, ext_part = AddSuffixStrategy._split_name_ext(path.name)
parent = path.parent
# 添加数字后缀
counter = 1
while True:
new_name = f"{name_part}_{counter}{ext_part}"
new_path = parent / new_name
if not new_path.exists():
return new_path
counter += 1
# 防止无限循环
if counter > 1000:
raise ValueError("无法解决文件名冲突")
# 全局实例
rename_manager = RenameManager()