Skip to content

Commit

Permalink
Detach link pair
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Feb 29, 2024
1 parent af26d38 commit 125683b
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 13 deletions.
70 changes: 57 additions & 13 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
%% For AMQP management operations, we require a link pair as described in
%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html
-record(management_link_pair, {
client_terminus_address :: tuple(),
client_terminus_address,
incoming_half :: unattached | link_handle(),
outgoing_half :: unattached | link_handle()
}).
Expand Down Expand Up @@ -254,6 +254,8 @@
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state()
}).

-type state() :: #state{}.

start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame) ->
Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame},
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
Expand Down Expand Up @@ -1138,10 +1140,11 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
{Unsettled1, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0),
{QStates0, Unsettled1, OutgoingLinks0}
end,
State = State0#state{queue_states = QStates,
incoming_links = maps:remove(HandleInt, IncomingLinks),
outgoing_links = OutgoingLinks,
outgoing_unsettled_map = Unsettled},
State1 = State0#state{queue_states = QStates,
incoming_links = maps:remove(HandleInt, IncomingLinks),
outgoing_links = OutgoingLinks,
outgoing_unsettled_map = Unsettled},
State = maybe_detach_mgmt_link(HandleInt, State1),
maybe_detach_reply(Detach, State, State0),
publisher_or_consumer_deleted(State, State0),
{noreply, State};
Expand Down Expand Up @@ -2455,20 +2458,61 @@ publisher_or_consumer_deleted(

%% If we previously already sent a detach with an error condition, and the Detach we
%% receive here is therefore the client's reply, do not reply again with a 3rd detach.
maybe_detach_reply(Detach,
#state{incoming_links = NewIncomingLinks,
outgoing_links = NewOutgoingLinks,
cfg = #cfg{writer_pid = WriterPid,
channel_num = Ch}},
#state{incoming_links = OldIncomingLinks,
outgoing_links = OldOutgoingLinks})
maybe_detach_reply(
Detach,
#state{incoming_links = NewIncomingLinks,
outgoing_links = NewOutgoingLinks,
incoming_management_links = NewIncomingMgmtLinks,
outgoing_management_links = NewOutgoingMgmtLinks,
cfg = #cfg{writer_pid = WriterPid,
channel_num = Ch}},
#state{incoming_links = OldIncomingLinks,
outgoing_links = OldOutgoingLinks,
incoming_management_links = OldIncomingMgmtLinks,
outgoing_management_links = OldOutgoingMgmtLinks})
when map_size(NewIncomingLinks) < map_size(OldIncomingLinks) orelse
map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) ->
map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) orelse
map_size(NewIncomingMgmtLinks) < map_size(OldIncomingMgmtLinks) orelse
map_size(NewOutgoingMgmtLinks) < map_size(OldOutgoingMgmtLinks) ->
Reply = Detach#'v1_0.detach'{error = undefined},
rabbit_amqp_writer:send_command(WriterPid, Ch, Reply);
maybe_detach_reply(_, _, _) ->
ok.

-spec maybe_detach_mgmt_link(link_handle(), state()) -> state().
maybe_detach_mgmt_link(
HandleInt,
State = #state{management_link_pairs = LinkPairs0,
incoming_management_links = IncomingLinks0,
outgoing_management_links = OutgoingLinks0}) ->
case maps:take(HandleInt, IncomingLinks0) of
{#management_link{name = Name}, IncomingLinks} ->
Pair = #management_link_pair{outgoing_half = OutgoingHalf} = maps:get(Name, LinkPairs0),
LinkPairs = case OutgoingHalf of
unattached ->
maps:remove(Name, LinkPairs0);
_ ->
maps:update(Name, Pair#management_link_pair{incoming_half = unattached}, LinkPairs0)
end,
State#state{incoming_management_links = IncomingLinks,
management_link_pairs = LinkPairs};
error ->
case maps:take(HandleInt, OutgoingLinks0) of
{#management_link{name = Name}, OutgoingLinks} ->
Pair = #management_link_pair{incoming_half = IncomingHalf} = maps:get(Name, LinkPairs0),
LinkPairs = case IncomingHalf of
unattached ->
maps:remove(Name, LinkPairs0);
_ ->
maps:update(Name, Pair#management_link_pair{outgoing_half = unattached}, LinkPairs0)
end,
State#state{outgoing_management_links = OutgoingLinks,
management_link_pairs = LinkPairs};
error ->
State
end
end.

check_internal_exchange(#exchange{internal = true,
name = XName}) ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
Expand Down
33 changes: 33 additions & 0 deletions deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-feature(maybe_expr, enable).

-export[attach_management_link_pair_sync/2,
detach_management_link_pair_sync/1,
declare_queue/2,
declare_exchange/2,
bind_queue/5,
Expand Down Expand Up @@ -87,6 +88,38 @@ await_attached(Ref) ->
{error, timeout}
end.

-spec detach_management_link_pair_sync(link_pair()) ->
ok | {error, term()}.
detach_management_link_pair_sync(
#link_pair{outgoing_link = OutgoingLink,
incoming_link = IncomingLink}) ->
maybe
ok ?= detach(OutgoingLink),
ok ?= detach(IncomingLink),
ok ?= await_detached(OutgoingLink),
await_detached(IncomingLink)
end.

-spec detach(amqp10_client:link_ref()) ->
ok | {error, term()}.
detach(Ref) ->
try amqp10_client:detach_link(Ref)
catch exit:Reason ->
{error, Reason}
end.

-spec await_detached(amqp10_client:link_ref()) ->
ok | {error, term()}.
await_detached(Ref) ->
receive
{amqp10_event, {link, Ref, {detached, normal}}} ->
ok;
{amqp10_event, {link, Ref, {detached, Err}}} ->
{error, Err}
after ?TIMEOUT ->
{error, timeout}
end.

-spec declare_queue(link_pair(), queue_properties()) ->
{ok, map()} | {error, term()}.
declare_queue(LinkPair, QueueProperties) ->
Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_amqp_client/test/management_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ all_management_operations(Config) ->
?assertEqual({ok, #{message_count => 0}},
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),

ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

Expand Down

0 comments on commit 125683b

Please sign in to comment.