# Artifact Loop Engine Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a reusable optimization engine for editable text artifacts with declarative task specs, structured scoring, a strict keep/discard policy, and one working `skill-quality` sample task.

**Architecture:** Add a small `engine/` package that owns task parsing, artifact safety, execution, scoring, and decisions. Drive the loop from a single CLI in `scripts/run_task.py`, prove it with a deterministic `skill-quality` task, then add a bounded mutation layer that validates agent edits before they are accepted, without exposing the whole repository to edits.

**Tech Stack:** Python 3.10+, standard library, `PyYAML`, `uv`, `unittest`
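As a quick orientation, the sketch below shows the single-iteration flow the engine is meant to support, using the module and function names planned in the File Map below. It is an illustration of the intended wiring only, not the final CLI (Task 5 builds that).

```python
# Sketch only: intended per-iteration flow, assuming the planned engine modules exist.
from pathlib import Path

from engine.artifact_manager import ArtifactManager
from engine.decision_engine import decide_candidate
from engine.runner import run_command
from engine.scorer import parse_score_output
from engine.task_loader import load_task


def run_one_iteration(task_path: Path) -> str:
    task = load_task(task_path)            # parse and validate task.yaml
    manager = ArtifactManager(task)
    snapshot = manager.snapshot()          # baseline for diffs and restore
    run_result = run_command(task.runner.command, Path(task.runner.cwd), task.runner.timeout_seconds)
    score_run = run_command(task.scorer.command, Path("."), task.runner.timeout_seconds)
    score = parse_score_output(
        score_run.stdout,
        score_field=task.scorer.parse.score_field,
        metrics_field=task.scorer.parse.metrics_field,
    )
    decision = decide_candidate(
        baseline=None,
        candidate=score,
        objective=task.objective,
        constraints=task.constraints,
        tie_breakers=task.policy.tie_breakers,
        run_result=run_result,
    )
    return decision.status                 # "keep" or "discard"
```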
---

## File Map

### New Files

- `engine/__init__.py` - package marker
- `engine/models.py` - dataclasses for task specs, results, and decisions
- `engine/task_loader.py` - YAML parsing and validation
- `engine/artifact_manager.py` - artifact resolution, snapshots, diff summaries, restore
- `engine/runner.py` - subprocess runner with timeout and log capture
- `engine/scorer.py` - command-based scorer and JSON normalization
- `engine/decision_engine.py` - objective, constraints, and keep/discard decisions
- `engine/mutation_engine.py` - bounded mutation validation and optional external mutator hook
- `scripts/run_task.py` - top-level orchestration CLI
- `scripts/evaluate_skill_task.py` - deterministic sample task runner
- `scripts/score_skill_task.py` - deterministic sample task scorer
- `tasks/skill-quality/task.yaml` - sample task spec
- `tasks/skill-quality/rubric.md` - sample evaluation rubric
- `tasks/skill-quality/prompt.md` - sample mutation guidance
- `tasks/skill-quality/fixtures/SKILL.md` - sample artifact under optimization
- `tests/__init__.py` - package marker for `unittest`
- `tests/test_task_loader.py` - task loader coverage
- `tests/test_artifact_manager.py` - snapshot, diff, restore coverage
- `tests/test_execution_pipeline.py` - runner, scorer, decision, and CLI coverage
- `tests/test_mutation_engine.py` - mutation guardrail coverage

### Modified Files

- `pyproject.toml` - add `PyYAML`
- `README.md` - document the new engine workflow and sample task

## Task 1: Bootstrap The Engine Package

**Files:**

- Modify: `pyproject.toml`
- Create: `engine/__init__.py`
- Create: `engine/models.py`
- Create: `tests/__init__.py`
- Test: `tests/test_task_loader.py`

- [ ] **Step 1: Write the failing model smoke test**

```python
# tests/test_task_loader.py
from pathlib import Path
import tempfile
import unittest

from engine.task_loader import load_task


class TaskLoaderSmokeTest(unittest.TestCase):
    def test_loads_minimal_task(self) -> None:
        task_yaml = """
id: demo
description: Demo task
artifacts:
  include:
    - tasks/demo/sample.txt
  exclude: []
  max_files_per_iteration: 1
mutation:
  mode: direct_edit
  allowed_file_types: [".txt"]
  max_changed_lines: 10
runner:
  command: "python -c \\"print('run')\\""
  cwd: "."
  timeout_seconds: 10
scorer:
  type: command
  command: "python -c \\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\""
  parse:
    format: json
    score_field: "score"
    metrics_field: "metrics"
objective:
  primary_metric: score
  direction: maximize
constraints:
  - metric: violation_count
    op: "<="
    value: 0
policy:
  keep_if: better_primary
  tie_breakers: []
  on_failure: discard
budget:
  max_iterations: 3
  max_failures: 1
logging:
  results_file: work/results.jsonl
  candidate_dir: work/candidates
"""
        with tempfile.TemporaryDirectory() as tmp:
            task_path = Path(tmp) / "task.yaml"
            task_path.write_text(task_yaml, encoding="utf-8")
            task = load_task(task_path)
            self.assertEqual(task.id, "demo")
            self.assertEqual(task.objective.direction, "maximize")


if __name__ == "__main__":
    unittest.main()
```
- [ ] **Step 2: Run the smoke test to verify it fails**

Run: `uv run python -m unittest tests.test_task_loader -v`
Expected: `ModuleNotFoundError: No module named 'engine'`

- [ ] **Step 3: Add the package scaffold and shared dataclasses**

```python
# engine/__init__.py
"""Artifact Loop Engine package."""
```

```python
# engine/models.py
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class ArtifactSpec:
    include: list[str]
    exclude: list[str]
    max_files_per_iteration: int


@dataclass(frozen=True)
class MutationSpec:
    mode: str
    allowed_file_types: list[str]
    max_changed_lines: int


@dataclass(frozen=True)
class RunnerSpec:
    command: str
    cwd: str
    timeout_seconds: int


@dataclass(frozen=True)
class ScorerParseSpec:
    format: str
    score_field: str
    metrics_field: str


@dataclass(frozen=True)
class ScorerSpec:
    type: str
    command: str
    parse: ScorerParseSpec


@dataclass(frozen=True)
class ObjectiveSpec:
    primary_metric: str
    direction: str


@dataclass(frozen=True)
class ConstraintSpec:
    metric: str
    op: str
    value: Any


@dataclass(frozen=True)
class PolicySpec:
    keep_if: str
    tie_breakers: list[dict[str, str]]
    on_failure: str


@dataclass(frozen=True)
class BudgetSpec:
    max_iterations: int
    max_failures: int


@dataclass(frozen=True)
class LoggingSpec:
    results_file: str
    candidate_dir: str


@dataclass(frozen=True)
class TaskSpec:
    id: str
    description: str
    artifacts: ArtifactSpec
    mutation: MutationSpec
    runner: RunnerSpec
    scorer: ScorerSpec
    objective: ObjectiveSpec
    constraints: list[ConstraintSpec]
    policy: PolicySpec
    budget: BudgetSpec
    logging: LoggingSpec
    root_dir: Path


@dataclass(frozen=True)
class BaselineSnapshot:
    file_contents: dict[Path, str]
    file_hashes: dict[Path, str]


@dataclass(frozen=True)
class RunResult:
    command: str
    cwd: Path
    exit_code: int
    runtime_seconds: float
    stdout: str
    stderr: str


@dataclass(frozen=True)
class ScoreResult:
    primary_score: float
    metrics: dict[str, Any]
    raw_output: dict[str, Any]


@dataclass(frozen=True)
class DecisionResult:
    status: str
    reason: str
    baseline_score: float | None
    candidate_score: float | None
    constraint_failures: list[str] = field(default_factory=list)
```

```python
# tests/__init__.py
"""Unit tests for the artifact loop engine."""
```

- [ ] **Step 4: Add `PyYAML` to dependencies**

```toml
[project]
name = "autoresearch"
version = "0.1.0"
description = "Autonomous pretraining research swarm"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "kernels>=0.11.7",
    "matplotlib>=3.10.8",
    "numpy>=2.2.6",
    "pandas>=2.3.3",
    "pyarrow>=21.0.0",
    "PyYAML>=6.0.2",
    "requests>=2.32.0",
    "rustbpe>=0.1.0",
    "tiktoken>=0.11.0",
    "torch==2.9.1",
]
```

- [ ] **Step 5: Run the smoke test again**

Run: `uv run python -m unittest tests.test_task_loader -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'engine.task_loader'`

- [ ] **Step 6: Commit the bootstrap**

```bash
git add pyproject.toml engine/__init__.py engine/models.py tests/__init__.py tests/test_task_loader.py
git commit -m "feat: bootstrap artifact loop engine package"
```
## Task 2: Implement YAML Task Loading And Validation

**Files:**

- Create: `engine/task_loader.py`
- Modify: `tests/test_task_loader.py`
- Test: `tests/test_task_loader.py`

- [ ] **Step 1: Expand the failing tests to cover validation**

```python
# tests/test_task_loader.py
from pathlib import Path
import tempfile
import unittest

from engine.task_loader import TaskValidationError, load_task

VALID_TASK = """
id: demo
description: Demo task
artifacts:
  include:
    - tasks/demo/sample.txt
  exclude: []
  max_files_per_iteration: 1
mutation:
  mode: direct_edit
  allowed_file_types: [".txt"]
  max_changed_lines: 10
runner:
  command: "python -c \\"print('run')\\""
  cwd: "."
  timeout_seconds: 10
scorer:
  type: command
  command: "python -c \\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\""
  parse:
    format: json
    score_field: "score"
    metrics_field: "metrics"
objective:
  primary_metric: score
  direction: maximize
constraints:
  - metric: violation_count
    op: "<="
    value: 0
policy:
  keep_if: better_primary
  tie_breakers: []
  on_failure: discard
budget:
  max_iterations: 3
  max_failures: 1
logging:
  results_file: work/results.jsonl
  candidate_dir: work/candidates
"""


class TaskLoaderTest(unittest.TestCase):
    def write_task(self, content: str) -> Path:
        temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(temp_dir.cleanup)
        task_path = Path(temp_dir.name) / "task.yaml"
        task_path.write_text(content, encoding="utf-8")
        return task_path

    def test_loads_minimal_task(self) -> None:
        task = load_task(self.write_task(VALID_TASK))
        self.assertEqual(task.id, "demo")
        self.assertEqual(task.artifacts.max_files_per_iteration, 1)
        self.assertEqual(task.constraints[0].metric, "violation_count")

    def test_rejects_missing_required_section(self) -> None:
        content = VALID_TASK.replace("objective:\n  primary_metric: score\n  direction: maximize\n", "")
        with self.assertRaises(TaskValidationError) as ctx:
            load_task(self.write_task(content))
        self.assertIn("objective", str(ctx.exception))

    def test_rejects_invalid_direction(self) -> None:
        content = VALID_TASK.replace("direction: maximize", "direction: sideways")
        with self.assertRaises(TaskValidationError) as ctx:
            load_task(self.write_task(content))
        self.assertIn("direction", str(ctx.exception))


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the tests to verify they fail**

Run: `uv run python -m unittest tests.test_task_loader -v`
Expected: `ModuleNotFoundError: No module named 'engine.task_loader'`
"tiktoken>=0.11.0", "torch==2.9.1", ] ``` - [ ] **Step 5: Run the smoke test again** Run: `uv run python -m unittest tests.test_task_loader -v` Expected: FAIL with `ModuleNotFoundError: No module named 'engine.task_loader'` - [ ] **Step 6: Commit the bootstrap** ```bash git add pyproject.toml engine/__init__.py engine/models.py tests/__init__.py tests/test_task_loader.py git commit -m "feat: bootstrap artifact loop engine package" ``` ## Task 2: Implement YAML Task Loading And Validation **Files:** - Create: `engine/task_loader.py` - Modify: `tests/test_task_loader.py` - Test: `tests/test_task_loader.py` - [ ] **Step 1: Expand the failing tests to cover validation** ```python # tests/test_task_loader.py from pathlib import Path import tempfile import unittest from engine.task_loader import TaskValidationError, load_task VALID_TASK = """ id: demo description: Demo task artifacts: include: - tasks/demo/sample.txt exclude: [] max_files_per_iteration: 1 mutation: mode: direct_edit allowed_file_types: [".txt"] max_changed_lines: 10 runner: command: "python -c \\"print('run')\\"" cwd: "." timeout_seconds: 10 scorer: type: command command: "python -c \\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\"" parse: format: json score_field: "score" metrics_field: "metrics" objective: primary_metric: score direction: maximize constraints: - metric: violation_count op: "<=" value: 0 policy: keep_if: better_primary tie_breakers: [] on_failure: discard budget: max_iterations: 3 max_failures: 1 logging: results_file: work/results.jsonl candidate_dir: work/candidates """ class TaskLoaderTest(unittest.TestCase): def write_task(self, content: str) -> Path: temp_dir = tempfile.TemporaryDirectory() self.addCleanup(temp_dir.cleanup) task_path = Path(temp_dir.name) / "task.yaml" task_path.write_text(content, encoding="utf-8") return task_path def test_loads_minimal_task(self) -> None: task = load_task(self.write_task(VALID_TASK)) self.assertEqual(task.id, "demo") self.assertEqual(task.artifacts.max_files_per_iteration, 1) self.assertEqual(task.constraints[0].metric, "violation_count") def test_rejects_missing_required_section(self) -> None: content = VALID_TASK.replace("objective:\n primary_metric: score\n direction: maximize\n", "") with self.assertRaises(TaskValidationError) as ctx: load_task(self.write_task(content)) self.assertIn("objective", str(ctx.exception)) def test_rejects_invalid_direction(self) -> None: content = VALID_TASK.replace("direction: maximize", "direction: sideways") with self.assertRaises(TaskValidationError) as ctx: load_task(self.write_task(content)) self.assertIn("direction", str(ctx.exception)) if __name__ == "__main__": unittest.main() ``` - [ ] **Step 2: Run the tests to verify they fail** Run: `uv run python -m unittest tests.test_task_loader -v` Expected: `ModuleNotFoundError: No module named 'engine.task_loader'` - [ ] **Step 3: Implement the loader and validator** ```python # engine/task_loader.py from __future__ import annotations from pathlib import Path from typing import Any import yaml from engine.models import ( ArtifactSpec, BudgetSpec, ConstraintSpec, LoggingSpec, MutationSpec, ObjectiveSpec, PolicySpec, RunnerSpec, ScorerParseSpec, ScorerSpec, TaskSpec, ) class TaskValidationError(ValueError): """Raised when a task spec is invalid.""" def _require_mapping(data: Any, name: str) -> dict[str, Any]: if not isinstance(data, dict): raise TaskValidationError(f"{name} must be a mapping") return data def _require_list(data: Any, name: str) -> 
## Task 3: Add Artifact Snapshot, Diff, And Restore

**Files:**

- Create: `engine/artifact_manager.py`
- Create: `tests/test_artifact_manager.py`
- Test: `tests/test_artifact_manager.py`

- [ ] **Step 1: Write failing artifact manager tests**

```python
# tests/test_artifact_manager.py
from pathlib import Path
import tempfile
import unittest

from engine.artifact_manager import ArtifactManager
from engine.models import ArtifactSpec, BaselineSnapshot, TaskSpec
from engine.models import BudgetSpec, ConstraintSpec, LoggingSpec, MutationSpec, ObjectiveSpec, PolicySpec, RunnerSpec, ScorerParseSpec, ScorerSpec


def make_task(root_dir: Path) -> TaskSpec:
    return TaskSpec(
        id="demo",
        description="Demo",
        artifacts=ArtifactSpec(include=["artifacts/*.md"], exclude=["artifacts/ignore.md"], max_files_per_iteration=1),
        mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=20),
        runner=RunnerSpec(command="python -c \"print('run')\"", cwd=".", timeout_seconds=10),
        scorer=ScorerSpec(
            type="command",
            command="python -c \"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\"",
            parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
        ),
        objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
        constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)],
        policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"),
        budget=BudgetSpec(max_iterations=1, max_failures=1),
        logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"),
        root_dir=root_dir,
    )


class ArtifactManagerTest(unittest.TestCase):
    def test_snapshot_and_restore(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            artifact_dir = root / "artifacts"
            artifact_dir.mkdir()
            target = artifact_dir / "sample.md"
            target.write_text("hello\n", encoding="utf-8")
            manager = ArtifactManager(make_task(root))
            snapshot = manager.snapshot()
            target.write_text("changed\n", encoding="utf-8")
            manager.restore(snapshot)
            self.assertEqual(target.read_text(encoding="utf-8"), "hello\n")

    def test_diff_summary_contains_changed_line(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            artifact_dir = root / "artifacts"
            artifact_dir.mkdir()
            target = artifact_dir / "sample.md"
            target.write_text("before\n", encoding="utf-8")
            manager = ArtifactManager(make_task(root))
            snapshot = manager.snapshot()
            target.write_text("after\n", encoding="utf-8")
            summary = manager.diff_summary(snapshot)
            self.assertIn("-before", summary)
            self.assertIn("+after", summary)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the tests to verify they fail**

Run: `uv run python -m unittest tests.test_artifact_manager -v`
Expected: `ModuleNotFoundError: No module named 'engine.artifact_manager'`

- [ ] **Step 3: Implement snapshot, diff, and restore**

```python
# engine/artifact_manager.py
from __future__ import annotations

from difflib import unified_diff
from fnmatch import fnmatch
import hashlib
from pathlib import Path

from engine.models import BaselineSnapshot, TaskSpec


class ArtifactManager:
    def __init__(self, task: TaskSpec) -> None:
        self.task = task

    def resolve_paths(self) -> list[Path]:
        matched: list[Path] = []
        for pattern in self.task.artifacts.include:
            matched.extend(self.task.root_dir.glob(pattern))
        files = [path for path in matched if path.is_file()]
        excluded = set()
        for path in files:
            relative = path.relative_to(self.task.root_dir).as_posix()
            if any(fnmatch(relative, pattern) for pattern in self.task.artifacts.exclude):
                excluded.add(path)
        resolved = [path for path in files if path not in excluded]
        return sorted(dict.fromkeys(resolved))

    def snapshot(self) -> BaselineSnapshot:
        file_contents: dict[Path, str] = {}
        file_hashes: dict[Path, str] = {}
        for path in self.resolve_paths():
            content = path.read_text(encoding="utf-8")
            file_contents[path] = content
            file_hashes[path] = hashlib.sha256(content.encode("utf-8")).hexdigest()
        return BaselineSnapshot(file_contents=file_contents, file_hashes=file_hashes)

    def restore(self, snapshot: BaselineSnapshot) -> None:
        for path, content in snapshot.file_contents.items():
            path.write_text(content, encoding="utf-8")

    def diff_summary(self, snapshot: BaselineSnapshot) -> str:
        chunks: list[str] = []
        for path, before in snapshot.file_contents.items():
            after = path.read_text(encoding="utf-8")
            if before == after:
                continue
            diff = unified_diff(
                before.splitlines(),
                after.splitlines(),
                fromfile=str(path),
                tofile=str(path),
                lineterm="",
            )
            chunks.append("\n".join(diff))
        return "\n\n".join(chunks)
```

- [ ] **Step 4: Run the artifact manager tests to verify they pass**

Run: `uv run python -m unittest tests.test_artifact_manager -v`
Expected: `OK`

- [ ] **Step 5: Commit the artifact manager**

```bash
git add engine/artifact_manager.py tests/test_artifact_manager.py
git commit -m "feat: add artifact snapshot and restore support"
```
## Task 4: Implement Runner, Scorer, And Decision Engine

**Files:**

- Create: `engine/runner.py`
- Create: `engine/scorer.py`
- Create: `engine/decision_engine.py`
- Create: `tests/test_execution_pipeline.py`
- Test: `tests/test_execution_pipeline.py`

- [ ] **Step 1: Write failing execution pipeline tests**

```python
# tests/test_execution_pipeline.py
from pathlib import Path
import tempfile
import unittest

from engine.decision_engine import decide_candidate
from engine.models import ConstraintSpec, ObjectiveSpec, RunResult, ScoreResult
from engine.runner import run_command
from engine.scorer import parse_score_output


class ExecutionPipelineTest(unittest.TestCase):
    def test_run_command_captures_stdout(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            result = run_command("python -c \"print('ok')\"", Path(tmp), timeout_seconds=5)
            self.assertEqual(result.exit_code, 0)
            self.assertIn("ok", result.stdout)

    def test_parse_score_output_reads_primary_score(self) -> None:
        score = parse_score_output(
            '{"score": 4.5, "metrics": {"violation_count": 0}}',
            score_field="score",
            metrics_field="metrics",
        )
        self.assertEqual(score.primary_score, 4.5)
        self.assertEqual(score.metrics["violation_count"], 0)

    def test_decide_candidate_rejects_constraint_failures(self) -> None:
        decision = decide_candidate(
            baseline=3.0,
            candidate=ScoreResult(
                primary_score=5.0,
                metrics={"violation_count": 1},
                raw_output={"score": 5.0, "metrics": {"violation_count": 1}},
            ),
            objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
            constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)],
            tie_breakers=[],
            run_result=RunResult(
                command="python -c \"print('ok')\"",
                cwd=Path("."),
                exit_code=0,
                runtime_seconds=0.1,
                stdout="ok\n",
                stderr="",
            ),
        )
        self.assertEqual(decision.status, "discard")
        self.assertIn("violation_count", decision.reason)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the tests to verify they fail**

Run: `uv run python -m unittest tests.test_execution_pipeline -v`
Expected: `ModuleNotFoundError` for the new engine modules

- [ ] **Step 3: Implement subprocess execution**

```python
# engine/runner.py
from __future__ import annotations

from pathlib import Path
import subprocess
import time

from engine.models import RunResult


def run_command(command: str, cwd: Path, timeout_seconds: int) -> RunResult:
    start = time.perf_counter()
    completed = subprocess.run(
        command,
        cwd=str(cwd),
        shell=True,
        capture_output=True,
        text=True,
        encoding="utf-8",
        timeout=timeout_seconds,
        check=False,
    )
    runtime = time.perf_counter() - start
    return RunResult(
        command=command,
        cwd=cwd,
        exit_code=completed.returncode,
        runtime_seconds=runtime,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
```

```python
# engine/scorer.py
from __future__ import annotations

import json

from engine.models import ScoreResult


def parse_score_output(output: str, score_field: str, metrics_field: str) -> ScoreResult:
    payload = json.loads(output)
    metrics = payload[metrics_field]
    return ScoreResult(
        primary_score=float(payload[score_field]),
        metrics=dict(metrics),
        raw_output=payload,
    )
```

```python
# engine/decision_engine.py
from __future__ import annotations

from engine.models import ConstraintSpec, DecisionResult, ObjectiveSpec, RunResult, ScoreResult


def _constraint_failed(score: ScoreResult, constraint: ConstraintSpec) -> bool:
    value = score.metrics.get(constraint.metric)
    if constraint.op == "<=":
        return value > constraint.value
    if constraint.op == ">=":
        return value < constraint.value
    if constraint.op == "==":
        return value != constraint.value
    raise ValueError(f"unsupported constraint operator: {constraint.op}")


def decide_candidate(
    baseline: float | None,
    candidate: ScoreResult,
    objective: ObjectiveSpec,
    constraints: list[ConstraintSpec],
    tie_breakers: list[dict[str, str]],
    run_result: RunResult,
) -> DecisionResult:
    if run_result.exit_code != 0:
        return DecisionResult(status="crash", reason="runner exited with non-zero status", baseline_score=baseline, candidate_score=None)
    failures = [constraint.metric for constraint in constraints if _constraint_failed(candidate, constraint)]
    if failures:
        return DecisionResult(
            status="discard",
            reason=f"constraint failure: {', '.join(failures)}",
            baseline_score=baseline,
            candidate_score=candidate.primary_score,
            constraint_failures=failures,
        )
    if baseline is None:
        return DecisionResult(status="keep", reason="no baseline yet", baseline_score=None, candidate_score=candidate.primary_score)
    is_better = candidate.primary_score > baseline if objective.direction == "maximize" else candidate.primary_score < baseline
    if is_better:
        return DecisionResult(status="keep", reason="primary metric improved", baseline_score=baseline, candidate_score=candidate.primary_score)
    return DecisionResult(status="discard", reason="primary metric did not improve", baseline_score=baseline, candidate_score=candidate.primary_score)
```

- [ ] **Step 4: Run the execution pipeline tests to verify they pass**

Run: `uv run python -m unittest tests.test_execution_pipeline -v`
Expected: `OK`

- [ ] **Step 5: Commit the execution core**

```bash
git add engine/runner.py engine/scorer.py engine/decision_engine.py tests/test_execution_pipeline.py
git commit -m "feat: add execution, scoring, and decision modules"
```
## Task 5: Build The CLI And A Deterministic Sample Task

**Files:**

- Create: `scripts/run_task.py`
- Create: `scripts/evaluate_skill_task.py`
- Create: `scripts/score_skill_task.py`
- Create: `tasks/skill-quality/task.yaml`
- Create: `tasks/skill-quality/rubric.md`
- Create: `tasks/skill-quality/prompt.md`
- Create: `tasks/skill-quality/fixtures/SKILL.md`
- Modify: `tests/test_execution_pipeline.py`
- Test: `tests/test_execution_pipeline.py`

- [ ] **Step 1: Add a failing end-to-end CLI test**

```python
# tests/test_execution_pipeline.py
from pathlib import Path
import json
import subprocess
import tempfile
import textwrap
import unittest


class RunTaskCliTest(unittest.TestCase):
    def test_run_task_writes_results_jsonl(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            (root / "tasks" / "skill-quality" / "fixtures").mkdir(parents=True)
            (root / "work").mkdir()
            (root / "tasks" / "skill-quality" / "fixtures" / "SKILL.md").write_text(
                "# Skill\n\n## Goal\nWrite clear plans.\n",
                encoding="utf-8",
            )
            (root / "tasks" / "skill-quality" / "rubric.md").write_text(
                "Required headings: Goal, Constraints, Examples\n",
                encoding="utf-8",
            )
            (root / "tasks" / "skill-quality" / "prompt.md").write_text(
                "Keep the skill concise and structured.\n",
                encoding="utf-8",
            )
            (root / "tasks" / "skill-quality" / "task.yaml").write_text(
                textwrap.dedent(
                    '''
                    id: skill-quality
                    description: Score a skill file
                    artifacts:
                      include:
                        - fixtures/SKILL.md
                      exclude: []
                      max_files_per_iteration: 1
                    mutation:
                      mode: direct_edit
                      allowed_file_types: [".md"]
                      max_changed_lines: 20
                    runner:
                      command: "python ../../scripts/evaluate_skill_task.py --task-dir . --artifact fixtures/SKILL.md --output ../../work/skill-run.json"
                      cwd: "tasks/skill-quality"
                      timeout_seconds: 10
                    scorer:
                      type: command
                      command: "python scripts/score_skill_task.py --input work/skill-run.json"
                      parse:
                        format: json
                        score_field: score
                        metrics_field: metrics
                    objective:
                      primary_metric: score
                      direction: maximize
                    constraints:
                      - metric: violation_count
                        op: "<="
                        value: 0
                    policy:
                      keep_if: better_primary
                      tie_breakers: []
                      on_failure: discard
                    budget:
                      max_iterations: 1
                      max_failures: 1
                    logging:
                      results_file: work/results.jsonl
                      candidate_dir: work/candidates
                    '''
                ).strip(),
                encoding="utf-8",
            )
            result = subprocess.run(
                ["uv", "run", "python", "scripts/run_task.py", "--task", "tasks/skill-quality/task.yaml"],
                cwd=root,
                capture_output=True,
                text=True,
                encoding="utf-8",
                check=False,
            )
            self.assertEqual(result.returncode, 0, msg=result.stderr)
            results_path = root / "work" / "results.jsonl"
            self.assertTrue(results_path.exists())
            payload = json.loads(results_path.read_text(encoding="utf-8").splitlines()[0])
            self.assertEqual(payload["status"], "discard")
            self.assertGreater(payload["candidate_score"], 0)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the CLI test to verify it fails**

Run: `uv run python -m unittest tests.test_execution_pipeline.RunTaskCliTest -v`
Expected: FAIL with `No such file or directory: scripts/run_task.py`
- [ ] **Step 3: Implement the CLI and deterministic sample task**

```python
# scripts/evaluate_skill_task.py
from __future__ import annotations

import argparse
import json
from pathlib import Path


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--task-dir", required=True)
    parser.add_argument("--artifact", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    task_dir = Path(args.task_dir).resolve()
    artifact_path = (task_dir / args.artifact).resolve()
    rubric_text = (task_dir / "rubric.md").read_text(encoding="utf-8")
    artifact_text = artifact_path.read_text(encoding="utf-8")

    required_headings = ["## Goal", "## Constraints", "## Examples"]
    present = sum(1 for heading in required_headings if heading in artifact_text)
    coverage = present / len(required_headings)

    lines = [line.strip() for line in artifact_text.splitlines() if line.strip()]
    average_line_length = sum(len(line) for line in lines) / max(len(lines), 1)
    clarity = max(0.0, 1.0 - max(0.0, average_line_length - 80.0) / 120.0)
    violation_count = 0 if "Do not" in artifact_text else 1
    score = round((coverage * 70.0) + (clarity * 30.0), 4)

    payload = {
        "score": score,
        "metrics": {
            "coverage": round(coverage, 4),
            "clarity": round(clarity, 4),
            "violation_count": violation_count,
            "length_tokens": len(artifact_text.split()),
            "rubric_excerpt": rubric_text[:80],
        },
    }
    output_path = Path(args.output).resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(payload), encoding="utf-8")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```

```python
# scripts/score_skill_task.py
from __future__ import annotations

import argparse
from pathlib import Path


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    args = parser.parse_args()
    payload = Path(args.input).read_text(encoding="utf-8")
    print(payload)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```

```python
# scripts/run_task.py
from __future__ import annotations

import argparse
import json
from pathlib import Path

from engine.artifact_manager import ArtifactManager
from engine.decision_engine import decide_candidate
from engine.runner import run_command
from engine.scorer import parse_score_output
from engine.task_loader import load_task


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--task", required=True)
    args = parser.parse_args()

    root_dir = Path.cwd()
    task_path = (root_dir / args.task).resolve()
    task = load_task(task_path)
    manager = ArtifactManager(task)
    snapshot = manager.snapshot()

    run_result = run_command(
        command=task.runner.command,
        cwd=(root_dir / task.runner.cwd).resolve(),
        timeout_seconds=task.runner.timeout_seconds,
    )
    score_run = run_command(
        command=task.scorer.command,
        cwd=root_dir,
        timeout_seconds=task.runner.timeout_seconds,
    )
    score = parse_score_output(
        score_run.stdout,
        score_field=task.scorer.parse.score_field,
        metrics_field=task.scorer.parse.metrics_field,
    )
    decision = decide_candidate(
        baseline=None,
        candidate=score,
        objective=task.objective,
        constraints=task.constraints,
        tie_breakers=task.policy.tie_breakers,
        run_result=run_result,
    )

    results_path = (root_dir / task.logging.results_file).resolve()
    results_path.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "task_id": task.id,
        "status": decision.status,
        "reason": decision.reason,
        "candidate_score": decision.candidate_score,
        "diff_summary": manager.diff_summary(snapshot),
    }
    with results_path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(record) + "\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```
```yaml
# tasks/skill-quality/task.yaml
id: skill-quality
description: Score one skill file against a deterministic rubric.
artifacts:
  include:
    - fixtures/SKILL.md
  exclude: []
  max_files_per_iteration: 1
mutation:
  mode: direct_edit
  allowed_file_types: [".md"]
  max_changed_lines: 20
runner:
  command: "python ../../scripts/evaluate_skill_task.py --task-dir . --artifact fixtures/SKILL.md --output ../../work/skill-run.json"
  cwd: "tasks/skill-quality"
  timeout_seconds: 30
scorer:
  type: command
  command: "python scripts/score_skill_task.py --input work/skill-run.json"
  parse:
    format: json
    score_field: score
    metrics_field: metrics
objective:
  primary_metric: score
  direction: maximize
constraints:
  - metric: violation_count
    op: "<="
    value: 0
policy:
  keep_if: better_primary
  tie_breakers: []
  on_failure: discard
budget:
  max_iterations: 5
  max_failures: 3
logging:
  results_file: work/results.jsonl
  candidate_dir: work/candidates
```

```markdown
# tasks/skill-quality/rubric.md
# Skill Quality Rubric

- Required headings: `## Goal`, `## Constraints`, `## Examples`
- Must include at least one explicit prohibition using `Do not`
- Prefer short, direct sentences
```

```markdown
# tasks/skill-quality/prompt.md
Improve the skill file while preserving its intent.

Priorities:

- Add missing required sections
- Keep guidance concise
- Include at least one explicit prohibition
- Avoid filler text
```

```markdown
# tasks/skill-quality/fixtures/SKILL.md
# Planning Skill

## Goal
Write clear implementation plans for multi-step work.

## Constraints
Do not omit concrete commands or expected outcomes.

## Examples
- Show exact test commands.
- Keep tasks small and reviewable.
```
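For orientation, if the evaluator behaves as written above, this fixture should satisfy every rubric check: all three required headings are present, every line is short, and the `Do not` prohibition keeps `violation_count` at zero. The payload written to `work/skill-run.json` should therefore look roughly like the sketch below (illustrative values, not asserted by any test):

```python
# Illustrative only: approximate evaluator payload for the shipped fixture,
# assuming evaluate_skill_task.py behaves exactly as written above.
expected_payload = {
    "score": 100.0,  # coverage 1.0 * 70 + clarity 1.0 * 30
    "metrics": {
        "coverage": 1.0,
        "clarity": 1.0,
        "violation_count": 0,
        # length_tokens and rubric_excerpt depend on the exact fixture and rubric text
    },
}
```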
- [ ] **Step 4: Run the end-to-end tests**

Run: `uv run python -m unittest tests.test_execution_pipeline -v`
Expected: `OK`

- [ ] **Step 5: Manually run the sample task**

Run: `uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml`
Expected: exit code `0` and one JSON line in `work/results.jsonl` (see the illustrative record below)
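Since this first run has no stored baseline, the decision engine should keep the candidate. Assuming the shipped fixture passes all constraints, the appended record should look roughly like this (illustrative values only):

```python
# Illustrative only: approximate shape of the first work/results.jsonl record.
import json

record = {
    "task_id": "skill-quality",
    "status": "keep",        # no baseline yet, so the first candidate is kept
    "reason": "no baseline yet",
    "candidate_score": 100.0,
    "diff_summary": "",      # empty because no artifact changed during the run
}
print(json.dumps(record))
```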
- [ ] **Step 6: Commit the CLI and sample task**

```bash
git add scripts/run_task.py scripts/evaluate_skill_task.py scripts/score_skill_task.py tasks/skill-quality tests/test_execution_pipeline.py
git commit -m "feat: add artifact loop cli and sample skill task"
```

## Task 6: Add Bounded Mutation Validation

**Files:**

- Create: `engine/mutation_engine.py`
- Create: `tests/test_mutation_engine.py`
- Modify: `scripts/run_task.py`
- Test: `tests/test_mutation_engine.py`

- [ ] **Step 1: Write failing mutation guard tests**

```python
# tests/test_mutation_engine.py
from pathlib import Path
import tempfile
import unittest

from engine.mutation_engine import MutationValidationError, validate_candidate_changes
from engine.models import ArtifactSpec, BaselineSnapshot, MutationSpec, TaskSpec
from engine.models import BudgetSpec, ConstraintSpec, LoggingSpec, ObjectiveSpec, PolicySpec, RunnerSpec, ScorerParseSpec, ScorerSpec


def make_task(root_dir: Path) -> TaskSpec:
    return TaskSpec(
        id="demo",
        description="Demo",
        artifacts=ArtifactSpec(include=["artifacts/*.md"], exclude=[], max_files_per_iteration=1),
        mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=3),
        runner=RunnerSpec(command="python -c \"print('run')\"", cwd=".", timeout_seconds=10),
        scorer=ScorerSpec(
            type="command",
            command="python -c \"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\"",
            parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
        ),
        objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
        constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)],
        policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"),
        budget=BudgetSpec(max_iterations=1, max_failures=1),
        logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"),
        root_dir=root_dir,
    )


class MutationEngineTest(unittest.TestCase):
    def test_rejects_too_many_changed_lines(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            artifact_dir = root / "artifacts"
            artifact_dir.mkdir()
            target = artifact_dir / "sample.md"
            target.write_text("a\nb\nc\n", encoding="utf-8")
            snapshot = BaselineSnapshot(file_contents={target: "a\nb\nc\n"}, file_hashes={target: "hash"})
            target.write_text("a\nx\ny\nz\n", encoding="utf-8")
            with self.assertRaises(MutationValidationError):
                validate_candidate_changes(make_task(root), snapshot)

    def test_rejects_disallowed_extension(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            artifact_dir = root / "artifacts"
            artifact_dir.mkdir()
            target = artifact_dir / "sample.txt"
            target.write_text("before\n", encoding="utf-8")
            snapshot = BaselineSnapshot(file_contents={target: "before\n"}, file_hashes={target: "hash"})
            target.write_text("after\n", encoding="utf-8")
            with self.assertRaises(MutationValidationError):
                validate_candidate_changes(make_task(root), snapshot)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the tests to verify they fail**

Run: `uv run python -m unittest tests.test_mutation_engine -v`
Expected: `ModuleNotFoundError: No module named 'engine.mutation_engine'`

- [ ] **Step 3: Implement mutation validation and wire it into the CLI**

```python
# engine/mutation_engine.py
from __future__ import annotations

from difflib import unified_diff

from engine.models import BaselineSnapshot, TaskSpec


class MutationValidationError(ValueError):
    """Raised when a candidate edit exceeds task limits."""


def validate_candidate_changes(task: TaskSpec, snapshot: BaselineSnapshot) -> None:
    changed_files = 0
    changed_lines = 0
    for path, before in snapshot.file_contents.items():
        after = path.read_text(encoding="utf-8")
        if before == after:
            continue
        changed_files += 1
        if path.suffix not in task.mutation.allowed_file_types:
            raise MutationValidationError(f"disallowed file type: {path.suffix}")
        diff_lines = list(unified_diff(before.splitlines(), after.splitlines(), lineterm=""))
        # Ignore the "---"/"+++" diff headers so only real added or removed lines count.
        changed_lines += sum(
            1
            for line in diff_lines
            if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
        )
    if changed_files > task.artifacts.max_files_per_iteration:
        raise MutationValidationError("too many files changed")
    if changed_lines > task.mutation.max_changed_lines:
        raise MutationValidationError("too many changed lines")
```

```python
# scripts/run_task.py
from __future__ import annotations

import argparse
import json
from pathlib import Path

from engine.artifact_manager import ArtifactManager
from engine.decision_engine import decide_candidate
from engine.mutation_engine import MutationValidationError, validate_candidate_changes
from engine.runner import run_command
from engine.scorer import parse_score_output
from engine.task_loader import load_task


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--task", required=True)
    args = parser.parse_args()

    root_dir = Path.cwd()
    task_path = (root_dir / args.task).resolve()
    task = load_task(task_path)
    manager = ArtifactManager(task)
    snapshot = manager.snapshot()

    try:
        validate_candidate_changes(task, snapshot)
    except MutationValidationError as exc:
        decision_payload = {
            "task_id": task.id,
            "status": "discard",
            "reason": str(exc),
            "candidate_score": None,
            "diff_summary": manager.diff_summary(snapshot),
        }
        results_path = (root_dir / task.logging.results_file).resolve()
        results_path.parent.mkdir(parents=True, exist_ok=True)
        with results_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(decision_payload) + "\n")
        return 0

    run_result = run_command(
        command=task.runner.command,
        cwd=(root_dir / task.runner.cwd).resolve(),
        timeout_seconds=task.runner.timeout_seconds,
    )
    score_run = run_command(
        command=task.scorer.command,
        cwd=root_dir,
        timeout_seconds=task.runner.timeout_seconds,
    )
    score = parse_score_output(
        score_run.stdout,
        score_field=task.scorer.parse.score_field,
        metrics_field=task.scorer.parse.metrics_field,
    )
    decision = decide_candidate(
        baseline=None,
        candidate=score,
        objective=task.objective,
        constraints=task.constraints,
        tie_breakers=task.policy.tie_breakers,
        run_result=run_result,
    )
    results_path = (root_dir / task.logging.results_file).resolve()
    results_path.parent.mkdir(parents=True, exist_ok=True)
    with results_path.open("a", encoding="utf-8") as handle:
        handle.write(
            json.dumps(
                {
                    "task_id": task.id,
                    "status": decision.status,
                    "reason": decision.reason,
                    "candidate_score": decision.candidate_score,
                    "diff_summary": manager.diff_summary(snapshot),
                }
            )
            + "\n"
        )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```

- [ ] **Step 4: Run the mutation tests**

Run: `uv run python -m unittest tests.test_mutation_engine -v`
Expected: `OK`

- [ ] **Step 5: Commit mutation validation**

```bash
git add engine/mutation_engine.py scripts/run_task.py tests/test_mutation_engine.py
git commit -m "feat: add bounded mutation validation"
```
## Task 7: Document The New Workflow

**Files:**

- Modify: `README.md`
- Test: none

- [ ] **Step 1: Update the README overview and quick start**

````markdown
## Artifact Loop Engine

This repository now also includes a generic optimization engine for editable text artifacts such as prompts, skills, config files, and small code paths.

### Sample task

Run the deterministic sample task:

```bash
uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml
```

The task writes structured iteration results to `work/results.jsonl`.

### Engine concepts

- `artifacts`: files the engine may inspect and compare
- `runner`: command that executes a candidate
- `scorer`: command that returns a structured score payload
- `policy`: keep or discard logic based on objective and constraints
````

- [ ] **Step 2: Review the README change for consistency**

Read: `README.md`
Expected: the original training workflow remains documented, and the new engine section does not claim unsupported features such as multi-agent project autonomy.

- [ ] **Step 3: Commit the docs update**

```bash
git add README.md
git commit -m "docs: add artifact loop engine usage"
```

## Final Verification

- [ ] **Step 1: Run the targeted test suite**

Run: `uv run python -m unittest tests.test_task_loader tests.test_artifact_manager tests.test_execution_pipeline tests.test_mutation_engine -v`
Expected: `OK`

- [ ] **Step 2: Run the sample task**

Run: `uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml`
Expected: exit code `0` and a new line appended to `work/results.jsonl`

- [ ] **Step 3: Inspect the output record**

Read: `work/results.jsonl`
Expected fields in the latest line:

- `task_id`
- `status`
- `reason`
- `candidate_score`
- `diff_summary`

- [ ] **Step 4: Commit the final verified state**

```bash
git add README.md pyproject.toml engine scripts tasks tests
git commit -m "feat: ship artifact loop engine v1"
```