Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/ares/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
>>> tracker = stat_tracker.LoggingStatTracker()
>>> env = ares.make("sbv-mswea", container_factory=daytona.DaytonaContainer, tracker=tracker)

Collect episode trajectories:

>>> from ares.environments.trajectory import JsonTrajectoryCollector
>>> collector = JsonTrajectoryCollector(output_dir="./trajectories")
>>> env = ares.make("sbv-mswea", trajectory_collector=collector)

To see available presets:

>>> all_presets = ares.info() # Get list of all presets
Expand All @@ -35,7 +41,7 @@
>>> ares.registry.register_preset("my-env", MyEnvSpec())

All other functionality is available via submodules:
- ares.environments: Environment implementations
- ares.environments: Environment implementations and trajectory collection
- ares.code_agents: Code agent implementations
- ares.containers: Container management
- ares.llms: LLM client implementations
Expand All @@ -47,6 +53,12 @@
from ares import presets # noqa: F401
from ares.environments.base import Environment
from ares.environments.base import TimeStep

# Trajectory collection
from ares.environments.trajectory import EpisodeTrajectory
from ares.environments.trajectory import JsonTrajectoryCollector
from ares.environments.trajectory import StepRecord
from ares.environments.trajectory import TrajectoryCollector
from ares.registry import EnvironmentInfo

# Import registry functions to expose at top level
Expand All @@ -58,7 +70,11 @@
__all__ = [
"Environment",
"EnvironmentInfo",
"EpisodeTrajectory",
"JsonTrajectoryCollector",
"StepRecord",
"TimeStep",
"TrajectoryCollector",
"info",
"list_presets",
"make",
Expand Down
13 changes: 13 additions & 0 deletions src/ares/environments/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Environment implementations for ARES."""

from ares.environments.trajectory import EpisodeTrajectory
from ares.environments.trajectory import JsonTrajectoryCollector
from ares.environments.trajectory import StepRecord
from ares.environments.trajectory import TrajectoryCollector

__all__ = [
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems reasonable, but we might want to consider where the right place to export these is.

"EpisodeTrajectory",
"JsonTrajectoryCollector",
"StepRecord",
"TrajectoryCollector",
]
40 changes: 40 additions & 0 deletions src/ares/environments/code_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ares.containers import containers
from ares.containers import daytona as ares_daytona
from ares.environments import base
from ares.environments import trajectory as trajectory_lib
from ares.experiment_tracking import stat_tracker
from ares.llms import queue_mediated_client
from ares.llms import request
Expand Down Expand Up @@ -67,13 +68,15 @@ def __init__(
step_limit: int = 250, # Same as mini-swe-agent default.
prefix: str = "harbor_env",
tracker: stat_tracker.StatTracker | None = None,
trajectory_collector: trajectory_lib.TrajectoryCollector | None = None,
):
self._tasks = tasks
self._container_factory = container_factory
self._code_agent_factory = code_agent_factory
self._step_limit = step_limit
self._prefix = prefix
self._tracker = tracker if tracker is not None else stat_tracker.NullStatTracker()
self._trajectory_collector = trajectory_collector if trajectory_collector is not None else trajectory_lib.NullTrajectoryCollector()

# We set the LLM client to a queue mediated client so that
# we can return LLM requests in the reset and step methods.
Expand Down Expand Up @@ -122,6 +125,22 @@ async def reset(self) -> base.TimeStep[request.LLMRequest, float, float]:
assert ts.observation is not None
result = base.TimeStep(step_type="FIRST", reward=ts.reward, discount=ts.discount, observation=ts.observation)

# Record the FIRST step in the trajectory.
# FIRST steps have only observation; action/reward/discount are None per dm_env semantics.
assert self._current_task is not None
self._trajectory_collector.begin_episode(task_name=self._current_task.name)
self._trajectory_collector.record_step(
trajectory_lib.StepRecord(
step_index=0,
step_type="FIRST",
observation=trajectory_lib.serialize_llm_request(result.observation),
action=None,
reward=None,
discount=None,
Comment on lines +128 to +139
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reset() begins a new episode without ending the previous one, so an interrupted trajectory is discarded. Should we call _trajectory_collector.end_episode(truncated=True) before starting a new episode, and also in close()?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

Before applying, verify this suggestion against the current code. In
src/ares/environments/code_env.py around lines 128-142, the reset() method currently
calls _trajectory_collector.begin_episode(...) and record_step(...) without first ending
a previously active episode. Refactor by inserting a check to call
self._trajectory_collector.end_episode(truncated=True) if an episode is active (or
unconditionally call end_episode(truncated=True) before begin_episode) so interrupted
episodes are finalized rather than discarded, then call begin_episode and record the
FIRST step as before. Also modify the close() method (where the environment is closed)
to call self._trajectory_collector.end_episode(truncated=True) if an episode is active
so partial episodes are finalized on close as well.

timestamp=time.time(),
)
)

reset_end_time = time.time()
self._tracker.scalar(f"{self._prefix}/reset", reset_end_time - reset_start_time)
return result
Expand All @@ -145,16 +164,37 @@ async def step(self, action: response.LLMResponse) -> base.TimeStep[request.LLMR
with self._tracker.timeit(f"{self._prefix}/get_time_step"):
ts = await self._get_time_step()

truncated = False
if self._step_count >= self._step_limit:
_LOGGER.debug("[%d] Step limit reached. Returning LAST timestep.", id(self))
assert self._code_agent_task is not None
self._code_agent_task.cancel()
# Truncation: step_type="LAST", discount=1.0, unless we're _also_ already in a terminal state.
truncated = ts.step_type != "LAST"
ts = base.TimeStep(step_type="LAST", reward=ts.reward, discount=ts.discount, observation=ts.observation)

if ts.step_type == "LAST":
self._requires_reset = True

# Record the step in the trajectory.
self._trajectory_collector.record_step(
trajectory_lib.StepRecord(
step_index=self._step_count,
step_type=ts.step_type,
observation=(
trajectory_lib.serialize_llm_request(ts.observation)
if ts.observation is not None
else None
),
action=trajectory_lib.serialize_llm_response(action),
reward=ts.reward,
discount=ts.discount,
timestamp=time.time(),
)
)
if ts.step_type == "LAST":
self._trajectory_collector.end_episode(truncated=truncated)

Comment on lines 164 to +197
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Finalize the trajectory when step() exits via an error.

_get_time_step() already raises on agent failures (Lines 214-220), but this block only flips _requires_reset and calls end_episode() on the happy-path LAST branch. After an exception, the environment still looks reusable and the in-progress trajectory is left hanging; JsonTrajectoryCollector.begin_episode() later overwrites it on the next reset (src/ares/environments/trajectory.py Lines 237-243). Please abort/finalize the episode and force a reset before re-raising.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ares/environments/code_env.py` around lines 164 - 197, The step() flow
currently leaves an in-progress trajectory open if _get_time_step() (or
subsequent processing) raises; catch exceptions around the time-step
retrieval/processing in step() and ensure the episode is finalized before
re-raising: if an exception occurs, set self._requires_reset = True, cancel
self._code_agent_task if not None, and call
self._trajectory_collector.end_episode(truncated=True) (or end_episode with an
appropriate truncated flag) so the JSON trajectory is closed, then re-raise the
original exception; locate this handling around the call to
self._get_time_step(), the surrounding logic that sets truncated and LAST, and
the recording block that uses trajectory_lib.StepRecord and
_trajectory_collector.

step_end_time = time.time()
self._tracker.scalar(f"{self._prefix}/step", step_end_time - step_start_time)

Expand Down
Loading