
# Data Flow

This document describes the request lifecycle from client submission through agent execution to response delivery, including async job management and real-time SSE streaming.

## Request Lifecycle

Synchronous requests flow through the NeMo Agent Toolkit FastAPI frontend directly to the agent workflow. Asynchronous requests (deep research) use a Dask cluster for distributed execution with SSE-based progress streaming.

```mermaid
sequenceDiagram
    participant C as Client
    participant API as FastAPI
    participant JS as Job Store
    participant D as Dask Worker
    participant A as Agent Workflow
    participant ES as Event Store
    participant SSE as SSE Stream

    C->>API: POST /v1/jobs/async/submit
    API->>JS: Create job (SUBMITTED)
    API->>D: Submit task to Dask
    API-->>C: 202 Accepted {job_id}

    C->>API: GET /v1/jobs/async/job/{id}/stream
    API-->>SSE: Open SSE connection

    D->>JS: Update status (RUNNING)
    D->>A: Execute agent workflow

    loop Agent execution
        A->>ES: Store intermediate events
        ES-->>SSE: Push events to client
        SSE-->>C: SSE event data
    end

    A-->>D: Final result
    D->>ES: Store final events
    D->>JS: Update status (SUCCESS)
    JS-->>SSE: Push job.status
    SSE-->>C: job.status {status: SUCCESS}
```
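The submit leg of the sequence above can be sketched as follows. The `SUBMITTED` status and the create-then-submit ordering come from the flow described here; the in-memory `JobStore` class and the `submit_job` helper are illustrative stand-ins for the real FastAPI handler and Dask client, not the toolkit's actual API.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class JobStore:
    """Minimal in-memory stand-in for the real job store (assumption)."""
    jobs: dict = field(default_factory=dict)

    def create(self, job_id: str) -> None:
        self.jobs[job_id] = "SUBMITTED"

    def update(self, job_id: str, status: str) -> None:
        self.jobs[job_id] = status


def submit_job(store: JobStore, submit_to_dask) -> dict:
    """Create the job record first, then hand the task to the cluster,
    mirroring the POST /v1/jobs/async/submit sequence."""
    job_id = str(uuid.uuid4())
    store.create(job_id)       # job is visible as SUBMITTED before execution starts
    submit_to_dask(job_id)     # e.g. a Dask client.submit(...) call in the real server
    return {"job_id": job_id}  # body of the 202 Accepted response
```

The key ordering detail is that the job record exists before the Dask task is submitted, so a client polling immediately after the 202 response always finds the job.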

## Async Job States

Jobs progress through the following states:

```mermaid
stateDiagram-v2
    [*] --> SUBMITTED: POST /submit
    SUBMITTED --> RUNNING: Dask worker picks up
    RUNNING --> SUCCESS: Agent completes
    RUNNING --> FAILURE: Unhandled error
    RUNNING --> INTERRUPTED: User cancels
    FAILURE --> [*]
    SUCCESS --> [*]
    INTERRUPTED --> [*]
```
| State | Description |
| --- | --- |
| `SUBMITTED` | Job created and queued for execution |
| `RUNNING` | Dask worker is actively executing the agent |
| `SUCCESS` | Agent completed successfully; final report available |
| `FAILURE` | Agent encountered an unhandled error |
| `INTERRUPTED` | User requested cancellation via `POST /cancel` |

A background reaper task periodically marks stale RUNNING jobs as FAILURE if they exceed the configured timeout, protecting against ghost jobs from crashed workers.
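The reaper's core check can be sketched as a pure function over the job table. The `RUNNING`/`FAILURE` states come from the state diagram above; the dict shape, the heartbeat field, and the 90-second default are assumptions for illustration (the real timeout is configurable).

```python
HEARTBEAT_TIMEOUT = 90.0  # assumed default; the real value is configurable


def reap_stale_jobs(jobs: dict, now: float, timeout: float = HEARTBEAT_TIMEOUT) -> list:
    """Mark RUNNING jobs as FAILURE when their last heartbeat is too old.

    `jobs` maps job_id -> {"status": ..., "last_heartbeat": epoch seconds}
    (an assumed record shape). Returns the IDs that were reaped.
    """
    reaped = []
    for job_id, job in jobs.items():
        if job["status"] == "RUNNING" and now - job["last_heartbeat"] > timeout:
            job["status"] = "FAILURE"  # ghost job left behind by a crashed worker
            reaped.append(job_id)
    return reaped
```

Keeping the check pure (time passed in, no I/O) makes the reaper trivial to run on a periodic background task and to test.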

## SSE Event Types

Events use a category.state naming convention aligned with NeMo Agent Toolkit's IntermediateStep structure. The AgentEventCallback (a LangChain callback handler) translates LangChain lifecycle events into these SSE events.

| Event Type | Category | State | Description |
| --- | --- | --- | --- |
| `stream.mode` | -- | -- | Announces stream mode: `polling` (catching up), `live` (real-time), or `pubsub` (PostgreSQL LISTEN/NOTIFY) |
| `job.status` | job | -- | Job status change; includes status and an optional `reconnected` flag |
| `workflow.start` | workflow | start | Agent workflow execution begins |
| `workflow.end` | workflow | end | Agent workflow execution completes |
| `llm.start` | llm | start | LLM invocation begins; includes model name and prompt/message count |
| `llm.chunk` | llm | chunk | Streaming token chunk from the LLM |
| `llm.end` | llm | end | LLM invocation completes; includes usage metadata and optional thinking/reasoning |
| `tool.start` | tool | start | Tool execution begins; includes tool name and input |
| `tool.end` | tool | end | Tool execution completes; may emit `citation_source` artifacts for search tools |
| `artifact.update` | artifact | update | Artifact created or updated (files, citations, todos, outputs) |
| `job.update` | job | update | Retry notification when a chain (LLM call) fails and is retried |
| `job.error` | job | -- | Error during job execution |
| `job.heartbeat` | job | -- | Periodic heartbeat from the Dask worker (every 30s); keeps the SSE connection alive and aids ghost-job detection |
| `job.cancelled` | job | -- | Job was cancelled by the user (emitted from the Dask worker on `CancelledError`) |
| `job.cancellation_requested` | job | -- | Cancellation has been requested for the job |
| `job.shutdown` | job | -- | Server is shutting down; the client should reconnect |

### Event Structure

Every SSE event follows the IntermediateStepEvent schema:

```json
{
  "type": "tool.start",
  "id": "uuid-v4",
  "name": "tavily_web_search",
  "timestamp": "2026-02-16T10:30:00Z",
  "data": {
    "input": {"query": "renewable energy GDP impact"},
    "output": null
  },
  "metadata": {}
}
```
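A client can deserialize events of this shape into a small typed wrapper. The field names follow the example above; the dataclass itself and the `parse_event` helper are an illustrative sketch, not the toolkit's actual `IntermediateStepEvent` model.

```python
import json
from dataclasses import dataclass, field


@dataclass
class IntermediateStepEvent:
    """Sketch of the event schema shown above (field names from the example;
    the class itself is an assumption, not the toolkit's model)."""
    type: str
    id: str
    name: str
    timestamp: str
    data: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)

    @property
    def category(self) -> str:
        """'tool.start' -> 'tool', per the category.state convention."""
        return self.type.split(".", 1)[0]


def parse_event(raw: str) -> IntermediateStepEvent:
    return IntermediateStepEvent(**json.loads(raw))
```

Splitting `type` on the first `.` recovers the category/state pair, which is enough for a client to route events without a hard-coded list of every type.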

### Artifact Types

The artifact.update event carries a type field indicating the artifact kind:

| Artifact Type | Description |
| --- | --- |
| `file` | Virtual filesystem file created by the deep researcher |
| `output` | Intermediate output (draft section, summary) |
| `citation_source` | A source URL or reference discovered during research |
| `citation_use` | An inline citation placed in the report |
| `todo` | A research task tracked by TodoListMiddleware |

## Reconnection and Replay

The SSE stream supports seamless reconnection after network interruptions. Every event stored in the EventStore has a monotonically increasing integer ID. Clients track the last received event ID and reconnect using the resume endpoint.

```mermaid
sequenceDiagram
    participant C as Client
    participant API as FastAPI
    participant ES as Event Store

    C->>API: GET /stream
    API-->>C: event id=1 (workflow.start)
    API-->>C: event id=2 (llm.start)
    API-->>C: event id=3 (llm.chunk)

    Note over C,API: Network interruption

    C->>API: GET /stream/{last_event_id=3}
    Note over API,ES: Replay mode: fetch events after id=3

    API-->>C: stream.mode {mode: polling}
    API-->>C: event id=4 (llm.end)
    API-->>C: event id=5 (tool.start)
    API-->>C: event id=6 (tool.end)
    API-->>C: stream.mode {mode: live}
    API-->>C: event id=7 (llm.start)
```

### Replay Behavior

  1. Polling mode: On reconnection, the stream enters polling mode and fetches all events after the provided last_event_id in large batches (up to 10,000 for completed jobs, 1,000 for running jobs) with no polling delay.

  2. Mode transition: Once the polling batch is smaller than the fetch limit (indicating the tail has been reached), the stream emits stream.mode {mode: live} and switches to live polling.

  3. Reconnection flag: The first job.status event after reconnection includes reconnected: true so clients can distinguish reconnection status updates from new status changes.

  4. Completed job replay: If the job already finished before the client reconnects, the stream replays all stored events and immediately sends the terminal job.status event.
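Steps 1 and 2 above can be sketched as a generator: fetch batches until one comes back smaller than the limit, then announce live mode. The batch sizes and the `stream.mode` events come from the rules above; the `fetch_after(event_id, limit)` callable is an assumed interface standing in for the real EventStore query.

```python
REPLAY_BATCH_RUNNING = 1_000     # batch sizes from the replay rules above
REPLAY_BATCH_COMPLETED = 10_000


def replay(fetch_after, last_event_id: int, limit: int = REPLAY_BATCH_RUNNING):
    """Yield stored events in polling mode, then a live-mode marker.

    `fetch_after(event_id, limit)` returns up to `limit` events with IDs
    greater than `event_id` (an assumed interface, not the real store API).
    """
    yield {"type": "stream.mode", "mode": "polling"}
    while True:
        batch = fetch_after(last_event_id, limit)
        yield from batch
        if batch:
            last_event_id = batch[-1]["id"]
        if len(batch) < limit:  # tail reached: switch to live mode
            break
    yield {"type": "stream.mode", "mode": "live"}
```

A full batch does not prove more events exist, so the loop keeps fetching until a short batch confirms the tail has been reached.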

## API Endpoints

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | `/v1/jobs/async/agents` | List available agent types |
| POST | `/v1/jobs/async/submit` | Submit a new async job |
| GET | `/v1/jobs/async/job/{job_id}` | Get job status |
| GET | `/v1/jobs/async/job/{job_id}/stream` | SSE stream from the beginning |
| GET | `/v1/jobs/async/job/{job_id}/stream/{last_event_id}` | SSE stream from an event ID (reconnection) |
| POST | `/v1/jobs/async/job/{job_id}/cancel` | Cancel a running job |
| GET | `/v1/jobs/async/job/{job_id}/state` | Get artifacts from the event store |
| GET | `/v1/jobs/async/job/{job_id}/report` | Get the final report |

## Cancellation

When a client sends POST /cancel:

  1. The Job Store sets the job status to INTERRUPTED
  2. A CancellationMonitor running on the Dask worker polls the Job Store at regular intervals (default 1 second)
  3. When the monitor detects INTERRUPTED, it sets an asyncio.Event that the agent workflow can check for cooperative cancellation
  4. The SSE stream sends a final job.status {status: INTERRUPTED} event
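The monitor in steps 2 and 3 can be sketched with `asyncio`. The `INTERRUPTED` status, the 1-second default poll interval, and the cooperative `asyncio.Event` come from the steps above; the `get_status` callable is an assumed stand-in for the real Job Store lookup.

```python
import asyncio


async def cancellation_monitor(get_status, cancel_event: asyncio.Event,
                               poll_interval: float = 1.0) -> None:
    """Poll the job store until the status becomes INTERRUPTED, then set
    the cooperative cancellation event that the agent workflow checks."""
    while not cancel_event.is_set():
        if get_status() == "INTERRUPTED":
            cancel_event.set()  # workflow observes this and unwinds cleanly
            return
        await asyncio.sleep(poll_interval)
```

Cancellation is cooperative: the monitor only raises a flag, and the workflow decides where it is safe to stop, which avoids killing the worker mid-write.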

## Graceful Shutdown

The SSEConnectionManager tracks all active SSE connections. During server shutdown:

  1. signal_shutdown() sets a shared event flag
  2. Active SSE generators check the flag during polling intervals
  3. Each stream emits a job.shutdown event before closing
  4. The connection manager waits for all streams to terminate
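A minimal sketch of how this handshake could look, assuming the internals described above (a shared event flag, streams that poll it, and a manager that awaits them); the real SSEConnectionManager's interface may differ.

```python
import asyncio


class ShutdownSketch:
    """Illustrative shutdown handshake, not the real SSEConnectionManager."""

    def __init__(self):
        self._shutdown = asyncio.Event()
        self._streams: set[asyncio.Task] = set()

    def track(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._streams.add(task)
        task.add_done_callback(self._streams.discard)
        return task

    def signal_shutdown(self) -> None:
        self._shutdown.set()                 # step 1: set the shared flag

    async def wait_for_streams(self) -> None:
        if self._streams:                    # step 4: wait for termination
            await asyncio.gather(*self._streams)

    async def stream(self, send) -> None:
        while not self._shutdown.is_set():   # step 2: check flag while polling
            await asyncio.sleep(0)
        send({"type": "job.shutdown"})       # step 3: final event, then close
```

Emitting `job.shutdown` before closing lets well-behaved clients reconnect immediately instead of waiting out a network timeout.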