Skip to content

Commit 183aaa1

Browse files
committed
Use tick-related timeout to repair leader record
A quorum queue tries to repair its record in a tick handler. This can happen during a network partition and the metadata store may itself be unavailable, making the update likely to time out. The default metadata store timeout is usually higher than the tick interval, so the tick handler may be stuck during several ticks. The record takes some time to be updated (timeout + tick interval, 30 + 5 seconds by default), significantly longer than it takes the metadata store to trigger an election and recover. Client applications may rely on the quorum queue topology to connect to an appropriate node, so making the system reflect the actual topology faster is important to them. This commit makes the record update operations use a timeout 1-second lower than the tick interval. The tick handler process should finish earlier in case of metadata datastore unavailability and it should not take more than a couple of ticks once the datastore is available to update the record.
1 parent 8ff21c2 commit 183aaa1

File tree

3 files changed

+37
-9
lines changed

3 files changed

+37
-9
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
3535
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2]).
3636
-export([on_node_up/1, on_node_down/1]).
37-
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
37+
-export([update/2, update/3, store_queue/1, update_decorators/2, policy_changed/2]).
3838
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
3939
-export([is_match/2, is_in_virtual_host/2]).
4040
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
@@ -303,7 +303,14 @@ do_internal_declare(Q0, false) ->
303303
'not_found' | amqqueue:amqqueue().
304304

305305
update(Name, Fun) ->
306-
rabbit_db_queue:update(Name, Fun).
306+
update(Name, Fun, #{}).
307+
308+
-spec update
309+
(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue()),
310+
khepri:query_options()) -> 'not_found' | amqqueue:amqqueue().
311+
312+
update(Name, Fun, Options) ->
313+
rabbit_db_queue:update(Name, Fun, Options).
307314

308315
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
309316
Queue :: amqqueue:amqqueue(),

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
set/1,
3232
delete/2,
3333
update/2,
34+
update/3,
3435
update_decorators/2,
3536
exists/1,
3637
foreach/2
@@ -637,9 +638,22 @@ get_many_in_ets(Table, Names) ->
637638
%% @private
638639

639640
update(QName, Fun) ->
641+
update(QName, Fun, #{}).
642+
643+
-spec update(QName, UpdateFun, Options) -> Ret when
644+
QName :: rabbit_amqqueue:name(),
645+
Queue :: amqqueue:amqqueue(),
646+
UpdateFun :: fun((Queue) -> Queue),
647+
Options :: khepri:query_options(),
648+
Ret :: Queue | not_found.
649+
%% @doc Updates an existing queue record using `UpdateFun'.
650+
%%
651+
%% @private
652+
653+
update(QName, Fun, Options) ->
640654
rabbit_khepri:handle_fallback(
641655
#{mnesia => fun() -> update_in_mnesia(QName, Fun) end,
642-
khepri => fun() -> update_in_khepri(QName, Fun) end
656+
khepri => fun() -> update_in_khepri(QName, Fun, Options) end
643657
}).
644658

645659
update_in_mnesia(QName, Fun) ->
@@ -648,15 +662,19 @@ update_in_mnesia(QName, Fun) ->
648662
update_in_mnesia_tx(QName, Fun)
649663
end).
650664

665+
651666
update_in_khepri(QName, Fun) ->
667+
update_in_khepri(QName, Fun, #{}).
668+
669+
update_in_khepri(QName, Fun, Options) ->
652670
Path = khepri_queue_path(QName),
653-
Ret1 = rabbit_khepri:adv_get(Path),
671+
Ret1 = rabbit_khepri:adv_get(Path, Options),
654672
case Ret1 of
655673
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
656674
UpdatePath = khepri_path:combine_with_conditions(
657675
Path, [#if_payload_version{version = Vsn}]),
658676
Q1 = Fun(Q),
659-
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
677+
Ret2 = rabbit_khepri:put(UpdatePath, Q1, Options),
660678
case Ret2 of
661679
ok -> Q1;
662680
{error, {khepri, mismatching_node, _}} ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,8 @@ become_leader0(QName, Name) ->
446446
amqqueue:set_pid(Q1, {Name, node()}),
447447
live)
448448
end,
449-
_ = rabbit_amqqueue:update(QName, Fun),
449+
Timeout = max(tick_interval() - 1000, 1000),
450+
_ = rabbit_amqqueue:update(QName, Fun, #{timeout => Timeout}),
450451
case rabbit_amqqueue:lookup(QName) of
451452
{ok, Q0} when ?is_amqqueue(Q0) ->
452453
Nodes = get_nodes(Q0),
@@ -656,7 +657,7 @@ handle_tick(QName,
656657
ok;
657658
repaired ->
658659
?LOG_DEBUG("Repaired quorum queue ~ts amqqueue record",
659-
[rabbit_misc:rs(QName)])
660+
[rabbit_misc:rs(QName)])
660661
end,
661662
ExpectedNodes = rabbit_nodes:list_members(),
662663
case Nodes -- ExpectedNodes of
@@ -1981,8 +1982,7 @@ make_ra_conf(Q, ServerId) ->
19811982

19821983
make_ra_conf(Q, ServerId, Membership, MacVersion)
19831984
when is_integer(MacVersion) ->
1984-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1985-
?TICK_INTERVAL),
1985+
TickTimeout = tick_interval(),
19861986
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
19871987
?SNAPSHOT_INTERVAL),
19881988
CheckpointInterval = application:get_env(rabbit,
@@ -2408,3 +2408,6 @@ queue_vm_stats_sups() ->
24082408
queue_vm_ets() ->
24092409
{[quorum_ets],
24102410
[[ra_log_ets]]}.
2411+
2412+
tick_interval() ->
2413+
application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).

0 commit comments

Comments
 (0)