Ra v3 #13885

2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
@@ -263,7 +263,7 @@ define ct_master.erl
endef

PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit

1,454 changes: 877 additions & 577 deletions deps/rabbit/src/rabbit_fifo.erl

Large diffs are not rendered by default.

63 changes: 58 additions & 5 deletions deps/rabbit/src/rabbit_fifo.hrl
@@ -21,6 +21,26 @@

-define(DELIVERY_SEND_MSG_OPTS, [local, ra_event]).

%% constants for packed msg references where both the raft index and the size
%% are packed into a single immediate term
%%
%% 59 bits as immediate ints are signed
-define(PACKED_MAX, 16#7FFF_FFFF_FFFF_FFF).
%% index bits - enough for 2000 days at 100k indexes p/sec
-define(PACKED_IDX_BITS, 44).
-define(PACKED_IDX_MAX, 16#FFFF_FFFF_FFF).
-define(PACKED_SZ_BITS, 15). %% size
-define(PACKED_SZ_MAX, 16#7FFF). %% 15 bits

-define(PACK(Idx, Sz),
(Idx bxor (Sz bsl ?PACKED_IDX_BITS))).
-define(PACKED_IDX(PackedInt),
(PackedInt band ?PACKED_IDX_MAX)).
-define(PACKED_SZ(PackedInt),
        ((PackedInt bsr ?PACKED_IDX_BITS) band ?PACKED_SZ_MAX)).

-define(IS_PACKED(Int), (Int >= 0 andalso Int =< ?PACKED_MAX)).
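
A minimal worked sketch of the packing above (the values are arbitrary, chosen to fit the 44-bit index and 15-bit size limits): because the index bits and the shifted size bits occupy disjoint ranges, the bxor in ?PACK behaves like a plain bor, and both fields can be recovered independently.

    Packed = ?PACK(100_000, 512),   %% (512 bsl 44) + 100_000
    true = ?IS_PACKED(Packed),      %% well below ?PACKED_MAX, still an immediate
    100_000 = ?PACKED_IDX(Packed),  %% low 44 bits give back the raft index
    512 = ?PACKED_SZ(Packed).       %% next 15 bits give back the size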

-type optimised_tuple(A, B) :: nonempty_improper_list(A, B).

-type option(T) :: undefined | T.
@@ -57,7 +77,10 @@
-type msg_size() :: non_neg_integer().
%% the size in bytes of the msg payload

-type msg() :: optimised_tuple(ra:index(), msg_header()).
%% 60-bit integer, immediate
-type packed_msg() :: 0..?PACKED_MAX.

-type msg() :: packed_msg() | optimised_tuple(ra:index(), msg_header()).

-type delivery_msg() :: {msg_id(), {msg_header(), raw_msg()}}.
%% A tuple consisting of the message id, and the headered message.
@@ -105,6 +128,7 @@
%% once these many bytes have been written since the last checkpoint
%% we request a checkpoint irrespectively
-define(CHECK_MAX_BYTES, 128_000_000).
-define(SNAP_OUT_BYTES, 64_000_000).

-define(USE_AVG_HALF_LIFE, 10000.0).
%% an average QQ without any message uses about 100KB so setting this limit
@@ -179,16 +203,43 @@
unused_3 = ?NIL
}).

-record(messages,
{
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
messages_total = 0 :: non_neg_integer(),
         % queue of returned msg_in_ids - when checking out, messages are picked from here first
returns = lqueue:new() :: lqueue:lqueue(term())
}).

-record(dlx_consumer,
{pid :: pid(),
prefetch :: non_neg_integer(),
checked_out = #{} :: #{msg_id() =>
optimised_tuple(rabbit_dead_letter:reason(), msg())},
next_msg_id = 0 :: msg_id()}).

-record(rabbit_fifo_dlx,
{consumer :: option(#dlx_consumer{}),
%% Queue of dead-lettered messages.
discards = lqueue:new() :: lqueue:lqueue(optimised_tuple(rabbit_dead_letter:reason(), msg())),
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
%% so that we get the smallest ra index in O(1).
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
msg_bytes = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer()}).

-record(rabbit_fifo,
{cfg :: #cfg{},
% unassigned messages
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
messages_total = 0 :: non_neg_integer(),
% queue of returned msg_in_ids - when checking out it picks from
returns = lqueue:new() :: lqueue:lqueue(term()),
% a counter of enqueues - used to trigger shadow copy points
         % discarded bytes - a counter that is incremented every time a command
         % is processed that does not need to be kept (live indexes).
         % Approximate, used for triggering snapshots
         % reset to 0 when release_cursor gets stored
enqueue_count = 0 :: non_neg_integer(),
discarded_bytes = 0,
% a map containing all the live processes that have ever enqueued
% a message to this queue
enqueuers = #{} :: #{pid() => #enqueuer{}},
@@ -197,19 +248,21 @@
% rabbit_fifo_index can be slow when calculating the smallest
% index when there are large gaps but should be faster than gb_trees
% for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
unused_0 = ?NIL,
unused_1 = ?NIL,
% consumers need to reflect consumer state at time of snapshot
consumers = #{} :: #{consumer_key() => consumer()},
% consumers that require further service are queued here
service_queue = priority_queue:new() :: priority_queue:q(),
%% state for at-least-once dead-lettering
dlx = rabbit_fifo_dlx:init() :: rabbit_fifo_dlx:state(),
dlx = #rabbit_fifo_dlx{} :: #rabbit_fifo_dlx{},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
%% one is picked if active consumer is cancelled or dies
%% used only when single active consumer is on
waiting_consumers = [] :: [{consumer_key(), consumer()}],
%% records the timestamp whenever the queue was last considered
%% active in terms of consumer activity
last_active :: option(non_neg_integer()),
msg_cache :: option({ra:index(), raw_msg()}),
unused_2 = ?NIL
63 changes: 24 additions & 39 deletions deps/rabbit/src/rabbit_fifo_client.erl
@@ -147,13 +147,13 @@ enqueue(QName, Correlation, Msg,
{reject_publish, State0};
{error, {shutdown, delete}} ->
?LOG_DEBUG("~ts: QQ ~ts tried to register enqueuer during delete shutdown",
[?MODULE, rabbit_misc:rs(QName)]),
[?MODULE, rabbit_misc:rs(QName)]),
{reject_publish, State0};
{timeout, _} ->
{reject_publish, State0};
Err ->
?LOG_DEBUG("~ts: QQ ~ts error when registering enqueuer ~p",
[?MODULE, rabbit_misc:rs(QName), Err]),
[?MODULE, rabbit_misc:rs(QName), Err]),
exit(Err)
end;
enqueue(_QName, _Correlation, _Msg,
@@ -377,24 +377,12 @@ checkout(ConsumerTag, CreditMode, #{} = Meta,
is_tuple(CreditMode) ->
Servers = sorted_servers(State0),
ConsumerId = consumer_id(ConsumerTag),
Spec = case rabbit_fifo:is_v4() of
true ->
case CreditMode of
{simple_prefetch, 0} ->
{auto, {simple_prefetch,
?UNLIMITED_PREFETCH_COUNT}};
_ ->
{auto, CreditMode}
end;
false ->
case CreditMode of
{credited, _} ->
{auto, 0, credited};
{simple_prefetch, 0} ->
{auto, ?UNLIMITED_PREFETCH_COUNT, simple_prefetch};
{simple_prefetch, Num} ->
{auto, Num, simple_prefetch}
end
Spec = case CreditMode of
{simple_prefetch, 0} ->
{auto, {simple_prefetch,
?UNLIMITED_PREFETCH_COUNT}};
_ ->
{auto, CreditMode}
end,
Cmd = rabbit_fifo:make_checkout(ConsumerId, Spec, Meta),
%% ???
@@ -418,19 +406,15 @@ checkout(ConsumerTag, CreditMode, #{} = Meta,
NextMsgId - 1
end
end,
DeliveryCount = case rabbit_fifo:is_v4() of
true -> credit_api_v2;
false -> {credit_api_v1, 0}
end,
DeliveryCount = credit_api_v2,
ConsumerKey = maps:get(key, Reply, ConsumerId),
SDels = maps:update_with(
ConsumerTag,
fun (C) -> C#consumer{ack = Ack} end,
#consumer{key = ConsumerKey,
last_msg_id = LastMsgId,
ack = Ack,
delivery_count = DeliveryCount},
CDels0),
SDels = maps:update_with(ConsumerTag,
fun (C) -> C#consumer{ack = Ack} end,
#consumer{key = ConsumerKey,
last_msg_id = LastMsgId,
ack = Ack,
delivery_count = DeliveryCount},
CDels0),
{ok, Reply, State0#state{leader = Leader,
consumers = SDels}};
Err ->
@@ -1042,13 +1026,14 @@ send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
next_seq = Seq,
cfg = #cfg{soft_limit = SftLmt}} = State) ->
ok = case rabbit_fifo:is_return(Command) of
true ->
%% returns are sent to the aux machine for pre-evaluation
ra:cast_aux_command(Server, {Command, Seq, self()});
_ ->
ra:pipeline_command(Server, Command, Seq, Priority)
end,
% ok = case rabbit_fifo:is_return(Command) of
% true ->
% %% returns are sent to the aux machine for pre-evaluation
% ra:cast_aux_command(Server, {Command, Seq, self()});
% _ ->
% ra:pipeline_command(Server, Command, Seq, Priority)
% end,
ok = ra:pipeline_command(Server, Command, Seq, Priority),
State#state{pending = Pending#{Seq => {Correlation, Command}},
next_seq = Seq + 1,
slow = map_size(Pending) >= SftLmt}.
13 changes: 9 additions & 4 deletions deps/rabbit/src/rabbit_fifo_dlx.erl
@@ -7,7 +7,7 @@
-module(rabbit_fifo_dlx).

-include("rabbit_fifo_dlx.hrl").
-include("rabbit_fifo.hrl").
-include("rabbit_fifo_v7.hrl").
-include_lib("kernel/include/logger.hrl").
-compile({no_auto_import, [apply/3]}).

@@ -26,7 +26,8 @@
dehydrate/1,
stat/1,
update_config/4,
smallest_raft_index/1
smallest_raft_index/1,
live_indexes/1
]).

-record(checkout, {consumer :: pid(),
@@ -164,7 +165,7 @@ discard(Msgs0, Reason, {at_most_once, {Mod, Fun, Args}}, State) ->
Cmd = maps:get(Idx, Lookup),
%% ensure header delivery count
%% is copied to the message container
annotate_msg(H, rabbit_fifo:get_msg(Cmd))
annotate_msg(H, rabbit_fifo:get_msg_from_cmd(Cmd))
end || ?MSG(Idx, H) <- Msgs0],
[{mod_call, Mod, Fun, Args ++ [Reason, Msgs]}]
end},
@@ -237,7 +238,7 @@ delivery_effects(CPid, Msgs0) ->
Msgs = lists:zipwith(
fun (Cmd, {Reason, H, MsgId}) ->
{MsgId, {Reason,
annotate_msg(H, rabbit_fifo:get_msg(Cmd))}}
annotate_msg(H, rabbit_fifo:get_msg_from_cmd(Cmd))}}
end, Log, RsnIds),
[{send_msg, CPid, {dlx_event, self(), {dlx_delivery, Msgs}}, [cast]}]
end}].
@@ -365,5 +366,9 @@ dehydrate(State) ->
smallest_raft_index(#?MODULE{ra_indexes = Indexes}) ->
rabbit_fifo_index:smallest(Indexes).

-spec live_indexes(state()) -> [ra:index()].
live_indexes(#?MODULE{ra_indexes = Indexes}) ->
rabbit_fifo_index:indexes(Indexes).

annotate_msg(H, Msg) ->
rabbit_fifo:annotate_msg(H, Msg).
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_fifo_dlx.hrl
@@ -7,7 +7,8 @@
-record(dlx_consumer,
{pid :: pid(),
prefetch :: non_neg_integer(),
checked_out = #{} :: #{msg_id() => optimised_tuple(rabbit_dead_letter:reason(), msg())},
checked_out = #{} :: #{msg_id() =>
optimised_tuple(rabbit_dead_letter:reason(), msg())},
next_msg_id = 0 :: msg_id()}).

-record(rabbit_fifo_dlx,
5 changes: 5 additions & 0 deletions deps/rabbit/src/rabbit_fifo_index.erl
@@ -13,6 +13,7 @@
delete/2,
size/1,
smallest/1,
indexes/1,
map/2,
to_list/1
]).
@@ -90,6 +91,10 @@ size(#?MODULE{data = Data}) ->
smallest(#?MODULE{smallest = Smallest}) ->
Smallest.

-spec indexes(state()) -> [ra:index()].
indexes(#?MODULE{data = Data}) ->
maps:keys(Data).

-spec map(fun(), state()) -> state().
map(F, #?MODULE{data = Data} = State) ->
State#?MODULE{data = maps:map(F, Data)}.
35 changes: 27 additions & 8 deletions deps/rabbit/src/rabbit_fifo_q.erl
@@ -8,6 +8,7 @@
get/1,
len/1,
from_lqueue/1,
indexes/1,
get_lowest_index/1,
overview/1
]).
@@ -81,20 +82,28 @@ from_lqueue(LQ) ->
in(no, Item, Acc)
end, new(), LQ).

-spec indexes(state()) -> [ra:index()].
indexes(#?MODULE{hi = {Hi1, Hi2},
no = {No1, No2}}) ->
A = lists:map(fun msg_idx/1, Hi1),
B = lists:foldl(fun msg_idx_fld/2, A, Hi2),
C = lists:foldl(fun msg_idx_fld/2, B, No1),
lists:foldl(fun msg_idx_fld/2, C, No2).

-spec get_lowest_index(state()) -> undefined | ra:index().
get_lowest_index(#?MODULE{len = 0}) ->
undefined;
get_lowest_index(#?MODULE{hi = Hi, no = No}) ->
case peek(Hi) of
empty ->
?MSG(NoIdx, _) = peek(No),
NoIdx;
?MSG(HiIdx, _) ->
msg_idx(peek(No));
HiMsg ->
HiIdx = msg_idx(HiMsg),
case peek(No) of
?MSG(NoIdx, _) ->
min(HiIdx, NoIdx);
empty ->
HiIdx
HiIdx;
NoMsg ->
min(HiIdx, msg_idx(NoMsg))
end
end.

@@ -119,8 +128,10 @@ overview(#?MODULE{len = Len,
next(#?MODULE{hi = ?NON_EMPTY = Hi,
no = ?NON_EMPTY = No,
dequeue_counter = ?WEIGHT}) ->
?MSG(HiIdx, _) = HiMsg = peek(Hi),
?MSG(NoIdx, _) = NoMsg = peek(No),
HiMsg = peek(Hi),
NoMsg = peek(No),
HiIdx = msg_idx(HiMsg),
NoIdx = msg_idx(NoMsg),
%% always favour hi priority messages when it is safe to do so,
%% i.e. the index is lower than the next index for the 'no' queue
case HiIdx < NoIdx of
@@ -150,3 +161,11 @@
{[], lists:reverse(In)};
drop({In, [_ | Out]}) ->
{In, Out}.

msg_idx_fld(Msg, Acc) when is_list(Acc) ->
[msg_idx(Msg) | Acc].

msg_idx(?MSG(Idx, _Header)) ->
Idx;
msg_idx(Packed) when ?IS_PACKED(Packed) ->
?PACKED_IDX(Packed).
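
A small usage sketch (assuming new/0 and in/3 are exported and that in/3 takes a hi | no priority plus a message term, as the from_lqueue/1 call above suggests, and that ?PACK from rabbit_fifo.hrl is in scope): indexes/1 and get_lowest_index/1 recover the raft indexes of enqueued messages via msg_idx/1, whether they are stored packed or as ?MSG tuples. The index and size values here are arbitrary.

    Q0 = rabbit_fifo_q:new(),
    Q1 = rabbit_fifo_q:in(hi, ?PACK(3, 120), Q0),  %% packed immediate form
    Q2 = rabbit_fifo_q:in(no, ?PACK(7, 250), Q1),
    [3, 7] = lists:sort(rabbit_fifo_q:indexes(Q2)),
    3 = rabbit_fifo_q:get_lowest_index(Q2).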