From c78bc8a9c304074a32de2f5a4f69d3b0b7a44d7d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 13 Nov 2024 03:20:43 -0500 Subject: [PATCH] 4.1: Avoid an exception when an AMQP 0-9-1-originating message with expiration set is converted for an MQTT consumer (#12710) * MQTT: avoid an exception when an AMQP 0-9-1 publisher publishes a message that has expiration set. Stack trace was contributed in #12707 by @rdsilio. * mc_mqtt_SUITE test for #12707 #12710 * MQTT protocol_interop_SUITE: new test for #12710 #12707 * Simplify tests --------- Co-authored-by: David Ansari --- deps/rabbitmq_mqtt/src/mc_mqtt.erl | 2 +- deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl | 15 +++++++++- .../test/protocol_interop_SUITE.erl | 28 ++++++++++++++++++- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index 656b44dd8b7b..ff2ce997da45 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -432,7 +432,7 @@ protocol_state(Msg = #mqtt_msg{props = Props0, undefined -> Props2; Ttl -> - case maps:get(?ANN_TIMESTAMP, Anns) of + case maps:get(?ANN_TIMESTAMP, Anns, undefined) of undefined -> Props2; Timestamp -> diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index c6d1308e9ad2..83600523a741 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -33,7 +33,8 @@ groups() -> mqtt_amqp, mqtt_amqp_alt, amqp_mqtt, - is_persistent + is_persistent, + amqpl_to_mqtt_gh_12707 ]} ]. @@ -160,6 +161,18 @@ roundtrip_amqpl(_Config) -> ExpectedUserProperty = lists:keysort(1, UserProperty), ?assertMatch(#{'User-Property' := ExpectedUserProperty}, Props). +amqpl_to_mqtt_gh_12707(_Config) -> + Props = #'P_basic'{expiration = <<"12707">>}, + Payload = [<<"gh_12707">>], + Content = #content{properties = Props, + payload_fragments_rev = Payload}, + Anns = #{?ANN_EXCHANGE => <<"amq.topic">>, + ?ANN_ROUTING_KEYS => [<<"dummy">>]}, + OriginalMsg = mc:init(mc_amqpl, Content, Anns), + Converted = mc:convert(mc_mqtt, OriginalMsg), + ?assertMatch(#mqtt_msg{}, mc:protocol_state(Converted)), + ?assertEqual(12707, mc:get_annotation(ttl, Converted)). + %% Non-UTF-8 Correlation Data should also be converted (via AMQP 0.9.1 header x-correlation-id). roundtrip_amqpl_correlation(_Config) -> Msg0 = mqtt_msg(), diff --git a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl index 249e335e2afd..723e4e43e4ef 100644 --- a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl @@ -35,6 +35,7 @@ groups() -> [{cluster_size_1, [shuffle], [ mqtt_amqpl_mqtt, + amqpl_mqtt_gh_12707, mqtt_amqp_mqtt, amqp_mqtt_amqp, mqtt_stomp_mqtt, @@ -104,7 +105,6 @@ mqtt_amqpl_mqtt(Config) -> #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q, exchange = <<"amq.topic">>, routing_key = <<"my.topic">>}), - %% MQTT 5.0 to AMQP 0.9.1 C = connect(ClientId, Config), MqttResponseTopic = <<"response/topic">>, {ok, _, [1]} = emqtt:subscribe(C, #{'Subscription-Identifier' => 999}, [{MqttResponseTopic, [{qos, 1}]}]), @@ -169,6 +169,32 @@ mqtt_amqpl_mqtt(Config) -> ok = emqtt:disconnect(C). +amqpl_mqtt_gh_12707(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = Payload = <<"gh_12707">>, + C = connect(ClientId, Config), + {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), + + Ch = rabbit_ct_client_helpers:open_channel(Config), + amqp_channel:call(Ch, + #'basic.publish'{exchange = <<"amq.topic">>, + routing_key = Topic}, + #amqp_msg{payload = Payload, + props = #'P_basic'{expiration = <<"12707">>, + headers = []}}), + + receive {publish, + #{topic := MqttTopic, + payload := MqttPayload}} -> + ?assertEqual(Topic, MqttTopic), + ?assertEqual(Payload, MqttPayload) + after 5000 -> + ct:fail("did not receive a delivery") + end, + + ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = emqtt:disconnect(C). + mqtt_amqp_mqtt(Config) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),