Use Cases
There are a variety of use cases that can be expressed through harmoniser:
An Exclusive Consumer ensures that only one consumer processes messages from a queue at a time. Attempting to register more than one consumer on the same queue results in a channel error (e.g. ACCESS_REFUSED - queue 'my_first_queue' in vhost '/' in exclusive use). This can be set up with harmoniser as follows:
# This code assumes that Harmoniser.configure is already set up
class Subscriber
  include Harmoniser::Subscriber

  # Define the consumer property exclusive below
  harmoniser_subscriber queue_name: "a_queue", exclusive: true

  class << self
    def on_delivery(delivery_info, properties, payload)
      Harmoniser.logger.info("Message processed: queue = `#{delivery_info.consumer.queue}`, consumer_tag = `#{delivery_info.consumer_tag}`, payload = `#{payload}`")
    end
  end
end
It is the application's responsibility to register a new consumer if the exclusive consumer is cancelled or dies. If you'd like exclusive consumption with transparent continuity, consider setting up a Single Active Consumer instead.
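The behaviour described above can be illustrated with a small in-memory sketch. This is a toy model, not RabbitMQ or harmoniser code: `ToyExclusiveQueue`, `AccessRefused` and the consumer tags are hypothetical names introduced only for illustration. It shows a second registration being refused and the queue being left without a consumer after a cancellation until the application registers one again.

```ruby
# Toy in-memory model of exclusive consumption (hypothetical names,
# not the RabbitMQ or harmoniser implementation).
class AccessRefused < StandardError; end

class ToyExclusiveQueue
  def initialize(name)
    @name = name
    @consumer_tag = nil
  end

  # Registers a consumer; refuses a second one while the first is registered.
  def subscribe(consumer_tag)
    raise AccessRefused, "queue '#{@name}' in exclusive use" if @consumer_tag

    @consumer_tag = consumer_tag
  end

  # Cancelling frees the queue, but no other consumer is promoted.
  def cancel(consumer_tag)
    @consumer_tag = nil if @consumer_tag == consumer_tag
  end

  def active_consumer
    @consumer_tag
  end
end

queue = ToyExclusiveQueue.new("a_queue")
queue.subscribe("consumer-1")

refusal = nil
begin
  queue.subscribe("consumer-2")
rescue AccessRefused => e
  refusal = e.message
end

queue.cancel("consumer-1")
orphaned = queue.active_consumer.nil? # the queue has no consumer at this point
queue.subscribe("consumer-2")         # the application has to do this itself
```

In this model nothing re-registers a consumer after `cancel`, which mirrors the point that continuity is left to the application.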
A Single Active Consumer ensures that only one consumer processes messages from a queue at a time, with automatic failover to another registered consumer if the active one is cancelled or goes offline. This approach is beneficial when it's important that messages are delivered and processed in the exact order they arrive.
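As a rough illustration of the failover behaviour, the following toy model keeps consumers in registration order and delivers only to the first one. It is a sketch under simplifying assumptions, not how RabbitMQ or harmoniser implement it: `ToySingleActiveQueue` and the consumer tags are hypothetical, and messages published while no consumer is registered are simply dropped here, whereas a real broker would keep them queued.

```ruby
# Toy in-memory model of a single active consumer (hypothetical names,
# not the RabbitMQ or harmoniser implementation).
class ToySingleActiveQueue
  def initialize
    @consumers = [] # registration order; the first entry is the active one
  end

  def subscribe(tag, &handler)
    @consumers << [tag, handler]
  end

  # Removing the active consumer promotes the next registered one.
  def cancel(tag)
    @consumers.reject! { |consumer_tag, _| consumer_tag == tag }
  end

  def active_tag
    @consumers.first&.first
  end

  # Every message goes to the active consumer only.
  def publish(payload)
    tag, handler = @consumers.first
    handler&.call(tag, payload)
  end
end

log = []
queue = ToySingleActiveQueue.new
queue.subscribe("consumer-1") { |tag, payload| log << [tag, payload] }
queue.subscribe("consumer-2") { |tag, payload| log << [tag, payload] }

queue.publish("m1")
queue.publish("m2")
queue.cancel("consumer-1") # failover: consumer-2 becomes the active one
queue.publish("m3")
```

Here `log` records which consumer handled each message: the first two go to `consumer-1` and, after the cancellation, the third goes to `consumer-2`, with the arrival order preserved.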
To define a queue with a single active consumer, specify x-single-active-consumer as part of the arguments keyword when declaring the queue. Then, add a subscriber pointing to that queue. See below how easy it is to set up this configuration using harmoniser:
Harmoniser.configure do |config|
  config.connection_opts = {
    host: "rabbitmq"
  }
  config.define_topology do |topology|
    # Define arguments property when declaring the queue within the topology
    topology.add_queue("a_queue", arguments: { "x-single-active-consumer": true })
    topology.add_binding("an_exchange", "a_queue")
    topology.declare
  end
end

class Subscriber
  include Harmoniser::Subscriber

  harmoniser_subscriber queue_name: "a_queue"

  class << self
    def on_delivery(delivery_info, properties, payload)
      Harmoniser.logger.info("Message processed: queue = `#{delivery_info.consumer.queue}`, consumer_tag = `#{delivery_info.consumer_tag}`, payload = `#{payload}`")
    end
  end
end
To see this in action, spin up several harmoniser processes and notice that only one of the processes actually receives messages. Please take into account the following:
- To process messages in the exact order they arrive, make sure no concurrency option is passed when starting a harmoniser process.
- Be aware that an active subscriber configured with automatic acknowledgement (ack) receives any published message, and messages that were delivered but not yet processed may be lost if the process exits before processing them.
- On receiving an OS signal (e.g. INT, TERM, USR1), the harmoniser process cancels the active subscriber and a new one, if available, is selected. If messages were already delivered to the previous active subscriber but not yet processed, both subscribers may end up processing messages at the same time.
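
The note on automatic acknowledgement can be made concrete with a small sketch. It is a toy model rather than RabbitMQ or harmoniser code: `remaining_after_crash` and its parameters are hypothetical, and it assumes all messages have been delivered to the consumer before the process dies. It compares what the broker still holds after the consumer crashes under each acknowledgement mode.

```ruby
# Toy model of acknowledgement modes (hypothetical helper, not the
# RabbitMQ or harmoniser implementation).
# Returns the messages the broker still holds after the consumer dies.
def remaining_after_crash(messages, manual_ack:, processed_before_crash:)
  # With automatic ack the broker forgets each message on delivery;
  # with manual ack it tracks every delivered message until acknowledged.
  unacked = manual_ack ? messages.dup : []

  # The consumer acknowledges only the messages it finished processing.
  unacked.shift(processed_before_crash)

  # Whatever is left is requeued when the consumer goes away.
  unacked
end

auto = remaining_after_crash(%w[m1 m2 m3], manual_ack: false, processed_before_crash: 1)
manual = remaining_after_crash(%w[m1 m2 m3], manual_ack: true, processed_before_crash: 1)
```

In this model `auto` is empty, so `m2` and `m3` are lost, while `manual` still contains them and they can be redelivered to the next active subscriber.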