Skip to content

Commit 489ee8b

Browse files
committed
Use tick-related timeout to repair leader record
A quorum queue tries to repair its record in a tick handler. This can happen during a network partition and the metadata store may itself be unavailable, making the update likely to time out. The default metadata store timeout is usually higher than the tick interval, so the tick handler may be stuck during several ticks. The record takes some time to be updated (timeout + tick interval, 30 + 5 seconds by default), significantly longer than it takes the metadata store to trigger an election and recover. Client applications may rely on the quorum queue topology to connect to an appropriate node, so making the system reflect the actual topology faster is important to them. This commit makes the record update operations use a timeout 1-second lower than the tick interval. The tick handler process should finish earlier in case of metadata datastore unavailability and it should not take more than a couple of ticks once the datastore is available to update the record.
1 parent 6261a7d commit 489ee8b

File tree

3 files changed

+38
-9
lines changed

3 files changed

+38
-9
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
3535
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2]).
3636
-export([on_node_up/1, on_node_down/1]).
37-
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
37+
-export([update/2, update/3, store_queue/1, update_decorators/2, policy_changed/2]).
3838
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
3939
-export([is_match/2, is_in_virtual_host/2]).
4040
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
@@ -303,7 +303,14 @@ do_internal_declare(Q0, false) ->
303303
'not_found' | amqqueue:amqqueue().
304304

305305
update(Name, Fun) ->
306-
rabbit_db_queue:update(Name, Fun).
306+
update(Name, Fun, #{}).
307+
308+
-spec update
309+
(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue()),
310+
#{timeout => timeout()}) -> 'not_found' | amqqueue:amqqueue().
311+
312+
update(Name, Fun, Options) ->
313+
rabbit_db_queue:update(Name, Fun, Options).
307314

308315
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
309316
Queue :: amqqueue:amqqueue(),

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
set/1,
3232
delete/2,
3333
update/2,
34+
update/3,
3435
update_decorators/2,
3536
exists/1,
3637
foreach/2
@@ -637,9 +638,23 @@ get_many_in_ets(Table, Names) ->
637638
%% @private
638639

639640
update(QName, Fun) ->
641+
update(QName, Fun, #{}).
642+
643+
-spec update(QName, UpdateFun, Options) -> Ret when
644+
QName :: rabbit_amqqueue:name(),
645+
Queue :: amqqueue:amqqueue(),
646+
UpdateFun :: fun((Queue) -> NewQueue),
647+
NewQueue :: amqqueue:amqqueue(),
648+
Options :: #{timeout => timeout()},
649+
Ret :: Queue | not_found.
650+
%% @doc Updates an existing queue record using `UpdateFun'.
651+
%%
652+
%% @private
653+
654+
update(QName, Fun, Options) ->
640655
rabbit_khepri:handle_fallback(
641656
#{mnesia => fun() -> update_in_mnesia(QName, Fun) end,
642-
khepri => fun() -> update_in_khepri(QName, Fun) end
657+
khepri => fun() -> update_in_khepri(QName, Fun, Options) end
643658
}).
644659

645660
update_in_mnesia(QName, Fun) ->
@@ -648,15 +663,19 @@ update_in_mnesia(QName, Fun) ->
648663
update_in_mnesia_tx(QName, Fun)
649664
end).
650665

666+
651667
update_in_khepri(QName, Fun) ->
668+
update_in_khepri(QName, Fun, #{}).
669+
670+
update_in_khepri(QName, Fun, Options) ->
652671
Path = khepri_queue_path(QName),
653-
Ret1 = rabbit_khepri:adv_get(Path),
672+
Ret1 = rabbit_khepri:adv_get(Path, Options),
654673
case Ret1 of
655674
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
656675
UpdatePath = khepri_path:combine_with_conditions(
657676
Path, [#if_payload_version{version = Vsn}]),
658677
Q1 = Fun(Q),
659-
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
678+
Ret2 = rabbit_khepri:put(UpdatePath, Q1, Options),
660679
case Ret2 of
661680
ok -> Q1;
662681
{error, {khepri, mismatching_node, _}} ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,8 @@ become_leader0(QName, Name) ->
446446
amqqueue:set_pid(Q1, {Name, node()}),
447447
live)
448448
end,
449-
_ = rabbit_amqqueue:update(QName, Fun),
449+
Timeout = max(tick_interval() - 1000, 1000),
450+
_ = rabbit_amqqueue:update(QName, Fun, #{timeout => Timeout}),
450451
case rabbit_amqqueue:lookup(QName) of
451452
{ok, Q0} when ?is_amqqueue(Q0) ->
452453
Nodes = get_nodes(Q0),
@@ -656,7 +657,7 @@ handle_tick(QName,
656657
ok;
657658
repaired ->
658659
?LOG_DEBUG("Repaired quorum queue ~ts amqqueue record",
659-
[rabbit_misc:rs(QName)])
660+
[rabbit_misc:rs(QName)])
660661
end,
661662
ExpectedNodes = rabbit_nodes:list_members(),
662663
case Nodes -- ExpectedNodes of
@@ -1981,8 +1982,7 @@ make_ra_conf(Q, ServerId) ->
19811982

19821983
make_ra_conf(Q, ServerId, Membership, MacVersion)
19831984
when is_integer(MacVersion) ->
1984-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1985-
?TICK_INTERVAL),
1985+
TickTimeout = tick_interval(),
19861986
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
19871987
?SNAPSHOT_INTERVAL),
19881988
CheckpointInterval = application:get_env(rabbit,
@@ -2408,3 +2408,6 @@ queue_vm_stats_sups() ->
24082408
queue_vm_ets() ->
24092409
{[quorum_ets],
24102410
[[ra_log_ets]]}.
2411+
2412+
tick_interval() ->
2413+
application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).

0 commit comments

Comments
 (0)