From a063a69bff1d85fd18779ecb8c951f7d103e1dc3 Mon Sep 17 00:00:00 2001 From: Xeophon <46377542+xeophon@users.noreply.github.com> Date: Wed, 20 May 2026 11:21:57 +0200 Subject: [PATCH] APR-262 Add Harbor verifier modes for v1 --- docs/byo-harness.md | 13 + tests/test_v1_harbor_cli.py | 233 +++++++++++++- verifiers/v1/packages/tasksets/harbor.py | 370 +++++++++++++++++++---- 3 files changed, 561 insertions(+), 55 deletions(-) diff --git a/docs/byo-harness.md b/docs/byo-harness.md index a3ac46cff..76d5a7ed2 100644 --- a/docs/byo-harness.md +++ b/docs/byo-harness.md @@ -453,6 +453,19 @@ taskset owns Harbor task loading, sandbox overrides, task uploads, and test scoring. CLI harnesses own CLI installation/config/run behavior and work with any taskset that supplies a prompt. + +Harbor verifier environment settings in `task.toml` are honored by the v1 +taskset. By default, the verifier runs in the agent sandbox. Setting +`[verifier.environment]` implies `environment_mode = "separate"`, and +`[verifier] environment_mode = "separate"` without a verifier environment starts +a fresh verifier sandbox from the task's main `[environment]`. In separate mode, the verifier +sandbox receives `/logs/artifacts` plus task `artifacts = [...]` entries, +then runs `/tests/test.sh`; when no verifier-specific environment is declared, +v1 uploads the task's `tests/` directory because there is no Harbor Docker build +context to bake those tests into the image. Harbor reward parsing follows the +current Harbor order: `/logs/verifier/reward.json` first, then +`/logs/verifier/reward.txt`. + Tasksets can expose package-owned upload directories with `get_upload_dirs()`. 
The base `Taskset` discovers a sibling `skills/` directory by default, and `RLM` uploads that directory to `/rlm/skills` unless `skills=` is passed diff --git a/tests/test_v1_harbor_cli.py b/tests/test_v1_harbor_cli.py index d996ccc4c..aae7ca17d 100644 --- a/tests/test_v1_harbor_cli.py +++ b/tests/test_v1_harbor_cli.py @@ -1,9 +1,11 @@ import importlib import json import sys +import tarfile import types from pathlib import Path from types import ModuleType +from types import SimpleNamespace from typing import cast from uuid import uuid4 @@ -21,7 +23,7 @@ Terminus2, terminus_2_agent_script, ) -from verifiers.v1.packages.tasksets.harbor import harbor_reward +from verifiers.v1.packages.tasksets.harbor import HARBOR_REWARD_COMMAND, harbor_reward from verifiers.v1.utils.program_utils import merge_task_program, merge_task_sandbox @@ -140,6 +142,112 @@ def test_harbor_taskset_constructs_env_with_opencode( assert "task_dir" not in cast(dict[str, object], env.harness.program) +def test_harbor_reward_command_prefers_json_reward() -> None: + assert HARBOR_REWARD_COMMAND.index("reward.json") < HARBOR_REWARD_COMMAND.index( + "reward.txt" + ) + + +def test_harbor_taskset_resolves_verifier_environment_modes( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + package = write_harbor_package(tmp_path, monkeypatch) + tasks_root = cast(Path, getattr(package, "tasks_root")) + write_harbor_task(tasks_root, "shared-default") + implicit = write_harbor_task(tasks_root, "separate-implicit") + explicit = write_harbor_task(tasks_root, "separate-reuse-env") + implicit.joinpath("task.toml").write_text( + """ +schema_version = "1.2" +artifacts = ["/logs/agent/trajectory.json"] + +[environment] +docker_image = "agent:latest" +cpus = 1 +memory_mb = 2048 +storage_mb = 4096 + +[verifier] +timeout_sec = 30 + +[verifier.environment] +docker_image = "verifier:latest" +cpus = 2 +memory_mb = 1024 +storage_mb = 2048 +allow_internet = false +""".strip() + ) + 
explicit.joinpath("task.toml").write_text( + """ +schema_version = "1.2" + +[environment] +docker_image = "agent-reused:latest" +cpus = 3 +memory_mb = 3072 +storage_mb = 6144 + +[verifier] +environment_mode = "separate" +timeout_sec = 45 +""".strip() + ) + + rows = {task["task_name"]: task for task in getattr(package, "load_taskset")()} + + assert rows["shared-default"]["harbor"]["verifier_mode"] == "shared" + assert rows["shared-default"]["harbor"]["verifier_sandbox"] is None + assert rows["shared-default"]["harbor"]["verifier_upload_tests"] is False + assert rows["separate-implicit"]["harbor"]["verifier_mode"] == "separate" + assert rows["separate-implicit"]["harbor"]["verifier_sandbox"] == { + "image": "verifier:latest", + "cpu_cores": 2.0, + "memory_gb": 1.0, + "disk_size_gb": 2.0, + "timeout_minutes": 120, + "command_timeout": 30, + "workdir": "/app", + "scope": "rollout", + "network_access": False, + } + assert rows["separate-implicit"]["harbor"]["artifacts"] == [ + "/logs/agent/trajectory.json" + ] + assert rows["separate-implicit"]["harbor"]["verifier_upload_tests"] is False + assert rows["separate-reuse-env"]["harbor"]["verifier_mode"] == "separate" + assert rows["separate-reuse-env"]["harbor"]["verifier_sandbox"]["image"] == ( + "agent-reused:latest" + ) + assert rows["separate-reuse-env"]["harbor"]["verifier_sandbox"]["memory_gb"] == 3.0 + assert rows["separate-reuse-env"]["harbor"]["verifier_upload_tests"] is True + + +def test_harbor_taskset_rejects_shared_mode_with_verifier_environment( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + package = write_harbor_package(tmp_path, monkeypatch) + task_dir = write_harbor_task( + cast(Path, getattr(package, "tasks_root")), + "bad-shared-env", + ) + task_dir.joinpath("task.toml").write_text( + """ +[environment] +docker_image = "agent:latest" + +[verifier] +environment_mode = "shared" + +[verifier.environment] +docker_image = "verifier:latest" +""".strip() + ) + + with pytest.raises(ValueError, 
match="incompatible"): + list(getattr(package, "load_taskset")()) + + class FakeHarborCommandResult: def __init__( self, @@ -157,12 +265,56 @@ class FakeHarborSandboxClient: instances: list["FakeHarborSandboxClient"] = [] def __init__(self): + self.create_requests: list[object] = [] + self.created: list[str] = [] + self.deleted: list[str] = [] + self.upload_files: list[tuple[str, str, str]] = [] + self.download_files: list[tuple[str, str, str]] = [] self.execute_commands: list[tuple[str, int | None, str | None]] = [] self.background_jobs: list[tuple[str, str, int | None, str | None]] = [] type(self).instances.append(self) + async def create(self, request: object) -> object: + self.create_requests.append(request) + sandbox_id = f"verifier-sbx-{len(type(self).instances)}" + self.created.append(sandbox_id) + return SimpleNamespace(id=sandbox_id) + + async def wait_for_creation(self, sandbox_id: str) -> None: + assert sandbox_id + + async def delete(self, sandbox_id: str) -> None: + self.deleted.append(sandbox_id) + async def upload_file(self, *args: object, **kwargs: object) -> None: + sandbox_id = str(kwargs.get("sandbox_id") or args[0]) + file_path = str(kwargs.get("file_path") or args[1]) + local_file_path = str(kwargs.get("local_file_path") or args[2]) + self.upload_files.append((sandbox_id, file_path, local_file_path)) + + async def upload_bytes(self, *args: object, **kwargs: object) -> None: + sandbox_id = str(kwargs.get("sandbox_id") or args[0]) + file_path = str(kwargs.get("file_path") or args[1]) + self.upload_files.append((sandbox_id, file_path, "")) + + async def download_file(self, *args: object, **kwargs: object) -> None: + sandbox_id = str(kwargs.get("sandbox_id") or args[0]) + file_path = str(kwargs.get("file_path") or args[1]) + local_file_path = str(kwargs.get("local_file_path") or args[2]) + self.download_files.append((sandbox_id, file_path, local_file_path)) + local_path = Path(local_file_path) + local_path.parent.mkdir(parents=True, exist_ok=True) 
+ if file_path.endswith(".tar.gz"): + with tarfile.open(local_path, "w:gz") as tar: + marker = local_path.parent / "marker.txt" + marker.write_text(file_path) + tar.add(marker, arcname="marker.txt") + return + local_path.write_text(file_path) + + async def read_file(self, *args: object, **kwargs: object) -> str: _ = args, kwargs + return "" async def execute_command( self, *args: object, **kwargs: object @@ -206,10 +358,89 @@ async def test_harbor_reward_uses_background_job_for_tests( client = FakeHarborSandboxClient.instances[0] assert reward == 1.0 + assert client.created == [] assert client.background_jobs == [("sbx-1", "bash test.sh", 120, "/tests")] assert ("bash test.sh", 120, "/tests") not in client.execute_commands +@pytest.mark.asyncio +async def test_harbor_reward_uses_fresh_separate_verifier_sandbox( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + task_dir = write_harbor_task(tmp_path) + fake_module = types.ModuleType("prime_sandboxes") + fake_module.AsyncSandboxClient = FakeHarborSandboxClient + + class FakeCreateSandboxRequest: + def __init__(self, **kwargs: object): + self.__dict__.update(kwargs) + + fake_module.CreateSandboxRequest = FakeCreateSandboxRequest + monkeypatch.setitem(sys.modules, "prime_sandboxes", fake_module) + FakeHarborSandboxClient.instances = [] + + reward = await harbor_reward( + { + "harbor": { + "task_dir": str(task_dir), + "test_timeout": 120, + "verifier_mode": "separate", + "verifier_sandbox": { + "image": "verifier:latest", + "scope": "rollout", + }, + "verifier_upload_tests": True, + "verifier_env": {"MODEL": "judge"}, + "artifacts": [ + {"source": "/logs/artifacts", "exclude": ["*.pt", "cache"]}, + "/logs/agent/trajectory.json", + {"source": "/tmp/answer.json", "destination": "answers/final.json"}, + "/data", + "/a/data", + "/b/data", + ], + } + }, + {"sandbox_id": "agent-sbx"}, + ) + + agent_client, verifier_client = FakeHarborSandboxClient.instances + assert reward == 1.0 + assert agent_client.created == 
[] + assert verifier_client.create_requests[0].docker_image == "verifier:latest" + assert verifier_client.deleted == ["verifier-sbx-2"] + assert verifier_client.background_jobs == [ + ("verifier-sbx-2", "bash test.sh", 120, "/tests") + ] + assert not any( + path == "/tmp/harbor_tests.tar.gz" for _, path, _ in agent_client.upload_files + ) + assert any( + path == "/tmp/harbor_tests.tar.gz" + for _, path, _ in verifier_client.upload_files + ) + transfer_command = agent_client.execute_commands[0][0] + assert "/logs/artifacts" in transfer_command + assert transfer_command.count("if [ -e /logs/artifacts ]; then") == 1 + assert "--exclude='*.pt'" in transfer_command + assert "--exclude=cache" in transfer_command + assert "/logs/agent/trajectory.json" in transfer_command + assert "/tmp/answer.json" in transfer_command + assert "answers/final.json" not in transfer_command + assert 'if [ -e /data ]; then\n mkdir -p "$tmp"\n' in transfer_command + assert "/a/data" in transfer_command + assert "/b/data" in transfer_command + assert agent_client.download_files[0][1] == "/tmp/_vf_harbor_inputs.tar.gz" + assert any( + path == "/tmp/_vf_harbor_inputs.tar.gz" + for _, path, _ in verifier_client.upload_files + ) + assert any( + "tar -xzf" in command for command, _, _ in verifier_client.execute_commands + ) + assert verifier_client.background_jobs[0][3] == "/tests" + + def test_packaged_harbor_and_opencode_imports_are_reexported() -> None: from verifiers.v1.packages.harnesses import OpenCode, OpenCodeConfig, Pi from verifiers.v1.packages.tasksets import HarborTaskset diff --git a/verifiers/v1/packages/tasksets/harbor.py b/verifiers/v1/packages/tasksets/harbor.py index d84e25f3b..14e08be14 100644 --- a/verifiers/v1/packages/tasksets/harbor.py +++ b/verifiers/v1/packages/tasksets/harbor.py @@ -2,6 +2,7 @@ import json import os import re +import shlex import shutil import subprocess import sys @@ -9,18 +10,27 @@ import tempfile from collections.abc import Iterable, Mapping from 
importlib.resources import files -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import cast from verifiers.utils.import_utils import load_toml from ...config import TasksetConfig from ...taskset import Taskset -from ...utils.sandbox_utils import SandboxClient +from ...utils.sandbox_utils import SandboxClient, create_sandbox_lease from verifiers.decorators import reward -from ...types import ConfigData +from ...types import ConfigData, ConfigMap TASKS_SUBDIR = "tasks" +VERIFIER_MODE_SHARED = "shared" +VERIFIER_MODE_SEPARATE = "separate" +HARBOR_ARTIFACTS_DIR = "/logs/artifacts" +HARBOR_REWARD_COMMAND = ( + "if [ -s /logs/verifier/reward.json ]; then " + "cat /logs/verifier/reward.json; " + "elif [ -s /logs/verifier/reward.txt ]; then " + "cat /logs/verifier/reward.txt; fi" +) def _resolve_caller_package() -> str | None: @@ -137,26 +147,58 @@ def task_row(self, task_dir: Path, index: int) -> ConfigData: raise TypeError(f"{task_toml_path} [environment] must be a mapping.") agent_config = config.get("agent", {}) or {} verifier_config = config.get("verifier", {}) or {} + if not isinstance(agent_config, Mapping): + raise TypeError(f"{task_toml_path} [agent] must be a mapping.") + if not isinstance(verifier_config, Mapping): + raise TypeError(f"{task_toml_path} [verifier] must be a mapping.") instruction = instruction_path.read_text().strip() task_remote_dir = self.config.task_dir.rstrip("/") or "/task" - sandbox = { - "image": environment.get("docker_image") or self.config.docker_image, - "cpu_cores": parse_number(environment.get("cpus"), self.config.cpu_cores), - "memory_gb": parse_gb(environment.get("memory"), self.config.memory_gb), - "disk_size_gb": parse_gb( - environment.get("storage"), self.config.disk_size_gb - ), - "timeout_minutes": self.config.timeout_minutes, - "command_timeout": int( - parse_number( - agent_config.get("timeout_sec"), self.config.agent_timeout_seconds + test_timeout = parse_number( + 
verifier_config.get("timeout_sec"), + self.config.verifier_timeout_seconds, + ) + verifier_environment = verifier_config.get("environment") + verifier_mode = verifier_config.get("environment_mode") + if verifier_mode is not None: + verifier_mode = str(verifier_mode) + if verifier_mode not in {VERIFIER_MODE_SHARED, VERIFIER_MODE_SEPARATE}: + raise ValueError( + f"{task_toml_path} [verifier].environment_mode must be " + "'shared' or 'separate'." ) - ), - "workdir": self.config.workdir, - "scope": self.config.scope, - } - if "allow_internet" in environment: - sandbox["network_access"] = bool(environment["allow_internet"]) + elif verifier_environment is not None: + verifier_mode = VERIFIER_MODE_SEPARATE + else: + verifier_mode = VERIFIER_MODE_SHARED + if verifier_mode == VERIFIER_MODE_SHARED and verifier_environment is not None: + raise ValueError( + f"{task_toml_path} [verifier].environment_mode='shared' is " + "incompatible with [verifier.environment]." + ) + if ( + verifier_mode == VERIFIER_MODE_SEPARATE + and verifier_environment is not None + and not isinstance(verifier_environment, Mapping) + ): + raise TypeError( + f"{task_toml_path} [verifier.environment] must be a mapping." 
+ ) + agent_timeout = int( + parse_number( + agent_config.get("timeout_sec"), self.config.agent_timeout_seconds + ) + ) + sandbox = self.sandbox_config(environment, agent_timeout) + verifier_sandbox: ConfigData | None = None + verifier_upload_tests = False + if verifier_mode == VERIFIER_MODE_SEPARATE and verifier_environment is None: + verifier_sandbox = {**sandbox, "command_timeout": int(test_timeout)} + verifier_upload_tests = True + elif verifier_mode == VERIFIER_MODE_SEPARATE: + verifier_environment = cast(ConfigMap, verifier_environment) + verifier_sandbox = self.sandbox_config( + verifier_environment, int(test_timeout) + ) return { "example_id": index, "task_name": task_dir.name, @@ -183,10 +225,12 @@ def task_row(self, task_dir: Path, index: int) -> ConfigData: "task_name": task_dir.name, "config": config, "docker_image": environment.get("docker_image"), - "test_timeout": parse_number( - verifier_config.get("timeout_sec"), - self.config.verifier_timeout_seconds, - ), + "test_timeout": test_timeout, + "verifier_mode": verifier_mode, + "verifier_sandbox": verifier_sandbox, + "verifier_upload_tests": verifier_upload_tests, + "verifier_env": verifier_config.get("env") or {}, + "artifacts": config.get("artifacts") or [], }, "info": { "harbor": { @@ -196,6 +240,33 @@ def task_row(self, task_dir: Path, index: int) -> ConfigData: }, } + def sandbox_config( + self, environment: ConfigMap, command_timeout: int + ) -> ConfigData: + memory = ( + f"{environment['memory_mb']}mb" + if "memory_mb" in environment + else environment.get("memory") + ) + storage = ( + f"{environment['storage_mb']}mb" + if "storage_mb" in environment + else environment.get("storage") + ) + sandbox: ConfigData = { + "image": environment.get("docker_image") or self.config.docker_image, + "cpu_cores": parse_number(environment.get("cpus"), self.config.cpu_cores), + "memory_gb": parse_gb(memory, self.config.memory_gb), + "disk_size_gb": parse_gb(storage, self.config.disk_size_gb), + 
"timeout_minutes": self.config.timeout_minutes, + "command_timeout": command_timeout, + "workdir": self.config.workdir, + "scope": self.config.scope, + } + if "allow_internet" in environment: + sandbox["network_access"] = bool(environment["allow_internet"]) + return sandbox + def harbor_task_dirs(root: Path, task_names: Iterable[str] | None = None) -> list[Path]: selected = set(task_names or []) @@ -312,38 +383,209 @@ async def harbor_reward(task, state) -> float: if not isinstance(harbor, Mapping): return 0.0 task_dir = Path(str(harbor["task_dir"])) + mode = str(harbor.get("verifier_mode") or VERIFIER_MODE_SHARED) + timeout = int(parse_number(harbor.get("test_timeout"), 900)) + verifier_env = harbor.get("verifier_env") or {} + if not isinstance(verifier_env, Mapping): + raise TypeError("[verifier].env must be a mapping.") + verifier_env = {str(key): str(value) for key, value in verifier_env.items()} + verifier_env = verifier_env or None from prime_sandboxes import AsyncSandboxClient client = cast(SandboxClient, AsyncSandboxClient()) try: - await upload_harbor_tests(client, sandbox_id, task_dir) - test_timeout = int(parse_number(harbor.get("test_timeout"), 900)) - result = await client.run_background_job( - sandbox_id=sandbox_id, - command="bash test.sh", - working_dir="/tests", - timeout=test_timeout, - ) - state["harbor_tests"] = { - "returncode": result.exit_code, - "stdout": result.stdout or "", - "stderr": result.stderr or "", - } - reward_result = await client.execute_command( - sandbox_id=sandbox_id, - command=( - "if [ -s /logs/verifier/reward.txt ]; then " - "cat /logs/verifier/reward.txt; " - "elif [ -s /logs/verifier/reward.json ]; then " - "cat /logs/verifier/reward.json; fi" - ), - ) + if mode == VERIFIER_MODE_SEPARATE: + reward_text = await run_separate_harbor_verifier( + client, sandbox_id, harbor, state, timeout, verifier_env + ) + else: + await upload_harbor_tests(client, sandbox_id, task_dir) + reward_text = await run_harbor_tests( + client, + 
sandbox_id, + state, + command="bash test.sh", + working_dir="/tests", + timeout=timeout, + env=verifier_env, + ) except Exception as e: state["harbor_error"] = str(e) return 0.0 finally: await client.aclose() - return parse_reward_text(str(reward_result.stdout or "").strip()) + return parse_reward_text(str(reward_text or "").strip()) + + +async def run_separate_harbor_verifier( + agent_client: SandboxClient, + agent_sandbox_id: str, + harbor: ConfigMap, + state: ConfigData, + timeout: int, + verifier_env: dict[str, str] | None, +) -> str: + """Run Harbor's separate verifier mode in a fresh sandbox. + + This is needed only when Harbor resolves the verifier environment separately + from the agent environment; the verifier image owns /tests/test.sh, so we + transfer just the configured grading inputs before running it. + """ + sandbox = harbor.get("verifier_sandbox") + if not isinstance(sandbox, Mapping): + raise RuntimeError("Separate Harbor verifier did not resolve a sandbox.") + lease = await create_sandbox_lease(cast(ConfigData, dict(sandbox)), "harbor") + state["harbor_verifier_sandbox_id"] = lease.id + try: + await lease.execute("mkdir -p /logs/verifier /logs/artifacts /tests") + if harbor.get("verifier_upload_tests"): + await upload_harbor_tests( + lease.client, lease.id, Path(str(harbor["task_dir"])) + ) + await transfer_harbor_verifier_inputs( + agent_client, + agent_sandbox_id, + lease.client, + lease.id, + harbor, + ) + return await run_harbor_tests( + lease.client, + lease.id, + state, + command="bash test.sh", + working_dir="/tests", + timeout=timeout, + env=verifier_env, + ) + finally: + await lease.delete() + + +async def run_harbor_tests( + client: SandboxClient, + sandbox_id: str, + state: ConfigData, + *, + command: str, + working_dir: str | None, + timeout: int, + env: dict[str, str] | None, +) -> str: + result = await client.run_background_job( + sandbox_id=sandbox_id, + command=command, + working_dir=working_dir, + timeout=timeout, + env=env, + ) 
+ state["harbor_tests"] = { + "returncode": result.exit_code, + "stdout": result.stdout or "", + "stderr": result.stderr or "", + } + reward_result = await client.execute_command( + sandbox_id=sandbox_id, + command=HARBOR_REWARD_COMMAND, + ) + return reward_result.stdout or "" + + +async def transfer_harbor_verifier_inputs( + agent_client: SandboxClient, + agent_sandbox_id: str, + verifier_client: SandboxClient, + verifier_sandbox_id: str, + harbor: ConfigMap, +) -> None: + raw_artifacts = harbor.get("artifacts") or [] + if not isinstance(raw_artifacts, list): + raise TypeError("Harbor task artifacts must be a list.") + artifacts: list[tuple[str, list[str]]] = [] + has_artifacts_dir = False + for artifact in raw_artifacts: + if isinstance(artifact, str): + source = artifact + exclude: list[str] = [] + elif isinstance(artifact, Mapping): + artifact_data = cast(ConfigMap, artifact) + source_value = artifact_data.get("source") + if not isinstance(source_value, str): + raise TypeError("Harbor artifacts must be strings or source mappings.") + source = source_value + raw_exclude = artifact_data.get("exclude") or [] + if not isinstance(raw_exclude, list): + raise TypeError("Harbor artifact exclude must be a list.") + exclude = [str(item) for item in raw_exclude] + else: + raise TypeError("Harbor artifacts must be strings or source mappings.") + + if source.rstrip("/") == HARBOR_ARTIFACTS_DIR: + has_artifacts_dir = True + artifacts.append((source, exclude)) + if not has_artifacts_dir: + artifacts.insert(0, (HARBOR_ARTIFACTS_DIR, [])) + + with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file: + local_tar = Path(tmp_file.name) + remote_tar = "/tmp/_vf_harbor_inputs.tar.gz" + archive_command_lines = [ + "set -e", + f"rm -f {shlex.quote(remote_tar)}", + "tmp=$(mktemp -d /tmp/_vf_harbor_inputs.XXXXXX)", + "trap 'rm -rf \"$tmp\"' EXIT", + "added=0", + ] + for source, exclude in artifacts: + source = source.rstrip("/") or "/" + path = source.lstrip("/") + 
if not path: + continue + target = f'"$tmp"/{shlex.quote(path)}' + parent = str(PurePosixPath(path).parent) + mkdir_target = '"$tmp"' if parent == "." else f'"$tmp"/{shlex.quote(parent)}' + exclude_args = " ".join(f"--exclude={shlex.quote(item)}" for item in exclude) + archive_command_lines.extend( + [ + f"if [ -e {shlex.quote(source)} ]; then", + f" mkdir -p {mkdir_target}", + f" if [ -d {shlex.quote(source)} ]; then", + f" mkdir -p {target}", + f" tar -C {shlex.quote(source)} {exclude_args} -cf - . | " + f"tar -C {target} -xf -", + " else", + f" cp {shlex.quote(source)} {target}", + " fi", + " added=1", + "fi", + ] + ) + archive_command_lines.extend( + [ + 'if [ "$added" -eq 0 ]; then exit 42; fi', + f'tar -czf {shlex.quote(remote_tar)} -C "$tmp" .', + ] + ) + archive_command = "\n".join(archive_command_lines) + try: + result = await agent_client.execute_command( + sandbox_id=agent_sandbox_id, + command=archive_command, + ) + if result.exit_code == 42: + return + if result.exit_code: + raise RuntimeError(result.stderr or result.stdout or "tar failed") + await agent_client.download_file(agent_sandbox_id, remote_tar, str(local_tar)) + await upload_harbor_archive( + verifier_client, verifier_sandbox_id, local_tar, remote_tar + ) + finally: + local_tar.unlink(missing_ok=True) + await agent_client.execute_command( + sandbox_id=agent_sandbox_id, + command=f"rm -f {shlex.quote(remote_tar)}", + ) async def upload_harbor_tests( @@ -354,19 +596,39 @@ async def upload_harbor_tests( try: await build_harbor_tests_archive(task_dir, tar_path) remote_tar = "/tmp/harbor_tests.tar.gz" - await client.upload_file(sandbox_id, remote_tar, str(tar_path)) - await client.execute_command( - sandbox_id=sandbox_id, - command=( - f"mkdir -p /oracle /tests /logs/verifier && " - f"tar -xzf {remote_tar} -C / && rm {remote_tar}" - ), + await upload_harbor_archive( + client, + sandbox_id, + tar_path, + remote_tar, + before_extract="mkdir -p /oracle /tests /logs/verifier", timeout=900, ) finally: 
tar_path.unlink(missing_ok=True) +async def upload_harbor_archive( + client: SandboxClient, + sandbox_id: str, + local_tar: Path, + remote_tar: str, + *, + before_extract: str | None = None, + timeout: int | None = None, +) -> None: + await client.upload_file(sandbox_id, remote_tar, str(local_tar)) + extract = f"tar -xzf {shlex.quote(remote_tar)} -C / && rm {shlex.quote(remote_tar)}" + command = f"{before_extract} && {extract}" if before_extract else extract + result = await client.execute_command( + sandbox_id=sandbox_id, + command=command, + timeout=timeout, + ) + if result.exit_code: + raise RuntimeError(result.stderr or result.stdout or "tar extract failed") + + async def build_harbor_tests_archive(task_dir: Path, tar_path: Path) -> None: with tarfile.open(tar_path, "w:gz") as tar: for dirname, arc_root in (("solution", "oracle"), ("tests", "tests")):