diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 88e2a41d2070..388c126cbc86 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -78,7 +78,7 @@ %% For AMQP management operations, we require a link pair as described in %% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html -record(management_link_pair, { - client_terminus_address :: tuple(), + client_terminus_address, incoming_half :: unattached | link_handle(), outgoing_half :: unattached | link_handle() }). @@ -254,6 +254,8 @@ queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state() }). +-type state() :: #state{}. + start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame) -> Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame}, Opts = [{hibernate_after, ?HIBERNATE_AFTER}], @@ -1138,10 +1140,11 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, {Unsettled1, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0), {QStates0, Unsettled1, OutgoingLinks0} end, - State = State0#state{queue_states = QStates, - incoming_links = maps:remove(HandleInt, IncomingLinks), - outgoing_links = OutgoingLinks, - outgoing_unsettled_map = Unsettled}, + State1 = State0#state{queue_states = QStates, + incoming_links = maps:remove(HandleInt, IncomingLinks), + outgoing_links = OutgoingLinks, + outgoing_unsettled_map = Unsettled}, + State = maybe_detach_mgmt_link(HandleInt, State1), maybe_detach_reply(Detach, State, State0), publisher_or_consumer_deleted(State, State0), {noreply, State}; @@ -2455,20 +2458,61 @@ publisher_or_consumer_deleted( %% If we previously already sent a detach with an error condition, and the Detach we %% receive here is therefore the client's reply, do not reply again with a 3rd detach. -maybe_detach_reply(Detach, - #state{incoming_links = NewIncomingLinks, - outgoing_links = NewOutgoingLinks, - cfg = #cfg{writer_pid = WriterPid, - channel_num = Ch}}, - #state{incoming_links = OldIncomingLinks, - outgoing_links = OldOutgoingLinks}) +maybe_detach_reply( + Detach, + #state{incoming_links = NewIncomingLinks, + outgoing_links = NewOutgoingLinks, + incoming_management_links = NewIncomingMgmtLinks, + outgoing_management_links = NewOutgoingMgmtLinks, + cfg = #cfg{writer_pid = WriterPid, + channel_num = Ch}}, + #state{incoming_links = OldIncomingLinks, + outgoing_links = OldOutgoingLinks, + incoming_management_links = OldIncomingMgmtLinks, + outgoing_management_links = OldOutgoingMgmtLinks}) when map_size(NewIncomingLinks) < map_size(OldIncomingLinks) orelse - map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) -> + map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) orelse + map_size(NewIncomingMgmtLinks) < map_size(OldIncomingMgmtLinks) orelse + map_size(NewOutgoingMgmtLinks) < map_size(OldOutgoingMgmtLinks) -> Reply = Detach#'v1_0.detach'{error = undefined}, rabbit_amqp_writer:send_command(WriterPid, Ch, Reply); maybe_detach_reply(_, _, _) -> ok. +-spec maybe_detach_mgmt_link(link_handle(), state()) -> state(). +maybe_detach_mgmt_link( + HandleInt, + State = #state{management_link_pairs = LinkPairs0, + incoming_management_links = IncomingLinks0, + outgoing_management_links = OutgoingLinks0}) -> + case maps:take(HandleInt, IncomingLinks0) of + {#management_link{name = Name}, IncomingLinks} -> + Pair = #management_link_pair{outgoing_half = OutgoingHalf} = maps:get(Name, LinkPairs0), + LinkPairs = case OutgoingHalf of + unattached -> + maps:remove(Name, LinkPairs0); + _ -> + maps:update(Name, Pair#management_link_pair{incoming_half = unattached}, LinkPairs0) + end, + State#state{incoming_management_links = IncomingLinks, + management_link_pairs = LinkPairs}; + error -> + case maps:take(HandleInt, OutgoingLinks0) of + {#management_link{name = Name}, OutgoingLinks} -> + Pair = #management_link_pair{incoming_half = IncomingHalf} = maps:get(Name, LinkPairs0), + LinkPairs = case IncomingHalf of + unattached -> + maps:remove(Name, LinkPairs0); + _ -> + maps:update(Name, Pair#management_link_pair{outgoing_half = unattached}, LinkPairs0) + end, + State#state{outgoing_management_links = OutgoingLinks, + management_link_pairs = LinkPairs}; + error -> + State + end + end. + check_internal_exchange(#exchange{internal = true, name = XName}) -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index 43ebc7b94812..9e6ca5cf4972 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -9,6 +9,7 @@ -feature(maybe_expr, enable). -export[attach_management_link_pair_sync/2, + detach_management_link_pair_sync/1, declare_queue/2, declare_exchange/2, bind_queue/5, @@ -87,6 +88,38 @@ await_attached(Ref) -> {error, timeout} end. +-spec detach_management_link_pair_sync(link_pair()) -> + ok | {error, term()}. +detach_management_link_pair_sync( + #link_pair{outgoing_link = OutgoingLink, + incoming_link = IncomingLink}) -> + maybe + ok ?= detach(OutgoingLink), + ok ?= detach(IncomingLink), + ok ?= await_detached(OutgoingLink), + await_detached(IncomingLink) + end. + +-spec detach(amqp10_client:link_ref()) -> + ok | {error, term()}. +detach(Ref) -> + try amqp10_client:detach_link(Ref) + catch exit:Reason -> + {error, Reason} + end. + +-spec await_detached(amqp10_client:link_ref()) -> + ok | {error, term()}. +await_detached(Ref) -> + receive + {amqp10_event, {link, Ref, {detached, normal}}} -> + ok; + {amqp10_event, {link, Ref, {detached, Err}}} -> + {error, Err} + after ?TIMEOUT -> + {error, timeout} + end. + -spec declare_queue(link_pair(), queue_properties()) -> {ok, map()} | {error, term()}. declare_queue(LinkPair, QueueProperties) -> diff --git a/deps/rabbitmq_amqp_client/test/management_SUITE.erl b/deps/rabbitmq_amqp_client/test/management_SUITE.erl index f7d9b3ac2ecd..4e32643e8607 100644 --- a/deps/rabbitmq_amqp_client/test/management_SUITE.erl +++ b/deps/rabbitmq_amqp_client/test/management_SUITE.erl @@ -155,6 +155,7 @@ all_management_operations(Config) -> ?assertEqual({ok, #{message_count => 0}}, rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection).