From f3eea2a6034a02760137d47838af9b8974c494b3 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Fri, 2 Aug 2024 11:28:34 +0300 Subject: [PATCH 01/27] add missing @impl behaviours --- lib/coney/application_supervisor.ex | 1 + lib/coney/connection_server.ex | 4 ++++ lib/coney/consumer_server.ex | 2 ++ 3 files changed, 7 insertions(+) diff --git a/lib/coney/application_supervisor.ex b/lib/coney/application_supervisor.ex index b40000b..8ffcec4 100644 --- a/lib/coney/application_supervisor.ex +++ b/lib/coney/application_supervisor.ex @@ -14,6 +14,7 @@ defmodule Coney.ApplicationSupervisor do } end + @impl Supervisor def init([consumers]) do settings = settings() diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 0a2d6dc..81631e2 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -16,6 +16,7 @@ defmodule Coney.ConnectionServer do GenServer.start_link(__MODULE__, [consumers, adapter, settings, topology], name: __MODULE__) end + @impl GenServer def init([consumers, adapter, settings, topology]) do send(self(), :after_init) @@ -40,6 +41,7 @@ defmodule Coney.ConnectionServer do GenServer.call(__MODULE__, {:publish, exchange_name, routing_key, message}) end + @impl GenServer def handle_info(:after_init, state) do rabbitmq_connect(state) end @@ -50,11 +52,13 @@ defmodule Coney.ConnectionServer do rabbitmq_connect(state) end + @impl GenServer def terminate(_reason, %State{amqp_conn: conn, adapter: adapter} = _state) do :ok = adapter.close(conn) ConnectionRegistry.terminated(self()) end + @impl GenServer def handle_call({:confirm, channel, tag}, _from, %State{adapter: adapter} = state) do adapter.confirm(channel, tag) diff --git a/lib/coney/consumer_server.ex b/lib/coney/consumer_server.ex index f64dd9b..416f7cf 100644 --- a/lib/coney/consumer_server.ex +++ b/lib/coney/consumer_server.ex @@ -9,10 +9,12 @@ defmodule Coney.ConsumerServer do GenServer.start_link(__MODULE__, [consumer, chan]) end + @impl GenServer def init([consumer, chan]) do {:ok, %{consumer: consumer, chan: chan, tasks: %{}}} end + @impl GenServer def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, state) do {:noreply, state} end From d2752d298dc1931af1285ecd52d05b093fab540c Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Fri, 2 Aug 2024 13:11:35 +0300 Subject: [PATCH 02/27] replace channel with {channel_ref, channel} map --- lib/coney/connection_server.ex | 39 ++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 81631e2..64c2f58 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -9,7 +9,7 @@ defmodule Coney.ConnectionServer do } defmodule State do - defstruct [:consumers, :adapter, :settings, :amqp_conn, :topology] + defstruct [:consumers, :adapter, :settings, :amqp_conn, :topology, :channels] end def start_link([consumers, [adapter: adapter, settings: settings, topology: topology]]) do @@ -25,12 +25,12 @@ defmodule Coney.ConnectionServer do {:ok, %State{consumers: consumers, adapter: adapter, settings: settings, topology: topology}} end - def confirm(channel, tag) do - GenServer.call(__MODULE__, {:confirm, channel, tag}) + def confirm(channel_ref, tag) do + GenServer.call(__MODULE__, {:confirm, channel_ref, tag}) end - def reject(channel, tag, requeue) do - GenServer.call(__MODULE__, {:reject, channel, tag, requeue}) + def reject(channel_ref, tag, requeue) do + GenServer.call(__MODULE__, {:reject, channel_ref, tag, requeue}) end def publish(exchange_name, message) do @@ -59,13 +59,23 @@ defmodule Coney.ConnectionServer do end @impl GenServer - def handle_call({:confirm, channel, tag}, _from, %State{adapter: adapter} = state) do + def handle_call( + {:confirm, channel_ref, tag}, + _from, + %State{adapter: adapter, channels: channels} = state + ) do + channel = channel_from_ref(channels, channel_ref) adapter.confirm(channel, tag) {:reply, :confirmed, state} end - def handle_call({:reject, channel, tag, requeue}, _from, %State{adapter: adapter} = state) do + def handle_call( + {:reject, channel_ref, tag, requeue}, + _from, + %State{adapter: adapter, channels: channels} = state + ) do + channel = channel_from_ref(channels, channel_ref) adapter.reject(channel, tag, requeue: requeue) {:reply, :rejected, state} @@ -97,21 +107,28 @@ defmodule Coney.ConnectionServer do ) do conn = adapter.open(settings) adapter.init_topology(conn, topology) - start_consumers(consumers, adapter, conn) + channels = start_consumers(consumers, adapter, conn) ConnectionRegistry.connected(self()) - {:noreply, %State{state | amqp_conn: conn}} + {:noreply, %State{state | amqp_conn: conn, channels: channels}} end defp start_consumers(consumers, adapter, conn) do - Enum.each(consumers, fn consumer -> + consumers + |> Enum.map(fn consumer -> subscribe_chan = adapter.create_channel(conn) + chan_ref = :erlang.make_ref() - {:ok, pid} = ConsumerSupervisor.start_consumer(consumer, subscribe_chan) + {:ok, pid} = ConsumerSupervisor.start_consumer(consumer, chan_ref) adapter.subscribe(subscribe_chan, pid, consumer) Logger.debug("#{inspect(consumer)} (#{inspect(pid)}) started") + + {chan_ref, subscribe_chan} end) + |> Map.new() end + + defp channel_from_ref(channels, channel_ref), do: Map.fetch!(channels, channel_ref) end From 40241f4b3397f72cd41dc61de20a969d6643c968 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Fri, 2 Aug 2024 13:26:31 +0300 Subject: [PATCH 03/27] rename variables --- lib/coney/connection_server.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 64c2f58..8386af7 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -118,14 +118,14 @@ defmodule Coney.ConnectionServer do consumers |> Enum.map(fn consumer -> subscribe_chan = adapter.create_channel(conn) - chan_ref = :erlang.make_ref() + channel_ref = :erlang.make_ref() - {:ok, pid} = ConsumerSupervisor.start_consumer(consumer, chan_ref) + {:ok, pid} = ConsumerSupervisor.start_consumer(consumer, channel_ref) adapter.subscribe(subscribe_chan, pid, consumer) Logger.debug("#{inspect(consumer)} (#{inspect(pid)}) started") - {chan_ref, subscribe_chan} + {channel_ref, subscribe_chan} end) |> Map.new() end From b70663497759d8f9cb0fc45a745ea78f144eeb2a Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Mon, 5 Aug 2024 11:06:26 +0300 Subject: [PATCH 04/27] rearchitecture (untested) --- lib/coney/application_supervisor.ex | 2 +- lib/coney/connection_server.ex | 69 ++++++++++++------- lib/coney/consumer_server.ex | 8 ++- lib/coney/consumer_supervisor.ex | 18 ++--- .../coney/consumer/consumer_server_test.exs | 9 ++- 5 files changed, 67 insertions(+), 39 deletions(-) diff --git a/lib/coney/application_supervisor.ex b/lib/coney/application_supervisor.ex index 8ffcec4..b126c18 100644 --- a/lib/coney/application_supervisor.ex +++ b/lib/coney/application_supervisor.ex @@ -19,7 +19,7 @@ defmodule Coney.ApplicationSupervisor do settings = settings() children = [ - ConsumerSupervisor, + {ConsumerSupervisor, [consumers]}, {ConnectionServer, [consumers, settings]} ] diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 8386af7..2f1c8e5 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -3,10 +3,7 @@ defmodule Coney.ConnectionServer do require Logger - alias Coney.{ - ConsumerSupervisor, - HealthCheck.ConnectionRegistry - } + alias Coney.HealthCheck.ConnectionRegistry defmodule State do defstruct [:consumers, :adapter, :settings, :amqp_conn, :topology, :channels] @@ -22,7 +19,14 @@ defmodule Coney.ConnectionServer do ConnectionRegistry.associate(self()) - {:ok, %State{consumers: consumers, adapter: adapter, settings: settings, topology: topology}} + {:ok, + %State{ + consumers: consumers, + adapter: adapter, + settings: settings, + topology: topology, + channels: Map.new() + }} end def confirm(channel_ref, tag) do @@ -41,15 +45,19 @@ defmodule Coney.ConnectionServer do GenServer.call(__MODULE__, {:publish, exchange_name, routing_key, message}) end + def subscribe(consumer) do + GenServer.call(__MODULE__, {:subscribe, consumer}) + end + @impl GenServer def handle_info(:after_init, state) do - rabbitmq_connect(state) + {:noreply, rabbitmq_connect(state)} end def handle_info({:DOWN, _, :process, _pid, reason}, state) do ConnectionRegistry.disconnected(self()) Logger.error("#{__MODULE__} (#{inspect(self())}) connection lost: #{inspect(reason)}") - rabbitmq_connect(state) + {:noreply, state |> rabbitmq_connect() |> update_channels()} end @impl GenServer @@ -70,6 +78,22 @@ defmodule Coney.ConnectionServer do {:reply, :confirmed, state} end + def handle_call( + {:subscribe, consumer}, + {consumer_pid, _tag}, + %State{amqp_conn: conn, adapter: adapter, channels: channels} = state + ) do + channel = adapter.create_channel(conn) + channel_ref = :erlang.make_ref() + + adapter.subscribe(channel, consumer_pid, consumer) + + new_channels = Map.put(channels, channel_ref, {consumer_pid, consumer, channel}) + + Logger.debug("#{inspect(consumer)} (#{inspect(consumer_pid)}) started") + {:reply, channel_ref, %State{state | channels: new_channels}} + end + def handle_call( {:reject, channel_ref, tag, requeue}, _from, @@ -99,7 +123,6 @@ defmodule Coney.ConnectionServer do defp rabbitmq_connect( %State{ - consumers: consumers, adapter: adapter, settings: settings, topology: topology @@ -107,28 +130,28 @@ defmodule Coney.ConnectionServer do ) do conn = adapter.open(settings) adapter.init_topology(conn, topology) - channels = start_consumers(consumers, adapter, conn) ConnectionRegistry.connected(self()) - {:noreply, %State{state | amqp_conn: conn, channels: channels}} + %State{state | amqp_conn: conn} end - defp start_consumers(consumers, adapter, conn) do - consumers - |> Enum.map(fn consumer -> - subscribe_chan = adapter.create_channel(conn) - channel_ref = :erlang.make_ref() + defp channel_from_ref(channels, channel_ref) do + {_consumer_pid, _consumer, channel} = Map.fetch!(channels, channel_ref) - {:ok, pid} = ConsumerSupervisor.start_consumer(consumer, channel_ref) - adapter.subscribe(subscribe_chan, pid, consumer) + channel + end - Logger.debug("#{inspect(consumer)} (#{inspect(pid)}) started") + defp update_channels(%State{amqp_conn: conn, adapter: adapter, channels: channels} = state) do + new_channels = + Enum.map(channels, fn {channel_ref, {consumer_pid, consumer, _dead_channel}} -> + new_channel = adapter.create_channel(conn) + adapter.subscribe(new_channel, consumer_pid, consumer) - {channel_ref, subscribe_chan} - end) - |> Map.new() - end + {channel_ref, {consumer_pid, consumer, new_channel}} + end) + |> Map.new() - defp channel_from_ref(channels, channel_ref), do: Map.fetch!(channels, channel_ref) + %State{state | channels: new_channels} + end end diff --git a/lib/coney/consumer_server.ex b/lib/coney/consumer_server.ex index 416f7cf..147d29e 100644 --- a/lib/coney/consumer_server.ex +++ b/lib/coney/consumer_server.ex @@ -5,12 +5,14 @@ defmodule Coney.ConsumerServer do require Logger - def start_link([consumer, chan]) do - GenServer.start_link(__MODULE__, [consumer, chan]) + def start_link([consumer]) do + GenServer.start_link(__MODULE__, [consumer]) end @impl GenServer - def init([consumer, chan]) do + def init([consumer]) do + chan = ConnectionServer.subscribe(consumer) + {:ok, %{consumer: consumer, chan: chan, tasks: %{}}} end diff --git a/lib/coney/consumer_supervisor.ex b/lib/coney/consumer_supervisor.ex index a3ebeb7..4e406c2 100644 --- a/lib/coney/consumer_supervisor.ex +++ b/lib/coney/consumer_supervisor.ex @@ -1,16 +1,16 @@ defmodule Coney.ConsumerSupervisor do - use DynamicSupervisor + use Supervisor - def start_link(_args) do - DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) - end + alias Coney.ConsumerServer - def init([]) do - DynamicSupervisor.init(strategy: :one_for_one) + def start_link([consumers]) do + Supervisor.start_link(__MODULE__, [consumers], name: __MODULE__) end - def start_consumer(consumer, chan) do - spec = {Coney.ConsumerServer, [consumer, chan]} - DynamicSupervisor.start_child(__MODULE__, spec) + @impl Supervisor + def init([consumers]) do + children = Enum.map(consumers, fn consumer -> {ConsumerServer, [consumer]} end) + + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/test/lib/coney/consumer/consumer_server_test.exs b/test/lib/coney/consumer/consumer_server_test.exs index 849e456..9d69401 100644 --- a/test/lib/coney/consumer/consumer_server_test.exs +++ b/test/lib/coney/consumer/consumer_server_test.exs @@ -5,13 +5,16 @@ defmodule ConsumerServerTest do setup do [ - args: [FakeConsumer, :channel], - state: %{consumer: FakeConsumer, chan: :channel, tasks: %{}} + args: [FakeConsumer], + state: %{consumer: FakeConsumer, tasks: %{}, chan: :erlang.make_ref()} ] end test "initial state", %{args: args, state: state} do - assert {:ok, ^state} = ConsumerServer.init(args) + assert {:ok, initial_state} = ConsumerServer.init(args) + assert initial_state.consumer == state.consumer + assert initial_state.tasks |> Map.equal?(Map.new()) + assert initial_state.chan |> is_reference() end test ":basic_consume_ok", %{state: state} do From 6f869724cb30ba6fa35acbc28be431ee92dbd41b Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Mon, 5 Aug 2024 11:28:01 +0300 Subject: [PATCH 05/27] correct server initialization order --- config/config.exs | 4 +++- config/test.exs | 4 +++- lib/coney/application_supervisor.ex | 4 ++-- mix.exs | 4 ++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/config/config.exs b/config/config.exs index e5ba2b1..433d112 100644 --- a/config/config.exs +++ b/config/config.exs @@ -8,7 +8,9 @@ config :coney, url: "amqp://guest:guest@localhost", timeout: 1000 }, - workers: [] + workers: [ + FakeConsumer + ] config :logger, level: :info diff --git a/config/test.exs b/config/test.exs index 761ae22..162f60b 100644 --- a/config/test.exs +++ b/config/test.exs @@ -8,4 +8,6 @@ config :coney, url: "amqp://guest:guest@localhost:5672", timeout: 1000 }, - workers: [] + workers: [ + FakeConsumer + ] diff --git a/lib/coney/application_supervisor.ex b/lib/coney/application_supervisor.ex index b126c18..f79e96f 100644 --- a/lib/coney/application_supervisor.ex +++ b/lib/coney/application_supervisor.ex @@ -19,8 +19,8 @@ defmodule Coney.ApplicationSupervisor do settings = settings() children = [ - {ConsumerSupervisor, [consumers]}, - {ConnectionServer, [consumers, settings]} + {ConnectionServer, [consumers, settings]}, + {ConsumerSupervisor, [consumers]} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/mix.exs b/mix.exs index 7256122..1e5c331 100644 --- a/mix.exs +++ b/mix.exs @@ -37,7 +37,7 @@ defmodule Coney.Mixfile do """ end - defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(env) when env in [:test, :dev], do: ["lib", "test/support"] defp elixirc_paths(_), do: ["lib"] @@ -47,7 +47,7 @@ defmodule Coney.Mixfile do files: ["lib", "mix.exs", "README.md", "LICENSE.txt"], maintainers: ["Aleksandr Fomin"], licenses: ["MIT"], - links: %{"GitHub" => "https://github.com/llxff/coney"} + links: %{"GitHub" => "https://github.com/coingaming/coney"} ] end end From 9f1f0614d1a105808597796830cd1c24750742e1 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Mon, 5 Aug 2024 11:31:01 +0300 Subject: [PATCH 06/27] remove consumers from ConnectionServer init --- lib/coney/application_supervisor.ex | 2 +- lib/coney/connection_server.ex | 17 +++++------------ 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/lib/coney/application_supervisor.ex b/lib/coney/application_supervisor.ex index f79e96f..e958236 100644 --- a/lib/coney/application_supervisor.ex +++ b/lib/coney/application_supervisor.ex @@ -19,7 +19,7 @@ defmodule Coney.ApplicationSupervisor do settings = settings() children = [ - {ConnectionServer, [consumers, settings]}, + {ConnectionServer, [settings]}, {ConsumerSupervisor, [consumers]} ] diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 2f1c8e5..4e9de98 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -6,27 +6,20 @@ defmodule Coney.ConnectionServer do alias Coney.HealthCheck.ConnectionRegistry defmodule State do - defstruct [:consumers, :adapter, :settings, :amqp_conn, :topology, :channels] + defstruct [:adapter, :settings, :amqp_conn, :topology, :channels] end - def start_link([consumers, [adapter: adapter, settings: settings, topology: topology]]) do - GenServer.start_link(__MODULE__, [consumers, adapter, settings, topology], name: __MODULE__) + def start_link([[adapter: adapter, settings: settings, topology: topology]]) do + GenServer.start_link(__MODULE__, [adapter, settings, topology], name: __MODULE__) end @impl GenServer - def init([consumers, adapter, settings, topology]) do + def init([adapter, settings, topology]) do send(self(), :after_init) ConnectionRegistry.associate(self()) - {:ok, - %State{ - consumers: consumers, - adapter: adapter, - settings: settings, - topology: topology, - channels: Map.new() - }} + {:ok, %State{adapter: adapter, settings: settings, topology: topology, channels: Map.new()}} end def confirm(channel_ref, tag) do From 9857aec12f468c645c5ad12c5f4899c438bff0e4 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Mon, 5 Aug 2024 13:35:12 +0300 Subject: [PATCH 07/27] test for handle_info normal termination --- .../coney/consumer/consumer_server_test.exs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/lib/coney/consumer/consumer_server_test.exs b/test/lib/coney/consumer/consumer_server_test.exs index 9d69401..36029eb 100644 --- a/test/lib/coney/consumer/consumer_server_test.exs +++ b/test/lib/coney/consumer/consumer_server_test.exs @@ -41,4 +41,22 @@ defmodule ConsumerServerTest do assert updated_state.consumer == state.consumer assert updated_state.chan == state.chan end + + describe "handle_info/2" do + setup do + %{state: %{consumer: FakeConsumer, tasks: Map.new(), chan: :erlang.make_ref()}} + end + + test "demonitors a task once it completes successfully", %{state: state} do + task_ref = :erlang.make_ref() + state = put_in(state, [:tasks, task_ref], 1) + + refute state[:tasks] |> Map.equal?(Map.new()) + + down_msg = {:DOWN, task_ref, :dont_care, :dont_care, :normal} + + assert {:noreply, new_state} = ConsumerServer.handle_info(down_msg, state) + assert new_state[:tasks] |> Map.equal?(Map.new()) + end + end end From 399a44a96ffbcf37e9fa8a5d85c9d63a19214596 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Mon, 5 Aug 2024 14:34:53 +0300 Subject: [PATCH 08/27] test for handle_info unexpected termination --- test/lib/coney/consumer/consumer_server_test.exs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/lib/coney/consumer/consumer_server_test.exs b/test/lib/coney/consumer/consumer_server_test.exs index 36029eb..1449a3c 100644 --- a/test/lib/coney/consumer/consumer_server_test.exs +++ b/test/lib/coney/consumer/consumer_server_test.exs @@ -58,5 +58,17 @@ defmodule ConsumerServerTest do assert {:noreply, new_state} = ConsumerServer.handle_info(down_msg, state) assert new_state[:tasks] |> Map.equal?(Map.new()) end + + test "demonitors a task and rejects message if it terminates abruptly", %{state: state} do + task_ref = :erlang.make_ref() + + state = put_in(state, [:tasks, task_ref], 1) + + down_msg = {:DOWN, task_ref, :dont_care, :dont_care, :error} + + assert {:noreply, new_state} = ConsumerServer.handle_info(down_msg, state) + + assert new_state[:tasks] |> Map.equal?(Map.new()) + end end end From 5052588b817857ccf50cf28abf93fb4e3daf0094 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Mon, 5 Aug 2024 15:12:33 +0300 Subject: [PATCH 09/27] tests for ConnectionServer init --- test/lib/coney/connection_server_test.exs | 43 +++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 test/lib/coney/connection_server_test.exs diff --git a/test/lib/coney/connection_server_test.exs b/test/lib/coney/connection_server_test.exs new file mode 100644 index 0000000..e8749f7 --- /dev/null +++ b/test/lib/coney/connection_server_test.exs @@ -0,0 +1,43 @@ +defmodule Coney.ConnectionServerTest do + use ExUnit.Case + + alias Coney.ConnectionServer + alias Coney.RabbitConnection + + describe "init/1" do + setup do + settings = %{url: "https://example.com", timeout: 1_000} + topology = Map.new() + %{init_args: %{adapter: RabbitConnection, settings: settings, topology: topology}} + end + + test "starts with default settings", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + + assert {:ok, state} = ConnectionServer.init([adapter, settings, topology]) + + assert state.channels |> Map.equal?(Map.new()) + assert state.adapter == adapter + assert state.settings == settings + assert state.topology == topology + end + + test "sends itself an after_init message", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + + assert {:ok, _state} = ConnectionServer.init([adapter, settings, topology]) + + assert_receive :after_init + end + + test "registers itself in the connection registry", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + + assert {:ok, _state} = ConnectionServer.init([adapter, settings, topology]) + + status = Coney.HealthCheck.ConnectionRegistry.status() |> Map.new() + + assert Map.get(status, self(), :connected) + end + end +end From c065b103c83cec8826ef31e10d7dde48020e3581 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Mon, 5 Aug 2024 16:15:21 +0300 Subject: [PATCH 10/27] tests for ConnectionServer --- test/lib/coney/connection_server_test.exs | 83 +++++++++++++++++++++-- 1 file changed, 77 insertions(+), 6 deletions(-) diff --git a/test/lib/coney/connection_server_test.exs b/test/lib/coney/connection_server_test.exs index e8749f7..c1583c0 100644 --- a/test/lib/coney/connection_server_test.exs +++ b/test/lib/coney/connection_server_test.exs @@ -4,13 +4,13 @@ defmodule Coney.ConnectionServerTest do alias Coney.ConnectionServer alias Coney.RabbitConnection - describe "init/1" do - setup do - settings = %{url: "https://example.com", timeout: 1_000} - topology = Map.new() - %{init_args: %{adapter: RabbitConnection, settings: settings, topology: topology}} - end + setup do + settings = %{url: "amqp://guest:guest@localhost:5672", timeout: 1_000} + topology = Map.new() + %{init_args: %{adapter: RabbitConnection, settings: settings, topology: topology}} + end + describe "init/1" do test "starts with default settings", %{init_args: init_args} do %{settings: settings, adapter: adapter, topology: topology} = init_args @@ -40,4 +40,75 @@ defmodule Coney.ConnectionServerTest do assert Map.get(status, self(), :connected) end end + + describe "after_init/1" do + test "sets the connection in the state", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + assert {:ok, state} = ConnectionServer.init([adapter, settings, topology]) + + assert is_nil(state.amqp_conn) + + assert {:noreply, new_state} = ConnectionServer.handle_info(:after_init, state) + + refute is_nil(new_state.amqp_conn) + end + end + + describe "handle_info/2" do + test "reconnects channels when receives a connection lost message", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + # Init + assert {:ok, state} = ConnectionServer.init([adapter, settings, topology]) + + # Open connection + assert {:noreply, state} = ConnectionServer.handle_info(:after_init, state) + + # Subscribe a channel + assert {:reply, channel_ref, connected_state} = + ConnectionServer.handle_call( + {:subscribe, FakeConsumer}, + {self(), :erlang.make_ref()}, + state + ) + + channel_info = Map.get(connected_state.channels, channel_ref) + + # Connection lost + down_msg = {:DOWN, :erlang.make_ref(), :process, self(), :connection_lost} + + assert {:noreply, reconnect_state} = ConnectionServer.handle_info(down_msg, connected_state) + + new_channel_info = Map.get(reconnect_state.channels, channel_ref) + + {_pid, consumer, old_channel} = channel_info + {_other_pid, ^consumer, new_channel} = new_channel_info + + refute old_channel == new_channel + end + end + + describe "handle_call/3" do + test "subscribes a consumer and returns a channel reference", %{init_args: init_args} do + %{settings: settings, adapter: adapter, topology: topology} = init_args + # Init + assert {:ok, state} = ConnectionServer.init([adapter, settings, topology]) + + # Open connection + assert {:noreply, state} = ConnectionServer.handle_info(:after_init, state) + + # Subscribe a channel + assert {:reply, channel_ref, new_state} = + ConnectionServer.handle_call( + {:subscribe, FakeConsumer}, + {self(), :erlang.make_ref()}, + state + ) + + assert is_reference(channel_ref) + + pid = self() + + assert {^pid, FakeConsumer, _} = Map.get(new_state.channels, channel_ref) + end + end end From 8bc5ddd444c9d0834ac898a0fe9f3e2fbc5c8e52 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Mon, 5 Aug 2024 16:32:18 +0300 Subject: [PATCH 11/27] remove unused module --- lib/coney/fake_connection.ex | 33 --------------------------------- 1 file changed, 33 deletions(-) delete mode 100644 lib/coney/fake_connection.ex diff --git a/lib/coney/fake_connection.ex b/lib/coney/fake_connection.ex deleted file mode 100644 index 10f5d22..0000000 --- a/lib/coney/fake_connection.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Coney.FakeConnection do - def open(_) do - :conn - end - - def create_channel(_) do - :chan - end - - def subscribe(_, _, _) do - {:ok, :subscribed} - end - - def respond_to(_, _) do - nil - end - - def publish(_, _, _, _) do - :published - end - - def confirm(_, _) do - :confirmed - end - - def reject(_, _, _) do - :rejected - end - - def init_topology(_, _) do - :ok - end -end From cbe2dbe007c1a6b2cfe5736d70a13ceb0db8cacc Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Tue, 6 Aug 2024 11:35:21 +0300 Subject: [PATCH 12/27] rabbitmq config --- compose.yaml | 5 ++++- rabbitmq/definitions.json | 31 +++++++++++++++++++++++++++++++ rabbitmq/rabbitmq.conf | 6 ++++++ 3 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 rabbitmq/definitions.json create mode 100644 rabbitmq/rabbitmq.conf diff --git a/compose.yaml b/compose.yaml index ce57695..7aa9d40 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,5 +1,8 @@ services: rabbitmq: - image: "rabbitmq:alpine" + image: "rabbitmq:3.8.34-management" ports: - "5672:5672" + volumes: + - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf + - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json \ No newline at end of file diff --git a/rabbitmq/definitions.json b/rabbitmq/definitions.json new file mode 100644 index 0000000..b240c16 --- /dev/null +++ b/rabbitmq/definitions.json @@ -0,0 +1,31 @@ +{ + "rabbit_version": "3.12", + "vhosts": [ + { + "name": "/" + } + ], + "permissions": [ + { + "user": "guest", + "vhost": "/", + "configure": ".*", + "write": ".*", + "read": ".*" + } + ], + "topic_permissions": [], + "parameters": [], + "policies": [], + "queues": [ + { + "name": "queue", + "vhost": "/", + "durable": true, + "auto_delete": true, + "arguments": {} + } + ], + "exchanges": [], + "bindings": [] +} \ No newline at end of file diff --git a/rabbitmq/rabbitmq.conf b/rabbitmq/rabbitmq.conf new file mode 100644 index 0000000..41244ea --- /dev/null +++ b/rabbitmq/rabbitmq.conf @@ -0,0 +1,6 @@ +loopback_users = none +listeners.tcp.default = 5672 +default_pass = guest +default_user = guest +hipe_compile = false +management.load_definitions = /etc/rabbitmq/definitions.json From e4338aa3ad090e305f0be170ebe10df9b2a2ef1a Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Tue, 6 Aug 2024 13:02:15 +0300 Subject: [PATCH 13/27] do not autodelete queue --- rabbitmq/definitions.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rabbitmq/definitions.json b/rabbitmq/definitions.json index b240c16..0a31c6c 100644 --- a/rabbitmq/definitions.json +++ b/rabbitmq/definitions.json @@ -22,7 +22,7 @@ "name": "queue", "vhost": "/", "durable": true, - "auto_delete": true, + "auto_delete": false, "arguments": {} } ], From abbe3b51e88cf40f1722ff99ccf2b733b2c9661d Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Tue, 6 Aug 2024 13:19:13 +0300 Subject: [PATCH 14/27] add admin port to compose file and a test exchange --- compose.yaml | 1 + rabbitmq/definitions.json | 12 +++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/compose.yaml b/compose.yaml index 7aa9d40..637cb2d 100644 --- a/compose.yaml +++ b/compose.yaml @@ -3,6 +3,7 @@ services: image: "rabbitmq:3.8.34-management" ports: - "5672:5672" + - "15672:15672" volumes: - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json \ No newline at end of file diff --git a/rabbitmq/definitions.json b/rabbitmq/definitions.json index 0a31c6c..7882612 100644 --- a/rabbitmq/definitions.json +++ b/rabbitmq/definitions.json @@ -26,6 +26,16 @@ "arguments": {} } ], - "exchanges": [], + "exchanges": [ + { + "name": "exchange", + "vhost": "/", + "type": "topic", + "durable": true, + "auto_delete": false, + "internal": false, + "arguments": {} + } + ], "bindings": [] } \ No newline at end of file From 128b451c6cd5d5ad4fac3f168b2a5a109a581eea Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Tue, 6 Aug 2024 13:24:41 +0300 Subject: [PATCH 15/27] add volumes to Github Actions --- .github/workflows/ci.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c01a49c..f34e50e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -20,6 +20,9 @@ jobs: image: "rabbitmq:alpine" ports: - "5672:5672" + volumes: + - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf + - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json runs-on: ubuntu-latest From 1fc6fe9c98222993a9393ea7e4afe7617c669036 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Tue, 6 Aug 2024 13:36:46 +0300 Subject: [PATCH 16/27] fix volumes definition --- .github/workflows/ci.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f34e50e..b453fda 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -20,9 +20,9 @@ jobs: image: "rabbitmq:alpine" ports: - "5672:5672" - volumes: - - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json + volumes: + - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf + - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json runs-on: ubuntu-latest From 28b413f9dcbe12c7012f3fbfbf28d7a9f018540d Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Tue, 6 Aug 2024 14:14:16 +0300 Subject: [PATCH 17/27] mount volumes? --- .github/workflows/ci.yaml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b453fda..0d7bac0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -21,8 +21,9 @@ jobs: ports: - "5672:5672" volumes: - - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json + - ${{ github.workspace }}/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf + - ${{ github.workspace }}/rabbitmq/definitions.json:/etc/rabbitmq/definitions.json + options: --name rabbitmq runs-on: ubuntu-latest @@ -42,6 +43,12 @@ jobs: - name: Checkout code uses: actions/checkout@v4 + - name: Restart RabbitMQ + # Restart RabbitMQ after volumes have been checked out + uses: docker://docker + with: + args: docker restart rabbitmq + - name: Cache deps id: cache-deps uses: actions/cache@v4 From c4b83d76fa4f7ce8ad9fd844e0b172ecbb42eb7e Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Wed, 7 Aug 2024 10:20:23 +0300 Subject: [PATCH 18/27] change volumes for topology in config.exs --- .github/workflows/ci.yaml | 11 ----------- compose.yaml | 6 +----- config/test.exs | 13 +++++++++++++ rabbitmq/definitions.json | 41 --------------------------------------- rabbitmq/rabbitmq.conf | 6 ------ 5 files changed, 14 insertions(+), 63 deletions(-) delete mode 100644 rabbitmq/definitions.json delete mode 100644 rabbitmq/rabbitmq.conf diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 0d7bac0..1bbbefe 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -20,11 +20,6 @@ jobs: image: "rabbitmq:alpine" ports: - "5672:5672" - volumes: - - ${{ github.workspace }}/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - - ${{ github.workspace }}/rabbitmq/definitions.json:/etc/rabbitmq/definitions.json - options: --name rabbitmq - runs-on: ubuntu-latest name: Test on OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}} @@ -43,12 +38,6 @@ jobs: - name: Checkout code uses: actions/checkout@v4 - - name: Restart RabbitMQ - # Restart RabbitMQ after volumes have been checked out - uses: docker://docker - with: - args: docker restart rabbitmq - - name: Cache deps id: cache-deps uses: actions/cache@v4 diff --git a/compose.yaml b/compose.yaml index 637cb2d..ce57695 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,9 +1,5 @@ services: rabbitmq: - image: "rabbitmq:3.8.34-management" + image: "rabbitmq:alpine" ports: - "5672:5672" - - "15672:15672" - volumes: - - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json \ No newline at end of file diff --git a/config/test.exs b/config/test.exs index 162f60b..7dacb07 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,6 +1,19 @@ import Config config :coney, + topology: %{ + exchanges: [{:topic, "exchange", durable: false}], + queues: %{ + "queue" => %{ + options: [ + durable: false + ], + bindings: [ + [exchange: "exchange", options: [routing_key: "queue"]] + ] + } + } + }, adapter: Coney.RabbitConnection, pool_size: 1, auto_start: true, diff --git a/rabbitmq/definitions.json b/rabbitmq/definitions.json deleted file mode 100644 index 7882612..0000000 --- a/rabbitmq/definitions.json +++ /dev/null @@ -1,41 +0,0 @@ -{ - "rabbit_version": "3.12", - "vhosts": [ - { - "name": "/" - } - ], - "permissions": [ - { - "user": "guest", - "vhost": "/", - "configure": ".*", - "write": ".*", - "read": ".*" - } - ], - "topic_permissions": [], - "parameters": [], - "policies": [], - "queues": [ - { - "name": "queue", - "vhost": "/", - "durable": true, - "auto_delete": false, - "arguments": {} - } - ], - "exchanges": [ - { - "name": "exchange", - "vhost": "/", - "type": "topic", - "durable": true, - "auto_delete": false, - "internal": false, - "arguments": {} - } - ], - "bindings": [] -} \ No newline at end of file diff --git a/rabbitmq/rabbitmq.conf b/rabbitmq/rabbitmq.conf deleted file mode 100644 index 41244ea..0000000 --- a/rabbitmq/rabbitmq.conf +++ /dev/null @@ -1,6 +0,0 @@ -loopback_users = none -listeners.tcp.default = 5672 -default_pass = guest -default_user = guest -hipe_compile = false -management.load_definitions = /etc/rabbitmq/definitions.json From 078338ddc670f3fcfef7f3244b1ca236f9668bd5 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Wed, 7 Aug 2024 11:29:41 +0300 Subject: [PATCH 19/27] integration test --- compose.yaml | 3 ++- config/config.exs | 15 ++++++++++++++- test/lib/coney/coney_test.exs | 16 ++++++++++++++++ test/support/fake_consumer.ex | 1 + 4 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 test/lib/coney/coney_test.exs diff --git a/compose.yaml b/compose.yaml index ce57695..12ce23b 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,5 +1,6 @@ services: rabbitmq: - image: "rabbitmq:alpine" + image: "rabbitmq:3.12-management-alpine" ports: - "5672:5672" + - "15672:15672" diff --git a/config/config.exs b/config/config.exs index 433d112..83f7fa2 100644 --- a/config/config.exs +++ b/config/config.exs @@ -10,7 +10,20 @@ config :coney, }, workers: [ FakeConsumer - ] + ], + topology: %{ + exchanges: [{:topic, "exchange", durable: false}], + queues: %{ + "queue" => %{ + options: [ + durable: false + ], + bindings: [ + [exchange: "exchange", options: [routing_key: "queue"]] + ] + } + } + } config :logger, level: :info diff --git a/test/lib/coney/coney_test.exs b/test/lib/coney/coney_test.exs new file mode 100644 index 0000000..727078e --- /dev/null +++ b/test/lib/coney/coney_test.exs @@ -0,0 +1,16 @@ +defmodule Coney.ConeyTest do + use ExUnit.Case + use AMQP + + describe "publish/2" do + test "consumes a message" do + {:ok, connection} = AMQP.Connection.open(Application.get_env(:coney, :settings)[:url]) + + {:ok, channel} = AMQP.Channel.open(connection) + + assert :ok == AMQP.Basic.publish(channel, "exchange", "queue", "message", mandatory: true) + + refute 0 == AMQP.Queue.consumer_count(channel, "queue") + end + end +end diff --git a/test/support/fake_consumer.ex b/test/support/fake_consumer.ex index c910692..6e477d4 100644 --- a/test/support/fake_consumer.ex +++ b/test/support/fake_consumer.ex @@ -18,6 +18,7 @@ defmodule FakeConsumer do :reject -> :reject :reply -> {:reply, :data} :exception -> raise "Exception happen" + _other -> :ok end end From 573946cfd5e57e28eb2494824e31a4ba0766be23 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Wed, 7 Aug 2024 13:54:26 +0300 Subject: [PATCH 20/27] docs --- README.md | 10 ++++++++++ lib/coney/application_supervisor.ex | 6 ++++++ lib/coney/connection_server.ex | 14 ++++++++++++++ lib/coney/consumer_executor.ex | 4 ++++ lib/coney/consumer_server.ex | 5 +++++ lib/coney/consumer_supervisor.ex | 3 +++ 6 files changed, 42 insertions(+) diff --git a/README.md b/README.md index df0d5aa..9035970 100644 --- a/README.md +++ b/README.md @@ -288,6 +288,16 @@ Bug reports and pull requests are welcome on GitHub at https://github.com/coinga 1. Start the RabbitMQ instance via `docker compose up`. 2. Run `mix test`. +## Architecture +```mermaid + graph TD; + A[ApplicationSupervisor - Supervisor] --> B[ConsumerSupervisor - Supervisor]; + A --> C[ConnectionServer - GenServer]; + B -- supervises many --> D[ConsumerServer - GenServer]; + D -- monitors --> E[ConsumerExecutor]; + E -- sends messages to --> C; +``` + ## License The library is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT). diff --git a/lib/coney/application_supervisor.ex b/lib/coney/application_supervisor.ex index e958236..88894d3 100644 --- a/lib/coney/application_supervisor.ex +++ b/lib/coney/application_supervisor.ex @@ -1,4 +1,10 @@ defmodule Coney.ApplicationSupervisor do + @moduledoc """ + Supervisor responsible of `ConnectionServer` and `ConsumerSupervisor`. + + Main entry point of the application. + """ + use Supervisor alias Coney.{ConsumerSupervisor, ConnectionServer} diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 4e9de98..e00d674 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -1,4 +1,18 @@ defmodule Coney.ConnectionServer do + @moduledoc """ + Handles connections between `ConsumerServer` and the RabbitMQ instance(s). + + This module abstracts away the connection status of RabbitMQ. Instead, when + a new `ConsumerServer` is started, it requests `ConnectionServer` to open a channel. + ConnectionServer opens a real amqp channel, keeps a reference to it in its state and + returns an erlang reference to `ConsumerServer`. When `ConsumerServer` replies (ack/reject) + an incoming RabbitMQ message it sends the erlang reference to ConnectionServer and then + ConnectionServer looks up the real channel. + + ConnectionServer can handle RabbitMQ disconnects independently of ConsumerServer. + When connection is lost and then regained, ConnectionServer simply updates its + map of {erlang_ref, AMQP.Connection}, ConsumerServer keeps using the same erlang_ref. + """ use GenServer require Logger diff --git a/lib/coney/consumer_executor.ex b/lib/coney/consumer_executor.ex index 2daba08..841cd16 100644 --- a/lib/coney/consumer_executor.ex +++ b/lib/coney/consumer_executor.ex @@ -1,4 +1,8 @@ defmodule Coney.ConsumerExecutor do + @moduledoc """ + Module responsible for processing a rabbit message and send the response + back to `ConnectionServer`. Started (and monitored) by `ConsumerServer`. + """ require Logger alias Coney.{ConnectionServer, ExecutionTask} diff --git a/lib/coney/consumer_server.ex b/lib/coney/consumer_server.ex index 147d29e..0c41306 100644 --- a/lib/coney/consumer_server.ex +++ b/lib/coney/consumer_server.ex @@ -1,4 +1,9 @@ defmodule Coney.ConsumerServer do + @moduledoc """ + GenServer for handling RabbitMQ messages. Spawns and monitors one task per message + and forwards the response to `ConnectionServer`. + """ + use GenServer alias Coney.{ConnectionServer, ConsumerExecutor, ExecutionTask} diff --git a/lib/coney/consumer_supervisor.ex b/lib/coney/consumer_supervisor.ex index 4e406c2..5d99946 100644 --- a/lib/coney/consumer_supervisor.ex +++ b/lib/coney/consumer_supervisor.ex @@ -1,4 +1,7 @@ defmodule Coney.ConsumerSupervisor do + @moduledoc """ + Supervisor for all ConsumerServer of the application. + """ use Supervisor alias Coney.ConsumerServer From 2225be49499eb634db99be53534cc40c96552898 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Wed, 7 Aug 2024 16:03:27 +0300 Subject: [PATCH 21/27] use handle_continue --- lib/coney/connection_server.ex | 13 +++++++++---- test/lib/coney/connection_server_test.exs | 22 +++++++--------------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index e00d674..5fe5df7 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -29,11 +29,15 @@ defmodule Coney.ConnectionServer do @impl GenServer def init([adapter, settings, topology]) do - send(self(), :after_init) - ConnectionRegistry.associate(self()) - {:ok, %State{adapter: adapter, settings: settings, topology: topology, channels: Map.new()}} + {:ok, %State{adapter: adapter, settings: settings, topology: topology, channels: Map.new()}, + {:continue, nil}} + end + + @impl true + def handle_continue(_continue_arg, state) do + {:noreply, rabbitmq_connect(state)} end def confirm(channel_ref, tag) do @@ -89,7 +93,8 @@ defmodule Coney.ConnectionServer do {:subscribe, consumer}, {consumer_pid, _tag}, %State{amqp_conn: conn, adapter: adapter, channels: channels} = state - ) do + ) + when not is_nil(conn) do channel = adapter.create_channel(conn) channel_ref = :erlang.make_ref() diff --git a/test/lib/coney/connection_server_test.exs b/test/lib/coney/connection_server_test.exs index c1583c0..8887670 100644 --- a/test/lib/coney/connection_server_test.exs +++ b/test/lib/coney/connection_server_test.exs @@ -14,7 +14,7 @@ defmodule Coney.ConnectionServerTest do test "starts with default settings", %{init_args: init_args} do %{settings: settings, adapter: adapter, topology: topology} = init_args - assert {:ok, state} = ConnectionServer.init([adapter, settings, topology]) + assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) assert state.channels |> Map.equal?(Map.new()) assert state.adapter == adapter @@ -22,18 +22,10 @@ defmodule Coney.ConnectionServerTest do assert state.topology == topology end - test "sends itself an after_init message", %{init_args: init_args} do - %{settings: settings, adapter: adapter, topology: topology} = init_args - - assert {:ok, _state} = ConnectionServer.init([adapter, settings, topology]) - - assert_receive :after_init - end - test "registers itself in the connection registry", %{init_args: init_args} do %{settings: settings, adapter: adapter, topology: topology} = init_args - assert {:ok, _state} = ConnectionServer.init([adapter, settings, topology]) + assert {:ok, _state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) status = Coney.HealthCheck.ConnectionRegistry.status() |> Map.new() @@ -41,14 +33,14 @@ defmodule Coney.ConnectionServerTest do end end - describe "after_init/1" do + describe "handle_continue/2" do test "sets the connection in the state", %{init_args: init_args} do %{settings: settings, adapter: adapter, topology: topology} = init_args - assert {:ok, state} = ConnectionServer.init([adapter, settings, topology]) + assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) assert is_nil(state.amqp_conn) - assert {:noreply, new_state} = ConnectionServer.handle_info(:after_init, state) + assert {:noreply, new_state} = ConnectionServer.handle_continue(nil, state) refute is_nil(new_state.amqp_conn) end @@ -58,7 +50,7 @@ defmodule Coney.ConnectionServerTest do test "reconnects channels when receives a connection lost message", %{init_args: init_args} do %{settings: settings, adapter: adapter, topology: topology} = init_args # Init - assert {:ok, state} = ConnectionServer.init([adapter, settings, topology]) + assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) # Open connection assert {:noreply, state} = ConnectionServer.handle_info(:after_init, state) @@ -91,7 +83,7 @@ defmodule Coney.ConnectionServerTest do test "subscribes a consumer and returns a channel reference", %{init_args: init_args} do %{settings: settings, adapter: adapter, topology: topology} = init_args # Init - assert {:ok, state} = ConnectionServer.init([adapter, settings, topology]) + assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) # Open connection assert {:noreply, state} = ConnectionServer.handle_info(:after_init, state) From 32ade9ac8e4f7ce23957861183a230866b38dd89 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Thu, 8 Aug 2024 09:04:34 +0300 Subject: [PATCH 22/27] close channels in terminate and add logging --- lib/coney/connection_server.ex | 18 +++++++++++++----- lib/coney/consumer_server.ex | 2 ++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 5fe5df7..e95df6e 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -72,7 +72,8 @@ defmodule Coney.ConnectionServer do end @impl GenServer - def terminate(_reason, %State{amqp_conn: conn, adapter: adapter} = _state) do + def terminate(_reason, %State{amqp_conn: conn, adapter: adapter, channels: channels} = _state) do + close_channels(channels, adapter) :ok = adapter.close(conn) ConnectionRegistry.terminated(self()) end @@ -93,8 +94,7 @@ defmodule Coney.ConnectionServer do {:subscribe, consumer}, {consumer_pid, _tag}, %State{amqp_conn: conn, adapter: adapter, channels: channels} = state - ) - when not is_nil(conn) do + ) do channel = adapter.create_channel(conn) channel_ref = :erlang.make_ref() @@ -156,14 +156,22 @@ defmodule Coney.ConnectionServer do defp update_channels(%State{amqp_conn: conn, adapter: adapter, channels: channels} = state) do new_channels = - Enum.map(channels, fn {channel_ref, {consumer_pid, consumer, _dead_channel}} -> + Map.new(channels, fn {channel_ref, {consumer_pid, consumer, _dead_channel}} -> new_channel = adapter.create_channel(conn) adapter.subscribe(new_channel, consumer_pid, consumer) + Logger.info("[Coney] - Connection re-restablished for #{inspect(consumer)}") + {channel_ref, {consumer_pid, consumer, new_channel}} end) - |> Map.new() %State{state | channels: new_channels} end + + defp close_channels(channels, adapter) do + Enum.each(channels, fn {_channel_ref, {consumer_pid, consumer, channel}} -> + Logger.info("[Coney] - Closing channel for #{inspect(consumer)} (#{consumer_pid})") + adapter.close_channel(channel) + end) + end end diff --git a/lib/coney/consumer_server.ex b/lib/coney/consumer_server.ex index 0c41306..4654731 100644 --- a/lib/coney/consumer_server.ex +++ b/lib/coney/consumer_server.ex @@ -18,6 +18,8 @@ defmodule Coney.ConsumerServer do def init([consumer]) do chan = ConnectionServer.subscribe(consumer) + Logger.info("[Coney] - Started consumer #{inspect(consumer)}") + {:ok, %{consumer: consumer, chan: chan, tasks: %{}}} end From 3b6174f702f4879ae3e66ce9efc36bc769c48eaf Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Thu, 8 Aug 2024 09:40:47 +0300 Subject: [PATCH 23/27] missing link in architecture README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 9035970..1261144 100644 --- a/README.md +++ b/README.md @@ -296,6 +296,7 @@ Bug reports and pull requests are welcome on GitHub at https://github.com/coinga B -- supervises many --> D[ConsumerServer - GenServer]; D -- monitors --> E[ConsumerExecutor]; E -- sends messages to --> C; + D -- opens AMQP conns via --> C; ``` ## License From 74c90123a7c71c7d5e6ad2dcccdc67bf519e1786 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Thu, 8 Aug 2024 13:58:02 +0300 Subject: [PATCH 24/27] add specs --- compose.yaml | 2 ++ lib/coney/connection_server.ex | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/compose.yaml b/compose.yaml index 12ce23b..121fd5a 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,5 +1,7 @@ services: rabbitmq: + # "management" version is not required, but it makes + # manual testing easier image: "rabbitmq:3.12-management-alpine" ports: - "5672:5672" diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index e95df6e..91e0629 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -40,22 +40,27 @@ defmodule Coney.ConnectionServer do {:noreply, rabbitmq_connect(state)} end + @spec confirm(reference(), any()) :: :confirmed def confirm(channel_ref, tag) do GenServer.call(__MODULE__, {:confirm, channel_ref, tag}) end + @spec reject(reference(), any(), boolean()) :: :rejected def reject(channel_ref, tag, requeue) do GenServer.call(__MODULE__, {:reject, channel_ref, tag, requeue}) end + @spec publish(String.t(), any()) :: :published def publish(exchange_name, message) do GenServer.call(__MODULE__, {:publish, exchange_name, message}) end + @spec publish(String.t(), String.t(), any()) :: :published def publish(exchange_name, routing_key, message) do GenServer.call(__MODULE__, {:publish, exchange_name, routing_key, message}) end + @spec subscribe(any()) :: reference() def subscribe(consumer) do GenServer.call(__MODULE__, {:subscribe, consumer}) end From 1a0d04e78a6c3f3da9ba1a31a9036cef580af785 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Thu, 8 Aug 2024 15:54:01 +0300 Subject: [PATCH 25/27] feedback, remove unused function and redundant test --- lib/coney/connection_server.ex | 7 ++----- lib/coney/rabbit_connection.ex | 2 -- test/lib/coney/connection_server_test.exs | 4 ++-- test/lib/coney/consumer/consumer_server_test.exs | 7 ------- 4 files changed, 4 insertions(+), 16 deletions(-) diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 91e0629..0050819 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -66,10 +66,6 @@ defmodule Coney.ConnectionServer do end @impl GenServer - def handle_info(:after_init, state) do - {:noreply, rabbitmq_connect(state)} - end - def handle_info({:DOWN, _, :process, _pid, reason}, state) do ConnectionRegistry.disconnected(self()) Logger.error("#{__MODULE__} (#{inspect(self())}) connection lost: #{inspect(reason)}") @@ -146,6 +142,7 @@ defmodule Coney.ConnectionServer do } = state ) do conn = adapter.open(settings) + Process.monitor(conn.pid) adapter.init_topology(conn, topology) ConnectionRegistry.connected(self()) @@ -175,7 +172,7 @@ defmodule Coney.ConnectionServer do defp close_channels(channels, adapter) do Enum.each(channels, fn {_channel_ref, {consumer_pid, consumer, channel}} -> - Logger.info("[Coney] - Closing channel for #{inspect(consumer)} (#{consumer_pid})") + Logger.info("[Coney] - Closing channel for #{inspect(consumer)} (#{inspect(consumer_pid)})") adapter.close_channel(channel) end) end diff --git a/lib/coney/rabbit_connection.ex b/lib/coney/rabbit_connection.ex index 24e7496..6090e26 100644 --- a/lib/coney/rabbit_connection.ex +++ b/lib/coney/rabbit_connection.ex @@ -7,8 +7,6 @@ defmodule Coney.RabbitConnection do case connect(url) do {:ok, conn} -> Logger.debug("#{__MODULE__} (#{inspect(self())}) connected to #{url}") - - Process.monitor(conn.pid) conn {:error, error} -> diff --git a/test/lib/coney/connection_server_test.exs b/test/lib/coney/connection_server_test.exs index 8887670..d6e5b91 100644 --- a/test/lib/coney/connection_server_test.exs +++ b/test/lib/coney/connection_server_test.exs @@ -53,7 +53,7 @@ defmodule Coney.ConnectionServerTest do assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) # Open connection - assert {:noreply, state} = ConnectionServer.handle_info(:after_init, state) + assert {:noreply, state} = ConnectionServer.handle_continue(nil, state) # Subscribe a channel assert {:reply, channel_ref, connected_state} = @@ -86,7 +86,7 @@ defmodule Coney.ConnectionServerTest do assert {:ok, state, {:continue, nil}} = ConnectionServer.init([adapter, settings, topology]) # Open connection - assert {:noreply, state} = ConnectionServer.handle_info(:after_init, state) + assert {:noreply, state} = ConnectionServer.handle_continue(nil, state) # Subscribe a channel assert {:reply, channel_ref, new_state} = diff --git a/test/lib/coney/consumer/consumer_server_test.exs b/test/lib/coney/consumer/consumer_server_test.exs index 1449a3c..d1d7f68 100644 --- a/test/lib/coney/consumer/consumer_server_test.exs +++ b/test/lib/coney/consumer/consumer_server_test.exs @@ -10,13 +10,6 @@ defmodule ConsumerServerTest do ] end - test "initial state", %{args: args, state: state} do - assert {:ok, initial_state} = ConsumerServer.init(args) - assert initial_state.consumer == state.consumer - assert initial_state.tasks |> Map.equal?(Map.new()) - assert initial_state.chan |> is_reference() - end - test ":basic_consume_ok", %{state: state} do assert {:noreply, ^state} = ConsumerServer.handle_info({:basic_consume_ok, %{consumer_tag: nil}}, state) From 3175c3d73808d1e0950e0758af365c47fd712b66 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Thu, 8 Aug 2024 16:00:47 +0300 Subject: [PATCH 26/27] fix flaky tests --- test/lib/coney/consumer/consumer_server_test.exs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/test/lib/coney/consumer/consumer_server_test.exs b/test/lib/coney/consumer/consumer_server_test.exs index d1d7f68..f7328e7 100644 --- a/test/lib/coney/consumer/consumer_server_test.exs +++ b/test/lib/coney/consumer/consumer_server_test.exs @@ -4,12 +4,21 @@ defmodule ConsumerServerTest do alias Coney.ConsumerServer setup do + ref = Coney.ConnectionServer.subscribe(FakeConsumer) + [ args: [FakeConsumer], - state: %{consumer: FakeConsumer, tasks: %{}, chan: :erlang.make_ref()} + state: %{consumer: FakeConsumer, tasks: %{}, chan: ref} ] end + test "initial state", %{args: args, state: state} do + assert {:ok, initial_state} = ConsumerServer.init(args) + assert initial_state.consumer == state.consumer + assert initial_state.tasks |> Map.equal?(Map.new()) + assert initial_state.chan |> is_reference() + end + test ":basic_consume_ok", %{state: state} do assert {:noreply, ^state} = ConsumerServer.handle_info({:basic_consume_ok, %{consumer_tag: nil}}, state) From 3507b9beaf1d7cea1f1c035e4939353bbd3f8c93 Mon Sep 17 00:00:00 2001 From: ignaciogoldchluk-yolo Date: Thu, 8 Aug 2024 16:07:44 +0300 Subject: [PATCH 27/27] reduce logger noise --- lib/coney/connection_server.ex | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/coney/connection_server.ex b/lib/coney/connection_server.ex index 0050819..32188c6 100644 --- a/lib/coney/connection_server.ex +++ b/lib/coney/connection_server.ex @@ -74,6 +74,7 @@ defmodule Coney.ConnectionServer do @impl GenServer def terminate(_reason, %State{amqp_conn: conn, adapter: adapter, channels: channels} = _state) do + Logger.info("[Coney] - Terminating #{inspect(conn)}") close_channels(channels, adapter) :ok = adapter.close(conn) ConnectionRegistry.terminated(self()) @@ -162,18 +163,19 @@ defmodule Coney.ConnectionServer do new_channel = adapter.create_channel(conn) adapter.subscribe(new_channel, consumer_pid, consumer) - Logger.info("[Coney] - Connection re-restablished for #{inspect(consumer)}") - {channel_ref, {consumer_pid, consumer, new_channel}} end) + Logger.info("[Coney] - Connection re-restablished for #{inspect(conn)}") + %State{state | channels: new_channels} end defp close_channels(channels, adapter) do - Enum.each(channels, fn {_channel_ref, {consumer_pid, consumer, channel}} -> - Logger.info("[Coney] - Closing channel for #{inspect(consumer)} (#{inspect(consumer_pid)})") + Enum.each(channels, fn {_channel_ref, {_consumer_pid, _consumer, channel}} -> adapter.close_channel(channel) end) + + Logger.info("[Coney] - Closed #{map_size(channels)} channels") end end