Skip to content

Commit d19e9f2

Browse files
committed
Remove most of the fd related FHC code
Stats were not removed, including management UI stats relating to FDs. Web-MQTT and Web-STOMP configuration relating to FHC were not removed. The file_handle_cache itself must be kept until we remove CQv1.
1 parent fde9950 commit d19e9f2

23 files changed

+57
-360
lines changed

deps/amqp_client/src/amqp_network_connection.erl

-8
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ do_connect({Addr, Family},
118118
connection_timeout = Timeout,
119119
socket_options = ExtraOpts},
120120
SIF, State) ->
121-
ok = obtain(),
122121
case gen_tcp:connect(Addr, Port,
123122
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
124123
Timeout) of
@@ -134,7 +133,6 @@ do_connect({Addr, Family},
134133
SIF, State) ->
135134
{ok, GlobalSslOpts} = application:get_env(amqp_client, ssl_options),
136135
app_utils:start_applications([asn1, crypto, public_key, ssl]),
137-
ok = obtain(),
138136
case gen_tcp:connect(Addr, Port,
139137
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
140138
Timeout) of
@@ -379,11 +377,5 @@ handshake_recv(Expecting) ->
379377
end
380378
end.
381379

382-
obtain() ->
383-
case code:is_loaded(file_handle_cache) of
384-
false -> ok;
385-
_ -> file_handle_cache:obtain()
386-
end.
387-
388380
get_reason(#'connection.close'{reply_code = ErrCode}) ->
389381
?PROTOCOL:amqp_exception(ErrCode).

deps/rabbit/src/rabbit.erl

+2-5
Original file line numberDiff line numberDiff line change
@@ -741,9 +741,6 @@ status() ->
741741
get_disk_free_limit, []}},
742742
{disk_free, {rabbit_disk_monitor,
743743
get_disk_free, []}}]),
744-
S3 = rabbit_misc:with_exit_handler(
745-
fun () -> [] end,
746-
fun () -> [{file_descriptors, file_handle_cache:info()}] end),
747744
S4 = [{processes, [{limit, erlang:system_info(process_limit)},
748745
{used, erlang:system_info(process_count)}]},
749746
{run_queue, erlang:statistics(run_queue)},
@@ -778,7 +775,7 @@ status() ->
778775
(_) -> false
779776
end,
780777
maps:to_list(product_info())),
781-
S1 ++ S2 ++ S3 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8.
778+
S1 ++ S2 ++ S4 ++ S5 ++ S6 ++ S7 ++ S8.
782779

783780
alarms() ->
784781
Alarms = rabbit_misc:with_exit_handler(rabbit_misc:const([]),
@@ -1653,7 +1650,7 @@ config_files() ->
16531650
start_fhc() ->
16541651
ok = rabbit_sup:start_restartable_child(
16551652
file_handle_cache,
1656-
[fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]),
1653+
[fun(_) -> ok end, fun(_) -> ok end]),
16571654
ensure_working_fhc().
16581655

16591656
ensure_working_fhc() ->

deps/rabbit/src/rabbit_amqqueue.erl

-21
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
-module(rabbit_amqqueue).
99

10-
-export([warn_file_limit/0]).
1110
-export([recover/1, stop/1, start/1, declare/6, declare/7,
1211
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
1312
forget_all_durable/1]).
@@ -119,21 +118,6 @@
119118
active, activity_status, arguments]).
120119
-define(KILL_QUEUE_DELAY_INTERVAL, 100).
121120

122-
warn_file_limit() ->
123-
DurableQueues = find_recoverable_queues(),
124-
L = length(DurableQueues),
125-
126-
%% if there are not enough file handles, the server might hang
127-
%% when trying to recover queues, warn the user:
128-
case file_handle_cache:get_limit() < L of
129-
true ->
130-
rabbit_log:warning(
131-
"Recovering ~tp queues, available file handles: ~tp. Please increase max open file handles limit to at least ~tp!",
132-
[L, file_handle_cache:get_limit(), L]);
133-
false ->
134-
ok
135-
end.
136-
137121
-spec recover(rabbit_types:vhost()) ->
138122
{Recovered :: [amqqueue:amqqueue()],
139123
Failed :: [amqqueue:amqqueue()]}.
@@ -183,11 +167,6 @@ find_local_durable_queues(VHostName) ->
183167
rabbit_queue_type:is_recoverable(Q)
184168
end).
185169

