Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions deps/amqp10_common/src/serial_number.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
diff/2,
foldl/4]).

-ifdef(TEST).
%% For tests.
-export([usort/1]).
-endif.

-type serial_number() :: sequence_no().
-export_type([serial_number/0]).
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_ct_helpers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ rabbitmq_app(
license_files = [":license_files"],
priv = [":priv"],
deps = [
"//deps/amqp10_common:erlang_app",
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_stream_common:erlang_app",
"@meck//:erlang_app",
"@proper//:erlang_app",
"@ra//:erlang_app",
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_ct_helpers/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PROJECT = rabbitmq_ct_helpers
PROJECT_DESCRIPTION = Common Test helpers for RabbitMQ

DEPS = rabbit_common proper inet_tcp_proxy meck
DEPS = rabbit_common amqp10_common rabbitmq_stream_common proper inet_tcp_proxy meck
TEST_DEPS = rabbit

XREF_IGNORE = [ \
Expand Down
7 changes: 5 additions & 2 deletions deps/rabbitmq_ct_helpers/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_ct_proper_helpers.erl",
"src/rabbit_ct_vm_helpers.erl",
"src/rabbit_mgmt_test_util.erl",
"src/stream_test_utils.erl",
],
hdrs = [":public_and_private_hdrs"],
app_name = "rabbitmq_ct_helpers",
dest = "ebin",
erlc_opts = "//:erlc_opts",
deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
)

def all_test_beam_files(name = "all_test_beam_files"):
Expand All @@ -45,12 +46,13 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_ct_proper_helpers.erl",
"src/rabbit_ct_vm_helpers.erl",
"src/rabbit_mgmt_test_util.erl",
"src/stream_test_utils.erl",
],
hdrs = [":public_and_private_hdrs"],
app_name = "rabbitmq_ct_helpers",
dest = "test",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
)

def all_srcs(name = "all_srcs"):
Expand Down Expand Up @@ -107,6 +109,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_ct_proper_helpers.erl",
"src/rabbit_ct_vm_helpers.erl",
"src/rabbit_mgmt_test_util.erl",
"src/stream_test_utils.erl",
],
)

Expand Down
137 changes: 137 additions & 0 deletions deps/rabbitmq_ct_helpers/src/stream_test_utils.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% There is no open source Erlang RabbitMQ Stream client.
%% Therefore, we have to build the Stream protocol commands manually.

-module(stream_test_utils).

-compile([export_all, nowarn_export_all]).

-include_lib("amqp10_common/include/amqp10_framing.hrl").

-define(RESPONSE_CODE_OK, 1).

connect(Config, Node) ->
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
{ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),

C0 = rabbit_stream_core:init(0),
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
ok = gen_tcp:send(Sock, PeerPropertiesFrame),
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),

ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 1, sasl_handshake})),
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(Sock, C1),
Username = <<"guest">>,
Password = <<"guest">>,
Null = 0,
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(Sock, C2),
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(Sock, C3),

ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
{ok, Sock, C5}.

create_stream(Sock, C0, Stream) ->
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
ok = gen_tcp:send(Sock, CreateStreamFrame),
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

declare_publisher(Sock, C0, Stream, PublisherId) ->
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
ok = gen_tcp:send(Sock, SubscribeFrame),
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
Messages = [simple_entry(Seq, P)
|| {Seq, P} <- lists:zip(SeqIds, Payloads)],
{ok, SeqIds, C1} = publish_entries(Sock, C0, PublisherId, length(Messages), Messages),
{ok, C1}.

publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
ok = gen_tcp:send(Sock, PublishFrame1),
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
{ok, SeqIds, C1}.

%% Streams contain AMQP 1.0 encoded messages.
%% In this case, the AMQP 1.0 encoded message contains a single data section.
simple_entry(Sequence, Body)
when is_binary(Body) ->
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
DataSectSize = byte_size(DataSect),
<<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.

%% Streams contain AMQP 1.0 encoded messages.
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
simple_entry(Sequence, Body, AppProps)
when is_binary(Body) ->
AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
Sects = <<AppPropsSect/binary, DataSect/binary>>,
SectSize = byte_size(Sects),
<<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.

