Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions lib/jido/agent/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ defmodule Jido.Agent.Server do
case ServerState.enqueue(state, signal) do
{:ok, new_state} ->
# Trigger queue processing
Process.send_after(self(), :process_queue, 0)
maybe_trigger_queue_processing(new_state)
{:noreply, new_state}

{:error, reason} ->
Expand All @@ -230,6 +230,18 @@ defmodule Jido.Agent.Server do
{:reply, :pong, state}
end

def handle_call(:process_queue, _from, %ServerState{} = state) do
processing_state = %{state | mode: :auto}

case ServerRuntime.process_signals_in_queue(processing_state) do
{:ok, new_state} ->
{:reply, :ok, %{new_state | mode: state.mode}}

{:debug_break, new_state, _signal} ->
{:reply, :ok, %{new_state | mode: state.mode}}
end
end

def handle_call(_unhandled, _from, state) do
{:reply, {:error, :unhandled_call}, state}
end
Expand All @@ -240,7 +252,7 @@ defmodule Jido.Agent.Server do
case ServerState.enqueue(state, signal) do
{:ok, new_state} ->
# Trigger queue processing
Process.send_after(self(), :process_queue, 0)
maybe_trigger_queue_processing(new_state)
{:noreply, new_state}

{:error, _reason} ->
Expand Down Expand Up @@ -271,7 +283,7 @@ defmodule Jido.Agent.Server do
case ServerState.enqueue(state, signal) do
{:ok, new_state} ->
# Trigger queue processing
Process.send_after(self(), :process_queue, 0)
maybe_trigger_queue_processing(new_state)
{:noreply, new_state}

{:error, _reason} ->
Expand All @@ -295,12 +307,17 @@ defmodule Jido.Agent.Server do
{:noreply, state}
end

def handle_info(:process_queue, state) do
def handle_info(:process_queue, %ServerState{mode: :auto} = state) do
case ServerRuntime.process_signals_in_queue(state) do
{:ok, new_state} -> {:noreply, new_state}
{:debug_break, new_state, _signal} -> {:noreply, new_state}
end
end

def handle_info(:process_queue, %ServerState{} = state) do
{:noreply, state}
end

def handle_info(_unhandled, state) do
{:noreply, state}
end
Expand Down Expand Up @@ -532,4 +549,12 @@ defmodule Jido.Agent.Server do
end

defp maybe_trigger_queue_processing(_state, _mode), do: :ok

@spec maybe_trigger_queue_processing(ServerState.t()) :: :ok
defp maybe_trigger_queue_processing(%ServerState{mode: :auto}) do
Process.send_after(self(), :process_queue, 0)
:ok
end

defp maybe_trigger_queue_processing(%ServerState{}), do: :ok
end
3 changes: 2 additions & 1 deletion test/agent_case_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule JidoTest.AgentCaseTest do

alias JidoTest.TestAgents.BasicAgent
alias JidoTest.TestAgents.FullFeaturedAgent
alias JidoTest.Support

@moduletag :capture_log

Expand Down Expand Up @@ -57,7 +58,7 @@ defmodule JidoTest.AgentCaseTest do
context = %{agent: agent, server_pid: server_pid}

ExUnit.Callbacks.on_exit(fn ->
if Process.alive?(server_pid), do: GenServer.stop(server_pid, :normal, 1000)
Support.cleanup_agent(%{pid: server_pid})
end)

context
Expand Down
6 changes: 6 additions & 0 deletions test/jido/agent/bus_spy_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do
send_signal_sync(producer, "root", %{test_data: "cross-process-spy-test"})

wait_for_cross_process_completion([consumer])
wait_for_received_signals(consumer, 1, timeout: 2000)

assert_bus_signal_observed(spy, "child.event")

Expand Down Expand Up @@ -73,6 +74,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do

# Wait for consumer to process the signal
wait_for_cross_process_completion([consumer], timeout: 2000)
wait_for_received_signals(consumer, 1, timeout: 2000)
received_signals = get_received_signals(consumer)
assert length(received_signals) >= 1
end
Expand All @@ -87,6 +89,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do
send_signal_sync(producer, "root", %{sequence: 2, data: "second"})

wait_for_cross_process_completion([consumer])
wait_for_received_signals(consumer, 2, timeout: 2000)

child_signals = get_spy_signals(spy, "child.event")
assert length(child_signals) >= 2
Expand All @@ -109,6 +112,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do
send_signal_sync(producer, "root", %{event_type: "system_action"})

wait_for_cross_process_completion([consumer])
wait_for_received_signals(consumer, 2, timeout: 2000)

