Skip to content

Commit

Permalink
Cache exchange record
Browse files Browse the repository at this point in the history
for default and pre-declared exchanges to save copying
the #exchange{} record (i.e. save an ETS lookup call) on
every received message.

The default and pre-declared exchanges are protected from deletion and
modification. Exchange routing decorators are not used in tier 1 plugins
and in no open source tier 2 plugin.
  • Loading branch information
ansd committed Mar 1, 2024
1 parent 125683b commit 36906a0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
54 changes: 34 additions & 20 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
}).

-record(incoming_link, {
exchange :: rabbit_exchange:name(),
exchange :: rabbit_types:exchange() | rabbit_exchange:name(),
routing_key :: undefined | rabbit_types:routing_key(),
%% queue_name_bin is only set if the link target address refers to a queue.
queue_name_bin :: undefined | rabbit_misc:resource_name(),
Expand Down Expand Up @@ -857,9 +857,9 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
user = User}}) ->
ok = validate_attach(Attach),
case ensure_target(Target, Vhost, User) of
{ok, XName, RoutingKey, QNameBin} ->
{ok, Exchange, RoutingKey, QNameBin} ->
IncomingLink = #incoming_link{
exchange = XName,
exchange = Exchange,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
delivery_count = DeliveryCountInt,
Expand Down Expand Up @@ -1757,7 +1757,7 @@ incoming_link_transfer(
rcv_settle_mode = RcvSettleMode,
handle = Handle = ?UINT(HandleInt)},
MsgPart,
#incoming_link{exchange = XName = #resource{name = XNameBin},
#incoming_link{exchange = Exchange,
routing_key = LinkRKey,
delivery_count = DeliveryCount0,
incoming_unconfirmed_map = U0,
Expand Down Expand Up @@ -1789,20 +1789,20 @@ incoming_link_transfer(
Sections = amqp10_framing:decode_bin(MsgBin),
?DEBUG("~s Inbound content:~n ~tp",
[?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),
Anns0 = #{?ANN_EXCHANGE => XNameBin},
Anns = case LinkRKey of
undefined -> Anns0;
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
end,
Mc0 = mc:init(mc_amqp, Sections, Anns),
Mc1 = rabbit_message_interceptor:intercept(Mc0),
{Mc, RoutingKey} = ensure_routing_key(Mc1),
check_user_id(Mc, User),
messages_received(Settled),
case rabbit_exchange:lookup(XName) of
{ok, Exchange} ->
check_write_permitted_on_topic(Exchange, User, RoutingKey),
QNames = rabbit_exchange:route(Exchange, Mc, #{return_binding_keys => true}),
case rabbit_exchange_lookup(Exchange) of
{ok, X = #exchange{name = #resource{name = XNameBin}}} ->
Anns0 = #{?ANN_EXCHANGE => XNameBin},
Anns = case LinkRKey of
undefined -> Anns0;
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
end,
Mc0 = mc:init(mc_amqp, Sections, Anns),
Mc1 = rabbit_message_interceptor:intercept(Mc0),
{Mc, RoutingKey} = ensure_routing_key(Mc1),
check_user_id(Mc, User),
messages_received(Settled),
check_write_permitted_on_topic(X, User, RoutingKey),
QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace),
Opts = #{correlation => {HandleInt, DeliveryId}},
Qs0 = rabbit_amqqueue:lookup_many(QNames),
Expand Down Expand Up @@ -1838,6 +1838,11 @@ incoming_link_transfer(
{error, [Disposition, Detach]}
end.

rabbit_exchange_lookup(X = #exchange{}) ->
{ok, X};
rabbit_exchange_lookup(XName = #resource{}) ->
rabbit_exchange:lookup(XName).

ensure_routing_key(Mc) ->
case mc:routing_keys(Mc) of
[RoutingKey] ->
Expand Down Expand Up @@ -1911,16 +1916,25 @@ ensure_target(#'v1_0.target'{address = Address,
{ok, Dest} ->
QNameBin = ensure_terminus(target, Dest, Vhost, User, Durable),
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
XName = rabbit_misc:r(Vhost, exchange, list_to_binary(XNameList1)),
XNameBin = list_to_binary(XNameList1),
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
{ok, X} = rabbit_exchange:lookup(XName),
check_internal_exchange(X),
check_write_permitted(XName, User),
%% Pre-declared exchanges are protected against deletion and modification.
%% Let's cache the whole #exchange{} record to save a
%% rabbit_exchange:lookup(XName) call each time we receive a message.
Exchange = case XNameBin of
<<>> -> X;
<<"amq.", _/binary>> -> X;
_ -> XName
end,
RoutingKey = case RK of
undefined -> undefined;
[] -> undefined;
_ -> list_to_binary(RK)
end,
{ok, XName, RoutingKey, QNameBin};
{ok, Exchange, RoutingKey, QNameBin};
{error, _} = E ->
E
end;
Expand Down
22 changes: 16 additions & 6 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -925,35 +925,45 @@ server_closes_link(QType, Config) ->

server_closes_link_exchange(Config) ->
XName = atom_to_binary(?FUNCTION_NAME),
QName = <<"my queue">>,
RoutingKey = <<"my routing key">>,
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}),

#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
exchange = XName,
routing_key = RoutingKey}),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = <<"/exchange/", XName/binary, "/some-routing-key">>,
Address = <<"/exchange/", XName/binary, "/", RoutingKey/binary>>,
{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, Address),
ok = wait_for_credit(Sender),
?assertMatch(#{publishers := 1}, get_global_counters(Config)),

DTag1 = <<1>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)),
ok = wait_for_settlement(DTag1),

%% Server closes the link endpoint due to some AMQP 1.0 external condition:
%% In this test, the external condition is that an AMQP 0.9.1 client deletes the exchange.
#'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = XName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),

%% When we publish the next message, we expect:
%% 1. that the message is released because the exchange doesn't exist anymore, and
DTag = <<255>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<"body">>, false)),
ok = wait_for_settlement(DTag, released),
DTag2 = <<255>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
ok = wait_for_settlement(DTag2, released),
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
receive {amqp10_event, {link, Sender, {detached, ExpectedError}}} -> ok
after 5000 -> ct:fail("server did not close our outgoing link")
end,
?assertMatch(#{publishers := 0}, get_global_counters(Config)),

#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

Expand Down

0 comments on commit 36906a0

Please sign in to comment.