Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example dynamic cluster membership with tests. #155

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/dynamic/.formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
24 changes: 24 additions & 0 deletions examples/dynamic/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where 3rd-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
dynamic-*.tar

35 changes: 35 additions & 0 deletions examples/dynamic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Distributed Supervisor / Registry "hello world"

This is an example application that shows how `Horde.Supervisor` and `Horde.Registry` work together using dynamic cluster membership.

Start the app in separate terminal windows with:

`iex --name [email protected] --cookie asdf -S mix`

and

`iex --name [email protected] --cookie asdf -S mix`

and

`iex --name [email protected] --cookie asdf -S mix`


On any of the terminal windows, you can now create "entities" (which is anything you desire). Type in the following to any of the terminal windows:

```
iex> Dynamic.create_entity("UniqueName", %{some: :data, about: "it"})
```

Now on any other node, you can get the data again:

```
iex> Dynamic.get_entity("UniqueName")
%{some: :data, about: "it"}
```

This alone is not too suprising. Now kill the node which created the entity (CTRL-C twice). Notice the other machines can still fetch the data.

We use the `meta/2` and `put_meta/3` functions on `Horde.Registry` to share the data for the entity across the members of the Horde. This means that when the node running `Dynamic.Entity` is killed, the new instance started by the `Horde.Supervisor` will pick up the data from the meta data shared across the `Horde.Registry` to continue processing where the previous instance left off. You can get the data of an entity by running `Dynamic.get_entity(name_of_entity)`.

You can also call `Dynamic.Entity.all()` from any of the IEX consoles to retrieve all the entity workers.
44 changes: 44 additions & 0 deletions examples/dynamic/config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config

config :logger, handle_sasl_reports: true, level: :debug

config :libcluster, :topologies,
hello_cluster: [
strategy: Elixir.Cluster.Strategy.Gossip,
config: [
port: 45892,
if_addr: "0.0.0.0",
multicast_addr: "230.1.1.251",
multicast_ttl: 1,
secret: "asdf"
]
]

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
# file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for
# 3rd-party users, it should be done in your "mix.exs" file.

# You can configure your application as:
#
# config :dynamic, key: :value
#
# and access this configuration in your application as:
#
# Application.get_env(:dynamic, :key)
#
# You can also configure a 3rd-party app:
#
# config :logger, level: :info
#

# It is also possible to import configuration files, relative to this
# directory. For example, you can emulate configuration per environment
# by uncommenting the line below and defining dev.exs, test.exs and such.
# Configuration from the imported file will override the ones defined
# here (which is why it is important to import them last).
#
# import_config "#{Mix.env}.exs"
32 changes: 32 additions & 0 deletions examples/dynamic/lib/cluster/node_listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule NodeListener do
use GenServer

def start_link(_) do
GenServer.start_link(__MODULE__, nil)
end

def init(_) do
:net_kernel.monitor_nodes(true, node_type: :visible)
{:ok, nil}
end

def handle_info({:nodeup, _node, _opts}, state) do
set_members(Dynamic.EntitySupervisor)
set_members(Dynamic.EntityRegistry)
{:noreply, state}
end

def handle_info({:nodedown, _node, _opts}, state) do
set_members(Dynamic.EntitySupervisor)
set_members(Dynamic.EntityRegistry)
{:noreply, state}
end

defp set_members(name) do
members =
[Node.self() | Node.list()]
|> Enum.map(fn node -> {name, node} end)

:ok = Horde.Cluster.set_members(name, members)
end
end
22 changes: 22 additions & 0 deletions examples/dynamic/lib/dynamic.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Dynamic do
def create_entity(name, contents) do
Horde.Supervisor.start_child(
Dynamic.EntitySupervisor,
{Dynamic.Entity, [name: name, contents: contents]}
)
end

def get_entity(name) do
name
|> get_entity_pid()
|> Dynamic.Entity.get_data()
|> elem(1)
end

