Skip to content

Commit

Permalink
Transform rabbit_amqp1_0_writer into gen_server
Browse files Browse the repository at this point in the history
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's
much better to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
  • Loading branch information
ansd committed Sep 12, 2023
1 parent aeae282 commit f4cec17
Showing 1 changed file with 131 additions and 111 deletions.
242 changes: 131 additions & 111 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,176 +6,196 @@
%%

-module(rabbit_amqp1_0_writer).
-include("rabbit_amqp1_0.hrl").
-behaviour(gen_server).

-export([start_link/3]).
-include("rabbit_amqp1_0.hrl").

-export([send_command/3,
%% client API
-export([start_link/3,
send_command/3,
send_command/4,
send_command_sync/3,
send_command_and_notify/6,
internal_send_command/3]).

%% internal
-export([mainloop/1, mainloop1/1]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
format_status/1]).

-record(wstate, {
-record(state, {
sock :: rabbit_net:socket(),
frame_max,
reader :: pid(),
stats_timer,
pending}).
pending :: iolist()
}).

-define(HIBERNATE_AFTER, 5000).
-define(HIBERNATE_AFTER, 5_000).
-define(CALL_TIMEOUT, 300_000).
-define(AMQP_SASL_FRAME_TYPE, 1).

%%%%%%%%%%%%%%%%%%
%%% client API %%%
%%%%%%%%%%%%%%%%%%

-spec start_link (rabbit_net:socket(), non_neg_integer(), pid()) ->
rabbit_types:ok(pid()).
start_link(Sock, FrameMax, ReaderPid) ->
State = initial_state(Sock, FrameMax, ReaderPid),
{ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
Args = {Sock, FrameMax, ReaderPid},
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(?MODULE, Args, Opts).

-spec send_command(pid(),
rabbit_types:channel_number(),
rabbit_framing:amqp_method_record()) -> 'ok'.
send_command(W, Ch, MethodRecord) ->
W ! {send_command, Ch, MethodRecord},
ok.
rabbit_framing:amqp_method_record()) -> ok.
send_command(Writer, ChannelNum, MethodRecord) ->
Request = {send_command, ChannelNum, MethodRecord},
gen_server:cast(Writer, Request).

-spec send_command(pid(),
rabbit_types:channel_number(),
rabbit_framing:amqp_method_record(),
rabbit_types:content()) -> 'ok'.
send_command(W, Ch, MethodRecord, Content) ->
W ! {send_command, Ch, MethodRecord, Content},
ok.
rabbit_types:content()) -> ok.
send_command(Writer, ChannelNum, MethodRecord, Content) ->
Request = {send_command, ChannelNum, MethodRecord, Content},
gen_server:cast(Writer, Request).

-spec send_command_sync(pid(),
rabbit_types:channel_number(),
rabbit_framing:amqp_method_record()) -> 'ok'.
send_command_sync(W, Ch, MethodRecord) ->
call(W, {send_command_sync, Ch, MethodRecord}).
rabbit_framing:amqp_method_record()) -> ok.
send_command_sync(Writer, ChannelNum, MethodRecord) ->
Request = {send_command, ChannelNum, MethodRecord},
gen_server:call(Writer, Request, ?CALL_TIMEOUT).

-spec send_command_and_notify(pid(),
rabbit_types:channel_number(),
pid(),
pid(),
rabbit_framing:amqp_method_record(),
rabbit_types:content()) -> 'ok'.
send_command_and_notify(W, Ch, Q, SessionPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Ch, Q, SessionPid, MethodRecord, Content},
ok.
rabbit_types:content()) -> ok.
send_command_and_notify(Writer, ChannelNum, QueuePid, SessionPid, MethodRecord, Content) ->
Request = {send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content},
gen_server:cast(Writer, Request).

-spec internal_send_command(rabbit_net:socket(),
rabbit_framing:amqp_method_record(),
'amqp10_framing' | 'rabbit_amqp1_0_sasl') -> 'ok'.
amqp10_framing | rabbit_amqp1_0_sasl) -> ok.
internal_send_command(Sock, MethodRecord, Protocol) ->
ok = tcp_send(Sock, assemble_frame(0, MethodRecord, Protocol)).
Data = assemble_frame(0, MethodRecord, Protocol),
ok = tcp_send(Sock, Data).

call(Pid, Msg) ->
{ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
Res.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% gen_server callbacks %%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%

initial_state(Sock, FrameMax, ReaderPid) ->
State = #wstate{sock = Sock,
init({Sock, FrameMax, ReaderPid}) ->
State0 = #state{sock = Sock,
frame_max = FrameMax,
reader = ReaderPid,
pending = []},
%%TODO check stats_timer: When is it enabled and needs rabbit_event:init_stats_timer/2
rabbit_event:init_disabled_stats_timer(State, #wstate.stats_timer).

mainloop(State) ->
% try
mainloop1(State),
%%TODO handle writer failures properly
% catch
% exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
% ReaderPid ! {channel_exit, Channel, Error}
% end,
done.

mainloop1(State = #wstate{pending = []}) ->
receive
Message -> ?MODULE:mainloop1(handle_message(Message, State))
after ?HIBERNATE_AFTER ->
erlang:hibernate(?MODULE, mainloop, [State])
end;
mainloop1(State) ->
receive
Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
?MODULE:mainloop1(flush(State))
end.

handle_message({send_command, Ch, MethodRecord}, State) ->
internal_send_command_async(Ch, MethodRecord, State);
handle_message({send_command, Ch, MethodRecord, Content}, State) ->
internal_send_command_async(Ch, MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, Ch, MethodRecord}}, State) ->
State1 = flush(internal_send_command_async(Ch, MethodRecord, State)),
gen_server:reply(From, ok),
State1;
handle_message({send_command_and_notify, Ch, QPid, SessionPid, MethodRecord, Content}, State) ->
State1 = internal_send_command_async(Ch, MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, SessionPid),
State1;
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
handle_message({inet_reply, _, Status}, _State) ->
State = rabbit_event:init_disabled_stats_timer(State0, #state.stats_timer),
{ok, State}.

handle_cast({send_command, ChannelNum, MethodRecord}, State0) ->
State = internal_send_command_async(ChannelNum, MethodRecord, State0),
no_reply(State);
handle_cast({send_command, ChannelNum, MethodRecord, Content}, State0) ->
State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0),
no_reply(State);
handle_cast({send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content}, State0) ->
State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0),
rabbit_amqqueue:notify_sent(QueuePid, SessionPid),
no_reply(State).

