diff --git a/deps/amqp10_common/src/serial_number.erl b/deps/amqp10_common/src/serial_number.erl index 8f6cabcf1515..5adc5f82b09c 100644 --- a/deps/amqp10_common/src/serial_number.erl +++ b/deps/amqp10_common/src/serial_number.erl @@ -15,9 +15,8 @@ diff/2, foldl/4]). --ifdef(TEST). +%% For tests. -export([usort/1]). --endif. -type serial_number() :: sequence_no(). -export_type([serial_number/0]). diff --git a/deps/rabbitmq_ct_helpers/BUILD.bazel b/deps/rabbitmq_ct_helpers/BUILD.bazel index 1002b4289a8a..b5167a076972 100644 --- a/deps/rabbitmq_ct_helpers/BUILD.bazel +++ b/deps/rabbitmq_ct_helpers/BUILD.bazel @@ -42,7 +42,9 @@ rabbitmq_app( license_files = [":license_files"], priv = [":priv"], deps = [ + "//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", + "//deps/rabbitmq_stream_common:erlang_app", "@meck//:erlang_app", "@proper//:erlang_app", "@ra//:erlang_app", diff --git a/deps/rabbitmq_ct_helpers/Makefile b/deps/rabbitmq_ct_helpers/Makefile index 2e1f19839036..be609ab79070 100644 --- a/deps/rabbitmq_ct_helpers/Makefile +++ b/deps/rabbitmq_ct_helpers/Makefile @@ -1,7 +1,7 @@ PROJECT = rabbitmq_ct_helpers PROJECT_DESCRIPTION = Common Test helpers for RabbitMQ -DEPS = rabbit_common proper inet_tcp_proxy meck +DEPS = rabbit_common amqp10_common rabbitmq_stream_common proper inet_tcp_proxy meck TEST_DEPS = rabbit XREF_IGNORE = [ \ diff --git a/deps/rabbitmq_ct_helpers/app.bzl b/deps/rabbitmq_ct_helpers/app.bzl index 7f56b8dfcbab..dfb1163d4435 100644 --- a/deps/rabbitmq_ct_helpers/app.bzl +++ b/deps/rabbitmq_ct_helpers/app.bzl @@ -19,12 +19,13 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_ct_proper_helpers.erl", "src/rabbit_ct_vm_helpers.erl", "src/rabbit_mgmt_test_util.erl", + "src/stream_test_utils.erl", ], hdrs = [":public_and_private_hdrs"], app_name = "rabbitmq_ct_helpers", dest = "ebin", erlc_opts = "//:erlc_opts", - deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"], + deps = ["//deps/amqp10_common:erlang_app", 
"//deps/rabbit_common:erlang_app", "@proper//:erlang_app"], ) def all_test_beam_files(name = "all_test_beam_files"): @@ -45,12 +46,13 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_ct_proper_helpers.erl", "src/rabbit_ct_vm_helpers.erl", "src/rabbit_mgmt_test_util.erl", + "src/stream_test_utils.erl", ], hdrs = [":public_and_private_hdrs"], app_name = "rabbitmq_ct_helpers", dest = "test", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"], + deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit_common:erlang_app", "@proper//:erlang_app"], ) def all_srcs(name = "all_srcs"): @@ -107,6 +109,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_ct_proper_helpers.erl", "src/rabbit_ct_vm_helpers.erl", "src/rabbit_mgmt_test_util.erl", + "src/stream_test_utils.erl", ], ) diff --git a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl new file mode 100644 index 000000000000..ee4a8d789302 --- /dev/null +++ b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl @@ -0,0 +1,137 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% There is no open source Erlang RabbitMQ Stream client. +%% Therefore, we have to build the Stream protocol commands manually. + +-module(stream_test_utils). + +-compile([export_all, nowarn_export_all]). + +-include_lib("amqp10_common/include/amqp10_framing.hrl"). + +-define(RESPONSE_CODE_OK, 1). 
+
+connect(Config, Node) ->
+    StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
+    {ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
+
+    C0 = rabbit_stream_core:init(0),
+    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
+    ok = gen_tcp:send(Sock, PeerPropertiesFrame),
+    {{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
+
+    ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 1, sasl_handshake})),
+    {{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(Sock, C1),
+    Username = <<"guest">>,
+    Password = <<"guest">>,
+    Null = 0,
+    PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
+    ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
+    {{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(Sock, C2),
+    {{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(Sock, C3),
+
+    ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
+    ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
+    {{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
+    {ok, Sock, C5}.
+
+create_stream(Sock, C0, Stream) ->
+    CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
+    ok = gen_tcp:send(Sock, CreateStreamFrame),
+    {{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
+    {ok, C1}.
+
+declare_publisher(Sock, C0, Stream, PublisherId) ->
+    DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
+    ok = gen_tcp:send(Sock, DeclarePublisherFrame),
+    {{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
+    {ok, C1}.
+
+subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
+    SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
+    ok = gen_tcp:send(Sock, SubscribeFrame),
+    {{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
+    {ok, C1}.
+
+publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
+    SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
+    Messages = [simple_entry(Seq, P)
+                || {Seq, P} <- lists:zip(SeqIds, Payloads)],
+    {ok, SeqIds, C1} = publish_entries(Sock, C0, PublisherId, length(Messages), Messages),
+    {ok, C1}.
+
+publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
+    PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
+    ok = gen_tcp:send(Sock, PublishFrame1),
+    {{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
+    {ok, SeqIds, C1}.
+
+%% Streams contain AMQP 1.0 encoded messages.
+%% In this case, the AMQP 1.0 encoded message contains a single data section.
+simple_entry(Sequence, Body)
+  when is_binary(Body) ->
+    DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
+    DataSectSize = byte_size(DataSect),
+    <<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.
+
+%% Streams contain AMQP 1.0 encoded messages.
+%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
+simple_entry(Sequence, Body, AppProps)
+  when is_binary(Body) ->
+    AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
+    DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
+    Sects = <<AppPropsSect/binary, DataSect/binary>>,
+    SectSize = byte_size(Sects),
+    <<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.
+
+%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
+%% All data sections are delivered uncompressed in 1 batch.
+sub_batch_entry_uncompressed(Sequence, Bodies) ->
+    Batch = lists:foldl(fun(Body, Acc) ->
+                                AppProps = #'v1_0.application_properties'{
+                                              content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
+                                Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
+                                Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
+                                Sect = <<Sect0/binary, Sect1/binary>>,
+                                <<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
+                        end, <<>>, Bodies),
+    Size = byte_size(Batch),
+    <<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.
+
+%% Here, each AMQP 1.0 encoded message contains a single data section.
+%% All data sections are delivered in 1 gzip compressed batch.
+sub_batch_entry_compressed(Sequence, Bodies) ->
+    Uncompressed = lists:foldl(fun(Body, Acc) ->
+                                       Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
+                                       <<Acc/binary, Bin/binary>>
+                               end, <<>>, Bodies),
+    Compressed = zlib:gzip(Uncompressed),
+    CompressedLen = byte_size(Compressed),
+    <<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32, CompressedLen:32, Compressed:CompressedLen/binary>>.
+
+receive_stream_commands(Sock, C0) ->
+    case rabbit_stream_core:next_command(C0) of
+        empty ->
+            case gen_tcp:recv(Sock, 0, 5000) of
+                {ok, Data} ->
+                    C1 = rabbit_stream_core:incoming_data(Data, C0),
+                    case rabbit_stream_core:next_command(C1) of
+                        empty ->
+                            {ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
+                            rabbit_stream_core:next_command(
+                              rabbit_stream_core:incoming_data(Data2, C1));
+                        Res ->
+                            Res
+                    end;
+                {error, Err} ->
+                    ct:fail("error receiving stream data ~w", [Err])
+            end;
+        Res ->
+            Res
+    end.
diff --git a/deps/rabbitmq_prometheus/BUILD.bazel b/deps/rabbitmq_prometheus/BUILD.bazel index b0d71c0cda52..be99cf75537e 100644 --- a/deps/rabbitmq_prometheus/BUILD.bazel +++ b/deps/rabbitmq_prometheus/BUILD.bazel @@ -1,11 +1,12 @@ load("@rules_erlang//:eunit2.bzl", "eunit") load("@rules_erlang//:xref2.bzl", "xref") load("@rules_erlang//:dialyze.bzl", "dialyze", "plt") +load("//:rabbitmq_home.bzl", "rabbitmq_home") +load("//:rabbitmq_run.bzl", "rabbitmq_run") load( "//:rabbitmq.bzl", "RABBITMQ_DIALYZER_OPTS", "assert_suites", - "broker_for_integration_suites", "rabbitmq_app", "rabbitmq_integration_suite", ) @@ -85,7 +86,19 @@ eunit( target = ":test_erlang_app", ) -broker_for_integration_suites() +rabbitmq_home( + name = "broker-for-tests-home", + plugins = [ + "//deps/rabbit:erlang_app", + "//deps/rabbitmq_stream:erlang_app", + ":erlang_app", + ], +) + +rabbitmq_run( + name = "rabbitmq-for-tests-run", + home = ":broker-for-tests-home", +) rabbitmq_integration_suite( name = "config_schema_SUITE", @@ -96,6 +109,10 @@ rabbitmq_integration_suite( name = "rabbit_prometheus_http_SUITE", size = "medium", flaky = True, + runtime_deps = [ + "//deps/rabbitmq_stream_common:erlang_app", + "//deps/rabbitmq_stream:erlang_app", + ], ) assert_suites() diff --git a/deps/rabbitmq_prometheus/Makefile b/deps/rabbitmq_prometheus/Makefile index 8380e81b9a7b..be43cf45e9fa 100644 --- a/deps/rabbitmq_prometheus/Makefile +++ b/deps/rabbitmq_prometheus/Makefile @@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ PROJECT_MOD := rabbit_prometheus_app DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch BUILD_DEPS = amqp_client rabbit_common rabbitmq_management -TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters +TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_stream EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}} diff --git 
a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index c6dfd43e2ffa..c84fbc8df8c9 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -200,6 +200,11 @@ {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"} ]}, + %% the family name for this metric is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created. + {stream_consumer_metrics, [ + {2, undefined, stream_consumer_max_offset_lag, gauge, "Current maximum of offset lag of consumers"} + ]}, + {connection_metrics, [ {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt}, {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt}, @@ -578,6 +583,17 @@ get_data(channel_metrics = Table, false, _) -> [{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3}, {messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6}, {global_prefetch_count, A7}]}]; +get_data(stream_consumer_metrics = MF, false, _) -> + Table = rabbit_stream_consumer_created, %% real table name + try ets:foldl(fun({_, Props}, OldMax) -> + erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax) + end, 0, Table) of + MaxOffsetLag -> + [{MF, MaxOffsetLag}] + catch error:badarg -> + %% rabbitmq_stream plugin is not enabled + [] + end; get_data(queue_consumer_count = MF, false, VHostsFilter) -> Table = queue_metrics, %% Real table name {_, A1} = ets:foldl(fun @@ -708,6 +724,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics end, [], Table); get_data(queue_consumer_count, true, _) -> 
ets:tab2list(queue_metrics); +get_data(stream_consumer_metrics, true, _) -> + Table = rabbit_stream_consumer_created, %% real table name + try ets:foldl(fun({{QueueName, _Pid, _SubId}, Props}, Map0) -> + Value = proplists:get_value(offset_lag, Props, 0), + maps:update_with( + QueueName, + fun(OldMax) -> erlang:max(Value, OldMax) end, + Value, + Map0) + end, #{}, Table) of + Map1 -> + maps:to_list(Map1) + catch error:badarg -> + %% rabbitmq_stream plugin is not enabled + [] + end; get_data(vhost_status, _, _) -> [ { #{<<"vhost">> => VHost}, case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 50d747c4578a..4aaf622bdcd4 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -11,8 +11,9 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). --compile(export_all). +-compile([export_all, nowarn_export_all]). all() -> [ @@ -68,7 +69,8 @@ groups() -> queue_consumer_count_and_queue_metrics_mutually_exclusive_test, vhost_status_metric, exchange_bindings_metric, - exchange_names_metric + exchange_names_metric, + stream_pub_sub_metrics ]}, {special_chars, [], [core_metrics_special_chars]}, {authentication, [], [basic_auth]} @@ -708,6 +710,37 @@ exchange_names_metric(Config) -> }, Names), ok. 
+stream_pub_sub_metrics(Config) -> + Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", + MsgPerBatch1 = 2, + publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), + Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2", + MsgPerBatch2 = 3, + publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config), + + %% aggregated metrics + + %% wait for the stream to emit stats + %% (collect_statistics_interval set to 100ms in this test group) + ?awaitMatch(V when V == #{rabbitmq_stream_consumer_max_offset_lag => #{undefined => [3]}}, + begin + {_, Body1} = http_get_with_pal(Config, "/metrics", [], 200), + maps:with([rabbitmq_stream_consumer_max_offset_lag], + parse_response(Body1)) + end, + 100), + + %% per-object metrics + {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics", + [], 200), + ParsedBody2 = parse_response(Body2), + #{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2, + + ?assertEqual([{#{vhost => "/", queue => Stream1}, [2]}, + {#{vhost => "/", queue => Stream2}, [3]}], + lists:sort(maps:to_list(MaxOffsetLag))), + ok. + core_metrics_special_chars(Config) -> {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200), ?assertMatch(#{rabbitmq_detailed_queue_messages := @@ -753,6 +786,30 @@ basic_auth(Config) -> rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>), rabbit_ct_broker_helpers:delete_user(Config, <<"management">>). 
+%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +publish_via_stream_protocol(Stream, MsgPerBatch, Config) -> + {ok, S, C0} = stream_test_utils:connect(Config, 0), + {ok, C1} = stream_test_utils:create_stream(S, C0, Stream), + PublisherId = 98, + {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId), + Payloads = lists:duplicate(MsgPerBatch, <<"m1">>), + SequenceFrom1 = 1, + {ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads), + + PublisherId2 = 99, + {ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2), + Payloads2 = lists:duplicate(MsgPerBatch, <<"m2">>), + SequenceFrom2 = SequenceFrom1 + MsgPerBatch, + {ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, SequenceFrom2, Payloads2), + + SubscriptionId = 97, + {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1), + %% delivery of first batch of messages + {{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6), + ok. http_get(Config, ReqHeaders, CodeExp) -> Path = proplists:get_value(prometheus_path, Config, "/metrics"), diff --git a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl index 872424f53224..17d28e2f93d6 100644 --- a/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_stream/test/protocol_interop_SUITE.erl @@ -275,94 +275,30 @@ amqp_attach_sub_batch(Config) -> %% ------------------------------------------------------------------- publish_via_stream_protocol(Stream, Config) -> - %% There is no open source Erlang RabbitMQ Stream client. - %% Therefore, we have to build the Stream protocol commands manually. 
-
-    StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream),
-    {ok, S} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
-
-    C0 = rabbit_stream_core:init(0),
-    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
-    ok = gen_tcp:send(S, PeerPropertiesFrame),
-    {{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(S, C0),
-
-    ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 1, sasl_handshake})),
-    {{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(S, C1),
-    Username = <<"guest">>,
-    Password = <<"guest">>,
-    Null = 0,
-    PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
-    ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
-    {{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(S, C2),
-    {{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(S, C3),
-
-    ok = gen_tcp:send(S, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
-    ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
-    {{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(S, C4),
-
-    CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
-    ok = gen_tcp:send(S, CreateStreamFrame),
-    {{response, 1, {create_stream, _}}, C6} = receive_stream_commands(S, C5),
+    {ok, S, C0} = stream_test_utils:connect(Config, 0),
+
+    {ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
 
     PublisherId = 99,
-    DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
-    ok = gen_tcp:send(S, DeclarePublisherFrame),
-    {{response, 1, {declare_publisher, _}}, C7} = receive_stream_commands(S, C6),
+    {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
 
-    M1 = simple_entry(1, <<"m1">>),
-    M2 = simple_entry(2, <<"m2">>),
-    M3 = simple_entry(3, <<"m3">>),
+    M1 = stream_test_utils:simple_entry(1, <<"m1">>),
+    M2 = stream_test_utils:simple_entry(2, <<"m2">>),
+    M3 = stream_test_utils:simple_entry(3, <<"m3">>),
     Messages1 = [M1, M2, M3],
-    PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, length(Messages1), Messages1}),
-    ok = gen_tcp:send(S, PublishFrame1),
-    {{publish_confirm, PublisherId, _}, C8} = receive_stream_commands(S, C7),
-
-    UncompressedSubbatch = sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
-    PublishFrame2 = rabbit_stream_core:frame({publish, PublisherId, 3, UncompressedSubbatch}),
-    ok = gen_tcp:send(S, PublishFrame2),
-    {{publish_confirm, PublisherId, _}, C9} = receive_stream_commands(S, C8),
-
-    CompressedSubbatch = sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
-    PublishFrame3 = rabbit_stream_core:frame({publish, PublisherId, 3, CompressedSubbatch}),
-    ok = gen_tcp:send(S, PublishFrame3),
-    {{publish_confirm, PublisherId, _}, C10} = receive_stream_commands(S, C9),
-
-    M10 = simple_entry(6, <<"m10">>),
-    M11 = simple_entry(7, <<"m11">>),
+
+    {ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
+
+    UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
+    {ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch),
+
+    CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
+    {ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch),
+
+    M10 = stream_test_utils:simple_entry(6, <<"m10">>),
+    M11 = stream_test_utils:simple_entry(7, <<"m11">>),
     Messages2 = [M10, M11],
-    PublishFrame4 = rabbit_stream_core:frame({publish, PublisherId, length(Messages2), Messages2}),
-    ok = gen_tcp:send(S, PublishFrame4),
-    {{publish_confirm, PublisherId, _}, _C11} = receive_stream_commands(S, C10).
-
-%% Streams contain AMQP 1.0 encoded messages.
-%% In this case, the AMQP 1.0 encoded message contains a single data section.
-simple_entry(Sequence, Body)
-  when is_binary(Body) ->
-    DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
-    DataSectSize = byte_size(DataSect),
-    <<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.
-
-%% Here, each AMQP 1.0 encoded message contains a single data section.
-%% All data sections are delivered uncompressed in 1 batch.
-sub_batch_entry_uncompressed(Sequence, Bodies) ->
-    Batch = lists:foldl(fun(Body, Acc) ->
-                                Sect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
-                                <<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
-                        end, <<>>, Bodies),
-    Size = byte_size(Batch),
-    <<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.
-
-%% Here, each AMQP 1.0 encoded message contains a single data section.
-%% All data sections are delivered in 1 gzip compressed batch.
-sub_batch_entry_compressed(Sequence, Bodies) ->
-    Uncompressed = lists:foldl(fun(Body, Acc) ->
-                                       Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
-                                       <<Acc/binary, Bin/binary>>
-                               end, <<>>, Bodies),
-    Compressed = zlib:gzip(Uncompressed),
-    CompressedLen = byte_size(Compressed),
-    <<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32, CompressedLen:32, Compressed:CompressedLen/binary>>.
+    {ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2).
 
 connection_config(Config) ->
     Host = ?config(rmq_hostname, Config),
@@ -372,27 +308,6 @@ connection_config(Config) ->
       container_id => <<"my container">>,
       sasl => {plain, <<"guest">>, <<"guest">>}}.
 
-receive_stream_commands(Sock, C0) ->
-    case rabbit_stream_core:next_command(C0) of
-        empty ->
-            case gen_tcp:recv(Sock, 0, 5000) of
-                {ok, Data} ->
-                    C1 = rabbit_stream_core:incoming_data(Data, C0),
-                    case rabbit_stream_core:next_command(C1) of
-                        empty ->
-                            {ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
-                            rabbit_stream_core:next_command(
-                              rabbit_stream_core:incoming_data(Data2, C1));
-                        Res ->
-                            Res
-                    end;
-                {error, Err} ->
-                    ct:fail("error receiving stream data ~w", [Err])
-            end;
-        Res ->
-            Res
-    end.
-
 receive_amqp_messages(Receiver, N) ->
     receive_amqp_messages0(Receiver, N, []).
diff --git a/moduleindex.yaml b/moduleindex.yaml index 50b27e97666b..24eaed75577c 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -871,6 +871,7 @@ rabbitmq_ct_helpers: - rabbit_ct_proper_helpers - rabbit_ct_vm_helpers - rabbit_mgmt_test_util +- stream_test_utils rabbitmq_event_exchange: - rabbit_event_exchange_decorator - rabbit_exchange_type_event