Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown #22

Merged
merged 27 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f3eea2a
add missing @impl behaviours
ignaciogoldchluk-yolo Aug 2, 2024
d2752d2
replace channel with {channel_ref, channel} map
ignaciogoldchluk-yolo Aug 2, 2024
40241f4
rename variables
ignaciogoldchluk-yolo Aug 2, 2024
b706634
rearchitecture (untested)
ignaciogoldchluk-yolo Aug 5, 2024
6f86972
correct server initialization order
ignaciogoldchluk-yolo Aug 5, 2024
9f1f061
remove consumers from ConnectionServer init
ignaciogoldchluk-yolo Aug 5, 2024
9857aec
test for handle_info normal termination
ignaciogoldchluk-yolo Aug 5, 2024
399a44a
test for handle_info unexpected termination
ignaciogoldchluk-yolo Aug 5, 2024
5052588
tests for ConnectionServer init
ignaciogoldchluk-yolo Aug 5, 2024
c065b10
tests for ConnectionServer
ignaciogoldchluk-yolo Aug 5, 2024
8bc5ddd
remove unused module
ignaciogoldchluk-yolo Aug 5, 2024
cbe2dbe
rabbitmq config
ignaciogoldchluk-yolo Aug 6, 2024
e4338aa
do not autodelete queue
ignaciogoldchluk-yolo Aug 6, 2024
abbe3b5
add admin port to compose file and a test exchange
ignaciogoldchluk-yolo Aug 6, 2024
128b451
add volumes to Github Actions
ignaciogoldchluk-yolo Aug 6, 2024
1fc6fe9
fix volumes definition
ignaciogoldchluk-yolo Aug 6, 2024
28b413f
mount volumes?
ignaciogoldchluk-yolo Aug 6, 2024
c4b83d7
change volumes for topology in config.exs
ignaciogoldchluk-yolo Aug 7, 2024
078338d
integration test
ignaciogoldchluk-yolo Aug 7, 2024
573946c
docs
ignaciogoldchluk-yolo Aug 7, 2024
2225be4
use handle_continue
ignaciogoldchluk-yolo Aug 7, 2024
32ade9a
close channels in terminate and add logging
ignaciogoldchluk-yolo Aug 8, 2024
3b6174f
missing link in architecture README
ignaciogoldchluk-yolo Aug 8, 2024
74c9012
add specs
ignaciogoldchluk-yolo Aug 8, 2024
1a0d04e
feedback, remove unused function and redundant test
ignaciogoldchluk-yolo Aug 8, 2024
3175c3d
fix flaky tests
ignaciogoldchluk-yolo Aug 8, 2024
3507b9b
reduce logger noise
ignaciogoldchluk-yolo Aug 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ jobs:
ports:
- "5672:5672"


runs-on: ubuntu-latest
name: Test on OTP ${{matrix.otp}} / Elixir ${{matrix.elixir}}
strategy:
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,17 @@ Bug reports and pull requests are welcome on GitHub at https://github.com/coinga
1. Start the RabbitMQ instance via `docker compose up`.
2. Run `mix test`.

## Architecture
```mermaid
graph TD;
A[ApplicationSupervisor - Supervisor] --> B[ConsumerSupervisor - Supervisor];
A --> C[ConnectionServer - GenServer];
B -- supervises many --> D[ConsumerServer - GenServer];
D -- monitors --> E[ConsumerExecutor];
E -- sends messages to --> C;
D -- opens AMQP conns via --> C;
```

## License

The library is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT).
5 changes: 4 additions & 1 deletion compose.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
services:
rabbitmq:
image: "rabbitmq:alpine"
# "management" version is not required, but it makes
# manual testing easier
image: "rabbitmq:3.12-management-alpine"
ports:
- "5672:5672"
- "15672:15672"
17 changes: 16 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,22 @@ config :coney,
url: "amqp://guest:guest@localhost",
timeout: 1000
},
workers: []
workers: [
FakeConsumer
],
topology: %{
exchanges: [{:topic, "exchange", durable: false}],
queues: %{
"queue" => %{
options: [
durable: false
],
bindings: [
[exchange: "exchange", options: [routing_key: "queue"]]
]
}
}
}

config :logger, level: :info

Expand Down
17 changes: 16 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import Config

config :coney,
topology: %{
exchanges: [{:topic, "exchange", durable: false}],
queues: %{
"queue" => %{
options: [
durable: false
],
bindings: [
[exchange: "exchange", options: [routing_key: "queue"]]
]
}
}
},
adapter: Coney.RabbitConnection,
pool_size: 1,
auto_start: true,
settings: %{
url: "amqp://guest:guest@localhost:5672",
timeout: 1000
},
workers: []
workers: [
FakeConsumer
]
11 changes: 9 additions & 2 deletions lib/coney/application_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
defmodule Coney.ApplicationSupervisor do
@moduledoc """
Supervisor responsible of `ConnectionServer` and `ConsumerSupervisor`.

Main entry point of the application.
"""

use Supervisor

alias Coney.{ConsumerSupervisor, ConnectionServer}
Expand All @@ -14,12 +20,13 @@ defmodule Coney.ApplicationSupervisor do
}
end

