diff --git a/apps/dojo/src/agents.ts b/apps/dojo/src/agents.ts index 46a8701d6..c87b18255 100644 --- a/apps/dojo/src/agents.ts +++ b/apps/dojo/src/agents.ts @@ -257,6 +257,32 @@ export const agentsIntegrations = { } ), + "agent-spec-langgraph": async () => + mapAgents( + (path) => new HttpAgent({ + url: `${envVars.agentSpecUrl}/langgraph/${path}`, + }), + { + agentic_chat: "agentic_chat", + backend_tool_rendering: "backend_tool_rendering", + human_in_the_loop: "human_in_the_loop", + tool_based_generative_ui: "tool_based_generative_ui", + } + ), + + "agent-spec-wayflow": async () => + mapAgents( + (path) => new HttpAgent({ + url: `${envVars.agentSpecUrl}/wayflow/${path}`, + }), + { + agentic_chat: "agentic_chat", + backend_tool_rendering: "backend_tool_rendering", + tool_based_generative_ui: "tool_based_generative_ui", + human_in_the_loop: "human_in_the_loop", + } + ), + "microsoft-agent-framework-python": async () => mapAgents( (path) => new HttpAgent({ url: `${envVars.agentFrameworkPythonUrl}/${path}` }), diff --git a/apps/dojo/src/env.ts b/apps/dojo/src/env.ts index 59d66de5e..658fc8fe6 100644 --- a/apps/dojo/src/env.ts +++ b/apps/dojo/src/env.ts @@ -9,6 +9,7 @@ type envVars = { springAiUrl: string; llamaIndexUrl: string; crewAiUrl: string; + agentSpecUrl: string; pydanticAIUrl: string; adkMiddlewareUrl: string; agentFrameworkPythonUrl: string; @@ -41,6 +42,7 @@ export default function getEnvVars(): envVars { agnoUrl: process.env.AGNO_URL || 'http://localhost:9001', llamaIndexUrl: process.env.LLAMA_INDEX_URL || 'http://localhost:9000', crewAiUrl: process.env.CREW_AI_URL || 'http://localhost:9002', + agentSpecUrl: process.env.AGENT_SPEC_URL || 'http://localhost:9003', pydanticAIUrl: process.env.PYDANTIC_AI_URL || 'http://localhost:9000', adkMiddlewareUrl: process.env.ADK_MIDDLEWARE_URL || 'http://localhost:8000', agentFrameworkPythonUrl: process.env.AGENT_FRAMEWORK_PYTHON_URL || 'http://localhost:8888', diff --git a/apps/dojo/src/menu.ts b/apps/dojo/src/menu.ts index 218e6482c..4529f66d4 100644 --- a/apps/dojo/src/menu.ts +++ b/apps/dojo/src/menu.ts @@ -11,6 +11,26 @@ import type { IntegrationFeatures, MenuIntegrationConfig } from "./types/integra */ export const menuIntegrations = [ + { + id: "agent-spec-langgraph", + name: "Open Agent Spec (LangGraph)", + features: [ + "agentic_chat", + "backend_tool_rendering", + "human_in_the_loop", + "tool_based_generative_ui", + ], + }, + { + id: "agent-spec-wayflow", + name: "Open Agent Spec (Wayflow)", + features: [ + "agentic_chat", + "backend_tool_rendering", + "human_in_the_loop", + "tool_based_generative_ui", + ], + }, { id: "langgraph", name: "LangGraph (Python)", diff --git a/integrations/agent-spec/python/README.md b/integrations/agent-spec/python/README.md new file mode 100644 index 000000000..49145d29b --- /dev/null +++ b/integrations/agent-spec/python/README.md @@ -0,0 +1,57 @@ +Open Agent Spec <> AG‑UI (Python) +================================= + +Agent runner that emits AG‑UI events and a small FastAPI/uvicorn server to stream them to Dojo via SSE. + +What this is +- Agent runner: `ag_ui_agentspec/agent.py` executes an Agent Spec configuration on a chosen runtime and bridges spans to AG‑UI events. +- Server wiring: `ag_ui_agentspec/endpoint.py` exposes a POST SSE endpoint that streams those events to the Dojo frontend. 
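+
+For orientation, a minimal wiring sketch (illustrative only; `agent_spec_json` stands in for a serialized Agent Spec configuration like those under `examples/server/api/`):
+
+```python
+from fastapi import FastAPI
+
+from ag_ui_agentspec import add_agentspec_fastapi_endpoint
+from ag_ui_agentspec.agent import AgentSpecAgent
+
+app = FastAPI()
+# Execute the Agent Spec config on the chosen runtime and bridge its spans to AG-UI events
+agent = AgentSpecAgent(agent_spec_json, runtime="langgraph")
+# Expose a POST SSE endpoint that streams those events to the Dojo frontend
+add_agentspec_fastapi_endpoint(app, agent, path="/langgraph/agentic_chat")
+```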
+
+Supported agent runtimes and Dojo features
+- Wayflow (Oracle's reference agent framework): chat, frontend tools, backend tools
+- LangGraph: chat, frontend tools, backend tools, tool call streaming
+
+## Install
+
+Install the base library plus optional extras so you can choose runtimes. Routers are lazy-loaded; if you don't install a runtime, its routes will not work.
+
+Before installation, please clone the following GitHub repositories:
+- [AG-UI](https://github.com/ag-ui-protocol/ag-ui) and `cd ag-ui/integrations/agent-spec/python`
+- [Agent Spec](https://github.com/oracle/agent-spec)
+- [WayFlow](https://github.com/oracle/wayflow)
+- Place all three repositories in the same parent directory.
+
+```bash
+# Wayflow only
+uv pip install -e .[wayflow]
+
+# LangGraph only
+uv pip install -e .[langgraph]
+
+# Multiple
+uv pip install -e .[wayflow,langgraph]
+```
+
+Run the example server
+```bash
+cd ag-ui/integrations/agent-spec/python/examples
+uv sync --extra langgraph --extra wayflow && uv run dev # both runtimes; serves http://localhost:9003
+# or pick one runtime:
+# uv sync --extra langgraph && uv run dev
+# uv sync --extra wayflow && uv run dev
+# then run Dojo (in a separate terminal):
+# Option A — run everything from repo root (multiple apps):
+# pnpm turbo run dev
+# Option B — run only Dojo:
+# cd ag-ui/apps/dojo
+# AGENT_SPEC_URL=http://localhost:9003 pnpm dev (make sure to run pnpm build first)
+```
+
+Environment
+- OpenAI-compatible variables commonly used by the examples (pick your provider):
+  - `OPENAI_BASE_URL` (or provider-specific: `OSS_API_URL`, `LLAMA_API_URL`, etc.)
+  - `OPENAI_MODEL` (the model slug; defaults to `gpt-4o`, available through the OpenAI API)
+  - `OPENAI_API_KEY`
+- Dojo server URL:
+  - `AGENT_SPEC_URL=http://localhost:9003` when running the local example server
diff --git a/integrations/agent-spec/python/ag_ui_agentspec/__init__.py b/integrations/agent-spec/python/ag_ui_agentspec/__init__.py
new file mode 100644
index 000000000..27a3a4fa4
--- /dev/null
+++ b/integrations/agent-spec/python/ag_ui_agentspec/__init__.py
@@ -0,0 +1,5 @@
+from ag_ui_agentspec.endpoint import add_agentspec_fastapi_endpoint
+
+__all__ = [
+    "add_agentspec_fastapi_endpoint",
+]
diff --git a/integrations/agent-spec/python/ag_ui_agentspec/agent.py b/integrations/agent-spec/python/ag_ui_agentspec/agent.py
new file mode 100644
index 000000000..9d84b43da
--- /dev/null
+++ b/integrations/agent-spec/python/ag_ui_agentspec/agent.py
@@ -0,0 +1,35 @@
+from typing import Literal
+
+from ag_ui.core import RunAgentInput
+from ag_ui_agentspec.agentspec_tracing_exporter import AgUiSpanProcessor
+from pyagentspec.tracing.trace import Trace
+from pyagentspec.tracing.spans.span import Span
+from ag_ui_agentspec.agentspecloader import load_agent_spec
+
+
+class AgentSpecAgent:
+    def __init__(
+        self,
+        agent_spec_config: str,
+        runtime: Literal["langgraph", "wayflow"],
+        tool_registry=None,
+        additional_processors=None
+    ):
+        if runtime not in {"langgraph", "wayflow"}:
+            raise NotImplementedError("other runtimes are not supported yet")
+        self.runtime = runtime
+        self.framework_agent = load_agent_spec(runtime, agent_spec_config, tool_registry)
+        self.processors = [AgUiSpanProcessor(runtime=runtime)] + (additional_processors or [])
+
+    async def run(self, input_data: RunAgentInput) -> None:
+        agent = self.framework_agent
+        async with Trace(name="ag-ui run wrapper", span_processors=self.processors):
+            async with Span(name="invoke_graph"):
+                if self.runtime == "langgraph":
+                    from
ag_ui_agentspec.runtimes.langgraph_runner import run_langgraph_agent + await run_langgraph_agent(agent, input_data) + elif self.runtime == "wayflow": + from ag_ui_agentspec.runtimes.wayflow_runner import run_wayflow + await run_wayflow(agent, input_data) + else: + raise NotImplementedError(f"Unsupported runtime: {self.runtime}") diff --git a/integrations/agent-spec/python/ag_ui_agentspec/agentspec_tracing_exporter.py b/integrations/agent-spec/python/ag_ui_agentspec/agentspec_tracing_exporter.py new file mode 100644 index 000000000..6b4c9190e --- /dev/null +++ b/integrations/agent-spec/python/ag_ui_agentspec/agentspec_tracing_exporter.py @@ -0,0 +1,281 @@ +# Copyright © 2025 Oracle and/or its affiliates. +# +# This software is under the Apache License 2.0 +# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License +# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. + +""" +AG-UI span processor for pyagentspec.tracing + +This module bridges pyagentspec.tracing spans/events to AG-UI events +(`ag_ui.core.events`). It mirrors the behavior of the exporter used in the +telemetry package but adapts to the event shapes defined under +`pyagentspec.tracing.events`. + +Notes/limitations for the pyagentspec.tracing version: +- LLM streaming uses `LlmGenerationChunkReceived` with chunk_type MESSAGE only; + tool-call streaming chunks are not available in this event set. +- Tool execution events in this namespace do not include `message_id` nor + `tool_call_id`; therefore, we do not emit AG-UI tool call lifecycle or + result events here. +""" + +from __future__ import annotations + +import ast +import os +import json +import uuid +from contextvars import ContextVar +from typing import Any, Dict, Optional + +# AG‑UI Python SDK (events) +from ag_ui.core.events import ( + RunFinishedEvent, + RunStartedEvent, + StepFinishedEvent, + StepStartedEvent, + TextMessageChunkEvent, + ToolCallResultEvent, + ToolCallChunkEvent, +) + +from pyagentspec.tracing.events.exception import ExceptionRaised +from pyagentspec.tracing.events.event import Event +from pyagentspec.tracing.events.llmgeneration import ( + LlmGenerationChunkReceived, + LlmGenerationRequest, + LlmGenerationResponse, +) +from pyagentspec.tracing.events.tool import ( + ToolExecutionRequest, + ToolExecutionResponse, +) +from pyagentspec.tracing.spanprocessor import SpanProcessor +from pyagentspec.tracing.spans import LlmGenerationSpan, NodeExecutionSpan, ToolExecutionSpan +from pyagentspec.tracing.spans.span import Span + + +# ContextVar used to bridge events into the FastAPI endpoint queue. The server +# should set this per request to an asyncio.Queue that receives AG‑UI events. +EVENT_QUEUE = ContextVar("AG_UI_EVENT_QUEUE", default=None) + + +class AgUiSpanProcessor(SpanProcessor): + """Translate pyagentspec.tracing spans/events into AG-UI events. 
+
+    Emission strategy:
+    - Run lifecycle: RUN_STARTED on startup, RUN_FINISHED on shutdown
+    - Node spans: STEP_STARTED on start, STEP_FINISHED on end
+    - LLM text streaming: on first chunk, mark started; emit TEXT_MESSAGE_CHUNK
+    - LLM response: if no chunks, emit a single TEXT_MESSAGE_CHUNK; mark ended
+    """
+
+    def __init__(self, runtime: str) -> None:
+        self._run = {"thread_id": str(uuid.uuid4()), "run_id": str(uuid.uuid4())}
+        self._debug = os.getenv("AGUI_DEBUG", "").lower() in ("1", "true", "yes", "on")
+        # Track if any text chunk has been emitted for a given LLM span
+        self._llm_chunks_seen: Dict[str, bool] = {}
+        # Track tool-call lifecycles seen via streaming to avoid double-emitting
+        self._started_tool_calls: Dict[str, Any] = {}
+        self._runtime = runtime
+
+    def _emit(self, event_obj) -> None:
+        queue = EVENT_QUEUE.get()
+        if queue is None:
+            raise RuntimeError("AG-UI event queue is not set")
+        queue.put_nowait(event_obj)
+        if self._debug:
+            print("[AGUI DEBUG] " + str(event_obj))
+
+    @staticmethod
+    def _escape_html(text: str) -> str:
+        if text is None:
+            return ""
+        return str(text).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
+
+    def startup(self) -> None:
+        self._emit(RunStartedEvent(thread_id=self._run["thread_id"], run_id=self._run["run_id"]))
+
+    def shutdown(self) -> None:
+        self._emit(RunFinishedEvent(thread_id=self._run["thread_id"], run_id=self._run["run_id"]))
+
+    def on_start(self, span: Span) -> None:
+        # Prefer span lifecycle for step/tool tracking; events fill in details
+        if isinstance(span, LlmGenerationSpan):
+            self._llm_chunks_seen[span.id] = False
+        elif isinstance(span, NodeExecutionSpan):
+            step_name = span.node.name
+            self._emit(StepStartedEvent(step_name=step_name))
+        elif isinstance(span, ToolExecutionSpan):
+            # Do not synthesize AG‑UI tool-call lifecycle here; tool-call lifecycle maps
+            # to LLM ToolCallChunkReceived/LlmGenerationResponse events instead.
+ pass + + async def on_start_async(self, span: "Span") -> None: + self.on_start(span) + + def on_end(self, span: "Span") -> None: + # Cleanup / close via span lifecycle where possible + if isinstance(span, LlmGenerationSpan): + self._llm_chunks_seen.pop(span.id, None) + elif isinstance(span, NodeExecutionSpan): + step_name = span.node.name + self._emit(StepFinishedEvent(step_name=step_name)) + elif isinstance(span, ToolExecutionSpan): + # No synthesized tool-call lifecycle on tool span end + pass + + # Event routing + def on_event(self, event: Event, span: Span, *args: Any, **kwargs: Any) -> None: + # if an error is raised, then args will contain something, to fix this + match event: + case LlmGenerationChunkReceived(): + # WayFlow does not assign completion_id in streaming, falling back to request_id + message_id = event.completion_id or event.request_id + if not message_id: + raise ValueError("Expected assistant message id for text chunk") + if event.content: + self._emit( + TextMessageChunkEvent( + message_id=message_id, + role="assistant", + delta=self._escape_html(event.content), + ) + ) + self._llm_chunks_seen[span.id] = True + if event.tool_calls: + if len(event.tool_calls) != 1: + raise ValueError("expected exactly one tool call chunk") + tool_call_chunk = event.tool_calls[0] + tool_name = tool_call_chunk.tool_name + tool_call_id = tool_call_chunk.call_id + if tool_call_id not in self._started_tool_calls: + self._started_tool_calls[tool_call_id] = { + "message_id": message_id + } + self._emit( + ToolCallChunkEvent( + tool_call_id=tool_call_id, + parent_message_id=message_id, + tool_call_name=tool_name, + delta=tool_call_chunk.arguments, + ) + ) + case LlmGenerationRequest(): + # We ignore this for now, it's not needed for AG-UI + return + case LlmGenerationResponse(): + message_id = event.completion_id + if not message_id: + raise ValueError("Expected assistant message id in LLM response") + # If no text chunks were streamed in this span, emit the full completion text as a single content event + if not self._llm_chunks_seen.get(span.id, False): + completion_text = event.content + if completion_text: + self._emit( + TextMessageChunkEvent( + message_id=message_id, + role="assistant", + delta=self._escape_html(completion_text), + ) + ) + self._llm_chunks_seen[span.id] = True + # if a tool_call was not streamed, we emit it here + for tool_call in event.tool_calls: + if tool_call.call_id not in self._started_tool_calls: + self._emit( + ToolCallChunkEvent( + tool_call_id=tool_call.call_id, + parent_message_id=message_id, + tool_call_name=tool_call.tool_name, + delta=tool_call.arguments, + ) + ) + self._started_tool_calls[tool_call.call_id] = { + "message_id": message_id + } + case ToolExecutionRequest(): + if self._runtime != "langgraph" and event.request_id not in self._started_tool_calls: + self._emit( + ToolCallChunkEvent( + tool_call_id=event.request_id, + tool_call_name=event.tool.name, + delta=json.dumps(event.inputs), + ) + ) + self._started_tool_calls[event.request_id] = { + "message_id": span.id # no need for accurate message_id here + } + case ToolExecutionResponse(): + tool_call_id = event.request_id + if not tool_call_id: + raise ValueError("Expected tool_call_id in tool execution response") + message_id = self._started_tool_calls[tool_call_id]["message_id"] + content = _normalize_tool_output(event.outputs) + self._emit( + ToolCallResultEvent( + message_id=message_id, + tool_call_id=tool_call_id, + content=content, + role="tool", + ) + ) + case ExceptionRaised(): + raise 
RuntimeError("[AG-UI SpanProcessor] Exception occurred during agent execution: " + event.exception_message + f"\n\nStacktrace: {event.exception_stacktrace}")
+            case _:
+                return
+
+    async def on_event_async(self, event: Event, span: Span) -> None:
+        return self.on_event(event, span)
+
+    async def on_end_async(self, span: "Span") -> None:
+        self.on_end(span)
+
+    async def shutdown_async(self) -> None:
+        self.shutdown()
+
+    async def startup_async(self) -> None:
+        self.startup()
+
+def _normalize_tool_output(outputs: Any) -> str:
+    """Return a JSON string for AG-UI ToolCallResultEvent.content without double-encoding.
+
+    Rules:
+    - If outputs is a dict with a single key (e.g., {"weather_result": ...}) and the inner
+      value is itself JSON-like (dict/list or a JSON string), unwrap to the inner value for UI convenience.
+    - If content is already a dict/list, serialize exactly once via json.dumps.
+    - If content is a string that is valid JSON, pass it through unchanged (don’t wrap again).
+    - Otherwise, stringify primitives.
+    """
+    content: Any = outputs
+    # Unwrap single-key dicts to their inner value; the checks below handle dict/list vs. string
+    if isinstance(outputs, dict) and len(outputs) == 1:
+        content = next(iter(outputs.values()))
+    # If it’s already a dict/list, serialize exactly once
+    if isinstance(content, (dict, list)):
+        return json.dumps(content)
+    # If it’s a string that looks like JSON, pass through as-is (frontend will parse)
+    if isinstance(content, str) and jsonable(content):
+        return content
+    if isinstance(content, str):
+        # Strings holding a Python literal (e.g. a dict repr) are re-serialized as JSON when possible
+        try:
+            content_dict = ast.literal_eval(content)
+            return json.dumps(content_dict)
+        except (ValueError, SyntaxError):
+            pass
+    # Fallback: stringify primitives
+    return str(content)
+
+
+def jsonable(string: str) -> bool:
+    try:
+        json.loads(string)
+        return True
+    except (TypeError, ValueError):
+        return False
diff --git a/integrations/agent-spec/python/ag_ui_agentspec/agentspecloader.py b/integrations/agent-spec/python/ag_ui_agentspec/agentspecloader.py
new file mode 100644
index 000000000..163894268
--- /dev/null
+++ b/integrations/agent-spec/python/ag_ui_agentspec/agentspecloader.py
@@ -0,0 +1,38 @@
+import json
+from typing import Any, Dict, Literal, Optional, TYPE_CHECKING, overload
+
+if TYPE_CHECKING:
+    # Replace these with the actual exported types
+    from langgraph.graph.state import CompiledStateGraph
+    from wayflowcore.conversationalcomponent import ConversationalComponent as WayflowComponent
+
+@overload
+def load_agent_spec(
+    runtime: Literal["langgraph"],
+    agent_spec_json: str,
+    tool_registry: Optional[Dict[str, Any]] = None,
+) -> "CompiledStateGraph[Any, Any, Any]": ...
+@overload
+def load_agent_spec(
+    runtime: Literal["wayflow"],
+    agent_spec_json: str,
+    tool_registry: Optional[Dict[str, Any]] = None,
+) -> "WayflowComponent": ...
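+
+# Runtime-specific loaders are imported lazily inside the match arms below, so installing a
+# single extra (e.g. `.[wayflow]`) is enough as long as only that runtime is requested.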
+ +def load_agent_spec( + runtime: Literal["langgraph", "wayflow"], + agent_spec_json: str, + tool_registry: Optional[Dict[str, Any]] = None +) -> object: + match runtime: + case "langgraph": + from pyagentspec.adapters.langgraph import AgentSpecLoader + from langgraph.checkpoint.memory import MemorySaver + + return AgentSpecLoader(tool_registry=tool_registry, checkpointer=MemorySaver()).load_json(agent_spec_json) + case "wayflow": + from wayflowcore.agentspec import AgentSpecLoader + + return AgentSpecLoader(tool_registry=tool_registry).load_json(agent_spec_json) + case _: + raise ValueError(f"Unsupported runtime: {runtime}") diff --git a/integrations/agent-spec/python/ag_ui_agentspec/endpoint.py b/integrations/agent-spec/python/ag_ui_agentspec/endpoint.py new file mode 100644 index 000000000..5d95a087c --- /dev/null +++ b/integrations/agent-spec/python/ag_ui_agentspec/endpoint.py @@ -0,0 +1,73 @@ +import asyncio + +from fastapi import FastAPI, Request +from fastapi.responses import StreamingResponse + +from ag_ui.encoder import EventEncoder +from ag_ui.core import ( + RunAgentInput, + EventType, + RunErrorEvent, +) + +from ag_ui_agentspec.agent import AgentSpecAgent +from ag_ui_agentspec.agentspec_tracing_exporter import EVENT_QUEUE + + +def add_agentspec_fastapi_endpoint(app: FastAPI, agentspec_agent: AgentSpecAgent, path: str = "/"): + """Adds an Agent Spec endpoint to the FastAPI app.""" + + + @app.post(path) + async def agentic_chat_endpoint(input_data: RunAgentInput, request: Request): + """Agentic chat endpoint""" + + # Get the accept header from the request + accept_header = request.headers.get("accept") + + # Create an event encoder to properly format SSE events + encoder = EventEncoder(accept=accept_header) + + async def event_generator(): + queue = asyncio.Queue() + # Bridge telemetry -> SSE by setting the per-request queue into ContextVar + token = EVENT_QUEUE.set(queue) + + async def run_and_close(): + try: + # Run the agent; telemetry will emit events into the queue via ContextVar + await agentspec_agent.run(input_data) + except Exception as e: # pylint: disable=broad-exception-caught + # Forward errors as a RunErrorEvent so the client receives failure info + queue.put_nowait( + RunErrorEvent(message=repr(e)) + ) + finally: + # Signal the stream to end after all events have been emitted + queue.put_nowait(None) + + try: + # Important: create the task after setting the ContextVar so the new Task inherits it + asyncio.create_task(run_and_close()) + + while True: + item = await queue.get() + if item is None: + break + + # Patch lifecycle events with canonical thread/run IDs for the frontend + if item.type == EventType.RUN_STARTED or item.type == EventType.RUN_FINISHED: + item.thread_id = input_data.thread_id + item.run_id = input_data.run_id + + yield encoder.encode(item) + + except Exception as e: # pylint: disable=broad-exception-caught + yield encoder.encode( + RunErrorEvent(message=str(e)) + ) + finally: + # Reset the ContextVar to avoid leaking queues across requests + EVENT_QUEUE.reset(token) + + return StreamingResponse(event_generator(), media_type=encoder.get_content_type()) diff --git a/integrations/agent-spec/python/ag_ui_agentspec/runtimes/__init__.py b/integrations/agent-spec/python/ag_ui_agentspec/runtimes/__init__.py new file mode 100644 index 000000000..2fa794fed --- /dev/null +++ b/integrations/agent-spec/python/ag_ui_agentspec/runtimes/__init__.py @@ -0,0 +1,6 @@ +"""Runtime-specific runners for Agent‑Spec → AG‑UI integration. 
+ +Each runner isolates optional dependencies so users can install only the +frameworks they need (e.g., wayflow, langgraph) without pulling all of them. +""" + diff --git a/integrations/agent-spec/python/ag_ui_agentspec/runtimes/langgraph_runner.py b/integrations/agent-spec/python/ag_ui_agentspec/runtimes/langgraph_runner.py new file mode 100644 index 000000000..79adc2b7e --- /dev/null +++ b/integrations/agent-spec/python/ag_ui_agentspec/runtimes/langgraph_runner.py @@ -0,0 +1,61 @@ +import asyncio +from typing import Any, Dict +import traceback + +from langchain_core.runnables import RunnableConfig +from langgraph.graph.state import CompiledStateGraph + +from ag_ui.core import RunAgentInput +from ag_ui_agentspec.agentspec_tracing_exporter import EVENT_QUEUE + + +def prepare_langgraph_agent_inputs(input_data: RunAgentInput) -> Dict[str, Any]: + messages = input_data.messages + if not messages: + return {"messages": []} + # send only last user/tool messages to avoid duplication with MemorySaver + messages_to_return = [] + for m in messages[-2:]: + m_dict = m.model_dump() + if m_dict["role"] in {"tool", "user"}: + if m_dict["role"] == "user" and "name" in m_dict: + del m_dict["name"] + messages_to_return.append(m_dict) + return {"messages": messages_to_return} + + +async def run_langgraph_agent(agent: CompiledStateGraph, input_data: RunAgentInput) -> None: + inputs = prepare_langgraph_agent_inputs(input_data) + config = RunnableConfig({"configurable": {"thread_id": input_data.thread_id}}) + + current_queue = EVENT_QUEUE.get() + + def _invoke_with_context(inputs: Dict[str, Any]) -> None: + token = EVENT_QUEUE.set(current_queue) + try: + for _ in agent.stream(inputs, stream_mode="messages", config=config): + pass + except Exception as e: + print(traceback.format_exc()) + raise RuntimeError("LangGraph agent crashed (see printed traceback above):" + repr(e)) + finally: + EVENT_QUEUE.reset(token) + + await asyncio.to_thread(_invoke_with_context, inputs) + + +async def run_langgraph_agent_nostream(agent: CompiledStateGraph, input_data: RunAgentInput) -> None: + inputs = prepare_langgraph_agent_inputs(input_data) + config = RunnableConfig({"configurable": {"thread_id": input_data.thread_id}}) + + current_queue = EVENT_QUEUE.get() + + def _invoke_with_context(inputs: Dict[str, Any]) -> None: + token = EVENT_QUEUE.set(current_queue) + try: + agent.invoke(inputs, config=config) + finally: + EVENT_QUEUE.reset(token) + + await asyncio.to_thread(_invoke_with_context, inputs) + diff --git a/integrations/agent-spec/python/ag_ui_agentspec/runtimes/wayflow_runner.py b/integrations/agent-spec/python/ag_ui_agentspec/runtimes/wayflow_runner.py new file mode 100644 index 000000000..aea51ca80 --- /dev/null +++ b/integrations/agent-spec/python/ag_ui_agentspec/runtimes/wayflow_runner.py @@ -0,0 +1,88 @@ +import asyncio +from typing import Any, Dict + +from wayflowcore import Flow as WayflowFlow +from wayflowcore import Agent as WayflowAgent +from wayflowcore.agentspec.tracing import AgentSpecEventListener +from wayflowcore.events.eventlistener import register_event_listeners +from wayflowcore.messagelist import Message, MessageType, ToolRequest, ToolResult + +from ag_ui.core import RunAgentInput +from ag_ui_agentspec.agentspec_tracing_exporter import EVENT_QUEUE + +def prepare_wayflow_agent_input(input_data: RunAgentInput) -> Dict[str, Any]: + messages = [m.model_dump() for m in input_data.messages] + wayflow_messages = [] + for m in messages: + match m["role"]: + case "system": + wm = 
Message(message_type=MessageType.SYSTEM, content=m["content"]) + case "user": + wm = Message(message_type=MessageType.USER, content=m["content"]) + case "assistant": + wm = Message( + message_type=MessageType.AGENT, + content=m["content"], + tool_requests=[ + ToolRequest( + name=tc["function"]["name"], + args=tc["function"]["arguments"], + tool_request_id=tc["id"], + ) + for tc in (m.get("tool_calls") or []) + ], + ) + case "tool": + wm = Message( + message_type=MessageType.TOOL_RESULT, + tool_result=ToolResult( + content=m["content"], tool_request_id=m["tool_call_id"] + ), + ) + case _: + raise NotImplementedError(f"Unsupported message: {m}") + wayflow_messages.append(wm) + return wayflow_messages + + +def prepare_wayflow_flow_input(input_data: RunAgentInput) -> Dict[str, Any]: + messages = input_data.messages + return {"user_input": messages[-1].content} + + +async def run_wayflow(agent: Any, input_data: RunAgentInput) -> None: + current_queue = EVENT_QUEUE.get() + + if isinstance(agent, WayflowAgent): + agent._add_talk_to_user_tool = False + agent._update_internal_state() + agent_input = prepare_wayflow_agent_input(input_data) + + def _invoke_with_context(fi): + token = EVENT_QUEUE.set(current_queue) + try: + with register_event_listeners([AgentSpecEventListener()]): + conversation = agent.start_conversation(messages=fi) + conversation.execute() + finally: + EVENT_QUEUE.reset(token) + + await asyncio.to_thread(_invoke_with_context, agent_input) + + elif isinstance(agent, WayflowFlow): + flow_input = prepare_wayflow_flow_input(input_data) + + def _invoke_with_context(fi): + token = EVENT_QUEUE.set(current_queue) + try: + with register_event_listeners([AgentSpecEventListener()]): + conversation = agent.start_conversation(fi) + _ = conversation.execute() + finally: + EVENT_QUEUE.reset(token) + + await asyncio.to_thread(_invoke_with_context, flow_input) + + else: + raise NotImplementedError("Unsupported Wayflow component type") + diff --git a/integrations/agent-spec/python/examples/README.md b/integrations/agent-spec/python/examples/README.md new file mode 100644 index 000000000..a487ea389 --- /dev/null +++ b/integrations/agent-spec/python/examples/README.md @@ -0,0 +1,77 @@ +# Open Agent Spec AG-UI Examples +================================ + +This directory contains example usage of the AG-UI integration for Open Agent Spec (Agent Spec). It provides a FastAPI application that demonstrates how to use the Agent Spec agent with the AG-UI protocol. + +## Features + +The examples include implementations for each of the AG-UI dojo features: + +* Agentic Chat +* Human in the Loop +* Agentic Generative UI +* Tool Based Generative UI + +## Setup + +1. Please see the README in the parent folder for instructions on which GitHub repos to clone. + +2. Install dependencies (choose runtimes via extras): + ```bash + # Both runtimes + uv sync --extra langgraph --extra wayflow + # Or pick one runtime + # uv sync --extra langgraph + # uv sync --extra wayflow + ``` + +3. Run the development server: + ```bash + uv run dev + ``` + +Note: Routers are lazy-loaded. If you do not install a runtime extra (e.g., `langgraph`), its corresponding endpoints will not work, resulting in a HTTP 404 error when invoking from Dojo. + +### Environment (.env) + +The example server loads a `.env` file on startup (via `python-dotenv`). Place it in this folder to configure your environment. + +Common variables: +- `PORT`: the HTTP port for the example FastAPI server (default `9003`). 
+- `OPENAI_BASE_URL`: OpenAI-compatible API base (e.g., `https://api.openai.com/v1`).
+- `OPENAI_MODEL`: model id (e.g., `gpt-4o` or provider-specific ids).
+- `OPENAI_API_KEY`: API key for your provider.
+
+Example `.env`:
+```
+PORT=9003
+OPENAI_BASE_URL=https://api.openai.com/v1
+OPENAI_MODEL=gpt-4o
+OPENAI_API_KEY=sk-...
+```
+
+## Usage
+
+Once the server is running, launch the frontend Dojo:
+
+```bash
+# Option A — run everything from the repo root (multiple apps)
+cd ../../../../
+pnpm install
+pnpm turbo run dev
+
+# Option B — run only Dojo
+cd ../../../../apps/dojo
+AGENT_SPEC_URL=http://localhost:9003 pnpm run dev
+```
+
+Then open http://localhost:3000.
+
+By default, the agents can be reached at:
+
+- `http://localhost:9003/<runtime>/agentic_chat` - Agentic Chat
+- `http://localhost:9003/<runtime>/backend_tool_rendering` - Backend Tool Rendering
+- `http://localhost:9003/<runtime>/human_in_the_loop` - Human in the Loop
+- `http://localhost:9003/<runtime>/tool_based_generative_ui` - Tool Based Generative UI
+
+where `<runtime>` is a runtime supported by Agent Spec, currently `langgraph` and `wayflow`.
diff --git a/integrations/agent-spec/python/examples/pyproject.toml b/integrations/agent-spec/python/examples/pyproject.toml
new file mode 100644
index 000000000..34cf38181
--- /dev/null
+++ b/integrations/agent-spec/python/examples/pyproject.toml
@@ -0,0 +1,34 @@
+[project]
+name = "agent-spec-examples"
+version = "0.1.0"
+description = "Example FastAPI server for Agent-Spec × AG-UI"
+readme = "README.md"
+requires-python = ">=3.10,<3.14.0"
+dependencies = [
+    "fastapi>=0.115.0",
+    "uvicorn>=0.30.0",
+    "python-dotenv>=1.0.0"
+]
+
+[project.optional-dependencies]
+langgraph = ["ag-ui-agent-spec[langgraph]"]
+wayflow = ["ag-ui-agent-spec[wayflow]"]
+
+[project.scripts]
+dev = "server:main"
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["server"]
+
+[tool.hatch.metadata]
+allow-direct-references = true
+
+[tool.uv.sources]
+ag-ui-agent-spec = { path = "../", editable = true }
+
+[tool.uv]
+package = true
\ No newline at end of file
diff --git a/integrations/agent-spec/python/examples/server/__init__.py b/integrations/agent-spec/python/examples/server/__init__.py
new file mode 100644
index 000000000..f4aa21f6c
--- /dev/null
+++ b/integrations/agent-spec/python/examples/server/__init__.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+import dotenv
+dotenv.load_dotenv()
+
+import os
+import uvicorn
+from fastapi import FastAPI
+
+from server.api import router
+
+app = FastAPI(title="Agent-Spec x AG-UI Examples")
+app.include_router(router)
+
+
+def main():
+    port = int(os.getenv("PORT", "9003"))
+    uvicorn.run("server:app", host="0.0.0.0", port=port, reload=True)
+
+
+__all__ = ["main"]
diff --git a/integrations/agent-spec/python/examples/server/api/__init__.py b/integrations/agent-spec/python/examples/server/api/__init__.py
new file mode 100644
index 000000000..73a170279
--- /dev/null
+++ b/integrations/agent-spec/python/examples/server/api/__init__.py
@@ -0,0 +1,3 @@
+from server.api.routes import router as router
+
+__all__ = ["router"]
diff --git a/integrations/agent-spec/python/examples/server/api/agentic_chat.py b/integrations/agent-spec/python/examples/server/api/agentic_chat.py
new file mode 100644
index 000000000..f62a75ded
--- /dev/null
+++ b/integrations/agent-spec/python/examples/server/api/agentic_chat.py
@@ -0,0 +1,47 @@
+"""
+A simple ReAct-style agentic chat agent defined with pyagentspec.
+
+This mirrors the LangGraph example structure by:
+- Defining a single agent capable of tool use (ReAct loop handled by the agent runtime)
+- Attaching a frontend (client) tool that the Dojo renders
+- Exposing the serialized configuration as `agentic_chat_json` for the server routes to import
+
+Note:
+- This file defines the agent and its components declaratively.
+- Actual tool execution is orchestrator-dependent (e.g., ServerTool/BuiltinTool are executed by the backend/orchestrator).
+"""
+
+from __future__ import annotations
+
+import os
+from typing import Optional
+
+import dotenv
+dotenv.load_dotenv()
+
+from pyagentspec.agent import Agent
+from pyagentspec.llms import OpenAiCompatibleConfig
+from pyagentspec.serialization import AgentSpecSerializer
+from pyagentspec.tools import ClientTool
+from pyagentspec.property import Property
+
+
+agent_llm = OpenAiCompatibleConfig(
+    name="my_llm",
+    model_id=os.environ.get("OPENAI_MODEL", "gpt-4o"),
+    url=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
+)
+
+change_background_frontend_tool = ClientTool(
+    name="change_background",
+    description="Change the background color of the chat. Can be anything that the CSS background attribute accepts. Regular colors, linear or radial gradients, etc.",
+    inputs=[Property(title="background", json_schema={"title": "background", "type": "string", "description": "The background. Prefer gradients."})]
+)
+
+agent = Agent(
+    name="agentic_chat_agent",
+    llm_config=agent_llm,
+    system_prompt="Be friendly.",
+    tools=[change_background_frontend_tool]
+)
+agentic_chat_json = AgentSpecSerializer().to_json(agent)
diff --git a/integrations/agent-spec/python/examples/server/api/backend_tool_rendering.py b/integrations/agent-spec/python/examples/server/api/backend_tool_rendering.py
new file mode 100644
index 000000000..9b7392bed
--- /dev/null
+++ b/integrations/agent-spec/python/examples/server/api/backend_tool_rendering.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+import os
+import time
+from typing import Dict, Any
+
+import dotenv
+dotenv.load_dotenv()
+
+from pyagentspec.agent import Agent
+from pyagentspec.llms import OpenAiCompatibleConfig
+from pyagentspec.tools import ServerTool
+from pyagentspec.property import Property
+from pyagentspec.serialization import AgentSpecSerializer
+
+
+def get_weather(location: str) -> Dict[str, Any]:
+    """
+    Get the weather for a given location.
+    """
+    time.sleep(1)  # simulates real tool execution
+    return {
+        "temperature": 20,
+        "conditions": "sunny",
+        "humidity": 50,
+        "wind_speed": 10,
+        "feelsLike": 25,
+    }
+
+tool_input_property = Property(
+    title="location",
+    json_schema={"title": "location", "type": "string", "description": "The location to get the weather forecast.
Must be a city/town name."}, +) + +weather_result_property = Property( + title="weather_result", + json_schema={ + "title": "weather_result", + "type": "string" + }, +) + +weather_tool = ServerTool( + name="get_weather", + description="Get the weather for a given location.", + inputs=[tool_input_property], + outputs=[weather_result_property], +) + +agent_llm = OpenAiCompatibleConfig( + name="my_llm", + model_id=os.environ.get("OPENAI_MODEL", "gpt-4o"), + url=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1") +) + +agent = Agent( + name="my_agent", + llm_config=agent_llm, + system_prompt="Based on the weather forecaset result and the user input, write a response to the user", + tools=[weather_tool], + human_in_the_loop=True, +) + +backend_tool_rendering_agent_json = AgentSpecSerializer().to_json(agent) + +tool_registry = {"get_weather": get_weather} diff --git a/integrations/agent-spec/python/examples/server/api/human_in_the_loop.py b/integrations/agent-spec/python/examples/server/api/human_in_the_loop.py new file mode 100644 index 000000000..6433851fe --- /dev/null +++ b/integrations/agent-spec/python/examples/server/api/human_in_the_loop.py @@ -0,0 +1,88 @@ +"""Human-in-the-loop AgentSpec example for AG-UI.""" + +from __future__ import annotations + +import os + +import dotenv + +from pyagentspec.agent import Agent +from pyagentspec.llms import OpenAiCompatibleConfig +from pyagentspec.property import Property +from pyagentspec.serialization import AgentSpecSerializer +from pyagentspec.tools import ClientTool + +dotenv.load_dotenv() + + +steps_property = Property( + title="steps", + json_schema={ + "title": "steps", + "type": "array", + "description": ( + 'Ordered list of candidate steps awaiting human approval. ' + 'Each list element is a dict of two keys, "description" (short imperative command for this step) ' + 'and "status" (one of "enabled", "disabled", "executing", must be specified as "enabled" at the beginning)' + ), + "items": { + "type": "object", + "properties": { + "description": { + "type": "string", + "description": "Short imperative command for this step.", + }, + "status": { + "type": "string", + "enum": ["enabled", "disabled", "executing"], + "description": "The status of the step, it must be specified as 'enabled' at the beginning.", + }, + }, + "required": ["description", "status"], + "additionalProperties": False + }, + }, +) + + +generate_task_steps_tool = ClientTool( + name="generate_task_steps", + description=( + ( + "Generates a list of steps for the user to perform. " + "The input argument is a list of dicts (steps) with fields `description` and `status`." + "`description` is a string of a short imperative command for this step, " + "and `status` is always `enabled` at the beginning. " + "Make sure `status` is always `enabled` at the beginning so that the user can review." + ) + ), + inputs=[steps_property], +) + + +agent_llm = OpenAiCompatibleConfig( + name="hitl_llm", + model_id=os.getenv("OPENAI_MODEL", "gpt-4o"), + url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), +) + + +hitl_agent = Agent( + name="human_in_the_loop_agent", + description="Task planner that collaborates with a human to approve execution steps.", + system_prompt=( + "You are a collaborative planning assistant. " + "When planning tasks use tools only, without any other messages. " + "IMPORTANT: " + "- Use the `generate_task_steps` tool to display the suggested steps to the user " + "- Do not call the `generate_task_steps` twice in a row, ever. 
" + "- Never repeat the plan, or send a message detailing steps " + "- If accepted, confirm the creation of the plan and the number of selected (enabled) steps only " + "- If not accepted, ask the user for more information, DO NOT use the `generate_task_steps` tool again " + ), + llm_config=agent_llm, + tools=[generate_task_steps_tool], +) + + +human_in_the_loop_agent_json = AgentSpecSerializer().to_json(hitl_agent) diff --git a/integrations/agent-spec/python/examples/server/api/routes.py b/integrations/agent-spec/python/examples/server/api/routes.py new file mode 100644 index 000000000..132dd781b --- /dev/null +++ b/integrations/agent-spec/python/examples/server/api/routes.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import importlib.util +import logging +from fastapi import APIRouter + +from ag_ui_agentspec.agent import AgentSpecAgent +from ag_ui_agentspec.endpoint import add_agentspec_fastapi_endpoint +from server.api.agentic_chat import agentic_chat_json +from server.api.backend_tool_rendering import ( + backend_tool_rendering_agent_json, + tool_registry as backend_tool_registry, +) +from server.api.human_in_the_loop import human_in_the_loop_agent_json +from server.api.tool_based_generative_ui import tool_based_generative_ui_agent_json + +logger = logging.getLogger(__name__) +router = APIRouter() + + +def _is_available(module_name: str) -> bool: + return importlib.util.find_spec(module_name) is not None + + +def _mount(router: APIRouter): + if _is_available("langgraph"): + add_agentspec_fastapi_endpoint( + app=router, + agentspec_agent=AgentSpecAgent(agentic_chat_json, runtime="langgraph"), + path="/langgraph/agentic_chat", + ) + add_agentspec_fastapi_endpoint( + app=router, + agentspec_agent=AgentSpecAgent( + backend_tool_rendering_agent_json, + runtime="langgraph", + tool_registry=backend_tool_registry, + ), + path="/langgraph/backend_tool_rendering", + ) + add_agentspec_fastapi_endpoint( + app=router, + agentspec_agent=AgentSpecAgent(human_in_the_loop_agent_json, runtime="langgraph"), + path="/langgraph/human_in_the_loop", + ) + add_agentspec_fastapi_endpoint( + app=router, + agentspec_agent=AgentSpecAgent( + tool_based_generative_ui_agent_json, runtime="langgraph" + ), + path="/langgraph/tool_based_generative_ui", + ) + else: + logger.info("LangGraph not available. Skipping Agent Spec (LangGraph) endpoints.") + + if _is_available("wayflowcore"): + add_agentspec_fastapi_endpoint( + app=router, + agentspec_agent=AgentSpecAgent(agentic_chat_json, runtime="wayflow"), + path="/wayflow/agentic_chat", + ) + add_agentspec_fastapi_endpoint( + app=router, + agentspec_agent=AgentSpecAgent( + backend_tool_rendering_agent_json, + runtime="wayflow", + tool_registry=backend_tool_registry, + ), + path="/wayflow/backend_tool_rendering", + ) + add_agentspec_fastapi_endpoint( + app=router, + agentspec_agent=AgentSpecAgent(human_in_the_loop_agent_json, runtime="wayflow"), + path="/wayflow/human_in_the_loop", + ) + add_agentspec_fastapi_endpoint( + app=router, + agentspec_agent=AgentSpecAgent( + tool_based_generative_ui_agent_json, runtime="wayflow" + ), + path="/wayflow/tool_based_generative_ui", + ) + else: + logger.info("Wayflow (wayflowcore) not available. 
Skipping Agent Spec (Wayflow) endpoints.") + +_mount(router) \ No newline at end of file diff --git a/integrations/agent-spec/python/examples/server/api/tool_based_generative_ui.py b/integrations/agent-spec/python/examples/server/api/tool_based_generative_ui.py new file mode 100644 index 000000000..9953026cc --- /dev/null +++ b/integrations/agent-spec/python/examples/server/api/tool_based_generative_ui.py @@ -0,0 +1,113 @@ +"""Tool-based generative UI AgentSpec example.""" + +from __future__ import annotations + +import os + +import dotenv + +from pyagentspec.agent import Agent +from pyagentspec.llms import OpenAiCompatibleConfig +from pyagentspec.property import Property +from pyagentspec.serialization import AgentSpecSerializer +from pyagentspec.tools import ClientTool + +dotenv.load_dotenv() + + +VALID_IMAGE_NAMES = [ + "Osaka_Castle_Turret_Stone_Wall_Pine_Trees_Daytime.jpg", + "Tokyo_Skyline_Night_Tokyo_Tower_Mount_Fuji_View.jpg", + "Itsukushima_Shrine_Miyajima_Floating_Torii_Gate_Sunset_Long_Exposure.jpg", + "Takachiho_Gorge_Waterfall_River_Lush_Greenery_Japan.jpg", + "Bonsai_Tree_Potted_Japanese_Art_Green_Foliage.jpeg", + "Shirakawa-go_Gassho-zukuri_Thatched_Roof_Village_Aerial_View.jpg", + "Ginkaku-ji_Silver_Pavilion_Kyoto_Japanese_Garden_Pond_Reflection.jpg", + "Senso-ji_Temple_Asakusa_Cherry_Blossoms_Kimono_Umbrella.jpg", + "Cherry_Blossoms_Sakura_Night_View_City_Lights_Japan.jpg", + "Mount_Fuji_Lake_Reflection_Cherry_Blossoms_Sakura_Spring.jpg", +] + + +japanese_property = Property( + title="japanese", + json_schema={ + "title": "japanese", + "type": "array", + "description": "Three haiku lines in Japanese, preserved in 5-7-5 syllable pattern.", + "items": {"type": "string"}, + "minItems": 3, + "maxItems": 3, + }, +) + + +english_property = Property( + title="english", + json_schema={ + "title": "english", + "type": "array", + "description": "Three English translations matching each Japanese line.", + "items": {"type": "string"}, + "minItems": 3, + "maxItems": 3, + }, +) + + +image_name_property = Property( + title="image_name", + json_schema={ + "title": "image_name", + "type": "string", + "description": "Filename of an illustration that complements the haiku.", + "enum": VALID_IMAGE_NAMES, + }, +) + + +gradient_property = Property( + title="gradient", + json_schema={ + "title": "gradient", + "type": "string", + "description": "CSS gradient string used to style the haiku card background.", + }, +) + + +generate_haiku_tool = ClientTool( + name="generate_haiku", + description=( + "Render a haiku to the UI by providing matching Japanese and English lines " + "along with a thematic image and background gradient." + ), + inputs=[japanese_property, english_property, image_name_property, gradient_property], +) + + +agent_llm = OpenAiCompatibleConfig( + name="tool_generative_ui_llm", + model_id=os.getenv("OPENAI_MODEL", "gpt-4o"), + url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), +) + + +tool_based_generative_ui_agent = Agent( + name="tool_based_generative_ui_agent", + description="Haiku assistant that uses a UI tool to present poetry and visuals.", + system_prompt=( + "You are a poetic assistant. When the user requests a haiku, you must call the " + "`generate_haiku` tool exactly once, supplying three Japanese lines, three matching " + "English lines, one image name from the allowed list, and a vivid CSS gradient. " + "After the tool call, respond briefly to acknowledge what was created without " + "repeating the haiku verbatim." 
+ ), + llm_config=agent_llm, + tools=[generate_haiku_tool], +) + + +tool_based_generative_ui_agent_json = AgentSpecSerializer().to_json( + tool_based_generative_ui_agent +) diff --git a/integrations/agent-spec/python/pyproject.toml b/integrations/agent-spec/python/pyproject.toml new file mode 100644 index 000000000..fe3f33246 --- /dev/null +++ b/integrations/agent-spec/python/pyproject.toml @@ -0,0 +1,33 @@ +[project] +name = "ag-ui-agent-spec" +version = "0.1.0" +description = "AG-UI FastAPI adapter for Agent-Spec (LangGraph/Wayflow)" +license = "MIT" +readme = "README.md" +requires-python = ">=3.10,<3.14.0" +dependencies = [ + "fastapi>=0.115.0", + "ag-ui-protocol>=0.1.10", +] +authors = [{ name = "Agent Spec team" }] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["ag_ui_agentspec"] + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.uv.sources] +wayflowcore = { path = "../../../../wayflow/wayflowcore", editable = true } +pyagentspec = { path = "../../../../agent-spec/pyagentspec", editable = true } + +[project.optional-dependencies] +langgraph = ["pyagentspec[langgraph]"] +wayflow = ["wayflowcore"] + +[tool.uv] +package = true \ No newline at end of file