Mike Perham edited this page Nov 13, 2017 · 4 revisions

Middleware allows you to monitor, control or change jobs as they come from or go to Faktory.

FWR (faktory_worker_ruby) has a notion of middleware similar to Sidekiq's: "client" middleware runs around a "push" to Faktory, and "worker" middleware runs around the execution of a job in FWR.

# A "client" process is any process which is NOT FWR: Puma, Passenger, etc
# and will be using the Faktory Client API only to produce jobs.
Faktory.configure_client do |config|
  config.client_middleware do |chain|
    chain.add SomeClientMiddleware
  end
end

# A "worker" process is a `faktory-worker` process, which both produces and consumes jobs.
# This was called "server" in Sidekiq.
Faktory.configure_worker do |config|
  config.client_middleware do |chain|
    chain.add SomeClientMiddleware
  end
  config.worker_middleware do |chain|
    chain.add SomeWorkerMiddleware
  end
end

Client-side

A client middleware can add data elements to the job payload or conditionally stop the push of a job to Faktory:

class MyClientHook
  def call(job, conn_pool)
    puts "Before push"
    result = yield
    puts "After push"
    result
  end
end
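
To stop a push, a client middleware can return without calling yield, since the push happens inside the block. The sketch below assumes that behavior, which follows the Sidekiq convention. SkipFlaggedJobs and the 'skip_push' flag are hypothetical names, and the middleware is called by hand with a stand-in block instead of a real push to Faktory:

```ruby
# Hypothetical client middleware: skips the push when the job carries
# a (made-up) 'skip_push' flag in its custom hash.
class SkipFlaggedJobs
  def call(job, conn_pool)
    # Assumption: returning without yielding stops the push.
    return false if job.dig('custom', 'skip_push')
    yield
  end
end

# Stand-in for the real push so the sketch runs without a Faktory server.
pushed = []
hook = SkipFlaggedJobs.new

normal  = { 'jobtype' => 'SomeJob', 'args' => [1] }
flagged = { 'jobtype' => 'SomeJob', 'args' => [2], 'custom' => { 'skip_push' => true } }

hook.call(normal, nil)  { pushed << normal }
hook.call(flagged, nil) { pushed << flagged }

puts pushed.length
```

Only the unflagged job reaches the stand-in block. In a real process the middleware would be registered with chain.add inside config.client_middleware, as shown earlier.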

Worker-side

A worker middleware wraps the execution of a job. You can use it to set up context-specific variables, handle certain error cases specially, and so on.

class MyWorkerHook
  def call(worker_instance, job)
    puts "Before work"
    yield
    puts "After work"
  end
end
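
As a sketch of the "context variables" and "error cases" uses, the hypothetical TrackJobContext middleware below stores the job id in a thread-local for the duration of the job, logs any failure, and re-raises it. The class name and the :current_jid key are illustrative assumptions, and the middleware is called by hand with a block standing in for the job's execution:

```ruby
# Hypothetical worker middleware: tags the thread with the job id while
# the job runs and logs failures before re-raising them.
class TrackJobContext
  def call(worker_instance, job)
    Thread.current[:current_jid] = job['jid']
    yield
  rescue StandardError => e
    warn "job #{job['jid']} failed: #{e.class}: #{e.message}"
    raise # re-raise so the error still propagates to the worker
  ensure
    # Clear the context so it does not leak into the next job on this thread.
    Thread.current[:current_jid] = nil
  end
end

hook = TrackJobContext.new
job = { 'jid' => 'abc123', 'jobtype' => 'SomeJob', 'args' => [] }

# Successful job: the context is visible while the block runs.
seen = nil
hook.call(Object.new, job) { seen = Thread.current[:current_jid] }

# Failing job: the error is logged and still raised to the caller.
failed = begin
  hook.call(Object.new, job) { raise ArgumentError, 'boom' }
  false
rescue ArgumentError
  true
end
```

Re-raising matters here: swallowing the exception in middleware would make a failed job look successful.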

Note that in both cases the job variable is the job payload hash, which is converted into JSON before being sent to Faktory. Per the Faktory docs, any custom data elements must go in the custom hash.
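
A minimal sketch of what this means for the payload, using Ruby's standard json library. The 'locale' element and the job values are made up for illustration; 'jid', 'jobtype', 'args' and 'custom' are the payload fields involved:

```ruby
require 'json'

job = {
  'jid'     => 'abc123',
  'jobtype' => 'SomeJob',
  'args'    => [1, 2]
}

# Custom data goes under 'custom', not at the top level of the payload.
job['custom'] ||= {}
job['custom']['locale'] = 'fr'

# Round trip through JSON, as happens between client and worker.
wire = JSON.generate(job)
restored = JSON.parse(wire)

puts restored.dig('custom', 'locale')
```

Because the payload travels as JSON, only JSON-friendly values (strings, numbers, booleans, arrays, hashes) survive the trip, and keys come back as strings.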

Full Example

Here's a full, real-world example. Say your app requires a global current_user to be set, which happens automatically for each web request in Puma. Here's how to propagate that user to all your Faktory jobs as well.

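# Assumption: the app keeps the current user in a thread-local,
# Thread.current[:current_user]. Substitute whatever your app uses.
class PropagateCurrentUser
  def call(job, conn_pool)
    user = Thread.current[:current_user]
    if user
      job['custom'] ||= {}
      job['custom']['uid'] = user.id
    end
    yield
  end
end

class RestoreCurrentUser
  def call(worker_instance, job)
    uid = job.dig('custom', 'uid')
    # A bare `current_user = ...` would only create a local variable,
    # so assign to the shared thread-local instead.
    Thread.current[:current_user] = uid ? User.find(uid) : nil
    yield
  ensure
    # Don't leak the user into the next job run on this thread.
    Thread.current[:current_user] = nil
  end
end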

Faktory.configure_client do |config|
  config.client_middleware do |chain|
    chain.add PropagateCurrentUser
  end
end

Faktory.configure_worker do |config|
  config.client_middleware do |chain|
    chain.add PropagateCurrentUser
  end
  config.worker_middleware do |chain|
    chain.add RestoreCurrentUser
  end
end