Skip to content

Commit

Permalink
Terminate channels and queue collector after reader
Browse files Browse the repository at this point in the history
What?

To not risk any regressions, keep the behaviour of RabbitMQ 3.x
where channel processes and connection helper processes such as
rabbit_queue_collector and rabbit_heartbeat are terminated after
rabbit_reader process.

For example, when RabbitMQ terminates with SIGTERM, we want
exclusive queues to be deleted synchronously (as in 3.x).

Prior to this commit:
1. java -jar target/perf-test.jar -x 0 -y 1
2. ./sbin/rabbitmqctl stop_app
resulted in the following crash:
```
crasher:
  initial call: rabbit_reader:init/2
  pid: <0.2389.0>
  registered_name: []
  exception exit: {noproc,
                      {gen_server,call,[<0.2391.0>,delete_all,infinity]}}
    in function  gen_server:call/3 (gen_server.erl, line 419)
    in call from rabbit_reader:close_connection/1 (rabbit_reader.erl, line 683)
    in call from rabbit_reader:send_error_on_channel0_and_close/4 (rabbit_reader.erl, line 1668)
    in call from rabbit_reader:handle_dependent_exit/3 (rabbit_reader.erl, line 710)
    in call from rabbit_reader:mainloop/4 (rabbit_reader.erl, line 530)
    in call from rabbit_reader:run/1 (rabbit_reader.erl, line 452)
    in call from rabbit_reader:start_connection/4 (rabbit_reader.erl, line 351)
```
because rabbit_queue_collector was terminated before rabbit_reader.
This commit fixes this crash.

How?

Any Erlang supervisor, including the rabbit_connection_sup supervisor,
terminates its children in the reverse of their start order.
Since we want channel and queue collector processes - children of
rabbit_connection_helper_sup - to be terminated after the
reader process, we must start rabbit_connection_helper_sup before the
reader process.

Since rabbit_connection_sup - the ranch_protocol implementation - does
not know yet whether it will supervise an AMQP 0.9.1 or AMQP 1.0
connection, it creates one rabbit_connection_helper_sup for each AMQP protocol
version, removing the superfluous one as soon as the protocol version negotiation is
completed. Spawning and deleting this additional process has a negligible
effect on performance.

Two helper supervisors are needed because rabbit_connection_helper_sup
must be started with different supervisor flags for AMQP 0.9.1 and
AMQP 1.0: for Native AMQP 1.0 in 4.0 we remove the unnecessary
rabbit_amqp1_0_session_sup_sup supervisor level.
With this approach, we still achieve our goal:
* in Native AMQP 1.0, 1 additional Erlang process is created per session
* in AMQP 1.0 in 3.x, 15 additional Erlang processes are created per session
  • Loading branch information
ansd committed Feb 15, 2024
1 parent f2a1a28 commit 32fd84b
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 47 deletions.
15 changes: 5 additions & 10 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@
%%--------------------------------------------------------------------------

