Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix consumer id #26

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
## [3.1.1] (2024-08-16)
## [3.1.2] (2024-08-16)

### Bug fixes
- Fix missing specification for `ConsumerServer` unique id

### Internal
- Prefix modules with `Coney.` to avoid conflicts

## [3.1.1] (2024-08-16) - BROKEN

### Bug fixes
- Fix missing `:name` when starting `ConsumerServer`

## [3.1.0] (2024-08-14)
## [3.1.0] (2024-08-14) - BROKEN

### Enhancements
- Add `:enabled` config value
- `:adapter` config value is now optional and defaults to `Coney.RabbitConnection`

## [3.0.2] (2024-08-12)
## [3.0.2] (2024-08-12) - BROKEN

### Bug fixes
- Fix incorrect termination order where the connection to RabbitMQ was closed before the channels.
Expand Down
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ config :coney,
timeout: 1000
},
workers: [
FakeConsumer
Coney.FakeConsumer,
Coney.OtherFakeConsumer
],
topology: %{
exchanges: [{:topic, "exchange", durable: false}],
Expand Down
24 changes: 0 additions & 24 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,25 +1 @@
import Config

config :coney,
topology: %{
exchanges: [{:topic, "exchange", durable: false}],
queues: %{
"queue" => %{
options: [
durable: false
],
bindings: [
[exchange: "exchange", options: [routing_key: "queue"]]
]
}
}
},
pool_size: 1,
auto_start: true,
settings: %{
url: "amqp://guest:guest@localhost:5672",
timeout: 1000
},
workers: [
FakeConsumer
]
7 changes: 7 additions & 0 deletions lib/coney/consumer_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ defmodule Coney.ConsumerServer do

require Logger

def child_spec([consumer]) do
%{
id: consumer,
start: {__MODULE__, :start_link, [[consumer]]}
}
end

def start_link([consumer]) do
GenServer.start_link(__MODULE__, [consumer], name: consumer)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Coney.Mixfile do
def project do
[
app: :coney,
version: "3.1.1",
version: "3.1.2",
elixir: ">= 1.12.0",
build_embedded: Mix.env() == :prod,
start_permanent: Mix.env() == :prod,
Expand Down
6 changes: 3 additions & 3 deletions test/lib/coney/connection_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule Coney.ConnectionServerTest do
# Subscribe a channel
assert {:reply, channel_ref, connected_state} =
ConnectionServer.handle_call(
{:subscribe, FakeConsumer},
{:subscribe, Coney.FakeConsumer},
{self(), :erlang.make_ref()},
state
)
Expand Down Expand Up @@ -91,7 +91,7 @@ defmodule Coney.ConnectionServerTest do
# Subscribe a channel
assert {:reply, channel_ref, new_state} =
ConnectionServer.handle_call(
{:subscribe, FakeConsumer},
{:subscribe, Coney.FakeConsumer},
{self(), :erlang.make_ref()},
state
)
Expand All @@ -100,7 +100,7 @@ defmodule Coney.ConnectionServerTest do

pid = self()

assert {^pid, FakeConsumer, _} = Map.get(new_state.channels, channel_ref)
assert {^pid, Coney.FakeConsumer, _} = Map.get(new_state.channels, channel_ref)
end
end
end
8 changes: 4 additions & 4 deletions test/lib/coney/consumer/consumer_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ defmodule ConsumerServerTest do
alias Coney.ConsumerServer

setup do
ref = Coney.ConnectionServer.subscribe(FakeConsumer)
ref = Coney.ConnectionServer.subscribe(Coney.FakeConsumer)

[
args: [FakeConsumer],
state: %{consumer: FakeConsumer, tasks: %{}, chan: ref}
args: [Coney.FakeConsumer],
state: %{consumer: Coney.FakeConsumer, tasks: %{}, chan: ref}
]
end

Expand Down Expand Up @@ -46,7 +46,7 @@ defmodule ConsumerServerTest do

describe "handle_info/2" do
setup do
%{state: %{consumer: FakeConsumer, tasks: Map.new(), chan: :erlang.make_ref()}}
%{state: %{consumer: Coney.FakeConsumer, tasks: Map.new(), chan: :erlang.make_ref()}}
end

test "demonitors a task once it completes successfully", %{state: state} do
Expand Down
2 changes: 1 addition & 1 deletion test/support/fake_consumer.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule FakeConsumer do
defmodule Coney.FakeConsumer do
@behaviour Coney.Consumer

def connection do
Expand Down
28 changes: 28 additions & 0 deletions test/support/other_fake_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule Coney.OtherFakeConsumer do
@behaviour Coney.Consumer

def connection do
%{
prefetch_count: 10,
queue: "queue"
}
end

def parse(payload, _meta) do
payload
end

def process(payload, _meta) do
case payload do
:ok -> :ok
:reject -> :reject
:reply -> {:reply, :data}
:exception -> raise "Exception happen"
_other -> :ok
end
end

def error_happened(_exception, _payload, _meta) do
:ok
end
end