CommonAutoRearsh/docs/superpowers/plans/2026-04-02-artifact-loop-engine.md


# Artifact Loop Engine Implementation Plan

**For agentic workers:** REQUIRED SUB-SKILL: Use `superpowers:subagent-driven-development` (recommended) or `superpowers:executing-plans` to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a reusable optimization engine for editable text artifacts with declarative task specs, structured scoring, a strict keep/discard policy, and one working skill-quality sample task.

**Architecture:** Add a small `engine/` package that owns task parsing, artifact safety, execution, scoring, and decisions. Drive the loop from a single CLI in `scripts/run_task.py`, prove it with a deterministic skill-quality task, then add a bounded mutation layer that validates and accepts agent edits without opening the whole repository.

**Tech Stack:** Python 3.10+, standard library, PyYAML, uv, unittest
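
To orient readers before the task breakdown, here is a minimal sketch of how the planned modules are intended to compose into one optimization iteration. It assumes the module APIs defined in the tasks below; the v1 CLI in `scripts/run_task.py` runs a single pass, so the surrounding driver function and the external-edit step are illustrative, not part of this plan.

```python
# Illustrative only: one iteration of the intended optimize loop, built from
# the engine modules this plan creates. The hypothetical run_one_iteration
# driver and the "external mutator edits files here" step are assumptions.
from pathlib import Path

from engine.artifact_manager import ArtifactManager
from engine.decision_engine import decide_candidate
from engine.mutation_engine import MutationValidationError, validate_candidate_changes
from engine.runner import run_command
from engine.scorer import parse_score_output
from engine.task_loader import load_task


def run_one_iteration(task_path: Path, baseline: float | None) -> float | None:
    task = load_task(task_path)
    manager = ArtifactManager(task)
    snapshot = manager.snapshot()  # baseline copy of every artifact

    # ... an external mutator edits the artifacts in place here ...

    try:
        validate_candidate_changes(task, snapshot)  # enforce the edit budget
    except MutationValidationError:
        manager.restore(snapshot)  # strict policy: out-of-bounds edits are reverted
        return baseline

    run_result = run_command(task.runner.command, Path(task.runner.cwd), task.runner.timeout_seconds)
    score_run = run_command(task.scorer.command, Path.cwd(), task.runner.timeout_seconds)
    score = parse_score_output(
        score_run.stdout,
        score_field=task.scorer.parse.score_field,
        metrics_field=task.scorer.parse.metrics_field,
    )
    decision = decide_candidate(
        baseline=baseline,
        candidate=score,
        objective=task.objective,
        constraints=task.constraints,
        tie_breakers=task.policy.tie_breakers,
        run_result=run_result,
    )
    if decision.status != "keep":
        manager.restore(snapshot)  # discard: the baseline artifacts win
        return baseline
    return decision.candidate_score
```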


## File Map

### New Files

- `engine/__init__.py` - package marker
- `engine/models.py` - dataclasses for task specs, results, and decisions
- `engine/task_loader.py` - YAML parsing and validation
- `engine/artifact_manager.py` - artifact resolution, snapshots, diff summaries, restore
- `engine/runner.py` - subprocess runner with timeout and log capture
- `engine/scorer.py` - command-based scorer and JSON normalization
- `engine/decision_engine.py` - objective, constraints, and keep/discard decisions
- `engine/mutation_engine.py` - bounded mutation validation and optional external mutator hook
- `scripts/run_task.py` - top-level orchestration CLI
- `scripts/evaluate_skill_task.py` - deterministic sample task runner
- `scripts/score_skill_task.py` - deterministic sample task scorer
- `tasks/skill-quality/task.yaml` - sample task spec
- `tasks/skill-quality/rubric.md` - sample evaluation rubric
- `tasks/skill-quality/prompt.md` - sample mutation guidance
- `tasks/skill-quality/fixtures/SKILL.md` - sample artifact under optimization
- `tests/__init__.py` - package marker for unittest
- `tests/test_task_loader.py` - task loader coverage
- `tests/test_artifact_manager.py` - snapshot, diff, restore coverage
- `tests/test_execution_pipeline.py` - runner, scorer, decision, and CLI coverage
- `tests/test_mutation_engine.py` - mutation guardrail coverage

### Modified Files

- `pyproject.toml` - add PyYAML
- `README.md` - document the new engine workflow and sample task

## Task 1: Bootstrap The Engine Package

**Files:**

- Modify: `pyproject.toml`
- Create: `engine/__init__.py`
- Create: `engine/models.py`
- Create: `tests/__init__.py`
- Test: `tests/test_task_loader.py`

- [ ] **Step 1: Write the failing model smoke test**

```python
# tests/test_task_loader.py
from pathlib import Path
import tempfile
import unittest

from engine.task_loader import load_task


class TaskLoaderSmokeTest(unittest.TestCase):
    def test_loads_minimal_task(self) -> None:
        task_yaml = """
id: demo
description: Demo task
artifacts:
  include:
    - tasks/demo/sample.txt
  exclude: []
  max_files_per_iteration: 1
mutation:
  mode: direct_edit
  allowed_file_types: [".txt"]
  max_changed_lines: 10
runner:
  command: "python -c \\"print('run')\\""
  cwd: "."
  timeout_seconds: 10
scorer:
  type: command
  command: "python -c \\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\""
  parse:
    format: json
    score_field: "score"
    metrics_field: "metrics"
objective:
  primary_metric: score
  direction: maximize
constraints:
  - metric: violation_count
    op: "<="
    value: 0
policy:
  keep_if: better_primary
  tie_breakers: []
  on_failure: discard
budget:
  max_iterations: 3
  max_failures: 1
logging:
  results_file: work/results.jsonl
  candidate_dir: work/candidates
"""
        with tempfile.TemporaryDirectory() as tmp:
            task_path = Path(tmp) / "task.yaml"
            task_path.write_text(task_yaml, encoding="utf-8")
            task = load_task(task_path)
        self.assertEqual(task.id, "demo")
        self.assertEqual(task.objective.direction, "maximize")


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the smoke test to verify it fails**

Run: uv run python -m unittest tests.test_task_loader -v
Expected: ModuleNotFoundError: No module named 'engine'

- [ ] **Step 3: Add the package scaffold and shared dataclasses**

```python
# engine/__init__.py
"""Artifact Loop Engine package."""
```

```python
# engine/models.py
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class ArtifactSpec:
    include: list[str]
    exclude: list[str]
    max_files_per_iteration: int


@dataclass(frozen=True)
class MutationSpec:
    mode: str
    allowed_file_types: list[str]
    max_changed_lines: int


@dataclass(frozen=True)
class RunnerSpec:
    command: str
    cwd: str
    timeout_seconds: int


@dataclass(frozen=True)
class ScorerParseSpec:
    format: str
    score_field: str
    metrics_field: str


@dataclass(frozen=True)
class ScorerSpec:
    type: str
    command: str
    parse: ScorerParseSpec


@dataclass(frozen=True)
class ObjectiveSpec:
    primary_metric: str
    direction: str


@dataclass(frozen=True)
class ConstraintSpec:
    metric: str
    op: str
    value: Any


@dataclass(frozen=True)
class PolicySpec:
    keep_if: str
    tie_breakers: list[dict[str, str]]
    on_failure: str


@dataclass(frozen=True)
class BudgetSpec:
    max_iterations: int
    max_failures: int


@dataclass(frozen=True)
class LoggingSpec:
    results_file: str
    candidate_dir: str


@dataclass(frozen=True)
class TaskSpec:
    id: str
    description: str
    artifacts: ArtifactSpec
    mutation: MutationSpec
    runner: RunnerSpec
    scorer: ScorerSpec
    objective: ObjectiveSpec
    constraints: list[ConstraintSpec]
    policy: PolicySpec
    budget: BudgetSpec
    logging: LoggingSpec
    root_dir: Path


@dataclass(frozen=True)
class BaselineSnapshot:
    file_contents: dict[Path, str]
    file_hashes: dict[Path, str]


@dataclass(frozen=True)
class RunResult:
    command: str
    cwd: Path
    exit_code: int
    runtime_seconds: float
    stdout: str
    stderr: str


@dataclass(frozen=True)
class ScoreResult:
    primary_score: float
    metrics: dict[str, Any]
    raw_output: dict[str, Any]


@dataclass(frozen=True)
class DecisionResult:
    status: str
    reason: str
    baseline_score: float | None
    candidate_score: float | None
    constraint_failures: list[str] = field(default_factory=list)
```

```python
# tests/__init__.py
"""Unit tests for the artifact loop engine."""
```

- [ ] **Step 4: Add PyYAML to dependencies**

```toml
[project]
name = "autoresearch"
version = "0.1.0"
description = "Autonomous pretraining research swarm"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "kernels>=0.11.7",
    "matplotlib>=3.10.8",
    "numpy>=2.2.6",
    "pandas>=2.3.3",
    "pyarrow>=21.0.0",
    "PyYAML>=6.0.2",
    "requests>=2.32.0",
    "rustbpe>=0.1.0",
    "tiktoken>=0.11.0",
    "torch==2.9.1",
]
```

- [ ] **Step 5: Run the smoke test again**

Run: uv run python -m unittest tests.test_task_loader -v
Expected: FAIL with ModuleNotFoundError: No module named 'engine.task_loader'

- [ ] **Step 6: Commit the bootstrap**

```bash
git add pyproject.toml engine/__init__.py engine/models.py tests/__init__.py tests/test_task_loader.py
git commit -m "feat: bootstrap artifact loop engine package"
```

## Task 2: Implement YAML Task Loading And Validation

**Files:**

- Create: `engine/task_loader.py`
- Modify: `tests/test_task_loader.py`
- Test: `tests/test_task_loader.py`

- [ ] **Step 1: Expand the failing tests to cover validation**

```python
# tests/test_task_loader.py
from pathlib import Path
import tempfile
import unittest

from engine.task_loader import TaskValidationError, load_task


VALID_TASK = """
id: demo
description: Demo task
artifacts:
  include:
    - tasks/demo/sample.txt
  exclude: []
  max_files_per_iteration: 1
mutation:
  mode: direct_edit
  allowed_file_types: [".txt"]
  max_changed_lines: 10
runner:
  command: "python -c \\"print('run')\\""
  cwd: "."
  timeout_seconds: 10
scorer:
  type: command
  command: "python -c \\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\""
  parse:
    format: json
    score_field: "score"
    metrics_field: "metrics"
objective:
  primary_metric: score
  direction: maximize
constraints:
  - metric: violation_count
    op: "<="
    value: 0
policy:
  keep_if: better_primary
  tie_breakers: []
  on_failure: discard
budget:
  max_iterations: 3
  max_failures: 1
logging:
  results_file: work/results.jsonl
  candidate_dir: work/candidates
"""


class TaskLoaderTest(unittest.TestCase):
    def write_task(self, content: str) -> Path:
        temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(temp_dir.cleanup)
        task_path = Path(temp_dir.name) / "task.yaml"
        task_path.write_text(content, encoding="utf-8")
        return task_path

    def test_loads_minimal_task(self) -> None:
        task = load_task(self.write_task(VALID_TASK))
        self.assertEqual(task.id, "demo")
        self.assertEqual(task.artifacts.max_files_per_iteration, 1)
        self.assertEqual(task.constraints[0].metric, "violation_count")

    def test_rejects_missing_required_section(self) -> None:
        content = VALID_TASK.replace("objective:\n  primary_metric: score\n  direction: maximize\n", "")
        with self.assertRaises(TaskValidationError) as ctx:
            load_task(self.write_task(content))
        self.assertIn("objective", str(ctx.exception))

    def test_rejects_invalid_direction(self) -> None:
        content = VALID_TASK.replace("direction: maximize", "direction: sideways")
        with self.assertRaises(TaskValidationError) as ctx:
            load_task(self.write_task(content))
        self.assertIn("direction", str(ctx.exception))


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the tests to verify they fail**

Run: uv run python -m unittest tests.test_task_loader -v
Expected: ModuleNotFoundError: No module named 'engine.task_loader'

- [ ] **Step 3: Implement the loader and validator**

```python
# engine/task_loader.py
from __future__ import annotations

from pathlib import Path
from typing import Any

import yaml

from engine.models import (
    ArtifactSpec,
    BudgetSpec,
    ConstraintSpec,
    LoggingSpec,
    MutationSpec,
    ObjectiveSpec,
    PolicySpec,
    RunnerSpec,
    ScorerParseSpec,
    ScorerSpec,
    TaskSpec,
)


class TaskValidationError(ValueError):
    """Raised when a task spec is invalid."""


def _require_mapping(data: Any, name: str) -> dict[str, Any]:
    if not isinstance(data, dict):
        raise TaskValidationError(f"{name} must be a mapping")
    return data


def _require_list(data: Any, name: str) -> list[Any]:
    if not isinstance(data, list):
        raise TaskValidationError(f"{name} must be a list")
    return data


def _require_value(mapping: dict[str, Any], key: str) -> Any:
    if key not in mapping:
        raise TaskValidationError(f"missing required field: {key}")
    return mapping[key]


def load_task(task_path: Path) -> TaskSpec:
    raw = yaml.safe_load(task_path.read_text(encoding="utf-8"))
    data = _require_mapping(raw, "task")

    objective = _require_mapping(_require_value(data, "objective"), "objective")
    direction = _require_value(objective, "direction")
    if direction not in {"maximize", "minimize"}:
        raise TaskValidationError("objective.direction must be maximize or minimize")

    artifacts = _require_mapping(_require_value(data, "artifacts"), "artifacts")
    mutation = _require_mapping(_require_value(data, "mutation"), "mutation")
    runner = _require_mapping(_require_value(data, "runner"), "runner")
    scorer = _require_mapping(_require_value(data, "scorer"), "scorer")
    scorer_parse = _require_mapping(_require_value(scorer, "parse"), "scorer.parse")
    policy = _require_mapping(_require_value(data, "policy"), "policy")
    budget = _require_mapping(_require_value(data, "budget"), "budget")
    logging = _require_mapping(_require_value(data, "logging"), "logging")

    constraint_specs = []
    for item in _require_list(_require_value(data, "constraints"), "constraints"):
        mapping = _require_mapping(item, "constraint")
        constraint_specs.append(
            ConstraintSpec(
                metric=str(_require_value(mapping, "metric")),
                op=str(_require_value(mapping, "op")),
                value=_require_value(mapping, "value"),
            )
        )

    return TaskSpec(
        id=str(_require_value(data, "id")),
        description=str(_require_value(data, "description")),
        artifacts=ArtifactSpec(
            include=[str(item) for item in _require_list(_require_value(artifacts, "include"), "artifacts.include")],
            exclude=[str(item) for item in _require_list(artifacts.get("exclude", []), "artifacts.exclude")],
            max_files_per_iteration=int(_require_value(artifacts, "max_files_per_iteration")),
        ),
        mutation=MutationSpec(
            mode=str(_require_value(mutation, "mode")),
            allowed_file_types=[str(item) for item in _require_list(_require_value(mutation, "allowed_file_types"), "mutation.allowed_file_types")],
            max_changed_lines=int(_require_value(mutation, "max_changed_lines")),
        ),
        runner=RunnerSpec(
            command=str(_require_value(runner, "command")),
            cwd=str(_require_value(runner, "cwd")),
            timeout_seconds=int(_require_value(runner, "timeout_seconds")),
        ),
        scorer=ScorerSpec(
            type=str(_require_value(scorer, "type")),
            command=str(_require_value(scorer, "command")),
            parse=ScorerParseSpec(
                format=str(_require_value(scorer_parse, "format")),
                score_field=str(_require_value(scorer_parse, "score_field")),
                metrics_field=str(_require_value(scorer_parse, "metrics_field")),
            ),
        ),
        objective=ObjectiveSpec(
            primary_metric=str(_require_value(objective, "primary_metric")),
            direction=str(direction),
        ),
        constraints=constraint_specs,
        policy=PolicySpec(
            keep_if=str(_require_value(policy, "keep_if")),
            tie_breakers=[dict(item) for item in _require_list(policy.get("tie_breakers", []), "policy.tie_breakers")],
            on_failure=str(_require_value(policy, "on_failure")),
        ),
        budget=BudgetSpec(
            max_iterations=int(_require_value(budget, "max_iterations")),
            max_failures=int(_require_value(budget, "max_failures")),
        ),
        logging=LoggingSpec(
            results_file=str(_require_value(logging, "results_file")),
            candidate_dir=str(_require_value(logging, "candidate_dir")),
        ),
        root_dir=task_path.parent,
    )
```

- [ ] **Step 4: Run the task loader tests to verify they pass**

Run: uv run python -m unittest tests.test_task_loader -v
Expected: OK

- [ ] **Step 5: Commit the task loader**

```bash
git add engine/task_loader.py tests/test_task_loader.py
git commit -m "feat: add yaml task loader"
```

## Task 3: Add Artifact Snapshot, Diff, And Restore

**Files:**

- Create: `engine/artifact_manager.py`
- Create: `tests/test_artifact_manager.py`
- Test: `tests/test_artifact_manager.py`

- [ ] **Step 1: Write failing artifact manager tests**

```python
# tests/test_artifact_manager.py
from pathlib import Path
import tempfile
import unittest

from engine.artifact_manager import ArtifactManager
from engine.models import (
    ArtifactSpec,
    BudgetSpec,
    ConstraintSpec,
    LoggingSpec,
    MutationSpec,
    ObjectiveSpec,
    PolicySpec,
    RunnerSpec,
    ScorerParseSpec,
    ScorerSpec,
    TaskSpec,
)


def make_task(root_dir: Path) -> TaskSpec:
    return TaskSpec(
        id="demo",
        description="Demo",
        artifacts=ArtifactSpec(include=["artifacts/*.md"], exclude=["artifacts/ignore.md"], max_files_per_iteration=1),
        mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=20),
        runner=RunnerSpec(command="python -c \"print('run')\"", cwd=".", timeout_seconds=10),
        scorer=ScorerSpec(
            type="command",
            command="python -c \"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\"",
            parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
        ),
        objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
        constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)],
        policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"),
        budget=BudgetSpec(max_iterations=1, max_failures=1),
        logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"),
        root_dir=root_dir,
    )


class ArtifactManagerTest(unittest.TestCase):
    def test_snapshot_and_restore(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            artifact_dir = root / "artifacts"
            artifact_dir.mkdir()
            target = artifact_dir / "sample.md"
            target.write_text("hello\n", encoding="utf-8")
            manager = ArtifactManager(make_task(root))
            snapshot = manager.snapshot()
            target.write_text("changed\n", encoding="utf-8")
            manager.restore(snapshot)
            self.assertEqual(target.read_text(encoding="utf-8"), "hello\n")

    def test_diff_summary_contains_changed_line(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            artifact_dir = root / "artifacts"
            artifact_dir.mkdir()
            target = artifact_dir / "sample.md"
            target.write_text("before\n", encoding="utf-8")
            manager = ArtifactManager(make_task(root))
            snapshot = manager.snapshot()
            target.write_text("after\n", encoding="utf-8")
            summary = manager.diff_summary(snapshot)
            self.assertIn("-before", summary)
            self.assertIn("+after", summary)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the tests to verify they fail**

Run: uv run python -m unittest tests.test_artifact_manager -v
Expected: ModuleNotFoundError: No module named 'engine.artifact_manager'

- [ ] **Step 3: Implement snapshot, diff, and restore**

```python
# engine/artifact_manager.py
from __future__ import annotations

from difflib import unified_diff
from fnmatch import fnmatch
import hashlib
from pathlib import Path

from engine.models import BaselineSnapshot, TaskSpec


class ArtifactManager:
    def __init__(self, task: TaskSpec) -> None:
        self.task = task

    def resolve_paths(self) -> list[Path]:
        matched: list[Path] = []
        for pattern in self.task.artifacts.include:
            matched.extend(self.task.root_dir.glob(pattern))
        files = [path for path in matched if path.is_file()]
        excluded = set()
        for path in files:
            relative = path.relative_to(self.task.root_dir).as_posix()
            if any(fnmatch(relative, pattern) for pattern in self.task.artifacts.exclude):
                excluded.add(path)
        resolved = [path for path in files if path not in excluded]
        return sorted(dict.fromkeys(resolved))

    def snapshot(self) -> BaselineSnapshot:
        file_contents: dict[Path, str] = {}
        file_hashes: dict[Path, str] = {}
        for path in self.resolve_paths():
            content = path.read_text(encoding="utf-8")
            file_contents[path] = content
            file_hashes[path] = hashlib.sha256(content.encode("utf-8")).hexdigest()
        return BaselineSnapshot(file_contents=file_contents, file_hashes=file_hashes)

    def restore(self, snapshot: BaselineSnapshot) -> None:
        for path, content in snapshot.file_contents.items():
            path.write_text(content, encoding="utf-8")

    def diff_summary(self, snapshot: BaselineSnapshot) -> str:
        chunks: list[str] = []
        for path, before in snapshot.file_contents.items():
            after = path.read_text(encoding="utf-8")
            if before == after:
                continue
            diff = unified_diff(
                before.splitlines(),
                after.splitlines(),
                fromfile=str(path),
                tofile=str(path),
                lineterm="",
            )
            chunks.append("\n".join(diff))
        return "\n\n".join(chunks)
```

- [ ] **Step 4: Run the artifact manager tests to verify they pass**

Run: uv run python -m unittest tests.test_artifact_manager -v
Expected: OK

- [ ] **Step 5: Commit the artifact manager**

```bash
git add engine/artifact_manager.py tests/test_artifact_manager.py
git commit -m "feat: add artifact snapshot and restore support"
```

## Task 4: Implement Runner, Scorer, And Decision Engine

**Files:**

- Create: `engine/runner.py`
- Create: `engine/scorer.py`
- Create: `engine/decision_engine.py`
- Create: `tests/test_execution_pipeline.py`
- Test: `tests/test_execution_pipeline.py`

- [ ] **Step 1: Write failing execution pipeline tests**

```python
# tests/test_execution_pipeline.py
from pathlib import Path
import tempfile
import unittest

from engine.decision_engine import decide_candidate
from engine.models import ConstraintSpec, ObjectiveSpec, RunResult, ScoreResult
from engine.runner import run_command
from engine.scorer import parse_score_output


class ExecutionPipelineTest(unittest.TestCase):
    def test_run_command_captures_stdout(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            result = run_command("python -c \"print('ok')\"", Path(tmp), timeout_seconds=5)
        self.assertEqual(result.exit_code, 0)
        self.assertIn("ok", result.stdout)

    def test_parse_score_output_reads_primary_score(self) -> None:
        score = parse_score_output(
            '{"score": 4.5, "metrics": {"violation_count": 0}}',
            score_field="score",
            metrics_field="metrics",
        )
        self.assertEqual(score.primary_score, 4.5)
        self.assertEqual(score.metrics["violation_count"], 0)

    def test_decide_candidate_rejects_constraint_failures(self) -> None:
        decision = decide_candidate(
            baseline=3.0,
            candidate=ScoreResult(
                primary_score=5.0,
                metrics={"violation_count": 1},
                raw_output={"score": 5.0, "metrics": {"violation_count": 1}},
            ),
            objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
            constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)],
            tie_breakers=[],
            run_result=RunResult(
                command="python -c \"print('ok')\"",
                cwd=Path("."),
                exit_code=0,
                runtime_seconds=0.1,
                stdout="ok\n",
                stderr="",
            ),
        )
        self.assertEqual(decision.status, "discard")
        self.assertIn("violation_count", decision.reason)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the tests to verify they fail**

Run: uv run python -m unittest tests.test_execution_pipeline -v
Expected: ModuleNotFoundError for the new engine modules

- [ ] **Step 3: Implement subprocess execution**

```python
# engine/runner.py
from __future__ import annotations

from pathlib import Path
import subprocess
import time

from engine.models import RunResult


def run_command(command: str, cwd: Path, timeout_seconds: int) -> RunResult:
    start = time.perf_counter()
    completed = subprocess.run(
        command,
        cwd=str(cwd),
        shell=True,
        capture_output=True,
        text=True,
        encoding="utf-8",
        timeout=timeout_seconds,
        check=False,
    )
    runtime = time.perf_counter() - start
    return RunResult(
        command=command,
        cwd=cwd,
        exit_code=completed.returncode,
        runtime_seconds=runtime,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
```

```python
# engine/scorer.py
from __future__ import annotations

import json

from engine.models import ScoreResult


def parse_score_output(output: str, score_field: str, metrics_field: str) -> ScoreResult:
    payload = json.loads(output)
    metrics = payload[metrics_field]
    return ScoreResult(
        primary_score=float(payload[score_field]),
        metrics=dict(metrics),
        raw_output=payload,
    )
```

```python
# engine/decision_engine.py
from __future__ import annotations

from engine.models import ConstraintSpec, DecisionResult, ObjectiveSpec, RunResult, ScoreResult


def _constraint_failed(score: ScoreResult, constraint: ConstraintSpec) -> bool:
    value = score.metrics.get(constraint.metric)
    if value is None:
        # A metric the scorer never reported cannot satisfy its constraint;
        # treat it as a failure instead of crashing on a None comparison below.
        return True
    if constraint.op == "<=":
        return value > constraint.value
    if constraint.op == ">=":
        return value < constraint.value
    if constraint.op == "==":
        return value != constraint.value
    raise ValueError(f"unsupported constraint operator: {constraint.op}")


def decide_candidate(
    baseline: float | None,
    candidate: ScoreResult,
    objective: ObjectiveSpec,
    constraints: list[ConstraintSpec],
    tie_breakers: list[dict[str, str]],
    run_result: RunResult,
) -> DecisionResult:
    if run_result.exit_code != 0:
        return DecisionResult(status="crash", reason="runner exited with non-zero status", baseline_score=baseline, candidate_score=None)

    failures = [constraint.metric for constraint in constraints if _constraint_failed(candidate, constraint)]
    if failures:
        return DecisionResult(
            status="discard",
            reason=f"constraint failure: {', '.join(failures)}",
            baseline_score=baseline,
            candidate_score=candidate.primary_score,
            constraint_failures=failures,
        )

    if baseline is None:
        return DecisionResult(status="keep", reason="no baseline yet", baseline_score=None, candidate_score=candidate.primary_score)

    is_better = candidate.primary_score > baseline if objective.direction == "maximize" else candidate.primary_score < baseline
    if is_better:
        return DecisionResult(status="keep", reason="primary metric improved", baseline_score=baseline, candidate_score=candidate.primary_score)

    return DecisionResult(status="discard", reason="primary metric did not improve", baseline_score=baseline, candidate_score=candidate.primary_score)
```

- [ ] **Step 4: Run the execution pipeline tests to verify they pass**

Run: uv run python -m unittest tests.test_execution_pipeline -v
Expected: OK
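
As a quick orientation for the keep path (the test above exercises the discard path), a candidate that beats the baseline and satisfies every constraint should be kept. This snippet is illustrative only and not part of the plan's test files:

```python
# Illustrative keep-path check built only from the modules defined above.
from pathlib import Path

from engine.decision_engine import decide_candidate
from engine.models import ConstraintSpec, ObjectiveSpec, RunResult, ScoreResult

decision = decide_candidate(
    baseline=3.0,
    candidate=ScoreResult(primary_score=5.0, metrics={"violation_count": 0}, raw_output={}),
    objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
    constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)],
    tie_breakers=[],
    run_result=RunResult(command="echo ok", cwd=Path("."), exit_code=0, runtime_seconds=0.0, stdout="ok\n", stderr=""),
)
assert decision.status == "keep"
assert decision.reason == "primary metric improved"
```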

- [ ] **Step 5: Commit the execution core**

```bash
git add engine/runner.py engine/scorer.py engine/decision_engine.py tests/test_execution_pipeline.py
git commit -m "feat: add execution, scoring, and decision modules"
```

## Task 5: Build The CLI And A Deterministic Sample Task

**Files:**

- Create: `scripts/run_task.py`
- Create: `scripts/evaluate_skill_task.py`
- Create: `scripts/score_skill_task.py`
- Create: `tasks/skill-quality/task.yaml`
- Create: `tasks/skill-quality/rubric.md`
- Create: `tasks/skill-quality/prompt.md`
- Create: `tasks/skill-quality/fixtures/SKILL.md`
- Modify: `tests/test_execution_pipeline.py`
- Test: `tests/test_execution_pipeline.py`

- [ ] **Step 1: Add a failing end-to-end CLI test**

```python
# tests/test_execution_pipeline.py
from pathlib import Path
import json
import os
import shutil
import subprocess
import sys
import tempfile
import textwrap
import unittest


class RunTaskCliTest(unittest.TestCase):
    def test_run_task_writes_results_jsonl(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            (root / "tasks" / "skill-quality" / "fixtures").mkdir(parents=True)
            (root / "work").mkdir()
            (root / "tasks" / "skill-quality" / "fixtures" / "SKILL.md").write_text(
                "# Skill\n\n## Goal\nWrite clear plans.\n",
                encoding="utf-8",
            )
            (root / "tasks" / "skill-quality" / "rubric.md").write_text(
                "Required headings: Goal, Constraints, Examples\n",
                encoding="utf-8",
            )
            (root / "tasks" / "skill-quality" / "prompt.md").write_text(
                "Keep the skill concise and structured.\n",
                encoding="utf-8",
            )
            (root / "tasks" / "skill-quality" / "task.yaml").write_text(
                textwrap.dedent(
                    '''
                    id: skill-quality
                    description: Score a skill file
                    artifacts:
                      include:
                        - fixtures/SKILL.md
                      exclude: []
                      max_files_per_iteration: 1
                    mutation:
                      mode: direct_edit
                      allowed_file_types: [".md"]
                      max_changed_lines: 20
                    runner:
                      command: "python ../../scripts/evaluate_skill_task.py --task-dir . --artifact fixtures/SKILL.md --output ../../work/skill-run.json"
                      cwd: "tasks/skill-quality"
                      timeout_seconds: 10
                    scorer:
                      type: command
                      command: "python scripts/score_skill_task.py --input work/skill-run.json"
                      parse:
                        format: json
                        score_field: score
                        metrics_field: metrics
                    objective:
                      primary_metric: score
                      direction: maximize
                    constraints:
                      - metric: violation_count
                        op: "<="
                        value: 0
                    policy:
                      keep_if: better_primary
                      tie_breakers: []
                      on_failure: discard
                    budget:
                      max_iterations: 1
                      max_failures: 1
                    logging:
                      results_file: work/results.jsonl
                      candidate_dir: work/candidates
                    '''
                ).strip(),
                encoding="utf-8",
            )
            # Copy the repo's CLI and helper scripts into the temp root so the
            # relative paths in task.yaml resolve, and expose the engine
            # package via PYTHONPATH so run_task.py can import it from here.
            repo_root = Path(__file__).resolve().parents[1]
            shutil.copytree(repo_root / "scripts", root / "scripts")
            env = {**os.environ, "PYTHONPATH": str(repo_root)}
            result = subprocess.run(
                [sys.executable, "scripts/run_task.py", "--task", "tasks/skill-quality/task.yaml"],
                cwd=root,
                env=env,
                capture_output=True,
                text=True,
                encoding="utf-8",
                check=False,
            )
            self.assertEqual(result.returncode, 0, msg=result.stderr)
            results_path = root / "work" / "results.jsonl"
            self.assertTrue(results_path.exists())
            payload = json.loads(results_path.read_text(encoding="utf-8").splitlines()[0])
            self.assertEqual(payload["status"], "discard")
            self.assertGreater(payload["candidate_score"], 0)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the CLI test to verify it fails**

Run: uv run python -m unittest tests.test_execution_pipeline.RunTaskCliTest -v
Expected: ERROR with a FileNotFoundError, because the scripts directory and scripts/run_task.py do not exist yet

- [ ] **Step 3: Implement the CLI and deterministic sample task**

```python
# scripts/evaluate_skill_task.py
from __future__ import annotations

import argparse
import json
from pathlib import Path


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--task-dir", required=True)
    parser.add_argument("--artifact", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    task_dir = Path(args.task_dir).resolve()
    artifact_path = (task_dir / args.artifact).resolve()
    rubric_text = (task_dir / "rubric.md").read_text(encoding="utf-8")
    artifact_text = artifact_path.read_text(encoding="utf-8")

    required_headings = ["## Goal", "## Constraints", "## Examples"]
    present = sum(1 for heading in required_headings if heading in artifact_text)
    coverage = present / len(required_headings)
    lines = [line.strip() for line in artifact_text.splitlines() if line.strip()]
    average_line_length = sum(len(line) for line in lines) / max(len(lines), 1)
    clarity = max(0.0, 1.0 - max(0.0, average_line_length - 80.0) / 120.0)
    violation_count = 0 if "Do not" in artifact_text else 1
    score = round((coverage * 70.0) + (clarity * 30.0), 4)

    payload = {
        "score": score,
        "metrics": {
            "coverage": round(coverage, 4),
            "clarity": round(clarity, 4),
            "violation_count": violation_count,
            "length_tokens": len(artifact_text.split()),
            "rubric_excerpt": rubric_text[:80],
        },
    }
    output_path = Path(args.output).resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(payload), encoding="utf-8")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```

```python
# scripts/score_skill_task.py
from __future__ import annotations

import argparse
from pathlib import Path


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    args = parser.parse_args()
    payload = Path(args.input).read_text(encoding="utf-8")
    print(payload)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```

```python
# scripts/run_task.py
from __future__ import annotations

import argparse
import json
from pathlib import Path

from engine.artifact_manager import ArtifactManager
from engine.decision_engine import decide_candidate
from engine.runner import run_command
from engine.scorer import parse_score_output
from engine.task_loader import load_task


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--task", required=True)
    args = parser.parse_args()

    root_dir = Path.cwd()
    task_path = (root_dir / args.task).resolve()
    task = load_task(task_path)
    manager = ArtifactManager(task)
    snapshot = manager.snapshot()

    run_result = run_command(
        command=task.runner.command,
        cwd=(root_dir / task.runner.cwd).resolve(),
        timeout_seconds=task.runner.timeout_seconds,
    )
    score_run = run_command(
        command=task.scorer.command,
        cwd=root_dir,
        timeout_seconds=task.runner.timeout_seconds,
    )
    score = parse_score_output(
        score_run.stdout,
        score_field=task.scorer.parse.score_field,
        metrics_field=task.scorer.parse.metrics_field,
    )
    decision = decide_candidate(
        baseline=None,
        candidate=score,
        objective=task.objective,
        constraints=task.constraints,
        tie_breakers=task.policy.tie_breakers,
        run_result=run_result,
    )

    results_path = (root_dir / task.logging.results_file).resolve()
    results_path.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "task_id": task.id,
        "status": decision.status,
        "reason": decision.reason,
        "candidate_score": decision.candidate_score,
        "diff_summary": manager.diff_summary(snapshot),
    }
    with results_path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(record) + "\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```

```yaml
# tasks/skill-quality/task.yaml
id: skill-quality
description: Score one skill file against a deterministic rubric.
artifacts:
  include:
    - fixtures/SKILL.md
  exclude: []
  max_files_per_iteration: 1
mutation:
  mode: direct_edit
  allowed_file_types: [".md"]
  max_changed_lines: 20
runner:
  command: "python ../../scripts/evaluate_skill_task.py --task-dir . --artifact fixtures/SKILL.md --output ../../work/skill-run.json"
  cwd: "tasks/skill-quality"
  timeout_seconds: 30
scorer:
  type: command
  command: "python scripts/score_skill_task.py --input work/skill-run.json"
  parse:
    format: json
    score_field: score
    metrics_field: metrics
objective:
  primary_metric: score
  direction: maximize
constraints:
  - metric: violation_count
    op: "<="
    value: 0
policy:
  keep_if: better_primary
  tie_breakers: []
  on_failure: discard
budget:
  max_iterations: 5
  max_failures: 3
logging:
  results_file: work/results.jsonl
  candidate_dir: work/candidates
```

```markdown
# tasks/skill-quality/rubric.md
# Skill Quality Rubric

- Required headings: `## Goal`, `## Constraints`, `## Examples`
- Must include at least one explicit prohibition using `Do not`
- Prefer short, direct sentences
```

```markdown
# tasks/skill-quality/prompt.md
Improve the skill file while preserving its intent.

Priorities:
- Add missing required sections
- Keep guidance concise
- Include at least one explicit prohibition
- Avoid filler text
```

```markdown
# tasks/skill-quality/fixtures/SKILL.md
# Planning Skill

## Goal
Write clear implementation plans for multi-step work.

## Constraints
Do not omit concrete commands or expected outcomes.

## Examples
- Show exact test commands.
- Keep tasks small and reviewable.
```

- [ ] **Step 4: Run the end-to-end tests**

Run: uv run python -m unittest tests.test_execution_pipeline -v
Expected: OK

- [ ] **Step 5: Manually run the sample task**

Run: uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml
Expected: exit code 0 and one JSON line in work/results.jsonl
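
With the fixture above (all three required headings plus a `Do not` prohibition), the appended record should look roughly like the line below. The exact values follow the coverage and clarity arithmetic in `scripts/evaluate_skill_task.py`, and `diff_summary` is empty because nothing changed between snapshot and scoring:

```json
{"task_id": "skill-quality", "status": "keep", "reason": "no baseline yet", "candidate_score": 100.0, "diff_summary": ""}
```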

- [ ] **Step 6: Commit the CLI and sample task**

```bash
git add scripts/run_task.py scripts/evaluate_skill_task.py scripts/score_skill_task.py tasks/skill-quality tests/test_execution_pipeline.py
git commit -m "feat: add artifact loop cli and sample skill task"
```

## Task 6: Add Bounded Mutation Validation

**Files:**

- Create: `engine/mutation_engine.py`
- Create: `tests/test_mutation_engine.py`
- Modify: `scripts/run_task.py`
- Test: `tests/test_mutation_engine.py`

- [ ] **Step 1: Write failing mutation guard tests**

```python
# tests/test_mutation_engine.py
from pathlib import Path
import tempfile
import unittest

from engine.mutation_engine import MutationValidationError, validate_candidate_changes
from engine.models import ArtifactSpec, BaselineSnapshot, MutationSpec, TaskSpec
from engine.models import BudgetSpec, ConstraintSpec, LoggingSpec, ObjectiveSpec, PolicySpec, RunnerSpec, ScorerParseSpec, ScorerSpec


def make_task(root_dir: Path) -> TaskSpec:
    return TaskSpec(
        id="demo",
        description="Demo",
        artifacts=ArtifactSpec(include=["artifacts/*.md"], exclude=[], max_files_per_iteration=1),
        mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=3),
        runner=RunnerSpec(command="python -c \"print('run')\"", cwd=".", timeout_seconds=10),
        scorer=ScorerSpec(
            type="command",
            command="python -c \"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\"",
            parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
        ),
        objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
        constraints=[ConstraintSpec(metric="violation_count", op="<=", value=0)],
        policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"),
        budget=BudgetSpec(max_iterations=1, max_failures=1),
        logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"),
        root_dir=root_dir,
    )


class MutationEngineTest(unittest.TestCase):
    def test_rejects_too_many_changed_lines(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            artifact_dir = root / "artifacts"
            artifact_dir.mkdir()
            target = artifact_dir / "sample.md"
            target.write_text("a\nb\nc\n", encoding="utf-8")
            snapshot = BaselineSnapshot(file_contents={target: "a\nb\nc\n"}, file_hashes={target: "hash"})
            target.write_text("a\nx\ny\nz\n", encoding="utf-8")
            with self.assertRaises(MutationValidationError):
                validate_candidate_changes(make_task(root), snapshot)

    def test_rejects_disallowed_extension(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            artifact_dir = root / "artifacts"
            artifact_dir.mkdir()
            target = artifact_dir / "sample.txt"
            target.write_text("before\n", encoding="utf-8")
            snapshot = BaselineSnapshot(file_contents={target: "before\n"}, file_hashes={target: "hash"})
            target.write_text("after\n", encoding="utf-8")
            with self.assertRaises(MutationValidationError):
                validate_candidate_changes(make_task(root), snapshot)


if __name__ == "__main__":
    unittest.main()
```

- [ ] **Step 2: Run the tests to verify they fail**

Run: uv run python -m unittest tests.test_mutation_engine -v
Expected: ModuleNotFoundError: No module named 'engine.mutation_engine'

- [ ] **Step 3: Implement mutation validation and wire it into the CLI**

```python
# engine/mutation_engine.py
from __future__ import annotations

from difflib import unified_diff

from engine.models import BaselineSnapshot, TaskSpec


class MutationValidationError(ValueError):
    """Raised when a candidate edit exceeds task limits."""


def validate_candidate_changes(task: TaskSpec, snapshot: BaselineSnapshot) -> None:
    changed_files = 0
    changed_lines = 0

    for path, before in snapshot.file_contents.items():
        after = path.read_text(encoding="utf-8")
        if before == after:
            continue
        changed_files += 1
        if path.suffix not in task.mutation.allowed_file_types:
            raise MutationValidationError(f"disallowed file type: {path.suffix}")
        diff_lines = list(unified_diff(before.splitlines(), after.splitlines(), lineterm=""))
        # Count only real added/removed lines, not the ---/+++ file headers
        # that unified_diff emits for each changed file.
        changed_lines += sum(
            1
            for line in diff_lines
            if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
        )

    if changed_files > task.artifacts.max_files_per_iteration:
        raise MutationValidationError("too many files changed")
    if changed_lines > task.mutation.max_changed_lines:
        raise MutationValidationError("too many changed lines")
```

```python
# scripts/run_task.py
from __future__ import annotations

import argparse
import json
from pathlib import Path

from engine.artifact_manager import ArtifactManager
from engine.decision_engine import decide_candidate
from engine.mutation_engine import MutationValidationError, validate_candidate_changes
from engine.runner import run_command
from engine.scorer import parse_score_output
from engine.task_loader import load_task


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--task", required=True)
    args = parser.parse_args()

    root_dir = Path.cwd()
    task_path = (root_dir / args.task).resolve()
    task = load_task(task_path)
    manager = ArtifactManager(task)
    snapshot = manager.snapshot()

    try:
        validate_candidate_changes(task, snapshot)
    except MutationValidationError as exc:
        decision_payload = {
            "task_id": task.id,
            "status": "discard",
            "reason": str(exc),
            "candidate_score": None,
            "diff_summary": manager.diff_summary(snapshot),
        }
        results_path = (root_dir / task.logging.results_file).resolve()
        results_path.parent.mkdir(parents=True, exist_ok=True)
        with results_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(decision_payload) + "\n")
        return 0

    run_result = run_command(
        command=task.runner.command,
        cwd=(root_dir / task.runner.cwd).resolve(),
        timeout_seconds=task.runner.timeout_seconds,
    )
    score_run = run_command(
        command=task.scorer.command,
        cwd=root_dir,
        timeout_seconds=task.runner.timeout_seconds,
    )
    score = parse_score_output(
        score_run.stdout,
        score_field=task.scorer.parse.score_field,
        metrics_field=task.scorer.parse.metrics_field,
    )
    decision = decide_candidate(
        baseline=None,
        candidate=score,
        objective=task.objective,
        constraints=task.constraints,
        tie_breakers=task.policy.tie_breakers,
        run_result=run_result,
    )
    results_path = (root_dir / task.logging.results_file).resolve()
    results_path.parent.mkdir(parents=True, exist_ok=True)
    with results_path.open("a", encoding="utf-8") as handle:
        handle.write(
            json.dumps(
                {
                    "task_id": task.id,
                    "status": decision.status,
                    "reason": decision.reason,
                    "candidate_score": decision.candidate_score,
                    "diff_summary": manager.diff_summary(snapshot),
                }
            )
            + "\n"
        )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```

- [ ] **Step 4: Run the mutation tests**

Run: uv run python -m unittest tests.test_mutation_engine -v
Expected: OK

- [ ] **Step 5: Commit mutation validation**

```bash
git add engine/mutation_engine.py scripts/run_task.py tests/test_mutation_engine.py
git commit -m "feat: add bounded mutation validation"
```

## Task 7: Document The New Workflow

**Files:**

- Modify: `README.md`
- Test: none

- [ ] **Step 1: Update the README overview and quick start**

````markdown
## Artifact Loop Engine

This repository now also includes a generic optimization engine for editable text artifacts such as prompts, skills, config files, and small code paths.

### Sample task

Run the deterministic sample task:

```bash
uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml
```

The task writes structured iteration results to `work/results.jsonl`.

### Engine concepts

- artifacts: files the engine may inspect and compare
- runner: command that executes a candidate
- scorer: command that returns a structured score payload
- policy: keep or discard logic based on objective and constraints
````

- [ ] **Step 2: Review the README change for consistency**

Read: `README.md`
Expected: the original training workflow remains documented, and the new engine section does not claim unsupported features such as multi-agent project autonomy.

- [ ] **Step 3: Commit the docs update**

```bash
git add README.md
git commit -m "docs: add artifact loop engine usage"
```

## Final Verification

- [ ] **Step 1: Run the targeted test suite**

Run: uv run python -m unittest tests.test_task_loader tests.test_artifact_manager tests.test_execution_pipeline tests.test_mutation_engine -v
Expected: OK

- [ ] **Step 2: Run the sample task**

Run: uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml
Expected: exit code 0 and a new line appended to work/results.jsonl

- [ ] **Step 3: Inspect the output record**

Read: work/results.jsonl
Expected fields in the latest line:

- task_id
- status
- reason
- candidate_score
- diff_summary

- [ ] **Step 4: Commit the final verified state**

```bash
git add README.md pyproject.toml engine scripts tasks tests
git commit -m "feat: ship artifact loop engine v1"
```