Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ spark_locals_without_parens = [
trigger: 1,
trigger: 2,
trigger_once?: 1,
use_tenant_from_record?: 1,
where: 1,
worker_module_name: 1,
worker_opts: 1,
Expand Down
2 changes: 2 additions & 0 deletions documentation/dsls/DSL-AshOban.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ end
| [`domain`](#oban-domain){: #oban-domain } | `module` | | The Domain to use when calling actions on this resource. Defaults to the resource's domain. |
| [`list_tenants`](#oban-list_tenants){: #oban-list_tenants } | `list(any) \| (-> any) \| module` | `[nil]` | A list of tenants or a function behaviour that returns a list of tenants a trigger should be run for. Can be overwritten on the trigger level. |
| [`shared_context?`](#oban-shared_context?){: #oban-shared_context? } | `boolean` | `false` | If set to `true`, the `ash_oban?: true` flag will be placed in shared context instead of regular context. Shared context propagates to related actions called via `manage_relationship` and can be passed to other action invocations using the context as scope, making it easier to detect AshOban execution in nested actions. Can be overridden per trigger or scheduled action. |
| [`use_tenant_from_record?`](#oban-use_tenant_from_record?){: #oban-use_tenant_from_record? } | `boolean` | `false` | Default value for `use_tenant_from_record?` for all triggers in this resource. When set to `true`, tenants will be extracted from each record's tenant attribute and used when workers process those records. This allows schedulers to use multitenancy `:allow_global` read actions to find records across all tenants, while worker actions still run with the correct tenant context for each record. Can be overridden per trigger. |


### oban.triggers
Expand Down Expand Up @@ -144,6 +145,7 @@ end
| [`on_error`](#oban-triggers-trigger-on_error){: #oban-triggers-trigger-on_error } | `atom` | | An update action to call after the last attempt has failed. See the getting started guide for more. |
| [`on_error_fails_job?`](#oban-triggers-trigger-on_error_fails_job?){: #oban-triggers-trigger-on_error_fails_job? } | `boolean` | `false` | Determines if the oban job will be failed on the last attempt when there is an on_error handler that is called. If there is no on_error, then the action is always marked as failed on the last attempt. |
| [`shared_context?`](#oban-triggers-trigger-shared_context?){: #oban-triggers-trigger-shared_context? } | `boolean` | | If set to `true`, the `ash_oban?: true` flag will be placed in shared context instead of regular context. Shared context propagates to related actions called via `manage_relationship` and can be passed to other action invocations using the context as scope, making it easier to detect AshOban execution in nested actions. If not specified, inherits the global `shared_context?` setting from the `oban` section. |
| [`use_tenant_from_record?`](#oban-triggers-trigger-use_tenant_from_record?){: #oban-triggers-trigger-use_tenant_from_record? } | `boolean` | `false` | If set to `true`, the tenant will be extracted from each record's tenant attribute and used when the worker processes that record. This allows the scheduler to use a multitenancy `:allow_global` read action to find records across all tenants, while the worker action still runs with the correct tenant context for each individual record. If not specified, inherits the global `use_tenant_from_record?` setting from the `oban` section. |
| [`worker_opts`](#oban-triggers-trigger-worker_opts){: #oban-triggers-trigger-worker_opts } | `keyword` | `[]` | Options to set on the worker. ATTENTION: this may overwrite options set by ash_oban, make sure you know what you are doing. See [Oban.Worker](https://hexdocs.pm/oban/Oban.Worker.html#module-defining-workers) for options and [Oban.Pro.Worker](https://oban.pro/docs/pro/Oban.Pro.Worker.html) for oban pro |
| [`backoff`](#oban-triggers-trigger-backoff){: #oban-triggers-trigger-backoff } | `pos_integer \| (any -> any) \| :exponential` | `:exponential` | Configure after how much time job should (in seconds) be retried in case of error if more retries available. Can be a number of seconds or a function that takes the job and returns a number of seconds. Will not be executed if default max_attempts value of 1 will be used. See [Oban.Worker](https://hexdocs.pm/oban/Oban.Worker.html#module-customizing-backoff) for more about backoff. backoff 10 backoff fn _job -> 10 end backoff fn %Oban.Job{attempt: attempt} -> 10 * attempt end backoff fn %Oban.Job{attempt: attempt, unsaved_error: unsaved_error} -> %{kind: _, reason: reason, stacktrace: _} = unsaved_error case reason do %MyApp.ApiError{status: 429} -> 300 _ -> trunc(:math.pow(attempt, 4)) end end |
| [`timeout`](#oban-triggers-trigger-timeout){: #oban-triggers-trigger-timeout } | `pos_integer \| (any -> any) \| :infinity` | `:infinity` | Configure timeout for the job in milliseconds. See [Oban.Worker timeout](https://hexdocs.pm/oban/Oban.Worker.html#module-customizing-timeout) for more about timeout. timeout 30_000 timeout fn _job -> :timer.seconds(30) end |
Expand Down
57 changes: 54 additions & 3 deletions lib/ash_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ defmodule AshOban do
__identifier__: atom,
on_error: atom,
on_error_fails_job?: boolean(),
shared_context?: boolean()
shared_context?: boolean(),
use_tenant_from_record?: boolean()
}

defstruct [
Expand Down Expand Up @@ -86,6 +87,7 @@ defmodule AshOban do
:log_final_error?,
:log_errors?,
:shared_context?,
:use_tenant_from_record?,
:__identifier__,
:__spark_metadata__
]
Expand Down Expand Up @@ -312,6 +314,18 @@ defmodule AshOban do
If not specified, inherits the global `shared_context?` setting from the `oban` section.
"""
],
use_tenant_from_record?: [
type: :boolean,
default: false,
doc: """
If set to `true`, the tenant will be extracted from each record's tenant attribute
and used when the worker processes that record. This allows the scheduler to use a
multitenancy `:allow_global` read action to find records across all tenants, while
the worker action still runs with the correct tenant context for each individual record.

If not specified, inherits the global `use_tenant_from_record?` setting from the `oban` section.
"""
],
worker_opts: [
type: :keyword_list,
default: [],
Expand Down Expand Up @@ -551,6 +565,19 @@ defmodule AshOban do
action invocations using the context as scope, making it easier to detect AshOban execution in nested actions.
Can be overridden per trigger or scheduled action.
"""
],
use_tenant_from_record?: [
type: :boolean,
default: false,
doc: """
Default value for `use_tenant_from_record?` for all triggers in this resource.
When set to `true`, tenants will be extracted from each record's tenant attribute
and used when workers process those records. This allows schedulers to use
multitenancy `:allow_global` read actions to find records across all tenants,
while worker actions still run with the correct tenant context for each record.

Can be overridden per trigger.
"""
]
],
sections: [@triggers, @scheduled_actions]
Expand Down Expand Up @@ -584,7 +611,8 @@ defmodule AshOban do
sections: @sections,
imports: [AshOban.Changes.BuiltinChanges],
verifiers: [
AshOban.Verifiers.VerifyModuleNames
AshOban.Verifiers.VerifyModuleNames,
AshOban.Verifiers.VerifyUseTenantFromRecord
],
transformers: [
AshOban.Transformers.SetDefaults,
Expand Down Expand Up @@ -751,6 +779,29 @@ defmodule AshOban do

primary_key = Ash.Resource.Info.primary_key(resource)

tenant =
if opts[:tenant] do
opts[:tenant]
else
if trigger.use_tenant_from_record? do
tenant_attribute = Ash.Resource.Info.multitenancy_attribute(resource)

if tenant_attribute do
case Map.get(record, tenant_attribute) do
%Ash.NotLoaded{} ->
nil

%Ash.ForbiddenField{} ->
nil

tenant ->
{m, f, a} = Ash.Resource.Info.multitenancy_tenant_from_attribute(resource)
apply(m, f, [tenant | a])
end
end
end
end

metadata =
case trigger do
%{read_metadata: read_metadata} when is_function(read_metadata) ->
Expand All @@ -776,7 +827,7 @@ defmodule AshOban do
primary_key: validate_primary_key(Map.take(record, primary_key), resource),
metadata: metadata,
action_arguments: opts[:action_arguments] || %{},
tenant: opts[:tenant]
tenant: tenant
}
|> AshOban.store_actor(opts[:actor], trigger.actor_persister)
|> then(&Map.merge(extra_args, &1))
Expand Down
9 changes: 9 additions & 0 deletions lib/transformers/define_schedulers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ defmodule AshOban.Transformers.DefineSchedulers do
quote do
resource
|> Ash.Query.select(unquote(primary_key))
|> then(fn query ->
tenant_attribute = Ash.Resource.Info.multitenancy_attribute(unquote(resource))

if tenant_attribute do
Ash.Query.ensure_selected(query, tenant_attribute)
else
query
end
end)
|> limit_stream()
end

Expand Down
5 changes: 4 additions & 1 deletion lib/transformers/set_defaults.ex
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ defmodule AshOban.Transformers.SetDefaults do
scheduler_queue: trigger.scheduler_queue || queue,
action: trigger.action || trigger.name,
shared_context?:
trigger.shared_context? || AshOban.Info.oban_shared_context?(dsl) || false
trigger.shared_context? || AshOban.Info.oban_shared_context?(dsl) || false,
use_tenant_from_record?:
trigger.use_tenant_from_record? || AshOban.Info.oban_use_tenant_from_record?(dsl) ||
false
})
end)
end
Expand Down
53 changes: 53 additions & 0 deletions lib/verifiers/verify_use_tenant_from_record.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# SPDX-FileCopyrightText: 2023 ash_oban contributors <https://github.com/ash-project/ash_oban/graphs.contributors>
#
# SPDX-License-Identifier: MIT

defmodule AshOban.Verifiers.VerifyUseTenantFromRecord do
@moduledoc """
Verifies that when `use_tenant_from_record?` is set to true, the multitenancy
`parse_attribute` and `tenant_from_attribute` options are either both at their
defaults or both customized.
"""
use Spark.Dsl.Verifier

def verify(dsl_state) do
module = Spark.Dsl.Verifier.get_persisted(dsl_state, :module)

dsl_state
|> AshOban.Info.oban_triggers()
|> Enum.filter(fn trigger -> trigger.use_tenant_from_record? end)
|> Enum.reduce_while(:ok, fn trigger, acc ->
parse_attribute = Ash.Resource.Info.multitenancy_parse_attribute(module)
tenant_from_attribute = Ash.Resource.Info.multitenancy_tenant_from_attribute(module)

parse_at_default? = identity_mfa?(parse_attribute)
tenant_from_at_default? = identity_mfa?(tenant_from_attribute)

if parse_at_default? != tenant_from_at_default? do
{:halt,
{:error,
Spark.Error.DslError.exception(
module: module,
path: [:oban, :triggers, trigger.name],
message: """
When `use_tenant_from_record?` is true, the multitenancy options
`parse_attribute` and `tenant_from_attribute` must either both be
at their default values or both be customized.

Currently:
- parse_attribute: #{inspect(parse_attribute)} #{if parse_at_default?, do: "(default)", else: "(customized)"}
- tenant_from_attribute: #{inspect(tenant_from_attribute)} #{if tenant_from_at_default?, do: "(default)", else: "(customized)"}

These options are inverses of each other, so if you customize one,
you must customize the other to maintain consistency.
"""
)}}
else
{:cont, acc}
end
end)
end

defp identity_mfa?({Ash.Resource.Dsl, :identity, []}), do: true
defp identity_mfa?(_), do: false
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ defmodule AshOban.MixProject do

oban_dep ++
[
{:ash, ash_version("~> 3.0")},
{:ash, ash_version("~> 3.0 and >=3.8.0")},
{:oban, "~> 2.15"},
{:postgrex, "~> 0.18"},
# dev/test dependencies
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%{
"ash": {:hex, :ash, "3.7.6", "a0358e8467da4e2a94855542d07d7fca8e74cb6bc89c42af2181b4caa91f8415", [:mix], [{:crux, ">= 0.1.2 and < 1.0.0-0", [hex: :crux, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6003aa4dec5868e6371c3bf2efdb89507c59c05f5dbec13a13b73a92b938a258"},
"ash": {:hex, :ash, "3.8.0", "7daee81edee701bc8093e3b0fdd111edb644936db94b82b50e58efe7337f9b1f", [:mix], [{:crux, ">= 0.1.2 and < 1.0.0-0", [hex: :crux, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5ff1876a560b82bc91510f4d12dfacf0d024eeedb9cbe06e2b52fea2e9ba104c"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"credo": {:hex, :credo, "1.7.13", "126a0697df6b7b71cd18c81bc92335297839a806b6f62b61d417500d1070ff4e", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "47641e6d2bbff1e241e87695b29f617f1a8f912adea34296fb10ecc3d7e9e84f"},
"crux": {:hex, :crux, "0.1.2", "4441c9e3a34f1e340954ce96b9ad5a2de13ceb4f97b3f910211227bb92e2ca90", [:mix], [{:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: true]}], "hexpm", "563ea3748ebfba9cc078e6d198a1d6a06015a8fae503f0b721363139f0ddb350"},
Expand Down Expand Up @@ -42,7 +42,7 @@
"simple_sat": {:hex, :simple_sat, "0.1.4", "39baf72cdca14f93c0b6ce2b6418b72bbb67da98fa9ca4384e2f79bbc299899d", [:mix], [], "hexpm", "3569b68e346a5fd7154b8d14173ff8bcc829f2eb7b088c30c3f42a383443930b"},
"sobelow": {:hex, :sobelow, "0.14.1", "2f81e8632f15574cba2402bcddff5497b413c01e6f094bc0ab94e83c2f74db81", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8fac9a2bd90fdc4b15d6fca6e1608efb7f7c600fa75800813b794ee9364c87f2"},
"sourceror": {:hex, :sourceror, "1.10.0", "38397dedbbc286966ec48c7af13e228b171332be1ad731974438c77791945ce9", [:mix], [], "hexpm", "29dbdfc92e04569c9d8e6efdc422fc1d815f4bd0055dc7c51b8800fb75c4b3f1"},
"spark": {:hex, :spark, "2.3.12", "55f597df09cd38944c888f00e12f8b1f1fd94b0b4ed76a199e1d1d8251d9220a", [:mix], [{:igniter, ">= 0.3.64 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: true]}], "hexpm", "4f69b30cab6ac72e6f16e0f6b4f815d3ce3915628612f38059dcea4a25b53fe0"},
"spark": {:hex, :spark, "2.3.13", "129c2f7ecabd0c27e53daea7aa8b1de4dd068fdafb8d7d5ee49e1f2e8d87ba64", [:mix], [{:igniter, ">= 0.3.64 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: true]}], "hexpm", "d06de23b5e8961d98c9c81d798dbafb9ac64694da19b0e9ca4ba4a8a54b75e31"},
"spitfire": {:hex, :spitfire, "0.2.1", "29e154873f05444669c7453d3d931820822cbca5170e88f0f8faa1de74a79b47", [:mix], [], "hexpm", "6eeed75054a38341b2e1814d41bb0a250564092358de2669fdb57ff88141d91b"},
"splode": {:hex, :splode, "0.2.9", "3a2776e187c82f42f5226b33b1220ccbff74f4bcc523dd4039c804caaa3ffdc7", [:mix], [], "hexpm", "8002b00c6e24f8bd1bcced3fbaa5c33346048047bb7e13d2f3ad428babbd95c3"},
"stream_data": {:hex, :stream_data, "1.2.0", "58dd3f9e88afe27dc38bef26fce0c84a9e7a96772b2925c7b32cd2435697a52b", [:mix], [], "hexpm", "eb5c546ee3466920314643edf68943a5b14b32d1da9fe01698dc92b73f89a9ed"},
Expand Down
1 change: 1 addition & 0 deletions test/support/domain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ defmodule AshOban.Test.Domain do

resources do
resource AshOban.Test.Triggered
allow_unregistered? true
end
end
Loading