handle_call({send_command, ChannelNum, MethodRecord}, _From, State0) ->
State1 = internal_send_command_async(ChannelNum, MethodRecord, State0),
State = flush(State1),
{reply, ok, State}.

handle_info(timeout, State0) ->
State = flush(State0),
{noreply, State};
handle_info({inet_reply, _, ok}, State0) ->
State = rabbit_event:ensure_stats_timer(State0, #state.stats_timer, emit_stats),
no_reply(State);
handle_info({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
ReaderPid ! ensure_stats,
rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
no_reply(State);
handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QueuePid),
no_reply(State).

format_status(Status) ->
maps:update_with(
state,
fun(#state{sock = Sock,
frame_max = FrameMax,
reader = Reader,
stats_timer = _,
pending = Pending}) ->
#{socket => Sock,
frame_max => FrameMax,
reader => Reader,
pending_bytes => iolist_size(Pending)}
end,
Status).

%%%%%%%%%%%%%%%
%%% Helpers %%%
%%%%%%%%%%%%%%%

no_reply(State) ->
{noreply, State, 0}.

%% Begin 1-0

assemble_frame(Channel, Performative) ->
assemble_frame(Channel, Performative, amqp10_framing).
internal_send_command_async(Channel, MethodRecord,
State = #state{pending = Pending}) ->
Frame = assemble_frame(Channel, MethodRecord),
maybe_flush(State#state{pending = [Frame | Pending]}).

assemble_frame(Channel, Performative, amqp10_framing) ->
?DEBUG("Channel ~tp <-~n~tp",
[Channel, amqp10_framing:pprint(Performative)]),
PerfBin = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, PerfBin);
assemble_frame(Channel, Performative, rabbit_amqp1_0_sasl) ->
?DEBUG("Channel ~tp <-~n~tp",
[Channel, amqp10_framing:pprint(Performative)]),
PerfBin = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin).
internal_send_command_async(Channel, MethodRecord, Content,
State = #state{frame_max = FrameMax,
pending = Pending}) ->
Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax),
maybe_flush(State#state{pending = [Frames | Pending]}).

%% Note: a transfer record can be followed by a number of other
%% records to make a complete frame but unlike 0-9-1 we may have many
%% content records. However, that's already been handled for us, we're
%% just sending a chunk, so from this perspective it's just a binary.

%%TODO respect FrameMax
assemble_frames(Channel, Performative, Content, _FrameMax) ->
?DEBUG("Channel ~tp <-~n~tp~n followed by ~tp bytes of content",
[Channel, amqp10_framing:pprint(Performative),
iolist_size(Content)]),
PerfBin = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, [PerfBin, Content]).

%% End 1-0

tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
assemble_frame(Channel, Performative) ->
assemble_frame(Channel, Performative, amqp10_framing).

internal_send_command_async(Channel, MethodRecord,
State = #wstate{pending = Pending}) ->
Frame = assemble_frame(Channel, MethodRecord),
maybe_flush(State#wstate{pending = [Frame | Pending]}).
assemble_frame(Channel, Performative, amqp10_framing) ->
?DEBUG("Channel ~tp <-~n~tp",
[Channel, amqp10_framing:pprint(Performative)]),
PerfBin = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, PerfBin);
assemble_frame(Channel, Performative, rabbit_amqp1_0_sasl) ->
?DEBUG("Channel ~tp <-~n~tp",
[Channel, amqp10_framing:pprint(Performative)]),
PerfBin = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin).

internal_send_command_async(Channel, MethodRecord, Content,
State = #wstate{frame_max = FrameMax,
pending = Pending}) ->
Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax),
maybe_flush(State#wstate{pending = [Frames | Pending]}).
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(
inet_error,
fun() -> rabbit_net:send(Sock, Data) end).

%% This magic number is the tcp-over-ethernet MSS (1460) minus the
%% minimum size of a AMQP basic.deliver method frame (24) plus basic
Expand All @@ -184,17 +204,17 @@ internal_send_command_async(Channel, MethodRecord, Content,
%% TODO doesn't make sense for AMQP 1.0
-define(FLUSH_THRESHOLD, 1414).

maybe_flush(State = #wstate{pending = Pending}) ->
maybe_flush(State = #state{pending = Pending}) ->
case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
true -> flush(State);
false -> State
end.

flush(State = #wstate{pending = []}) ->
flush(State = #state{pending = []}) ->
State;
flush(State = #wstate{sock = Sock, pending = Pending}) ->
flush(State = #state{sock = Sock, pending = Pending}) ->
ok = port_cmd(Sock, lists:reverse(Pending)),
State#wstate{pending = []}.
State#state{pending = []}.

%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
Expand Down

0 comments on commit f4cec17

Please sign in to comment.