diff --git a/.formatter.exs b/.formatter.exs index 29cab08..22723e0 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -38,6 +38,7 @@ spark_locals_without_parens = [ trigger: 1, trigger: 2, trigger_once?: 1, + use_tenant_from_record?: 1, where: 1, worker_module_name: 1, worker_opts: 1, diff --git a/documentation/dsls/DSL-AshOban.md b/documentation/dsls/DSL-AshOban.md index 8a2f97f..adc6a2c 100644 --- a/documentation/dsls/DSL-AshOban.md +++ b/documentation/dsls/DSL-AshOban.md @@ -58,6 +58,7 @@ end | [`domain`](#oban-domain){: #oban-domain } | `module` | | The Domain to use when calling actions on this resource. Defaults to the resource's domain. | | [`list_tenants`](#oban-list_tenants){: #oban-list_tenants } | `list(any) \| (-> any) \| module` | `[nil]` | A list of tenants or a function behaviour that returns a list of tenants a trigger should be run for. Can be overwritten on the trigger level. | | [`shared_context?`](#oban-shared_context?){: #oban-shared_context? } | `boolean` | `false` | If set to `true`, the `ash_oban?: true` flag will be placed in shared context instead of regular context. Shared context propagates to related actions called via `manage_relationship` and can be passed to other action invocations using the context as scope, making it easier to detect AshOban execution in nested actions. Can be overridden per trigger or scheduled action. | +| [`use_tenant_from_record?`](#oban-use_tenant_from_record?){: #oban-use_tenant_from_record? } | `boolean` | `false` | Default value for `use_tenant_from_record?` for all triggers in this resource. When set to `true`, tenants will be extracted from each record's tenant attribute and used when workers process those records. This allows schedulers to use multitenancy `:allow_global` read actions to find records across all tenants, while worker actions still run with the correct tenant context for each record. Can be overridden per trigger. | ### oban.triggers @@ -144,6 +145,7 @@ end | [`on_error`](#oban-triggers-trigger-on_error){: #oban-triggers-trigger-on_error } | `atom` | | An update action to call after the last attempt has failed. See the getting started guide for more. | | [`on_error_fails_job?`](#oban-triggers-trigger-on_error_fails_job?){: #oban-triggers-trigger-on_error_fails_job? } | `boolean` | `false` | Determines if the oban job will be failed on the last attempt when there is an on_error handler that is called. If there is no on_error, then the action is always marked as failed on the last attempt. | | [`shared_context?`](#oban-triggers-trigger-shared_context?){: #oban-triggers-trigger-shared_context? } | `boolean` | | If set to `true`, the `ash_oban?: true` flag will be placed in shared context instead of regular context. Shared context propagates to related actions called via `manage_relationship` and can be passed to other action invocations using the context as scope, making it easier to detect AshOban execution in nested actions. If not specified, inherits the global `shared_context?` setting from the `oban` section. | +| [`use_tenant_from_record?`](#oban-triggers-trigger-use_tenant_from_record?){: #oban-triggers-trigger-use_tenant_from_record? } | `boolean` | `false` | If set to `true`, the tenant will be extracted from each record's tenant attribute and used when the worker processes that record. This allows the scheduler to use a multitenancy `:allow_global` read action to find records across all tenants, while the worker action still runs with the correct tenant context for each individual record. If not specified, inherits the global `use_tenant_from_record?` setting from the `oban` section. | | [`worker_opts`](#oban-triggers-trigger-worker_opts){: #oban-triggers-trigger-worker_opts } | `keyword` | `[]` | Options to set on the worker. ATTENTION: this may overwrite options set by ash_oban, make sure you know what you are doing. See [Oban.Worker](https://hexdocs.pm/oban/Oban.Worker.html#module-defining-workers) for options and [Oban.Pro.Worker](https://oban.pro/docs/pro/Oban.Pro.Worker.html) for oban pro | | [`backoff`](#oban-triggers-trigger-backoff){: #oban-triggers-trigger-backoff } | `pos_integer \| (any -> any) \| :exponential` | `:exponential` | Configure after how much time job should (in seconds) be retried in case of error if more retries available. Can be a number of seconds or a function that takes the job and returns a number of seconds. Will not be executed if default max_attempts value of 1 will be used. See [Oban.Worker](https://hexdocs.pm/oban/Oban.Worker.html#module-customizing-backoff) for more about backoff. backoff 10 backoff fn _job -> 10 end backoff fn %Oban.Job{attempt: attempt} -> 10 * attempt end backoff fn %Oban.Job{attempt: attempt, unsaved_error: unsaved_error} -> %{kind: _, reason: reason, stacktrace: _} = unsaved_error case reason do %MyApp.ApiError{status: 429} -> 300 _ -> trunc(:math.pow(attempt, 4)) end end | | [`timeout`](#oban-triggers-trigger-timeout){: #oban-triggers-trigger-timeout } | `pos_integer \| (any -> any) \| :infinity` | `:infinity` | Configure timeout for the job in milliseconds. See [Oban.Worker timeout](https://hexdocs.pm/oban/Oban.Worker.html#module-customizing-timeout) for more about timeout. timeout 30_000 timeout fn _job -> :timer.seconds(30) end | diff --git a/lib/ash_oban.ex b/lib/ash_oban.ex index 125164d..59f5994 100644 --- a/lib/ash_oban.ex +++ b/lib/ash_oban.ex @@ -45,7 +45,8 @@ defmodule AshOban do __identifier__: atom, on_error: atom, on_error_fails_job?: boolean(), - shared_context?: boolean() + shared_context?: boolean(), + use_tenant_from_record?: boolean() } defstruct [ @@ -86,6 +87,7 @@ defmodule AshOban do :log_final_error?, :log_errors?, :shared_context?, + :use_tenant_from_record?, :__identifier__, :__spark_metadata__ ] @@ -312,6 +314,18 @@ defmodule AshOban do If not specified, inherits the global `shared_context?` setting from the `oban` section. """ ], + use_tenant_from_record?: [ + type: :boolean, + default: false, + doc: """ + If set to `true`, the tenant will be extracted from each record's tenant attribute + and used when the worker processes that record. This allows the scheduler to use a + multitenancy `:allow_global` read action to find records across all tenants, while + the worker action still runs with the correct tenant context for each individual record. + + If not specified, inherits the global `use_tenant_from_record?` setting from the `oban` section. + """ + ], worker_opts: [ type: :keyword_list, default: [], @@ -551,6 +565,19 @@ defmodule AshOban do action invocations using the context as scope, making it easier to detect AshOban execution in nested actions. Can be overridden per trigger or scheduled action. """ + ], + use_tenant_from_record?: [ + type: :boolean, + default: false, + doc: """ + Default value for `use_tenant_from_record?` for all triggers in this resource. + When set to `true`, tenants will be extracted from each record's tenant attribute + and used when workers process those records. This allows schedulers to use + multitenancy `:allow_global` read actions to find records across all tenants, + while worker actions still run with the correct tenant context for each record. + + Can be overridden per trigger. + """ ] ], sections: [@triggers, @scheduled_actions] @@ -584,7 +611,8 @@ defmodule AshOban do sections: @sections, imports: [AshOban.Changes.BuiltinChanges], verifiers: [ - AshOban.Verifiers.VerifyModuleNames + AshOban.Verifiers.VerifyModuleNames, + AshOban.Verifiers.VerifyUseTenantFromRecord ], transformers: [ AshOban.Transformers.SetDefaults, @@ -751,6 +779,29 @@ defmodule AshOban do primary_key = Ash.Resource.Info.primary_key(resource) + tenant = + if opts[:tenant] do + opts[:tenant] + else + if trigger.use_tenant_from_record? do + tenant_attribute = Ash.Resource.Info.multitenancy_attribute(resource) + + if tenant_attribute do + case Map.get(record, tenant_attribute) do + %Ash.NotLoaded{} -> + nil + + %Ash.ForbiddenField{} -> + nil + + tenant -> + {m, f, a} = Ash.Resource.Info.multitenancy_tenant_from_attribute(resource) + apply(m, f, [tenant | a]) + end + end + end + end + metadata = case trigger do %{read_metadata: read_metadata} when is_function(read_metadata) -> @@ -776,7 +827,7 @@ defmodule AshOban do primary_key: validate_primary_key(Map.take(record, primary_key), resource), metadata: metadata, action_arguments: opts[:action_arguments] || %{}, - tenant: opts[:tenant] + tenant: tenant } |> AshOban.store_actor(opts[:actor], trigger.actor_persister) |> then(&Map.merge(extra_args, &1)) diff --git a/lib/transformers/define_schedulers.ex b/lib/transformers/define_schedulers.ex index 04a95d4..e90d258 100644 --- a/lib/transformers/define_schedulers.ex +++ b/lib/transformers/define_schedulers.ex @@ -110,6 +110,15 @@ defmodule AshOban.Transformers.DefineSchedulers do quote do resource |> Ash.Query.select(unquote(primary_key)) + |> then(fn query -> + tenant_attribute = Ash.Resource.Info.multitenancy_attribute(unquote(resource)) + + if tenant_attribute do + Ash.Query.ensure_selected(query, tenant_attribute) + else + query + end + end) |> limit_stream() end diff --git a/lib/transformers/set_defaults.ex b/lib/transformers/set_defaults.ex index 17cac1d..9445d0b 100644 --- a/lib/transformers/set_defaults.ex +++ b/lib/transformers/set_defaults.ex @@ -178,7 +178,10 @@ defmodule AshOban.Transformers.SetDefaults do scheduler_queue: trigger.scheduler_queue || queue, action: trigger.action || trigger.name, shared_context?: - trigger.shared_context? || AshOban.Info.oban_shared_context?(dsl) || false + trigger.shared_context? || AshOban.Info.oban_shared_context?(dsl) || false, + use_tenant_from_record?: + trigger.use_tenant_from_record? || AshOban.Info.oban_use_tenant_from_record?(dsl) || + false }) end) end diff --git a/lib/verifiers/verify_use_tenant_from_record.ex b/lib/verifiers/verify_use_tenant_from_record.ex new file mode 100644 index 0000000..f16e070 --- /dev/null +++ b/lib/verifiers/verify_use_tenant_from_record.ex @@ -0,0 +1,53 @@ +# SPDX-FileCopyrightText: 2023 ash_oban contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshOban.Verifiers.VerifyUseTenantFromRecord do + @moduledoc """ + Verifies that when `use_tenant_from_record?` is set to true, the multitenancy + `parse_attribute` and `tenant_from_attribute` options are either both at their + defaults or both customized. + """ + use Spark.Dsl.Verifier + + def verify(dsl_state) do + module = Spark.Dsl.Verifier.get_persisted(dsl_state, :module) + + dsl_state + |> AshOban.Info.oban_triggers() + |> Enum.filter(fn trigger -> trigger.use_tenant_from_record? end) + |> Enum.reduce_while(:ok, fn trigger, acc -> + parse_attribute = Ash.Resource.Info.multitenancy_parse_attribute(module) + tenant_from_attribute = Ash.Resource.Info.multitenancy_tenant_from_attribute(module) + + parse_at_default? = identity_mfa?(parse_attribute) + tenant_from_at_default? = identity_mfa?(tenant_from_attribute) + + if parse_at_default? != tenant_from_at_default? do + {:halt, + {:error, + Spark.Error.DslError.exception( + module: module, + path: [:oban, :triggers, trigger.name], + message: """ + When `use_tenant_from_record?` is true, the multitenancy options + `parse_attribute` and `tenant_from_attribute` must either both be + at their default values or both be customized. + + Currently: + - parse_attribute: #{inspect(parse_attribute)} #{if parse_at_default?, do: "(default)", else: "(customized)"} + - tenant_from_attribute: #{inspect(tenant_from_attribute)} #{if tenant_from_at_default?, do: "(default)", else: "(customized)"} + + These options are inverses of each other, so if you customize one, + you must customize the other to maintain consistency. + """ + )}} + else + {:cont, acc} + end + end) + end + + defp identity_mfa?({Ash.Resource.Dsl, :identity, []}), do: true + defp identity_mfa?(_), do: false +end diff --git a/mix.exs b/mix.exs index 78a1bf9..5f2edd0 100644 --- a/mix.exs +++ b/mix.exs @@ -132,7 +132,7 @@ defmodule AshOban.MixProject do oban_dep ++ [ - {:ash, ash_version("~> 3.0")}, + {:ash, ash_version("~> 3.0 and >=3.8.0")}, {:oban, "~> 2.15"}, {:postgrex, "~> 0.18"}, # dev/test dependencies diff --git a/mix.lock b/mix.lock index acdf817..6e981d1 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,5 @@ %{ - "ash": {:hex, :ash, "3.7.6", "a0358e8467da4e2a94855542d07d7fca8e74cb6bc89c42af2181b4caa91f8415", [:mix], [{:crux, ">= 0.1.2 and < 1.0.0-0", [hex: :crux, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6003aa4dec5868e6371c3bf2efdb89507c59c05f5dbec13a13b73a92b938a258"}, + "ash": {:hex, :ash, "3.8.0", "7daee81edee701bc8093e3b0fdd111edb644936db94b82b50e58efe7337f9b1f", [:mix], [{:crux, ">= 0.1.2 and < 1.0.0-0", [hex: :crux, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.6.29 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.11", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, ">= 0.2.6 and < 1.0.0-0", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5ff1876a560b82bc91510f4d12dfacf0d024eeedb9cbe06e2b52fea2e9ba104c"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "credo": {:hex, :credo, "1.7.13", "126a0697df6b7b71cd18c81bc92335297839a806b6f62b61d417500d1070ff4e", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "47641e6d2bbff1e241e87695b29f617f1a8f912adea34296fb10ecc3d7e9e84f"}, "crux": {:hex, :crux, "0.1.2", "4441c9e3a34f1e340954ce96b9ad5a2de13ceb4f97b3f910211227bb92e2ca90", [:mix], [{:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: true]}], "hexpm", "563ea3748ebfba9cc078e6d198a1d6a06015a8fae503f0b721363139f0ddb350"}, @@ -42,7 +42,7 @@ "simple_sat": {:hex, :simple_sat, "0.1.4", "39baf72cdca14f93c0b6ce2b6418b72bbb67da98fa9ca4384e2f79bbc299899d", [:mix], [], "hexpm", "3569b68e346a5fd7154b8d14173ff8bcc829f2eb7b088c30c3f42a383443930b"}, "sobelow": {:hex, :sobelow, "0.14.1", "2f81e8632f15574cba2402bcddff5497b413c01e6f094bc0ab94e83c2f74db81", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8fac9a2bd90fdc4b15d6fca6e1608efb7f7c600fa75800813b794ee9364c87f2"}, "sourceror": {:hex, :sourceror, "1.10.0", "38397dedbbc286966ec48c7af13e228b171332be1ad731974438c77791945ce9", [:mix], [], "hexpm", "29dbdfc92e04569c9d8e6efdc422fc1d815f4bd0055dc7c51b8800fb75c4b3f1"}, - "spark": {:hex, :spark, "2.3.12", "55f597df09cd38944c888f00e12f8b1f1fd94b0b4ed76a199e1d1d8251d9220a", [:mix], [{:igniter, ">= 0.3.64 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: true]}], "hexpm", "4f69b30cab6ac72e6f16e0f6b4f815d3ce3915628612f38059dcea4a25b53fe0"}, + "spark": {:hex, :spark, "2.3.13", "129c2f7ecabd0c27e53daea7aa8b1de4dd068fdafb8d7d5ee49e1f2e8d87ba64", [:mix], [{:igniter, ">= 0.3.64 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: true]}], "hexpm", "d06de23b5e8961d98c9c81d798dbafb9ac64694da19b0e9ca4ba4a8a54b75e31"}, "spitfire": {:hex, :spitfire, "0.2.1", "29e154873f05444669c7453d3d931820822cbca5170e88f0f8faa1de74a79b47", [:mix], [], "hexpm", "6eeed75054a38341b2e1814d41bb0a250564092358de2669fdb57ff88141d91b"}, "splode": {:hex, :splode, "0.2.9", "3a2776e187c82f42f5226b33b1220ccbff74f4bcc523dd4039c804caaa3ffdc7", [:mix], [], "hexpm", "8002b00c6e24f8bd1bcced3fbaa5c33346048047bb7e13d2f3ad428babbd95c3"}, "stream_data": {:hex, :stream_data, "1.2.0", "58dd3f9e88afe27dc38bef26fce0c84a9e7a96772b2925c7b32cd2435697a52b", [:mix], [], "hexpm", "eb5c546ee3466920314643edf68943a5b14b32d1da9fe01698dc92b73f89a9ed"}, diff --git a/test/support/domain.ex b/test/support/domain.ex index cd7c48b..9b3eb7c 100644 --- a/test/support/domain.ex +++ b/test/support/domain.ex @@ -9,5 +9,6 @@ defmodule AshOban.Test.Domain do resources do resource AshOban.Test.Triggered + allow_unregistered? true end end diff --git a/test/tenant_extraction_test.exs b/test/tenant_extraction_test.exs new file mode 100644 index 0000000..db30a2c --- /dev/null +++ b/test/tenant_extraction_test.exs @@ -0,0 +1,240 @@ +# SPDX-FileCopyrightText: 2023 ash_oban contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshOban.TenantExtractionTest.TestResource do + @moduledoc false + use Ash.Resource, + domain: AshOban.TenantExtractionTest.TestDomain, + data_layer: Ash.DataLayer.Ets, + extensions: [AshOban] + + multitenancy do + strategy :attribute + attribute :tenant_id + end + + oban do + triggers do + trigger :process_with_tenant do + action :process + where expr(processed != true) + read_action :read_global + worker_read_action :read_with_tenant + queue :default + use_tenant_from_record? true + + worker_module_name __MODULE__.AshOban.Worker.ProcessWithTenant + + scheduler_module_name __MODULE__.AshOban.Scheduler.ProcessWithTenant + end + end + end + + actions do + default_accept [] + defaults [:read, :destroy] + + create :create do + accept [:tenant_id] + end + + read :read_global do + multitenancy :allow_global + pagination keyset?: true + end + + read :read_with_tenant do + pagination keyset?: true + end + + update :process do + require_atomic? false + accept [] + change set_attribute(:processed, true) + end + end + + ets do + private? true + end + + attributes do + uuid_primary_key :id + attribute :processed, :boolean, default: false, allow_nil?: false + attribute :tenant_id, :integer, allow_nil?: false, default: 1 + timestamps() + end +end + +defmodule AshOban.TenantExtractionTest.TestResourceWithCustomTenantFunctions do + @moduledoc false + use Ash.Resource, + domain: AshOban.TenantExtractionTest.TestDomain, + data_layer: Ash.DataLayer.Ets, + extensions: [AshOban] + + multitenancy do + strategy :attribute + attribute :tenant_id + parse_attribute({__MODULE__, :parse_tenant, []}) + tenant_from_attribute({__MODULE__, :format_tenant, []}) + end + + oban do + triggers do + trigger :process_with_custom_tenant do + action :process + where expr(processed != true) + read_action :read_global + worker_read_action :read_with_tenant + queue :custom + use_tenant_from_record? true + + worker_module_name __MODULE__.AshOban.Worker.ProcessWithCustomTenant + + scheduler_module_name __MODULE__.AshOban.Scheduler.ProcessWithCustomTenant + end + end + end + + actions do + default_accept [] + defaults [:read, :destroy] + + create :create do + accept [:tenant_id] + end + + read :read_global do + multitenancy :allow_global + pagination keyset?: true + end + + read :read_with_tenant do + pagination keyset?: true + end + + update :process do + require_atomic? false + accept [] + change set_attribute(:processed, true) + end + end + + ets do + private? true + end + + attributes do + uuid_primary_key :id + attribute :processed, :boolean, default: false, allow_nil?: false + attribute :tenant_id, :integer, allow_nil?: false, default: 1 + timestamps() + end + + def parse_tenant("org_" <> id_string), do: String.to_integer(id_string) + def parse_tenant(value) when is_integer(value), do: value + + def format_tenant(id) when is_integer(id), do: "org_#{id}" + def format_tenant("org_" <> _ = value), do: value +end + +defmodule AshOban.TenantExtractionTest.TestDomain do + @moduledoc false + use Ash.Domain, + validate_config_inclusion?: false + + resources do + resource AshOban.TenantExtractionTest.TestResource + resource AshOban.TenantExtractionTest.TestResourceWithCustomTenantFunctions + end +end + +defmodule AshOban.TenantExtractionTest do + use ExUnit.Case, async: false + + use Oban.Testing, repo: AshOban.Test.Repo, prefix: "private" + + require Ash.Query + + alias AshOban.TenantExtractionTest.TestDomain + alias AshOban.TenantExtractionTest.TestResource + + setup_all do + AshOban.Test.Repo.start_link() + + oban_config = + Application.get_env(:ash_oban, :oban) + |> Keyword.update(:queues, [default: 10, custom: 10], fn queues -> + Keyword.put(queues, :custom, 10) + end) + + Oban.start_link(AshOban.config([TestDomain], oban_config)) + :ok + end + + setup do + on_exit(fn -> + AshOban.Test.Repo.delete_all(Oban.Job, prefix: "private") + end) + end + + describe "tenant extraction from records" do + test "scheduler reads globally, worker uses extracted tenant from each record" do + record1 = + TestResource + |> Ash.Changeset.for_create(:create, %{}, tenant: 1) + |> Ash.create!() + + record2 = + TestResource + |> Ash.Changeset.for_create(:create, %{}, tenant: 2) + |> Ash.create!() + + record3 = + TestResource + |> Ash.Changeset.for_create(:create, %{}, tenant: 3) + |> Ash.create!() + + assert %{success: 4} = + AshOban.Test.schedule_and_run_triggers({TestResource, :process_with_tenant}) + + assert Ash.reload!(record1).processed + assert Ash.reload!(record2).processed + assert Ash.reload!(record3).processed + end + + test "tenant extraction works with custom parse_attribute and tenant_from_attribute functions" do + alias AshOban.TenantExtractionTest.TestResourceWithCustomTenantFunctions + + record1 = + TestResourceWithCustomTenantFunctions + |> Ash.Changeset.for_create(:create, %{}, tenant: "org_100") + |> Ash.create!() + + record2 = + TestResourceWithCustomTenantFunctions + |> Ash.Changeset.for_create(:create, %{}, tenant: "org_200") + |> Ash.create!() + + record3 = + TestResourceWithCustomTenantFunctions + |> Ash.Changeset.for_create(:create, %{}, tenant: "org_300") + |> Ash.create!() + + assert record1.tenant_id == 100 + assert record2.tenant_id == 200 + assert record3.tenant_id == 300 + + assert %{success: 4} = + AshOban.Test.schedule_and_run_triggers( + {TestResourceWithCustomTenantFunctions, :process_with_custom_tenant} + ) + + assert Ash.reload!(record1, tenant: "org_100").processed + assert Ash.reload!(record2, tenant: "org_200").processed + assert Ash.reload!(record3, tenant: "org_300").processed + end + end +end diff --git a/test/verifiers/verify_use_tenant_from_record_test.exs b/test/verifiers/verify_use_tenant_from_record_test.exs new file mode 100644 index 0000000..56c0864 --- /dev/null +++ b/test/verifiers/verify_use_tenant_from_record_test.exs @@ -0,0 +1,256 @@ +# SPDX-FileCopyrightText: 2023 ash_oban contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshOban.Verifiers.VerifyUseTenantFromRecordTest do + use ExUnit.Case, async: true + import ExUnit.CaptureIO + + test "no errors if both parse_attribute and tenant_from_attribute use defaults" do + output = + capture_io(:stderr, fn -> + defmodule DefaultResource do + @moduledoc false + use Ash.Resource, + domain: AshOban.Test.Domain, + data_layer: Ash.DataLayer.Ets, + extensions: [AshOban] + + multitenancy do + strategy(:attribute) + attribute(:tenant_id) + end + + oban do + triggers do + trigger :test_trigger do + action :test_action + use_tenant_from_record? true + scheduler_cron false + + worker_module_name __MODULE__.TestTrigger.Worker + end + end + end + + actions do + defaults([:read]) + + update :test_action do + accept([]) + end + end + + attributes do + uuid_primary_key(:id) + attribute(:tenant_id, :string, allow_nil?: false) + end + + def custom_parse(tenant), do: tenant + end + end) + + assert output == "" + end + + test "raises error when parse_attribute is customized but tenant_from_attribute is at default" do + output = + capture_io(:stderr, fn -> + defmodule ParseCustomizedOnlyResource do + @moduledoc false + use Ash.Resource, + domain: AshOban.Test.Domain, + data_layer: Ash.DataLayer.Ets, + extensions: [AshOban] + + multitenancy do + strategy(:attribute) + attribute(:tenant_id) + parse_attribute({__MODULE__, :custom_parse, []}) + end + + oban do + triggers do + trigger :test_trigger do + action :test_action + use_tenant_from_record? true + scheduler_cron false + + worker_module_name __MODULE__.TestTrigger.Worker + end + end + end + + actions do + defaults([:read]) + + update :test_action do + accept([]) + end + end + + attributes do + uuid_primary_key(:id) + attribute(:tenant_id, :string, allow_nil?: false) + end + + def custom_parse(tenant), do: tenant + end + end) + + assert output =~ "When `use_tenant_from_record?` is true" + assert output =~ ~r/parse_attribute:.*\(customized\)/ + assert output =~ ~r/tenant_from_attribute:.*\(default\)/ + assert output =~ "These options are inverses of each other" + end + + test "raises error when tenant_from_attribute is customized but parse_attribute is at default" do + output = + capture_io(:stderr, fn -> + defmodule TenantFromCustomizedOnlyResource do + @moduledoc false + use Ash.Resource, + domain: AshOban.Test.Domain, + data_layer: Ash.DataLayer.Ets, + extensions: [AshOban] + + multitenancy do + strategy(:attribute) + attribute(:tenant_id) + tenant_from_attribute({__MODULE__, :custom_tenant_from, []}) + end + + oban do + triggers do + trigger :test_trigger do + action :test_action + use_tenant_from_record? true + scheduler_cron false + + worker_module_name __MODULE__.TestTrigger.Worker + end + end + end + + actions do + defaults([:read]) + + update :test_action do + accept([]) + end + end + + attributes do + uuid_primary_key(:id) + attribute(:tenant_id, :string, allow_nil?: false) + end + + def custom_tenant_from(tenant), do: tenant + end + end) + + assert output =~ "When `use_tenant_from_record?` is true" + assert output =~ ~r/parse_attribute:.*\(default\)/ + assert output =~ ~r/tenant_from_attribute:.*\(customized\)/ + assert output =~ "These options are inverses of each other" + end + + test "no errors if both parse_attribute and tenant_from_attribute are customized" do + output = + capture_io(:stderr, fn -> + defmodule BothCustomizedResource do + @moduledoc false + use Ash.Resource, + domain: AshOban.Test.Domain, + data_layer: Ash.DataLayer.Ets, + extensions: [AshOban] + + multitenancy do + strategy(:attribute) + attribute(:tenant_id) + parse_attribute({__MODULE__, :custom_parse, []}) + tenant_from_attribute({__MODULE__, :custom_tenant_from, []}) + end + + oban do + triggers do + trigger :test_trigger do + action :test_action + use_tenant_from_record? true + scheduler_cron false + + worker_module_name __MODULE__.TestTrigger.Worker + end + end + end + + actions do + defaults([:read]) + + update :test_action do + accept([]) + end + end + + attributes do + uuid_primary_key(:id) + attribute(:tenant_id, :string, allow_nil?: false) + end + + def custom_parse(tenant), do: tenant + def custom_tenant_from(tenant), do: tenant + end + end) + + assert output == "" + end + + test "no errors when use_tenant_from_record? is false regardless of configuration" do + output = + capture_io(:stderr, fn -> + defmodule UseTenantFromRecordFalseResource do + @moduledoc false + use Ash.Resource, + domain: AshOban.Test.Domain, + data_layer: Ash.DataLayer.Ets, + extensions: [AshOban] + + multitenancy do + strategy(:attribute) + attribute(:tenant_id) + parse_attribute({__MODULE__, :custom_parse, []}) + # tenant_from_attribute left at default + end + + oban do + triggers do + trigger :test_trigger do + action :test_action + use_tenant_from_record? false + scheduler_cron false + + worker_module_name __MODULE__.TestTrigger.Worker + end + end + end + + actions do + defaults([:read]) + + update :test_action do + accept([]) + end + end + + attributes do + uuid_primary_key(:id) + attribute(:tenant_id, :string, allow_nil?: false) + end + + def custom_parse(tenant), do: tenant + end + end) + + assert output == "" + end +end