Skip to content

Commit c6802c5

Browse files
committed
QQ: track discarded bytes and take snapshots based on that.
1 parent 0053af8 commit c6802c5

10 files changed

+803
-207
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 534 additions & 159 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,31 @@
180180
unused_3 = ?NIL
181181
}).
182182

183+
-record(messages,
184+
{
185+
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
186+
messages_total = 0 :: non_neg_integer(),
187+
% queue of returned msg_in_ids - when checking out it picks from
188+
returns = lqueue:new() :: lqueue:lqueue(term())
189+
}).
190+
191+
-record(dlx_consumer,
192+
{pid :: pid(),
193+
prefetch :: non_neg_integer(),
194+
checked_out = #{} :: #{msg_id() =>
195+
optimised_tuple(rabbit_dead_letter:reason(), msg())},
196+
next_msg_id = 0 :: msg_id()}).
197+
198+
-record(rabbit_fifo_dlx,
199+
{consumer :: option(#dlx_consumer{}),
200+
%% Queue of dead-lettered messages.
201+
discards = lqueue:new() :: lqueue:lqueue(optimised_tuple(rabbit_dead_letter:reason(), msg())),
202+
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
203+
%% so that we get the smallest ra index in O(1).
204+
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
205+
msg_bytes = 0 :: non_neg_integer(),
206+
msg_bytes_checkout = 0 :: non_neg_integer()}).
207+
183208
-record(rabbit_fifo,
184209
{cfg :: #cfg{},
185210
% unassigned messages
@@ -207,7 +232,7 @@
207232
% consumers that require further service are queued here
208233
service_queue = priority_queue:new() :: priority_queue:q(),
209234
%% state for at-least-once dead-lettering
210-
dlx = rabbit_fifo_dlx:init() :: rabbit_fifo_dlx:state(),
235+
dlx = #rabbit_fifo_dlx{} :: #rabbit_fifo_dlx{},
211236
msg_bytes_enqueue = 0 :: non_neg_integer(),
212237
msg_bytes_checkout = 0 :: non_neg_integer(),
213238
%% one is picked if active consumer is cancelled or dies

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ enqueue(QName, Correlation, Msg,
147147
{reject_publish, State0};
148148
{error, {shutdown, delete}} ->
149149
?LOG_DEBUG("~ts: QQ ~ts tried to register enqueuer during delete shutdown",
150-
[?MODULE, rabbit_misc:rs(QName)]),
150+
[?MODULE, rabbit_misc:rs(QName)]),
151151
{reject_publish, State0};
152152
{timeout, _} ->
153153
{reject_publish, State0};
154154
Err ->
155155
?LOG_DEBUG("~ts: QQ ~ts error when registering enqueuer ~p",
156-
[?MODULE, rabbit_misc:rs(QName), Err]),
156+
[?MODULE, rabbit_misc:rs(QName), Err]),
157157
exit(Err)
158158
end;
159159
enqueue(_QName, _Correlation, _Msg,

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
-module(rabbit_fifo_dlx).
88

99
-include("rabbit_fifo_dlx.hrl").
10-
-include("rabbit_fifo.hrl").
10+
-include("rabbit_fifo_v7.hrl").
1111
-include_lib("kernel/include/logger.hrl").
1212
-compile({no_auto_import, [apply/3]}).
1313

deps/rabbit/src/rabbit_fifo_dlx.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
-record(dlx_consumer,
88
{pid :: pid(),
99
prefetch :: non_neg_integer(),
10-
checked_out = #{} :: #{msg_id() => optimised_tuple(rabbit_dead_letter:reason(), msg())},
10+
checked_out = #{} :: #{msg_id() =>
11+
optimised_tuple(rabbit_dead_letter:reason(), msg())},
1112
next_msg_id = 0 :: msg_id()}).
1213

1314
-record(rabbit_fifo_dlx,

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -382,13 +382,11 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
382382
PolicyConfig = gather_policy_config(Q, true),
383383
QName = amqqueue:get_name(Q),
384384
{Name, _} = amqqueue:get_pid(Q),
385-
PolicyConfig#{
386-
name => Name,
387-
queue_resource => QName,
388-
become_leader_handler => {?MODULE, become_leader, [QName]},
389-
single_active_consumer_on => single_active_consumer_on(Q),
390-
created => erlang:system_time(millisecond)
391-
}.
385+
PolicyConfig#{name => Name,
386+
queue_resource => QName,
387+
single_active_consumer_on => single_active_consumer_on(Q),
388+
created => erlang:system_time(millisecond)
389+
}.
392390

393391
resolve_delivery_limit(PolVal, ArgVal)
394392
when PolVal < 0 orelse ArgVal < 0 ->
@@ -686,13 +684,13 @@ handle_tick(QName,
686684
catch
687685
_:Err ->
688686
?LOG_DEBUG("~ts: handle tick failed with ~p",
689-
[rabbit_misc:rs(QName), Err]),
687+
[rabbit_misc:rs(QName), Err]),
690688
ok
691689
end
692690
end);
693691
handle_tick(QName, Config, _Nodes) ->
694692
?LOG_DEBUG("~ts: handle tick received unexpected config format ~tp",
695-
[rabbit_misc:rs(QName), Config]).
693+
[rabbit_misc:rs(QName), Config]).
696694

