Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/benchflow/_utils/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,56 @@ def classify_verifier_error(verifier_error: str | None) -> str | None:
return "verifier_other"


def classify_result(
*,
reward: float | None,
error: str | None,
verifier_error: str | None,
) -> str:
"""Classify a single result into exactly one terminal bucket.

Returns one of ``"passed"``, ``"failed"``, ``"errored"``,
``"verifier_errored"``. The buckets are disjoint and exhaustive, so
``passed + failed + errored + verifier_errored == total`` holds
structurally for any set of results.

Precedence:

1. A result with a reward is ``"passed"`` (reward == 1.0) or
``"failed"`` (any other reward) — an explicit reward always wins,
even when an ``error`` string is also present (e.g. a warning).
2. With no reward, an agent ``error`` makes it ``"errored"``.
3. With no reward and no agent error, a ``verifier_error`` makes it
``"verifier_errored"``. ``errored`` therefore takes precedence over
``verifier_errored`` when both errors are present.
4. With no reward and no error of either kind, the result is
``"errored"`` — a terminal result must land in some bucket, and an
absent reward with no recorded cause is still a failure to produce
a verdict.
"""
if reward is not None:
return "passed" if reward == 1.0 else "failed"
if error:
return "errored"
if verifier_error:
return "verifier_errored"
return "errored"


def classify_result_dict(result: dict) -> str:
"""Classify a result dict (as persisted to ``result.json``) into a bucket.

Thin wrapper over :func:`classify_result` for the dict-shaped results
used by ``Evaluation.run()``. See :func:`classify_result` for the
bucket precedence rules.
"""
return classify_result(
reward=extract_reward(result),
error=result.get("error"),
verifier_error=result.get("verifier_error"),
)


def pass_rate(*, passed: int, total: int) -> float:
"""Pass rate over all tasks."""
return passed / total if total > 0 else 0.0
Expand Down
53 changes: 46 additions & 7 deletions src/benchflow/agents/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,40 @@ def infer_env_key_for_model(model: str) -> str | None:

VALID_PROTOCOLS = {"acp", "acpx"}

# ---------------------------------------------------------------------------
# The ``acpx:`` runtime-key namespace
# ---------------------------------------------------------------------------
#
# An ``acpx/<agent>`` spec resolves to an acpx-wrapped AgentConfig whose
# install/launch commands route through the acpx CLI. That wrapped config is
# registered into ``AGENTS`` (and the installer/launch maps) under a stable
# runtime key prefixed with ``ACPX_KEY_PREFIX`` so later *name-keyed* lookups
# in the Rollout/Evaluation path pick up the wrapped commands.
#
# This namespace is owned end to end by ``resolve_agent_key``: it is the only
# function that *mints* an ``acpx:`` key (by registering the wrapped config).
# Two other sites must agree with that convention and are documented here so
# the contract is explicit rather than implied:
#
# - ``_acpx_wrap`` produces the wrapped config whose ``name`` carries the
# ``acpx:`` prefix (via ``acpx_runtime_key``).
# - ``resolve_agent`` round-trips an already-registered ``acpx:`` key:
# ``parse_agent_spec`` leaves it whole under the default ``acp`` protocol,
# and the ``protocol == "acp" and name in AGENTS`` branch returns it as-is.
#
# Changing the prefix or this round-trip behavior requires updating all three.
ACPX_KEY_PREFIX = "acpx:"


def acpx_runtime_key(canonical_name: str) -> str:
"""Return the stable ``acpx:`` runtime key for a canonical agent name.

Single source of truth for the ``acpx:`` namespace — see the module-level
comment above. ``resolve_agent_key`` registers the wrapped config under
this key; ``resolve_agent`` round-trips it back to that config.
"""
return f"{ACPX_KEY_PREFIX}{canonical_name}"


def parse_agent_spec(spec: str) -> tuple[str, str]:
"""Parse an agent spec like 'acp/claude-agent-acp', 'acpx/claude', or 'claude'.
Expand Down Expand Up @@ -667,7 +701,8 @@ def _acpx_wrap(config: AgentConfig) -> AgentConfig:
break

return AgentConfig(
name=f"acpx:{config.name}",
# ``acpx:`` runtime key — see acpx_runtime_key / module-level contract.
name=acpx_runtime_key(config.name),
install_cmd=f"{config.install_cmd} && {_ACPX_INSTALL}",
launch_cmd=(
f'export PATH="{_JS_AGENT_PATH}" && acpx {acpx_agent_name} --approve-all'
Expand Down Expand Up @@ -702,7 +737,8 @@ def resolve_agent(spec: str) -> AgentConfig:
)

# An already-resolved acpx runtime key (e.g. "acpx:claude-agent-acp")
# round-trips: parse_agent_spec leaves it whole and it lives in AGENTS.
# round-trips: parse_agent_spec leaves it whole under the default "acp"
# protocol and it lives in AGENTS. See the ACPX_KEY_PREFIX contract.
if protocol == "acp" and name in AGENTS:
return AGENTS[name]

Expand All @@ -725,12 +761,15 @@ def resolve_agent(spec: str) -> AgentConfig:
def resolve_agent_key(spec: str) -> str:
"""Resolve an agent spec to a stable registry key.

For plain ACP agents this is the canonical agent name. For ``acpx/<agent>``
specs the acpx-wrapped config (acpx install/launch commands) is registered
into ``AGENTS``/``AGENT_INSTALLERS``/``AGENT_LAUNCH`` under a stable runtime
key (``acpx:<canonical>``) so that name-keyed lookups in the
This function owns the ``acpx:`` runtime-key namespace (see the
``ACPX_KEY_PREFIX`` module-level contract). For plain ACP agents the key
is the canonical agent name. For ``acpx/<agent>`` specs the acpx-wrapped
config (acpx install/launch commands) is registered into
``AGENTS``/``AGENT_INSTALLERS``/``AGENT_LAUNCH`` under the stable runtime
key ``acpx_runtime_key(<canonical>)`` so that name-keyed lookups in the
Rollout/Evaluation path use the wrapped commands instead of the literal
spec string.
spec string. ``resolve_agent`` then round-trips that key back to the
wrapped config.

Unknown agents are returned unchanged so callers can still surface their
own diagnostics (raw-command fallback).
Expand Down
31 changes: 9 additions & 22 deletions src/benchflow/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
INSTALL_FAILED,
PIPE_CLOSED,
classify_error,
extract_reward,
classify_result_dict,
pass_rate,
pass_rate_excl_errors,
)
Expand Down Expand Up @@ -662,31 +662,18 @@ async def bounded(td: Path) -> tuple[str, RunResult]:
"agent_result": _agent_result_from_rollout(result),
}

# Count — all values are dicts now, no type branching needed
# Count — every result lands in exactly one bucket via the shared
# classifier, so passed+failed+errored+verifier_errored == total
# holds structurally (see classify_result in _utils.scoring).
buckets = [classify_result_dict(r) for r in all_results.values()]
job_result = EvaluationResult(
job_name=self._job_name,
config=cfg,
total=len(task_dirs),
passed=sum(1 for r in all_results.values() if extract_reward(r) == 1.0),
failed=sum(
1
for r in all_results.values()
if (rw := extract_reward(r)) is not None and rw != 1.0
),
errored=sum(
1
for r in all_results.values()
if r.get("error") and r.get("rewards") is None
),
# Disjoint from `errored`: a result with both `error` and
# `verifier_error` is counted only as `errored`, so the
# passed+failed+errored+verifier_errored == total invariant holds.
verifier_errored=sum(
1
for r in all_results.values()
if r.get("verifier_error")
and not (r.get("error") and r.get("rewards") is None)
),
passed=buckets.count("passed"),
failed=buckets.count("failed"),
errored=buckets.count("errored"),
verifier_errored=buckets.count("verifier_errored"),
elapsed_sec=elapsed,
)

