Implement Trajectory collection#86
Conversation
|
Thanks for this PR! Sorry for the slow review, scheduled time tomorrow to take a detailed look. |
joshgreaves
left a comment
There was a problem hiding this comment.
Thanks for making this PR! It took me a long time to think about what I think should be the best design here. This is what I got to:
Let's make the TrajectoryCollectingEnvironment(env, trajectory_collector) a wrapper.
- Takes an env and trajectory_collector in init
- Follows the Environment protocol
- Records based on calls to aenter, reset, step, close, and aexit, using methods on trajectory_collector
The big benefits are:
- It's backwards compatible. We aren't updating any signatures.
- It will apply to all environments wrapped this way, not just code environments.
The cases I’d treat as episode end are:
- step() returns LAST -> normal finish
- reset() while an episode is active -> previous episode is abandoned/interrupted
- close() / aexit() while active -> previous episode is aborted/closed
- step() or reset() raises -> current episode is errored/aborted
- task cancellation during step() / reset() -> aborted/cancelled
| from ares.environments.trajectory import StepRecord | ||
| from ares.environments.trajectory import TrajectoryCollector | ||
|
|
||
| __all__ = [ |
There was a problem hiding this comment.
This seems reasonable, but we might want to consider where is the right place to export these.
src/ares/environments/code_env.py
Outdated
| from ares.containers import containers | ||
| from ares.containers import daytona as ares_daytona | ||
| from ares.environments import base | ||
| from ares.environments import trajectory as trajectory_mod |
There was a problem hiding this comment.
Nit: prefer trajectory_lib over trajectory_mod, since it's consistent with other parts of the codebase.
src/ares/environments/code_env.py
Outdated
| self._step_limit = step_limit | ||
| self._prefix = prefix | ||
| self._tracker = tracker if tracker is not None else stat_tracker.NullStatTracker() | ||
| self._trajectory_collector = trajectory_collector |
There was a problem hiding this comment.
Let's add a NullTrajectoryTracker (like NullStattracker), since it simplifies the control flow a bit (don't have to check if the tracker is None in multiple places)
📝 WalkthroughWalkthroughThis PR adds episode trajectory collection capabilities to ARES. It introduces data structures ( Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Registry as ares.make()
participant Environment as CodeEnvironment
participant Collector as TrajectoryCollector
participant Storage as JSON File
User->>Registry: make(preset_id, trajectory_collector=...)
Registry->>Environment: __init__(trajectory_collector=...)
Environment->>Environment: _trajectory_collector = collector
User->>Environment: reset()
Environment->>Collector: begin_episode(task_name)
Collector->>Collector: Initialize episode state
Environment->>Collector: record_step(StepRecord(FIRST, ...))
loop Per Environment Step
User->>Environment: step(action)
Environment->>Collector: record_step(StepRecord(MID, ...))
Collector->>Collector: Append step
end
Note over Environment,Collector: Episode terminates
Environment->>Collector: end_episode(truncated=...)
Collector->>Storage: Write {episode_id}.json
Collector-->>Environment: Return EpisodeTrajectory
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
Comment |
| # Record the FIRST step in the trajectory. | ||
| # FIRST steps have only observation; action/reward/discount are None per dm_env semantics. | ||
| assert self._current_task is not None | ||
| self._trajectory_collector.begin_episode(task_name=self._current_task.name) | ||
| self._trajectory_collector.record_step( | ||
| trajectory_lib.StepRecord( | ||
| step_index=0, | ||
| step_type="FIRST", | ||
| observation=trajectory_lib.serialize_llm_request(result.observation), | ||
| action=None, | ||
| reward=None, | ||
| discount=None, |
There was a problem hiding this comment.
reset() begins a new episode without ending the previous one, causing interrupted trajectories to be discarded — should we call _trajectory_collector.end_episode(truncated=True) before starting a new episode and on close()?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
Before applying, verify this suggestion against the current code. In
src/ares/environments/code_env.py around lines 128-142, the reset() method currently
calls _trajectory_collector.begin_episode(...) and record_step(...) without first ending
a previously active episode. Refactor by inserting a check to call
self._trajectory_collector.end_episode(truncated=True) if an episode is active (or
unconditionally call end_episode(truncated=True) before begin_episode) so interrupted
episodes are finalized rather than discarded, then call begin_episode and record the
FIRST step as before. Also modify the close() method (where the environment is closed)
to call self._trajectory_collector.end_episode(truncated=True) if an episode is active
so partial episodes are finalized on close as well.
| @dataclasses.dataclass | ||
| class EpisodeTrajectory: | ||
| """A complete episode trajectory with metadata and step records. | ||
|
|
||
| Attributes: |
There was a problem hiding this comment.
EpisodeTrajectory is mutable despite CLAUDE.md recommending frozen dataclasses — should we make it frozen=True and return updated copies or explicitly document/allow this mutation?
Finding type: AI Coding Guidelines | Severity: 🟠 Medium
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
Before applying, verify this suggestion against the current code. In
src/ares/environments/trajectory.py around lines 103-107 (the EpisodeTrajectory
dataclass) and around lines 268-276 (JsonTrajectoryCollector.end_episode): Make
EpisodeTrajectory a frozen dataclass (add frozen=True) and change its steps field to an
immutable sequence type (e.g. tuple[StepRecord, ...]). Then refactor
JsonTrajectoryCollector so it no longer mutates an EpisodeTrajectory in place: keep
internal mutable state (e.g. self._current_steps: list[StepRecord] plus
episode_id/task_name/start_time) in begin_episode and record_step, and in end_episode
construct and return a new EpisodeTrajectory instance (with
steps=tuple(self._current_steps), end_time, total_reward, num_steps, truncated) instead
of mutating fields on an existing object; clear the internal state afterwards. Update
NullTrajectoryCollector and any creation sites accordingly to match the new frozen
EpisodeTrajectory signature.
| env = spec.get_env( | ||
| selector=selector, | ||
| container_factory=container_factory, | ||
| tracker=tracker, | ||
| trajectory_collector=trajectory_collector, | ||
| ) |
There was a problem hiding this comment.
registry.make() forwards trajectory_collector to spec.get_env() causing TypeError for specs that don't accept that kwarg — should we only pass supported args or add a compatibility shim/**kwargs?
Finding type: Breaking Changes | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
Before applying, verify this suggestion against the current code. In
src/ares/registry.py around lines 619-624, the make() function unconditionally forwards
trajectory_collector to spec.get_env causing TypeError for specs that don't accept that
kwarg. Change the call to inspect spec.get_env's parameters (via inspect.signature) or
attempt the call and fall back: build a kwargs dict with selector, container_factory,
tracker and only include trajectory_collector if the target function accepts it; then
call spec.get_env(**kwargs). Also adjust the decorator-generated spec at lines 407-421
to accept trajectory_collector as an optional kwarg or accept **kwargs and forward them
to func, so auto-generated specs remain compatible with older user functions.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/ares/registry.py (1)
407-421:⚠️ Potential issue | 🟠 MajorKeep legacy
@register_envfactories working when no collector is requested.This wrapper now always injects
trajectory_collector=None. Any existing factory that still has the old(..., tracker=None)signature will start failing withunexpected keyword argument 'trajectory_collector'even on plainmake(...)calls. Only forward the kwarg when the caller actually provided a collector, or gate it on the wrapped function's signature.💡 Minimal backwards-compatible forwarding
def get_env( self, *, selector: TaskSelector, container_factory: containers.ContainerFactory, tracker: stat_tracker.StatTracker | None = None, trajectory_collector: trajectory.TrajectoryCollector | None = None, ) -> base.Environment: """Delegate to the decorated function.""" - return func( - selector=selector, - container_factory=container_factory, - tracker=tracker, - trajectory_collector=trajectory_collector, - ) + kwargs = { + "selector": selector, + "container_factory": container_factory, + "tracker": tracker, + } + if trajectory_collector is not None: + kwargs["trajectory_collector"] = trajectory_collector + return func(**kwargs)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ares/registry.py` around lines 407 - 421, The wrapper function get_env in the registry is always passing trajectory_collector=None into the decorated factory (func) which breaks legacy factories that only accept tracker; update get_env to only forward the trajectory_collector kwarg when a collector was actually provided (trajectory_collector is not None) or when the wrapped function accepts that parameter (inspect.signature(func) includes "trajectory_collector"); modify the call site inside get_env to build kwargs dynamically (always include selector, container_factory, tracker, and conditionally include trajectory_collector) or gate forwarding based on the func signature so old factories continue to work.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ares/environments/code_env.py`:
- Around line 164-197: The step() flow currently leaves an in-progress
trajectory open if _get_time_step() (or subsequent processing) raises; catch
exceptions around the time-step retrieval/processing in step() and ensure the
episode is finalized before re-raising: if an exception occurs, set
self._requires_reset = True, cancel self._code_agent_task if not None, and call
self._trajectory_collector.end_episode(truncated=True) (or end_episode with an
appropriate truncated flag) so the JSON trajectory is closed, then re-raise the
original exception; locate this handling around the call to
self._get_time_step(), the surrounding logic that sets truncated and LAST, and
the recording block that uses trajectory_lib.StepRecord and
_trajectory_collector.
In `@src/ares/registry.py`:
- Around line 619-624: The call to spec.get_env in make() passes
trajectory_collector unconditionally which breaks third‑party EnvironmentSpec
implementations; instead build the kwargs dynamically (e.g., create a dict with
selector, container_factory, tracker and only set 'trajectory_collector' when
trajectory_collector is not None) and call spec.get_env(**kwargs) so older
register_preset() specs that lack that parameter continue to work; modify the
code around the spec.get_env invocation to conditionally include the
trajectory_collector key.
---
Outside diff comments:
In `@src/ares/registry.py`:
- Around line 407-421: The wrapper function get_env in the registry is always
passing trajectory_collector=None into the decorated factory (func) which breaks
legacy factories that only accept tracker; update get_env to only forward the
trajectory_collector kwarg when a collector was actually provided
(trajectory_collector is not None) or when the wrapped function accepts that
parameter (inspect.signature(func) includes "trajectory_collector"); modify the
call site inside get_env to build kwargs dynamically (always include selector,
container_factory, tracker, and conditionally include trajectory_collector) or
gate forwarding based on the func signature so old factories continue to work.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b89f7305-5ee5-4162-9ebf-cbcc8d77336b
📒 Files selected for processing (7)
src/ares/__init__.pysrc/ares/environments/__init__.pysrc/ares/environments/code_env.pysrc/ares/environments/trajectory.pysrc/ares/environments/trajectory_test.pysrc/ares/presets.pysrc/ares/registry.py
| with self._tracker.timeit(f"{self._prefix}/get_time_step"): | ||
| ts = await self._get_time_step() | ||
|
|
||
| truncated = False | ||
| if self._step_count >= self._step_limit: | ||
| _LOGGER.debug("[%d] Step limit reached. Returning LAST timestep.", id(self)) | ||
| assert self._code_agent_task is not None | ||
| self._code_agent_task.cancel() | ||
| # Truncation: step_type="LAST", discount=1.0, unless we're _also_ already in a terminal state. | ||
| truncated = ts.step_type != "LAST" | ||
| ts = base.TimeStep(step_type="LAST", reward=ts.reward, discount=ts.discount, observation=ts.observation) | ||
|
|
||
| if ts.step_type == "LAST": | ||
| self._requires_reset = True | ||
|
|
||
| # Record the step in the trajectory. | ||
| self._trajectory_collector.record_step( | ||
| trajectory_lib.StepRecord( | ||
| step_index=self._step_count, | ||
| step_type=ts.step_type, | ||
| observation=( | ||
| trajectory_lib.serialize_llm_request(ts.observation) | ||
| if ts.observation is not None | ||
| else None | ||
| ), | ||
| action=trajectory_lib.serialize_llm_response(action), | ||
| reward=ts.reward, | ||
| discount=ts.discount, | ||
| timestamp=time.time(), | ||
| ) | ||
| ) | ||
| if ts.step_type == "LAST": | ||
| self._trajectory_collector.end_episode(truncated=truncated) | ||
|
|
There was a problem hiding this comment.
Finalize the trajectory when step() exits via an error.
_get_time_step() already raises on agent failures (Lines 214-220), but this block only flips _requires_reset and calls end_episode() on the happy-path LAST branch. After an exception, the environment still looks reusable and the in-progress trajectory is left hanging; JsonTrajectoryCollector.begin_episode() later overwrites it on the next reset (src/ares/environments/trajectory.py Lines 237-243). Please abort/finalize the episode and force a reset before re-raising.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ares/environments/code_env.py` around lines 164 - 197, The step() flow
currently leaves an in-progress trajectory open if _get_time_step() (or
subsequent processing) raises; catch exceptions around the time-step
retrieval/processing in step() and ensure the episode is finalized before
re-raising: if an exception occurs, set self._requires_reset = True, cancel
self._code_agent_task if not None, and call
self._trajectory_collector.end_episode(truncated=True) (or end_episode with an
appropriate truncated flag) so the JSON trajectory is closed, then re-raise the
original exception; locate this handling around the call to
self._get_time_step(), the surrounding logic that sets truncated and LAST, and
the recording block that uses trajectory_lib.StepRecord and
_trajectory_collector.
| env = spec.get_env( | ||
| selector=selector, | ||
| container_factory=container_factory, | ||
| tracker=tracker, | ||
| trajectory_collector=trajectory_collector, | ||
| ) |
There was a problem hiding this comment.
Don't break existing register_preset() specs by passing the new kwarg unconditionally.
make() now always calls spec.get_env(..., trajectory_collector=None). Any third-party EnvironmentSpec that still implements the previous signature will now fail on ordinary make() usage, even though trajectory collection was not requested. Build the kwargs dynamically and only include trajectory_collector when it is non-None.
💡 Backwards-compatible call construction
- env = spec.get_env(
- selector=selector,
- container_factory=container_factory,
- tracker=tracker,
- trajectory_collector=trajectory_collector,
- )
+ get_env_kwargs = {
+ "selector": selector,
+ "container_factory": container_factory,
+ "tracker": tracker,
+ }
+ if trajectory_collector is not None:
+ get_env_kwargs["trajectory_collector"] = trajectory_collector
+ env = spec.get_env(**get_env_kwargs)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| env = spec.get_env( | |
| selector=selector, | |
| container_factory=container_factory, | |
| tracker=tracker, | |
| trajectory_collector=trajectory_collector, | |
| ) | |
| get_env_kwargs = { | |
| "selector": selector, | |
| "container_factory": container_factory, | |
| "tracker": tracker, | |
| } | |
| if trajectory_collector is not None: | |
| get_env_kwargs["trajectory_collector"] = trajectory_collector | |
| env = spec.get_env(**get_env_kwargs) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ares/registry.py` around lines 619 - 624, The call to spec.get_env in
make() passes trajectory_collector unconditionally which breaks third‑party
EnvironmentSpec implementations; instead build the kwargs dynamically (e.g.,
create a dict with selector, container_factory, tracker and only set
'trajectory_collector' when trajectory_collector is not None) and call
spec.get_env(**kwargs) so older register_preset() specs that lack that parameter
continue to work; modify the code around the spec.get_env invocation to
conditionally include the trajectory_collector key.
User description
Summary
Adds optional trajectory collection to ARES environments. Episodes can be recorded as full (observation, action, reward, discount) sequences for behavior cloning, offline RL, and debugging.
Changes
New module:
ares.environments.trajectoryStepRecord– Frozen dataclass for a single step (step_index, step_type, observation, action, reward, discount, timestamp).EpisodeTrajectory– Dataclass for episode metadata (episode_id, task_name, steps, start_time, end_time, total_reward, truncated).TrajectoryCollector– Protocol withbegin_episode(),record_step(), andend_episode().JsonTrajectoryCollector– Writes each episode to a JSON file in a configurable output directory.serialize_llm_request()andserialize_llm_response()for JSON-safe storage.Integration in
CodeEnvironmenttrajectory_collectorparameter.reset(): callsbegin_episode()and records the FIRST step (initial observation).step(): records each step (action, next observation, reward, discount); on LAST step, callsend_episode()with truncation status.Registry and presets
ares.make()acceptstrajectory_collectorand forwards it to presets.trajectory_collectorthrough toCodeEnvironment.Step semantics (dm_env-style)
None.None; reward is the episode reward; discount is0.0(terminal) or1.0(truncated).Usage
Testing
trajectory_test.pyJsonTrajectoryCollectorlifecycleRun the trajectory collection tests
Design Notes
Opt-in collection
trajectory_collector=Nonedisables recordingProtocol-based design
Non-intrusive integration
reset/stepflowGenerated description
Below is a concise technical summary of the changes proposed in this PR:
Add optional trajectory collection by introducing the
ares.environments.trajectorymodule with data models, collectors, and serialization helpers for capturing episodes. Integratetrajectory_collectorwiring throughCodeEnvironment, the registry, and presets soares.makecan record episodes when provided a collector.trajectorymodule featuringStepRecord,EpisodeTrajectory,TrajectoryCollector,JsonTrajectoryCollector, and JSON serialization helpers, along with pytest coverage for serialization and persistence.Modified files (3)
Latest Contributors(1)
trajectory_collectorintoCodeEnvironment.reset/step, the preset plumbing, and the public registry soares.makecan transparently forward collectors and persist episodes.Modified files (4)
Latest Contributors(2)
Summary by CodeRabbit