def get_entity_pid(name) do
case Horde.Registry.lookup(Dynamic.EntityRegistry, name) do
:undefined -> nil
[{pid, _}] -> pid
end
end
end
37 changes: 37 additions & 0 deletions examples/dynamic/lib/dynamic/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Dynamic.Application do
@moduledoc false

use Application

def start(_type, _args) do
# List all child processes to be supervised
children = [
{Horde.Registry, [name: Dynamic.EntityRegistry, keys: :unique]},
{Horde.Supervisor,
[
name: Dynamic.EntitySupervisor,
strategy: :one_for_one,
distribution_strategy: Horde.UniformQuorumDistribution,
max_restarts: 100_000,
max_seconds: 1
]},
NodeListener,
unquote(if(Mix.env() != :test, do: quote(do: {Cluster.Supervisor, [Application.get_env(:libcluster, :topologies)]}))),
%{
id: Dynamic.ClusterConnector,
restart: :transient,
start:
{Task, :start_link,
[
fn ->
Horde.Supervisor.wait_for_quorum(Dynamic.EntitySupervisor, 30_000)
end
]}
}
]
|> Enum.filter(&(&1))

opts = [strategy: :one_for_one, name: Dynamic.Supervisor]
Supervisor.start_link(children, opts)
end
end
37 changes: 37 additions & 0 deletions examples/dynamic/lib/dynamic/entity.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Dynamic.Entity do
use GenServer

def start_link(name, data) do
GenServer.start_link(__MODULE__, {name, data}, name: via_tuple(name))
end

def init(data) do
{:ok, data}
end

def child_spec(opts) do
name = Keyword.fetch!(opts, :name)
contents = Keyword.fetch!(opts, :contents)

%{
id: "#{__MODULE__}_#{name}",
start: {__MODULE__, :start_link, [name, contents]},
# Allow for up to 10 seconds to shut down
shutdown: 10_000,
# Restart if it crashes only
restart: :transient
}
end

def handle_call(:get_data, _from, {name, data} = state) do
{:reply, {name, data}, state}
end

def get_data(entity_pid) do
GenServer.call(entity_pid, :get_data)
end

def via_tuple(name) do
{:via, Horde.Registry, {Dynamic.EntityRegistry, name}}
end
end
30 changes: 30 additions & 0 deletions examples/dynamic/mix.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Dynamic.MixProject do
use Mix.Project

def project do
[
app: :dynamic,
version: "0.1.0",
elixir: "~> 1.6",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end

# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger],
mod: {Dynamic.Application, []}
]
end

# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:horde, path: "../.."},
{:libcluster, "~> 3.0.3"},
{:local_cluster, "~> 1.1", only: [:test]}
]
end
end
14 changes: 14 additions & 0 deletions examples/dynamic/mix.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
%{
"delta_crdt": {:hex, :delta_crdt, "0.5.7", "fac7d9769fce1c4dc50283abd659018476170a7532d8f0a508d9fcbcd9b8faf2", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm"},
"horde": {:git, "https://github.com/derekkraan/horde.git", "8822a432f4c3528f904bd62ddbb1bcc242658c93", [branch: "fix_start_children"]},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"libcluster": {:hex, :libcluster, "3.0.3", "492e98c7f5c9a6e95b8d51f0b198cf8eab60af3b490f40b958d4bc326d11e40e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"},
"local_cluster": {:hex, :local_cluster, "1.1.0", "a2a0e3e965aa1549939108066bfa537ce89f0107917f5b0260153e2fdb304116", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm"},
"merkle_map": {:hex, :merkle_map, "0.2.0", "5391ac61e016ce4aeb66ce39f05206a382fd4b66ee4b63c08a261d5633eadd76", [:mix], [], "hexpm"},
"murmur": {:hex, :murmur, "1.0.1", "a6e6bced2dd0d666090a9cf3e73699f3b9176bbcf32d35b0f022f137667608e3", [:mix], [], "hexpm"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"},
"telemetry_poller": {:hex, :telemetry_poller, "0.4.0", "da64dea54b77604023e8d15dc61a5df8968f4c9e013eba561bfb2bc614b15432", [:rebar3], [], "hexpm"},
"xxhash": {:hex, :xxhash, "0.2.1", "ab0893a8124f3c11116c57e500485dc5f67817d1d4c44f0fff41f3fd3c590607", [:mix], [], "hexpm"},
}
102 changes: 102 additions & 0 deletions examples/dynamic/test/dynamic_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
defmodule Dynamic.EntityTest do
use ExUnit.Case
alias Dynamic.Entity

describe "multiple nodes" do
setup do
[node1, node2] =
nodes = LocalCluster.start_nodes("test-dynamic-entity", 2, files: [__ENV__.file])

IO.inspect(nodes, label: "Test nodes that are now online")

# Ensure online
assert Node.ping(node1) == :pong
assert Node.ping(node2) == :pong

Horde.Supervisor.wait_for_quorum(Dynamic.EntitySupervisor, 30_000)

Dynamic.create_entity("one", %{one: "data"})
Dynamic.create_entity("two", %{two: "data"})
Dynamic.create_entity("three", %{three: "data"})
Dynamic.create_entity("four", %{four: "data"})
Dynamic.create_entity("five", %{five: "data"})

# Wait for eventual consistency of registries
Process.sleep(100)

entity_one = Dynamic.get_entity_pid("one")
entity_two = Dynamic.get_entity_pid("two")
entity_three = Dynamic.get_entity_pid("three")
entity_four = Dynamic.get_entity_pid("four")
entity_five = Dynamic.get_entity_pid("five")

entities = [entity_one, entity_two, entity_three, entity_four, entity_five]

node1_entity = entities |> Enum.find(&(node(&1) == node1))
assert node1_entity
{node1_entity_str, node1_entity_data} = Entity.get_data(node1_entity)
node2_entity = entities |> Enum.find(&(node(&1) == node2))
assert node2_entity
{node2_entity_str, node2_entity_data} = Entity.get_data(node2_entity)

%{
node1: node1,
node2: node2,
nodes: nodes,
node1_entity: node1_entity,
node2_entity: node2_entity,
node1_entity_data: node1_entity_data,
node2_entity_data: node2_entity_data,
node1_entity_str: node1_entity_str,
node2_entity_str: node2_entity_str
}
end

# Testing that other nodes can see the same registry data
test "entity data fetch from other nodes", %{
node1: node1,
node1_entity: node1_entity,
node1_entity_str: node1_entity_str,
node1_entity_data: node1_entity_data,
node2: node2,
node2_entity: node2_entity,
node2_entity_str: node2_entity_str,
node2_entity_data: node2_entity_data
} do
tester = self()

# Try to find the entity pid from node 2
Node.spawn(node2, fn ->
send(tester, Dynamic.get_entity(node1_entity_str))
end)

assert_receive ^node1_entity_data, 100

# Try to find the entity pid from node 1
Node.spawn(node1, fn ->
send(tester, Dynamic.get_entity(node2_entity_str))
end)

assert_receive ^node2_entity_data, 100
end

# Testing what our logic does when a node goes down, and to make sure
# we have everything configured right with horde
test "a node goes down", %{
node1: node1,
node1_entity_str: node1_entity_str,
node1_entity_data: node1_entity_data
} do
# Oh noes! a node goes down!
:ok = LocalCluster.stop_nodes([node1])

# Ensure node1 is not reachable
assert Node.ping(node1) == :pang

Process.sleep(100)

# Node1's data got moved to another node
assert Dynamic.get_entity(node1_entity_str) == node1_entity_data
end
end
end
4 changes: 4 additions & 0 deletions examples/dynamic/test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
:ok = LocalCluster.start()
Application.ensure_all_started(:dynamic)

ExUnit.start()