"""Integration-style tests for ``engine.orchestrator.run_single_iteration``.

Each test builds a throwaway repository layout in a temporary directory
(``tasks/demo``, ``scripts``, ``work``), writes small mutator / runner / scorer
scripts into it, runs a single iteration and checks the returned decision and
the state of the artifact in the main workspace.
"""

from __future__ import annotations

import os
import tempfile
import unittest
from dataclasses import replace
from pathlib import Path

from engine.models import (
    ArtifactSpec,
    BudgetSpec,
    LoggingSpec,
    MutationSpec,
    MutatorSpec,
    ObjectiveSpec,
    PolicySpec,
    RunnerSpec,
    ScorerParseSpec,
    ScorerSpec,
    TaskSpec,
)
from engine.orchestrator import run_single_iteration


def make_task(task_root: Path, max_changed_lines: int = 20, runner_command: str | None = None) -> TaskSpec:
    """Build the demo ``TaskSpec`` used by every test in this module.

    Args:
        task_root: Directory passed through as ``TaskSpec.root_dir``.
        max_changed_lines: Limit forwarded to ``MutationSpec.max_changed_lines``.
        runner_command: Optional replacement for the default runner command;
            a falsy value selects the default ``evaluate_demo.py`` invocation.

    Returns:
        A ``TaskSpec`` whose mutator and runner use ``tasks/demo`` as cwd and
        whose scorer reads ``work/run.json``.
    """
    return TaskSpec(
        id="demo",
        description="Demo task",
        artifacts=ArtifactSpec(include=["fixtures/*.md"], exclude=[], max_files_per_iteration=1),
        mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=max_changed_lines),
        mutator=MutatorSpec(
            type="command",
            command="python ../../scripts/mutate_demo.py --task-dir . --artifact fixtures/sample.md",
            cwd="tasks/demo",
            timeout_seconds=30,
        ),
        runner=RunnerSpec(
            command=runner_command
            or "python ../../scripts/evaluate_demo.py --task-dir . --artifact fixtures/sample.md --output ../../work/run.json",
            cwd="tasks/demo",
            timeout_seconds=30,
        ),
        scorer=ScorerSpec(
            type="command",
            command="python scripts/score_demo.py --input work/run.json",
            timeout_seconds=30,
            parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
        ),
        objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
        constraints=[],
        policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"),
        budget=BudgetSpec(max_iterations=1, max_failures=1),
        logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"),
        root_dir=task_root,
    )


