"""Tests for the execution pipeline primitives and the run_task CLI.

ExecutionPipelineTest exercises run_command / parse_score_output /
decide_candidate directly; RunTaskCliTest copies the repo layout into a
temporary directory and drives scripts/run_task.py end-to-end via
``uv run``.
"""

import json
import shutil
import subprocess
import tempfile
import unittest
from pathlib import Path

from engine.decision_engine import decide_candidate
from engine.models import ConstraintSpec, ObjectiveSpec, RunResult, ScoreResult
from engine.runner import run_command
from engine.scorer import parse_score_output


class ExecutionPipelineTest(unittest.TestCase):
    """Unit tests for the run/score/decide building blocks."""

    def test_run_command_captures_stdout(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            result = run_command("python -c \"print('ok')\"", Path(tmp), timeout_seconds=5)
            self.assertEqual(result.exit_code, 0)
            self.assertIn("ok", result.stdout)

    def test_run_command_returns_result_on_timeout(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            result = run_command(
                "python -c \"import time; time.sleep(2)\"",
                Path(tmp),
                timeout_seconds=1,
            )
            # A timeout must surface as a failed RunResult, not an exception.
            self.assertNotEqual(result.exit_code, 0)
            self.assertIn("timed out", result.stderr.lower())

    def test_parse_score_output_reads_primary_score(self) -> None:
        score = parse_score_output(
            '{"score": 4.5, "metrics": {"violation_count": 0}}',
            score_field="score",
            metrics_field="metrics",
        )
        self.assertEqual(score.primary_score, 4.5)
        self.assertEqual(score.metrics["violation_count"], 0)

    def test_decide_candidate_rejects_constraint_failures(self) -> None:
        # A better primary score must still be discarded when a constraint fails.
        decision = decide_candidate(
            baseline=3.0,
            candidate=ScoreResult(
                primary_score=5.0,
                metrics={"violation_count": 1},
                raw_output={"score": 5.0, "metrics": {"violation_count": 1}},
            ),
            objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
            constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)],
            tie_breakers=[],
            run_result=RunResult(
                command="python -c \"print('ok')\"",
                cwd=Path("."),
                exit_code=0,
                runtime_seconds=0.1,
                stdout="ok\n",
                stderr="",
            ),
        )
        self.assertEqual(decision.status, "discard")
        self.assertIn("violation_count", decision.reason)


