diff --git a/lib/jido/agent/server.ex b/lib/jido/agent/server.ex index 7841ba0..732eb25 100644 --- a/lib/jido/agent/server.ex +++ b/lib/jido/agent/server.ex @@ -210,7 +210,7 @@ defmodule Jido.Agent.Server do case ServerState.enqueue(state, signal) do {:ok, new_state} -> # Trigger queue processing - Process.send_after(self(), :process_queue, 0) + maybe_trigger_queue_processing(new_state) {:noreply, new_state} {:error, reason} -> @@ -230,6 +230,18 @@ defmodule Jido.Agent.Server do {:reply, :pong, state} end + def handle_call(:process_queue, _from, %ServerState{} = state) do + processing_state = %{state | mode: :auto} + + case ServerRuntime.process_signals_in_queue(processing_state) do + {:ok, new_state} -> + {:reply, :ok, %{new_state | mode: state.mode}} + + {:debug_break, new_state, _signal} -> + {:reply, :ok, %{new_state | mode: state.mode}} + end + end + def handle_call(_unhandled, _from, state) do {:reply, {:error, :unhandled_call}, state} end @@ -240,7 +252,7 @@ defmodule Jido.Agent.Server do case ServerState.enqueue(state, signal) do {:ok, new_state} -> # Trigger queue processing - Process.send_after(self(), :process_queue, 0) + maybe_trigger_queue_processing(new_state) {:noreply, new_state} {:error, _reason} -> @@ -271,7 +283,7 @@ defmodule Jido.Agent.Server do case ServerState.enqueue(state, signal) do {:ok, new_state} -> # Trigger queue processing - Process.send_after(self(), :process_queue, 0) + maybe_trigger_queue_processing(new_state) {:noreply, new_state} {:error, _reason} -> @@ -295,12 +307,17 @@ defmodule Jido.Agent.Server do {:noreply, state} end - def handle_info(:process_queue, state) do + def handle_info(:process_queue, %ServerState{mode: :auto} = state) do case ServerRuntime.process_signals_in_queue(state) do {:ok, new_state} -> {:noreply, new_state} + {:debug_break, new_state, _signal} -> {:noreply, new_state} end end + def handle_info(:process_queue, %ServerState{} = state) do + {:noreply, state} + end + def handle_info(_unhandled, state) do {:noreply, state} end @@ -532,4 +549,12 @@ defmodule Jido.Agent.Server do end defp maybe_trigger_queue_processing(_state, _mode), do: :ok + + @spec maybe_trigger_queue_processing(ServerState.t()) :: :ok + defp maybe_trigger_queue_processing(%ServerState{mode: :auto}) do + Process.send_after(self(), :process_queue, 0) + :ok + end + + defp maybe_trigger_queue_processing(%ServerState{}), do: :ok end diff --git a/test/agent_case_test.exs b/test/agent_case_test.exs index 4f9f66e..9ad476f 100644 --- a/test/agent_case_test.exs +++ b/test/agent_case_test.exs @@ -4,6 +4,7 @@ defmodule JidoTest.AgentCaseTest do alias JidoTest.TestAgents.BasicAgent alias JidoTest.TestAgents.FullFeaturedAgent + alias JidoTest.Support @moduletag :capture_log @@ -57,7 +58,7 @@ defmodule JidoTest.AgentCaseTest do context = %{agent: agent, server_pid: server_pid} ExUnit.Callbacks.on_exit(fn -> - if Process.alive?(server_pid), do: GenServer.stop(server_pid, :normal, 1000) + Support.cleanup_agent(%{pid: server_pid}) end) context diff --git a/test/jido/agent/bus_spy_integration_test.exs b/test/jido/agent/bus_spy_integration_test.exs index 1909018..9b17477 100644 --- a/test/jido/agent/bus_spy_integration_test.exs +++ b/test/jido/agent/bus_spy_integration_test.exs @@ -24,6 +24,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do send_signal_sync(producer, "root", %{test_data: "cross-process-spy-test"}) wait_for_cross_process_completion([consumer]) + wait_for_received_signals(consumer, 1, timeout: 2000) assert_bus_signal_observed(spy, "child.event") @@ -73,6 +74,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do # Wait for consumer to process the signal wait_for_cross_process_completion([consumer], timeout: 2000) + wait_for_received_signals(consumer, 1, timeout: 2000) received_signals = get_received_signals(consumer) assert length(received_signals) >= 1 end @@ -87,6 +89,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do send_signal_sync(producer, "root", %{sequence: 2, data: "second"}) wait_for_cross_process_completion([consumer]) + wait_for_received_signals(consumer, 2, timeout: 2000) child_signals = get_spy_signals(spy, "child.event") assert length(child_signals) >= 2 @@ -109,6 +112,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do send_signal_sync(producer, "root", %{event_type: "system_action"}) wait_for_cross_process_completion([consumer]) + wait_for_received_signals(consumer, 2, timeout: 2000) all_signals = get_spy_signals(spy, "*") child_signals = get_spy_signals(spy, "child.*") @@ -127,6 +131,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do send_signal_sync(producer, "root", %{test_dispatch_result: true}) wait_for_cross_process_completion([consumer]) + wait_for_received_signals(consumer, 1, timeout: 2000) all_events = get_spy_signals(spy) @@ -156,6 +161,7 @@ defmodule Jido.Agent.BusSpyIntegrationTest do send_signal_sync(producer, "root", %{trace_data: original_trace_data}) wait_for_cross_process_completion([consumer]) + wait_for_received_signals(consumer, 1, timeout: 2000) # Filter to get just the before_dispatch event child_signals = get_spy_signals(spy, "child.event") diff --git a/test/jido/agent/interaction_test.exs b/test/jido/agent/interaction_test.exs index 38772a3..790b731 100644 --- a/test/jido/agent/interaction_test.exs +++ b/test/jido/agent/interaction_test.exs @@ -342,11 +342,14 @@ defmodule JidoTest.Agent.InteractionTest do test "handles invalid agent references gracefully" do {:ok, signal} = Signal.new(%{type: "test_action", data: %{value: 42}}) + # Test atom reference separately (fixed atom is fine - no collision risk) + assert {:error, :not_found} = Interaction.call(:non_existent_atom_agent, signal) + + # Use unique IDs for string refs to avoid async test collisions invalid_refs = [ - "non_existent_agent", - :non_existent_atom_agent, - "invalid1", - "invalid2" + Support.unique_id("non-existent"), + Support.unique_id("non-existent"), + Support.unique_id("non-existent") ] for ref <- invalid_refs do diff --git a/test/jido/agent/server_runtime_test.exs b/test/jido/agent/server_runtime_test.exs index 8277a2e..271497e 100644 --- a/test/jido/agent/server_runtime_test.exs +++ b/test/jido/agent/server_runtime_test.exs @@ -8,7 +8,7 @@ defmodule JidoTest.Agent.ServerRuntimeTest do alias Jido.Agent.Server.Runtime, as: ServerRuntime alias Jido.Agent.Server.Router, as: ServerRouter - alias Jido.{Signal, Instruction} + alias Jido.{Error, Signal, Instruction} @moduletag :capture_log diff --git a/test/jido/agent/trace_cross_process_test.exs b/test/jido/agent/trace_cross_process_test.exs index 9377c1f..b0caca0 100644 --- a/test/jido/agent/trace_cross_process_test.exs +++ b/test/jido/agent/trace_cross_process_test.exs @@ -24,6 +24,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do send_signal_sync(producer, "root", %{test_data: "trace_flow"}) wait_for_cross_process_completion([producer, consumer]) + wait_for_received_signals(consumer, 1, timeout: 2000) dispatched_signals = get_spy_signals(spy) assert length(dispatched_signals) >= 1 @@ -37,6 +38,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do send_signal_sync(producer, "root", %{chain_test: "parent_child"}) wait_for_cross_process_completion([producer, consumer]) + wait_for_received_signals(consumer, 1, timeout: 2000) emitted = get_emitted_signals(producer) received = get_received_signals(consumer) @@ -75,6 +77,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do send_signal_sync(producer, "root", %{bus_test: "trace_preservation"}) wait_for_cross_process_completion([producer, consumer]) + wait_for_received_signals(consumer, 1, timeout: 2000) dispatched_signals = get_spy_signals(spy) assert length(dispatched_signals) >= 1 @@ -95,6 +98,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do send_signal_sync(producer, "root", %{chain_operation: "full_trace"}) wait_for_cross_process_completion([producer, consumer]) + wait_for_received_signals(consumer, 1, timeout: 2000) emitted_signals = get_emitted_signals(producer) received_signals = get_received_signals(consumer) @@ -176,6 +180,7 @@ defmodule Jido.Agent.TraceCrossProcessTest do send_signal_sync(agent_a, "root", %{oracle_scenario: true, trace_data: "T"}) wait_for_cross_process_completion([agent_a, agent_b]) + wait_for_received_signals(agent_b, 1, timeout: 2000) emitted_by_a = get_emitted_signals(agent_a) assert length(emitted_by_a) == 1 diff --git a/test/support/agent_case.ex b/test/support/agent_case.ex index 5be5b46..ddca3c6 100644 --- a/test/support/agent_case.ex +++ b/test/support/agent_case.ex @@ -48,6 +48,7 @@ defmodule JidoTest.AgentCase do - `get_received_signals/1` - Get signals received by ConsumerAgent - `get_latest_trace_context/1` - Get trace context from latest received signal - `wait_for_signal/3` - Wait for agent to receive specific signal type + - `wait_for_received_signals/3` - Wait for agent to receive a minimum count - `wait_for_cross_process_completion/2` - Wait for all agents to reach idle - `assert_received_signal_count/2` - Assert expected number of received signals - `assert_emitted_signal_count/2` - Assert expected number of emitted signals @@ -90,15 +91,14 @@ defmodule JidoTest.AgentCase do agent = agent_module.new("test_agent_#{System.unique_integer([:positive])}") - {:ok, server_pid} = - Server.start_link( - [ - agent: agent, - id: agent.id, - mode: :step, - registry: Jido.Registry - ] ++ opts - ) + base_opts = [ + agent: agent, + id: agent.id, + mode: :step, + registry: Jido.Registry + ] + + {:ok, server_pid} = Server.start_link(Keyword.merge(base_opts, opts)) context = %{agent: agent, server_pid: server_pid} ExUnit.Callbacks.on_exit(fn -> stop_test_agent(context) end) @@ -149,20 +149,19 @@ defmodule JidoTest.AgentCase do when is_binary(signal_type) and is_map(data) do validate_process!(server_pid) - {:ok, signal} = Signal.new(%{type: signal_type, data: data, source: "test", target: agent.id}) - {:ok, _} = Server.cast(server_pid, signal) - - # Small delay to allow async :process_queue message to be delivered and processed - Process.sleep(50) - # Wait for agent to return to idle state with empty queue timeout = Keyword.get(opts, :timeout, 1000) check_interval = Keyword.get(opts, :check_interval, 10) + {:ok, signal} = Signal.new(%{type: signal_type, data: data, source: "test", target: agent.id}) + {:ok, _} = Server.cast(server_pid, signal) + maybe_process_queue(server_pid) + JidoTest.Helpers.Assertions.wait_for( fn -> {:ok, state} = Server.state(server_pid) assert state.status == :idle, "Agent should be idle, but is #{state.status}" + assert :queue.is_empty(state.pending_signals), "Agent queue should be empty" end, timeout: timeout, check_interval: check_interval @@ -311,7 +310,12 @@ defmodule JidoTest.AgentCase do @spec spawn_producer_agent(keyword()) :: agent_context() def spawn_producer_agent(opts \\ []) do routes = Keyword.get(opts, :routes, default_producer_routes()) - opts = Keyword.put(opts, :routes, routes) + + opts = + opts + |> Keyword.put(:routes, routes) + |> Keyword.put_new(:mode, :auto) + spawn_agent(ProducerAgent, opts) end @@ -330,7 +334,7 @@ defmodule JidoTest.AgentCase do @spec spawn_consumer_agent(keyword()) :: agent_context() def spawn_consumer_agent(opts \\ []) do routes = Keyword.get(opts, :routes, default_consumer_routes()) - opts = Keyword.put(opts, :routes, routes) + opts = opts |> Keyword.put(:routes, routes) |> Keyword.put_new(:mode, :auto) spawn_agent(ConsumerAgent, opts) end @@ -440,6 +444,45 @@ defmodule JidoTest.AgentCase do context end + @doc """ + Wait for an agent to receive at least a minimum number of signals. + + This is useful for cross-process tests where signal delivery is asynchronous. + + ## Options + + - `:timeout` - Maximum time to wait in milliseconds (default: 1000) + - `:check_interval` - How often to check in milliseconds (default: 10) + + ## Examples + + consumer_context = spawn_consumer_agent() + # ... trigger signals ... + wait_for_received_signals(consumer_context, 1, timeout: 2000) + """ + @spec wait_for_received_signals(agent_context(), non_neg_integer(), keyword()) :: + agent_context() + def wait_for_received_signals(%{server_pid: server_pid} = context, min_count, opts \\ []) + when is_integer(min_count) and min_count >= 0 do + validate_process!(server_pid) + timeout = Keyword.get(opts, :timeout, 1000) + check_interval = Keyword.get(opts, :check_interval, 10) + + JidoTest.Helpers.Assertions.wait_for( + fn -> + received_signals = get_received_signals(context) + actual_count = length(received_signals) + + assert actual_count >= min_count, + "Expected at least #{min_count} received signals, got #{actual_count}" + end, + timeout: timeout, + check_interval: check_interval + ) + + context + end + @doc """ Wait for signal processing to complete across multiple agents and return context for chaining. @@ -697,8 +740,15 @@ defmodule JidoTest.AgentCase do end end - defp stop_test_agent(%{server_pid: server_pid}) do - if Process.alive?(server_pid), do: GenServer.stop(server_pid, :normal, 1000) + defp stop_test_agent(%{server_pid: pid}) when is_pid(pid), + do: TestSupport.cleanup_agent(%{pid: pid}) + + defp maybe_process_queue(server_pid) do + {:ok, state} = Server.state(server_pid) + + if state.mode != :auto do + _ = GenServer.call(server_pid, :process_queue) + end end # Bus Spy Functions - for observing signals crossing process boundaries diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex index 2c09187..528f793 100644 --- a/test/support/test_helpers.ex +++ b/test/support/test_helpers.ex @@ -165,12 +165,22 @@ defmodule JidoTest.Support do @spec send_signal_sync(map(), String.t(), map(), keyword()) :: :ok def send_signal_sync(%{pid: pid} = _context, signal_type, data \\ %{}, opts \\ []) do timeout = Keyword.get(opts, :timeout, 1000) + check_interval = Keyword.get(opts, :check_interval, 10) {:ok, signal} = create_test_signal(signal_type, data) {:ok, _correlation_id} = Server.cast(pid, signal) + maybe_process_queue(pid) + + JidoTest.Helpers.Assertions.wait_for( + fn -> + {:ok, state} = Server.state(pid) + assert state.status == :idle + assert :queue.is_empty(state.pending_signals) + end, + timeout: timeout, + check_interval: check_interval + ) - # Wait a bit for processing in step mode - if timeout > 0, do: Process.sleep(10) :ok end @@ -197,6 +207,14 @@ defmodule JidoTest.Support do ) end + defp maybe_process_queue(pid) do + {:ok, state} = Server.state(pid) + + if state.mode != :auto do + _ = GenServer.call(pid, :process_queue) + end + end + @doc """ Sets up a test registry in the test context. @@ -337,10 +355,19 @@ defmodule JidoTest.Support do This is automatically called if cleanup is enabled (default: true) when using `start_basic_agent!/1`. """ - def cleanup_agent(%{pid: pid}) do + def cleanup_agent(%{pid: pid}) when is_pid(pid) do if Process.alive?(pid) do - GenServer.stop(pid, :normal, 1000) + # Prevent linked-exit propagation from killing the cleanup process + Process.unlink(pid) + + try do + GenServer.stop(pid, :normal, 1_000) + catch + :exit, _ -> :ok + end end + + :ok end @doc """