from __future__ import annotations from dataclasses import replace from difflib import unified_diff from pathlib import Path from engine.artifact_manager import ArtifactManager from engine.models import BaselineSnapshot, TaskSpec class MutationValidationError(ValueError): pass def _count_changed_lines(before: str, after: str, path: Path) -> int: diff = unified_diff( before.splitlines(keepends=True), after.splitlines(keepends=True), fromfile=f"{path.as_posix()} (before)", tofile=f"{path.as_posix()} (after)", ) changed_lines = 0 for line in diff: if line.startswith(("---", "+++", "@@")): continue if line.startswith(("+", "-")): changed_lines += 1 return changed_lines def validate_candidate_changes(task: TaskSpec, snapshot: BaselineSnapshot, candidate_root: Path) -> None: changed_files = 0 changed_lines = 0 allowed_file_types = set(task.mutation.allowed_file_types) candidate_task = replace(task, root_dir=candidate_root) candidate_paths = set(ArtifactManager(candidate_task).resolve_paths()) for path, baseline_text in snapshot.file_contents.items(): relative_path = path.relative_to(task.root_dir) candidate_path = candidate_root / relative_path current_text = candidate_path.read_text(encoding="utf-8") if candidate_path.exists() else "" if current_text == baseline_text: candidate_paths.discard(candidate_path) continue changed_files += 1 if candidate_path.suffix not in allowed_file_types: raise MutationValidationError(f"disallowed file type: {candidate_path.suffix}") changed_lines += _count_changed_lines(baseline_text, current_text, candidate_path) candidate_paths.discard(candidate_path) for candidate_path in sorted(candidate_paths): changed_files += 1 if candidate_path.suffix not in allowed_file_types: raise MutationValidationError(f"disallowed file type: {candidate_path.suffix}") changed_lines += _count_changed_lines("", candidate_path.read_text(encoding="utf-8"), candidate_path) if changed_files > task.artifacts.max_files_per_iteration: raise MutationValidationError( f"too many changed files: {changed_files} > {task.artifacts.max_files_per_iteration}" ) if changed_lines > task.mutation.max_changed_lines: raise MutationValidationError( f"too many changed lines: {changed_lines} > {task.mutation.max_changed_lines}" )