Skip to content

Commit

Permalink
Add support for FIFO queues (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
alextwoods authored Jan 6, 2021
1 parent 786c5a5 commit a29e8c6
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Add support for FIFO Queues to AWS SQS ActiveJob.

3.4.0 (2020-12-07)
------------------

Expand Down
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ Aws::Rails.instrument_sdk_operations
Events are published for each client operation call with the following event
name: <operation>.<serviceId>.aws. For example, S3's put_object has an event
name of: `put_object.S3.aws`. The service name will always match the
namespace of the service client (eg Aws::S3::Client => 'S3').
namespace of the service client (eg Aws::S3::Client => 'S3').
The payload of the event is the
[request context](https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Seahorse/Client/RequestContext.html).

Expand Down Expand Up @@ -379,6 +379,20 @@ Aws::Rails::SqsActiveJob.configure do |config|
end
```

### Using FIFO queues

If the order in which your jobs executes is important, consider using a
[FIFO Queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html).
A FIFO queue ensures that messages are processed in the order they were sent
(First-In-First-Out) and exactly-once processing (ensuring duplicates are never
introduced into the queue). To use a fifo queue, simply set the queue url (which will end in ".fifo")
in your config. You can also configure a `custom message_group_id` that
will be used by all jobs.

When using FIFO queues, jobs will NOT be processed concurrently by the poller
to ensure the correct ordering. Additionally, all jobs on a FIFO queue will be queued
synchronously, even if you have configured the `amazon_sqs_async` adapter.

## AWS Record Generators

This package also pulls in the [`aws-record` gem](https://github.com/aws/aws-sdk-ruby-record)
Expand Down
14 changes: 13 additions & 1 deletion lib/active_job/queue_adapters/amazon_sqs_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def enqueue(job)
_enqueue(job)
end

def enqueue_at(job, timestamp)
def enqueue_at(job, timestamp, opts={})
delay = (timestamp - Time.now.to_f).floor
raise ArgumentError, 'Unable to queue a job with a delay great than 15 minutes' if delay > 15.minutes
_enqueue(job, delay_seconds: delay)
Expand All @@ -25,6 +25,18 @@ def _enqueue(job, send_message_opts = {})
send_message_opts[:queue_url] = queue_url
send_message_opts[:message_body] = Aws::Json.dump(body)
send_message_opts[:message_attributes] = message_attributes(job)

if Aws::Rails::SqsActiveJob.fifo?(queue_url)
# job_id is unique per initialization of job
# Remove it from message dup id to ensure run-once behavior
# with ActiveJob retries
send_message_opts[:message_deduplication_id] =
Digest::SHA256.hexdigest(
Aws::Json.dump(body.except('job_id'))
)

send_message_opts[:message_group_id] = Aws::Rails::SqsActiveJob.config.message_group_id
end
Aws::Rails::SqsActiveJob.config.client.send_message(send_message_opts)
end

Expand Down
18 changes: 12 additions & 6 deletions lib/active_job/queue_adapters/amazon_sqs_async_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ class AmazonSqsAsyncAdapter < AmazonSqsAdapter
private

def _enqueue(job, send_message_opts = {})
Concurrent::Promise
.execute { super(job, send_message_opts) }
.on_error do |e|
Rails.logger.error "Failed to queue job #{job}. Reason: #{e}"
error_handler = Aws::Rails::SqsActiveJob.config.async_queue_error_handler
error_handler.call(e, job, send_message_opts) if error_handler
# FIFO jobs must be queued in order, so do not queue async
queue_url = Aws::Rails::SqsActiveJob.config.queue_url_for(job.queue_name)
if Aws::Rails::SqsActiveJob.fifo?(queue_url)
super(job, send_message_opts)
else
Concurrent::Promise
.execute { super(job, send_message_opts) }
.on_error do |e|
Rails.logger.error "Failed to queue job #{job}. Reason: #{e}"
error_handler = Aws::Rails::SqsActiveJob.config.async_queue_error_handler
error_handler.call(e, job, send_message_opts) if error_handler
end
end
end
end
Expand Down
15 changes: 13 additions & 2 deletions lib/aws/rails/sqs_active_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ def self.configure
yield(config)
end

def self.fifo?(queue_url)
queue_url.ends_with? '.fifo'
end

# Configuration for AWS SQS ActiveJob.
# Use +Aws::Rails::SqsActiveJob.config+ to access the singleton config instance.
class Configuration
Expand All @@ -25,12 +29,14 @@ class Configuration
visibility_timeout: 120,
shutdown_timeout: 15,
queues: {},
logger: ::Rails.logger
logger: ::Rails.logger,
message_group_id: 'SqsActiveJobGroup'
}

# @api private
attr_accessor :queues, :max_messages, :visibility_timeout,
:shutdown_timeout, :client, :logger, :async_queue_error_handler
:shutdown_timeout, :client, :logger,
:async_queue_error_handler, :message_group_id

# Don't use this method directly: Confugration is a singleton class, use
# +Aws::Rails::SqsActiveJob.config+ to access the singleton config.
Expand Down Expand Up @@ -63,6 +69,11 @@ class Configuration
# Override file to load configuration from. If not specified will
# attempt to load from config/aws_sqs_active_job.yml.
#
# @option options [String] :message_group_id (SqsActiveJobGroup)
# The message_group_id to use for queueing messages on a fifo queues.
# Applies only to jobs queued on FIFO queues.
# See the (SQS FIFO Documentation)[https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html]
#
# @option options [Callable] :async_queue_error_handler An error handler
# to be called when the async active job adapter experiances an error
# queueing a job. Only applies when
Expand Down
12 changes: 11 additions & 1 deletion lib/aws/rails/sqs_active_job/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,22 @@ def poll
@logger.info "Polling on: #{@options[:queue]} => #{queue_url}"
client = Aws::Rails::SqsActiveJob.config.client
@poller = Aws::SQS::QueuePoller.new(queue_url, client: client)
single_message = @options[:max_messages] == 1
poller_options = {
skip_delete: true,
max_number_of_messages: @options[:max_messages],
visibility_timeout: @options[:visibility_timeout]
}
# Limit max_number_of_messages for FIFO queues to 1
# this ensures jobs with the same message_group_id are processed
# in order
# Jobs with different message_group_id will be processed in
# parallel and may be out of order.
if Aws::Rails::SqsActiveJob.fifo?(queue_url)
poller_options[:max_number_of_messages] = 1
end

single_message = poller_options[:max_number_of_messages] == 1

@poller.poll(poller_options) do |msgs|
msgs = [msgs] if single_message
@logger.info "Processing batch of #{msgs.length} messages"
Expand Down
14 changes: 14 additions & 0 deletions test/active_job/queue_adapters/amazon_sqs_adapter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ module QueueAdapters
sleep(0.1)
end

it 'adds message_deduplication_id and message_group_id to fifo queues' do
allow(Aws::Rails::SqsActiveJob.config).to receive(:queue_url_for).and_return('https://queue-url.fifo')
expect(client).to receive(:send_message)
.with(
queue_url: 'https://queue-url.fifo',
message_body: instance_of(String),
message_attributes: instance_of(Hash),
message_group_id: instance_of(String),
message_deduplication_id: instance_of(String)
)
TestJob.perform_later('test')
sleep(0.1)
end

it 'enqueues delayed jobs' do
t1 = Time.now
allow(Time).to receive(:now).and_return t1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ module QueueAdapters

expect(@error_handled).to be true
end

it 'queues jobs to fifo queues synchronously' do
allow(Aws::Rails::SqsActiveJob.config).to receive(:queue_url_for).and_return('https://queue-url.fifo')
expect(Concurrent::Promise).not_to receive(:execute)
expect(client).to receive(:send_message)

TestJob.perform_later('test')
sleep(0.1)
end
end
end
end
17 changes: 16 additions & 1 deletion test/aws/rails/sqs_active_job/poller_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ module SqsActiveJob
poller.run
end

it 'sets max_number_of_messages to 1 for fifo queues' do
allow(poller).to receive(:boot_rails) # no-op the boot

allow(Aws::Rails::SqsActiveJob.config).to receive(:queue_url_for).and_return('https://queue-url.fifo')
expect(Aws::SQS::QueuePoller).to receive(:new).and_return(queue_poller)

expect(queue_poller).to receive(:poll).with(
skip_delete: true,
max_number_of_messages: 1,
visibility_timeout: 360
)

poller.run
end

it 'polls for messages and executes them' do
allow(poller).to receive(:boot_rails) # no-op the boot

Expand Down Expand Up @@ -109,4 +124,4 @@ module SqsActiveJob
end
end
end
end
end

0 comments on commit a29e8c6

Please sign in to comment.