Run one deployment orchestrator per deployment per cluster #1865

Open
wants to merge 74 commits into
base: main
74 commits
c4ea461
Run one deployment orchestrator per deployment per cluster
joshk Jan 31, 2025
d626d08
Support devices telling the orchestrator that they might have updated
joshk Jan 31, 2025
4949b88
Minor cleanup
joshk Jan 31, 2025
dff7764
Don't forcefully clear inflight updates
joshk Jan 31, 2025
0f44a8e
Remove inflight updates where the firmware matches the device
joshk Jan 31, 2025
975105f
Only message an orchestrator if the device has a deployment
joshk Jan 31, 2025
19953da
Some cleanup, some docs, some encapsulation
joshk Jan 31, 2025
104fd3b
Use singular where the event is for one deployment
joshk Jan 31, 2025
1047693
Prefer `Phoenix.Socket.Broadcast`
joshk Jan 31, 2025
795675e
Use `init` over `handle_continue(:boot, _)`
joshk Jan 31, 2025
b5476c5
Tests and fixes from the tests
joshk Jan 31, 2025
f0a5441
Test scheduling updating as devices come online
joshk Feb 1, 2025
5936c1f
Remove a commented out test which has been covered
joshk Feb 1, 2025
466a127
Bump the Orchestrator timer interval to 90 secs
joshk Feb 1, 2025
c38c188
Fix the need to have a second `start_link` function on the orchestrator
joshk Feb 1, 2025
37a36ef
Remove an unneeded `subscribe` in a test
joshk Feb 1, 2025
07b69e8
Allow the orchestrator to ignore the `device-online`
joshk Feb 1, 2025
fa3edd0
Only send the "device finished updating" broadcast if it did update
joshk Feb 1, 2025
10579fd
The original `Monitor` shouldn't have been changed
joshk Feb 1, 2025
82da381
Make it easier to start a cluster in dev
joshk Feb 1, 2025
cce4ce8
Only start the clustered orchestrator on the web nodes
joshk Feb 1, 2025
ee871d2
Switch from `Horde` to `ProcessHub`
joshk Feb 2, 2025
938f213
Tweak the log message when a device updates.
joshk Feb 2, 2025
b260089
If the payload is nil, use an empty map
joshk Feb 2, 2025
6f0bb27
Simplify the `Monitor` to a one off `OrchestratorRegistration`
joshk Feb 2, 2025
b2c9ac8
Fix the new orchestrator tests
joshk Feb 2, 2025
70b6cf3
Add a `child_spec` to the `OrchestratorRegistration` module
joshk Feb 2, 2025
d142aa7
Remove the `Distributed.Supervisor`, the extra layer isn't needed
joshk Feb 2, 2025
a07baa2
Clean up `Distributed.Orchestrator.start_link/1`
joshk Feb 2, 2025
850d4e3
Add some doc comments around the deployments orchestrator config in `…
joshk Feb 2, 2025
b7baf43
Have the Orchestrator undertake threshold checks before scheduling a …
joshk Feb 3, 2025
cd0b2e4
Nate doesn't like counting IDs, he feels it excludes other informatio…
joshk Feb 3, 2025
795d734
Fix a missing alias after merging in main
joshk Feb 4, 2025
672b178
Fix a warning from dialyzer that this code can never be reached
joshk Feb 4, 2025
e43298e
Remove a superfluous broadcast, and move an audit to a better place
joshk Feb 4, 2025
4d63d1f
Handle `fwup error` messages
joshk Feb 4, 2025
cac0f87
Clear a device's inflight update during connection if it isn't updating
joshk Feb 4, 2025
ee80a66
Fix `skip the queue` (use `phx-click` not `phx-submit`)
joshk Feb 4, 2025
b40062d
Test deployments starting/stopping orchestrator when updating to acti…
nshoes Feb 4, 2025
bb10cc6
publish
nshoes Feb 4, 2025
28714dc
Rework the duplicated libcluster config
joshk Feb 4, 2025
ec2fd3c
[publish]
nshoes Feb 4, 2025
0b88bbb
Merge branch 'main' into distributed-deployment-orchestrator
joshk Feb 5, 2025
1da3794
[publish]
nshoes Feb 5, 2025
0e56456
Merge branch 'main' into distributed-deployment-orchestrator
joshk Feb 6, 2025
120ee9c
[publish]
nshoes Feb 6, 2025
0f74e93
Change the topic the Orchestrator uses
joshk Feb 6, 2025
06243c1
Better management of the orchestrator mailbox
joshk Feb 6, 2025
7d01c7b
Merge branch 'distributed-deployment-orchestrator' of github.com:nerv…
joshk Feb 6, 2025
9876d56
Reduce the buffer to 5 seconds
joshk Feb 6, 2025
935235d
[publish]
joshk Feb 6, 2025
a071eb8
Fix a broadcast payload
joshk Feb 6, 2025
7c6d00f
[publish]
joshk Feb 6, 2025
a625c4e
Merge branch 'main' into distributed-deployment-orchestrator
joshk Feb 6, 2025
b860f0a
Address dialyzer and credo warnings
joshk Feb 7, 2025
e394ad8
Add a connection status of `:connecting`
joshk Feb 7, 2025
fb2ef27
`mix format` is our savior
joshk Feb 7, 2025
f79114d
Fix a test's assert
joshk Feb 7, 2025
9780534
Fix an issue where two firmware updates could be sent for the same de…
joshk Feb 7, 2025
2d3ef76
[publish]
joshk Feb 7, 2025
27f3c28
Merge branch 'main' into distributed-deployment-orchestrator
joshk Feb 7, 2025
07796b3
[publish]
joshk Feb 7, 2025
4db5b5a
Notify the Orchestrator when a device is enabled for updates
joshk Feb 7, 2025
5e85294
Don't trigger the Orchestrator if updates are blocked for the device
joshk Feb 7, 2025
70eacb5
Credo didn't like an `if` using brackets
joshk Feb 7, 2025
769860b
[publish]
joshk Feb 7, 2025
99fe8ea
The deployment isn't loaded, so use `device.deployment_id` instead
joshk Feb 7, 2025
65929ab
[publish]
joshk Feb 7, 2025
610232a
Merge branch 'main' into distributed-deployment-orchestrator
joshk Feb 8, 2025
4dc727e
Dialyzer good times
joshk Feb 8, 2025
2d5a8d6
New `ProcessHub` version allows for clean GenServer shutdown
joshk Feb 9, 2025
fd27860
[publish]
nshoes Feb 10, 2025
4adb989
Merge branch 'main' into distributed-deployment-orchestrator
joshk Feb 10, 2025
188d180
[publish]
joshk Feb 10, 2025
4 changes: 4 additions & 0 deletions Makefile
@@ -16,6 +16,10 @@ iex:
iex-server:
iex -S mix phx.server

num = 0
iex-server-clustered:
WEB_PORT="40${num}0" iex --sname dev-$(num) --cookie nomnomnom -S mix phx.server

reset-db:
mix ecto.reset

8 changes: 5 additions & 3 deletions config/dev.exs
@@ -10,13 +10,15 @@ config :phoenix, :stacktrace_depth, 20
##
# NervesHub Web
#
web_port = String.to_integer(System.get_env("WEB_PORT", "4000"))

config :nerves_hub, NervesHubWeb.Endpoint,
url: [
host: System.get_env("WEB_HOST", "localhost"),
scheme: System.get_env("WEB_SCHEME", "http"),
port: String.to_integer(System.get_env("WEB_PORT", "4000"))
port: web_port
],
http: [ip: {0, 0, 0, 0}, port: 4000],
http: [ip: {0, 0, 0, 0}, port: web_port],
debug_errors: true,
code_reloader: true,
check_origin: false,
@@ -46,7 +48,7 @@ config :nerves_hub, NervesHubWeb.DeviceEndpoint,
watchers: [],
https: [
ip: {0, 0, 0, 0},
port: 4001,
port: web_port + 1,
otp_app: :nerves_hub,
thousand_island_options: [
transport_module: NervesHub.DeviceSSLTransport,
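For reference, the `iex-server-clustered` target and the `web_port` changes in `config/dev.exs` combine so that each local node gets its own pair of ports. A minimal sketch of that arithmetic, assuming `num` is a single digit; `DevPorts` and its functions are hypothetical names used only for illustration and do not exist in NervesHub:

```elixir
defmodule DevPorts do
  @moduledoc """
  Illustrative only: mirrors how `make iex-server-clustered num=N` and
  `config/dev.exs` in this diff appear to derive ports for a local node.
  `DevPorts` is a hypothetical helper, not part of NervesHub.
  """

  # The Makefile builds WEB_PORT as the string "40<num>0".
  def web_port(num) when num in 0..9 do
    String.to_integer("40#{num}0")
  end

  # config/dev.exs binds the device endpoint to `web_port + 1`.
  def device_port(num), do: web_port(num) + 1

  # The Makefile passes `--sname dev-<num>` to `iex`.
  def short_name(num), do: "dev-#{num}"
end

for num <- 0..2 do
  IO.puts(
    "#{DevPorts.short_name(num)}: web=#{DevPorts.web_port(num)} device=#{DevPorts.device_port(num)}"
  )
end
```

Under these assumptions, `make iex-server-clustered num=1` would start a node named `dev-1` with the web endpoint on 4010 and the device endpoint on 4011, next to a default `num=0` node on 4000 and 4001.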
104 changes: 59 additions & 45 deletions config/runtime.exs
@@ -35,6 +35,7 @@ config :nerves_hub,
String.to_integer(System.get_env("DEVICE_CONNECTION_DELETE_LIMIT", "100000")),
deployment_calculator_interval_seconds:
String.to_integer(System.get_env("DEPLOYMENT_CALCULATOR_INTERVAL_SECONDS", "3600")),
deployments_orchestrator: System.get_env("DEPLOYMENTS_ORCHESTRATOR", "multi"),
mapbox_access_token: System.get_env("MAPBOX_ACCESS_TOKEN"),
dashboard_enabled: System.get_env("DASHBOARD_ENABLED", "false") == "true",
extension_config: [
@@ -209,34 +210,35 @@ end
##
# Database and Libcluster connection settings
#
if config_env() == :prod do
database_ssl_opts =
if System.get_env("DATABASE_SSL", "true") == "true" do
if System.get_env("DATABASE_PEM") do
db_hostname_charlist =
~r/.*@(?<hostname>[^:\/]+)(?::\d+)?\/.*/
|> Regex.named_captures(System.fetch_env!("DATABASE_URL"))
|> Map.get("hostname")
|> to_charlist()

cacerts =
System.fetch_env!("DATABASE_PEM")
|> Base.decode64!()
|> :public_key.pem_decode()
|> Enum.map(fn {_, der, _} -> der end)

[
verify: :verify_peer,
cacerts: cacerts,
server_name_indication: db_hostname_charlist
]
else
[cacerts: :public_key.cacerts_get()]
end

database_ssl_opts =
if System.get_env("DATABASE_SSL", "true") == "true" do
if System.get_env("DATABASE_PEM") do
db_hostname_charlist =
~r/.*@(?<hostname>[^:\/]+)(?::\d+)?\/.*/
|> Regex.named_captures(System.fetch_env!("DATABASE_URL"))
|> Map.get("hostname")
|> to_charlist()

cacerts =
System.fetch_env!("DATABASE_PEM")
|> Base.decode64!()
|> :public_key.pem_decode()
|> Enum.map(fn {_, der, _} -> der end)

[
verify: :verify_peer,
cacerts: cacerts,
server_name_indication: db_hostname_charlist
]
else
false
[cacerts: :public_key.cacerts_get()]
end
else
false
end

if config_env() == :prod do
database_socket_options = if System.get_env("DATABASE_INET6") == "true", do: [:inet6], else: []

config :nerves_hub, NervesHub.Repo,
@@ -259,28 +261,40 @@ if config_env() == :prod do

config :nerves_hub,
database_auto_migrator: System.get_env("DATABASE_AUTO_MIGRATOR", "true") == "true"
end

# Libcluster is using Postgres for Node discovery
# The library only accepts keyword configs, so the DATABASE_URL has to be
# parsed and put together with the ssl pieces from above.
postgres_config = Ecto.Repo.Supervisor.parse_url(System.fetch_env!("DATABASE_URL"))

libcluster_db_config =
[port: 5432]
|> Keyword.merge(postgres_config)
|> Keyword.take([:hostname, :username, :password, :database, :port])
|> Keyword.merge(ssl: database_ssl_opts)
|> Keyword.merge(parameters: [])
|> Keyword.merge(channel_name: "nerves_hub_clustering")

config :libcluster,
topologies: [
postgres: [
strategy: LibclusterPostgres.Strategy,
config: libcluster_db_config
]
# Libcluster is using Postgres for Node discovery
# The library only accepts keyword configs, so the DATABASE_URL has to be
# parsed and put together with the ssl pieces from above.
#
# By using the dev database url as the default it allows us to reduce the
# libcluster config and keep it all here.
postgres_config =
Ecto.Repo.Supervisor.parse_url(
System.get_env("DATABASE_URL", "postgres://postgres:postgres@localhost/nerves_hub_dev")
)

libcluster_db_config =
[port: 5432]
|> Keyword.merge(postgres_config)
|> Keyword.take([:hostname, :username, :password, :database, :port])
|> then(fn keywords ->
if config_env() == :prod do
Keyword.merge(keywords, ssl: database_ssl_opts)
else
keywords
end
end)
|> Keyword.merge(parameters: [])
|> Keyword.merge(channel_name: "nerves_hub_clustering")

config :libcluster,
topologies: [
postgres: [
strategy: LibclusterPostgres.Strategy,
config: libcluster_db_config
]
end
]

##
# Firmware upload backend.
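The reworked libcluster block in `config/runtime.exs` builds one keyword list for every environment and only merges the SSL options in `:prod`. A rough sketch of that pipeline follows. It is an approximation under stated assumptions: `LibclusterConfigSketch` is a hypothetical module, and its `parse_url/1` is a standard-library stand-in for `Ecto.Repo.Supervisor.parse_url/1` so the example runs without Ecto. It also assumes the URL carries both a user and a password.

```elixir
defmodule LibclusterConfigSketch do
  # Hypothetical stand-in for `Ecto.Repo.Supervisor.parse_url/1`, using only
  # the standard library. Assumes the URL has the form user:password@host/db.
  def parse_url(url) do
    uri = URI.parse(url)
    [username, password] = String.split(uri.userinfo, ":", parts: 2)

    [
      hostname: uri.host,
      username: username,
      password: password,
      database: String.trim_leading(uri.path, "/")
    ]
    |> then(fn kw -> if uri.port, do: Keyword.put(kw, :port, uri.port), else: kw end)
  end

  # Same shape as the pipeline in the diff: default port, merge the parsed
  # URL, keep only the connection keys, add `ssl` in prod only, then append
  # the fixed `parameters` and `channel_name` entries.
  def build(url, env, ssl_opts) do
    [port: 5432]
    |> Keyword.merge(parse_url(url))
    |> Keyword.take([:hostname, :username, :password, :database, :port])
    |> then(fn keywords ->
      if env == :prod, do: Keyword.merge(keywords, ssl: ssl_opts), else: keywords
    end)
    |> Keyword.merge(parameters: [])
    |> Keyword.merge(channel_name: "nerves_hub_clustering")
  end
end

dev_config =
  LibclusterConfigSketch.build(
    "postgres://postgres:postgres@localhost/nerves_hub_dev",
    :dev,
    false
  )

IO.inspect(dev_config, label: "dev libcluster config")
```

The point of the sketch is the conditional `then/2` step: outside `:prod` the `ssl` key is absent altogether rather than set to `false`, which matches what the diff does.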
29 changes: 25 additions & 4 deletions lib/nerves_hub/application.ex
@@ -42,7 +42,7 @@ defmodule NervesHub.Application do
{Task.Supervisor, name: NervesHub.TaskSupervisor},
{Oban, Application.fetch_env!(:nerves_hub, Oban)}
] ++
deployments_supervisor(deploy_env()) ++
deployments_orchestrator(deploy_env()) ++
endpoints(deploy_env())

opts = [strategy: :one_for_one, name: NervesHub.Supervisor]
@@ -73,10 +73,31 @@ defmodule NervesHub.Application do
:ok
end

defp deployments_supervisor("test"), do: []
defp deployments_orchestrator("test"), do: []

defp deployments_supervisor(_) do
[NervesHub.Deployments.Supervisor]
# When running in the default `multi` mode, start a deployments orchestrator supervisor
# on every node. But when running in `clustered` mode, run the `ProcessHub` supervisor
# on the `web` or `all` nodes only.
defp deployments_orchestrator(_) do
orchestrator = Application.get_env(:nerves_hub, :deployments_orchestrator)
app = Application.get_env(:nerves_hub, :app)

case [orchestrator, app] do
["multi", _] ->
[NervesHub.Deployments.Supervisor]

["clustered", "device"] ->
[]

["clustered", _] ->
[
ProcessHub.child_spec(%ProcessHub{hub_id: :deployment_orchestrators}),
NervesHub.Deployments.Distributed.OrchestratorRegistration
]

[other, _] ->
raise "Deployments Orchestrator '#{other}' not supported"
end
end

defp endpoints("test") do
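The new `deployments_orchestrator/1` clauses in `application.ex` pick supervision children from the deploy environment, the orchestrator mode, and the app type. The selection logic can be sketched on its own as below. This is a simplified illustration: `OrchestratorChildrenSketch` is a hypothetical module, the configuration values are passed as arguments instead of being read from `Application.get_env/2`, and plain atoms stand in for the real child specs so it runs without NervesHub or ProcessHub.

```elixir
defmodule OrchestratorChildrenSketch do
  # Nothing is started in the test environment, as in the diff.
  def children("test", _orchestrator, _app), do: []

  def children(_deploy_env, orchestrator, app) do
    case [orchestrator, app] do
      # Default mode: every node runs the existing deployments supervisor.
      ["multi", _] ->
        [:deployments_supervisor]

      # Clustered mode: device-only nodes do not host orchestrators.
      ["clustered", "device"] ->
        []

      # Clustered mode on `web` or `all` nodes: ProcessHub plus registration.
      ["clustered", _] ->
        [:process_hub, :orchestrator_registration]

      [other, _] ->
        raise "Deployments Orchestrator '#{other}' not supported"
    end
  end
end

IO.inspect(OrchestratorChildrenSketch.children("prod", "multi", "web"))
IO.inspect(OrchestratorChildrenSketch.children("prod", "clustered", "device"))
IO.inspect(OrchestratorChildrenSketch.children("prod", "clustered", "all"))
```

Matching on a two-element list keeps the mode and app decisions in one place, and the final clause makes an unrecognised `DEPLOYMENTS_ORCHESTRATOR` value fail at boot instead of silently starting nothing.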
117 changes: 97 additions & 20 deletions lib/nerves_hub/deployments.ex
@@ -6,6 +6,7 @@
alias NervesHub.AuditLogs.DeploymentTemplates
alias NervesHub.AuditLogs.DeviceTemplates
alias NervesHub.Deployments.Deployment
alias NervesHub.Deployments.Distributed.Orchestrator, as: DistributedOrchestrator
alias NervesHub.Deployments.InflightDeploymentCheck
alias NervesHub.Devices
alias NervesHub.Devices.Device
@@ -21,6 +22,13 @@
Repo.all(Deployment)
end

@spec all_active() :: [Deployment.t()]
def all_active() do
Deployment
|> where(is_active: true)
|> Repo.all()
end

@spec filter(Product.t(), map()) :: {[Product.t()], Flop.Meta.t()}
def filter(product, opts \\ %{}) do
opts = Map.reject(opts, fn {_key, val} -> is_nil(val) end)
@@ -156,7 +164,22 @@
end
end

def get_deployment!(deployment_id), do: Repo.get!(Deployment, deployment_id)
def get_deployment(%Deployment{id: id}), do: get_deployment(id)

def get_deployment(deployment_id) do
Deployment
|> where([d], d.id == ^deployment_id)
|> join(:left, [d], f in assoc(d, :firmware), as: :firmware)
|> preload([firmware: f], firmware: f)
|> Repo.one()
|> case do
nil ->
{:error, :not_found}

deployment ->
{:ok, deployment}
end
end

@spec get_by_product_and_name!(Product.t(), String.t(), boolean()) :: Deployment.t()
def get_by_product_and_name!(product, name, with_device_count \\ false)
@@ -208,12 +231,15 @@

@spec delete_deployment(Deployment.t()) :: {:ok, Deployment.t()} | {:error, :not_found}
def delete_deployment(%Deployment{id: deployment_id}) do
case Repo.delete(Repo.get!(Deployment, deployment_id)) do
Deployment
|> Repo.get!(deployment_id)
|> Repo.delete()
|> case do
{:error, _changeset} ->
{:error, :not_found}

{:ok, deployment} ->
_ = broadcast(:monitor, "deployments/delete", %{deployment_id: deployment.id})
_ = deployment_deleted_event(deployment)

{:ok, deployment}
end
@@ -258,6 +284,14 @@
_ = maybe_trigger_delta_generation(deployment, changeset)
:ok = broadcast(deployment, "deployments/update")

if Map.has_key?(changeset.changes, :is_active) do
if deployment.is_active do
deployment_activated_event(deployment)
else
deployment_deactivated_event(deployment)
end
end

{:ok, deployment}

{:error, changeset} ->
@@ -333,7 +367,7 @@

case Repo.insert(changeset) do
{:ok, deployment} ->
_ = broadcast(:monitor, "deployments/new", %{deployment_id: deployment.id})
deployment_created_event(deployment)

{:ok, deployment}

@@ -346,31 +380,30 @@
def broadcast(deployment, event, payload \\ %{})

def broadcast(:none, event, payload) do
message = %Phoenix.Socket.Broadcast{
topic: "deployment:none",
event: event,
payload: payload
}

Phoenix.PubSub.broadcast(NervesHub.PubSub, "deployment:none", message)
# CI warning (GitHub Actions / compile-and-test, line 383): Nested modules could be aliased at the top of the invoking module.
Phoenix.Channel.Server.broadcast(
NervesHub.PubSub,
"deployment:none",
event,
payload
)
end

def broadcast(:monitor, event, payload) do
Phoenix.PubSub.broadcast(
# CI warning (GitHub Actions / compile-and-test, line 392): Nested modules could be aliased at the top of the invoking module.
Phoenix.Channel.Server.broadcast(
NervesHub.PubSub,
"deployment:monitor",
%Phoenix.Socket.Broadcast{event: event, payload: payload}
event,
payload
)
end

def broadcast(%Deployment{id: id}, event, payload) do
message = %Phoenix.Socket.Broadcast{
topic: "deployment:#{id}",
event: event,
payload: payload
}

Phoenix.PubSub.broadcast(NervesHub.PubSub, "deployment:#{id}", message)
# CI warning (GitHub Actions / compile-and-test, line 401): Nested modules could be aliased at the top of the invoking module.
Phoenix.Channel.Server.broadcast(
NervesHub.PubSub,
"deployment:#{id}",
event,
payload
)
end
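The three `broadcast/3` clauses differ only in the topic they publish to. The sketch below isolates that mapping. It is illustrative only: `BroadcastTopicSketch` is a hypothetical module, and a plain map stands in for the `%Phoenix.Socket.Broadcast{}` message so the example runs without Phoenix.

```elixir
defmodule BroadcastTopicSketch do
  # Topic names taken from the three clauses in the diff.
  def topic(:none), do: "deployment:none"
  def topic(:monitor), do: "deployment:monitor"
  def topic(%{id: id}), do: "deployment:#{id}"

  # A plain map standing in for %Phoenix.Socket.Broadcast{}; every target
  # now carries its topic, event, and payload in the same shape.
  def message(target, event, payload \\ %{}) do
    %{topic: topic(target), event: event, payload: payload}
  end
end

IO.inspect(BroadcastTopicSketch.message(:monitor, "deployments/new", %{deployment_id: 1}))
IO.inspect(BroadcastTopicSketch.message(%{id: 42}, "deployments/update"))
```

One visible effect of the change is consistency: the removed `:monitor` clause built its struct without a `topic` field, whereas all three clauses now pass the topic, event, and payload through the same call.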

@doc """
@@ -555,4 +588,48 @@
|> where([d, firmware: f], f.architecture == ^device.firmware_metadata.architecture)
|> Repo.all()
end

def deployment_created_event(deployment) do
_ =
case Application.get_env(:nerves_hub, :deployments_orchestrator) do
"multi" -> _ = broadcast(:monitor, "deployments/new", %{deployment_id: deployment.id})
"clustered" -> DistributedOrchestrator.start_orchestrator(deployment)
other -> raise "Deployments Orchestrator '#{other}' not supported"
end

:ok
end

def deployment_activated_event(deployment) do
_ =
case Application.get_env(:nerves_hub, :deployments_orchestrator) do
"multi" -> :ok
"clustered" -> DistributedOrchestrator.start_orchestrator(deployment)
other -> raise "Deployments Orchestrator '#{other}' not supported"
end

:ok
end

def deployment_deactivated_event(deployment) do
_ =
case Application.get_env(:nerves_hub, :deployments_orchestrator) do
"multi" -> :ok
"clustered" -> DistributedOrchestrator.stop_orchestrator(deployment)
other -> raise "Deployments Orchestrator '#{other}' not supported"
end

:ok
end

def deployment_deleted_event(deployment) do
_ =
case Application.get_env(:nerves_hub, :deployments_orchestrator) do
"multi" -> broadcast(:monitor, "deployments/delete", %{deployment_id: deployment.id})
"clustered" -> broadcast(deployment, "deleted")
other -> raise "Deployments Orchestrator '#{other}' not supported"
end

:ok
end
end
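The four `deployment_*_event/1` functions amount to a small dispatch table keyed on the orchestrator mode and the lifecycle event. A condensed sketch of that table is below, together with the `is_active` check added to the update path. It is a summary under stated assumptions: `DeploymentEventsSketch` is a hypothetical module, and it returns descriptive terms instead of calling `broadcast/3` or the distributed orchestrator.

```elixir
defmodule DeploymentEventsSketch do
  # "multi" mode keeps the existing monitor broadcasts and ignores
  # activation changes.
  def action("multi", :created), do: {:broadcast, "deployment:monitor", "deployments/new"}
  def action("multi", :activated), do: :noop
  def action("multi", :deactivated), do: :noop
  def action("multi", :deleted), do: {:broadcast, "deployment:monitor", "deployments/delete"}

  # "clustered" mode starts or stops the per-deployment orchestrator, and
  # announces deletion on the deployment's own topic.
  def action("clustered", :created), do: :start_orchestrator
  def action("clustered", :activated), do: :start_orchestrator
  def action("clustered", :deactivated), do: :stop_orchestrator
  def action("clustered", :deleted), do: {:broadcast, "deployment:<id>", "deleted"}

  def action(other, _event), do: raise("Deployments Orchestrator '#{other}' not supported")

  # Mirrors the update path: an event fires only when `is_active` is among
  # the changeset's changes, and its direction follows the new value.
  def event_for_update(changes, is_active) do
    cond do
      not Map.has_key?(changes, :is_active) -> nil
      is_active -> :activated
      true -> :deactivated
    end
  end
end

event = DeploymentEventsSketch.event_for_update(%{is_active: false}, false)
IO.inspect(DeploymentEventsSketch.action("clustered", event))
```

Read this way, only the clustered mode reacts to activation and deactivation, which is consistent with the PR title: one orchestrator per deployment, started and stopped as the deployment's state changes.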