Skip to content

Conversation

@USTCKevinF
Copy link

This PR addresses an issue with stale task requeueing that could cause duplicate rollouts to be received and stored.

Problem

  • The server requeues tasks when they time out (_check_and_requeue_stale_tasks).
  • If a worker eventually finishes and submits a rollout for a task that has already been requeued, the server still accepted the outdated result in /rollout.
  • This caused the same logical task to be processed more than once.
  • In our training loop, if this duplicate rollout is from last batch this led to:
    assert len(self._completed_rollouts) == self._total_tasks_queued
    failing, because duplicate rollouts from previous batches were counted.

or led to rollout id key error because this rollout is not from current batch
image

Solution

  • Introduced an attempt_id for each task claim.
    • Every time a task is handed out via /task, a new attempt_id (UUID) is generated.
    • Workers are required to include this attempt_id when submitting rollouts.
  • When the server receives a rollout:
    • It checks if the attempt_id matches the currently active one in _processing_tasks.
    • If it doesn't match (i.e. the task was requeued and a newer attempt is active), the stale rollout is silently ignored.

This ensures at most one valid rollout is stored per logical task, and late stale results will not break training.

Changes

  • Updated Task to include attempt_id.
  • get_next_task assigns a new attempt_id upon each claim.
  • store_rollout validates the attempt_id before accepting results.
  • Updated logging to make it clear when a stale rollout is dropped.

Why this is important

  • Guarantees consistency between queued tasks and completed rollouts.
  • Prevents assertion errors during training when tasks time out and later resurface.
  • Makes the system more robust in long-running distributed training with occasional straggler workers.

Related issues

  • Internal error AssertionError: assert len(self._completed_rollouts) == self._total_tasks_queued caused by duplicate rollouts from stale tasks.

Copilot AI review requested due to automatic review settings September 5, 2025 06:57
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces an attempt-based tracking mechanism to prevent duplicate rollouts caused by stale task requeuing in the AgentLightning system.

  • Adds attempt_id fields to Task and Rollout models to track task claim attempts
  • Implements validation logic in the server to reject rollouts from outdated task attempts
  • Updates worker code to include attempt_id when submitting rollouts

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
agentlightning/types.py Adds attempt_id field to both Task and Rollout models
agentlightning/server.py Generates unique attempt IDs for task claims and validates them when storing rollouts
agentlightning/runner.py Updates rollout creation to include the task's attempt ID
Comments suppressed due to low confidence (1)

agentlightning/types.py:1

  • [nitpick] The comment on line 77 is duplicated from line 72. Consider updating it to be more specific about the attempt_id field, such as '# Unique identifier for each task claim attempt'.
from typing import Any, Dict, List, Optional, Union, Literal, Annotated

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

logger.warning(f"Ignoring rollout {rollout.rollout_id}: task not in processing anymore")
return # drop stale result

if getattr(rollout, "attempt_id", None) != current_task.attempt_id:
Copy link

Copilot AI Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using getattr with a default value is unnecessary since rollout.attempt_id is a defined field in the Rollout model. Consider using direct attribute access: rollout.attempt_id != current_task.attempt_id.

Suggested change
if getattr(rollout, "attempt_id", None) != current_task.attempt_id:
if rollout.attempt_id != current_task.attempt_id:

Copilot uses AI. Check for mistakes.
Comment on lines +229 to +230
rollout_obj = Rollout(rollout_id=task.rollout_id, attempt_id=task.attempt_id) # Default empty rollout
Copy link

Copilot AI Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an extra trailing space on line 230 that should be removed.

Suggested change
rollout_obj = Rollout(rollout_id=task.rollout_id, attempt_id=task.attempt_id) # Default empty rollout

Copilot uses AI. Check for mistakes.
@ultmaster
Copy link
Contributor

In server.py, we already had a num_claims. Maybe rename it as claim_sequence_id is a good idea? I don't see the API used elsewhere. so it should be safe.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants