diff --git a/zebra/lib/zebra/workers/db_worker.ex b/zebra/lib/zebra/workers/db_worker.ex index f1060dc90..b441cb912 100644 --- a/zebra/lib/zebra/workers/db_worker.ex +++ b/zebra/lib/zebra/workers/db_worker.ex @@ -9,6 +9,7 @@ defmodule Zebra.Workers.DbWorker do :state_field, :state_value, :machine_type_field, + :machine_os_image_field, :machine_type_environment, :metric_name, :order_by, @@ -45,16 +46,16 @@ defmodule Zebra.Workers.DbWorker do Watchman.benchmark("#{worker.metric_name}.tick.duration", fn -> if isolate_machine_types do query_machine_types(worker) - |> Enum.each(fn machine_type -> tick_(worker, machine_type) end) + |> Enum.each(fn machine_type_tuple -> tick_(worker, machine_type_tuple) end) else tick_(worker) end end) end - def tick_(worker, machine_type \\ nil) do - rows = query_jobs(worker, machine_type) - submit_batch_size(worker.metric_name, length(rows), machine_type) + def tick_(worker, machine_type_tuple \\ nil) do + rows = query_jobs(worker, machine_type_tuple) + submit_batch_size(worker.metric_name, length(rows), machine_type_tuple) parallelism = worker.parallelism || 10 @@ -65,8 +66,8 @@ defmodule Zebra.Workers.DbWorker do defp submit_batch_size(name, v, nil), do: Watchman.submit("#{name}.batch_size", v) - defp submit_batch_size(name, v, machine_type), - do: Watchman.submit({"#{name}.batch_size", [machine_type]}, v) + defp submit_batch_size(name, v, {machine_type, machine_os_image}), + do: Watchman.submit({"#{name}.batch_size", ["#{machine_type}-#{machine_os_image}"]}, v) def process(worker, id) do Watchman.benchmark("#{worker.metric_name}.process.duration", fn -> @@ -98,14 +99,15 @@ defmodule Zebra.Workers.DbWorker do defp query_machine_types(worker) do machine_type_environment = worker.machine_type_environment || :all + machine_os_image_field = worker.machine_os_image_field cond do machine_type_environment == :all -> Repo.all( from(r in worker.schema, where: field(r, ^worker.state_field) == ^worker.state_value, - distinct: r.machine_type, - select: r.machine_type + distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)], + select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)} ) ) @@ -114,8 +116,8 @@ defmodule Zebra.Workers.DbWorker do from(r in worker.schema, where: field(r, ^worker.state_field) == ^worker.state_value, where: like(field(r, ^worker.machine_type_field), @self_hosted_prefix), - distinct: r.machine_type, - select: r.machine_type + distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)], + select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)} ) ) @@ -124,8 +126,8 @@ defmodule Zebra.Workers.DbWorker do from(r in worker.schema, where: field(r, ^worker.state_field) == ^worker.state_value, where: not like(field(r, ^worker.machine_type_field), @self_hosted_prefix), - distinct: r.machine_type, - select: r.machine_type + distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)], + select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)} ) ) end @@ -175,15 +177,17 @@ defmodule Zebra.Workers.DbWorker do end end - defp query_jobs(worker, machine_type) do + defp query_jobs(worker, {machine_type, machine_os_image}) do order_by = worker.order_by || :id order_dir = worker.order_direction || :asc records_per_tick = worker.records_per_tick || 100 + machine_os_image_field = worker.machine_os_image_field Repo.all( from(r in worker.schema, where: field(r, ^worker.state_field) == ^worker.state_value, where: field(r, ^worker.machine_type_field) == ^machine_type, + where: field(r, ^machine_os_image_field) == ^machine_os_image, order_by: [{^order_dir, ^order_by}], select: r.id, limit: ^records_per_tick diff --git a/zebra/lib/zebra/workers/dispatcher.ex b/zebra/lib/zebra/workers/dispatcher.ex index 87f22dbc2..f471c9f32 100644 --- a/zebra/lib/zebra/workers/dispatcher.ex +++ b/zebra/lib/zebra/workers/dispatcher.ex @@ -14,6 +14,7 @@ defmodule Zebra.Workers.Dispatcher do state_field: :aasm_state, state_value: Zebra.Models.Job.state_scheduled(), machine_type_field: :machine_type, + machine_os_image_field: :machine_os_image, machine_type_environment: machine_type_environment(), order_by: :scheduled_at, order_direction: :asc, diff --git a/zebra/test/zebra/workers/dispatcher_test.exs b/zebra/test/zebra/workers/dispatcher_test.exs index cc1ef9b01..eb4d816bb 100644 --- a/zebra/test/zebra/workers/dispatcher_test.exs +++ b/zebra/test/zebra/workers/dispatcher_test.exs @@ -231,6 +231,97 @@ defmodule Zebra.Workers.DispatcherTest do # a NOT_FOUND response from chmura, and stopped trying to occupy agents. assert Zebra.Workers.DispatcherTest.Counter.value() == 10 end + + test "isolates dispatching by both machine_type and os_image" do + System.put_env("DISPATCH_SELF_HOSTED_ONLY", "false") + System.put_env("DISPATCH_CLOUD_ONLY", "false") + + # we need to have at least 20 for each os_image to ensure that we + # don't stop batching when we receive a NOT_FOUND response from chmura + # and stop trying to occupy agents. + + ubuntu2404_jobs = + Enum.map(1..20, fn _ -> + {:ok, job} = + Support.Factories.Job.create(:scheduled, %{ + machine_type: "e1-standard-2", + machine_os_image: "ubuntu2404" + }) + + job + end) + + # Create jobs with same machine_type but different os_images + ubuntu1804_jobs = + Enum.map(1..20, fn _ -> + {:ok, job} = + Support.Factories.Job.create(:scheduled, %{ + machine_type: "e1-standard-2", + machine_os_image: "ubuntu1804" + }) + + job + end) + + ubuntu2004_jobs = + Enum.map(1..20, fn _ -> + {:ok, job} = + Support.Factories.Job.create(:scheduled, %{ + machine_type: "e1-standard-2", + machine_os_image: "ubuntu2004" + }) + + job + end) + + # Track which os_images were requested + agent_requests = Agent.start_link(fn -> [] end) + + GrpcMock.stub(Support.FakeServers.ChmuraApi, :occupy_agent, fn req, _ -> + Agent.update(elem(agent_requests, 1), fn list -> + [req.machine.os_image | list] + end) + + if req.machine.os_image == "ubuntu2404" do + raise GRPC.RPCError, status: GRPC.Status.not_found(), message: "No suitable agent found" + else + %InternalApi.Chmura.OccupyAgentResponse{ + agent: %InternalApi.Chmura.Agent{ + id: Ecto.UUID.generate(), + ip_address: "1.2.3.4", + ssh_port: 80, + ctrl_port: 80, + auth_token: "asdas" + } + } + end + end) + + with_stubbed_http_calls(fn -> + Worker.init() |> Zebra.Workers.DbWorker.tick() + end) + + # ubuntu1804 and ubuntu2004 jobs should be started + (ubuntu1804_jobs ++ ubuntu2004_jobs) + |> Enum.each(fn job -> + job = Job.reload(job) + assert Job.started?(job) == true + end) + + # ubuntu2404 jobs should remain scheduled (no agents available) + ubuntu2404_jobs + |> Enum.each(fn job -> + job = Job.reload(job) + assert Job.scheduled?(job) == true + end) + + # Verify that requests were made with the correct os_images + requested_os_images = Agent.get(elem(agent_requests, 1), & &1) + assert length(requested_os_images) == 50 + assert Enum.count(requested_os_images, &(&1 == "ubuntu1804")) == 20 + assert Enum.count(requested_os_images, &(&1 == "ubuntu2004")) == 20 + assert Enum.count(requested_os_images, &(&1 == "ubuntu2404")) == 10 # only one batch requested + end end describe ".process" do