Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Merge pull request #336 from rabbitmq/reserve-qq-file-handles
Browse files Browse the repository at this point in the history
Reserve file handles for quorum queues

(cherry picked from commit 98db805)
  • Loading branch information
michaelklishin committed Oct 24, 2019
1 parent e46d0b8 commit 8fc1ca2
Showing 1 changed file with 93 additions and 25 deletions.
118 changes: 93 additions & 25 deletions src/file_handle_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@
-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2,
info/0, info/1, clear_read_cache/0, clear_process_read_cache/0]).
-export([set_reservation/0, set_reservation/1, release_reservation/0]).
-export([ulimit/0]).

-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3, prioritise_cast/3]).

-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
%% Reserve 3 handles for ra usage: wal, segment writer and a dets table
-define(RESERVED_FOR_OTHERS, 100 + 3).

-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
Expand Down Expand Up @@ -208,7 +210,9 @@
clients,
timer_ref,
alarm_set,
alarm_clear
alarm_clear,
reserve_count_socket,
reserve_count_file
}).

-record(cstate,
Expand All @@ -218,7 +222,9 @@
obtained_socket,
obtained_file,
blocked,
pending_closes
pending_closes,
reserved_socket,
reserved_file
}).

-record(pending,
Expand Down Expand Up @@ -557,12 +563,15 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.

obtain() -> obtain(1).
release() -> release(1).
transfer(Pid) -> transfer(Pid, 1).
obtain() -> obtain(1).
set_reservation() -> set_reservation(1).
release() -> release(1).
release_reservation() -> release_reservation(file).
transfer(Pid) -> transfer(Pid, 1).

obtain(Count) -> obtain(Count, socket).
release(Count) -> release(Count, socket).
obtain(Count) -> obtain(Count, socket).
set_reservation(Count) -> set_reservation(Count, file).
release(Count) -> release(Count, socket).

with_handle(Fun) ->
with_handle(1, Fun).
Expand All @@ -581,9 +590,19 @@ obtain(Count, Type) when Count > 0 ->
?SERVER, {obtain, Count, Type, self()}, infinity)
end.

set_reservation(Count, Type) when Count > 0 ->
%% If the FHC isn't running, reserve succeed immediately.
case whereis(?SERVER) of
undefined -> ok;
_ -> gen_server2:cast(?SERVER, {set_reservation, Count, Type, self()})
end.

release(Count, Type) when Count > 0 ->
gen_server2:cast(?SERVER, {release, Count, Type, self()}).

release_reservation(Type) ->
gen_server2:cast(?SERVER, {release_reservation, Type, self()}).

transfer(Pid, Count) when Count > 0 ->
gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}).

Expand Down Expand Up @@ -1045,12 +1064,16 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(total_limit, #fhc_state{limit = Limit}) -> Limit;
i(total_used, State) -> used(State);
i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
i(sockets_used, #fhc_state{obtain_count_socket = Count}) -> Count;
i(sockets_used, #fhc_state{obtain_count_socket = Count,
reserve_count_socket = RCount}) -> Count + RCount;
i(files_reserved, #fhc_state{reserve_count_file = RCount}) -> RCount;
i(Item, _) -> throw({bad_argument, Item}).

used(#fhc_state{open_count = C1,
obtain_count_socket = C2,
obtain_count_file = C3}) -> C1 + C2 + C3.
obtain_count_socket = C2,
obtain_count_file = C3,
reserve_count_socket = C4,
reserve_count_file = C5}) -> C1 + C2 + C3 + C4 + C5.

%%----------------------------------------------------------------------------
%% gen_server2 callbacks
Expand Down Expand Up @@ -1085,11 +1108,14 @@ init([AlarmSet, AlarmClear]) ->
clients = Clients,
timer_ref = undefined,
alarm_set = AlarmSet,
alarm_clear = AlarmClear }}.
alarm_clear = AlarmClear,
reserve_count_file = 0,
reserve_count_socket = 0 }}.

prioritise_cast(Msg, _Len, _State) ->
case Msg of
{release, _, _, _} -> 5;
{release_reservation, _, _, _} -> 5;
_ -> 0
end.

Expand Down Expand Up @@ -1197,7 +1223,20 @@ handle_cast({transfer, N, FromPid, ToPid}, State) ->

handle_cast(clear_read_cache, State) ->
_ = clear_process_read_cache(),
{noreply, State}.
{noreply, State};

handle_cast({release_reservation, Type, Pid}, State) ->
State1 = process_pending(update_counts({reserve, Type}, Pid, 0, State)),
{noreply, adjust_alarm(State, State1)};

handle_cast({set_reservation, N, Type, Pid},
State = #fhc_state { clients = Clients }) ->
ok = track_client(Pid, Clients),
NewState = process_pending(update_counts({reserve, Type}, Pid, N, State)),
{noreply, case needs_reduce(NewState) of
true -> reduce(NewState);
false -> adjust_alarm(State, NewState)
end}.

handle_info(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
Expand All @@ -1210,10 +1249,14 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
obtain_count_socket = ObtainCountS,
obtain_pending_file = ObtainPendingF,
obtain_pending_socket = ObtainPendingS,
reserve_count_file = ReserveCountF,
reserve_count_socket = ReserveCountS,
clients = Clients }) ->
[#cstate { opened = Opened,
obtained_file = ObtainedFile,
obtained_socket = ObtainedSocket}] =
obtained_socket = ObtainedSocket,
reserved_file = ReservedFile,
reserved_socket = ReservedSocket }] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
true = ets:delete(Elders, Pid),
Expand All @@ -1225,7 +1268,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
obtain_count_file = ObtainCountF - ObtainedFile,
obtain_count_socket = ObtainCountS - ObtainedSocket,
obtain_pending_file = filter_pending(Fun, ObtainPendingF),
obtain_pending_socket = filter_pending(Fun, ObtainPendingS) }),
obtain_pending_socket = filter_pending(Fun, ObtainPendingS),
reserve_count_file = ReserveCountF - ReservedFile,
reserve_count_socket = ReserveCountS - ReservedSocket}),
{noreply, adjust_alarm(State, State1)}.

