30 changes: 17 additions & 13 deletions zebra/lib/zebra/workers/db_worker.ex
@@ -9,6 +9,7 @@ defmodule Zebra.Workers.DbWorker do
:state_field,
:state_value,
:machine_type_field,
+ :machine_os_image_field,
:machine_type_environment,
:metric_name,
:order_by,
@@ -45,16 +46,16 @@ defmodule Zebra.Workers.DbWorker do
Watchman.benchmark("#{worker.metric_name}.tick.duration", fn ->
if isolate_machine_types do
query_machine_types(worker)
- |> Enum.each(fn machine_type -> tick_(worker, machine_type) end)
+ |> Enum.each(fn machine_type_tuple -> tick_(worker, machine_type_tuple) end)
else
tick_(worker)
end
end)
end

- def tick_(worker, machine_type \\ nil) do
- rows = query_jobs(worker, machine_type)
- submit_batch_size(worker.metric_name, length(rows), machine_type)
+ def tick_(worker, machine_type_tuple \\ nil) do
+ rows = query_jobs(worker, machine_type_tuple)
+ submit_batch_size(worker.metric_name, length(rows), machine_type_tuple)

parallelism = worker.parallelism || 10

@@ -65,8 +66,8 @@ defmodule Zebra.Workers.DbWorker do

defp submit_batch_size(name, v, nil), do: Watchman.submit("#{name}.batch_size", v)

- defp submit_batch_size(name, v, machine_type),
- do: Watchman.submit({"#{name}.batch_size", [machine_type]}, v)
+ defp submit_batch_size(name, v, {machine_type, machine_os_image}),
+ do: Watchman.submit({"#{name}.batch_size", ["#{machine_type}-#{machine_os_image}"]}, v)

def process(worker, id) do
Watchman.benchmark("#{worker.metric_name}.process.duration", fn ->
@@ -98,14 +99,15 @@ defmodule Zebra.Workers.DbWorker do

defp query_machine_types(worker) do
machine_type_environment = worker.machine_type_environment || :all
+ machine_os_image_field = worker.machine_os_image_field

cond do
machine_type_environment == :all ->
Repo.all(
from(r in worker.schema,
where: field(r, ^worker.state_field) == ^worker.state_value,
- distinct: r.machine_type,
- select: r.machine_type
+ distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)],
+ select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)}
)
)

@@ -114,8 +116,8 @@ defmodule Zebra.Workers.DbWorker do
from(r in worker.schema,
where: field(r, ^worker.state_field) == ^worker.state_value,
where: like(field(r, ^worker.machine_type_field), @self_hosted_prefix),
- distinct: r.machine_type,
- select: r.machine_type
+ distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)],
+ select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)}
)
)

@@ -124,8 +126,8 @@ defmodule Zebra.Workers.DbWorker do
from(r in worker.schema,
where: field(r, ^worker.state_field) == ^worker.state_value,
where: not like(field(r, ^worker.machine_type_field), @self_hosted_prefix),
- distinct: r.machine_type,
- select: r.machine_type
+ distinct: [field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)],
+ select: {field(r, ^worker.machine_type_field), field(r, ^machine_os_image_field)}
)
)
end
@@ -175,15 +177,17 @@ defmodule Zebra.Workers.DbWorker do
end
end

- defp query_jobs(worker, machine_type) do
+ defp query_jobs(worker, {machine_type, machine_os_image}) do
order_by = worker.order_by || :id
order_dir = worker.order_direction || :asc
records_per_tick = worker.records_per_tick || 100
+ machine_os_image_field = worker.machine_os_image_field

Repo.all(
from(r in worker.schema,
where: field(r, ^worker.state_field) == ^worker.state_value,
where: field(r, ^worker.machine_type_field) == ^machine_type,
+ where: field(r, ^machine_os_image_field) == ^machine_os_image,
order_by: [{^order_dir, ^order_by}],
select: r.id,
limit: ^records_per_tick
1 change: 1 addition & 0 deletions zebra/lib/zebra/workers/dispatcher.ex
@@ -14,6 +14,7 @@ defmodule Zebra.Workers.Dispatcher do
state_field: :aasm_state,
state_value: Zebra.Models.Job.state_scheduled(),
machine_type_field: :machine_type,
+ machine_os_image_field: :machine_os_image,
machine_type_environment: machine_type_environment(),
order_by: :scheduled_at,
order_direction: :asc,
91 changes: 91 additions & 0 deletions zebra/test/zebra/workers/dispatcher_test.exs
@@ -231,6 +231,97 @@ defmodule Zebra.Workers.DispatcherTest do
# a NOT_FOUND response from chmura, and stopped trying to occupy agents.
assert Zebra.Workers.DispatcherTest.Counter.value() == 10
end

test "isolates dispatching by both machine_type and os_image" do
System.put_env("DISPATCH_SELF_HOSTED_ONLY", "false")
System.put_env("DISPATCH_CLOUD_ONLY", "false")

# we need to have at least 20 for each os_image to ensure that we
# don't stop batching when we receive a NOT_FOUND response from chmura
# and stop trying to occupy agents.

ubuntu2404_jobs =
Enum.map(1..20, fn _ ->
{:ok, job} =
Support.Factories.Job.create(:scheduled, %{
machine_type: "e1-standard-2",
machine_os_image: "ubuntu2404"
})

job
end)

# Create jobs with same machine_type but different os_images
ubuntu1804_jobs =
Enum.map(1..20, fn _ ->
{:ok, job} =
Support.Factories.Job.create(:scheduled, %{
machine_type: "e1-standard-2",
machine_os_image: "ubuntu1804"
})

job
end)

ubuntu2004_jobs =
Enum.map(1..20, fn _ ->
{:ok, job} =
Support.Factories.Job.create(:scheduled, %{
machine_type: "e1-standard-2",
machine_os_image: "ubuntu2004"
})

job
end)

# Track which os_images were requested
agent_requests = Agent.start_link(fn -> [] end)

GrpcMock.stub(Support.FakeServers.ChmuraApi, :occupy_agent, fn req, _ ->
Agent.update(elem(agent_requests, 1), fn list ->
[req.machine.os_image | list]
end)

if req.machine.os_image == "ubuntu2404" do
raise GRPC.RPCError, status: GRPC.Status.not_found(), message: "No suitable agent found"
else
%InternalApi.Chmura.OccupyAgentResponse{
agent: %InternalApi.Chmura.Agent{
id: Ecto.UUID.generate(),
ip_address: "1.2.3.4",
ssh_port: 80,
ctrl_port: 80,
auth_token: "asdas"
}
}
end
end)

with_stubbed_http_calls(fn ->
Worker.init() |> Zebra.Workers.DbWorker.tick()
end)

# ubuntu1804 and ubuntu2004 jobs should be started
(ubuntu1804_jobs ++ ubuntu2004_jobs)
|> Enum.each(fn job ->
job = Job.reload(job)
assert Job.started?(job) == true
end)

# ubuntu2404 jobs should remain scheduled (no agents available)
ubuntu2404_jobs
|> Enum.each(fn job ->
job = Job.reload(job)
assert Job.scheduled?(job) == true
end)

# Verify that requests were made with the correct os_images
requested_os_images = Agent.get(elem(agent_requests, 1), & &1)
assert length(requested_os_images) == 50
assert Enum.count(requested_os_images, &(&1 == "ubuntu1804")) == 20
assert Enum.count(requested_os_images, &(&1 == "ubuntu2004")) == 20
assert Enum.count(requested_os_images, &(&1 == "ubuntu2404")) == 10 # only one batch requested
end
end

describe ".process" do