Skip to content

Commit

Permalink
4.1: Avoid an exception when an AMQP 0-9-1-originating message with e…
Browse files Browse the repository at this point in the history
…xpiration set is converted for an MQTT consumer (#12710)

* MQTT: avoid an exception

when an AMQP 0-9-1 publisher publishes a message
that has expiration set.

Stack trace was contributed in #12707 by @rdsilio.

* mc_mqtt_SUITE test for #12707 #12710

* MQTT protocol_interop_SUITE: new test for #12710 #12707

* Simplify tests

---------

Co-authored-by: David Ansari <[email protected]>
  • Loading branch information
michaelklishin and ansd authored Nov 13, 2024
1 parent 6c16b4d commit c78bc8a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
2 changes: 1 addition & 1 deletion deps/rabbitmq_mqtt/src/mc_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ protocol_state(Msg = #mqtt_msg{props = Props0,
undefined ->
Props2;
Ttl ->
case maps:get(?ANN_TIMESTAMP, Anns) of
case maps:get(?ANN_TIMESTAMP, Anns, undefined) of
undefined ->
Props2;
Timestamp ->
Expand Down
15 changes: 14 additions & 1 deletion deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ groups() ->
mqtt_amqp,
mqtt_amqp_alt,
amqp_mqtt,
is_persistent
is_persistent,
amqpl_to_mqtt_gh_12707
]}
].

Expand Down Expand Up @@ -160,6 +161,18 @@ roundtrip_amqpl(_Config) ->
ExpectedUserProperty = lists:keysort(1, UserProperty),
?assertMatch(#{'User-Property' := ExpectedUserProperty}, Props).

amqpl_to_mqtt_gh_12707(_Config) ->
Props = #'P_basic'{expiration = <<"12707">>},
Payload = [<<"gh_12707">>],
Content = #content{properties = Props,
payload_fragments_rev = Payload},
Anns = #{?ANN_EXCHANGE => <<"amq.topic">>,
?ANN_ROUTING_KEYS => [<<"dummy">>]},
OriginalMsg = mc:init(mc_amqpl, Content, Anns),
Converted = mc:convert(mc_mqtt, OriginalMsg),
?assertMatch(#mqtt_msg{}, mc:protocol_state(Converted)),
?assertEqual(12707, mc:get_annotation(ttl, Converted)).

%% Non-UTF-8 Correlation Data should also be converted (via AMQP 0.9.1 header x-correlation-id).
roundtrip_amqpl_correlation(_Config) ->
Msg0 = mqtt_msg(),
Expand Down
28 changes: 27 additions & 1 deletion deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ groups() ->
[{cluster_size_1, [shuffle],
[
mqtt_amqpl_mqtt,
amqpl_mqtt_gh_12707,
mqtt_amqp_mqtt,
amqp_mqtt_amqp,
mqtt_stomp_mqtt,
Expand Down Expand Up @@ -104,7 +105,6 @@ mqtt_amqpl_mqtt(Config) ->
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
exchange = <<"amq.topic">>,
routing_key = <<"my.topic">>}),
%% MQTT 5.0 to AMQP 0.9.1
C = connect(ClientId, Config),
MqttResponseTopic = <<"response/topic">>,
{ok, _, [1]} = emqtt:subscribe(C, #{'Subscription-Identifier' => 999}, [{MqttResponseTopic, [{qos, 1}]}]),
Expand Down Expand Up @@ -169,6 +169,32 @@ mqtt_amqpl_mqtt(Config) ->

ok = emqtt:disconnect(C).

amqpl_mqtt_gh_12707(Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
Topic = Payload = <<"gh_12707">>,
C = connect(ClientId, Config),
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),

Ch = rabbit_ct_client_helpers:open_channel(Config),
amqp_channel:call(Ch,
#'basic.publish'{exchange = <<"amq.topic">>,
routing_key = Topic},
#amqp_msg{payload = Payload,
props = #'P_basic'{expiration = <<"12707">>,
headers = []}}),

receive {publish,
#{topic := MqttTopic,
payload := MqttPayload}} ->
?assertEqual(Topic, MqttTopic),
?assertEqual(Payload, MqttPayload)
after 5000 ->
ct:fail("did not receive a delivery")
end,

ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = emqtt:disconnect(C).

mqtt_amqp_mqtt(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Expand Down

0 comments on commit c78bc8a

Please sign in to comment.