Skip to content

Commit 8ffe3a6

Browse files
committed
QQ: track discarded bytes and take snapshots based on that.
1 parent 5f8c094 commit 8ffe3a6

10 files changed

+803
-208
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 534 additions & 159 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,31 @@
180180
unused_3 = ?NIL
181181
}).
182182

183+
-record(messages,
184+
{
185+
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
186+
messages_total = 0 :: non_neg_integer(),
187+
% queue of returned msg_in_ids - when checking out it picks from
188+
returns = lqueue:new() :: lqueue:lqueue(term())
189+
}).
190+
191+
-record(dlx_consumer,
192+
{pid :: pid(),
193+
prefetch :: non_neg_integer(),
194+
checked_out = #{} :: #{msg_id() =>
195+
optimised_tuple(rabbit_dead_letter:reason(), msg())},
196+
next_msg_id = 0 :: msg_id()}).
197+
198+
-record(rabbit_fifo_dlx,
199+
{consumer :: option(#dlx_consumer{}),
200+
%% Queue of dead-lettered messages.
201+
discards = lqueue:new() :: lqueue:lqueue(optimised_tuple(rabbit_dead_letter:reason(), msg())),
202+
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
203+
%% so that we get the smallest ra index in O(1).
204+
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
205+
msg_bytes = 0 :: non_neg_integer(),
206+
msg_bytes_checkout = 0 :: non_neg_integer()}).
207+
183208
-record(rabbit_fifo,
184209
{cfg :: #cfg{},
185210
% unassigned messages
@@ -207,7 +232,7 @@
207232
% consumers that require further service are queued here
208233
service_queue = priority_queue:new() :: priority_queue:q(),
209234
%% state for at-least-once dead-lettering
210-
dlx = rabbit_fifo_dlx:init() :: rabbit_fifo_dlx:state(),
235+
dlx = #rabbit_fifo_dlx{} :: #rabbit_fifo_dlx{},
211236
msg_bytes_enqueue = 0 :: non_neg_integer(),
212237
msg_bytes_checkout = 0 :: non_neg_integer(),
213238
%% one is picked if active consumer is cancelled or dies

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ enqueue(QName, Correlation, Msg,
147147
{reject_publish, State0};
148148
{error, {shutdown, delete}} ->
149149
?LOG_DEBUG("~ts: QQ ~ts tried to register enqueuer during delete shutdown",
150-
[?MODULE, rabbit_misc:rs(QName)]),
150+
[?MODULE, rabbit_misc:rs(QName)]),
151151
{reject_publish, State0};
152152
{timeout, _} ->
153153
{reject_publish, State0};
154154
Err ->
155155
?LOG_DEBUG("~ts: QQ ~ts error when registering enqueuer ~p",
156-
[?MODULE, rabbit_misc:rs(QName), Err]),
156+
[?MODULE, rabbit_misc:rs(QName), Err]),
157157
exit(Err)
158158
end;
159159
enqueue(_QName, _Correlation, _Msg,

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
-module(rabbit_fifo_dlx).
88

99
-include("rabbit_fifo_dlx.hrl").
10-
-include("rabbit_fifo.hrl").
10+
-include("rabbit_fifo_v7.hrl").
1111
-include_lib("kernel/include/logger.hrl").
1212
-compile({no_auto_import, [apply/3]}).
1313

deps/rabbit/src/rabbit_fifo_dlx.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
-record(dlx_consumer,
88
{pid :: pid(),
99
prefetch :: non_neg_integer(),
10-
checked_out = #{} :: #{msg_id() => optimised_tuple(rabbit_dead_letter:reason(), msg())},
10+
checked_out = #{} :: #{msg_id() =>
11+
optimised_tuple(rabbit_dead_letter:reason(), msg())},
1112
next_msg_id = 0 :: msg_id()}).
1213

1314
-record(rabbit_fifo_dlx,

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -375,13 +375,11 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
375375
PolicyConfig = gather_policy_config(Q, true),
376376
QName = amqqueue:get_name(Q),
377377
{Name, _} = amqqueue:get_pid(Q),
378-
PolicyConfig#{
379-
name => Name,
380-
queue_resource => QName,
381-
become_leader_handler => {?MODULE, become_leader, [QName]},
382-
single_active_consumer_on => single_active_consumer_on(Q),
383-
created => erlang:system_time(millisecond)
384-
}.
378+
PolicyConfig#{name => Name,
379+
queue_resource => QName,
380+
single_active_consumer_on => single_active_consumer_on(Q),
381+
created => erlang:system_time(millisecond)
382+
}.
385383

386384
resolve_delivery_limit(PolVal, ArgVal)
387385
when PolVal < 0 orelse ArgVal < 0 ->
@@ -679,13 +677,13 @@ handle_tick(QName,
679677
catch
680678
_:Err ->
681679
?LOG_DEBUG("~ts: handle tick failed with ~p",
682-
[rabbit_misc:rs(QName), Err]),
680+
[rabbit_misc:rs(QName), Err]),
683681
ok
684682
end
685683
end);
686684
handle_tick(QName, Config, _Nodes) ->
687685
?LOG_DEBUG("~ts: handle tick received unexpected config format ~tp",
688-
[rabbit_misc:rs(QName), Config]).
686+
[rabbit_misc:rs(QName), Config]).
689687