terminate(_Reason, State = #fhc_state { clients = Clients,
Expand Down Expand Up @@ -1288,8 +1333,9 @@ obtain_limit_reached(socket, State) -> obtain_limit_reached(State);
obtain_limit_reached(file, State) -> needs_reduce(State).

obtain_limit_reached(#fhc_state{obtain_limit = Limit,
obtain_count_socket = Count}) ->
Limit =/= infinity andalso Count >= Limit.
obtain_count_socket = Count,
reserve_count_socket = RCount}) ->
Limit =/= infinity andalso (RCount + Count) >= Limit.

obtain_state(file, count, #fhc_state{obtain_count_file = N}) -> N;
obtain_state(socket, count, #fhc_state{obtain_count_socket = N}) -> N;
Expand Down Expand Up @@ -1325,17 +1371,21 @@ process_obtain(socket, State = #fhc_state { limit = Limit,
open_count = OpenCount,
obtain_count_socket = ObtainCount,
obtain_pending_socket = Pending,
obtain_count_file = ObtainCountF}) ->
obtain_count_file = ObtainCountF,
reserve_count_file = ReserveCountF,
reserve_count_socket = ReserveCount}) ->
Quota = min(ObtainLimit - ObtainCount,
Limit - (OpenCount + ObtainCount + ObtainCountF)),
Limit - (OpenCount + ObtainCount + ObtainCountF + ReserveCount + ReserveCountF)),
{Pending1, State1} = process_pending(Pending, Quota, State),
State1#fhc_state{obtain_pending_socket = Pending1};
process_obtain(file, State = #fhc_state { limit = Limit,
open_count = OpenCount,
obtain_count_socket = ObtainCountS,
obtain_count_file = ObtainCountF,
obtain_pending_file = Pending}) ->
Quota = Limit - (OpenCount + ObtainCountS + ObtainCountF),
obtain_pending_file = Pending,
reserve_count_file = ReserveCountF,
reserve_count_socket = ReserveCountS}) ->
Quota = Limit - (OpenCount + ObtainCountS + ObtainCountF + ReserveCountF + ReserveCountS),
{Pending1, State1} = process_pending(Pending, Quota, State),
State1#fhc_state{obtain_pending_file = Pending1}.

Expand Down Expand Up @@ -1376,7 +1426,21 @@ update_counts({obtain, socket}, Pid, Delta,
State = #fhc_state {obtain_count_socket = ObtainCountS,
clients = Clients }) ->
ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}),
State #fhc_state { obtain_count_socket = ObtainCountS + Delta}.
State #fhc_state { obtain_count_socket = ObtainCountS + Delta};
update_counts({reserve, file}, Pid, NewReservation,
State = #fhc_state {reserve_count_file = ReserveCountF,
clients = Clients }) ->
[#cstate{reserved_file = R}] = ets:lookup(Clients, Pid),
Delta = NewReservation - R,
ets:update_counter(Clients, Pid, {#cstate.reserved_file, Delta}),
State #fhc_state { reserve_count_file = ReserveCountF + Delta};
update_counts({reserve, socket}, Pid, NewReservation,
State = #fhc_state {reserve_count_socket = ReserveCountS,
clients = Clients }) ->
[#cstate{reserved_file = R}] = ets:lookup(Clients, Pid),
Delta = NewReservation - R,
ets:update_counter(Clients, Pid, {#cstate.reserved_socket, Delta}),
State #fhc_state { reserve_count_socket = ReserveCountS + Delta}.

maybe_reduce(State) ->
case needs_reduce(State) of
Expand All @@ -1391,9 +1455,11 @@ needs_reduce(#fhc_state { limit = Limit,
obtain_count_socket = ObtainCountS,
obtain_count_file = ObtainCountF,
obtain_pending_file = {ObtainPendingF, _},
obtain_pending_socket = {ObtainPendingS, _} }) ->
obtain_pending_socket = {ObtainPendingS, _},
reserve_count_socket = ReserveCountS,
reserve_count_file = ReserveCountF}) ->
Limit =/= infinity
andalso (((OpenCount + ObtainCountS + ObtainCountF) > Limit)
andalso (((OpenCount + ObtainCountS + ObtainCountF + ReserveCountS + ReserveCountF) > Limit)
orelse (OpenPending =/= 0)
orelse (ObtainPendingF =/= 0)
orelse (ObtainCountS < ObtainLimit
Expand Down Expand Up @@ -1474,7 +1540,9 @@ track_client(Pid, Clients) ->
obtained_file = 0,
obtained_socket = 0,
blocked = false,
pending_closes = 0 }) of
pending_closes = 0,
reserved_file = 0,
reserved_socket = 0 }) of
true -> _MRef = erlang:monitor(process, Pid),
ok;
false -> ok
Expand Down

0 comments on commit 8fc1ca2

Please sign in to comment.