diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index dbaad06bca8a..bfa52f55bad5 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -34,7 +34,7 @@ -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, notify_down_all/3, activate_limit_all/2]). -export([on_node_up/1, on_node_down/1]). --export([update/2, store_queue/1, update_decorators/2, policy_changed/2]). +-export([update/2, update/3, store_queue/1, update_decorators/2, policy_changed/2]). -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([is_match/2, is_in_virtual_host/2]). -export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). @@ -298,12 +298,18 @@ do_internal_declare(Q0, false) -> Queue = rabbit_queue_decorator:set(Q), rabbit_db_queue:create_or_get(Queue). --spec update - (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) -> - 'not_found' | amqqueue:amqqueue(). +-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) -> + 'not_found' | amqqueue:amqqueue(). update(Name, Fun) -> - rabbit_db_queue:update(Name, Fun). + update(Name, Fun, #{}). + +-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue()), + #{timeout => timeout()}) -> + 'not_found' | amqqueue:amqqueue(). + +update(Name, Fun, Options) -> + rabbit_db_queue:update(Name, Fun, Options). -spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when Queue :: amqqueue:amqqueue(), diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index b17951b8c871..2fe9fbccf2f3 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -31,6 +31,7 @@ set/1, delete/2, update/2, + update/3, update_decorators/2, exists/1, foreach/2 @@ -637,9 +638,23 @@ get_many_in_ets(Table, Names) -> %% @private update(QName, Fun) -> + update(QName, Fun, #{}). + +-spec update(QName, UpdateFun, Options) -> Ret when + QName :: rabbit_amqqueue:name(), + Queue :: amqqueue:amqqueue(), + UpdateFun :: fun((Queue) -> NewQueue), + NewQueue :: amqqueue:amqqueue(), + Options :: #{timeout => timeout()}, + Ret :: Queue | not_found. +%% @doc Updates an existing queue record using `UpdateFun'. +%% +%% @private + +update(QName, Fun, Options) -> rabbit_khepri:handle_fallback( #{mnesia => fun() -> update_in_mnesia(QName, Fun) end, - khepri => fun() -> update_in_khepri(QName, Fun) end + khepri => fun() -> update_in_khepri(QName, Fun, Options) end }). update_in_mnesia(QName, Fun) -> @@ -648,15 +663,19 @@ update_in_mnesia(QName, Fun) -> update_in_mnesia_tx(QName, Fun) end). + update_in_khepri(QName, Fun) -> + update_in_khepri(QName, Fun, #{}). + +update_in_khepri(QName, Fun, Options) -> Path = khepri_queue_path(QName), - Ret1 = rabbit_khepri:adv_get(Path), + Ret1 = rabbit_khepri:adv_get(Path, Options), case Ret1 of {ok, #{Path := #{data := Q, payload_version := Vsn}}} -> UpdatePath = khepri_path:combine_with_conditions( Path, [#if_payload_version{version = Vsn}]), Q1 = Fun(Q), - Ret2 = rabbit_khepri:put(UpdatePath, Q1), + Ret2 = rabbit_khepri:put(UpdatePath, Q1, Options), case Ret2 of ok -> Q1; {error, {khepri, mismatching_node, _}} -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 601144d9076f..feb676a4902b 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -446,7 +446,8 @@ become_leader0(QName, Name) -> amqqueue:set_pid(Q1, {Name, node()}), live) end, - _ = rabbit_amqqueue:update(QName, Fun), + Timeout = max(tick_interval() - 1000, 1000), + _ = rabbit_amqqueue:update(QName, Fun, #{timeout => Timeout}), case rabbit_amqqueue:lookup(QName) of {ok, Q0} when ?is_amqqueue(Q0) -> Nodes = get_nodes(Q0), @@ -656,7 +657,7 @@ handle_tick(QName, ok; repaired -> ?LOG_DEBUG("Repaired quorum queue ~ts amqqueue record", - [rabbit_misc:rs(QName)]) + [rabbit_misc:rs(QName)]) end, ExpectedNodes = rabbit_nodes:list_members(), case Nodes -- ExpectedNodes of @@ -1981,8 +1982,7 @@ make_ra_conf(Q, ServerId) -> make_ra_conf(Q, ServerId, Membership, MacVersion) when is_integer(MacVersion) -> - TickTimeout = application:get_env(rabbit, quorum_tick_interval, - ?TICK_INTERVAL), + TickTimeout = tick_interval(), SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval, ?SNAPSHOT_INTERVAL), CheckpointInterval = application:get_env(rabbit, @@ -2408,3 +2408,6 @@ queue_vm_stats_sups() -> queue_vm_ets() -> {[quorum_ets], [[ra_log_ets]]}. + +tick_interval() -> + application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).