CommonAutoRearsh/engine/artifact_manager.py

116 lines
4.7 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from difflib import unified_diff
from fnmatch import fnmatch
from hashlib import sha256
from pathlib import Path
from engine.models import BaselineSnapshot, TaskSpec
@dataclass(frozen=True)
class ArtifactManager:
    """Snapshot, restore, and diff the artifact files of a single task.

    Artifact files are selected by the glob patterns in
    ``task.artifacts.include`` (resolved relative to ``task.root_dir``) and
    filtered by the ``fnmatch`` patterns in ``task.artifacts.exclude``, which
    are matched against POSIX-style paths relative to ``task.root_dir``.
    """

    # Task whose ``root_dir`` and ``artifacts.include`` / ``artifacts.exclude``
    # patterns drive every method of this class.
    task: TaskSpec

    def _is_excluded(self, relative_path: str) -> bool:
        """Return True if *relative_path* matches any exclude pattern."""
        return any(fnmatch(relative_path, pattern) for pattern in self.task.artifacts.exclude)

    def _is_tracked_file(self, path: Path) -> bool:
        """Return True if *path* is an existing file that is not excluded."""
        if not path.is_file():
            return False
        relative_path = path.relative_to(self.task.root_dir).as_posix()
        return not self._is_excluded(relative_path)

    @staticmethod
    def _read_text(path: Path) -> str:
        """Read *path* as UTF-8 without translating line endings."""
        with path.open("r", encoding="utf-8", newline="") as handle:
            return handle.read()

    @staticmethod
    def _has_line_ending(line: str) -> bool:
        """Return True if *line* ends with a terminator ``str.splitlines`` recognises."""
        # splitlines() strips a trailing terminator, so the single resulting
        # element differs from the input exactly when a terminator was present.
        return line.splitlines() != [line]

    def _artifact_roots(self) -> list[Path]:
        """Return the minimal set of paths that contain every include pattern.

        For each include pattern the leading segments that contain no glob
        characters (``*``, ``?``, ``[``) form a literal prefix under
        ``task.root_dir``. A fully literal pattern that names a file, or a path
        that does not exist, contributes its parent directory instead. Roots
        nested inside another root are dropped.
        """
        root_dir = self.task.root_dir
        roots: set[Path] = set()
        for pattern in self.task.artifacts.include:
            normalized_pattern = pattern.replace("\\", "/")
            segments = [
                segment
                for segment in normalized_pattern.split("/")
                if segment and segment != "."
            ]
            prefix: list[str] = []
            saw_glob = False
            for segment in segments:
                if any(char in segment for char in "*?["):
                    saw_glob = True
                    break
                prefix.append(segment)
            candidate = root_dir.joinpath(*prefix) if prefix else root_dir
            # A purely literal pattern points at one file (possibly missing);
            # the directory holding it is the root.
            if prefix and not saw_glob and (not candidate.exists() or candidate.is_file()):
                candidate = candidate.parent
            roots.add(candidate)

        # Visit shallow paths first so an ancestor is always accepted before
        # any of its descendants is considered.
        minimal_roots: list[Path] = []
        for root in sorted(roots, key=lambda item: (len(item.parts), item.as_posix())):
            if any(existing == root or existing in root.parents for existing in minimal_roots):
                continue
            minimal_roots.append(root)
        return minimal_roots

    def _current_paths_within_roots(self) -> list[Path]:
        """Return every non-excluded file currently under the artifact roots, sorted."""
        discovered: set[Path] = set()
        for root in self._artifact_roots():
            if not root.exists():
                continue
            candidates = [root] if root.is_file() else root.rglob("*")
            discovered.update(path for path in candidates if self._is_tracked_file(path))
        return sorted(discovered)

    def resolve_paths(self) -> list[Path]:
        """Return the sorted files matching an include pattern and no exclude pattern."""
        root_dir = self.task.root_dir
        resolved: set[Path] = set()
        for pattern in self.task.artifacts.include:
            resolved.update(path for path in root_dir.glob(pattern) if self._is_tracked_file(path))
        return sorted(resolved)

    def snapshot(self) -> BaselineSnapshot:
        """Capture the text and SHA-256 hex digest of every resolved artifact file.

        Files are read as UTF-8 with line endings preserved; the digest is
        computed over the UTF-8 encoding of the text that was read.
        """
        file_contents: dict[Path, str] = {}
        file_hashes: dict[Path, str] = {}
        for path in self.resolve_paths():
            content = self._read_text(path)
            file_contents[path] = content
            file_hashes[path] = sha256(content.encode("utf-8")).hexdigest()
        return BaselineSnapshot(file_contents=file_contents, file_hashes=file_hashes)

    def restore(self, snapshot: BaselineSnapshot) -> None:
        """Bring the artifact roots back to the state recorded in *snapshot*.

        Every non-excluded file under an artifact root that is absent from the
        snapshot is deleted, whether or not it matches an include pattern.
        Every file in the snapshot is then rewritten with its recorded content,
        creating parent directories as needed.
        """
        current_paths = set(self._current_paths_within_roots())
        snapshot_paths = set(snapshot.file_contents)
        for path in current_paths - snapshot_paths:
            path.unlink()
        for path, content in snapshot.file_contents.items():
            path.parent.mkdir(parents=True, exist_ok=True)
            # newline="" writes the recorded line endings back untranslated.
            with path.open("w", encoding="utf-8", newline="") as handle:
                handle.write(content)

    def diff_summary(self, snapshot: BaselineSnapshot) -> str:
        """Return a unified diff between *snapshot* and the files currently on disk.

        Files present on only one side are diffed against empty content. The
        result is an empty string when nothing changed.
        """
        current_contents = {
            path: self._read_text(path) for path in self._current_paths_within_roots()
        }
        lines: list[str] = []
        for path in sorted(set(snapshot.file_contents) | set(current_contents)):
            before = snapshot.file_contents.get(path, "")
            after = current_contents.get(path, "")
            if before == after:
                continue
            diff = unified_diff(
                before.splitlines(keepends=True),
                after.splitlines(keepends=True),
                fromfile=f"{path.as_posix()} (before)",
                tofile=f"{path.as_posix()} (after)",
            )
            for line in diff:
                if self._has_line_ending(line):
                    lines.append(line)
                    continue
                # The final line of a file without a trailing newline would
                # otherwise fuse with the next diff line once joined.
                lines.append(line + "\n")
                lines.append("\\ No newline at end of file\n")
        return "".join(lines)