all_signals = get_spy_signals(spy, "*")
child_signals = get_spy_signals(spy, "child.*")
Expand All @@ -127,6 +131,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do

send_signal_sync(producer, "root", %{test_dispatch_result: true})
wait_for_cross_process_completion([consumer])
wait_for_received_signals(consumer, 1, timeout: 2000)

all_events = get_spy_signals(spy)

Expand Down Expand Up @@ -156,6 +161,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do

send_signal_sync(producer, "root", %{trace_data: original_trace_data})
wait_for_cross_process_completion([consumer])
wait_for_received_signals(consumer, 1, timeout: 2000)

# Filter to get just the before_dispatch event
child_signals = get_spy_signals(spy, "child.event")
Expand Down
11 changes: 7 additions & 4 deletions test/jido/agent/interaction_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,14 @@ defmodule JidoTest.Agent.InteractionTest do
test "handles invalid agent references gracefully" do
{:ok, signal} = Signal.new(%{type: "test_action", data: %{value: 42}})

# Test atom reference separately (fixed atom is fine - no collision risk)
assert {:error, :not_found} = Interaction.call(:non_existent_atom_agent, signal)

# Use unique IDs for string refs to avoid async test collisions
invalid_refs = [
"non_existent_agent",
:non_existent_atom_agent,
"invalid1",
"invalid2"
Support.unique_id("non-existent"),
Support.unique_id("non-existent"),
Support.unique_id("non-existent")
]

for ref <- invalid_refs do
Expand Down
2 changes: 1 addition & 1 deletion test/jido/agent/server_runtime_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule JidoTest.Agent.ServerRuntimeTest do

alias Jido.Agent.Server.Runtime, as: ServerRuntime
alias Jido.Agent.Server.Router, as: ServerRouter
alias Jido.{Signal, Instruction}
alias Jido.{Error, Signal, Instruction}

@moduletag :capture_log

Expand Down
5 changes: 5 additions & 0 deletions test/jido/agent/trace_cross_process_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do
send_signal_sync(producer, "root", %{test_data: "trace_flow"})

wait_for_cross_process_completion([producer, consumer])
wait_for_received_signals(consumer, 1, timeout: 2000)

dispatched_signals = get_spy_signals(spy)
assert length(dispatched_signals) >= 1
Expand All @@ -37,6 +38,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do

send_signal_sync(producer, "root", %{chain_test: "parent_child"})
wait_for_cross_process_completion([producer, consumer])
wait_for_received_signals(consumer, 1, timeout: 2000)

emitted = get_emitted_signals(producer)
received = get_received_signals(consumer)
Expand Down Expand Up @@ -75,6 +77,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do

send_signal_sync(producer, "root", %{bus_test: "trace_preservation"})
wait_for_cross_process_completion([producer, consumer])
wait_for_received_signals(consumer, 1, timeout: 2000)

dispatched_signals = get_spy_signals(spy)
assert length(dispatched_signals) >= 1
Expand All @@ -95,6 +98,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do

send_signal_sync(producer, "root", %{chain_operation: "full_trace"})
wait_for_cross_process_completion([producer, consumer])
wait_for_received_signals(consumer, 1, timeout: 2000)

emitted_signals = get_emitted_signals(producer)
received_signals = get_received_signals(consumer)
Expand Down Expand Up @@ -176,6 +180,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do
send_signal_sync(agent_a, "root", %{oracle_scenario: true, trace_data: "T"})

wait_for_cross_process_completion([agent_a, agent_b])
wait_for_received_signals(agent_b, 1, timeout: 2000)

emitted_by_a = get_emitted_signals(agent_a)
assert length(emitted_by_a) == 1
Expand Down
88 changes: 69 additions & 19 deletions test/support/agent_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ defmodule JidoTest.AgentCase do
- `get_received_signals/1` - Get signals received by ConsumerAgent
- `get_latest_trace_context/1` - Get trace context from latest received signal
- `wait_for_signal/3` - Wait for agent to receive specific signal type
- `wait_for_received_signals/3` - Wait for agent to receive a minimum count
- `wait_for_cross_process_completion/2` - Wait for all agents to reach idle
- `assert_received_signal_count/2` - Assert expected number of received signals
- `assert_emitted_signal_count/2` - Assert expected number of emitted signals
Expand Down Expand Up @@ -90,15 +91,14 @@ defmodule JidoTest.AgentCase do

agent = agent_module.new("test_agent_#{System.unique_integer([:positive])}")

