diff --git a/.gitignore b/.gitignore index 99c30f5..ca52d72 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,6 @@ dev/ # Results file results.tsv + +# Runtime outputs +work/ diff --git a/README.md b/README.md index 34b3cd8..a8130d4 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,8 @@ The current CLI runs one baseline-aware single iteration: 5. Run and score the candidate in the sandbox. 6. Keep or discard the candidate without mutating the main workspace unless the candidate is accepted. +For AI-oriented usage guidance, see [USAGE.md](USAGE.md). + Optional sample task command: ```bash diff --git a/docs/superpowers/plans/2026-04-02-artifact-loop-engine.md b/docs/superpowers/plans/2026-04-02-artifact-loop-engine.md new file mode 100644 index 0000000..4062a37 --- /dev/null +++ b/docs/superpowers/plans/2026-04-02-artifact-loop-engine.md @@ -0,0 +1,1502 @@ +# Artifact Loop Engine Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build a reusable optimization engine for editable text artifacts with declarative task specs, structured scoring, strict keep/discard policy, and one working `skill-quality` sample task. + +**Architecture:** Add a small `engine/` package that owns task parsing, artifact safety, execution, scoring, and decisions. Drive the loop from a single CLI in `scripts/run_task.py`, prove it with a deterministic `skill-quality` task, then add a bounded mutation layer that validates and accepts agent edits without opening the whole repository. + +**Tech Stack:** Python 3.10+, standard library, `PyYAML`, `uv`, `unittest` + +--- + +## File Map + +### New Files + +- `engine/__init__.py` - package marker +- `engine/models.py` - dataclasses for task specs, results, and decisions +- `engine/task_loader.py` - YAML parsing and validation +- `engine/artifact_manager.py` - artifact resolution, snapshots, diff summaries, restore +- `engine/runner.py` - subprocess runner with timeout and log capture +- `engine/scorer.py` - command-based scorer and JSON normalization +- `engine/decision_engine.py` - objective, constraints, and keep/discard decisions +- `engine/mutation_engine.py` - bounded mutation validation and optional external mutator hook +- `scripts/run_task.py` - top-level orchestration CLI +- `scripts/evaluate_skill_task.py` - deterministic sample task runner +- `scripts/score_skill_task.py` - deterministic sample task scorer +- `tasks/skill-quality/task.yaml` - sample task spec +- `tasks/skill-quality/rubric.md` - sample evaluation rubric +- `tasks/skill-quality/prompt.md` - sample mutation guidance +- `tasks/skill-quality/fixtures/SKILL.md` - sample artifact under optimization +- `tests/__init__.py` - package marker for `unittest` +- `tests/test_task_loader.py` - task loader coverage +- `tests/test_artifact_manager.py` - snapshot, diff, restore coverage +- `tests/test_execution_pipeline.py` - runner, scorer, decision, and CLI coverage +- `tests/test_mutation_engine.py` - mutation guardrail coverage + +### Modified Files + +- `pyproject.toml` - add `PyYAML` +- `README.md` - document the new engine workflow and sample task + +## Task 1: Bootstrap The Engine Package + +**Files:** +- Modify: `pyproject.toml` +- Create: `engine/__init__.py` +- Create: `engine/models.py` +- Create: `tests/__init__.py` +- Test: `tests/test_task_loader.py` + +- [ ] **Step 1: Write the failing 
model smoke test** + +```python +# tests/test_task_loader.py +from pathlib import Path +import tempfile +import unittest + +from engine.task_loader import load_task + + +class TaskLoaderSmokeTest(unittest.TestCase): + def test_loads_minimal_task(self) -> None: + task_yaml = """ +id: demo +description: Demo task +artifacts: + include: + - tasks/demo/sample.txt + exclude: [] + max_files_per_iteration: 1 +mutation: + mode: direct_edit + allowed_file_types: [".txt"] + max_changed_lines: 10 +runner: + command: "python -c \\"print('run')\\"" + cwd: "." + timeout_seconds: 10 +scorer: + type: command + command: "python -c \\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\"" + parse: + format: json + score_field: "score" + metrics_field: "metrics" +objective: + primary_metric: score + direction: maximize +constraints: + - metric: violation_count + op: "<=" + value: 0 +policy: + keep_if: better_primary + tie_breakers: [] + on_failure: discard +budget: + max_iterations: 3 + max_failures: 1 +logging: + results_file: work/results.jsonl + candidate_dir: work/candidates +""" + with tempfile.TemporaryDirectory() as tmp: + task_path = Path(tmp) / "task.yaml" + task_path.write_text(task_yaml, encoding="utf-8") + task = load_task(task_path) + self.assertEqual(task.id, "demo") + self.assertEqual(task.objective.direction, "maximize") + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run the tests to verify they fail** + +Run: `uv run python -m unittest tests.test_mutation_engine -v` +Expected: `ModuleNotFoundError: No module named 'engine.mutation_engine'` + +- [ ] **Step 3: Implement mutation validation and wire it into the CLI** + +```python +# engine/mutation_engine.py +from __future__ import annotations + +from difflib import unified_diff + +from engine.models import BaselineSnapshot, TaskSpec + + +class MutationValidationError(ValueError): + """Raised when a candidate edit exceeds task limits.""" + + +def validate_candidate_changes(task: TaskSpec, snapshot: BaselineSnapshot) -> None: + changed_files = 0 + changed_lines = 0 + + for path, before in snapshot.file_contents.items(): + after = path.read_text(encoding="utf-8") + if before == after: + continue + changed_files += 1 + if path.suffix not in task.mutation.allowed_file_types: + raise MutationValidationError(f"disallowed file type: {path.suffix}") + diff_lines = list(unified_diff(before.splitlines(), after.splitlines(), lineterm="")) + changed_lines += sum(1 for line in diff_lines if line.startswith("+") or line.startswith("-")) + + if changed_files > task.artifacts.max_files_per_iteration: + raise MutationValidationError("too many files changed") + if changed_lines > task.mutation.max_changed_lines: + raise MutationValidationError("too many changed lines") +``` + +```python +# scripts/run_task.py +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from engine.artifact_manager import ArtifactManager +from engine.decision_engine import decide_candidate +from engine.mutation_engine import MutationValidationError, validate_candidate_changes +from engine.runner import run_command +from engine.scorer import parse_score_output +from engine.task_loader import load_task + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--task", required=True) + args = parser.parse_args() + + root_dir = Path.cwd() + task_path = (root_dir / args.task).resolve() + task = load_task(task_path) + manager = ArtifactManager(task) + snapshot = 
manager.snapshot() + + try: + validate_candidate_changes(task, snapshot) + except MutationValidationError as exc: + decision_payload = { + "task_id": task.id, + "status": "discard", + "reason": str(exc), + "candidate_score": None, + "diff_summary": manager.diff_summary(snapshot), + } + results_path = (root_dir / task.logging.results_file).resolve() + results_path.parent.mkdir(parents=True, exist_ok=True) + with results_path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(decision_payload) + "\n") + return 0 + + run_result = run_command( + command=task.runner.command, + cwd=(root_dir / task.runner.cwd).resolve(), + timeout_seconds=task.runner.timeout_seconds, + ) + score_run = run_command( + command=task.scorer.command, + cwd=root_dir, + timeout_seconds=task.runner.timeout_seconds, + ) + score = parse_score_output( + score_run.stdout, + score_field=task.scorer.parse.score_field, + metrics_field=task.scorer.parse.metrics_field, + ) + decision = decide_candidate( + baseline=None, + candidate=score, + objective=task.objective, + constraints=task.constraints, + tie_breakers=task.policy.tie_breakers, + run_result=run_result, + ) + results_path = (root_dir / task.logging.results_file).resolve() + results_path.parent.mkdir(parents=True, exist_ok=True) + with results_path.open("a", encoding="utf-8") as handle: + handle.write( + json.dumps( + { + "task_id": task.id, + "status": decision.status, + "reason": decision.reason, + "candidate_score": decision.candidate_score, + "diff_summary": manager.diff_summary(snapshot), + } + ) + + "\n" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) +``` + +- [ ] **Step 4: Run the mutation tests** + +Run: `uv run python -m unittest tests.test_mutation_engine -v` +Expected: `OK` + +- [ ] **Step 5: Commit mutation validation** + +```bash +git add engine/mutation_engine.py scripts/run_task.py tests/test_mutation_engine.py +git commit -m "feat: add bounded mutation validation" +``` + +## Task 7: Document The New Workflow + +**Files:** +- Modify: `README.md` +- Test: none + +- [ ] **Step 1: Update the README overview and quick start** + +```markdown +## Artifact Loop Engine + +This repository now also includes a generic optimization engine for editable text artifacts such as prompts, skills, config files, and small code paths. + +### Sample task + +Run the deterministic sample task: + +```bash +uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml +``` + +The task writes structured iteration results to `work/results.jsonl`. + +### Engine concepts + +- `artifacts`: files the engine may inspect and compare +- `runner`: command that executes a candidate +- `scorer`: command that returns a structured score payload +- `policy`: keep or discard logic based on objective and constraints +``` + +- [ ] **Step 2: Review the README change for consistency** + +Read: `README.md` +Expected: the original training workflow remains documented, and the new engine section does not claim unsupported features such as multi-agent project autonomy. 
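+
+One optional spot-check (a sketch, not a required step; the exact heading names in the existing README are an assumption):
+
+```bash
+# List every markdown heading so the original training workflow sections
+# and the new "Artifact Loop Engine" section can be compared at a glance.
+grep -n "^#" README.md
+```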
+ +- [ ] **Step 3: Commit the docs update** + +```bash +git add README.md +git commit -m "docs: add artifact loop engine usage" +``` + +## Final Verification + +- [ ] **Step 1: Run the targeted test suite** + +Run: `uv run python -m unittest tests.test_task_loader tests.test_artifact_manager tests.test_execution_pipeline tests.test_mutation_engine -v` +Expected: `OK` + +- [ ] **Step 2: Run the sample task** + +Run: `uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml` +Expected: exit code `0` and a new line appended to `work/results.jsonl` + +- [ ] **Step 3: Inspect the output record** + +Read: `work/results.jsonl` +Expected fields in the latest line: +- `task_id` +- `status` +- `reason` +- `candidate_score` +- `diff_summary` + +- [ ] **Step 4: Commit the final verified state** + +```bash +git add README.md pyproject.toml engine scripts tasks tests +git commit -m "feat: ship artifact loop engine v1" +``` + +- [ ] **Step 2: Run the smoke test to verify it fails** + +Run: `uv run python -m unittest tests.test_task_loader -v` +Expected: `ModuleNotFoundError: No module named 'engine'` + +- [ ] **Step 3: Add the package scaffold and shared dataclasses** + +```python +# engine/__init__.py +"""Artifact Loop Engine package.""" +``` + +```python +# engine/models.py +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +@dataclass(frozen=True) +class ArtifactSpec: + include: list[str] + exclude: list[str] + max_files_per_iteration: int + + +@dataclass(frozen=True) +class MutationSpec: + mode: str + allowed_file_types: list[str] + max_changed_lines: int + + +@dataclass(frozen=True) +class RunnerSpec: + command: str + cwd: str + timeout_seconds: int + + +@dataclass(frozen=True) +class ScorerParseSpec: + format: str + score_field: str + metrics_field: str + + +@dataclass(frozen=True) +class ScorerSpec: + type: str + command: str + parse: ScorerParseSpec + + +@dataclass(frozen=True) +class ObjectiveSpec: + primary_metric: str + direction: str + + +@dataclass(frozen=True) +class ConstraintSpec: + metric: str + op: str + value: Any + + +@dataclass(frozen=True) +class PolicySpec: + keep_if: str + tie_breakers: list[dict[str, str]] + on_failure: str + + +@dataclass(frozen=True) +class BudgetSpec: + max_iterations: int + max_failures: int + + +@dataclass(frozen=True) +class LoggingSpec: + results_file: str + candidate_dir: str + + +@dataclass(frozen=True) +class TaskSpec: + id: str + description: str + artifacts: ArtifactSpec + mutation: MutationSpec + runner: RunnerSpec + scorer: ScorerSpec + objective: ObjectiveSpec + constraints: list[ConstraintSpec] + policy: PolicySpec + budget: BudgetSpec + logging: LoggingSpec + root_dir: Path + + +@dataclass(frozen=True) +class BaselineSnapshot: + file_contents: dict[Path, str] + file_hashes: dict[Path, str] + + +@dataclass(frozen=True) +class RunResult: + command: str + cwd: Path + exit_code: int + runtime_seconds: float + stdout: str + stderr: str + + +@dataclass(frozen=True) +class ScoreResult: + primary_score: float + metrics: dict[str, Any] + raw_output: dict[str, Any] + + +@dataclass(frozen=True) +class DecisionResult: + status: str + reason: str + baseline_score: float | None + candidate_score: float | None + constraint_failures: list[str] = field(default_factory=list) +``` + +```python +# tests/__init__.py +"""Unit tests for the artifact loop engine.""" +``` + +- [ ] **Step 4: Add `PyYAML` to dependencies** + +```toml +[project] +name = "autoresearch" +version = 
"0.1.0" +description = "Autonomous pretraining research swarm" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "kernels>=0.11.7", + "matplotlib>=3.10.8", + "numpy>=2.2.6", + "pandas>=2.3.3", + "pyarrow>=21.0.0", + "PyYAML>=6.0.2", + "requests>=2.32.0", + "rustbpe>=0.1.0", + "tiktoken>=0.11.0", + "torch==2.9.1", +] +``` + +- [ ] **Step 5: Run the smoke test again** + +Run: `uv run python -m unittest tests.test_task_loader -v` +Expected: FAIL with `ModuleNotFoundError: No module named 'engine.task_loader'` + +- [ ] **Step 6: Commit the bootstrap** + +```bash +git add pyproject.toml engine/__init__.py engine/models.py tests/__init__.py tests/test_task_loader.py +git commit -m "feat: bootstrap artifact loop engine package" +``` + +## Task 2: Implement YAML Task Loading And Validation + +**Files:** +- Create: `engine/task_loader.py` +- Modify: `tests/test_task_loader.py` +- Test: `tests/test_task_loader.py` + +- [ ] **Step 1: Expand the failing tests to cover validation** + +```python +# tests/test_task_loader.py +from pathlib import Path +import tempfile +import unittest + +from engine.task_loader import TaskValidationError, load_task + + +VALID_TASK = """ +id: demo +description: Demo task +artifacts: + include: + - tasks/demo/sample.txt + exclude: [] + max_files_per_iteration: 1 +mutation: + mode: direct_edit + allowed_file_types: [".txt"] + max_changed_lines: 10 +runner: + command: "python -c \\"print('run')\\"" + cwd: "." + timeout_seconds: 10 +scorer: + type: command + command: "python -c \\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\"" + parse: + format: json + score_field: "score" + metrics_field: "metrics" +objective: + primary_metric: score + direction: maximize +constraints: + - metric: violation_count + op: "<=" + value: 0 +policy: + keep_if: better_primary + tie_breakers: [] + on_failure: discard +budget: + max_iterations: 3 + max_failures: 1 +logging: + results_file: work/results.jsonl + candidate_dir: work/candidates +""" + + +class TaskLoaderTest(unittest.TestCase): + def write_task(self, content: str) -> Path: + temp_dir = tempfile.TemporaryDirectory() + self.addCleanup(temp_dir.cleanup) + task_path = Path(temp_dir.name) / "task.yaml" + task_path.write_text(content, encoding="utf-8") + return task_path + + def test_loads_minimal_task(self) -> None: + task = load_task(self.write_task(VALID_TASK)) + self.assertEqual(task.id, "demo") + self.assertEqual(task.artifacts.max_files_per_iteration, 1) + self.assertEqual(task.constraints[0].metric, "violation_count") + + def test_rejects_missing_required_section(self) -> None: + content = VALID_TASK.replace("objective:\n primary_metric: score\n direction: maximize\n", "") + with self.assertRaises(TaskValidationError) as ctx: + load_task(self.write_task(content)) + self.assertIn("objective", str(ctx.exception)) + + def test_rejects_invalid_direction(self) -> None: + content = VALID_TASK.replace("direction: maximize", "direction: sideways") + with self.assertRaises(TaskValidationError) as ctx: + load_task(self.write_task(content)) + self.assertIn("direction", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run the tests to verify they fail** + +Run: `uv run python -m unittest tests.test_task_loader -v` +Expected: `ModuleNotFoundError: No module named 'engine.task_loader'` + +- [ ] **Step 3: Implement the loader and validator** + +```python +# engine/task_loader.py +from __future__ import annotations + +from pathlib import Path +from 
typing import Any + +import yaml + +from engine.models import ( + ArtifactSpec, + BudgetSpec, + ConstraintSpec, + LoggingSpec, + MutationSpec, + ObjectiveSpec, + PolicySpec, + RunnerSpec, + ScorerParseSpec, + ScorerSpec, + TaskSpec, +) + + +class TaskValidationError(ValueError): + """Raised when a task spec is invalid.""" + + +def _require_mapping(data: Any, name: str) -> dict[str, Any]: + if not isinstance(data, dict): + raise TaskValidationError(f"{name} must be a mapping") + return data + + +def _require_list(data: Any, name: str) -> list[Any]: + if not isinstance(data, list): + raise TaskValidationError(f"{name} must be a list") + return data + + +def _require_value(mapping: dict[str, Any], key: str) -> Any: + if key not in mapping: + raise TaskValidationError(f"missing required field: {key}") + return mapping[key] + + +def load_task(task_path: Path) -> TaskSpec: + raw = yaml.safe_load(task_path.read_text(encoding="utf-8")) + data = _require_mapping(raw, "task") + + objective = _require_mapping(_require_value(data, "objective"), "objective") + direction = _require_value(objective, "direction") + if direction not in {"maximize", "minimize"}: + raise TaskValidationError("objective.direction must be maximize or minimize") + + artifacts = _require_mapping(_require_value(data, "artifacts"), "artifacts") + mutation = _require_mapping(_require_value(data, "mutation"), "mutation") + runner = _require_mapping(_require_value(data, "runner"), "runner") + scorer = _require_mapping(_require_value(data, "scorer"), "scorer") + scorer_parse = _require_mapping(_require_value(scorer, "parse"), "scorer.parse") + policy = _require_mapping(_require_value(data, "policy"), "policy") + budget = _require_mapping(_require_value(data, "budget"), "budget") + logging = _require_mapping(_require_value(data, "logging"), "logging") + + constraint_specs = [] + for item in _require_list(_require_value(data, "constraints"), "constraints"): + mapping = _require_mapping(item, "constraint") + constraint_specs.append( + ConstraintSpec( + metric=str(_require_value(mapping, "metric")), + op=str(_require_value(mapping, "op")), + value=_require_value(mapping, "value"), + ) + ) + + return TaskSpec( + id=str(_require_value(data, "id")), + description=str(_require_value(data, "description")), + artifacts=ArtifactSpec( + include=[str(item) for item in _require_list(_require_value(artifacts, "include"), "artifacts.include")], + exclude=[str(item) for item in _require_list(artifacts.get("exclude", []), "artifacts.exclude")], + max_files_per_iteration=int(_require_value(artifacts, "max_files_per_iteration")), + ), + mutation=MutationSpec( + mode=str(_require_value(mutation, "mode")), + allowed_file_types=[str(item) for item in _require_list(_require_value(mutation, "allowed_file_types"), "mutation.allowed_file_types")], + max_changed_lines=int(_require_value(mutation, "max_changed_lines")), + ), + runner=RunnerSpec( + command=str(_require_value(runner, "command")), + cwd=str(_require_value(runner, "cwd")), + timeout_seconds=int(_require_value(runner, "timeout_seconds")), + ), + scorer=ScorerSpec( + type=str(_require_value(scorer, "type")), + command=str(_require_value(scorer, "command")), + parse=ScorerParseSpec( + format=str(_require_value(scorer_parse, "format")), + score_field=str(_require_value(scorer_parse, "score_field")), + metrics_field=str(_require_value(scorer_parse, "metrics_field")), + ), + ), + objective=ObjectiveSpec( + primary_metric=str(_require_value(objective, "primary_metric")), + direction=str(direction), + ), + 
constraints=constraint_specs, + policy=PolicySpec( + keep_if=str(_require_value(policy, "keep_if")), + tie_breakers=[dict(item) for item in _require_list(policy.get("tie_breakers", []), "policy.tie_breakers")], + on_failure=str(_require_value(policy, "on_failure")), + ), + budget=BudgetSpec( + max_iterations=int(_require_value(budget, "max_iterations")), + max_failures=int(_require_value(budget, "max_failures")), + ), + logging=LoggingSpec( + results_file=str(_require_value(logging, "results_file")), + candidate_dir=str(_require_value(logging, "candidate_dir")), + ), + root_dir=task_path.parent, + ) +``` + +- [ ] **Step 4: Run the task loader tests to verify they pass** + +Run: `uv run python -m unittest tests.test_task_loader -v` +Expected: `OK` + +- [ ] **Step 5: Commit the task loader** + +```bash +git add engine/task_loader.py tests/test_task_loader.py +git commit -m "feat: add yaml task loader" +``` + +## Task 3: Add Artifact Snapshot, Diff, And Restore + +**Files:** +- Create: `engine/artifact_manager.py` +- Create: `tests/test_artifact_manager.py` +- Test: `tests/test_artifact_manager.py` + +- [ ] **Step 1: Write failing artifact manager tests** + +```python +# tests/test_artifact_manager.py +from pathlib import Path +import tempfile +import unittest + +from engine.artifact_manager import ArtifactManager +from engine.models import ArtifactSpec, BaselineSnapshot, TaskSpec +from engine.models import BudgetSpec, ConstraintSpec, LoggingSpec, MutationSpec, ObjectiveSpec, PolicySpec, RunnerSpec, ScorerParseSpec, ScorerSpec + + +def make_task(root_dir: Path) -> TaskSpec: + return TaskSpec( + id="demo", + description="Demo", + artifacts=ArtifactSpec(include=["artifacts/*.md"], exclude=["artifacts/ignore.md"], max_files_per_iteration=1), + mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=20), + runner=RunnerSpec(command="python -c \"print('run')\"", cwd=".", timeout_seconds=10), + scorer=ScorerSpec( + type="command", + command="python -c \"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\"", + parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"), + ), + objective=ObjectiveSpec(primary_metric="score", direction="maximize"), + constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)], + policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"), + budget=BudgetSpec(max_iterations=1, max_failures=1), + logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"), + root_dir=root_dir, + ) + + +class ArtifactManagerTest(unittest.TestCase): + def test_snapshot_and_restore(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + artifact_dir = root / "artifacts" + artifact_dir.mkdir() + target = artifact_dir / "sample.md" + target.write_text("hello\n", encoding="utf-8") + manager = ArtifactManager(make_task(root)) + snapshot = manager.snapshot() + target.write_text("changed\n", encoding="utf-8") + manager.restore(snapshot) + self.assertEqual(target.read_text(encoding="utf-8"), "hello\n") + + def test_diff_summary_contains_changed_line(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + artifact_dir = root / "artifacts" + artifact_dir.mkdir() + target = artifact_dir / "sample.md" + target.write_text("before\n", encoding="utf-8") + manager = ArtifactManager(make_task(root)) + snapshot = manager.snapshot() + target.write_text("after\n", encoding="utf-8") + summary = 
manager.diff_summary(snapshot) + self.assertIn("-before", summary) + self.assertIn("+after", summary) + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run the tests to verify they fail** + +Run: `uv run python -m unittest tests.test_artifact_manager -v` +Expected: `ModuleNotFoundError: No module named 'engine.artifact_manager'` + +- [ ] **Step 3: Implement snapshot, diff, and restore** + +```python +# engine/artifact_manager.py +from __future__ import annotations + +from difflib import unified_diff +from fnmatch import fnmatch +import hashlib +from pathlib import Path + +from engine.models import BaselineSnapshot, TaskSpec + + +class ArtifactManager: + def __init__(self, task: TaskSpec) -> None: + self.task = task + + def resolve_paths(self) -> list[Path]: + matched: list[Path] = [] + for pattern in self.task.artifacts.include: + matched.extend(self.task.root_dir.glob(pattern)) + files = [path for path in matched if path.is_file()] + excluded = set() + for path in files: + relative = path.relative_to(self.task.root_dir).as_posix() + if any(fnmatch(relative, pattern) for pattern in self.task.artifacts.exclude): + excluded.add(path) + resolved = [path for path in files if path not in excluded] + return sorted(dict.fromkeys(resolved)) + + def snapshot(self) -> BaselineSnapshot: + file_contents: dict[Path, str] = {} + file_hashes: dict[Path, str] = {} + for path in self.resolve_paths(): + content = path.read_text(encoding="utf-8") + file_contents[path] = content + file_hashes[path] = hashlib.sha256(content.encode("utf-8")).hexdigest() + return BaselineSnapshot(file_contents=file_contents, file_hashes=file_hashes) + + def restore(self, snapshot: BaselineSnapshot) -> None: + for path, content in snapshot.file_contents.items(): + path.write_text(content, encoding="utf-8") + + def diff_summary(self, snapshot: BaselineSnapshot) -> str: + chunks: list[str] = [] + for path, before in snapshot.file_contents.items(): + after = path.read_text(encoding="utf-8") + if before == after: + continue + diff = unified_diff( + before.splitlines(), + after.splitlines(), + fromfile=str(path), + tofile=str(path), + lineterm="", + ) + chunks.append("\n".join(diff)) + return "\n\n".join(chunks) +``` + +- [ ] **Step 4: Run the artifact manager tests to verify they pass** + +Run: `uv run python -m unittest tests.test_artifact_manager -v` +Expected: `OK` + +- [ ] **Step 5: Commit the artifact manager** + +```bash +git add engine/artifact_manager.py tests/test_artifact_manager.py +git commit -m "feat: add artifact snapshot and restore support" +``` + +## Task 4: Implement Runner, Scorer, And Decision Engine + +**Files:** +- Create: `engine/runner.py` +- Create: `engine/scorer.py` +- Create: `engine/decision_engine.py` +- Create: `tests/test_execution_pipeline.py` +- Test: `tests/test_execution_pipeline.py` + +- [ ] **Step 1: Write failing execution pipeline tests** + +```python +# tests/test_execution_pipeline.py +from pathlib import Path +import tempfile +import unittest + +from engine.decision_engine import decide_candidate +from engine.models import ConstraintSpec, ObjectiveSpec, RunResult, ScoreResult +from engine.runner import run_command +from engine.scorer import parse_score_output + + +class ExecutionPipelineTest(unittest.TestCase): + def test_run_command_captures_stdout(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + result = run_command("python -c \"print('ok')\"", Path(tmp), timeout_seconds=5) + self.assertEqual(result.exit_code, 0) + self.assertIn("ok", result.stdout) + + 
def test_parse_score_output_reads_primary_score(self) -> None: + score = parse_score_output( + '{"score": 4.5, "metrics": {"violation_count": 0}}', + score_field="score", + metrics_field="metrics", + ) + self.assertEqual(score.primary_score, 4.5) + self.assertEqual(score.metrics["violation_count"], 0) + + def test_decide_candidate_rejects_constraint_failures(self) -> None: + decision = decide_candidate( + baseline=3.0, + candidate=ScoreResult( + primary_score=5.0, + metrics={"violation_count": 1}, + raw_output={"score": 5.0, "metrics": {"violation_count": 1}}, + ), + objective=ObjectiveSpec(primary_metric="score", direction="maximize"), + constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)], + tie_breakers=[], + run_result=RunResult( + command="python -c \"print('ok')\"", + cwd=Path("."), + exit_code=0, + runtime_seconds=0.1, + stdout="ok\n", + stderr="", + ), + ) + self.assertEqual(decision.status, "discard") + self.assertIn("violation_count", decision.reason) + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run the tests to verify they fail** + +Run: `uv run python -m unittest tests.test_execution_pipeline -v` +Expected: `ModuleNotFoundError` for the new engine modules + +- [ ] **Step 3: Implement subprocess execution** + +```python +# engine/runner.py +from __future__ import annotations + +from pathlib import Path +import subprocess +import time + +from engine.models import RunResult + + +def run_command(command: str, cwd: Path, timeout_seconds: int) -> RunResult: + start = time.perf_counter() + completed = subprocess.run( + command, + cwd=str(cwd), + shell=True, + capture_output=True, + text=True, + encoding="utf-8", + timeout=timeout_seconds, + check=False, + ) + runtime = time.perf_counter() - start + return RunResult( + command=command, + cwd=cwd, + exit_code=completed.returncode, + runtime_seconds=runtime, + stdout=completed.stdout, + stderr=completed.stderr, + ) +``` + +```python +# engine/scorer.py +from __future__ import annotations + +import json + +from engine.models import ScoreResult + + +def parse_score_output(output: str, score_field: str, metrics_field: str) -> ScoreResult: + payload = json.loads(output) + metrics = payload[metrics_field] + return ScoreResult( + primary_score=float(payload[score_field]), + metrics=dict(metrics), + raw_output=payload, + ) +``` + +```python +# engine/decision_engine.py +from __future__ import annotations + +from engine.models import ConstraintSpec, DecisionResult, ObjectiveSpec, RunResult, ScoreResult + + +def _constraint_failed(score: ScoreResult, constraint: ConstraintSpec) -> bool: + value = score.metrics.get(constraint.metric) + if constraint.op == "<=": + return value > constraint.value + if constraint.op == ">=": + return value < constraint.value + if constraint.op == "==": + return value != constraint.value + raise ValueError(f"unsupported constraint operator: {constraint.op}") + + +def decide_candidate( + baseline: float | None, + candidate: ScoreResult, + objective: ObjectiveSpec, + constraints: list[ConstraintSpec], + tie_breakers: list[dict[str, str]], + run_result: RunResult, +) -> DecisionResult: + if run_result.exit_code != 0: + return DecisionResult(status="crash", reason="runner exited with non-zero status", baseline_score=baseline, candidate_score=None) + + failures = [constraint.metric for constraint in constraints if _constraint_failed(candidate, constraint)] + if failures: + return DecisionResult( + status="discard", + reason=f"constraint failure: {', '.join(failures)}", + 
baseline_score=baseline, + candidate_score=candidate.primary_score, + constraint_failures=failures, + ) + + if baseline is None: + return DecisionResult(status="keep", reason="no baseline yet", baseline_score=None, candidate_score=candidate.primary_score) + + is_better = candidate.primary_score > baseline if objective.direction == "maximize" else candidate.primary_score < baseline + if is_better: + return DecisionResult(status="keep", reason="primary metric improved", baseline_score=baseline, candidate_score=candidate.primary_score) + + return DecisionResult(status="discard", reason="primary metric did not improve", baseline_score=baseline, candidate_score=candidate.primary_score) +``` + +- [ ] **Step 4: Run the execution pipeline tests to verify they pass** + +Run: `uv run python -m unittest tests.test_execution_pipeline -v` +Expected: `OK` + +- [ ] **Step 5: Commit the execution core** + +```bash +git add engine/runner.py engine/scorer.py engine/decision_engine.py tests/test_execution_pipeline.py +git commit -m "feat: add execution, scoring, and decision modules" +``` + +## Task 5: Build The CLI And A Deterministic Sample Task + +**Files:** +- Create: `scripts/run_task.py` +- Create: `scripts/evaluate_skill_task.py` +- Create: `scripts/score_skill_task.py` +- Create: `tasks/skill-quality/task.yaml` +- Create: `tasks/skill-quality/rubric.md` +- Create: `tasks/skill-quality/prompt.md` +- Create: `tasks/skill-quality/fixtures/SKILL.md` +- Modify: `tests/test_execution_pipeline.py` +- Test: `tests/test_execution_pipeline.py` + +- [ ] **Step 1: Add a failing end-to-end CLI test** + +```python +# tests/test_execution_pipeline.py +from pathlib import Path +import json +import subprocess +import tempfile +import textwrap +import unittest + + +class RunTaskCliTest(unittest.TestCase): + def test_run_task_writes_results_jsonl(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + (root / "tasks" / "skill-quality" / "fixtures").mkdir(parents=True) + (root / "work").mkdir() + (root / "tasks" / "skill-quality" / "fixtures" / "SKILL.md").write_text( + "# Skill\n\n## Goal\nWrite clear plans.\n", + encoding="utf-8", + ) + (root / "tasks" / "skill-quality" / "rubric.md").write_text( + "Required headings: Goal, Constraints, Examples\n", + encoding="utf-8", + ) + (root / "tasks" / "skill-quality" / "prompt.md").write_text( + "Keep the skill concise and structured.\n", + encoding="utf-8", + ) + (root / "tasks" / "skill-quality" / "task.yaml").write_text( + textwrap.dedent( + ''' + id: skill-quality + description: Score a skill file + artifacts: + include: + - fixtures/SKILL.md + exclude: [] + max_files_per_iteration: 1 + mutation: + mode: direct_edit + allowed_file_types: [".md"] + max_changed_lines: 20 + runner: + command: "python ../../scripts/evaluate_skill_task.py --task-dir . 
--artifact fixtures/SKILL.md --output ../../work/skill-run.json" + cwd: "tasks/skill-quality" + timeout_seconds: 10 + scorer: + type: command + command: "python scripts/score_skill_task.py --input work/skill-run.json" + parse: + format: json + score_field: score + metrics_field: metrics + objective: + primary_metric: score + direction: maximize + constraints: + - metric: violation_count + op: "<=" + value: 0 + policy: + keep_if: better_primary + tie_breakers: [] + on_failure: discard + budget: + max_iterations: 1 + max_failures: 1 + logging: + results_file: work/results.jsonl + candidate_dir: work/candidates + ''' + ).strip(), + encoding="utf-8", + ) + result = subprocess.run( + ["uv", "run", "python", "scripts/run_task.py", "--task", "tasks/skill-quality/task.yaml"], + cwd=root, + capture_output=True, + text=True, + encoding="utf-8", + check=False, + ) + self.assertEqual(result.returncode, 0, msg=result.stderr) + results_path = root / "work" / "results.jsonl" + self.assertTrue(results_path.exists()) + payload = json.loads(results_path.read_text(encoding="utf-8").splitlines()[0]) + self.assertEqual(payload["status"], "discard") + self.assertGreater(payload["candidate_score"], 0) + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run the CLI test to verify it fails** + +Run: `uv run python -m unittest tests.test_execution_pipeline.RunTaskCliTest -v` +Expected: FAIL with `No such file or directory: scripts/run_task.py` + +- [ ] **Step 3: Implement the CLI and deterministic sample task** + +```python +# scripts/evaluate_skill_task.py +from __future__ import annotations + +import argparse +import json +from pathlib import Path + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--task-dir", required=True) + parser.add_argument("--artifact", required=True) + parser.add_argument("--output", required=True) + args = parser.parse_args() + + task_dir = Path(args.task_dir).resolve() + artifact_path = (task_dir / args.artifact).resolve() + rubric_text = (task_dir / "rubric.md").read_text(encoding="utf-8") + artifact_text = artifact_path.read_text(encoding="utf-8") + + required_headings = ["## Goal", "## Constraints", "## Examples"] + present = sum(1 for heading in required_headings if heading in artifact_text) + coverage = present / len(required_headings) + lines = [line.strip() for line in artifact_text.splitlines() if line.strip()] + average_line_length = sum(len(line) for line in lines) / max(len(lines), 1) + clarity = max(0.0, 1.0 - max(0.0, average_line_length - 80.0) / 120.0) + violation_count = 0 if "Do not" in artifact_text else 1 + score = round((coverage * 70.0) + (clarity * 30.0), 4) + + payload = { + "score": score, + "metrics": { + "coverage": round(coverage, 4), + "clarity": round(clarity, 4), + "violation_count": violation_count, + "length_tokens": len(artifact_text.split()), + "rubric_excerpt": rubric_text[:80], + }, + } + output_path = Path(args.output).resolve() + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(payload), encoding="utf-8") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) +``` + +```python +# scripts/score_skill_task.py +from __future__ import annotations + +import argparse +from pathlib import Path + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--input", required=True) + args = parser.parse_args() + payload = Path(args.input).read_text(encoding="utf-8") + print(payload) + return 0 + + +if __name__ == "__main__": + 
raise SystemExit(main()) +``` + +```python +# scripts/run_task.py +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from engine.artifact_manager import ArtifactManager +from engine.decision_engine import decide_candidate +from engine.runner import run_command +from engine.scorer import parse_score_output +from engine.task_loader import load_task + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--task", required=True) + args = parser.parse_args() + + root_dir = Path.cwd() + task_path = (root_dir / args.task).resolve() + task = load_task(task_path) + manager = ArtifactManager(task) + snapshot = manager.snapshot() + + run_result = run_command( + command=task.runner.command, + cwd=(root_dir / task.runner.cwd).resolve(), + timeout_seconds=task.runner.timeout_seconds, + ) + score_run = run_command( + command=task.scorer.command, + cwd=root_dir, + timeout_seconds=task.runner.timeout_seconds, + ) + score = parse_score_output( + score_run.stdout, + score_field=task.scorer.parse.score_field, + metrics_field=task.scorer.parse.metrics_field, + ) + decision = decide_candidate( + baseline=None, + candidate=score, + objective=task.objective, + constraints=task.constraints, + tie_breakers=task.policy.tie_breakers, + run_result=run_result, + ) + + results_path = (root_dir / task.logging.results_file).resolve() + results_path.parent.mkdir(parents=True, exist_ok=True) + record = { + "task_id": task.id, + "status": decision.status, + "reason": decision.reason, + "candidate_score": decision.candidate_score, + "diff_summary": manager.diff_summary(snapshot), + } + with results_path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(record) + "\n") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) +``` + +```yaml +# tasks/skill-quality/task.yaml +id: skill-quality +description: Score one skill file against a deterministic rubric. +artifacts: + include: + - fixtures/SKILL.md + exclude: [] + max_files_per_iteration: 1 +mutation: + mode: direct_edit + allowed_file_types: [".md"] + max_changed_lines: 20 +runner: + command: "python ../../scripts/evaluate_skill_task.py --task-dir . --artifact fixtures/SKILL.md --output ../../work/skill-run.json" + cwd: "tasks/skill-quality" + timeout_seconds: 30 +scorer: + type: command + command: "python scripts/score_skill_task.py --input work/skill-run.json" + parse: + format: json + score_field: score + metrics_field: metrics +objective: + primary_metric: score + direction: maximize +constraints: + - metric: violation_count + op: "<=" + value: 0 +policy: + keep_if: better_primary + tie_breakers: [] + on_failure: discard +budget: + max_iterations: 5 + max_failures: 3 +logging: + results_file: work/results.jsonl + candidate_dir: work/candidates +``` + +```markdown +# tasks/skill-quality/rubric.md +# Skill Quality Rubric + +- Required headings: `## Goal`, `## Constraints`, `## Examples` +- Must include at least one explicit prohibition using `Do not` +- Prefer short, direct sentences +``` + +```markdown +# tasks/skill-quality/prompt.md +Improve the skill file while preserving its intent. + +Priorities: +- Add missing required sections +- Keep guidance concise +- Include at least one explicit prohibition +- Avoid filler text +``` + +```markdown +# tasks/skill-quality/fixtures/SKILL.md +# Planning Skill + +## Goal +Write clear implementation plans for multi-step work. + +## Constraints +Do not omit concrete commands or expected outcomes. 
+ +## Examples +- Show exact test commands. +- Keep tasks small and reviewable. +``` + +- [ ] **Step 4: Run the end-to-end tests** + +Run: `uv run python -m unittest tests.test_execution_pipeline -v` +Expected: `OK` + +- [ ] **Step 5: Manually run the sample task** + +Run: `uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml` +Expected: exit code `0` and one JSON line in `work/results.jsonl` + +- [ ] **Step 6: Commit the CLI and sample task** + +```bash +git add scripts/run_task.py scripts/evaluate_skill_task.py scripts/score_skill_task.py tasks/skill-quality tests/test_execution_pipeline.py +git commit -m "feat: add artifact loop cli and sample skill task" +``` + +## Task 6: Add Bounded Mutation Validation + +**Files:** +- Create: `engine/mutation_engine.py` +- Create: `tests/test_mutation_engine.py` +- Modify: `scripts/run_task.py` +- Test: `tests/test_mutation_engine.py` + +- [ ] **Step 1: Write failing mutation guard tests** + +```python +# tests/test_mutation_engine.py +from pathlib import Path +import tempfile +import unittest + +from engine.mutation_engine import MutationValidationError, validate_candidate_changes +from engine.models import ArtifactSpec, BaselineSnapshot, MutationSpec, TaskSpec +from engine.models import BudgetSpec, ConstraintSpec, LoggingSpec, ObjectiveSpec, PolicySpec, RunnerSpec, ScorerParseSpec, ScorerSpec + + +def make_task(root_dir: Path) -> TaskSpec: + return TaskSpec( + id="demo", + description="Demo", + artifacts=ArtifactSpec(include=["artifacts/*.md"], exclude=[], max_files_per_iteration=1), + mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=3), + runner=RunnerSpec(command="python -c \"print('run')\"", cwd=".", timeout_seconds=10), + scorer=ScorerSpec( + type="command", + command="python -c \"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\"", + parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"), + ), + objective=ObjectiveSpec(primary_metric="score", direction="maximize"), + constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)], + policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"), + budget=BudgetSpec(max_iterations=1, max_failures=1), + logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"), + root_dir=root_dir, + ) + + +class MutationEngineTest(unittest.TestCase): + def test_rejects_too_many_changed_lines(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + artifact_dir = root / "artifacts" + artifact_dir.mkdir() + target = artifact_dir / "sample.md" + target.write_text("a\nb\nc\n", encoding="utf-8") + snapshot = BaselineSnapshot(file_contents={target: "a\nb\nc\n"}, file_hashes={target: "hash"}) + target.write_text("a\nx\ny\nz\n", encoding="utf-8") + with self.assertRaises(MutationValidationError): + validate_candidate_changes(make_task(root), snapshot) + + def test_rejects_disallowed_extension(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + artifact_dir = root / "artifacts" + artifact_dir.mkdir() + target = artifact_dir / "sample.txt" + target.write_text("before\n", encoding="utf-8") + snapshot = BaselineSnapshot(file_contents={target: "before\n"}, file_hashes={target: "hash"}) + target.write_text("after\n", encoding="utf-8") + with self.assertRaises(MutationValidationError): + validate_candidate_changes(make_task(root), snapshot) + + +if __name__ == "__main__": + unittest.main() +``` +``` 
diff --git a/docs/superpowers/plans/2026-04-02-baseline-aware-single-iteration-orchestrator.md b/docs/superpowers/plans/2026-04-02-baseline-aware-single-iteration-orchestrator.md new file mode 100644 index 0000000..2db3195 --- /dev/null +++ b/docs/superpowers/plans/2026-04-02-baseline-aware-single-iteration-orchestrator.md @@ -0,0 +1,1289 @@ +# Baseline-Aware Single-Iteration Orchestrator Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a baseline-aware single-iteration orchestrator that generates one candidate in a sandbox, validates it against mutation budgets, runs and scores it in isolation, then keeps or discards the candidate without corrupting the main workspace. + +**Architecture:** Extend the task schema with a `mutator` section, then introduce an `engine/orchestrator.py` layer that owns sandbox lifecycle and candidate sync-back. Keep the existing loader, artifact manager, runner, scorer, and decision engine modules as foundations, but move orchestration decisions into the new layer and have `scripts/run_task.py` become a thin entrypoint. + +**Tech Stack:** Python 3.10+, standard library, `PyYAML`, `uv`, `unittest` + +--- + +## File Map + +### New Files + +- `engine/orchestrator.py` - baseline-aware single-iteration orchestration +- `scripts/mutate_skill_task.py` - deterministic sample mutator for the `skill-quality` task +- `tests/test_orchestrator.py` - sandbox keep/discard/crash coverage + +### Modified Files + +- `engine/models.py` - add `MutatorSpec` and extend `TaskSpec` +- `engine/task_loader.py` - parse and validate `mutator` +- `scripts/run_task.py` - delegate to orchestrator instead of hand-rolled flow +- `tasks/skill-quality/task.yaml` - add a concrete `mutator` +- `README.md` - document the baseline-aware single-iteration behavior + +## Task 1: Extend The Task Schema For Mutators + +**Files:** +- Modify: `engine/models.py` +- Modify: `engine/task_loader.py` +- Modify: `tests/test_task_loader.py` +- Test: `tests/test_task_loader.py` + +- [ ] **Step 1: Add a failing mutator test to the task loader suite** + +```python +# tests/test_task_loader.py +from pathlib import Path +import tempfile +import unittest + +from engine.task_loader import TaskValidationError, load_task + + +VALID_TASK = """ +id: demo +description: Demo task +artifacts: + include: + - tasks/demo/sample.txt + exclude: [] + max_files_per_iteration: 1 +mutation: + mode: direct_edit + allowed_file_types: [".txt"] + max_changed_lines: 10 +mutator: + type: command + command: "python scripts/mutate.py" + cwd: "." + timeout_seconds: 30 +runner: + command: "python -c \\\"print('run')\\\"" + cwd: "." 
+ timeout_seconds: 10 +scorer: + type: command + command: "python -c \\\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\\"" + parse: + format: json + score_field: "score" + metrics_field: "metrics" +objective: + primary_metric: score + direction: maximize +constraints: + - metric: violation_count + op: "<=" + value: 0 +policy: + keep_if: better_primary + tie_breakers: [] + on_failure: discard +budget: + max_iterations: 3 + max_failures: 1 +logging: + results_file: work/results.jsonl + candidate_dir: work/candidates +""" + + +class TaskLoaderTest(unittest.TestCase): + def write_task(self, content: str) -> Path: + temp_dir = tempfile.TemporaryDirectory() + self.addCleanup(temp_dir.cleanup) + task_path = Path(temp_dir.name) / "task.yaml" + task_path.write_text(content, encoding="utf-8") + return task_path + + def test_loads_minimal_task(self) -> None: + task = load_task(self.write_task(VALID_TASK)) + self.assertEqual(task.id, "demo") + self.assertEqual(task.artifacts.max_files_per_iteration, 1) + self.assertEqual(task.constraints[0].metric, "violation_count") + + def test_loads_mutator_spec(self) -> None: + task = load_task(self.write_task(VALID_TASK)) + self.assertEqual(task.mutator.type, "command") + self.assertEqual(task.mutator.command, "python scripts/mutate.py") + self.assertEqual(task.mutator.timeout_seconds, 30) + + def test_rejects_missing_required_section(self) -> None: + content = VALID_TASK.replace("objective:\n primary_metric: score\n direction: maximize\n", "") + with self.assertRaises(TaskValidationError) as ctx: + load_task(self.write_task(content)) + self.assertIn("objective", str(ctx.exception)) + + def test_rejects_invalid_direction(self) -> None: + content = VALID_TASK.replace("direction: maximize", "direction: sideways") + with self.assertRaises(TaskValidationError) as ctx: + load_task(self.write_task(content)) + self.assertIn("direction", str(ctx.exception)) + + def test_rejects_invalid_mutator_type(self) -> None: + content = VALID_TASK.replace("type: command", "type: agent", 1) + with self.assertRaises(TaskValidationError) as ctx: + load_task(self.write_task(content)) + self.assertIn("mutator.type", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run the mutation tests to verify they fail** + +Run: `uv run python -m unittest tests.test_mutation_engine -v` +Expected: FAIL because `validate_candidate_changes` does not accept a candidate root yet + +- [ ] **Step 3: Make validation compare baseline snapshot to candidate workspace** + +```python +# engine/mutation_engine.py +from __future__ import annotations + +from difflib import unified_diff +from pathlib import Path + +from engine.models import BaselineSnapshot, TaskSpec + + +class MutationValidationError(ValueError): + pass + + +def _count_changed_lines(before: str, after: str, path: Path) -> int: + diff = unified_diff( + before.splitlines(keepends=True), + after.splitlines(keepends=True), + fromfile=f"{path.as_posix()} (before)", + tofile=f"{path.as_posix()} (after)", + ) + changed_lines = 0 + for line in diff: + if line.startswith(("---", "+++", "@@")): + continue + if line.startswith(("+", "-")): + changed_lines += 1 + return changed_lines + + +def validate_candidate_changes(task: TaskSpec, snapshot: BaselineSnapshot, candidate_root: Path) -> None: + changed_files = 0 + changed_lines = 0 + allowed_file_types = set(task.mutation.allowed_file_types) + + for baseline_path, baseline_text in snapshot.file_contents.items(): + relative = 
baseline_path.relative_to(task.root_dir) + candidate_path = candidate_root / relative + current_text = candidate_path.read_text(encoding="utf-8") if candidate_path.exists() else "" + if current_text == baseline_text: + continue + + changed_files += 1 + if candidate_path.suffix not in allowed_file_types: + raise MutationValidationError(f"disallowed file type: {candidate_path.suffix}") + changed_lines += _count_changed_lines(baseline_text, current_text, candidate_path) + + for path in sorted(candidate_root.rglob("*")): + if not path.is_file(): + continue + relative = path.relative_to(candidate_root) + baseline_path = task.root_dir / relative + if baseline_path in snapshot.file_contents: + continue + changed_files += 1 + if path.suffix not in allowed_file_types: + raise MutationValidationError(f"disallowed file type: {path.suffix}") + changed_lines += _count_changed_lines("", path.read_text(encoding="utf-8"), path) + + if changed_files > task.artifacts.max_files_per_iteration: + raise MutationValidationError( + f"too many changed files: {changed_files} > {task.artifacts.max_files_per_iteration}" + ) + if changed_lines > task.mutation.max_changed_lines: + raise MutationValidationError( + f"too many changed lines: {changed_lines} > {task.mutation.max_changed_lines}" + ) +``` + +- [ ] **Step 4: Run the mutation tests to verify they pass** + +Run: `uv run python -m unittest tests.test_mutation_engine -v` +Expected: `OK` + +- [ ] **Step 5: Commit the baseline-aware mutation validation** + +```bash +git add engine/mutation_engine.py tests/test_mutation_engine.py +git commit -m "feat: compare candidate workspace against baseline snapshot" +``` + +## Task 4: Wire The CLI To The Orchestrator + +**Files:** +- Modify: `scripts/run_task.py` +- Modify: `tests/test_execution_pipeline.py` +- Test: `tests/test_execution_pipeline.py` + +- [ ] **Step 1: Add a failing sandbox orchestration test** + +```python +# tests/test_execution_pipeline.py +from pathlib import Path +import json +import shutil +import subprocess +import tempfile +import unittest + + +class RunTaskCliTest(unittest.TestCase): + def test_run_task_cli_keeps_candidate_from_sandbox(self) -> None: + source_root = Path(__file__).resolve().parents[1] + with tempfile.TemporaryDirectory() as tmp: + temp_root = Path(tmp) + shutil.copytree(source_root / "engine", temp_root / "engine") + shutil.copytree(source_root / "scripts", temp_root / "scripts") + shutil.copytree(source_root / "tasks", temp_root / "tasks") + + completed = subprocess.run( + ["uv", "run", "python", "scripts/run_task.py", "--task", "tasks/skill-quality/task.yaml"], + cwd=str(temp_root), + capture_output=True, + text=True, + encoding="utf-8", + check=False, + ) + + self.assertEqual(completed.returncode, 0, msg=completed.stderr) + artifact_text = (temp_root / "tasks" / "skill-quality" / "fixtures" / "SKILL.md").read_text(encoding="utf-8") + self.assertIn("## When to Use", artifact_text) + record = json.loads((temp_root / "work" / "results.jsonl").read_text(encoding="utf-8").splitlines()[-1]) + self.assertEqual(record["status"], "keep") + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run the execution pipeline tests to verify they fail** + +Run: `uv run python -m unittest tests.test_execution_pipeline -v` +Expected: FAIL because the current CLI has no mutator support or sandbox orchestration + +- [ ] **Step 3: Replace the hand-built CLI flow with orchestrator delegation** + +```python +# scripts/run_task.py +from __future__ import annotations + +import argparse 
+import json +import sys +from pathlib import Path + +ROOT_DIR = Path(__file__).resolve().parents[1] +if str(ROOT_DIR) not in sys.path: + sys.path.insert(0, str(ROOT_DIR)) + +from engine.orchestrator import run_single_iteration +from engine.task_loader import load_task + + +def _resolve_repo_path(repo_root: Path, raw_path: str) -> Path: + path = Path(raw_path) + if path.is_absolute(): + return path.resolve() + return (repo_root / path).resolve() + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--task", required=True) + return parser.parse_args() + + +def _append_record(repo_root: Path, results_file: str, record: dict[str, object]) -> None: + results_path = _resolve_repo_path(repo_root, results_file) + results_path.parent.mkdir(parents=True, exist_ok=True) + with results_path.open("a", encoding="utf-8", newline="") as handle: + handle.write(json.dumps(record, ensure_ascii=False) + "\n") + + +def main() -> int: + args = parse_args() + repo_root = ROOT_DIR.resolve() + task_path = _resolve_repo_path(repo_root, args.task) + task = load_task(task_path) + + decision = run_single_iteration(task, baseline_score=None) + record = { + "task_id": task.id, + "status": decision.status, + "reason": decision.reason, + "candidate_score": decision.candidate_score, + "diff_summary": "", + } + _append_record(repo_root, task.logging.results_file, record) + print(json.dumps(record, ensure_ascii=False)) + return 1 if decision.status == "crash" else 0 + + +if __name__ == "__main__": + raise SystemExit(main()) +``` + +- [ ] **Step 4: Run the execution pipeline tests to verify they pass** + +Run: `uv run python -m unittest tests.test_execution_pipeline -v` +Expected: `OK` + +- [ ] **Step 5: Commit the CLI orchestration wiring** + +```bash +git add scripts/run_task.py tests/test_execution_pipeline.py +git commit -m "feat: route task runner through sandbox orchestrator" +``` + +## Task 5: Add A Deterministic Sample Mutator + +**Files:** +- Create: `scripts/mutate_skill_task.py` +- Modify: `tasks/skill-quality/task.yaml` +- Test: `tests/test_execution_pipeline.py` + +- [ ] **Step 1: Add the sample mutator script** + +```python +# scripts/mutate_skill_task.py +from __future__ import annotations + +import argparse +from pathlib import Path + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--task-dir", required=True) + parser.add_argument("--artifact", required=True) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + task_dir = Path(args.task_dir).resolve() + artifact_path = (task_dir / args.artifact).resolve() + sections = [ + "# Planning Skill", + "", + "## When to Use", + "- Use this task when a skill file is missing structure.", + "", + "## Steps", + "1. Add the missing sections.", + "2. Keep the instructions direct.", + "", + "## Constraints", + "Do not add filler content.", + "", + "## Examples", + "- Show concrete commands.", + ] + artifact_path.write_text("\n".join(sections) + "\n", encoding="utf-8") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) +``` + +- [ ] **Step 2: Add `mutator` to the sample task** + +```yaml +# tasks/skill-quality/task.yaml +id: skill-quality +description: Score one skill file against a deterministic rubric. 
+artifacts: + include: + - fixtures/SKILL.md + exclude: [] + max_files_per_iteration: 1 +mutation: + mode: direct_edit + allowed_file_types: [".md"] + max_changed_lines: 40 +mutator: + type: command + command: "python ../../scripts/mutate_skill_task.py --task-dir . --artifact fixtures/SKILL.md" + cwd: "tasks/skill-quality" + timeout_seconds: 30 +runner: + command: "python ../../scripts/evaluate_skill_task.py --task-dir . --artifact fixtures/SKILL.md --output ../../work/skill-run.json" + cwd: "tasks/skill-quality" + timeout_seconds: 30 +scorer: + type: command + command: "python scripts/score_skill_task.py --input work/skill-run.json" + parse: + format: json + score_field: score + metrics_field: metrics +objective: + primary_metric: score + direction: maximize +constraints: + - metric: violation_count + op: "<=" + value: 0 +policy: + keep_if: better_primary + tie_breakers: [] + on_failure: discard +budget: + max_iterations: 5 + max_failures: 3 +logging: + results_file: work/results.jsonl + candidate_dir: work/candidates +``` + +- [ ] **Step 3: Run the execution pipeline tests to verify the sample task passes** + +Run: `uv run python -m unittest tests.test_execution_pipeline -v` +Expected: `OK` + +- [ ] **Step 4: Commit the sample mutator** + +```bash +git add scripts/mutate_skill_task.py tasks/skill-quality/task.yaml +git commit -m "feat: add deterministic sample mutator" +``` + +## Task 6: Update The README For The Baseline-Aware Iteration Model + +**Files:** +- Modify: `README.md` +- Test: none + +- [ ] **Step 1: Update the Artifact Loop Engine section** + +```markdown +## Artifact Loop Engine + +This repository also includes a generic optimization engine for editable text artifacts such as prompts, skills, config files, and small code paths. + +The current CLI now runs a baseline-aware single iteration: + +1. Build a baseline view of the allowed artifacts +2. Create a temporary candidate sandbox +3. Run a task-specific mutator in the sandbox +4. Validate the candidate against mutation limits +5. Run and score the candidate in the sandbox +6. Keep or discard the candidate + +Run the deterministic sample task: + +```bash +uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml +``` + +The task writes structured iteration results to `work/results.jsonl`. 
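+Each line is a single JSON object with the fields `task_id`, `status`, `reason`,
+`candidate_score`, and `diff_summary`; an illustrative kept-candidate record (values
+are examples only) looks like
+`{"task_id": "skill-quality", "status": "keep", "reason": "primary metric improved", "candidate_score": 1.0, "diff_summary": ""}`.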
+ +Engine concepts: + +- `artifacts`: files that may be accepted back into the main workspace +- `mutation`: file-count and line-count limits for candidate changes +- `mutator`: command that generates a candidate inside the sandbox +- `runner`: command that evaluates a candidate +- `scorer`: command that returns a structured score payload +- `policy`: keep or discard logic based on objective and constraints +``` + +- [ ] **Step 2: Review the README update for consistency** + +Read: `README.md` +Expected: the original training flow remains intact, and the engine section now describes the baseline-aware single iteration accurately + +- [ ] **Step 3: Commit the README update** + +```bash +git add README.md +git commit -m "docs: document baseline-aware single iteration" +``` + +## Final Verification + +- [ ] **Step 1: Run the targeted test suite** + +Run: `uv run python -m unittest tests.test_task_loader tests.test_artifact_manager tests.test_execution_pipeline tests.test_mutation_engine tests.test_orchestrator -v` +Expected: `OK` + +- [ ] **Step 2: Run the sample task manually** + +Run: `uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml` +Expected: +- exit code `0` +- a new JSON line in `work/results.jsonl` +- `tasks/skill-quality/fixtures/SKILL.md` updated only if the candidate is kept + +- [ ] **Step 3: Inspect the latest record** + +Read: `work/results.jsonl` +Expected latest record fields: +- `task_id` +- `status` +- `reason` +- `candidate_score` +- `diff_summary` + +- [ ] **Step 4: Commit the final verified state** + +```bash +git add README.md engine scripts tasks tests +git commit -m "feat: ship baseline-aware single-iteration orchestrator" +``` + +- [ ] **Step 2: Run the orchestrator tests to verify they fail** + +Run: `uv run python -m unittest tests.test_orchestrator -v` +Expected: FAIL with `ModuleNotFoundError: No module named 'engine.orchestrator'` + +- [ ] **Step 3: Implement the orchestrator** + +```python +# engine/orchestrator.py +from __future__ import annotations + +import shutil +import tempfile +from pathlib import Path + +from engine.artifact_manager import ArtifactManager +from engine.decision_engine import decide_candidate +from engine.models import DecisionResult, ScoreResult, TaskSpec +from engine.mutation_engine import MutationValidationError, validate_candidate_changes +from engine.runner import run_command +from engine.scorer import parse_score_output + + +def _copy_repo_to_sandbox(repo_root: Path, sandbox_root: Path) -> None: + for child in repo_root.iterdir(): + if child.name == ".git": + continue + target = sandbox_root / child.name + if child.is_dir(): + shutil.copytree(child, target, dirs_exist_ok=True) + else: + shutil.copy2(child, target) + + +def _sync_artifacts_back(task: TaskSpec, sandbox_task: TaskSpec) -> None: + source_manager = ArtifactManager(sandbox_task) + target_manager = ArtifactManager(task) + target_snapshot = target_manager.snapshot() + + for path in source_manager.resolve_paths(): + relative = path.relative_to(sandbox_task.root_dir) + target_path = task.root_dir / relative + target_path.parent.mkdir(parents=True, exist_ok=True) + with path.open("r", encoding="utf-8", newline="") as src: + with target_path.open("w", encoding="utf-8", newline="") as dst: + dst.write(src.read()) + + for baseline_path, baseline_text in target_snapshot.file_contents.items(): + if baseline_path.exists(): + continue + baseline_path.parent.mkdir(parents=True, exist_ok=True) + with baseline_path.open("w", encoding="utf-8", newline="") as handle: 
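+            # Restore the snapshotted baseline text when the file no longer exists
+            # in the main workspace.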
+ handle.write(baseline_text) + + +def run_single_iteration(task: TaskSpec, baseline_score: float | ScoreResult | None) -> DecisionResult: + repo_root = task.root_dir.parent.parent if task.root_dir.name == "skill-quality" else task.root_dir + with tempfile.TemporaryDirectory(prefix="artifact-loop-") as tmp: + sandbox_root = Path(tmp) + _copy_repo_to_sandbox(repo_root, sandbox_root) + + from engine.task_loader import load_task + + sandbox_task_path = sandbox_root / task.root_dir.relative_to(repo_root) / "task.yaml" + sandbox_task = load_task(sandbox_task_path) + baseline_snapshot = ArtifactManager(task).snapshot() + + mutator_result = run_command( + sandbox_task.mutator.command, + (sandbox_root / sandbox_task.mutator.cwd).resolve(), + sandbox_task.mutator.timeout_seconds, + ) + if mutator_result.exit_code != 0: + return DecisionResult( + status="crash", + reason=f"mutator failed with exit code {mutator_result.exit_code}", + baseline_score=None if baseline_score is None or isinstance(baseline_score, ScoreResult) else baseline_score, + candidate_score=None, + ) + + try: + validate_candidate_changes( + sandbox_task, + baseline_snapshot, + sandbox_task.root_dir, + ) + except MutationValidationError as exc: + return DecisionResult( + status="discard", + reason=str(exc), + baseline_score=None if baseline_score is None or isinstance(baseline_score, ScoreResult) else baseline_score, + candidate_score=None, + ) + + run_result = run_command( + sandbox_task.runner.command, + (sandbox_root / sandbox_task.runner.cwd).resolve(), + sandbox_task.runner.timeout_seconds, + ) + if run_result.exit_code != 0: + return DecisionResult( + status="crash", + reason=f"command failed with exit code {run_result.exit_code}", + baseline_score=None if baseline_score is None or isinstance(baseline_score, ScoreResult) else baseline_score, + candidate_score=None, + ) + + scorer_result = run_command( + sandbox_task.scorer.command, + sandbox_root, + sandbox_task.runner.timeout_seconds, + ) + if scorer_result.exit_code != 0: + return DecisionResult( + status="crash", + reason=f"scorer failed with exit code {scorer_result.exit_code}", + baseline_score=None if baseline_score is None or isinstance(baseline_score, ScoreResult) else baseline_score, + candidate_score=None, + ) + + score_result = parse_score_output( + scorer_result.stdout, + score_field=sandbox_task.scorer.parse.score_field, + metrics_field=sandbox_task.scorer.parse.metrics_field, + ) + decision = decide_candidate( + baseline=baseline_score, + candidate=score_result, + objective=sandbox_task.objective, + constraints=sandbox_task.constraints, + tie_breakers=sandbox_task.policy.tie_breakers, + run_result=run_result, + ) + + if decision.status == "keep": + _sync_artifacts_back(task, sandbox_task) + + return decision +``` + +- [ ] **Step 4: Run the orchestrator tests to verify they pass** + +Run: `uv run python -m unittest tests.test_orchestrator -v` +Expected: `OK` + +- [ ] **Step 5: Commit the orchestrator** + +```bash +git add engine/orchestrator.py tests/test_orchestrator.py +git commit -m "feat: add single-iteration orchestrator" +``` + +## Task 3: Make Mutation Validation Baseline-Aware + +**Files:** +- Modify: `engine/mutation_engine.py` +- Modify: `tests/test_mutation_engine.py` +- Test: `tests/test_mutation_engine.py` + +- [ ] **Step 1: Add a failing cross-workspace validation test** + +```python +# tests/test_mutation_engine.py +from pathlib import Path +import shutil +import tempfile +import unittest + +from engine.artifact_manager import ArtifactManager +from 
engine.models import ( + ArtifactSpec, + BudgetSpec, + ConstraintSpec, + LoggingSpec, + MutationSpec, + MutatorSpec, + ObjectiveSpec, + PolicySpec, + RunnerSpec, + ScorerParseSpec, + ScorerSpec, + TaskSpec, +) +from engine.mutation_engine import MutationValidationError, validate_candidate_changes + + +def _make_task(root_dir: Path) -> TaskSpec: + return TaskSpec( + id="mutation-test", + description="Mutation validation fixture.", + artifacts=ArtifactSpec(include=["fixtures/*"], exclude=[], max_files_per_iteration=1), + mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=1), + mutator=MutatorSpec(type="command", command="python mutate.py", cwd=".", timeout_seconds=30), + runner=RunnerSpec(command="python -c \"print('runner ok')\"", cwd=".", timeout_seconds=30), + scorer=ScorerSpec( + type="command", + command="python -c \"print('{\\\"score\\\": 1.0, \\\"metrics\\\": {}}')\"", + parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"), + ), + objective=ObjectiveSpec(primary_metric="score", direction="maximize"), + constraints=[], + policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"), + budget=BudgetSpec(max_iterations=1, max_failures=1), + logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"), + root_dir=root_dir, + ) + + +class MutationEngineTest(unittest.TestCase): + def test_rejects_too_many_changed_lines(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root_dir = Path(tmp) + fixture_dir = root_dir / "fixtures" + fixture_dir.mkdir(parents=True) + target = fixture_dir / "note.md" + target.write_text("line 1\nline 2\n", encoding="utf-8") + task = _make_task(root_dir) + snapshot = ArtifactManager(task).snapshot() + target.write_text("line 1\nline 2\nline 3\n", encoding="utf-8") + with self.assertRaises(MutationValidationError): + validate_candidate_changes(task, snapshot, root_dir) + + def test_rejects_disallowed_extension(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root_dir = Path(tmp) + fixture_dir = root_dir / "fixtures" + fixture_dir.mkdir(parents=True) + target = fixture_dir / "note.md" + target.write_text("line 1\n", encoding="utf-8") + task = _make_task(root_dir) + snapshot = ArtifactManager(task).snapshot() + target.unlink() + (fixture_dir / "note.txt").write_text("line 1 changed\n", encoding="utf-8") + with self.assertRaises(MutationValidationError): + validate_candidate_changes(task, snapshot, root_dir) + + def test_rejects_candidate_workspace_changes_against_baseline_snapshot(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + baseline_root = Path(tmp) / "baseline" + candidate_root = Path(tmp) / "candidate" + (baseline_root / "fixtures").mkdir(parents=True) + (baseline_root / "fixtures" / "note.md").write_text("base\n", encoding="utf-8") + shutil.copytree(baseline_root, candidate_root) + task = _make_task(candidate_root) + baseline_task = _make_task(baseline_root) + snapshot = ArtifactManager(baseline_task).snapshot() + (candidate_root / "fixtures" / "note.md").write_text("base\nextra\n", encoding="utf-8") + with self.assertRaises(MutationValidationError): + validate_candidate_changes(task, snapshot, candidate_root) + + +if __name__ == "__main__": + unittest.main() +``` + +- [ ] **Step 2: Run the task loader tests to verify they fail** + +Run: `uv run python -m unittest tests.test_task_loader -v` +Expected: FAIL because `TaskSpec` has no `mutator` field and the loader does not parse it yet + +- [ ] **Step 3: Extend 
the shared models with `MutatorSpec`** + +```python +# engine/models.py +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +@dataclass(frozen=True) +class ArtifactSpec: + include: list[str] + exclude: list[str] + max_files_per_iteration: int + + +@dataclass(frozen=True) +class MutationSpec: + mode: str + allowed_file_types: list[str] + max_changed_lines: int + + +@dataclass(frozen=True) +class MutatorSpec: + type: str + command: str + cwd: str + timeout_seconds: int + + +@dataclass(frozen=True) +class RunnerSpec: + command: str + cwd: str + timeout_seconds: int + + +@dataclass(frozen=True) +class ScorerParseSpec: + format: str + score_field: str + metrics_field: str + + +@dataclass(frozen=True) +class ScorerSpec: + type: str + command: str + parse: ScorerParseSpec + + +@dataclass(frozen=True) +class ObjectiveSpec: + primary_metric: str + direction: str + + +@dataclass(frozen=True) +class ConstraintSpec: + metric: str + op: str + value: Any + + +@dataclass(frozen=True) +class PolicySpec: + keep_if: str + tie_breakers: list[dict[str, str]] + on_failure: str + + +@dataclass(frozen=True) +class BudgetSpec: + max_iterations: int + max_failures: int + + +@dataclass(frozen=True) +class LoggingSpec: + results_file: str + candidate_dir: str + + +@dataclass(frozen=True) +class TaskSpec: + id: str + description: str + artifacts: ArtifactSpec + mutation: MutationSpec + mutator: MutatorSpec + runner: RunnerSpec + scorer: ScorerSpec + objective: ObjectiveSpec + constraints: list[ConstraintSpec] + policy: PolicySpec + budget: BudgetSpec + logging: LoggingSpec + root_dir: Path + + +@dataclass(frozen=True) +class BaselineSnapshot: + file_contents: dict[Path, str] + file_hashes: dict[Path, str] + + +@dataclass(frozen=True) +class RunResult: + command: str + cwd: Path + exit_code: int + runtime_seconds: float + stdout: str + stderr: str + + +@dataclass(frozen=True) +class ScoreResult: + primary_score: float + metrics: dict[str, Any] + raw_output: dict[str, Any] + + +@dataclass(frozen=True) +class DecisionResult: + status: str + reason: str + baseline_score: float | None + candidate_score: float | None + constraint_failures: list[str] = field(default_factory=list) +``` + +- [ ] **Step 4: Parse and validate the mutator block** + +```python +# engine/task_loader.py +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import yaml + +from engine.models import ( + ArtifactSpec, + BudgetSpec, + ConstraintSpec, + LoggingSpec, + MutationSpec, + MutatorSpec, + ObjectiveSpec, + PolicySpec, + RunnerSpec, + ScorerParseSpec, + ScorerSpec, + TaskSpec, +) + + +class TaskValidationError(ValueError): + pass + + +def _require_mapping(value: Any, path: str) -> dict[str, Any]: + if not isinstance(value, dict): + raise TaskValidationError(f"{path} must be a mapping") + return value + + +def _require_list(value: Any, path: str) -> list[Any]: + if not isinstance(value, list): + raise TaskValidationError(f"{path} must be a list") + return value + + +def _require_value(mapping: dict[str, Any], key: str) -> Any: + if key not in mapping: + raise TaskValidationError(f"missing required field: {key}") + return mapping[key] + + +def load_task(task_path: Path) -> TaskSpec: + try: + task_data = yaml.safe_load(task_path.read_text(encoding="utf-8")) + except yaml.YAMLError as exc: + raise TaskValidationError(str(exc)) from exc + + def _require_str(mapping: dict[str, Any], key: str, path: str) -> str: + value = 
_require_value(mapping, key) + if not isinstance(value, str): + raise TaskValidationError(f"{path}.{key} must be a string") + return value + + def _require_int(mapping: dict[str, Any], key: str, path: str) -> int: + value = _require_value(mapping, key) + if not isinstance(value, int) or isinstance(value, bool): + raise TaskValidationError(f"{path}.{key} must be an integer") + return value + + def _require_str_list(mapping: dict[str, Any], key: str, path: str) -> list[str]: + items = _require_list(_require_value(mapping, key), f"{path}.{key}") + result: list[str] = [] + for index, item in enumerate(items): + if not isinstance(item, str): + raise TaskValidationError(f"{path}.{key}[{index}] must be a string") + result.append(item) + return result + + def _require_tie_breakers(mapping: dict[str, Any], key: str, path: str) -> list[dict[str, str]]: + items = _require_list(_require_value(mapping, key), f"{path}.{key}") + result: list[dict[str, str]] = [] + for index, item in enumerate(items): + entry = _require_mapping(item, f"{path}.{key}[{index}]") + result.append({str(k): str(v) for k, v in entry.items()}) + return result + + root = _require_mapping(task_data, "task") + + artifacts_data = _require_mapping(_require_value(root, "artifacts"), "task.artifacts") + mutation_data = _require_mapping(_require_value(root, "mutation"), "task.mutation") + mutator_data = _require_mapping(_require_value(root, "mutator"), "task.mutator") + runner_data = _require_mapping(_require_value(root, "runner"), "task.runner") + scorer_data = _require_mapping(_require_value(root, "scorer"), "task.scorer") + scorer_parse_data = _require_mapping(_require_value(scorer_data, "parse"), "task.scorer.parse") + objective_data = _require_mapping(_require_value(root, "objective"), "task.objective") + policy_data = _require_mapping(_require_value(root, "policy"), "task.policy") + budget_data = _require_mapping(_require_value(root, "budget"), "task.budget") + logging_data = _require_mapping(_require_value(root, "logging"), "task.logging") + + direction = _require_str(objective_data, "direction", "task.objective") + if direction not in {"maximize", "minimize"}: + raise TaskValidationError("task.objective.direction must be maximize or minimize") + + mode = _require_str(mutation_data, "mode", "task.mutation") + if mode != "direct_edit": + raise TaskValidationError("task.mutation.mode must be direct_edit") + + mutator_type = _require_str(mutator_data, "type", "task.mutator") + if mutator_type != "command": + raise TaskValidationError("task.mutator.type must be command") + + scorer_type = _require_str(scorer_data, "type", "task.scorer") + if scorer_type != "command": + raise TaskValidationError("task.scorer.type must be command") + + parse_format = _require_str(scorer_parse_data, "format", "task.scorer.parse") + if parse_format != "json": + raise TaskValidationError("task.scorer.parse.format must be json") + + constraints_data = _require_list(_require_value(root, "constraints"), "task.constraints") + constraints = [] + for index, item in enumerate(constraints_data): + constraint_data = _require_mapping(item, f"task.constraints[{index}]") + op = _require_str(constraint_data, "op", f"task.constraints[{index}]") + if op not in {"<=", ">=", "=="}: + raise TaskValidationError(f"task.constraints[{index}].op must be <=, >=, or ==") + constraints.append( + ConstraintSpec( + metric=_require_str(constraint_data, "metric", f"task.constraints[{index}]"), + op=op, + value=_require_value(constraint_data, "value"), + ) + ) + + keep_if = 
_require_str(policy_data, "keep_if", "task.policy") + if keep_if != "better_primary": + raise TaskValidationError("task.policy.keep_if must be better_primary") + + on_failure = _require_str(policy_data, "on_failure", "task.policy") + if on_failure != "discard": + raise TaskValidationError("task.policy.on_failure must be discard") + + return TaskSpec( + id=_require_str(root, "id", "task"), + description=_require_str(root, "description", "task"), + artifacts=ArtifactSpec( + include=_require_str_list(artifacts_data, "include", "task.artifacts"), + exclude=_require_str_list(artifacts_data, "exclude", "task.artifacts"), + max_files_per_iteration=_require_int(artifacts_data, "max_files_per_iteration", "task.artifacts"), + ), + mutation=MutationSpec( + mode=mode, + allowed_file_types=_require_str_list(mutation_data, "allowed_file_types", "task.mutation"), + max_changed_lines=_require_int(mutation_data, "max_changed_lines", "task.mutation"), + ), + mutator=MutatorSpec( + type=mutator_type, + command=_require_str(mutator_data, "command", "task.mutator"), + cwd=_require_str(mutator_data, "cwd", "task.mutator"), + timeout_seconds=_require_int(mutator_data, "timeout_seconds", "task.mutator"), + ), + runner=RunnerSpec( + command=_require_str(runner_data, "command", "task.runner"), + cwd=_require_str(runner_data, "cwd", "task.runner"), + timeout_seconds=_require_int(runner_data, "timeout_seconds", "task.runner"), + ), + scorer=ScorerSpec( + type=scorer_type, + command=_require_str(scorer_data, "command", "task.scorer"), + parse=ScorerParseSpec( + format=parse_format, + score_field=_require_str(scorer_parse_data, "score_field", "task.scorer.parse"), + metrics_field=_require_str(scorer_parse_data, "metrics_field", "task.scorer.parse"), + ), + ), + objective=ObjectiveSpec( + primary_metric=_require_str(objective_data, "primary_metric", "task.objective"), + direction=direction, + ), + constraints=constraints, + policy=PolicySpec( + keep_if=keep_if, + tie_breakers=_require_tie_breakers(policy_data, "tie_breakers", "task.policy"), + on_failure=on_failure, + ), + budget=BudgetSpec( + max_iterations=_require_int(budget_data, "max_iterations", "task.budget"), + max_failures=_require_int(budget_data, "max_failures", "task.budget"), + ), + logging=LoggingSpec( + results_file=_require_str(logging_data, "results_file", "task.logging"), + candidate_dir=_require_str(logging_data, "candidate_dir", "task.logging"), + ), + root_dir=task_path.parent, + ) +``` + +- [ ] **Step 5: Run the task loader tests to verify they pass** + +Run: `uv run python -m unittest tests.test_task_loader -v` +Expected: `OK` + +- [ ] **Step 6: Commit the schema extension** + +```bash +git add engine/models.py engine/task_loader.py tests/test_task_loader.py +git commit -m "feat: add mutator spec to task schema" +``` + +## Task 2: Add The Baseline-Aware Orchestrator Core + +**Files:** +- Create: `engine/orchestrator.py` +- Create: `tests/test_orchestrator.py` +- Test: `tests/test_orchestrator.py` + +- [ ] **Step 1: Write failing orchestrator tests** + +```python +# tests/test_orchestrator.py +from pathlib import Path +import tempfile +import unittest + +from engine.orchestrator import run_single_iteration +from engine.models import ( + ArtifactSpec, + BudgetSpec, + ConstraintSpec, + LoggingSpec, + MutationSpec, + MutatorSpec, + ObjectiveSpec, + PolicySpec, + RunnerSpec, + ScorerParseSpec, + ScorerSpec, + TaskSpec, +) + + +def make_task(root_dir: Path) -> TaskSpec: + return TaskSpec( + id="demo", + description="Demo task", + 
artifacts=ArtifactSpec(include=["task/*.md"], exclude=[], max_files_per_iteration=1), + mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=20), + mutator=MutatorSpec( + type="command", + command="python scripts/mutate_demo.py --artifact task/sample.md", + cwd=".", + timeout_seconds=30, + ), + runner=RunnerSpec( + command="python scripts/evaluate_demo.py --artifact task/sample.md --output work/run.json", + cwd=".", + timeout_seconds=30, + ), + scorer=ScorerSpec( + type="command", + command="python scripts/score_demo.py --input work/run.json", + parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"), + ), + objective=ObjectiveSpec(primary_metric="score", direction="maximize"), + constraints=[], + policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"), + budget=BudgetSpec(max_iterations=1, max_failures=1), + logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"), + root_dir=root_dir, + ) + + +class OrchestratorTest(unittest.TestCase): + def test_discard_leaves_main_workspace_unchanged(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + (root / "task").mkdir() + (root / "scripts").mkdir() + (root / "work").mkdir() + (root / "task" / "sample.md").write_text("# Original\n", encoding="utf-8") + (root / "scripts" / "mutate_demo.py").write_text( + "from pathlib import Path\n" + "import argparse\n" + "p=argparse.ArgumentParser(); p.add_argument('--artifact'); args=p.parse_args()\n" + "Path(args.artifact).write_text('# Candidate\\n', encoding='utf-8')\n", + encoding="utf-8", + ) + (root / "scripts" / "evaluate_demo.py").write_text( + "from pathlib import Path\n" + "import argparse, json\n" + "p=argparse.ArgumentParser(); p.add_argument('--artifact'); p.add_argument('--output'); args=p.parse_args()\n" + "payload={'score': 0.5, 'metrics': {}}\n" + "Path(args.output).parent.mkdir(parents=True, exist_ok=True)\n" + "Path(args.output).write_text(json.dumps(payload), encoding='utf-8')\n", + encoding="utf-8", + ) + (root / "scripts" / "score_demo.py").write_text( + "from pathlib import Path\n" + "import argparse\n" + "p=argparse.ArgumentParser(); p.add_argument('--input'); args=p.parse_args()\n" + "print(Path(args.input).read_text(encoding='utf-8'))\n", + encoding="utf-8", + ) + decision = run_single_iteration(make_task(root), baseline_score=1.0) + self.assertEqual(decision.status, "discard") + self.assertEqual((root / "task" / "sample.md").read_text(encoding="utf-8"), "# Original\n") + + def test_keep_syncs_candidate_back(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + (root / "task").mkdir() + (root / "scripts").mkdir() + (root / "work").mkdir() + (root / "task" / "sample.md").write_text("# Original\n", encoding="utf-8") + (root / "scripts" / "mutate_demo.py").write_text( + "from pathlib import Path\n" + "import argparse\n" + "p=argparse.ArgumentParser(); p.add_argument('--artifact'); args=p.parse_args()\n" + "Path(args.artifact).write_text('# Candidate\\n', encoding='utf-8')\n", + encoding="utf-8", + ) + (root / "scripts" / "evaluate_demo.py").write_text( + "from pathlib import Path\n" + "import argparse, json\n" + "p=argparse.ArgumentParser(); p.add_argument('--artifact'); p.add_argument('--output'); args=p.parse_args()\n" + "payload={'score': 2.0, 'metrics': {}}\n" + "Path(args.output).parent.mkdir(parents=True, exist_ok=True)\n" + "Path(args.output).write_text(json.dumps(payload), encoding='utf-8')\n", + 
encoding="utf-8", + ) + (root / "scripts" / "score_demo.py").write_text( + "from pathlib import Path\n" + "import argparse\n" + "p=argparse.ArgumentParser(); p.add_argument('--input'); args=p.parse_args()\n" + "print(Path(args.input).read_text(encoding='utf-8'))\n", + encoding="utf-8", + ) + decision = run_single_iteration(make_task(root), baseline_score=1.0) + self.assertEqual(decision.status, "keep") + self.assertEqual((root / "task" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n") + + +if __name__ == "__main__": + unittest.main() +``` diff --git a/uv.lock b/uv.lock index c840d62..561dbb7 100644 --- a/uv.lock +++ b/uv.lock @@ -53,6 +53,7 @@ dependencies = [ { name = "pandas", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "pandas", version = "3.0.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "pyarrow" }, + { name = "pyyaml" }, { name = "requests" }, { name = "rustbpe" }, { name = "tiktoken" }, @@ -66,6 +67,7 @@ requires-dist = [ { name = "numpy", specifier = ">=2.2.6" }, { name = "pandas", specifier = ">=2.3.3" }, { name = "pyarrow", specifier = ">=21.0.0" }, + { name = "pyyaml", specifier = ">=6.0.2" }, { name = "requests", specifier = ">=2.32.0" }, { name = "rustbpe", specifier = ">=0.1.0" }, { name = "tiktoken", specifier = ">=0.11.0" },