unpack_from_0_9_1(
{Sock,RecvLen, PendingRecv, Buf, BufLen, ProxySocket,
{Sock,RecvLen, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
Parent, ConnectionHelperSupPid, HandshakeTimeout) ->
Parent, HandshakeTimeout) ->
#v1{parent = Parent,
sock = Sock,
callback = handshake,
recv_len = RecvLen,
pending_recv = PendingRecv,
connection_state = pre_init,
heartbeater = none,
helper_sup = ConnectionHelperSupPid,
helper_sup = SupPid,
buf = Buf,
buf_len = BufLen,
proxy_socket = ProxySocket,
Expand Down Expand Up @@ -612,13 +612,8 @@ handle_input(Callback, Data, _State) ->
init(Mode, PackedState) ->
{ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
{parent, Parent} = erlang:process_info(self(), parent),
ConnectionHelperSupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1,
auto_shutdown => any_significant},
{ok, ConnectionHelperSupPid} = rabbit_connection_sup:start_connection_helper_sup(
Parent, ConnectionHelperSupFlags),
State0 = unpack_from_0_9_1(PackedState, Parent, ConnectionHelperSupPid, HandshakeTimeout),
ok = rabbit_connection_sup:remove_connection_helper_sup(Parent, helper_sup_amqp_091),
State0 = unpack_from_0_9_1(PackedState, Parent, HandshakeTimeout),
State = start_1_0_connection(Mode, State0),
%% By invoking recvloop here we become 1.0.
recvloop(sys:debug_options([]), State).
Expand Down
60 changes: 43 additions & 17 deletions deps/rabbit/src/rabbit_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

-module(rabbit_connection_sup).

%% Supervisor for a (network) AMQP 0-9-1 client connection.
%% Supervisor for a (network) AMQP client connection.
%%
%% Supervises
%%
Expand All @@ -21,7 +21,7 @@

-export([start_link/3,
reader/1,
start_connection_helper_sup/2
remove_connection_helper_sup/2
]).

-export([init/1]).
Expand All @@ -35,12 +35,48 @@

start_link(Ref, _Transport, _Opts) ->
{ok, SupPid} = supervisor:start_link(?MODULE, []),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
%% them cleanly. But for 1.0 readers we can't start the real
%% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
%% so we add another supervisor into the hierarchy.
%%
%% This supervisor also acts as an intermediary for heartbeaters and
%% the queue collector process, since these must not be siblings of the
%% reader due to the potential for deadlock if they are added/restarted
%% whilst the supervision tree is shutting down.
ChildSpec = #{restart => transient,
significant => true,
shutdown => infinity,
type => supervisor},
{ok, HelperSup091} =
supervisor:start_child(
SupPid,
ChildSpec#{
id => helper_sup_amqp_091,
start => {rabbit_connection_helper_sup, start_link,
[#{strategy => one_for_one,
intensity => 10,
period => 10,
auto_shutdown => any_significant}]}}
),
{ok, HelperSup10} =
supervisor:start_child(
SupPid,
ChildSpec#{
id => helper_sup_amqp_10,
start => {rabbit_connection_helper_sup, start_link,
[#{strategy => one_for_all,
intensity => 0,
period => 1,
auto_shutdown => any_significant}]}}
),
{ok, ReaderPid} =
supervisor:start_child(
SupPid,
#{
id => reader,
start => {rabbit_reader, start_link, [Ref]},
start => {rabbit_reader, start_link, [{HelperSup091, HelperSup10}, Ref]},
restart => transient,
significant => true,
shutdown => ?WORKER_WAIT,
Expand All @@ -51,23 +87,13 @@ start_link(Ref, _Transport, _Opts) ->
{ok, SupPid, ReaderPid}.

%% Returns the first element of the list that rabbit_misc:find_child/2
%% yields for the child id `reader' under the supervisor Pid.
%% hd/1 raises badarg if that list is empty.
-spec reader(pid()) -> pid().

reader(Pid) ->
hd(rabbit_misc:find_child(Pid, reader)).

-spec start_connection_helper_sup(pid(), supervisor:sup_flags()) ->
supervisor:startchild_ret().
start_connection_helper_sup(ConnectionSupPid, ConnectionHelperSupFlags) ->
supervisor:start_child(
ConnectionSupPid,
#{
id => helper_sup,
start => {rabbit_connection_helper_sup, start_link, [ConnectionHelperSupFlags]},
restart => transient,
significant => true,
shutdown => infinity,
type => supervisor
}).
%% Terminates the child identified by ConnectionHelperId under the
%% supervisor ConnectionSupPid and then deletes its child specification.
%% Both supervisor calls are asserted to return ok, so any other result
%% (e.g. {error, not_found}) raises badmatch in the calling process.
-spec remove_connection_helper_sup(pid(), helper_sup_amqp_091 | helper_sup_amqp_10) -> ok.
remove_connection_helper_sup(ConnectionSupPid, ConnectionHelperId) ->
ok = supervisor:terminate_child(ConnectionSupPid, ConnectionHelperId),
ok = supervisor:delete_child(ConnectionSupPid, ConnectionHelperId).

%%--------------------------------------------------------------------------

Expand Down
39 changes: 19 additions & 20 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2,
-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/2,
shutdown/2]).

-export([system_continue/3, system_terminate/4, system_code_change/4]).

-export([init/2, mainloop/4, recvloop/4]).
-export([init/3, mainloop/4, recvloop/4]).

-export([conserve_resources/3, server_properties/1]).

Expand Down Expand Up @@ -78,7 +78,9 @@
%% pre_init | securing | running | blocking | blocked | closing | closed | {become, F}
connection_state,
%% see comment in rabbit_connection_sup:start_link/3
helper_sup,
helper_sup :: {HelperSupAmqp091 :: pid(),
HelperSupAmqp10 :: pid()} % pre version negotiation
| pid(), % post version negotiation
%% takes care of cleaning up exclusive queues,
%% see rabbit_queue_collector
queue_collector,
Expand Down Expand Up @@ -145,25 +147,25 @@

%%--------------------------------------------------------------------------

-spec start_link(ranch:ref()) ->
-spec start_link({pid(), pid()}, ranch:ref()) ->
rabbit_types:ok(pid()).
start_link(Ref) ->
Pid = proc_lib:spawn_link(?MODULE, init, [self(), Ref]),
start_link(HelperSups, Ref) ->
Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSups, Ref]),
{ok, Pid}.

%% Sends a synchronous {shutdown, Explanation} request to Pid via
%% gen_server:call/3 and blocks without a timeout (infinity) until the
%% reply arrives.
%% NOTE(review): Pid is presumably a reader process started by this
%% module - confirm against callers.
-spec shutdown(pid(), string()) -> 'ok'.

shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).

-spec init(pid(), ranch:ref()) ->
-spec init(pid(), {pid(), pid()}, ranch:ref()) ->
no_return().
init(Parent, Ref) ->
init(Parent, HelperSups, Ref) ->
?LG_PROCESS_TYPE(reader),
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbit, proxy_protocol, false)),
Deb = sys:debug_options([]),
start_connection(Parent, Ref, Deb, Sock).
start_connection(Parent, HelperSups, Ref, Deb, Sock).

-spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any().

Expand Down Expand Up @@ -290,10 +292,10 @@ socket_op(Sock, Fun) ->
exit(normal)
end.

-spec start_connection(pid(), ranch:ref(), any(), rabbit_net:socket()) ->
-spec start_connection(pid(), {pid(), pid()}, ranch:ref(), any(), rabbit_net:socket()) ->
no_return().

start_connection(Parent, RanchRef, Deb, Sock) ->
start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
process_flag(trap_exit, true),
RealSocket = rabbit_net:unwrap_socket(Sock),
Name = case rabbit_net:connection_string(Sock, inbound) of
Expand Down Expand Up @@ -336,7 +338,7 @@ start_connection(Parent, RanchRef, Deb, Sock) ->
pending_recv = false,
connection_state = pre_init,
queue_collector = undefined, %% started on tune-ok
helper_sup = none,
helper_sup = HelperSups,
heartbeater = none,
channel_sup_sup_pid = none,
channel_count = 0,
Expand Down Expand Up @@ -1104,13 +1106,9 @@ start_091_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
#v1{parent = Parent,
sock = Sock,
helper_sup = {HelperSup091, _HelperSup10},
connection = Connection} = State0) ->
ConnectionHelperSupFlags = #{strategy => one_for_one,
intensity => 10,
period => 10,
auto_shutdown => any_significant},
{ok, ConnectionHelperSupPid} = rabbit_connection_sup:start_connection_helper_sup(
Parent, ConnectionHelperSupFlags),
ok = rabbit_connection_sup:remove_connection_helper_sup(Parent, helper_sup_amqp_10),
rabbit_networking:register_connection(self()),
Start = #'connection.start'{
version_major = ProtocolMajor,
Expand All @@ -1123,7 +1121,7 @@ start_091_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
timeout_sec = ?NORMAL_TIMEOUT,
protocol = Protocol},
connection_state = starting,
helper_sup = ConnectionHelperSupPid},
helper_sup = HelperSup091},
switch_callback(State, frame_header, 7).

-spec refuse_connection(rabbit_net:socket(), any()) -> no_return().
Expand Down Expand Up @@ -1647,6 +1645,7 @@ become_10(Id, State = #v1{sock = Sock}) ->
pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
recv_len = RecvLen,
pending_recv = PendingRecv,
helper_sup = {_HelperSup091, HelperSup10},
proxy_socket = ProxySocket,
connection = #connection{
name = Name,
Expand All @@ -1655,7 +1654,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
port = Port,
peer_port = PeerPort,
connected_at = ConnectedAt}}) ->
{Sock, RecvLen, PendingRecv, Buf, BufLen, ProxySocket,
{Sock, RecvLen, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.

respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
Expand Down

0 comments on commit 32fd84b

Please sign in to comment.