@impl Supervisor
def init([consumers]) do
settings = settings()

children = [
ConsumerSupervisor,
{ConnectionServer, [consumers, settings]}
{ConnectionServer, [settings]},
{ConsumerSupervisor, [consumers]}
]

Supervisor.init(children, strategy: :one_for_one)
Expand Down
127 changes: 98 additions & 29 deletions lib/coney/connection_server.ex
Original file line number Diff line number Diff line change
@@ -1,67 +1,122 @@
defmodule Coney.ConnectionServer do
@moduledoc """
Handles connections between `ConsumerServer` and the RabbitMQ instance(s).

This module abstracts away the connection status of RabbitMQ. Instead, when
a new `ConsumerServer` is started, it requests `ConnectionServer` to open a channel.
ConnectionServer opens a real amqp channel, keeps a reference to it in its state and
returns an erlang reference to `ConsumerServer`. When `ConsumerServer` replies (ack/reject)
an incoming RabbitMQ message it sends the erlang reference to ConnectionServer and then
ConnectionServer looks up the real channel.

ConnectionServer can handle RabbitMQ disconnects independently of ConsumerServer.
When connection is lost and then regained, ConnectionServer simply updates its
map of {erlang_ref, AMQP.Connection}, ConsumerServer keeps using the same erlang_ref.
"""
use GenServer

require Logger

alias Coney.{
ConsumerSupervisor,
HealthCheck.ConnectionRegistry
}
alias Coney.HealthCheck.ConnectionRegistry

defmodule State do
defstruct [:consumers, :adapter, :settings, :amqp_conn, :topology]
defstruct [:adapter, :settings, :amqp_conn, :topology, :channels]
end

def start_link([consumers, [adapter: adapter, settings: settings, topology: topology]]) do
GenServer.start_link(__MODULE__, [consumers, adapter, settings, topology], name: __MODULE__)
def start_link([[adapter: adapter, settings: settings, topology: topology]]) do
GenServer.start_link(__MODULE__, [adapter, settings, topology], name: __MODULE__)
end

def init([consumers, adapter, settings, topology]) do
send(self(), :after_init)

@impl GenServer
def init([adapter, settings, topology]) do
ConnectionRegistry.associate(self())

{:ok, %State{consumers: consumers, adapter: adapter, settings: settings, topology: topology}}
{:ok, %State{adapter: adapter, settings: settings, topology: topology, channels: Map.new()},
{:continue, nil}}
end

@impl true
def handle_continue(_continue_arg, state) do
{:noreply, rabbitmq_connect(state)}
end

def confirm(channel, tag) do
GenServer.call(__MODULE__, {:confirm, channel, tag})
@spec confirm(reference(), any()) :: :confirmed
def confirm(channel_ref, tag) do
ignaciogoldchluk-yolo marked this conversation as resolved.
Show resolved Hide resolved
GenServer.call(__MODULE__, {:confirm, channel_ref, tag})
end

def reject(channel, tag, requeue) do
GenServer.call(__MODULE__, {:reject, channel, tag, requeue})
@spec reject(reference(), any(), boolean()) :: :rejected
def reject(channel_ref, tag, requeue) do
GenServer.call(__MODULE__, {:reject, channel_ref, tag, requeue})
end

@spec publish(String.t(), any()) :: :published
def publish(exchange_name, message) do
GenServer.call(__MODULE__, {:publish, exchange_name, message})
end

@spec publish(String.t(), String.t(), any()) :: :published
def publish(exchange_name, routing_key, message) do
GenServer.call(__MODULE__, {:publish, exchange_name, routing_key, message})
end

@spec subscribe(any()) :: reference()
def subscribe(consumer) do
GenServer.call(__MODULE__, {:subscribe, consumer})
end

@impl GenServer
def handle_info(:after_init, state) do
rabbitmq_connect(state)
{:noreply, rabbitmq_connect(state)}
end

def handle_info({:DOWN, _, :process, _pid, reason}, state) do
ConnectionRegistry.disconnected(self())
Logger.error("#{__MODULE__} (#{inspect(self())}) connection lost: #{inspect(reason)}")
rabbitmq_connect(state)
{:noreply, state |> rabbitmq_connect() |> update_channels()}
end

def terminate(_reason, %State{amqp_conn: conn, adapter: adapter} = _state) do
@impl GenServer
def terminate(_reason, %State{amqp_conn: conn, adapter: adapter, channels: channels} = _state) do
close_channels(channels, adapter)
ignaciogoldchluk-yolo marked this conversation as resolved.
Show resolved Hide resolved
:ok = adapter.close(conn)
ignaciogoldchluk-yolo marked this conversation as resolved.
Show resolved Hide resolved
ConnectionRegistry.terminated(self())
end

def handle_call({:confirm, channel, tag}, _from, %State{adapter: adapter} = state) do
@impl GenServer
def handle_call(
{:confirm, channel_ref, tag},
_from,
%State{adapter: adapter, channels: channels} = state
) do
channel = channel_from_ref(channels, channel_ref)
adapter.confirm(channel, tag)