Expand Down
30 changes: 19 additions & 11 deletions src/benchflow/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from benchflow._utils.scoring import (
classify_error,
classify_result,
classify_verifier_error,
pass_rate,
pass_rate_excl_errors,
Expand Down Expand Up @@ -41,17 +42,30 @@ class TaskMetrics:
cost_usd: float | None = None
usage_source: str = "unavailable"

@property
def _bucket(self) -> str:
"""Terminal bucket via the shared classifier (see _utils.scoring).

Guarantees passed/failed/errored/verifier_errored are disjoint and
exhaustive — the same classification used by ``Evaluation.run()``.
"""
return classify_result(
reward=self.reward,
error=self.error,
verifier_error=self.verifier_error,
)

@property
def passed(self) -> bool:
return self.reward == 1.0
return self._bucket == "passed"

@property
def failed(self) -> bool:
return self.reward is not None and self.reward != 1.0
return self._bucket == "failed"

@property
def errored(self) -> bool:
return self.reward is None and self.error is not None
return self._bucket == "errored"

@property
def verifier_errored(self) -> bool:
Expand All @@ -60,18 +74,12 @@ def verifier_errored(self) -> bool:
Disjoint from `errored`: a task with both `error` and `verifier_error`
is classified as `errored` only, so the count buckets never overlap.
"""
return (
self.reward is None
and self.verifier_error is not None
and self.error is None
)
return self._bucket == "verifier_errored"

@property
def completed(self) -> bool:
"""True when task reached a terminal non-error reward state."""
return (
not self.errored and not self.verifier_errored and self.reward is not None
)
return self._bucket in ("passed", "failed")


@dataclass
Expand Down
12 changes: 12 additions & 0 deletions src/benchflow/sandbox/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ def __init__(
def _uses_compose(self) -> bool:
return False

@property
def is_mounted(self) -> bool:
"""Whether the rollout dir is host-bind-mounted into the sandbox.

When ``True`` the agent container's verifier output is already visible
on the host, so the verifier can skip a ``download_dir`` round-trip.
Backends that run remotely (Daytona, Modal) have no bind mount and
override this to ``False``. The default is ``False`` so non-mounted
backends are the safe assumption.
"""
return False

def _maybe_resolve_task_env(self) -> None:
if self.task_env_config.env and not self._uses_compose:
resolved = resolve_env_vars(self.task_env_config.env)
Expand Down
9 changes: 6 additions & 3 deletions src/benchflow/sandbox/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,18 +498,21 @@ def _wrap_command_with_env_file(env: dict[str, str], command: str) -> str:

The env vars are base64-encoded into the command string (not visible
as individual ``KEY=VALUE`` args in ``ps aux``), decoded to a
mode-0600 file inside the container, sourced, then deleted before the
real command runs.
mode-0600 file inside the container, and sourced before the real
command runs. A ``trap ... EXIT`` deletes the temp file
unconditionally — even if the decode/source step fails — so a failed
``&&`` chain can never leave the env file behind.
"""
env_body = "".join(f"export {k}={shlex.quote(v)}\n" for k, v in env.items())
encoded = base64.b64encode(env_body.encode()).decode()
# Unique suffix so concurrent exec() calls in one container can't
# clobber each other's env file.
env_path = f"/tmp/.benchflow_exec_env_{uuid.uuid4().hex[:16]}"
return (
f"trap 'rm -f {env_path}' EXIT && "
f"umask 077 && printf %s {shlex.quote(encoded)} | base64 -d > "
f"{env_path} && set -a && . {env_path} && set +a && "
f"rm -f {env_path} && {command}"
f"{command}"
)

async def attach(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions src/benchflow/task/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def _verify_test_script(self) -> VerifierResult:
service so the verifier can inspect *target-side* state — RCE markers,
DB modifications — instead of only the agent workspace (#248).
"""
service = getattr(self._task.config.verifier, "service", "main")
service = self._task.config.verifier.service
try:
await self._sandbox.upload_dir(
source_dir=self._task.paths.tests_dir,
Expand Down Expand Up @@ -202,7 +202,7 @@ async def _verify_test_script(self) -> VerifierResult:
# Download verifier output if it is not host-mounted. Only the agent's
# ``main`` container has the rollout dir bind-mounted; a target service
# never does, so target-side rewards (#248) are always downloaded.
is_mounted = service == "main" and getattr(self._sandbox, "is_mounted", False)
is_mounted = service == "main" and self._sandbox.is_mounted
if not is_mounted:
try:
await self._sandbox.download_dir(
Expand Down
18 changes: 13 additions & 5 deletions src/benchflow/traces/huggingface.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,24 @@ def _cache_dir() -> Path:
def _pick_split_file(repo_files: list[str], split: str, suffix: str) -> str | None:
"""Pick the repo file matching *split* and *suffix*, if any.

Matches files under ``data/`` whose basename starts with the split name
(e.g. ``data/test-00000-of-00001.parquet`` or ``data/test.jsonl`` for
``split="test"``). Returns ``None`` when no file matches so the caller
can fall back to constructed filename candidates.
Matches either the exact ``{split}{suffix}`` basename (e.g.
``data/test.jsonl`` for ``split="test"``) or the HF sharded-parquet
convention ``{split}-NNNNN-of-NNNNN`` (e.g.
``data/test-00000-of-00001.parquet``). The sharded match is anchored on
``\\d+-of-\\d+`` so a sibling *subset* such as
``test-small-00000-of-00001.parquet`` is not mistaken for ``split="test"``.
Returns ``None`` when no file matches so the caller can fall back to
constructed filename candidates.
"""
sharded_re = re.compile(rf"^{re.escape(split)}-\d+-of-\d+")
candidates = [
f
for f in repo_files
if f.endswith(suffix)
and (Path(f).name == f"{split}{suffix}" or Path(f).name.startswith(f"{split}-"))
and (
Path(f).name == f"{split}{suffix}"
or sharded_re.match(Path(f).name) is not None
)
]
if not candidates:
return None
Expand Down
36 changes: 11 additions & 25 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,19 @@ def test_error_and_verifier_error_counted_once(self):
"""A result with BOTH error and verifier_error (rewards=None) must be
classified into exactly one bucket so the count invariant holds.

Mirrors the disjoint bucketing in Evaluation.run(): a result is
``errored`` when it has an agent error, and ``verifier_errored`` only
when it has a verifier error AND no agent error.
Exercises the real shared classifier: a result with an agent error is
``errored``, and ``verifier_errored`` only applies when there is a
verifier error AND no agent error — ``errored`` takes precedence.
"""
results = {
"a": {
"rewards": None,
"error": "agent crashed",
"verifier_error": "verifier also failed",
},
from benchflow._utils.scoring import classify_result_dict

result = {
"rewards": None,
"error": "agent crashed",
"verifier_error": "verifier also failed",
}
errored = sum(
1 for r in results.values() if r.get("error") and r.get("rewards") is None
)
verifier_errored = sum(
1
for r in results.values()
if r.get("verifier_error")
and not (r.get("error") and r.get("rewards") is None)
)
# Exactly one bucket — never double-counted.
assert errored == 1
assert verifier_errored == 0
counts = self._count(results)
assert counts["passed"] + counts["failed"] + errored + verifier_errored == len(
results
)
# The real classifier puts a both-errors result in exactly `errored`.
assert classify_result_dict(result) == "errored"


class TestRunTaskLoop:
Expand Down
Loading
Loading