Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 34 additions & 80 deletions lib/agents/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,18 @@ def run(starting_agent, input, context: {}, registry: {}, max_turns: DEFAULT_MAX
raise MaxTurnsExceeded, "Exceeded maximum turns: #{max_turns}" if current_turn > max_turns

# Get response from LLM (RubyLLM handles tool execution with halting based handoff detection)
result = if current_turn == 1
# Emit agent thinking event for initial message
context_wrapper.callback_manager.emit_agent_thinking(current_agent.name, input, context_wrapper)
# If conversation history already ends with this user message (e.g. passed
# in via context from an external system), use complete to avoid duplicating it.
input_already_in_history ? chat.complete : chat.ask(input)
else
# Emit agent thinking event for continuation
context_wrapper.callback_manager.emit_agent_thinking(current_agent.name, "(continuing conversation)",
context_wrapper)
chat.complete
end
response = result
response = if current_turn == 1
# Emit agent thinking event for initial message
context_wrapper.callback_manager.emit_agent_thinking(current_agent.name, input, context_wrapper)
# If conversation history already ends with this user message (e.g. passed
# in via context from an external system), use complete to avoid duplicating it.
input_already_in_history ? chat.complete : chat.ask(input)
else
# Emit agent thinking event for continuation
context_wrapper.callback_manager.emit_agent_thinking(current_agent.name, "(continuing conversation)",
context_wrapper)
chat.complete
end
track_usage(response, context_wrapper)

# Emit LLM call complete event with model and response for instrumentation
Expand All @@ -146,22 +145,8 @@ def run(starting_agent, input, context: {}, registry: {}, max_turns: DEFAULT_MAX
# Validate that the target agent is in our registry
# This prevents handoffs to agents that weren't explicitly provided
unless registry[next_agent.name]
save_conversation_state(chat, context_wrapper, current_agent)
error = AgentNotFoundError.new("Handoff failed: Agent '#{next_agent.name}' not found in registry")

result = RunResult.new(
output: nil,
messages: Helpers::MessageExtractor.extract_messages(chat, current_agent),
usage: context_wrapper.usage,
context: context_wrapper.context,
error: error
)

# Emit agent complete and run complete events with error
context_wrapper.callback_manager.emit_agent_complete(current_agent.name, result, error, context_wrapper)
context_wrapper.callback_manager.emit_run_complete(current_agent.name, result, context_wrapper)

return result
return finalize_run(chat, context_wrapper, current_agent, output: nil, error: error)
end

# Save current conversation state before switching
Expand Down Expand Up @@ -198,81 +183,50 @@ def run(starting_agent, input, context: {}, registry: {}, max_turns: DEFAULT_MAX

# Handle non-handoff halts - return the halt content as final response
if response.is_a?(RubyLLM::Tool::Halt)
save_conversation_state(chat, context_wrapper, current_agent)

result = RunResult.new(
output: response.content,
messages: Helpers::MessageExtractor.extract_messages(chat, current_agent),
usage: context_wrapper.usage,
context: context_wrapper.context
)

# Emit agent complete and run complete events
context_wrapper.callback_manager.emit_agent_complete(current_agent.name, result, nil, context_wrapper)
context_wrapper.callback_manager.emit_run_complete(current_agent.name, result, context_wrapper)

return result
return finalize_run(chat, context_wrapper, current_agent, output: response.content)
end

# If tools were called, continue the loop to let them execute
next if response.tool_call?

# If no tools were called, we have our final response

# Save final state before returning
save_conversation_state(chat, context_wrapper, current_agent)

result = RunResult.new(
output: response.content,
messages: Helpers::MessageExtractor.extract_messages(chat, current_agent),
usage: context_wrapper.usage,
context: context_wrapper.context
)

# Emit agent complete and run complete events
context_wrapper.callback_manager.emit_agent_complete(current_agent.name, result, nil, context_wrapper)
context_wrapper.callback_manager.emit_run_complete(current_agent.name, result, context_wrapper)

return result
return finalize_run(chat, context_wrapper, current_agent, output: response.content)
end
rescue MaxTurnsExceeded => e
# Save state even on error
save_conversation_state(chat, context_wrapper, current_agent) if chat

result = RunResult.new(
output: "Conversation ended: #{e.message}",
messages: chat ? Helpers::MessageExtractor.extract_messages(chat, current_agent) : [],
usage: context_wrapper.usage,
error: e,
context: context_wrapper.context
)
finalize_run(chat, context_wrapper, current_agent,
output: "Conversation ended: #{e.message}", error: e)
rescue StandardError => e
finalize_run(chat, context_wrapper, current_agent, output: nil, error: e)
end

# Emit agent complete and run complete events with error
context_wrapper.callback_manager.emit_agent_complete(current_agent.name, result, e, context_wrapper)
context_wrapper.callback_manager.emit_run_complete(current_agent.name, result, context_wrapper)
private

result
rescue StandardError => e
# Save state even on error
# Saves conversation state, builds a RunResult, emits completion callbacks, and returns it.
# Centralises the finalize-and-return pattern used by the normal path, halt path, and error rescues.
#
# @param chat [RubyLLM::Chat, nil] The chat instance (nil in early-failure rescues)
# @param context_wrapper [RunContext] Context wrapper for state and callbacks
# @param current_agent [Agents::Agent] The currently active agent
# @param output [String, nil] The output text for the result
# @param error [StandardError, nil] Optional error to attach to the result
# @return [RunResult]
def finalize_run(chat, context_wrapper, current_agent, output:, error: nil)
save_conversation_state(chat, context_wrapper, current_agent) if chat

result = RunResult.new(
output: nil,
output: output,
messages: chat ? Helpers::MessageExtractor.extract_messages(chat, current_agent) : [],
usage: context_wrapper.usage,
error: e,
error: error,
context: context_wrapper.context
)

# Emit agent complete and run complete events with error
context_wrapper.callback_manager.emit_agent_complete(current_agent.name, result, e, context_wrapper)
context_wrapper.callback_manager.emit_agent_complete(current_agent.name, result, error, context_wrapper)
context_wrapper.callback_manager.emit_run_complete(current_agent.name, result, context_wrapper)

result
end

private

# Creates a deep copy of context data for thread safety.
# Preserves conversation history array structure while avoiding agent mutation.
#
Expand Down
Loading