Skip to content

Commit

Permalink
Add retry_standard_errors config for SQS ActiveJob (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
alextwoods authored Mar 1, 2024
1 parent 5c19133 commit a81699b
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Add `retry_standard_errors` (default `true`) in SQS ActiveJob and improve retry logic (#114).

3.10.0 (2024-01-19)
------------------

Expand Down
36 changes: 31 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,31 @@ YourJob.set(wait: 1.minute).perform_later(args)
Note: Due to limitations in SQS, you cannot schedule jobs for
later than 15 minutes in the future.

### Performance
AWS SQS ActiveJob is a lightweight and performant queueing backend. Benchmark performed using: Ruby MRI 2.6.5,
shoryuken 5.0.5, aws-sdk-rails 3.3.1 and aws-sdk-sqs 1.34.0 on a 2015 Macbook Pro dual-core i7 with 16GB ram.
### Retry Behavior and Handling Errors
See the Rails ActiveJob Guide on
[Exceptions](https://guides.rubyonrails.org/active_job_basics.html#exceptions)
for background on how ActiveJob handles exceptions and retries.

In general - you should configure retries for your jobs using
[retry_on](https://edgeapi.rubyonrails.org/classes/ActiveJob/Exceptions/ClassMethods.html#method-i-retry_on).
When configured, ActiveJob will catch the exception and reschedule the job for
re-execution after the configured delay. This will delete the original
message from the SQS queue and requeue a new message.

By default SQS ActiveJob is configured with `retry_standard_error` set to `true`
and will not delete messages for jobs that raise a `StandardError` and that do
not handle that error via `retry_on` or `discard_on`. These job messages
will remain on the queue and will be re-read and retried following the
SQS Queue's configured
[retry and DLQ settings](https://docs.aws.amazon.com/lambda/latest/operatorguide/sqs-retries.html).
If you do not have a DLQ configured, the message will continue to be attempted
until it reaches the queues retention period. In general, it is a best practice
to configure a DLQ to store unprocessable jobs for troubleshooting and redrive.

If you want failed jobs that do not have `retry_on` or `discard_on` configured
to be immediately discarded and not left on the queue, set `retry_standard_error`
to `false`. See the configuration section below for details.

*AWS SQS ActiveJob* (default settings): Throughput 119.1 jobs/sec
*Shoryuken* (default settings): Throughput 76.8 jobs/sec

### Running workers - polling for jobs
To start processing jobs, you need to start a separate process
Expand All @@ -325,6 +344,13 @@ Note: When running in production, its recommended that use a process
supervisor such as [foreman](https://github.com/ddollar/foreman), systemd,
upstart, daemontools, launchd, runit, ect.

### Performance
AWS SQS ActiveJob is a lightweight and performant queueing backend. Benchmark performed using: Ruby MRI 2.6.5,
shoryuken 5.0.5, aws-sdk-rails 3.3.1 and aws-sdk-sqs 1.34.0 on a 2015 Macbook Pro dual-core i7 with 16GB ram.

*AWS SQS ActiveJob* (default settings): Throughput 119.1 jobs/sec
*Shoryuken* (default settings): Throughput 76.8 jobs/sec

### Serverless workers: processing activejobs using AWS Lambda
Rather than managing the worker processes yourself, you can use Lambda with an SQS Trigger.
With [Lambda Container Image Support](https://aws.amazon.com/blogs/aws/new-for-aws-lambda-container-image-support/)
Expand Down
11 changes: 11 additions & 0 deletions lib/aws/rails/sqs_active_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Configuration
DEFAULTS = {
max_messages: 10,
shutdown_timeout: 15,
retry_standard_errors: true, # TODO: Remove in next MV
queues: {},
logger: ::Rails.logger,
message_group_id: 'SqsActiveJobGroup',
Expand Down Expand Up @@ -64,6 +65,16 @@ class Configuration
# will not be deleted from the SQS queue and will be retryable after
# the visibility timeout.
#
# @ option options [Boolean] :retry_standard_errors
# If `true`, StandardErrors raised by ActiveJobs are left on the queue
# and will be retried (pending the SQS Queue's redrive/DLQ/maximum receive settings).
# This behavior overrides the standard Rails ActiveJob
# [Retry/Discard for failed jobs](https://guides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs)
# behavior. When set to `true` the retries provided by this will be
# on top of any retries configured on the job with `retry_on`.
# When `false`, retry behavior is fully configured
# through `retry_on`/`discard_on` on the ActiveJobs.
#
# @option options [ActiveSupport::Logger] :logger Logger to use
# for the poller.
#
Expand Down
13 changes: 11 additions & 2 deletions lib/aws/rails/sqs_active_job/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ class Executor
auto_terminate: true,
idletime: 60, # 1 minute
fallback_policy: :caller_runs # slow down the producer thread
# TODO: Consider catching the exception and sleeping instead of using :caller_runs
}.freeze

def initialize(options = {})
@executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options))
@retry_standard_errors = options[:retry_standard_errors]
@logger = options[:logger] || ActiveSupport::Logger.new($stdout)
end

# TODO: Consider catching the exception and sleeping instead of using :caller_runs
def execute(message)
@executor.post(message) do |message|
begin
Expand All @@ -31,10 +32,18 @@ def execute(message)
rescue Aws::Json::ParseError => e
@logger.error "Unable to parse message body: #{message.data.body}. Error: #{e}."
rescue StandardError => e
# message will not be deleted and will be retried
job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job'
@logger.info "Error processing job #{job_msg}: #{e}"
@logger.debug e.backtrace.join("\n")

if @retry_standard_errors && !job.exception_executions?
@logger.info(
'retry_standard_errors is enabled and job has not ' \
"been retried by Rails. Leaving #{job_msg} in the queue."
)
else
message.delete
end
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions lib/aws/rails/sqs_active_job/job_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def initialize(message)
def run
ActiveJob::Base.execute @job_data
end

def exception_executions?
@job_data['exception_executions'] &&
!@job_data['exception_executions'].empty?
end
end
end
end
Expand Down
16 changes: 14 additions & 2 deletions lib/aws/rails/sqs_active_job/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class Poller
threads: 2 * Concurrent.processor_count,
max_messages: 10,
shutdown_timeout: 15,
backpressure: 10
backpressure: 10,
retry_standard_errors: true
}.freeze

def initialize(args = ARGV)
Expand Down Expand Up @@ -45,7 +46,12 @@ def run

Signal.trap('INT') { raise Interrupt }
Signal.trap('TERM') { raise Interrupt }
@executor = Executor.new(max_threads: @options[:threads], logger: @logger, max_queue: @options[:backpressure])
@executor = Executor.new(
max_threads: @options[:threads],
logger: @logger,
max_queue: @options[:backpressure],
retry_standard_errors: @options[:retry_standard_errors]
)

poll
rescue Interrupt
Expand Down Expand Up @@ -99,6 +105,7 @@ def boot_rails
require File.expand_path('config/environment.rb')
end

# rubocop:disable Metrics
def parse_args(argv)
out = {}
parser = ::OptionParser.new do |opts|
Expand Down Expand Up @@ -127,6 +134,10 @@ def parse_args(argv)
'The amount of time to wait for a clean shutdown. Jobs that are unable to complete in this time will not be deleted from the SQS queue and will be retryable after the visibility timeout.') do |a|
out[:shutdown_timeout] = a
end
opts.on('--[no-]retry_standard_errors [FLAG]', TrueClass,
'When set, retry all StandardErrors (leaving failed messages on the SQS Queue). These retries are ON TOP of standard Rails ActiveJob retries set by retry_on in the ActiveJob.') do |a|
out[:retry_standard_errors] = a.nil? ? true : a
end
end

parser.banner = 'aws_sqs_active_job [options]'
Expand All @@ -138,6 +149,7 @@ def parse_args(argv)
parser.parse(argv)
out
end
# rubocop:enable Metrics

def validate_config
raise ArgumentError, 'You must specify the name of the queue to process jobs from' unless @options[:queue]
Expand Down
2 changes: 1 addition & 1 deletion sample_app/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ gem "sprockets-rails"
# Use sqlite3 as the database for Active Record
gem 'sqlite3', '~> 1.4'
# Use Puma as the app server
gem 'puma', '~> 5.0'
gem 'puma', '~> 6.0'
# Use SCSS for stylesheets
gem 'sass-rails', '>= 6'
# Transpile app-like JavaScript. Read more: https://github.com/rails/webpacker
Expand Down
11 changes: 11 additions & 0 deletions sample_app/app/jobs/hello_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
class HelloJob < ApplicationJob
queue_as :default

class NameException < StandardError; end

class SkipException < StandardError; end

retry_on NameException
discard_on SkipException

def perform(name)
raise NameException if name == "error"
raise SkipException if name == "skip"
raise StandardError if name == "StandardError"

puts "Hello from our job: #{name}"
end
end
3 changes: 2 additions & 1 deletion sample_app/config/aws_sqs_active_job.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
queues:
default: 'https://sqs.us-east-1.amazonaws.com/655347895545/ActiveJobDefault'
default: <%= ENV['AWS_ACTIVE_JOB_QUEUE_URL'] %>
shutdown_timeout: 10
retry_standard_errors: true
16 changes: 14 additions & 2 deletions test/aws/rails/sqs_active_job/executor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,25 @@ module SqsActiveJob
executor.shutdown # give the job a chance to run
end

it 'does not delete the message on exception' do
it 'deletes the message on exception' do
expect(JobRunner).to receive(:new).and_return(runner)
expect(runner).to receive(:run).and_raise StandardError
expect(msg).not_to receive(:delete)
expect(msg).to receive(:delete)
executor.execute(msg)
executor.shutdown # give the job a chance to run
end

describe 'retry_standard_errors' do
let(:executor) { Executor.new(retry_standard_errors: true) }

it 'does not delete the message on exception' do
expect(JobRunner).to receive(:new).and_return(runner)
expect(runner).to receive(:run).and_raise StandardError
expect(msg).not_to receive(:delete)
executor.execute(msg)
executor.shutdown # give the job a chance to run
end
end
end

describe '#shutdown' do
Expand Down

0 comments on commit a81699b

Please sign in to comment.