Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to set some cluster metadata besides the name using tags (backport #12659) #12699

Merged
merged 6 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2688,6 +2688,20 @@ fun(Conf) ->
end
end}.

{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
{datatype, [binary]}
]}.

{translation, "rabbit.cluster_tags",
fun(Conf) ->
case cuttlefish:conf_get("cluster_tags", Conf, undefined) of
none -> [];
_ ->
Settings = cuttlefish_variable:filter_by_prefix("cluster_tags", Conf),
[ {list_to_binary(K), V} || {[_, K], V} <- Settings]
end
end}.

% ===============================
% Validators
% ===============================
Expand Down
17 changes: 16 additions & 1 deletion deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

%%---------------------------------------------------------------------------
%% Boot steps.
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0,
-export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0,
pg_local_amqp_session/0,
pg_local_amqp_connection/0]).

Expand Down Expand Up @@ -208,6 +208,12 @@
{requires, recovery},
{enables, routing_ready}]}).


-rabbit_boot_step({cluster_tags,
[{description, "Set cluster tags"},
{mfa, {?MODULE, update_cluster_tags, []}},
{requires, core_initialized}]}).

-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"},
{requires, [core_initialized, recovery]}]}).
Expand Down Expand Up @@ -1138,6 +1144,15 @@ pg_local_amqp_connection() ->
pg_local_scope(Prefix) ->
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).


-spec update_cluster_tags() -> 'ok'.

update_cluster_tags() ->
Tags = application:get_env(rabbit, cluster_tags, []),
?LOG_DEBUG("Seeding cluster tags from application environment key...",
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
rabbit_runtime_parameters:set_global(cluster_tags, Tags, <<"internal_user">>).

-spec maybe_insert_default_data() -> 'ok'.

maybe_insert_default_data() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

def run([], %{node: node_name, timeout: timeout} = opts) do
status =
status0 =
case :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :cli_cluster_status, []) do
{:badrpc, {:EXIT, {:undef, _}}} ->
:rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :status, [])
Expand All @@ -45,11 +45,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
status
end

case status do
case status0 do
{:badrpc, _} = err ->
err

status ->
status0 ->
tags = cluster_tags(node_name, timeout)
status = status0 ++ [{:cluster_tags, tags}]
case :rabbit_misc.rpc_call(node_name, :rabbit_nodes, :list_running, []) do
{:badrpc, _} = err ->
err
Expand Down Expand Up @@ -122,7 +124,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do

def output(result, %{node: node_name}) when is_list(result) do
m = result_map(result)

total_cores = Enum.reduce(m[:cpu_cores], 0, fn {_, val}, acc -> acc + val end)

cluster_name_section = [
Expand All @@ -131,6 +132,15 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
"Total CPU cores available cluster-wide: #{total_cores}"
]

cluster_tag_section =
[
"\n#{bright("Cluster Tags")}\n"
] ++
case m[:cluster_tags] do
[] -> ["(none)"]
tags -> cluster_tag_lines(tags)
end

