Coney

Consumer server for RabbitMQ with message publishing functionality.

Installation

Add Coney as a dependency in your mix.exs file.

def deps do
  [{:coney, "~> 3.0"}]
end

Then run mix deps.get in your shell to fetch Coney.

Set up a consumer server

Default config:

# config/config.exs
config :coney,
  auto_start: true,
  settings: %{
    url: "amqp://guest:guest@localhost", # or ["amqp://guest:guest@localhost", "amqp://guest:guest@other_host"]
    timeout: 1000
  }

If you need to create exchanges or queues before starting the consumer, you can define your RabbitMQ topology as follows:

  config :coney,
    topology: %{
      exchanges: [{:topic, "my_exchange", durable: true}],
      queues: %{
        "my_queue" => %{
          options: [
            durable: true,
            arguments: [
              {"x-dead-letter-exchange", :longstr, "dlx_exchange"},
              {"x-message-ttl", :signedint, 60000}
            ]
          ],
          bindings: [
            [exchange: "my_exchange", options: [routing_key: "my_queue"]]
          ]
        }
      }
    }

You can also create a configuration module (for example, if you want to retrieve settings from Consul or another source):

# config/config.exs
config :coney,
  auto_start: true,
  settings: RabbitConfig,
  topology: RabbitConfig

defmodule RabbitConfig do
  def settings do
    %{
      url: "amqp://guest:guest@localhost",
      timeout: 1000
    }
  end

  def topology do
    %{
      exchanges: [{:topic, "my_exchange", durable: true}],
      queues: %{
        "my_queue" => %{
          options: [
            durable: true,
            arguments: [
              {"x-dead-letter-exchange", :longstr, "exchange"},
              {"x-message-ttl", :signedint, 60000}
            ]
          ],
          bindings: [
            [exchange: "my_exchange", options: [routing_key: "my_queue"]]
          ]
        }
      }
    }
  end
end
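
As a minimal sketch of the same idea, the module below reads the broker URL from an environment variable instead of hard-coding it. The module name EnvRabbitConfig and the variable name RABBITMQ_URL are assumptions made for illustration, not part of Coney, and the topology is a trimmed copy of the one above. It also assumes Coney calls settings/0 when it starts, so the value is read at runtime.

```elixir
defmodule EnvRabbitConfig do
  # Hypothetical module: the name and the RABBITMQ_URL variable are illustrative.
  @default_url "amqp://guest:guest@localhost"

  # Falls back to the local default when RABBITMQ_URL is not set.
  def settings do
    %{
      url: System.get_env("RABBITMQ_URL", @default_url),
      timeout: 1000
    }
  end

  # Same shape as the topology shown above, without the optional queue arguments.
  def topology do
    %{
      exchanges: [{:topic, "my_exchange", durable: true}],
      queues: %{
        "my_queue" => %{
          options: [durable: true],
          bindings: [
            [exchange: "my_exchange", options: [routing_key: "my_queue"]]
          ]
        }
      }
    }
  end
end
```

To use it, point both settings and topology at EnvRabbitConfig in config/config.exs, in the same way as RabbitConfig above.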

If you don't want Coney to start automatically and prefer to control when it starts, set auto_start to false and add the Coney supervisor to your own supervision tree:

# config/config.exs
config :coney, auto_start: false

defmodule YourApplication do
  use Application

  def start(_type, _args) do
    Supervisor.start_link([Coney.ApplicationSupervisor], [strategy: :one_for_one])
  end
end

To disable Coney altogether (useful in a test config), set enabled: false:

# config/config.exs
config :coney, enabled: false, settings: %{}, topology: %{}

Configure consumers

# config/queues.exs

config :coney,
  workers: [
    MyApplication.MyConsumer
  ]

# Alternatively, define the connection mapping here and omit connection/0 in the consumer module:
config :coney,
  workers: [
    %{
      connection: %{
        prefetch_count: 10,
        queue: "my_queue"
      },
      worker: MyApplication.MyConsumer
    }
  ]

# web/consumers/my_consumer.ex

defmodule MyApplication.MyConsumer do
  @behaviour Coney.Consumer

  def connection do
    %{
      prefetch_count: 10,
      queue: "my_queue",
      consumer_tag: "MyApp - MyConsumer" # optional
    }
  end

  def parse(payload, _meta) do
    String.to_integer(payload)
  end

  def process(number, _meta) do
    if number <= 10 do
      :ok
    else
      :reject
    end
  end

  # Be careful here: if `error_happened` itself raises an exception,
  # the message will not be handled properly and may be left unacked in the queue.
  def error_happened(_exception, payload, _meta) do
    IO.puts("Exception raised with #{payload}")
    :redeliver
  end
end

Rescuing exceptions

If an exception is raised while parse or process is running, Coney rejects the message by default. To handle exceptions differently, implement either the error_happened/3 or the error_happened/4 callback. Be careful: if error_happened itself raises an exception, the message will not be handled properly and may be left unacked in the queue.

error_happened/3

This callback receives the exception, the original payload, and the meta as parameters. The response format is the same as for the process callback.

error_happened/4

This callback receives the exception, the stacktrace, the original payload, and the meta as parameters. The response format is the same as for the process callback.
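
A minimal sketch of an error_happened/4 implementation follows. The module name MyApplication.LoggingConsumer is hypothetical, and logging through Logger with Exception.format/3 is just one possible choice; the argument order is the one described above. The callback is kept deliberately small so that it is unlikely to raise.

```elixir
defmodule MyApplication.LoggingConsumer do
  # Hypothetical consumer; the module name and queue name are illustrative.
  @behaviour Coney.Consumer
  require Logger

  def connection do
    %{prefetch_count: 10, queue: "my_queue"}
  end

  # Raises ArgumentError for non-numeric payloads, which triggers error_happened/4.
  def parse(payload, _meta), do: String.to_integer(payload)

  def process(number, _meta) do
    if number <= 10, do: :ok, else: :reject
  end

  # Logs the formatted exception with its stacktrace, then rejects the message.
  def error_happened(exception, stacktrace, payload, _meta) do
    details = Exception.format(:error, exception, stacktrace)
    Logger.error("Failed to handle #{inspect(payload)}: #{details}")
    :reject
  end
end
```

Returning :reject here mirrors the default behaviour described above; :redeliver would put the message back in the queue instead, which may loop if the payload can never be parsed.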

Return format of process/2 and error_happened

  1. :ok - ack message.
  2. :reject - reject message.
  3. :redeliver - return message to the queue.
  4. {:reply, binary} - response will be published to reply exchange.

Reply description

To use {:reply, binary}, add a response exchange to the connection settings:

# web/consumers/my_consumer.ex

def connection do
  %{
    # ...
    respond_to: "response_exchange"
  }
end

The response will be published to the "response_exchange" exchange.
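
Putting the pieces together, a consumer that replies could look like the sketch below. The module name MyApplication.DoublingConsumer and the doubling logic are made up for illustration; only respond_to and the {:reply, binary} return value come from the description above.

```elixir
defmodule MyApplication.DoublingConsumer do
  # Hypothetical consumer; the module name and the doubling logic are illustrative.
  @behaviour Coney.Consumer

  def connection do
    %{
      prefetch_count: 10,
      queue: "my_queue",
      respond_to: "response_exchange"
    }
  end

  def parse(payload, _meta), do: String.to_integer(payload)

  # The reply body must be a binary, so the integer is converted back to a string.
  def process(number, _meta) do
    {:reply, Integer.to_string(number * 2)}
  end
end
```

With this consumer, a message with payload "21" would produce the reply body "42".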

The default exchange

To use the default exchange you may declare exchange as :default:

%{
  exchanges: [:default]
}

The following format is also acceptable:

%{
  exchanges: [{:direct, ""}]
}

Or you can omit it from the exchanges settings and set up the queue in the consumer's settings:

%{
  prefetch_count: 10,
  queue: "my_queue"
}

Publish message

Coney.publish("exchange", "message")

# or

Coney.publish("exchange", "routing_key", "message")

Checking connections

You can use Coney.status/0 if you need information about RabbitMQ connections:

iex> Coney.status()
[{#PID<0.972.0>, :connected}]

The result is a list of tuples, where the first element of each tuple is the pid of a running connection server and the second element describes the connection status.

Connection status can be:

  • :pending - when Coney has just started
  • :connected - when the RabbitMQ connection has been established and all consumers have been started
  • :disconnected - when Coney has lost its connection to RabbitMQ
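
The status list can be reduced to a single boolean, for example for a health check. The helper below is a sketch: the module MyApplication.RabbitHealth is hypothetical and not part of Coney, and it takes the list returned by Coney.status/0 as an argument so that it can be tested without a broker. Treating an empty list as unhealthy is a design choice of this sketch.

```elixir
defmodule MyApplication.RabbitHealth do
  # Hypothetical helper: expects a list of {pid, status} tuples,
  # in the shape returned by Coney.status/0.
  def healthy?(statuses) when is_list(statuses) do
    # An empty list means no connection server is running, so report unhealthy.
    statuses != [] and
      Enum.all?(statuses, fn {_pid, status} -> status == :connected end)
  end
end
```

In an application it would be called as MyApplication.RabbitHealth.healthy?(Coney.status()).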

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/coingaming/coney.

Running test suite locally

  1. Start the RabbitMQ instance via docker compose up.
  2. Run mix test.

Architecture

  graph TD;
      A[ApplicationSupervisor - Supervisor] --> B[ConsumerSupervisor - Supervisor];
      A --> C[ConnectionServer - GenServer];
      B -- supervises many --> D[ConsumerServer - GenServer];
      D -- monitors --> E[ConsumerExecutor];
      E -- sends messages to --> C;
      D -- opens AMQP conns via --> C;

License

The library is available as open source under the terms of the MIT License.