class OrchestratorTest(unittest.TestCase):
    """Exercise ``run_single_iteration`` against a temporary demo repository."""

    def setUp(self) -> None:
        """Create the temporary repo layout and the original artifact."""
        self.temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(self.temp_dir.cleanup)
        self.repo_root = Path(self.temp_dir.name)
        self.task_root = self.repo_root / "tasks" / "demo"
        self.scripts_dir = self.repo_root / "scripts"
        self.artifact_path = self.task_root / "fixtures" / "sample.md"
        (self.task_root / "fixtures").mkdir(parents=True)
        (self.task_root / "subdir").mkdir()
        self.scripts_dir.mkdir()
        (self.repo_root / "work").mkdir()
        self.artifact_path.write_text("# Original\n", encoding="utf-8")

    def assert_artifact_text(self, expected: str) -> None:
        """Assert that the artifact in the main workspace contains ``expected``."""
        self.assertEqual(self.artifact_path.read_text(encoding="utf-8"), expected)

    def write_mutator(self, body: str | None = None) -> None:
        """Write ``scripts/mutate_demo.py``.

        Args:
            body: Full script source. When ``None``, a default script is
                written that replaces the artifact with ``# Candidate``.
        """
        if body is None:
            body = (
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "(task_dir / args.artifact).write_text('# Candidate\\n', encoding='utf-8')\n"
            )
        (self.scripts_dir / "mutate_demo.py").write_text(body, encoding="utf-8")

    def write_runner(self, score: float = 2.0, body: str | None = None) -> None:
        """Write ``scripts/evaluate_demo.py``.

        Args:
            score: Score embedded in the default script's JSON payload.
                Ignored when ``body`` is given.
            body: Full script source. When ``None``, a default script is
                written that dumps ``score`` and the artifact text as JSON to
                the ``--output`` path.
        """
        if body is None:
            body = (
                "from pathlib import Path\n"
                "import argparse\n"
                "import json\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "artifact_path = task_dir / args.artifact\n"
                "payload = {'score': " + repr(score) + ", 'metrics': {'artifact_text': artifact_path.read_text(encoding='utf-8')}}\n"
                "output_path = (task_dir / args.output).resolve()\n"
                "output_path.parent.mkdir(parents=True, exist_ok=True)\n"
                "output_path.write_text(json.dumps(payload), encoding='utf-8')\n"
            )
        (self.scripts_dir / "evaluate_demo.py").write_text(body, encoding="utf-8")

    def write_scorer(self, body: str | None = None) -> None:
        """Write ``scripts/score_demo.py``.

        Args:
            body: Full script source. When ``None``, a default script is
                written that prints the contents of the ``--input`` file.
        """
        if body is None:
            body = (
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--input')\n"
                "args = parser.parse_args()\n"
                "print(Path(args.input).read_text(encoding='utf-8'))\n"
            )
        (self.scripts_dir / "score_demo.py").write_text(body, encoding="utf-8")

    def test_scorer_uses_its_own_timeout_seconds(self) -> None:
        """A scorer sleeping past its 1s timeout crashes despite a 5s runner timeout."""
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "import time\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--input')\n"
                "args = parser.parse_args()\n"
                "time.sleep(2)\n"
                "print(Path(args.input).read_text(encoding='utf-8'))\n"
            )
        )
        base = make_task(self.task_root)
        task = replace(
            base,
            runner=replace(base.runner, timeout_seconds=5),
            scorer=replace(base.scorer, timeout_seconds=1),
        )

        decision = run_single_iteration(task, baseline_score=1.0)

        self.assertEqual(decision.status, "crash")
        self.assertEqual(decision.reason, "scorer failed with exit code 124")
        self.assert_artifact_text("# Original\n")

    def test_keep_uses_repo_relative_cwd_and_syncs_candidate_back(self) -> None:
        """An improving candidate is kept and written back to the main workspace."""
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()

        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)

        self.assertEqual(decision.status, "keep")
        self.assert_artifact_text("# Candidate\n")

    def test_crash_leaves_main_workspace_unchanged(self) -> None:
        """A runner that edits the artifact and exits 9 yields a crash with no sync-back."""
        self.write_mutator()
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "(task_dir / args.artifact).write_text('# Runner Modified\\n', encoding='utf-8')\n"
                "raise SystemExit(9)\n"
            )
        )
        self.write_scorer()

        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)

        self.assertEqual(decision.status, "crash")
        self.assertEqual(decision.reason, "command failed with exit code 9")
        self.assert_artifact_text("# Original\n")

    def test_discard_leaves_main_workspace_unchanged_when_candidate_does_not_improve(self) -> None:
        """A candidate scoring below the baseline is discarded."""
        self.write_mutator()
        self.write_runner(score=0.5)
        self.write_scorer()

        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)

        self.assertEqual(decision.status, "discard")
        self.assertEqual(decision.reason, "candidate did not improve primary score")
        self.assert_artifact_text("# Original\n")

    def test_validation_discard_happens_before_runner_execution(self) -> None:
        """A mutation exceeding ``max_changed_lines`` is discarded without running the runner."""
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "(task_dir / args.artifact).write_text('# Candidate\\nextra line\\n', encoding='utf-8')\n"
            )
        )
        runner_marker = self.repo_root / "work" / "runner-executed.txt"
        # The marker is written through an absolute path so the check below does
        # not depend on the directory the runner is started in. NOTE(review):
        # the runner presumably runs inside a candidate copy of the repo, where
        # a cwd-relative marker would never show up under repo_root -- confirm.
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "args = parser.parse_args()\n"
                f"marker = Path({str(runner_marker)!r})\n"
                "marker.parent.mkdir(parents=True, exist_ok=True)\n"
                "marker.write_text('ran\\n', encoding='utf-8')\n"
                "raise SystemExit(0)\n"
            )
        )
        self.write_scorer()

        decision = run_single_iteration(make_task(self.task_root, max_changed_lines=1), baseline_score=1.0)

        self.assertEqual(decision.status, "discard")
        self.assertIn("too many changed lines", decision.reason)
        self.assertFalse(runner_marker.exists())
        self.assert_artifact_text("# Original\n")

    def test_validation_rejects_non_artifact_mutation_before_runner_execution(self) -> None:
        """A mutator that also writes a non-artifact file is discarded before the runner runs."""
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "(task_dir / args.artifact).write_text('# Candidate\\n', encoding='utf-8')\n"
                "external_path = (task_dir / '../../scripts/payload.txt').resolve()\n"
                "external_path.write_text('mutated\\n', encoding='utf-8')\n"
            )
        )
        runner_marker = self.repo_root / "work" / "runner-executed.txt"
        # Absolute marker path: the assertion must not depend on the runner's cwd.
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "parser.add_argument('--score')\n"
                "args = parser.parse_args()\n"
                f"marker = Path({str(runner_marker)!r})\n"
                "marker.parent.mkdir(parents=True, exist_ok=True)\n"
                "marker.write_text('ran\\n', encoding='utf-8')\n"
            )
        )
        self.write_scorer()

        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)

        self.assertEqual(decision.status, "discard")
        self.assertIn("non-artifact", decision.reason)
        self.assertFalse(runner_marker.exists())
        self.assertFalse((self.scripts_dir / "payload.txt").exists())
        self.assert_artifact_text("# Original\n")

    def test_keep_revalidates_final_candidate_state_before_sync_back(self) -> None:
        """A runner that grows the artifact past the line limit turns a would-be keep into a discard."""
        self.write_mutator()
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "import json\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "parser.add_argument('--output')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "artifact_path = task_dir / args.artifact\n"
                "artifact_path.write_text('# Candidate\\nextra line\\n', encoding='utf-8')\n"
                "payload = {'score': 2.0, 'metrics': {'artifact_text': artifact_path.read_text(encoding='utf-8')}}\n"
                "output_path = (task_dir / args.output).resolve()\n"
                "output_path.parent.mkdir(parents=True, exist_ok=True)\n"
                "output_path.write_text(json.dumps(payload), encoding='utf-8')\n"
            )
        )
        self.write_scorer()

        decision = run_single_iteration(make_task(self.task_root, max_changed_lines=2), baseline_score=1.0)

        self.assertEqual(decision.status, "discard")
        self.assertIn("too many changed lines", decision.reason)
        self.assert_artifact_text("# Original\n")

    def test_keep_ignores_populated_runtime_work_directory(self) -> None:
        """Pre-existing files under ``work/`` are neither seen nor modified by the mutator."""
        runtime_path = self.repo_root / "work" / "cache" / "seed.txt"
        runtime_path.parent.mkdir(parents=True)
        runtime_path.write_text("seed\n", encoding="utf-8")
        # The mutator changes its output if it can see the runtime file, so the
        # artifact text asserted below reveals whether the file was visible.
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "artifact_path = task_dir / args.artifact\n"
                "runtime_path = (task_dir / '../../work/cache/seed.txt').resolve()\n"
                "artifact_text = '# Candidate\\n'\n"
                "if runtime_path.exists():\n"
                " runtime_path.write_text('mutated\\n', encoding='utf-8')\n"
                " artifact_text = '# Candidate saw runtime state\\n'\n"
                "artifact_path.write_text(artifact_text, encoding='utf-8')\n"
            )
        )
        self.write_runner(score=2.0)
        self.write_scorer()

        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)

        self.assertEqual(decision.status, "keep")
        self.assert_artifact_text("# Candidate\n")
        self.assertEqual(runtime_path.read_text(encoding="utf-8"), "seed\n")

    def test_keep_ignores_runtime_cache_directories_in_copy_and_hash_validation(self) -> None:
        """Files under ``.venv`` and ``.pytest_cache`` are neither seen nor modified by the mutator."""
        venv_path = self.repo_root / ".venv" / "seed.txt"
        venv_path.parent.mkdir(parents=True)
        venv_path.write_text("seed\n", encoding="utf-8")
        pytest_cache_path = self.repo_root / ".pytest_cache" / "seed.txt"
        pytest_cache_path.parent.mkdir(parents=True)
        pytest_cache_path.write_text("seed\n", encoding="utf-8")
        # The nested ``if`` bodies must be indented deeper than their headers,
        # otherwise the generated script fails with an IndentationError before
        # it does anything.
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                "import argparse\n"
                "parser = argparse.ArgumentParser()\n"
                "parser.add_argument('--task-dir')\n"
                "parser.add_argument('--artifact')\n"
                "args = parser.parse_args()\n"
                "task_dir = Path(args.task_dir).resolve()\n"
                "artifact_path = task_dir / args.artifact\n"
                "venv_path = (task_dir / '../../.venv/seed.txt').resolve()\n"
                "pytest_cache_path = (task_dir / '../../.pytest_cache/seed.txt').resolve()\n"
                "artifact_text = '# Candidate\\n'\n"
                "if venv_path.exists() or pytest_cache_path.exists():\n"
                "    artifact_text = '# Candidate saw runtime cache\\n'\n"
                "    if venv_path.exists():\n"
                "        venv_path.write_text('mutated\\n', encoding='utf-8')\n"
                "    if pytest_cache_path.exists():\n"
                "        pytest_cache_path.write_text('mutated\\n', encoding='utf-8')\n"
                "artifact_path.write_text(artifact_text, encoding='utf-8')\n"
            )
        )
        self.write_runner(score=2.0)
        self.write_scorer()

        decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)

        self.assertEqual(decision.status, "keep")
        self.assert_artifact_text("# Candidate\n")
        self.assertEqual(venv_path.read_text(encoding="utf-8"), "seed\n")
        self.assertEqual(pytest_cache_path.read_text(encoding="utf-8"), "seed\n")

    def test_rejects_parent_segment_in_mutator_cwd(self) -> None:
        """A mutator cwd containing ``..`` crashes the iteration without running the mutator."""
        marker_path = self.repo_root / "work" / "mutator-executed.txt"
        self.write_mutator(
            body=(
                "from pathlib import Path\n"
                f"Path({str(marker_path)!r}).write_text('ran\\n', encoding='utf-8')\n"
            )
        )
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(base, mutator=replace(base.mutator, cwd="../tasks/demo"))

        decision = run_single_iteration(task, baseline_score=1.0)

        self.assertEqual(decision.status, "crash")
        self.assertIn("mutator.cwd", decision.reason)
        self.assertFalse(marker_path.exists())
        self.assert_artifact_text("# Original\n")

    def test_crashes_when_mutator_cwd_does_not_exist(self) -> None:
        """A missing mutator cwd is reported as a crash naming ``mutator.cwd``."""
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(base, mutator=replace(base.mutator, cwd="tasks/missing"))

        decision = run_single_iteration(task, baseline_score=1.0)

        self.assertEqual(decision.status, "crash")
        self.assertIn("mutator.cwd", decision.reason)
        self.assertIn("does not exist", decision.reason)
        self.assert_artifact_text("# Original\n")

    def test_rejects_parent_segment_in_runner_cwd(self) -> None:
        """A runner cwd containing ``..`` crashes the iteration without running the runner."""
        self.write_mutator()
        marker_path = self.repo_root / "work" / "runner-executed.txt"
        self.write_runner(
            body=(
                "from pathlib import Path\n"
                f"Path({str(marker_path)!r}).write_text('ran\\n', encoding='utf-8')\n"
            )
        )
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(base, runner=replace(base.runner, cwd="../tasks/demo"))

        decision = run_single_iteration(task, baseline_score=1.0)

        self.assertEqual(decision.status, "crash")
        self.assertIn("runner.cwd", decision.reason)
        self.assertFalse(marker_path.exists())
        self.assert_artifact_text("# Original\n")

    def test_crashes_when_runner_cwd_does_not_exist(self) -> None:
        """A missing runner cwd is reported as a crash naming ``runner.cwd``."""
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(base, runner=replace(base.runner, cwd="tasks/missing"))

        decision = run_single_iteration(task, baseline_score=1.0)

        self.assertEqual(decision.status, "crash")
        self.assertIn("runner.cwd", decision.reason)
        self.assertIn("does not exist", decision.reason)
        self.assert_artifact_text("# Original\n")

    def test_path_isolation_works_when_invoked_outside_repo_root(self) -> None:
        """The iteration still succeeds when the process cwd is outside the repo."""
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        outside_dir = self.repo_root.parent
        original_cwd = Path.cwd()
        # Register the restore before changing directory so the process cwd is
        # reset even if the iteration raises.
        self.addCleanup(os.chdir, str(original_cwd))
        os.chdir(str(outside_dir))

        decision = run_single_iteration(make_task(self.task_root.resolve()), baseline_score=1.0)

        self.assertEqual(decision.status, "keep")
        self.assert_artifact_text("# Candidate\n")

    def test_keep_succeeds_when_mutator_and_runner_cwd_are_nested_under_task_root(self) -> None:
        """Mutator and runner may use ``tasks/demo/subdir`` as their cwd."""
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(
            base,
            mutator=replace(
                base.mutator,
                command="python ../../../scripts/mutate_demo.py --task-dir .. --artifact fixtures/sample.md",
                cwd="tasks/demo/subdir",
            ),
            runner=replace(
                base.runner,
                command=(
                    "python ../../../scripts/evaluate_demo.py --task-dir .. --artifact fixtures/sample.md "
                    "--output ../../work/run.json"
                ),
                cwd="tasks/demo/subdir",
            ),
        )

        decision = run_single_iteration(task, baseline_score=1.0)

        self.assertEqual(decision.status, "keep")
        self.assert_artifact_text("# Candidate\n")

    def test_keep_succeeds_when_mutator_and_runner_cwd_are_repo_relative_outside_task_tree(self) -> None:
        """Mutator and runner may use the repo-level ``scripts`` directory as their cwd."""
        self.write_mutator()
        self.write_runner(score=2.0)
        self.write_scorer()
        base = make_task(self.task_root)
        task = replace(
            base,
            mutator=replace(
                base.mutator,
                command="python mutate_demo.py --task-dir ../tasks/demo --artifact fixtures/sample.md",
                cwd="scripts",
            ),
            runner=replace(
                base.runner,
                command=(
                    "python evaluate_demo.py --task-dir ../tasks/demo --artifact fixtures/sample.md "
                    "--output ../../work/run.json"
                ),
                cwd="scripts",
            ),
        )

        decision = run_single_iteration(task, baseline_score=1.0)

        self.assertEqual(decision.status, "keep")
        self.assert_artifact_text("# Candidate\n")


if __name__ == "__main__":
    unittest.main()