disk_nodes_section =
[
"\n#{bright("Disk Nodes")}\n"
Expand Down Expand Up @@ -210,6 +220,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do

lines =
cluster_name_section ++
cluster_tag_section ++
disk_nodes_section ++
ram_nodes_section ++
running_nodes_section ++
Expand Down Expand Up @@ -260,6 +271,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
# {rabbit@warp10,[{resource_limit,memory,rabbit@warp10}]}]}]
%{
cluster_name: Keyword.get(result, :cluster_name),
cluster_tags: result |> Keyword.get(:cluster_tags, []),
disk_nodes: result |> Keyword.get(:nodes, []) |> Keyword.get(:disc, []),
ram_nodes: result |> Keyword.get(:nodes, []) |> Keyword.get(:ram, []),
running_nodes: result |> Keyword.get(:running_nodes, []) |> Enum.map(&to_string/1),
Expand Down Expand Up @@ -383,6 +395,18 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
{node, result}
end

defp cluster_tags(node, timeout) do
case :rabbit_misc.rpc_call(
node,
:rabbit_runtime_parameters,
:value_global,
[:cluster_tags],
timeout) do
:not_found -> []
tags -> tags
end
end

defp node_lines(nodes) do
Enum.map(nodes, &to_string/1) |> Enum.sort()
end
Expand Down Expand Up @@ -413,4 +437,10 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
defp maintenance_lines(mapping) do
Enum.map(mapping, fn {node, status} -> "Node: #{node}, status: #{status}" end)
end

defp cluster_tag_lines(mapping) do
Enum.map(mapping, fn {key, value} ->
"#{key}: #{value}"
end)
end
end
8 changes: 8 additions & 0 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_overview.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ to_json(ReqData, Context = #context{user = User = #user{tags = Tags}}) ->
{product_name, list_to_binary(rabbit:product_name())},
{rabbitmq_version, list_to_binary(rabbit:base_product_version())},
{cluster_name, rabbit_nodes:cluster_name()},
{cluster_tags, cluster_tags()},
{erlang_version, erlang_version()},
{erlang_full_version, erlang_full_version()},
{release_series_support_status, rabbit_release_series:readable_support_status()},
Expand Down Expand Up @@ -182,3 +183,10 @@ transform_retention_intervals([{MaxAgeInSeconds, _}|Rest], Acc) ->
0
end,
transform_retention_intervals(Rest, [AccVal|Acc]).

cluster_tags() ->
case rabbit_runtime_parameters:value_global(cluster_tags) of
not_found ->
[];
Tags -> Tags
end.
22 changes: 21 additions & 1 deletion deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ all_tests() -> [
disabled_qq_replica_opers_test,
qq_status_test,
list_deprecated_features_test,
list_used_deprecated_features_test
list_used_deprecated_features_test,
cluster_tags_test
].

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -282,6 +283,12 @@ init_per_testcase(Testcase = disabled_qq_replica_opers_test, Config) ->
rabbit_ct_broker_helpers:rpc_all(Config,
application, set_env, [rabbitmq_management, restrictions, Restrictions]),
rabbit_ct_helpers:testcase_started(Config, Testcase);
init_per_testcase(Testcase = cluster_tags_test, Config) ->
Tags = [{<<"az">>, <<"us-east-3">>}, {<<"region">>,<<"us-east">>}, {<<"environment">>,<<"production">>}],
rpc(
Config, rabbit_runtime_parameters, set_global,
[cluster_tags, Tags, none]),
rabbit_ct_helpers:testcase_started(Config, Testcase);
init_per_testcase(queues_detailed_test, Config) ->
IsEnabled = rabbit_ct_broker_helpers:is_feature_flag_enabled(
Config, detailed_queues_endpoint),
Expand Down Expand Up @@ -348,6 +355,11 @@ end_per_testcase0(disabled_operator_policy_test, Config) ->
end_per_testcase0(disabled_qq_replica_opers_test, Config) ->
rpc(Config, application, unset_env, [rabbitmq_management, restrictions]),
Config;
end_per_testcase0(cluster_tags_test, Config) ->
rpc(
Config, rabbit_runtime_parameters, clear_global,
[cluster_tags, none]),
Config;
end_per_testcase0(Testcase, Config)
when Testcase == list_deprecated_features_test;
Testcase == list_used_deprecated_features_test ->
Expand Down Expand Up @@ -3936,6 +3948,14 @@ list_used_deprecated_features_test(Config) ->
?assertEqual(list_to_binary(Desc), maps:get(desc, Feature)),
?assertEqual(list_to_binary(DocUrl), maps:get(doc_url, Feature)).

cluster_tags_test(Config) ->
Overview = http_get(Config, "/overview"),
Tags = maps:get(cluster_tags, Overview),
ExpectedTags = #{az => <<"us-east-3">>,environment => <<"production">>,
region => <<"us-east">>},
?assertEqual(ExpectedTags, Tags),
passed.

%% -------------------------------------------------------------------
%% Helpers.
%% -------------------------------------------------------------------
Expand Down
Loading