{:reply, :confirmed, state}
end

def handle_call({:reject, channel, tag, requeue}, _from, %State{adapter: adapter} = state) do
def handle_call(
{:subscribe, consumer},
{consumer_pid, _tag},
%State{amqp_conn: conn, adapter: adapter, channels: channels} = state
) do
channel = adapter.create_channel(conn)
channel_ref = :erlang.make_ref()

adapter.subscribe(channel, consumer_pid, consumer)

new_channels = Map.put(channels, channel_ref, {consumer_pid, consumer, channel})

Logger.debug("#{inspect(consumer)} (#{inspect(consumer_pid)}) started")
{:reply, channel_ref, %State{state | channels: new_channels}}
end

def handle_call(
{:reject, channel_ref, tag, requeue},
_from,
%State{adapter: adapter, channels: channels} = state
) do
channel = channel_from_ref(channels, channel_ref)
adapter.reject(channel, tag, requeue: requeue)

{:reply, :rejected, state}
Expand All @@ -85,29 +140,43 @@ defmodule Coney.ConnectionServer do

defp rabbitmq_connect(
%State{
consumers: consumers,
adapter: adapter,
settings: settings,
topology: topology
} = state
) do
conn = adapter.open(settings)
adapter.init_topology(conn, topology)
start_consumers(consumers, adapter, conn)

ignaciogoldchluk-yolo marked this conversation as resolved.
Show resolved Hide resolved
ConnectionRegistry.connected(self())

{:noreply, %State{state | amqp_conn: conn}}
%State{state | amqp_conn: conn}
end

defp start_consumers(consumers, adapter, conn) do
Enum.each(consumers, fn consumer ->
subscribe_chan = adapter.create_channel(conn)
defp channel_from_ref(channels, channel_ref) do
{_consumer_pid, _consumer, channel} = Map.fetch!(channels, channel_ref)

{:ok, pid} = ConsumerSupervisor.start_consumer(consumer, subscribe_chan)
adapter.subscribe(subscribe_chan, pid, consumer)
channel
end

defp update_channels(%State{amqp_conn: conn, adapter: adapter, channels: channels} = state) do
new_channels =
Map.new(channels, fn {channel_ref, {consumer_pid, consumer, _dead_channel}} ->
new_channel = adapter.create_channel(conn)
adapter.subscribe(new_channel, consumer_pid, consumer)

Logger.info("[Coney] - Connection re-restablished for #{inspect(consumer)}")

{channel_ref, {consumer_pid, consumer, new_channel}}
end)

%State{state | channels: new_channels}
end

Logger.debug("#{inspect(consumer)} (#{inspect(pid)}) started")
defp close_channels(channels, adapter) do
Enum.each(channels, fn {_channel_ref, {consumer_pid, consumer, channel}} ->
Logger.info("[Coney] - Closing channel for #{inspect(consumer)} (#{consumer_pid})")
adapter.close_channel(channel)
end)
end
end
4 changes: 4 additions & 0 deletions lib/coney/consumer_executor.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
defmodule Coney.ConsumerExecutor do
@moduledoc """
Module responsible for processing a rabbit message and send the response
back to `ConnectionServer`. Started (and monitored) by `ConsumerServer`.
"""
require Logger

alias Coney.{ConnectionServer, ExecutionTask}
Expand Down
17 changes: 14 additions & 3 deletions lib/coney/consumer_server.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
defmodule Coney.ConsumerServer do
@moduledoc """
GenServer for handling RabbitMQ messages. Spawns and monitors one task per message
and forwards the response to `ConnectionServer`.
"""

use GenServer

alias Coney.{ConnectionServer, ConsumerExecutor, ExecutionTask}

require Logger

def start_link([consumer, chan]) do
GenServer.start_link(__MODULE__, [consumer, chan])
def start_link([consumer]) do
GenServer.start_link(__MODULE__, [consumer])
end

def init([consumer, chan]) do
@impl GenServer
def init([consumer]) do
chan = ConnectionServer.subscribe(consumer)

Logger.info("[Coney] - Started consumer #{inspect(consumer)}")

{:ok, %{consumer: consumer, chan: chan, tasks: %{}}}
end

@impl GenServer
def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, state) do
{:noreply, state}
end
Expand Down
21 changes: 12 additions & 9 deletions lib/coney/consumer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
defmodule Coney.ConsumerSupervisor do
use DynamicSupervisor
@moduledoc """
Supervisor for all ConsumerServer of the application.
"""
use Supervisor

def start_link(_args) do
DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
end
alias Coney.ConsumerServer

def init([]) do
DynamicSupervisor.init(strategy: :one_for_one)
def start_link([consumers]) do
Supervisor.start_link(__MODULE__, [consumers], name: __MODULE__)
end

def start_consumer(consumer, chan) do
spec = {Coney.ConsumerServer, [consumer, chan]}
DynamicSupervisor.start_child(__MODULE__, spec)
@impl Supervisor
def init([consumers]) do
children = Enum.map(consumers, fn consumer -> {ConsumerServer, [consumer]} end)

Supervisor.init(children, strategy: :one_for_one)
end
end
Loading