class RunTaskCliTest(unittest.TestCase):
    """End-to-end tests that invoke scripts/run_task.py in a copied repo layout."""

    def _copy_repo_layout(self, destination: Path) -> None:
        """Copy engine/ (and scripts/, tasks/ when present) into *destination*."""
        source_root = Path(__file__).resolve().parents[1]
        shutil.copytree(
            source_root / "engine",
            destination / "engine",
            ignore=shutil.ignore_patterns("__pycache__"),
        )
        for relative_dir in ("scripts", "tasks"):
            source_dir = source_root / relative_dir
            if source_dir.exists():
                shutil.copytree(
                    source_dir,
                    destination / relative_dir,
                    ignore=shutil.ignore_patterns("__pycache__"),
                )

    def _run_cli(self, command: list, cwd: Path) -> "subprocess.CompletedProcess":
        """Run the CLI command, asserting a zero exit (stderr becomes the message)."""
        completed = subprocess.run(
            command,
            cwd=str(cwd),
            capture_output=True,
            text=True,
            encoding="utf-8",
            check=False,
        )
        self.assertEqual(completed.returncode, 0, msg=completed.stderr)
        return completed

    def _read_single_record(self, temp_root: Path) -> dict:
        """Assert work/results.jsonl holds exactly one line and return it parsed."""
        results_path = temp_root / "work" / "results.jsonl"
        self.assertTrue(results_path.exists())
        lines = results_path.read_text(encoding="utf-8").splitlines()
        self.assertEqual(len(lines), 1)
        return json.loads(lines[0])

    def _write_failing_scorer_task(
        self,
        temp_root: Path,
        task_id: str,
        description: str,
        scorer_command: str,
    ) -> None:
        """Create a one-iteration task fixture whose scorer is expected to fail."""
        task_dir = temp_root / "tasks" / task_id
        fixture_dir = task_dir / "fixtures"
        fixture_dir.mkdir(parents=True)
        (fixture_dir / "SKILL.md").write_text("# Fixture\n", encoding="utf-8")
        (task_dir / "task.yaml").write_text(
            "\n".join(
                [
                    f"id: {task_id}",
                    f"description: {description}",
                    "artifacts:",
                    "  include:",
                    "    - fixtures/SKILL.md",
                    "  exclude: []",
                    "  max_files_per_iteration: 1",
                    "mutation:",
                    "  mode: direct_edit",
                    "  allowed_file_types:",
                    "    - .md",
                    "  max_changed_lines: 20",
                    "runner:",
                    "  command: python -c \"print('runner ok')\"",
                    f"  cwd: tasks/{task_id}",
                    "  timeout_seconds: 30",
                    "scorer:",
                    "  type: command",
                    f"  command: {scorer_command}",
                    "  parse:",
                    "    format: json",
                    "    score_field: score",
                    "    metrics_field: metrics",
                    "objective:",
                    "  primary_metric: score",
                    "  direction: maximize",
                    "constraints: []",
                    "policy:",
                    "  keep_if: better_primary",
                    "  tie_breakers: []",
                    "  on_failure: discard",
                    "budget:",
                    "  max_iterations: 1",
                    "  max_failures: 1",
                    "logging:",
                    "  results_file: work/results.jsonl",
                    "  candidate_dir: work/candidates",
                    "",
                ]
            ),
            encoding="utf-8",
        )

    def test_run_task_cli_writes_results_jsonl(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            temp_root = Path(tmp)
            self._copy_repo_layout(temp_root)
            self._run_cli(
                ["uv", "run", "python", "scripts/run_task.py", "--task", "tasks/skill-quality/task.yaml"],
                cwd=temp_root,
            )
            record = self._read_single_record(temp_root)
            self.assertEqual(record["task_id"], "skill-quality")
            self.assertEqual(record["status"], "keep")
            self.assertEqual(record["reason"], "no baseline available")
            self.assertEqual(record["candidate_score"], 4.0)
            self.assertEqual(record["diff_summary"], "")

    def test_run_task_cli_uses_repo_root_for_absolute_task_path(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            temp_root = Path(tmp)
            self._copy_repo_layout(temp_root)
            # Run from an unrelated cwd; outputs must still land under the repo root.
            outside_root = temp_root / "outside"
            outside_root.mkdir()
            self._run_cli(
                [
                    "uv",
                    "run",
                    "python",
                    str(temp_root / "scripts" / "run_task.py"),
                    "--task",
                    str(temp_root / "tasks" / "skill-quality" / "task.yaml"),
                ],
                cwd=outside_root,
            )
            record = self._read_single_record(temp_root)
            self.assertEqual(record["task_id"], "skill-quality")
            self.assertEqual(record["status"], "keep")

    def test_run_task_cli_records_scorer_failure(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            temp_root = Path(tmp)
            self._copy_repo_layout(temp_root)
            self._write_failing_scorer_task(
                temp_root,
                task_id="scorer-failure",
                description="Scorer failure fixture.",
                scorer_command="python -c \"import sys; sys.stderr.write('boom\\n'); raise SystemExit(7)\"",
            )
            self._run_cli(
                ["uv", "run", "python", "scripts/run_task.py", "--task", "tasks/scorer-failure/task.yaml"],
                cwd=temp_root,
            )
            record = self._read_single_record(temp_root)
            self.assertEqual(record["task_id"], "scorer-failure")
            self.assertEqual(record["status"], "crash")
            self.assertEqual(record["reason"], "scorer failed with exit code 7")
            self.assertIsNone(record["candidate_score"])

    def test_run_task_cli_records_score_parse_failure(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            temp_root = Path(tmp)
            self._copy_repo_layout(temp_root)
            self._write_failing_scorer_task(
                temp_root,
                task_id="score-parse-failure",
                description="Score parse failure fixture.",
                scorer_command="python -c \"print('not-json')\"",
            )
            self._run_cli(
                ["uv", "run", "python", "scripts/run_task.py", "--task", "tasks/score-parse-failure/task.yaml"],
                cwd=temp_root,
            )
            record = self._read_single_record(temp_root)
            self.assertEqual(record["task_id"], "score-parse-failure")
            self.assertEqual(record["status"], "crash")
            self.assertIn("score parse failed:", record["reason"])
            self.assertIsNone(record["candidate_score"])


if __name__ == "__main__":
    unittest.main()