Skip to content

feat: stream ACP trajectory incrementally to disk#566

Draft
Yiminnn wants to merge 1 commit into
mainfrom
incremental-trajectory-write
Draft

feat: stream ACP trajectory incrementally to disk#566
Yiminnn wants to merge 1 commit into
mainfrom
incremental-trajectory-write

Conversation

@Yiminnn
Copy link
Copy Markdown
Collaborator

@Yiminnn Yiminnn commented May 26, 2026

Summary

acp_trajectory.jsonl is now written to disk as the agent runs, not just at end-of-run. Mid-run crashes leave a usable partial trajectory; followers can cat the file during the run to see live tool calls.

Format and viewer are unchanged. Each line is still a merged event from _capture_session_trajectory; the writer just snapshots the current cumulative state on every ACP update.

Mechanism

  • ACPSession gains an optional on_change: Callable[[ACPSession], None] invoked after record_user_prompt, mark_prompt_end, and any recognized handle_update branch.
  • TrajectoryWriter (new) writes payloads atomically via tmp + os.replace; dedupes byte-identical re-snapshots.
  • _snapshot_session_trajectory is non-destructive (does not flush _pending_text), so streamed chunks stay mergeable until mark_prompt_end runs and produces the canonical merged events.
  • Rollout._attach_trajectory_writer wires the writer in connect() and connect_as() (multi-role), capturing the cumulative self._trajectory from prior scenes via make_trajectory_sink(writer, prior) — so scene-N's sink writes prior_scenes + current_session, not just the current session.
  • Final write in _build_rollout_result now goes through TrajectoryWriter.write_final (idempotent atomic write — important for oracle / scraped-fallback paths where no live writer ran).

Why this is safe

  • Backward compatible: same JSONL output, same path, same consumers (viewer.py, judge.py.tmpl, tests/test_smoke.py, etc.).
  • Multi-scene: cumulative-aware sink prevents earlier scenes from being wiped by the next scene's writer. New tests in TestMultiSceneCumulativeStreaming cover this.
  • Async: handle_update is single-thread cooperative within the asyncio event loop, so the sync write_text + os.replace per update is consistent with the existing model. See follow-up note below for one MEDIUM finding deferred to a separate PR.
  • Crash recovery: stale .tmp from a SIGKILLed writer is swept by the next TrajectoryWriter.__init__.
  • Unknown ACP updates: handle_update skips _notify_change for unrecognized sessionUpdate types (no wasted snapshots, no risk if a future ACP version adds new event kinds).
  • Callback errors are visible: _notify_change now logs at ERROR (was WARNING) — silent streaming degradation in a 64-concurrency log would otherwise be easy to miss.

End-to-end verified

openhands + gemini/gemini-3.1-flash-lite on SkillsBench citation-check via Daytona — reward 1.0, no errors, 2 tool calls, trajectory_source: acp, partial_trajectory: False. File lands at t=0 with the user_message and grows through each tool-call status transition:

t bytes event what
10:04:20 826 1 user_message
10:04:24 52,750 3 tool cat completed
10:04:48 82,612 5 tool edit completed
10:04:52 85,022 7 final agent_message

On main the file would have first appeared at the verifier-finished mark (~10:05:53 in this run).

Test plan

  • 7 new unit tests in tests/test_capture_trajectory.py:
    • TestTrajectoryWriter — file appears on first event, tool_call pending → completed transitions visible, exceptions swallowed, atomic rewrites (no torn lines / leftover .tmp), write_final overwrites.
    • TestSnapshotSessionTrajectory — non-destructive snapshot preserves _pending_text until mark_prompt_end.
    • TestMultiSceneCumulativeStreaming — prior scenes preserved on disk during current scene; empty current session doesn't wipe history; caller-mutated cumulative list does not double-count.
    • TestHandleUpdateUnknownType — unknown sessionUpdate skips on_change; known types still notify.
    • TestTrajectoryWriterStaleTmpCleanup — stale .tmp swept on __init__.
  • Full local test suite green: 1312 passed, 9 skipped, no regressions.
  • Live Daytona run on openhands + gemini/gemini-3.1-flash-lite + citation-check: reward 1.0.

Known follow-ups (out of scope for this PR)

Before: acp_trajectory.jsonl was written once, at end-of-run, from
_build_rollout_result. Crashes mid-run left no trajectory on disk;
followers couldn't see in-flight tool calls.

After: an optional `on_change` callback on ACPSession fires after every
mutating method (record_user_prompt, mark_prompt_end, handle_update).
Rollout wires it to a TrajectoryWriter that atomically rewrites
acp_trajectory.jsonl (tmp + os.replace) on each ACP update. Final
write at end-of-run is unchanged and idempotent.

Trajectory format and viewer are unchanged — same JSONL events, just
present on disk earlier.

Multi-scene safety: each scene wires its writer through
make_trajectory_sink(writer, prior=self._trajectory[:]), so scene N's
sink prepends the captured cumulative trajectory from prior scenes.
Without this, each scene would overwrite the file with only its own
session's events.

Robustness:
- non-destructive _snapshot_session_trajectory used by the live writer;
  pending_text stays merge-able for mark_prompt_end
- TrajectoryWriter sweeps any stale .tmp left by a previous crashed run
- handle_update no longer fires on_change for unrecognized
  sessionUpdate types (future ACP versions, agent extensions)
- callback failures log at ERROR (was WARNING) so silent streaming
  degradation is visible

End-to-end verified on Daytona with openhands + gemini/gemini-3.1-flash-lite
on SkillsBench citation-check: reward=1.0, file lands at t=0 with the
user_message and grows through tool_call status transitions.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant