Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hook to manually control redistribution and a cast message to manually trigger redistribution. #253

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion lib/horde/dynamic_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ defmodule Horde.DynamicSupervisor do
"""
use Supervisor

@type allow_handoff_function ::
({current_node :: term(), chosen_node :: term(), child_spec :: term(),
child_pid :: pid()} ->
true | false)
@type options() :: [option()]
@type option ::
{:name, name :: atom()}
Expand All @@ -63,7 +67,7 @@ defmodule Horde.DynamicSupervisor do
| {:shutdown, integer()}
| {:members, [Horde.Cluster.member()] | :auto}
| {:delta_crdt_options, [DeltaCrdt.crdt_option()]}
| {:process_redistribution, :active | :passive}
| {:process_redistribution, :active | :passive | allow_handoff_function()}

@callback init(options()) :: {:ok, options()} | :ignore
@callback child_spec(options :: options()) :: Supervisor.child_spec()
Expand Down Expand Up @@ -263,6 +267,17 @@ defmodule Horde.DynamicSupervisor do
"""
def count_children(supervisor), do: call(supervisor, :count_children)

@doc """
This function triggers redistribution similar to when a node joins or leaves.

The allow_handoff argument is a function that gates whether a process is allowed to be transfered.
By default, it uses the same action as defined in the "process_redistribution" option.
When a process will be moved allow_handoff is called with the argument {current_node, chosen_node, child_spec, child_pid}
"""
def redistribute_children(horde, allow_handoff \\ nil) do
GenServer.cast(horde, {:redistribute_children, allow_handoff})
end

@doc """
Waits for Horde.DynamicSupervisor to have quorum.
"""
Expand Down
42 changes: 33 additions & 9 deletions lib/horde/dynamic_supervisor_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,19 @@ defmodule Horde.DynamicSupervisorImpl do
{:noreply, new_state}
end

def handle_cast(:redistribute_children, state) do
handle_cast({:redistribute_children, default_allowhandoff(state)}, state)
end

def handle_cast({:redistribute_children, nil}, state) do
handle_cast({:redistribute_children, default_allowhandoff(state)}, state)
end

def handle_cast({:redistribute_children, fn_allowhandoff}, state)
when is_function(fn_allowhandoff) do
{:noreply, handoff_processes(state, allowhandoff: fn_allowhandoff)}
end

defp set_child_pid(state, child_id, new_child_pid) do
case get_item(state.processes_by_id, child_id) do
{name, child_spec, old_pid} ->
Expand Down Expand Up @@ -391,11 +404,25 @@ defmodule Horde.DynamicSupervisorImpl do

def has_membership_changed?([]), do: false

defp handoff_processes(state) do
defp default_allowhandoff(state) do
case state.supervisor_options[:process_redistribution] do
:active ->
fn _ -> true end

:passive ->
fn _ -> false end

somefunction when is_function(somefunction) ->
somefunction
end
end

defp handoff_processes(state, opts \\ []) do
this_node = fully_qualified_name(state.name)
fn_allowhandoff = Keyword.get(opts, :allowhandoff, default_allowhandoff(state))

all_items_values(state.processes_by_id)
|> Enum.reduce(state, fn {current_node, child_spec, _child_pid}, state ->
|> Enum.reduce(state, fn {current_node, child_spec, child_pid}, state ->
case choose_node(child_spec, state) do
{:ok, %{name: chosen_node}} ->
current_member = Map.get(state.members_info, current_node)
Expand All @@ -408,13 +435,10 @@ defmodule Horde.DynamicSupervisorImpl do

{^this_node, _other_node} ->
# process is running here but belongs somewhere else

case state.supervisor_options[:process_redistribution] do
:active ->
handoff_child(child_spec, state)

:passive ->
state
if fn_allowhandoff.({current_node, chosen_node, child_spec, child_pid}) do
handoff_child(child_spec, state)
else
state
end

{_current_node, ^this_node} ->
Expand Down
28 changes: 26 additions & 2 deletions test/dynamic_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ defmodule DynamicSupervisorTest do

describe "graceful shutdown" do
test "stopping a node moves processes over when they are ready" do
# NOTE: here I had to disable redistribution on node :up, otherwise
# sometimes horde would kill the :fast process for redistribution when
# NOTE: here I had to disable redistribution on node :up, otherwise
# sometimes horde would kill the :fast process for redistribution when
# :horde_2_graceful is marked as :alive

{:ok, _} =
Expand Down Expand Up @@ -618,5 +618,29 @@ defmodule DynamicSupervisorTest do
context.passive[:children]
)
end

test "redistribution can be triggered manually", context do
n2_cspecs =
LocalClusterHelper.expected_distribution_for(
context.passive[:children],
context.passive[:members],
context.passive[:n2]
)

Horde.Cluster.set_members(context.passive[:n1], [context.passive[:n1], context.passive[:n2]])

Process.sleep(500)

# verify nothing redistributed on :alive
assert Kernel.match?([], LocalClusterHelper.running_children(context.passive[:n2]))

Horde.DynamicSupervisor.redistribute_children(context.passive[:n1], fn _ -> true end)
Process.sleep(500)

assert_receive {:shutdown, :passive, {:shutdown, :process_redistribution}}, 100
refute_receive {:shutdown, :passive, :shutdown}, 100

assert LocalClusterHelper.supervisor_has_children?(context.passive[:n2], n2_cspecs)
end
end
end