{:ok, server_pid} =
Server.start_link(
[
agent: agent,
id: agent.id,
mode: :step,
registry: Jido.Registry
] ++ opts
)
base_opts = [
agent: agent,
id: agent.id,
mode: :step,
registry: Jido.Registry
]

{:ok, server_pid} = Server.start_link(Keyword.merge(base_opts, opts))

context = %{agent: agent, server_pid: server_pid}
ExUnit.Callbacks.on_exit(fn -> stop_test_agent(context) end)
Expand Down Expand Up @@ -149,20 +149,19 @@ defmodule JidoTest.AgentCase do
when is_binary(signal_type) and is_map(data) do
validate_process!(server_pid)

{:ok, signal} = Signal.new(%{type: signal_type, data: data, source: "test", target: agent.id})
{:ok, _} = Server.cast(server_pid, signal)

# Small delay to allow async :process_queue message to be delivered and processed
Process.sleep(50)

# Wait for agent to return to idle state with empty queue
timeout = Keyword.get(opts, :timeout, 1000)
check_interval = Keyword.get(opts, :check_interval, 10)

{:ok, signal} = Signal.new(%{type: signal_type, data: data, source: "test", target: agent.id})
{:ok, _} = Server.cast(server_pid, signal)
maybe_process_queue(server_pid)

JidoTest.Helpers.Assertions.wait_for(
fn ->
{:ok, state} = Server.state(server_pid)
assert state.status == :idle, "Agent should be idle, but is #{state.status}"
assert :queue.is_empty(state.pending_signals), "Agent queue should be empty"
end,
timeout: timeout,
check_interval: check_interval
Expand Down Expand Up @@ -311,7 +310,12 @@ defmodule JidoTest.AgentCase do
@spec spawn_producer_agent(keyword()) :: agent_context()
def spawn_producer_agent(opts \\ []) do
routes = Keyword.get(opts, :routes, default_producer_routes())
opts = Keyword.put(opts, :routes, routes)

opts =
opts
|> Keyword.put(:routes, routes)
|> Keyword.put_new(:mode, :auto)

spawn_agent(ProducerAgent, opts)
end

Expand All @@ -330,7 +334,7 @@ defmodule JidoTest.AgentCase do
@spec spawn_consumer_agent(keyword()) :: agent_context()
def spawn_consumer_agent(opts \\ []) do
routes = Keyword.get(opts, :routes, default_consumer_routes())
opts = Keyword.put(opts, :routes, routes)
opts = opts |> Keyword.put(:routes, routes) |> Keyword.put_new(:mode, :auto)
spawn_agent(ConsumerAgent, opts)
end

Expand Down Expand Up @@ -440,6 +444,45 @@ defmodule JidoTest.AgentCase do
context
end

@doc """
Wait for an agent to receive at least a minimum number of signals.

This is useful for cross-process tests where signal delivery is asynchronous.

## Options

- `:timeout` - Maximum time to wait in milliseconds (default: 1000)
- `:check_interval` - How often to check in milliseconds (default: 10)

## Examples

consumer_context = spawn_consumer_agent()
# ... trigger signals ...
wait_for_received_signals(consumer_context, 1, timeout: 2000)
"""
@spec wait_for_received_signals(agent_context(), non_neg_integer(), keyword()) ::
agent_context()
def wait_for_received_signals(%{server_pid: server_pid} = context, min_count, opts \\ [])
when is_integer(min_count) and min_count >= 0 do
validate_process!(server_pid)
timeout = Keyword.get(opts, :timeout, 1000)
check_interval = Keyword.get(opts, :check_interval, 10)

JidoTest.Helpers.Assertions.wait_for(
fn ->
received_signals = get_received_signals(context)
actual_count = length(received_signals)

assert actual_count >= min_count,
"Expected at least #{min_count} received signals, got #{actual_count}"
end,
timeout: timeout,
check_interval: check_interval
)

context
end

@doc """
Wait for signal processing to complete across multiple agents and return context for chaining.

Expand Down Expand Up @@ -697,8 +740,15 @@ defmodule JidoTest.AgentCase do
end
end

defp stop_test_agent(%{server_pid: server_pid}) do
if Process.alive?(server_pid), do: GenServer.stop(server_pid, :normal, 1000)
defp stop_test_agent(%{server_pid: pid}) when is_pid(pid),
do: TestSupport.cleanup_agent(%{pid: pid})

defp maybe_process_queue(server_pid) do
{:ok, state} = Server.state(server_pid)

if state.mode != :auto do
_ = GenServer.call(server_pid, :process_queue)
end
end

# Bus Spy Functions - for observing signals crossing process boundaries
Expand Down
Loading