# CommonAutoRearsh/tests/test_orchestrator.py

from __future__ import annotations
import os
import tempfile
import unittest
from dataclasses import replace
from pathlib import Path
from engine.models import (
ArtifactSpec,
BudgetSpec,
LoggingSpec,
MutationSpec,
MutatorSpec,
ObjectiveSpec,
PolicySpec,
RunnerSpec,
ScorerParseSpec,
ScorerSpec,
TaskSpec,
)
from engine.orchestrator import run_single_iteration
def make_task(task_root: Path, max_changed_lines: int = 20, runner_command: str | None = None) -> TaskSpec:
    """Build a fully-populated demo ``TaskSpec`` rooted at *task_root*.

    ``max_changed_lines`` feeds the mutation-validation limit, and
    ``runner_command``, when given, overrides the default
    ``evaluate_demo.py`` invocation.
    """
    evaluate_command = runner_command or (
        "python ../../scripts/evaluate_demo.py --task-dir . --artifact fixtures/sample.md "
        "--output ../../work/run.json"
    )
    # Assemble the command-based sub-specs first; the TaskSpec below just
    # wires them together with the policy/budget/logging configuration.
    mutator = MutatorSpec(
        type="command",
        command="python ../../scripts/mutate_demo.py --task-dir . --artifact fixtures/sample.md",
        cwd="tasks/demo",
        timeout_seconds=30,
    )
    runner = RunnerSpec(command=evaluate_command, cwd="tasks/demo", timeout_seconds=30)
    scorer = ScorerSpec(
        type="command",
        command="python scripts/score_demo.py --input work/run.json",
        timeout_seconds=30,
        parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
    )
    return TaskSpec(
        id="demo",
        description="Demo task",
        artifacts=ArtifactSpec(include=["fixtures/*.md"], exclude=[], max_files_per_iteration=1),
        mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=max_changed_lines),
        mutator=mutator,
        runner=runner,
        scorer=scorer,
        objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
        constraints=[],
        policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"),
        budget=BudgetSpec(max_iterations=1, max_failures=1),
        logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"),
        root_dir=task_root,
    )
class OrchestratorTest(unittest.TestCase):
    """Behavioral tests for ``run_single_iteration`` against a scripted demo task.

    Each test lays out a throwaway repo (``tasks/demo``, ``scripts/``, ``work/``)
    under a temporary directory and installs small Python scripts that act as
    the task's mutator, runner, and scorer commands.
    """

    def setUp(self) -> None:
        # Isolated repo layout: tasks/demo (with fixtures/ and subdir/),
        # scripts/, and work/ under a fresh temporary directory.
        self.temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(self.temp_dir.cleanup)
        self.repo_root = Path(self.temp_dir.name)
        self.task_root = self.repo_root / "tasks" / "demo"
        (self.task_root / "fixtures").mkdir(parents=True)
        (self.task_root / "subdir").mkdir()
        (self.repo_root / "scripts").mkdir()
        (self.repo_root / "work").mkdir()
        (self.task_root / "fixtures" / "sample.md").write_text("# Original\n", encoding="utf-8")

    def write_mutator(self, body: str | None = None) -> None:
        """Install ``scripts/mutate_demo.py``; the default rewrites the artifact to '# Candidate'."""
        script = body or (
            "from pathlib import Path\n"
            "import argparse\n"
            "parser = argparse.ArgumentParser()\n"
            "parser.add_argument('--task-dir')\n"
            "parser.add_argument('--artifact')\n"
            "args = parser.parse_args()\n"
            "task_dir = Path(args.task_dir).resolve()\n"
            "(task_dir / args.artifact).write_text('# Candidate\\n', encoding='utf-8')\n"
        )
        (self.repo_root / "scripts" / "mutate_demo.py").write_text(script, encoding="utf-8")

    def write_runner(self, score: float = 2.0, body: str | None = None) -> None:
        """Install ``scripts/evaluate_demo.py``; the default emits a JSON payload with *score*."""
        script = body or (
            "from pathlib import Path\n"
            "import argparse\n"
            "import json\n"
            "parser = argparse.ArgumentParser()\n"
            "parser.add_argument('--task-dir')\n"
            "parser.add_argument('--artifact')\n"
            "parser.add_argument('--output')\n"
            "args = parser.parse_args()\n"
            "task_dir = Path(args.task_dir).resolve()\n"
            "artifact_path = task_dir / args.artifact\n"
            "payload = {'score': " + repr(score) + ", 'metrics': {'artifact_text': artifact_path.read_text(encoding='utf-8')}}\n"
            "output_path = (task_dir / args.output).resolve()\n"
            "output_path.parent.mkdir(parents=True, exist_ok=True)\n"
            "output_path.write_text(json.dumps(payload), encoding='utf-8')\n"
        )
        (self.repo_root / "scripts" / "evaluate_demo.py").write_text(script, encoding="utf-8")

    def write_scorer(self) -> None:
        """Install ``scripts/score_demo.py``, which echoes the runner's JSON output to stdout."""
        (self.repo_root / "scripts" / "score_demo.py").write_text(
            "from pathlib import Path\n"
            "import argparse\n"
            "parser = argparse.ArgumentParser()\n"
            "parser.add_argument('--input')\n"
            "args = parser.parse_args()\n"
            "print(Path(args.input).read_text(encoding='utf-8'))\n",
            encoding="utf-8",
        )

    def test_scorer_uses_its_own_timeout_seconds(self) -> None:
        self.write_mutator()
        self.write_runner(score=2.0)
        # Install a scorer that sleeps 2s while its own timeout is 1s, so it
        # must be killed even though the runner's timeout (5s) would allow it.
        # (No need to call write_scorer() first — this write wins anyway.)
        (self.repo_root / "scripts" / "score_demo.py").write_text(
            "from pathlib import Path\n"
            "import argparse\n"
            "import time\n"
            "parser = argparse.ArgumentParser()\n"
            "parser.add_argument('--input')\n"
            "args = parser.parse_args()\n"
            "time.sleep(2)\n"
            "print(Path(args.input).read_text(encoding='utf-8'))\n",
            encoding="utf-8",
        )
        base = make_task(self.task_root)
        task = replace(
            base,
            runner=replace(base.runner, timeout_seconds=5),
            scorer=replace(base.scorer, timeout_seconds=1),
        )
        decision = run_single_iteration(task, baseline_score=1.0)
        self.assertEqual(decision.status, "crash")
        self.assertEqual(decision.reason, "scorer failed with exit code 124")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_keep_uses_repo_relative_cwd_and_syncs_candidate_back(self) -> None:
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
        self.assertEqual(decision.status, "keep")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")

    def test_crash_leaves_main_workspace_unchanged(self) -> None:
        self.write_mutator()
        # Runner mutates the artifact and then fails; the crash must not leak
        # the runner's modification back into the main workspace.
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "(task_dir / args.artifact).write_text('# Runner Modified\\n', encoding='utf-8')\n"
                "raise SystemExit(9)\n"
            )
        )
        self.write_scorer()
        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
        self.assertEqual(decision.status, "crash")
        self.assertEqual(decision.reason, "command failed with exit code 9")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_discard_leaves_main_workspace_unchanged_when_candidate_does_not_improve(self) -> None:
        self.write_mutator()
        self.write_runner(score=0.5)  # below the 1.0 baseline -> discard
        self.write_scorer()
        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
        self.assertEqual(decision.status, "discard")
        self.assertEqual(decision.reason, "candidate did not improve primary score")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_validation_discard_happens_before_runner_execution(self) -> None:
        # Mutator writes two lines while max_changed_lines=1, so validation
        # must discard before the runner ever runs (checked via the marker).
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "(task_dir / args.artifact).write_text('# Candidate\\nextra line\\n', encoding='utf-8')\n"
            )
        )
        runner_marker = self.repo_root / "work" / "runner-executed.txt"
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "args = parser.parse_args()\n"
                "marker = Path('../../work/runner-executed.txt').resolve()\n"
                "marker.parent.mkdir(parents=True, exist_ok=True)\n"
                "marker.write_text('ran\\n', encoding='utf-8')\n"
                "raise SystemExit(0)\n"
            )
        )
        self.write_scorer()
        decision = run_single_iteration(make_task(self.task_root, max_changed_lines=1), baseline_score=1.0)
        self.assertEqual(decision.status, "discard")
        self.assertIn("too many changed lines", decision.reason)
        self.assertFalse(runner_marker.exists())
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_validation_rejects_non_artifact_mutation_before_runner_execution(self) -> None:
        # Mutator also touches a file outside the declared artifacts; the
        # orchestrator must reject that before running anything, and the
        # external write must not survive.
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "(task_dir / args.artifact).write_text('# Candidate\\n', encoding='utf-8')\n"
                "external_path = (task_dir / '../../scripts/payload.txt').resolve()\n"
                "external_path.write_text('mutated\\n', encoding='utf-8')\n"
            )
        )
        runner_marker = self.repo_root / "work" / "runner-executed.txt"
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "parser.add_argument('--score')\n"
                "args = parser.parse_args()\n"
                "marker = Path('../../work/runner-executed.txt').resolve()\n"
                "marker.parent.mkdir(parents=True, exist_ok=True)\n"
                "marker.write_text('ran\\n', encoding='utf-8')\n"
            )
        )
        self.write_scorer()
        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
        self.assertEqual(decision.status, "discard")
        self.assertIn("non-artifact", decision.reason)
        self.assertFalse(runner_marker.exists())
        self.assertFalse((self.repo_root / "scripts" / "payload.txt").exists())
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_keep_revalidates_final_candidate_state_before_sync_back(self) -> None:
        self.write_mutator()
        # Runner grows the artifact past the change limit AFTER the initial
        # validation, so the final re-validation must catch it.
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "import json\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "artifact_path = task_dir / args.artifact\n"
                "artifact_path.write_text('# Candidate\\nextra line\\n', encoding='utf-8')\n"
                "payload = {'score': 2.0, 'metrics': {'artifact_text': artifact_path.read_text(encoding='utf-8')}}\n"
                "output_path = (task_dir / args.output).resolve()\n"
                "output_path.parent.mkdir(parents=True, exist_ok=True)\n"
                "output_path.write_text(json.dumps(payload), encoding='utf-8')\n"
            )
        )
        self.write_scorer()
        decision = run_single_iteration(make_task(self.task_root, max_changed_lines=2), baseline_score=1.0)
        self.assertEqual(decision.status, "discard")
        self.assertIn("too many changed lines", decision.reason)
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_keep_ignores_populated_runtime_work_directory(self) -> None:
        # Pre-existing work/ state must be invisible to the candidate copy and
        # must survive the iteration untouched.
        runtime_path = self.repo_root / "work" / "cache" / "seed.txt"
        runtime_path.parent.mkdir(parents=True)
        runtime_path.write_text("seed\n", encoding="utf-8")
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "artifact_path = task_dir / args.artifact\n"
                "runtime_path = (task_dir / '../../work/cache/seed.txt').resolve()\n"
                "artifact_text = '# Candidate\\n'\n"
                "if runtime_path.exists():\n"
                "    runtime_path.write_text('mutated\\n', encoding='utf-8')\n"
                "    artifact_text = '# Candidate saw runtime state\\n'\n"
                "artifact_path.write_text(artifact_text, encoding='utf-8')\n"
            )
        )
        self.write_runner(score=2.0)
        self.write_scorer()
        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
        self.assertEqual(decision.status, "keep")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
        self.assertEqual(runtime_path.read_text(encoding="utf-8"), "seed\n")

    def test_keep_ignores_runtime_cache_directories_in_copy_and_hash_validation(self) -> None:
        # .venv/ and .pytest_cache/ must be excluded from both the candidate
        # copy and the post-run hash validation.
        venv_path = self.repo_root / ".venv" / "seed.txt"
        venv_path.parent.mkdir(parents=True)
        venv_path.write_text("seed\n", encoding="utf-8")
        pytest_cache_path = self.repo_root / ".pytest_cache" / "seed.txt"
        pytest_cache_path.parent.mkdir(parents=True)
        pytest_cache_path.write_text("seed\n", encoding="utf-8")
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "artifact_path = task_dir / args.artifact\n"
                "venv_path = (task_dir / '../../.venv/seed.txt').resolve()\n"
                "pytest_cache_path = (task_dir / '../../.pytest_cache/seed.txt').resolve()\n"
                "artifact_text = '# Candidate\\n'\n"
                "if venv_path.exists() or pytest_cache_path.exists():\n"
                "    artifact_text = '# Candidate saw runtime cache\\n'\n"
                "    if venv_path.exists():\n"
                "        venv_path.write_text('mutated\\n', encoding='utf-8')\n"
                "    if pytest_cache_path.exists():\n"
                "        pytest_cache_path.write_text('mutated\\n', encoding='utf-8')\n"
                "artifact_path.write_text(artifact_text, encoding='utf-8')\n"
            )
        )
        self.write_runner(score=2.0)
        self.write_scorer()
        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
        self.assertEqual(decision.status, "keep")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
        self.assertEqual(venv_path.read_text(encoding="utf-8"), "seed\n")
        self.assertEqual(pytest_cache_path.read_text(encoding="utf-8"), "seed\n")

    def test_rejects_parent_segment_in_mutator_cwd(self) -> None:
        marker_path = self.repo_root / "work" / "mutator-executed.txt"
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                f"Path({str(marker_path)!r}).write_text('ran\\n', encoding='utf-8')\n"
            )
        )
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(base, mutator=replace(base.mutator, cwd="../tasks/demo"))
        decision = run_single_iteration(task, baseline_score=1.0)
        self.assertEqual(decision.status, "crash")
        self.assertIn("mutator.cwd", decision.reason)
        self.assertFalse(marker_path.exists())
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_crashes_when_mutator_cwd_does_not_exist(self) -> None:
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(base, mutator=replace(base.mutator, cwd="tasks/missing"))
        decision = run_single_iteration(task, baseline_score=1.0)
        self.assertEqual(decision.status, "crash")
        self.assertIn("mutator.cwd", decision.reason)
        self.assertIn("does not exist", decision.reason)
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_rejects_parent_segment_in_runner_cwd(self) -> None:
        self.write_mutator()
        marker_path = self.repo_root / "work" / "runner-executed.txt"
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                f"Path({str(marker_path)!r}).write_text('ran\\n', encoding='utf-8')\n"
            )
        )
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(base, runner=replace(base.runner, cwd="../tasks/demo"))
        decision = run_single_iteration(task, baseline_score=1.0)
        self.assertEqual(decision.status, "crash")
        self.assertIn("runner.cwd", decision.reason)
        self.assertFalse(marker_path.exists())
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_crashes_when_runner_cwd_does_not_exist(self) -> None:
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(base, runner=replace(base.runner, cwd="tasks/missing"))
        decision = run_single_iteration(task, baseline_score=1.0)
        self.assertEqual(decision.status, "crash")
        self.assertIn("runner.cwd", decision.reason)
        self.assertIn("does not exist", decision.reason)
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")

    def test_path_isolation_works_when_invoked_outside_repo_root(self) -> None:
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        # Run from outside the repo root; resolved task paths must still work.
        outside_dir = self.repo_root.parent
        original_cwd = Path.cwd()
        self.addCleanup(os.chdir, str(original_cwd))
        os.chdir(str(outside_dir))
        decision = run_single_iteration(make_task(self.task_root.resolve()), baseline_score=1.0)
        self.assertEqual(decision.status, "keep")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")

    def test_keep_succeeds_when_mutator_and_runner_cwd_are_nested_under_task_root(self) -> None:
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(
            base,
            mutator=replace(
                base.mutator,
                command="python ../../../scripts/mutate_demo.py --task-dir .. --artifact fixtures/sample.md",
                cwd="tasks/demo/subdir",
            ),
            runner=replace(
                base.runner,
                command=(
                    "python ../../../scripts/evaluate_demo.py --task-dir .. --artifact fixtures/sample.md "
                    "--output ../../work/run.json"
                ),
                cwd="tasks/demo/subdir",
            ),
        )
        decision = run_single_iteration(task, baseline_score=1.0)
        self.assertEqual(decision.status, "keep")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")

    def test_keep_succeeds_when_mutator_and_runner_cwd_are_repo_relative_outside_task_tree(self) -> None:
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(
            base,
            mutator=replace(
                base.mutator,
                command="python mutate_demo.py --task-dir ../tasks/demo --artifact fixtures/sample.md",
                cwd="scripts",
            ),
            runner=replace(
                base.runner,
                command=(
                    "python evaluate_demo.py --task-dir ../tasks/demo --artifact fixtures/sample.md "
                    "--output ../../work/run.json"
                ),
                cwd="scripts",
            ),
        )
        decision = run_single_iteration(task, baseline_score=1.0)
        self.assertEqual(decision.status, "keep")
        self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
if __name__ == "__main__":
unittest.main()