%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
%% All data sections are delivered uncompressed in 1 batch.
sub_batch_entry_uncompressed(Sequence, Bodies) ->
Batch = lists:foldl(fun(Body, Acc) ->
AppProps = #'v1_0.application_properties'{
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
Sect = <<Sect0/binary, Sect1/binary>>,
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
end, <<>>, Bodies),
Size = byte_size(Batch),
<<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.

%% Here, each AMQP 1.0 encoded message contains a single data section.
%% All data sections are delivered in 1 gzip compressed batch.
sub_batch_entry_compressed(Sequence, Bodies) ->
Uncompressed = lists:foldl(fun(Body, Acc) ->
Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
<<Acc/binary, Bin/binary>>
end, <<>>, Bodies),
Compressed = zlib:gzip(Uncompressed),
CompressedLen = byte_size(Compressed),
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
CompressedLen:32, Compressed:CompressedLen/binary>>.

receive_stream_commands(Sock, C0) ->
case rabbit_stream_core:next_command(C0) of
empty ->
case gen_tcp:recv(Sock, 0, 5000) of
{ok, Data} ->
C1 = rabbit_stream_core:incoming_data(Data, C0),
case rabbit_stream_core:next_command(C1) of
empty ->
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
rabbit_stream_core:next_command(
rabbit_stream_core:incoming_data(Data2, C1));
Res ->
Res
end;
{error, Err} ->
ct:fail("error receiving stream data ~w", [Err])
end;
Res ->
Res
end.
21 changes: 19 additions & 2 deletions deps/rabbitmq_prometheus/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
load("@rules_erlang//:eunit2.bzl", "eunit")
load("@rules_erlang//:xref2.bzl", "xref")
load("@rules_erlang//:dialyze.bzl", "dialyze", "plt")
load("//:rabbitmq_home.bzl", "rabbitmq_home")
load("//:rabbitmq_run.bzl", "rabbitmq_run")
load(
"//:rabbitmq.bzl",
"RABBITMQ_DIALYZER_OPTS",
"assert_suites",
"broker_for_integration_suites",
"rabbitmq_app",
"rabbitmq_integration_suite",
)
Expand Down Expand Up @@ -85,7 +86,19 @@ eunit(
target = ":test_erlang_app",
)

broker_for_integration_suites()
rabbitmq_home(
name = "broker-for-tests-home",
plugins = [
"//deps/rabbit:erlang_app",
"//deps/rabbitmq_stream:erlang_app",
":erlang_app",
],
)

rabbitmq_run(
name = "rabbitmq-for-tests-run",
home = ":broker-for-tests-home",
)

rabbitmq_integration_suite(
name = "config_schema_SUITE",
Expand All @@ -96,6 +109,10 @@ rabbitmq_integration_suite(
name = "rabbit_prometheus_http_SUITE",
size = "medium",
flaky = True,
runtime_deps = [
"//deps/rabbitmq_stream_common:erlang_app",
"//deps/rabbitmq_stream:erlang_app",
],
)

assert_suites()
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_prometheus/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
PROJECT_MOD := rabbit_prometheus_app
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch
BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_stream

EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
]},

%% the family name for this metric is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
{stream_consumer_metrics, [
{2, undefined, stream_consumer_max_offset_lag, gauge, "Current maximum of offset lag of consumers"}
]},

{connection_metrics, [
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
Expand Down Expand Up @@ -578,6 +583,17 @@ get_data(channel_metrics = Table, false, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(stream_consumer_metrics = MF, false, _) ->
Table = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({_, Props}, OldMax) ->
erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax)
end, 0, Table) of
MaxOffsetLag ->
[{MF, MaxOffsetLag}]
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
Expand Down Expand Up @@ -708,6 +724,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
end, [], Table);
get_data(queue_consumer_count, true, _) ->
ets:tab2list(queue_metrics);
get_data(stream_consumer_metrics, true, _) ->
Table = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({{QueueName, _Pid, _SubId}, Props}, Map0) ->
Value = proplists:get_value(offset_lag, Props, 0),
maps:update_with(
QueueName,
fun(OldMax) -> erlang:max(Value, OldMax) end,
Value,
Map0)
end, #{}, Table) of
Map1 ->
maps:to_list(Map1)
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(vhost_status, _, _) ->
[ { #{<<"vhost">> => VHost},
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
Expand Down
Loading
Loading