feat: add baseline-aware artifact loop orchestration

sladro 2026-04-02 17:30:26 +08:00
parent 2d2e89eed4
commit 159d816d7f
16 changed files with 1537 additions and 145 deletions


@ -6,17 +6,17 @@
The idea: give an AI agent a small but real LLM training setup and let it experiment autonomously overnight. It modifies the code, trains for 5 minutes, checks if the result improved, keeps or discards, and repeats. You wake up in the morning to a log of experiments and (hopefully) a better model. The training code here is a simplified single-GPU implementation of [nanochat](https://github.com/karpathy/nanochat). The core idea is that you're not touching any of the Python files like you normally would as a researcher. Instead, you are programming the `program.md` Markdown files that provide context to the AI agents and set up your autonomous research org. The default `program.md` in this repo is intentionally kept as a bare-bones baseline, though it's obvious how one would iterate on it over time to find the "research org code" that achieves the fastest research progress, how you'd add more agents to the mix, etc. A bit more context on this project is in this [tweet](https://x.com/karpathy/status/2029701092347630069) and [this tweet](https://x.com/karpathy/status/2031135152349524125).
The repo also includes a generic Artifact Loop Engine for editable text artifacts such as prompts, skills, config files, and small code paths. It applies the same iterate-evaluate-repeat pattern to these artifacts and writes structured iteration results to `work/results.jsonl`.
The repo also includes a generic Artifact Loop Engine for editable text artifacts such as prompts, skills, config files, and small code paths. It now runs a baseline-aware single iteration in an isolated sandbox and writes structured iteration results to `work/results.jsonl`.
Engine concepts:
- **`artifacts`** — the editable inputs the task is allowed to change.
- **`mutation`** — the file-count and line-count limits for candidate changes.
- **`mutator`** — the task-specific command that generates a candidate in the sandbox.
- **`runner`** — executes an iteration over the selected artifact set.
- **`scorer`** — evaluates each iteration and records the outcome.
- **`policy`** — decides what to keep, discard, or try next.
The task schema and validator include a `mutation` section, but the current CLI loop does not enforce mutation budgets yet because enforcement requires a baseline-aware orchestration layer.
## How it works
The repo is deliberately kept small. The original training workflow centers on three main files, while the Artifact Loop Engine adds a separate task runner path for editable text artifacts:
@ -55,6 +55,15 @@ If the above commands all work ok, your setup is working and you can go into aut
This repository also includes a generic optimization engine for editable text artifacts such as prompts, skills, config files, and small code paths. It uses the same iterate-evaluate-repeat loop as the training workflow, but applies it to task-defined artifacts instead of model code.
The current CLI runs a single baseline-aware iteration:
1. Snapshot the allowed artifact baseline.
2. Copy the repo into a temporary sandbox.
3. Run a task-specific mutator inside the sandbox.
4. Validate the candidate against mutation limits.
5. Run and score the candidate in the sandbox.
6. Keep or discard the candidate; the main workspace is modified only if the candidate is accepted.
Optional sample task command:
```bash
uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml
```
USAGE.md Normal file

@ -0,0 +1,354 @@
# Usage
## What This Is
This project should be used as a safe single-iteration optimization engine.
It is not a free-form "let the AI edit the repo however it wants" loop.
It gives the AI a controlled way to:
1. define what may change
2. generate one candidate in a sandbox
3. run evaluation in isolation
4. score the result
5. keep or discard the candidate
If you want repeated optimization, the AI should call this workflow repeatedly.
## How AI Should Use This Project
The correct mental model is:
- the AI is the search and decision layer
- this project is the sandboxed execution and evaluation layer
The AI should work in this loop (a minimal driver sketch follows the list):
1. Pick one concrete optimization target.
2. Create or update a task for that target.
3. Run one iteration with `scripts/run_task.py`.
4. Read the JSON result and the latest line in `work/results.jsonl`.
5. Decide what to do next:
- `keep`: continue from the new baseline
- `discard`: adjust the mutator or task constraints and try again
- `crash`: fix the task, runner, scorer, or paths before continuing
6. Repeat until the budget or goal is reached.
Do not treat the main workspace as the experiment loop.
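A minimal driver sketch of this loop, assuming the sample task and an illustrative five-iteration budget; each pass shells out to the CLI and reads the one JSON record it prints:
```python
# Sketch: repeatedly invoke the single-iteration CLI and stop on crash.
import json
import subprocess

for iteration in range(5):  # illustrative budget
    completed = subprocess.run(
        ["uv", "run", "python", "scripts/run_task.py",
         "--task", "tasks/skill-quality/task.yaml"],
        capture_output=True,
        text=True,
    )
    record = json.loads(completed.stdout)  # the CLI prints one JSON record
    print(iteration, record["status"], record["reason"])
    if record["status"] == "crash":
        break  # fix the task, runner, scorer, or paths before continuing
```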
## Quick Start
Run the sample task from the repo root:
```bash
uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml
```
Expected result:
- stdout prints one JSON record
- `work/results.jsonl` gets one new JSON line
- `tasks/skill-quality/fixtures/SKILL.md` changes only if the candidate is kept
Example:
```json
{"task_id":"skill-quality","status":"keep","reason":"no baseline available","candidate_score":4.0,"diff_summary":""}
```
## What A Task Must Define
Each task file must include:
- `artifacts`
- `mutation`
- `mutator`
- `runner`
- `scorer`
- `objective`
- `constraints`
- `policy`
- `budget`
- `logging`
Important runtime fields:
```yaml
mutator:
type: command
command: "..."
cwd: "tasks/your-task"
timeout_seconds: 30
runner:
command: "..."
cwd: "tasks/your-task"
timeout_seconds: 30
scorer:
type: command
command: "..."
timeout_seconds: 30
parse:
format: json
score_field: score
metrics_field: metrics
```
## What Each Part Means
### `artifacts`
The files that may be accepted back into the main workspace.
Keep this set narrow.
Start with one file whenever possible.
### `mutation`
The safety budget:
- how many files may change
- how many lines may change
- which file types are allowed
### `mutator`
The command that creates one candidate in the sandbox.
This is where the AI's proposal becomes an actual candidate artifact.
### `runner`
The command that evaluates the candidate.
Examples:
- run a deterministic checker for a skill
- run a short training job
- run validation on a model checkpoint
- run a forecasting backtest
- run a scheduling simulation
### `scorer`
The command that returns structured JSON for the decision step.
Examples:
- rubric score
- `mAP50-95`
- F1 score
- validation loss
- RMSE
- maintenance-event prediction precision/recall
- production schedule cost or lateness penalty
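A minimal scorer sketch following this contract, in the style of the sample task's `score_skill_task.py`; it assumes the runner already wrote `score` and `metrics` into its output file and simply re-emits them as the JSON record that `scorer.parse` expects on stdout:
```python
# Sketch of a command scorer: read the runner's output file and print the
# JSON record described by scorer.parse (score_field, metrics_field).
from __future__ import annotations

import argparse
import json
from pathlib import Path


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    args = parser.parse_args()
    # Assumption: the runner wrote {"score": ..., "metrics": {...}} here.
    run_data = json.loads(Path(args.input).read_text(encoding="utf-8"))
    print(json.dumps({"score": run_data["score"], "metrics": run_data.get("metrics", {})}))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```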
## Path Rules
- artifact paths are task-relative
- `mutator.cwd` and `runner.cwd` are repo-relative
- absolute `cwd` values are rejected
- `..` in `cwd` is rejected
## Status Meaning
- `keep`: candidate accepted and allowed artifacts synced back
- `discard`: candidate rejected and main workspace unchanged
- `crash`: execution failed, main workspace unchanged, CLI exits non-zero
## How AI Should Interpret Results
### `keep`
Meaning:
- the candidate is better than the current baseline
- the main workspace now contains the accepted artifact
What the AI should do next:
- treat the current artifact as the new baseline
- decide whether another iteration is still worth trying
### `discard`
Meaning:
- the candidate was either rejected during validation or evaluated and found not better than the baseline; either way it does not replace the baseline
Common reasons:
- too many changed files
- too many changed lines
- disallowed file type
- non-artifact change detected
- candidate did not improve
What the AI should do next:
- shrink the mutator if the candidate was too aggressive
- fix task boundaries if the candidate touched the wrong files
- adjust strategy if the score did not improve
### `crash`
Meaning:
- mutator, runner, scorer, or task configuration failed
Common reasons:
- command failed
- scorer output was not valid JSON
- configured sandbox `cwd` does not exist
What the AI should do next:
- fix the task definition or scripts first
- do not continue blind iteration until the crash cause is removed
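A small sketch of this interpretation step, reading the newest record from `work/results.jsonl` and branching on `status`; the follow-up actions are placeholders for whatever the AI decides:
```python
# Sketch: route on the status of the latest iteration record.
import json
from pathlib import Path

lines = Path("work/results.jsonl").read_text(encoding="utf-8").strip().splitlines()
record = json.loads(lines[-1])

if record["status"] == "keep":
    # The accepted artifact is the new baseline; weigh another iteration.
    print("kept with score", record["candidate_score"])
elif record["status"] == "discard":
    # e.g. "too many changed lines" or "candidate did not improve".
    print("discarded:", record["reason"])
else:  # "crash"
    # Fix the task definition or scripts before iterating again.
    raise RuntimeError(f"iteration crashed: {record['reason']}")
```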
## What AI Must Not Do
- do not directly use the main workspace as the experiment loop
- do not manually copy files out of the sandbox, bypassing the sync-back logic
- do not treat `work`, `.venv`, or `.pytest_cache` as source artifacts
- do not let runner/scorer outputs become accepted artifacts unless explicitly intended
- do not start with a wide artifact scope if one-file optimization is possible
- do not open up entire model codebases when a config or recipe file is enough
## How To Create A New Task
When the AI wants to optimize a new problem, use this sequence:
1. Pick the smallest artifact that can express the change.
2. Define a strict mutation budget.
3. Write a deterministic mutator.
4. Write a deterministic runner.
5. Write a scorer that emits structured JSON.
6. Run one iteration.
7. Inspect the result.
8. Tighten or broaden the task only if the current boundary is too small.
## Recommended Optimization Strategy
Start from the narrowest controllable object.
Good first artifacts:
- one `SKILL.md`
- one prompt file
- one YAML config
- one training recipe
- one augmentation policy
- one scheduling policy file
- one feature engineering config
Avoid starting with:
- large source trees
- multiple unrelated configs
- code plus docs plus scripts in one task
## General Deep Learning Use Cases
This engine is not limited to skills or YOLO.
It fits any scenario where:
- one candidate can be generated deterministically
- one evaluation run can be executed
- one score can be extracted
Examples:
- object detection tuning
- classification tuning
- time-series forecasting
- predictive maintenance
- production scheduling
- demand prediction
- anomaly detection
- tabular model feature-policy search
- inference prompt or tool-policy optimization
## Pattern For Deep Learning Tasks
For most deep learning tasks, the AI should optimize one of these first:
- training config
- augmentation config
- feature config
- inference threshold config
- loss-weight config
- scheduling or dispatch policy config
Typical setup:
- `artifacts`: one config file
- `mutator`: edits config values
- `runner`: runs a short fixed-budget experiment or simulation
- `scorer`: emits a JSON score and metrics
This is usually better than letting the AI directly rewrite model code.
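A sketch of the config-mutator half of this pattern, assuming a hypothetical one-file YAML artifact with an `lr` key (the file layout and key name are illustrative, not part of this repo); it follows the same `--task-dir`/`--artifact` convention as the sample mutator:
```python
# Hypothetical mutator sketch: deterministically rewrite one config value so
# the candidate stays inside a tight mutation budget.
from __future__ import annotations

import argparse
from pathlib import Path


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--task-dir", required=True)
    parser.add_argument("--artifact", required=True)
    args = parser.parse_args()
    artifact_path = Path(args.task_dir).resolve() / args.artifact
    lines = artifact_path.read_text(encoding="utf-8").splitlines()
    # Illustrative single-line edit: change the learning rate in place.
    edited = ["lr: 0.005" if line.startswith("lr:") else line for line in lines]
    artifact_path.write_text("\n".join(edited) + "\n", encoding="utf-8")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```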
## Example Mental Templates
### Skill Optimization
- artifact: one `SKILL.md`
- mutator: rewrites structure or instructions
- runner: checks required sections and formatting
- scorer: outputs rubric-based score
### YOLO Or Vision Model Tuning
- artifact: one experiment YAML
- mutator: changes lr, augmentation, image size, loss weights, thresholds
- runner: runs a short training/eval job
- scorer: outputs `mAP`, latency, memory, and constraint violations
### Predictive Maintenance
- artifact: one feature/training config
- mutator: changes window sizes, features, thresholds, class weighting
- runner: runs a short train + validation or backtest
- scorer: outputs precision, recall, F1, false-alarm penalty
### Production Scheduling
- artifact: one scheduling policy or simulation config
- mutator: changes dispatch heuristics, penalty weights, batching rules
- runner: runs a deterministic simulation
- scorer: outputs lateness, throughput, utilization, total cost
## Ignored Runtime Directories
These repo-root directories are ignored by sandbox copy/hash validation:
- `work`
- `.venv`
- `.pytest_cache`
Reason:
- they are runtime or cache state, not accepted source artifacts
## When To Extend The Project Itself
Do not default to changing the engine.
Only extend the engine when the AI cannot express the optimization problem cleanly with:
- a task file
- a mutator
- a runner
- a scorer
In most cases, the right move is to add a new task, not to modify the orchestrator.
## More Detail
See the full guide:
[2026-04-02-baseline-aware-single-iteration-orchestrator-usage.md](docs/superpowers/usage/2026-04-02-baseline-aware-single-iteration-orchestrator-usage.md)


@ -0,0 +1,179 @@
# Baseline-Aware Single-Iteration Orchestrator Usage
## What It Does
The Artifact Loop Engine now runs one baseline-aware optimization iteration in a sandbox:
1. Load a task spec.
2. Snapshot the current accepted artifact baseline.
3. Copy the repo into a temporary sandbox.
4. Run the task mutator in the sandbox.
5. Validate candidate changes against mutation limits.
6. Run and score the candidate in the sandbox.
7. Keep or discard the candidate.
8. Sync back only allowed artifact files on `keep`.
The main workspace stays unchanged on `discard` and `crash`.
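The same flow is available programmatically; a minimal sketch using the engine API added in this commit, with `baseline_score=None` standing in for "no accepted baseline yet":
```python
# Sketch: run one sandboxed iteration through the engine API directly.
from pathlib import Path

from engine.orchestrator import run_single_iteration
from engine.task_loader import load_task

task = load_task(Path("tasks/skill-quality/task.yaml"))
decision = run_single_iteration(task, baseline_score=None)
print(decision.status, decision.reason, decision.candidate_score)
```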
## Quick Start
Run the sample task from the repo root:
```bash
uv run python scripts/run_task.py --task tasks/skill-quality/task.yaml
```
Expected behavior:
- The command prints one JSON record to stdout.
- A matching JSON line is appended to `work/results.jsonl`.
- `tasks/skill-quality/fixtures/SKILL.md` is updated only if the candidate is kept.
Example result:
```json
{"task_id":"skill-quality","status":"keep","reason":"no baseline available","candidate_score":4.0,"diff_summary":""}
```
## Task Schema
A task file must include these sections:
- `id`
- `description`
- `artifacts`
- `mutation`
- `mutator`
- `runner`
- `scorer`
- `objective`
- `constraints`
- `policy`
- `budget`
- `logging`
Important runtime fields:
```yaml
mutator:
type: command
command: "python ../../scripts/mutate_skill_task.py --task-dir . --artifact fixtures/SKILL.md"
cwd: "tasks/skill-quality"
timeout_seconds: 30
runner:
command: "python ../../scripts/evaluate_skill_task.py --task-dir . --artifact fixtures/SKILL.md --output ../../work/skill-run.json"
cwd: "tasks/skill-quality"
timeout_seconds: 30
scorer:
type: command
command: "python scripts/score_skill_task.py --input work/skill-run.json"
timeout_seconds: 30
parse:
format: json
score_field: score
metrics_field: metrics
```
## Path Rules
- `task.root_dir` is the directory containing `task.yaml`.
- `artifacts.include` paths are resolved relative to the task directory.
- `mutator.cwd` and `runner.cwd` are repo-relative paths.
- Absolute `cwd` values are rejected.
- `..` segments in `cwd` are rejected.
## Keep, Discard, Crash
### Keep
- Candidate is accepted.
- Only allowed artifact files are copied back into the main workspace.
### Discard
- Candidate is rejected.
- Main workspace remains unchanged.
### Crash
- Mutator, runner, or scorer execution failed.
- Main workspace remains unchanged.
- CLI exits non-zero.
## Validation Rules
The orchestrator rejects a candidate before runner execution when:
- changed file count exceeds `artifacts.max_files_per_iteration`
- changed line count exceeds `mutation.max_changed_lines`
- changed file type is not allowed
- a non-artifact file was mutated
The orchestrator also revalidates artifact state before sync-back on `keep`, so later runner or scorer edits cannot bypass mutation limits.
## Repo Directories Ignored By Sandbox State Checks
These repo-root directories are intentionally ignored during sandbox copy/hash validation:
- `work`
- `.venv`
- `.pytest_cache`
Reason:
- They are runtime or cache state, not accepted source artifacts.
- Including them can distort keepability validation or make real runs unnecessarily slow.
## Output Record
Each CLI run appends one JSON line to `work/results.jsonl` with:
- `task_id`
- `status`
- `reason`
- `candidate_score`
- `diff_summary`
## Recommended Workflow For Adding A New Task
1. Create a task directory under `tasks/`.
2. Define the artifact set narrowly.
3. Set conservative mutation limits first.
4. Add a deterministic mutator command.
5. Add a deterministic runner and scorer.
6. Run `scripts/run_task.py` directly.
7. Inspect the latest line in `work/results.jsonl`.
## Common Failure Cases
### `status = "discard"`
Usually means:
- mutation budget exceeded
- disallowed file type
- non-artifact change detected
- candidate did not improve
### `status = "crash"`
Usually means:
- mutator command failed
- runner command failed
- scorer command failed
- scorer output was not parseable
- configured `cwd` does not exist in the sandbox
## Current Scope
This implementation supports exactly one isolated optimization iteration.
It does not yet implement:
- multi-iteration search
- parallel candidate execution
- git-backed sandboxing
- branch-per-candidate workflows


@ -19,6 +19,14 @@ class MutationSpec:
max_changed_lines: int
@dataclass(frozen=True)
class MutatorSpec:
type: str
command: str
cwd: str
timeout_seconds: int
@dataclass(frozen=True)
class RunnerSpec:
command: str
@ -37,6 +45,7 @@ class ScorerParseSpec:
class ScorerSpec:
type: str
command: str
timeout_seconds: int
parse: ScorerParseSpec
@ -86,6 +95,7 @@ class TaskSpec:
budget: BudgetSpec
logging: LoggingSpec
root_dir: Path
mutator: MutatorSpec
@dataclass(frozen=True)


@ -1,8 +1,10 @@
from __future__ import annotations
from dataclasses import replace
from difflib import unified_diff
from pathlib import Path
from engine.artifact_manager import ArtifactManager
from engine.models import BaselineSnapshot, TaskSpec
@ -26,21 +28,33 @@ def _count_changed_lines(before: str, after: str, path: Path) -> int:
return changed_lines
def validate_candidate_changes(task: TaskSpec, snapshot: BaselineSnapshot) -> None:
def validate_candidate_changes(task: TaskSpec, snapshot: BaselineSnapshot, candidate_root: Path) -> None:
changed_files = 0
changed_lines = 0
allowed_file_types = set(task.mutation.allowed_file_types)
candidate_task = replace(task, root_dir=candidate_root)
candidate_paths = set(ArtifactManager(candidate_task).resolve_paths())
for path, baseline_text in snapshot.file_contents.items():
current_text = path.read_text(encoding="utf-8") if path.exists() else ""
relative_path = path.relative_to(task.root_dir)
candidate_path = candidate_root / relative_path
current_text = candidate_path.read_text(encoding="utf-8") if candidate_path.exists() else ""
if current_text == baseline_text:
candidate_paths.discard(candidate_path)
continue
changed_files += 1
if path.suffix not in allowed_file_types:
raise MutationValidationError(f"disallowed file type: {path.suffix}")
if candidate_path.suffix not in allowed_file_types:
raise MutationValidationError(f"disallowed file type: {candidate_path.suffix}")
changed_lines += _count_changed_lines(baseline_text, current_text, path)
changed_lines += _count_changed_lines(baseline_text, current_text, candidate_path)
candidate_paths.discard(candidate_path)
for candidate_path in sorted(candidate_paths):
changed_files += 1
if candidate_path.suffix not in allowed_file_types:
raise MutationValidationError(f"disallowed file type: {candidate_path.suffix}")
changed_lines += _count_changed_lines("", candidate_path.read_text(encoding="utf-8"), candidate_path)
if changed_files > task.artifacts.max_files_per_iteration:
raise MutationValidationError(

engine/orchestrator.py Normal file

@ -0,0 +1,252 @@
from __future__ import annotations
import shutil
import tempfile
from dataclasses import replace
from hashlib import sha256
from pathlib import Path
from engine.artifact_manager import ArtifactManager
from engine.decision_engine import decide_candidate
from engine.models import BaselineSnapshot, DecisionResult, TaskSpec
from engine.mutation_engine import MutationValidationError, validate_candidate_changes
from engine.runner import run_command
from engine.scorer import parse_score_output
_SANDBOX_EXCLUDED_ROOTS = frozenset({"work", ".venv", ".pytest_cache"})
def _normalize_relative_path(raw_path: str) -> Path:
path = Path(raw_path)
return Path(*[part for part in path.parts if part not in ("", ".")])
def _validate_sandbox_relative_path(raw_path: str, field_name: str) -> Path:
path = Path(raw_path)
if path.is_absolute():
raise ValueError(f"{field_name} must be relative to the sandbox root")
if any(part == ".." for part in path.parts):
raise ValueError(f"{field_name} must not contain '..'")
return _normalize_relative_path(raw_path)
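# Walk upward from the task root and pick the deepest ancestor under which the
# most configured cwd paths resolve to real directories; that ancestor is
# treated as the repo root to copy into the sandbox.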
def _infer_repo_root(task: TaskSpec, candidate_paths: list[Path]) -> Path:
task_root = task.root_dir.resolve()
non_empty_paths = [path for path in candidate_paths if path.parts]
if not non_empty_paths:
return task_root
best_root = task_root
best_match_count = -1
candidate_root = task_root
while True:
match_count = sum(1 for relative_path in non_empty_paths if (candidate_root / relative_path).is_dir())
if match_count > best_match_count:
best_root = candidate_root
best_match_count = match_count
if candidate_root.parent == candidate_root:
break
candidate_root = candidate_root.parent
return best_root
def _is_sandbox_excluded_path(relative_path: Path) -> bool:
return bool(relative_path.parts) and relative_path.parts[0] in _SANDBOX_EXCLUDED_ROOTS
def _copy_repo_to_sandbox(repo_root: Path, sandbox_root: Path) -> None:
for child in repo_root.iterdir():
if child.name == ".git" or _is_sandbox_excluded_path(Path(child.name)):
continue
destination = sandbox_root / child.name
if child.is_dir():
shutil.copytree(child, destination, dirs_exist_ok=True)
continue
shutil.copy2(child, destination)
def _sandbox_task(task: TaskSpec, sandbox_root: Path, repo_root: Path) -> TaskSpec:
relative_task_root = task.root_dir.resolve().relative_to(repo_root)
return replace(task, root_dir=sandbox_root / relative_task_root)
def _sandbox_snapshot(task: TaskSpec, sandbox_task: TaskSpec, snapshot: BaselineSnapshot) -> BaselineSnapshot:
file_contents: dict[Path, str] = {}
file_hashes: dict[Path, str] = {}
for path, content in snapshot.file_contents.items():
relative_path = path.relative_to(task.root_dir)
sandbox_path = sandbox_task.root_dir / relative_path
file_contents[sandbox_path] = content
file_hashes[sandbox_path] = snapshot.file_hashes[path]
return BaselineSnapshot(file_contents=file_contents, file_hashes=file_hashes)
def _repo_file_hashes(root: Path) -> dict[Path, str]:
file_hashes: dict[Path, str] = {}
for path in root.rglob("*"):
if not path.is_file():
continue
relative_path = path.relative_to(root)
if ".git" in path.parts or _is_sandbox_excluded_path(relative_path):
continue
file_hashes[relative_path] = sha256(path.read_bytes()).hexdigest()
return file_hashes
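# Hash every file in the baseline repo and the sandbox candidate (excluding
# .git and the runtime roots above) and reject any differing path outside the
# allowed artifact set.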
def _validate_keepable_candidate(
task: TaskSpec,
sandbox_task: TaskSpec,
baseline_snapshot: BaselineSnapshot,
repo_root: Path,
sandbox_root: Path,
) -> None:
task_root_relative = task.root_dir.resolve().relative_to(repo_root.resolve())
allowed_relative_paths = {
task_root_relative / path.relative_to(task.root_dir) for path in baseline_snapshot.file_contents
}
allowed_relative_paths.update(
task_root_relative / path.relative_to(sandbox_task.root_dir)
for path in ArtifactManager(sandbox_task).resolve_paths()
)
baseline_hashes = _repo_file_hashes(repo_root)
candidate_hashes = _repo_file_hashes(sandbox_root)
for relative_path in sorted(set(baseline_hashes) | set(candidate_hashes)):
if baseline_hashes.get(relative_path) == candidate_hashes.get(relative_path):
continue
if relative_path in allowed_relative_paths:
continue
raise MutationValidationError(f"non-artifact change detected: {relative_path.as_posix()}")
def _validate_candidate_state(
task: TaskSpec,
sandbox_task: TaskSpec,
baseline_snapshot: BaselineSnapshot,
repo_root: Path,
sandbox_root: Path,
) -> None:
_validate_keepable_candidate(task, sandbox_task, baseline_snapshot, repo_root, sandbox_root)
validate_candidate_changes(task, baseline_snapshot, sandbox_task.root_dir)
def _validate_final_candidate_artifacts(
task: TaskSpec,
sandbox_task: TaskSpec,
baseline_snapshot: BaselineSnapshot,
) -> None:
validate_candidate_changes(task, baseline_snapshot, sandbox_task.root_dir)
def _resolve_sandbox_cwd(sandbox_root: Path, relative_cwd: Path, field_name: str) -> Path:
sandbox_cwd = sandbox_root / relative_cwd
if not sandbox_cwd.is_dir():
raise ValueError(f"{field_name} does not exist in sandbox: {relative_cwd.as_posix()}")
return sandbox_cwd
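# Copy the accepted artifact files from the sandbox back into the main
# workspace, then delete workspace artifacts that no longer exist in the
# candidate.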
def _sync_artifacts_back(task: TaskSpec, sandbox_task: TaskSpec) -> None:
source_manager = ArtifactManager(sandbox_task)
target_manager = ArtifactManager(task)
source_paths = source_manager.resolve_paths()
source_relative_paths = {path.relative_to(sandbox_task.root_dir) for path in source_paths}
for path in source_paths:
relative_path = path.relative_to(sandbox_task.root_dir)
target_path = task.root_dir / relative_path
target_path.parent.mkdir(parents=True, exist_ok=True)
with path.open("r", encoding="utf-8", newline="") as source_handle:
with target_path.open("w", encoding="utf-8", newline="") as target_handle:
target_handle.write(source_handle.read())
for path in target_manager.resolve_paths():
relative_path = path.relative_to(task.root_dir)
if relative_path in source_relative_paths:
continue
path.unlink()
def _crash(reason: str, baseline_score: float | None) -> DecisionResult:
return DecisionResult(
status="crash",
reason=reason,
baseline_score=baseline_score,
candidate_score=None,
)
def run_single_iteration(task: TaskSpec, baseline_score: float | None) -> DecisionResult:
manager = ArtifactManager(task)
baseline_snapshot = manager.snapshot()
try:
mutator_relative_cwd = _validate_sandbox_relative_path(task.mutator.cwd, "mutator.cwd")
runner_relative_cwd = _validate_sandbox_relative_path(task.runner.cwd, "runner.cwd")
except ValueError as exc:
return _crash(str(exc), baseline_score)
repo_root = _infer_repo_root(task, [mutator_relative_cwd, runner_relative_cwd])
with tempfile.TemporaryDirectory(prefix="autoresearch-orchestrator-") as sandbox_dir:
sandbox_root = Path(sandbox_dir)
_copy_repo_to_sandbox(repo_root, sandbox_root)
sandbox_task = _sandbox_task(task, sandbox_root, repo_root)
try:
mutator_cwd = _resolve_sandbox_cwd(sandbox_root, mutator_relative_cwd, "mutator.cwd")
except ValueError as exc:
return _crash(str(exc), baseline_score)
mutator_result = run_command(task.mutator.command, mutator_cwd, task.mutator.timeout_seconds)
if mutator_result.exit_code != 0:
return _crash(f"mutator failed with exit code {mutator_result.exit_code}", baseline_score)
try:
_validate_candidate_state(task, sandbox_task, baseline_snapshot, repo_root, sandbox_root)
except MutationValidationError as exc:
return DecisionResult(
status="discard",
reason=str(exc),
baseline_score=baseline_score,
candidate_score=None,
)
try:
runner_cwd = _resolve_sandbox_cwd(sandbox_root, runner_relative_cwd, "runner.cwd")
except ValueError as exc:
return _crash(str(exc), baseline_score)
runner_result = run_command(task.runner.command, runner_cwd, task.runner.timeout_seconds)
if runner_result.exit_code != 0:
return _crash(f"command failed with exit code {runner_result.exit_code}", baseline_score)
scorer_result = run_command(task.scorer.command, sandbox_root, task.scorer.timeout_seconds)
if scorer_result.exit_code != 0:
return _crash(f"scorer failed with exit code {scorer_result.exit_code}", baseline_score)
try:
candidate_score = parse_score_output(
scorer_result.stdout,
score_field=task.scorer.parse.score_field,
metrics_field=task.scorer.parse.metrics_field,
)
except ValueError as exc:
return _crash(f"score parse failed: {exc}", baseline_score)
decision = decide_candidate(
baseline=baseline_score,
candidate=candidate_score,
objective=task.objective,
constraints=task.constraints,
tie_breakers=task.policy.tie_breakers,
run_result=runner_result,
)
if decision.status == "keep":
try:
_validate_final_candidate_artifacts(task, sandbox_task, baseline_snapshot)
except MutationValidationError as exc:
return DecisionResult(
status="discard",
reason=str(exc),
baseline_score=baseline_score,
candidate_score=None,
)
_sync_artifacts_back(task, sandbox_task)
return decision


@ -11,6 +11,7 @@ from engine.models import (
ConstraintSpec,
LoggingSpec,
MutationSpec,
MutatorSpec,
ObjectiveSpec,
PolicySpec,
RunnerSpec,
@ -98,6 +99,17 @@ def load_task(task_path: Path) -> TaskSpec:
max_changed_lines=_require_int(mutation_data, "max_changed_lines", "task.mutation"),
)
mutator_data = _require_mapping(_require_value(root, "mutator"), "task.mutator")
mutator_type = _require_str(mutator_data, "type", "task.mutator")
if mutator_type != "command":
raise TaskValidationError("task.mutator.type must be command")
mutator = MutatorSpec(
type=mutator_type,
command=_require_str(mutator_data, "command", "task.mutator"),
cwd=_require_str(mutator_data, "cwd", "task.mutator"),
timeout_seconds=_require_int(mutator_data, "timeout_seconds", "task.mutator"),
)
runner_data = _require_mapping(_require_value(root, "runner"), "task.runner")
runner = RunnerSpec(
command=_require_str(runner_data, "command", "task.runner"),
@ -116,6 +128,7 @@ def load_task(task_path: Path) -> TaskSpec:
scorer = ScorerSpec(
type=scorer_type,
command=_require_str(scorer_data, "command", "task.scorer"),
timeout_seconds=_require_int(scorer_data, "timeout_seconds", "task.scorer"),
parse=ScorerParseSpec(
format=scorer_format,
score_field=_require_str(scorer_parse_data, "score_field", "task.scorer.parse"),
@ -185,4 +198,5 @@ def load_task(task_path: Path) -> TaskSpec:
budget=budget,
logging=logging,
root_dir=task_path.parent,
mutator=mutator,
)


@ -0,0 +1,42 @@
from __future__ import annotations
import argparse
from pathlib import Path
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--task-dir", required=True)
parser.add_argument("--artifact", required=True)
return parser.parse_args()
def main() -> int:
args = parse_args()
task_dir = Path(args.task_dir).resolve()
artifact_path = (task_dir / args.artifact).resolve()
artifact_path.write_text(
"\n".join(
[
"# Deterministic Sample Skill",
"",
"## Sandbox Marker",
"Candidate emitted by sandbox mutator.",
"",
"## When to Use",
"Use this skill when you need a deterministic artifact for end-to-end testing.",
"",
"## Steps",
"1. Read the task instructions.",
"2. Compare the skill against the rubric.",
"3. Return the computed score.",
]
)
+ "\n",
encoding="utf-8",
)
return 0
if __name__ == "__main__":
raise SystemExit(main())


@ -9,10 +9,7 @@ ROOT_DIR = Path(__file__).resolve().parents[1]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
from engine.artifact_manager import ArtifactManager
from engine.decision_engine import decide_candidate
from engine.runner import run_command
from engine.scorer import parse_score_output
from engine.orchestrator import run_single_iteration
from engine.task_loader import load_task
@ -36,7 +33,15 @@ def _append_record(repo_root: Path, results_file: str, record: dict[str, object]
handle.write(json.dumps(record, ensure_ascii=False) + "\n")
def _emit_record(repo_root: Path, task_id: str, results_file: str, status: str, reason: str, candidate_score: float | None, diff_summary: str) -> int:
def _emit_record(
repo_root: Path,
task_id: str,
results_file: str,
status: str,
reason: str,
candidate_score: float | None,
diff_summary: str,
) -> int:
record = {
"task_id": task_id,
"status": status,
@ -49,113 +54,22 @@ def _emit_record(repo_root: Path, task_id: str, results_file: str, status: str,
return 0
def _finalize_outcome(
*,
repo_root: Path,
task_id: str,
results_file: str,
artifact_manager: ArtifactManager,
snapshot,
status: str,
reason: str,
candidate_score: float | None,
) -> int:
diff_summary = artifact_manager.diff_summary(snapshot)
if status in {"discard", "crash"}:
artifact_manager.restore(snapshot)
exit_code = 1 if status == "crash" else 0
_emit_record(
repo_root=repo_root,
task_id=task_id,
results_file=results_file,
status=status,
reason=reason,
candidate_score=candidate_score,
diff_summary=diff_summary,
)
return exit_code
def main() -> int:
args = parse_args()
repo_root = ROOT_DIR.resolve()
task_path = _resolve_repo_path(repo_root, args.task)
task = load_task(task_path)
artifact_manager = ArtifactManager(task)
snapshot = artifact_manager.snapshot()
run_result = run_command(
task.runner.command,
_resolve_repo_path(repo_root, task.runner.cwd),
task.runner.timeout_seconds,
)
if run_result.exit_code != 0:
return _finalize_outcome(
repo_root=repo_root,
task_id=task.id,
results_file=task.logging.results_file,
artifact_manager=artifact_manager,
snapshot=snapshot,
status="crash",
reason=f"command failed with exit code {run_result.exit_code}",
candidate_score=None,
)
scorer_result = run_command(
task.scorer.command,
repo_root,
task.runner.timeout_seconds,
)
if scorer_result.exit_code != 0:
return _finalize_outcome(
repo_root=repo_root,
task_id=task.id,
results_file=task.logging.results_file,
artifact_manager=artifact_manager,
snapshot=snapshot,
status="crash",
reason=f"scorer failed with exit code {scorer_result.exit_code}",
candidate_score=None,
)
try:
score_result = parse_score_output(
scorer_result.stdout,
score_field=task.scorer.parse.score_field,
metrics_field=task.scorer.parse.metrics_field,
)
except (KeyError, TypeError, ValueError) as exc:
return _finalize_outcome(
repo_root=repo_root,
task_id=task.id,
results_file=task.logging.results_file,
artifact_manager=artifact_manager,
snapshot=snapshot,
status="crash",
reason=f"score parse failed: {exc}",
candidate_score=None,
)
decision = decide_candidate(
baseline=None,
candidate=score_result,
objective=task.objective,
constraints=task.constraints,
tie_breakers=task.policy.tie_breakers,
run_result=run_result,
)
return _finalize_outcome(
decision = run_single_iteration(task, baseline_score=None)
_emit_record(
repo_root=repo_root,
task_id=task.id,
results_file=task.logging.results_file,
artifact_manager=artifact_manager,
snapshot=snapshot,
status=decision.status,
reason=decision.reason,
candidate_score=decision.candidate_score,
diff_summary="",
)
return 1 if decision.status == "crash" else 0
if __name__ == "__main__":


@ -1,15 +1,12 @@
# Deterministic Sample Skill
## Purpose
Provide a stable sample skill document for the execution pipeline.
## Sandbox Marker
Candidate emitted by sandbox mutator.
## When to Use
Use this skill when you need a deterministic artifact for end-to-end testing.
## Steps
1. Read the task instructions.
2. Compare the skill against the rubric.
3. Return the computed score.


@ -9,14 +9,20 @@ mutation:
mode: direct_edit
allowed_file_types:
- .md
max_changed_lines: 20
max_changed_lines: 40
mutator:
type: command
command: "python ../../scripts/mutate_skill_task.py --task-dir . --artifact fixtures/SKILL.md"
cwd: "tasks/skill-quality"
timeout_seconds: 30
runner:
command: python ../../scripts/evaluate_skill_task.py --task-dir . --artifact fixtures/SKILL.md --output ../../work/skill-run.json
cwd: tasks/skill-quality
cwd: "tasks/skill-quality"
timeout_seconds: 30
scorer:
type: command
command: python scripts/score_skill_task.py --input work/skill-run.json
timeout_seconds: 30
parse:
format: json
score_field: score
@ -33,8 +39,8 @@ policy:
tie_breakers: []
on_failure: discard
budget:
max_iterations: 1
max_failures: 1
max_iterations: 5
max_failures: 3
logging:
results_file: work/results.jsonl
candidate_dir: work/candidates


@ -5,7 +5,7 @@ import unittest
from engine.artifact_manager import ArtifactManager
from engine.models import ArtifactSpec, BaselineSnapshot, TaskSpec
from engine.models import BudgetSpec, ConstraintSpec, LoggingSpec, MutationSpec, ObjectiveSpec, PolicySpec, RunnerSpec, ScorerParseSpec, ScorerSpec
from engine.models import BudgetSpec, ConstraintSpec, LoggingSpec, MutationSpec, MutatorSpec, ObjectiveSpec, PolicySpec, RunnerSpec, ScorerParseSpec, ScorerSpec
def make_task(root_dir: Path) -> TaskSpec:
@ -14,10 +14,12 @@ def make_task(root_dir: Path) -> TaskSpec:
description="Demo",
artifacts=ArtifactSpec(include=["artifacts/*.md"], exclude=["artifacts/ignore.md"], max_files_per_iteration=1),
mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=20),
mutator=MutatorSpec(type="command", command="python -c \"print('mutate')\"", cwd=".", timeout_seconds=30),
runner=RunnerSpec(command="python -c \"print('run')\"", cwd=".", timeout_seconds=10),
scorer=ScorerSpec(
type="command",
command="python -c \"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\"",
timeout_seconds=10,
parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
),
objective=ObjectiveSpec(primary_metric="score", direction="maximize"),


@ -132,7 +132,7 @@ class RunTaskCliTest(unittest.TestCase):
ignore=shutil.ignore_patterns("__pycache__"),
)
def test_run_task_cli_writes_results_jsonl(self) -> None:
def test_run_task_cli_keeps_candidate_from_sandbox(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
temp_root = Path(tmp)
self._copy_repo_layout(temp_root)
@ -153,11 +153,16 @@ class RunTaskCliTest(unittest.TestCase):
self.assertEqual(len(lines), 1)
record = json.loads(lines[0])
self.assertEqual(json.loads(completed.stdout), record)
self.assertEqual(record["task_id"], "skill-quality")
self.assertEqual(record["status"], "keep")
self.assertEqual(record["reason"], "no baseline available")
self.assertEqual(record["candidate_score"], 4.0)
self.assertEqual(record["diff_summary"], "")
artifact_text = (temp_root / "tasks" / "skill-quality" / "fixtures" / "SKILL.md").read_text(
encoding="utf-8"
)
self.assertIn("## Sandbox Marker", artifact_text)
def test_run_task_cli_uses_repo_root_for_absolute_task_path(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
@ -190,6 +195,10 @@ class RunTaskCliTest(unittest.TestCase):
record = json.loads(lines[0])
self.assertEqual(record["task_id"], "skill-quality")
self.assertEqual(record["status"], "keep")
artifact_text = (temp_root / "tasks" / "skill-quality" / "fixtures" / "SKILL.md").read_text(
encoding="utf-8"
)
self.assertIn("## Sandbox Marker", artifact_text)
def test_run_task_cli_restores_artifacts_after_crash(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
@ -216,6 +225,11 @@ class RunTaskCliTest(unittest.TestCase):
" allowed_file_types:",
" - .md",
" max_changed_lines: 20",
"mutator:",
" type: command",
" command: python -c \"print('mutator ok')\"",
" cwd: tasks/runner-crash-restores",
" timeout_seconds: 30",
"runner:",
" command: python -c \"from pathlib import Path; Path('fixtures/SKILL.md').write_text('# Mutated\\n', encoding='utf-8'); raise SystemExit(9)\"",
" cwd: tasks/runner-crash-restores",
@ -223,6 +237,7 @@ class RunTaskCliTest(unittest.TestCase):
"scorer:",
" type: command",
" command: python -c \"print('unused scorer')\"",
" timeout_seconds: 30",
" parse:",
" format: json",
" score_field: score",
@ -266,8 +281,7 @@ class RunTaskCliTest(unittest.TestCase):
self.assertEqual(record["task_id"], "runner-crash-restores")
self.assertEqual(record["status"], "crash")
self.assertEqual(record["reason"], "command failed with exit code 9")
self.assertIn("# Original", record["diff_summary"])
self.assertIn("# Mutated", record["diff_summary"])
self.assertIsNone(record["candidate_score"])
def test_run_task_cli_returns_nonzero_on_crash_and_writes_record(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
@ -293,6 +307,11 @@ class RunTaskCliTest(unittest.TestCase):
" allowed_file_types:",
" - .md",
" max_changed_lines: 20",
"mutator:",
" type: command",
" command: python -c \"print('mutator ok')\"",
" cwd: tasks/scorer-failure",
" timeout_seconds: 30",
"runner:",
" command: python -c \"print('runner ok')\"",
" cwd: tasks/scorer-failure",
@ -300,6 +319,7 @@ class RunTaskCliTest(unittest.TestCase):
"scorer:",
" type: command",
" command: python -c \"import sys; sys.stderr.write('boom\\n'); raise SystemExit(7)\"",
" timeout_seconds: 30",
" parse:",
" format: json",
" score_field: score",
@ -368,6 +388,11 @@ class RunTaskCliTest(unittest.TestCase):
" allowed_file_types:",
" - .md",
" max_changed_lines: 20",
"mutator:",
" type: command",
" command: python -c \"print('mutator ok')\"",
" cwd: tasks/score-parse-failure",
" timeout_seconds: 30",
"runner:",
" command: python -c \"print('runner ok')\"",
" cwd: tasks/score-parse-failure",
@ -375,6 +400,7 @@ class RunTaskCliTest(unittest.TestCase):
"scorer:",
" type: command",
" command: python -c \"print('not-json')\"",
" timeout_seconds: 30",
" parse:",
" format: json",
" score_field: score",


@ -1,5 +1,6 @@
from __future__ import annotations
import shutil
import tempfile
import unittest
from pathlib import Path
@ -8,9 +9,9 @@ from engine.artifact_manager import ArtifactManager
from engine.models import (
ArtifactSpec,
BudgetSpec,
ConstraintSpec,
LoggingSpec,
MutationSpec,
MutatorSpec,
ObjectiveSpec,
PolicySpec,
RunnerSpec,
@ -21,7 +22,7 @@ from engine.models import (
from engine.mutation_engine import MutationValidationError, validate_candidate_changes
def _make_task(root_dir: Path, allowed_file_types: list[str], max_changed_lines: int) -> TaskSpec:
def _make_task(task_root: Path, allowed_file_types: list[str], max_changed_lines: int) -> TaskSpec:
return TaskSpec(
id="mutation-test",
description="Mutation validation fixture.",
@ -31,10 +32,12 @@ def _make_task(root_dir: Path, allowed_file_types: list[str], max_changed_lines:
allowed_file_types=allowed_file_types,
max_changed_lines=max_changed_lines,
),
runner=RunnerSpec(command="python -c \"print('runner ok')\"", cwd=".", timeout_seconds=30),
mutator=MutatorSpec(type="command", command="python -c \"print('mutate')\"", cwd="tasks/demo", timeout_seconds=30),
runner=RunnerSpec(command="python -c \"print('runner ok')\"", cwd="tasks/demo", timeout_seconds=30),
scorer=ScorerSpec(
type="command",
command="python -c \"print('{\\\"score\\\": 1.0, \\\"metrics\\\": {}}')\"",
timeout_seconds=30,
parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
),
objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
@ -42,42 +45,60 @@ def _make_task(root_dir: Path, allowed_file_types: list[str], max_changed_lines:
policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"),
budget=BudgetSpec(max_iterations=1, max_failures=1),
logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"),
root_dir=root_dir,
root_dir=task_root,
)
class MutationEngineTest(unittest.TestCase):
def test_rejects_too_many_changed_lines(self) -> None:
def test_rejects_too_many_changed_lines_in_candidate_root(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root_dir = Path(tmp)
fixture_dir = root_dir / "fixtures"
fixture_dir.mkdir(parents=True)
target = fixture_dir / "note.md"
target.write_text("line 1\nline 2\n", encoding="utf-8")
baseline_root = Path(tmp) / "baseline"
candidate_root = Path(tmp) / "candidate"
(baseline_root / "fixtures").mkdir(parents=True)
(baseline_root / "fixtures" / "note.md").write_text("line 1\nline 2\n", encoding="utf-8")
shutil.copytree(baseline_root, candidate_root)
task = _make_task(root_dir, allowed_file_types=[".md"], max_changed_lines=1)
snapshot = ArtifactManager(task).snapshot()
target.write_text("line 1\nline 2\nline 3\n", encoding="utf-8")
baseline_task = _make_task(baseline_root, allowed_file_types=[".md"], max_changed_lines=1)
snapshot = ArtifactManager(baseline_task).snapshot()
(candidate_root / "fixtures" / "note.md").write_text("line 1\nline 2\nline 3\n", encoding="utf-8")
with self.assertRaises(MutationValidationError) as ctx:
validate_candidate_changes(task, snapshot)
validate_candidate_changes(baseline_task, snapshot, candidate_root)
self.assertIn("changed lines", str(ctx.exception))
def test_rejects_disallowed_extension(self) -> None:
def test_rejects_new_file_with_disallowed_extension_in_candidate_root(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root_dir = Path(tmp)
fixture_dir = root_dir / "fixtures"
fixture_dir.mkdir(parents=True)
target = fixture_dir / "note.txt"
target.write_text("line 1\n", encoding="utf-8")
baseline_root = Path(tmp) / "baseline"
candidate_root = Path(tmp) / "candidate"
(baseline_root / "fixtures").mkdir(parents=True)
(baseline_root / "fixtures" / "note.md").write_text("line 1\n", encoding="utf-8")
shutil.copytree(baseline_root, candidate_root)
task = _make_task(root_dir, allowed_file_types=[".md"], max_changed_lines=10)
snapshot = ArtifactManager(task).snapshot()
target.write_text("line 1 changed\n", encoding="utf-8")
baseline_task = _make_task(baseline_root, allowed_file_types=[".md"], max_changed_lines=10)
snapshot = ArtifactManager(baseline_task).snapshot()
(candidate_root / "fixtures" / "extra.txt").write_text("new file\n", encoding="utf-8")
with self.assertRaises(MutationValidationError) as ctx:
validate_candidate_changes(task, snapshot)
validate_candidate_changes(baseline_task, snapshot, candidate_root)
self.assertIn("disallowed file type", str(ctx.exception))
def test_rejects_renamed_file_with_disallowed_extension(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
baseline_root = Path(tmp) / "baseline"
candidate_root = Path(tmp) / "candidate"
(baseline_root / "fixtures").mkdir(parents=True)
(baseline_root / "fixtures" / "note.md").write_text("line 1\n", encoding="utf-8")
shutil.copytree(baseline_root, candidate_root)
baseline_task = _make_task(baseline_root, allowed_file_types=[".md"], max_changed_lines=10)
snapshot = ArtifactManager(baseline_task).snapshot()
(candidate_root / "fixtures" / "note.md").unlink()
(candidate_root / "fixtures" / "note.txt").write_text("line 1\n", encoding="utf-8")
with self.assertRaises(MutationValidationError) as ctx:
validate_candidate_changes(baseline_task, snapshot, candidate_root)
self.assertIn("disallowed file type", str(ctx.exception))

tests/test_orchestrator.py Normal file

@ -0,0 +1,507 @@
from __future__ import annotations
import os
import tempfile
import unittest
from dataclasses import replace
from pathlib import Path
from engine.models import (
ArtifactSpec,
BudgetSpec,
LoggingSpec,
MutationSpec,
MutatorSpec,
ObjectiveSpec,
PolicySpec,
RunnerSpec,
ScorerParseSpec,
ScorerSpec,
TaskSpec,
)
from engine.orchestrator import run_single_iteration
def make_task(task_root: Path, max_changed_lines: int = 20, runner_command: str | None = None) -> TaskSpec:
return TaskSpec(
id="demo",
description="Demo task",
artifacts=ArtifactSpec(include=["fixtures/*.md"], exclude=[], max_files_per_iteration=1),
mutation=MutationSpec(mode="direct_edit", allowed_file_types=[".md"], max_changed_lines=max_changed_lines),
mutator=MutatorSpec(
type="command",
command="python ../../scripts/mutate_demo.py --task-dir . --artifact fixtures/sample.md",
cwd="tasks/demo",
timeout_seconds=30,
),
runner=RunnerSpec(
command=runner_command
or "python ../../scripts/evaluate_demo.py --task-dir . --artifact fixtures/sample.md --output ../../work/run.json",
cwd="tasks/demo",
timeout_seconds=30,
),
scorer=ScorerSpec(
type="command",
command="python scripts/score_demo.py --input work/run.json",
timeout_seconds=30,
parse=ScorerParseSpec(format="json", score_field="score", metrics_field="metrics"),
),
objective=ObjectiveSpec(primary_metric="score", direction="maximize"),
constraints=[],
policy=PolicySpec(keep_if="better_primary", tie_breakers=[], on_failure="discard"),
budget=BudgetSpec(max_iterations=1, max_failures=1),
logging=LoggingSpec(results_file="work/results.jsonl", candidate_dir="work/candidates"),
root_dir=task_root,
)
class OrchestratorTest(unittest.TestCase):
def setUp(self) -> None:
self.temp_dir = tempfile.TemporaryDirectory()
self.addCleanup(self.temp_dir.cleanup)
self.repo_root = Path(self.temp_dir.name)
self.task_root = self.repo_root / "tasks" / "demo"
(self.task_root / "fixtures").mkdir(parents=True)
(self.task_root / "subdir").mkdir()
(self.repo_root / "scripts").mkdir()
(self.repo_root / "work").mkdir()
(self.task_root / "fixtures" / "sample.md").write_text("# Original\n", encoding="utf-8")
def write_mutator(self, body: str | None = None) -> None:
script = body or (
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"args = parser.parse_args()\n"
"task_dir = Path(args.task_dir).resolve()\n"
"(task_dir / args.artifact).write_text('# Candidate\\n', encoding='utf-8')\n"
)
(self.repo_root / "scripts" / "mutate_demo.py").write_text(script, encoding="utf-8")
def write_runner(self, score: float = 2.0, body: str | None = None) -> None:
script = body or (
"from pathlib import Path\n"
"import argparse\n"
"import json\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"parser.add_argument('--output')\n"
"args = parser.parse_args()\n"
"task_dir = Path(args.task_dir).resolve()\n"
"artifact_path = task_dir / args.artifact\n"
"payload = {'score': " + repr(score) + ", 'metrics': {'artifact_text': artifact_path.read_text(encoding='utf-8')}}\n"
"output_path = (task_dir / args.output).resolve()\n"
"output_path.parent.mkdir(parents=True, exist_ok=True)\n"
"output_path.write_text(json.dumps(payload), encoding='utf-8')\n"
)
(self.repo_root / "scripts" / "evaluate_demo.py").write_text(script, encoding="utf-8")
def write_scorer(self) -> None:
(self.repo_root / "scripts" / "score_demo.py").write_text(
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--input')\n"
"args = parser.parse_args()\n"
"print(Path(args.input).read_text(encoding='utf-8'))\n",
encoding="utf-8",
)
def test_scorer_uses_its_own_timeout_seconds(self) -> None:
self.write_mutator()
self.write_runner(score=2.0)
self.write_scorer()
(self.repo_root / "scripts" / "score_demo.py").write_text(
"from pathlib import Path\n"
"import argparse\n"
"import time\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--input')\n"
"args = parser.parse_args()\n"
"time.sleep(2)\n"
"print(Path(args.input).read_text(encoding='utf-8'))\n",
encoding="utf-8",
)
task = replace(
make_task(self.task_root),
runner=replace(make_task(self.task_root).runner, timeout_seconds=5),
scorer=replace(make_task(self.task_root).scorer, timeout_seconds=1),
)
decision = run_single_iteration(task, baseline_score=1.0)
self.assertEqual(decision.status, "crash")
self.assertEqual(decision.reason, "scorer failed with exit code 124")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
def test_keep_uses_repo_relative_cwd_and_syncs_candidate_back(self) -> None:
self.write_mutator()
self.write_runner(score=2.0)
self.write_scorer()
decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
self.assertEqual(decision.status, "keep")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
def test_crash_leaves_main_workspace_unchanged(self) -> None:
self.write_mutator()
self.write_runner(
body=(
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"parser.add_argument('--output')\n"
"args = parser.parse_args()\n"
"task_dir = Path(args.task_dir).resolve()\n"
"(task_dir / args.artifact).write_text('# Runner Modified\\n', encoding='utf-8')\n"
"raise SystemExit(9)\n"
)
)
self.write_scorer()
decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
self.assertEqual(decision.status, "crash")
self.assertEqual(decision.reason, "command failed with exit code 9")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
def test_discard_leaves_main_workspace_unchanged_when_candidate_does_not_improve(self) -> None:
self.write_mutator()
self.write_runner(score=0.5)
self.write_scorer()
decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
self.assertEqual(decision.status, "discard")
self.assertEqual(decision.reason, "candidate did not improve primary score")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
def test_validation_discard_happens_before_runner_execution(self) -> None:
self.write_mutator(
body=(
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"args = parser.parse_args()\n"
"task_dir = Path(args.task_dir).resolve()\n"
"(task_dir / args.artifact).write_text('# Candidate\\nextra line\\n', encoding='utf-8')\n"
)
)
runner_marker = self.repo_root / "work" / "runner-executed.txt"
self.write_runner(
body=(
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"parser.add_argument('--output')\n"
"args = parser.parse_args()\n"
"marker = Path('../../work/runner-executed.txt').resolve()\n"
"marker.parent.mkdir(parents=True, exist_ok=True)\n"
"marker.write_text('ran\\n', encoding='utf-8')\n"
"raise SystemExit(0)\n"
)
)
self.write_scorer()
decision = run_single_iteration(make_task(self.task_root, max_changed_lines=1), baseline_score=1.0)
self.assertEqual(decision.status, "discard")
self.assertIn("too many changed lines", decision.reason)
self.assertFalse(runner_marker.exists())
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
def test_validation_rejects_non_artifact_mutation_before_runner_execution(self) -> None:
self.write_mutator(
body=(
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"args = parser.parse_args()\n"
"task_dir = Path(args.task_dir).resolve()\n"
"(task_dir / args.artifact).write_text('# Candidate\\n', encoding='utf-8')\n"
"external_path = (task_dir / '../../scripts/payload.txt').resolve()\n"
"external_path.write_text('mutated\\n', encoding='utf-8')\n"
)
)
runner_marker = self.repo_root / "work" / "runner-executed.txt"
self.write_runner(
body=(
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"parser.add_argument('--output')\n"
"parser.add_argument('--score')\n"
"args = parser.parse_args()\n"
"marker = Path('../../work/runner-executed.txt').resolve()\n"
"marker.parent.mkdir(parents=True, exist_ok=True)\n"
"marker.write_text('ran\\n', encoding='utf-8')\n"
)
)
self.write_scorer()
decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
self.assertEqual(decision.status, "discard")
self.assertIn("non-artifact", decision.reason)
self.assertFalse(runner_marker.exists())
self.assertFalse((self.repo_root / "scripts" / "payload.txt").exists())
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
def test_keep_revalidates_final_candidate_state_before_sync_back(self) -> None:
self.write_mutator()
self.write_runner(
body=(
"from pathlib import Path\n"
"import argparse\n"
"import json\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"parser.add_argument('--output')\n"
"args = parser.parse_args()\n"
"task_dir = Path(args.task_dir).resolve()\n"
"artifact_path = task_dir / args.artifact\n"
"artifact_path.write_text('# Candidate\\nextra line\\n', encoding='utf-8')\n"
"payload = {'score': 2.0, 'metrics': {'artifact_text': artifact_path.read_text(encoding='utf-8')}}\n"
"output_path = (task_dir / args.output).resolve()\n"
"output_path.parent.mkdir(parents=True, exist_ok=True)\n"
"output_path.write_text(json.dumps(payload), encoding='utf-8')\n"
)
)
self.write_scorer()
decision = run_single_iteration(make_task(self.task_root, max_changed_lines=2), baseline_score=1.0)
self.assertEqual(decision.status, "discard")
self.assertIn("too many changed lines", decision.reason)
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
def test_keep_ignores_populated_runtime_work_directory(self) -> None:
runtime_path = self.repo_root / "work" / "cache" / "seed.txt"
runtime_path.parent.mkdir(parents=True)
runtime_path.write_text("seed\n", encoding="utf-8")
self.write_mutator(
body=(
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"args = parser.parse_args()\n"
"task_dir = Path(args.task_dir).resolve()\n"
"artifact_path = task_dir / args.artifact\n"
"runtime_path = (task_dir / '../../work/cache/seed.txt').resolve()\n"
"artifact_text = '# Candidate\\n'\n"
"if runtime_path.exists():\n"
" runtime_path.write_text('mutated\\n', encoding='utf-8')\n"
" artifact_text = '# Candidate saw runtime state\\n'\n"
"artifact_path.write_text(artifact_text, encoding='utf-8')\n"
)
)
self.write_runner(score=2.0)
self.write_scorer()
decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
self.assertEqual(decision.status, "keep")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
self.assertEqual(runtime_path.read_text(encoding="utf-8"), "seed\n")
def test_keep_ignores_runtime_cache_directories_in_copy_and_hash_validation(self) -> None:
venv_path = self.repo_root / ".venv" / "seed.txt"
venv_path.parent.mkdir(parents=True)
venv_path.write_text("seed\n", encoding="utf-8")
pytest_cache_path = self.repo_root / ".pytest_cache" / "seed.txt"
pytest_cache_path.parent.mkdir(parents=True)
pytest_cache_path.write_text("seed\n", encoding="utf-8")
self.write_mutator(
body=(
"from pathlib import Path\n"
"import argparse\n"
"parser = argparse.ArgumentParser()\n"
"parser.add_argument('--task-dir')\n"
"parser.add_argument('--artifact')\n"
"args = parser.parse_args()\n"
"task_dir = Path(args.task_dir).resolve()\n"
"artifact_path = task_dir / args.artifact\n"
"venv_path = (task_dir / '../../.venv/seed.txt').resolve()\n"
"pytest_cache_path = (task_dir / '../../.pytest_cache/seed.txt').resolve()\n"
"artifact_text = '# Candidate\\n'\n"
"if venv_path.exists() or pytest_cache_path.exists():\n"
" artifact_text = '# Candidate saw runtime cache\\n'\n"
" if venv_path.exists():\n"
" venv_path.write_text('mutated\\n', encoding='utf-8')\n"
" if pytest_cache_path.exists():\n"
" pytest_cache_path.write_text('mutated\\n', encoding='utf-8')\n"
"artifact_path.write_text(artifact_text, encoding='utf-8')\n"
)
)
self.write_runner(score=2.0)
self.write_scorer()
decision = run_single_iteration(make_task(self.task_root), baseline_score=1.0)
self.assertEqual(decision.status, "keep")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
self.assertEqual(venv_path.read_text(encoding="utf-8"), "seed\n")
self.assertEqual(pytest_cache_path.read_text(encoding="utf-8"), "seed\n")
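
# A mutator cwd containing a parent segment must be rejected up front; the
# marker file proves the mutator command never ran.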
def test_rejects_parent_segment_in_mutator_cwd(self) -> None:
marker_path = self.repo_root / "work" / "mutator-executed.txt"
self.write_mutator(
body=(
"from pathlib import Path\n"
f"Path({str(marker_path)!r}).write_text('ran\\n', encoding='utf-8')\n"
)
)
self.write_runner(score=2.0)
self.write_scorer()
task = replace(
make_task(self.task_root),
mutator=replace(make_task(self.task_root).mutator, cwd="../tasks/demo"),
)
decision = run_single_iteration(task, baseline_score=1.0)
self.assertEqual(decision.status, "crash")
self.assertIn("mutator.cwd", decision.reason)
self.assertFalse(marker_path.exists())
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
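
# A repo-relative mutator cwd that does not exist should crash the iteration
# with a reason naming both mutator.cwd and the missing directory.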
def test_crashes_when_mutator_cwd_does_not_exist(self) -> None:
self.write_mutator()
self.write_runner(score=2.0)
self.write_scorer()
task = replace(
make_task(self.task_root),
mutator=replace(make_task(self.task_root).mutator, cwd="tasks/missing"),
)
decision = run_single_iteration(task, baseline_score=1.0)
self.assertEqual(decision.status, "crash")
self.assertIn("mutator.cwd", decision.reason)
self.assertIn("does not exist", decision.reason)
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
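
# runner.cwd gets the same parent-segment rejection; the runner body would
# drop a marker file if it ever executed.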
def test_rejects_parent_segment_in_runner_cwd(self) -> None:
self.write_mutator()
marker_path = self.repo_root / "work" / "runner-executed.txt"
self.write_runner(
body=(
"from pathlib import Path\n"
f"Path({str(marker_path)!r}).write_text('ran\\n', encoding='utf-8')\n"
)
)
self.write_scorer()
task = replace(
make_task(self.task_root),
runner=replace(make_task(self.task_root).runner, cwd="../tasks/demo"),
)
decision = run_single_iteration(task, baseline_score=1.0)
self.assertEqual(decision.status, "crash")
self.assertIn("runner.cwd", decision.reason)
self.assertFalse(marker_path.exists())
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
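
# And a nonexistent runner.cwd likewise crashes before the runner executes.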
def test_crashes_when_runner_cwd_does_not_exist(self) -> None:
self.write_mutator()
self.write_runner(score=2.0)
self.write_scorer()
task = replace(
make_task(self.task_root),
runner=replace(make_task(self.task_root).runner, cwd="tasks/missing"),
)
decision = run_single_iteration(task, baseline_score=1.0)
self.assertEqual(decision.status, "crash")
self.assertIn("runner.cwd", decision.reason)
self.assertIn("does not exist", decision.reason)
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Original\n")
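
# Path isolation must not depend on the process working directory: the
# iteration is launched from the repo root's parent and should still validate,
# keep, and sync back the candidate.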
def test_path_isolation_works_when_invoked_outside_repo_root(self) -> None:
self.write_mutator()
self.write_runner(score=2.0)
self.write_scorer()
outside_dir = self.repo_root.parent
original_cwd = Path.cwd()
self.addCleanup(os.chdir, str(original_cwd))
os.chdir(str(outside_dir))
decision = run_single_iteration(make_task(self.task_root.resolve()), baseline_score=1.0)
self.assertEqual(decision.status, "keep")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
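
# cwd values nested under the task root are legitimate; the commands then
# reach the shared scripts and the task dir via relative paths from
# tasks/demo/subdir.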
def test_keep_succeeds_when_mutator_and_runner_cwd_are_nested_under_task_root(self) -> None:
self.write_mutator()
self.write_runner(score=2.0)
self.write_scorer()
task = replace(
make_task(self.task_root),
mutator=replace(
make_task(self.task_root).mutator,
command="python ../../../scripts/mutate_demo.py --task-dir .. --artifact fixtures/sample.md",
cwd="tasks/demo/subdir",
),
runner=replace(
make_task(self.task_root).runner,
command=(
"python ../../../scripts/evaluate_demo.py --task-dir .. --artifact fixtures/sample.md "
"--output ../../work/run.json"
),
cwd="tasks/demo/subdir",
),
)
decision = run_single_iteration(task, baseline_score=1.0)
self.assertEqual(decision.status, "keep")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
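
# Likewise for a repo-relative cwd outside the task tree: both commands run
# from scripts/ and address the task dir as ../tasks/demo.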
def test_keep_succeeds_when_mutator_and_runner_cwd_are_repo_relative_outside_task_tree(self) -> None:
self.write_mutator()
self.write_runner(score=2.0)
self.write_scorer()
task = replace(
make_task(self.task_root),
mutator=replace(
make_task(self.task_root).mutator,
command="python mutate_demo.py --task-dir ../tasks/demo --artifact fixtures/sample.md",
cwd="scripts",
),
runner=replace(
make_task(self.task_root).runner,
command=(
"python evaluate_demo.py --task-dir ../tasks/demo --artifact fixtures/sample.md "
"--output ../../work/run.json"
),
cwd="scripts",
),
)
decision = run_single_iteration(task, baseline_score=1.0)
self.assertEqual(decision.status, "keep")
self.assertEqual((self.task_root / "fixtures" / "sample.md").read_text(encoding="utf-8"), "# Candidate\n")
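
# Taken together, the tests above pin down the single-iteration contract. A
# minimal sketch of the call they all exercise (make_task and the write_*
# helpers are this file's fixtures; sandboxing happens inside the engine):
#
#     task = make_task(self.task_root)
#     decision = run_single_iteration(task, baseline_score=1.0)
#     # statuses observed here: "keep", "discard", "crash";
#     # decision.reason explains any rejection.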
if __name__ == "__main__":
unittest.main()


@@ -17,6 +17,11 @@ mutation:
mode: direct_edit
allowed_file_types: [".txt"]
max_changed_lines: 10
mutator:
type: command
command: "python -c \\\"print('mutate')\\\""
cwd: "."
timeout_seconds: 5
runner:
command: "python -c \\\"print('run')\\\""
cwd: "."
@@ -24,6 +29,7 @@ runner:
scorer:
type: command
command: "python -c \\\"import json; print(json.dumps({'score': 1, 'metrics': {'violation_count': 0}}))\\\""
timeout_seconds: 15
parse:
format: json
score_field: "score"
@@ -61,6 +67,21 @@ class TaskLoaderTest(unittest.TestCase):
self.assertEqual(task.id, "demo")
self.assertEqual(task.artifacts.max_files_per_iteration, 1)
self.assertEqual(task.constraints[0].metric, "violation_count")
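# The loader must also surface the new mutator block and the explicit
# per-stage timeouts.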
self.assertEqual(task.mutator.type, "command")
self.assertEqual(task.mutator.command, "python -c \"print('mutate')\"")
self.assertEqual(task.mutator.cwd, ".")
self.assertEqual(task.mutator.timeout_seconds, 5)
self.assertEqual(task.runner.timeout_seconds, 10)
self.assertEqual(task.scorer.timeout_seconds, 15)
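
# The checked-in sample task should round-trip through the loader with its
# mutator settings and per-stage timeouts intact.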
def test_loads_repository_sample_task(self) -> None:
task = load_task(Path("tasks/skill-quality/task.yaml"))
self.assertEqual(task.id, "skill-quality")
self.assertEqual(task.mutator.type, "command")
self.assertEqual(task.mutator.cwd, "tasks/skill-quality")
self.assertEqual(task.mutator.timeout_seconds, 30)
self.assertEqual(task.runner.timeout_seconds, 30)
self.assertEqual(task.scorer.timeout_seconds, 30)
def test_rejects_missing_required_section(self) -> None:
content = VALID_TASK.replace("objective:\n primary_metric: score\n direction: maximize\n", "")
@@ -85,6 +106,30 @@ class TaskLoaderTest(unittest.TestCase):
load_task(self.write_task(content))
self.assertIn("mutation.mode", str(ctx.exception))
def test_rejects_invalid_mutator_type(self) -> None:
content = VALID_TASK.replace("type: command", "type: script", 1)
with self.assertRaises(TaskValidationError) as ctx:
load_task(self.write_task(content))
self.assertIn("mutator.type", str(ctx.exception))
def test_rejects_missing_mutator_cwd(self) -> None:
content = VALID_TASK.replace(" cwd: \".\"\n", "", 1)
with self.assertRaises(TaskValidationError) as ctx:
load_task(self.write_task(content))
self.assertIn("cwd", str(ctx.exception))
def test_rejects_missing_mutator_timeout_seconds(self) -> None:
content = VALID_TASK.replace(" timeout_seconds: 5\n", "", 1)
with self.assertRaises(TaskValidationError) as ctx:
load_task(self.write_task(content))
self.assertIn("timeout_seconds", str(ctx.exception))
def test_rejects_missing_scorer_timeout_seconds(self) -> None:
content = VALID_TASK.replace(" timeout_seconds: 15\n", "", 1)
with self.assertRaises(TaskValidationError) as ctx:
load_task(self.write_task(content))
self.assertIn("timeout_seconds", str(ctx.exception))
if __name__ == "__main__":
unittest.main()