186-
find_recoverable_queues() ->
187-
rabbit_db_queue:filter_all_durable(fun(Q) ->
188-
rabbit_queue_type:is_recoverable(Q)
189-
end).
190-
191170
-spec declare(name(),
192171
boolean(),
193172
boolean(),

deps/rabbit/src/rabbit_amqqueue_process.erl

-7
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,6 @@ init_it2(Recover, From, State = #q{q = Q,
210210
(Res == created orelse Res == existing) ->
211211
case matches(Recover, Q, Q1) of
212212
true ->
213-
ok = file_handle_cache:register_callback(
214-
rabbit_amqqueue, set_maximum_since_use, [self()]),
215213
ok = rabbit_memory_monitor:register(
216214
self(), {rabbit_amqqueue,
217215
set_ram_duration_target, [self()]}),
@@ -1194,7 +1192,6 @@ prioritise_cast(Msg, _Len, State) ->
11941192
delete_immediately -> 8;
11951193
{delete_exclusive, _Pid} -> 8;
11961194
{set_ram_duration_target, _Duration} -> 8;
1197-
{set_maximum_since_use, _Age} -> 8;
11981195
{run_backing_queue, _Mod, _Fun} -> 6;
11991196
{ack, _AckTags, _ChPid} -> 4; %% [1]
12001197
{resume, _ChPid} -> 3;
@@ -1510,10 +1507,6 @@ handle_cast({set_ram_duration_target, Duration},
15101507
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
15111508
noreply(State#q{backing_queue_state = BQS1});
15121509

1513-
handle_cast({set_maximum_since_use, Age}, State) ->
1514-
ok = file_handle_cache:set_maximum_since_use(Age),
1515-
noreply(State);
1516-
15171510
handle_cast({credit, SessionPid, CTag, Credit, Drain},
15181511
#q{q = Q,
15191512
backing_queue = BQ,

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

+2-23
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,6 @@
4141
-define(HEADER_SIZE, 64). %% bytes
4242
-define(ENTRY_SIZE, 32). %% bytes
4343

44-
%% The file_handle_cache module tracks reservations at
45-
%% the level of the process. This means we cannot
46-
%% handle them independently in the store and index.
47-
%% Because the index may reserve more FDs than the
48-
%% store the index becomes responsible for this and
49-
%% will always reserve at least 2 FDs, and release
50-
%% everything when terminating.
51-
-define(STORE_FD_RESERVATIONS, 2).
52-
5344
-include_lib("rabbit_common/include/rabbit.hrl").
5445
%% Set to true to get an awful lot of debug logs.
5546
-if(false).
@@ -538,7 +529,6 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
538529
ok = file:sync(Fd),
539530
ok = file:close(Fd)
540531
end, OpenFds),
541-
file_handle_cache:release_reservation(),
542532
%% Write recovery terms for faster recovery.
543533
_ = rabbit_recovery_terms:store(VHost,
544534
filename:basename(rabbit_file:binary_to_filename(Dir)),
@@ -555,7 +545,6 @@ delete_and_terminate(State = #qi { dir = Dir,
555545
_ = maps:map(fun(_, Fd) ->
556546
ok = file:close(Fd)
557547
end, OpenFds),
558-
file_handle_cache:release_reservation(),
559548
%% Erase the data on disk.
560549
ok = erase_index_dir(rabbit_file:binary_to_filename(Dir)),
561550
State#qi{ segments = #{},
@@ -626,18 +615,9 @@ new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments })
626615
%% using too many FDs when the consumer lags a lot. We
627616
%% limit at 4 because we try to keep up to 2 for reading
628617
%% and 2 for writing.
629-
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds })
618+
reduce_fd_usage(_SegmentToOpen, State = #qi{ fds = OpenFds })
630619
when map_size(OpenFds) < 4 ->
631-
%% The only case where we need to update reservations is
632-
%% when we are opening a segment that wasn't already open,
633-
%% and we are not closing another segment at the same time.
634-
case OpenFds of
635-
#{SegmentToOpen := _} ->
636-
State;
637-
_ ->
638-
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds) + 1),
639-
State
640-
end;
620+
State;
641621
reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) ->
642622
case OpenFds0 of
643623
#{SegmentToOpen := _} ->
@@ -868,7 +848,6 @@ delete_segment(Segment, State0 = #qi{ fds = OpenFds0 }) ->
868848
State = case maps:take(Segment, OpenFds0) of
869849
{Fd, OpenFds} ->
870850
ok = file:close(Fd),
871-
file_handle_cache:set_reservation(?STORE_FD_RESERVATIONS + map_size(OpenFds)),
872851
State0#qi{ fds = OpenFds };
873852
error ->
874853
State0

deps/rabbit/src/rabbit_classic_queue_store_v2.erl

-5
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,6 @@
4141
%% need to look into the store to discard them. Messages on disk
4242
%% will be dropped at the same time as the index deletes the
4343
%% corresponding segment file.
44-
%%
45-
%% The file_handle_cache reservations are done by the v2 index
46-
%% because they are handled at a pid level. Since we are using
47-
%% up to 2 FDs in this module we make the index reserve 2 extra
48-
%% FDs.
4944

5045
-module(rabbit_classic_queue_store_v2).
5146

deps/rabbit/src/rabbit_fifo.erl

+2-9
Original file line numberDiff line numberDiff line change
@@ -894,10 +894,8 @@ state_enter0(leader, #?MODULE{consumers = Cons,
894894
Mons = [{monitor, process, P} || P <- Pids],
895895
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
896896
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
897-
FHReservation = [{mod_call, rabbit_quorum_queue,
898-
file_handle_leader_reservation, [Resource]}],
899897
NotifyDecs = notify_decorators_startup(Resource),
900-
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs],
898+
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ [NotifyDecs],
901899
case BLH of
902900
undefined ->
903901
Effects;
@@ -914,12 +912,7 @@ state_enter0(eol, #?MODULE{enqueuers = Enqs,
914912
AllConsumers = maps:merge(Custs, WaitingConsumers1),
915913
[{send_msg, P, eol, ra_event}
916914
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
917-
[{aux, eol},
918-
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []} | Effects];
919-
state_enter0(State, #?MODULE{cfg = #cfg{resource = _Resource}}, Effects)
920-
when State =/= leader ->
921-
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
922-
[FHReservation | Effects];
915+
[{aux, eol} | Effects];
923916
state_enter0(_, _, Effects) ->
924917
%% catch all as not handling all states
925918
Effects.

deps/rabbit/src/rabbit_fifo_v0.erl

+2-8
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,6 @@ state_enter(leader, #?STATE{consumers = Cons,
548548
enqueuers = Enqs,
549549
waiting_consumers = WaitingConsumers,
550550
cfg = #cfg{name = Name,
551-
resource = Resource,
552551
become_leader_handler = BLH},
553552
prefix_msgs = {0, [], 0, []}
554553
}) ->
@@ -559,8 +558,7 @@ state_enter(leader, #?STATE{consumers = Cons,
559558
Mons = [{monitor, process, P} || P <- Pids],
560559
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
561560
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
562-
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
563-
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
561+
Effects = Mons ++ Nots ++ NodeMons,
564562
case BLH of
565563
undefined ->
566564
Effects;
@@ -575,11 +573,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
575573
#{}, WaitingConsumers0),
576574
AllConsumers = maps:merge(Custs, WaitingConsumers1),
577575
[{send_msg, P, eol, ra_event}
578-
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
579-
[{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
580-
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
581-
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
582-
[FHReservation];
576+
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))];
583577
state_enter(_, _) ->
584578
%% catch all as not handling all states
585579
[].

deps/rabbit/src/rabbit_fifo_v1.erl

+2-8
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,6 @@ state_enter(leader, #?STATE{consumers = Cons,
676676
enqueuers = Enqs,
677677
waiting_consumers = WaitingConsumers,
678678
cfg = #cfg{name = Name,
679-
resource = Resource,
680679
become_leader_handler = BLH},
681680
prefix_msgs = {0, [], 0, []}
682681
}) ->
@@ -687,8 +686,7 @@ state_enter(leader, #?STATE{consumers = Cons,
687686
Mons = [{monitor, process, P} || P <- Pids],
688687
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
689688
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
690-
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
691-
Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
689+
Effects = Mons ++ Nots ++ NodeMons,
692690
case BLH of
693691
undefined ->
694692
Effects;
@@ -704,11 +702,7 @@ state_enter(eol, #?STATE{enqueuers = Enqs,
704702
AllConsumers = maps:merge(Custs, WaitingConsumers1),
705703
[{send_msg, P, eol, ra_event}
706704
|| P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
707-
[{aux, eol},
708-
{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
709-
state_enter(State, #?STATE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
710-
FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
711-
[FHReservation];
705+
[{aux, eol}];
712706
state_enter(_, _) ->
713707
%% catch all as not handling all states
714708
[].

0 commit comments

Comments
 (0)