690688
repair_leader_record(Q, Name) ->
691689
Node = node(),
@@ -696,7 +694,7 @@ repair_leader_record(Q, Name) ->
696694
_ ->
697695
QName = amqqueue:get_name(Q),
698696
?LOG_DEBUG("~ts: updating leader record to current node ~ts",
699-
[rabbit_misc:rs(QName), Node]),
697+
[rabbit_misc:rs(QName), Node]),
700698
ok = become_leader0(QName, Name),
701699
ok
702700
end,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3633,16 +3633,18 @@ message_bytes_metrics(Config) ->
36333633
wait_for_messages_pending_ack(Servers, RaName, 0),
36343634
rabbit_ct_helpers:await_condition(
36353635
fun() ->
3636-
{3, 3, 0} == get_message_bytes(Leader, QRes)
3636+
{M, M, 0} = get_message_bytes(Leader, QRes),
3637+
M > 0
36373638
end, 30000),
36383639

3640+
{MsgSize, _, _} = get_message_bytes(Leader, QRes),
36393641
subscribe(Ch, QQ, false),
36403642

36413643
wait_for_messages_ready(Servers, RaName, 0),
36423644
wait_for_messages_pending_ack(Servers, RaName, 1),
36433645
rabbit_ct_helpers:await_condition(
36443646
fun() ->
3645-
{3, 0, 3} == get_message_bytes(Leader, QRes)
3647+
{MsgSize, 0, MsgSize} == get_message_bytes(Leader, QRes)
36463648
end, 30000),
36473649

36483650
receive
@@ -3667,7 +3669,7 @@ message_bytes_metrics(Config) ->
36673669
wait_for_messages_pending_ack(Servers, RaName, 1),
36683670
rabbit_ct_helpers:await_condition(
36693671
fun() ->
3670-
{3, 0, 3} == get_message_bytes(Leader, QRes)
3672+
{MsgSize, 0, MsgSize} == get_message_bytes(Leader, QRes)
36713673
end, 30000),
36723674

36733675
rabbit_ct_client_helpers:close_channel(Ch),
@@ -3676,7 +3678,7 @@ message_bytes_metrics(Config) ->
36763678
wait_for_messages_pending_ack(Servers, RaName, 0),
36773679
rabbit_ct_helpers:await_condition(
36783680
fun() ->
3679-
{3, 3, 0} == get_message_bytes(Leader, QRes)
3681+
{MsgSize, MsgSize, 0} == get_message_bytes(Leader, QRes)
36803682
end, 30000),
36813683
ok.
36823684

0 commit comments

Comments
 (0)