697695
repair_leader_record(Q, Name) ->
698696
Node = node(),
@@ -703,7 +701,7 @@ repair_leader_record(Q, Name) ->
703701
_ ->
704702
QName = amqqueue:get_name(Q),
705703
?LOG_DEBUG("~ts: updating leader record to current node ~ts",
706-
[rabbit_misc:rs(QName), Node]),
704+
[rabbit_misc:rs(QName), Node]),
707705
ok = become_leader0(QName, Name),
708706
ok
709707
end,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3585,16 +3585,18 @@ message_bytes_metrics(Config) ->
35853585
wait_for_messages_pending_ack(Servers, RaName, 0),
35863586
rabbit_ct_helpers:await_condition(
35873587
fun() ->
3588-
{3, 3, 0} == get_message_bytes(Leader, QRes)
3588+
{M, M, 0} = get_message_bytes(Leader, QRes),
3589+
M > 0
35893590
end, 30000),
35903591

3592+
{MsgSize, _, _} = get_message_bytes(Leader, QRes),
35913593
subscribe(Ch, QQ, false),
35923594

35933595
wait_for_messages_ready(Servers, RaName, 0),
35943596
wait_for_messages_pending_ack(Servers, RaName, 1),
35953597
rabbit_ct_helpers:await_condition(
35963598
fun() ->
3597-
{3, 0, 3} == get_message_bytes(Leader, QRes)
3599+
{MsgSize, 0, MsgSize} == get_message_bytes(Leader, QRes)
35983600
end, 30000),
35993601

36003602
receive
@@ -3619,7 +3621,7 @@ message_bytes_metrics(Config) ->
36193621
wait_for_messages_pending_ack(Servers, RaName, 1),
36203622
rabbit_ct_helpers:await_condition(
36213623
fun() ->
3622-
{3, 0, 3} == get_message_bytes(Leader, QRes)
3624+
{MsgSize, 0, MsgSize} == get_message_bytes(Leader, QRes)
36233625
end, 30000),
36243626

36253627
rabbit_ct_client_helpers:close_channel(Ch),
@@ -3628,7 +3630,7 @@ message_bytes_metrics(Config) ->
36283630
wait_for_messages_pending_ack(Servers, RaName, 0),
36293631
rabbit_ct_helpers:await_condition(
36303632
fun() ->
3631-
{3, 3, 0} == get_message_bytes(Leader, QRes)
3633+
{MsgSize, MsgSize, 0} == get_message_bytes(Leader, QRes)
36323634
end, 30000),
36333635
ok.
36343636

0 commit comments

Comments
 (0)