From 1d16b35579f8dd90a9ccd5972434af2c417e0796 Mon Sep 17 00:00:00 2001 From: Cesare Bernardis Date: Tue, 23 Dec 2025 18:01:58 +0100 Subject: [PATCH] Tracing for langgraph graphs --- .../adapters/langgraph/_langgraphconverter.py | 192 ++++++++++++++++-- .../adapters/langgraph/_node_execution.py | 28 ++- .../pyagentspec/adapters/langgraph/_types.py | 11 +- .../pyagentspec/adapters/langgraph/tracing.py | 85 ++++---- .../configs/haiku_without_a_flow.json | 181 +++++++++++++++++ .../tests/adapters/langgraph/test_tracing.py | 142 ++++++++++++- 6 files changed, 569 insertions(+), 70 deletions(-) create mode 100644 pyagentspec/tests/adapters/langgraph/configs/haiku_without_a_flow.json diff --git a/pyagentspec/src/pyagentspec/adapters/langgraph/_langgraphconverter.py b/pyagentspec/src/pyagentspec/adapters/langgraph/_langgraphconverter.py index 5e7ebe73..8ea3140e 100644 --- a/pyagentspec/src/pyagentspec/adapters/langgraph/_langgraphconverter.py +++ b/pyagentspec/src/pyagentspec/adapters/langgraph/_langgraphconverter.py @@ -5,7 +5,7 @@ # (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option. -from typing import Any, Dict, List, Literal, Optional, Tuple, Union +from typing import Any, AsyncGenerator, Dict, Generator, List, Literal, Optional, Tuple, Union from uuid import uuid4 import httpx @@ -32,9 +32,13 @@ langgraph_graph, langgraph_prebuilt, ) -from pyagentspec.adapters.langgraph.tracing import AgentSpecCallbackHandler +from pyagentspec.adapters.langgraph.tracing import ( + AgentSpecLlmCallbackHandler, + AgentSpecToolCallbackHandler, +) from pyagentspec.agent import Agent as AgentSpecAgent -from pyagentspec.flows.edges.controlflowedge import ControlFlowEdge +from pyagentspec.flows.edges import ControlFlowEdge as AgentSpecControlFlowEdge +from pyagentspec.flows.edges import DataFlowEdge as AgentSpecDataFlowEdge from pyagentspec.flows.flow import Flow as AgentSpecFlow from pyagentspec.flows.node import Node as AgentSpecNode from pyagentspec.flows.nodes import AgentNode as AgentSpecAgentNode @@ -63,6 +67,12 @@ from pyagentspec.tools import RemoteTool as AgentSpecRemoteTool from pyagentspec.tools import ServerTool as AgentSpecServerTool from pyagentspec.tools import Tool as AgentSpecTool +from pyagentspec.tracing.events import AgentExecutionEnd as AgentSpecAgentExecutionEnd +from pyagentspec.tracing.events import AgentExecutionStart as AgentSpecAgentExecutionStart +from pyagentspec.tracing.events import FlowExecutionEnd as AgentSpecFlowExecutionEnd +from pyagentspec.tracing.events import FlowExecutionStart as AgentSpecFlowExecutionStart +from pyagentspec.tracing.spans import AgentExecutionSpan as AgentSpecAgentExecutionSpan +from pyagentspec.tracing.spans import FlowExecutionSpan as AgentSpecFlowExecutionSpan class SchemaRegistry: @@ -210,17 +220,12 @@ def _convert( config: RunnableConfig, ) -> Any: if isinstance(agentspec_component, AgentSpecAgent): - callback = AgentSpecCallbackHandler( - llm_config=agentspec_component.llm_config, - tools=agentspec_component.tools, - ) - config_with_callbacks = _add_callback_to_runnable_config(callback, config) return self._agent_convert_to_langgraph( agentspec_component, tool_registry=tool_registry, converted_components=converted_components, checkpointer=checkpointer, - config=config_with_callbacks, + config=config, ) elif isinstance(agentspec_component, AgentSpecLlmConfig): return self._llm_convert_to_langgraph(agentspec_component, config=config) @@ -263,7 +268,7 @@ def _convert( ) def _create_control_flow( - self, control_flow_connections: 
List[ControlFlowEdge] + self, control_flow_connections: List[AgentSpecControlFlowEdge] ) -> "ControlFlow": control_flow: "ControlFlow" = {} for control_flow_edge in control_flow_connections: @@ -299,6 +304,7 @@ def _flow_convert_to_langgraph( graph_builder = StateGraph( FlowStateSchema, input_schema=FlowInputSchema, output_schema=FlowOutputSchema ) + graph_builder.add_edge(langgraph_graph.START, flow.start_node.id) node_executors = { @@ -342,12 +348,99 @@ def _find_property(properties: List[AgentSpecProperty], name: str) -> AgentSpecP for node_id, node_executor in node_executors.items(): graph_builder.add_node(node_id, node_executor) - for data_flow_edge in flow.data_flow_connections or []: + data_flow_connections: List[AgentSpecDataFlowEdge] = [] + if flow.data_flow_connections is None: + # We manually create data flow connections if they are not given in the flow + # This is the conversion recommended in the Agent Spec language specification + for source_node in flow.nodes: + for destination_node in flow.nodes: + for source_output in source_node.outputs or []: + for destination_input in destination_node.inputs or []: + if source_output.title == destination_input.title: + data_flow_connections.append( + AgentSpecDataFlowEdge( + name=f"{source_node.name}-{destination_node.name}-{source_output.title}", + source_node=source_node, + source_output=source_output.title, + destination_node=destination_node, + destination_input=destination_input.title, + ) + ) + else: + data_flow_connections = flow.data_flow_connections + + for data_flow_edge in data_flow_connections: node_executors[data_flow_edge.source_node.id].attach_edge(data_flow_edge) control_flow: "ControlFlow" = self._create_control_flow(flow.control_flow_connections) self._add_conditional_edges_to_graph(control_flow, graph_builder) - return graph_builder.compile(checkpointer=checkpointer) + compiled_graph = graph_builder.compile(checkpointer=checkpointer) + + # To enable flow execution traces, monkey-patch all the functions that invoke the compiled graph + + original_stream = compiled_graph.stream + + def patch_with_flow_execution_span(*args: Any, **kwargs: Any) -> Generator[Any, Any, None]: + span_name = f"FlowExecution[{flow.name}]" + inputs = kwargs.get("input", {}) + if not isinstance(inputs, dict): + inputs = {} + with AgentSpecFlowExecutionSpan(name=span_name, flow=flow) as span: + span.add_event(AgentSpecFlowExecutionStart(flow=flow, inputs=inputs)) + original_result: dict[str, Any] | Any = {} + result: dict[str, Any] + # This patches stream and astream, which return iterators and yield chunks + for chunk in original_stream(*args, **kwargs): + yield chunk + if isinstance(chunk, tuple): + original_result = chunk[1] + if not isinstance(original_result, dict): + result = {} + else: + result = original_result + span.add_event( + AgentSpecFlowExecutionEnd( + flow=flow, + outputs=result.get("outputs", {}), + branch_selected=result.get("node_execution_details", {}).get("branch", ""), + ) + ) + + original_astream = compiled_graph.astream + + async def patch_async_with_flow_execution_span( + *args: Any, **kwargs: Any + ) -> AsyncGenerator[Any, Any]: + span_name = f"FlowExecution[{flow.name}]" + inputs = kwargs.get("input", {}) + if not isinstance(inputs, dict): + inputs = {} + with AgentSpecFlowExecutionSpan(name=span_name, flow=flow) as span: + span.add_event(AgentSpecFlowExecutionStart(flow=flow, inputs=inputs)) + original_result: dict[str, Any] | Any = {} + result: dict[str, Any] + # This patches stream and astream, 
which return iterators and yield chunks + async for chunk in original_astream(*args, **kwargs): + yield chunk + if isinstance(chunk, tuple): + original_result = chunk[1] + if not isinstance(original_result, dict): + result = {} + else: + result = original_result + span.add_event( + AgentSpecFlowExecutionEnd( + flow=flow, + outputs=result.get("outputs", {}), + branch_selected=result.get("node_execution_details", {}).get("branch", ""), + ) + ) + + # Monkey patch invocation functions to inject tracing + # No need to patch `(a)invoke` as they internally use `(a)stream` + compiled_graph.stream = patch_with_flow_execution_span # type: ignore + compiled_graph.astream = patch_async_with_flow_execution_span # type: ignore + return compiled_graph def _node_convert_to_langgraph( self, @@ -593,7 +686,7 @@ def _remote_tool(**kwargs: Any) -> Any: description=remote_tool.description or "", args_schema=args_model, func=_remote_tool, - callbacks=config.get("callbacks"), + callbacks=[AgentSpecToolCallbackHandler(tool=remote_tool)], ) return structured_tool @@ -629,7 +722,7 @@ def _server_tool_convert_to_langgraph( description=description, args_schema=args_model, # model class, not a dict func=tool_obj, - callbacks=config.get("callbacks"), + callbacks=[AgentSpecToolCallbackHandler(tool=agentspec_server_tool)], ) return wrapped @@ -666,6 +759,7 @@ def client_tool(*args: Any, **kwargs: Any) -> Any: description=agentspec_client_tool.description or "", args_schema=args_model, func=client_tool, + # We do not add the tool execution callback here as it's not expected for client tools ) return structured_tool @@ -673,6 +767,7 @@ def _create_react_agent_with_given_info( self, name: str, system_prompt: str, + agent: AgentSpecAgent, llm_config: AgentSpecLlmConfig, tools: List[AgentSpecTool], inputs: List[AgentSpecProperty], @@ -726,7 +821,7 @@ def _create_react_agent_with_given_info( if outputs: output_model = _create_pydantic_model_from_properties("AgentOutputModel", outputs) - return langgraph_prebuilt.create_react_agent( + compiled_graph = langgraph_prebuilt.create_react_agent( name=name, model=model, tools=langgraph_tools, @@ -736,6 +831,62 @@ def _create_react_agent_with_given_info( state_schema=input_model, ) + + # To enable agent execution traces, monkey-patch all the functions that invoke the compiled graph + + original_stream = compiled_graph.stream + + def patch_with_agent_execution_span(*args: Any, **kwargs: Any) -> Generator[Any, Any, Any]: + span_name = f"AgentExecution[{agent.name}]" + inputs = kwargs.get("input", {}) + if not isinstance(inputs, dict): + inputs = {} + with AgentSpecAgentExecutionSpan(name=span_name, agent=agent) as span: + span.add_event(AgentSpecAgentExecutionStart(agent=agent, inputs=inputs)) + original_result: dict[str, Any] | Any = {} + result: dict[str, Any] + # This patches stream and astream, which return iterators and yield chunks + for chunk in original_stream(*args, **kwargs): + yield chunk + if isinstance(chunk, tuple): + original_result = chunk[1] + if not isinstance(original_result, dict): + result = {} + else: + result = original_result + outputs = dict(result.get("structured_response", {})) + span.add_event(AgentSpecAgentExecutionEnd(agent=agent, outputs=outputs)) + + original_astream = compiled_graph.astream + + async def patch_async_with_agent_execution_span( + *args: Any, **kwargs: Any + ) -> AsyncGenerator[Any, Any]: + span_name = f"AgentExecution[{agent.name}]" + inputs = kwargs.get("input", {}) + if not isinstance(inputs, dict): + inputs = {} + with 
AgentSpecAgentExecutionSpan(name=span_name, agent=agent) as span: + span.add_event(AgentSpecAgentExecutionStart(agent=agent, inputs=inputs)) + original_result: dict[str, Any] | Any = {} + result: dict[str, Any] + # This patches stream and astream, which return iterators and yield chunks + async for chunk in original_astream(*args, **kwargs): + yield chunk + if isinstance(chunk, tuple): + original_result = chunk[1] + if not isinstance(original_result, dict): + result = {} + else: + result = original_result + outputs = dict(result.get("structured_response", {})) + span.add_event(AgentSpecAgentExecutionEnd(agent=agent, outputs=outputs)) + + # Monkey patch invocation functions to inject tracing + # No need to patch `(a)invoke` as they internally use `(a)stream` + compiled_graph.stream = patch_with_agent_execution_span # type: ignore + compiled_graph.astream = patch_async_with_agent_execution_span # type: ignore + return compiled_graph + def _agent_convert_to_langgraph( self, agentspec_component: AgentSpecAgent, @@ -747,6 +898,7 @@ def _agent_convert_to_langgraph( return self._create_react_agent_with_given_info( name=agentspec_component.name, system_prompt=agentspec_component.system_prompt, + agent=agentspec_component, llm_config=agentspec_component.llm_config, tools=agentspec_component.tools, inputs=agentspec_component.inputs or [], @@ -773,6 +925,8 @@ def _llm_convert_to_langgraph( if isinstance(llm_config, (OpenAiCompatibleConfig, OpenAiConfig)): use_responses_api = llm_config.api_type == OpenAIAPIType.RESPONSES + callbacks: List[BaseCallbackHandler] = [AgentSpecLlmCallbackHandler(llm_config=llm_config)] + if isinstance(llm_config, VllmConfig): from langchain_openai import ChatOpenAI @@ -781,7 +935,7 @@ def _llm_convert_to_langgraph( api_key=SecretStr("EMPTY"), base_url=_prepare_openai_compatible_url(llm_config.url), use_responses_api=use_responses_api, - callbacks=config.get("callbacks"), + callbacks=callbacks, **generation_config, ) elif isinstance(llm_config, OllamaConfig): @@ -795,7 +949,7 @@ def _llm_convert_to_langgraph( return ChatOllama( base_url=llm_config.url, model=llm_config.model_id, - callbacks=config.get("callbacks"), + callbacks=callbacks, **generation_config, ) elif isinstance(llm_config, OpenAiConfig): @@ -804,7 +958,7 @@ def _llm_convert_to_langgraph( return ChatOpenAI( model=llm_config.model_id, use_responses_api=use_responses_api, - callbacks=config.get("callbacks"), + callbacks=callbacks, **generation_config, ) elif isinstance(llm_config, OpenAiCompatibleConfig): @@ -814,7 +968,7 @@ def _llm_convert_to_langgraph( model=llm_config.model_id, base_url=_prepare_openai_compatible_url(llm_config.url), use_responses_api=use_responses_api, - callbacks=config.get("callbacks"), + callbacks=callbacks, **generation_config, ) else: diff --git a/pyagentspec/src/pyagentspec/adapters/langgraph/_node_execution.py b/pyagentspec/src/pyagentspec/adapters/langgraph/_node_execution.py index 12500f7f..f4876d46 100644 --- a/pyagentspec/src/pyagentspec/adapters/langgraph/_node_execution.py +++ b/pyagentspec/src/pyagentspec/adapters/langgraph/_node_execution.py @@ -6,7 +6,7 @@ import json from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union, cast import httpx @@ -43,6 +43,9 @@ from pyagentspec.flows.nodes import ToolNode as AgentSpecToolNode from pyagentspec.property import Property as AgentSpecProperty from pyagentspec.property import _empty_default as pyagentspec_empty_default +from 
pyagentspec.tracing.events import NodeExecutionEnd as AgentSpecNodeExecutionEnd +from pyagentspec.tracing.events import NodeExecutionStart as AgentSpecNodeExecutionStart +from pyagentspec.tracing.spans import NodeExecutionSpan as AgentSpecNodeExecutionSpan MessageLike = Union[BaseMessage, List[str], Tuple[str, str], str, Dict[str, Any]] @@ -54,8 +57,19 @@ def __init__(self, node: Node) -> None: def __call__(self, state: FlowStateSchema) -> Any: inputs = self._get_inputs(state) - outputs, execution_details = self._execute(inputs, state.get("messages", [])) - return self._update_status(outputs, execution_details, state) + span_name = f"{self.node.__class__.__name__}Execution[{self.node.name}]" + with AgentSpecNodeExecutionSpan(name=span_name, node=self.node) as span: + span.add_event(AgentSpecNodeExecutionStart(node=self.node, inputs=inputs)) + outputs, execution_details = self._execute(inputs, state.get("messages", [])) + updated_status = self._update_status(outputs, execution_details, state) + span.add_event( + AgentSpecNodeExecutionEnd( + node=self.node, + outputs=updated_status["outputs"], + branch_selected=updated_status["node_execution_details"]["branch"], + ) + ) + return updated_status def attach_edge(self, edge: DataFlowEdge) -> None: self.edges.append(edge) @@ -260,7 +274,7 @@ def _execute(self, inputs: Dict[str, Any], messages: Messages) -> ExecuteOutput: # Plain callable: we call it like a function tool_output = tool(**inputs) - if isinstance(tool_output, dict): + if isinstance(tool_output, dict) and len(self.node.outputs or []) > 1: # useful for multiple outputs, avoid nesting dictionaries return tool_output, NodeExecutionDetails() @@ -304,6 +318,7 @@ def _create_react_agent_with_given_input_values( ] = AgentSpecToLangGraphConverter()._create_react_agent_with_given_info( name=agentspec_component.name, system_prompt=system_prompt, + agent=agentspec_component, llm_config=agentspec_component.llm_config, tools=agentspec_component.tools, inputs=agentspec_component.inputs or [], @@ -317,6 +332,7 @@ def _create_react_agent_with_given_input_values( def _execute(self, inputs: Dict[str, Any], messages: Messages) -> ExecuteOutput: agent = self._create_react_agent_with_given_input_values(inputs) + agentspec_agent = cast(AgentSpecAgent, self.node.agent) inputs |= { "remaining_steps": 20, # Get the right number of steps left "messages": messages, @@ -329,8 +345,8 @@ def _execute(self, inputs: Dict[str, Any], messages: Messages) -> ExecuteOutput: {"role": "assistant", "content": generated_message.content} ] return {}, NodeExecutionDetails(generated_messages=generated_messages) - - return dict(result.get("structured_response", {})), NodeExecutionDetails() + outputs = dict(result.get("structured_response", {})) + return outputs, NodeExecutionDetails() class InputMessageNodeExecutor(NodeExecutor): diff --git a/pyagentspec/src/pyagentspec/adapters/langgraph/_types.py b/pyagentspec/src/pyagentspec/adapters/langgraph/_types.py index f6f7c7d6..88779f54 100644 --- a/pyagentspec/src/pyagentspec/adapters/langgraph/_types.py +++ b/pyagentspec/src/pyagentspec/adapters/langgraph/_types.py @@ -28,7 +28,8 @@ import langgraph_core # type: ignore from langchain_core.callbacks import BaseCallbackHandler from langchain_core.language_models import BaseChatModel - from langchain_core.messages import BaseMessage, SystemMessage + from langchain_core.messages import BaseMessage, SystemMessage, ToolMessage + from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult from 
langchain_core.runnables import RunnableBinding, RunnableConfig from langchain_core.tools import BaseTool, StructuredTool from langgraph.graph import StateGraph @@ -62,8 +63,12 @@ BranchSpec = LazyLoader("langgraph.graph._branch").BranchSpec SystemMessage = LazyLoader("langchain_core.messages").SystemMessage BaseMessage = LazyLoader("langchain_core.messages").BaseMessage + ToolMessage = LazyLoader("langchain_core.messages").ToolMessage BaseChatModel = LazyLoader("langchain_core.language_models").BaseChatModel BaseStore = LazyLoader("langgraph.store.base").BaseStore + ChatGenerationChunk = LazyLoader("langchain_core.outputs").ChatGenerationChunk + GenerationChunk = LazyLoader("langchain_core.outputs").GenerationChunk + LLMResult = LazyLoader("langchain_core.outputs").LLMResult LangGraphTool: TypeAlias = Union[BaseTool, Callable[..., Any]] @@ -145,6 +150,7 @@ class FlowOutputSchema(TypedDict): "BaseTool", "SystemMessage", "BaseMessage", + "ToolMessage", "BaseChatModel", "BaseStore", "Checkpointer", @@ -153,4 +159,7 @@ class FlowOutputSchema(TypedDict): "Messages", "BranchSpec", "BaseCallbackHandler", + "ChatGenerationChunk", + "GenerationChunk", + "LLMResult", ] diff --git a/pyagentspec/src/pyagentspec/adapters/langgraph/tracing.py b/pyagentspec/src/pyagentspec/adapters/langgraph/tracing.py index 77dba866..d0fed83a 100644 --- a/pyagentspec/src/pyagentspec/adapters/langgraph/tracing.py +++ b/pyagentspec/src/pyagentspec/adapters/langgraph/tracing.py @@ -10,11 +10,16 @@ from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union from uuid import UUID -from langchain_core.callbacks import BaseCallbackHandler as LangchainBaseCallbackHandler -from langchain_core.messages import BaseMessage, ToolMessage -from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult from typing_extensions import NotRequired +from pyagentspec.adapters.langgraph._types import ( + BaseCallbackHandler, + BaseMessage, + ChatGenerationChunk, + GenerationChunk, + LLMResult, + ToolMessage, +) from pyagentspec.llms.llmconfig import LlmConfig as AgentSpecLlmConfig from pyagentspec.tools import Tool as AgentSpecTool from pyagentspec.tracing.events import ( @@ -50,18 +55,24 @@ } -class AgentSpecCallbackHandler(LangchainBaseCallbackHandler): +class AgentSpecCallbackHandler(BaseCallbackHandler): + + def __init__(self) -> None: + # Track spans per run_id + self.agentspec_spans_registry: Dict[str, AgentSpecSpan] = {} + + +class AgentSpecLlmCallbackHandler(AgentSpecCallbackHandler): def __init__( self, llm_config: AgentSpecLlmConfig, tools: Optional[List[AgentSpecTool]] = None, ) -> None: + super().__init__() # This is only added during tool-call streaming to associate run_id with tool_call_id # (tool_call_id is not available mid-stream) self.messages_in_process: MessagesInProgressRecord = {} - # Track spans per run_id - self.agentspec_spans_registry: Dict[str, AgentSpecSpan] = {} # configs for spans self.llm_config = llm_config self.tools_map: Dict[str, AgentSpecTool] = {t.name: t for t in (tools or [])} @@ -69,7 +80,8 @@ def __init__( def _get_or_start_llm_span(self, run_id_str: str) -> AgentSpecLlmGenerationSpan: span = self.agentspec_spans_registry.get(run_id_str) if not isinstance(span, AgentSpecLlmGenerationSpan): - span = AgentSpecLlmGenerationSpan(llm_config=self.llm_config) + span_name = f"LlmGeneration[{self.llm_config.name}]" + span = AgentSpecLlmGenerationSpan(name=span_name, llm_config=self.llm_config) self.agentspec_spans_registry[run_id_str] = span span.start() return span @@ -104,15 
+116,13 @@ def on_chat_model_start( for m in list_of_messages ] - tools = list(self.tools_map.values()) if self.tools_map else [] - span.add_event( AgentSpecLlmGenerationRequest( request_id=run_id_str, llm_config=self.llm_config, llm_generation_config=self.llm_config.default_generation_parameters, prompt=prompt, - tools=tools, + tools=list(self.tools_map.values()), ) ) @@ -220,6 +230,13 @@ def on_llm_end( self.agentspec_spans_registry.pop(run_id, None) self.messages_in_process.pop(run_id, None) + +class AgentSpecToolCallbackHandler(AgentSpecCallbackHandler): + + def __init__(self, tool: AgentSpecTool) -> None: + super().__init__() + self.tool = tool + def on_tool_start( self, serialized: Dict[str, Any], @@ -229,23 +246,11 @@ def on_tool_start( parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: - if kwargs.get("tool_call_id"): - # note that this run_id is different from the run_id in LLM events - # so we cannot use it to correlate with tool_call_id above - raise NotImplementedError( - "[on_tool_start] This is implemented starting from langchain 1.1.2, and we should support it" - ) # get run_id and tool config run_id_str = str(run_id) - tool_name = serialized.get("name") - if not tool_name: - raise ValueError("[on_tool_start] Expected tool name in serialized metadata") - tool_obj = self.tools_map.get(tool_name) - if tool_obj is None: - raise ValueError(f"[on_tool_start] Unknown tool: {tool_name}") - # starting a tool span for this tool - tool_span = AgentSpecToolExecutionSpan(tool=tool_obj) + span_name = f"ToolExecution[{self.tool.name}]" + tool_span = AgentSpecToolExecutionSpan(name=span_name, tool=self.tool) self.agentspec_spans_registry[run_id_str] = tool_span tool_span.start() @@ -265,27 +270,37 @@ def on_tool_end( parent_run_id: Optional[UUID] = None, **kwargs: Any, ) -> Any: - if not isinstance(output, ToolMessage): - raise ValueError("[on_tool_end] Expected ToolMessage for tool end") run_id_str = str(run_id) tool_span = self.agentspec_spans_registry.get(run_id_str) - try: - parsed = ( - json.loads(output.content) if isinstance(output.content, str) else output.content - ) - except json.JSONDecodeError as e: - parsed = str(output.content) - outputs = parsed if isinstance(parsed, dict) else {"output": parsed} - if not isinstance(tool_span, AgentSpecToolExecutionSpan): raise ValueError( f"Expected tool_span to be a ToolExecutionSpan but got {type(tool_span)}" ) + if isinstance(output, ToolMessage): + try: + parsed = ( + json.loads(output.content) + if isinstance(output.content, str) + else output.content + ) + except json.JSONDecodeError as e: + parsed = str(output.content) + outputs = parsed if isinstance(parsed, dict) else {"output": parsed} + else: + if ( + not isinstance(output, dict) + and isinstance(self.tool.outputs, list) + and len(self.tool.outputs) == 1 + ): + outputs = {self.tool.outputs[0].title: output} + else: + outputs = output + tool_span.add_event( AgentSpecToolExecutionResponse( - request_id=output.tool_call_id, + request_id=run_id_str, tool=tool_span.tool, outputs=outputs, ) diff --git a/pyagentspec/tests/adapters/langgraph/configs/haiku_without_a_flow.json b/pyagentspec/tests/adapters/langgraph/configs/haiku_without_a_flow.json new file mode 100644 index 00000000..3bf8f867 --- /dev/null +++ b/pyagentspec/tests/adapters/langgraph/configs/haiku_without_a_flow.json @@ -0,0 +1,181 @@ +{ + "component_type": "Flow", + "id": "4697413c-bd67-4852-b502-135d0563a9df", + "name": "haiku_without_a_flow", + "description": null, + "metadata": {}, + "inputs": [], + 
"outputs": [ + { + "title": "haiku_without_a", + "type": "string" + } + ], + "start_node": { + "$component_ref": "c50c7182-1859-4c80-8e48-843abba0f1b1" + }, + "nodes": [ + { + "$component_ref": "c50c7182-1859-4c80-8e48-843abba0f1b1" + }, + { + "$component_ref": "4c20efdb-8e0b-4b36-a9f4-237071944f71" + }, + { + "$component_ref": "84a0cfc8-da04-46e9-bec2-c4a0c5c14362" + }, + { + "$component_ref": "2403bfe4-a49d-46b9-9bcd-828b6d3d152e" + } + ], + "control_flow_connections": [ + { + "component_type": "ControlFlowEdge", + "id": "cc16730a-a97b-4a7e-978c-3c6559542bbd", + "name": "cf1", + "description": null, + "metadata": {}, + "from_node": { + "$component_ref": "c50c7182-1859-4c80-8e48-843abba0f1b1" + }, + "from_branch": null, + "to_node": { + "$component_ref": "84a0cfc8-da04-46e9-bec2-c4a0c5c14362" + } + }, + { + "component_type": "ControlFlowEdge", + "id": "e1d3d676-0fca-47a9-933c-027a7a0d46f9", + "name": "cf2", + "description": null, + "metadata": {}, + "from_node": { + "$component_ref": "84a0cfc8-da04-46e9-bec2-c4a0c5c14362" + }, + "from_branch": null, + "to_node": { + "$component_ref": "2403bfe4-a49d-46b9-9bcd-828b6d3d152e" + } + }, + { + "component_type": "ControlFlowEdge", + "id": "f771cdc0-8a6f-487a-a64e-a23ba481f3ab", + "name": "cf3", + "description": null, + "metadata": {}, + "from_node": { + "$component_ref": "2403bfe4-a49d-46b9-9bcd-828b6d3d152e" + }, + "from_branch": null, + "to_node": { + "$component_ref": "4c20efdb-8e0b-4b36-a9f4-237071944f71" + } + } + ], + "data_flow_connections": null, + "$referenced_components": { + "c50c7182-1859-4c80-8e48-843abba0f1b1": { + "component_type": "StartNode", + "id": "c50c7182-1859-4c80-8e48-843abba0f1b1", + "name": "start node", + "description": null, + "metadata": {}, + "inputs": [], + "outputs": [], + "branches": [ + "next" + ] + }, + "84a0cfc8-da04-46e9-bec2-c4a0c5c14362": { + "component_type": "LlmNode", + "id": "84a0cfc8-da04-46e9-bec2-c4a0c5c14362", + "name": "llm node", + "description": null, + "metadata": {}, + "inputs": [], + "outputs": [ + { + "title": "haiku", + "type": "string" + } + ], + "branches": [ + "next" + ], + "llm_config": { + "component_type": "OpenAiCompatibleConfig", + "id": "81c0e83e-3585-44b5-8d47-41dd6a0783ee", + "name": "Llama-3.3-70B-Instruct", + "description": null, + "metadata": {}, + "default_generation_parameters": null, + "url": "[[LLAMA70BV33_API_URL]]", + "model_id": "/storage/models/Llama-3.3-70B-Instruct" + }, + "prompt_template": "Write a haiku about Oracle." 
+ }, + "2403bfe4-a49d-46b9-9bcd-828b6d3d152e": { + "component_type": "ToolNode", + "id": "2403bfe4-a49d-46b9-9bcd-828b6d3d152e", + "name": "tool node", + "description": null, + "metadata": {}, + "inputs": [ + { + "title": "haiku", + "type": "string" + } + ], + "outputs": [ + { + "title": "haiku_without_a", + "type": "string" + } + ], + "branches": [ + "next" + ], + "tool": { + "component_type": "ServerTool", + "id": "934c16e4-a1f6-4c13-863c-fb5454bd765f", + "name": "remove_a", + "description": "Remove all the letters `a` from the given input string", + "metadata": {}, + "inputs": [ + { + "title": "haiku", + "type": "string" + } + ], + "outputs": [ + { + "title": "haiku_without_a", + "type": "string" + } + ] + } + }, + "4c20efdb-8e0b-4b36-a9f4-237071944f71": { + "component_type": "EndNode", + "id": "4c20efdb-8e0b-4b36-a9f4-237071944f71", + "name": "end node", + "description": null, + "metadata": {}, + "inputs": [ + { + "title": "haiku_without_a", + "type": "string" + } + ], + "outputs": [ + { + "title": "haiku_without_a", + "type": "string" + } + ], + "branches": [], + "branch_name": "next" + } + }, + "agentspec_version": "25.4.1" +} diff --git a/pyagentspec/tests/adapters/langgraph/test_tracing.py b/pyagentspec/tests/adapters/langgraph/test_tracing.py index 33ae311d..98a43977 100644 --- a/pyagentspec/tests/adapters/langgraph/test_tracing.py +++ b/pyagentspec/tests/adapters/langgraph/test_tracing.py @@ -7,14 +7,27 @@ from typing import List, Tuple from pyagentspec.tracing.events import ( + AgentExecutionEnd, + AgentExecutionStart, Event, + FlowExecutionEnd, + FlowExecutionStart, LlmGenerationRequest, LlmGenerationResponse, + NodeExecutionEnd, + NodeExecutionStart, ToolExecutionRequest, ToolExecutionResponse, ) from pyagentspec.tracing.spanprocessor import SpanProcessor -from pyagentspec.tracing.spans import LlmGenerationSpan, Span, ToolExecutionSpan +from pyagentspec.tracing.spans import ( + AgentExecutionSpan, + FlowExecutionSpan, + LlmGenerationSpan, + NodeExecutionSpan, + Span, + ToolExecutionSpan, +) from pyagentspec.tracing.trace import Trace from ..conftest import _replace_config_placeholders @@ -72,15 +85,72 @@ async def shutdown_async(self) -> None: self.shut_down_async = True -def check_dummyspanprocessor_events_and_spans(span_processor: DummySpanProcessor) -> None: +def check_dummyspanprocessor_agent_events_and_spans(span_processor: DummySpanProcessor) -> None: # Assertions on spans started/ended # We expect at least one of each span type during a normal run started_types = [type(s) for s in span_processor.starts] ended_types = [type(s) for s in span_processor.ends] - # Agent execution events are not emitted yet - # assert any(issubclass(t, AgentExecutionSpan) for t in started_types), "AgentExecutionSpan did not start" - # assert any(issubclass(t, AgentExecutionSpan) for t in ended_types), "AgentExecutionSpan did not end" + assert any( + issubclass(t, AgentExecutionSpan) for t in started_types + ), "AgentExecutionSpan did not start" + assert any( + issubclass(t, AgentExecutionSpan) for t in ended_types + ), "AgentExecutionSpan did not end" + assert any( + issubclass(t, LlmGenerationSpan) for t in started_types + ), "LlmGenerationSpan did not start" + assert any( + issubclass(t, LlmGenerationSpan) for t in ended_types + ), "LlmGenerationSpan did not end" + + assert any( + issubclass(t, ToolExecutionSpan) for t in started_types + ), "ToolExecutionSpan did not start" + assert any( + issubclass(t, ToolExecutionSpan) for t in ended_types + ), "ToolExecutionSpan did not end" + + # 
Assertions on key events observed + event_types = [type(e) for (e, _s) in span_processor.events] + assert any( + issubclass(t, AgentExecutionStart) for t in event_types + ), "AgentExecutionStart not emitted" + assert any( + issubclass(t, AgentExecutionEnd) for t in event_types + ), "AgentExecutionEnd not emitted" + assert any( + issubclass(t, LlmGenerationRequest) for t in event_types + ), "LlmGenerationRequest not emitted" + assert any( + issubclass(t, LlmGenerationResponse) for t in event_types + ), "LlmGenerationResponse not emitted" + assert any( + issubclass(t, ToolExecutionRequest) for t in event_types + ), "ToolExecutionRequest not emitted" + assert any( + issubclass(t, ToolExecutionResponse) for t in event_types + ), "ToolExecutionResponse not emitted" + + +def check_dummyspanprocessor_flow_events_and_spans(span_processor: DummySpanProcessor) -> None: + # Assertions on spans started/ended + # We expect at least one of each span type during a normal run + started_types = [type(s) for s in span_processor.starts] + ended_types = [type(s) for s in span_processor.ends] + + assert any( + issubclass(t, FlowExecutionSpan) for t in started_types + ), "FlowExecutionSpan did not start" + assert any( + issubclass(t, FlowExecutionSpan) for t in ended_types + ), "FlowExecutionSpan did not end" + assert any( + issubclass(t, NodeExecutionSpan) for t in started_types + ), "NodeExecutionSpan did not start" + assert any( + issubclass(t, NodeExecutionSpan) for t in ended_types + ), "NodeExecutionSpan did not end" assert any( issubclass(t, LlmGenerationSpan) for t in started_types ), "LlmGenerationSpan did not start" @@ -98,8 +168,13 @@ def check_dummyspanprocessor_events_and_spans(span_processor: DummySpanProcessor # Assertions on key events observed event_types = [type(e) for (e, _s) in span_processor.events] - # Agent execution events are not emitted yet - # assert any(issubclass(t, AgentExecutionStart) for t in event_types), "AgentExecutionStart not emitted" - # assert any(issubclass(t, AgentExecutionEnd) for t in event_types), "AgentExecutionEnd not emitted" + assert any( + issubclass(t, FlowExecutionStart) for t in event_types + ), "FlowExecutionStart not emitted" + assert any(issubclass(t, FlowExecutionEnd) for t in event_types), "FlowExecutionEnd not emitted" + assert any( + issubclass(t, NodeExecutionStart) for t in event_types + ), "NodeExecutionStart not emitted" + assert any(issubclass(t, NodeExecutionEnd) for t in event_types), "NodeExecutionEnd not emitted" assert any( issubclass(t, LlmGenerationRequest) for t in event_types ), "LlmGenerationRequest not emitted" assert any( issubclass(t, LlmGenerationResponse) for t in event_types ), "LlmGenerationResponse not emitted" @@ -134,7 +210,7 @@ def test_langgraph_invoke_tracing_emits_agent_llm_and_tool_events(json_server: s response = weather_agent.invoke(input=agent_input) assert "sunny" in str(response).lower() - check_dummyspanprocessor_events_and_spans(proc) + check_dummyspanprocessor_agent_events_and_spans(proc) def test_langgraph_stream_tracing_emits_agent_llm_and_tool_events(json_server: str) -> None: @@ -162,4 +238,52 @@ def test_langgraph_stream_tracing_emits_agent_llm_and_tool_events(json_server: s response += message_chunk.content assert "sunny" in str(response).lower() - check_dummyspanprocessor_events_and_spans(proc) + check_dummyspanprocessor_agent_events_and_spans(proc) + + +def test_langgraph_invoke_tracing_emits_flow_events(json_server: str) -> None: + + from pyagentspec.adapters.langgraph import AgentSpecLoader + + # Prepare JSON config with placeholders replaced + json_content = (CONFIGS / "haiku_without_a_flow.json").read_text() + 
final_json = _replace_config_placeholders(json_content, json_server) + + # Convert to a LangGraph flow + flow = AgentSpecLoader( + tool_registry={"remove_a": lambda haiku: haiku.replace("a", "")} + ).load_json(final_json) + + proc = DummySpanProcessor() + with Trace(name="langgraph_tracing_test", span_processors=[proc]): + response = flow.invoke(input={"inputs": {}, "messages": []}) + assert "outputs" in response + assert "haiku_without_a" in response["outputs"] + assert "a" not in response["outputs"]["haiku_without_a"] + + check_dummyspanprocessor_flow_events_and_spans(proc) + + +def test_langgraph_stream_tracing_emits_flow_events(json_server: str) -> None: + + from pyagentspec.adapters.langgraph import AgentSpecLoader + + # Prepare JSON config with placeholders replaced + json_content = (CONFIGS / "haiku_without_a_flow.json").read_text() + final_json = _replace_config_placeholders(json_content, json_server) + + # Convert to a LangGraph flow + flow = AgentSpecLoader( + tool_registry={"remove_a": lambda haiku: haiku.replace("a", "")} + ).load_json(final_json) + + proc = DummySpanProcessor() + with Trace(name="langgraph_tracing_test", span_processors=[proc]): + for chunk in flow.stream(input={"inputs": {}, "messages": []}, stream_mode="values"): + if chunk: + response = chunk + assert "outputs" in response + assert "haiku_without_a" in response["outputs"] + assert "a" not in response["outputs"]["haiku_without_a"] + + check_dummyspanprocessor_flow_events_and_spans(proc)
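
Note on the tracing mechanism: the patch injects spans by wrapping the compiled graph's `stream`/`astream` generators; `invoke` and `ainvoke` need no patching because they internally drive `(a)stream`. Below is a standalone sketch of the synchronous wrapper, not part of the patch: `wrap_stream_with_flow_span` is a hypothetical helper, `graph` is assumed to be a compiled LangGraph graph and `flow` the pyagentspec Flow it was converted from; the span and event constructors are the ones the patch itself uses.

    from typing import Any, Generator

    from pyagentspec.tracing.events import FlowExecutionEnd, FlowExecutionStart
    from pyagentspec.tracing.spans import FlowExecutionSpan


    def wrap_stream_with_flow_span(graph: Any, flow: Any) -> None:
        # Hypothetical helper (sketch only): wrap graph.stream so each run is
        # recorded in a FlowExecutionSpan, mirroring the patch's closure.
        original_stream = graph.stream

        def traced_stream(*args: Any, **kwargs: Any) -> Generator[Any, Any, None]:
            inputs = kwargs.get("input", {})
            if not isinstance(inputs, dict):
                inputs = {}
            last_payload: Any = {}
            with FlowExecutionSpan(name=f"FlowExecution[{flow.name}]", flow=flow) as span:
                span.add_event(FlowExecutionStart(flow=flow, inputs=inputs))
                for chunk in original_stream(*args, **kwargs):
                    yield chunk  # forward chunks to the caller unchanged
                    if isinstance(chunk, tuple):
                        last_payload = chunk[1]  # keep the last streamed payload
                result = last_payload if isinstance(last_payload, dict) else {}
                span.add_event(
                    FlowExecutionEnd(
                        flow=flow,
                        outputs=result.get("outputs", {}),
                        branch_selected=result.get("node_execution_details", {}).get("branch", ""),
                    )
                )

        graph.stream = traced_stream  # replace the bound method on this instance

Because the wrapper is itself a generator, the span opens when iteration starts and closes only once the stream is exhausted, so the node, LLM, and tool spans emitted during the run fall within its lifetime.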
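Note on the implicit data-flow wiring in `_flow_convert_to_langgraph`: when a flow declares no `data_flow_connections`, the converter now pairs every node output with every node input sharing the same title, which is the conversion recommended by the Agent Spec language specification. The matching rule can be checked in isolation; the sketch below uses toy node types rather than pyagentspec's classes, with the node names and titles taken from the `haiku_without_a_flow.json` fixture.

    from dataclasses import dataclass, field
    from typing import List, Tuple


    @dataclass
    class ToyNode:
        name: str
        inputs: List[str] = field(default_factory=list)
        outputs: List[str] = field(default_factory=list)


    def implicit_edges(nodes: List[ToyNode]) -> List[Tuple[str, str, str]]:
        # One edge per (source output, destination input) pair with matching titles
        return [
            (src.name, dst.name, out)
            for src in nodes
            for dst in nodes
            for out in src.outputs
            if out in dst.inputs
        ]


    llm = ToyNode("llm node", outputs=["haiku"])
    tool = ToyNode("tool node", inputs=["haiku"], outputs=["haiku_without_a"])
    end = ToyNode("end node", inputs=["haiku_without_a"])
    print(implicit_edges([llm, tool, end]))
    # [('llm node', 'tool node', 'haiku'), ('tool node', 'end node', 'haiku_without_a')]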