From f4cec17dfa333d6aa39f7630ede4111ea9278795 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 11 Sep 2023 13:18:09 +0200 Subject: [PATCH] Transform rabbit_amqp1_0_writer into gen_server Why: Prior to this commit, when clicking on the AMQP 1.0 writer process in observer, the process crashed. Instead of handling all these debug messages of the sys module, it's much better to implement a gen_server. There is no advantage of using a special OTP process over gen_server for the AMQP 1.0 writer. gen_server also provides cleaner format status output. How: Message callbacks return a timeout of 0. After all messages in the inbox are processed, the timeout message is handled by flushing any pending bytes. --- .../src/rabbit_amqp1_0_writer.erl | 242 ++++++++++-------- 1 file changed, 131 insertions(+), 111 deletions(-) diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl index ee36a283b08b..0adf596a8d2e 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl @@ -6,153 +6,171 @@ %% -module(rabbit_amqp1_0_writer). --include("rabbit_amqp1_0.hrl"). +-behaviour(gen_server). --export([start_link/3]). +-include("rabbit_amqp1_0.hrl"). --export([send_command/3, +%% client API +-export([start_link/3, + send_command/3, send_command/4, send_command_sync/3, send_command_and_notify/6, internal_send_command/3]). -%% internal --export([mainloop/1, mainloop1/1]). +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + format_status/1]). --record(wstate, { +-record(state, { sock :: rabbit_net:socket(), frame_max, reader :: pid(), stats_timer, - pending}). + pending :: iolist() + }). --define(HIBERNATE_AFTER, 5000). +-define(HIBERNATE_AFTER, 5_000). +-define(CALL_TIMEOUT, 300_000). -define(AMQP_SASL_FRAME_TYPE, 1). +%%%%%%%%%%%%%%%%%% +%%% client API %%% +%%%%%%%%%%%%%%%%%% + -spec start_link (rabbit_net:socket(), non_neg_integer(), pid()) -> rabbit_types:ok(pid()). start_link(Sock, FrameMax, ReaderPid) -> - State = initial_state(Sock, FrameMax, ReaderPid), - {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}. + Args = {Sock, FrameMax, ReaderPid}, + Opts = [{hibernate_after, ?HIBERNATE_AFTER}], + gen_server:start_link(?MODULE, Args, Opts). -spec send_command(pid(), rabbit_types:channel_number(), - rabbit_framing:amqp_method_record()) -> 'ok'. -send_command(W, Ch, MethodRecord) -> - W ! {send_command, Ch, MethodRecord}, - ok. + rabbit_framing:amqp_method_record()) -> ok. +send_command(Writer, ChannelNum, MethodRecord) -> + Request = {send_command, ChannelNum, MethodRecord}, + gen_server:cast(Writer, Request). -spec send_command(pid(), rabbit_types:channel_number(), rabbit_framing:amqp_method_record(), - rabbit_types:content()) -> 'ok'. -send_command(W, Ch, MethodRecord, Content) -> - W ! {send_command, Ch, MethodRecord, Content}, - ok. + rabbit_types:content()) -> ok. +send_command(Writer, ChannelNum, MethodRecord, Content) -> + Request = {send_command, ChannelNum, MethodRecord, Content}, + gen_server:cast(Writer, Request). -spec send_command_sync(pid(), rabbit_types:channel_number(), - rabbit_framing:amqp_method_record()) -> 'ok'. -send_command_sync(W, Ch, MethodRecord) -> - call(W, {send_command_sync, Ch, MethodRecord}). + rabbit_framing:amqp_method_record()) -> ok. +send_command_sync(Writer, ChannelNum, MethodRecord) -> + Request = {send_command, ChannelNum, MethodRecord}, + gen_server:call(Writer, Request, ?CALL_TIMEOUT). -spec send_command_and_notify(pid(), rabbit_types:channel_number(), pid(), pid(), rabbit_framing:amqp_method_record(), - rabbit_types:content()) -> 'ok'. -send_command_and_notify(W, Ch, Q, SessionPid, MethodRecord, Content) -> - W ! {send_command_and_notify, Ch, Q, SessionPid, MethodRecord, Content}, - ok. + rabbit_types:content()) -> ok. +send_command_and_notify(Writer, ChannelNum, QueuePid, SessionPid, MethodRecord, Content) -> + Request = {send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content}, + gen_server:cast(Writer, Request). -spec internal_send_command(rabbit_net:socket(), rabbit_framing:amqp_method_record(), - 'amqp10_framing' | 'rabbit_amqp1_0_sasl') -> 'ok'. + amqp10_framing | rabbit_amqp1_0_sasl) -> ok. internal_send_command(Sock, MethodRecord, Protocol) -> - ok = tcp_send(Sock, assemble_frame(0, MethodRecord, Protocol)). + Data = assemble_frame(0, MethodRecord, Protocol), + ok = tcp_send(Sock, Data). -call(Pid, Msg) -> - {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity), - Res. +%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% gen_server callbacks %%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%% -initial_state(Sock, FrameMax, ReaderPid) -> - State = #wstate{sock = Sock, +init({Sock, FrameMax, ReaderPid}) -> + State0 = #state{sock = Sock, frame_max = FrameMax, reader = ReaderPid, pending = []}, %%TODO check stats_timer: When is it enabled and needs rabbit_event:init_stats_timer/2 - rabbit_event:init_disabled_stats_timer(State, #wstate.stats_timer). - -mainloop(State) -> - % try - mainloop1(State), - %%TODO handle writer failures properly - % catch - % exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State, - % ReaderPid ! {channel_exit, Channel, Error} - % end, - done. - -mainloop1(State = #wstate{pending = []}) -> - receive - Message -> ?MODULE:mainloop1(handle_message(Message, State)) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) - end; -mainloop1(State) -> - receive - Message -> ?MODULE:mainloop1(handle_message(Message, State)) - after 0 -> - ?MODULE:mainloop1(flush(State)) - end. - -handle_message({send_command, Ch, MethodRecord}, State) -> - internal_send_command_async(Ch, MethodRecord, State); -handle_message({send_command, Ch, MethodRecord, Content}, State) -> - internal_send_command_async(Ch, MethodRecord, Content, State); -handle_message({'$gen_call', From, {send_command_sync, Ch, MethodRecord}}, State) -> - State1 = flush(internal_send_command_async(Ch, MethodRecord, State)), - gen_server:reply(From, ok), - State1; -handle_message({send_command_and_notify, Ch, QPid, SessionPid, MethodRecord, Content}, State) -> - State1 = internal_send_command_async(Ch, MethodRecord, Content, State), - rabbit_amqqueue:notify_sent(QPid, SessionPid), - State1; -handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> - rabbit_amqqueue:notify_sent_queue_down(QPid), - State; -handle_message({inet_reply, _, ok}, State) -> - rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats); -handle_message({inet_reply, _, Status}, _State) -> + State = rabbit_event:init_disabled_stats_timer(State0, #state.stats_timer), + {ok, State}. + +handle_cast({send_command, ChannelNum, MethodRecord}, State0) -> + State = internal_send_command_async(ChannelNum, MethodRecord, State0), + no_reply(State); +handle_cast({send_command, ChannelNum, MethodRecord, Content}, State0) -> + State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0), + no_reply(State); +handle_cast({send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content}, State0) -> + State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0), + rabbit_amqqueue:notify_sent(QueuePid, SessionPid), + no_reply(State). + +handle_call({send_command, ChannelNum, MethodRecord}, _From, State0) -> + State1 = internal_send_command_async(ChannelNum, MethodRecord, State0), + State = flush(State1), + {reply, ok, State}. + +handle_info(timeout, State0) -> + State = flush(State0), + {noreply, State}; +handle_info({inet_reply, _, ok}, State0) -> + State = rabbit_event:ensure_stats_timer(State0, #state.stats_timer, emit_stats), + no_reply(State); +handle_info({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); -handle_message(emit_stats, State = #wstate{reader = ReaderPid}) -> +handle_info(emit_stats, State0 = #state{reader = ReaderPid}) -> ReaderPid ! ensure_stats, - rabbit_event:reset_stats_timer(State, #wstate.stats_timer); -handle_message(Message, _State) -> - exit({writer, message_not_understood, Message}). + State = rabbit_event:reset_stats_timer(State0, #state.stats_timer), + no_reply(State); +handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) -> + rabbit_amqqueue:notify_sent_queue_down(QueuePid), + no_reply(State). + +format_status(Status) -> + maps:update_with( + state, + fun(#state{sock = Sock, + frame_max = FrameMax, + reader = Reader, + stats_timer = _, + pending = Pending}) -> + #{socket => Sock, + frame_max => FrameMax, + reader => Reader, + pending_bytes => iolist_size(Pending)} + end, + Status). + +%%%%%%%%%%%%%%% +%%% Helpers %%% +%%%%%%%%%%%%%%% + +no_reply(State) -> + {noreply, State, 0}. -%% Begin 1-0 - -assemble_frame(Channel, Performative) -> - assemble_frame(Channel, Performative, amqp10_framing). +internal_send_command_async(Channel, MethodRecord, + State = #state{pending = Pending}) -> + Frame = assemble_frame(Channel, MethodRecord), + maybe_flush(State#state{pending = [Frame | Pending]}). -assemble_frame(Channel, Performative, amqp10_framing) -> - ?DEBUG("Channel ~tp <-~n~tp", - [Channel, amqp10_framing:pprint(Performative)]), - PerfBin = amqp10_framing:encode_bin(Performative), - amqp10_binary_generator:build_frame(Channel, PerfBin); -assemble_frame(Channel, Performative, rabbit_amqp1_0_sasl) -> - ?DEBUG("Channel ~tp <-~n~tp", - [Channel, amqp10_framing:pprint(Performative)]), - PerfBin = amqp10_framing:encode_bin(Performative), - amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin). +internal_send_command_async(Channel, MethodRecord, Content, + State = #state{frame_max = FrameMax, + pending = Pending}) -> + Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax), + maybe_flush(State#state{pending = [Frames | Pending]}). %% Note: a transfer record can be followed by a number of other %% records to make a complete frame but unlike 0-9-1 we may have many %% content records. However, that's already been handled for us, we're %% just sending a chunk, so from this perspective it's just a binary. +%%TODO respect FrameMax assemble_frames(Channel, Performative, Content, _FrameMax) -> ?DEBUG("Channel ~tp <-~n~tp~n followed by ~tp bytes of content", [Channel, amqp10_framing:pprint(Performative), @@ -160,22 +178,24 @@ assemble_frames(Channel, Performative, Content, _FrameMax) -> PerfBin = amqp10_framing:encode_bin(Performative), amqp10_binary_generator:build_frame(Channel, [PerfBin, Content]). -%% End 1-0 - -tcp_send(Sock, Data) -> - rabbit_misc:throw_on_error(inet_error, - fun () -> rabbit_net:send(Sock, Data) end). +assemble_frame(Channel, Performative) -> + assemble_frame(Channel, Performative, amqp10_framing). -internal_send_command_async(Channel, MethodRecord, - State = #wstate{pending = Pending}) -> - Frame = assemble_frame(Channel, MethodRecord), - maybe_flush(State#wstate{pending = [Frame | Pending]}). +assemble_frame(Channel, Performative, amqp10_framing) -> + ?DEBUG("Channel ~tp <-~n~tp", + [Channel, amqp10_framing:pprint(Performative)]), + PerfBin = amqp10_framing:encode_bin(Performative), + amqp10_binary_generator:build_frame(Channel, PerfBin); +assemble_frame(Channel, Performative, rabbit_amqp1_0_sasl) -> + ?DEBUG("Channel ~tp <-~n~tp", + [Channel, amqp10_framing:pprint(Performative)]), + PerfBin = amqp10_framing:encode_bin(Performative), + amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin). -internal_send_command_async(Channel, MethodRecord, Content, - State = #wstate{frame_max = FrameMax, - pending = Pending}) -> - Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax), - maybe_flush(State#wstate{pending = [Frames | Pending]}). +tcp_send(Sock, Data) -> + rabbit_misc:throw_on_error( + inet_error, + fun() -> rabbit_net:send(Sock, Data) end). %% This magic number is the tcp-over-ethernet MSS (1460) minus the %% minimum size of a AMQP basic.deliver method frame (24) plus basic @@ -184,17 +204,17 @@ internal_send_command_async(Channel, MethodRecord, Content, %% TODO doesn't make sense for AMQP 1.0 -define(FLUSH_THRESHOLD, 1414). -maybe_flush(State = #wstate{pending = Pending}) -> +maybe_flush(State = #state{pending = Pending}) -> case iolist_size(Pending) >= ?FLUSH_THRESHOLD of true -> flush(State); false -> State end. -flush(State = #wstate{pending = []}) -> +flush(State = #state{pending = []}) -> State; -flush(State = #wstate{sock = Sock, pending = Pending}) -> +flush(State = #state{sock = Sock, pending = Pending}) -> ok = port_cmd(Sock, lists:reverse(Pending)), - State#wstate{pending = []}. + State#state{pending = []}. %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from